diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py
index 8c0102d9bd..01c377956b 100644
--- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py
+++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py
@@ -1,3 +1,4 @@
+import json
 import logging
 import re
 import time
@@ -60,6 +61,7 @@ from core.app.task_pipeline.based_generate_task_pipeline import BasedGenerateTas
 from core.app.task_pipeline.message_cycle_manager import MessageCycleManager
 from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
 from core.model_runtime.entities.llm_entities import LLMUsage
+from core.model_runtime.utils.encoders import jsonable_encoder
 from core.ops.ops_trace_manager import TraceQueueManager
 from core.workflow.enums import WorkflowExecutionStatus
 from core.workflow.nodes import NodeType
@@ -391,6 +393,14 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
         if should_direct_answer:
             return

+        current_time = time.perf_counter()
+        if self._task_state.first_token_time is None and delta_text.strip():
+            self._task_state.first_token_time = current_time
+            self._task_state.is_streaming_response = True
+
+        if delta_text.strip():
+            self._task_state.last_token_time = current_time
+
         # Only publish tts message at text chunk streaming
         if tts_publisher and queue_message:
             tts_publisher.publish(queue_message)
@@ -772,7 +782,33 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
             message.answer = answer_text
             message.updated_at = naive_utc_now()
             message.provider_response_latency = time.perf_counter() - self._base_task_pipeline.start_at
-            message.message_metadata = self._task_state.metadata.model_dump_json()
+
+            # Set usage first before dumping metadata
+            if graph_runtime_state and graph_runtime_state.llm_usage:
+                usage = graph_runtime_state.llm_usage
+                message.message_tokens = usage.prompt_tokens
+                message.message_unit_price = usage.prompt_unit_price
+                message.message_price_unit = usage.prompt_price_unit
+                message.answer_tokens = usage.completion_tokens
+                message.answer_unit_price = usage.completion_unit_price
+                message.answer_price_unit = usage.completion_price_unit
+                message.total_price = usage.total_price
+                message.currency = usage.currency
+                self._task_state.metadata.usage = usage
+            else:
+                usage = LLMUsage.empty_usage()
+                self._task_state.metadata.usage = usage
+
+            # Add streaming metrics to usage if available
+            if self._task_state.is_streaming_response and self._task_state.first_token_time:
+                start_time = self._base_task_pipeline.start_at
+                first_token_time = self._task_state.first_token_time
+                last_token_time = self._task_state.last_token_time or first_token_time
+                usage.time_to_first_token = round(first_token_time - start_time, 3)
+                usage.time_to_generate = round(last_token_time - first_token_time, 3)
+
+            metadata = self._task_state.metadata.model_dump()
+            message.message_metadata = json.dumps(jsonable_encoder(metadata))
             message_files = [
                 MessageFile(
                     message_id=message.id,
@@ -790,20 +826,6 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
             ]
             session.add_all(message_files)

-            if graph_runtime_state and graph_runtime_state.llm_usage:
-                usage = graph_runtime_state.llm_usage
-                message.message_tokens = usage.prompt_tokens
-                message.message_unit_price = usage.prompt_unit_price
-                message.message_price_unit = usage.prompt_price_unit
-                message.answer_tokens = usage.completion_tokens
-                message.answer_unit_price = usage.completion_unit_price
-                message.answer_price_unit = usage.completion_price_unit
-                message.total_price = usage.total_price
-                message.currency = usage.currency
-                self._task_state.metadata.usage = usage
-            else:
-                self._task_state.metadata.usage = LLMUsage.empty_usage()
-
     def _seed_graph_runtime_state_from_queue_manager(self) -> None:
         """Bootstrap the cached runtime state from the queue manager when present."""
         candidate = self._base_task_pipeline.queue_manager.graph_runtime_state
diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py
index 72a92add04..79a5e657b3 100644
--- a/api/core/app/entities/task_entities.py
+++ b/api/core/app/entities/task_entities.py
@@ -48,6 +48,9 @@ class WorkflowTaskState(TaskState):
     """

     answer: str = ""
+    first_token_time: float | None = None
+    last_token_time: float | None = None
+    is_streaming_response: bool = False


 class StreamEvent(StrEnum):
diff --git a/api/core/helper/marketplace.py b/api/core/helper/marketplace.py
index bddb864a95..b2286d39ed 100644
--- a/api/core/helper/marketplace.py
+++ b/api/core/helper/marketplace.py
@@ -29,6 +29,18 @@ def batch_fetch_plugin_manifests(plugin_ids: list[str]) -> Sequence[MarketplaceP
     return [MarketplacePluginDeclaration.model_validate(plugin) for plugin in response.json()["data"]["plugins"]]


+def batch_fetch_plugin_by_ids(plugin_ids: list[str]) -> list[dict]:
+    if not plugin_ids:
+        return []
+
+    url = str(marketplace_api_url / "api/v1/plugins/batch")
+    response = httpx.post(url, json={"plugin_ids": plugin_ids}, headers={"X-Dify-Version": dify_config.project.version})
+    response.raise_for_status()
+
+    data = response.json()
+    return data.get("data", {}).get("plugins", [])
+
+
 def batch_fetch_plugin_manifests_ignore_deserialization_error(
     plugin_ids: list[str],
 ) -> Sequence[MarketplacePluginDeclaration]:
diff --git a/api/core/model_runtime/entities/llm_entities.py b/api/core/model_runtime/entities/llm_entities.py
index 17f6000d93..2c7c421eed 100644
--- a/api/core/model_runtime/entities/llm_entities.py
+++ b/api/core/model_runtime/entities/llm_entities.py
@@ -38,6 +38,8 @@ class LLMUsageMetadata(TypedDict, total=False):
     prompt_price: Union[float, str]
     completion_price: Union[float, str]
     latency: float
+    time_to_first_token: float
+    time_to_generate: float


 class LLMUsage(ModelUsage):
@@ -57,6 +59,8 @@ class LLMUsage(ModelUsage):
     total_price: Decimal
     currency: str
     latency: float
+    time_to_first_token: float | None = None
+    time_to_generate: float | None = None

     @classmethod
     def empty_usage(cls):
@@ -73,6 +77,8 @@ class LLMUsage(ModelUsage):
             total_price=Decimal("0.0"),
             currency="USD",
             latency=0.0,
+            time_to_first_token=None,
+            time_to_generate=None,
         )

     @classmethod
@@ -108,6 +114,8 @@ class LLMUsage(ModelUsage):
             prompt_price=Decimal(str(metadata.get("prompt_price", 0))),
             completion_price=Decimal(str(metadata.get("completion_price", 0))),
             latency=metadata.get("latency", 0.0),
+            time_to_first_token=metadata.get("time_to_first_token"),
+            time_to_generate=metadata.get("time_to_generate"),
         )

     def plus(self, other: LLMUsage) -> LLMUsage:
@@ -133,6 +141,8 @@ class LLMUsage(ModelUsage):
             total_price=self.total_price + other.total_price,
             currency=other.currency,
             latency=self.latency + other.latency,
+            time_to_first_token=other.time_to_first_token,
+            time_to_generate=other.time_to_generate,
         )

     def __add__(self, other: LLMUsage) -> LLMUsage:
diff --git a/api/core/ops/entities/trace_entity.py b/api/core/ops/entities/trace_entity.py
index 5b81c09a2d..50a2cdea63 100644
--- a/api/core/ops/entities/trace_entity.py
+++ b/api/core/ops/entities/trace_entity.py
@@ -62,6 +62,9 @@ class MessageTraceInfo(BaseTraceInfo):
     file_list: Union[str, dict[str, Any], list] | None = None
     message_file_data: Any | None = None
     conversation_mode: str
+    gen_ai_server_time_to_first_token: float | None = None
+    llm_streaming_time_to_generate: float | None = None
+    is_streaming_request: bool = False


 class ModerationTraceInfo(BaseTraceInfo):
diff --git a/api/core/ops/ops_trace_manager.py b/api/core/ops/ops_trace_manager.py
index de0d4560e3..5bb539b7dc 100644
--- a/api/core/ops/ops_trace_manager.py
+++ b/api/core/ops/ops_trace_manager.py
@@ -14,7 +14,7 @@ from flask import current_app
 from sqlalchemy import select
 from sqlalchemy.orm import Session, sessionmaker

-from core.helper.encrypter import decrypt_token, encrypt_token, obfuscated_token
+from core.helper.encrypter import batch_decrypt_token, encrypt_token, obfuscated_token
 from core.ops.entities.config_entity import (
     OPS_FILE_PATH,
     TracingProviderEnum,
@@ -141,6 +141,8 @@ provider_config_map = OpsTraceProviderConfigMap()

 class OpsTraceManager:
     ops_trace_instances_cache: LRUCache = LRUCache(maxsize=128)
+    decrypted_configs_cache: LRUCache = LRUCache(maxsize=128)
+    _decryption_cache_lock = threading.RLock()

     @classmethod
     def encrypt_tracing_config(
@@ -161,7 +163,7 @@ class OpsTraceManager:
             provider_config_map[tracing_provider]["other_keys"],
         )

-        new_config = {}
+        new_config: dict[str, Any] = {}
         # Encrypt necessary keys
         for key in secret_keys:
             if key in tracing_config:
@@ -191,20 +193,41 @@ class OpsTraceManager:
         :param tracing_config: tracing config
         :return:
         """
-        config_class, secret_keys, other_keys = (
-            provider_config_map[tracing_provider]["config_class"],
-            provider_config_map[tracing_provider]["secret_keys"],
-            provider_config_map[tracing_provider]["other_keys"],
+        config_json = json.dumps(tracing_config, sort_keys=True)
+        decrypted_config_key = (
+            tenant_id,
+            tracing_provider,
+            config_json,
         )
-        new_config = {}
-        for key in secret_keys:
-            if key in tracing_config:
-                new_config[key] = decrypt_token(tenant_id, tracing_config[key])

-        for key in other_keys:
-            new_config[key] = tracing_config.get(key, "")
+        # First check without lock for performance
+        cached_config = cls.decrypted_configs_cache.get(decrypted_config_key)
+        if cached_config is not None:
+            return dict(cached_config)

-        return config_class(**new_config).model_dump()
+        with cls._decryption_cache_lock:
+            # Second check (double-checked locking) to prevent race conditions
+            cached_config = cls.decrypted_configs_cache.get(decrypted_config_key)
+            if cached_config is not None:
+                return dict(cached_config)
+
+            config_class, secret_keys, other_keys = (
+                provider_config_map[tracing_provider]["config_class"],
+                provider_config_map[tracing_provider]["secret_keys"],
+                provider_config_map[tracing_provider]["other_keys"],
+            )
+            new_config: dict[str, Any] = {}
+            keys_to_decrypt = [key for key in secret_keys if key in tracing_config]
+            if keys_to_decrypt:
+                decrypted_values = batch_decrypt_token(tenant_id, [tracing_config[key] for key in keys_to_decrypt])
+                new_config.update(zip(keys_to_decrypt, decrypted_values))
+
+            for key in other_keys:
+                new_config[key] = tracing_config.get(key, "")
+
+            decrypted_config = config_class(**new_config).model_dump()
+            cls.decrypted_configs_cache[decrypted_config_key] = decrypted_config
+            return dict(decrypted_config)

     @classmethod
     def obfuscated_decrypt_token(cls, tracing_provider: str, decrypt_tracing_config: dict):
@@ -219,7 +242,7 @@ class OpsTraceManager:
             provider_config_map[tracing_provider]["secret_keys"],
             provider_config_map[tracing_provider]["other_keys"],
         )
-        new_config = {}
+        new_config: dict[str, Any] = {}
         for key in secret_keys:
             if key in decrypt_tracing_config:
                 new_config[key] = obfuscated_token(decrypt_tracing_config[key])
@@ -596,6 +619,8 @@ class TraceTask:
         file_url = f"{self.file_base_url}/{message_file_data.url}" if message_file_data else ""
         file_list.append(file_url)

+        streaming_metrics = self._extract_streaming_metrics(message_data)
+
         metadata = {
             "conversation_id": message_data.conversation_id,
             "ls_provider": message_data.model_provider,
@@ -628,6 +653,9 @@ class TraceTask:
             metadata=metadata,
             message_file_data=message_file_data,
             conversation_mode=conversation_mode,
+            gen_ai_server_time_to_first_token=streaming_metrics.get("gen_ai_server_time_to_first_token"),
+            llm_streaming_time_to_generate=streaming_metrics.get("llm_streaming_time_to_generate"),
+            is_streaming_request=streaming_metrics.get("is_streaming_request", False),
         )

         return message_trace_info
@@ -853,6 +881,24 @@ class TraceTask:

         return generate_name_trace_info

+    def _extract_streaming_metrics(self, message_data) -> dict:
+        if not message_data.message_metadata:
+            return {}
+
+        try:
+            metadata = json.loads(message_data.message_metadata)
+            usage = metadata.get("usage", {})
+            time_to_first_token = usage.get("time_to_first_token")
+            time_to_generate = usage.get("time_to_generate")
+
+            return {
+                "gen_ai_server_time_to_first_token": time_to_first_token,
+                "llm_streaming_time_to_generate": time_to_generate,
+                "is_streaming_request": time_to_first_token is not None,
+            }
+        except (json.JSONDecodeError, AttributeError):
+            return {}
+

 trace_manager_timer: threading.Timer | None = None
 trace_manager_queue: queue.Queue = queue.Queue()
diff --git a/api/core/ops/tencent_trace/client.py b/api/core/ops/tencent_trace/client.py
index 270732aa02..733d5b8bb6 100644
--- a/api/core/ops/tencent_trace/client.py
+++ b/api/core/ops/tencent_trace/client.py
@@ -11,6 +11,11 @@ import socket
 from typing import TYPE_CHECKING
 from urllib.parse import urlparse

+try:
+    from importlib.metadata import version
+except ImportError:
+    from importlib_metadata import version  # type: ignore[import-not-found]
+
 if TYPE_CHECKING:
     from opentelemetry.metrics import Meter
     from opentelemetry.metrics._internal.instrument import Histogram
@@ -27,12 +32,27 @@
 from opentelemetry.util.types import AttributeValue

 from configs import dify_config

-from .entities.tencent_semconv import LLM_OPERATION_DURATION
+from .entities.semconv import (
+    GEN_AI_SERVER_TIME_TO_FIRST_TOKEN,
+    GEN_AI_STREAMING_TIME_TO_GENERATE,
+    GEN_AI_TOKEN_USAGE,
+    GEN_AI_TRACE_DURATION,
+    LLM_OPERATION_DURATION,
+)
 from .entities.tencent_trace_entity import SpanData

 logger = logging.getLogger(__name__)


+def _get_opentelemetry_sdk_version() -> str:
+    """Get OpenTelemetry SDK version dynamically."""
+    try:
+        return version("opentelemetry-sdk")
+    except Exception:
+        logger.debug("Failed to get opentelemetry-sdk version, using default")
+        return "1.27.0"  # fallback version
+
+
 class TencentTraceClient:
     """Tencent APM trace client using OpenTelemetry OTLP exporter"""

@@ -57,6 +77,9 @@ class TencentTraceClient:
                 ResourceAttributes.SERVICE_VERSION: f"dify-{dify_config.project.version}-{dify_config.COMMIT_SHA}",
                 ResourceAttributes.DEPLOYMENT_ENVIRONMENT: f"{dify_config.DEPLOY_ENV}-{dify_config.EDITION}",
                 ResourceAttributes.HOST_NAME: socket.gethostname(),
+                ResourceAttributes.TELEMETRY_SDK_LANGUAGE: "python",
+                ResourceAttributes.TELEMETRY_SDK_NAME: "opentelemetry",
+                ResourceAttributes.TELEMETRY_SDK_VERSION: _get_opentelemetry_sdk_version(),
             }
         )
         # Prepare gRPC endpoint/metadata
