From 748aa22ee2e1deec036378d664bc7d6652886c4e Mon Sep 17 00:00:00 2001 From: takatost Date: Sun, 25 Feb 2024 21:02:28 +0800 Subject: [PATCH] add manual convert logic --- api/commands.py | 81 +----------- api/controllers/console/app/workflow.py | 8 +- .../versions/b289e2408ee2_add_workflow.py | 2 + api/models/model.py | 1 + api/models/workflow.py | 78 +++++++++++ api/services/workflow/workflow_converter.py | 123 +++++++++++++----- api/services/workflow_service.py | 29 +++-- 7 files changed, 198 insertions(+), 124 deletions(-) diff --git a/api/commands.py b/api/commands.py index 73d2150de2..e376d222c6 100644 --- a/api/commands.py +++ b/api/commands.py @@ -1,6 +1,5 @@ import base64 import json -import logging import secrets import click @@ -13,12 +12,11 @@ from extensions.ext_database import db from libs.helper import email as email_validate from libs.password import hash_password, password_pattern, valid_password from libs.rsa import generate_key_pair -from models.account import Tenant, TenantAccountJoin +from models.account import Tenant from models.dataset import Dataset, DatasetCollectionBinding, DocumentSegment from models.dataset import Document as DatasetDocument from models.model import Account, App, AppMode, AppModelConfig, AppAnnotationSetting, Conversation, MessageAnnotation from models.provider import Provider, ProviderModel -from services.workflow.workflow_converter import WorkflowConverter @click.command('reset-password', help='Reset the account password.') @@ -384,10 +382,11 @@ def convert_to_agent_apps(): while True: # fetch first 1000 apps sql_query = """SELECT a.id AS id FROM apps a -INNER JOIN app_model_configs am ON a.app_model_config_id=am.id -WHERE a.mode = 'chat' AND am.agent_mode is not null -and (am.agent_mode like '%"strategy": "function_call"%' or am.agent_mode like '%"strategy": "react"%') -and am.agent_mode like '{"enabled": true%' ORDER BY a.created_at DESC LIMIT 1000""" + INNER JOIN app_model_configs am ON a.app_model_config_id=am.id + WHERE a.mode = 'chat' AND am.agent_mode is not null + and (am.agent_mode like '%"strategy": "function_call"%' or am.agent_mode like '%"strategy": "react"%') + and am.agent_mode like '{"enabled": true%' ORDER BY a.created_at DESC LIMIT 1000""" + with db.engine.begin() as conn: rs = conn.execute(db.text(sql_query)) @@ -424,77 +423,9 @@ and am.agent_mode like '{"enabled": true%' ORDER BY a.created_at DESC LIMIT 1000 click.echo(click.style('Congratulations! Converted {} agent apps.'.format(len(proceeded_app_ids)), fg='green')) -@click.command('convert-to-workflow-chatbot-apps', help='Convert Basic Export Assistant to Chatbot Workflow App.') -def convert_to_workflow_chatbot_apps(): - """ - Convert Basic Export Assistant to Chatbot Workflow App. - """ - click.echo(click.style('Start convert to workflow chatbot apps.', fg='green')) - - proceeded_app_ids = [] - workflow_converter = WorkflowConverter() - - while True: - # fetch first 1000 apps - sql_query = """SELECT a.id FROM apps a -LEFT JOIN app_model_configs am ON a.app_model_config_id=am.id -WHERE a.mode = 'chat' AND am.prompt_type='advanced' ORDER BY a.created_at DESC LIMIT 1000""" - - with db.engine.begin() as conn: - rs = conn.execute(db.text(sql_query)) - - apps = [] - for i in rs: - app_id = str(i.id) - print(app_id) - if app_id not in proceeded_app_ids: - proceeded_app_ids.append(app_id) - app = db.session.query(App).filter(App.id == app_id).first() - apps.append(app) - - if len(apps) == 0: - break - - for app in apps: - click.echo('Converting app: {}'.format(app.id)) - - try: - # get workspace of app - tenant = db.session.query(Tenant).filter(Tenant.id == app.tenant_id).first() - if not tenant: - click.echo(click.style('Tenant not found: {}'.format(app.tenant_id), fg='red')) - continue - - # get workspace owner - tenant_account_join = db.session.query(TenantAccountJoin).filter( - TenantAccountJoin.tenant_id == tenant.id, - TenantAccountJoin.role == 'owner' - ).first() - - if not tenant_account_join: - click.echo(click.style('Tenant owner not found: {}'.format(tenant.id), fg='red')) - continue - - # convert to workflow - workflow_converter.convert_to_workflow( - app_model=app, - account_id=tenant_account_join.account_id - ) - - click.echo(click.style('Converted app: {}'.format(app.id), fg='green')) - except Exception as e: - logging.exception('Convert app error: {}'.format(app.id)) - click.echo( - click.style('Convert app error: {} {}'.format(e.__class__.__name__, - str(e)), fg='red')) - - click.echo(click.style('Congratulations! Converted {} workflow chatbot apps.'.format(len(proceeded_app_ids)), fg='green')) - - def register_commands(app): app.cli.add_command(reset_password) app.cli.add_command(reset_email) app.cli.add_command(reset_encrypt_key_pair) app.cli.add_command(vdb_migrate) app.cli.add_command(convert_to_agent_apps) - app.cli.add_command(convert_to_workflow_chatbot_apps) diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index 7663e22580..dc1b7edcaf 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -69,15 +69,15 @@ class ConvertToWorkflowApi(Resource): @setup_required @login_required @account_initialization_required - @get_app_model(mode=AppMode.CHAT) - @marshal_with(workflow_fields) + @get_app_model(mode=[AppMode.CHAT, AppMode.COMPLETION]) def post(self, app_model: App): """ - Convert basic mode of chatbot app to workflow + Convert basic mode of chatbot app(expert mode) to workflow mode + Convert Completion App to Workflow App """ # convert to workflow mode workflow_service = WorkflowService() - workflow = workflow_service.chatbot_convert_to_workflow( + workflow = workflow_service.convert_to_workflow( app_model=app_model, account=current_user ) diff --git a/api/migrations/versions/b289e2408ee2_add_workflow.py b/api/migrations/versions/b289e2408ee2_add_workflow.py index e9cd2caf3a..9e04fef288 100644 --- a/api/migrations/versions/b289e2408ee2_add_workflow.py +++ b/api/migrations/versions/b289e2408ee2_add_workflow.py @@ -53,6 +53,7 @@ def upgrade(): sa.Column('elapsed_time', sa.Float(), server_default=sa.text('0'), nullable=False), sa.Column('execution_metadata', sa.Text(), nullable=True), sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP(0)'), nullable=False), + sa.Column('created_by_role', sa.String(length=255), nullable=False), sa.Column('created_by', postgresql.UUID(), nullable=False), sa.Column('finished_at', sa.DateTime(), nullable=True), sa.PrimaryKeyConstraint('id', name='workflow_node_execution_pkey') @@ -80,6 +81,7 @@ def upgrade(): sa.Column('total_price', sa.Numeric(precision=10, scale=7), nullable=True), sa.Column('currency', sa.String(length=255), nullable=True), sa.Column('total_steps', sa.Integer(), server_default=sa.text('0'), nullable=True), + sa.Column('created_by_role', sa.String(length=255), nullable=False), sa.Column('created_by', postgresql.UUID(), nullable=False), sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP(0)'), nullable=False), sa.Column('finished_at', sa.DateTime(), nullable=True), diff --git a/api/models/model.py b/api/models/model.py index 58e29cd21c..1e66fd6c88 100644 --- a/api/models/model.py +++ b/api/models/model.py @@ -28,6 +28,7 @@ class DifySetup(db.Model): class AppMode(Enum): + COMPLETION = 'completion' WORKFLOW = 'workflow' CHAT = 'chat' AGENT = 'agent' diff --git a/api/models/workflow.py b/api/models/workflow.py index 95805e7871..251f33b0c0 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -7,6 +7,27 @@ from extensions.ext_database import db from models.account import Account +class CreatedByRole(Enum): + """ + Created By Role Enum + """ + ACCOUNT = 'account' + END_USER = 'end_user' + + @classmethod + def value_of(cls, value: str) -> 'CreatedByRole': + """ + Get value of given mode. + + :param value: mode value + :return: mode + """ + for mode in cls: + if mode.value == value: + return mode + raise ValueError(f'invalid created by role value {value}') + + class WorkflowType(Enum): """ Workflow Type Enum @@ -99,6 +120,49 @@ class Workflow(db.Model): return Account.query.get(self.updated_by) +class WorkflowRunTriggeredFrom(Enum): + """ + Workflow Run Triggered From Enum + """ + DEBUGGING = 'debugging' + APP_RUN = 'app-run' + + @classmethod + def value_of(cls, value: str) -> 'WorkflowRunTriggeredFrom': + """ + Get value of given mode. + + :param value: mode value + :return: mode + """ + for mode in cls: + if mode.value == value: + return mode + raise ValueError(f'invalid workflow run triggered from value {value}') + + +class WorkflowRunStatus(Enum): + """ + Workflow Run Status Enum + """ + RUNNING = 'running' + SUCCEEDED = 'succeeded' + FAILED = 'failed' + + @classmethod + def value_of(cls, value: str) -> 'WorkflowRunStatus': + """ + Get value of given mode. + + :param value: mode value + :return: mode + """ + for mode in cls: + if mode.value == value: + return mode + raise ValueError(f'invalid workflow run status value {value}') + + class WorkflowRun(db.Model): """ Workflow Run @@ -128,6 +192,12 @@ class WorkflowRun(db.Model): - total_price (decimal) `optional` Total cost - currency (string) `optional` Currency, such as USD / RMB - total_steps (int) Total steps (redundant), default 0 + - created_by_role (string) Creator role + + - `account` Console account + + - `end_user` End user + - created_by (uuid) Runner ID - created_at (timestamp) Run time - finished_at (timestamp) End time @@ -157,6 +227,7 @@ class WorkflowRun(db.Model): total_price = db.Column(db.Numeric(10, 7)) currency = db.Column(db.String(255)) total_steps = db.Column(db.Integer, server_default=db.text('0')) + created_by_role = db.Column(db.String(255), nullable=False) created_by = db.Column(UUID, nullable=False) created_at = db.Column(db.DateTime, nullable=False, server_default=db.text('CURRENT_TIMESTAMP(0)')) finished_at = db.Column(db.DateTime) @@ -208,6 +279,12 @@ class WorkflowNodeExecution(db.Model): - currency (string) `optional` Currency, such as USD / RMB - created_at (timestamp) Run time + - created_by_role (string) Creator role + + - `account` Console account + + - `end_user` End user + - created_by (uuid) Runner ID - finished_at (timestamp) End time """ @@ -240,6 +317,7 @@ class WorkflowNodeExecution(db.Model): elapsed_time = db.Column(db.Float, nullable=False, server_default=db.text('0')) execution_metadata = db.Column(db.Text) created_at = db.Column(db.DateTime, nullable=False, server_default=db.text('CURRENT_TIMESTAMP(0)')) + created_by_role = db.Column(db.String(255), nullable=False) created_by = db.Column(UUID, nullable=False) finished_at = db.Column(db.DateTime) diff --git a/api/services/workflow/workflow_converter.py b/api/services/workflow/workflow_converter.py index 1d3cbe2e0e..bb300d1a77 100644 --- a/api/services/workflow/workflow_converter.py +++ b/api/services/workflow/workflow_converter.py @@ -17,9 +17,11 @@ from core.model_runtime.utils.encoders import jsonable_encoder from core.prompt.simple_prompt_transform import SimplePromptTransform from core.workflow.entities.NodeEntities import NodeType from core.workflow.nodes.end.entities import EndNodeOutputType +from events.app_event import app_was_created from extensions.ext_database import db +from models.account import Account from models.api_based_extension import APIBasedExtension, APIBasedExtensionPoint -from models.model import App, AppMode, ChatbotAppEngine +from models.model import App, AppMode, ChatbotAppEngine, AppModelConfig, Site from models.workflow import Workflow, WorkflowType @@ -28,26 +30,99 @@ class WorkflowConverter: App Convert to Workflow Mode """ - def convert_to_workflow(self, app_model: App, account_id: str) -> Workflow: + def convert_to_workflow(self, app_model: App, account: Account) -> App: """ - Convert to workflow mode + Convert app to workflow - basic mode of chatbot app - - advanced mode of assistant app (for migration) + - advanced mode of assistant app - - completion app (for migration) + - completion app :param app_model: App instance + :param account: Account + :return: new App instance + """ + # get original app config + app_model_config = app_model.app_model_config + + # convert app model config + workflow = self.convert_app_model_config_to_workflow( + app_model=app_model, + app_model_config=app_model_config, + account_id=account.id + ) + + # create new app + new_app = App() + new_app.tenant_id = app_model.tenant_id + new_app.name = app_model.name + '(workflow)' + new_app.mode = AppMode.CHAT.value \ + if app_model.mode == AppMode.CHAT.value else AppMode.WORKFLOW.value + new_app.icon = app_model.icon + new_app.icon_background = app_model.icon_background + new_app.enable_site = app_model.enable_site + new_app.enable_api = app_model.enable_api + new_app.api_rpm = app_model.api_rpm + new_app.api_rph = app_model.api_rph + new_app.is_demo = False + new_app.is_public = app_model.is_public + db.session.add(new_app) + db.session.flush() + + # create new app model config record + new_app_model_config = app_model_config.copy() + new_app_model_config.id = None + new_app_model_config.app_id = new_app.id + new_app_model_config.external_data_tools = '' + new_app_model_config.model = '' + new_app_model_config.user_input_form = '' + new_app_model_config.dataset_query_variable = None + new_app_model_config.pre_prompt = None + new_app_model_config.agent_mode = '' + new_app_model_config.prompt_type = 'simple' + new_app_model_config.chat_prompt_config = '' + new_app_model_config.completion_prompt_config = '' + new_app_model_config.dataset_configs = '' + new_app_model_config.chatbot_app_engine = ChatbotAppEngine.WORKFLOW.value \ + if app_model.mode == AppMode.CHAT.value else ChatbotAppEngine.NORMAL.value + new_app_model_config.workflow_id = workflow.id + + db.session.add(new_app_model_config) + db.session.flush() + + new_app.app_model_config_id = new_app_model_config.id + db.session.commit() + + site = Site( + app_id=new_app.id, + title=new_app.name, + default_language=account.interface_language, + customize_token_strategy='not_allow', + code=Site.generate_code(16) + ) + + db.session.add(site) + db.session.commit() + + app_was_created.send(new_app) + + return new_app + + def convert_app_model_config_to_workflow(self, app_model: App, + app_model_config: AppModelConfig, + account_id: str) -> Workflow: + """ + Convert app model config to workflow mode + :param app_model: App instance + :param app_model_config: AppModelConfig instance :param account_id: Account ID - :return: workflow instance + :return: """ # get new app mode new_app_mode = self._get_new_app_mode(app_model) - # get original app config - app_model_config = app_model.app_model_config - # convert app model config application_manager = ApplicationManager() app_orchestration_config_entity = application_manager.convert_from_app_model_config_dict( @@ -122,33 +197,11 @@ class WorkflowConverter: type=WorkflowType.from_app_mode(new_app_mode).value, version='draft', graph=json.dumps(graph), - created_by=account_id + created_by=account_id, + created_at=app_model_config.created_at ) db.session.add(workflow) - db.session.flush() - - # create new app model config record - new_app_model_config = app_model_config.copy() - new_app_model_config.id = None - new_app_model_config.external_data_tools = '' - new_app_model_config.model = '' - new_app_model_config.user_input_form = '' - new_app_model_config.dataset_query_variable = None - new_app_model_config.pre_prompt = None - new_app_model_config.agent_mode = '' - new_app_model_config.prompt_type = 'simple' - new_app_model_config.chat_prompt_config = '' - new_app_model_config.completion_prompt_config = '' - new_app_model_config.dataset_configs = '' - new_app_model_config.chatbot_app_engine = ChatbotAppEngine.WORKFLOW.value \ - if new_app_mode == AppMode.CHAT else ChatbotAppEngine.NORMAL.value - new_app_model_config.workflow_id = workflow.id - - db.session.add(new_app_model_config) - db.session.commit() - - app_model.app_model_config_id = new_app_model_config.id db.session.commit() return workflow @@ -469,7 +522,7 @@ class WorkflowConverter: "type": NodeType.END.value, } } - elif app_model.mode == "completion": + elif app_model.mode == AppMode.COMPLETION.value: # for original completion app return { "id": "end", @@ -516,7 +569,7 @@ class WorkflowConverter: :param app_model: App instance :return: AppMode """ - if app_model.mode == "completion": + if app_model.mode == AppMode.COMPLETION.value: return AppMode.WORKFLOW else: return AppMode.value_of(app_model.mode) diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index bd88f3cbe2..2d9342ffc9 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -3,7 +3,7 @@ from datetime import datetime from extensions.ext_database import db from models.account import Account -from models.model import App, ChatbotAppEngine +from models.model import App, ChatbotAppEngine, AppMode from models.workflow import Workflow, WorkflowType from services.workflow.defaults import default_block_configs from services.workflow.workflow_converter import WorkflowConverter @@ -65,20 +65,29 @@ class WorkflowService: # return default block config return default_block_configs - def chatbot_convert_to_workflow(self, app_model: App, account: Account) -> Workflow: + def convert_to_workflow(self, app_model: App, account: Account) -> App: """ - basic mode of chatbot app to workflow + Basic mode of chatbot app(expert mode) to workflow + Completion App to Workflow App :param app_model: App instance :param account: Account instance :return: """ - # check if chatbot app is in basic mode - if app_model.app_model_config.chatbot_app_engine != ChatbotAppEngine.NORMAL: - raise ValueError('Chatbot app already in workflow mode') - - # convert to workflow mode + # chatbot convert to workflow mode workflow_converter = WorkflowConverter() - workflow = workflow_converter.convert_to_workflow(app_model=app_model, account_id=account.id) - return workflow + if app_model.mode == AppMode.CHAT.value: + # check if chatbot app is in basic mode + if app_model.app_model_config.chatbot_app_engine != ChatbotAppEngine.NORMAL: + raise ValueError('Chatbot app already in workflow mode') + elif app_model.mode != AppMode.COMPLETION.value: + raise ValueError(f'Current App mode: {app_model.mode} is not supported convert to workflow.') + + # convert to workflow + new_app = workflow_converter.convert_to_workflow( + app_model=app_model, + account=account + ) + + return new_app