feat(api): bind UploadFile to workflow initiator in unauthenticated form submission

The basic assumption of workflow execution for now is that only one user
(`Account` or `EndUser`) participates in the workflow execution. For
unauthenticated form submission this assumption does not hold. Binding
the uploaded file to the workflow initiator aligns with the current implementation.

To audit which recipient actually performed an upload, a dedicated table,
`HumanInputFormUploadFile`, is introduced to record each upload made
through a form.
This commit is contained in:
QuantumGhost 2026-05-08 14:32:51 +08:00
parent 58af8aa7fe
commit 7133754a31
4 changed files with 76 additions and 63 deletions

View File

@ -111,7 +111,7 @@ class HumanInputFileUploadApi(Resource):
filename=file.filename or "",
content=file.read(),
mimetype=file.mimetype,
user=context.end_user,
user=context.owner,
source=None,
)
except services.errors.file.FileTooLargeError as file_too_large_error:
@ -157,7 +157,7 @@ class HumanInputRemoteFileUploadApi(Resource):
filename=file_info.filename,
content=content,
mimetype=file_info.mimetype,
user=context.end_user,
user=context.owner,
source_url=url,
)
except services.errors.file.FileTooLargeError as file_too_large_error:

View File

@ -9,8 +9,7 @@ Create Date: 2026-05-06 12:00:00.000000
import sqlalchemy as sa
from alembic import op
import models as models
import models
# revision identifiers, used by Alembic.
revision = "8d4c2a1b9f03"
@ -29,7 +28,6 @@ def upgrade():
sa.Column("app_id", models.types.StringUUID(), nullable=False),
sa.Column("form_id", models.types.StringUUID(), nullable=False),
sa.Column("recipient_id", models.types.StringUUID(), nullable=False),
sa.Column("end_user_id", models.types.StringUUID(), nullable=False),
sa.Column("token", sa.String(length=255), nullable=False),
sa.PrimaryKeyConstraint("id", name="human_input_form_upload_tokens_pkey"),
sa.UniqueConstraint("token", name="human_input_form_upload_tokens_token_key"),
@ -43,10 +41,10 @@ def upgrade():
sa.Column("created_at", sa.DateTime(), server_default=sa.text("CURRENT_TIMESTAMP"), nullable=False),
sa.Column("updated_at", sa.DateTime(), server_default=sa.text("CURRENT_TIMESTAMP"), nullable=False),
sa.Column("tenant_id", models.types.StringUUID(), nullable=False),
sa.Column("app_id", models.types.StringUUID(), nullable=False),
sa.Column("form_id", models.types.StringUUID(), nullable=False),
sa.Column("upload_file_id", models.types.StringUUID(), nullable=False),
sa.Column("upload_token_id", models.types.StringUUID(), nullable=False),
sa.Column("end_user_id", models.types.StringUUID(), nullable=False),
sa.PrimaryKeyConstraint("id", name="human_input_form_upload_files_pkey"),
sa.UniqueConstraint("upload_file_id", name="human_input_form_upload_files_upload_file_id_key"),
)

View File

