refactor(trigger): streamline trigger provider verification and update imports

- Updated `TriggerSubscriptionBuilderVerifyApi` to directly return the result of `verify_trigger_subscription_builder`, improving clarity.
- Refactored import statement in `trigger_plugin/__init__.py` to point to the correct module, enhancing code organization.
- Removed the obsolete `node.py` file, cleaning up the codebase by eliminating unused components.

These changes enhance the maintainability and clarity of the trigger provider functionality.
This commit is contained in:
Harry 2025-09-08 18:25:04 +08:00
parent eb95c5cd07
commit ac38614171
3 changed files with 5 additions and 37 deletions

View File

@ -117,13 +117,12 @@ class TriggerSubscriptionBuilderVerifyApi(Resource):
credentials=args.get("credentials", None),
),
)
TriggerSubscriptionBuilderService.verify_trigger_subscription_builder(
return TriggerSubscriptionBuilderService.verify_trigger_subscription_builder(
tenant_id=user.current_tenant_id,
user_id=user.id,
provider_id=TriggerProviderID(provider),
subscription_builder_id=subscription_builder_id,
)
return 200
except Exception as e:
logger.exception("Error verifying provider credential", exc_info=e)
raise

View File

@ -1,3 +1,3 @@
from .node import TriggerPluginNode
from .trigger_plugin_node import TriggerPluginNode
__all__ = ["TriggerPluginNode"]

View File

@ -56,41 +56,10 @@ class TriggerPluginNode(BaseNode):
def _run(self) -> NodeRunResult:
"""
Run the plugin trigger node.
Like the webhook node, this takes the trigger data from the variable pool
and makes it available to downstream nodes. The actual trigger invocation
happens in the async task executor.
"""
# Get trigger data from variable pool (injected by async task)
trigger_inputs = dict(self.graph_runtime_state.variable_pool.user_inputs)
# Extract trigger-specific outputs
outputs = self._extract_trigger_outputs(trigger_inputs)
node_data = self._node_data
return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs=trigger_inputs,
outputs=outputs,
)
def _extract_trigger_outputs(self, trigger_inputs: dict[str, Any]) -> dict[str, Any]:
"""Extract outputs from trigger invocation response."""
outputs = {}
# Get the trigger data (should be injected by async task)
trigger_data = trigger_inputs.get("trigger_data", {})
trigger_metadata = trigger_inputs.get("trigger_metadata", {})
# Make trigger data available as outputs
outputs["data"] = trigger_data
outputs["trigger_name"] = trigger_metadata.get("trigger_name", "")
outputs["provider_id"] = trigger_metadata.get("provider_id", "")
outputs["subscription_id"] = self._node_data.subscription_id
# Include raw trigger data for debugging/advanced use
outputs["_trigger_raw"] = {
"data": trigger_data,
"metadata": trigger_metadata,
}
return outputs
outputs={},
)