diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index 8e51ae8cbd..4fcf8daf6e 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -51,6 +51,62 @@ class DraftWorkflowApi(Resource): } +class DraftWorkflowRunApi(Resource): + @setup_required + @login_required + @account_initialization_required + @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) + def post(self, app_model: App): + """ + Run draft workflow + """ + # TODO + workflow_service = WorkflowService() + workflow_service.run_draft_workflow(app_model=app_model, account=current_user) + + # TODO + return { + "result": "success" + } + + +class WorkflowTaskStopApi(Resource): + @setup_required + @login_required + @account_initialization_required + @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) + def post(self, app_model: App, task_id: str): + """ + Stop workflow task + """ + # TODO + workflow_service = WorkflowService() + workflow_service.stop_workflow_task(app_model=app_model, task_id=task_id, account=current_user) + + return { + "result": "success" + } + + +class DraftWorkflowNodeRunApi(Resource): + @setup_required + @login_required + @account_initialization_required + @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) + def post(self, app_model: App, node_id: str): + """ + Run draft workflow node + """ + # TODO + workflow_service = WorkflowService() + workflow_service.run_draft_workflow_node(app_model=app_model, node_id=node_id, account=current_user) + + # TODO + return { + "result": "success" + } + + class PublishedWorkflowApi(Resource): @setup_required @@ -85,7 +141,6 @@ class PublishedWorkflowApi(Resource): } - class DefaultBlockConfigApi(Resource): @setup_required @login_required @@ -123,6 +178,9 @@ class ConvertToWorkflowApi(Resource): api.add_resource(DraftWorkflowApi, '/apps//workflows/draft') +api.add_resource(DraftWorkflowRunApi, '/apps//workflows/draft/run') +api.add_resource(WorkflowTaskStopApi, '/apps//workflows/tasks//stop') +api.add_resource(DraftWorkflowNodeRunApi, '/apps//workflows/draft/nodes//run') api.add_resource(PublishedWorkflowApi, '/apps//workflows/published') api.add_resource(DefaultBlockConfigApi, '/apps//workflows/default-workflow-block-configs') api.add_resource(ConvertToWorkflowApi, '/apps//convert-to-workflow') diff --git a/api/controllers/console/app/workflow_run.py b/api/controllers/console/app/workflow_run.py new file mode 100644 index 0000000000..38e3d4d837 --- /dev/null +++ b/api/controllers/console/app/workflow_run.py @@ -0,0 +1,80 @@ +from flask_restful import Resource, marshal_with, reqparse +from flask_restful.inputs import int_range + +from controllers.console import api +from controllers.console.app.wraps import get_app_model +from controllers.console.setup import setup_required +from controllers.console.wraps import account_initialization_required +from fields.workflow_run_fields import workflow_run_detail_fields, workflow_run_pagination_fields, \ + workflow_run_node_execution_list_fields +from libs.helper import uuid_value +from libs.login import login_required +from models.model import App, AppMode +from services.workflow_run_service import WorkflowRunService + + +class WorkflowRunListApi(Resource): + @setup_required + @login_required + @account_initialization_required + @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) + @marshal_with(workflow_run_pagination_fields) + def get(self, app_model: App): + """ + Get workflow run list + """ + parser = reqparse.RequestParser() + parser.add_argument('last_id', type=uuid_value, location='args') + parser.add_argument('limit', type=int_range(1, 100), required=False, default=20, location='args') + args = parser.parse_args() + + workflow_run_service = WorkflowRunService() + result = workflow_run_service.get_paginate_workflow_runs( + app_model=app_model, + args=args + ) + + return result + + +class WorkflowRunDetailApi(Resource): + @setup_required + @login_required + @account_initialization_required + @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) + @marshal_with(workflow_run_detail_fields) + def get(self, app_model: App, run_id): + """ + Get workflow run detail + """ + run_id = str(run_id) + + workflow_run_service = WorkflowRunService() + workflow_run = workflow_run_service.get_workflow_run(app_model=app_model, run_id=run_id) + + return workflow_run + + +class WorkflowRunNodeExecutionListApi(Resource): + @setup_required + @login_required + @account_initialization_required + @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) + @marshal_with(workflow_run_node_execution_list_fields) + def get(self, app_model: App, run_id): + """ + Get workflow run node execution list + """ + run_id = str(run_id) + + workflow_run_service = WorkflowRunService() + node_executions = workflow_run_service.get_workflow_run_node_executions(app_model=app_model, run_id=run_id) + + return { + 'data': node_executions + } + + +api.add_resource(WorkflowRunListApi, '/apps//workflow-runs') +api.add_resource(WorkflowRunDetailApi, '/apps//workflow-runs/') +api.add_resource(WorkflowRunNodeExecutionListApi, '/apps//workflow-runs//node-executions') diff --git a/api/fields/conversation_fields.py b/api/fields/conversation_fields.py index afa486f1cd..747b0b86ab 100644 --- a/api/fields/conversation_fields.py +++ b/api/fields/conversation_fields.py @@ -66,6 +66,7 @@ message_detail_fields = { 'from_end_user_id': fields.String, 'from_account_id': fields.String, 'feedbacks': fields.List(fields.Nested(feedback_fields)), + 'workflow_run_id': fields.String, 'annotation': fields.Nested(annotation_fields, allow_null=True), 'annotation_hit_history': fields.Nested(annotation_hit_history_fields, allow_null=True), 'created_at': TimestampField, diff --git a/api/fields/workflow_app_log_fields.py b/api/fields/workflow_app_log_fields.py index 8f3998d90a..e230c159fb 100644 --- a/api/fields/workflow_app_log_fields.py +++ b/api/fields/workflow_app_log_fields.py @@ -2,12 +2,12 @@ from flask_restful import fields from fields.end_user_fields import simple_end_user_fields from fields.member_fields import simple_account_fields -from fields.workflow_fields import workflow_run_fields +from fields.workflow_run_fields import workflow_run_for_log_fields from libs.helper import TimestampField workflow_app_log_partial_fields = { "id": fields.String, - "workflow_run": fields.Nested(workflow_run_fields, attribute='workflow_run', allow_null=True), + "workflow_run": fields.Nested(workflow_run_for_log_fields, attribute='workflow_run', allow_null=True), "created_from": fields.String, "created_by_role": fields.String, "created_by_account": fields.Nested(simple_account_fields, attribute='created_by_account', allow_null=True), diff --git a/api/fields/workflow_fields.py b/api/fields/workflow_fields.py index 091f293150..decdc0567f 100644 --- a/api/fields/workflow_fields.py +++ b/api/fields/workflow_fields.py @@ -13,16 +13,3 @@ workflow_fields = { 'updated_by': fields.Nested(simple_account_fields, attribute='updated_by_account', allow_null=True), 'updated_at': TimestampField } - -workflow_run_fields = { - "id": fields.String, - "version": fields.String, - "status": fields.String, - "error": fields.String, - "elapsed_time": fields.Float, - "total_tokens": fields.Integer, - "total_price": fields.Float, - "currency": fields.String, - "total_steps": fields.Integer, - "finished_at": TimestampField -} \ No newline at end of file diff --git a/api/fields/workflow_run_fields.py b/api/fields/workflow_run_fields.py new file mode 100644 index 0000000000..37751bc70f --- /dev/null +++ b/api/fields/workflow_run_fields.py @@ -0,0 +1,92 @@ +from flask_restful import fields + +from fields.end_user_fields import simple_end_user_fields +from fields.member_fields import simple_account_fields +from libs.helper import TimestampField + +workflow_run_for_log_fields = { + "id": fields.String, + "version": fields.String, + "status": fields.String, + "error": fields.String, + "elapsed_time": fields.Float, + "total_tokens": fields.Integer, + "total_price": fields.Float, + "currency": fields.String, + "total_steps": fields.Integer, + "created_at": TimestampField, + "finished_at": TimestampField +} + +workflow_run_for_list_fields = { + "id": fields.String, + "sequence_number": fields.Integer, + "version": fields.String, + "graph": fields.String, + "inputs": fields.String, + "status": fields.String, + "outputs": fields.String, + "error": fields.String, + "elapsed_time": fields.Float, + "total_tokens": fields.Integer, + "total_price": fields.Float, + "currency": fields.String, + "total_steps": fields.Integer, + "created_by_account": fields.Nested(simple_account_fields, attribute='created_by_account', allow_null=True), + "created_at": TimestampField, + "finished_at": TimestampField +} + +workflow_run_pagination_fields = { + 'page': fields.Integer, + 'limit': fields.Integer(attribute='per_page'), + 'total': fields.Integer, + 'has_more': fields.Boolean(attribute='has_next'), + 'data': fields.List(fields.Nested(workflow_run_for_list_fields), attribute='items') +} + +workflow_run_detail_fields = { + "id": fields.String, + "sequence_number": fields.Integer, + "version": fields.String, + "graph": fields.String, + "inputs": fields.String, + "status": fields.String, + "outputs": fields.String, + "error": fields.String, + "elapsed_time": fields.Float, + "total_tokens": fields.Integer, + "total_price": fields.Float, + "currency": fields.String, + "total_steps": fields.Integer, + "created_by_role": fields.String, + "created_by_account": fields.Nested(simple_account_fields, attribute='created_by_account', allow_null=True), + "created_by_end_user": fields.Nested(simple_end_user_fields, attribute='created_by_end_user', allow_null=True), + "created_at": TimestampField, + "finished_at": TimestampField +} + +workflow_run_node_execution_fields = { + "id": fields.String, + "index": fields.Integer, + "predecessor_node_id": fields.String, + "node_id": fields.String, + "node_type": fields.String, + "title": fields.String, + "inputs": fields.String, + "process_data": fields.String, + "outputs": fields.String, + "status": fields.String, + "error": fields.String, + "elapsed_time": fields.Float, + "execution_metadata": fields.String, + "created_at": TimestampField, + "created_by_role": fields.String, + "created_by_account": fields.Nested(simple_account_fields, attribute='created_by_account', allow_null=True), + "created_by_end_user": fields.Nested(simple_end_user_fields, attribute='created_by_end_user', allow_null=True), + "finished_at": TimestampField +} + +workflow_run_node_execution_list_fields = { + 'data': fields.List(fields.Nested(workflow_run_node_execution_fields)), +} diff --git a/api/migrations/versions/b289e2408ee2_add_workflow.py b/api/migrations/versions/b289e2408ee2_add_workflow.py index 7255b4b5fa..5f7ddc7d68 100644 --- a/api/migrations/versions/b289e2408ee2_add_workflow.py +++ b/api/migrations/versions/b289e2408ee2_add_workflow.py @@ -88,7 +88,7 @@ def upgrade(): sa.PrimaryKeyConstraint('id', name='workflow_run_pkey') ) with op.batch_alter_table('workflow_runs', schema=None) as batch_op: - batch_op.create_index('workflow_run_triggerd_from_idx', ['tenant_id', 'app_id', 'workflow_id', 'triggered_from'], unique=False) + batch_op.create_index('workflow_run_triggerd_from_idx', ['tenant_id', 'app_id', 'triggered_from'], unique=False) op.create_table('workflows', sa.Column('id', postgresql.UUID(), server_default=sa.text('uuid_generate_v4()'), nullable=False), diff --git a/api/models/workflow.py b/api/models/workflow.py index 41266fe9f5..7ea342cda7 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -208,7 +208,7 @@ class WorkflowRun(db.Model): __tablename__ = 'workflow_runs' __table_args__ = ( db.PrimaryKeyConstraint('id', name='workflow_run_pkey'), - db.Index('workflow_run_triggerd_from_idx', 'tenant_id', 'app_id', 'workflow_id', 'triggered_from'), + db.Index('workflow_run_triggerd_from_idx', 'tenant_id', 'app_id', 'triggered_from'), ) id = db.Column(UUID, server_default=db.text('uuid_generate_v4()')) @@ -236,11 +236,36 @@ class WorkflowRun(db.Model): @property def created_by_account(self): - return Account.query.get(self.created_by) + created_by_role = CreatedByRole.value_of(self.created_by_role) + return Account.query.get(self.created_by) \ + if created_by_role == CreatedByRole.ACCOUNT else None @property - def updated_by_account(self): - return Account.query.get(self.updated_by) + def created_by_end_user(self): + created_by_role = CreatedByRole.value_of(self.created_by_role) + return EndUser.query.get(self.created_by) \ + if created_by_role == CreatedByRole.END_USER else None + + +class WorkflowNodeExecutionTriggeredFrom(Enum): + """ + Workflow Node Execution Triggered From Enum + """ + SINGLE_STEP = 'single-step' + WORKFLOW_RUN = 'workflow-run' + + @classmethod + def value_of(cls, value: str) -> 'WorkflowNodeExecutionTriggeredFrom': + """ + Get value of given mode. + + :param value: mode value + :return: mode + """ + for mode in cls: + if mode.value == value: + return mode + raise ValueError(f'invalid workflow node execution triggered from value {value}') class WorkflowNodeExecution(db.Model): @@ -323,6 +348,18 @@ class WorkflowNodeExecution(db.Model): created_by = db.Column(UUID, nullable=False) finished_at = db.Column(db.DateTime) + @property + def created_by_account(self): + created_by_role = CreatedByRole.value_of(self.created_by_role) + return Account.query.get(self.created_by) \ + if created_by_role == CreatedByRole.ACCOUNT else None + + @property + def created_by_end_user(self): + created_by_role = CreatedByRole.value_of(self.created_by_role) + return EndUser.query.get(self.created_by) \ + if created_by_role == CreatedByRole.END_USER else None + class WorkflowAppLog(db.Model): """ diff --git a/api/services/workflow_run_service.py b/api/services/workflow_run_service.py new file mode 100644 index 0000000000..9c898f10fb --- /dev/null +++ b/api/services/workflow_run_service.py @@ -0,0 +1,89 @@ +from extensions.ext_database import db +from libs.infinite_scroll_pagination import InfiniteScrollPagination +from models.model import App +from models.workflow import WorkflowRun, WorkflowRunTriggeredFrom, WorkflowNodeExecution, \ + WorkflowNodeExecutionTriggeredFrom + + +class WorkflowRunService: + def get_paginate_workflow_runs(self, app_model: App, args: dict) -> InfiniteScrollPagination: + """ + Get debug workflow run list + Only return triggered_from == debugging + + :param app_model: app model + :param args: request args + """ + limit = int(args.get('limit', 20)) + + base_query = db.session.query(WorkflowRun).filter( + WorkflowRun.tenant_id == app_model.tenant_id, + WorkflowRun.app_id == app_model.id, + WorkflowRun.triggered_from == WorkflowRunTriggeredFrom.DEBUGGING.value + ) + + if args.get('last_id'): + last_workflow_run = base_query.filter( + WorkflowRun.id == args.get('last_id'), + ).first() + + if not last_workflow_run: + raise ValueError('Last workflow run not exists') + + conversations = base_query.filter( + WorkflowRun.created_at < last_workflow_run.created_at, + WorkflowRun.id != last_workflow_run.id + ).order_by(WorkflowRun.created_at.desc()).limit(limit).all() + else: + conversations = base_query.order_by(WorkflowRun.created_at.desc()).limit(limit).all() + + has_more = False + if len(conversations) == limit: + current_page_first_conversation = conversations[-1] + rest_count = base_query.filter( + WorkflowRun.created_at < current_page_first_conversation.created_at, + WorkflowRun.id != current_page_first_conversation.id + ).count() + + if rest_count > 0: + has_more = True + + return InfiniteScrollPagination( + data=conversations, + limit=limit, + has_more=has_more + ) + + def get_workflow_run(self, app_model: App, run_id: str) -> WorkflowRun: + """ + Get workflow run detail + + :param app_model: app model + :param run_id: workflow run id + """ + workflow_run = db.session.query(WorkflowRun).filter( + WorkflowRun.tenant_id == app_model.tenant_id, + WorkflowRun.app_id == app_model.id, + WorkflowRun.id == run_id, + ).first() + + return workflow_run + + def get_workflow_run_node_executions(self, app_model: App, run_id: str) -> list[WorkflowNodeExecution]: + """ + Get workflow run node execution list + """ + workflow_run = self.get_workflow_run(app_model, run_id) + + if not workflow_run: + return [] + + node_executions = db.session.query(WorkflowNodeExecution).filter( + WorkflowNodeExecution.tenant_id == app_model.tenant_id, + WorkflowNodeExecution.app_id == app_model.id, + WorkflowNodeExecution.workflow_id == workflow_run.workflow_id, + WorkflowNodeExecution.triggered_from == WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value, + WorkflowNodeExecution.workflow_run_id == run_id, + ).order_by(WorkflowNodeExecution.index.desc()).all() + + return node_executions