Merge remote-tracking branch 'origin/main' into feat/collaboration

lyzno1 2025-10-15 11:06:23 +08:00
commit cbf181bd76
3 changed files with 53 additions and 7 deletions

View File

@@ -25,7 +25,7 @@ class FirecrawlApp:
}
if params:
json_data.update(params)
response = self._post_request(f"{self.base_url}/v1/scrape", json_data, headers)
response = self._post_request(f"{self.base_url}/v2/scrape", json_data, headers)
if response.status_code == 200:
response_data = response.json()
data = response_data["data"]
@@ -42,7 +42,7 @@ class FirecrawlApp:
json_data = {"url": url}
if params:
json_data.update(params)
response = self._post_request(f"{self.base_url}/v1/crawl", json_data, headers)
response = self._post_request(f"{self.base_url}/v2/crawl", json_data, headers)
if response.status_code == 200:
# There are also two other fields in the response: "success" (bool) and "url" (str)
job_id = response.json().get("id")
@@ -51,9 +51,25 @@
self._handle_error(response, "start crawl job")
return "" # unreachable
def map(self, url: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
# Documentation: https://docs.firecrawl.dev/api-reference/endpoint/map
headers = self._prepare_headers()
json_data: dict[str, Any] = {"url": url, "integration": "dify"}
if params:
# Pass through provided params, including optional "sitemap": "only" | "include" | "skip"
json_data.update(params)
response = self._post_request(f"{self.base_url}/v2/map", json_data, headers)
if response.status_code == 200:
return cast(dict[str, Any], response.json())
elif response.status_code in {402, 409, 500, 429, 408}:
self._handle_error(response, "start map job")
return {}
else:
raise Exception(f"Failed to start map job. Status code: {response.status_code}")
def check_crawl_status(self, job_id) -> dict[str, Any]:
headers = self._prepare_headers()
response = self._get_request(f"{self.base_url}/v1/crawl/{job_id}", headers)
response = self._get_request(f"{self.base_url}/v2/crawl/{job_id}", headers)
if response.status_code == 200:
crawl_status_response = response.json()
if crawl_status_response.get("status") == "completed":
@@ -135,12 +151,16 @@ class FirecrawlApp:
"lang": "en",
"country": "us",
"timeout": 60000,
"ignoreInvalidURLs": False,
"ignoreInvalidURLs": True,
"scrapeOptions": {},
"sources": [
{"type": "web"},
],
"integration": "dify",
}
if params:
json_data.update(params)
response = self._post_request(f"{self.base_url}/v1/search", json_data, headers)
response = self._post_request(f"{self.base_url}/v2/search", json_data, headers)
if response.status_code == 200:
response_data = response.json()
if not response_data.get("success"):
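
Not part of this diff: a minimal sketch of how a caller can override the new v2 search defaults, since extra params are merged over json_data via json_data.update(params); the example values are illustrative.

defaults = {
    "lang": "en",
    "country": "us",
    "timeout": 60000,
    "ignoreInvalidURLs": True,
    "scrapeOptions": {},
    "sources": [{"type": "web"}],
    "integration": "dify",
}
params = {"limit": 5, "ignoreInvalidURLs": False}  # caller-supplied overrides (illustrative)
json_data = {**defaults, **params}  # same effect as json_data.update(params)
print(json_data["ignoreInvalidURLs"])  # False: the caller's value wins over the new default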

View File

@@ -7,6 +7,7 @@ from collections.abc import Mapping
from functools import singledispatchmethod
from typing import TYPE_CHECKING, final
from core.model_runtime.entities.llm_entities import LLMUsage
from core.workflow.entities import GraphRuntimeState
from core.workflow.enums import ErrorStrategy, NodeExecutionType
from core.workflow.graph import Graph
@@ -125,6 +126,7 @@ class EventHandler:
node_execution = self._graph_execution.get_or_create_node_execution(event.node_id)
is_initial_attempt = node_execution.retry_count == 0
node_execution.mark_started(event.id)
self._graph_runtime_state.increment_node_run_steps()
# Track in response coordinator for stream ordering
self._response_coordinator.track_node_execution(event.node_id, event.id)
@@ -163,6 +165,8 @@ class EventHandler:
node_execution = self._graph_execution.get_or_create_node_execution(event.node_id)
node_execution.mark_taken()
self._accumulate_node_usage(event.node_run_result.llm_usage)
# Store outputs in variable pool
self._store_node_outputs(event.node_id, event.node_run_result.outputs)
@@ -212,6 +216,8 @@ class EventHandler:
node_execution.mark_failed(event.error)
self._graph_execution.record_node_failure()
self._accumulate_node_usage(event.node_run_result.llm_usage)
result = self._error_handler.handle_node_failure(event)
if result:
@@ -235,6 +241,8 @@ class EventHandler:
node_execution = self._graph_execution.get_or_create_node_execution(event.node_id)
node_execution.mark_taken()
self._accumulate_node_usage(event.node_run_result.llm_usage)
# Persist outputs produced by the exception strategy (e.g. default values)
self._store_node_outputs(event.node_id, event.node_run_result.outputs)
@@ -286,6 +294,19 @@
self._state_manager.enqueue_node(event.node_id)
self._state_manager.start_execution(event.node_id)
def _accumulate_node_usage(self, usage: LLMUsage) -> None:
"""Accumulate token usage into the shared runtime state."""
if usage.total_tokens <= 0:
return
self._graph_runtime_state.add_tokens(usage.total_tokens)
current_usage = self._graph_runtime_state.llm_usage
if current_usage.total_tokens == 0:
self._graph_runtime_state.llm_usage = usage
else:
self._graph_runtime_state.llm_usage = current_usage.plus(usage)
def _store_node_outputs(self, node_id: str, outputs: Mapping[str, object]) -> None:
"""
Store node outputs in the variable pool.
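
To make the accumulation rule added in _accumulate_node_usage concrete, here is a minimal, self-contained sketch; SimpleUsage is a hypothetical stand-in for LLMUsage (the real entity also tracks prices, latency, and per-direction token counts).

from dataclasses import dataclass


@dataclass
class SimpleUsage:
    # Hypothetical stand-in for LLMUsage, kept to the fields the rule needs.
    prompt_tokens: int = 0
    completion_tokens: int = 0
    total_tokens: int = 0

    def plus(self, other: "SimpleUsage") -> "SimpleUsage":
        return SimpleUsage(
            prompt_tokens=self.prompt_tokens + other.prompt_tokens,
            completion_tokens=self.completion_tokens + other.completion_tokens,
            total_tokens=self.total_tokens + other.total_tokens,
        )


accumulated = SimpleUsage()  # mirrors graph_runtime_state.llm_usage starting empty
token_counter = 0            # mirrors graph_runtime_state.add_tokens(...)

for usage in [SimpleUsage(10, 5, 15), SimpleUsage(), SimpleUsage(20, 8, 28)]:
    if usage.total_tokens <= 0:   # zero-usage events are ignored
        continue
    token_counter += usage.total_tokens
    if accumulated.total_tokens == 0:
        accumulated = usage       # first real usage is adopted as-is
    else:
        accumulated = accumulated.plus(usage)

print(token_counter, accumulated)  # 43 SimpleUsage(prompt_tokens=30, completion_tokens=13, total_tokens=43)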

View File

@@ -23,6 +23,7 @@ class CrawlOptions:
only_main_content: bool = False
includes: str | None = None
excludes: str | None = None
prompt: str | None = None
max_depth: int | None = None
use_sitemap: bool = True
@@ -70,6 +71,7 @@ class WebsiteCrawlApiRequest:
only_main_content=self.options.get("only_main_content", False),
includes=self.options.get("includes"),
excludes=self.options.get("excludes"),
prompt=self.options.get("prompt"),
max_depth=self.options.get("max_depth"),
use_sitemap=self.options.get("use_sitemap", True),
)
@@ -174,6 +176,7 @@ class WebsiteService:
def _crawl_with_firecrawl(cls, request: CrawlRequest, api_key: str, config: dict) -> dict[str, Any]:
firecrawl_app = FirecrawlApp(api_key=api_key, base_url=config.get("base_url"))
params: dict[str, Any]
if not request.options.crawl_sub_pages:
params = {
"includePaths": [],
@@ -188,8 +191,10 @@
"limit": request.options.limit,
"scrapeOptions": {"onlyMainContent": request.options.only_main_content},
}
if request.options.max_depth:
params["maxDepth"] = request.options.max_depth
# Add optional prompt for Firecrawl v2 crawl-params compatibility
if request.options.prompt:
params["prompt"] = request.options.prompt
job_id = firecrawl_app.crawl_url(request.url, params)
website_crawl_time_cache_key = f"website_crawl_{job_id}"
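
Not part of this diff: a small sketch tracing how the new prompt option reaches the Firecrawl v2 crawl params in the crawl_sub_pages branch above; the option values are illustrative.

options = {
    "crawl_sub_pages": True,
    "limit": 10,
    "only_main_content": True,
    "max_depth": 2,
    "prompt": "Only crawl the product documentation pages",  # new optional field
}

params = {
    "limit": options["limit"],
    "scrapeOptions": {"onlyMainContent": options["only_main_content"]},
}
if options.get("max_depth"):
    params["maxDepth"] = options["max_depth"]
if options.get("prompt"):
    params["prompt"] = options["prompt"]  # forwarded to POST {base_url}/v2/crawl

print(params)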