Merge remote-tracking branch 'origin/feat/workflow-backend' into feat/workflow-backend

This commit is contained in:
jyong 2024-03-18 22:38:27 +08:00
commit 5a5beb5b59
6 changed files with 109 additions and 31 deletions

View File

@ -51,7 +51,7 @@ class AppParameterApi(InstalledAppResource):
raise AppUnavailableError()
features_dict = workflow.features_dict
user_input_form = workflow.user_input_form()
user_input_form = workflow.user_input_form(to_old_structure=True)
else:
app_model_config = app_model.app_model_config
features_dict = app_model_config.to_dict()

View File

@ -54,7 +54,7 @@ class AppParameterApi(Resource):
raise AppUnavailableError()
features_dict = workflow.features_dict
user_input_form = workflow.user_input_form()
user_input_form = workflow.user_input_form(to_old_structure=True)
else:
app_model_config = app_model.app_model_config
features_dict = app_model_config.to_dict()

View File

@ -52,7 +52,7 @@ class AppParameterApi(WebApiResource):
raise AppUnavailableError()
features_dict = workflow.features_dict
user_input_form = workflow.user_input_form()
user_input_form = workflow.user_input_form(to_old_structure=True)
else:
app_model_config = app_model.app_model_config
features_dict = app_model_config.to_dict()

View File

@ -13,6 +13,8 @@ from core.helper.code_executor.python_transformer import PythonTemplateTransform
CODE_EXECUTION_ENDPOINT = environ.get('CODE_EXECUTION_ENDPOINT', '')
CODE_EXECUTION_API_KEY = environ.get('CODE_EXECUTION_API_KEY', '')
CODE_EXECUTION_TIMEOUT= (10, 60)
class CodeExecutionException(Exception):
pass
@ -58,7 +60,7 @@ class CodeExecutor:
}
try:
response = post(str(url), json=data, headers=headers)
response = post(str(url), json=data, headers=headers, timeout=CODE_EXECUTION_TIMEOUT)
if response.status_code == 503:
raise CodeExecutionException('Code execution service is unavailable')
elif response.status_code != 200:

View File

@ -11,19 +11,42 @@ import core.helper.ssrf_proxy as ssrf_proxy
from core.workflow.nodes.http_request.entities import HttpRequestNodeData
HTTP_REQUEST_DEFAULT_TIMEOUT = (10, 60)
MAX_BINARY_SIZE = 1024 * 1024 * 10 # 10MB
READABLE_MAX_BINARY_SIZE = '10MB'
MAX_TEXT_SIZE = 1024 * 1024 // 10 # 0.1MB
READABLE_MAX_TEXT_SIZE = '0.1MB'
class HttpExecutorResponse:
status_code: int
headers: dict[str, str]
body: bytes
response: Union[httpx.Response, requests.Response]
def __init__(self, status_code: int, headers: dict[str, str], body: bytes):
def __init__(self, response: Union[httpx.Response, requests.Response] = None):
"""
init
"""
self.status_code = status_code
headers = {}
if isinstance(response, httpx.Response):
for k, v in response.headers.items():
headers[k] = v
elif isinstance(response, requests.Response):
for k, v in response.headers.items():
headers[k] = v
self.headers = headers
self.body = body
self.response = response
@property
def is_file(self) -> bool:
"""
check if response is file
"""
content_type = self.get_content_type()
file_content_types = ['image', 'audio', 'video']
for v in file_content_types:
if v in content_type:
return True
return False
def get_content_type(self) -> str:
"""
@ -32,17 +55,15 @@ class HttpExecutorResponse:
for key, val in self.headers.items():
if key.lower() == 'content-type':
return val
return ''
def extract_file(self) -> tuple[str, bytes]:
"""
extract file from response if content type is file related
"""
content_type = self.get_content_type()
file_content_types = ['image', 'audio', 'video']
for v in file_content_types:
if v in content_type:
return content_type, self.body
if self.is_file:
return self.get_content_type(), self.body
return '', b''
@ -51,7 +72,55 @@ class HttpExecutorResponse:
"""
get content
"""
return self.body.decode('utf-8')
if isinstance(self.response, httpx.Response):
return self.response.text
elif isinstance(self.response, requests.Response):
return self.response.text
else:
raise ValueError(f'Invalid response type {type(self.response)}')
@property
def body(self) -> bytes:
"""
get body
"""
if isinstance(self.response, httpx.Response):
return self.response.content
elif isinstance(self.response, requests.Response):
return self.response.content
else:
raise ValueError(f'Invalid response type {type(self.response)}')
@property
def status_code(self) -> int:
"""
get status code
"""
if isinstance(self.response, httpx.Response):
return self.response.status_code
elif isinstance(self.response, requests.Response):
return self.response.status_code
else:
raise ValueError(f'Invalid response type {type(self.response)}')
@property
def size(self) -> int:
"""
get size
"""
return len(self.body)
@property
def readable_size(self) -> str:
"""
get readable size
"""
if self.size < 1024:
return f'{self.size} bytes'
elif self.size < 1024 * 1024:
return f'{(self.size / 1024):.2f} KB'
else:
return f'{(self.size / 1024 / 1024):.2f} MB'
class HttpExecutor:
server_url: str
@ -214,23 +283,20 @@ class HttpExecutor:
"""
validate the response
"""
if isinstance(response, httpx.Response):
# get key-value pairs headers
headers = {}
for k, v in response.headers.items():
headers[k] = v
return HttpExecutorResponse(response.status_code, headers, response.content)
elif isinstance(response, requests.Response):
# get key-value pairs headers
headers = {}
for k, v in response.headers.items():
headers[k] = v
return HttpExecutorResponse(response.status_code, headers, response.content)
if isinstance(response, httpx.Response | requests.Response):
executor_response = HttpExecutorResponse(response)
else:
raise ValueError(f'Invalid response type {type(response)}')
if executor_response.is_file:
if executor_response.size > MAX_BINARY_SIZE:
raise ValueError(f'File size is too large, max size is {READABLE_MAX_BINARY_SIZE}, but current size is {executor_response.readable_size}.')
else:
if executor_response.size > MAX_TEXT_SIZE:
raise ValueError(f'Text size is too large, max size is {READABLE_MAX_TEXT_SIZE}, but current size is {executor_response.readable_size}.')
return executor_response
def _do_http_request(self, headers: dict[str, Any]) -> httpx.Response:
"""
do http request depending on api bundle

View File

@ -129,7 +129,7 @@ class Workflow(db.Model):
def features_dict(self):
return json.loads(self.features) if self.features else {}
def user_input_form(self) -> list:
def user_input_form(self, to_old_structure: bool = False) -> list:
# get start node from graph
if not self.graph:
return []
@ -143,8 +143,18 @@ class Workflow(db.Model):
return []
# get user_input_form from start node
return start_node.get('data', {}).get('variables', [])
variables = start_node.get('data', {}).get('variables', [])
if to_old_structure:
old_structure_variables = []
for variable in variables:
old_structure_variables.append({
variable['type']: variable
})
return old_structure_variables
return variables
class WorkflowRunTriggeredFrom(Enum):
"""