@@ -80,13 +103,18 @@ class TencentTraceClient:
         )
         self.tracer_provider.add_span_processor(self.span_processor)

-        self.tracer = self.tracer_provider.get_tracer("dify.tencent_apm")
+        # Use the Dify API version as the tracer version
+        self.tracer = self.tracer_provider.get_tracer("dify-sdk", dify_config.project.version)

         # Store span contexts for parent-child relationships
         self.span_contexts: dict[int, trace_api.SpanContext] = {}

         self.meter: Meter | None = None
         self.hist_llm_duration: Histogram | None = None
+        self.hist_token_usage: Histogram | None = None
+        self.hist_time_to_first_token: Histogram | None = None
+        self.hist_time_to_generate: Histogram | None = None
+        self.hist_trace_duration: Histogram | None = None
         self.metric_reader: MetricReader | None = None

         # Metrics exporter and instruments
@@ -99,7 +127,7 @@ class TencentTraceClient:
             use_http_protobuf = protocol in {"http/protobuf", "http-protobuf"}
             use_http_json = protocol in {"http/json", "http-json"}

-            # Set preferred temporality for histograms to DELTA
+            # Tencent APM works best with delta aggregation temporality
             preferred_temporality: dict[type, AggregationTemporality] = {Histogram: AggregationTemporality.DELTA}

             def _create_metric_exporter(exporter_cls, **kwargs):
@@ -177,20 +205,59 @@ class TencentTraceClient:
                 provider = MeterProvider(resource=self.resource, metric_readers=[metric_reader])
                 metrics.set_meter_provider(provider)
                 self.meter = metrics.get_meter("dify-sdk", dify_config.project.version)
+
+                # LLM operation duration histogram
                 self.hist_llm_duration = self.meter.create_histogram(
                     name=LLM_OPERATION_DURATION,
                     unit="s",
                     description="LLM operation duration (seconds)",
                 )
+
+                # Token usage histogram with exponential buckets
+                self.hist_token_usage = self.meter.create_histogram(
+                    name=GEN_AI_TOKEN_USAGE,
+                    unit="token",
+                    description="Number of tokens used in prompt and completions",
+                )
+
+                # Time to first token histogram
+                self.hist_time_to_first_token = self.meter.create_histogram(
+                    name=GEN_AI_SERVER_TIME_TO_FIRST_TOKEN,
+                    unit="s",
+                    description="Time to first token for streaming LLM responses (seconds)",
+                )
+
+                # Time to generate histogram
+                self.hist_time_to_generate = self.meter.create_histogram(
+                    name=GEN_AI_STREAMING_TIME_TO_GENERATE,
+                    unit="s",
+                    description="Total time to generate streaming LLM responses (seconds)",
+                )
+
+                # Trace duration histogram
+                self.hist_trace_duration = self.meter.create_histogram(
+                    name=GEN_AI_TRACE_DURATION,
+                    unit="s",
+                    description="End-to-end GenAI trace duration (seconds)",
+                )
+
                 self.metric_reader = metric_reader
             else:
                 self.meter = None
                 self.hist_llm_duration = None
+                self.hist_token_usage = None
+                self.hist_time_to_first_token = None
+                self.hist_time_to_generate = None
+                self.hist_trace_duration = None
                 self.metric_reader = None
         except Exception:
             logger.exception("[Tencent APM] Metrics initialization failed; metrics disabled")
             self.meter = None
             self.hist_llm_duration = None
+            self.hist_token_usage = None
+            self.hist_time_to_first_token = None
+            self.hist_time_to_generate = None
+            self.hist_trace_duration = None
             self.metric_reader = None

     def add_span(self, span_data: SpanData) -> None:
@@ -216,6 +283,117 @@ class TencentTraceClient:
         except Exception:
             logger.debug("[Tencent APM] Failed to record LLM duration", exc_info=True)

