From b5e31c0f25623c12fcb281ff48a044857f91b6f0 Mon Sep 17 00:00:00 2001 From: Yeuoly Date: Wed, 21 Jan 2026 16:23:44 +0800 Subject: [PATCH] feat: parallelize asset packing --- .../app_assets/packager/zip_packager.py.md | 14 +++++++++++++ api/core/app_assets/packager/zip_packager.py | 21 ++++++++++++++++--- 2 files changed, 32 insertions(+), 3 deletions(-) create mode 100644 api/agent-notes/core/app_assets/packager/zip_packager.py.md diff --git a/api/agent-notes/core/app_assets/packager/zip_packager.py.md b/api/agent-notes/core/app_assets/packager/zip_packager.py.md new file mode 100644 index 0000000000..cf0d557138 --- /dev/null +++ b/api/agent-notes/core/app_assets/packager/zip_packager.py.md @@ -0,0 +1,14 @@ +# Zip Packager Notes + +## Purpose +- Builds a ZIP archive of asset contents stored via the configured storage backend. + +## Key Decisions +- Packaging writes assets into an in-memory zip buffer returned as bytes. +- Asset fetch + zip writing are executed via a thread pool with a lock guarding `ZipFile` writes. + +## Edge Cases +- ZIP writes are serialized by the lock; storage reads still run in parallel. + +## Tests/Verification +- None yet. diff --git a/api/core/app_assets/packager/zip_packager.py b/api/core/app_assets/packager/zip_packager.py index 5aeb95d53a..09a7d860d7 100644 --- a/api/core/app_assets/packager/zip_packager.py +++ b/api/core/app_assets/packager/zip_packager.py @@ -1,5 +1,7 @@ import io import zipfile +from concurrent.futures import Future, ThreadPoolExecutor +from threading import Lock from typing import TYPE_CHECKING from core.app_assets.entities import AssetItem @@ -20,8 +22,21 @@ class ZipPackager(AssetPackager): zip_buffer = io.BytesIO() with zipfile.ZipFile(zip_buffer, "w", zipfile.ZIP_DEFLATED) as zf: - for asset in assets: - content = self._storage.load_once(asset.get_storage_key()) - zf.writestr(asset.path, content) + lock = Lock() + # FOR DELVELPMENT AND TESTING ONLY, TODO: optimize + with ThreadPoolExecutor(max_workers=8) as executor: + futures: list[Future[None]] = [] + for asset in assets: + + def _write_asset(a: AssetItem) -> None: + content = self._storage.load_once(a.get_storage_key()) + with lock: + zf.writestr(a.path, content) + + futures.append(executor.submit(_write_asset, asset)) + + # Wait for all futures to complete + for future in futures: + future.result() return zip_buffer.getvalue()