diff --git a/api/agent_skills/coding_style.md b/api/agent_skills/coding_style.md
index 26be5990aa..c2f161fa22 100644
--- a/api/agent_skills/coding_style.md
+++ b/api/agent_skills/coding_style.md
@@ -19,7 +19,6 @@
 - Prefer simple functions over classes for lightweight helpers.
 - Keep files below 800 lines; split when necessary.
 - Keep code readable—no clever hacks.
-- Never use type annotations.
 - Never use `print`; log with `logger = logging.getLogger(__name__)`.
 
 ## Guiding Principles
diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py
index 62d50303cd..a6e2d64b19 100644
--- a/api/controllers/console/app/workflow.py
+++ b/api/controllers/console/app/workflow.py
@@ -1072,7 +1072,9 @@ class DraftWorkflowTriggerNodeApi(Resource):
             return jsonable_encoder(node_execution)
         except Exception as e:
             logger.exception("Error running draft workflow trigger node")
-            return jsonable_encoder({"status": "error", "error": str(e)}), 500
+            return jsonable_encoder(
+                {"status": "error", "error": "An unexpected error occurred while running the node."}
+            ), 500
 
 
 @console_ns.route("/apps/<uuid:app_id>/workflows/draft/trigger/run-all")
@@ -1144,7 +1146,7 @@ class DraftWorkflowTriggerRunAllApi(Resource):
         except InvokeRateLimitError as ex:
             raise InvokeRateLimitHttpError(ex.description)
         except Exception:
