diff --git a/api/.env.example b/api/.env.example index 32d89d4287..4a3b1d65af 100644 --- a/api/.env.example +++ b/api/.env.example @@ -132,3 +132,7 @@ SSRF_PROXY_HTTP_URL= SSRF_PROXY_HTTPS_URL= BATCH_UPLOAD_LIMIT=10 + +# CODE EXECUTION CONFIGURATION +CODE_EXECUTION_ENDPOINT= +CODE_EXECUTINO_API_KEY= diff --git a/api/config.py b/api/config.py index a978a099b9..a6bc731b82 100644 --- a/api/config.py +++ b/api/config.py @@ -59,7 +59,9 @@ DEFAULTS = { 'CAN_REPLACE_LOGO': 'False', 'ETL_TYPE': 'dify', 'KEYWORD_STORE': 'jieba', - 'BATCH_UPLOAD_LIMIT': 20 + 'BATCH_UPLOAD_LIMIT': 20, + 'CODE_EXECUTION_ENDPOINT': '', + 'CODE_EXECUTION_API_KEY': '' } @@ -293,6 +295,9 @@ class Config: self.BATCH_UPLOAD_LIMIT = get_env('BATCH_UPLOAD_LIMIT') + self.CODE_EXECUTION_ENDPOINT = get_env('CODE_EXECUTION_ENDPOINT') + self.CODE_EXECUTION_API_KEY = get_env('CODE_EXECUTION_API_KEY') + self.API_COMPRESSION_ENABLED = get_bool_env('API_COMPRESSION_ENABLED') diff --git a/api/core/workflow/nodes/code/code_executor.py b/api/core/workflow/nodes/code/code_executor.py new file mode 100644 index 0000000000..3ecd7cfd89 --- /dev/null +++ b/api/core/workflow/nodes/code/code_executor.py @@ -0,0 +1,70 @@ +from os import environ + +from httpx import post +from yarl import URL +from pydantic import BaseModel + +from core.workflow.nodes.code.python_template import PythonTemplateTransformer + +# Code Executor +CODE_EXECUTION_ENDPOINT = environ.get('CODE_EXECUTION_ENDPOINT', '') +CODE_EXECUTION_API_KEY = environ.get('CODE_EXECUTION_API_KEY', '') + +class CodeExecutionException(Exception): + pass + +class CodeExecutionResponse(BaseModel): + class Data(BaseModel): + stdout: str + stderr: str + + code: int + message: str + data: Data + +class CodeExecutor: + @classmethod + def execute_code(cls, language: str, code: str, inputs: dict) -> dict: + """ + Execute code + :param language: code language + :param code: code + :param inputs: inputs + :return: + """ + runner = PythonTemplateTransformer.transform_caller(code, inputs) + + url = URL(CODE_EXECUTION_ENDPOINT) / 'v1' / 'sandbox' / 'run' + headers = { + 'X-Api-Key': CODE_EXECUTION_API_KEY + } + data = { + 'language': language, + 'code': runner, + } + + try: + response = post(str(url), json=data, headers=headers) + if response.status_code == 503: + raise CodeExecutionException('Code execution service is unavailable') + elif response.status_code != 200: + raise Exception('Failed to execute code') + except CodeExecutionException as e: + raise e + except Exception: + raise CodeExecutionException('Failed to execute code') + + try: + response = response.json() + except: + raise CodeExecutionException('Failed to parse response') + + response = CodeExecutionResponse(**response) + + if response.code != 0: + raise CodeExecutionException(response.message) + + if response.data.stderr: + raise CodeExecutionException(response.data.stderr) + + return PythonTemplateTransformer.transform_response(response.data.stdout) \ No newline at end of file diff --git a/api/core/workflow/nodes/code/code_node.py b/api/core/workflow/nodes/code/code_node.py index 7e69f91d11..dc69fdc84a 100644 --- a/api/core/workflow/nodes/code/code_node.py +++ b/api/core/workflow/nodes/code/code_node.py @@ -1,9 +1,23 @@ -from typing import Optional +from typing import Optional, cast, Union +from core.workflow.entities.node_entities import NodeRunResult, NodeType +from core.workflow.entities.variable_pool import VariablePool from core.workflow.nodes.base_node import BaseNode +from core.workflow.nodes.code.entities import CodeNodeData +from core.workflow.nodes.code.code_executor import CodeExecutor, CodeExecutionException +from models.workflow import WorkflowNodeExecutionStatus +MAX_NUMBER = 2 ** 63 - 1 +MIN_NUMBER = -2 ** 63 +MAX_PRECISION = 20 +MAX_DEPTH = 5 +MAX_STRING_LENGTH = 1000 +MAX_STRING_ARRAY_LENGTH = 30 class CodeNode(BaseNode): + _node_data_cls = CodeNodeData + node_type = NodeType.CODE + @classmethod def get_default_config(cls, filters: Optional[dict] = None) -> dict: """ @@ -62,3 +76,167 @@ class CodeNode(BaseNode): ] } } + + def _run(self, variable_pool: Optional[VariablePool] = None, + run_args: Optional[dict] = None) -> NodeRunResult: + """ + Run code + :param variable_pool: variable pool + :param run_args: run args + :return: + """ + node_data = self.node_data + node_data: CodeNodeData = cast(self._node_data_cls, node_data) + + # SINGLE DEBUG NOT IMPLEMENTED YET + if variable_pool is None and run_args: + raise ValueError("Not support single step debug.") + + # Get code language + code_language = node_data.code_language + code = node_data.code + + # Get variables + variables = {} + for variable_selector in node_data.variables: + variable = variable_selector.variable + value = variable_pool.get_variable_value( + variable_selector=variable_selector.value_selector + ) + + variables[variable] = value + + # Run code + try: + result = CodeExecutor.execute_code( + language=code_language, + code=code, + inputs=variables + ) + except CodeExecutionException as e: + return NodeRunResult( + status=WorkflowNodeExecutionStatus.FAILED, + error=str(e) + ) + + # Transform result + result = self._transform_result(result, node_data.outputs) + + return NodeRunResult( + status=WorkflowNodeExecutionStatus.SUCCEEDED, + inputs=variables, + outputs=result + ) + + def _check_string(self, value: str, variable: str) -> str: + """ + Check string + :param value: value + :param variable: variable + :param max_length: max length + :return: + """ + if not isinstance(value, str): + raise ValueError(f"{variable} in input form must be a string") + + if len(value) > MAX_STRING_LENGTH: + raise ValueError(f'{variable} in input form must be less than {MAX_STRING_LENGTH} characters') + + return value.replace('\x00', '') + + def _check_number(self, value: Union[int, float], variable: str) -> Union[int, float]: + """ + Check number + :param value: value + :param variable: variable + :return: + """ + if not isinstance(value, (int, float)): + raise ValueError(f"{variable} in input form must be a number") + + if value > MAX_NUMBER or value < MIN_NUMBER: + raise ValueError(f'{variable} in input form is out of range.') + + if isinstance(value, float): + value = round(value, MAX_PRECISION) + + return value + + def _transform_result(self, result: dict, output_schema: dict[str, CodeNodeData.Output], + prefix: str = '', + depth: int = 1) -> dict: + """ + Transform result + :param result: result + :param output_schema: output schema + :return: + """ + if depth > MAX_DEPTH: + raise ValueError("Depth limit reached, object too deep.") + + transformed_result = {} + for output_name, output_config in output_schema.items(): + if output_config.type == 'object': + # check if output is object + if not isinstance(result.get(output_name), dict): + raise ValueError( + f'Output {prefix}.{output_name} is not an object, got {type(result.get(output_name))} instead.' + ) + + transformed_result[output_name] = self._transform_result( + result=result[output_name], + output_schema=output_config.children, + prefix=f'{prefix}.{output_name}' if prefix else output_name, + depth=depth + 1 + ) + elif output_config.type == 'number': + # check if number available + transformed_result[output_name] = self._check_number( + value=result[output_name], + variable=f'{prefix}.{output_name}' if prefix else output_name + ) + + transformed_result[output_name] = result[output_name] + elif output_config.type == 'string': + # check if string available + transformed_result[output_name] = self._check_string( + value=result[output_name], + variable=f'{prefix}.{output_name}' if prefix else output_name, + ) + elif output_config.type == 'array[number]': + # check if array of number available + if not isinstance(result[output_name], list): + raise ValueError( + f'Output {prefix}.{output_name} is not an array, got {type(result.get(output_name))} instead.' + ) + + transformed_result[output_name] = [ + self._check_number( + value=value, + variable=f'{prefix}.{output_name}' if prefix else output_name + ) + for value in result[output_name] + ] + elif output_config.type == 'array[string]': + # check if array of string available + if not isinstance(result[output_name], list): + raise ValueError( + f'Output {prefix}.{output_name} is not an array, got {type(result.get(output_name))} instead.' + ) + + if len(result[output_name]) > MAX_STRING_ARRAY_LENGTH: + raise ValueError( + f'{prefix}.{output_name} in input form must be less than {MAX_STRING_ARRAY_LENGTH} characters' + ) + + transformed_result[output_name] = [ + self._check_string( + value=value, + variable=f'{prefix}.{output_name}' if prefix else output_name + ) + for value in result[output_name] + ] + else: + raise ValueError(f'Output type {output_config.type} is not supported.') + + return transformed_result \ No newline at end of file diff --git a/api/core/workflow/nodes/code/entities.py b/api/core/workflow/nodes/code/entities.py new file mode 100644 index 0000000000..731b00f8c8 --- /dev/null +++ b/api/core/workflow/nodes/code/entities.py @@ -0,0 +1,19 @@ +from core.workflow.entities.base_node_data_entities import BaseNodeData +from core.workflow.entities.variable_entities import VariableSelector + +from pydantic import BaseModel +from typing import Literal, Union + +class CodeNodeData(BaseNodeData): + """ + Code Node Data. + """ + class Output(BaseModel): + type: Literal['string', 'number', 'object', 'array[string]', 'array[number]'] + children: Union[None, dict[str, 'Output']] + + variables: list[VariableSelector] + answer: str + code_language: str + code: str + outputs: dict[str, Output] diff --git a/api/core/workflow/nodes/code/python_template.py b/api/core/workflow/nodes/code/python_template.py new file mode 100644 index 0000000000..03dfee36f3 --- /dev/null +++ b/api/core/workflow/nodes/code/python_template.py @@ -0,0 +1,55 @@ +import json +import re + +PYTHON_RUNNER = """# declare main function here +{{code}} + +# execute main function, and return the result +# inputs is a dict, and it +output = main(**{{inputs}}) + +# convert output to json and print +result = ''' +<> +{output} +<> +''' + +print(result) +""" + + +class PythonTemplateTransformer: + @classmethod + def transform_caller(cls, code: str, inputs: dict) -> str: + """ + Transform code to python runner + :param code: code + :param inputs: inputs + :return: + """ + + # transform inputs to json string + inputs_str = json.dumps(inputs, indent=4) + + # replace code and inputs + runner = PYTHON_RUNNER.replace('{{code}}', code) + runner = runner.replace('{{inputs}}', inputs_str) + + return runner + + @classmethod + def transform_response(cls, response: str) -> dict: + """ + Transform response to dict + :param response: response + :return: + """ + + # extract result + result = re.search(r'<>(.*)<>', response, re.DOTALL) + if not result: + raise ValueError('Failed to parse result') + + result = result.group(1) + return json.loads(result)