merge main

This commit is contained in:
Joel 2025-07-30 10:26:31 +08:00
commit bd04ddd544
488 changed files with 5751 additions and 2074 deletions

View File

@ -49,8 +49,8 @@ jobs:
if: steps.changed-files.outputs.any_changed == 'true'
run: |
uv run --directory api ruff --version
uv run --directory api ruff check --diff ./
uv run --directory api ruff format --check --diff ./
uv run --directory api ruff check ./
uv run --directory api ruff format --check ./
- name: Dotenv check
if: steps.changed-files.outputs.any_changed == 'true'

View File

@ -241,7 +241,7 @@ One-Click deploy Dify to Alibaba Cloud with [Alibaba Cloud Data Management](http
For those who'd like to contribute code, see our [Contribution Guide](https://github.com/langgenius/dify/blob/main/CONTRIBUTING.md).
At the same time, please consider supporting Dify by sharing it on social media and at events and conferences.
> We are looking for contributors to help translate Dify into languages other than Mandarin or English. If you are interested in helping, please see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n/README.md) for more information, and leave us a comment in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
> We are looking for contributors to help translate Dify into languages other than Mandarin or English. If you are interested in helping, please see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n-config/README.md) for more information, and leave us a comment in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
## Community & contact

View File

@ -223,7 +223,7 @@ docker compose up -d
For those who would like to contribute, see our [Contribution Guide](https://github.com/langgenius/dify/blob/main/CONTRIBUTING.md).
At the same time, please consider supporting Dify by sharing it on social media and at events and conferences.
> We are looking for contributors to help translate Dify into languages other than Mandarin Chinese or English. If you are interested in helping, please see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n/README.md) for more information, and leave us a comment in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
> We are looking for contributors to help translate Dify into languages other than Mandarin Chinese or English. If you are interested in helping, please see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n-config/README.md) for more information, and leave us a comment in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
**Contributors**

View File

@ -241,7 +241,7 @@ Star Dify on GitHub
For those who want to contribute code, see our [Contribution Guide](https://github.com/langgenius/dify/blob/main/CONTRIBUTING.md).
At the same time, please support Dify by sharing it on social media and at events and conferences.
> We are looking for contributors to help translate Dify into languages other than Mandarin or English. If you are interested in helping, please see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n/README.md) for more information, and leave us a comment in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
> We are looking for contributors to help translate Dify into languages other than Mandarin or English. If you are interested in helping, please see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n-config/README.md) for more information, and leave us a comment in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
## Community and contact

View File

@ -244,7 +244,7 @@ docker compose up -d
For those who want to contribute code, see our [Contribution Guide](https://github.com/langgenius/dify/blob/main/CONTRIBUTING.md).
At the same time, please consider supporting Dify by sharing it on social media and at events and conferences.
> We are looking for contributors to help translate Dify into languages other than Chinese and English. If you are interested in helping, see our [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n/README.md) for more information, and leave us a message in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
> We are looking for contributors to help translate Dify into languages other than Chinese and English. If you are interested in helping, see our [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n-config/README.md) for more information, and leave us a message in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
**Contributors**

View File

@ -236,7 +236,7 @@ One-click deployment of Dify to Alibaba Cloud with [Alibaba Cloud Data M
If you would like to contribute code, please read our [Contribution Guide](https://github.com/langgenius/dify/blob/main/CONTRIBUTING.md). At the same time, please support Dify by sharing it on social media and presenting it at events and conferences.
> We are looking for contributors to help translate Dify into languages other than Mandarin or English. If you are interested in helping, please see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n/README.md) for more information and leave a comment in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
> We are looking for contributors to help translate Dify into languages other than Mandarin or English. If you are interested in helping, please see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n-config/README.md) for more information and leave a comment in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
## Community & Contact

View File

@ -237,7 +237,7 @@ For those who wish to contribute code, see our [Contribution Gui
At the same time, consider supporting Dify by sharing it on social media and at events and conferences.
> We are looking for contributors to help translate Dify into languages other than Mandarin or English. If you are interested in helping, see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n/README.md) for more information and leave us a comment in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
> We are looking for contributors to help translate Dify into languages other than Mandarin or English. If you are interested in helping, see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n-config/README.md) for more information and leave us a comment in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
**Contributors**

View File

@ -235,7 +235,7 @@ For those who wish to contribute code, see our [Contribution Guid
At the same time, please consider supporting Dify by sharing it on social media and at events and conferences.
> We are looking for contributors to help translate Dify into languages other than Mandarin or English. If you are interested in helping, please see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n/README.md) for more information, and leave us a comment in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
> We are looking for contributors to help translate Dify into languages other than Mandarin or English. If you are interested in helping, please see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n-config/README.md) for more information, and leave us a comment in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
**Contributors**

View File

@ -234,7 +234,7 @@ docker compose up -d
At the same time, we would appreciate your support in sharing Dify on social media and at events and conferences.
> We are looking for contributors to help translate Dify into languages other than English or Chinese. If you are interested, please see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n/README.md) for details, and leave us a comment in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
> We are looking for contributors to help translate Dify into languages other than English or Chinese. If you are interested, please see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n-config/README.md) for details, and leave us a comment in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
**Contributors**

View File

@ -235,7 +235,7 @@ For those who'd like to contribute code, see our [Contribution Guide](https://gi
At the same time, please consider supporting Dify by sharing it on social media and at events and conferences.
> We are looking for contributors to help with translating Dify to languages other than Mandarin or English. If you are interested in helping, please see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n/README.md) for more information, and leave us a comment in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
> We are looking for contributors to help with translating Dify to languages other than Mandarin or English. If you are interested in helping, please see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n-config/README.md) for more information, and leave us a comment in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
**Contributors**

View File

@ -229,7 +229,7 @@ Deploy Dify to Kubernetes with premium scaling configured
At the same time, please consider supporting Dify by sharing it on social media and at events and conferences.
> We are looking for contributors to help translate Dify into languages other than Chinese or English. If you would like to help, please see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n/README.md) for more information and leave a comment in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
> We are looking for contributors to help translate Dify into languages other than Chinese or English. If you would like to help, please see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n-config/README.md) for more information and leave a comment in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
**Contributors**

View File

@ -233,7 +233,7 @@ Deploy Dify to Alibaba Cloud with one click using [Alibaba Cloud Data Mana
For those who wish to contribute code, see our [Contribution Guide](https://github.com/langgenius/dify/blob/main/CONTRIBUTING.md).
At the same time, consider supporting Dify by sharing it on social media and at events and conferences.
> We are looking for contributors to help translate Dify into languages other than Mandarin and English. If you are interested in helping, see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n/README.md) for more information and leave us a comment in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
> We are looking for contributors to help translate Dify into languages other than Mandarin and English. If you are interested in helping, see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n-config/README.md) for more information and leave us a comment in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
**Contributors**

View File

@ -227,7 +227,7 @@ Deploy Dify to the cloud platform with one click using [terraform](https://www.ter
For those who want to contribute code, see our [Contribution Guide](https://github.com/langgenius/dify/blob/main/CONTRIBUTING.md).
At the same time, please consider supporting Dify by sharing it on social media and at events and conferences.
> We need contributors to help translate Dify into languages other than Mandarin or English. If you are interested in helping, please see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n/README.md) for more information and leave us a comment in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
> We need contributors to help translate Dify into languages other than Mandarin or English. If you are interested in helping, please see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n-config/README.md) for more information and leave us a comment in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
**Contributors**

View File

@ -239,7 +239,7 @@ All of Dify's features come with corresponding APIs, so you can easily integrate Dify
For developers who want to contribute code, see our [Contribution Guide](https://github.com/langgenius/dify/blob/main/CONTRIBUTING.md).
At the same time, please consider supporting us by sharing Dify on social media and at events and conferences.
> We are looking for contributors to help translate Dify into languages other than Chinese and English. If you are interested in helping, see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n/README.md) for more information, and leave us a message in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
> We are looking for contributors to help translate Dify into languages other than Chinese and English. If you are interested in helping, see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n-config/README.md) for more information, and leave us a message in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
## Community & contact

View File

@ -231,7 +231,7 @@ Deploy Dify to Alibaba Cloud with just one click using
At the same time, please consider supporting Dify by sharing it on social media and at events and conferences.
> We are looking for contributors to help translate Dify into languages other than Chinese or English. If you are interested in helping, please see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n/README.md) for more information and leave us a comment in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
> We are looking for contributors to help translate Dify into languages other than Chinese or English. If you are interested in helping, please see the [i18n README](https://github.com/langgenius/dify/blob/main/web/i18n-config/README.md) for more information and leave us a comment in the `global-users` channel of our [Discord Community Server](https://discord.gg/8Tpq4AcN9c).
**Contributors**

View File

@ -4,6 +4,11 @@
# Alternatively you can set it with `SECRET_KEY` environment variable.
SECRET_KEY=
# Ensure UTF-8 encoding
LANG=en_US.UTF-8
LC_ALL=en_US.UTF-8
PYTHONIOENCODING=utf-8
# Console API base URL
CONSOLE_API_URL=http://localhost:5001
CONSOLE_WEB_URL=http://localhost:3000

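For reference, a minimal sketch (not part of the commit) of how these variables surface inside a Python process, assuming the three defaults above are exported in the environment:

import locale
import os
import sys

# With LANG/LC_ALL=en_US.UTF-8 and PYTHONIOENCODING=utf-8 exported, the locale's
# preferred encoding and the stdio encoding should both report UTF-8.
print(os.environ.get("PYTHONIOENCODING"))   # expected: utf-8
print(locale.getpreferredencoding(False))   # expected: UTF-8 on a typical Linux host
print(sys.stdout.encoding)                  # expected: utf-8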
View File

@ -42,6 +42,8 @@ select = [
"S301", # suspicious-pickle-usage, disallow use of `pickle` and its wrappers.
"S302", # suspicious-marshal-usage, disallow use of `marshal` module
"S311", # suspicious-non-cryptographic-random-usage
"G001", # don't use str format to logging messages
"G004", # don't use f-strings to format logging messages
]
ignore = [

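For context, a minimal before-and-after sketch of the logging pattern these two rules flag; most of the Python hunks below apply exactly this mechanical change (the app_id value is hypothetical, for illustration only):

import logging

logger = logging.getLogger(__name__)
app_id = "example-app-id"  # hypothetical value

# Flagged by G001 (str.format in a logging call) and G004 (f-string in a logging call):
# the message is rendered eagerly even when the log level is disabled.
logger.info("Processing app {}".format(app_id))
logger.info(f"Processing app {app_id}")

# Preferred: pass the arguments and let the logging framework interpolate lazily,
# only if the record is actually emitted.
logger.info("Processing app %s", app_id)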
View File

@ -37,6 +37,11 @@ EXPOSE 5001
# set timezone
ENV TZ=UTC
# Set UTF-8 locale
ENV LANG=en_US.UTF-8
ENV LC_ALL=en_US.UTF-8
ENV PYTHONIOENCODING=utf-8
WORKDIR /app/api
RUN \

View File

@ -32,7 +32,7 @@ def create_app() -> DifyApp:
initialize_extensions(app)
end_time = time.perf_counter()
if dify_config.DEBUG:
logging.info(f"Finished create_app ({round((end_time - start_time) * 1000, 2)} ms)")
logging.info("Finished create_app (%s ms)", round((end_time - start_time) * 1000, 2))
return app
@ -91,14 +91,14 @@ def initialize_extensions(app: DifyApp):
is_enabled = ext.is_enabled() if hasattr(ext, "is_enabled") else True
if not is_enabled:
if dify_config.DEBUG:
logging.info(f"Skipped {short_name}")
logging.info("Skipped %s", short_name)
continue
start_time = time.perf_counter()
ext.init_app(app)
end_time = time.perf_counter()
if dify_config.DEBUG:
logging.info(f"Loaded {short_name} ({round((end_time - start_time) * 1000, 2)} ms)")
logging.info("Loaded %s (%s ms)", short_name, round((end_time - start_time) * 1000, 2))
def create_migrations_app():

View File

@ -53,13 +53,13 @@ def reset_password(email, new_password, password_confirm):
account = db.session.query(Account).where(Account.email == email).one_or_none()
if not account:
click.echo(click.style("Account not found for email: {}".format(email), fg="red"))
click.echo(click.style(f"Account not found for email: {email}", fg="red"))
return
try:
valid_password(new_password)
except:
click.echo(click.style("Invalid password. Must match {}".format(password_pattern), fg="red"))
click.echo(click.style(f"Invalid password. Must match {password_pattern}", fg="red"))
return
# generate password salt
@ -92,13 +92,13 @@ def reset_email(email, new_email, email_confirm):
account = db.session.query(Account).where(Account.email == email).one_or_none()
if not account:
click.echo(click.style("Account not found for email: {}".format(email), fg="red"))
click.echo(click.style(f"Account not found for email: {email}", fg="red"))
return
try:
email_validate(new_email)
except:
click.echo(click.style("Invalid email: {}".format(new_email), fg="red"))
click.echo(click.style(f"Invalid email: {new_email}", fg="red"))
return
account.email = new_email
@ -142,7 +142,7 @@ def reset_encrypt_key_pair():
click.echo(
click.style(
"Congratulations! The asymmetric key pair of workspace {} has been reset.".format(tenant.id),
f"Congratulations! The asymmetric key pair of workspace {tenant.id} has been reset.",
fg="green",
)
)
@ -190,14 +190,14 @@ def migrate_annotation_vector_database():
f"Processing the {total_count} app {app.id}. " + f"{create_count} created, {skipped_count} skipped."
)
try:
click.echo("Creating app annotation index: {}".format(app.id))
click.echo(f"Creating app annotation index: {app.id}")
app_annotation_setting = (
db.session.query(AppAnnotationSetting).where(AppAnnotationSetting.app_id == app.id).first()
)
if not app_annotation_setting:
skipped_count = skipped_count + 1
click.echo("App annotation setting disabled: {}".format(app.id))
click.echo(f"App annotation setting disabled: {app.id}")
continue
# get dataset_collection_binding info
dataset_collection_binding = (
@ -206,7 +206,7 @@ def migrate_annotation_vector_database():
.first()
)
if not dataset_collection_binding:
click.echo("App annotation collection binding not found: {}".format(app.id))
click.echo(f"App annotation collection binding not found: {app.id}")
continue
annotations = db.session.query(MessageAnnotation).where(MessageAnnotation.app_id == app.id).all()
dataset = Dataset(
@ -252,9 +252,7 @@ def migrate_annotation_vector_database():
create_count += 1
except Exception as e:
click.echo(
click.style(
"Error creating app annotation index: {} {}".format(e.__class__.__name__, str(e)), fg="red"
)
click.style(f"Error creating app annotation index: {e.__class__.__name__} {str(e)}", fg="red")
)
continue
@ -319,7 +317,7 @@ def migrate_knowledge_vector_database():
f"Processing the {total_count} dataset {dataset.id}. {create_count} created, {skipped_count} skipped."
)
try:
click.echo("Creating dataset vector database index: {}".format(dataset.id))
click.echo(f"Creating dataset vector database index: {dataset.id}")
if dataset.index_struct_dict:
if dataset.index_struct_dict["type"] == vector_type:
skipped_count = skipped_count + 1
@ -423,9 +421,7 @@ def migrate_knowledge_vector_database():
create_count += 1
except Exception as e:
db.session.rollback()
click.echo(
click.style("Error creating dataset index: {} {}".format(e.__class__.__name__, str(e)), fg="red")
)
click.echo(click.style(f"Error creating dataset index: {e.__class__.__name__} {str(e)}", fg="red"))
continue
click.echo(
@ -476,7 +472,7 @@ def convert_to_agent_apps():
break
for app in apps:
click.echo("Converting app: {}".format(app.id))
click.echo(f"Converting app: {app.id}")
try:
app.mode = AppMode.AGENT_CHAT.value
@ -488,11 +484,11 @@ def convert_to_agent_apps():
)
db.session.commit()
click.echo(click.style("Converted app: {}".format(app.id), fg="green"))
click.echo(click.style(f"Converted app: {app.id}", fg="green"))
except Exception as e:
click.echo(click.style("Convert app error: {} {}".format(e.__class__.__name__, str(e)), fg="red"))
click.echo(click.style(f"Convert app error: {e.__class__.__name__} {str(e)}", fg="red"))
click.echo(click.style("Conversion complete. Converted {} agent apps.".format(len(proceeded_app_ids)), fg="green"))
click.echo(click.style(f"Conversion complete. Converted {len(proceeded_app_ids)} agent apps.", fg="green"))
@click.command("add-qdrant-index", help="Add Qdrant index.")
@ -665,7 +661,7 @@ def create_tenant(email: str, language: Optional[str] = None, name: Optional[str
click.echo(
click.style(
"Account and tenant created.\nAccount: {}\nPassword: {}".format(email, new_password),
f"Account and tenant created.\nAccount: {email}\nPassword: {new_password}",
fg="green",
)
)
@ -726,16 +722,16 @@ where sites.id is null limit 1000"""
if tenant:
accounts = tenant.get_accounts()
if not accounts:
print("Fix failed for app {}".format(app.id))
print(f"Fix failed for app {app.id}")
continue
account = accounts[0]
print("Fixing missing site for app {}".format(app.id))
print(f"Fixing missing site for app {app.id}")
app_was_created.send(app, account=account)
except Exception:
failed_app_ids.append(app_id)
click.echo(click.style("Failed to fix missing site for app {}".format(app_id), fg="red"))
logging.exception(f"Failed to fix app related site missing issue, app_id: {app_id}")
click.echo(click.style(f"Failed to fix missing site for app {app_id}", fg="red"))
logging.exception("Failed to fix app related site missing issue, app_id: %s", app_id)
continue
if not processed_count:

View File

@ -41,7 +41,7 @@ class RemoteSettingsSourceFactory(PydanticBaseSettingsSource):
case RemoteSettingsSourceName.NACOS:
remote_source = NacosSettingsSource(current_state)
case _:
logger.warning(f"Unsupported remote source: {remote_source_name}")
logger.warning("Unsupported remote source: %s", remote_source_name)
return {}
d: dict[str, Any] = {}

View File

@ -245,11 +245,7 @@ class CeleryConfig(DatabaseConfig):
@computed_field
def CELERY_RESULT_BACKEND(self) -> str | None:
return (
"db+{}".format(self.SQLALCHEMY_DATABASE_URI)
if self.CELERY_BACKEND == "database"
else self.CELERY_BROKER_URL
)
return f"db+{self.SQLALCHEMY_DATABASE_URI}" if self.CELERY_BACKEND == "database" else self.CELERY_BROKER_URL
@property
def BROKER_USE_SSL(self) -> bool:

View File

@ -76,7 +76,7 @@ class ApolloClient:
code, body = http_request(url, timeout=3, headers=self._sign_headers(url))
if code == 200:
if not body:
logger.error(f"get_json_from_net load configs failed, body is {body}")
logger.error("get_json_from_net load configs failed, body is %s", body)
return None
data = json.loads(body)
data = data["configurations"]
@ -207,7 +207,7 @@ class ApolloClient:
# if the length is 0 it is returned directly
if len(notifications) == 0:
return
url = "{}/notifications/v2".format(self.config_url)
url = f"{self.config_url}/notifications/v2"
params = {
"appId": self.app_id,
"cluster": self.cluster,
@ -222,7 +222,7 @@ class ApolloClient:
return
if http_code == 200:
if not body:
logger.error(f"_long_poll load configs failed,body is {body}")
logger.error("_long_poll load configs failed,body is %s", body)
return
data = json.loads(body)
for entry in data:
@ -273,12 +273,12 @@ class ApolloClient:
time.sleep(60 * 10) # 10 minutes
def _do_heart_beat(self, namespace):
url = "{}/configs/{}/{}/{}?ip={}".format(self.config_url, self.app_id, self.cluster, namespace, self.ip)
url = f"{self.config_url}/configs/{self.app_id}/{self.cluster}/{namespace}?ip={self.ip}"
try:
code, body = http_request(url, timeout=3, headers=self._sign_headers(url))
if code == 200:
if not body:
logger.error(f"_do_heart_beat load configs failed,body is {body}")
logger.error("_do_heart_beat load configs failed,body is %s", body)
return None
data = json.loads(body)
if self.last_release_key == data["releaseKey"]:

View File

@ -24,7 +24,7 @@ def url_encode_wrapper(params):
def no_key_cache_key(namespace, key):
return "{}{}{}".format(namespace, len(namespace), key)
return f"{namespace}{len(namespace)}{key}"
# Returns whether the obtained value is obtained, and None if it does not

View File

@ -28,5 +28,5 @@ def supported_language(lang):
if lang in languages:
return lang
error = "{lang} is not a valid language.".format(lang=lang)
error = f"{lang} is not a valid language."
raise ValueError(error)

View File

@ -86,7 +86,7 @@ class AnnotationReplyActionStatusApi(Resource):
raise Forbidden()
job_id = str(job_id)
app_annotation_job_key = "{}_app_annotation_job_{}".format(action, str(job_id))
app_annotation_job_key = f"{action}_app_annotation_job_{str(job_id)}"
cache_result = redis_client.get(app_annotation_job_key)
if cache_result is None:
raise ValueError("The job does not exist.")
@ -94,7 +94,7 @@ class AnnotationReplyActionStatusApi(Resource):
job_status = cache_result.decode()
error_msg = ""
if job_status == "error":
app_annotation_error_key = "{}_app_annotation_error_{}".format(action, str(job_id))
app_annotation_error_key = f"{action}_app_annotation_error_{str(job_id)}"
error_msg = redis_client.get(app_annotation_error_key).decode()
return {"job_id": job_id, "job_status": job_status, "error_msg": error_msg}, 200
@ -123,6 +123,17 @@ class AnnotationListApi(Resource):
}
return response, 200
@setup_required
@login_required
@account_initialization_required
def delete(self, app_id):
if not current_user.is_editor:
raise Forbidden()
app_id = str(app_id)
AppAnnotationService.clear_all_annotations(app_id)
return {"result": "success"}, 204
class AnnotationExportApi(Resource):
@setup_required
@ -223,14 +234,14 @@ class AnnotationBatchImportStatusApi(Resource):
raise Forbidden()
job_id = str(job_id)
indexing_cache_key = "app_annotation_batch_import_{}".format(str(job_id))
indexing_cache_key = f"app_annotation_batch_import_{str(job_id)}"
cache_result = redis_client.get(indexing_cache_key)
if cache_result is None:
raise ValueError("The job does not exist.")
job_status = cache_result.decode()
error_msg = ""
if job_status == "error":
indexing_error_msg_key = "app_annotation_batch_import_error_msg_{}".format(str(job_id))
indexing_error_msg_key = f"app_annotation_batch_import_error_msg_{str(job_id)}"
error_msg = redis_client.get(indexing_error_msg_key).decode()
return {"job_id": job_id, "job_status": job_status, "error_msg": error_msg}, 200

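The endpoints above only read the job cache. As a hedged illustration of the other half of the same convention, here is a hypothetical worker-side sketch; the key names mirror the controller, but this is not the actual Celery task shipped in this commit:

def run_annotation_job(redis_client, action: str, job_id: str) -> None:
    # Hypothetical sketch: status is written under "{action}_app_annotation_job_{job_id}"
    # and the failure detail under "{action}_app_annotation_error_{job_id}", matching
    # the keys the status API reads and decodes.
    job_key = f"{action}_app_annotation_job_{job_id}"
    error_key = f"{action}_app_annotation_error_{job_id}"
    redis_client.setex(job_key, 600, "processing")
    try:
        ...  # the enable/disable annotation work would happen here
        redis_client.setex(job_key, 600, "completed")
    except Exception as e:
        redis_client.setex(job_key, 600, "error")
        redis_client.setex(error_key, 600, str(e))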
View File

@ -51,8 +51,8 @@ class CompletionConversationApi(Resource):
if args["keyword"]:
query = query.join(Message, Message.conversation_id == Conversation.id).where(
or_(
Message.query.ilike("%{}%".format(args["keyword"])),
Message.answer.ilike("%{}%".format(args["keyword"])),
Message.query.ilike(f"%{args['keyword']}%"),
Message.answer.ilike(f"%{args['keyword']}%"),
)
)
@ -174,7 +174,7 @@ class ChatConversationApi(Resource):
query = db.select(Conversation).where(Conversation.app_id == app_model.id)
if args["keyword"]:
keyword_filter = "%{}%".format(args["keyword"])
keyword_filter = f"%{args['keyword']}%"
query = (
query.join(
Message,

View File

@ -1,5 +1,3 @@
import os
from flask_login import current_user
from flask_restful import Resource, reqparse
@ -29,15 +27,12 @@ class RuleGenerateApi(Resource):
args = parser.parse_args()
account = current_user
PROMPT_GENERATION_MAX_TOKENS = int(os.getenv("PROMPT_GENERATION_MAX_TOKENS", "512"))
try:
rules = LLMGenerator.generate_rule_config(
tenant_id=account.current_tenant_id,
instruction=args["instruction"],
model_config=args["model_config"],
no_variable=args["no_variable"],
rule_config_max_tokens=PROMPT_GENERATION_MAX_TOKENS,
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
@ -64,14 +59,12 @@ class RuleCodeGenerateApi(Resource):
args = parser.parse_args()
account = current_user
CODE_GENERATION_MAX_TOKENS = int(os.getenv("CODE_GENERATION_MAX_TOKENS", "1024"))
try:
code_result = LLMGenerator.generate_code(
tenant_id=account.current_tenant_id,
instruction=args["instruction"],
model_config=args["model_config"],
code_language=args["code_language"],
max_tokens=CODE_GENERATION_MAX_TOKENS,
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)

View File

@ -5,7 +5,6 @@ from flask_restful import Resource, fields, marshal_with, reqparse
from flask_restful.inputs import int_range
from werkzeug.exceptions import Forbidden, InternalServerError, NotFound
import services
from controllers.console import api
from controllers.console.app.error import (
CompletionRequestError,
@ -133,7 +132,7 @@ class MessageFeedbackApi(Resource):
rating=args.get("rating"),
content=None,
)
except services.errors.message.MessageNotExistsError:
except MessageNotExistsError:
raise NotFound("Message Not Exists.")
return {"result": "success"}

View File

@ -81,7 +81,7 @@ class OAuthDataSourceBinding(Resource):
oauth_provider.get_access_token(code)
except requests.exceptions.HTTPError as e:
logging.exception(
f"An error occurred during the OAuthCallback process with {provider}: {e.response.text}"
"An error occurred during the OAuthCallback process with %s: %s", provider, e.response.text
)
return {"error": "OAuth data source process failed"}, 400
@ -103,7 +103,9 @@ class OAuthDataSourceSync(Resource):
try:
oauth_provider.sync_data_source(binding_id)
except requests.exceptions.HTTPError as e:
logging.exception(f"An error occurred during the OAuthCallback process with {provider}: {e.response.text}")
logging.exception(
"An error occurred during the OAuthCallback process with %s: %s", provider, e.response.text
)
return {"error": "OAuth data source process failed"}, 400
return {"result": "success"}, 200

View File

@ -80,7 +80,7 @@ class OAuthCallback(Resource):
user_info = oauth_provider.get_user_info(token)
except requests.exceptions.RequestException as e:
error_text = e.response.text if e.response else str(e)
logging.exception(f"An error occurred during the OAuth process with {provider}: {error_text}")
logging.exception("An error occurred during the OAuth process with %s: %s", provider, error_text)
return {"error": "OAuth process failed"}, 400
if invite_token and RegisterService.is_valid_invite_token(invite_token):

View File

@ -970,7 +970,7 @@ class DocumentRetryApi(DocumentResource):
raise DocumentAlreadyFinishedError()
retry_documents.append(document)
except Exception:
logging.exception(f"Failed to retry document, document id: {document_id}")
logging.exception("Failed to retry document, document id: %s", document_id)
continue
# retry document
DocumentService.retry_document(dataset_id, retry_documents)

View File

@ -1,6 +1,5 @@
import uuid
import pandas as pd
from flask import request
from flask_login import current_user
from flask_restful import Resource, marshal, reqparse
@ -14,8 +13,6 @@ from controllers.console.datasets.error import (
ChildChunkDeleteIndexError,
ChildChunkIndexingError,
InvalidActionError,
NoFileUploadedError,
TooManyFilesError,
)
from controllers.console.wraps import (
account_initialization_required,
@ -32,6 +29,7 @@ from extensions.ext_redis import redis_client
from fields.segment_fields import child_chunk_fields, segment_fields
from libs.login import login_required
from models.dataset import ChildChunk, DocumentSegment
from models.model import UploadFile
from services.dataset_service import DatasetService, DocumentService, SegmentService
from services.entities.knowledge_entities.knowledge_entities import ChildChunkUpdateArgs, SegmentUpdateArgs
from services.errors.chunk import ChildChunkDeleteIndexError as ChildChunkDeleteIndexServiceError
@ -184,7 +182,7 @@ class DatasetDocumentSegmentApi(Resource):
raise ProviderNotInitializeError(ex.description)
segment_ids = request.args.getlist("segment_id")
document_indexing_cache_key = "document_{}_indexing".format(document.id)
document_indexing_cache_key = f"document_{document.id}_indexing"
cache_result = redis_client.get(document_indexing_cache_key)
if cache_result is not None:
raise InvalidActionError("Document is being indexed, please try again later")
@ -365,37 +363,28 @@ class DatasetDocumentSegmentBatchImportApi(Resource):
document = DocumentService.get_document(dataset_id, document_id)
if not document:
raise NotFound("Document not found.")
# get file from request
file = request.files["file"]
# check file
if "file" not in request.files:
raise NoFileUploadedError()
if len(request.files) > 1:
raise TooManyFilesError()
parser = reqparse.RequestParser()
parser.add_argument("upload_file_id", type=str, required=True, nullable=False, location="json")
args = parser.parse_args()
upload_file_id = args["upload_file_id"]
upload_file = db.session.query(UploadFile).where(UploadFile.id == upload_file_id).first()
if not upload_file:
raise NotFound("UploadFile not found.")
# check file type
if not file.filename or not file.filename.lower().endswith(".csv"):
if not upload_file.name or not upload_file.name.lower().endswith(".csv"):
raise ValueError("Invalid file type. Only CSV files are allowed")
try:
# Skip the first row
df = pd.read_csv(file)
result = []
for index, row in df.iterrows():
if document.doc_form == "qa_model":
data = {"content": row.iloc[0], "answer": row.iloc[1]}
else:
data = {"content": row.iloc[0]}
result.append(data)
if len(result) == 0:
raise ValueError("The CSV file is empty.")
# async job
job_id = str(uuid.uuid4())
indexing_cache_key = "segment_batch_import_{}".format(str(job_id))
indexing_cache_key = f"segment_batch_import_{str(job_id)}"
# send batch add segments task
redis_client.setnx(indexing_cache_key, "waiting")
batch_create_segment_to_index_task.delay(
str(job_id), result, dataset_id, document_id, current_user.current_tenant_id, current_user.id
str(job_id), upload_file_id, dataset_id, document_id, current_user.current_tenant_id, current_user.id
)
except Exception as e:
return {"error": str(e)}, 500
@ -406,7 +395,7 @@ class DatasetDocumentSegmentBatchImportApi(Resource):
@account_initialization_required
def get(self, job_id):
job_id = str(job_id)
indexing_cache_key = "segment_batch_import_{}".format(job_id)
indexing_cache_key = f"segment_batch_import_{job_id}"
cache_result = redis_client.get(indexing_cache_key)
if cache_result is None:
raise ValueError("The job does not exist.")

View File

@ -74,7 +74,7 @@ class InstalledAppsListApi(Resource):
):
res.append(installed_app)
installed_app_list = res
logger.debug(f"installed_app_list: {installed_app_list}, user_id: {user_id}")
logger.debug("installed_app_list: %s, user_id: %s", installed_app_list, user_id)
installed_app_list.sort(
key=lambda app: (

View File

@ -5,7 +5,6 @@ from flask_restful import marshal_with, reqparse
from flask_restful.inputs import int_range
from werkzeug.exceptions import InternalServerError, NotFound
import services
from controllers.console.app.error import (
AppMoreLikeThisDisabledError,
CompletionRequestError,
@ -29,7 +28,11 @@ from models.model import AppMode
from services.app_generate_service import AppGenerateService
from services.errors.app import MoreLikeThisDisabledError
from services.errors.conversation import ConversationNotExistsError
from services.errors.message import MessageNotExistsError, SuggestedQuestionsAfterAnswerDisabledError
from services.errors.message import (
FirstMessageNotExistsError,
MessageNotExistsError,
SuggestedQuestionsAfterAnswerDisabledError,
)
from services.message_service import MessageService
@ -52,9 +55,9 @@ class MessageListApi(InstalledAppResource):
return MessageService.pagination_by_first_id(
app_model, current_user, args["conversation_id"], args["first_id"], args["limit"]
)
except services.errors.conversation.ConversationNotExistsError:
except ConversationNotExistsError:
raise NotFound("Conversation Not Exists.")
except services.errors.message.FirstMessageNotExistsError:
except FirstMessageNotExistsError:
raise NotFound("First Message Not Exists.")
@ -77,7 +80,7 @@ class MessageFeedbackApi(InstalledAppResource):
rating=args.get("rating"),
content=args.get("content"),
)
except services.errors.message.MessageNotExistsError:
except MessageNotExistsError:
raise NotFound("Message Not Exists.")
return {"result": "success"}

View File

@ -34,7 +34,7 @@ class VersionApi(Resource):
try:
response = requests.get(check_update_url, {"current_version": args.get("current_version")})
except Exception as error:
logging.warning("Check update version error: {}.".format(str(error)))
logging.warning("Check update version error: %s.", str(error))
result["version"] = args.get("current_version")
return result
@ -55,7 +55,7 @@ def _has_new_version(*, latest_version: str, current_version: str) -> bool:
# Compare versions
return latest > current
except version.InvalidVersion:
logging.warning(f"Invalid version format: latest={latest_version}, current={current_version}")
logging.warning("Invalid version format: latest=%s, current=%s", latest_version, current_version)
return False

View File

@ -15,7 +15,7 @@ from controllers.console.auth.error import (
InvalidEmailError,
InvalidTokenError,
)
from controllers.console.error import AccountNotFound, EmailSendIpLimitError
from controllers.console.error import AccountInFreezeError, AccountNotFound, EmailSendIpLimitError
from controllers.console.workspace.error import (
AccountAlreadyInitedError,
CurrentPasswordIncorrectError,
@ -479,20 +479,27 @@ class ChangeEmailResetApi(Resource):
parser.add_argument("token", type=str, required=True, nullable=False, location="json")
args = parser.parse_args()
if AccountService.is_account_in_freeze(args["new_email"]):
raise AccountInFreezeError()
if not AccountService.check_email_unique(args["new_email"]):
raise EmailAlreadyInUseError()
reset_data = AccountService.get_change_email_data(args["token"])
if not reset_data:
raise InvalidTokenError()
AccountService.revoke_change_email_token(args["token"])
if not AccountService.check_email_unique(args["new_email"]):
raise EmailAlreadyInUseError()
old_email = reset_data.get("old_email", "")
if current_user.email != old_email:
raise AccountNotFound()
updated_account = AccountService.update_account(current_user, email=args["new_email"])
updated_account = AccountService.update_account_email(current_user, email=args["new_email"])
AccountService.send_change_email_completed_notify_email(
email=args["new_email"],
)
return updated_account
@ -503,6 +510,8 @@ class CheckEmailUnique(Resource):
parser = reqparse.RequestParser()
parser.add_argument("email", type=email, required=True, location="json")
args = parser.parse_args()
if AccountService.is_account_in_freeze(args["email"]):
raise AccountInFreezeError()
if not AccountService.check_email_unique(args["email"]):
raise EmailAlreadyInUseError()
return {"result": "success"}

View File

@ -73,8 +73,9 @@ class DefaultModelApi(Resource):
)
except Exception as ex:
logging.exception(
f"Failed to update default model, model type: {model_setting['model_type']},"
f" model:{model_setting.get('model')}"
"Failed to update default model, model type: %s, model: %s",
model_setting["model_type"],
model_setting.get("model"),
)
raise ex
@ -160,8 +161,10 @@ class ModelProviderModelApi(Resource):
)
except CredentialsValidateFailedError as ex:
logging.exception(
f"Failed to save model credentials, tenant_id: {tenant_id},"
f" model: {args.get('model')}, model_type: {args.get('model_type')}"
"Failed to save model credentials, tenant_id: %s, model: %s, model_type: %s",
tenant_id,
args.get("model"),
args.get("model_type"),
)
raise ValueError(str(ex))

View File

@ -34,7 +34,7 @@ class AnnotationReplyActionStatusApi(Resource):
@validate_app_token
def get(self, app_model: App, job_id, action):
job_id = str(job_id)
app_annotation_job_key = "{}_app_annotation_job_{}".format(action, str(job_id))
app_annotation_job_key = f"{action}_app_annotation_job_{str(job_id)}"
cache_result = redis_client.get(app_annotation_job_key)
if cache_result is None:
raise ValueError("The job does not exist.")
@ -42,7 +42,7 @@ class AnnotationReplyActionStatusApi(Resource):
job_status = cache_result.decode()
error_msg = ""
if job_status == "error":
app_annotation_error_key = "{}_app_annotation_error_{}".format(action, str(job_id))
app_annotation_error_key = f"{action}_app_annotation_error_{str(job_id)}"
error_msg = redis_client.get(app_annotation_error_key).decode()
return {"job_id": job_id, "job_status": job_status, "error_msg": error_msg}, 200

View File

@ -47,6 +47,9 @@ class CompletionApi(Resource):
parser.add_argument("retriever_from", type=str, required=False, default="dev", location="json")
args = parser.parse_args()
external_trace_id = get_external_trace_id(request)
if external_trace_id:
args["external_trace_id"] = external_trace_id
streaming = args["response_mode"] == "streaming"

View File

@ -15,7 +15,11 @@ from fields.message_fields import agent_thought_fields, feedback_fields
from fields.raws import FilesContainedField
from libs.helper import TimestampField, uuid_value
from models.model import App, AppMode, EndUser
from services.errors.message import SuggestedQuestionsAfterAnswerDisabledError
from services.errors.message import (
FirstMessageNotExistsError,
MessageNotExistsError,
SuggestedQuestionsAfterAnswerDisabledError,
)
from services.message_service import MessageService
@ -65,7 +69,7 @@ class MessageListApi(Resource):
)
except services.errors.conversation.ConversationNotExistsError:
raise NotFound("Conversation Not Exists.")
except services.errors.message.FirstMessageNotExistsError:
except FirstMessageNotExistsError:
raise NotFound("First Message Not Exists.")
@ -87,7 +91,7 @@ class MessageFeedbackApi(Resource):
rating=args.get("rating"),
content=args.get("content"),
)
except services.errors.message.MessageNotExistsError:
except MessageNotExistsError:
raise NotFound("Message Not Exists.")
return {"result": "success"}
@ -117,7 +121,7 @@ class MessageSuggestedApi(Resource):
questions = MessageService.get_suggested_questions_after_answer(
app_model=app_model, user=end_user, message_id=message_id, invoke_from=InvokeFrom.SERVICE_API
)
except services.errors.message.MessageNotExistsError:
except MessageNotExistsError:
raise NotFound("Message Not Exists.")
except SuggestedQuestionsAfterAnswerDisabledError:
raise BadRequest("Suggested Questions Is Disabled.")

View File

@ -4,7 +4,6 @@ from flask_restful import fields, marshal_with, reqparse
from flask_restful.inputs import int_range
from werkzeug.exceptions import InternalServerError, NotFound
import services
from controllers.web import api
from controllers.web.error import (
AppMoreLikeThisDisabledError,
@ -29,7 +28,11 @@ from models.model import AppMode
from services.app_generate_service import AppGenerateService
from services.errors.app import MoreLikeThisDisabledError
from services.errors.conversation import ConversationNotExistsError
from services.errors.message import MessageNotExistsError, SuggestedQuestionsAfterAnswerDisabledError
from services.errors.message import (
FirstMessageNotExistsError,
MessageNotExistsError,
SuggestedQuestionsAfterAnswerDisabledError,
)
from services.message_service import MessageService
@ -73,9 +76,9 @@ class MessageListApi(WebApiResource):
return MessageService.pagination_by_first_id(
app_model, end_user, args["conversation_id"], args["first_id"], args["limit"]
)
except services.errors.conversation.ConversationNotExistsError:
except ConversationNotExistsError:
raise NotFound("Conversation Not Exists.")
except services.errors.message.FirstMessageNotExistsError:
except FirstMessageNotExistsError:
raise NotFound("First Message Not Exists.")
@ -96,7 +99,7 @@ class MessageFeedbackApi(WebApiResource):
rating=args.get("rating"),
content=args.get("content"),
)
except services.errors.message.MessageNotExistsError:
except MessageNotExistsError:
raise NotFound("Message Not Exists.")
return {"result": "success"}

View File

@ -280,7 +280,7 @@ class BaseAgentRunner(AppRunner):
def create_agent_thought(
self, message_id: str, message: str, tool_name: str, tool_input: str, messages_ids: list[str]
) -> MessageAgentThought:
) -> str:
"""
Create agent thought
"""
@ -313,16 +313,15 @@ class BaseAgentRunner(AppRunner):
db.session.add(thought)
db.session.commit()
db.session.refresh(thought)
agent_thought_id = str(thought.id)
self.agent_thought_count += 1
db.session.close()
self.agent_thought_count += 1
return thought
return agent_thought_id
def save_agent_thought(
self,
agent_thought: MessageAgentThought,
agent_thought_id: str,
tool_name: str | None,
tool_input: Union[str, dict, None],
thought: str | None,
@ -335,12 +334,9 @@ class BaseAgentRunner(AppRunner):
"""
Save agent thought
"""
updated_agent_thought = (
db.session.query(MessageAgentThought).where(MessageAgentThought.id == agent_thought.id).first()
)
if not updated_agent_thought:
agent_thought = db.session.query(MessageAgentThought).where(MessageAgentThought.id == agent_thought_id).first()
if not agent_thought:
raise ValueError("agent thought not found")
agent_thought = updated_agent_thought
if thought:
agent_thought.thought += thought
@ -355,7 +351,7 @@ class BaseAgentRunner(AppRunner):
except Exception:
tool_input = json.dumps(tool_input)
updated_agent_thought.tool_input = tool_input
agent_thought.tool_input = tool_input
if observation:
if isinstance(observation, dict):
@ -364,27 +360,27 @@ class BaseAgentRunner(AppRunner):
except Exception:
observation = json.dumps(observation)
updated_agent_thought.observation = observation
agent_thought.observation = observation
if answer:
agent_thought.answer = answer
if messages_ids is not None and len(messages_ids) > 0:
updated_agent_thought.message_files = json.dumps(messages_ids)
agent_thought.message_files = json.dumps(messages_ids)
if llm_usage:
updated_agent_thought.message_token = llm_usage.prompt_tokens
updated_agent_thought.message_price_unit = llm_usage.prompt_price_unit
updated_agent_thought.message_unit_price = llm_usage.prompt_unit_price
updated_agent_thought.answer_token = llm_usage.completion_tokens
updated_agent_thought.answer_price_unit = llm_usage.completion_price_unit
updated_agent_thought.answer_unit_price = llm_usage.completion_unit_price
updated_agent_thought.tokens = llm_usage.total_tokens
updated_agent_thought.total_price = llm_usage.total_price
agent_thought.message_token = llm_usage.prompt_tokens
agent_thought.message_price_unit = llm_usage.prompt_price_unit
agent_thought.message_unit_price = llm_usage.prompt_unit_price
agent_thought.answer_token = llm_usage.completion_tokens
agent_thought.answer_price_unit = llm_usage.completion_price_unit
agent_thought.answer_unit_price = llm_usage.completion_unit_price
agent_thought.tokens = llm_usage.total_tokens
agent_thought.total_price = llm_usage.total_price
# check if tool labels is not empty
labels = updated_agent_thought.tool_labels or {}
tools = updated_agent_thought.tool.split(";") if updated_agent_thought.tool else []
labels = agent_thought.tool_labels or {}
tools = agent_thought.tool.split(";") if agent_thought.tool else []
for tool in tools:
if not tool:
continue
@ -395,7 +391,7 @@ class BaseAgentRunner(AppRunner):
else:
labels[tool] = {"en_US": tool, "zh_Hans": tool}
updated_agent_thought.tool_labels_str = json.dumps(labels)
agent_thought.tool_labels_str = json.dumps(labels)
if tool_invoke_meta is not None:
if isinstance(tool_invoke_meta, dict):
@ -404,7 +400,7 @@ class BaseAgentRunner(AppRunner):
except Exception:
tool_invoke_meta = json.dumps(tool_invoke_meta)
updated_agent_thought.tool_meta_str = tool_invoke_meta
agent_thought.tool_meta_str = tool_invoke_meta
db.session.commit()
db.session.close()

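For context, a hedged, self-contained sketch (toy SQLAlchemy model, not Dify's MessageAgentThought or its real columns) of the pattern this refactor moves to: create_agent_thought now returns the row's id, and save_agent_thought re-queries by that id in its own session, so no detached ORM instance is reused after db.session.close():

import uuid
from sqlalchemy import Column, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Thought(Base):  # stand-in model for illustration only
    __tablename__ = "thoughts"
    id = Column(String, primary_key=True, default=lambda: str(uuid.uuid4()))
    answer = Column(String, default="")

engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

def create_thought() -> str:
    with Session() as session:
        thought = Thought()
        session.add(thought)
        session.commit()
        return str(thought.id)  # return the plain id, not the soon-detached object

def save_thought(thought_id: str, answer: str) -> None:
    with Session() as session:  # re-attach by querying inside a fresh session
        thought = session.query(Thought).filter_by(id=thought_id).first()
        if thought is None:
            raise ValueError("agent thought not found")
        thought.answer = answer
        session.commit()

save_thought(create_thought(), "final answer")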
View File

@ -97,13 +97,13 @@ class CotAgentRunner(BaseAgentRunner, ABC):
message_file_ids: list[str] = []
agent_thought = self.create_agent_thought(
agent_thought_id = self.create_agent_thought(
message_id=message.id, message="", tool_name="", tool_input="", messages_ids=message_file_ids
)
if iteration_step > 1:
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=agent_thought.id), PublishFrom.APPLICATION_MANAGER
QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER
)
# recalc llm max tokens
@ -133,7 +133,7 @@ class CotAgentRunner(BaseAgentRunner, ABC):
# publish agent thought if it's first iteration
if iteration_step == 1:
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=agent_thought.id), PublishFrom.APPLICATION_MANAGER
QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER
)
for chunk in react_chunks:
@ -168,7 +168,7 @@ class CotAgentRunner(BaseAgentRunner, ABC):
usage_dict["usage"] = LLMUsage.empty_usage()
self.save_agent_thought(
agent_thought=agent_thought,
agent_thought_id=agent_thought_id,
tool_name=(scratchpad.action.action_name if scratchpad.action and not scratchpad.is_final() else ""),
tool_input={scratchpad.action.action_name: scratchpad.action.action_input} if scratchpad.action else {},
tool_invoke_meta={},
@ -181,7 +181,7 @@ class CotAgentRunner(BaseAgentRunner, ABC):
if not scratchpad.is_final():
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=agent_thought.id), PublishFrom.APPLICATION_MANAGER
QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER
)
if not scratchpad.action:
@ -212,7 +212,7 @@ class CotAgentRunner(BaseAgentRunner, ABC):
scratchpad.agent_response = tool_invoke_response
self.save_agent_thought(
agent_thought=agent_thought,
agent_thought_id=agent_thought_id,
tool_name=scratchpad.action.action_name,
tool_input={scratchpad.action.action_name: scratchpad.action.action_input},
thought=scratchpad.thought or "",
@ -224,7 +224,7 @@ class CotAgentRunner(BaseAgentRunner, ABC):
)
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=agent_thought.id), PublishFrom.APPLICATION_MANAGER
QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER
)
# update prompt tool message
@ -244,7 +244,7 @@ class CotAgentRunner(BaseAgentRunner, ABC):
# save agent thought
self.save_agent_thought(
agent_thought=agent_thought,
agent_thought_id=agent_thought_id,
tool_name="",
tool_input={},
tool_invoke_meta={},

View File

@ -80,7 +80,7 @@ class FunctionCallAgentRunner(BaseAgentRunner):
prompt_messages_tools = []
message_file_ids: list[str] = []
agent_thought = self.create_agent_thought(
agent_thought_id = self.create_agent_thought(
message_id=message.id, message="", tool_name="", tool_input="", messages_ids=message_file_ids
)
@ -114,7 +114,7 @@ class FunctionCallAgentRunner(BaseAgentRunner):
for chunk in chunks:
if is_first_chunk:
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=agent_thought.id), PublishFrom.APPLICATION_MANAGER
QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER
)
is_first_chunk = False
# check if there is any tool call
@ -172,7 +172,7 @@ class FunctionCallAgentRunner(BaseAgentRunner):
result.message.content = ""
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=agent_thought.id), PublishFrom.APPLICATION_MANAGER
QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER
)
yield LLMResultChunk(
@ -205,7 +205,7 @@ class FunctionCallAgentRunner(BaseAgentRunner):
# save thought
self.save_agent_thought(
agent_thought=agent_thought,
agent_thought_id=agent_thought_id,
tool_name=tool_call_names,
tool_input=tool_call_inputs,
thought=response,
@ -216,7 +216,7 @@ class FunctionCallAgentRunner(BaseAgentRunner):
llm_usage=current_llm_usage,
)
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=agent_thought.id), PublishFrom.APPLICATION_MANAGER
QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER
)
final_answer += response + "\n"
@ -276,7 +276,7 @@ class FunctionCallAgentRunner(BaseAgentRunner):
if len(tool_responses) > 0:
# save agent thought
self.save_agent_thought(
agent_thought=agent_thought,
agent_thought_id=agent_thought_id,
tool_name="",
tool_input="",
thought="",
@ -291,7 +291,7 @@ class FunctionCallAgentRunner(BaseAgentRunner):
messages_ids=message_file_ids,
)
self.queue_manager.publish(
QueueAgentThoughtEvent(agent_thought_id=agent_thought.id), PublishFrom.APPLICATION_MANAGER
QueueAgentThoughtEvent(agent_thought_id=agent_thought_id), PublishFrom.APPLICATION_MANAGER
)
# update prompt tool

View File

@ -600,5 +600,5 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator):
if len(e.args) > 0 and e.args[0] == "I/O operation on closed file.": # ignore this error
raise GenerateTaskStoppedError()
else:
logger.exception(f"Failed to process generate task pipeline, conversation_id: {conversation.id}")
logger.exception("Failed to process generate task pipeline, conversation_id: %s", conversation.id)
raise e

View File

@ -271,7 +271,7 @@ class AdvancedChatAppGenerateTaskPipeline:
start_listener_time = time.time()
yield MessageAudioStreamResponse(audio=audio_trunk.audio, task_id=task_id)
except Exception:
logger.exception(f"Failed to listen audio message, task_id: {task_id}")
logger.exception("Failed to listen audio message, task_id: %s", task_id)
break
if tts_publisher:
yield MessageAudioEndStreamResponse(audio="", task_id=task_id)

View File

@ -78,7 +78,7 @@ class MessageBasedAppGenerator(BaseAppGenerator):
if len(e.args) > 0 and e.args[0] == "I/O operation on closed file.": # ignore this error
raise GenerateTaskStoppedError()
else:
logger.exception(f"Failed to handle response, conversation_id: {conversation.id}")
logger.exception("Failed to handle response, conversation_id: %s", conversation.id)
raise e
def _get_app_model_config(self, app_model: App, conversation: Optional[Conversation] = None) -> AppModelConfig:

View File

@ -483,7 +483,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
try:
runner.run()
except GenerateTaskStoppedError as e:
logger.warning(f"Task stopped: {str(e)}")
logger.warning("Task stopped: %s", str(e))
pass
except InvokeAuthorizationError:
queue_manager.publish_error(
@ -540,6 +540,6 @@ class WorkflowAppGenerator(BaseAppGenerator):
raise GenerateTaskStoppedError()
else:
logger.exception(
f"Fails to process generate task pipeline, task_id: {application_generate_entity.task_id}"
"Fails to process generate task pipeline, task_id: %s", application_generate_entity.task_id
)
raise e

View File

@ -246,7 +246,7 @@ class WorkflowAppGenerateTaskPipeline:
else:
yield MessageAudioStreamResponse(audio=audio_trunk.audio, task_id=task_id)
except Exception:
logger.exception(f"Fails to get audio trunk, task_id: {task_id}")
logger.exception("Fails to get audio trunk, task_id: %s", task_id)
break
if tts_publisher:
yield MessageAudioEndStreamResponse(audio="", task_id=task_id)

View File

@ -83,7 +83,7 @@ class AnnotationReplyFeature:
return annotation
except Exception as e:
logger.warning(f"Query annotation failed, exception: {str(e)}.")
logger.warning("Query annotation failed, exception: %s.", str(e))
return None
return None

View File

@ -97,7 +97,7 @@ class MessageCycleManager:
conversation.name = name
except Exception as e:
if dify_config.DEBUG:
logging.exception(f"generate conversation name failed, conversation_id: {conversation_id}")
logging.exception("generate conversation name failed, conversation_id: %s", conversation_id)
pass
db.session.merge(conversation)

View File

@ -900,7 +900,7 @@ class ProviderConfiguration(BaseModel):
credentials=copy_credentials,
)
except Exception as ex:
logger.warning(f"get custom model schema failed, {ex}")
logger.warning("get custom model schema failed, %s", ex)
continue
if not custom_model_schema:
@ -1009,7 +1009,7 @@ class ProviderConfiguration(BaseModel):
credentials=model_configuration.credentials,
)
except Exception as ex:
logger.warning(f"get custom model schema failed, {ex}")
logger.warning("get custom model schema failed, %s", ex)
continue
if not custom_model_schema:

View File

@ -22,7 +22,7 @@ class APIBasedExtensionRequestor:
:param params: the request params
:return: the response json
"""
headers = {"Content-Type": "application/json", "Authorization": "Bearer {}".format(self.api_key)}
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}"}
url = self.api_endpoint
@ -49,8 +49,6 @@ class APIBasedExtensionRequestor:
raise ValueError("request connection error")
if response.status_code != 200:
raise ValueError(
"request error, status_code: {}, content: {}".format(response.status_code, response.text[:100])
)
raise ValueError(f"request error, status_code: {response.status_code}, content: {response.text[:100]}")
return cast(dict, response.json())

View File

@ -66,7 +66,7 @@ class Extensible:
# Check for extension module file
if (extension_name + ".py") not in file_names:
logging.warning(f"Missing {extension_name}.py file in {subdir_path}, Skip.")
logging.warning("Missing %s.py file in %s, Skip.", extension_name, subdir_path)
continue
# Check for builtin flag and position
@ -95,7 +95,7 @@ class Extensible:
break
if not extension_class:
logging.warning(f"Missing subclass of {cls.__name__} in {module_name}, Skip.")
logging.warning("Missing subclass of %s in %s, Skip.", cls.__name__, module_name)
continue
# Load schema if not builtin
@ -103,7 +103,7 @@ class Extensible:
if not builtin:
json_path = os.path.join(subdir_path, "schema.json")
if not os.path.exists(json_path):
logging.warning(f"Missing schema.json file in {subdir_path}, Skip.")
logging.warning("Missing schema.json file in %s, Skip.", subdir_path)
continue
with open(json_path, encoding="utf-8") as f:

View File

@ -49,7 +49,7 @@ class ApiExternalDataTool(ExternalDataTool):
"""
# get params from config
if not self.config:
raise ValueError("config is required, config: {}".format(self.config))
raise ValueError(f"config is required, config: {self.config}")
api_based_extension_id = self.config.get("api_based_extension_id")
assert api_based_extension_id is not None, "api_based_extension_id is required"
@ -74,7 +74,7 @@ class ApiExternalDataTool(ExternalDataTool):
# request api
requestor = APIBasedExtensionRequestor(api_endpoint=api_based_extension.api_endpoint, api_key=api_key)
except Exception as e:
raise ValueError("[External data tool] API query failed, variable: {}, error: {}".format(self.variable, e))
raise ValueError(f"[External data tool] API query failed, variable: {self.variable}, error: {e}")
response_json = requestor.request(
point=APIBasedExtensionPoint.APP_EXTERNAL_DATA_TOOL_QUERY,
@ -90,7 +90,7 @@ class ApiExternalDataTool(ExternalDataTool):
if not isinstance(response_json["result"], str):
raise ValueError(
"[External data tool] API query failed, variable: {}, error: result is not string".format(self.variable)
f"[External data tool] API query failed, variable: {self.variable}, error: result is not string"
)
return response_json["result"]

View File

@ -55,7 +55,7 @@ def check_moderation(tenant_id: str, model_config: ModelConfigWithCredentialsEnt
if moderation_result is True:
return True
except Exception:
logger.exception(f"Fails to check moderation, provider_name: {provider_name}")
logger.exception("Fails to check moderation, provider_name: %s", provider_name)
raise InvokeBadRequestError("Rate limit exceeded, please try again later.")
return False

View File

@ -30,7 +30,7 @@ def import_module_from_source(*, module_name: str, py_file_path: AnyStr, use_laz
spec.loader.exec_module(module)
return module
except Exception as e:
logging.exception(f"Failed to load module {module_name} from script file '{py_file_path!r}'")
logging.exception("Failed to load module %s from script file '%s'", module_name, repr(py_file_path))
raise e

View File

@ -73,10 +73,12 @@ def make_request(method, url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
if response.status_code not in STATUS_FORCELIST:
return response
else:
logging.warning(f"Received status code {response.status_code} for URL {url} which is in the force list")
logging.warning(
"Received status code %s for URL %s which is in the force list", response.status_code, url
)
except httpx.RequestError as e:
logging.warning(f"Request to URL {url} failed on attempt {retries + 1}: {e}")
logging.warning("Request to URL %s failed on attempt %s: %s", url, retries + 1, e)
if max_retries == 0:
raise
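The warnings above live inside a bounded retry loop; a self-contained sketch of that loop under the same control flow, assuming an httpx-based client (the STATUS_FORCELIST values and the final exception are illustrative):

import logging

import httpx

STATUS_FORCELIST = [429, 500, 502, 503, 504]

def make_request_sketch(method: str, url: str, max_retries: int = 3, **kwargs):
    retries = 0
    while retries <= max_retries:
        try:
            response = httpx.request(method, url, **kwargs)
            if response.status_code not in STATUS_FORCELIST:
                return response
            logging.warning(
                "Received status code %s for URL %s which is in the force list", response.status_code, url
            )
        except httpx.RequestError as e:
            logging.warning("Request to URL %s failed on attempt %s: %s", url, retries + 1, e)
            if max_retries == 0:
                raise
        retries += 1
    # Illustrative terminal error once the retry budget is exhausted.
    raise RuntimeError(f"Reached maximum retries ({max_retries}) for URL {url}")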

View File

@ -84,14 +84,14 @@ class IndexingRunner:
documents=documents,
)
except DocumentIsPausedError:
raise DocumentIsPausedError("Document paused, document id: {}".format(dataset_document.id))
raise DocumentIsPausedError(f"Document paused, document id: {dataset_document.id}")
except ProviderTokenNotInitError as e:
dataset_document.indexing_status = "error"
dataset_document.error = str(e.description)
dataset_document.stopped_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)
db.session.commit()
except ObjectDeletedError:
logging.warning("Document deleted, document id: {}".format(dataset_document.id))
logging.warning("Document deleted, document id: %s", dataset_document.id)
except Exception as e:
logging.exception("consume document failed")
dataset_document.indexing_status = "error"
@ -147,7 +147,7 @@ class IndexingRunner:
index_processor=index_processor, dataset=dataset, dataset_document=dataset_document, documents=documents
)
except DocumentIsPausedError:
raise DocumentIsPausedError("Document paused, document id: {}".format(dataset_document.id))
raise DocumentIsPausedError(f"Document paused, document id: {dataset_document.id}")
except ProviderTokenNotInitError as e:
dataset_document.indexing_status = "error"
dataset_document.error = str(e.description)
@ -222,7 +222,7 @@ class IndexingRunner:
index_processor=index_processor, dataset=dataset, dataset_document=dataset_document, documents=documents
)
except DocumentIsPausedError:
raise DocumentIsPausedError("Document paused, document id: {}".format(dataset_document.id))
raise DocumentIsPausedError(f"Document paused, document id: {dataset_document.id}")
except ProviderTokenNotInitError as e:
dataset_document.indexing_status = "error"
dataset_document.error = str(e.description)
@ -324,7 +324,8 @@ class IndexingRunner:
except Exception:
logging.exception(
"Delete image_files failed while indexing_estimate, \
image_upload_file_is: {}".format(upload_file_id)
image_upload_file_is: %s",
upload_file_id,
)
db.session.delete(image_file)
@ -649,7 +650,7 @@ class IndexingRunner:
@staticmethod
def _check_document_paused_status(document_id: str):
indexing_cache_key = "document_{}_is_paused".format(document_id)
indexing_cache_key = f"document_{document_id}_is_paused"
result = redis_client.get(indexing_cache_key)
if result:
raise DocumentIsPausedError()
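The paused check keys off a per-document Redis flag whose name is now built with an f-string; a minimal sketch of that convention with a plain redis-py client (the setter and its TTL are illustrative):

import redis

redis_client = redis.Redis()

def pause_document(document_id: str) -> None:
    # Illustrative setter: key name matches the hunk, the 600-second TTL is assumed.
    redis_client.setex(f"document_{document_id}_is_paused", 600, "1")

def is_document_paused(document_id: str) -> bool:
    return redis_client.get(f"document_{document_id}_is_paused") is not None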

View File

@ -125,16 +125,13 @@ class LLMGenerator:
return questions
@classmethod
def generate_rule_config(
cls, tenant_id: str, instruction: str, model_config: dict, no_variable: bool, rule_config_max_tokens: int = 512
) -> dict:
def generate_rule_config(cls, tenant_id: str, instruction: str, model_config: dict, no_variable: bool) -> dict:
output_parser = RuleConfigGeneratorOutputParser()
error = ""
error_step = ""
rule_config = {"prompt": "", "variables": [], "opening_statement": "", "error": ""}
model_parameters = {"max_tokens": rule_config_max_tokens, "temperature": 0.01}
model_parameters = model_config.get("completion_params", {})
if no_variable:
prompt_template = PromptTemplateParser(WORKFLOW_RULE_CONFIG_PROMPT_GENERATE_TEMPLATE)
@ -170,7 +167,7 @@ class LLMGenerator:
error = str(e)
error_step = "generate rule config"
except Exception as e:
logging.exception(f"Failed to generate rule config, model: {model_config.get('name')}")
logging.exception("Failed to generate rule config, model: %s", model_config.get("name"))
rule_config["error"] = str(e)
rule_config["error"] = f"Failed to {error_step}. Error: {error}" if error else ""
@ -267,7 +264,7 @@ class LLMGenerator:
error_step = "generate conversation opener"
except Exception as e:
logging.exception(f"Failed to generate rule config, model: {model_config.get('name')}")
logging.exception("Failed to generate rule config, model: %s", model_config.get("name"))
rule_config["error"] = str(e)
rule_config["error"] = f"Failed to {error_step}. Error: {error}" if error else ""
@ -276,12 +273,7 @@ class LLMGenerator:
@classmethod
def generate_code(
cls,
tenant_id: str,
instruction: str,
model_config: dict,
code_language: str = "javascript",
max_tokens: int = 1000,
cls, tenant_id: str, instruction: str, model_config: dict, code_language: str = "javascript"
) -> dict:
if code_language == "python":
prompt_template = PromptTemplateParser(PYTHON_CODE_GENERATOR_PROMPT_TEMPLATE)
@ -305,8 +297,7 @@ class LLMGenerator:
)
prompt_messages = [UserPromptMessage(content=prompt)]
model_parameters = {"max_tokens": max_tokens, "temperature": 0.01}
model_parameters = model_config.get("completion_params", {})
try:
response = cast(
LLMResult,
@ -323,7 +314,7 @@ class LLMGenerator:
return {"code": "", "language": code_language, "error": f"Failed to generate code. Error: {error}"}
except Exception as e:
logging.exception(
f"Failed to invoke LLM model, model: {model_config.get('name')}, language: {code_language}"
"Failed to invoke LLM model, model: %s, language: %s", model_config.get("name"), code_language
)
return {"code": "", "language": code_language, "error": f"An unexpected error occurred: {str(e)}"}
@ -395,5 +386,5 @@ class LLMGenerator:
error = str(e)
return {"output": "", "error": f"Failed to generate JSON Schema. Error: {error}"}
except Exception as e:
logging.exception(f"Failed to invoke LLM model, model: {model_config.get('name')}")
logging.exception("Failed to invoke LLM model, model: %s", model_config.get("name"))
return {"output": "", "error": f"An unexpected error occurred: {str(e)}"}

View File

@ -88,7 +88,7 @@ class SSETransport:
status_queue: Queue to put status updates.
"""
endpoint_url = urljoin(self.url, sse_data)
logger.info(f"Received endpoint URL: {endpoint_url}")
logger.info("Received endpoint URL: %s", endpoint_url)
if not self._validate_endpoint_url(endpoint_url):
error_msg = f"Endpoint origin does not match connection origin: {endpoint_url}"
@ -107,7 +107,7 @@ class SSETransport:
"""
try:
message = types.JSONRPCMessage.model_validate_json(sse_data)
logger.debug(f"Received server message: {message}")
logger.debug("Received server message: %s", message)
session_message = SessionMessage(message)
read_queue.put(session_message)
except Exception as exc:
@ -128,7 +128,7 @@ class SSETransport:
case "message":
self._handle_message_event(sse.data, read_queue)
case _:
logger.warning(f"Unknown SSE event: {sse.event}")
logger.warning("Unknown SSE event: %s", sse.event)
def sse_reader(self, event_source, read_queue: ReadQueue, status_queue: StatusQueue) -> None:
"""Read and process SSE events.
@ -142,7 +142,7 @@ class SSETransport:
for sse in event_source.iter_sse():
self._handle_sse_event(sse, read_queue, status_queue)
except httpx.ReadError as exc:
logger.debug(f"SSE reader shutting down normally: {exc}")
logger.debug("SSE reader shutting down normally: %s", exc)
except Exception as exc:
read_queue.put(exc)
finally:
@ -165,7 +165,7 @@ class SSETransport:
),
)
response.raise_for_status()
logger.debug(f"Client message sent successfully: {response.status_code}")
logger.debug("Client message sent successfully: %s", response.status_code)
def post_writer(self, client: httpx.Client, endpoint_url: str, write_queue: WriteQueue) -> None:
"""Handle writing messages to the server.
@ -190,7 +190,7 @@ class SSETransport:
except queue.Empty:
continue
except httpx.ReadError as exc:
logger.debug(f"Post writer shutting down normally: {exc}")
logger.debug("Post writer shutting down normally: %s", exc)
except Exception as exc:
logger.exception("Error writing messages")
write_queue.put(exc)
@ -326,7 +326,7 @@ def send_message(http_client: httpx.Client, endpoint_url: str, session_message:
),
)
response.raise_for_status()
logger.debug(f"Client message sent successfully: {response.status_code}")
logger.debug("Client message sent successfully: %s", response.status_code)
except Exception as exc:
logger.exception("Error sending message")
raise
@ -349,13 +349,13 @@ def read_messages(
if sse.event == "message":
try:
message = types.JSONRPCMessage.model_validate_json(sse.data)
logger.debug(f"Received server message: {message}")
logger.debug("Received server message: %s", message)
yield SessionMessage(message)
except Exception as exc:
logger.exception("Error parsing server message")
yield exc
else:
logger.warning(f"Unknown SSE event: {sse.event}")
logger.warning("Unknown SSE event: %s", sse.event)
except Exception as exc:
logger.exception("Error reading SSE messages")
yield exc
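The SSE handlers branch on the event name and warn on anything unknown; a reduced sketch of that dispatch, with the real handlers replaced by log calls:

import logging

logger = logging.getLogger(__name__)

def handle_sse_event(event: str, data: str) -> None:
    match event:
        case "endpoint":
            logger.info("Received endpoint URL: %s", data)
        case "message":
            logger.debug("Received server message: %s", data)
        case _:
            logger.warning("Unknown SSE event: %s", event)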

View File

@ -129,7 +129,7 @@ class StreamableHTTPTransport:
new_session_id = response.headers.get(MCP_SESSION_ID)
if new_session_id:
self.session_id = new_session_id
logger.info(f"Received session ID: {self.session_id}")
logger.info("Received session ID: %s", self.session_id)
def _handle_sse_event(
self,
@ -142,7 +142,7 @@ class StreamableHTTPTransport:
if sse.event == "message":
try:
message = JSONRPCMessage.model_validate_json(sse.data)
logger.debug(f"SSE message: {message}")
logger.debug("SSE message: %s", message)
# If this is a response and we have original_request_id, replace it
if original_request_id is not None and isinstance(message.root, JSONRPCResponse | JSONRPCError):
@ -168,7 +168,7 @@ class StreamableHTTPTransport:
logger.debug("Received ping event")
return False
else:
logger.warning(f"Unknown SSE event: {sse.event}")
logger.warning("Unknown SSE event: %s", sse.event)
return False
def handle_get_stream(
@ -197,7 +197,7 @@ class StreamableHTTPTransport:
self._handle_sse_event(sse, server_to_client_queue)
except Exception as exc:
logger.debug(f"GET stream error (non-fatal): {exc}")
logger.debug("GET stream error (non-fatal): %s", exc)
def _handle_resumption_request(self, ctx: RequestContext) -> None:
"""Handle a resumption request using GET with SSE."""
@ -352,7 +352,7 @@ class StreamableHTTPTransport:
# Check if this is a resumption request
is_resumption = bool(metadata and metadata.resumption_token)
logger.debug(f"Sending client message: {message}")
logger.debug("Sending client message: %s", message)
# Handle initialized notification
if self._is_initialized_notification(message):
@ -389,9 +389,9 @@ class StreamableHTTPTransport:
if response.status_code == 405:
logger.debug("Server does not allow session termination")
elif response.status_code != 200:
logger.warning(f"Session termination failed: {response.status_code}")
logger.warning("Session termination failed: %s", response.status_code)
except Exception as exc:
logger.warning(f"Session termination failed: {exc}")
logger.warning("Session termination failed: %s", exc)
def get_session_id(self) -> str | None:
"""Get the current session ID."""

View File

@ -75,7 +75,7 @@ class MCPClient:
self.connect_server(client_factory, method_name)
else:
try:
logger.debug(f"Not supported method {method_name} found in URL path, trying default 'mcp' method.")
logger.debug("Not supported method %s found in URL path, trying default 'mcp' method.", method_name)
self.connect_server(sse_client, "sse")
except MCPConnectionError:
logger.debug("MCP connection failed with 'sse', falling back to 'mcp' method.")

View File

@ -368,7 +368,7 @@ class BaseSession(
self._handle_incoming(notification)
except Exception as e:
# For other validation errors, log and continue
logging.warning(f"Failed to validate notification: {e}. Message was: {message.message.root}")
logging.warning("Failed to validate notification: %s. Message was: %s", e, message.message.root)
else: # Response or error
response_queue = self._response_streams.get(message.message.root.id)
if response_queue is not None:

View File

@ -535,9 +535,19 @@ class LBModelManager:
if dify_config.DEBUG:
logger.info(
f"Model LB\nid: {config.id}\nname:{config.name}\n"
f"tenant_id: {self._tenant_id}\nprovider: {self._provider}\n"
f"model_type: {self._model_type.value}\nmodel: {self._model}"
"""Model LB
id: %s
name:%s
tenant_id: %s
provider: %s
model_type: %s
model: %s""",
config.id,
config.name,
self._tenant_id,
self._provider,
self._model_type.value,
self._model,
)
return config

View File

@ -440,7 +440,9 @@ class LargeLanguageModel(AIModel):
if callback.raise_error:
raise e
else:
logger.warning(f"Callback {callback.__class__.__name__} on_before_invoke failed with error {e}")
logger.warning(
"Callback %s on_before_invoke failed with error %s", callback.__class__.__name__, e
)
def _trigger_new_chunk_callbacks(
self,
@ -487,7 +489,7 @@ class LargeLanguageModel(AIModel):
if callback.raise_error:
raise e
else:
logger.warning(f"Callback {callback.__class__.__name__} on_new_chunk failed with error {e}")
logger.warning("Callback %s on_new_chunk failed with error %s", callback.__class__.__name__, e)
def _trigger_after_invoke_callbacks(
self,
@ -535,7 +537,9 @@ class LargeLanguageModel(AIModel):
if callback.raise_error:
raise e
else:
logger.warning(f"Callback {callback.__class__.__name__} on_after_invoke failed with error {e}")
logger.warning(
"Callback %s on_after_invoke failed with error %s", callback.__class__.__name__, e
)
def _trigger_invoke_error_callbacks(
self,
@ -583,4 +587,6 @@ class LargeLanguageModel(AIModel):
if callback.raise_error:
raise e
else:
logger.warning(f"Callback {callback.__class__.__name__} on_invoke_error failed with error {e}")
logger.warning(
"Callback %s on_invoke_error failed with error %s", callback.__class__.__name__, e
)
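Each callback hook above follows the same guard: re-raise only when the callback opts in via raise_error, otherwise log the failure and keep going. A condensed sketch (the Callback class here is illustrative):

import logging

logger = logging.getLogger(__name__)

class Callback:
    raise_error: bool = False

    def on_new_chunk(self, chunk: str) -> None:
        raise NotImplementedError

def trigger_new_chunk(callbacks: list[Callback], chunk: str) -> None:
    for callback in callbacks:
        try:
            callback.on_new_chunk(chunk)
        except Exception as e:
            if callback.raise_error:
                raise e
            logger.warning("Callback %s on_new_chunk failed with error %s", callback.__class__.__name__, e)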

View File

@ -136,6 +136,6 @@ class OutputModeration(BaseModel):
result: ModerationOutputsResult = moderation_factory.moderation_for_outputs(moderation_buffer)
return result
except Exception as e:
logger.exception(f"Moderation Output error, app_id: {app_id}")
logger.exception("Moderation Output error, app_id: %s", app_id)
return None

View File

@ -10,6 +10,7 @@ from sqlalchemy.orm import Session, sessionmaker
from core.ops.aliyun_trace.data_exporter.traceclient import (
TraceClient,
convert_datetime_to_nanoseconds,
convert_string_to_id,
convert_to_span_id,
convert_to_trace_id,
generate_span_id,
@ -97,12 +98,13 @@ class AliyunDataTrace(BaseTraceInstance):
try:
return self.trace_client.get_project_url()
except Exception as e:
logger.info(f"Aliyun get run url failed: {str(e)}", exc_info=True)
logger.info("Aliyun get run url failed: %s", str(e), exc_info=True)
raise ValueError(f"Aliyun get run url failed: {str(e)}")
def workflow_trace(self, trace_info: WorkflowTraceInfo):
external_trace_id = trace_info.metadata.get("external_trace_id")
trace_id = external_trace_id or convert_to_trace_id(trace_info.workflow_run_id)
trace_id = convert_to_trace_id(trace_info.workflow_run_id)
if trace_info.trace_id:
trace_id = convert_string_to_id(trace_info.trace_id)
workflow_span_id = convert_to_span_id(trace_info.workflow_run_id, "workflow")
self.add_workflow_span(trace_id, workflow_span_id, trace_info)
@ -130,6 +132,9 @@ class AliyunDataTrace(BaseTraceInstance):
status = Status(StatusCode.ERROR, trace_info.error)
trace_id = convert_to_trace_id(message_id)
if trace_info.trace_id:
trace_id = convert_string_to_id(trace_info.trace_id)
message_span_id = convert_to_span_id(message_id, "message")
message_span = SpanData(
trace_id=trace_id,
@ -139,7 +144,7 @@ class AliyunDataTrace(BaseTraceInstance):
start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
attributes={
GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id", ""),
GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id") or "",
GEN_AI_USER_ID: str(user_id),
GEN_AI_SPAN_KIND: GenAISpanKind.CHAIN.value,
GEN_AI_FRAMEWORK: "dify",
@ -161,12 +166,12 @@ class AliyunDataTrace(BaseTraceInstance):
start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
attributes={
GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id", ""),
GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id") or "",
GEN_AI_USER_ID: str(user_id),
GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value,
GEN_AI_FRAMEWORK: "dify",
GEN_AI_MODEL_NAME: trace_info.metadata.get("ls_model_name", ""),
GEN_AI_SYSTEM: trace_info.metadata.get("ls_provider", ""),
GEN_AI_MODEL_NAME: trace_info.metadata.get("ls_model_name") or "",
GEN_AI_SYSTEM: trace_info.metadata.get("ls_provider") or "",
GEN_AI_USAGE_INPUT_TOKENS: str(trace_info.message_tokens),
GEN_AI_USAGE_OUTPUT_TOKENS: str(trace_info.answer_tokens),
GEN_AI_USAGE_TOTAL_TOKENS: str(trace_info.total_tokens),
@ -186,9 +191,13 @@ class AliyunDataTrace(BaseTraceInstance):
return
message_id = trace_info.message_id
trace_id = convert_to_trace_id(message_id)
if trace_info.trace_id:
trace_id = convert_string_to_id(trace_info.trace_id)
documents_data = extract_retrieval_documents(trace_info.documents)
dataset_retrieval_span = SpanData(
trace_id=convert_to_trace_id(message_id),
trace_id=trace_id,
parent_span_id=convert_to_span_id(message_id, "message"),
span_id=generate_span_id(),
name="dataset_retrieval",
@ -214,8 +223,12 @@ class AliyunDataTrace(BaseTraceInstance):
if trace_info.error:
status = Status(StatusCode.ERROR, trace_info.error)
trace_id = convert_to_trace_id(message_id)
if trace_info.trace_id:
trace_id = convert_string_to_id(trace_info.trace_id)
tool_span = SpanData(
trace_id=convert_to_trace_id(message_id),
trace_id=trace_id,
parent_span_id=convert_to_span_id(message_id, "message"),
span_id=generate_span_id(),
name=trace_info.tool_name,
@ -286,7 +299,7 @@ class AliyunDataTrace(BaseTraceInstance):
node_span = self.build_workflow_task_span(trace_id, workflow_span_id, trace_info, node_execution)
return node_span
except Exception as e:
logging.debug(f"Error occurred in build_workflow_node_span: {e}", exc_info=True)
logging.debug("Error occurred in build_workflow_node_span: %s", e, exc_info=True)
return None
def get_workflow_node_status(self, node_execution: WorkflowNodeExecution) -> Status:
@ -386,14 +399,14 @@ class AliyunDataTrace(BaseTraceInstance):
GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id") or "",
GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value,
GEN_AI_FRAMEWORK: "dify",
GEN_AI_MODEL_NAME: process_data.get("model_name", ""),
GEN_AI_SYSTEM: process_data.get("model_provider", ""),
GEN_AI_MODEL_NAME: process_data.get("model_name") or "",
GEN_AI_SYSTEM: process_data.get("model_provider") or "",
GEN_AI_USAGE_INPUT_TOKENS: str(usage_data.get("prompt_tokens", 0)),
GEN_AI_USAGE_OUTPUT_TOKENS: str(usage_data.get("completion_tokens", 0)),
GEN_AI_USAGE_TOTAL_TOKENS: str(usage_data.get("total_tokens", 0)),
GEN_AI_PROMPT: json.dumps(process_data.get("prompts", []), ensure_ascii=False),
GEN_AI_COMPLETION: str(outputs.get("text", "")),
GEN_AI_RESPONSE_FINISH_REASON: outputs.get("finish_reason", ""),
GEN_AI_RESPONSE_FINISH_REASON: outputs.get("finish_reason") or "",
INPUT_VALUE: json.dumps(process_data.get("prompts", []), ensure_ascii=False),
OUTPUT_VALUE: str(outputs.get("text", "")),
},
@ -421,7 +434,7 @@ class AliyunDataTrace(BaseTraceInstance):
GEN_AI_USER_ID: str(user_id),
GEN_AI_SPAN_KIND: GenAISpanKind.CHAIN.value,
GEN_AI_FRAMEWORK: "dify",
INPUT_VALUE: trace_info.workflow_run_inputs.get("sys.query", ""),
INPUT_VALUE: trace_info.workflow_run_inputs.get("sys.query") or "",
OUTPUT_VALUE: json.dumps(trace_info.workflow_run_outputs, ensure_ascii=False),
},
status=status,
@ -451,8 +464,13 @@ class AliyunDataTrace(BaseTraceInstance):
status: Status = Status(StatusCode.OK)
if trace_info.error:
status = Status(StatusCode.ERROR, trace_info.error)
trace_id = convert_to_trace_id(message_id)
if trace_info.trace_id:
trace_id = convert_string_to_id(trace_info.trace_id)
suggested_question_span = SpanData(
trace_id=convert_to_trace_id(message_id),
trace_id=trace_id,
parent_span_id=convert_to_span_id(message_id, "message"),
span_id=convert_to_span_id(message_id, "suggested_question"),
name="suggested_question",
@ -461,8 +479,8 @@ class AliyunDataTrace(BaseTraceInstance):
attributes={
GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value,
GEN_AI_FRAMEWORK: "dify",
GEN_AI_MODEL_NAME: trace_info.metadata.get("ls_model_name", ""),
GEN_AI_SYSTEM: trace_info.metadata.get("ls_provider", ""),
GEN_AI_MODEL_NAME: trace_info.metadata.get("ls_model_name") or "",
GEN_AI_SYSTEM: trace_info.metadata.get("ls_provider") or "",
GEN_AI_PROMPT: json.dumps(trace_info.inputs, ensure_ascii=False),
GEN_AI_COMPLETION: json.dumps(trace_info.suggested_question, ensure_ascii=False),
INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),

View File

@ -69,10 +69,10 @@ class TraceClient:
if response.status_code == 405:
return True
else:
logger.debug(f"AliyunTrace API check failed: Unexpected status code: {response.status_code}")
logger.debug("AliyunTrace API check failed: Unexpected status code: %s", response.status_code)
return False
except requests.exceptions.RequestException as e:
logger.debug(f"AliyunTrace API check failed: {str(e)}")
logger.debug("AliyunTrace API check failed: %s", str(e))
raise ValueError(f"AliyunTrace API check failed: {str(e)}")
def get_project_url(self):
@ -109,7 +109,7 @@ class TraceClient:
try:
self.exporter.export(spans_to_export)
except Exception as e:
logger.debug(f"Error exporting spans: {e}")
logger.debug("Error exporting spans: %s", e)
def shutdown(self):
with self.condition:
@ -181,15 +181,21 @@ def convert_to_trace_id(uuid_v4: Optional[str]) -> int:
raise ValueError(f"Invalid UUID input: {e}")
def convert_string_to_id(string: Optional[str]) -> int:
if not string:
return generate_span_id()
hash_bytes = hashlib.sha256(string.encode("utf-8")).digest()
id = int.from_bytes(hash_bytes[:8], byteorder="big", signed=False)
return id
def convert_to_span_id(uuid_v4: Optional[str], span_type: str) -> int:
try:
uuid_obj = uuid.UUID(uuid_v4)
except Exception as e:
raise ValueError(f"Invalid UUID input: {e}")
combined_key = f"{uuid_obj.hex}-{span_type}"
hash_bytes = hashlib.sha256(combined_key.encode("utf-8")).digest()
span_id = int.from_bytes(hash_bytes[:8], byteorder="big", signed=False)
return span_id
return convert_string_to_id(combined_key)
def convert_datetime_to_nanoseconds(start_time_a: Optional[datetime]) -> Optional[int]:
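A self-contained sketch of the refactor above: span IDs are now derived through the same SHA-256 helper that handles arbitrary strings, keeping the first 8 bytes of the digest as an unsigned 64-bit integer (the example UUID is arbitrary):

import hashlib
import uuid

def convert_string_to_id_sketch(string: str) -> int:
    hash_bytes = hashlib.sha256(string.encode("utf-8")).digest()
    return int.from_bytes(hash_bytes[:8], byteorder="big", signed=False)

def convert_to_span_id_sketch(uuid_v4: str, span_type: str) -> int:
    combined_key = f"{uuid.UUID(uuid_v4).hex}-{span_type}"
    return convert_string_to_id_sketch(combined_key)

run_id = "8c3a1f2e-1b2c-4d5e-9f00-aabbccddeeff"
# The same (run id, span type) pair always maps to the same span ID.
assert convert_to_span_id_sketch(run_id, "workflow") == convert_to_span_id_sketch(run_id, "workflow")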

View File

@ -77,10 +77,10 @@ def setup_tracer(arize_phoenix_config: ArizeConfig | PhoenixConfig) -> tuple[tra
# Create a named tracer instead of setting the global provider
tracer_name = f"arize_phoenix_tracer_{arize_phoenix_config.project}"
logger.info(f"[Arize/Phoenix] Created tracer with name: {tracer_name}")
logger.info("[Arize/Phoenix] Created tracer with name: %s", tracer_name)
return cast(trace_sdk.Tracer, provider.get_tracer(tracer_name)), processor
except Exception as e:
logger.error(f"[Arize/Phoenix] Failed to setup the tracer: {str(e)}", exc_info=True)
logger.error("[Arize/Phoenix] Failed to setup the tracer: %s", str(e), exc_info=True)
raise
@ -91,16 +91,21 @@ def datetime_to_nanos(dt: Optional[datetime]) -> int:
return int(dt.timestamp() * 1_000_000_000)
def uuid_to_trace_id(string: Optional[str]) -> int:
"""Convert UUID string to a valid trace ID (16-byte integer)."""
def string_to_trace_id128(string: Optional[str]) -> int:
"""
Convert any input string into a stable 128-bit integer trace ID.
This uses SHA-256 hashing and takes the first 16 bytes (128 bits) of the digest.
It's suitable for generating consistent, unique identifiers from strings.
"""
if string is None:
string = ""
hash_object = hashlib.sha256(string.encode())
# Take the first 16 bytes (128 bits) of the hash
# Take the first 16 bytes (128 bits) of the hash digest
digest = hash_object.digest()[:16]
# Convert to integer (128 bits)
# Convert to a 128-bit integer
return int.from_bytes(digest, byteorder="big")
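The 128-bit variant works the same way, just keeping 16 bytes of the digest so the result fits an OpenTelemetry-style trace ID; a tiny sketch:

import hashlib

def string_to_trace_id128_sketch(string: str | None) -> int:
    digest = hashlib.sha256((string or "").encode()).digest()[:16]
    return int.from_bytes(digest, byteorder="big")

assert string_to_trace_id128_sketch("example").bit_length() <= 128
assert string_to_trace_id128_sketch("example") == string_to_trace_id128_sketch("example")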
@ -120,7 +125,7 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001")
def trace(self, trace_info: BaseTraceInfo):
logger.info(f"[Arize/Phoenix] Trace: {trace_info}")
logger.info("[Arize/Phoenix] Trace: %s", trace_info)
try:
if isinstance(trace_info, WorkflowTraceInfo):
self.workflow_trace(trace_info)
@ -138,7 +143,7 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
self.generate_name_trace(trace_info)
except Exception as e:
logger.error(f"[Arize/Phoenix] Error in the trace: {str(e)}", exc_info=True)
logger.error("[Arize/Phoenix] Error in the trace: %s", str(e), exc_info=True)
raise
def workflow_trace(self, trace_info: WorkflowTraceInfo):
@ -153,8 +158,7 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
}
workflow_metadata.update(trace_info.metadata)
external_trace_id = trace_info.metadata.get("external_trace_id")
trace_id = external_trace_id or uuid_to_trace_id(trace_info.workflow_run_id)
trace_id = string_to_trace_id128(trace_info.trace_id or trace_info.workflow_run_id)
span_id = RandomIdGenerator().generate_span_id()
context = SpanContext(
trace_id=trace_id,
@ -310,7 +314,7 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
SpanAttributes.SESSION_ID: trace_info.message_data.conversation_id,
}
trace_id = uuid_to_trace_id(trace_info.message_id)
trace_id = string_to_trace_id128(trace_info.trace_id or trace_info.message_id)
message_span_id = RandomIdGenerator().generate_span_id()
span_context = SpanContext(
trace_id=trace_id,
@ -406,7 +410,7 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
}
metadata.update(trace_info.metadata)
trace_id = uuid_to_trace_id(trace_info.message_id)
trace_id = string_to_trace_id128(trace_info.message_id)
span_id = RandomIdGenerator().generate_span_id()
context = SpanContext(
trace_id=trace_id,
@ -468,7 +472,7 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
}
metadata.update(trace_info.metadata)
trace_id = uuid_to_trace_id(trace_info.message_id)
trace_id = string_to_trace_id128(trace_info.message_id)
span_id = RandomIdGenerator().generate_span_id()
context = SpanContext(
trace_id=trace_id,
@ -521,7 +525,7 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
}
metadata.update(trace_info.metadata)
trace_id = uuid_to_trace_id(trace_info.message_id)
trace_id = string_to_trace_id128(trace_info.message_id)
span_id = RandomIdGenerator().generate_span_id()
context = SpanContext(
trace_id=trace_id,
@ -568,9 +572,9 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
"tool_config": json.dumps(trace_info.tool_config, ensure_ascii=False),
}
trace_id = uuid_to_trace_id(trace_info.message_id)
trace_id = string_to_trace_id128(trace_info.message_id)
tool_span_id = RandomIdGenerator().generate_span_id()
logger.info(f"[Arize/Phoenix] Creating tool trace with trace_id: {trace_id}, span_id: {tool_span_id}")
logger.info("[Arize/Phoenix] Creating tool trace with trace_id: %s, span_id: %s", trace_id, tool_span_id)
# Create span context with the same trace_id as the parent
# todo: Create with the appropriate parent span context, so that the tool span is
@ -629,7 +633,7 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
}
metadata.update(trace_info.metadata)
trace_id = uuid_to_trace_id(trace_info.message_id)
trace_id = string_to_trace_id128(trace_info.message_id)
span_id = RandomIdGenerator().generate_span_id()
context = SpanContext(
trace_id=trace_id,
@ -673,7 +677,7 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
span.set_attribute("test", "true")
return True
except Exception as e:
logger.info(f"[Arize/Phoenix] API check failed: {str(e)}", exc_info=True)
logger.info("[Arize/Phoenix] API check failed: %s", str(e), exc_info=True)
raise ValueError(f"[Arize/Phoenix] API check failed: {str(e)}")
def get_project_url(self):
@ -683,7 +687,7 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
else:
return f"{self.arize_phoenix_config.endpoint}/projects/"
except Exception as e:
logger.info(f"[Arize/Phoenix] Get run url failed: {str(e)}", exc_info=True)
logger.info("[Arize/Phoenix] Get run url failed: %s", str(e), exc_info=True)
raise ValueError(f"[Arize/Phoenix] Get run url failed: {str(e)}")
def _get_workflow_nodes(self, workflow_run_id: str):

View File

@ -102,7 +102,7 @@ class LangfuseConfig(BaseTracingConfig):
@field_validator("host")
@classmethod
def host_validator(cls, v, info: ValidationInfo):
return cls.validate_endpoint_url(v, "https://api.langfuse.com")
return validate_url_with_path(v, "https://api.langfuse.com")
class LangSmithConfig(BaseTracingConfig):

View File

@ -14,6 +14,7 @@ class BaseTraceInfo(BaseModel):
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
metadata: dict[str, Any]
trace_id: Optional[str] = None
@field_validator("inputs", "outputs")
@classmethod

View File

@ -67,14 +67,13 @@ class LangFuseDataTrace(BaseTraceInstance):
self.generate_name_trace(trace_info)
def workflow_trace(self, trace_info: WorkflowTraceInfo):
external_trace_id = trace_info.metadata.get("external_trace_id")
trace_id = external_trace_id or trace_info.workflow_run_id
trace_id = trace_info.trace_id or trace_info.workflow_run_id
user_id = trace_info.metadata.get("user_id")
metadata = trace_info.metadata
metadata["workflow_app_log_id"] = trace_info.workflow_app_log_id
if trace_info.message_id:
trace_id = external_trace_id or trace_info.message_id
trace_id = trace_info.trace_id or trace_info.message_id
name = TraceTaskName.MESSAGE_TRACE.value
trace_data = LangfuseTrace(
id=trace_id,
@ -250,8 +249,10 @@ class LangFuseDataTrace(BaseTraceInstance):
user_id = end_user_data.session_id
metadata["user_id"] = user_id
trace_id = trace_info.trace_id or message_id
trace_data = LangfuseTrace(
id=message_id,
id=trace_id,
user_id=user_id,
name=TraceTaskName.MESSAGE_TRACE.value,
input={
@ -285,7 +286,7 @@ class LangFuseDataTrace(BaseTraceInstance):
langfuse_generation_data = LangfuseGeneration(
name="llm",
trace_id=message_id,
trace_id=trace_id,
start_time=trace_info.start_time,
end_time=trace_info.end_time,
model=message_data.model_id,
@ -311,7 +312,7 @@ class LangFuseDataTrace(BaseTraceInstance):
"preset_response": trace_info.preset_response,
"inputs": trace_info.inputs,
},
trace_id=trace_info.message_id,
trace_id=trace_info.trace_id or trace_info.message_id,
start_time=trace_info.start_time or trace_info.message_data.created_at,
end_time=trace_info.end_time or trace_info.message_data.created_at,
metadata=trace_info.metadata,
@ -334,7 +335,7 @@ class LangFuseDataTrace(BaseTraceInstance):
name=TraceTaskName.SUGGESTED_QUESTION_TRACE.value,
input=trace_info.inputs,
output=str(trace_info.suggested_question),
trace_id=trace_info.message_id,
trace_id=trace_info.trace_id or trace_info.message_id,
start_time=trace_info.start_time,
end_time=trace_info.end_time,
metadata=trace_info.metadata,
@ -352,7 +353,7 @@ class LangFuseDataTrace(BaseTraceInstance):
name=TraceTaskName.DATASET_RETRIEVAL_TRACE.value,
input=trace_info.inputs,
output={"documents": trace_info.documents},
trace_id=trace_info.message_id,
trace_id=trace_info.trace_id or trace_info.message_id,
start_time=trace_info.start_time or trace_info.message_data.created_at,
end_time=trace_info.end_time or trace_info.message_data.updated_at,
metadata=trace_info.metadata,
@ -365,7 +366,7 @@ class LangFuseDataTrace(BaseTraceInstance):
name=trace_info.tool_name,
input=trace_info.tool_inputs,
output=trace_info.tool_outputs,
trace_id=trace_info.message_id,
trace_id=trace_info.trace_id or trace_info.message_id,
start_time=trace_info.start_time,
end_time=trace_info.end_time,
metadata=trace_info.metadata,
@ -440,7 +441,7 @@ class LangFuseDataTrace(BaseTraceInstance):
try:
return self.langfuse_client.auth_check()
except Exception as e:
logger.debug(f"LangFuse API check failed: {str(e)}")
logger.debug("LangFuse API check failed: %s", str(e))
raise ValueError(f"LangFuse API check failed: {str(e)}")
def get_project_key(self):
@ -448,5 +449,5 @@ class LangFuseDataTrace(BaseTraceInstance):
projects = self.langfuse_client.client.projects.get()
return projects.data[0].id
except Exception as e:
logger.debug(f"LangFuse get project key failed: {str(e)}")
logger.debug("LangFuse get project key failed: %s", str(e))
raise ValueError(f"LangFuse get project key failed: {str(e)}")

View File

@ -65,8 +65,7 @@ class LangSmithDataTrace(BaseTraceInstance):
self.generate_name_trace(trace_info)
def workflow_trace(self, trace_info: WorkflowTraceInfo):
external_trace_id = trace_info.metadata.get("external_trace_id")
trace_id = external_trace_id or trace_info.message_id or trace_info.workflow_run_id
trace_id = trace_info.trace_id or trace_info.message_id or trace_info.workflow_run_id
if trace_info.start_time is None:
trace_info.start_time = datetime.now()
message_dotted_order = (
@ -290,7 +289,7 @@ class LangSmithDataTrace(BaseTraceInstance):
reference_example_id=None,
input_attachments={},
output_attachments={},
trace_id=None,
trace_id=trace_info.trace_id,
dotted_order=None,
parent_run_id=None,
)
@ -319,7 +318,7 @@ class LangSmithDataTrace(BaseTraceInstance):
reference_example_id=None,
input_attachments={},
output_attachments={},
trace_id=None,
trace_id=trace_info.trace_id,
dotted_order=None,
id=str(uuid.uuid4()),
)
@ -351,7 +350,7 @@ class LangSmithDataTrace(BaseTraceInstance):
reference_example_id=None,
input_attachments={},
output_attachments={},
trace_id=None,
trace_id=trace_info.trace_id,
dotted_order=None,
error="",
file_list=[],
@ -381,7 +380,7 @@ class LangSmithDataTrace(BaseTraceInstance):
reference_example_id=None,
input_attachments={},
output_attachments={},
trace_id=None,
trace_id=trace_info.trace_id,
dotted_order=None,
error="",
file_list=[],
@ -410,7 +409,7 @@ class LangSmithDataTrace(BaseTraceInstance):
reference_example_id=None,
input_attachments={},
output_attachments={},
trace_id=None,
trace_id=trace_info.trace_id,
dotted_order=None,
error="",
file_list=[],
@ -440,7 +439,7 @@ class LangSmithDataTrace(BaseTraceInstance):
reference_example_id=None,
input_attachments={},
output_attachments={},
trace_id=None,
trace_id=trace_info.trace_id,
dotted_order=None,
error=trace_info.error or "",
)
@ -465,7 +464,7 @@ class LangSmithDataTrace(BaseTraceInstance):
reference_example_id=None,
input_attachments={},
output_attachments={},
trace_id=None,
trace_id=trace_info.trace_id,
dotted_order=None,
error="",
file_list=[],
@ -504,7 +503,7 @@ class LangSmithDataTrace(BaseTraceInstance):
self.langsmith_client.delete_project(project_name=random_project_name)
return True
except Exception as e:
logger.debug(f"LangSmith API check failed: {str(e)}")
logger.debug("LangSmith API check failed: %s", str(e))
raise ValueError(f"LangSmith API check failed: {str(e)}")
def get_project_url(self):
@ -523,5 +522,5 @@ class LangSmithDataTrace(BaseTraceInstance):
)
return project_url.split("/r/")[0]
except Exception as e:
logger.debug(f"LangSmith get run url failed: {str(e)}")
logger.debug("LangSmith get run url failed: %s", str(e))
raise ValueError(f"LangSmith get run url failed: {str(e)}")

View File

@ -96,8 +96,7 @@ class OpikDataTrace(BaseTraceInstance):
self.generate_name_trace(trace_info)
def workflow_trace(self, trace_info: WorkflowTraceInfo):
external_trace_id = trace_info.metadata.get("external_trace_id")
dify_trace_id = external_trace_id or trace_info.workflow_run_id
dify_trace_id = trace_info.trace_id or trace_info.workflow_run_id
opik_trace_id = prepare_opik_uuid(trace_info.start_time, dify_trace_id)
workflow_metadata = wrap_metadata(
trace_info.metadata, message_id=trace_info.message_id, workflow_app_log_id=trace_info.workflow_app_log_id
@ -105,7 +104,7 @@ class OpikDataTrace(BaseTraceInstance):
root_span_id = None
if trace_info.message_id:
dify_trace_id = external_trace_id or trace_info.message_id
dify_trace_id = trace_info.trace_id or trace_info.message_id
opik_trace_id = prepare_opik_uuid(trace_info.start_time, dify_trace_id)
trace_data = {
@ -276,7 +275,7 @@ class OpikDataTrace(BaseTraceInstance):
return
metadata = trace_info.metadata
message_id = trace_info.message_id
dify_trace_id = trace_info.trace_id or trace_info.message_id
user_id = message_data.from_account_id
metadata["user_id"] = user_id
@ -291,7 +290,7 @@ class OpikDataTrace(BaseTraceInstance):
metadata["end_user_id"] = end_user_id
trace_data = {
"id": prepare_opik_uuid(trace_info.start_time, message_id),
"id": prepare_opik_uuid(trace_info.start_time, dify_trace_id),
"name": TraceTaskName.MESSAGE_TRACE.value,
"start_time": trace_info.start_time,
"end_time": trace_info.end_time,
@ -330,7 +329,7 @@ class OpikDataTrace(BaseTraceInstance):
start_time = trace_info.start_time or trace_info.message_data.created_at
span_data = {
"trace_id": prepare_opik_uuid(start_time, trace_info.message_id),
"trace_id": prepare_opik_uuid(start_time, trace_info.trace_id or trace_info.message_id),
"name": TraceTaskName.MODERATION_TRACE.value,
"type": "tool",
"start_time": start_time,
@ -356,7 +355,7 @@ class OpikDataTrace(BaseTraceInstance):
start_time = trace_info.start_time or message_data.created_at
span_data = {
"trace_id": prepare_opik_uuid(start_time, trace_info.message_id),
"trace_id": prepare_opik_uuid(start_time, trace_info.trace_id or trace_info.message_id),
"name": TraceTaskName.SUGGESTED_QUESTION_TRACE.value,
"type": "tool",
"start_time": start_time,
@ -376,7 +375,7 @@ class OpikDataTrace(BaseTraceInstance):
start_time = trace_info.start_time or trace_info.message_data.created_at
span_data = {
"trace_id": prepare_opik_uuid(start_time, trace_info.message_id),
"trace_id": prepare_opik_uuid(start_time, trace_info.trace_id or trace_info.message_id),
"name": TraceTaskName.DATASET_RETRIEVAL_TRACE.value,
"type": "tool",
"start_time": start_time,
@ -391,7 +390,7 @@ class OpikDataTrace(BaseTraceInstance):
def tool_trace(self, trace_info: ToolTraceInfo):
span_data = {
"trace_id": prepare_opik_uuid(trace_info.start_time, trace_info.message_id),
"trace_id": prepare_opik_uuid(trace_info.start_time, trace_info.trace_id or trace_info.message_id),
"name": trace_info.tool_name,
"type": "tool",
"start_time": trace_info.start_time,
@ -406,7 +405,7 @@ class OpikDataTrace(BaseTraceInstance):
def generate_name_trace(self, trace_info: GenerateNameTraceInfo):
trace_data = {
"id": prepare_opik_uuid(trace_info.start_time, trace_info.message_id),
"id": prepare_opik_uuid(trace_info.start_time, trace_info.trace_id or trace_info.message_id),
"name": TraceTaskName.GENERATE_NAME_TRACE.value,
"start_time": trace_info.start_time,
"end_time": trace_info.end_time,
@ -453,12 +452,12 @@ class OpikDataTrace(BaseTraceInstance):
self.opik_client.auth_check()
return True
except Exception as e:
logger.info(f"Opik API check failed: {str(e)}", exc_info=True)
logger.info("Opik API check failed: %s", str(e), exc_info=True)
raise ValueError(f"Opik API check failed: {str(e)}")
def get_project_url(self):
try:
return self.opik_client.get_project_url(project_name=self.project)
except Exception as e:
logger.info(f"Opik get run url failed: {str(e)}", exc_info=True)
logger.info("Opik get run url failed: %s", str(e), exc_info=True)
raise ValueError(f"Opik get run url failed: {str(e)}")

View File

@ -287,7 +287,7 @@ class OpsTraceManager:
# create new tracing_instance and update the cache if it absent
tracing_instance = trace_instance(config_class(**decrypt_trace_config))
cls.ops_trace_instances_cache[decrypt_trace_config_key] = tracing_instance
logging.info(f"new tracing_instance for app_id: {app_id}")
logging.info("new tracing_instance for app_id: %s", app_id)
return tracing_instance
@classmethod
@ -407,6 +407,7 @@ class TraceTask:
def __init__(
self,
trace_type: Any,
trace_id: Optional[str] = None,
message_id: Optional[str] = None,
workflow_execution: Optional[WorkflowExecution] = None,
conversation_id: Optional[str] = None,
@ -424,6 +425,9 @@ class TraceTask:
self.app_id = None
self.kwargs = kwargs
external_trace_id = kwargs.get("external_trace_id")
if external_trace_id:
self.trace_id = external_trace_id
def execute(self):
return self.preprocess()
@ -520,11 +524,8 @@ class TraceTask:
"app_id": workflow_run.app_id,
}
external_trace_id = self.kwargs.get("external_trace_id")
if external_trace_id:
metadata["external_trace_id"] = external_trace_id
workflow_trace_info = WorkflowTraceInfo(
trace_id=self.trace_id,
workflow_data=workflow_run.to_dict(),
conversation_id=conversation_id,
workflow_id=workflow_id,
@ -584,6 +585,7 @@ class TraceTask:
message_tokens = message_data.message_tokens
message_trace_info = MessageTraceInfo(
trace_id=self.trace_id,
message_id=message_id,
message_data=message_data.to_dict(),
conversation_model=conversation_mode,
@ -627,6 +629,7 @@ class TraceTask:
workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None
moderation_trace_info = ModerationTraceInfo(
trace_id=self.trace_id,
message_id=workflow_app_log_id or message_id,
inputs=inputs,
message_data=message_data.to_dict(),
@ -667,6 +670,7 @@ class TraceTask:
workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None
suggested_question_trace_info = SuggestedQuestionTraceInfo(
trace_id=self.trace_id,
message_id=workflow_app_log_id or message_id,
message_data=message_data.to_dict(),
inputs=message_data.message,
@ -708,6 +712,7 @@ class TraceTask:
}
dataset_retrieval_trace_info = DatasetRetrievalTraceInfo(
trace_id=self.trace_id,
message_id=message_id,
inputs=message_data.query or message_data.inputs,
documents=[doc.model_dump() for doc in documents] if documents else [],
@ -772,6 +777,7 @@ class TraceTask:
)
tool_trace_info = ToolTraceInfo(
trace_id=self.trace_id,
message_id=message_id,
message_data=message_data.to_dict(),
tool_name=tool_name,
@ -807,6 +813,7 @@ class TraceTask:
}
generate_name_trace_info = GenerateNameTraceInfo(
trace_id=self.trace_id,
conversation_id=conversation_id,
inputs=inputs,
outputs=generate_conversation_name,
@ -843,7 +850,7 @@ class TraceQueueManager:
trace_task.app_id = self.app_id
trace_manager_queue.put(trace_task)
except Exception as e:
logging.exception(f"Error adding trace task, trace_type {trace_task.trace_type}")
logging.exception("Error adding trace task, trace_type %s", trace_task.trace_type)
finally:
self.start_timer()
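The TraceTask changes thread a single trace_id through every *TraceInfo payload, preferring an explicitly provided (external) trace id and otherwise falling back to the message or workflow run id, as the integrations above do; a tiny sketch of that precedence (the dataclass is illustrative):

from dataclasses import dataclass
from typing import Optional

@dataclass
class TraceInfoSketch:
    trace_id: Optional[str] = None
    message_id: Optional[str] = None
    workflow_run_id: Optional[str] = None

def resolve_trace_id(info: TraceInfoSketch) -> Optional[str]:
    return info.trace_id or info.message_id or info.workflow_run_id

assert resolve_trace_id(TraceInfoSketch(message_id="msg-1", workflow_run_id="run-1")) == "msg-1"
assert resolve_trace_id(TraceInfoSketch(trace_id="ext-1", message_id="msg-1")) == "ext-1"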

View File

@ -67,7 +67,13 @@ def generate_dotted_order(
def validate_url(url: str, default_url: str, allowed_schemes: tuple = ("https", "http")) -> str:
"""
Validate and normalize URL with proper error handling
Validate and normalize URL with proper error handling.
NOTE: This function does not retain the `path` component of the provided URL.
In most cases, it is recommended to use `validate_url_with_path` instead.
This function is deprecated and retained only for compatibility purposes.
New implementations should use `validate_url_with_path`.
Args:
url: The URL to validate
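The recommended replacement is called elsewhere in this diff as validate_url_with_path(v, "https://api.langfuse.com"); the body below is only a sketch of the documented contract (validate the scheme, keep the path, fall back to a default), not the project's implementation:

from urllib.parse import urlparse

def validate_url_with_path_sketch(url: str | None, default_url: str, allowed_schemes: tuple = ("https", "http")) -> str:
    if not url:
        return default_url
    parsed = urlparse(url)
    if parsed.scheme not in allowed_schemes or not parsed.netloc:
        raise ValueError(f"Invalid URL: {url}")
    # Unlike the deprecated validate_url, the path component is preserved.
    return url.rstrip("/")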

View File

@ -66,11 +66,11 @@ class WeaveDataTrace(BaseTraceInstance):
project_url = f"https://wandb.ai/{self.weave_client._project_id()}"
return project_url
except Exception as e:
logger.debug(f"Weave get run url failed: {str(e)}")
logger.debug("Weave get run url failed: %s", str(e))
raise ValueError(f"Weave get run url failed: {str(e)}")
def trace(self, trace_info: BaseTraceInfo):
logger.debug(f"Trace info: {trace_info}")
logger.debug("Trace info: %s", trace_info)
if isinstance(trace_info, WorkflowTraceInfo):
self.workflow_trace(trace_info)
if isinstance(trace_info, MessageTraceInfo):
@ -87,8 +87,7 @@ class WeaveDataTrace(BaseTraceInstance):
self.generate_name_trace(trace_info)
def workflow_trace(self, trace_info: WorkflowTraceInfo):
external_trace_id = trace_info.metadata.get("external_trace_id")
trace_id = external_trace_id or trace_info.message_id or trace_info.workflow_run_id
trace_id = trace_info.trace_id or trace_info.message_id or trace_info.workflow_run_id
if trace_info.start_time is None:
trace_info.start_time = datetime.now()
@ -245,8 +244,12 @@ class WeaveDataTrace(BaseTraceInstance):
attributes["start_time"] = trace_info.start_time
attributes["end_time"] = trace_info.end_time
attributes["tags"] = ["message", str(trace_info.conversation_mode)]
trace_id = trace_info.trace_id or message_id
attributes["trace_id"] = trace_id
message_run = WeaveTraceModel(
id=message_id,
id=trace_id,
op=str(TraceTaskName.MESSAGE_TRACE.value),
input_tokens=trace_info.message_tokens,
output_tokens=trace_info.answer_tokens,
@ -274,7 +277,7 @@ class WeaveDataTrace(BaseTraceInstance):
)
self.start_call(
llm_run,
parent_run_id=message_id,
parent_run_id=trace_id,
)
self.finish_call(llm_run)
self.finish_call(message_run)
@ -289,6 +292,9 @@ class WeaveDataTrace(BaseTraceInstance):
attributes["start_time"] = trace_info.start_time or trace_info.message_data.created_at
attributes["end_time"] = trace_info.end_time or trace_info.message_data.updated_at
trace_id = trace_info.trace_id or trace_info.message_id
attributes["trace_id"] = trace_id
moderation_run = WeaveTraceModel(
id=str(uuid.uuid4()),
op=str(TraceTaskName.MODERATION_TRACE.value),
@ -303,7 +309,7 @@ class WeaveDataTrace(BaseTraceInstance):
exception=getattr(trace_info, "error", None),
file_list=[],
)
self.start_call(moderation_run, parent_run_id=trace_info.message_id)
self.start_call(moderation_run, parent_run_id=trace_id)
self.finish_call(moderation_run)
def suggested_question_trace(self, trace_info: SuggestedQuestionTraceInfo):
@ -316,6 +322,9 @@ class WeaveDataTrace(BaseTraceInstance):
attributes["start_time"] = (trace_info.start_time or message_data.created_at,)
attributes["end_time"] = (trace_info.end_time or message_data.updated_at,)
trace_id = trace_info.trace_id or trace_info.message_id
attributes["trace_id"] = trace_id
suggested_question_run = WeaveTraceModel(
id=str(uuid.uuid4()),
op=str(TraceTaskName.SUGGESTED_QUESTION_TRACE.value),
@ -326,7 +335,7 @@ class WeaveDataTrace(BaseTraceInstance):
file_list=[],
)
self.start_call(suggested_question_run, parent_run_id=trace_info.message_id)
self.start_call(suggested_question_run, parent_run_id=trace_id)
self.finish_call(suggested_question_run)
def dataset_retrieval_trace(self, trace_info: DatasetRetrievalTraceInfo):
@ -338,6 +347,9 @@ class WeaveDataTrace(BaseTraceInstance):
attributes["start_time"] = (trace_info.start_time or trace_info.message_data.created_at,)
attributes["end_time"] = (trace_info.end_time or trace_info.message_data.updated_at,)
trace_id = trace_info.trace_id or trace_info.message_id
attributes["trace_id"] = trace_id
dataset_retrieval_run = WeaveTraceModel(
id=str(uuid.uuid4()),
op=str(TraceTaskName.DATASET_RETRIEVAL_TRACE.value),
@ -348,7 +360,7 @@ class WeaveDataTrace(BaseTraceInstance):
file_list=[],
)
self.start_call(dataset_retrieval_run, parent_run_id=trace_info.message_id)
self.start_call(dataset_retrieval_run, parent_run_id=trace_id)
self.finish_call(dataset_retrieval_run)
def tool_trace(self, trace_info: ToolTraceInfo):
@ -357,6 +369,11 @@ class WeaveDataTrace(BaseTraceInstance):
attributes["start_time"] = trace_info.start_time
attributes["end_time"] = trace_info.end_time
message_id = trace_info.message_id or getattr(trace_info, "conversation_id", None)
message_id = message_id or None
trace_id = trace_info.trace_id or message_id
attributes["trace_id"] = trace_id
tool_run = WeaveTraceModel(
id=str(uuid.uuid4()),
op=trace_info.tool_name,
@ -366,9 +383,7 @@ class WeaveDataTrace(BaseTraceInstance):
attributes=attributes,
exception=trace_info.error,
)
message_id = trace_info.message_id or getattr(trace_info, "conversation_id", None)
message_id = message_id or None
self.start_call(tool_run, parent_run_id=message_id)
self.start_call(tool_run, parent_run_id=trace_id)
self.finish_call(tool_run)
def generate_name_trace(self, trace_info: GenerateNameTraceInfo):
@ -403,7 +418,7 @@ class WeaveDataTrace(BaseTraceInstance):
print("Weave login successful")
return True
except Exception as e:
logger.debug(f"Weave API check failed: {str(e)}")
logger.debug("Weave API check failed: %s", str(e))
raise ValueError(f"Weave API check failed: {str(e)}")
def start_call(self, run_data: WeaveTraceModel, parent_run_id: Optional[str] = None):

View File

@ -1,3 +1,8 @@
from collections.abc import Mapping
from pydantic import TypeAdapter
class PluginDaemonError(Exception):
"""Base class for all plugin daemon errors."""
@ -36,6 +41,21 @@ class PluginDaemonBadRequestError(PluginDaemonClientSideError):
class PluginInvokeError(PluginDaemonClientSideError):
description: str = "Invoke Error"
def _get_error_object(self) -> Mapping:
try:
return TypeAdapter(Mapping).validate_json(self.description)
except Exception:
return {}
def get_error_type(self) -> str:
return self._get_error_object().get("error_type", "unknown")
def get_error_message(self) -> str:
try:
return self._get_error_object().get("message", "unknown")
except Exception:
return self.description
class PluginUniqueIdentifierError(PluginDaemonClientSideError):
description: str = "Unique Identifier Error"
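The new helpers treat the daemon-supplied description as a JSON error object and degrade gracefully when it is not one; a standalone sketch of that behaviour, using json.loads in place of pydantic's TypeAdapter for brevity (the class name is hypothetical):

import json

class PluginInvokeErrorSketch(Exception):
    description: str = "Invoke Error"

    def __init__(self, description: str | None = None):
        if description is not None:
            self.description = description

    def _get_error_object(self) -> dict:
        try:
            obj = json.loads(self.description)
            return obj if isinstance(obj, dict) else {}
        except Exception:
            return {}

    def get_error_type(self) -> str:
        return self._get_error_object().get("error_type", "unknown")

    def get_error_message(self) -> str:
        try:
            return self._get_error_object().get("message", "unknown")
        except Exception:
            return self.description

err = PluginInvokeErrorSketch('{"error_type": "RateLimitError", "message": "too many requests"}')
assert err.get_error_type() == "RateLimitError" and err.get_error_message() == "too many requests"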

View File

@ -24,7 +24,7 @@ class Jieba(BaseKeyword):
self._config = KeywordTableConfig()
def create(self, texts: list[Document], **kwargs) -> BaseKeyword:
lock_name = "keyword_indexing_lock_{}".format(self.dataset.id)
lock_name = f"keyword_indexing_lock_{self.dataset.id}"
with redis_client.lock(lock_name, timeout=600):
keyword_table_handler = JiebaKeywordTableHandler()
keyword_table = self._get_dataset_keyword_table()
@ -43,7 +43,7 @@ class Jieba(BaseKeyword):
return self
def add_texts(self, texts: list[Document], **kwargs):
lock_name = "keyword_indexing_lock_{}".format(self.dataset.id)
lock_name = f"keyword_indexing_lock_{self.dataset.id}"
with redis_client.lock(lock_name, timeout=600):
keyword_table_handler = JiebaKeywordTableHandler()
@ -76,7 +76,7 @@ class Jieba(BaseKeyword):
return id in set.union(*keyword_table.values())
def delete_by_ids(self, ids: list[str]) -> None:
lock_name = "keyword_indexing_lock_{}".format(self.dataset.id)
lock_name = f"keyword_indexing_lock_{self.dataset.id}"
with redis_client.lock(lock_name, timeout=600):
keyword_table = self._get_dataset_keyword_table()
if keyword_table is not None:
@ -116,7 +116,7 @@ class Jieba(BaseKeyword):
return documents
def delete(self) -> None:
lock_name = "keyword_indexing_lock_{}".format(self.dataset.id)
lock_name = f"keyword_indexing_lock_{self.dataset.id}"
with redis_client.lock(lock_name, timeout=600):
dataset_keyword_table = self.dataset.dataset_keyword_table
if dataset_keyword_table:

View File

@ -203,9 +203,9 @@ class BaiduVector(BaseVector):
def _create_table(self, dimension: int) -> None:
# Try to grab distributed lock and create table
lock_name = "vector_indexing_lock_{}".format(self._collection_name)
lock_name = f"vector_indexing_lock_{self._collection_name}"
with redis_client.lock(lock_name, timeout=60):
table_exist_cache_key = "vector_indexing_{}".format(self._collection_name)
table_exist_cache_key = f"vector_indexing_{self._collection_name}"
if redis_client.get(table_exist_cache_key):
return

View File

@ -57,9 +57,9 @@ class ChromaVector(BaseVector):
self.add_texts(texts, embeddings, **kwargs)
def create_collection(self, collection_name: str):
lock_name = "vector_indexing_lock_{}".format(collection_name)
lock_name = f"vector_indexing_lock_{collection_name}"
with redis_client.lock(lock_name, timeout=20):
collection_exist_cache_key = "vector_indexing_{}".format(self._collection_name)
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
if redis_client.get(collection_exist_cache_key):
return
self._client.get_or_create_collection(collection_name)
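These vector-store hunks all wrap collection creation in the same pattern, now spelled with f-string key names: a Redis distributed lock guards creation and a done-marker key makes later calls return early. A generic sketch (create_fn is illustrative; the one-hour marker TTL appears later in the Lindorm hunk):

from collections.abc import Callable

import redis

redis_client = redis.Redis()

def create_collection_once(collection_name: str, create_fn: Callable[[str], None]) -> None:
    lock_name = f"vector_indexing_lock_{collection_name}"
    with redis_client.lock(lock_name, timeout=20):
        collection_exist_cache_key = f"vector_indexing_{collection_name}"
        if redis_client.get(collection_exist_cache_key):
            return  # another worker already created the collection
        create_fn(collection_name)
        redis_client.set(collection_exist_cache_key, 1, ex=3600)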

View File

@ -74,9 +74,9 @@ class CouchbaseVector(BaseVector):
self.add_texts(texts, embeddings)
def _create_collection(self, vector_length: int, uuid: str):
lock_name = "vector_indexing_lock_{}".format(self._collection_name)
lock_name = f"vector_indexing_lock_{self._collection_name}"
with redis_client.lock(lock_name, timeout=20):
collection_exist_cache_key = "vector_indexing_{}".format(self._collection_name)
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
if redis_client.get(collection_exist_cache_key):
return
if self._collection_exists(self._collection_name):
@ -242,7 +242,7 @@ class CouchbaseVector(BaseVector):
try:
self._cluster.query(query, named_parameters={"doc_ids": ids}).execute()
except Exception as e:
logger.exception(f"Failed to delete documents, ids: {ids}")
logger.exception("Failed to delete documents, ids: %s", ids)
def delete_by_document_id(self, document_id: str):
query = f"""

View File

@ -29,7 +29,7 @@ class ElasticSearchJaVector(ElasticSearchVector):
with redis_client.lock(lock_name, timeout=20):
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
if redis_client.get(collection_exist_cache_key):
logger.info(f"Collection {self._collection_name} already exists.")
logger.info("Collection %s already exists.", self._collection_name)
return
if not self._client.indices.exists(index=self._collection_name):

View File

@ -186,7 +186,7 @@ class ElasticSearchVector(BaseVector):
with redis_client.lock(lock_name, timeout=20):
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
if redis_client.get(collection_exist_cache_key):
logger.info(f"Collection {self._collection_name} already exists.")
logger.info("Collection %s already exists.", self._collection_name)
return
if not self._client.indices.exists(index=self._collection_name):

View File

@ -164,7 +164,7 @@ class HuaweiCloudVector(BaseVector):
with redis_client.lock(lock_name, timeout=20):
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
if redis_client.get(collection_exist_cache_key):
logger.info(f"Collection {self._collection_name} already exists.")
logger.info("Collection %s already exists.", self._collection_name)
return
if not self._client.indices.exists(index=self._collection_name):

View File

@ -89,7 +89,7 @@ class LindormVectorStore(BaseVector):
timeout: int = 60,
**kwargs,
):
logger.info(f"Total documents to add: {len(documents)}")
logger.info("Total documents to add: %s", len(documents))
uuids = self._get_uuids(documents)
total_docs = len(documents)
@ -147,7 +147,7 @@ class LindormVectorStore(BaseVector):
time.sleep(0.5)
except Exception:
logger.exception(f"Failed to process batch {batch_num + 1}")
logger.exception("Failed to process batch %s", batch_num + 1)
raise
def get_ids_by_metadata_field(self, key: str, value: str):
@ -180,7 +180,7 @@ class LindormVectorStore(BaseVector):
# 1. First check if collection exists
if not self._client.indices.exists(index=self._collection_name):
logger.warning(f"Collection {self._collection_name} does not exist")
logger.warning("Collection %s does not exist", self._collection_name)
return
# 2. Batch process deletions
@ -196,7 +196,7 @@ class LindormVectorStore(BaseVector):
}
)
else:
logger.warning(f"DELETE BY ID: ID {id} does not exist in the index.")
logger.warning("DELETE BY ID: ID %s does not exist in the index.", id)
# 3. Perform bulk deletion if there are valid documents to delete
if actions:
@ -209,9 +209,9 @@ class LindormVectorStore(BaseVector):
doc_id = delete_error.get("_id")
if status == 404:
logger.warning(f"Document not found for deletion: {doc_id}")
logger.warning("Document not found for deletion: %s", doc_id)
else:
logger.exception(f"Error deleting document: {error}")
logger.exception("Error deleting document: %s", error)
def delete(self) -> None:
if self._using_ugc:
@@ -225,7 +225,7 @@ class LindormVectorStore(BaseVector):
self._client.indices.delete(index=self._collection_name, params={"timeout": 60})
logger.info("Delete index success")
else:
logger.warning(f"Index '{self._collection_name}' does not exist. No deletion performed.")
logger.warning("Index '%s' does not exist. No deletion performed.", self._collection_name)
def text_exists(self, id: str) -> bool:
try:
@@ -257,7 +257,7 @@ class LindormVectorStore(BaseVector):
params["routing"] = self._routing # type: ignore
response = self._client.search(index=self._collection_name, body=query, params=params)
except Exception:
logger.exception(f"Error executing vector search, query: {query}")
logger.exception("Error executing vector search, query: %s", query)
raise
docs_and_scores = []
@@ -324,10 +324,10 @@ class LindormVectorStore(BaseVector):
with redis_client.lock(lock_name, timeout=20):
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
if redis_client.get(collection_exist_cache_key):
logger.info(f"Collection {self._collection_name} already exists.")
logger.info("Collection %s already exists.", self._collection_name)
return
if self._client.indices.exists(index=self._collection_name):
logger.info(f"{self._collection_name.lower()} already exists.")
logger.info("%s already exists.", self._collection_name.lower())
redis_client.set(collection_exist_cache_key, 1, ex=3600)
return
if len(self.kwargs) == 0 and len(kwargs) != 0:
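
The Lindorm hunks above sit inside a batch-writing loop: documents are flushed in fixed-size batches with a short pause between them, and a failed batch is logged with `logger.exception` (which keeps the traceback) before re-raising. An illustrative loop with the same shape; `write_batch` and the batch size are placeholders, not the store's real values:

```python
import logging
import time

logger = logging.getLogger(__name__)


def add_in_batches(documents: list, write_batch, batch_size: int = 500) -> None:
    """Illustrative batching loop; write_batch stands in for the bulk write."""
    logger.info("Total documents to add: %s", len(documents))
    for batch_num, start in enumerate(range(0, len(documents), batch_size)):
        batch = documents[start : start + batch_size]
        try:
            write_batch(batch)
            time.sleep(0.5)  # brief pause between batches, as in the loop above
        except Exception:
            # Keeps the traceback; the batch number is a lazy %-argument rather
            # than being interpolated into the format string.
            logger.exception("Failed to process batch %s", batch_num + 1)
            raise


add_in_batches(list(range(1200)), write_batch=lambda batch: None)
```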

View File

@@ -103,7 +103,7 @@ class MilvusVector(BaseVector):
# For standard Milvus installations, check version number
return version.parse(milvus_version).base_version >= version.parse("2.5.0").base_version
except Exception as e:
logger.warning(f"Failed to check Milvus version: {str(e)}. Disabling hybrid search.")
logger.warning("Failed to check Milvus version: %s. Disabling hybrid search.", str(e))
return False
def get_type(self) -> str:
@@ -289,9 +289,9 @@ class MilvusVector(BaseVector):
"""
Create a new collection in Milvus with the specified schema and index parameters.
"""
lock_name = "vector_indexing_lock_{}".format(self._collection_name)
lock_name = f"vector_indexing_lock_{self._collection_name}"
with redis_client.lock(lock_name, timeout=20):
collection_exist_cache_key = "vector_indexing_{}".format(self._collection_name)
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
if redis_client.get(collection_exist_cache_key):
return
# Grab the existing collection if it exists
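
The first Milvus hunk only rewords the warning, but the surrounding check is the version gate for hybrid search. A hedged sketch of that kind of gate using `packaging.version`; note the repository compares `.base_version` strings, whereas this sketch compares `Version` objects, which also orders multi-digit components such as 2.10.0 correctly:

```python
import logging

from packaging import version

logger = logging.getLogger(__name__)


def supports_hybrid_search(server_version: str, minimum: str = "2.5.0") -> bool:
    try:
        # Comparing Version objects orders multi-digit parts correctly
        # (2.10.0 > 2.5.0); the repository compares base_version strings instead.
        return version.parse(server_version) >= version.parse(minimum)
    except Exception as e:
        # Fail closed and keep the reason visible, mirroring the warning above.
        logger.warning("Failed to check Milvus version: %s. Disabling hybrid search.", str(e))
        return False


assert supports_hybrid_search("2.10.1")
assert not supports_hybrid_search("2.4.9")
```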

View File

@@ -53,7 +53,7 @@ class MyScaleVector(BaseVector):
return self.add_texts(documents=texts, embeddings=embeddings, **kwargs)
def _create_collection(self, dimension: int):
logging.info(f"create MyScale collection {self._collection_name} with dimension {dimension}")
logging.info("create MyScale collection %s with dimension %s", self._collection_name, dimension)
self._client.command(f"CREATE DATABASE IF NOT EXISTS {self._config.database}")
fts_params = f"('{self._config.fts_params}')" if self._config.fts_params else ""
sql = f"""
@@ -151,7 +151,7 @@ class MyScaleVector(BaseVector):
for r in self._client.query(sql).named_results()
]
except Exception as e:
logging.exception(f"\033[91m\033[1m{type(e)}\033[0m \033[95m{str(e)}\033[0m") # noqa:TRY401
logging.exception("\033[91m\033[1m%s\033[0m \033[95m%s\033[0m", type(e), str(e)) # noqa:TRY401
return []
def delete(self) -> None:

View File

@@ -147,7 +147,7 @@ class OceanBaseVector(BaseVector):
logger.debug("Current OceanBase version is %s", ob_version)
return version.parse(ob_version).base_version >= version.parse("4.3.5.1").base_version
except Exception as e:
logger.warning(f"Failed to check OceanBase version: {str(e)}. Disabling hybrid search.")
logger.warning("Failed to check OceanBase version: %s. Disabling hybrid search.", str(e))
return False
def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs):
@@ -229,7 +229,7 @@ class OceanBaseVector(BaseVector):
return docs
except Exception as e:
logger.warning(f"Failed to fulltext search: {str(e)}.")
logger.warning("Failed to fulltext search: %s.", str(e))
return []
def search_by_vector(self, query_vector: list[float], **kwargs: Any) -> list[Document]:

View File

@@ -131,7 +131,7 @@ class OpenSearchVector(BaseVector):
def delete_by_ids(self, ids: list[str]) -> None:
index_name = self._collection_name.lower()
if not self._client.indices.exists(index=index_name):
logger.warning(f"Index {index_name} does not exist")
logger.warning("Index %s does not exist", index_name)
return
# Obtaining All Actual Documents_ID
@@ -142,7 +142,7 @@ class OpenSearchVector(BaseVector):
if es_ids:
actual_ids.extend(es_ids)
else:
logger.warning(f"Document with metadata doc_id {doc_id} not found for deletion")
logger.warning("Document with metadata doc_id %s not found for deletion", doc_id)
if actual_ids:
actions = [{"_op_type": "delete", "_index": index_name, "_id": es_id} for es_id in actual_ids]
@@ -155,9 +155,9 @@ class OpenSearchVector(BaseVector):
doc_id = delete_error.get("_id")
if status == 404:
logger.warning(f"Document not found for deletion: {doc_id}")
logger.warning("Document not found for deletion: %s", doc_id)
else:
logger.exception(f"Error deleting document: {error}")
logger.exception("Error deleting document: %s", error)
def delete(self) -> None:
self._client.indices.delete(index=self._collection_name.lower())
@@ -198,7 +198,7 @@ class OpenSearchVector(BaseVector):
try:
response = self._client.search(index=self._collection_name.lower(), body=query)
except Exception as e:
logger.exception(f"Error executing vector search, query: {query}")
logger.exception("Error executing vector search, query: %s", query)
raise
docs = []
@@ -242,7 +242,7 @@ class OpenSearchVector(BaseVector):
with redis_client.lock(lock_name, timeout=20):
collection_exist_cache_key = f"vector_indexing_{self._collection_name.lower()}"
if redis_client.get(collection_exist_cache_key):
logger.info(f"Collection {self._collection_name.lower()} already exists.")
logger.info("Collection %s already exists.", self._collection_name.lower())
return
if not self._client.indices.exists(index=self._collection_name.lower()):
@@ -272,7 +272,7 @@ class OpenSearchVector(BaseVector):
},
}
logger.info(f"Creating OpenSearch index {self._collection_name.lower()}")
logger.info("Creating OpenSearch index %s", self._collection_name.lower())
self._client.indices.create(index=self._collection_name.lower(), body=index_body)
redis_client.set(collection_exist_cache_key, 1, ex=3600)
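
The OpenSearch delete hunks triage per-document errors after a bulk delete: a 404 (document already gone) is downgraded to a warning, anything else is treated as a real failure. A sketch of that triage; the shape of each error entry is assumed from the fields read above (`status` and `_id` under a `delete` key) rather than verified against the client, and `logger.error` is used here because the sketch runs outside an `except` block:

```python
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def report_bulk_delete_errors(errors: list[dict]) -> None:
    """Triage the per-item errors returned by a bulk call (shape assumed)."""
    for error in errors:
        delete_error = error.get("delete", {})
        status = delete_error.get("status")
        doc_id = delete_error.get("_id")
        if status == 404:
            # Already gone: not worth more than a warning.
            logger.warning("Document not found for deletion: %s", doc_id)
        else:
            logger.error("Error deleting document: %s", error)


report_bulk_delete_errors([
    {"delete": {"status": 404, "_id": "doc-1"}},
    {"delete": {"status": 500, "_id": "doc-2"}},
])
```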

View File

@@ -82,9 +82,9 @@ class PGVectoRS(BaseVector):
self.add_texts(texts, embeddings)
def create_collection(self, dimension: int):
lock_name = "vector_indexing_lock_{}".format(self._collection_name)
lock_name = f"vector_indexing_lock_{self._collection_name}"
with redis_client.lock(lock_name, timeout=20):
collection_exist_cache_key = "vector_indexing_{}".format(self._collection_name)
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
if redis_client.get(collection_exist_cache_key):
return
index_name = f"{self._collection_name}_embedding_index"

View File

@@ -155,7 +155,7 @@ class PGVector(BaseVector):
cur.execute(f"DELETE FROM {self.table_name} WHERE id IN %s", (tuple(ids),))
except psycopg2.errors.UndefinedTable:
# table not exists
logging.warning(f"Table {self.table_name} not found, skipping delete operation.")
logging.warning("Table %s not found, skipping delete operation.", self.table_name)
return
except Exception as e:
raise e
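
The PGVector hunk keeps bulk deletion tolerant of a missing table: `psycopg2.errors.UndefinedTable` is caught and logged with a lazy-formatted warning instead of propagating. A sketch under assumptions; `conn` is an already-open psycopg2 connection, the table name interpolation mirrors the original code, and the rollback is an addition for the sketch rather than something shown in the hunk:

```python
import logging

import psycopg2

logger = logging.getLogger(__name__)


def delete_by_ids(conn, table_name: str, ids: list[str]) -> None:
    """Sketch of the tolerant delete; conn is an open psycopg2 connection."""
    if not ids:
        return
    try:
        with conn.cursor() as cur:
            # psycopg2 adapts the Python tuple to a SQL list for the IN clause;
            # the table name itself is interpolated, as in the original code.
            cur.execute(f"DELETE FROM {table_name} WHERE id IN %s", (tuple(ids),))
        conn.commit()
    except psycopg2.errors.UndefinedTable:
        # Nothing was ever written for this collection: warn and move on, but
        # reset the aborted transaction so the connection can be reused.
        conn.rollback()
        logger.warning("Table %s not found, skipping delete operation.", table_name)
```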

View File

@@ -95,9 +95,9 @@ class QdrantVector(BaseVector):
self.add_texts(texts, embeddings, **kwargs)
def create_collection(self, collection_name: str, vector_size: int):
lock_name = "vector_indexing_lock_{}".format(collection_name)
lock_name = f"vector_indexing_lock_{collection_name}"
with redis_client.lock(lock_name, timeout=20):
collection_exist_cache_key = "vector_indexing_{}".format(self._collection_name)
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
if redis_client.get(collection_exist_cache_key):
return
collection_name = collection_name or uuid.uuid4().hex

View File

@@ -70,9 +70,9 @@ class RelytVector(BaseVector):
self.add_texts(texts, embeddings)
def create_collection(self, dimension: int):
lock_name = "vector_indexing_lock_{}".format(self._collection_name)
lock_name = f"vector_indexing_lock_{self._collection_name}"
with redis_client.lock(lock_name, timeout=20):
collection_exist_cache_key = "vector_indexing_{}".format(self._collection_name)
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
if redis_client.get(collection_exist_cache_key):
return
index_name = f"{self._collection_name}_embedding_index"

View File

@@ -142,7 +142,7 @@ class TableStoreVector(BaseVector):
with redis_client.lock(lock_name, timeout=20):
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
if redis_client.get(collection_exist_cache_key):
logging.info(f"Collection {self._collection_name} already exists.")
logging.info("Collection %s already exists.", self._collection_name)
return
self._create_table_if_not_exist()

View File

@@ -92,9 +92,9 @@ class TencentVector(BaseVector):
def _create_collection(self, dimension: int) -> None:
self._dimension = dimension
lock_name = "vector_indexing_lock_{}".format(self._collection_name)
lock_name = f"vector_indexing_lock_{self._collection_name}"
with redis_client.lock(lock_name, timeout=20):
collection_exist_cache_key = "vector_indexing_{}".format(self._collection_name)
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
if redis_client.get(collection_exist_cache_key):
return

View File

@@ -104,9 +104,9 @@ class TidbOnQdrantVector(BaseVector):
self.add_texts(texts, embeddings, **kwargs)
def create_collection(self, collection_name: str, vector_size: int):
lock_name = "vector_indexing_lock_{}".format(collection_name)
lock_name = f"vector_indexing_lock_{collection_name}"
with redis_client.lock(lock_name, timeout=20):
collection_exist_cache_key = "vector_indexing_{}".format(self._collection_name)
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
if redis_client.get(collection_exist_cache_key):
return
collection_name = collection_name or uuid.uuid4().hex

View File

@@ -91,9 +91,9 @@ class TiDBVector(BaseVector):
def _create_collection(self, dimension: int):
logger.info("_create_collection, collection_name " + self._collection_name)
lock_name = "vector_indexing_lock_{}".format(self._collection_name)
lock_name = f"vector_indexing_lock_{self._collection_name}"
with redis_client.lock(lock_name, timeout=20):
collection_exist_cache_key = "vector_indexing_{}".format(self._collection_name)
collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
if redis_client.get(collection_exist_cache_key):
return
tidb_dist_func = self._get_distance_func()
@@ -192,7 +192,7 @@ class TiDBVector(BaseVector):
query_vector_str = ", ".join(format(x) for x in query_vector)
query_vector_str = "[" + query_vector_str + "]"
logger.debug(
f"_collection_name: {self._collection_name}, score_threshold: {score_threshold}, distance: {distance}"
"_collection_name: %s, score_threshold: %s, distance: %s", self._collection_name, score_threshold, distance
)
docs = []

Some files were not shown because too many files have changed in this diff