-            logger.exception("Error running draft workflow trigger webhook run")
+            logger.exception("Error running draft workflow trigger run-all")
             return jsonable_encoder(
                 {
                     "status": "error",
diff --git a/api/controllers/console/workspace/trigger_providers.py b/api/controllers/console/workspace/trigger_providers.py
index 3f398fbbc3..bbbbe12fb0 100644
--- a/api/controllers/console/workspace/trigger_providers.py
+++ b/api/controllers/console/workspace/trigger_providers.py
@@ -20,8 +20,8 @@ from models.account import Account
 from models.provider_ids import TriggerProviderID
 from services.plugin.oauth_service import OAuthProxyService
 from services.trigger.trigger_provider_service import TriggerProviderService
-from services.trigger.trigger_service import TriggerService
 from services.trigger.trigger_subscription_builder_service import TriggerSubscriptionBuilderService
+from services.trigger.trigger_subscription_operator_service import TriggerSubscriptionOperatorService
 
 logger = logging.getLogger(__name__)
 
@@ -265,7 +265,7 @@ class TriggerSubscriptionDeleteApi(Resource):
     @setup_required
     @login_required
     @account_initialization_required
-    def post(self, subscription_id):
+    def post(self, subscription_id: str):
         """Delete a subscription instance"""
         user = current_user
         assert isinstance(user, Account)
@@ -282,7 +282,7 @@ class TriggerSubscriptionDeleteApi(Resource):
                     subscription_id=subscription_id,
                 )
                 # Delete plugin triggers
-                TriggerService.delete_plugin_trigger_by_subscription(
+                TriggerSubscriptionOperatorService.delete_plugin_trigger_by_subscription(
                     session=session,
                     tenant_id=user.current_tenant_id,
                     subscription_id=subscription_id,
@@ -427,7 +427,7 @@ class TriggerOAuthCallbackApi(Resource):
             expires_at = credentials_response.expires_at
 
             if not credentials:
-                raise Exception("Failed to get OAuth credentials")
+                raise ValueError("Failed to get OAuth credentials from the provider.")
 
             # Update subscription builder
             TriggerSubscriptionBuilderService.update_trigger_subscription_builder(
diff --git a/api/controllers/trigger/trigger.py b/api/controllers/trigger/trigger.py
index 4640f0b198..169b5302aa 100644
--- a/api/controllers/trigger/trigger.py
+++ b/api/controllers/trigger/trigger.py
@@ -41,4 +41,4 @@ def trigger_endpoint(endpoint_id: str):
         return jsonify({"error": "Endpoint processing failed", "message": str(e)}), 500
     except Exception as e:
-        logger.exception("Webhook processing failed for {endpoint_id}")
-        return jsonify({"error": "Internal server error", "message": str(e)}), 500
+        logger.exception("Webhook processing failed for %s", endpoint_id)
+        return jsonify({"error": "Internal server error"}), 500
diff --git a/api/controllers/trigger/webhook.py b/api/controllers/trigger/webhook.py
index 0eb42e8f0d..cec5c3d8ae 100644
--- a/api/controllers/trigger/webhook.py
+++ b/api/controllers/trigger/webhook.py
@@ -102,4 +102,4 @@ def handle_webhook_debug(webhook_id: str):
         raise
     except Exception as e:
         logger.exception("Webhook debug processing failed for %s", webhook_id)
-        return jsonify({"error": "Internal server error", "message": str(e)}), 500
+        return jsonify({"error": "Internal server error", "message": "An internal error has occurred."}), 500
diff --git a/api/core/trigger/debug/event_bus.py b/api/core/trigger/debug/event_bus.py
index e5f2ac2c00..9d10e1a0e0 100644
--- a/api/core/trigger/debug/event_bus.py
+++ b/api/core/trigger/debug/event_bus.py
@@ -107,7 +107,7 @@ class TriggerDebugEventBus:
         Returns:
             Event object if available, None otherwise
         """
-        address_id: str = hashlib.sha1(f"{user_id}|{app_id}|{node_id}".encode()).hexdigest()
+        address_id: str = hashlib.sha256(f"{user_id}|{app_id}|{node_id}".encode()).hexdigest()
         address: str = f"trigger_debug_inbox:{tenant_id}:{address_id}"
 
         try:
diff --git a/api/migrations/versions/2025_10_27_1201-4558cfabe44e_add_workflow_trigger_logs.py b/api/migrations/versions/2025_10_27_1201-4558cfabe44e_add_workflow_trigger_logs.py
deleted file mode 100644
index 1fe46972c1..0000000000
--- a/api/migrations/versions/2025_10_27_1201-4558cfabe44e_add_workflow_trigger_logs.py
+++ /dev/null
@@ -1,67 +0,0 @@
-"""Add workflow trigger logs table
-
-Revision ID: 4558cfabe44e
-Revises: ae662b25d9bc
-Create Date: 2025-10-27 12:01:00.000000
-
-"""
-from alembic import op
-import models as models
-import sqlalchemy as sa
-
-
-# revision identifiers, used by Alembic.
-revision = '4558cfabe44e'
-down_revision = '03f8dcbc611e'
-branch_labels = None
-depends_on = None
-
-
-def upgrade():
-    # ### commands auto generated by Alembic - please adjust!
### - op.create_table('workflow_trigger_logs', - sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False), - sa.Column('tenant_id', models.types.StringUUID(), nullable=False), - sa.Column('app_id', models.types.StringUUID(), nullable=False), - sa.Column('workflow_id', models.types.StringUUID(), nullable=False), - sa.Column('workflow_run_id', models.types.StringUUID(), nullable=True), - sa.Column('root_node_id', sa.String(length=255), nullable=True), - sa.Column('trigger_type', sa.String(length=50), nullable=False), - sa.Column('trigger_data', sa.Text(), nullable=False), - sa.Column('inputs', sa.Text(), nullable=False), - sa.Column('outputs', sa.Text(), nullable=True), - sa.Column('status', sa.String(length=50), nullable=False), - sa.Column('error', sa.Text(), nullable=True), - sa.Column('queue_name', sa.String(length=100), nullable=False), - sa.Column('celery_task_id', sa.String(length=255), nullable=True), - sa.Column('retry_count', sa.Integer(), nullable=False), - sa.Column('elapsed_time', sa.Float(), nullable=True), - sa.Column('total_tokens', sa.Integer(), nullable=True), - sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), - sa.Column('created_by_role', sa.String(length=255), nullable=False), - sa.Column('created_by', sa.String(length=255), nullable=False), - sa.Column('triggered_at', sa.DateTime(), nullable=True), - sa.Column('finished_at', sa.DateTime(), nullable=True), - sa.PrimaryKeyConstraint('id', name='workflow_trigger_log_pkey') - ) - with op.batch_alter_table('workflow_trigger_logs', schema=None) as batch_op: - batch_op.create_index('workflow_trigger_log_created_at_idx', ['created_at'], unique=False) - batch_op.create_index('workflow_trigger_log_status_idx', ['status'], unique=False) - batch_op.create_index('workflow_trigger_log_tenant_app_idx', ['tenant_id', 'app_id'], unique=False) - batch_op.create_index('workflow_trigger_log_workflow_id_idx', ['workflow_id'], unique=False) - batch_op.create_index('workflow_trigger_log_workflow_run_idx', ['workflow_run_id'], unique=False) - - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('workflow_trigger_logs', schema=None) as batch_op: - batch_op.drop_index('workflow_trigger_log_workflow_run_idx') - batch_op.drop_index('workflow_trigger_log_workflow_id_idx') - batch_op.drop_index('workflow_trigger_log_tenant_app_idx') - batch_op.drop_index('workflow_trigger_log_status_idx') - batch_op.drop_index('workflow_trigger_log_created_at_idx') - - op.drop_table('workflow_trigger_logs') - # ### end Alembic commands ### diff --git a/api/migrations/versions/2025_10_27_1202-5871f634954d_add_workflow_webhook_table.py b/api/migrations/versions/2025_10_27_1202-5871f634954d_add_workflow_webhook_table.py deleted file mode 100644 index 43466a0697..0000000000 --- a/api/migrations/versions/2025_10_27_1202-5871f634954d_add_workflow_webhook_table.py +++ /dev/null @@ -1,47 +0,0 @@ -"""Add workflow webhook table - -Revision ID: 5871f634954d -Revises: 4558cfabe44e -Create Date: 2025-10-27 12:02:00.000000 - -""" -from alembic import op -import models as models -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '5871f634954d' -down_revision = '4558cfabe44e' -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! 
### - op.create_table('workflow_webhook_triggers', - sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False), - sa.Column('app_id', models.types.StringUUID(), nullable=False), - sa.Column('node_id', sa.String(length=64), nullable=False), - sa.Column('tenant_id', models.types.StringUUID(), nullable=False), - sa.Column('webhook_id', sa.String(length=24), nullable=False), - sa.Column('created_by', models.types.StringUUID(), nullable=False), - sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), - sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), - sa.PrimaryKeyConstraint('id', name='workflow_webhook_trigger_pkey'), - sa.UniqueConstraint('app_id', 'node_id', name='uniq_node'), - sa.UniqueConstraint('webhook_id', name='uniq_webhook_id') - ) - with op.batch_alter_table('workflow_webhook_triggers', schema=None) as batch_op: - batch_op.create_index('workflow_webhook_trigger_tenant_idx', ['tenant_id'], unique=False) - - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('workflow_webhook_triggers', schema=None) as batch_op: - batch_op.drop_index('workflow_webhook_trigger_tenant_idx') - - op.drop_table('workflow_webhook_triggers') - # ### end Alembic commands ### diff --git a/api/migrations/versions/2025_10_27_1203-9ee7d347f4c1_add_app_triggers_table.py b/api/migrations/versions/2025_10_27_1203-9ee7d347f4c1_add_app_triggers_table.py deleted file mode 100644 index fe4cd24ad6..0000000000 --- a/api/migrations/versions/2025_10_27_1203-9ee7d347f4c1_add_app_triggers_table.py +++ /dev/null @@ -1,47 +0,0 @@ -"""Add app triggers table - -Revision ID: 9ee7d347f4c1 -Revises: 5871f634954d -Create Date: 2025-10-27 12:03:00.000000 - -""" -from alembic import op -import models as models -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '9ee7d347f4c1' -down_revision = '5871f634954d' -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - op.create_table('app_triggers', - sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False), - sa.Column('tenant_id', models.types.StringUUID(), nullable=False), - sa.Column('app_id', models.types.StringUUID(), nullable=False), - sa.Column('node_id', sa.String(length=64), nullable=False), - sa.Column('trigger_type', sa.String(length=50), nullable=False), - sa.Column('title', sa.String(length=255), nullable=False), - sa.Column('provider_name', sa.String(length=255), server_default='', nullable=True), - sa.Column('status', sa.String(length=50), nullable=False), - sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), - sa.Column('updated_at', sa.DateTime(), nullable=False), - sa.PrimaryKeyConstraint('id', name='app_trigger_pkey') - ) - with op.batch_alter_table('app_triggers', schema=None) as batch_op: - batch_op.create_index('app_trigger_tenant_app_idx', ['tenant_id', 'app_id'], unique=False) - - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! 
### - with op.batch_alter_table('app_triggers', schema=None) as batch_op: - batch_op.drop_index('app_trigger_tenant_app_idx') - - op.drop_table('app_triggers') - # ### end Alembic commands ### diff --git a/api/migrations/versions/2025_10_27_1204-c19938f630b6_add_workflow_schedule_plan.py b/api/migrations/versions/2025_10_27_1204-c19938f630b6_add_workflow_schedule_plan.py deleted file mode 100644 index 85e7e0c735..0000000000 --- a/api/migrations/versions/2025_10_27_1204-c19938f630b6_add_workflow_schedule_plan.py +++ /dev/null @@ -1,47 +0,0 @@ -"""Add workflow schedule plan table - -Revision ID: c19938f630b6 -Revises: 9ee7d347f4c1 -Create Date: 2025-10-27 12:04:00.000000 - -""" -from alembic import op -import models as models -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = 'c19938f630b6' -down_revision = '9ee7d347f4c1' -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - op.create_table('workflow_schedule_plans', - sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False), - sa.Column('app_id', models.types.StringUUID(), nullable=False), - sa.Column('node_id', sa.String(length=64), nullable=False), - sa.Column('tenant_id', models.types.StringUUID(), nullable=False), - sa.Column('cron_expression', sa.String(length=255), nullable=False), - sa.Column('timezone', sa.String(length=64), nullable=False), - sa.Column('next_run_at', sa.DateTime(), nullable=True), - sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), - sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), - sa.PrimaryKeyConstraint('id', name='workflow_schedule_plan_pkey'), - sa.UniqueConstraint('app_id', 'node_id', name='uniq_app_node') - ) - with op.batch_alter_table('workflow_schedule_plans', schema=None) as batch_op: - batch_op.create_index('workflow_schedule_plan_next_idx', ['next_run_at'], unique=False) - - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('workflow_schedule_plans', schema=None) as batch_op: - batch_op.drop_index('workflow_schedule_plan_next_idx') - - op.drop_table('workflow_schedule_plans') - # ### end Alembic commands ### diff --git a/api/migrations/versions/2025_10_27_1205-132392a2635f_plugin_trigger.py b/api/migrations/versions/2025_10_27_1205-132392a2635f_plugin_trigger.py deleted file mode 100644 index 426be1b071..0000000000 --- a/api/migrations/versions/2025_10_27_1205-132392a2635f_plugin_trigger.py +++ /dev/null @@ -1,102 +0,0 @@ -"""plugin_trigger - -Revision ID: 132392a2635f -Revises: c19938f630b6 -Create Date: 2025-10-27 12:05:00.000000 - -""" -from alembic import op -import models as models -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '132392a2635f' -down_revision = 'c19938f630b6' -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! 
### - op.create_table('trigger_oauth_system_clients', - sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False), - sa.Column('plugin_id', sa.String(length=512), nullable=False), - sa.Column('provider', sa.String(length=255), nullable=False), - sa.Column('encrypted_oauth_params', sa.Text(), nullable=False), - sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), - sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), - sa.PrimaryKeyConstraint('id', name='trigger_oauth_system_client_pkey'), - sa.UniqueConstraint('plugin_id', 'provider', name='trigger_oauth_system_client_plugin_id_provider_idx') - ) - op.create_table('trigger_oauth_tenant_clients', - sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False), - sa.Column('tenant_id', models.types.StringUUID(), nullable=False), - sa.Column('plugin_id', sa.String(length=512), nullable=False), - sa.Column('provider', sa.String(length=255), nullable=False), - sa.Column('enabled', sa.Boolean(), server_default=sa.text('true'), nullable=False), - sa.Column('encrypted_oauth_params', sa.Text(), nullable=False), - sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), - sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), - sa.PrimaryKeyConstraint('id', name='trigger_oauth_tenant_client_pkey'), - sa.UniqueConstraint('tenant_id', 'plugin_id', 'provider', name='unique_trigger_oauth_tenant_client') - ) - op.create_table('trigger_subscriptions', - sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False), - sa.Column('name', sa.String(length=255), nullable=False, comment='Subscription instance name'), - sa.Column('tenant_id', models.types.StringUUID(), nullable=False), - sa.Column('user_id', models.types.StringUUID(), nullable=False), - sa.Column('provider_id', sa.String(length=255), nullable=False, comment='Provider identifier (e.g., plugin_id/provider_name)'), - sa.Column('endpoint_id', sa.String(length=255), nullable=False, comment='Subscription endpoint'), - sa.Column('parameters', sa.JSON(), nullable=False, comment='Subscription parameters JSON'), - sa.Column('properties', sa.JSON(), nullable=False, comment='Subscription properties JSON'), - sa.Column('credentials', sa.JSON(), nullable=False, comment='Subscription credentials JSON'), - sa.Column('credential_type', sa.String(length=50), nullable=False, comment='oauth or api_key'), - sa.Column('credential_expires_at', sa.Integer(), nullable=False, comment='OAuth token expiration timestamp, -1 for never'), - sa.Column('expires_at', sa.Integer(), nullable=False, comment='Subscription instance expiration timestamp, -1 for never'), - sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), - sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), - sa.PrimaryKeyConstraint('id', name='trigger_provider_pkey'), - sa.UniqueConstraint('tenant_id', 'provider_id', 'name', name='unique_trigger_provider') - ) - with op.batch_alter_table('trigger_subscriptions', schema=None) as batch_op: - batch_op.create_index('idx_trigger_providers_endpoint', ['endpoint_id'], unique=True) - batch_op.create_index('idx_trigger_providers_tenant_endpoint', ['tenant_id', 'endpoint_id'], unique=False) - 
batch_op.create_index('idx_trigger_providers_tenant_provider', ['tenant_id', 'provider_id'], unique=False) - - # Create workflow_plugin_triggers table with final schema (merged from all 4 migrations) - op.create_table('workflow_plugin_triggers', - sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False), - sa.Column('app_id', models.types.StringUUID(), nullable=False), - sa.Column('node_id', sa.String(length=64), nullable=False), - sa.Column('tenant_id', models.types.StringUUID(), nullable=False), - sa.Column('provider_id', sa.String(length=512), nullable=False), - sa.Column('subscription_id', sa.String(length=255), nullable=False), - sa.Column('event_name', sa.String(length=255), nullable=False), - sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), - sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), - sa.PrimaryKeyConstraint('id', name='workflow_plugin_trigger_pkey'), - sa.UniqueConstraint('app_id', 'node_id', name='uniq_app_node_subscription') - ) - with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op: - batch_op.create_index('workflow_plugin_trigger_tenant_subscription_idx', ['tenant_id', 'subscription_id', 'event_name'], unique=False) - - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op: - batch_op.drop_index('workflow_plugin_trigger_tenant_subscription_idx') - - op.drop_table('workflow_plugin_triggers') - with op.batch_alter_table('trigger_subscriptions', schema=None) as batch_op: - batch_op.drop_index('idx_trigger_providers_tenant_provider') - batch_op.drop_index('idx_trigger_providers_tenant_endpoint') - batch_op.drop_index('idx_trigger_providers_endpoint') - - op.drop_table('trigger_subscriptions') - op.drop_table('trigger_oauth_tenant_clients') - op.drop_table('trigger_oauth_system_clients') - - # ### end Alembic commands ### diff --git a/api/migrations/versions/2025_10_27_1752-5ed4b21dbb8d_trigger_log_metadata.py b/api/migrations/versions/2025_10_27_1752-5ed4b21dbb8d_trigger_log_metadata.py deleted file mode 100644 index 089246d2fa..0000000000 --- a/api/migrations/versions/2025_10_27_1752-5ed4b21dbb8d_trigger_log_metadata.py +++ /dev/null @@ -1,32 +0,0 @@ -"""trigger_log_metadata - -Revision ID: 5ed4b21dbb8d -Revises: 132392a2635f -Create Date: 2025-10-27 17:52:35.658975 - -""" -from alembic import op -import models as models -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '5ed4b21dbb8d' -down_revision = '132392a2635f' -branch_labels = None -depends_on = None - - -def upgrade(): - with op.batch_alter_table('workflow_trigger_logs', schema=None) as batch_op: - batch_op.add_column(sa.Column('trigger_metadata', sa.Text(), nullable=True)) - - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! 
### - with op.batch_alter_table('workflow_trigger_logs', schema=None) as batch_op: - batch_op.drop_column('trigger_metadata') - - # ### end Alembic commands ### diff --git a/api/migrations/versions/2025_10_30_1518-669ffd70119c_introduce_trigger.py b/api/migrations/versions/2025_10_30_1518-669ffd70119c_introduce_trigger.py new file mode 100644 index 0000000000..c03d64b234 --- /dev/null +++ b/api/migrations/versions/2025_10_30_1518-669ffd70119c_introduce_trigger.py @@ -0,0 +1,235 @@ +"""introduce_trigger + +Revision ID: 669ffd70119c +Revises: 03f8dcbc611e +Create Date: 2025-10-30 15:18:49.549156 + +""" +from alembic import op +import models as models +import sqlalchemy as sa + +from models.enums import AppTriggerStatus, AppTriggerType + + +# revision identifiers, used by Alembic. +revision = '669ffd70119c' +down_revision = '03f8dcbc611e' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('app_triggers', + sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False), + sa.Column('tenant_id', models.types.StringUUID(), nullable=False), + sa.Column('app_id', models.types.StringUUID(), nullable=False), + sa.Column('node_id', sa.String(length=64), nullable=False), + sa.Column('trigger_type', models.types.EnumText(AppTriggerType, length=50), nullable=False), + sa.Column('title', sa.String(length=255), nullable=False), + sa.Column('provider_name', sa.String(length=255), server_default='', nullable=True), + sa.Column('status', models.types.EnumText(AppTriggerStatus, length=50), nullable=False), + sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), + sa.Column('updated_at', sa.DateTime(), nullable=False), + sa.PrimaryKeyConstraint('id', name='app_trigger_pkey') + ) + with op.batch_alter_table('app_triggers', schema=None) as batch_op: + batch_op.create_index('app_trigger_tenant_app_idx', ['tenant_id', 'app_id'], unique=False) + + op.create_table('trigger_oauth_system_clients', + sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuid_generate_v4()'), nullable=False), + sa.Column('plugin_id', sa.String(length=512), nullable=False), + sa.Column('provider', sa.String(length=255), nullable=False), + sa.Column('encrypted_oauth_params', sa.Text(), nullable=False), + sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), + sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), + sa.PrimaryKeyConstraint('id', name='trigger_oauth_system_client_pkey'), + sa.UniqueConstraint('plugin_id', 'provider', name='trigger_oauth_system_client_plugin_id_provider_idx') + ) + op.create_table('trigger_oauth_tenant_clients', + sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuid_generate_v4()'), nullable=False), + sa.Column('tenant_id', models.types.StringUUID(), nullable=False), + sa.Column('plugin_id', sa.String(length=512), nullable=False), + sa.Column('provider', sa.String(length=255), nullable=False), + sa.Column('enabled', sa.Boolean(), server_default=sa.text('true'), nullable=False), + sa.Column('encrypted_oauth_params', sa.Text(), nullable=False), + sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), + sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), + sa.PrimaryKeyConstraint('id', name='trigger_oauth_tenant_client_pkey'), 
+ sa.UniqueConstraint('tenant_id', 'plugin_id', 'provider', name='unique_trigger_oauth_tenant_client') + ) + op.create_table('trigger_subscriptions', + sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuid_generate_v4()'), nullable=False), + sa.Column('name', sa.String(length=255), nullable=False, comment='Subscription instance name'), + sa.Column('tenant_id', models.types.StringUUID(), nullable=False), + sa.Column('user_id', models.types.StringUUID(), nullable=False), + sa.Column('provider_id', sa.String(length=255), nullable=False, comment='Provider identifier (e.g., plugin_id/provider_name)'), + sa.Column('endpoint_id', sa.String(length=255), nullable=False, comment='Subscription endpoint'), + sa.Column('parameters', sa.JSON(), nullable=False, comment='Subscription parameters JSON'), + sa.Column('properties', sa.JSON(), nullable=False, comment='Subscription properties JSON'), + sa.Column('credentials', sa.JSON(), nullable=False, comment='Subscription credentials JSON'), + sa.Column('credential_type', sa.String(length=50), nullable=False, comment='oauth or api_key'), + sa.Column('credential_expires_at', sa.Integer(), nullable=False, comment='OAuth token expiration timestamp, -1 for never'), + sa.Column('expires_at', sa.Integer(), nullable=False, comment='Subscription instance expiration timestamp, -1 for never'), + sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), + sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), + sa.PrimaryKeyConstraint('id', name='trigger_provider_pkey'), + sa.UniqueConstraint('tenant_id', 'provider_id', 'name', name='unique_trigger_provider') + ) + with op.batch_alter_table('trigger_subscriptions', schema=None) as batch_op: + batch_op.create_index('idx_trigger_providers_endpoint', ['endpoint_id'], unique=True) + batch_op.create_index('idx_trigger_providers_tenant_endpoint', ['tenant_id', 'endpoint_id'], unique=False) + batch_op.create_index('idx_trigger_providers_tenant_provider', ['tenant_id', 'provider_id'], unique=False) + + op.create_table('workflow_plugin_triggers', + sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuid_generate_v4()'), nullable=False), + sa.Column('app_id', models.types.StringUUID(), nullable=False), + sa.Column('node_id', sa.String(length=64), nullable=False), + sa.Column('tenant_id', models.types.StringUUID(), nullable=False), + sa.Column('provider_id', sa.String(length=512), nullable=False), + sa.Column('event_name', sa.String(length=255), nullable=False), + sa.Column('subscription_id', sa.String(length=255), nullable=False), + sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), + sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), + sa.PrimaryKeyConstraint('id', name='workflow_plugin_trigger_pkey'), + sa.UniqueConstraint('app_id', 'node_id', name='uniq_app_node_subscription') + ) + with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op: + batch_op.create_index('workflow_plugin_trigger_tenant_subscription_idx', ['tenant_id', 'subscription_id', 'event_name'], unique=False) + + op.create_table('workflow_schedule_plans', + sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False), + sa.Column('app_id', models.types.StringUUID(), nullable=False), + sa.Column('node_id', sa.String(length=64), nullable=False), + sa.Column('tenant_id', 
models.types.StringUUID(), nullable=False), + sa.Column('cron_expression', sa.String(length=255), nullable=False), + sa.Column('timezone', sa.String(length=64), nullable=False), + sa.Column('next_run_at', sa.DateTime(), nullable=True), + sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), + sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), + sa.PrimaryKeyConstraint('id', name='workflow_schedule_plan_pkey'), + sa.UniqueConstraint('app_id', 'node_id', name='uniq_app_node') + ) + with op.batch_alter_table('workflow_schedule_plans', schema=None) as batch_op: + batch_op.create_index('workflow_schedule_plan_next_idx', ['next_run_at'], unique=False) + + op.create_table('workflow_trigger_logs', + sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False), + sa.Column('tenant_id', models.types.StringUUID(), nullable=False), + sa.Column('app_id', models.types.StringUUID(), nullable=False), + sa.Column('workflow_id', models.types.StringUUID(), nullable=False), + sa.Column('workflow_run_id', models.types.StringUUID(), nullable=True), + sa.Column('root_node_id', sa.String(length=255), nullable=True), + sa.Column('trigger_metadata', sa.Text(), nullable=False), + sa.Column('trigger_type', models.types.EnumText(AppTriggerType, length=50), nullable=False), + sa.Column('trigger_data', sa.Text(), nullable=False), + sa.Column('inputs', sa.Text(), nullable=False), + sa.Column('outputs', sa.Text(), nullable=True), + sa.Column('status', models.types.EnumText(AppTriggerStatus, length=50), nullable=False), + sa.Column('error', sa.Text(), nullable=True), + sa.Column('queue_name', sa.String(length=100), nullable=False), + sa.Column('celery_task_id', sa.String(length=255), nullable=True), + sa.Column('retry_count', sa.Integer(), nullable=False), + sa.Column('elapsed_time', sa.Float(), nullable=True), + sa.Column('total_tokens', sa.Integer(), nullable=True), + sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), + sa.Column('created_by_role', sa.String(length=255), nullable=False), + sa.Column('created_by', sa.String(length=255), nullable=False), + sa.Column('triggered_at', sa.DateTime(), nullable=True), + sa.Column('finished_at', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('id', name='workflow_trigger_log_pkey') + ) + with op.batch_alter_table('workflow_trigger_logs', schema=None) as batch_op: + batch_op.create_index('workflow_trigger_log_created_at_idx', ['created_at'], unique=False) + batch_op.create_index('workflow_trigger_log_status_idx', ['status'], unique=False) + batch_op.create_index('workflow_trigger_log_tenant_app_idx', ['tenant_id', 'app_id'], unique=False) + batch_op.create_index('workflow_trigger_log_workflow_id_idx', ['workflow_id'], unique=False) + batch_op.create_index('workflow_trigger_log_workflow_run_idx', ['workflow_run_id'], unique=False) + + op.create_table('workflow_webhook_triggers', + sa.Column('id', models.types.StringUUID(), server_default=sa.text('uuidv7()'), nullable=False), + sa.Column('app_id', models.types.StringUUID(), nullable=False), + sa.Column('node_id', sa.String(length=64), nullable=False), + sa.Column('tenant_id', models.types.StringUUID(), nullable=False), + sa.Column('webhook_id', sa.String(length=24), nullable=False), + sa.Column('created_by', models.types.StringUUID(), nullable=False), + sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), 
nullable=False), + sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), + sa.PrimaryKeyConstraint('id', name='workflow_webhook_trigger_pkey'), + sa.UniqueConstraint('app_id', 'node_id', name='uniq_node'), + sa.UniqueConstraint('webhook_id', name='uniq_webhook_id') + ) + with op.batch_alter_table('workflow_webhook_triggers', schema=None) as batch_op: + batch_op.create_index('workflow_webhook_trigger_tenant_idx', ['tenant_id'], unique=False) + + with op.batch_alter_table('celery_taskmeta', schema=None) as batch_op: + batch_op.alter_column('task_id', + existing_type=sa.VARCHAR(length=155), + nullable=False) + batch_op.alter_column('status', + existing_type=sa.VARCHAR(length=50), + nullable=False) + + with op.batch_alter_table('celery_tasksetmeta', schema=None) as batch_op: + batch_op.alter_column('taskset_id', + existing_type=sa.VARCHAR(length=155), + nullable=False) + + with op.batch_alter_table('providers', schema=None) as batch_op: + batch_op.drop_column('credential_status') + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('providers', schema=None) as batch_op: + batch_op.add_column(sa.Column('credential_status', sa.VARCHAR(length=20), server_default=sa.text("'active'::character varying"), autoincrement=False, nullable=True)) + + with op.batch_alter_table('celery_tasksetmeta', schema=None) as batch_op: + batch_op.alter_column('taskset_id', + existing_type=sa.VARCHAR(length=155), + nullable=True) + + with op.batch_alter_table('celery_taskmeta', schema=None) as batch_op: + batch_op.alter_column('status', + existing_type=sa.VARCHAR(length=50), + nullable=True) + batch_op.alter_column('task_id', + existing_type=sa.VARCHAR(length=155), + nullable=True) + + with op.batch_alter_table('workflow_webhook_triggers', schema=None) as batch_op: + batch_op.drop_index('workflow_webhook_trigger_tenant_idx') + + op.drop_table('workflow_webhook_triggers') + with op.batch_alter_table('workflow_trigger_logs', schema=None) as batch_op: + batch_op.drop_index('workflow_trigger_log_workflow_run_idx') + batch_op.drop_index('workflow_trigger_log_workflow_id_idx') + batch_op.drop_index('workflow_trigger_log_tenant_app_idx') + batch_op.drop_index('workflow_trigger_log_status_idx') + batch_op.drop_index('workflow_trigger_log_created_at_idx') + + op.drop_table('workflow_trigger_logs') + with op.batch_alter_table('workflow_schedule_plans', schema=None) as batch_op: + batch_op.drop_index('workflow_schedule_plan_next_idx') + + op.drop_table('workflow_schedule_plans') + with op.batch_alter_table('workflow_plugin_triggers', schema=None) as batch_op: + batch_op.drop_index('workflow_plugin_trigger_tenant_subscription_idx') + + op.drop_table('workflow_plugin_triggers') + with op.batch_alter_table('trigger_subscriptions', schema=None) as batch_op: + batch_op.drop_index('idx_trigger_providers_tenant_provider') + batch_op.drop_index('idx_trigger_providers_tenant_endpoint') + batch_op.drop_index('idx_trigger_providers_endpoint') + + op.drop_table('trigger_subscriptions') + op.drop_table('trigger_oauth_tenant_clients') + op.drop_table('trigger_oauth_system_clients') + with op.batch_alter_table('app_triggers', schema=None) as batch_op: + batch_op.drop_index('app_trigger_tenant_app_idx') + + op.drop_table('app_triggers') + # ### end Alembic commands ### diff --git a/api/models/trigger.py b/api/models/trigger.py index 5237a512e4..22bdcbca33 100644 --- a/api/models/trigger.py +++ 
b/api/models/trigger.py @@ -196,7 +196,7 @@ class WorkflowTriggerLog(Base): workflow_id: Mapped[str] = mapped_column(StringUUID, nullable=False) workflow_run_id: Mapped[Optional[str]] = mapped_column(StringUUID, nullable=True) root_node_id: Mapped[Optional[str]] = mapped_column(String(255), nullable=True) - trigger_metadata: Mapped[Optional[str]] = mapped_column(sa.Text, nullable=True) + trigger_metadata: Mapped[str] = mapped_column(sa.Text, nullable=False) trigger_type: Mapped[str] = mapped_column(EnumText(AppTriggerType, length=50), nullable=False) trigger_data: Mapped[str] = mapped_column(sa.Text, nullable=False) # Full TriggerData as JSON inputs: Mapped[str] = mapped_column(sa.Text, nullable=False) # Just inputs for easy viewing diff --git a/api/services/trigger/schedule_service.py b/api/services/trigger/schedule_service.py index b0f39bf929..97f2def0a1 100644 --- a/api/services/trigger/schedule_service.py +++ b/api/services/trigger/schedule_service.py @@ -14,6 +14,7 @@ from libs.schedule_utils import calculate_next_run_at, convert_12h_to_24h from models.account import Account, TenantAccountJoin from models.trigger import WorkflowSchedulePlan from models.workflow import Workflow +from services.errors.account import AccountNotFoundError logger = logging.getLogger(__name__) @@ -124,7 +125,7 @@ class ScheduleService: session.flush() @staticmethod - def get_tenant_owner(session: Session, tenant_id: str) -> Optional[Account]: + def get_tenant_owner(session: Session, tenant_id: str) -> Account: """ Returns an account to execute scheduled workflows on behalf of the tenant. Prioritizes owner over admin to ensure proper authorization hierarchy. @@ -144,7 +145,12 @@ class ScheduleService: ).scalar_one_or_none() if result: - return session.get(Account, result.account_id) + account = session.get(Account, result.account_id) + if not account: + raise AccountNotFoundError(f"Account not found: {result.account_id}") + return account + else: + raise AccountNotFoundError(f"Account not found for tenant: {tenant_id}") @staticmethod def update_next_run_at( diff --git a/api/services/trigger/trigger_service.py b/api/services/trigger/trigger_service.py index 8a04a3c642..0255e42546 100644 --- a/api/services/trigger/trigger_service.py +++ b/api/services/trigger/trigger_service.py @@ -6,7 +6,7 @@ from typing import Any from flask import Request, Response from pydantic import BaseModel -from sqlalchemy import and_, select +from sqlalchemy import select from sqlalchemy.orm import Session from core.plugin.entities.plugin_daemon import CredentialType @@ -22,7 +22,7 @@ from extensions.ext_database import db from extensions.ext_redis import redis_client from models.model import App from models.provider_ids import TriggerProviderID -from models.trigger import AppTrigger, AppTriggerStatus, TriggerSubscription, WorkflowPluginTrigger +from models.trigger import TriggerSubscription, WorkflowPluginTrigger from models.workflow import Workflow from services.trigger.trigger_provider_service import TriggerProviderService from services.trigger.trigger_request_service import TriggerHttpRequestCachingService @@ -148,68 +148,6 @@ class TriggerService: ) return dispatch_response.response - @classmethod - def get_subscriber_triggers( - cls, tenant_id: str, subscription_id: str, event_name: str - ) -> list[WorkflowPluginTrigger]: - """ - Get WorkflowPluginTriggers for a subscription and trigger. 
- - Args: - tenant_id: Tenant ID - subscription_id: Subscription ID - event_name: Event name - """ - with Session(db.engine, expire_on_commit=False) as session: - subscribers = session.scalars( - select(WorkflowPluginTrigger) - .join( - AppTrigger, - and_( - AppTrigger.tenant_id == WorkflowPluginTrigger.tenant_id, - AppTrigger.app_id == WorkflowPluginTrigger.app_id, - AppTrigger.node_id == WorkflowPluginTrigger.node_id, - ), - ) - .where( - WorkflowPluginTrigger.tenant_id == tenant_id, - WorkflowPluginTrigger.subscription_id == subscription_id, - WorkflowPluginTrigger.event_name == event_name, - AppTrigger.status == AppTriggerStatus.ENABLED, - ) - ).all() - return list(subscribers) - - @classmethod - def delete_plugin_trigger_by_subscription( - cls, - session: Session, - tenant_id: str, - subscription_id: str, - ) -> None: - """Delete a plugin trigger by tenant_id and subscription_id within an existing session - - Args: - session: Database session - tenant_id: The tenant ID - subscription_id: The subscription ID - - Raises: - NotFound: If plugin trigger not found - """ - # Find plugin trigger using indexed columns - plugin_trigger = session.scalar( - select(WorkflowPluginTrigger).where( - WorkflowPluginTrigger.tenant_id == tenant_id, - WorkflowPluginTrigger.subscription_id == subscription_id, - ) - ) - - if not plugin_trigger: - return - - session.delete(plugin_trigger) - @classmethod def sync_plugin_trigger_relationships(cls, app: App, workflow: Workflow): """ diff --git a/api/services/trigger/trigger_subscription_builder_service.py b/api/services/trigger/trigger_subscription_builder_service.py index 3b61d01648..571393c782 100644 --- a/api/services/trigger/trigger_subscription_builder_service.py +++ b/api/services/trigger/trigger_subscription_builder_service.py @@ -472,8 +472,9 @@ class TriggerSubscriptionBuilderService: response=response, ) return response - except Exception as e: - error_response = Response(status=500, response=str(e)) + except Exception: + logger.exception("Error during validation endpoint dispatch for endpoint_id=%s", endpoint_id) + error_response = Response(status=500, response="An internal error has occurred.") cls.append_log(endpoint_id=endpoint_id, request=request, response=error_response) return error_response diff --git a/api/services/trigger/trigger_subscription_operator_service.py b/api/services/trigger/trigger_subscription_operator_service.py new file mode 100644 index 0000000000..5d7785549e --- /dev/null +++ b/api/services/trigger/trigger_subscription_operator_service.py @@ -0,0 +1,70 @@ +from sqlalchemy import and_, select +from sqlalchemy.orm import Session + +from extensions.ext_database import db +from models.enums import AppTriggerStatus +from models.trigger import AppTrigger, WorkflowPluginTrigger + + +class TriggerSubscriptionOperatorService: + @classmethod + def get_subscriber_triggers( + cls, tenant_id: str, subscription_id: str, event_name: str + ) -> list[WorkflowPluginTrigger]: + """ + Get WorkflowPluginTriggers for a subscription and trigger. 
+
+        Args:
+            tenant_id: Tenant ID
+            subscription_id: Subscription ID
+            event_name: Event name
+        """
+        with Session(db.engine, expire_on_commit=False) as session:
+            subscribers = session.scalars(
+                select(WorkflowPluginTrigger)
+                .join(
+                    AppTrigger,
+                    and_(
+                        AppTrigger.tenant_id == WorkflowPluginTrigger.tenant_id,
+                        AppTrigger.app_id == WorkflowPluginTrigger.app_id,
+                        AppTrigger.node_id == WorkflowPluginTrigger.node_id,
+                    ),
+                )
+                .where(
+                    WorkflowPluginTrigger.tenant_id == tenant_id,
+                    WorkflowPluginTrigger.subscription_id == subscription_id,
+                    WorkflowPluginTrigger.event_name == event_name,
+                    AppTrigger.status == AppTriggerStatus.ENABLED,
+                )
+            ).all()
+            return list(subscribers)
+
+    @classmethod
+    def delete_plugin_trigger_by_subscription(
+        cls,
+        session: Session,
+        tenant_id: str,
+        subscription_id: str,
+    ) -> None:
+        """Delete a plugin trigger by tenant_id and subscription_id within an existing session
+
+        Args:
+            session: Database session
+            tenant_id: The tenant ID
+            subscription_id: The subscription ID
+
+        Note:
+            Returns silently if no matching plugin trigger exists
+        """
+        # Find plugin trigger using indexed columns
+        plugin_trigger = session.scalar(
+            select(WorkflowPluginTrigger).where(
+                WorkflowPluginTrigger.tenant_id == tenant_id,
+                WorkflowPluginTrigger.subscription_id == subscription_id,
+            )
+        )
+
+        if not plugin_trigger:
+            return
+
+        session.delete(plugin_trigger)
diff --git a/api/tasks/trigger_processing_tasks.py b/api/tasks/trigger_processing_tasks.py
index fca7e47c95..75c36ef677 100644
--- a/api/tasks/trigger_processing_tasks.py
+++ b/api/tasks/trigger_processing_tasks.py
@@ -32,6 +32,7 @@ from services.async_workflow_service import AsyncWorkflowService
 from services.end_user_service import EndUserService
 from services.trigger.trigger_provider_service import TriggerProviderService
 from services.trigger.trigger_request_service import TriggerHttpRequestCachingService
+from services.trigger.trigger_subscription_operator_service import TriggerSubscriptionOperatorService
 from services.workflow.entities import PluginTriggerData, PluginTriggerDispatchData, PluginTriggerMetadata
 
 logger = logging.getLogger(__name__)
@@ -121,10 +122,7 @@ def dispatch_triggered_workflow(
     request = TriggerHttpRequestCachingService.get_request(request_id)
     payload = TriggerHttpRequestCachingService.get_payload(request_id)
 
-    from services.trigger.trigger_service import TriggerService
-    # FIXME: we should avoid import modules inside methods
-
-    subscribers: list[WorkflowPluginTrigger] = TriggerService.get_subscriber_triggers(
+    subscribers: list[WorkflowPluginTrigger] = TriggerSubscriptionOperatorService.get_subscriber_triggers(
         tenant_id=subscription.tenant_id, subscription_id=subscription.id, event_name=event_name
     )
     if not subscribers:
diff --git a/docker/.env.example b/docker/.env.example
index fb815ea160..74a436b082 100644
--- a/docker/.env.example
+++ b/docker/.env.example
@@ -24,6 +24,11 @@ CONSOLE_WEB_URL=
 # Example: https://api.dify.ai
 SERVICE_API_URL=
 
+# Trigger external URL,
+# used to declare the trigger endpoint API base URL displayed to the front-end.
+# Example: https://api.dify.ai
+TRIGGER_URL=
+
 # WebApp API backend Url,
 # used to declare the back-end URL for the front-end API.
 # If empty, it is the same domain.
diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 81f2a5e625..6d1f2e8352 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -8,6 +8,7 @@ x-shared-env: &shared-api-worker-env CONSOLE_API_URL: ${CONSOLE_API_URL:-} CONSOLE_WEB_URL: ${CONSOLE_WEB_URL:-} SERVICE_API_URL: ${SERVICE_API_URL:-} + TRIGGER_URL: ${TRIGGER_URL:-} APP_API_URL: ${APP_API_URL:-} APP_WEB_URL: ${APP_WEB_URL:-} FILES_URL: ${FILES_URL:-} diff --git a/web/app/components/plugins/readme-panel/index.tsx b/web/app/components/plugins/readme-panel/index.tsx index b77d59fb0b..70d1e0db2c 100644 --- a/web/app/components/plugins/readme-panel/index.tsx +++ b/web/app/components/plugins/readme-panel/index.tsx @@ -2,7 +2,6 @@ import ActionButton from '@/app/components/base/action-button' import Loading from '@/app/components/base/loading' import { Markdown } from '@/app/components/base/markdown' -import Modal from '@/app/components/base/modal' import { useLanguage } from '@/app/components/header/account-setting/model-provider-page/hooks' import { usePluginReadme } from '@/service/use-plugins' import cn from '@/utils/classnames' @@ -85,29 +84,36 @@ const ReadmePanel: FC = () => { ) - return showType === ReadmeShowType.drawer ? createPortal( -
-
- {children} + const portalContent = showType === ReadmeShowType.drawer + ? ( +
+
+ {children} +
-
, + ) + : ( +
+
{ + event.stopPropagation() + }} + > + {children} +
+
+ ) + + return createPortal( + portalContent, document.body, - ) : ( - - {children} - ) } diff --git a/web/app/components/workflow/block-selector/data-sources.tsx b/web/app/components/workflow/block-selector/data-sources.tsx index 3961f63dbe..b98a52dcff 100644 --- a/web/app/components/workflow/block-selector/data-sources.tsx +++ b/web/app/components/workflow/block-selector/data-sources.tsx @@ -63,6 +63,7 @@ const DataSources = ({ datasource_name: toolDefaultValue?.tool_name, datasource_label: toolDefaultValue?.tool_label, title: toolDefaultValue?.title, + plugin_unique_identifier: toolDefaultValue?.plugin_unique_identifier, } // Update defaultValue with fileExtensions if this is the local file data source if (toolDefaultValue?.provider_id === 'langgenius/file' && toolDefaultValue?.provider_name === 'file') { diff --git a/web/app/components/workflow/block-selector/tool-picker.tsx b/web/app/components/workflow/block-selector/tool-picker.tsx index c438e45042..660cdad71e 100644 --- a/web/app/components/workflow/block-selector/tool-picker.tsx +++ b/web/app/components/workflow/block-selector/tool-picker.tsx @@ -23,7 +23,16 @@ import { } from '@/service/tools' import type { CustomCollectionBackend } from '@/app/components/tools/types' import Toast from '@/app/components/base/toast' -import { useAllBuiltInTools, useAllCustomTools, useAllMCPTools, useAllWorkflowTools, useInvalidateAllBuiltInTools, useInvalidateAllCustomTools } from '@/service/use-tools' +import { + useAllBuiltInTools, + useAllCustomTools, + useAllMCPTools, + useAllWorkflowTools, + useInvalidateAllBuiltInTools, + useInvalidateAllCustomTools, + useInvalidateAllMCPTools, + useInvalidateAllWorkflowTools, +} from '@/service/use-tools' import { useFeaturedToolsRecommendations } from '@/service/use-plugins' import { useGlobalPublicStore } from '@/context/global-public-context' import cn from '@/utils/classnames' @@ -70,6 +79,8 @@ const ToolPicker: FC = ({ const { data: workflowTools } = useAllWorkflowTools() const { data: mcpTools } = useAllMCPTools() const invalidateBuiltInTools = useInvalidateAllBuiltInTools() + const invalidateWorkflowTools = useInvalidateAllWorkflowTools() + const invalidateMcpTools = useInvalidateAllMCPTools() const { plugins: featuredPlugins = [], @@ -193,6 +204,9 @@ const ToolPicker: FC = ({ showFeatured={scope === 'all' && enable_marketplace} onFeaturedInstallSuccess={async () => { invalidateBuiltInTools() + invalidateCustomTools() + invalidateWorkflowTools() + invalidateMcpTools() }} />
diff --git a/web/app/components/workflow/block-selector/tool/action-item.tsx b/web/app/components/workflow/block-selector/tool/action-item.tsx index e2c28602f8..01c319327a 100644 --- a/web/app/components/workflow/block-selector/tool/action-item.tsx +++ b/web/app/components/workflow/block-selector/tool/action-item.tsx @@ -72,6 +72,7 @@ const ToolItem: FC = ({ provider_type: provider.type, provider_name: provider.name, plugin_id: provider.plugin_id, + plugin_unique_identifier: provider.plugin_unique_identifier, provider_icon: normalizeProviderIcon(provider.icon), tool_name: payload.name, tool_label: payload.label[language], diff --git a/web/app/components/workflow/block-selector/tool/tool.tsx b/web/app/components/workflow/block-selector/tool/tool.tsx index 5ac043e933..38be8d19d6 100644 --- a/web/app/components/workflow/block-selector/tool/tool.tsx +++ b/web/app/components/workflow/block-selector/tool/tool.tsx @@ -94,6 +94,7 @@ const Tool: FC = ({ provider_type: payload.type, provider_name: payload.name, plugin_id: payload.plugin_id, + plugin_unique_identifier: payload.plugin_unique_identifier, provider_icon: normalizeProviderIcon(payload.icon), tool_name: tool.name, tool_label: tool.label[language], @@ -175,6 +176,7 @@ const Tool: FC = ({ provider_type: payload.type, provider_name: payload.name, plugin_id: payload.plugin_id, + plugin_unique_identifier: payload.plugin_unique_identifier, provider_icon: normalizeProviderIcon(payload.icon), tool_name: tool.name, tool_label: tool.label[language], diff --git a/web/app/components/workflow/block-selector/types.ts b/web/app/components/workflow/block-selector/types.ts index 512621a552..e995974b87 100644 --- a/web/app/components/workflow/block-selector/types.ts +++ b/web/app/components/workflow/block-selector/types.ts @@ -59,6 +59,7 @@ export type ToolDefaultValue = PluginCommonDefaultValue & { meta?: PluginMeta plugin_id?: string provider_icon?: Collection['icon'] + plugin_unique_identifier?: string } export type DataSourceDefaultValue = Omit & { @@ -69,6 +70,7 @@ export type DataSourceDefaultValue = Omit void + shouldDim: boolean +} + +const useToolInstallation = (data: ToolNodeType): InstallationState => { + const builtInQuery = useAllBuiltInTools() + const customQuery = useAllCustomTools() + const workflowQuery = useAllWorkflowTools() + const mcpQuery = useAllMCPTools() + const invalidateTools = useInvalidToolsByType(data.provider_type) + + const collectionInfo = useMemo(() => { + switch (data.provider_type) { + case CollectionType.builtIn: + return { + list: builtInQuery.data, + isLoading: builtInQuery.isLoading, + } + case CollectionType.custom: + return { + list: customQuery.data, + isLoading: customQuery.isLoading, + } + case CollectionType.workflow: + return { + list: workflowQuery.data, + isLoading: workflowQuery.isLoading, + } + case CollectionType.mcp: + return { + list: mcpQuery.data, + isLoading: mcpQuery.isLoading, + } + default: + return undefined + } + }, [ + builtInQuery.data, + builtInQuery.isLoading, + customQuery.data, + customQuery.isLoading, + data.provider_type, + mcpQuery.data, + mcpQuery.isLoading, + workflowQuery.data, + workflowQuery.isLoading, + ]) + + const collection = collectionInfo?.list + const isLoading = collectionInfo?.isLoading ?? 
false + const isResolved = !!collectionInfo && !isLoading + + const matchedCollection = useMemo(() => { + if (!collection || !collection.length) + return undefined + + return collection.find((toolWithProvider) => { + if (data.plugin_id && toolWithProvider.plugin_id === data.plugin_id) + return true + if (canFindTool(toolWithProvider.id, data.provider_id)) + return true + if (toolWithProvider.name === data.provider_name) + return true + return false + }) + }, [collection, data.plugin_id, data.provider_id, data.provider_name]) + + const uniqueIdentifier = data.plugin_unique_identifier || data.plugin_id || data.provider_id + const canInstall = Boolean(data.plugin_unique_identifier) + + const onInstallSuccess = useCallback(() => { + if (invalidateTools) + invalidateTools() + }, [invalidateTools]) + + const shouldDim = (!!collectionInfo && !isResolved) || (isResolved && !matchedCollection) + + return { + isChecking: !!collectionInfo && !isResolved, + isMissing: isResolved && !matchedCollection, + uniqueIdentifier, + canInstall, + onInstallSuccess, + shouldDim, + } +} + +const useTriggerInstallation = (data: PluginTriggerNodeType): InstallationState => { + const triggerPluginsQuery = useAllTriggerPlugins() + const invalidateTriggers = useInvalidateAllTriggerPlugins() + + const triggerProviders = triggerPluginsQuery.data + const isLoading = triggerPluginsQuery.isLoading + + const matchedProvider = useMemo(() => { + if (!triggerProviders || !triggerProviders.length) + return undefined + + return triggerProviders.find(provider => + provider.name === data.provider_name + || provider.id === data.provider_id + || (data.plugin_id && provider.plugin_id === data.plugin_id), + ) + }, [ + data.plugin_id, + data.provider_id, + data.provider_name, + triggerProviders, + ]) + + const uniqueIdentifier = data.plugin_unique_identifier || data.plugin_id || data.provider_id + const canInstall = Boolean(data.plugin_unique_identifier) + + const onInstallSuccess = useCallback(() => { + invalidateTriggers() + }, [invalidateTriggers]) + + const shouldDim = isLoading || (!isLoading && !!triggerProviders && !matchedProvider) + + return { + isChecking: isLoading, + isMissing: !isLoading && !!triggerProviders && !matchedProvider, + uniqueIdentifier, + canInstall, + onInstallSuccess, + shouldDim, + } +} + +const useDataSourceInstallation = (data: DataSourceNodeType): InstallationState => { + const dataSourceList = useStore(s => s.dataSourceList) + const invalidateDataSourceList = useInvalidDataSourceList() + + const matchedPlugin = useMemo(() => { + if (!dataSourceList || !dataSourceList.length) + return undefined + + return dataSourceList.find((item) => { + if (data.plugin_unique_identifier && item.plugin_unique_identifier === data.plugin_unique_identifier) + return true + if (data.plugin_id && item.plugin_id === data.plugin_id) + return true + if (data.provider_name && item.provider === data.provider_name) + return true + return false + }) + }, [data.plugin_id, data.plugin_unique_identifier, data.provider_name, dataSourceList]) + + const uniqueIdentifier = data.plugin_unique_identifier || data.plugin_id + const canInstall = Boolean(data.plugin_unique_identifier) + + const onInstallSuccess = useCallback(() => { + invalidateDataSourceList() + }, [invalidateDataSourceList]) + + const hasLoadedList = dataSourceList !== undefined + + const shouldDim = !hasLoadedList || (hasLoadedList && !matchedPlugin) + + return { + isChecking: !hasLoadedList, + isMissing: hasLoadedList && !matchedPlugin, + uniqueIdentifier, + canInstall, + 
onInstallSuccess,
+    shouldDim,
+  }
+}
+
+export const useNodePluginInstallation = (data: CommonNodeType): InstallationState => {
+  const toolInstallation = useToolInstallation(data as ToolNodeType)
+  const triggerInstallation = useTriggerInstallation(data as PluginTriggerNodeType)
+  const dataSourceInstallation = useDataSourceInstallation(data as DataSourceNodeType)
+
+  switch (data.type as BlockEnum) {
+    case BlockEnum.Tool:
+      return toolInstallation
+    case BlockEnum.TriggerPlugin:
+      return triggerInstallation
+    case BlockEnum.DataSource:
+      return dataSourceInstallation
+    default:
+      return {
+        isChecking: false,
+        isMissing: false,
+        uniqueIdentifier: undefined,
+        canInstall: false,
+        onInstallSuccess: () => undefined,
+        shouldDim: false,
+      }
+  }
+}
diff --git a/web/app/components/workflow/nodes/_base/components/install-plugin-button.tsx b/web/app/components/workflow/nodes/_base/components/install-plugin-button.tsx
index 23119f0213..b0d878d53d 100644
--- a/web/app/components/workflow/nodes/_base/components/install-plugin-button.tsx
+++ b/web/app/components/workflow/nodes/_base/components/install-plugin-button.tsx
@@ -1,37 +1,96 @@
 import Button from '@/app/components/base/button'
 import { RiInstallLine, RiLoader2Line } from '@remixicon/react'
 import type { ComponentProps, MouseEventHandler } from 'react'
+import { useState } from 'react'
 import classNames from '@/utils/classnames'
 import { useTranslation } from 'react-i18next'
+import checkTaskStatus from '@/app/components/plugins/install-plugin/base/check-task-status'
+import { TaskStatus } from '@/app/components/plugins/types'
 import { useCheckInstalled, useInstallPackageFromMarketPlace } from '@/service/use-plugins'
 
 type InstallPluginButtonProps = Omit<ComponentProps<typeof Button>, 'children' | 'loading'> & {
   uniqueIdentifier: string
+  extraIdentifiers?: string[]
   onSuccess?: () => void
 }
 
 export const InstallPluginButton = (props: InstallPluginButtonProps) => {
-  const { className, uniqueIdentifier, onSuccess, ...rest } = props
+  const {
+    className,
+    uniqueIdentifier,
+    extraIdentifiers = [],
+    onSuccess,
+    ...rest
+  } = props
   const { t } = useTranslation()
+  const identifiers = Array.from(new Set(
+    [uniqueIdentifier, ...extraIdentifiers].filter((item): item is string => Boolean(item)),
+  ))
   const manifest = useCheckInstalled({
-    pluginIds: [uniqueIdentifier],
-    enabled: !!uniqueIdentifier,
+    pluginIds: identifiers,
+    enabled: identifiers.length > 0,
   })
   const install = useInstallPackageFromMarketPlace()
-  const isLoading = manifest.isLoading || install.isPending
-  // await for refetch to get the new installed plugin, when manifest refetch, this component will unmount
-    || install.isSuccess
+  const [isTracking, setIsTracking] = useState(false)
+  const isLoading = manifest.isLoading || install.isPending || isTracking
   const handleInstall: MouseEventHandler = (e) => {
     e.stopPropagation()
+    if (isLoading)
+      return
+    setIsTracking(true)
     install.mutate(uniqueIdentifier, {
-      onSuccess: async () => {
-        await manifest.refetch()
-        onSuccess?.()
+      onSuccess: async (response) => {
+        const finish = async () => {
+          await manifest.refetch()
+          onSuccess?.()
+          setIsTracking(false)
+          install.reset()
+        }
+
+        if (!response) {
+          await finish()
+          return
+        }
+
+        if (response.all_installed) {
+          await finish()
+          return
+        }
+
+        const { check } = checkTaskStatus()
+        try {
+          const { status } = await check({
+            taskId: response.task_id,
+            pluginUniqueIdentifier: uniqueIdentifier,
+          })
+
+          if (status === TaskStatus.failed) {
+            setIsTracking(false)
+            install.reset()
+            return
+          }
+
+          await
finish() + } + catch { + setIsTracking(false) + install.reset() + } + }, + onError: () => { + setIsTracking(false) + install.reset() }, }) } if (!manifest.data) return null - if (manifest.data.plugins.some(plugin => plugin.id === uniqueIdentifier)) return null + const identifierSet = new Set(identifiers) + const isInstalled = manifest.data.plugins.some(plugin => ( + identifierSet.has(plugin.id) + || (plugin.plugin_unique_identifier && identifierSet.has(plugin.plugin_unique_identifier)) + || (plugin.plugin_id && identifierSet.has(plugin.plugin_id)) + )) + if (isInstalled) return null return