mirror of
https://github.com/langgenius/dify.git
synced 2026-04-24 09:06:42 +08:00
perf: split tasks to multi worker
This commit is contained in:
parent
6674d7fc18
commit
bcfbeee333
@ -25,9 +25,29 @@ def batch_fetch_plugin_manifests(plugin_ids: list[str]) -> Sequence[MarketplaceP
|
|||||||
url = str(marketplace_api_url / "api/v1/plugins/batch")
|
url = str(marketplace_api_url / "api/v1/plugins/batch")
|
||||||
response = requests.post(url, json={"plugin_ids": plugin_ids})
|
response = requests.post(url, json={"plugin_ids": plugin_ids})
|
||||||
response.raise_for_status()
|
response.raise_for_status()
|
||||||
|
|
||||||
return [MarketplacePluginDeclaration(**plugin) for plugin in response.json()["data"]["plugins"]]
|
return [MarketplacePluginDeclaration(**plugin) for plugin in response.json()["data"]["plugins"]]
|
||||||
|
|
||||||
|
|
||||||
|
def batch_fetch_plugin_manifests_ignore_deserialization_error(
|
||||||
|
plugin_ids: list[str],
|
||||||
|
) -> Sequence[MarketplacePluginDeclaration]:
|
||||||
|
if len(plugin_ids) == 0:
|
||||||
|
return []
|
||||||
|
|
||||||
|
url = str(marketplace_api_url / "api/v1/plugins/batch")
|
||||||
|
response = requests.post(url, json={"plugin_ids": plugin_ids})
|
||||||
|
response.raise_for_status()
|
||||||
|
result: list[MarketplacePluginDeclaration] = []
|
||||||
|
for plugin in response.json()["data"]["plugins"]:
|
||||||
|
try:
|
||||||
|
result.append(MarketplacePluginDeclaration(**plugin))
|
||||||
|
except Exception as e:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
def record_install_plugin_event(plugin_unique_identifier: str):
|
def record_install_plugin_event(plugin_unique_identifier: str):
|
||||||
url = str(marketplace_api_url / "api/v1/stats/plugins/install_count")
|
url = str(marketplace_api_url / "api/v1/stats/plugins/install_count")
|
||||||
response = requests.post(url, json={"unique_identifier": plugin_unique_identifier})
|
response = requests.post(url, json={"unique_identifier": plugin_unique_identifier})
|
||||||
|
|||||||
@ -1,19 +1,14 @@
|
|||||||
import time
|
import time
|
||||||
import traceback
|
|
||||||
|
|
||||||
import click
|
import click
|
||||||
|
|
||||||
import app
|
import app
|
||||||
from core.helper import marketplace
|
|
||||||
from core.plugin.entities.plugin import PluginInstallationSource
|
|
||||||
from core.plugin.impl.plugin import PluginInstaller
|
|
||||||
from extensions.ext_database import db
|
from extensions.ext_database import db
|
||||||
from models.account import TenantPluginAutoUpgradeStrategy
|
from models.account import TenantPluginAutoUpgradeStrategy
|
||||||
|
from tasks.process_tenant_plugin_autoupgrade_check_task import process_tenant_plugin_autoupgrade_check_task
|
||||||
|
|
||||||
AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL = 15 * 60 # 15 minutes
|
AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL = 15 * 60 # 15 minutes
|
||||||
|
|
||||||
RETRY_TIMES_OF_ONE_PLUGIN_IN_ONE_TENANT = 3
|
|
||||||
|
|
||||||
|
|
||||||
@app.celery.task(queue="plugin")
|
@app.celery.task(queue="plugin")
|
||||||
def check_upgradable_plugin_task():
|
def check_upgradable_plugin_task():
|
||||||
@ -23,7 +18,7 @@ def check_upgradable_plugin_task():
|
|||||||
now_seconds_of_day = time.time() % 86400 # we assume the tz is UTC
|
now_seconds_of_day = time.time() % 86400 # we assume the tz is UTC
|
||||||
click.echo(click.style("Now seconds of day: {}".format(now_seconds_of_day), fg="green"))
|
click.echo(click.style("Now seconds of day: {}".format(now_seconds_of_day), fg="green"))
|
||||||
|
|
||||||
# get strategies that set to be performed in the next AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL
|
# 获取需要在下一个AUTO_UPGRADE_MINIMAL_CHECKING_INTERVAL内执行的策略
|
||||||
strategies = (
|
strategies = (
|
||||||
db.session.query(TenantPluginAutoUpgradeStrategy)
|
db.session.query(TenantPluginAutoUpgradeStrategy)
|
||||||
.filter(
|
.filter(
|
||||||
@ -36,113 +31,15 @@ def check_upgradable_plugin_task():
|
|||||||
.all()
|
.all()
|
||||||
)
|
)
|
||||||
|
|
||||||
manager = PluginInstaller()
|
|
||||||
|
|
||||||
for strategy in strategies:
|
for strategy in strategies:
|
||||||
try:
|
process_tenant_plugin_autoupgrade_check_task.delay(
|
||||||
tenant_id = strategy.tenant_id
|
strategy.tenant_id,
|
||||||
strategy_setting = strategy.strategy_setting
|
strategy.strategy_setting,
|
||||||
upgrade_mode = strategy.upgrade_mode
|
strategy.upgrade_time_of_day,
|
||||||
exclude_plugins = strategy.exclude_plugins
|
strategy.upgrade_mode,
|
||||||
include_plugins = strategy.include_plugins
|
strategy.exclude_plugins,
|
||||||
|
strategy.include_plugins,
|
||||||
if strategy_setting == TenantPluginAutoUpgradeStrategy.StrategySetting.DISABLED:
|
)
|
||||||
continue
|
|
||||||
|
|
||||||
# get plugins that need to be checked
|
|
||||||
plugin_ids: list[tuple[str, str, str]] = [] # plugin_id, version, unique_identifier
|
|
||||||
|
|
||||||
if upgrade_mode == TenantPluginAutoUpgradeStrategy.UpgradeMode.PARTIAL and include_plugins:
|
|
||||||
all_plugins = manager.list_plugins(tenant_id)
|
|
||||||
|
|
||||||
for plugin in all_plugins:
|
|
||||||
if plugin.source == PluginInstallationSource.Marketplace and plugin.plugin_id in include_plugins:
|
|
||||||
plugin_ids.append((plugin.plugin_id, plugin.version, plugin.plugin_unique_identifier))
|
|
||||||
|
|
||||||
elif upgrade_mode == TenantPluginAutoUpgradeStrategy.UpgradeMode.EXCLUDE:
|
|
||||||
# get all plugins and remove the exclude plugins
|
|
||||||
all_plugins = manager.list_plugins(tenant_id)
|
|
||||||
plugin_ids = [
|
|
||||||
(plugin.plugin_id, plugin.version, plugin.plugin_unique_identifier)
|
|
||||||
for plugin in all_plugins
|
|
||||||
if plugin.source == PluginInstallationSource.Marketplace and plugin.plugin_id not in exclude_plugins
|
|
||||||
]
|
|
||||||
elif upgrade_mode == TenantPluginAutoUpgradeStrategy.UpgradeMode.ALL:
|
|
||||||
all_plugins = manager.list_plugins(tenant_id)
|
|
||||||
plugin_ids = [
|
|
||||||
(plugin.plugin_id, plugin.version, plugin.plugin_unique_identifier)
|
|
||||||
for plugin in all_plugins
|
|
||||||
if plugin.source == PluginInstallationSource.Marketplace
|
|
||||||
]
|
|
||||||
|
|
||||||
if not plugin_ids:
|
|
||||||
continue
|
|
||||||
|
|
||||||
plugin_ids_plain_list = [plugin_id for plugin_id, _, _ in plugin_ids]
|
|
||||||
|
|
||||||
click.echo(click.style("Fetching manifests for plugins: {}".format(plugin_ids_plain_list), fg="green"))
|
|
||||||
|
|
||||||
# fetch latest versions from marketplace
|
|
||||||
manifests = marketplace.batch_fetch_plugin_manifests(plugin_ids_plain_list)
|
|
||||||
|
|
||||||
for manifest in manifests:
|
|
||||||
for plugin_id, version, original_unique_identifier in plugin_ids:
|
|
||||||
if manifest.plugin_id != plugin_id:
|
|
||||||
continue
|
|
||||||
|
|
||||||
try:
|
|
||||||
current_version = version
|
|
||||||
latest_version = manifest.latest_version
|
|
||||||
|
|
||||||
# @yeuoly review here
|
|
||||||
def fix_only_checker(latest_version, current_version):
|
|
||||||
latest_version_tuple = tuple(int(val) for val in latest_version.split("."))
|
|
||||||
current_version_tuple = tuple(int(val) for val in current_version.split("."))
|
|
||||||
|
|
||||||
if (
|
|
||||||
latest_version_tuple[0] == current_version_tuple[0]
|
|
||||||
and latest_version_tuple[1] == current_version_tuple[1]
|
|
||||||
):
|
|
||||||
return latest_version_tuple[2] != current_version_tuple[2]
|
|
||||||
return False
|
|
||||||
|
|
||||||
version_checker = {
|
|
||||||
TenantPluginAutoUpgradeStrategy.StrategySetting.LATEST: lambda latest_version,
|
|
||||||
current_version: latest_version != current_version,
|
|
||||||
TenantPluginAutoUpgradeStrategy.StrategySetting.FIX_ONLY: fix_only_checker,
|
|
||||||
}
|
|
||||||
|
|
||||||
if version_checker[strategy_setting](latest_version, current_version):
|
|
||||||
# execute upgrade
|
|
||||||
new_unique_identifier = manifest.latest_package_identifier
|
|
||||||
|
|
||||||
marketplace.record_install_plugin_event(new_unique_identifier)
|
|
||||||
click.echo(
|
|
||||||
click.style(
|
|
||||||
"Upgrade plugin: {} -> {}".format(
|
|
||||||
original_unique_identifier, new_unique_identifier
|
|
||||||
),
|
|
||||||
fg="green",
|
|
||||||
)
|
|
||||||
)
|
|
||||||
task_start_resp = manager.upgrade_plugin(
|
|
||||||
tenant_id,
|
|
||||||
original_unique_identifier,
|
|
||||||
new_unique_identifier,
|
|
||||||
PluginInstallationSource.Marketplace,
|
|
||||||
{
|
|
||||||
"plugin_unique_identifier": new_unique_identifier,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
except Exception as e:
|
|
||||||
click.echo(click.style("Error when upgrading plugin: {}".format(e), fg="red"))
|
|
||||||
traceback.print_exc()
|
|
||||||
break
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
click.echo(click.style("Error when checking upgradable plugin: {}".format(e), fg="red"))
|
|
||||||
traceback.print_exc()
|
|
||||||
continue
|
|
||||||
|
|
||||||
end_at = time.perf_counter()
|
end_at = time.perf_counter()
|
||||||
click.echo(
|
click.echo(
|
||||||
|
|||||||
163
api/tasks/process_tenant_plugin_autoupgrade_check_task.py
Normal file
163
api/tasks/process_tenant_plugin_autoupgrade_check_task.py
Normal file
@ -0,0 +1,163 @@
|
|||||||
|
import traceback
|
||||||
|
|
||||||
|
import click
|
||||||
|
from celery import shared_task
|
||||||
|
|
||||||
|
from core.helper import marketplace
|
||||||
|
from core.helper.marketplace import MarketplacePluginDeclaration
|
||||||
|
from core.plugin.entities.plugin import PluginInstallationSource
|
||||||
|
from core.plugin.impl.plugin import PluginInstaller
|
||||||
|
from models.account import TenantPluginAutoUpgradeStrategy
|
||||||
|
|
||||||
|
RETRY_TIMES_OF_ONE_PLUGIN_IN_ONE_TENANT = 3
|
||||||
|
|
||||||
|
|
||||||
|
cached_plugin_manifests: dict[str, MarketplacePluginDeclaration] = {}
|
||||||
|
|
||||||
|
|
||||||
|
def marketplace_batch_fetch_plugin_manifests(
|
||||||
|
plugin_ids_plain_list: list[str],
|
||||||
|
) -> list[MarketplacePluginDeclaration]:
|
||||||
|
global cached_plugin_manifests
|
||||||
|
# return marketplace.batch_fetch_plugin_manifests(plugin_ids_plain_list)
|
||||||
|
not_included_plugin_ids = [
|
||||||
|
plugin_id for plugin_id in plugin_ids_plain_list if plugin_id not in cached_plugin_manifests
|
||||||
|
]
|
||||||
|
if not_included_plugin_ids:
|
||||||
|
manifests = marketplace.batch_fetch_plugin_manifests_ignore_deserialization_error(not_included_plugin_ids)
|
||||||
|
for manifest in manifests:
|
||||||
|
cached_plugin_manifests[manifest.plugin_id] = manifest
|
||||||
|
|
||||||
|
if (
|
||||||
|
len(manifests) == 0
|
||||||
|
): # this indicates that the plugin not found in marketplace, should set None in cache to prevent future check
|
||||||
|
for plugin_id in not_included_plugin_ids:
|
||||||
|
cached_plugin_manifests[plugin_id] = None
|
||||||
|
|
||||||
|
return [
|
||||||
|
cached_plugin_manifests[plugin_id]
|
||||||
|
for plugin_id in plugin_ids_plain_list
|
||||||
|
if cached_plugin_manifests[plugin_id] is not None
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@shared_task(queue="plugin")
|
||||||
|
def process_tenant_plugin_autoupgrade_check_task(
|
||||||
|
tenant_id: str,
|
||||||
|
strategy_setting: TenantPluginAutoUpgradeStrategy.StrategySetting,
|
||||||
|
upgrade_time_of_day: int,
|
||||||
|
upgrade_mode: TenantPluginAutoUpgradeStrategy.UpgradeMode,
|
||||||
|
exclude_plugins: list[str],
|
||||||
|
include_plugins: list[str],
|
||||||
|
):
|
||||||
|
try:
|
||||||
|
manager = PluginInstaller()
|
||||||
|
|
||||||
|
click.echo(
|
||||||
|
click.style(
|
||||||
|
"Checking upgradable plugin for tenant: {}".format(tenant_id),
|
||||||
|
fg="green",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
if strategy_setting == TenantPluginAutoUpgradeStrategy.StrategySetting.DISABLED:
|
||||||
|
return
|
||||||
|
|
||||||
|
# 获取需要检查的插件
|
||||||
|
plugin_ids: list[tuple[str, str, str]] = [] # plugin_id, version, unique_identifier
|
||||||
|
click.echo(click.style("Upgrade mode: {}".format(upgrade_mode), fg="green"))
|
||||||
|
|
||||||
|
if upgrade_mode == TenantPluginAutoUpgradeStrategy.UpgradeMode.PARTIAL and include_plugins:
|
||||||
|
all_plugins = manager.list_plugins(tenant_id)
|
||||||
|
|
||||||
|
for plugin in all_plugins:
|
||||||
|
if plugin.source == PluginInstallationSource.Marketplace and plugin.plugin_id in include_plugins:
|
||||||
|
plugin_ids.append(
|
||||||
|
(
|
||||||
|
plugin.plugin_id,
|
||||||
|
plugin.version,
|
||||||
|
plugin.plugin_unique_identifier,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
elif upgrade_mode == TenantPluginAutoUpgradeStrategy.UpgradeMode.EXCLUDE:
|
||||||
|
# 获取所有插件并移除exclude的
|
||||||
|
all_plugins = manager.list_plugins(tenant_id)
|
||||||
|
plugin_ids = [
|
||||||
|
(plugin.plugin_id, plugin.version, plugin.plugin_unique_identifier)
|
||||||
|
for plugin in all_plugins
|
||||||
|
if plugin.source == PluginInstallationSource.Marketplace and plugin.plugin_id not in exclude_plugins
|
||||||
|
]
|
||||||
|
elif upgrade_mode == TenantPluginAutoUpgradeStrategy.UpgradeMode.ALL:
|
||||||
|
all_plugins = manager.list_plugins(tenant_id)
|
||||||
|
plugin_ids = [
|
||||||
|
(plugin.plugin_id, plugin.version, plugin.plugin_unique_identifier)
|
||||||
|
for plugin in all_plugins
|
||||||
|
if plugin.source == PluginInstallationSource.Marketplace
|
||||||
|
]
|
||||||
|
|
||||||
|
if not plugin_ids:
|
||||||
|
return
|
||||||
|
|
||||||
|
plugin_ids_plain_list = [plugin_id for plugin_id, _, _ in plugin_ids]
|
||||||
|
|
||||||
|
manifests = marketplace_batch_fetch_plugin_manifests(plugin_ids_plain_list)
|
||||||
|
|
||||||
|
if not manifests:
|
||||||
|
return
|
||||||
|
|
||||||
|
for manifest in manifests:
|
||||||
|
for plugin_id, version, original_unique_identifier in plugin_ids:
|
||||||
|
if manifest.plugin_id != plugin_id:
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
current_version = version
|
||||||
|
latest_version = manifest.latest_version
|
||||||
|
|
||||||
|
def fix_only_checker(latest_version, current_version):
|
||||||
|
latest_version_tuple = tuple(int(val) for val in latest_version.split("."))
|
||||||
|
current_version_tuple = tuple(int(val) for val in current_version.split("."))
|
||||||
|
|
||||||
|
if (
|
||||||
|
latest_version_tuple[0] == current_version_tuple[0]
|
||||||
|
and latest_version_tuple[1] == current_version_tuple[1]
|
||||||
|
):
|
||||||
|
return latest_version_tuple[2] != current_version_tuple[2]
|
||||||
|
return False
|
||||||
|
|
||||||
|
version_checker = {
|
||||||
|
TenantPluginAutoUpgradeStrategy.StrategySetting.LATEST: lambda latest_version,
|
||||||
|
current_version: latest_version != current_version,
|
||||||
|
TenantPluginAutoUpgradeStrategy.StrategySetting.FIX_ONLY: fix_only_checker,
|
||||||
|
}
|
||||||
|
|
||||||
|
if version_checker[strategy_setting](latest_version, current_version):
|
||||||
|
# 执行升级
|
||||||
|
new_unique_identifier = manifest.latest_package_identifier
|
||||||
|
|
||||||
|
marketplace.record_install_plugin_event(new_unique_identifier)
|
||||||
|
click.echo(
|
||||||
|
click.style(
|
||||||
|
"Upgrade plugin: {} -> {}".format(original_unique_identifier, new_unique_identifier),
|
||||||
|
fg="green",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
task_start_resp = manager.upgrade_plugin(
|
||||||
|
tenant_id,
|
||||||
|
original_unique_identifier,
|
||||||
|
new_unique_identifier,
|
||||||
|
PluginInstallationSource.Marketplace,
|
||||||
|
{
|
||||||
|
"plugin_unique_identifier": new_unique_identifier,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
click.echo(click.style("Error when upgrading plugin: {}".format(e), fg="red"))
|
||||||
|
traceback.print_exc()
|
||||||
|
break
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
click.echo(click.style("Error when checking upgradable plugin: {}".format(e), fg="red"))
|
||||||
|
traceback.print_exc()
|
||||||
|
return
|
||||||
Loading…
Reference in New Issue
Block a user