From c39dae06d45485a0a879ed5b270ff2fecc6dddaf Mon Sep 17 00:00:00 2001
From: kenwoodjw
Date: Wed, 15 Oct 2025 10:39:51 +0800
Subject: [PATCH 1/2] fix: workflow token usage (#26723)

Signed-off-by: kenwoodjw
---
 .../event_management/event_handlers.py        | 21 +++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/api/core/workflow/graph_engine/event_management/event_handlers.py b/api/core/workflow/graph_engine/event_management/event_handlers.py
index 7247b17967..1cb5851ab1 100644
--- a/api/core/workflow/graph_engine/event_management/event_handlers.py
+++ b/api/core/workflow/graph_engine/event_management/event_handlers.py
@@ -7,6 +7,7 @@
 from collections.abc import Mapping
 from functools import singledispatchmethod
 from typing import TYPE_CHECKING, final
+from core.model_runtime.entities.llm_entities import LLMUsage
 from core.workflow.entities import GraphRuntimeState
 from core.workflow.enums import ErrorStrategy, NodeExecutionType
 from core.workflow.graph import Graph
@@ -125,6 +126,7 @@ class EventHandler:
         node_execution = self._graph_execution.get_or_create_node_execution(event.node_id)
         is_initial_attempt = node_execution.retry_count == 0
         node_execution.mark_started(event.id)
+        self._graph_runtime_state.increment_node_run_steps()
 
         # Track in response coordinator for stream ordering
         self._response_coordinator.track_node_execution(event.node_id, event.id)
@@ -163,6 +165,8 @@
         node_execution = self._graph_execution.get_or_create_node_execution(event.node_id)
         node_execution.mark_taken()
 
+        self._accumulate_node_usage(event.node_run_result.llm_usage)
+
         # Store outputs in variable pool
         self._store_node_outputs(event.node_id, event.node_run_result.outputs)
 
@@ -212,6 +216,8 @@
         node_execution.mark_failed(event.error)
         self._graph_execution.record_node_failure()
 
+        self._accumulate_node_usage(event.node_run_result.llm_usage)
+
         result = self._error_handler.handle_node_failure(event)
 
         if result:
@@ -235,6 +241,8 @@
         node_execution = self._graph_execution.get_or_create_node_execution(event.node_id)
         node_execution.mark_taken()
 
+        self._accumulate_node_usage(event.node_run_result.llm_usage)
+
         # Persist outputs produced by the exception strategy (e.g. default values)
         self._store_node_outputs(event.node_id, event.node_run_result.outputs)
 
@@ -286,6 +294,19 @@
         self._state_manager.enqueue_node(event.node_id)
         self._state_manager.start_execution(event.node_id)
 
+    def _accumulate_node_usage(self, usage: LLMUsage) -> None:
+        """Accumulate token usage into the shared runtime state."""
+        if usage.total_tokens <= 0:
+            return
+
+        self._graph_runtime_state.add_tokens(usage.total_tokens)
+
+        current_usage = self._graph_runtime_state.llm_usage
+        if current_usage.total_tokens == 0:
+            self._graph_runtime_state.llm_usage = usage
+        else:
+            self._graph_runtime_state.llm_usage = current_usage.plus(usage)
+
     def _store_node_outputs(self, node_id: str, outputs: Mapping[str, object]) -> None:
         """
         Store node outputs in the variable pool.
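A minimal standalone sketch of the accumulation semantics this patch introduces. The LLMUsage and GraphRuntimeState classes below are simplified stand-ins for the real Dify entities: only the fields and methods exercised by _accumulate_node_usage are modeled, and the field-wise plus() is an assumption about how the real LLMUsage merges usage.

from dataclasses import dataclass, field


@dataclass
class LLMUsage:
    # Simplified stand-in for core.model_runtime.entities.llm_entities.LLMUsage
    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_tokens: int = 0

    def plus(self, other: "LLMUsage") -> "LLMUsage":
        # Assumed field-wise sum, mirroring how the patch merges per-node usage
        return LLMUsage(
            prompt_tokens=self.prompt_tokens + other.prompt_tokens,
            completion_tokens=self.completion_tokens + other.completion_tokens,
            total_tokens=self.total_tokens + other.total_tokens,
        )


@dataclass
class GraphRuntimeState:
    # Simplified stand-in: a running token total plus the merged usage object
    total_tokens: int = 0
    llm_usage: LLMUsage = field(default_factory=LLMUsage)

    def add_tokens(self, tokens: int) -> None:
        self.total_tokens += tokens


def accumulate_node_usage(state: GraphRuntimeState, usage: LLMUsage) -> None:
    # Same branching as the patched _accumulate_node_usage: skip empty usage,
    # seed llm_usage on the first contribution, merge on later ones.
    if usage.total_tokens <= 0:
        return
    state.add_tokens(usage.total_tokens)
    if state.llm_usage.total_tokens == 0:
        state.llm_usage = usage
    else:
        state.llm_usage = state.llm_usage.plus(usage)


state = GraphRuntimeState()
accumulate_node_usage(state, LLMUsage(prompt_tokens=10, completion_tokens=5, total_tokens=15))
accumulate_node_usage(state, LLMUsage(prompt_tokens=20, completion_tokens=5, total_tokens=25))
assert state.total_tokens == 40
assert state.llm_usage.total_tokens == 40

Seeding llm_usage directly on the first non-empty contribution avoids merging into the zero-valued default, and the early return keeps nodes that made no LLM calls from touching the shared state at all.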
From a16ef7e73c4f11fef98d59d9d87526e976139c31 Mon Sep 17 00:00:00 2001
From: Ademílson Tonato
Date: Tue, 14 Oct 2025 23:48:54 -0300
Subject: [PATCH 2/2] refactor: Update Firecrawl to use v2 API (#24734)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
---
 .../rag/extractor/firecrawl/firecrawl_app.py | 30 +++++++++++++++----
 api/services/website_service.py              |  9 ++++--
 2 files changed, 32 insertions(+), 7 deletions(-)

diff --git a/api/core/rag/extractor/firecrawl/firecrawl_app.py b/api/core/rag/extractor/firecrawl/firecrawl_app.py
index c20ecd2b89..789ac8557d 100644
--- a/api/core/rag/extractor/firecrawl/firecrawl_app.py
+++ b/api/core/rag/extractor/firecrawl/firecrawl_app.py
@@ -25,7 +25,7 @@ class FirecrawlApp:
         }
         if params:
             json_data.update(params)
-        response = self._post_request(f"{self.base_url}/v1/scrape", json_data, headers)
+        response = self._post_request(f"{self.base_url}/v2/scrape", json_data, headers)
         if response.status_code == 200:
             response_data = response.json()
             data = response_data["data"]
@@ -42,7 +42,7 @@
         json_data = {"url": url}
         if params:
             json_data.update(params)
-        response = self._post_request(f"{self.base_url}/v1/crawl", json_data, headers)
+        response = self._post_request(f"{self.base_url}/v2/crawl", json_data, headers)
         if response.status_code == 200:
             # There's also another two fields in the response: "success" (bool) and "url" (str)
             job_id = response.json().get("id")
@@ -51,9 +51,25 @@
             self._handle_error(response, "start crawl job")
             return ""  # unreachable
 
+    def map(self, url: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
+        # Documentation: https://docs.firecrawl.dev/api-reference/endpoint/map
+        headers = self._prepare_headers()
+        json_data: dict[str, Any] = {"url": url, "integration": "dify"}
+        if params:
+            # Pass through provided params, including optional "sitemap": "only" | "include" | "skip"
+            json_data.update(params)
+        response = self._post_request(f"{self.base_url}/v2/map", json_data, headers)
+        if response.status_code == 200:
+            return cast(dict[str, Any], response.json())
+        elif response.status_code in {402, 409, 500, 429, 408}:
+            self._handle_error(response, "start map job")
+            return {}
+        else:
+            raise Exception(f"Failed to start map job. Status code: {response.status_code}")
+
     def check_crawl_status(self, job_id) -> dict[str, Any]:
         headers = self._prepare_headers()
-        response = self._get_request(f"{self.base_url}/v1/crawl/{job_id}", headers)
+        response = self._get_request(f"{self.base_url}/v2/crawl/{job_id}", headers)
         if response.status_code == 200:
             crawl_status_response = response.json()
             if crawl_status_response.get("status") == "completed":
@@ -135,12 +151,16 @@
             "lang": "en",
             "country": "us",
             "timeout": 60000,
-            "ignoreInvalidURLs": False,
+            "ignoreInvalidURLs": True,
             "scrapeOptions": {},
+            "sources": [
+                {"type": "web"},
+            ],
+            "integration": "dify",
         }
         if params:
             json_data.update(params)
-        response = self._post_request(f"{self.base_url}/v1/search", json_data, headers)
+        response = self._post_request(f"{self.base_url}/v2/search", json_data, headers)
         if response.status_code == 200:
             response_data = response.json()
             if not response_data.get("success"):
diff --git a/api/services/website_service.py b/api/services/website_service.py
index 37588d6ba5..a23f01ec71 100644
--- a/api/services/website_service.py
+++ b/api/services/website_service.py
@@ -23,6 +23,7 @@ class CrawlOptions:
     only_main_content: bool = False
     includes: str | None = None
     excludes: str | None = None
+    prompt: str | None = None
     max_depth: int | None = None
     use_sitemap: bool = True
 
@@ -70,6 +71,7 @@
             only_main_content=self.options.get("only_main_content", False),
             includes=self.options.get("includes"),
             excludes=self.options.get("excludes"),
+            prompt=self.options.get("prompt"),
             max_depth=self.options.get("max_depth"),
             use_sitemap=self.options.get("use_sitemap", True),
         )
@@ -174,6 +176,7 @@ class WebsiteService:
     def _crawl_with_firecrawl(cls, request: CrawlRequest, api_key: str, config: dict) -> dict[str, Any]:
         firecrawl_app = FirecrawlApp(api_key=api_key, base_url=config.get("base_url"))
 
+        params: dict[str, Any]
         if not request.options.crawl_sub_pages:
             params = {
                 "includePaths": [],
@@ -188,8 +191,10 @@
                 "limit": request.options.limit,
                 "scrapeOptions": {"onlyMainContent": request.options.only_main_content},
             }
-            if request.options.max_depth:
-                params["maxDepth"] = request.options.max_depth
+
+        # Add optional prompt for Firecrawl v2 crawl-params compatibility
+        if request.options.prompt:
+            params["prompt"] = request.options.prompt
         job_id = firecrawl_app.crawl_url(request.url, params)
         website_crawl_time_cache_key = f"website_crawl_{job_id}"