add pipeline async run

This commit is contained in:
jyong 2025-08-26 15:51:49 +08:00
parent 1d2d0ff49f
commit 85f0d31fab
1 changed files with 43 additions and 31 deletions

View File

@ -798,45 +798,57 @@ class DatasourceProviderService:
# get all plugin providers
manager = PluginDatasourceManager()
datasources = manager.fetch_installed_datasource_providers(tenant_id)
copy_credentials_list = []
datasource_credentials = []
for datasource in datasources:
if datasource.plugin_id in [
"langgenius/firecrawl_datasource",
"langgenius/notion_datasource",
"langgenius/jina_datasource",
]:
default_provider = (
db.session.query(DatasourceProvider)
.filter_by(tenant_id=tenant_id, provider=datasource.provider, plugin_id=datasource.plugin_id)
.order_by(DatasourceProvider.is_default.desc(), DatasourceProvider.created_at.asc())
.first()
datasource_provider_id = DatasourceProviderID(f"{datasource.plugin_id}/{datasource.provider}")
credentials = self.list_datasource_credentials(
tenant_id=tenant_id, provider=datasource.provider, plugin_id=datasource.plugin_id
)
if default_provider:
datasource_provider_id = DatasourceProviderID(f"{datasource.plugin_id}/{datasource.provider}")
encrypted_credentials = default_provider.encrypted_credentials
# Get provider credential secret variables
credential_secret_variables = self.extract_secret_variables(
tenant_id=tenant_id,
provider_id=f"{datasource.plugin_id}/{datasource.provider}",
credential_type=CredentialType.of(default_provider.auth_type),
)
# Obfuscate provider credentials
copy_credentials = encrypted_credentials.copy()
for key, value in copy_credentials.items():
if key in credential_secret_variables:
copy_credentials[key] = encrypter.obfuscated_token(value)
copy_credentials_list.append(
{
"credential": copy_credentials,
"type": default_provider.auth_type,
"name": default_provider.name,
"avatar_url": default_provider.avatar_url,
"id": default_provider.id,
"is_default": True,
redirect_uri = (
f"{dify_config.CONSOLE_API_URL}/console/api/oauth/plugin/{datasource_provider_id}/datasource/callback"
)
datasource_credentials.append(
{
"provider": datasource.provider,
"plugin_id": datasource.plugin_id,
"plugin_unique_identifier": datasource.plugin_unique_identifier,
"icon": datasource.declaration.identity.icon,
"name": datasource.declaration.identity.name.split("/")[-1],
"label": datasource.declaration.identity.label.model_dump(),
"description": datasource.declaration.identity.description.model_dump(),
"author": datasource.declaration.identity.author,
"credentials_list": credentials,
"credential_schema": [
credential.model_dump() for credential in datasource.declaration.credentials_schema
],
"oauth_schema": {
"client_schema": [
client_schema.model_dump()
for client_schema in datasource.declaration.oauth_schema.client_schema
],
"credentials_schema": [
credential_schema.model_dump()
for credential_schema in datasource.declaration.oauth_schema.credentials_schema
],
"oauth_custom_client_params": self.get_tenant_oauth_client(
tenant_id, datasource_provider_id, mask=True
),
"is_oauth_custom_client_enabled": self.is_tenant_oauth_params_enabled(
tenant_id, datasource_provider_id
),
"is_system_oauth_params_exists": self.is_system_oauth_params_exist(datasource_provider_id),
"redirect_uri": redirect_uri,
}
)
return copy_credentials_list
if datasource.declaration.oauth_schema
else None,
}
)
return datasource_credentials
def get_real_datasource_credentials(self, tenant_id: str, provider: str, plugin_id: str) -> list[dict]:
"""