feat: add archive storage client and env config (#30422)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Authored by 非法操作 on 2025-12-31 16:14:46 +08:00, committed by GitHub
parent 2bb1e24fb4
commit 3015e9be73
GPG Key ID: B5690EEEBB952194
7 changed files with 689 additions and 0 deletions


@@ -101,6 +101,15 @@ S3_ACCESS_KEY=your-access-key
S3_SECRET_KEY=your-secret-key
S3_REGION=your-region
# Workflow run and Conversation archive storage (S3-compatible)
ARCHIVE_STORAGE_ENABLED=false
ARCHIVE_STORAGE_ENDPOINT=
ARCHIVE_STORAGE_ARCHIVE_BUCKET=
ARCHIVE_STORAGE_EXPORT_BUCKET=
ARCHIVE_STORAGE_ACCESS_KEY=
ARCHIVE_STORAGE_SECRET_KEY=
ARCHIVE_STORAGE_REGION=auto
# Azure Blob Storage configuration
AZURE_BLOB_ACCOUNT_NAME=your-account-name
AZURE_BLOB_ACCOUNT_KEY=your-account-key


@@ -1,9 +1,11 @@
from configs.extra.archive_config import ArchiveStorageConfig
from configs.extra.notion_config import NotionConfig
from configs.extra.sentry_config import SentryConfig
class ExtraServiceConfig(
# place the configs in alphabet order
ArchiveStorageConfig,
NotionConfig,
SentryConfig,
):
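
Since ArchiveStorageConfig is now mixed into ExtraServiceConfig, the new settings surface as attributes of the shared dify_config object, which is how api/libs/archive_storage.py reads them. A minimal sketch (illustrative, not part of the diff):

from configs import dify_config

# The archive fields become plain attributes of the global config object.
if dify_config.ARCHIVE_STORAGE_ENABLED:
    print(dify_config.ARCHIVE_STORAGE_ENDPOINT, dify_config.ARCHIVE_STORAGE_REGION)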


@@ -0,0 +1,43 @@
from pydantic import Field
from pydantic_settings import BaseSettings
class ArchiveStorageConfig(BaseSettings):
"""
Configuration settings for workflow run logs archiving storage.
"""
ARCHIVE_STORAGE_ENABLED: bool = Field(
description="Enable workflow run logs archiving to S3-compatible storage",
default=False,
)
ARCHIVE_STORAGE_ENDPOINT: str | None = Field(
description="URL of the S3-compatible storage endpoint (e.g., 'https://storage.example.com')",
default=None,
)
ARCHIVE_STORAGE_ARCHIVE_BUCKET: str | None = Field(
description="Name of the bucket to store archived workflow logs",
default=None,
)
ARCHIVE_STORAGE_EXPORT_BUCKET: str | None = Field(
description="Name of the bucket to store exported workflow runs",
default=None,
)
ARCHIVE_STORAGE_ACCESS_KEY: str | None = Field(
description="Access key ID for authenticating with storage",
default=None,
)
ARCHIVE_STORAGE_SECRET_KEY: str | None = Field(
description="Secret access key for authenticating with storage",
default=None,
)
ARCHIVE_STORAGE_REGION: str = Field(
description="Region for storage (use 'auto' if the provider supports it)",
default="auto",
)
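
For orientation, a small sketch of how pydantic-settings resolves these fields from the environment (illustrative values only; unset fields fall back to their declared defaults):

import os

# Illustrative environment; in a real deployment these come from .env or docker-compose.
os.environ["ARCHIVE_STORAGE_ENABLED"] = "true"
os.environ["ARCHIVE_STORAGE_ENDPOINT"] = "https://storage.example.com"
os.environ["ARCHIVE_STORAGE_ARCHIVE_BUCKET"] = "dify-archive"

config = ArchiveStorageConfig()                      # class defined above
assert config.ARCHIVE_STORAGE_ENABLED is True
assert config.ARCHIVE_STORAGE_REGION == "auto"       # default
assert config.ARCHIVE_STORAGE_SECRET_KEY is None     # not set above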

api/libs/archive_storage.py (new file, 347 lines)

@@ -0,0 +1,347 @@
"""
Archive Storage Client for S3-compatible storage.
This module provides a dedicated storage client for archiving or exporting logs
to S3-compatible object storage.
"""
import base64
import datetime
import gzip
import hashlib
import logging
from collections.abc import Generator
from typing import Any, cast
import boto3
import orjson
from botocore.client import Config
from botocore.exceptions import ClientError
from configs import dify_config
logger = logging.getLogger(__name__)
class ArchiveStorageError(Exception):
"""Base exception for archive storage operations."""
pass
class ArchiveStorageNotConfiguredError(ArchiveStorageError):
"""Raised when archive storage is not properly configured."""
pass
class ArchiveStorage:
"""
S3-compatible storage client for archiving or exporting.
This client provides methods for storing and retrieving archived data in JSONL+gzip format.
"""
def __init__(self, bucket: str):
if not dify_config.ARCHIVE_STORAGE_ENABLED:
raise ArchiveStorageNotConfiguredError("Archive storage is not enabled")
if not bucket:
raise ArchiveStorageNotConfiguredError("Archive storage bucket is not configured")
if not all(
[
dify_config.ARCHIVE_STORAGE_ENDPOINT,
bucket,
dify_config.ARCHIVE_STORAGE_ACCESS_KEY,
dify_config.ARCHIVE_STORAGE_SECRET_KEY,
]
):
raise ArchiveStorageNotConfiguredError(
"Archive storage configuration is incomplete. "
"Required: ARCHIVE_STORAGE_ENDPOINT, ARCHIVE_STORAGE_ACCESS_KEY, "
"ARCHIVE_STORAGE_SECRET_KEY, and a bucket name"
)
self.bucket = bucket
self.client = boto3.client(
"s3",
endpoint_url=dify_config.ARCHIVE_STORAGE_ENDPOINT,
aws_access_key_id=dify_config.ARCHIVE_STORAGE_ACCESS_KEY,
aws_secret_access_key=dify_config.ARCHIVE_STORAGE_SECRET_KEY,
region_name=dify_config.ARCHIVE_STORAGE_REGION,
config=Config(s3={"addressing_style": "path"}),
)
# Verify bucket accessibility
try:
self.client.head_bucket(Bucket=self.bucket)
except ClientError as e:
error_code = e.response.get("Error", {}).get("Code")
if error_code == "404":
raise ArchiveStorageNotConfiguredError(f"Archive bucket '{self.bucket}' does not exist")
elif error_code == "403":
raise ArchiveStorageNotConfiguredError(f"Access denied to archive bucket '{self.bucket}'")
else:
raise ArchiveStorageError(f"Failed to access archive bucket: {e}")
def put_object(self, key: str, data: bytes) -> str:
"""
Upload an object to the archive storage.
Args:
key: Object key (path) within the bucket
data: Binary data to upload
Returns:
MD5 checksum of the uploaded data
Raises:
ArchiveStorageError: If upload fails
"""
checksum = hashlib.md5(data).hexdigest()
try:
self.client.put_object(
Bucket=self.bucket,
Key=key,
Body=data,
ContentMD5=self._content_md5(data),
)
logger.debug("Uploaded object: %s (size=%d, checksum=%s)", key, len(data), checksum)
return checksum
except ClientError as e:
raise ArchiveStorageError(f"Failed to upload object '{key}': {e}")
def get_object(self, key: str) -> bytes:
"""
Download an object from the archive storage.
Args:
key: Object key (path) within the bucket
Returns:
Binary data of the object
Raises:
ArchiveStorageError: If download fails
FileNotFoundError: If object does not exist
"""
try:
response = self.client.get_object(Bucket=self.bucket, Key=key)
return response["Body"].read()
except ClientError as e:
error_code = e.response.get("Error", {}).get("Code")
if error_code == "NoSuchKey":
raise FileNotFoundError(f"Archive object not found: {key}")
raise ArchiveStorageError(f"Failed to download object '{key}': {e}")
def get_object_stream(self, key: str) -> Generator[bytes, None, None]:
"""
Stream an object from the archive storage.
Args:
key: Object key (path) within the bucket
Yields:
Chunks of binary data
Raises:
ArchiveStorageError: If download fails
FileNotFoundError: If object does not exist
"""
try:
response = self.client.get_object(Bucket=self.bucket, Key=key)
yield from response["Body"].iter_chunks()
except ClientError as e:
error_code = e.response.get("Error", {}).get("Code")
if error_code == "NoSuchKey":
raise FileNotFoundError(f"Archive object not found: {key}")
raise ArchiveStorageError(f"Failed to stream object '{key}': {e}")
def object_exists(self, key: str) -> bool:
"""
Check if an object exists in the archive storage.
Args:
key: Object key (path) within the bucket
Returns:
True if object exists, False otherwise
"""
try:
self.client.head_object(Bucket=self.bucket, Key=key)
return True
except ClientError:
return False
def delete_object(self, key: str) -> None:
"""
Delete an object from the archive storage.
Args:
key: Object key (path) within the bucket
Raises:
ArchiveStorageError: If deletion fails
"""
try:
self.client.delete_object(Bucket=self.bucket, Key=key)
logger.debug("Deleted object: %s", key)
except ClientError as e:
raise ArchiveStorageError(f"Failed to delete object '{key}': {e}")
def generate_presigned_url(self, key: str, expires_in: int = 3600) -> str:
"""
Generate a pre-signed URL for downloading an object.
Args:
key: Object key (path) within the bucket
expires_in: URL validity duration in seconds (default: 1 hour)
Returns:
Pre-signed URL string.
Raises:
ArchiveStorageError: If generation fails
"""
try:
return self.client.generate_presigned_url(
ClientMethod="get_object",
Params={"Bucket": self.bucket, "Key": key},
ExpiresIn=expires_in,
)
except ClientError as e:
raise ArchiveStorageError(f"Failed to generate pre-signed URL for '{key}': {e}")
def list_objects(self, prefix: str) -> list[str]:
"""
List objects under a given prefix.
Args:
prefix: Object key prefix to filter by
Returns:
List of object keys matching the prefix
"""
keys = []
paginator = self.client.get_paginator("list_objects_v2")
try:
for page in paginator.paginate(Bucket=self.bucket, Prefix=prefix):
for obj in page.get("Contents", []):
keys.append(obj["Key"])
except ClientError as e:
raise ArchiveStorageError(f"Failed to list objects with prefix '{prefix}': {e}")
return keys
@staticmethod
def _content_md5(data: bytes) -> str:
"""Calculate base64-encoded MD5 for Content-MD5 header."""
return base64.b64encode(hashlib.md5(data).digest()).decode()
@staticmethod
def serialize_to_jsonl_gz(records: list[dict[str, Any]]) -> bytes:
"""
Serialize records to gzipped JSONL format.
Args:
records: List of dictionaries to serialize
Returns:
Gzipped JSONL bytes
"""
lines = []
for record in records:
# Convert datetime objects to ISO format strings
serialized = ArchiveStorage._serialize_record(record)
lines.append(orjson.dumps(serialized))
jsonl_content = b"\n".join(lines)
if jsonl_content:
jsonl_content += b"\n"
return gzip.compress(jsonl_content)
@staticmethod
def deserialize_from_jsonl_gz(data: bytes) -> list[dict[str, Any]]:
"""
Deserialize gzipped JSONL data to records.
Args:
data: Gzipped JSONL bytes
Returns:
List of dictionaries
"""
jsonl_content = gzip.decompress(data)
records = []
for line in jsonl_content.splitlines():
if line:
records.append(orjson.loads(line))
return records
@staticmethod
def _serialize_record(record: dict[str, Any]) -> dict[str, Any]:
"""Serialize a single record, converting special types."""
def _serialize(item: Any) -> Any:
if isinstance(item, datetime.datetime):
return item.isoformat()
if isinstance(item, dict):
return {key: _serialize(value) for key, value in item.items()}
if isinstance(item, list):
return [_serialize(value) for value in item]
return item
return cast(dict[str, Any], _serialize(record))
@staticmethod
def compute_checksum(data: bytes) -> str:
"""Compute MD5 checksum of data."""
return hashlib.md5(data).hexdigest()
# Singleton instance (lazy initialization)
_archive_storage: ArchiveStorage | None = None
_export_storage: ArchiveStorage | None = None
def get_archive_storage() -> ArchiveStorage:
"""
Get the archive storage singleton instance.
Returns:
ArchiveStorage instance
Raises:
ArchiveStorageNotConfiguredError: If archive storage is not configured
"""
global _archive_storage
if _archive_storage is None:
archive_bucket = dify_config.ARCHIVE_STORAGE_ARCHIVE_BUCKET
if not archive_bucket:
raise ArchiveStorageNotConfiguredError(
"Archive storage bucket is not configured. Required: ARCHIVE_STORAGE_ARCHIVE_BUCKET"
)
_archive_storage = ArchiveStorage(bucket=archive_bucket)
return _archive_storage
def get_export_storage() -> ArchiveStorage:
"""
Get the export storage singleton instance.
Returns:
ArchiveStorage instance
"""
global _export_storage
if _export_storage is None:
export_bucket = dify_config.ARCHIVE_STORAGE_EXPORT_BUCKET
if not export_bucket:
raise ArchiveStorageNotConfiguredError(
"Archive export bucket is not configured. Required: ARCHIVE_STORAGE_EXPORT_BUCKET"
)
_export_storage = ArchiveStorage(bucket=export_bucket)
return _export_storage
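
A usage sketch tying the client together (illustrative, not part of the diff; the object key layout is hypothetical, and archiving is assumed to be enabled and configured):

from libs.archive_storage import (
    ArchiveStorage,
    ArchiveStorageError,
    get_archive_storage,
)

try:
    storage = get_archive_storage()
except ArchiveStorageError:
    storage = None  # disabled or misconfigured; caller should fall back gracefully

if storage is not None:
    records = [{"id": "run-1", "status": "succeeded"}]
    payload = ArchiveStorage.serialize_to_jsonl_gz(records)      # gzipped JSONL bytes
    key = "workflow_runs/2025/12/run-1.jsonl.gz"                 # hypothetical key layout
    checksum = storage.put_object(key, payload)                  # returns hex MD5
    url = storage.generate_presigned_url(key, expires_in=600)    # 10-minute download link
    restored = ArchiveStorage.deserialize_from_jsonl_gz(storage.get_object(key))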


@@ -0,0 +1,272 @@
import base64
import hashlib
from datetime import datetime
from unittest.mock import ANY, MagicMock
import pytest
from botocore.exceptions import ClientError
from libs import archive_storage as storage_module
from libs.archive_storage import (
ArchiveStorage,
ArchiveStorageError,
ArchiveStorageNotConfiguredError,
)
BUCKET_NAME = "archive-bucket"
def _configure_storage(monkeypatch, **overrides):
defaults = {
"ARCHIVE_STORAGE_ENABLED": True,
"ARCHIVE_STORAGE_ENDPOINT": "https://storage.example.com",
"ARCHIVE_STORAGE_ARCHIVE_BUCKET": BUCKET_NAME,
"ARCHIVE_STORAGE_ACCESS_KEY": "access",
"ARCHIVE_STORAGE_SECRET_KEY": "secret",
"ARCHIVE_STORAGE_REGION": "auto",
}
defaults.update(overrides)
for key, value in defaults.items():
monkeypatch.setattr(storage_module.dify_config, key, value, raising=False)
def _client_error(code: str) -> ClientError:
return ClientError({"Error": {"Code": code}}, "Operation")
def _mock_client(monkeypatch):
client = MagicMock()
client.head_bucket.return_value = None
boto_client = MagicMock(return_value=client)
monkeypatch.setattr(storage_module.boto3, "client", boto_client)
return client, boto_client
def test_init_disabled(monkeypatch):
_configure_storage(monkeypatch, ARCHIVE_STORAGE_ENABLED=False)
with pytest.raises(ArchiveStorageNotConfiguredError, match="not enabled"):
ArchiveStorage(bucket=BUCKET_NAME)
def test_init_missing_config(monkeypatch):
_configure_storage(monkeypatch, ARCHIVE_STORAGE_ENDPOINT=None)
with pytest.raises(ArchiveStorageNotConfiguredError, match="incomplete"):
ArchiveStorage(bucket=BUCKET_NAME)
def test_init_bucket_not_found(monkeypatch):
_configure_storage(monkeypatch)
client, _ = _mock_client(monkeypatch)
client.head_bucket.side_effect = _client_error("404")
with pytest.raises(ArchiveStorageNotConfiguredError, match="does not exist"):
ArchiveStorage(bucket=BUCKET_NAME)
def test_init_bucket_access_denied(monkeypatch):
_configure_storage(monkeypatch)
client, _ = _mock_client(monkeypatch)
client.head_bucket.side_effect = _client_error("403")
with pytest.raises(ArchiveStorageNotConfiguredError, match="Access denied"):
ArchiveStorage(bucket=BUCKET_NAME)
def test_init_bucket_other_error(monkeypatch):
_configure_storage(monkeypatch)
client, _ = _mock_client(monkeypatch)
client.head_bucket.side_effect = _client_error("500")
with pytest.raises(ArchiveStorageError, match="Failed to access archive bucket"):
ArchiveStorage(bucket=BUCKET_NAME)
def test_init_sets_client(monkeypatch):
_configure_storage(monkeypatch)
client, boto_client = _mock_client(monkeypatch)
storage = ArchiveStorage(bucket=BUCKET_NAME)
boto_client.assert_called_once_with(
"s3",
endpoint_url="https://storage.example.com",
aws_access_key_id="access",
aws_secret_access_key="secret",
region_name="auto",
config=ANY,
)
assert storage.client is client
assert storage.bucket == BUCKET_NAME
def test_put_object_returns_checksum(monkeypatch):
_configure_storage(monkeypatch)
client, _ = _mock_client(monkeypatch)
storage = ArchiveStorage(bucket=BUCKET_NAME)
data = b"hello"
checksum = storage.put_object("key", data)
expected_md5 = hashlib.md5(data).hexdigest()
expected_content_md5 = base64.b64encode(hashlib.md5(data).digest()).decode()
client.put_object.assert_called_once_with(
Bucket="archive-bucket",
Key="key",
Body=data,
ContentMD5=expected_content_md5,
)
assert checksum == expected_md5
def test_put_object_raises_on_error(monkeypatch):
_configure_storage(monkeypatch)
client, _ = _mock_client(monkeypatch)
storage = ArchiveStorage(bucket=BUCKET_NAME)
client.put_object.side_effect = _client_error("500")
with pytest.raises(ArchiveStorageError, match="Failed to upload object"):
storage.put_object("key", b"data")
def test_get_object_returns_bytes(monkeypatch):
_configure_storage(monkeypatch)
client, _ = _mock_client(monkeypatch)
body = MagicMock()
body.read.return_value = b"payload"
client.get_object.return_value = {"Body": body}
storage = ArchiveStorage(bucket=BUCKET_NAME)
assert storage.get_object("key") == b"payload"
def test_get_object_missing(monkeypatch):
_configure_storage(monkeypatch)
client, _ = _mock_client(monkeypatch)
client.get_object.side_effect = _client_error("NoSuchKey")
storage = ArchiveStorage(bucket=BUCKET_NAME)
with pytest.raises(FileNotFoundError, match="Archive object not found"):
storage.get_object("missing")
def test_get_object_stream(monkeypatch):
_configure_storage(monkeypatch)
client, _ = _mock_client(monkeypatch)
body = MagicMock()
body.iter_chunks.return_value = [b"a", b"b"]
client.get_object.return_value = {"Body": body}
storage = ArchiveStorage(bucket=BUCKET_NAME)
assert list(storage.get_object_stream("key")) == [b"a", b"b"]
def test_get_object_stream_missing(monkeypatch):
_configure_storage(monkeypatch)
client, _ = _mock_client(monkeypatch)
client.get_object.side_effect = _client_error("NoSuchKey")
storage = ArchiveStorage(bucket=BUCKET_NAME)
with pytest.raises(FileNotFoundError, match="Archive object not found"):
list(storage.get_object_stream("missing"))
def test_object_exists(monkeypatch):
_configure_storage(monkeypatch)
client, _ = _mock_client(monkeypatch)
storage = ArchiveStorage(bucket=BUCKET_NAME)
assert storage.object_exists("key") is True
client.head_object.side_effect = _client_error("404")
assert storage.object_exists("missing") is False
def test_delete_object_error(monkeypatch):
_configure_storage(monkeypatch)
client, _ = _mock_client(monkeypatch)
client.delete_object.side_effect = _client_error("500")
storage = ArchiveStorage(bucket=BUCKET_NAME)
with pytest.raises(ArchiveStorageError, match="Failed to delete object"):
storage.delete_object("key")
def test_list_objects(monkeypatch):
_configure_storage(monkeypatch)
client, _ = _mock_client(monkeypatch)
paginator = MagicMock()
paginator.paginate.return_value = [
{"Contents": [{"Key": "a"}, {"Key": "b"}]},
{"Contents": [{"Key": "c"}]},
]
client.get_paginator.return_value = paginator
storage = ArchiveStorage(bucket=BUCKET_NAME)
assert storage.list_objects("prefix") == ["a", "b", "c"]
paginator.paginate.assert_called_once_with(Bucket="archive-bucket", Prefix="prefix")
def test_list_objects_error(monkeypatch):
_configure_storage(monkeypatch)
client, _ = _mock_client(monkeypatch)
paginator = MagicMock()
paginator.paginate.side_effect = _client_error("500")
client.get_paginator.return_value = paginator
storage = ArchiveStorage(bucket=BUCKET_NAME)
with pytest.raises(ArchiveStorageError, match="Failed to list objects"):
storage.list_objects("prefix")
def test_generate_presigned_url(monkeypatch):
_configure_storage(monkeypatch)
client, _ = _mock_client(monkeypatch)
client.generate_presigned_url.return_value = "http://signed-url"
storage = ArchiveStorage(bucket=BUCKET_NAME)
url = storage.generate_presigned_url("key", expires_in=123)
client.generate_presigned_url.assert_called_once_with(
ClientMethod="get_object",
Params={"Bucket": "archive-bucket", "Key": "key"},
ExpiresIn=123,
)
assert url == "http://signed-url"
def test_generate_presigned_url_error(monkeypatch):
_configure_storage(monkeypatch)
client, _ = _mock_client(monkeypatch)
client.generate_presigned_url.side_effect = _client_error("500")
storage = ArchiveStorage(bucket=BUCKET_NAME)
with pytest.raises(ArchiveStorageError, match="Failed to generate pre-signed URL"):
storage.generate_presigned_url("key")
def test_serialization_roundtrip():
records = [
{
"id": "1",
"created_at": datetime(2024, 1, 1, 12, 0, 0),
"payload": {"nested": "value"},
"items": [{"name": "a"}],
},
{"id": "2", "value": 123},
]
data = ArchiveStorage.serialize_to_jsonl_gz(records)
decoded = ArchiveStorage.deserialize_from_jsonl_gz(data)
assert decoded[0]["id"] == "1"
assert decoded[0]["payload"]["nested"] == "value"
assert decoded[0]["items"][0]["name"] == "a"
assert "2024-01-01T12:00:00" in decoded[0]["created_at"]
assert decoded[1]["value"] == 123
def test_content_md5_matches_checksum():
data = b"checksum"
expected = base64.b64encode(hashlib.md5(data).digest()).decode()
assert ArchiveStorage._content_md5(data) == expected
assert ArchiveStorage.compute_checksum(data) == hashlib.md5(data).hexdigest()
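
One path the file does not cover yet is the singleton accessor; a possible addition (a sketch, reusing the helpers above) would reset the module-level cache before exercising get_archive_storage():

def test_get_archive_storage_returns_cached_instance(monkeypatch):
    _configure_storage(monkeypatch)
    _mock_client(monkeypatch)
    # get_archive_storage() caches its instance at module level; reset it for isolation.
    monkeypatch.setattr(storage_module, "_archive_storage", None)
    first = storage_module.get_archive_storage()
    second = storage_module.get_archive_storage()
    assert first is second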


@@ -447,6 +447,15 @@ S3_SECRET_KEY=
# If set to false, the access key and secret key must be provided.
S3_USE_AWS_MANAGED_IAM=false
# Workflow run and Conversation archive storage (S3-compatible)
ARCHIVE_STORAGE_ENABLED=false
ARCHIVE_STORAGE_ENDPOINT=
ARCHIVE_STORAGE_ARCHIVE_BUCKET=
ARCHIVE_STORAGE_EXPORT_BUCKET=
ARCHIVE_STORAGE_ACCESS_KEY=
ARCHIVE_STORAGE_SECRET_KEY=
ARCHIVE_STORAGE_REGION=auto
# Azure Blob Configuration
#
AZURE_BLOB_ACCOUNT_NAME=difyai


@@ -122,6 +122,13 @@ x-shared-env: &shared-api-worker-env
S3_ACCESS_KEY: ${S3_ACCESS_KEY:-}
S3_SECRET_KEY: ${S3_SECRET_KEY:-}
S3_USE_AWS_MANAGED_IAM: ${S3_USE_AWS_MANAGED_IAM:-false}
ARCHIVE_STORAGE_ENABLED: ${ARCHIVE_STORAGE_ENABLED:-false}
ARCHIVE_STORAGE_ENDPOINT: ${ARCHIVE_STORAGE_ENDPOINT:-}
ARCHIVE_STORAGE_ARCHIVE_BUCKET: ${ARCHIVE_STORAGE_ARCHIVE_BUCKET:-}
ARCHIVE_STORAGE_EXPORT_BUCKET: ${ARCHIVE_STORAGE_EXPORT_BUCKET:-}
ARCHIVE_STORAGE_ACCESS_KEY: ${ARCHIVE_STORAGE_ACCESS_KEY:-}
ARCHIVE_STORAGE_SECRET_KEY: ${ARCHIVE_STORAGE_SECRET_KEY:-}
ARCHIVE_STORAGE_REGION: ${ARCHIVE_STORAGE_REGION:-auto}
AZURE_BLOB_ACCOUNT_NAME: ${AZURE_BLOB_ACCOUNT_NAME:-difyai}
AZURE_BLOB_ACCOUNT_KEY: ${AZURE_BLOB_ACCOUNT_KEY:-difyai}
AZURE_BLOB_CONTAINER_NAME: ${AZURE_BLOB_CONTAINER_NAME:-difyai-container}