@ -259,6 +259,8 @@ class HumanInputFormUploadToken(DefaultFieldsMixin, Base):
HITL upload tokens are intentionally separate from app/service bearer tokens.
The token is stored as an opaque random value so upload endpoints can perform
a direct lookup without entering the normal Web App authentication chain.
Upload ownership is resolved from the form's workflow run initiator instead
of being persisted on the token row itself.
"""
__tablename__ = "human_input_form_upload_tokens"
@ -271,7 +273,6 @@ class HumanInputFormUploadToken(DefaultFieldsMixin, Base):
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
form_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
recipient_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
end_user_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
token: Mapped[str] = mapped_column(sa.String(255), nullable=False)
form: Mapped[HumanInputForm] = relationship(
@ -284,7 +285,11 @@ class HumanInputFormUploadToken(DefaultFieldsMixin, Base):
class HumanInputFormUploadFile(DefaultFieldsMixin, Base):
"""Association between a human input form and a file uploaded through its token."""
"""Association between a human input form and a file uploaded through its token.
Ownership remains on ``UploadFile`` itself; this table only records the
durable form/token/file linkage needed by Human Input flows.
"""
__tablename__ = "human_input_form_upload_files"
__table_args__ = (
@ -294,7 +299,7 @@ class HumanInputFormUploadFile(DefaultFieldsMixin, Base):
)
tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
form_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
upload_file_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
upload_token_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
end_user_id: Mapped[str] = mapped_column(StringUUID, nullable=False)

View File

@ -10,6 +10,8 @@ from sqlalchemy.orm import Session, selectinload, sessionmaker
from configs import dify_config
from graphon.nodes.human_input.enums import HumanInputFormStatus
from libs.datetime_utils import ensure_naive_utc, naive_utc_now
from models.account import Account, Tenant
from models.enums import CreatorUserRole
from models.human_input import (
HumanInputForm,
HumanInputFormRecipient,
@ -17,11 +19,11 @@ from models.human_input import (
HumanInputFormUploadToken,
)
from models.model import EndUser
from repositories.api_workflow_run_repository import APIWorkflowRunRepository
from repositories.factory import DifyAPIRepositoryFactory
from services.human_input_service import FormExpiredError, FormNotFoundError, FormSubmittedError
HITL_UPLOAD_TOKEN_PREFIX = "hitl_upload_"
HUMAN_INPUT_END_USER_TYPE = "human-input"
HUMAN_INPUT_END_USER_SESSION_PREFIX = "hitl:recipient:"
_TOKEN_RANDOM_BYTES = 32
_TOKEN_GENERATION_ATTEMPTS = 10
@ -39,7 +41,7 @@ class HumanInputUploadContext:
form_id: str
recipient_id: str
upload_token_id: str
end_user: EndUser
owner: Account | EndUser
class InvalidUploadTokenError(Exception):
@ -47,14 +49,27 @@ class InvalidUploadTokenError(Exception):
class HumanInputFileUploadService:
"""Coordinates HITL upload tokens, technical EndUsers, and form-file links."""
"""Coordinates HITL upload tokens, workflow-run owners, and form-file links.
Standalone HITL uploads must be owned by the original workflow/chatflow
initiator so that resume-time file restoration continues to flow through the
normal file access checks.
"""
_session_maker: sessionmaker[Session]
_workflow_run_repository: APIWorkflowRunRepository
def __init__(self, session_factory: sessionmaker[Session] | Engine):
def __init__(
self,
session_factory: sessionmaker[Session] | Engine,
workflow_run_repository: APIWorkflowRunRepository | None = None,
):
if isinstance(session_factory, Engine):
session_factory = sessionmaker(bind=session_factory)
self._session_maker = session_factory
self._workflow_run_repository = (
workflow_run_repository or DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_factory)
)
def issue_upload_token(self, form_token: str) -> HumanInputUploadToken:
"""Create an upload token for an active human input recipient token."""
@ -71,19 +86,12 @@ class HumanInputFileUploadService:
form = recipient_model.form
self._ensure_form_model_active(form)
end_user = self._get_or_create_human_input_end_user(
session=session,
tenant_id=form.tenant_id,
app_id=form.app_id,
recipient_id=recipient_model.id,
)
upload_token = self._generate_unique_upload_token(session)
token_model = HumanInputFormUploadToken(
tenant_id=form.tenant_id,
app_id=form.app_id,
form_id=form.id,
recipient_id=recipient_model.id,
end_user_id=end_user.id,
token=upload_token,
)
session.add(token_model)
@ -109,18 +117,7 @@ class HumanInputFileUploadService:
raise InvalidUploadTokenError()
self._ensure_form_model_active(form_model)
end_user = session.scalar(
select(EndUser)
.where(
EndUser.id == token_model.end_user_id,
EndUser.tenant_id == token_model.tenant_id,
EndUser.app_id == token_model.app_id,
EndUser.type == HUMAN_INPUT_END_USER_TYPE,
)
.limit(1)
)
if end_user is None:
raise InvalidUploadTokenError()
owner = self._resolve_upload_owner(session=session, form_model=form_model)
return HumanInputUploadContext(
tenant_id=token_model.tenant_id,
@ -128,7 +125,7 @@ class HumanInputFileUploadService:
form_id=token_model.form_id,
recipient_id=token_model.recipient_id,
upload_token_id=token_model.id,
end_user=end_user,
owner=owner,
)
def record_upload_file(self, *, context: HumanInputUploadContext, file_id: str) -> None:
@ -138,49 +135,62 @@ class HumanInputFileUploadService:
session.add(
HumanInputFormUploadFile(
tenant_id=context.tenant_id,
app_id=context.app_id,
form_id=context.form_id,
upload_file_id=file_id,
upload_token_id=context.upload_token_id,
end_user_id=context.end_user.id,
)
)
def _generate_unique_upload_token(self, session: Session) -> str:
return f"{HITL_UPLOAD_TOKEN_PREFIX}{secrets.token_urlsafe(_TOKEN_RANDOM_BYTES)}"
@staticmethod
def _get_or_create_human_input_end_user(
def _resolve_upload_owner(
self,
*,
session: Session,
tenant_id: str,
app_id: str,
recipient_id: str,
) -> EndUser:
session_id = f"{HUMAN_INPUT_END_USER_SESSION_PREFIX}{recipient_id}"
end_user = session.scalar(
select(EndUser)
.where(
EndUser.tenant_id == tenant_id,
EndUser.app_id == app_id,
EndUser.session_id == session_id,
EndUser.type == HUMAN_INPUT_END_USER_TYPE,
)
.limit(1)
form_model: HumanInputForm,
) -> Account | EndUser:
if form_model.workflow_run_id is None:
raise InvalidUploadTokenError()
workflow_run = self._workflow_run_repository.get_workflow_run_by_id(
tenant_id=form_model.tenant_id,
app_id=form_model.app_id,
run_id=form_model.workflow_run_id,
)
if end_user is not None:
if workflow_run is None:
raise InvalidUploadTokenError()
if workflow_run.created_by_role == CreatorUserRole.END_USER:
end_user = session.scalar(
select(EndUser)
.where(
EndUser.id == workflow_run.created_by,
EndUser.tenant_id == workflow_run.tenant_id,
EndUser.app_id == workflow_run.app_id,
)
.limit(1)
)
if end_user is None:
raise InvalidUploadTokenError()
return end_user
end_user = EndUser(
tenant_id=tenant_id,
app_id=app_id,
type=HUMAN_INPUT_END_USER_TYPE,
is_anonymous=True,
session_id=session_id,
external_user_id=session_id,
)
session.add(end_user)
session.flush()
return end_user
if workflow_run.created_by_role != CreatorUserRole.ACCOUNT:
raise InvalidUploadTokenError()
account = session.scalar(select(Account).where(Account.id == workflow_run.created_by).limit(1))
if account is None:
raise InvalidUploadTokenError()
tenant = session.scalar(select(Tenant).where(Tenant.id == workflow_run.tenant_id).limit(1))
if tenant is None:
raise InvalidUploadTokenError()
# HITL upload runs outside the normal account auth flow, so hydrate the
# account tenant context explicitly before delegating to FileService.
account.current_tenant = tenant
return account
@staticmethod
def _ensure_form_model_active(form: HumanInputForm) -> None: