diff --git a/.agents/skills/how-to-write-component/SKILL.md b/.agents/skills/how-to-write-component/SKILL.md index f7e6e595092..8a480c8fd09 100644 --- a/.agents/skills/how-to-write-component/SKILL.md +++ b/.agents/skills/how-to-write-component/SKILL.md @@ -37,12 +37,16 @@ Use this as the decision guide for React/TypeScript component structure. Existin - Do not replace prop drilling with one top-level hook that returns a large view model and then thread that object through section props. Move each hook, query, derived value, and handler to the concrete section that consumes it, or use feature-scoped Jotai atoms for simple shared form/UI state when siblings need the same source of truth. - When using feature-scoped Jotai state for a form, drawer, or other secondary surface, scope the store to that surface instance when stale cross-instance state is possible. Initialize stable config at the owning boundary, then let descendants read only the atoms or purpose-named hooks they actually need. - For Jotai-backed surfaces, put shared query atoms, mutation atoms, derived state, and write actions in the feature state file when they coordinate multiple descendants. The lowest-owner rule still applies to independent visual surfaces that do not participate in shared state. +- For repeated row/menu action surfaces that need reset, hydrate the stable identity at the surface entry and scope only the primitives that truly need per-instance reset, such as open flags, drafts, or selected local options. - Keep callbacks in a parent only for workflow coordination such as form submission, shared selection, batch behavior, or navigation. Otherwise let the child or row own its action. - Prefer uncontrolled DOM state and CSS variables before adding controlled props. ## Feature-Scoped Jotai State - A module's feature-local state lives in one state file for Jotai-backed features: primitive atoms, query atoms, derived atoms, write-only action atoms, mutation atoms, submission orchestration, provider exports, and optional scope configuration. +- Keep state local when one component owns it, even inside Jotai-backed features. Dialog open flags, menu/popover visibility, confirmation visibility, form/input drafts, row-local pending flags, and in-flight refs usually belong in component state. +- Promote UI state to an atom only when siblings need the same source of truth, the value drives a query or mutation atom, a parent workflow coordinates the state, or the state intentionally persists across hidden or unmounted descendants within a scoped surface. +- Reflect atom-backed surface-wide locks or invariants in every affected trigger. If only one row, menu, or dialog should be disabled, keep the pending or lock state local to that row, menu, or dialog. - Atom order in the state file follows the dependency graph: types/constants, editable primitives, query atoms, query-data derived atoms, readiness/business derived atoms, write actions, mutation atoms, submission orchestration, provider exports. - Derived atom names read as business facts. Write atom names read as user or workflow commands. - UI components read and write the exact atom they use with `useAtomValue` or `useSetAtom`. Repeated workflow semantics live in named derived atoms or write atoms. @@ -51,7 +55,11 @@ Use this as the decision guide for React/TypeScript component structure. Existin - Avoid feature hooks that aggregate form values, query results, derived state, and commands for sibling components. Prefer named derived atoms and write atoms so UI components read the exact shared fact or command they need. - When a form library owns validation, keep submit orchestration in feature state when post-submit result or error state is shared by the surface. Avoid duplicating validation gates or request shaping in UI hooks. - `jotai-tanstack-query` atoms use the same QueryClient as the React Query provider. Query atoms belong in feature state when atoms are the feature's local state surface. -- Jotai scope is an optional instance-isolation tool for secondary surfaces with independent local state. Query atoms keep shared cache behavior through the shared QueryClient. +- Jotai scope is an optional instance-isolation tool for secondary surfaces with independent local state. Query and mutation atoms keep shared cache behavior through the shared QueryClient. +- Do not put `atomWithQuery`, `atomWithInfiniteQuery`, `atomWithMutation`, or broad derived orchestration atoms in a `ScopeProvider` just to reset a surface. Scoped derived atoms implicitly scope their dependencies, which can duplicate query client access and break shared invalidation. Leave query/mutation atoms unscoped; let them read scoped primitive inputs. +- Scope providers should list resettable primitive atoms and explicit hydration tuples. If a derived atom must be scoped, confirm that every dependency it implicitly scopes is meant to be private to that surface. +- Keep independent dialog lifecycles separate. Avoid a single discriminated "current action dialog" atom when edit, delete, and other dialogs have their own open state, loading guard, or reset behavior. +- Route-derived stable identities that do not need instance reset or scoped isolation can be hydrated at the route or layout boundary into a feature route atom. Use scoped atoms only when stale cross-instance state or per-surface reset semantics are needed. ## Components, Props, And Types @@ -74,6 +82,7 @@ Use this as the decision guide for React/TypeScript component structure. Existin - Use generated enum objects and union types directly in props, comparisons, status logic, and i18n keys. Do not add local enum constants or parallel frontend enum/status layers unless they model real product state not represented by the API. Presentation-only tone maps should be keyed by the generated enum. - Normalize or coerce only at a real boundary, such as user-entered forms, search, URL/query params, file names, DOM IDs, or legacy adapters. Preserve user-entered values when whitespace or formatting can be meaningful. - Do not coerce nullable or optional API strings to `''` in query, derived model, or payload-building code. Keep `undefined` or `null` until the final boundary that requires a string. +- Do not use `value || undefined` for mutation payload fields where an empty string means "clear this value". Trim or normalize at the form boundary, then preserve `''` when the API contract treats it as an intentional update. - Local UI models are fine for presentation, form state, select options, or guarded required-field refinements. Name them as UI concepts, not generated DTO mirrors. - Required-value refinements are allowed only after same-branch filtering or early return. Prefer nullable-tolerant props for render-only data. - When a component needs a stricter shape than a generated DTO, refine once at the API/query-to-UI boundary into a purpose-named UI type instead of hiding missing fields with generic fallback or coercion helpers. @@ -93,12 +102,17 @@ Use this as the decision guide for React/TypeScript component structure. Existin - Keep `web/contract/*` as the single source of truth for API shape; follow existing domain/router patterns and the `{ params, query?, body? }` input shape. - Consume queries directly with `useQuery(consoleQuery.xxx.queryOptions(...))` or `useQuery(marketplaceQuery.xxx.queryOptions(...))`. +- In `atomWithQuery` and `atomWithInfiniteQuery`, return generated `queryOptions()` or `infiniteOptions()` directly. Pass `enabled`, `retry`, `placeholderData`, `select`, and pagination options into that call instead of spreading generated options into a hand-built object. +- In `atomWithMutation`, return generated `mutationOptions()` directly when using generated clients. Put request shaping and submit orchestration in write atoms; do not rebuild mutation option objects just to pass through the generated mutation function. +- For custom query functions that do not come from generated clients, wrap the options object with TanStack `queryOptions(...)` so query atoms still return a query options contract. - Avoid pass-through hooks and thin `web/service/use-*` wrappers that only rename `queryOptions()` or `mutationOptions()`. Extract a small `queryOptions` helper only when repeated call-site options justify it. - Keep feature hooks for real orchestration, workflow state, or shared domain behavior. - For TanStack cache data, use generated or query-derived types; do not create local wrappers for `getQueryData` or `getQueriesData`. -- For generated oRPC `queryOptions()` / `infiniteOptions()`, do not pass `skipToken` as `input`; keep a valid placeholder input shape and use `enabled` to gate missing required params because the OpenAPI codec encodes input eagerly. +- For generated oRPC `queryOptions()` / `infiniteOptions()`, keep returning the generated options directly. When required input is missing, use a whole-input branch such as `input: condition ? validInput : skipToken` together with `enabled: Boolean(condition)` so no request runs and no fake payload is built. +- Do not put `skipToken` inside a nested placeholder payload, such as `{ params: { appInstanceId: skipToken } }`. Do not create hand-written "missing queryOptions" objects or coerce required IDs to `''`. - Consume mutations directly with `useMutation(consoleQuery.xxx.mutationOptions(...))` or `useMutation(marketplaceQuery.xxx.mutationOptions(...))`; use oRPC clients as `mutationFn` only for custom flows. - Put shared cache behavior in `createTanstackQueryUtils(...experimental_defaults...)`; components may add UI feedback callbacks, but should not own shared invalidation rules. +- Component or atom mutation callbacks can handle local UI feedback such as toasts, closing dialogs, or navigation. They should not replace shared invalidation or add local cache patches for shared server state. - Do not use deprecated `useInvalid` or `useReset`. - Prefer `mutate(...)`; use `mutateAsync(...)` only when Promise semantics are required, and wrap awaited calls in `try/catch`. @@ -110,6 +124,7 @@ Use this as the decision guide for React/TypeScript component structure. Existin - Keep cohesive forms, menu bodies, and one-off helpers local unless they need their own state, reuse, or semantic boundary. - Separate hidden secondary surfaces from the trigger's main flow. For dialogs, dropdowns, popovers, and similar branches, extract a small local component that owns the trigger, open state, and hidden content when it would obscure the parent flow. - Preserve composability by separating behavior ownership from layout ownership. A dropdown action may own its trigger, open state, and menu content; the caller owns placement such as slots, offsets, and alignment. +- When a dialog, dropdown, or popover component already accepts controlled `open` state, mount the surface unconditionally unless unmounting is required for performance or reset semantics. Use keyed scope or local state reset for reset behavior instead of `{open && }` wrappers. - Avoid unnecessary DOM hierarchy. Do not add wrapper elements unless they provide layout, semantics, accessibility, state ownership, or integration with a library API; prefer fragments or styling an existing element when possible. - Avoid shallow wrappers, hook-to-props adapter components, layout-only render-prop wrappers, children-as-pass-through composition, and prop renaming unless the wrapper adds validation, orchestration, error handling, state ownership, or a real semantic boundary. If a component only calls a hook, forwards props, or passes trigger/content through to one child, move the logic into that child or make the wrapper own a real surface. @@ -120,6 +135,7 @@ Use this as the decision guide for React/TypeScript component structure. Existin - Do not use Effects to handle user actions. Put action-specific logic in the event handler where the cause is known. - Do not use Effects to copy one state value into another state value representing the same concept. Pick one source of truth and derive the rest during render. - Do not reset or adjust state from props with an Effect. Prefer a `key` reset, storing a stable ID and deriving the selected object, or guarded same-component render-time adjustment when truly necessary. +- For forms initialized from query data, prefer keyed remounts or surface-entry hydration of form/field atoms over an Effect that copies query data into form state. - Prefer framework data APIs or TanStack Query for data fetching instead of writing request Effects in components. - If an Effect still seems necessary, first name the external system it synchronizes with. If there is no external system, remove the Effect and restructure the state or event flow. diff --git a/.github/workflows/build-push.yml b/.github/workflows/build-push.yml index 0134ea850d0..63ce63cad42 100644 --- a/.github/workflows/build-push.yml +++ b/.github/workflows/build-push.yml @@ -21,6 +21,7 @@ env: DIFY_WEB_IMAGE_NAME: ${{ vars.DIFY_WEB_IMAGE_NAME || 'langgenius/dify-web' }} DIFY_API_IMAGE_NAME: ${{ vars.DIFY_API_IMAGE_NAME || 'langgenius/dify-api' }} DIFY_AGENT_IMAGE_NAME: ${{ vars.DIFY_AGENT_IMAGE_NAME || 'langgenius/dify-agent-backend' }} + DIFY_AGENT_LOCAL_SANDBOX_IMAGE_NAME: ${{ vars.DIFY_AGENT_LOCAL_SANDBOX_IMAGE_NAME || 'langgenius/dify-agent-local-sandbox' }} jobs: build: @@ -74,6 +75,20 @@ jobs: file: "dify-agent/Dockerfile" platform: linux/arm64 runs_on: depot-ubuntu-24.04-4 + - service_name: "build-agent-local-sandbox-amd64" + image_name_env: "DIFY_AGENT_LOCAL_SANDBOX_IMAGE_NAME" + artifact_context: "local-sandbox" + build_context: "{{defaultContext}}:dify-agent" + file: "docker/local-sandbox/Dockerfile" + platform: linux/amd64 + runs_on: depot-ubuntu-24.04-4 + - service_name: "build-agent-local-sandbox-arm64" + image_name_env: "DIFY_AGENT_LOCAL_SANDBOX_IMAGE_NAME" + artifact_context: "local-sandbox" + build_context: "{{defaultContext}}:dify-agent" + file: "docker/local-sandbox/Dockerfile" + platform: linux/arm64 + runs_on: depot-ubuntu-24.04-4 steps: - name: Prepare @@ -139,6 +154,9 @@ jobs: - service_name: "validate-agent-amd64" build_context: "{{defaultContext}}" file: "dify-agent/Dockerfile" + - service_name: "validate-agent-local-sandbox-amd64" + build_context: "{{defaultContext}}:dify-agent" + file: "docker/local-sandbox/Dockerfile" steps: - name: Set up Docker Buildx uses: docker/setup-buildx-action@d7f5e7f509e45cec5c76c4d5afdd7de93d0b3df5 # v4.1.0 @@ -167,6 +185,9 @@ jobs: - service_name: "merge-agent-images" image_name_env: "DIFY_AGENT_IMAGE_NAME" context: "agent" + - service_name: "merge-agent-local-sandbox-images" + image_name_env: "DIFY_AGENT_LOCAL_SANDBOX_IMAGE_NAME" + context: "local-sandbox" steps: - name: Download digests uses: actions/download-artifact@3e5f45b2cfb9172054b4087a40e8e0b5a5461e7c # v8.0.1 diff --git a/api/clients/agent_backend/request_builder.py b/api/clients/agent_backend/request_builder.py index c245a09e970..6eadd4ce3d8 100644 --- a/api/clients/agent_backend/request_builder.py +++ b/api/clients/agent_backend/request_builder.py @@ -78,6 +78,13 @@ def _filter_snapshot_to_specs( return CompositorSessionSnapshot(schema_version=snapshot.schema_version, layers=filtered_layers) +def _shell_layer_deps(*, include_drive: bool) -> dict[str, str]: + deps = {"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID} + if include_drive: + deps["drive"] = DIFY_DRIVE_LAYER_ID + return deps + + class AgentBackendModelConfig(BaseModel): """API-side model/plugin selection before it is converted to Dify Agent layers.""" @@ -263,6 +270,7 @@ class AgentBackendRunRequestBuilder: RunLayerSpec( name=DIFY_DRIVE_LAYER_ID, type=DIFY_DRIVE_LAYER_TYPE_ID, + deps={"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID}, metadata=run_input.metadata, config=run_input.drive_config, ) @@ -329,14 +337,15 @@ class AgentBackendRunRequestBuilder: ) if run_input.include_shell: - # Sandboxed bash workspace (dify.shell). Depends on execution_context so - # the agent server can mint per-command Agent Stub env (back proxy); + # Sandboxed bash workspace (dify.shell). Depends on execution_context + # so the agent server can mint per-command Agent Stub env, and on + # drive when present so that env points at /mnt/drive/. # shellctl connection itself is server-injected. layers.append( RunLayerSpec( name=DIFY_SHELL_LAYER_ID, type=DIFY_SHELL_LAYER_TYPE_ID, - deps={"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID}, + deps=_shell_layer_deps(include_drive=run_input.drive_config is not None), metadata=run_input.metadata, config=run_input.shell_config or DifyShellLayerConfig(), ) @@ -460,6 +469,7 @@ class AgentBackendRunRequestBuilder: RunLayerSpec( name=DIFY_DRIVE_LAYER_ID, type=DIFY_DRIVE_LAYER_TYPE_ID, + deps={"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID}, metadata=run_input.metadata, config=run_input.drive_config, ) @@ -528,14 +538,15 @@ class AgentBackendRunRequestBuilder: ) if run_input.include_shell: - # Sandboxed bash workspace (dify.shell). Depends on execution_context so - # the agent server can mint per-command Agent Stub env (back proxy); + # Sandboxed bash workspace (dify.shell). Depends on execution_context + # so the agent server can mint per-command Agent Stub env, and on + # drive when present so that env points at /mnt/drive/. # shellctl connection itself is server-injected. layers.append( RunLayerSpec( name=DIFY_SHELL_LAYER_ID, type=DIFY_SHELL_LAYER_TYPE_ID, - deps={"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID}, + deps=_shell_layer_deps(include_drive=run_input.drive_config is not None), metadata=run_input.metadata, config=run_input.shell_config or DifyShellLayerConfig(), ) diff --git a/api/commands/__init__.py b/api/commands/__init__.py index 94321ed1e49..e4207bea74e 100644 --- a/api/commands/__init__.py +++ b/api/commands/__init__.py @@ -25,6 +25,7 @@ from .plugin import ( from .rbac import migrate_member_roles_to_rbac from .retention import ( archive_workflow_runs, + archive_workflow_runs_plan, clean_expired_messages, clean_workflow_runs, cleanup_orphaned_draft_variables, @@ -51,6 +52,7 @@ from .vector import ( __all__ = [ "add_qdrant_index", "archive_workflow_runs", + "archive_workflow_runs_plan", "backfill_plugin_auto_upgrade", "clean_expired_messages", "clean_workflow_runs", diff --git a/api/commands/retention.py b/api/commands/retention.py index 657a2a2e839..1386e367aff 100644 --- a/api/commands/retention.py +++ b/api/commands/retention.py @@ -12,10 +12,160 @@ from services.clear_free_plan_tenant_expired_logs import ClearFreePlanTenantExpi from services.retention.conversation.messages_clean_policy import create_message_clean_policy from services.retention.conversation.messages_clean_service import MessagesCleanService from services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs import WorkflowRunCleanup +from services.retention.workflow_run.tenant_prefix import tenant_prefix_condition from tasks.remove_app_and_related_data_task import delete_draft_variables_batch logger = logging.getLogger(__name__) +_HEX_PREFIXES = tuple("0123456789abcdef") + + +class WorkflowRunArchivePlanRow(TypedDict): + tenant_prefix: str + total_tenants: int + workflow_runs: int + workflow_node_executions: int + paid_tenants: int + unpaid_tenants: int + + +class WorkflowRunArchiveTenantPlan(TypedDict): + archive_tenant_ids: list[str] | None + paid_tenant_ids: list[str] + unpaid_tenant_ids: list[str] + + +def _parse_tenant_prefixes(prefixes: str | None) -> list[str]: + if not prefixes: + return [] + + parsed = [] + for raw_prefix in prefixes.split(","): + prefix = raw_prefix.strip().lower() + if not prefix: + continue + if len(prefix) != 1 or prefix not in _HEX_PREFIXES: + raise click.UsageError("--tenant-prefixes must be a comma-separated list of hex digits, e.g. 0,1,a,f.") + parsed.append(prefix) + return sorted(set(parsed)) + + +def _get_archive_candidate_tenant_ids_by_prefix( + prefix: str, + *, + start_from: datetime.datetime | None, + end_before: datetime.datetime, +) -> list[str]: + from graphon.enums import WorkflowExecutionStatus + from models.workflow import WorkflowRun + from services.retention.workflow_run.archive_paid_plan_workflow_run import WorkflowRunArchiver + + conditions = [ + WorkflowRun.created_at < end_before, + WorkflowRun.status.in_(WorkflowExecutionStatus.ended_values()), + WorkflowRun.type.in_(WorkflowRunArchiver.ARCHIVED_TYPE), + tenant_prefix_condition(WorkflowRun.tenant_id, prefix), + ] + if start_from is not None: + conditions.append(WorkflowRun.created_at >= start_from) + + tenant_ids = db.session.scalars( + sa.select(WorkflowRun.tenant_id).where(*conditions).distinct().order_by(WorkflowRun.tenant_id) + ).all() + return list(tenant_ids) + + +def _filter_paid_workflow_archive_tenant_ids(tenant_ids: list[str]) -> tuple[list[str], list[str]]: + from configs import dify_config + from enums.cloud_plan import CloudPlan + from services.billing_service import BillingService + + tenant_ids = sorted(set(tenant_ids)) + if not tenant_ids: + return [], [] + if not dify_config.BILLING_ENABLED: + return tenant_ids, [] + + plans = BillingService.get_plan_bulk_with_cache(tenant_ids) + paid_tenant_ids = [ + tenant_id + for tenant_id in tenant_ids + if plans.get(tenant_id) and plans[tenant_id].get("plan") in (CloudPlan.PROFESSIONAL, CloudPlan.TEAM) + ] + unpaid_tenant_ids = sorted(set(tenant_ids) - set(paid_tenant_ids)) + return paid_tenant_ids, unpaid_tenant_ids + + +def _resolve_archive_tenant_ids_from_plan( + *, + tenant_ids: str | None, + tenant_prefixes: list[str], + start_from: datetime.datetime | None, + end_before: datetime.datetime, +) -> WorkflowRunArchiveTenantPlan: + """ + Resolve the archive tenant scope once before scanning workflow_runs. + + Prefix rollout should use the tenant list collected by the same planning path, then archive by + tenant_id IN (...). Scanning workflow_runs with a tenant prefix range in every archive run is too expensive on + the large production table this command is meant to shrink. + """ + if tenant_ids: + requested_tenant_ids = [tid.strip() for tid in tenant_ids.split(",") if tid.strip()] + elif tenant_prefixes: + requested_tenant_ids = [] + for prefix in tenant_prefixes: + requested_tenant_ids.extend( + _get_archive_candidate_tenant_ids_by_prefix( + prefix, + start_from=start_from, + end_before=end_before, + ) + ) + else: + return WorkflowRunArchiveTenantPlan( + archive_tenant_ids=None, + paid_tenant_ids=[], + unpaid_tenant_ids=[], + ) + + paid_tenant_ids, unpaid_tenant_ids = _filter_paid_workflow_archive_tenant_ids(requested_tenant_ids) + return WorkflowRunArchiveTenantPlan( + archive_tenant_ids=paid_tenant_ids, + paid_tenant_ids=paid_tenant_ids, + unpaid_tenant_ids=unpaid_tenant_ids, + ) + + +def _resolve_archive_time_range( + *, + before_days: int, + from_days_ago: int | None, + to_days_ago: int | None, + start_from: datetime.datetime | None, + end_before: datetime.datetime | None, +) -> tuple[int, datetime.datetime | None, datetime.datetime | None]: + if (start_from is None) ^ (end_before is None): + raise click.UsageError("--start-from and --end-before must be provided together.") + + if (from_days_ago is None) ^ (to_days_ago is None): + raise click.UsageError("--from-days-ago and --to-days-ago must be provided together.") + + if from_days_ago is not None and to_days_ago is not None: + if start_from or end_before: + raise click.UsageError("Choose either day offsets or explicit dates, not both.") + if from_days_ago <= to_days_ago: + raise click.UsageError("--from-days-ago must be greater than --to-days-ago.") + now = datetime.datetime.now() + start_from = now - datetime.timedelta(days=from_days_ago) + end_before = now - datetime.timedelta(days=to_days_ago) + before_days = 0 + + if start_from and end_before and start_from >= end_before: + raise click.UsageError("--start-from must be earlier than --end-before.") + + return before_days, start_from, end_before + @click.command("clear-free-plan-tenant-expired-logs", help="Clear free plan tenant expired logs.") @click.option("--days", prompt=True, help="The days to clear free plan tenant expired logs.", default=30) @@ -139,11 +289,143 @@ def clean_workflow_runs( ) +@click.command( + "archive-workflow-runs-plan", + help="Plan workflow run archive rollout by tenant ID first hex digit.", +) +@click.option("--before-days", default=90, show_default=True, help="Plan runs older than N days.") +@click.option( + "--from-days-ago", + default=None, + type=click.IntRange(min=0), + help="Lower bound in days ago (older). Must be paired with --to-days-ago.", +) +@click.option( + "--to-days-ago", + default=None, + type=click.IntRange(min=0), + help="Upper bound in days ago (newer). Must be paired with --from-days-ago.", +) +@click.option( + "--start-from", + type=click.DateTime(formats=["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"]), + default=None, + help="Plan runs created at or after this timestamp (UTC if no timezone).", +) +@click.option( + "--end-before", + type=click.DateTime(formats=["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"]), + default=None, + help="Plan runs created before this timestamp (UTC if no timezone).", +) +@click.option( + "--include-archived", + is_flag=True, + help="Compatibility no-op for V2 bundle archive; plan counts source rows in the requested window.", +) +def archive_workflow_runs_plan( + before_days: int, + from_days_ago: int | None, + to_days_ago: int | None, + start_from: datetime.datetime | None, + end_before: datetime.datetime | None, + include_archived: bool, +): + """ + Print the 16 tenant-prefix rollout rows used to choose archive execution order. + + Counts use the same workflow run eligibility as archive-workflow-runs: ended runs, + supported workflow types, and the requested created_at window. V2 bundle archive + does not maintain per-run archive logs, so this plan reports source-table volume. + """ + from graphon.enums import WorkflowExecutionStatus + from models.workflow import WorkflowNodeExecutionModel, WorkflowRun + from services.retention.workflow_run.archive_paid_plan_workflow_run import WorkflowRunArchiver + + before_days, start_from, end_before = _resolve_archive_time_range( + before_days=before_days, + from_days_ago=from_days_ago, + to_days_ago=to_days_ago, + start_from=start_from, + end_before=end_before, + ) + plan_end_before = end_before or datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=before_days) + if include_archived: + click.echo(click.style("--include-archived is a no-op for V2 bundle archive plans.", fg="yellow")) + + rows: list[WorkflowRunArchivePlanRow] = [] + for prefix in _HEX_PREFIXES: + tenant_ids = _get_archive_candidate_tenant_ids_by_prefix( + prefix, + start_from=start_from, + end_before=plan_end_before, + ) + total_tenants = len(tenant_ids) + paid_tenant_ids, unpaid_tenant_ids = _filter_paid_workflow_archive_tenant_ids(tenant_ids) + + run_conditions = [ + WorkflowRun.created_at < plan_end_before, + WorkflowRun.status.in_(WorkflowExecutionStatus.ended_values()), + WorkflowRun.type.in_(WorkflowRunArchiver.ARCHIVED_TYPE), + tenant_prefix_condition(WorkflowRun.tenant_id, prefix), + ] + if start_from is not None: + run_conditions.append(WorkflowRun.created_at >= start_from) + workflow_runs = ( + db.session.scalar(sa.select(sa.func.count()).select_from(WorkflowRun).where(*run_conditions)) or 0 + ) + candidate_runs = sa.select(WorkflowRun.id).where(*run_conditions).subquery() + workflow_node_executions = ( + db.session.scalar( + sa.select(sa.func.count()) + .select_from(WorkflowNodeExecutionModel) + .join(candidate_runs, WorkflowNodeExecutionModel.workflow_run_id == candidate_runs.c.id) + ) + or 0 + ) + + rows.append( + WorkflowRunArchivePlanRow( + tenant_prefix=prefix, + total_tenants=total_tenants, + workflow_runs=workflow_runs, + workflow_node_executions=workflow_node_executions, + paid_tenants=len(paid_tenant_ids), + unpaid_tenants=len(unpaid_tenant_ids), + ) + ) + + click.echo( + click.style( + f"Workflow archive plan for runs before {plan_end_before.isoformat()}" + f"{f' and at/after {start_from.isoformat()}' if start_from else ''}.", + fg="white", + ) + ) + click.echo("tenant_prefix,total_tenants,workflow_runs,workflow_node_executions,paid_tenants,unpaid_tenants") + for row in rows: + click.echo( + f"{row['tenant_prefix']},{row['total_tenants']},{row['workflow_runs']}," + f"{row['workflow_node_executions']},{row['paid_tenants']},{row['unpaid_tenants']}" + ) + + ordered_rows = sorted( + rows, + key=lambda row: (row["workflow_runs"] + row["workflow_node_executions"], row["tenant_prefix"]), + ) + click.echo("suggested_execution_order=" + ",".join(row["tenant_prefix"] for row in ordered_rows)) + + @click.command( "archive-workflow-runs", help="Archive workflow runs for paid plan tenants to S3-compatible storage.", ) @click.option("--tenant-ids", default=None, help="Optional comma-separated tenant IDs for grayscale rollout.") +@click.option( + "--tenant-prefixes", + default=None, + help="Optional comma-separated tenant ID first hex digits for rollout waves, e.g. 0,1,a,f.", +) @click.option("--before-days", default=90, show_default=True, help="Archive runs older than N days.") @click.option( "--from-days-ago", @@ -169,13 +451,36 @@ def clean_workflow_runs( default=None, help="Archive runs created before this timestamp (UTC if no timezone).", ) -@click.option("--batch-size", default=100, show_default=True, help="Batch size for processing.") -@click.option("--workers", default=1, show_default=True, type=int, help="Concurrent workflow runs to archive.") +@click.option("--batch-size", default=100, show_default=True, help="Maximum workflow runs per archive bundle.") +@click.option( + "--workers", + default=1, + show_default=True, + type=int, + help="Reserved; bundle archive currently runs serially.", +) +@click.option( + "--run-shard-index", + default=None, + type=click.IntRange(min=0), + help="Zero-based workflow run shard index for parallel cron jobs. Must be paired with --run-shard-total.", +) +@click.option( + "--run-shard-total", + default=None, + type=click.IntRange(min=1, max=16), + help="Total workflow run shard count for parallel cron jobs. Must be paired with --run-shard-index.", +) @click.option("--limit", default=None, type=int, help="Maximum number of runs to archive.") @click.option("--dry-run", is_flag=True, help="Preview without archiving.") -@click.option("--delete-after-archive", is_flag=True, help="Delete runs and related data after archiving.") +@click.option( + "--delete-after-archive", + is_flag=True, + help="Not supported by bundle archive; use a separate bundle delete workflow after validation.", +) def archive_workflow_runs( tenant_ids: str | None, + tenant_prefixes: str | None, before_days: int, from_days_ago: int | None, to_days_ago: int | None, @@ -183,6 +488,8 @@ def archive_workflow_runs( end_before: datetime.datetime | None, batch_size: int, workers: int, + run_shard_index: int | None, + run_shard_total: int | None, limit: int | None, dry_run: bool, delete_after_archive: bool, @@ -190,14 +497,19 @@ def archive_workflow_runs( """ Archive workflow runs for paid plan tenants older than the specified days. - This command archives the following tables to storage: + This command writes V2 tenant/month/shard archive bundles. Each bundle contains Parquet snapshots from: + - workflow_runs + - workflow_app_logs - workflow_node_executions - workflow_node_execution_offload - workflow_pauses - workflow_pause_reasons - workflow_trigger_logs - The workflow_runs and workflow_app_logs tables are preserved for UI listing. + Source database rows are always preserved by archive. Deletion must be handled by + a separate bundle-level delete workflow after manifest, checksum, row-count, and + restore-sampling validation. In --dry-run mode, no storage or database writes + happen; the command estimates per-table Parquet bytes and object size instead. """ from services.retention.workflow_run.archive_paid_plan_workflow_run import WorkflowRunArchiver @@ -209,32 +521,58 @@ def archive_workflow_runs( ) ) - if (start_from is None) ^ (end_before is None): - click.echo(click.style("start-from and end-before must be provided together.", fg="red")) - return - - if (from_days_ago is None) ^ (to_days_ago is None): - click.echo(click.style("from-days-ago and to-days-ago must be provided together.", fg="red")) - return - - if from_days_ago is not None and to_days_ago is not None: - if start_from or end_before: - click.echo(click.style("Choose either day offsets or explicit dates, not both.", fg="red")) - return - if from_days_ago <= to_days_ago: - click.echo(click.style("from-days-ago must be greater than to-days-ago.", fg="red")) - return - now = datetime.datetime.now() - start_from = now - datetime.timedelta(days=from_days_ago) - end_before = now - datetime.timedelta(days=to_days_ago) - before_days = 0 - - if start_from and end_before and start_from >= end_before: - click.echo(click.style("start-from must be earlier than end-before.", fg="red")) + try: + before_days, start_from, end_before = _resolve_archive_time_range( + before_days=before_days, + from_days_ago=from_days_ago, + to_days_ago=to_days_ago, + start_from=start_from, + end_before=end_before, + ) + parsed_tenant_prefixes = _parse_tenant_prefixes(tenant_prefixes) + except click.UsageError as e: + click.echo(click.style(e.message, fg="red")) return + plan_end_before = end_before or datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=before_days) if workers < 1: click.echo(click.style("workers must be at least 1.", fg="red")) return + if (run_shard_index is None) ^ (run_shard_total is None): + click.echo(click.style("run-shard-index and run-shard-total must be provided together.", fg="red")) + return + if run_shard_index is not None and run_shard_total is not None and run_shard_index >= run_shard_total: + click.echo(click.style("run-shard-index must be less than run-shard-total.", fg="red")) + return + if delete_after_archive: + click.echo(click.style("delete-after-archive is not supported by bundle archive.", fg="red")) + return + + try: + tenant_plan = _resolve_archive_tenant_ids_from_plan( + tenant_ids=tenant_ids, + tenant_prefixes=parsed_tenant_prefixes, + start_from=start_from, + end_before=plan_end_before, + ) + except Exception: + logger.exception("Failed to resolve workflow archive tenant plan") + click.echo(click.style("Failed to resolve workflow archive tenant plan.", fg="red")) + return + + planned_tenant_ids = tenant_plan["archive_tenant_ids"] + planned_paid_tenant_ids = tenant_plan["paid_tenant_ids"] if planned_tenant_ids is not None else None + paid_tenants = len(tenant_plan["paid_tenant_ids"]) + unpaid_tenants = len(tenant_plan["unpaid_tenant_ids"]) + if planned_tenant_ids is not None: + click.echo( + click.style( + f"Resolved archive tenant plan: paid_tenants={paid_tenants}, unpaid_tenants={unpaid_tenants}.", + fg="white", + ) + ) + if not planned_tenant_ids: + click.echo(click.style("No paid tenants matched the archive plan; nothing to archive.", fg="yellow")) + return archiver = WorkflowRunArchiver( days=before_days, @@ -242,7 +580,11 @@ def archive_workflow_runs( start_from=start_from, end_before=end_before, workers=workers, - tenant_ids=[tid.strip() for tid in tenant_ids.split(",")] if tenant_ids else None, + tenant_ids=planned_tenant_ids, + tenant_prefixes=parsed_tenant_prefixes, + paid_tenant_ids=planned_paid_tenant_ids, + run_shard_index=run_shard_index, + run_shard_total=run_shard_total, limit=limit, dry_run=dry_run, delete_after_archive=delete_after_archive, @@ -252,7 +594,9 @@ def archive_workflow_runs( click.style( f"Summary: processed={summary.total_runs_processed}, archived={summary.runs_archived}, " f"skipped={summary.runs_skipped}, failed={summary.runs_failed}, " - f"time={summary.total_elapsed_time:.2f}s", + f"bundles_archived={summary.bundles_archived}, bundles_skipped={summary.bundles_skipped}, " + f"bundles_failed={summary.bundles_failed}, " + f"object_size_bytes={summary.total_object_size_bytes}, time={summary.total_elapsed_time:.2f}s", fg="cyan", ) ) @@ -268,6 +612,52 @@ def archive_workflow_runs( ) +def _echo_bundle_archive_operation_summary(summary) -> None: + status = "completed successfully" if summary.bundles_failed == 0 else "completed with failures" + fg = "green" if summary.bundles_failed == 0 else "red" + click.echo( + click.style( + f"{summary.operation} {status}. " + f"bundles_success={summary.bundles_succeeded} bundles_failed={summary.bundles_failed} " + f"runs={summary.runs_processed} rows={summary.rows_processed} " + f"archive_bytes={summary.archive_bytes} duration={summary.elapsed_time:.2f}s " + f"validation_time={summary.validation_time:.2f}s " + f"runs_per_second={summary.runs_per_second:.2f} rows_per_second={summary.rows_per_second:.2f} " + f"bytes_per_second={summary.bytes_per_second:.2f}", + fg=fg, + ) + ) + click.echo(click.style("table,row_count", fg="white")) + for table_name in [ + "workflow_runs", + "workflow_app_logs", + "workflow_node_executions", + "workflow_node_execution_offload", + "workflow_pauses", + "workflow_pause_reasons", + "workflow_trigger_logs", + ]: + click.echo(f"{table_name},{summary.table_counts.get(table_name, 0)}") + for result in summary.results: + if result.success: + click.echo( + click.style( + f" bundle={result.bundle_id} tenant={result.tenant_id} runs={result.run_count} " + f"rows={result.row_count} archive_bytes={result.archive_bytes} " + f"time={result.elapsed_time:.2f}s validation={result.validation_time:.2f}s", + fg="white", + ) + ) + else: + click.echo( + click.style( + f" failed bundle={result.bundle_id} tenant={result.tenant_id} " + f"object_prefix={result.object_prefix} error={result.error}", + fg="red", + ) + ) + + @click.command( "restore-workflow-runs", help="Restore archived workflow runs from S3-compatible storage.", @@ -290,8 +680,8 @@ def archive_workflow_runs( default=None, help="Optional upper bound (exclusive) for created_at; must be paired with --start-from.", ) -@click.option("--workers", default=1, show_default=True, type=int, help="Concurrent workflow runs to restore.") -@click.option("--limit", type=int, default=100, show_default=True, help="Maximum number of runs to restore.") +@click.option("--workers", default=1, show_default=True, type=int, help="V1 --run-id compatibility only.") +@click.option("--limit", type=int, default=100, show_default=True, help="Maximum number of V2 bundles to restore.") @click.option("--dry-run", is_flag=True, help="Preview without restoring.") def restore_workflow_runs( tenant_ids: str | None, @@ -303,15 +693,18 @@ def restore_workflow_runs( dry_run: bool, ): """ - Restore an archived workflow run from storage to the database. + Restore archived workflow runs from storage to the database. - This restores the following tables: + Batch restore uses V2 bundle metadata and validates archive objects before writing source rows. This restores: + - workflow_runs + - workflow_app_logs - workflow_node_executions - workflow_node_execution_offload - workflow_pauses - workflow_pause_reasons - workflow_trigger_logs """ + from services.retention.workflow_run.bundle_archive_maintenance import WorkflowRunBundleArchiveMaintenance from services.retention.workflow_run.restore_archived_workflow_run import WorkflowRunRestore parsed_tenant_ids = None @@ -335,39 +728,46 @@ def restore_workflow_runs( ) ) - restorer = WorkflowRunRestore(dry_run=dry_run, workers=workers) if run_id: + restorer = WorkflowRunRestore(dry_run=dry_run, workers=workers) results = [restorer.restore_by_run_id(run_id)] - else: - assert start_from is not None - assert end_before is not None - results = restorer.restore_batch( - parsed_tenant_ids, - start_date=start_from, - end_date=end_before, - limit=limit, - ) + end_time = datetime.datetime.now(datetime.UTC) + elapsed = end_time - start_time - end_time = datetime.datetime.now(datetime.UTC) - elapsed = end_time - start_time + successes = sum(1 for result in results if result.success) + failures = len(results) - successes - successes = sum(1 for result in results if result.success) - failures = len(results) - successes - - if failures == 0: - click.echo( - click.style( - f"Restore completed successfully. success={successes} duration={elapsed}", - fg="green", + if failures == 0: + click.echo( + click.style( + f"Restore completed successfully. success={successes} duration={elapsed}", + fg="green", + ) ) - ) - else: - click.echo( - click.style( - f"Restore completed with failures. success={successes} failed={failures} duration={elapsed}", - fg="red", + else: + click.echo( + click.style( + f"Restore completed with failures. success={successes} failed={failures} duration={elapsed}", + fg="red", + ) ) + return + + if workers != 1: + click.echo( + click.style("--workers is ignored for V2 bundle restore; bundles are processed serially.", fg="yellow") ) + assert start_from is not None + assert end_before is not None + bundle_restorer = WorkflowRunBundleArchiveMaintenance(dry_run=dry_run, strict_content_validation=True) + summary = bundle_restorer.restore_batch( + tenant_ids=parsed_tenant_ids, + start_date=start_from, + end_date=end_before, + limit=limit, + ) + _echo_bundle_archive_operation_summary(summary) + return @click.command( @@ -392,8 +792,20 @@ def restore_workflow_runs( default=None, help="Optional upper bound (exclusive) for created_at; must be paired with --start-from.", ) -@click.option("--limit", type=int, default=100, show_default=True, help="Maximum number of runs to delete.") +@click.option("--limit", type=int, default=100, show_default=True, help="Maximum number of V2 bundles to delete.") @click.option("--dry-run", is_flag=True, help="Preview without deleting.") +@click.option( + "--skip-bad-archives", + is_flag=True, + help="Continue batch deletion when one archive object fails validation.", +) +@click.option( + "--restore-sample-interval", + type=int, + default=0, + show_default=True, + help="Run restore dry-run after every N successful deletes; 0 disables restore sampling.", +) def delete_archived_workflow_runs( tenant_ids: str | None, run_id: str | None, @@ -401,10 +813,16 @@ def delete_archived_workflow_runs( end_before: datetime.datetime | None, limit: int, dry_run: bool, + skip_bad_archives: bool, + restore_sample_interval: int, ): """ Delete archived workflow runs from the database. + + Batch delete uses V2 bundle metadata and validates object existence, manifest schema, object size, checksum, row + counts, and source/archive content checksums before deleting source rows. `--run-id` keeps the V1 per-run path. """ + from services.retention.workflow_run.bundle_archive_maintenance import WorkflowRunBundleArchiveMaintenance from services.retention.workflow_run.delete_archived_workflow_run import ArchivedWorkflowRunDeletion parsed_tenant_ids = None @@ -417,6 +835,8 @@ def delete_archived_workflow_runs( raise click.UsageError("--start-from and --end-before must be provided together.") if run_id is None and (start_from is None or end_before is None): raise click.UsageError("--start-from and --end-before are required for batch delete.") + if restore_sample_interval < 0: + raise click.BadParameter("restore-sample-interval must be >= 0") start_time = datetime.datetime.now(datetime.UTC) target_desc = f"workflow run {run_id}" if run_id else "workflow runs" @@ -427,56 +847,85 @@ def delete_archived_workflow_runs( ) ) - deleter = ArchivedWorkflowRunDeletion(dry_run=dry_run) if run_id: - results = [deleter.delete_by_run_id(run_id)] - else: - assert start_from is not None - assert end_before is not None - results = deleter.delete_batch( - parsed_tenant_ids, - start_date=start_from, - end_date=end_before, - limit=limit, + deleter = ArchivedWorkflowRunDeletion( + dry_run=dry_run, + skip_bad_archives=skip_bad_archives, + restore_sample_interval=restore_sample_interval, ) + results = [deleter.delete_by_run_id(run_id)] + for result in results: + if result.success: + click.echo( + click.style( + f"{'[DRY RUN] Would delete' if dry_run else 'Deleted'} " + f"workflow run {result.run_id} (tenant={result.tenant_id}, " + f"archive_key={result.archive_key}, counts={result.validated_counts})", + fg="green", + ) + ) + if result.restore_sampled: + sample_status = "passed" if result.restore_sample_success else "failed" + click.echo( + click.style( + f" restore dry-run sample {sample_status} for workflow run {result.run_id}", + fg="green" if result.restore_sample_success else "red", + ) + ) + else: + click.echo( + click.style( + f"Failed to delete workflow run {result.run_id}: {result.error}", + fg="red", + ) + ) + click.echo( + click.style( + " runbook: pause this delete window, verify archive storage object and manifest/checksum, " + "retry the same run after fixing storage or DB drift, or rerun with --skip-bad-archives " + "to quarantine this run and continue the batch.", + fg="yellow", + ) + ) - for result in results: - if result.success: + end_time = datetime.datetime.now(datetime.UTC) + elapsed = end_time - start_time + + successes = sum(1 for result in results if result.success) + failures = len(results) - successes + + if failures == 0: click.echo( click.style( - f"{'[DRY RUN] Would delete' if dry_run else 'Deleted'} " - f"workflow run {result.run_id} (tenant={result.tenant_id})", + f"Delete completed successfully. success={successes} duration={elapsed}", fg="green", ) ) else: click.echo( click.style( - f"Failed to delete workflow run {result.run_id}: {result.error}", + f"Delete completed with failures. success={successes} failed={failures} duration={elapsed}", fg="red", ) ) + return - end_time = datetime.datetime.now(datetime.UTC) - elapsed = end_time - start_time - - successes = sum(1 for result in results if result.success) - failures = len(results) - successes - - if failures == 0: - click.echo( - click.style( - f"Delete completed successfully. success={successes} duration={elapsed}", - fg="green", - ) - ) - else: - click.echo( - click.style( - f"Delete completed with failures. success={successes} failed={failures} duration={elapsed}", - fg="red", - ) - ) + if restore_sample_interval: + click.echo(click.style("--restore-sample-interval is ignored for V2 bundle delete.", fg="yellow")) + assert start_from is not None + assert end_before is not None + bundle_deleter = WorkflowRunBundleArchiveMaintenance( + dry_run=dry_run, + strict_content_validation=True, + stop_on_error=not skip_bad_archives, + ) + summary = bundle_deleter.delete_batch( + tenant_ids=parsed_tenant_ids, + start_date=start_from, + end_date=end_before, + limit=limit, + ) + _echo_bundle_archive_operation_summary(summary) def _find_orphaned_draft_variables(batch_size: int = 1000) -> list[str]: diff --git a/api/controllers/console/agent/composer.py b/api/controllers/console/agent/composer.py index 2cd01e427f7..975586c635c 100644 --- a/api/controllers/console/agent/composer.py +++ b/api/controllers/console/agent/composer.py @@ -104,7 +104,7 @@ class WorkflowAgentComposerValidateApi(Resource): @with_current_tenant_id def post(self, tenant_id: str, app_model: App, node_id: str): payload = ComposerSavePayload.model_validate(console_ns.payload or {}) - ComposerConfigValidator.validate_save_payload(payload) + ComposerConfigValidator.validate_publish_payload(payload) findings = AgentComposerService.collect_validation_findings( tenant_id=tenant_id, payload=payload, @@ -238,7 +238,7 @@ class AgentComposerValidateApi(Resource): def post(self, tenant_id: str, agent_id: UUID): _resolve_agent_app_id(tenant_id=tenant_id, agent_id=agent_id) payload = ComposerSavePayload.model_validate(console_ns.payload or {}) - ComposerConfigValidator.validate_save_payload(payload) + ComposerConfigValidator.validate_publish_payload(payload) findings = AgentComposerService.collect_validation_findings( tenant_id=tenant_id, payload=payload, diff --git a/api/controllers/console/agent/roster.py b/api/controllers/console/agent/roster.py index 96bce6763f5..ac3f7ef4824 100644 --- a/api/controllers/console/agent/roster.py +++ b/api/controllers/console/agent/roster.py @@ -14,7 +14,6 @@ from controllers.console.app.app import ( ) from controllers.console.app.app import ( AppListQuery, - CopyAppPayload, _normalize_app_list_query_args, ) from controllers.console.app.app import ( @@ -110,6 +109,25 @@ class AgentAppUpdatePayload(GenericUpdateAppPayload): return role +class AgentAppCopyPayload(BaseModel): + name: str | None = Field(default=None, description="Name for the copied agent") + description: str | None = Field(default=None, description="Description for the copied agent", max_length=400) + role: str | None = Field(default=None, description="Role for the copied agent", max_length=255) + icon_type: IconType | None = Field(default=None, description="Icon type") + icon: str | None = Field(default=None, description="Icon") + icon_background: str | None = Field(default=None, description="Icon background color") + + @field_validator("role") + @classmethod + def validate_role(cls, value: str | None) -> str | None: + if value is None: + return None + role = value.strip() + if not role: + raise ValueError("Agent role is required when provided.") + return role + + class AgentApiStatusPayload(BaseModel): enable_api: bool = Field(..., description="Enable or disable Agent service API") @@ -228,6 +246,10 @@ class AgentAppDetailWithSite(GenericAppDetailWithSite): active_config_is_published: bool = False +class AgentDebugConversationRefreshResponse(BaseModel): + debug_conversation_id: str + + class AgentAppPagination(GenericAppPagination): data: list[AgentAppPartial] = Field( # type: ignore[assignment] # pyrefly: ignore[bad-override-mutable-attribute] validation_alias=AliasChoices("items", "data") @@ -238,8 +260,8 @@ register_schema_models( console_ns, AgentAppCreatePayload, AgentAppUpdatePayload, + AgentAppCopyPayload, AgentApiStatusPayload, - CopyAppPayload, AgentInviteOptionsQuery, AgentLogsQuery, AgentStatisticsQuery, @@ -254,6 +276,7 @@ register_response_schema_models( AgentAppPublishedReferenceResponse, AgentAppDetailWithSite, AgentAppPartial, + AgentDebugConversationRefreshResponse, AgentConfigSnapshotDetailResponse, AgentConfigSnapshotListResponse, AgentConfigSnapshotRestoreResponse, @@ -535,9 +558,34 @@ class AgentAppApi(Resource): return "", 204 +@console_ns.route("/agent//debug-conversation/refresh") +class AgentDebugConversationRefreshApi(Resource): + @console_ns.response( + 200, + "Agent debug conversation refreshed", + console_ns.models[AgentDebugConversationRefreshResponse.__name__], + ) + @console_ns.response(403, "Insufficient permissions") + @setup_required + @login_required + @account_initialization_required + @edit_permission_required + @with_current_user + @with_current_tenant_id + def post(self, tenant_id: str, current_user: Account, agent_id: UUID): + debug_conversation_id = _agent_roster_service().refresh_agent_app_debug_conversation_id( + tenant_id=tenant_id, + agent_id=str(agent_id), + account_id=current_user.id, + ) + return AgentDebugConversationRefreshResponse(debug_conversation_id=debug_conversation_id).model_dump( + mode="json" + ) + + @console_ns.route("/agent//copy") class AgentAppCopyApi(Resource): - @console_ns.expect(console_ns.models[CopyAppPayload.__name__]) + @console_ns.expect(console_ns.models[AgentAppCopyPayload.__name__]) @console_ns.response(201, "Agent app copied successfully", console_ns.models[AgentAppDetailWithSite.__name__]) @console_ns.response(403, "Insufficient permissions") @console_ns.response(400, "Invalid request parameters") @@ -548,13 +596,14 @@ class AgentAppCopyApi(Resource): @with_current_user @with_current_tenant_id def post(self, tenant_id: str, current_user: Account, agent_id: UUID): - args = CopyAppPayload.model_validate(console_ns.payload or {}) + args = AgentAppCopyPayload.model_validate(console_ns.payload or {}) copied_app = _agent_roster_service().duplicate_agent_app( tenant_id=tenant_id, agent_id=str(agent_id), account=current_user, name=args.name, description=args.description, + role=args.role, icon_type=args.icon_type, icon=args.icon, icon_background=args.icon_background, diff --git a/api/controllers/console/app/agent.py b/api/controllers/console/app/agent.py index a53a174da42..86a3c473547 100644 --- a/api/controllers/console/app/agent.py +++ b/api/controllers/console/app/agent.py @@ -1,4 +1,3 @@ -import logging from typing import Any from uuid import UUID @@ -30,7 +29,6 @@ from fields.base import ResponseModel from libs.helper import uuid_value from libs.login import login_required from models import Account -from models.agent_config_entities import AgentFileRefConfig, AgentSkillRefConfig from models.model import App, AppMode, UploadFile from services.agent.composer_service import AgentComposerService from services.agent.skill_package_service import SkillManifest, SkillPackageError @@ -49,8 +47,6 @@ from services.agent_drive_service import ( ) from services.agent_service import AgentService -logger = logging.getLogger(__name__) - _WORKFLOW_AGENT_DRIVE_APP_MODES = [AppMode.WORKFLOW, AppMode.ADVANCED_CHAT] _AGENT_SKILL_UPLOAD_PARAMS = { "file": { @@ -130,8 +126,16 @@ class AgentLogResponse(ResponseModel): files: list[Any] = Field(default_factory=list) +class AgentUploadedSkillResponse(ResponseModel): + name: str + description: str + path: str + skill_md_key: str + archive_key: str | None = None + + class AgentSkillUploadResponse(ResponseModel): - skill: AgentSkillRefConfig + skill: AgentUploadedSkillResponse manifest: SkillManifest @@ -145,13 +149,11 @@ class AgentDriveFileResponse(ResponseModel): class AgentDriveFileCommitResponse(ResponseModel): file: AgentDriveFileResponse - config_version_id: str | None = None class AgentDriveDeleteResponse(ResponseModel): result: str removed_keys: list[str] = Field(default_factory=list) - config_version_id: str | None = None register_schema_models(console_ns, AgentLogQuery, AgentDriveFilePayload, AgentDriveDeleteFileByAgentQuery) @@ -161,6 +163,7 @@ register_response_schema_models( AgentDriveFileCommitResponse, AgentDriveFileResponse, AgentLogResponse, + AgentUploadedSkillResponse, AgentSkillUploadResponse, SkillToolInferenceResult, ) @@ -242,24 +245,6 @@ def _commit_drive_file_for_app(*, current_user: Account, app_model: App, allow_n return {"code": exc.code, "message": exc.message}, exc.status_code row = committed[0] - file_ref = AgentFileRefConfig.model_validate( - { - "id": row["key"], - "name": upload_file.name, - "file_id": upload_file.id, - "drive_key": row["key"], - "type": row.get("mime_type"), - "size": row.get("size"), - } - ) - config_version_id = AgentComposerService.add_drive_file_ref( - tenant_id=app_model.tenant_id, - agent_id=agent_id, - account_id=current_user.id, - file_ref=file_ref, - app_id=app_model.id, - node_id=node_id, - ) return { "file": { "name": upload_file.name, @@ -268,7 +253,6 @@ def _commit_drive_file_for_app(*, current_user: Account, app_model: App, allow_n "size": row.get("size"), "mime_type": row.get("mime_type"), }, - "config_version_id": config_version_id, }, 201 @@ -283,24 +267,17 @@ def _delete_drive_file_for_app(*, current_user: Account, app_model: App, allow_n except AgentDriveError as exc: return {"code": exc.code, "message": exc.message}, exc.status_code - config_version_id = AgentComposerService.remove_drive_refs( - tenant_id=app_model.tenant_id, - agent_id=agent_id, - account_id=current_user.id, - file_key=key, - app_id=app_model.id, - node_id=node_id, - ) - removed_keys: list[str] = [] try: - removed_keys = AgentDriveService().delete(tenant_id=app_model.tenant_id, agent_id=agent_id, key=key) + result = AgentDriveService().commit( + tenant_id=app_model.tenant_id, + user_id=current_user.id, + agent_id=agent_id, + items=[DriveCommitItem(key=key, file_ref=None)], + ) except AgentDriveError as exc: return {"code": exc.code, "message": exc.message}, exc.status_code - except Exception: - # Soul-first ordering: the ref is already gone; orphan KV rows are - # harmless and an idempotent DELETE retry cleans them. - logger.exception("agent drive delete failed for key %s (soul already updated)", key) - return {"result": "success", "removed_keys": removed_keys, "config_version_id": config_version_id} + removed_keys = [item["key"] for item in result if item.get("removed")] + return {"result": "success", "removed_keys": removed_keys} def _delete_skill_for_app(*, current_user: Account, app_model: App, slug: str, allow_node_id: bool = True): @@ -312,22 +289,20 @@ def _delete_skill_for_app(*, current_user: Account, app_model: App, slug: str, a if "/" in slug or not slug.strip(): return {"code": "drive_key_invalid", "message": "skill slug must be a single path segment"}, 400 - config_version_id = AgentComposerService.remove_drive_refs( - tenant_id=app_model.tenant_id, - agent_id=agent_id, - account_id=current_user.id, - skill_slug=slug, - app_id=app_model.id, - node_id=node_id, - ) - removed_keys: list[str] = [] try: - removed_keys = AgentDriveService().delete(tenant_id=app_model.tenant_id, agent_id=agent_id, prefix=f"{slug}/") + result = AgentDriveService().commit( + tenant_id=app_model.tenant_id, + user_id=current_user.id, + agent_id=agent_id, + items=[ + DriveCommitItem(key=f"{slug}/SKILL.md", file_ref=None), + DriveCommitItem(key=f"{slug}/.DIFY-SKILL-FULL.zip", file_ref=None), + ], + ) except AgentDriveError as exc: return {"code": exc.code, "message": exc.message}, exc.status_code - except Exception: - logger.exception("agent drive delete failed for skill %s (soul already updated)", slug) - return {"result": "success", "removed_keys": removed_keys, "config_version_id": config_version_id} + removed_keys = [item["key"] for item in result if item.get("removed")] + return {"result": "success", "removed_keys": removed_keys} def _infer_skill_tools_for_app(*, app_model: App, slug: str): @@ -455,7 +430,7 @@ class AgentDriveFilesApi(Resource): return _commit_drive_file_for_app(current_user=current_user, app_model=app_model) @console_ns.doc("delete_agent_drive_file") - @console_ns.doc(description="Delete one drive file by key; soul ref first, then the KV row (ENG-625 D5)") + @console_ns.doc(description="Delete one drive file by key via drive commit-null semantics") @console_ns.doc(params={"app_id": "Application ID", **query_params_from_model(AgentDriveDeleteFileQuery)}) @console_ns.response(200, "File removed", console_ns.models[AgentDriveDeleteResponse.__name__]) @setup_required @@ -486,9 +461,7 @@ class AgentSkillByAgentApi(Resource): @console_ns.route("/apps//agent/skills/") class AgentSkillApi(Resource): @console_ns.doc("delete_agent_skill") - @console_ns.doc( - description="Delete a standardized skill: soul ref first, then the / drive prefix (ENG-625 D5)" - ) + @console_ns.doc(description="Delete a standardized skill by removing its known drive keys via commit-null") @console_ns.doc( params={ "app_id": "Application ID", diff --git a/api/controllers/console/datasets/external.py b/api/controllers/console/datasets/external.py index 033c9a69af6..eb7b9aa84f8 100644 --- a/api/controllers/console/datasets/external.py +++ b/api/controllers/console/datasets/external.py @@ -26,6 +26,7 @@ from controllers.console.wraps import ( with_current_tenant_id, with_current_user, ) +from extensions.ext_database import db from fields.base import ResponseModel from fields.dataset_fields import ( dataset_detail_fields, @@ -390,6 +391,7 @@ class ExternalKnowledgeHitTestingApi(Resource): try: response = HitTestingService.external_retrieve( + session=db.session, dataset=dataset, query=payload.query, account=current_user, diff --git a/api/controllers/console/datasets/hit_testing_base.py b/api/controllers/console/datasets/hit_testing_base.py index 4e90e66eb25..c343effa9a1 100644 --- a/api/controllers/console/datasets/hit_testing_base.py +++ b/api/controllers/console/datasets/hit_testing_base.py @@ -18,6 +18,7 @@ from core.errors.error import ( ProviderTokenNotInitError, QuotaExceededError, ) +from extensions.ext_database import db from graphon.model_runtime.errors.invoke import InvokeError from libs.login import resolve_account_fallback from models.account import Account @@ -115,6 +116,7 @@ class DatasetsHitTestingBase: try: current_user, _ = resolve_account_fallback(current_user, current_tenant_id) response = HitTestingService.retrieve( + session=db.session, dataset=dataset, query=cast(str, args.get("query")), account=current_user, diff --git a/api/controllers/console/snippets/snippet_workflow.py b/api/controllers/console/snippets/snippet_workflow.py index 444c8295eb0..ee1d928a74f 100644 --- a/api/controllers/console/snippets/snippet_workflow.py +++ b/api/controllers/console/snippets/snippet_workflow.py @@ -80,6 +80,13 @@ class SnippetDraftConfigResponse(BaseModel): parallel_depth_limit: int +class SnippetWorkflowPaginationResponse(BaseModel): + items: list[SnippetWorkflowResponse] + page: int + limit: int + has_more: bool + + register_schema_models( console_ns, SnippetDraftSyncPayload, @@ -98,6 +105,7 @@ register_response_schema_models( SimpleResultResponse, SnippetDraftConfigResponse, SnippetWorkflowResponse, + SnippetWorkflowPaginationResponse, WorkflowPublishResponse, WorkflowPaginationResponse, WorkflowRestoreResponse, @@ -325,7 +333,7 @@ class SnippetPublishedAllWorkflowApi(Resource): @console_ns.response( 200, "Published workflows retrieved successfully", - console_ns.models[WorkflowPaginationResponse.__name__], + console_ns.models[SnippetWorkflowPaginationResponse.__name__], ) @setup_required @login_required @@ -348,7 +356,7 @@ class SnippetPublishedAllWorkflowApi(Resource): limit=args.limit, ) - return WorkflowPaginationResponse.model_validate( + response = SnippetWorkflowPaginationResponse.model_validate( { "items": workflows, "page": args.page, @@ -357,6 +365,9 @@ class SnippetPublishedAllWorkflowApi(Resource): }, from_attributes=True, ).model_dump(mode="json") + for item in response["items"]: + item["input_fields"] = snippet.input_fields_list + return response @console_ns.route("/snippets//workflows//restore") diff --git a/api/controllers/inner_api/knowledge/retrieval.py b/api/controllers/inner_api/knowledge/retrieval.py index ef33fbda518..1c1320fde42 100644 --- a/api/controllers/inner_api/knowledge/retrieval.py +++ b/api/controllers/inner_api/knowledge/retrieval.py @@ -1,9 +1,10 @@ -"""Inner API endpoint for tenant-scoped knowledge retrieval. +"""Plugin inner API endpoint for tenant-scoped knowledge retrieval. This controller is a thin HTTP wrapper around ``services.knowledge_retrieval_inner_service.InnerKnowledgeRetrievalService``. -It intentionally keeps authorization simple: shared inner API key plus -tenant-scoped app/dataset validation in the service layer. +It uses the plugin inner API key because dify-agent calls this endpoint through +the same trusted Dify API bridge as other agent/plugin inner calls; tenant-scoped +app/dataset validation remains in the service layer. """ from flask_restx import Resource @@ -11,7 +12,7 @@ from pydantic import ValidationError from controllers.common.schema import register_response_schema_models, register_schema_models from controllers.inner_api import inner_api_ns -from controllers.inner_api.wraps import inner_api_only +from controllers.inner_api.wraps import plugin_inner_api_only from core.workflow.nodes.knowledge_retrieval import exc as retrieval_exc from libs.exception import BaseHTTPException from services.entities.knowledge_retrieval_inner import InnerKnowledgeRetrieveRequest, InnerKnowledgeRetrieveResponse @@ -48,7 +49,7 @@ register_response_schema_models(inner_api_ns, InnerKnowledgeRetrieveResponse) class InnerKnowledgeRetrieveApi(Resource): """Retrieve knowledge from one or more datasets within the caller tenant.""" - @inner_api_only + @plugin_inner_api_only @inner_api_ns.doc("inner_knowledge_retrieve") @inner_api_ns.doc(description="Retrieve knowledge for trusted internal callers") @inner_api_ns.expect(inner_api_ns.models[InnerKnowledgeRetrieveRequest.__name__]) @@ -60,9 +61,8 @@ class InnerKnowledgeRetrieveApi(Resource): @inner_api_ns.doc( responses={ 400: "Invalid request body", - 401: "Unauthorized - invalid inner API key", 403: "Caller tenant does not own the requested resource", - 404: "App or dataset not found", + 404: "Invalid plugin inner API key, app not found, or dataset not found", 422: "Invalid retrieval configuration", 429: "Knowledge retrieval rate limited", 502: "External knowledge retrieval failed", diff --git a/api/controllers/inner_api/plugin/agent_drive.py b/api/controllers/inner_api/plugin/agent_drive.py index a80caea3c55..0cdb9dab35f 100644 --- a/api/controllers/inner_api/plugin/agent_drive.py +++ b/api/controllers/inner_api/plugin/agent_drive.py @@ -1,10 +1,12 @@ -"""Inner API for the agent drive (agent 网盘) control plane — ENG-591. +"""Inner API for the agent drive (agent 网盘) control plane. -Two endpoints, called by the dify-agent server (not the sandbox) with the inner -API key. The drive ref is the URL segment ``agent-``; the path-like -file key travels in the query/body, never as a URL path segment (so its ``/`` -characters do not collide with routing). Drive-owned semantics: tenant scoped, -no user-level FileAccessScope. +These endpoints are called by the dify-agent server (not the sandbox) with the +inner API key. The drive ref is the URL segment ``agent-``; the +path-like file key travels in the query/body, never as a URL path segment (so +its ``/`` characters do not collide with routing). Drive-owned semantics: +tenant scoped, no user-level FileAccessScope. Commit still canonicalizes the +trusted execution-context user through the same EndUser lookup as plugin file +upload before validating ToolFile ownership. """ from flask import request @@ -13,6 +15,7 @@ from pydantic import BaseModel, ValidationError from controllers.console.wraps import setup_required from controllers.inner_api import inner_api_ns +from controllers.inner_api.plugin.wraps import get_user from controllers.inner_api.wraps import plugin_inner_api_only from services.agent_drive_service import ( AgentDriveError, @@ -56,6 +59,24 @@ class AgentDriveManifestApi(Resource): return {"items": items} +@inner_api_ns.route("/drive//skills") +class AgentDriveSkillsApi(Resource): + @setup_required + @plugin_inner_api_only + @inner_api_ns.doc("agent_drive_skills") + @inner_api_ns.doc(description="List the skill catalog of an agent drive") + def get(self, drive_ref: str): + try: + agent_id = parse_agent_drive_ref(drive_ref) + tenant_id = (request.args.get("tenant_id") or "").strip() + if not tenant_id: + raise AgentDriveError("missing_tenant_id", "tenant_id is required", status_code=400) + items = AgentDriveService().list_skills(tenant_id=tenant_id, agent_id=agent_id) + except AgentDriveError as exc: + return _error_response(exc) + return {"items": items} + + @inner_api_ns.route("/drive//commit") class AgentDriveCommitApi(Resource): @setup_required @@ -69,9 +90,10 @@ class AgentDriveCommitApi(Resource): body = _CommitRequest.model_validate(request.get_json(silent=True) or {}) except ValidationError as exc: raise AgentDriveError("invalid_request", str(exc), status_code=400) from exc + user = get_user(body.tenant_id, body.user_id) items = AgentDriveService().commit( tenant_id=body.tenant_id, - user_id=body.user_id, + user_id=user.id, agent_id=agent_id, items=body.items, ) diff --git a/api/controllers/openapi/_models.py b/api/controllers/openapi/_models.py index e846db3ea75..6e8a9c9d439 100644 --- a/api/controllers/openapi/_models.py +++ b/api/controllers/openapi/_models.py @@ -2,7 +2,8 @@ from __future__ import annotations -from typing import Any, Literal +from enum import StrEnum +from typing import Any, Final, Literal from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator @@ -13,6 +14,30 @@ from models.model import AppMode MAX_PAGE_LIMIT = 200 +class SupportedAppType(StrEnum): + """App types the ``app`` usage face (``get app``) lists and filters. + + A curated subset of :class:`AppMode`: the real, user-facing app categories. + Excludes runtime-only mode tags that are not standalone apps + (``rag-pipeline`` is a knowledge ``Pipeline``; ``channel`` is unused) and the + roster-owned ``agent`` type (surfaced through the roster, not this list). + + Members reference ``AppMode.*.value`` so the subset relationship is + type-checked: dropping a member from ``AppMode`` breaks this at import. + This is the single source for the listable set — params, filters, and the + generated CLI whitelist all derive from it. + """ + + COMPLETION = AppMode.COMPLETION.value + CHAT = AppMode.CHAT.value + ADVANCED_CHAT = AppMode.ADVANCED_CHAT.value + WORKFLOW = AppMode.WORKFLOW.value + AGENT_CHAT = AppMode.AGENT_CHAT.value + + +SUPPORTED_APP_TYPES: Final[tuple[AppMode, ...]] = tuple(AppMode(t.value) for t in SupportedAppType) + + class UsageInfo(BaseModel): prompt_tokens: int = 0 completion_tokens: int = 0 @@ -279,12 +304,12 @@ class AppDescribeQuery(BaseModel): class AppListQuery(BaseModel): - """mode is a closed enum.""" + """mode is a closed enum of listable app types.""" workspace_id: UUIDStr page: int = Field(1, ge=1) limit: int = Field(20, ge=1, le=MAX_PAGE_LIMIT) - mode: AppMode | None = None + mode: SupportedAppType | None = None name: str | None = Field(None, max_length=200) @@ -335,7 +360,7 @@ class PermittedExternalAppsListQuery(BaseModel): page: int = Field(1, ge=1) limit: int = Field(20, ge=1, le=MAX_PAGE_LIMIT) - mode: AppMode | None = None + mode: SupportedAppType | None = None name: str | None = Field(None, max_length=200) diff --git a/api/controllers/openapi/apps.py b/api/controllers/openapi/apps.py index c2626cd5d8c..181af5c0742 100644 --- a/api/controllers/openapi/apps.py +++ b/api/controllers/openapi/apps.py @@ -16,6 +16,7 @@ from controllers.openapi import openapi_ns from controllers.openapi._contract import accepts, returns from controllers.openapi._input_schema import EMPTY_INPUT_SCHEMA, build_input_schema, resolve_app_config from controllers.openapi._models import ( + SUPPORTED_APP_TYPES, AppDescribeInfo, AppDescribeQuery, AppDescribeResponse, @@ -37,6 +38,11 @@ from services.app_service import AppListParams, AppService _ALLOWED_DESCRIBE_FIELDS: frozenset[str] = frozenset({"info", "parameters", "input_schema"}) +def _is_listable(app: App) -> bool: + """Whether the openapi app face exposes this app (curated, listable types only).""" + return app.mode in SUPPORTED_APP_TYPES + + _EMPTY_PARAMETERS: dict[str, Any] = { "opening_statement": None, "suggested_questions": [], @@ -171,6 +177,8 @@ class AppListApi(Resource): app: App | None = AppService.get_visible_app_by_id(db.session, str(parsed_uuid)) if app is None or str(app.tenant_id) != workspace_id: return empty + if not _is_listable(app): + return empty # Apply RBAC visibility to the UUID fast-path the same way the service # layer does for paginated queries (id in accessible set OR own app). if apply_rbac_filter and not access_filter.is_app_accessible( @@ -223,6 +231,7 @@ class AppListApi(Resource): workspace_name=tenant_name, ) for r in pagination.items + if _is_listable(r) ] env = AppListResponse( diff --git a/api/core/app/apps/agent_app/runtime_request_builder.py b/api/core/app/apps/agent_app/runtime_request_builder.py index 01206b12db6..fc1fcb0b168 100644 --- a/api/core/app/apps/agent_app/runtime_request_builder.py +++ b/api/core/app/apps/agent_app/runtime_request_builder.py @@ -37,6 +37,7 @@ from core.workflow.nodes.agent_v2.plugin_tools_builder import ( from core.workflow.nodes.agent_v2.runtime_request_builder import ( append_runtime_warnings, build_ask_human_layer_config, + build_drive_aware_soul_mention_resolver, build_drive_layer_config, build_knowledge_layer_config, build_shell_layer_config, @@ -123,9 +124,19 @@ class AgentAppRuntimeRequestBuilder: } drive_config = None + soul_prompt_resolver = build_soul_mention_resolver(agent_soul) if dify_config.AGENT_DRIVE_MANIFEST_ENABLED: - drive_config, drive_warnings = build_drive_layer_config(agent_soul, agent_id=context.agent_id) + drive_config, drive_warnings = build_drive_layer_config( + agent_soul, + tenant_id=context.dify_context.tenant_id, + agent_id=context.agent_id, + ) append_runtime_warnings(metadata, drive_warnings) + soul_prompt_resolver = build_drive_aware_soul_mention_resolver( + agent_soul, + tenant_id=context.dify_context.tenant_id, + agent_id=context.agent_id, + ) knowledge_config = build_knowledge_layer_config(agent_soul) request = self._request_builder.build_for_agent_app( @@ -154,9 +165,7 @@ class AgentAppRuntimeRequestBuilder: ), # ENG-616: expand slash-menu mention tokens to canonical names so # no frontend-internal {{#…#}} marker ever reaches the model. - agent_soul_prompt=expand_prompt_mentions( - agent_soul.prompt.system_prompt, build_soul_mention_resolver(agent_soul) - ).strip() + agent_soul_prompt=expand_prompt_mentions(agent_soul.prompt.system_prompt, soul_prompt_resolver).strip() or None, user_prompt=context.user_query, tools=tools_layer, diff --git a/api/core/workflow/nodes/agent_v2/runtime_feature_manifest.py b/api/core/workflow/nodes/agent_v2/runtime_feature_manifest.py index 65c5d42e916..fa7b28cbb0a 100644 --- a/api/core/workflow/nodes/agent_v2/runtime_feature_manifest.py +++ b/api/core/workflow/nodes/agent_v2/runtime_feature_manifest.py @@ -16,9 +16,6 @@ SUPPORTED_AGENT_BACKEND_FEATURES = frozenset( "knowledge", "env", "sandbox", - # ENG-623: exposed at runtime as the dify.drive declaration layer - # (an index the agent pulls through the back proxy). - "skills_files", # ENG-635: human involvement is exposed at runtime as the dify.ask_human # deferred tool; a call pauses via the existing HITL form mechanism. "human", @@ -32,11 +29,7 @@ RESERVED_AGENT_BACKEND_FEATURES = frozenset( ) -def build_runtime_feature_manifest( - agent_soul: AgentSoulConfig, - *, - drive_manifest_enabled: bool = False, -) -> dict[str, Any]: +def build_runtime_feature_manifest(agent_soul: AgentSoulConfig) -> dict[str, Any]: """Describe PRD capabilities supported by or still reserved from Agent backend runtime.""" warnings: list[dict[str, str]] = [] soul_dump = agent_soul.model_dump(mode="json", exclude_none=True, exclude_defaults=True) @@ -54,38 +47,10 @@ def build_runtime_feature_manifest( } ) - has_skills_files = bool(agent_soul.skills_files.skills or agent_soul.skills_files.files) - if has_skills_files and not drive_manifest_enabled: - warnings.append( - { - "section": "agent_soul.skills_files", - "code": "drive_manifest_disabled", - "message": ( - "skills_files is configured but AGENT_DRIVE_MANIFEST_ENABLED is off; " - "the drive declaration layer is not injected into this run." - ), - } - ) - for skill in agent_soul.skills_files.skills: - if not skill.skill_md_key: - warnings.append( - { - "section": "agent_soul.skills_files", - "code": "skill_ref_dangling", - "message": ( - f"skill_ref_dangling: skill '{skill.name or skill.id or 'unknown'}' has no drive key; " - "re-standardize it to expose it at runtime." - ), - } - ) - reserved_status = dict.fromkeys(sorted(RESERVED_AGENT_BACKEND_FEATURES), "reserved_not_executed") reserved_status["knowledge"] = ( "supported_by_knowledge_layer" if list_configured_knowledge_dataset_ids(agent_soul) else "not_configured" ) - reserved_status["skills_files"] = ( - "supported_by_drive_manifest" if drive_manifest_enabled else "drive_manifest_disabled" - ) reserved_status["tools.dify_tools"] = "supported_when_config_valid" reserved_status["tools.cli_tools"] = "supported_by_shell_bootstrap" reserved_status["env"] = "supported_by_shell_bootstrap" diff --git a/api/core/workflow/nodes/agent_v2/runtime_request_builder.py b/api/core/workflow/nodes/agent_v2/runtime_request_builder.py index 8aaa4fcc1d3..e3c2dcee839 100644 --- a/api/core/workflow/nodes/agent_v2/runtime_request_builder.py +++ b/api/core/workflow/nodes/agent_v2/runtime_request_builder.py @@ -7,7 +7,6 @@ from typing import Any, Literal, Protocol, assert_never, cast from agenton.compositor import CompositorSessionSnapshot from dify_agent.layers.ask_human import DifyAskHumanLayerConfig from dify_agent.layers.drive import ( - DifyDriveFileConfig, DifyDriveLayerConfig, DifyDriveSkillConfig, ) @@ -55,10 +54,13 @@ from models.agent_config_entities import ( ) from models.provider_ids import ModelProviderID from services.agent.prompt_mentions import ( + MentionKind, build_node_job_mention_resolver, build_soul_mention_resolver, expand_prompt_mentions, + parse_prompt_mentions, ) +from services.agent_drive_service import AgentDriveService, decode_drive_mention_ref from .output_failure_orchestrator import retry_idempotency_key from .plugin_tools_builder import WorkflowAgentPluginToolsBuilder, WorkflowAgentPluginToolsBuildError @@ -153,9 +155,6 @@ class WorkflowAgentRuntimeRequestBuilder: expand_prompt_mentions(node_job.workflow_prompt, build_node_job_mention_resolver(node_job)).strip() or "Run this workflow Agent Node for the current run." ) - soul_prompt = expand_prompt_mentions( - agent_soul.prompt.system_prompt, build_soul_mention_resolver(agent_soul) - ).strip() user_prompt = workflow_context_prompt.strip() or "Use the current workflow context." credentials = self._credentials_provider.fetch(agent_soul.model.model_provider, agent_soul.model.model) try: @@ -182,9 +181,20 @@ class WorkflowAgentRuntimeRequestBuilder: } drive_config: DifyDriveLayerConfig | None = None + soul_prompt_resolver = build_soul_mention_resolver(agent_soul) if dify_config.AGENT_DRIVE_MANIFEST_ENABLED: - drive_config, drive_warnings = build_drive_layer_config(agent_soul, agent_id=context.agent.id) + drive_config, drive_warnings = build_drive_layer_config( + agent_soul, + tenant_id=context.dify_context.tenant_id, + agent_id=context.agent.id, + ) append_runtime_warnings(metadata, drive_warnings) + soul_prompt_resolver = build_drive_aware_soul_mention_resolver( + agent_soul, + tenant_id=context.dify_context.tenant_id, + agent_id=context.agent.id, + ) + soul_prompt = expand_prompt_mentions(agent_soul.prompt.system_prompt, soul_prompt_resolver).strip() knowledge_config = build_knowledge_layer_config(agent_soul) request = self._request_builder.build_for_workflow_node( @@ -292,10 +302,7 @@ class WorkflowAgentRuntimeRequestBuilder: "agent_config_snapshot_id": context.snapshot.id, "binding_id": context.binding.id, "workflow_node_job_mode": node_job.mode.value, - "runtime_support": build_runtime_feature_manifest( - agent_soul, - drive_manifest_enabled=dify_config.AGENT_DRIVE_MANIFEST_ENABLED, - ), + "runtime_support": build_runtime_feature_manifest(agent_soul), } def _build_workflow_context_prompt( @@ -603,76 +610,107 @@ def append_runtime_warnings(metadata: dict[str, Any], warnings: list[dict[str, s existing.extend(warnings) +def build_drive_aware_soul_mention_resolver( + agent_soul: AgentSoulConfig, + *, + tenant_id: str, + agent_id: str, +): + """Resolve skill/file mentions against the agent drive and everything else via Agent Soul.""" + + base_resolver = build_soul_mention_resolver(agent_soul) + drive_service = AgentDriveService() + skill_catalog = drive_service.list_skills(tenant_id=tenant_id, agent_id=agent_id) + skill_names_by_key = {skill["skill_md_key"]: skill["name"] for skill in skill_catalog} + drive_keys = {item["key"] for item in drive_service.manifest(tenant_id=tenant_id, agent_id=agent_id)} + + def _resolve(mention: object) -> str | None: + if not hasattr(mention, "kind") or not hasattr(mention, "ref_id"): + return None + kind = cast(MentionKind, mention.kind) + ref_id = cast(str, mention.ref_id) + label = cast(str | None, getattr(mention, "label", None)) + if kind == MentionKind.SKILL: + decoded_key = decode_drive_mention_ref(ref_id) + return skill_names_by_key.get(decoded_key) or label or decoded_key + if kind == MentionKind.FILE: + decoded_key = decode_drive_mention_ref(ref_id) + if decoded_key in drive_keys: + return decoded_key.rsplit("/", 1)[-1] + return label or decoded_key + return base_resolver(cast(Any, mention)) + + return _resolve + + def build_drive_layer_config( agent_soul: AgentSoulConfig, *, + tenant_id: str, agent_id: str | None, ) -> tuple[DifyDriveLayerConfig | None, list[dict[str, str]]]: - """Catalog the soul's drive-backed Skills & Files into the dify.drive declaration. + """Derive drive runtime catalog + prompt-mentioned eager-pull keys from the drive.""" - Returns ``(config, warnings)`` — ``config is None`` means nothing to inject - (no skills/files configured, or no agent identity to address the drive by). - Refs that predate standardization (no drive key) are skipped with a warning - instead of failing the run, so historic souls keep running. - """ - skill_refs = agent_soul.skills_files.skills - file_refs = agent_soul.skills_files.files - if not skill_refs and not file_refs: - return None, [] - - warnings: list[dict[str, str]] = [] + mentioned_drive_refs = [ + decode_drive_mention_ref(mention.ref_id) + for mention in parse_prompt_mentions(agent_soul.prompt.system_prompt) + if mention.kind in {MentionKind.SKILL, MentionKind.FILE} + ] + ordered_mentions = list(dict.fromkeys(ref for ref in mentioned_drive_refs if ref)) if not agent_id: + if not ordered_mentions: + return None, [] + return None, [ + { + "section": "agent_soul.prompt.system_prompt", + "code": "drive_ref_dangling", + "message": "drive mentions are configured but the run has no bound agent to address a drive by.", + } + ] + + drive_service = AgentDriveService() + skills_catalog = drive_service.list_skills(tenant_id=tenant_id, agent_id=agent_id) + manifest_items = drive_service.manifest(tenant_id=tenant_id, agent_id=agent_id) + manifest_by_key = {item["key"]: item for item in manifest_items} + skill_keys = {skill["skill_md_key"] for skill in skills_catalog} + warnings: list[dict[str, str]] = [] + mentioned_skill_keys: list[str] = [] + mentioned_file_keys: list[str] = [] + for drive_key in ordered_mentions: + if drive_key in skill_keys: + mentioned_skill_keys.append(drive_key) + continue + if drive_key in manifest_by_key: + mentioned_file_keys.append(drive_key) + continue warnings.append( { - "section": "agent_soul.skills_files", - "code": "skill_ref_dangling", - "message": "skills_files is configured but the run has no bound agent to address a drive by.", + "section": "agent_soul.prompt.system_prompt", + "code": "mention_target_missing", + "message": f"drive mention '{drive_key}' has no matching drive entry.", } ) - return None, warnings - skills: list[DifyDriveSkillConfig] = [] - for skill in skill_refs: - if not skill.skill_md_key: - warnings.append( - { - "section": "agent_soul.skills_files", - "code": "skill_ref_dangling", - "message": ( - f"skill_ref_dangling: skill '{skill.name or skill.id or 'unknown'}' has no drive key; " - "re-standardize it to expose it at runtime." - ), - } - ) - continue - skills.append( - DifyDriveSkillConfig( - name=skill.name or skill.skill_md_key.split("/", 1)[0], - description=skill.description or "", - skill_md_key=skill.skill_md_key, - archive_key=skill.full_archive_key, - ) + skills = [ + DifyDriveSkillConfig( + path=skill["path"], + name=skill["name"], + description=skill["description"], + skill_md_key=skill["skill_md_key"], + archive_key=skill["archive_key"], ) + for skill in skills_catalog + ] - files: list[DifyDriveFileConfig] = [] - for file in file_refs: - if not file.drive_key: - # Plain upload references (pre-ENG-625) are not drive-backed; they are - # simply invisible to the manifest rather than a defect worth warning on. - continue - size = file.get("size") - files.append( - DifyDriveFileConfig( - name=file.name or file.drive_key.rsplit("/", 1)[-1], - key=file.drive_key, - size=size if isinstance(size, int) else None, - mime_type=file.type, - ) - ) - - if not skills and not files: - return None, warnings - return DifyDriveLayerConfig(drive_ref=f"agent-{agent_id}", skills=skills, files=files), warnings + return ( + DifyDriveLayerConfig( + drive_ref=f"agent-{agent_id}", + skills=skills, + mentioned_skill_keys=mentioned_skill_keys, + mentioned_file_keys=mentioned_file_keys, + ), + warnings, + ) def _cli_tool_enabled(item: object) -> bool: diff --git a/api/core/workflow/nodes/agent_v2/validators.py b/api/core/workflow/nodes/agent_v2/validators.py index ca3adb5b0d1..2eabac10dd6 100644 --- a/api/core/workflow/nodes/agent_v2/validators.py +++ b/api/core/workflow/nodes/agent_v2/validators.py @@ -35,7 +35,6 @@ class WorkflowAgentNodeValidator: "soul", "prompt", "system_prompt", - "skills_files", "skills", "files", "tools", diff --git a/api/extensions/ext_commands.py b/api/extensions/ext_commands.py index 6cd4b08b900..a85f6569978 100644 --- a/api/extensions/ext_commands.py +++ b/api/extensions/ext_commands.py @@ -5,6 +5,7 @@ def init_app(app: DifyApp): from commands import ( add_qdrant_index, archive_workflow_runs, + archive_workflow_runs_plan, backfill_plugin_auto_upgrade, clean_expired_messages, clean_workflow_runs, @@ -72,6 +73,7 @@ def init_app(app: DifyApp): setup_datasource_oauth_client, transform_datasource_credentials, install_rag_pipeline_plugins, + archive_workflow_runs_plan, archive_workflow_runs, delete_archived_workflow_runs, restore_workflow_runs, diff --git a/api/fields/agent_fields.py b/api/fields/agent_fields.py index 07bcbad26e3..e60a6b01426 100644 --- a/api/fields/agent_fields.py +++ b/api/fields/agent_fields.py @@ -1,5 +1,5 @@ from datetime import datetime -from typing import Annotated, Literal +from typing import Literal from pydantic import Field, field_validator @@ -16,10 +16,8 @@ from models.agent import ( ) from models.agent_config_entities import ( AgentCliToolConfig, - AgentFileRefConfig, AgentHumanContactConfig, AgentKnowledgeDatasetConfig, - AgentSkillRefConfig, AgentSoulConfig, DeclaredOutputConfig, DeclaredOutputType, @@ -396,20 +394,6 @@ class AgentComposerDifyToolCandidateResponse(ResponseModel): tools_count: int | None = None -class AgentComposerSkillCandidateResponse(AgentSkillRefConfig): - kind: Literal["skill"] = "skill" - - -class AgentComposerFileCandidateResponse(AgentFileRefConfig): - kind: Literal["file"] = "file" - - -AgentComposerSkillFileCandidateResponse = Annotated[ - AgentComposerSkillCandidateResponse | AgentComposerFileCandidateResponse, - Field(discriminator="kind"), -] - - class AgentComposerNodeJobCandidatesResponse(ResponseModel): previous_node_outputs: list[WorkflowPreviousNodeOutputRef] = Field(default_factory=list) declare_output_types: list[DeclaredOutputType] = Field(default_factory=list) @@ -417,7 +401,6 @@ class AgentComposerNodeJobCandidatesResponse(ResponseModel): class AgentComposerSoulCandidatesResponse(ResponseModel): - skills_files: list[AgentComposerSkillFileCandidateResponse] = Field(default_factory=list) dify_tools: list[AgentComposerDifyToolCandidateResponse] = Field(default_factory=list) cli_tools: list[AgentCliToolConfig] = Field(default_factory=list) knowledge_datasets: list[AgentKnowledgeDatasetConfig] = Field(default_factory=list) diff --git a/api/libs/broadcast_channel/redis/_subscription.py b/api/libs/broadcast_channel/redis/_subscription.py index 912a48d26ae..01a9e668bcc 100644 --- a/api/libs/broadcast_channel/redis/_subscription.py +++ b/api/libs/broadcast_channel/redis/_subscription.py @@ -165,14 +165,20 @@ class RedisSubscriptionBase(Subscription): except queue.Empty: continue + if self._closed.is_set(): + return + yield item @override def __iter__(self) -> Iterator[bytes]: """Return an iterator over messages from the subscription.""" if self._closed.is_set(): - raise SubscriptionClosedError(f"The Redis {self._get_subscription_type()} subscription is closed") - self._start_if_needed() + return iter(()) + try: + self._start_if_needed() + except SubscriptionClosedError: + return iter(()) return iter(self._message_iterator()) @override @@ -209,10 +215,18 @@ class RedisSubscriptionBase(Subscription): @override def close(self) -> None: """Close the subscription and clean up resources.""" - if self._closed.is_set(): - return + with self._start_lock: + if self._closed.is_set(): + return + + self._closed.set() + listener = self._listener_thread + self._listener_thread = None + started = self._started + + if started: + self._unblock_message_iterator() - self._closed.set() # Send a control event on the same Redis channel to unblock the self._publish_close_event() @@ -220,10 +234,21 @@ class RedisSubscriptionBase(Subscription): # message retrieval method should NOT be called concurrently. # # Due to the restriction above, the PubSub cleanup logic happens inside the consumer thread. - listener = self._listener_thread - if listener is not None: + if listener is not None and listener.is_alive(): listener.join(timeout=2) - self._listener_thread = None + + def _unblock_message_iterator(self) -> None: + try: + self._queue.put_nowait(SIG_CLOSE) + except queue.Full: + try: + self._queue.get_nowait() + except queue.Empty: + pass + try: + self._queue.put_nowait(SIG_CLOSE) + except queue.Full: + pass # Abstract methods to be implemented by subclasses def _get_subscription_type(self) -> str: diff --git a/api/migrations/versions/2026_06_15_1100-b7c2d9e8a1f4_add_tenant_last_opened_at.py b/api/migrations/versions/2026_06_15_1100-b7c2d9e8a1f4_add_tenant_last_opened_at.py index ce2fd2b79ca..066a2ba8ca8 100644 --- a/api/migrations/versions/2026_06_15_1100-b7c2d9e8a1f4_add_tenant_last_opened_at.py +++ b/api/migrations/versions/2026_06_15_1100-b7c2d9e8a1f4_add_tenant_last_opened_at.py @@ -7,7 +7,7 @@ Create Date: 2026-06-05 11:00:00.000000 """ import sqlalchemy as sa -from alembic import op +from alembic import context, op # revision identifiers, used by Alembic. revision = "b7c2d9e8a1f4" @@ -17,10 +17,23 @@ depends_on = None def upgrade(): + if _has_last_opened_at_column(): + return with op.batch_alter_table("tenant_account_joins", schema=None) as batch_op: batch_op.add_column(sa.Column("last_opened_at", sa.DateTime(), nullable=True)) def downgrade(): + if not _has_last_opened_at_column(): + return with op.batch_alter_table("tenant_account_joins", schema=None) as batch_op: batch_op.drop_column("last_opened_at") + + +def _has_last_opened_at_column() -> bool: + if context.is_offline_mode(): + # Offline SQL generation cannot inspect the target schema. Assume the + # linear migration path so generated SQL stays explicit. + return False + inspector = sa.inspect(op.get_bind()) + return "last_opened_at" in {column["name"] for column in inspector.get_columns("tenant_account_joins")} diff --git a/api/migrations/versions/2026_06_18_2300-b2515f9d4c2a_agent_drive_skill_metadata_refactor.py b/api/migrations/versions/2026_06_18_2300-b2515f9d4c2a_agent_drive_skill_metadata_refactor.py index 9dc85d2a89b..3398c2eb018 100644 --- a/api/migrations/versions/2026_06_18_2300-b2515f9d4c2a_agent_drive_skill_metadata_refactor.py +++ b/api/migrations/versions/2026_06_18_2300-b2515f9d4c2a_agent_drive_skill_metadata_refactor.py @@ -6,9 +6,15 @@ Create Date: 2026-06-18 23:00:00.000000 """ -import sqlalchemy as sa +from __future__ import annotations + +import json +from typing import Any + from alembic import op +import sqlalchemy as sa from sqlalchemy.dialects import mysql +from sqlalchemy.engine.mock import MockConnection # revision identifiers, used by Alembic. revision = "b2515f9d4c2a" @@ -31,9 +37,46 @@ def upgrade() -> None: "agent_drive_files", ["tenant_id", "agent_id", "is_skill", "key"], ) + _remove_skills_files_from_snapshots() def downgrade() -> None: op.drop_index("agent_drive_files_tenant_agent_is_skill_key_idx", table_name="agent_drive_files") op.drop_column("agent_drive_files", "skill_metadata") op.drop_column("agent_drive_files", "is_skill") + + +def _remove_skills_files_from_snapshots() -> None: + connection = op.get_bind() + if connection is None or isinstance(connection, MockConnection): + return + snapshots = sa.table( + "agent_config_snapshots", + sa.column("id", sa.String()), + sa.column("config_snapshot", sa.Text()), + ) + rows = connection.execute(sa.select(snapshots.c.id, snapshots.c.config_snapshot)).fetchall() + for row in rows: + cleaned = _strip_skills_files(row.config_snapshot) + if cleaned is None: + continue + connection.execute( + snapshots.update() + .where(snapshots.c.id == row.id) + .values(config_snapshot=json.dumps(cleaned, separators=(",", ":"), sort_keys=True)) + ) + + +def _strip_skills_files(raw_snapshot: Any) -> dict[str, Any] | None: + if raw_snapshot is None: + return None + if isinstance(raw_snapshot, str): + snapshot = json.loads(raw_snapshot) + elif isinstance(raw_snapshot, dict): + snapshot = dict(raw_snapshot) + else: + snapshot = dict(raw_snapshot) + if not isinstance(snapshot, dict) or "skills_files" not in snapshot: + return None + snapshot.pop("skills_files", None) + return snapshot diff --git a/api/models/agent_config_entities.py b/api/models/agent_config_entities.py index 76108f271d4..2503ba66f06 100644 --- a/api/models/agent_config_entities.py +++ b/api/models/agent_config_entities.py @@ -361,11 +361,6 @@ class AgentSoulPromptConfig(BaseModel): system_prompt: str = "" -class AgentSoulSkillsFilesConfig(BaseModel): - files: list[AgentFileRefConfig] = Field(default_factory=list) - skills: list[AgentSkillRefConfig] = Field(default_factory=list) - - class AgentSoulDifyToolCredentialRef(BaseModel): """Reference to a stored Dify Plugin Tool credential. @@ -514,7 +509,6 @@ class AgentSoulConfig(BaseModel): schema_version: int = 1 prompt: AgentSoulPromptConfig = Field(default_factory=AgentSoulPromptConfig) - skills_files: AgentSoulSkillsFilesConfig = Field(default_factory=AgentSoulSkillsFilesConfig) tools: AgentSoulToolsConfig = Field(default_factory=AgentSoulToolsConfig) knowledge: AgentSoulKnowledgeConfig = Field(default_factory=AgentSoulKnowledgeConfig) human: AgentSoulHumanConfig = Field(default_factory=AgentSoulHumanConfig) diff --git a/api/openapi/markdown/console-openapi.md b/api/openapi/markdown/console-openapi.md index ef11a817662..bb40c8b48b0 100644 --- a/api/openapi/markdown/console-openapi.md +++ b/api/openapi/markdown/console-openapi.md @@ -592,7 +592,7 @@ Stop a running Agent App chat message generation | Required | Schema | | -------- | ------ | -| Yes | **application/json**: [CopyAppPayload](#copyapppayload)
| +| Yes | **application/json**: [AgentAppCopyPayload](#agentappcopypayload)
| #### Responses @@ -602,6 +602,20 @@ Stop a running Agent App chat message generation | 400 | Invalid request parameters | | | 403 | Insufficient permissions | | +### [POST] /agent/{agent_id}/debug-conversation/refresh +#### Parameters + +| Name | Located in | Description | Required | Schema | +| ---- | ---------- | ----------- | -------- | ------ | +| agent_id | path | | Yes | string (uuid) | + +#### Responses + +| Code | Description | Schema | +| ---- | ----------- | ------ | +| 200 | Agent debug conversation refreshed | **application/json**: [AgentDebugConversationRefreshResponse](#agentdebugconversationrefreshresponse)
| +| 403 | Insufficient permissions | | + ### [GET] /agent/{agent_id}/drive/files List agent drive entries for an Agent App @@ -1608,7 +1622,7 @@ Inspect one drive-backed skill for slash-menu hover/detail UI | 200 | Drive skill inspect view | **application/json**: [AgentDriveSkillInspectResponse](#agentdriveskillinspectresponse)
| ### [DELETE] /apps/{app_id}/agent/files -Delete one drive file by key; soul ref first, then the KV row (ENG-625 D5) +Delete one drive file by key via drive commit-null semantics #### Parameters @@ -1694,7 +1708,7 @@ Upload + standardize a Skill into the agent drive | 400 | Invalid skill package or no bound agent | | ### [DELETE] /apps/{app_id}/agent/skills/{slug} -Delete a standardized skill: soul ref first, then the / drive prefix (ENG-625 D5) +Delete a standardized skill by removing its known drive keys via commit-null #### Parameters @@ -8145,7 +8159,7 @@ Get all published workflows for a snippet | Code | Description | Schema | | ---- | ----------- | ------ | -| 200 | Published workflows retrieved successfully | **application/json**: [WorkflowPaginationResponse](#workflowpaginationresponse)
| +| 200 | Published workflows retrieved successfully | **application/json**: [SnippetWorkflowPaginationResponse](#snippetworkflowpaginationresponse)
| ### [GET] /snippets/{snippet_id}/workflows/default-workflow-block-configs **Get default block configurations for snippet workflow** @@ -12143,6 +12157,17 @@ Default namespace | validation | [ComposerValidationFindingsResponse](#composervalidationfindingsresponse) | | No | | variant | string | | Yes | +#### AgentAppCopyPayload + +| Name | Type | Description | Required | +| ---- | ---- | ----------- | -------- | +| description | string | Description for the copied agent | No | +| icon | string | Icon | No | +| icon_background | string | Icon background color | No | +| icon_type | [IconType](#icontype) | Icon type | No | +| name | string | Name for the copied agent | No | +| role | string | Role for the copied agent | No | + #### AgentAppCreatePayload | Name | Type | Description | Required | @@ -12392,23 +12417,6 @@ Risk marker for CLI tool bootstrap commands. | provider_id | string | | No | | tools_count | integer | | No | -#### AgentComposerFileCandidateResponse - -| Name | Type | Description | Required | -| ---- | ---- | ----------- | -------- | -| drive_key | string | | No | -| file_id | string | | No | -| id | string | | No | -| kind | string,
**Default:** file | | No | -| name | string | | No | -| reference | string | | No | -| remote_url | string | | No | -| tenant_id | string | | No | -| transfer_method | string | | No | -| type | string | | No | -| upload_file_id | string | | No | -| url | string | | No | - #### AgentComposerImpactBindingResponse | Name | Type | Description | Required | @@ -12433,22 +12441,6 @@ Risk marker for CLI tool bootstrap commands. | human_contacts | [ [AgentHumanContactConfig](#agenthumancontactconfig) ] | | No | | previous_node_outputs | [ [WorkflowPreviousNodeOutputRef](#workflowpreviousnodeoutputref) ] | | No | -#### AgentComposerSkillCandidateResponse - -| Name | Type | Description | Required | -| ---- | ---- | ----------- | -------- | -| description | string | | No | -| file_id | string | | No | -| full_archive_file_id | string | | No | -| full_archive_key | string | | No | -| id | string | | No | -| kind | string,
**Default:** skill | | No | -| manifest_files | [ string ] | | No | -| name | string | | No | -| path | string | | No | -| skill_md_file_id | string | | No | -| skill_md_key | string | | No | - #### AgentComposerSoulCandidatesResponse | Name | Type | Description | Required | @@ -12457,7 +12449,6 @@ Risk marker for CLI tool bootstrap commands. | dify_tools | [ [AgentComposerDifyToolCandidateResponse](#agentcomposerdifytoolcandidateresponse) ] | | No | | human_contacts | [ [AgentHumanContactConfig](#agenthumancontactconfig) ] | | No | | knowledge_datasets | [ [AgentKnowledgeDatasetConfig](#agentknowledgedatasetconfig) ] | | No | -| skills_files | [ ] | | No | #### AgentComposerSoulLockResponse @@ -12562,6 +12553,12 @@ Audit operation recorded for Agent Soul version/revision changes. | date | string | | Yes | | message_count | integer | | Yes | +#### AgentDebugConversationRefreshResponse + +| Name | Type | Description | Required | +| ---- | ---- | ----------- | -------- | +| debug_conversation_id | string | | Yes | + #### AgentDriveDeleteFileByAgentQuery | Name | Type | Description | Required | @@ -12572,7 +12569,6 @@ Audit operation recorded for Agent Soul version/revision changes. | Name | Type | Description | Required | | ---- | ---- | ----------- | -------- | -| config_version_id | string | | No | | removed_keys | [ string ] | | No | | result | string | | Yes | @@ -12586,7 +12582,6 @@ Audit operation recorded for Agent Soul version/revision changes. | Name | Type | Description | Required | | ---- | ---- | ----------- | -------- | -| config_version_id | string | | No | | file | [AgentDriveFileResponse](#agentdrivefileresponse) | | Yes | #### AgentDriveFilePayload @@ -13172,27 +13167,12 @@ Visibility and lifecycle scope of an Agent record. | enabled | boolean | | No | | type | string | | No | -#### AgentSkillRefConfig - -| Name | Type | Description | Required | -| ---- | ---- | ----------- | -------- | -| description | string | | No | -| file_id | string | | No | -| full_archive_file_id | string | | No | -| full_archive_key | string | | No | -| id | string | | No | -| manifest_files | [ string ] | | No | -| name | string | | No | -| path | string | | No | -| skill_md_file_id | string | | No | -| skill_md_key | string | | No | - #### AgentSkillUploadResponse | Name | Type | Description | Required | | ---- | ---- | ----------- | -------- | | manifest | [SkillManifest](#skillmanifest) | | Yes | -| skill | [AgentSkillRefConfig](#agentskillrefconfig) | | Yes | +| skill | [AgentUploadedSkillResponse](#agentuploadedskillresponse) | | Yes | #### AgentSoulAppFeaturesConfig @@ -13221,7 +13201,6 @@ Visibility and lifecycle scope of an Agent record. | prompt | [AgentSoulPromptConfig](#agentsoulpromptconfig) | | No | | sandbox | [AgentSoulSandboxConfig](#agentsoulsandboxconfig) | | No | | schema_version | integer,
**Default:** 1 | | No | -| skills_files | [AgentSoulSkillsFilesConfig](#agentsoulskillsfilesconfig) | | No | | tools | [AgentSoulToolsConfig](#agentsoultoolsconfig) | | No | #### AgentSoulDifyToolConfig @@ -13338,13 +13317,6 @@ Reference to model credentials resolved only at runtime. | config | [AgentSandboxProviderConfig](#agentsandboxproviderconfig) | | No | | provider | string | | No | -#### AgentSoulSkillsFilesConfig - -| Name | Type | Description | Required | -| ---- | ---- | ----------- | -------- | -| files | [ [AgentFileRefConfig](#agentfilerefconfig) ] | | No | -| skills | [ [AgentSkillRefConfig](#agentskillrefconfig) ] | | No | - #### AgentSoulToolsConfig | Name | Type | Description | Required | @@ -13476,6 +13448,16 @@ Soft lifecycle state for Agent records. | tool_output | object | | Yes | | tool_parameters | object | | Yes | +#### AgentUploadedSkillResponse + +| Name | Type | Description | Required | +| ---- | ---- | ----------- | -------- | +| archive_key | string | | No | +| description | string | | Yes | +| name | string | | Yes | +| path | string | | Yes | +| skill_md_key | string | | Yes | + #### AgentUserSatisfactionRateStatisticResponse | Name | Type | Description | Required | @@ -19435,6 +19417,15 @@ Query parameters for listing snippet published workflows. | limit | integer,
**Default:** 10 | | No | | page | integer,
**Default:** 1 | | No | +#### SnippetWorkflowPaginationResponse + +| Name | Type | Description | Required | +| ---- | ---- | ----------- | -------- | +| has_more | boolean | | Yes | +| items | [ [SnippetWorkflowResponse](#snippetworkflowresponse) ] | | Yes | +| limit | integer | | Yes | +| page | integer | | Yes | + #### SnippetWorkflowResponse | Name | Type | Description | Required | diff --git a/api/openapi/markdown/openapi-openapi.md b/api/openapi/markdown/openapi-openapi.md index bd93557edcf..4bb6761c22e 100644 --- a/api/openapi/markdown/openapi-openapi.md +++ b/api/openapi/markdown/openapi-openapi.md @@ -80,7 +80,7 @@ User-scoped operations | Name | Located in | Description | Required | Schema | | ---- | ---------- | ----------- | -------- | ------ | | limit | query | | No | integer,
**Default:** 20 | -| mode | query | | No | string,
**Available values:** "advanced-chat", "agent", "agent-chat", "channel", "chat", "completion", "rag-pipeline", "workflow" | +| mode | query | App types the ``app`` usage face (``get app``) lists and filters. A curated subset of :class:`AppMode`: the real, user-facing app categories. Excludes runtime-only mode tags that are not standalone apps (``rag-pipeline`` is a knowledge ``Pipeline``; ``channel`` is unused) and the roster-owned ``agent`` type (surfaced through the roster, not this list). Members reference ``AppMode.*.value`` so the subset relationship is type-checked: dropping a member from ``AppMode`` breaks this at import. This is the single source for the listable set — params, filters, and the generated CLI whitelist all derive from it. | No | string,
**Available values:** "advanced-chat", "agent-chat", "chat", "completion", "workflow" | | name | query | | No | string | | page | query | | No | integer,
**Default:** 1 | | workspace_id | query | | Yes | string | @@ -318,7 +318,7 @@ Upload a file to use as an input variable when running the app | Name | Located in | Description | Required | Schema | | ---- | ---------- | ----------- | -------- | ------ | | limit | query | | No | integer,
**Default:** 20 | -| mode | query | | No | string,
**Available values:** "advanced-chat", "agent", "agent-chat", "channel", "chat", "completion", "rag-pipeline", "workflow" | +| mode | query | App types the ``app`` usage face (``get app``) lists and filters. A curated subset of :class:`AppMode`: the real, user-facing app categories. Excludes runtime-only mode tags that are not standalone apps (``rag-pipeline`` is a knowledge ``Pipeline``; ``channel`` is unused) and the roster-owned ``agent`` type (surfaced through the roster, not this list). Members reference ``AppMode.*.value`` so the subset relationship is type-checked: dropping a member from ``AppMode`` breaks this at import. This is the single source for the listable set — params, filters, and the generated CLI whitelist all derive from it. | No | string,
**Available values:** "advanced-chat", "agent-chat", "chat", "completion", "workflow" | | name | query | | No | string | | page | query | | No | integer,
**Default:** 1 | @@ -592,12 +592,12 @@ Request body for POST /workspaces//apps/imports. #### AppListQuery -mode is a closed enum. +mode is a closed enum of listable app types. | Name | Type | Description | Required | | ---- | ---- | ----------- | -------- | | limit | integer,
**Default:** 20 | | No | -| mode | [AppMode](#appmode) | | No | +| mode | [SupportedAppType](#supportedapptype) | | No | | name | string | | No | | page | integer,
**Default:** 1 | | No | | workspace_id | string | | Yes | @@ -922,7 +922,7 @@ Strict (extra='forbid'). | Name | Type | Description | Required | | ---- | ---- | ----------- | -------- | | limit | integer,
**Default:** 20 | | No | -| mode | [AppMode](#appmode) | | No | +| mode | [SupportedAppType](#supportedapptype) | | No | | name | string | | No | | page | integer,
**Default:** 1 | | No | @@ -990,6 +990,24 @@ Pagination for GET /account/sessions. Strict (extra='forbid'). | last_used_at | string | | No | | prefix | string | | Yes | +#### SupportedAppType + +App types the ``app`` usage face (``get app``) lists and filters. + +A curated subset of :class:`AppMode`: the real, user-facing app categories. +Excludes runtime-only mode tags that are not standalone apps +(``rag-pipeline`` is a knowledge ``Pipeline``; ``channel`` is unused) and the +roster-owned ``agent`` type (surfaced through the roster, not this list). + +Members reference ``AppMode.*.value`` so the subset relationship is +type-checked: dropping a member from ``AppMode`` breaks this at import. +This is the single source for the listable set — params, filters, and the +generated CLI whitelist all derive from it. + +| Name | Type | Description | Required | +| ---- | ---- | ----------- | -------- | +| SupportedAppType | string | App types the ``app`` usage face (``get app``) lists and filters. A curated subset of :class:`AppMode`: the real, user-facing app categories. Excludes runtime-only mode tags that are not standalone apps (``rag-pipeline`` is a knowledge ``Pipeline``; ``channel`` is unused) and the roster-owned ``agent`` type (surfaced through the roster, not this list). Members reference ``AppMode.*.value`` so the subset relationship is type-checked: dropping a member from ``AppMode`` breaks this at import. This is the single source for the listable set — params, filters, and the generated CLI whitelist all derive from it. | | + #### TaskStopResponse 200 body for POST /apps//tasks//stop. The handler always returns diff --git a/api/repositories/api_workflow_run_repository.py b/api/repositories/api_workflow_run_repository.py index 2659e550552..bc30e980619 100644 --- a/api/repositories/api_workflow_run_repository.py +++ b/api/repositories/api_workflow_run_repository.py @@ -290,7 +290,10 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol): batch_size: int, run_types: Sequence[WorkflowType] | None = None, tenant_ids: Sequence[str] | None = None, + tenant_prefixes: Sequence[str] | None = None, workflow_ids: Sequence[str] | None = None, + run_shard_index: int | None = None, + run_shard_total: int | None = None, ) -> Sequence[WorkflowRun]: """ Fetch ended workflow runs in a time window for archival and clean batching. @@ -298,7 +301,9 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol): Optional filters: - run_types - tenant_ids + - tenant_prefixes, using the first hexadecimal digit of tenant_id for rollout waves - workflow_ids + - run_shard_index/run_shard_total, using a deterministic workflow_run_id shard """ ... diff --git a/api/repositories/sqlalchemy_api_workflow_run_repository.py b/api/repositories/sqlalchemy_api_workflow_run_repository.py index b40eb4bdd8a..2394377c9d4 100644 --- a/api/repositories/sqlalchemy_api_workflow_run_repository.py +++ b/api/repositories/sqlalchemy_api_workflow_run_repository.py @@ -56,6 +56,7 @@ from repositories.types import ( DailyTerminalsStats, DailyTokenCostStats, ) +from services.retention.workflow_run.tenant_prefix import tenant_prefix_condition logger = logging.getLogger(__name__) @@ -64,6 +65,40 @@ class _WorkflowRunError(Exception): pass +_HEX_SHARD_VALUES = { + "0": 0, + "1": 1, + "2": 2, + "3": 3, + "4": 4, + "5": 5, + "6": 6, + "7": 7, + "8": 8, + "9": 9, + "a": 10, + "b": 11, + "c": 12, + "d": 13, + "e": 14, + "f": 15, +} + + +def _tenant_prefix_condition(prefixes: Sequence[str]) -> sa.ColumnElement[bool]: + conditions = [tenant_prefix_condition(WorkflowRun.tenant_id, prefix) for prefix in prefixes] + return sa.or_(*conditions) + + +def _workflow_run_id_shard_expr() -> sa.ColumnElement[int]: + normalized_id = func.lower(func.replace(sa.cast(WorkflowRun.id, sa.String()), "-", "")) + last_hex = func.substr(normalized_id, func.length(normalized_id), 1) + return sa.case( + *[(last_hex == hex_digit, shard_value) for hex_digit, shard_value in _HEX_SHARD_VALUES.items()], + else_=0, + ) + + def _build_human_input_required_reason( reason_model: WorkflowPauseReason, form_model: HumanInputForm | None, @@ -378,7 +413,10 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository): batch_size: int, run_types: Sequence[WorkflowType] | None = None, tenant_ids: Sequence[str] | None = None, + tenant_prefixes: Sequence[str] | None = None, workflow_ids: Sequence[str] | None = None, + run_shard_index: int | None = None, + run_shard_total: int | None = None, ) -> Sequence[WorkflowRun]: """ Fetch ended workflow runs in a time window for archival and clean batching. @@ -387,7 +425,8 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository): - created_at in [start_from, end_before) - type in run_types (when provided) - status is an ended state - - optional tenant_id, workflow_id filters and cursor (last_seen) for pagination + - optional tenant_id, tenant_prefix, workflow_id filters and cursor (last_seen) for pagination + - optional deterministic shard by the last hexadecimal digit of workflow_run_id """ with self._session_maker() as session: stmt = ( @@ -410,9 +449,15 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository): if tenant_ids: stmt = stmt.where(WorkflowRun.tenant_id.in_(tenant_ids)) + if tenant_prefixes: + stmt = stmt.where(_tenant_prefix_condition(tenant_prefixes)) + if workflow_ids: stmt = stmt.where(WorkflowRun.workflow_id.in_(workflow_ids)) + if run_shard_index is not None and run_shard_total is not None: + stmt = stmt.where((_workflow_run_id_shard_expr() % run_shard_total) == run_shard_index) + if last_seen: stmt = stmt.where( tuple_(WorkflowRun.created_at, WorkflowRun.id) diff --git a/api/services/agent/composer_candidates.py b/api/services/agent/composer_candidates.py index 0a1419be399..7868f2a2f63 100644 --- a/api/services/agent/composer_candidates.py +++ b/api/services/agent/composer_candidates.py @@ -137,9 +137,6 @@ def soul_candidates( soul = agent_soul or AgentSoulConfig() truncated = False - skills_files = [{"kind": "skill", **skill.model_dump(exclude_none=True)} for skill in soul.skills_files.skills] - skills_files += [{"kind": "file", **file.model_dump(exclude_none=True)} for file in soul.skills_files.files] - cli_tools = [tool.model_dump(exclude_none=True) for tool in soul.tools.cli_tools if tool.enabled] dataset_ids = [dataset.id for dataset in soul.knowledge.datasets if dataset.id] @@ -162,7 +159,6 @@ def soul_candidates( dify_tools = workspace_tools_loader() lists = { - "skills_files": skills_files, "dify_tools": dify_tools, "cli_tools": cli_tools, "knowledge_datasets": knowledge_datasets, diff --git a/api/services/agent/composer_service.py b/api/services/agent/composer_service.py index af6be0abfa6..0a17c06300f 100644 --- a/api/services/agent/composer_service.py +++ b/api/services/agent/composer_service.py @@ -21,7 +21,6 @@ from models.agent import ( WorkflowAgentNodeBinding, ) from models.agent_config_entities import ( - AgentFileRefConfig, DeclaredOutputConfig, ) from models.agent_config_entities import ( @@ -34,7 +33,6 @@ from services.agent.errors import ( AgentNameConflictError, AgentNotFoundError, AgentVersionNotFoundError, - InvalidComposerConfigError, ) from services.entities.agent_entities import ( AgentSoulConfig, @@ -48,6 +46,13 @@ from services.entities.agent_entities import ( # WorkflowAgentNodeBinding.workflow_version tag for the draft workflow row. # Mirrors Workflow.version when it is "draft" (see models/workflow.py). _DRAFT_WORKFLOW_VERSION = "draft" +_PUBLISH_SAVE_STRATEGIES = frozenset( + { + ComposerSaveStrategy.SAVE_AS_NEW_VERSION, + ComposerSaveStrategy.SAVE_AS_NEW_AGENT, + ComposerSaveStrategy.SAVE_TO_ROSTER, + } +) logger = logging.getLogger(__name__) @@ -73,6 +78,13 @@ def _backfill_cli_tool_ids(agent_soul: AgentSoulConfig | None) -> None: seen_ids.add(minted) +def _validate_composer_payload_for_strategy(payload: ComposerSavePayload) -> None: + if payload.save_strategy in _PUBLISH_SAVE_STRATEGIES: + ComposerConfigValidator.validate_publish_payload(payload) + return + ComposerConfigValidator.validate_draft_save_payload(payload) + + class AgentComposerService: @classmethod def load_workflow_composer(cls, *, tenant_id: str, app_id: str, node_id: str) -> dict[str, Any]: @@ -102,33 +114,10 @@ class AgentComposerService: raise ValueError("Workflow composer endpoint only accepts workflow variant") _backfill_cli_tool_ids(payload.agent_soul) - ComposerConfigValidator.validate_save_payload(payload) + _validate_composer_payload_for_strategy(payload) workflow = cls._get_draft_workflow(tenant_id=tenant_id, app_id=app_id) binding = cls._get_workflow_binding(tenant_id=tenant_id, workflow_id=workflow.id, node_id=node_id) - # ENG-623 §4.4: drive-backed refs must point at real drive rows before the - # soul is persisted. Only strategies that write the soul onto an *existing* - # agent are checked — new-agent strategies create a fresh (empty) drive, so - # any carried drive key would be flagged on the next save instead. - if ( - payload.agent_soul is not None - and binding is not None - and binding.agent_id - and payload.save_strategy - in ( - ComposerSaveStrategy.NODE_JOB_ONLY, - ComposerSaveStrategy.SAVE_TO_CURRENT_VERSION, - ComposerSaveStrategy.SAVE_AS_NEW_VERSION, - ) - and ( - payload.save_strategy != ComposerSaveStrategy.NODE_JOB_ONLY - or binding.binding_type == WorkflowAgentBindingType.INLINE_AGENT - ) - ): - cls._require_drive_refs_resolved( - tenant_id=tenant_id, agent_id=binding.agent_id, agent_soul=payload.agent_soul - ) - match payload.save_strategy: case ComposerSaveStrategy.NODE_JOB_ONLY: binding = cls._save_node_job_only( @@ -176,7 +165,11 @@ class AgentComposerService: version_id=version_id, ) state = cls._serialize_workflow_state(binding=binding, agent=agent, version=version) - state["validation"] = cls.collect_validation_findings(tenant_id=tenant_id, payload=payload) + state["validation"] = cls.collect_validation_findings( + tenant_id=tenant_id, + payload=payload, + agent_id=binding.agent_id, + ) return state @classmethod @@ -215,7 +208,7 @@ class AgentComposerService: if payload.variant != ComposerVariant.AGENT_APP: raise ValueError("Agent App composer endpoint only accepts agent_app variant") _backfill_cli_tool_ids(payload.agent_soul) - ComposerConfigValidator.validate_save_payload(payload) + _validate_composer_payload_for_strategy(payload) if payload.agent_soul is None: raise ValueError("agent_soul is required") @@ -250,9 +243,6 @@ class AgentComposerService: db.session.rollback() raise AgentNameConflictError() from exc - # ENG-623 §4.4: dangling drive-backed refs are rejected before persisting. - cls._require_drive_refs_resolved(tenant_id=tenant_id, agent_id=agent.id, agent_soul=payload.agent_soul) - if payload.save_strategy == ComposerSaveStrategy.SAVE_AS_NEW_VERSION or not agent.active_config_snapshot_id: version = cls._create_config_version( tenant_id=tenant_id, @@ -281,7 +271,11 @@ class AgentComposerService: db.session.commit() state = cls.load_agent_app_composer(tenant_id=tenant_id, app_id=app_id) - state["validation"] = cls.collect_validation_findings(tenant_id=tenant_id, payload=payload) + state["validation"] = cls.collect_validation_findings( + tenant_id=tenant_id, + payload=payload, + agent_id=agent.id, + ) return state @classmethod @@ -292,11 +286,7 @@ class AgentComposerService: payload: ComposerSavePayload, agent_id: str | None = None, ) -> dict[str, Any]: - """ENG-617 soft findings, with DB-backed dataset existence for placeholders. - - With ``agent_id`` the drive-backed skill/file refs are also checked against - the agent drive (ENG-623 §4.4) and dangling ones surface as warnings. - """ + """ENG-617 soft findings, with DB-backed dataset and drive mention checks.""" from services.agent.prompt_mentions import MentionKind, parse_prompt_mentions mentioned_ids: set[str] = set() @@ -312,136 +302,14 @@ class AgentComposerService: findings = ComposerConfigValidator.collect_soft_findings(payload, existing_dataset_ids=existing_dataset_ids) if agent_id and payload.agent_soul is not None: findings["warnings"].extend( - cls._drive_ref_findings(tenant_id=tenant_id, agent_id=agent_id, agent_soul=payload.agent_soul) + cls._drive_mention_findings( + tenant_id=tenant_id, + agent_id=agent_id, + prompt=payload.agent_soul.prompt.system_prompt, + ) ) return findings - @classmethod - def remove_drive_refs( - cls, - *, - tenant_id: str, - agent_id: str, - account_id: str, - skill_slug: str | None = None, - file_key: str | None = None, - app_id: str | None = None, - node_id: str | None = None, - ) -> str | None: - """Drop the soul refs backed by a drive skill/file before the drive rows go. - - Soul-first ordering (ENG-625 D5): a mid-failure leaves harmless orphan KV - rows that an idempotent DELETE retry cleans, instead of a soul ref that - keeps failing dangling-ref validation. Returns the new config version id, - or ``None`` when the soul held no matching ref (idempotent re-delete). - """ - if (skill_slug is None) == (file_key is None): - raise ValueError("remove_drive_refs requires exactly one of skill_slug or file_key") - agent = db.session.scalar(select(Agent).where(Agent.tenant_id == tenant_id, Agent.id == agent_id).limit(1)) - if agent is None or not agent.active_config_snapshot_id: - return None - current_snapshot = cls._require_version( - tenant_id=tenant_id, agent_id=agent.id, version_id=agent.active_config_snapshot_id - ) - agent_soul = AgentSoulConfig.model_validate(current_snapshot.config_snapshot_dict) - - removed_display: str | None = None - if skill_slug is not None: - kept_skills = [] - for skill in agent_soul.skills_files.skills: - slug = (skill.skill_md_key or "").split("/", 1)[0] or (skill.path or "").strip("/") - if slug == skill_slug: - removed_display = skill.name or skill.id or skill_slug - continue - kept_skills.append(skill) - if removed_display is None: - return None - agent_soul.skills_files.skills = kept_skills - note = f"Removed skill '{removed_display}' from the drive." - else: - kept_files = [] - for file in agent_soul.skills_files.files: - if file.drive_key == file_key: - removed_display = file.name or file.drive_key - continue - kept_files.append(file) - if removed_display is None: - return None - agent_soul.skills_files.files = kept_files - note = f"Removed file '{removed_display}' from the drive." - - version = cls._update_current_version( - current_snapshot=current_snapshot, - account_id=account_id, - agent_soul=agent_soul, - operation=AgentConfigRevisionOperation.SAVE_CURRENT_VERSION, - version_note=note, - ) - agent.active_config_snapshot_id = version.id - agent.updated_by = account_id - cls._sync_draft_binding_snapshot( - tenant_id=tenant_id, - app_id=app_id, - node_id=node_id, - agent_id=agent_id, - snapshot_id=version.id, - account_id=account_id, - ) - db.session.commit() - return version.id - - @classmethod - def add_drive_file_ref( - cls, - *, - tenant_id: str, - agent_id: str, - account_id: str, - file_ref: AgentFileRefConfig, - app_id: str | None = None, - node_id: str | None = None, - ) -> str | None: - """Add or replace one drive-backed file ref in the active Agent Soul. - - ``POST /agent/files`` is an ADD FILE user action, not just a low-level - drive commit. The committed file must be present in ``skills_files.files`` - because runtime ``dify.drive`` is built from the active Agent Soul. - """ - if not file_ref.drive_key: - raise ValueError("file_ref.drive_key is required") - agent = db.session.scalar(select(Agent).where(Agent.tenant_id == tenant_id, Agent.id == agent_id).limit(1)) - if agent is None or not agent.active_config_snapshot_id: - return None - current_snapshot = cls._require_version( - tenant_id=tenant_id, agent_id=agent.id, version_id=agent.active_config_snapshot_id - ) - agent_soul = AgentSoulConfig.model_validate(current_snapshot.config_snapshot_dict) - kept_files = [item for item in agent_soul.skills_files.files if item.drive_key != file_ref.drive_key] - kept_files.append(file_ref) - agent_soul.skills_files.files = kept_files - - display = file_ref.name or file_ref.drive_key - version = cls._update_current_version( - current_snapshot=current_snapshot, - account_id=account_id, - agent_soul=agent_soul, - operation=AgentConfigRevisionOperation.SAVE_CURRENT_VERSION, - version_note=f"Added file '{display}' to the drive.", - ) - agent.active_config_snapshot_id = version.id - agent.active_config_has_model = agent_soul_has_model(agent_soul) - agent.updated_by = account_id - cls._sync_draft_binding_snapshot( - tenant_id=tenant_id, - app_id=app_id, - node_id=node_id, - agent_id=agent_id, - snapshot_id=version.id, - account_id=account_id, - ) - db.session.commit() - return version.id - @classmethod def resolve_bound_agent_id(cls, *, tenant_id: str, app_id: str) -> str | None: """The Agent App's bound roster agent id, if any (validate-endpoint context).""" @@ -468,49 +336,25 @@ class AgentComposerService: return binding.agent_id if binding else None @classmethod - def _sync_draft_binding_snapshot( - cls, - *, - tenant_id: str, - app_id: str | None, - node_id: str | None, - agent_id: str, - snapshot_id: str, - account_id: str, - ) -> None: - """Keep workflow node bindings on the new active snapshot after direct drive edits.""" - if not app_id or not node_id: - return - try: - workflow = cls._get_draft_workflow(tenant_id=tenant_id, app_id=app_id) - except ValueError: - return - binding = cls._get_workflow_binding(tenant_id=tenant_id, workflow_id=workflow.id, node_id=node_id) - if binding is None or binding.agent_id != agent_id: - return - binding.current_snapshot_id = snapshot_id - binding.updated_by = account_id - - @classmethod - def _drive_ref_findings( + def _drive_mention_findings( cls, *, tenant_id: str, agent_id: str, - agent_soul: AgentSoulConfig, + prompt: str, ) -> list[dict[str, str | None]]: - """Drive-backed refs whose keys have no row in the agent drive (ENG-623 §4.4). + """Soft warnings for missing drive-backed prompt mentions.""" + from services.agent.prompt_mentions import MentionKind, parse_prompt_mentions + from services.agent_drive_service import decode_drive_mention_ref - Each finding message starts with its stable code token - (``skill_ref_dangling`` / ``file_ref_dangling``) in the ENG-616/617 style. - """ wanted_keys: dict[str, tuple[str, str]] = {} - for skill in agent_soul.skills_files.skills: - if skill.skill_md_key: - wanted_keys[skill.skill_md_key] = ("skill_ref_dangling", skill.name or skill.id or "unknown") - for file in agent_soul.skills_files.files: - if file.drive_key: - wanted_keys[file.drive_key] = ("file_ref_dangling", file.name or file.id or "unknown") + for mention in parse_prompt_mentions(prompt): + if mention.kind not in {MentionKind.SKILL, MentionKind.FILE}: + continue + decoded_key = decode_drive_mention_ref(mention.ref_id) + if not decoded_key: + continue + wanted_keys[decoded_key] = (mention.kind.value, mention.label or decoded_key) if not wanted_keys: return [] @@ -524,28 +368,20 @@ class AgentComposerService: ) ) findings: list[dict[str, str | None]] = [] - for key, (code, display) in wanted_keys.items(): + for key, (kind, display) in wanted_keys.items(): if key in existing_keys: continue - kind = "skill" if code == "skill_ref_dangling" else "file" findings.append( { - "code": code, + "code": "mention_target_missing", "surface": "agent_soul", "kind": kind, "id": key, - "message": f"{code}: {kind} '{display}' has no drive entry for key '{key}'.", + "message": f"{kind} '{display}' has no drive entry for key '{key}'.", } ) return findings - @classmethod - def _require_drive_refs_resolved(cls, *, tenant_id: str, agent_id: str, agent_soul: AgentSoulConfig) -> None: - """Hard save-time guard: dangling drive-backed refs are rejected (400).""" - findings = cls._drive_ref_findings(tenant_id=tenant_id, agent_id=agent_id, agent_soul=agent_soul) - if findings: - raise InvalidComposerConfigError("; ".join(str(finding["message"]) for finding in findings)) - @classmethod def get_workflow_candidates(cls, *, tenant_id: str, app_id: str, node_id: str, user_id: str) -> dict[str, Any]: """Slash-menu data source for the workflow Agent node composer (ENG-615).""" diff --git a/api/services/agent/composer_validator.py b/api/services/agent/composer_validator.py index b9519272c4a..34b80b8a9d0 100644 --- a/api/services/agent/composer_validator.py +++ b/api/services/agent/composer_validator.py @@ -50,7 +50,7 @@ _DANGEROUS_ACK_KEYS = ( class ComposerConfigValidator: @classmethod - def validate_save_payload(cls, payload: ComposerSavePayload) -> None: + def validate_draft_save_payload(cls, payload: ComposerSavePayload) -> None: if ( payload.variant == ComposerVariant.WORKFLOW and payload.soul_lock.locked @@ -59,6 +59,13 @@ class ComposerConfigValidator: ): raise AgentSoulLockedError() + @classmethod + def validate_save_payload(cls, payload: ComposerSavePayload) -> None: + cls.validate_publish_payload(payload) + + @classmethod + def validate_publish_payload(cls, payload: ComposerSavePayload) -> None: + cls.validate_draft_save_payload(payload) if payload.agent_soul is not None: cls.validate_agent_soul(payload.agent_soul) if payload.node_job is not None: @@ -191,6 +198,8 @@ class ComposerConfigValidator: } ) continue + if mention.kind in {MentionKind.SKILL, MentionKind.FILE}: + continue if resolved is None: warnings.append( { diff --git a/api/services/agent/prompt_mentions.py b/api/services/agent/prompt_mentions.py index 921d6838b26..27bed49c53b 100644 --- a/api/services/agent/prompt_mentions.py +++ b/api/services/agent/prompt_mentions.py @@ -4,13 +4,14 @@ Slash-menu insertions are stored inline in the plain-string prompt as tokens: [§:[: