feat: add transform node to node data

This commit is contained in:
Joel 2025-01-06 18:36:45 +08:00
parent 228cd1cdbe
commit 61d2f70927
2 changed files with 286 additions and 77 deletions

View File

@ -1,95 +1,128 @@
import { parseDSL } from './graph-to-log-struct-2'
describe('parseDSL', () => {
test('parse plain flow', () => {
const dsl = 'a -> b -> c'
it('should parse plain nodes correctly', () => {
const dsl = 'plainNode1 -> plainNode2'
const result = parseDSL(dsl)
expect(result).toEqual([
{ nodeType: 'plain', nodeId: 'a' },
{ nodeType: 'plain', nodeId: 'b' },
{ nodeType: 'plain', nodeId: 'c' },
{ id: 'plainNode1', node_id: 'plainNode1', title: 'plainNode1', execution_metadata: {}, status: 'succeeded' },
{ id: 'plainNode2', node_id: 'plainNode2', title: 'plainNode2', execution_metadata: {}, status: 'succeeded' },
])
})
test('parse iteration node with flow', () => {
const dsl = '(iteration, a, b -> c)'
it('should parse retry nodes correctly', () => {
const dsl = '(retry, retryNode, 3)'
const result = parseDSL(dsl)
expect(result).toEqual([
{ id: 'retryNode', node_id: 'retryNode', title: 'retryNode', execution_metadata: {}, status: 'succeeded' },
{ id: 'retryNode', node_id: 'retryNode', title: 'retryNode', execution_metadata: {}, status: 'retry' },
{ id: 'retryNode', node_id: 'retryNode', title: 'retryNode', execution_metadata: {}, status: 'retry' },
{ id: 'retryNode', node_id: 'retryNode', title: 'retryNode', execution_metadata: {}, status: 'retry' },
])
})
it('should parse iteration nodes correctly', () => {
const dsl = '(iteration, iterationNode, plainNode1 -> plainNode2)'
const result = parseDSL(dsl)
expect(result).toEqual([
{ id: 'iterationNode', node_id: 'iterationNode', title: 'iterationNode', node_type: 'iteration', execution_metadata: {}, status: 'succeeded' },
{ id: 'plainNode1', node_id: 'plainNode1', title: 'plainNode1', execution_metadata: { iteration_id: 'iterationNode', iteration_index: 0 }, status: 'succeeded' },
{ id: 'plainNode2', node_id: 'plainNode2', title: 'plainNode2', execution_metadata: { iteration_id: 'iterationNode', iteration_index: 0 }, status: 'succeeded' },
])
})
it('should parse parallel nodes correctly', () => {
const dsl = '(parallel, parallelNode, nodeA, nodeB -> nodeC)'
const result = parseDSL(dsl)
expect(result).toEqual([
{ id: 'parallelNode', node_id: 'parallelNode', title: 'parallelNode', execution_metadata: { parallel_id: 'parallelNode' }, status: 'succeeded' },
{ id: 'nodeA', node_id: 'nodeA', title: 'nodeA', execution_metadata: { parallel_id: 'parallelNode', parallel_start_node_id: 'nodeA' }, status: 'succeeded' },
{ id: 'nodeB', node_id: 'nodeB', title: 'nodeB', execution_metadata: { parallel_id: 'parallelNode', parallel_start_node_id: 'nodeB' }, status: 'succeeded' },
{ id: 'nodeC', node_id: 'nodeC', title: 'nodeC', execution_metadata: { parallel_id: 'parallelNode', parallel_start_node_id: 'nodeB' }, status: 'succeeded' },
])
})
// TODO
it('should handle nested parallel nodes', () => {
const dsl = '(parallel, outerParallel, (parallel, innerParallel, plainNode1 -> plainNode2) -> plainNode3)'
const result = parseDSL(dsl)
expect(result).toEqual([
{
nodeType: 'iteration',
nodeId: 'a',
params: [
[
{ nodeType: 'plain', nodeId: 'b', iterationId: 'a', iterationIndex: 0 },
{ nodeType: 'plain', nodeId: 'c', iterationId: 'a', iterationIndex: 0 },
],
],
id: 'outerParallel',
node_id: 'outerParallel',
title: 'outerParallel',
execution_metadata: { parallel_id: 'outerParallel' },
status: 'succeeded',
},
{
id: 'innerParallel',
node_id: 'innerParallel',
title: 'innerParallel',
execution_metadata: { parallel_id: 'outerParallel', parallel_start_node_id: 'innerParallel' },
status: 'succeeded',
},
{
id: 'plainNode1',
node_id: 'plainNode1',
title: 'plainNode1',
execution_metadata: {
parallel_id: 'innerParallel',
parallel_start_node_id: 'plainNode1',
parent_parallel_id: 'outerParallel',
parent_parallel_start_node_id: 'innerParallel',
},
status: 'succeeded',
},
{
id: 'plainNode2',
node_id: 'plainNode2',
title: 'plainNode2',
execution_metadata: {
parallel_id: 'innerParallel',
parallel_start_node_id: 'plainNode1',
parent_parallel_id: 'outerParallel',
parent_parallel_start_node_id: 'innerParallel',
},
status: 'succeeded',
},
{
id: 'plainNode3',
node_id: 'plainNode3',
title: 'plainNode3',
execution_metadata: {
parallel_id: 'outerParallel',
parallel_start_node_id: 'plainNode3',
},
status: 'succeeded',
},
])
})
test('parse parallel node with flow', () => {
const dsl = 'a -> (parallel, b, c -> d, e)'
// iterations not support nested iterations
// it('should handle nested iterations', () => {
// const dsl = '(iteration, outerIteration, (iteration, innerIteration -> plainNode1 -> plainNode2))'
// const result = parseDSL(dsl)
// expect(result).toEqual([
// { id: 'outerIteration', node_id: 'outerIteration', title: 'outerIteration', node_type: 'iteration', execution_metadata: {}, status: 'succeeded' },
// { id: 'innerIteration', node_id: 'innerIteration', title: 'innerIteration', node_type: 'iteration', execution_metadata: { iteration_id: 'outerIteration', iteration_index: 0 }, status: 'succeeded' },
// { id: 'plainNode1', node_id: 'plainNode1', title: 'plainNode1', execution_metadata: { iteration_id: 'innerIteration', iteration_index: 0 }, status: 'succeeded' },
// { id: 'plainNode2', node_id: 'plainNode2', title: 'plainNode2', execution_metadata: { iteration_id: 'innerIteration', iteration_index: 0 }, status: 'succeeded' },
// ])
// })
it('should handle nested iterations within parallel nodes', () => {
const dsl = '(parallel, parallelNode, (iteration, iterationNode, plainNode1, plainNode2))'
const result = parseDSL(dsl)
expect(result).toEqual([
{
nodeType: 'plain',
nodeId: 'a',
},
{
nodeType: 'parallel',
nodeId: 'b',
params: [
[
{ nodeType: 'plain', nodeId: 'c' },
{ nodeType: 'plain', nodeId: 'd' },
],
// single node don't need to be wrapped in an array
{ nodeType: 'plain', nodeId: 'e' },
],
},
{ id: 'parallelNode', node_id: 'parallelNode', title: 'parallelNode', execution_metadata: { parallel_id: 'parallelNode' }, status: 'succeeded' },
{ id: 'iterationNode', node_id: 'iterationNode', title: 'iterationNode', node_type: 'iteration', execution_metadata: { parallel_id: 'parallelNode', parallel_start_node_id: 'iterationNode' }, status: 'succeeded' },
{ id: 'plainNode1', node_id: 'plainNode1', title: 'plainNode1', execution_metadata: { iteration_id: 'iterationNode', iteration_index: 0, parallel_id: 'parallelNode', parallel_start_node_id: 'iterationNode' }, status: 'succeeded' },
{ id: 'plainNode2', node_id: 'plainNode2', title: 'plainNode2', execution_metadata: { iteration_id: 'iterationNode', iteration_index: 0, parallel_id: 'parallelNode', parallel_start_node_id: 'iterationNode' }, status: 'succeeded' },
])
})
test('parse retry', () => {
const dsl = '(retry, a, 3)'
const result = parseDSL(dsl)
expect(result).toEqual([
{
nodeType: 'retry',
nodeId: 'a',
params: [3],
},
])
})
test('parse nested complex nodes', () => {
const dsl = '(iteration, a, b -> (parallel, e, f -> g, h))'
const result = parseDSL(dsl)
expect(result).toEqual([
{
nodeType: 'iteration',
nodeId: 'a',
params: [
[
{ nodeType: 'plain', nodeId: 'b', iterationId: 'a', iterationIndex: 0 },
{
nodeType: 'parallel',
nodeId: 'e',
iterationId: 'a',
iterationIndex: 0,
params: [
[
{ nodeType: 'plain', nodeId: 'f', iterationId: 'a', iterationIndex: 0 },
{ nodeType: 'plain', nodeId: 'g', iterationId: 'a', iterationIndex: 0 },
],
// single node don't need to be wrapped in an array
{ nodeType: 'plain', nodeId: 'h', iterationId: 'a', iterationIndex: 0 },
],
},
],
],
},
])
it('should throw an error for unknown node types', () => {
const dsl = '(unknown, nodeId)'
expect(() => parseDSL(dsl)).toThrowError('Unknown nodeType: unknown')
})
})

View File

@ -8,8 +8,8 @@ type Node = NodePlain | NodeComplex
* @param dsl - The input DSL string.
* @returns An array of parsed nodes.
*/
function parseDSL(dsl: string): Node[] {
return parseTopLevelFlow(dsl).map(nodeStr => parseNode(nodeStr))
function parseDSL(dsl: string): NodeData[] {
return convertToNodeData(parseTopLevelFlow(dsl).map(nodeStr => parseNode(nodeStr)))
}
/**
@ -81,8 +81,8 @@ function parseNode(nodeStr: string, parentIterationId?: string): Node {
params,
}
if (parentIterationId) {
complexNode.iterationId = parentIterationId
complexNode.iterationIndex = 0 // Fixed as 0
(complexNode as any).iterationId = parentIterationId;
(complexNode as any).iterationIndex = 0 // Fixed as 0
}
return complexNode
}
@ -125,4 +125,180 @@ function parseParams(paramParts: string[], iterationId?: string): (Node | Node[]
})
}
type NodeData = {
id: string;
node_id: string;
title: string;
node_type?: string;
execution_metadata: Record<string, any>;
status: string;
}
/**
* Converts a plain node to node data.
*/
function convertPlainNode(node: Node): NodeData[] {
return [
{
id: node.nodeId,
node_id: node.nodeId,
title: node.nodeId,
execution_metadata: {},
status: 'succeeded',
},
]
}
/**
* Converts a retry node to node data.
*/
function convertRetryNode(node: Node): NodeData[] {
const { nodeId, iterationId, iterationIndex, params } = node as NodeComplex
const retryCount = params ? Number.parseInt(params[0] as unknown as string, 10) : 0
const result: NodeData[] = [
{
id: nodeId,
node_id: nodeId,
title: nodeId,
execution_metadata: {},
status: 'succeeded',
},
]
for (let i = 0; i < retryCount; i++) {
result.push({
id: nodeId,
node_id: nodeId,
title: nodeId,
execution_metadata: iterationId ? {
iteration_id: iterationId,
iteration_index: iterationIndex || 0,
} : {},
status: 'retry',
})
}
return result
}
/**
* Converts an iteration node to node data.
*/
function convertIterationNode(node: Node): NodeData[] {
const { nodeId, params } = node as NodeComplex
const result: NodeData[] = [
{
id: nodeId,
node_id: nodeId,
title: nodeId,
node_type: 'iteration',
status: 'succeeded',
execution_metadata: {},
},
]
params?.forEach((param: any) => {
if (Array.isArray(param)) {
param.forEach((childNode: Node) => {
const childData = convertToNodeData([childNode])
childData.forEach((data) => {
data.execution_metadata = {
...data.execution_metadata,
iteration_id: nodeId,
iteration_index: 0,
}
})
result.push(...childData)
})
}
})
return result
}
/**
* Converts a parallel node to node data.
*/
function convertParallelNode(node: Node, parentParallelId?: string, parentStartNodeId?: string): NodeData[] {
const { nodeId, params } = node as NodeComplex
const result: NodeData[] = [
{
id: nodeId,
node_id: nodeId,
title: nodeId,
execution_metadata: {
parallel_id: nodeId,
},
status: 'succeeded',
},
]
params?.forEach((param) => {
if (Array.isArray(param)) {
const startNodeId = param[0]?.nodeId
param.forEach((childNode: Node) => {
const childData = convertToNodeData([childNode])
childData.forEach((data) => {
data.execution_metadata = {
...data.execution_metadata,
parallel_id: nodeId,
parallel_start_node_id: startNodeId,
...(parentParallelId && {
parent_parallel_id: parentParallelId,
parent_parallel_start_node_id: parentStartNodeId,
}),
}
})
result.push(...childData)
})
}
else if (param && typeof param === 'object') {
const startNodeId = param.nodeId
const childData = convertToNodeData([param])
childData.forEach((data) => {
data.execution_metadata = {
...data.execution_metadata,
parallel_id: nodeId,
parallel_start_node_id: startNodeId,
...(parentParallelId && {
parent_parallel_id: parentParallelId,
parent_parallel_start_node_id: parentStartNodeId,
}),
}
})
result.push(...childData)
}
})
return result
}
/**
* Main function to convert nodes to node data.
*/
function convertToNodeData(nodes: Node[], parentParallelId?: string, parentStartNodeId?: string): NodeData[] {
const result: NodeData[] = []
nodes.forEach((node) => {
switch (node.nodeType) {
case 'plain':
result.push(...convertPlainNode(node))
break
case 'retry':
result.push(...convertRetryNode(node))
break
case 'iteration':
result.push(...convertIterationNode(node))
break
case 'parallel':
result.push(...convertParallelNode(node, parentParallelId, parentStartNodeId))
break
default:
throw new Error(`Unknown nodeType: ${node.nodeType}`)
}
})
return result
}
export { parseDSL }