+    def record_token_usage(
+        self,
+        token_count: int,
+        token_type: str,
+        operation_name: str,
+        request_model: str,
+        response_model: str,
+        server_address: str,
+        provider: str,
+    ) -> None:
+        """Record token usage histogram.
+
+        Args:
+            token_count: Number of tokens used
+            token_type: "input" or "output"
+            operation_name: Operation name (e.g., "chat")
+            request_model: Model used in request
+            response_model: Model used in response
+            server_address: Server address
+            provider: Model provider name
+        """
+        try:
+            if not hasattr(self, "hist_token_usage") or self.hist_token_usage is None:
+                return
+
+            attributes = {
+                "gen_ai.operation.name": operation_name,
+                "gen_ai.request.model": request_model,
+                "gen_ai.response.model": response_model,
+                "gen_ai.system": provider,
+                "gen_ai.token.type": token_type,
+                "server.address": server_address,
+            }
+
+            self.hist_token_usage.record(token_count, attributes)  # type: ignore[attr-defined]
+        except Exception:
+            logger.debug("[Tencent APM] Failed to record token usage", exc_info=True)
+
+    def record_time_to_first_token(
+        self, ttft_seconds: float, provider: str, model: str, operation_name: str = "chat"
+    ) -> None:
+        """Record time to first token histogram.
+
+        Args:
+            ttft_seconds: Time to first token in seconds
+            provider: Model provider name
+            model: Model name
+            operation_name: Operation name (default: "chat")
+        """
+        try:
+            if not hasattr(self, "hist_time_to_first_token") or self.hist_time_to_first_token is None:
+                return
+
+            attributes = {
+                "gen_ai.operation.name": operation_name,
+                "gen_ai.system": provider,
+                "gen_ai.request.model": model,
+                "gen_ai.response.model": model,
+                "stream": "true",
+            }
+
+            self.hist_time_to_first_token.record(ttft_seconds, attributes)  # type: ignore[attr-defined]
+        except Exception:
+            logger.debug("[Tencent APM] Failed to record time to first token", exc_info=True)
+
+    def record_time_to_generate(
+        self, ttg_seconds: float, provider: str, model: str, operation_name: str = "chat"
+    ) -> None:
+        """Record time to generate histogram.
+
+        Args:
+            ttg_seconds: Time to generate in seconds
+            provider: Model provider name
+            model: Model name
+            operation_name: Operation name (default: "chat")
+        """
+        try:
+            if not hasattr(self, "hist_time_to_generate") or self.hist_time_to_generate is None:
+                return
+
+            attributes = {
+                "gen_ai.operation.name": operation_name,
+                "gen_ai.system": provider,
+                "gen_ai.request.model": model,
+                "gen_ai.response.model": model,
+                "stream": "true",
+            }
+
+            self.hist_time_to_generate.record(ttg_seconds, attributes)  # type: ignore[attr-defined]
+        except Exception:
+            logger.debug("[Tencent APM] Failed to record time to generate", exc_info=True)
+
+    def record_trace_duration(self, duration_seconds: float, attributes: dict[str, str] | None = None) -> None:
+        """Record end-to-end trace duration histogram in seconds.
+
+        Args:
+            duration_seconds: Trace duration in seconds
+            attributes: Optional attributes (e.g., conversation_mode, app_id)
+        """
+        try:
+            if not hasattr(self, "hist_trace_duration") or self.hist_trace_duration is None:
+                return
+
+            attrs: dict[str, str] = {}
+            if attributes:
+                for k, v in attributes.items():
+                    attrs[k] = str(v) if not isinstance(v, (str, int, float, bool)) else v  # type: ignore[assignment]
+            self.hist_trace_duration.record(duration_seconds, attrs)  # type: ignore[attr-defined]
+        except Exception:
+            logger.debug("[Tencent APM] Failed to record trace duration", exc_info=True)
+
     def _create_and_export_span(self, span_data: SpanData) -> None:
         """Create span using OpenTelemetry Tracer API"""
         try:
diff --git a/api/core/ops/tencent_trace/entities/tencent_semconv.py b/api/core/ops/tencent_trace/entities/semconv.py
similarity index 69%
rename from api/core/ops/tencent_trace/entities/tencent_semconv.py
rename to api/core/ops/tencent_trace/entities/semconv.py
index 5ea6eeacef..cd2dbade8b 100644
--- a/api/core/ops/tencent_trace/entities/tencent_semconv.py
+++ b/api/core/ops/tencent_trace/entities/semconv.py
@@ -47,6 +47,9 @@ GEN_AI_COMPLETION = "gen_ai.completion"

 GEN_AI_RESPONSE_FINISH_REASON = "gen_ai.response.finish_reason"

+# Streaming Span Attributes
+GEN_AI_IS_STREAMING_REQUEST = "llm.is_streaming"  # Same as OpenLLMetry semconv
+
 # Tool
 TOOL_NAME = "tool.name"
@@ -62,6 +65,19 @@ INSTRUMENTATION_LANGUAGE = "python"

 # Metrics
 LLM_OPERATION_DURATION = "gen_ai.client.operation.duration"
+GEN_AI_TOKEN_USAGE = "gen_ai.client.token.usage"
+GEN_AI_SERVER_TIME_TO_FIRST_TOKEN = "gen_ai.server.time_to_first_token"
+GEN_AI_STREAMING_TIME_TO_GENERATE = "gen_ai.streaming.time_to_generate"
+# The LLM trace duration metric, which is exclusive to Tencent APM
+GEN_AI_TRACE_DURATION = "gen_ai.trace.duration"
+
+# Token Usage Attributes
+GEN_AI_OPERATION_NAME = "gen_ai.operation.name"
+GEN_AI_REQUEST_MODEL = "gen_ai.request.model"
+GEN_AI_RESPONSE_MODEL = "gen_ai.response.model"
+GEN_AI_SYSTEM = "gen_ai.system"
+GEN_AI_TOKEN_TYPE = "gen_ai.token.type"
+SERVER_ADDRESS = "server.address"


 class GenAISpanKind(Enum):
diff --git a/api/core/ops/tencent_trace/span_builder.py b/api/core/ops/tencent_trace/span_builder.py
index 5ba592290d..26e8779e3e 100644
--- a/api/core/ops/tencent_trace/span_builder.py
+++ b/api/core/ops/tencent_trace/span_builder.py
@@ -14,10 +14,11 @@ from core.ops.entities.trace_entity import (
     ToolTraceInfo,
     WorkflowTraceInfo,
 )
