dify/api/services/attachment_service.py
Asuka Minato af99414fc1
chore: port isinstance to match case (#37271)
Co-authored-by: WH-2099 <wh2099@pm.me>
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-06-18 00:18:03 +00:00

34 lines
1.1 KiB
Python

import base64
from sqlalchemy import Engine, select
from sqlalchemy.orm import sessionmaker
from werkzeug.exceptions import NotFound
from extensions.ext_storage import storage
from models.model import UploadFile
PREVIEW_WORDS_LIMIT = 3000
class AttachmentService:
_session_maker: sessionmaker
def __init__(self, session_factory: sessionmaker | Engine | None = None):
match session_factory:
case Engine():
self._session_maker = sessionmaker(bind=session_factory)
case sessionmaker():
self._session_maker = session_factory
case _:
raise AssertionError("must be a sessionmaker or an Engine.")
def get_file_base64(self, file_id: str) -> str:
with self._session_maker(expire_on_commit=False) as session:
upload_file = session.scalar(select(UploadFile).where(UploadFile.id == file_id).limit(1))
if not upload_file:
raise NotFound("File not found")
upload_file_key = upload_file.key
blob = storage.load_once(upload_file_key)
return base64.b64encode(blob).decode()