-from core.ops.tencent_trace.entities.tencent_semconv import (
+from core.ops.tencent_trace.entities.semconv import (
     GEN_AI_COMPLETION,
     GEN_AI_FRAMEWORK,
     GEN_AI_IS_ENTRY,
+    GEN_AI_IS_STREAMING_REQUEST,
     GEN_AI_MODEL_NAME,
     GEN_AI_PROMPT,
     GEN_AI_PROVIDER,
@@ -156,6 +157,25 @@ class TencentSpanBuilder:
         outputs = node_execution.outputs or {}
         usage_data = process_data.get("usage", {}) if "usage" in process_data else outputs.get("usage", {})

+        attributes = {
+            GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id", ""),
+            GEN_AI_SPAN_KIND: GenAISpanKind.GENERATION.value,
+            GEN_AI_FRAMEWORK: "dify",
+            GEN_AI_MODEL_NAME: process_data.get("model_name", ""),
+            GEN_AI_PROVIDER: process_data.get("model_provider", ""),
+            GEN_AI_USAGE_INPUT_TOKENS: str(usage_data.get("prompt_tokens", 0)),
+            GEN_AI_USAGE_OUTPUT_TOKENS: str(usage_data.get("completion_tokens", 0)),
+            GEN_AI_USAGE_TOTAL_TOKENS: str(usage_data.get("total_tokens", 0)),
+            GEN_AI_PROMPT: json.dumps(process_data.get("prompts", []), ensure_ascii=False),
+            GEN_AI_COMPLETION: str(outputs.get("text", "")),
+            GEN_AI_RESPONSE_FINISH_REASON: outputs.get("finish_reason", ""),
+            INPUT_VALUE: json.dumps(process_data.get("prompts", []), ensure_ascii=False),
+            OUTPUT_VALUE: str(outputs.get("text", "")),
+        }
+
+        if usage_data.get("time_to_first_token") is not None:
+            attributes[GEN_AI_IS_STREAMING_REQUEST] = "true"
+
         return SpanData(
             trace_id=trace_id,
             parent_span_id=workflow_span_id,
@@ -163,21 +183,7 @@ class TencentSpanBuilder:
             name="GENERATION",
             start_time=TencentSpanBuilder._get_time_nanoseconds(node_execution.created_at),
             end_time=TencentSpanBuilder._get_time_nanoseconds(node_execution.finished_at),
-            attributes={
-                GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id", ""),
-                GEN_AI_SPAN_KIND: GenAISpanKind.GENERATION.value,
-                GEN_AI_FRAMEWORK: "dify",
-                GEN_AI_MODEL_NAME: process_data.get("model_name", ""),
-                GEN_AI_PROVIDER: process_data.get("model_provider", ""),
-                GEN_AI_USAGE_INPUT_TOKENS: str(usage_data.get("prompt_tokens", 0)),
-                GEN_AI_USAGE_OUTPUT_TOKENS: str(usage_data.get("completion_tokens", 0)),
-                GEN_AI_USAGE_TOTAL_TOKENS: str(usage_data.get("total_tokens", 0)),
-                GEN_AI_PROMPT: json.dumps(process_data.get("prompts", []), ensure_ascii=False),
-                GEN_AI_COMPLETION: str(outputs.get("text", "")),
-                GEN_AI_RESPONSE_FINISH_REASON: outputs.get("finish_reason", ""),
-                INPUT_VALUE: json.dumps(process_data.get("prompts", []), ensure_ascii=False),
-                OUTPUT_VALUE: str(outputs.get("text", "")),
-            },
+            attributes=attributes,
             status=TencentSpanBuilder._get_workflow_node_status(node_execution),
         )
@@ -191,6 +197,19 @@ class TencentSpanBuilder:
         if trace_info.error:
             status = Status(StatusCode.ERROR, trace_info.error)

+        attributes = {
+            GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id", ""),
+            GEN_AI_USER_ID: str(user_id),
+            GEN_AI_SPAN_KIND: GenAISpanKind.WORKFLOW.value,
+            GEN_AI_FRAMEWORK: "dify",
+            GEN_AI_IS_ENTRY: "true",
+            INPUT_VALUE: str(trace_info.inputs or ""),
+            OUTPUT_VALUE: str(trace_info.outputs or ""),
+        }
+
+        if trace_info.is_streaming_request:
+            attributes[GEN_AI_IS_STREAMING_REQUEST] = "true"
+
         return SpanData(
             trace_id=trace_id,
             parent_span_id=None,
@@ -198,15 +217,7 @@ class TencentSpanBuilder:
             name="message",
             start_time=TencentSpanBuilder._get_time_nanoseconds(trace_info.start_time),
             end_time=TencentSpanBuilder._get_time_nanoseconds(trace_info.end_time),
-            attributes={
-                GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id", ""),
-                GEN_AI_USER_ID: str(user_id),
-                GEN_AI_SPAN_KIND: GenAISpanKind.WORKFLOW.value,
-                GEN_AI_FRAMEWORK: "dify",
-                GEN_AI_IS_ENTRY: "true",
-                INPUT_VALUE: str(trace_info.inputs or ""),
-                OUTPUT_VALUE: str(trace_info.outputs or ""),
-            },
+            attributes=attributes,
             status=status,
             links=links,
         )
diff --git a/api/core/ops/tencent_trace/tencent_trace.py b/api/core/ops/tencent_trace/tencent_trace.py
index 5ef1c61b24..9b3df86e16 100644
--- a/api/core/ops/tencent_trace/tencent_trace.py
+++ b/api/core/ops/tencent_trace/tencent_trace.py
@@ -90,6 +90,9 @@ class TencentDataTrace(BaseTraceInstance):

             self._process_workflow_nodes(trace_info, trace_id)

+            # Record trace duration for entry span
+            self._record_workflow_trace_duration(trace_info)
+
         except Exception:
             logger.exception("[Tencent APM] Failed to process workflow trace")

@@ -107,6 +110,11 @@ class TencentDataTrace(BaseTraceInstance):

             self.trace_client.add_span(message_span)

+            self._record_message_llm_metrics(trace_info)
+
+            # Record trace duration for entry span
+            self._record_message_trace_duration(trace_info)
+
         except Exception:
             logger.exception("[Tencent APM] Failed to process message trace")
@@ -290,24 +298,219 @@ class TencentDataTrace(BaseTraceInstance):
     def _record_llm_metrics(self, node_execution: WorkflowNodeExecution) -> None:
         """Record LLM performance metrics"""
         try:
-            if not hasattr(self.trace_client, "record_llm_duration"):
-                return
-
             process_data = node_execution.process_data or {}
-            usage = process_data.get("usage", {})
-            latency_s = float(usage.get("latency", 0.0))
+            outputs = node_execution.outputs or {}
+            usage = process_data.get("usage", {}) if "usage" in process_data else outputs.get("usage", {})

-            if latency_s > 0:
-                attributes = {
-                    "provider": process_data.get("model_provider", ""),
-                    "model": process_data.get("model_name", ""),
-                    "span_kind": "GENERATION",
-                }
-                self.trace_client.record_llm_duration(latency_s, attributes)
+            model_provider = process_data.get("model_provider", "unknown")
+            model_name = process_data.get("model_name", "unknown")
+            model_mode = process_data.get("model_mode", "chat")
+
+            # Record LLM duration
+            if hasattr(self.trace_client, "record_llm_duration"):
+                latency_s = float(usage.get("latency", 0.0))
+
+                if latency_s > 0:
+                    # Determine if streaming from usage metrics
+                    is_streaming = usage.get("time_to_first_token") is not None
+
+                    attributes = {
+                        "gen_ai.system": model_provider,
+                        "gen_ai.response.model": model_name,
+                        "gen_ai.operation.name": model_mode,
+                        "stream": "true" if is_streaming else "false",
+                    }
+                    self.trace_client.record_llm_duration(latency_s, attributes)
+
+            # Record streaming metrics from usage
+            time_to_first_token = usage.get("time_to_first_token")
+            if time_to_first_token is not None and hasattr(self.trace_client, "record_time_to_first_token"):
+                ttft_seconds = float(time_to_first_token)
+                if ttft_seconds > 0:
+                    self.trace_client.record_time_to_first_token(
+                        ttft_seconds=ttft_seconds, provider=model_provider, model=model_name, operation_name=model_mode
+                    )
+
+            time_to_generate = usage.get("time_to_generate")
+            if time_to_generate is not None and hasattr(self.trace_client, "record_time_to_generate"):
+                ttg_seconds = float(time_to_generate)
+                if ttg_seconds > 0:
+                    self.trace_client.record_time_to_generate(
+                        ttg_seconds=ttg_seconds, provider=model_provider, model=model_name, operation_name=model_mode
+                    )
+
+            # Record token usage
+            if hasattr(self.trace_client, "record_token_usage"):
+                # Extract token counts
+                input_tokens = int(usage.get("prompt_tokens", 0))
+                output_tokens = int(usage.get("completion_tokens", 0))
+
+                if input_tokens > 0 or output_tokens > 0:
+                    server_address = f"{model_provider}"
+
+                    # Record input tokens
+                    if input_tokens > 0:
+                        self.trace_client.record_token_usage(
+                            token_count=input_tokens,
+                            token_type="input",
+                            operation_name=model_mode,
+                            request_model=model_name,
+                            response_model=model_name,
+                            server_address=server_address,
+                            provider=model_provider,
+                        )
+
+                    # Record output tokens
+                    if output_tokens > 0:
+                        self.trace_client.record_token_usage(
+                            token_count=output_tokens,
+                            token_type="output",
+                            operation_name=model_mode,
+                            request_model=model_name,
+                            response_model=model_name,
+                            server_address=server_address,
+                            provider=model_provider,
+                        )
         except Exception:
             logger.debug("[Tencent APM] Failed to record LLM metrics")

+    def _record_message_llm_metrics(self, trace_info: MessageTraceInfo) -> None:
+        """Record LLM metrics for message traces"""
+        try:
+            trace_metadata = trace_info.metadata or {}
+            message_data = trace_info.message_data or {}
+            provider_latency = 0.0
+            if isinstance(message_data, dict):
+                provider_latency = float(message_data.get("provider_response_latency", 0.0) or 0.0)
+            else:
+                provider_latency = float(getattr(message_data, "provider_response_latency", 0.0) or 0.0)
+
+            model_provider = trace_metadata.get("ls_provider") or (
+                message_data.get("model_provider", "") if isinstance(message_data, dict) else ""
+            )
+            model_name = trace_metadata.get("ls_model_name") or (
+                message_data.get("model_id", "") if isinstance(message_data, dict) else ""
+            )
+
+            # Record LLM duration
+            if provider_latency > 0 and hasattr(self.trace_client, "record_llm_duration"):
+                is_streaming = trace_info.is_streaming_request
+
+                duration_attributes = {
+                    "gen_ai.system": model_provider,
+                    "gen_ai.response.model": model_name,
+                    "gen_ai.operation.name": "chat",  # Message traces are always chat
+                    "stream": "true" if is_streaming else "false",
+                }
+                self.trace_client.record_llm_duration(provider_latency, duration_attributes)
+
+            # Record streaming metrics for message traces
+            if trace_info.is_streaming_request:
+                # Record time to first token
+                if trace_info.gen_ai_server_time_to_first_token is not None and hasattr(
+                    self.trace_client, "record_time_to_first_token"
+                ):
+                    ttft_seconds = float(trace_info.gen_ai_server_time_to_first_token)
+                    if ttft_seconds > 0:
+                        self.trace_client.record_time_to_first_token(
+                            ttft_seconds=ttft_seconds, provider=str(model_provider or ""), model=str(model_name or "")
+                        )
+
+                # Record time to generate
+                if trace_info.llm_streaming_time_to_generate is not None and hasattr(
+                    self.trace_client, "record_time_to_generate"
+                ):
+                    ttg_seconds = float(trace_info.llm_streaming_time_to_generate)
+                    if ttg_seconds > 0:
+                        self.trace_client.record_time_to_generate(
+                            ttg_seconds=ttg_seconds, provider=str(model_provider or ""), model=str(model_name or "")
+                        )
+
+            # Record token usage
+            if hasattr(self.trace_client, "record_token_usage"):
+                input_tokens = int(trace_info.message_tokens or 0)
+                output_tokens = int(trace_info.answer_tokens or 0)
+
+                if input_tokens > 0:
+                    self.trace_client.record_token_usage(
+                        token_count=input_tokens,
+                        token_type="input",
+                        operation_name="chat",
+                        request_model=str(model_name or ""),
+                        response_model=str(model_name or ""),
+                        server_address=str(model_provider or ""),
+                        provider=str(model_provider or ""),
+                    )
+
+                if output_tokens > 0:
+                    self.trace_client.record_token_usage(
+                        token_count=output_tokens,
+                        token_type="output",
+                        operation_name="chat",
+                        request_model=str(model_name or ""),
+                        response_model=str(model_name or ""),
+                        server_address=str(model_provider or ""),
+                        provider=str(model_provider or ""),
+                    )
+
+        except Exception:
+            logger.debug("[Tencent APM] Failed to record message LLM metrics")
+
+    def _record_workflow_trace_duration(self, trace_info: WorkflowTraceInfo) -> None:
+        """Record end-to-end workflow trace duration."""
+        try:
+            if not hasattr(self.trace_client, "record_trace_duration"):
+                return
+
+            # Calculate duration from start_time and end_time to match span duration
+            if trace_info.start_time and trace_info.end_time:
+                duration_s = (trace_info.end_time - trace_info.start_time).total_seconds()
+            else:
+                # Fallback to workflow_run_elapsed_time if timestamps not available
+                duration_s = float(trace_info.workflow_run_elapsed_time)
+
+            if duration_s > 0:
+                attributes = {
+                    "conversation_mode": "workflow",
+                    "workflow_status": trace_info.workflow_run_status,
+                }
+
+                # Add conversation_id if available
+                if trace_info.conversation_id:
+                    attributes["has_conversation"] = "true"
+                else:
+                    attributes["has_conversation"] = "false"
+
+                self.trace_client.record_trace_duration(duration_s, attributes)
+
+        except Exception:
+            logger.debug("[Tencent APM] Failed to record workflow trace duration")
+
+    def _record_message_trace_duration(self, trace_info: MessageTraceInfo) -> None:
+        """Record end-to-end message trace duration."""
+        try:
+            if not hasattr(self.trace_client, "record_trace_duration"):
+                return
+
+            # Calculate duration from start_time and end_time
+            if trace_info.start_time and trace_info.end_time:
+                duration = (trace_info.end_time - trace_info.start_time).total_seconds()
+
+                if duration > 0:
+                    attributes = {
+                        "conversation_mode": trace_info.conversation_mode,
+                    }
+
+                    # Add streaming flag if available
+                    if hasattr(trace_info, "is_streaming_request"):
+                        attributes["stream"] = "true" if trace_info.is_streaming_request else "false"
+
+                    self.trace_client.record_trace_duration(duration, attributes)
+
+        except Exception:
+            logger.debug("[Tencent APM] Failed to record message trace duration")
+
     def __del__(self):
         """Ensure proper cleanup on garbage collection."""
         try:
diff --git a/api/core/rag/datasource/vdb/weaviate/weaviate_vector.py b/api/core/rag/datasource/vdb/weaviate/weaviate_vector.py
index d2d8fcf964..dceade0af9 100644
--- a/api/core/rag/datasource/vdb/weaviate/weaviate_vector.py
+++ b/api/core/rag/datasource/vdb/weaviate/weaviate_vector.py
@@ -100,6 +100,7 @@ class WeaviateVector(BaseVector):
             grpc_port=grpc_port,
             grpc_secure=grpc_secure,
             auth_credentials=Auth.api_key(config.api_key) if config.api_key else None,
+            skip_init_checks=True,  # Skip PyPI version check to avoid unnecessary HTTP requests
         )

         if not client.is_ready():
diff --git a/api/core/workflow/nodes/http_request/node.py b/api/core/workflow/nodes/http_request/node.py
index 55dec3fb08..152d3cc562 100644
--- a/api/core/workflow/nodes/http_request/node.py
+++ b/api/core/workflow/nodes/http_request/node.py
@@ -104,7 +104,7 @@ class HttpRequestNode(Node):
                 status=WorkflowNodeExecutionStatus.FAILED,
                 outputs={
                     "status_code": response.status_code,
-                    "body": response.text if not files else "",
+                    "body": response.text if not files.value else "",
                     "headers": response.headers,
                     "files": files,
                 },
diff --git a/api/core/workflow/nodes/llm/node.py b/api/core/workflow/nodes/llm/node.py
index 1644f683bf..06c9beaed2 100644
--- a/api/core/workflow/nodes/llm/node.py
+++ b/api/core/workflow/nodes/llm/node.py
@@ -3,6 +3,7 @@ import io
 import json
 import logging
 import re
+import time
 from collections.abc import Generator, Mapping, Sequence
 from typing import TYPE_CHECKING, Any, Literal
@@ -384,6 +385,8 @@ class LLMNode(Node):
             output_schema = LLMNode.fetch_structured_output_schema(
                 structured_output=structured_output or {},
             )
+            request_start_time = time.perf_counter()
+
             invoke_result = invoke_llm_with_structured_output(
                 provider=model_instance.provider,
                 model_schema=model_schema,
@@ -396,6 +399,8 @@ class LLMNode(Node):
                 user=user_id,
             )
         else:
+            request_start_time = time.perf_counter()
+
             invoke_result = model_instance.invoke_llm(
                 prompt_messages=list(prompt_messages),
                 model_parameters=node_data_model.completion_params,
@@ -411,6 +416,7 @@ class LLMNode(Node):
             node_id=node_id,
             node_type=node_type,
             reasoning_format=reasoning_format,
+            request_start_time=request_start_time,
         )

     @staticmethod
@@ -422,14 +428,20 @@ class LLMNode(Node):
         node_id: str,
         node_type: NodeType,
         reasoning_format: Literal["separated", "tagged"] = "tagged",
+        request_start_time: float | None = None,
     ) -> Generator[NodeEventBase | LLMStructuredOutput, None, None]:
         # For blocking mode
         if isinstance(invoke_result, LLMResult):
+            duration = None
+            if request_start_time is not None:
+                duration = time.perf_counter() - request_start_time
+                invoke_result.usage.latency = round(duration, 3)
             event = LLMNode.handle_blocking_result(
                 invoke_result=invoke_result,
                 saver=file_saver,
                 file_outputs=file_outputs,
                 reasoning_format=reasoning_format,
+                request_latency=duration,
             )
             yield event
             return
@@ -441,6 +453,12 @@ class LLMNode(Node):
         usage = LLMUsage.empty_usage()
         finish_reason = None
         full_text_buffer = io.StringIO()
+
+        # Initialize streaming metrics tracking
+        start_time = request_start_time if request_start_time is not None else time.perf_counter()
+        first_token_time = None
+        has_content = False
+
         collected_structured_output = None  # Collect structured_output from streaming chunks
         # Consume the invoke result and handle generator exception
         try:
@@ -457,6 +475,11 @@
                 file_saver=file_saver,
                 file_outputs=file_outputs,
             ):
+                # Detect first token for TTFT calculation
+                if text_part and not has_content:
+                    first_token_time = time.perf_counter()
+                    has_content = True
+
                 full_text_buffer.write(text_part)
                 yield StreamChunkEvent(
                     selector=[node_id, "text"],
@@ -489,6 +512,16 @@
         # Extract clean text and reasoning from tags
         clean_text, reasoning_content = LLMNode._split_reasoning(full_text, reasoning_format)

+        # Calculate streaming metrics
+        end_time = time.perf_counter()
+        total_duration = end_time - start_time
+        usage.latency = round(total_duration, 3)
+        if has_content and first_token_time:
+            gen_ai_server_time_to_first_token = first_token_time - start_time
+            llm_streaming_time_to_generate = end_time - first_token_time
+            usage.time_to_first_token = round(gen_ai_server_time_to_first_token, 3)
+            usage.time_to_generate = round(llm_streaming_time_to_generate, 3)
+
         yield ModelInvokeCompletedEvent(
             # Use clean_text for separated mode, full_text for tagged mode
             text=clean_text if reasoning_format == "separated" else full_text,
@@ -1068,6 +1101,7 @@ class LLMNode(Node):
         saver: LLMFileSaver,
         file_outputs: list["File"],
         reasoning_format: Literal["separated", "tagged"] = "tagged",
+        request_latency: float | None = None,
     ) -> ModelInvokeCompletedEvent:
         buffer = io.StringIO()
         for text_part in LLMNode._save_multimodal_output_and_convert_result_to_markdown(
@@ -1088,7 +1122,7 @@ class LLMNode(Node):
         # Extract clean text and reasoning from tags
         clean_text, reasoning_content = LLMNode._split_reasoning(full_text, reasoning_format)

-        return ModelInvokeCompletedEvent(
+        event = ModelInvokeCompletedEvent(
             # Use clean_text for separated mode, full_text for tagged mode
             text=clean_text if reasoning_format == "separated" else full_text,
             usage=invoke_result.usage,
@@ -1098,6 +1132,9 @@ class LLMNode(Node):
             # Pass structured output if enabled
             structured_output=getattr(invoke_result, "structured_output", None),
         )
+        if request_latency is not None:
+            event.usage.latency = round(request_latency, 3)
+        return event

     @staticmethod
     def save_multimodal_image_output(
diff --git a/api/services/dataset_service.py b/api/services/dataset_service.py
index 1e040abe3e..a3e62544c6 100644
--- a/api/services/dataset_service.py
+++ b/api/services/dataset_service.py
@@ -1416,8 +1416,6 @@ class DocumentService:
         # check document limit
         assert isinstance(current_user, Account)
         assert current_user.current_tenant_id is not None
-        assert knowledge_config.data_source
-        assert knowledge_config.data_source.info_list

         features = FeatureService.get_features(current_user.current_tenant_id)

@@ -1448,7 +1446,7 @@ class DocumentService:
             DocumentService.check_documents_upload_quota(count, features)

         # if dataset is empty, update dataset data_source_type
-        if not dataset.data_source_type:
+        if not dataset.data_source_type and knowledge_config.data_source:
             dataset.data_source_type = knowledge_config.data_source.info_list.data_source_type

         if not dataset.indexing_technique:
@@ -1494,6 +1492,10 @@ class DocumentService:
                 documents.append(document)
                 batch = document.batch
             else:
+                # When creating new documents, data_source must be provided
+                if not knowledge_config.data_source:
+                    raise ValueError("Data source is required when creating new documents")
+
                 batch = time.strftime("%Y%m%d%H%M%S") + str(100000 + secrets.randbelow(exclusive_upper_bound=900000))

                 # save process rule
                 if not dataset_process_rule:
diff --git a/api/services/enterprise/enterprise_service.py b/api/services/enterprise/enterprise_service.py
index 974aa849db..83d0fcf296 100644
--- a/api/services/enterprise/enterprise_service.py
+++ b/api/services/enterprise/enterprise_service.py
@@ -92,16 +92,6 @@ class EnterpriseService:

             return ret

-        @classmethod
-        def get_app_access_mode_by_code(cls, app_code: str) -> WebAppSettings:
-            if not app_code:
-                raise ValueError("app_code must be provided.")
-            params = {"appCode": app_code}
-            data = EnterpriseRequest.send_request("GET", "/webapp/access-mode/code", params=params)
-            if not data:
-                raise ValueError("No data found.")
-            return WebAppSettings.model_validate(data)
-
         @classmethod
         def update_app_access_mode(cls, app_id: str, access_mode: str):
             if not app_id:
diff --git a/api/services/rag_pipeline/rag_pipeline.py b/api/services/rag_pipeline/rag_pipeline.py
index 50dec458a9..fed7a25e21 100644
--- a/api/services/rag_pipeline/rag_pipeline.py
+++ b/api/services/rag_pipeline/rag_pipeline.py
@@ -1265,8 +1265,8 @@ class RagPipelineService:
         )
         providers_map = {provider.plugin_id: provider.to_dict() for provider in providers}

-        plugin_manifests = marketplace.batch_fetch_plugin_manifests(plugin_ids)
-        plugin_manifests_map = {manifest.plugin_id: manifest for manifest in plugin_manifests}
+        plugin_manifests = marketplace.batch_fetch_plugin_by_ids(plugin_ids)
+        plugin_manifests_map = {manifest["plugin_id"]: manifest for manifest in plugin_manifests}

         installed_plugin_list = []
         uninstalled_plugin_list = []
@@ -1276,14 +1276,7 @@ class RagPipelineService:
             else:
                 plugin_manifest = plugin_manifests_map.get(plugin_id)
                 if plugin_manifest:
-                    uninstalled_plugin_list.append(
-                        {
-                            "plugin_id": plugin_id,
-                            "name": plugin_manifest.name,
-                            "icon": plugin_manifest.icon,
-                            "plugin_unique_identifier": plugin_manifest.latest_package_identifier,
-                        }
-                    )
+                    uninstalled_plugin_list.append(plugin_manifest)

         # Build recommended plugins list
         return {
diff --git a/api/tests/test_containers_integration_tests/services/test_webapp_auth_service.py b/api/tests/test_containers_integration_tests/services/test_webapp_auth_service.py
index 73e622b061..72b119b4ff 100644
--- a/api/tests/test_containers_integration_tests/services/test_webapp_auth_service.py
+++ b/api/tests/test_containers_integration_tests/services/test_webapp_auth_service.py
@@ -35,9 +35,7 @@ class TestWebAppAuthService:
             mock_enterprise_service.WebAppAuth.get_app_access_mode_by_id.return_value = type(
                 "MockWebAppAuth", (), {"access_mode": "private"}
             )()
-            mock_enterprise_service.WebAppAuth.get_app_access_mode_by_code.return_value = type(
-                "MockWebAppAuth", (), {"access_mode": "private"}
-            )()
+            # Note: get_app_access_mode_by_code method was removed in refactoring

             yield {
                 "passport_service": mock_passport_service,
diff --git a/api/uv.lock b/api/uv.lock
index 3e758aae91..28dfc59dfc 100644
--- a/api/uv.lock
+++ b/api/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.11, <3.13" resolution-markers = [ "python_full_version >= '3.12.4' and platform_python_implementation != 'PyPy' and sys_platform == 'linux'", diff --git a/web/app/components/app/configuration/config/agent/agent-tools/setting-built-in-tool.tsx b/web/app/components/app/configuration/config/agent/agent-tools/setting-built-in-tool.tsx index d4ef4c55b7..ef28dd222c 100644 --- a/web/app/components/app/configuration/config/agent/agent-tools/setting-built-in-tool.tsx +++ b/web/app/components/app/configuration/config/agent/agent-tools/setting-built-in-tool.tsx @@ -215,6 +215,7 @@ const SettingBuiltInTool: FC = ({ pluginPayload={{ provider: collection.name, category: AuthCategory.tool, + providerType: collection.type, detail: collection as any, }} credentialId={credentialId} diff --git a/web/app/components/base/chip/index.stories.tsx b/web/app/components/base/chip/index.stories.tsx index 0ea018ef95..46d91c8cd6 100644 --- a/web/app/components/base/chip/index.stories.tsx +++ b/web/app/components/base/chip/index.stories.tsx @@ -23,6 +23,10 @@ const meta = { args: { items: ITEMS, value: 'all', + // eslint-disable-next-line no-empty-function + onSelect: () => {}, + // eslint-disable-next-line no-empty-function + onClear: () => {}, }, } satisfies Meta @@ -69,6 +73,13 @@ const [selection, setSelection] = useState('all') } export const WithoutLeftIcon: Story = { + args: { + showLeftIcon: false, + // eslint-disable-next-line no-empty-function + onSelect: () => {}, + // eslint-disable-next-line no-empty-function + onClear: () => {}, + }, render: args => ( ), - children: null, }, } @@ -112,7 +111,6 @@ export const WithoutFooter: Story = { args: { footer: undefined, title: 'Read-only summary', - children: null, }, parameters: { docs: { @@ -130,7 +128,6 @@ export const CustomStyling: Story = { bodyClassName: 'bg-gray-50 rounded-xl p-5', footerClassName: 'justify-between px-4 pb-4 pt-4', titleClassName: 'text-lg text-primary-600', - children: null, footer: ( <> Last synced 2 minutes ago @@ -144,7 +141,6 @@ export const CustomStyling: Story = { ), - children: null, }, parameters: { docs: { diff --git a/web/app/components/base/features/new-feature-panel/file-upload/setting-modal.tsx b/web/app/components/base/features/new-feature-panel/file-upload/setting-modal.tsx index 92f93b8819..6ebbc05ae5 100644 --- a/web/app/components/base/features/new-feature-panel/file-upload/setting-modal.tsx +++ b/web/app/components/base/features/new-feature-panel/file-upload/setting-modal.tsx @@ -37,7 +37,7 @@ const FileUploadSettings = ({ {children} -
+
onOpen(false)} diff --git a/web/app/components/base/icons/assets/public/tracing/tencent-icon-big.svg b/web/app/components/base/icons/assets/public/tracing/tencent-icon-big.svg new file mode 100644 index 0000000000..b38316f3b6 --- /dev/null +++ b/web/app/components/base/icons/assets/public/tracing/tencent-icon-big.svg @@ -0,0 +1,23 @@ + + + logo + + + + diff --git a/web/app/components/base/icons/assets/public/tracing/tencent-icon.svg b/web/app/components/base/icons/assets/public/tracing/tencent-icon.svg new file mode 100644 index 0000000000..53347bf23c --- /dev/null +++ b/web/app/components/base/icons/assets/public/tracing/tencent-icon.svg @@ -0,0 +1,23 @@ + + + logo + + + + \ No newline at end of file diff --git a/web/app/components/datasets/create-from-pipeline/list/built-in-pipeline-list.tsx b/web/app/components/datasets/create-from-pipeline/list/built-in-pipeline-list.tsx index 6d22f2115a..74e565a494 100644 --- a/web/app/components/datasets/create-from-pipeline/list/built-in-pipeline-list.tsx +++ b/web/app/components/datasets/create-from-pipeline/list/built-in-pipeline-list.tsx @@ -4,6 +4,7 @@ import CreateCard from './create-card' import { useI18N } from '@/context/i18n' import { useMemo } from 'react' import { LanguagesSupported } from '@/i18n-config/language' +import { useGlobalPublicStore } from '@/context/global-public-context' const BuiltInPipelineList = () => { const { locale } = useI18N() @@ -12,7 +13,8 @@ const BuiltInPipelineList = () => { return locale return LanguagesSupported[0] }, [locale]) - const { data: pipelineList, isLoading } = usePipelineTemplateList({ type: 'built-in', language }) + const enableMarketplace = useGlobalPublicStore(s => s.systemFeatures.enable_marketplace) + const { data: pipelineList, isLoading } = usePipelineTemplateList({ type: 'built-in', language }, enableMarketplace) const list = pipelineList?.pipeline_templates || [] return ( diff --git a/web/app/components/header/account-setting/data-source-page-new/card.tsx b/web/app/components/header/account-setting/data-source-page-new/card.tsx index 7a8790e76d..1e2e60bb7a 100644 --- a/web/app/components/header/account-setting/data-source-page-new/card.tsx +++ b/web/app/components/header/account-setting/data-source-page-new/card.tsx @@ -20,6 +20,7 @@ import { useDataSourceAuthUpdate } from './hooks' import Confirm from '@/app/components/base/confirm' import { useGetDataSourceOAuthUrl } from '@/service/use-datasource' import { openOAuthPopup } from '@/hooks/use-oauth' +import { CollectionType } from '@/app/components/tools/types' type CardProps = { item: DataSourceAuth @@ -42,6 +43,7 @@ const Card = ({ const pluginPayload = { category: AuthCategory.datasource, provider: `${item.plugin_id}/${item.name}`, + providerType: CollectionType.datasource, } const { handleAuthUpdate } = useDataSourceAuthUpdate({ pluginId: item.plugin_id, diff --git a/web/app/components/plugins/install-plugin/install-bundle/ready-to-install.tsx b/web/app/components/plugins/install-plugin/install-bundle/ready-to-install.tsx index 63c0b5b07e..b2b0aefb9b 100644 --- a/web/app/components/plugins/install-plugin/install-bundle/ready-to-install.tsx +++ b/web/app/components/plugins/install-plugin/install-bundle/ready-to-install.tsx @@ -4,7 +4,7 @@ import React, { useCallback, useState } from 'react' import { InstallStep } from '../../types' import Install from './steps/install' import Installed from './steps/installed' -import type { Dependency, InstallStatusResponse, Plugin } from '../../types' +import type { Dependency, InstallStatus, Plugin } 
from '../../types' type Props = { step: InstallStep @@ -26,8 +26,8 @@ const ReadyToInstall: FC = ({ isFromMarketPlace, }) => { const [installedPlugins, setInstalledPlugins] = useState([]) - const [installStatus, setInstallStatus] = useState([]) - const handleInstalled = useCallback((plugins: Plugin[], installStatus: InstallStatusResponse[]) => { + const [installStatus, setInstallStatus] = useState([]) + const handleInstalled = useCallback((plugins: Plugin[], installStatus: InstallStatus[]) => { setInstallStatus(installStatus) setInstalledPlugins(plugins) onStepChange(InstallStep.installed) diff --git a/web/app/components/plugins/install-plugin/install-bundle/steps/install.tsx b/web/app/components/plugins/install-plugin/install-bundle/steps/install.tsx index 758daafca0..a717e0a24a 100644 --- a/web/app/components/plugins/install-plugin/install-bundle/steps/install.tsx +++ b/web/app/components/plugins/install-plugin/install-bundle/steps/install.tsx @@ -2,23 +2,31 @@ import type { FC } from 'react' import { useRef } from 'react' import React, { useCallback, useState } from 'react' -import type { Dependency, InstallStatusResponse, Plugin, VersionInfo } from '../../../types' +import { + type Dependency, + type InstallStatus, + type InstallStatusResponse, + type Plugin, + TaskStatus, + type VersionInfo, +} from '../../../types' import Button from '@/app/components/base/button' import { RiLoader2Line } from '@remixicon/react' import { useTranslation } from 'react-i18next' import type { ExposeRefs } from './install-multi' import InstallMulti from './install-multi' -import { useInstallOrUpdate } from '@/service/use-plugins' +import { useInstallOrUpdate, usePluginTaskList } from '@/service/use-plugins' import useRefreshPluginList from '../../hooks/use-refresh-plugin-list' import { useCanInstallPluginFromMarketplace } from '@/app/components/plugins/plugin-page/use-reference-setting' import { useMittContextSelector } from '@/context/mitt-context' import Checkbox from '@/app/components/base/checkbox' +import checkTaskStatus from '../../base/check-task-status' const i18nPrefix = 'plugin.installModal' type Props = { allPlugins: Dependency[] onStartToInstall?: () => void - onInstalled: (plugins: Plugin[], installStatus: InstallStatusResponse[]) => void + onInstalled: (plugins: Plugin[], installStatus: InstallStatus[]) => void onCancel: () => void isFromMarketPlace?: boolean isHideButton?: boolean @@ -55,18 +63,60 @@ const Install: FC = ({ setCanInstall(true) }, []) + const { + check, + stop, + } = checkTaskStatus() + + const handleCancel = useCallback(() => { + stop() + onCancel() + }, [onCancel, stop]) + + const { handleRefetch } = usePluginTaskList() + // Install from marketplace and github const { mutate: installOrUpdate, isPending: isInstalling } = useInstallOrUpdate({ - onSuccess: (res: InstallStatusResponse[]) => { - onInstalled(selectedPlugins, res.map((r, i) => { - return ({ - ...r, - isFromMarketPlace: allPlugins[selectedIndexes[i]].type === 'marketplace', + onSuccess: async (res: InstallStatusResponse[]) => { + const isAllSettled = res.every(r => r.status === TaskStatus.success || r.status === TaskStatus.failed) + // if all settled, return the install status + if (isAllSettled) { + onInstalled(selectedPlugins, res.map((r, i) => { + return ({ + success: r.status === TaskStatus.success, + isFromMarketPlace: allPlugins[selectedIndexes[i]].type === 'marketplace', + }) + })) + const hasInstallSuccess = res.some(r => r.status === TaskStatus.success) + if (hasInstallSuccess) { + 
refreshPluginList(undefined, true) + emit('plugin:install:success', selectedPlugins.map((p) => { + return `${p.plugin_id}/${p.name}` + })) + } + return + } + // if not all settled, keep checking the status of the plugins + handleRefetch() + const installStatus = await Promise.all(res.map(async (item, index) => { + if (item.status !== TaskStatus.running) { + return { + success: item.status === TaskStatus.success, + isFromMarketPlace: allPlugins[selectedIndexes[index]].type === 'marketplace', + } + } + const { status } = await check({ + taskId: item.taskId, + pluginUniqueIdentifier: item.uniqueIdentifier, }) + return { + success: status === TaskStatus.success, + isFromMarketPlace: allPlugins[selectedIndexes[index]].type === 'marketplace', + } })) - const hasInstallSuccess = res.some(r => r.success) + onInstalled(selectedPlugins, installStatus) + const hasInstallSuccess = installStatus.some(r => r.success) if (hasInstallSuccess) { - refreshPluginList(undefined, true) emit('plugin:install:success', selectedPlugins.map((p) => { return `${p.plugin_id}/${p.name}` })) @@ -150,7 +200,7 @@ const Install: FC = ({
       {!canInstall && (
-      )}
diff --git a/web/app/components/plugins/install-plugin/install-bundle/steps/installed.tsx b/web/app/components/plugins/install-plugin/install-bundle/steps/installed.tsx
index 4e16d200e7..f787882211 100644
--- a/web/app/components/plugins/install-plugin/install-bundle/steps/installed.tsx
+++ b/web/app/components/plugins/install-plugin/install-bundle/steps/installed.tsx
@@ -1,7 +1,7 @@
 'use client'
 import type { FC } from 'react'
 import React from 'react'
-import type { InstallStatusResponse, Plugin } from '../../../types'
+import type { InstallStatus, Plugin } from '../../../types'
 import Card from '@/app/components/plugins/card'
 import Button from '@/app/components/base/button'
 import { useTranslation } from 'react-i18next'
@@ -11,7 +11,7 @@ import { MARKETPLACE_API_PREFIX } from '@/config'

 type Props = {
   list: Plugin[]
-  installStatus: InstallStatusResponse[]
+  installStatus: InstallStatus[]
   onCancel: () => void
   isHideButton?: boolean
 }
diff --git a/web/app/components/plugins/plugin-detail-panel/detail-header.tsx b/web/app/components/plugins/plugin-detail-panel/detail-header.tsx
index ab979a79a5..a407a00aee 100644
--- a/web/app/components/plugins/plugin-detail-panel/detail-header.tsx
+++ b/web/app/components/plugins/plugin-detail-panel/detail-header.tsx
@@ -335,6 +335,7 @@ const DetailHeader = ({
           pluginPayload={{
             provider: provider?.name || '',
             category: AuthCategory.tool,
+            providerType: provider?.type || '',
             detail,
           }}
         />
diff --git a/web/app/components/plugins/plugin-detail-panel/tool-selector/index.tsx b/web/app/components/plugins/plugin-detail-panel/tool-selector/index.tsx
index a41f44c3d5..ea7892be32 100644
--- a/web/app/components/plugins/plugin-detail-panel/tool-selector/index.tsx
+++ b/web/app/components/plugins/plugin-detail-panel/tool-selector/index.tsx
@@ -318,6 +318,7 @@ const ToolSelector: FC = ({
             pluginPayload={{
               provider: currentProvider.name,
               category: AuthCategory.tool,
+              providerType: currentProvider.type,
               detail: currentProvider as any,
             }}
             credentialId={value?.credential_id}
diff --git a/web/app/components/plugins/types.ts b/web/app/components/plugins/types.ts
index 06189af139..8d42e6787e 100644
--- a/web/app/components/plugins/types.ts
+++ b/web/app/components/plugins/types.ts
@@ -388,6 +388,12 @@ export type InstallPackageResponse = {
 }

 export type InstallStatusResponse = {
+  status: TaskStatus,
+  taskId: string,
+  uniqueIdentifier: string,
+}
+
+export type InstallStatus = {
   success: boolean,
   isFromMarketPlace?: boolean
 }
diff --git a/web/app/components/workflow/nodes/llm/use-config.ts b/web/app/components/workflow/nodes/llm/use-config.ts
index 66d89a2ccd..d9b811bb85 100644
--- a/web/app/components/workflow/nodes/llm/use-config.ts
+++ b/web/app/components/workflow/nodes/llm/use-config.ts
@@ -28,6 +28,9 @@ const useConfig = (id: string, payload: LLMNodeType) => {
   const [defaultRolePrefix, setDefaultRolePrefix] = useState<{ user: string; assistant: string }>({ user: '', assistant: '' })
   const { inputs, setInputs: doSetInputs } = useNodeCrud(id, payload)
   const inputRef = useRef(inputs)
+  useEffect(() => {
+    inputRef.current = inputs
+  }, [inputs])

   const { deleteNodeInspectorVars } = useInspectVarsCrud()

@@ -118,7 +121,7 @@ const useConfig = (id: string, payload: LLMNodeType) => {
   } = useConfigVision(model, {
     payload: inputs.vision,
     onChange: (newPayload) => {
-      const newInputs = produce(inputs, (draft) => {
+      const newInputs = produce(inputRef.current, (draft) => {
        draft.vision = newPayload
      })
      setInputs(newInputs)
@@ -149,11 +152,11 @@ const useConfig = (id: string, payload: LLMNodeType) => {
   }, [model.provider, currentProvider, currentModel, handleModelChanged])

   const handleCompletionParamsChange = useCallback((newParams: Record) => {
-    const newInputs = produce(inputs, (draft) => {
+    const newInputs = produce(inputRef.current, (draft) => {
       draft.model.completion_params = newParams
     })
     setInputs(newInputs)
-  }, [inputs, setInputs])
+  }, [setInputs])

   // change to vision model to set vision enabled, else disabled
   useEffect(() => {
@@ -239,29 +242,29 @@ const useConfig = (id: string, payload: LLMNodeType) => {

   // context
   const handleContextVarChange = useCallback((newVar: ValueSelector | string) => {
-    const newInputs = produce(inputs, (draft) => {
+    const newInputs = produce(inputRef.current, (draft) => {
       draft.context.variable_selector = newVar as ValueSelector || []
       draft.context.enabled = !!(newVar && newVar.length > 0)
     })
     setInputs(newInputs)
-  }, [inputs, setInputs])
+  }, [setInputs])

   const handlePromptChange = useCallback((newPrompt: PromptItem[] | PromptItem) => {
     const newInputs = produce(inputRef.current, (draft) => {
       draft.prompt_template = newPrompt
     })
     setInputs(newInputs)
-  }, [inputs, setInputs])
+  }, [setInputs])

   const handleMemoryChange = useCallback((newMemory?: Memory) => {
-    const newInputs = produce(inputs, (draft) => {
+    const newInputs = produce(inputRef.current, (draft) => {
       draft.memory = newMemory
     })
     setInputs(newInputs)
-  }, [inputs, setInputs])
+  }, [setInputs])

   const handleSyeQueryChange = useCallback((newQuery: string) => {
-    const newInputs = produce(inputs, (draft) => {
+    const newInputs = produce(inputRef.current, (draft) => {
       if (!draft.memory) {
         draft.memory = {
           window: {
@@ -276,7 +279,7 @@ const useConfig = (id: string, payload: LLMNodeType) => {
       }
     })
     setInputs(newInputs)
-  }, [inputs, setInputs])
+  }, [setInputs])

   // structure output
   const { data: modelList } = useModelList(ModelTypeEnum.textGeneration)
@@ -287,22 +290,22 @@ const useConfig = (id: string, payload: LLMNodeType) => {
   const [structuredOutputCollapsed, setStructuredOutputCollapsed] = useState(true)

   const handleStructureOutputEnableChange = useCallback((enabled: boolean) => {
-    const newInputs = produce(inputs, (draft) => {
+    const newInputs = produce(inputRef.current, (draft) => {
       draft.structured_output_enabled = enabled
     })
     setInputs(newInputs)
     if (enabled)
       setStructuredOutputCollapsed(false)
     deleteNodeInspectorVars(id)
-  }, [inputs, setInputs, deleteNodeInspectorVars, id])
+  }, [setInputs, deleteNodeInspectorVars, id])

   const handleStructureOutputChange = useCallback((newOutput: StructuredOutput) => {
-    const newInputs = produce(inputs, (draft) => {
+    const newInputs = produce(inputRef.current, (draft) => {
       draft.structured_output = newOutput
     })
     setInputs(newInputs)
     deleteNodeInspectorVars(id)
-  }, [inputs, setInputs, deleteNodeInspectorVars, id])
+  }, [setInputs, deleteNodeInspectorVars, id])

   const filterInputVar = useCallback((varPayload: Var) => {
     return [VarType.number, VarType.string, VarType.secret, VarType.arrayString, VarType.arrayNumber, VarType.file, VarType.arrayFile].includes(varPayload.type)
@@ -318,11 +321,11 @@ const useConfig = (id: string, payload: LLMNodeType) => {

   // reasoning format
   const handleReasoningFormatChange = useCallback((reasoningFormat: 'tagged' | 'separated') => {
-    const newInputs = produce(inputs, (draft) => {
+    const newInputs = produce(inputRef.current, (draft) => {
       draft.reasoning_format = reasoningFormat
     })
     setInputs(newInputs)
-  }, [inputs, setInputs])
+  }, [setInputs])

   const {
     availableVars,
diff --git a/web/global.d.ts b/web/global.d.ts
index c5488a6cae..0ccadf7887 100644
--- a/web/global.d.ts
+++ b/web/global.d.ts
@@ -1,6 +1,7 @@
 import './types/i18n'
 import './types/jsx'
 import './types/mdx'
+import './types/assets'

 declare module 'lamejs';
 declare module 'lamejs/src/js/MPEGMode';
@@ -8,4 +9,4 @@ declare module 'lamejs/src/js/Lame';
 declare module 'lamejs/src/js/BitStream';
 declare module 'react-18-input-autosize';

-export {}
+export { }
diff --git a/web/service/use-pipeline.ts b/web/service/use-pipeline.ts
index a7b9c89410..92a7542c56 100644
--- a/web/service/use-pipeline.ts
+++ b/web/service/use-pipeline.ts
@@ -39,13 +39,14 @@ import { useInvalid } from './use-base'
 const NAME_SPACE = 'pipeline'

 export const PipelineTemplateListQueryKeyPrefix = [NAME_SPACE, 'template-list']
-export const usePipelineTemplateList = (params: PipelineTemplateListParams) => {
+export const usePipelineTemplateList = (params: PipelineTemplateListParams, enabled = true) => {
   const { type, language } = params
   return useQuery({
     queryKey: [...PipelineTemplateListQueryKeyPrefix, type, language],
     queryFn: () => {
       return get('/rag/pipeline/templates', { params })
     },
+    enabled,
   })
 }
diff --git a/web/service/use-plugins.ts b/web/service/use-plugins.ts
index 71e9dfddc1..f6dbecaeba 100644
--- a/web/service/use-plugins.ts
+++ b/web/service/use-plugins.ts
@@ -10,6 +10,7 @@ import type {
   Dependency,
   GitHubItemAndMarketPlaceDependency,
   InstallPackageResponse,
+  InstallStatusResponse,
   InstalledLatestVersionResponse,
   InstalledPluginListWithTotalResponse,
   PackageDependency,
@@ -293,7 +294,7 @@ export const useUploadGitHub = (payload: {
 export const useInstallOrUpdate = ({
   onSuccess,
 }: {
-  onSuccess?: (res: { success: boolean }[]) => void
+  onSuccess?: (res: InstallStatusResponse[]) => void
 }) => {
   const { mutateAsync: updatePackageFromMarketPlace } = useUpdatePackageFromMarketPlace()

@@ -311,6 +312,8 @@ export const useInstallOrUpdate = ({
         const installedPayload = installedInfo[orgAndName]
         const isInstalled = !!installedPayload
         let uniqueIdentifier = ''
+        let taskId = ''
+        let isFinishedInstallation = false

         if (item.type === 'github') {
           const data = item as GitHubItemAndMarketPlaceDependency
@@ -328,12 +331,14 @@ export const useInstallOrUpdate = ({
           // has the same version, but not installed
           if (uniqueIdentifier === installedPayload?.uniqueIdentifier) {
             return {
-              success: true,
+              status: TaskStatus.success,
+              taskId: '',
+              uniqueIdentifier: '',
             }
           }
           if (!isInstalled) {
-            await post('/workspaces/current/plugin/install/github', {
+            const { task_id, all_installed } = await post('/workspaces/current/plugin/install/github', {
               body: {
                 repo: data.value.repo!,
                 version: data.value.release! || data.value.version!,
@@ -341,6 +346,8 @@ export const useInstallOrUpdate = ({
                 plugin_unique_identifier: uniqueIdentifier,
               },
             })
+            taskId = task_id
+            isFinishedInstallation = all_installed
           }
         }
         if (item.type === 'marketplace') {
@@ -348,15 +355,19 @@ export const useInstallOrUpdate = ({
           const data = item as GitHubItemAndMarketPlaceDependency
           uniqueIdentifier = data.value.marketplace_plugin_unique_identifier! || plugin[i]?.plugin_id
           if (uniqueIdentifier === installedPayload?.uniqueIdentifier) {
             return {
-              success: true,
+              status: TaskStatus.success,
+              taskId: '',
+              uniqueIdentifier: '',
             }
           }
           if (!isInstalled) {
-            await post('/workspaces/current/plugin/install/marketplace', {
+            const { task_id, all_installed } = await post('/workspaces/current/plugin/install/marketplace', {
               body: {
                 plugin_unique_identifiers: [uniqueIdentifier],
               },
             })
+            taskId = task_id
+            isFinishedInstallation = all_installed
           }
         }
         if (item.type === 'package') {
@@ -364,38 +375,59 @@ export const useInstallOrUpdate = ({
           const data = item as PackageDependency
           uniqueIdentifier = data.value.unique_identifier
           if (uniqueIdentifier === installedPayload?.uniqueIdentifier) {
             return {
-              success: true,
+              status: TaskStatus.success,
+              taskId: '',
+              uniqueIdentifier: '',
             }
           }
           if (!isInstalled) {
-            await post('/workspaces/current/plugin/install/pkg', {
+            const { task_id, all_installed } = await post('/workspaces/current/plugin/install/pkg', {
               body: {
                 plugin_unique_identifiers: [uniqueIdentifier],
               },
             })
+            taskId = task_id
+            isFinishedInstallation = all_installed
           }
         }
         if (isInstalled) {
           if (item.type === 'package') {
             await uninstallPlugin(installedPayload.installedId)
-            await post('/workspaces/current/plugin/install/pkg', {
+            const { task_id, all_installed } = await post('/workspaces/current/plugin/install/pkg', {
               body: {
                 plugin_unique_identifiers: [uniqueIdentifier],
               },
             })
+            taskId = task_id
+            isFinishedInstallation = all_installed
           }
           else {
-            await updatePackageFromMarketPlace({
+            const { task_id, all_installed } = await updatePackageFromMarketPlace({
               original_plugin_unique_identifier: installedPayload?.uniqueIdentifier,
               new_plugin_unique_identifier: uniqueIdentifier,
             })
+            taskId = task_id
+            isFinishedInstallation = all_installed
+          }
+        }
+        if (isFinishedInstallation) {
+          return {
+            status: TaskStatus.success,
+            taskId: '',
+            uniqueIdentifier: '',
+          }
+        }
+        else {
+          return {
+            status: TaskStatus.running,
+            taskId,
+            uniqueIdentifier,
          }
        }
-        return ({ success: true })
      }
      // eslint-disable-next-line unused-imports/no-unused-vars
      catch (e) {
-        return Promise.resolve({ success: false })
+        return Promise.resolve({ status: TaskStatus.failed, taskId: '', uniqueIdentifier: '' })
      }
    }))
  },
diff --git a/web/types/assets.d.ts b/web/types/assets.d.ts
new file mode 100644
index 0000000000..d7711f7eb4
--- /dev/null
+++ b/web/types/assets.d.ts
@@ -0,0 +1,24 @@
+declare module '*.svg' {
+  const value: any
+  export default value
+}
+
+declare module '*.png' {
+  const value: any
+  export default value
+}
+
+declare module '*.jpg' {
+  const value: any
+  export default value
+}
+
+declare module '*.jpeg' {
+  const value: any
+  export default value
+}
+
+declare module '*.gif' {
+  const value: any
+  export default value
+}