mirror of
https://github.com/langgenius/dify.git
synced 2026-04-15 18:06:36 +08:00
chore: improve log
This commit is contained in:
parent
cdbc2d631b
commit
078ab83009
@ -102,8 +102,36 @@ type SetNodesAnomalyLogEntry = {
|
||||
}
|
||||
}
|
||||
|
||||
type GraphSyncDiagnosticStage
|
||||
= | 'nodes_subscribe'
|
||||
| 'edges_subscribe'
|
||||
| 'nodes_import_apply'
|
||||
| 'edges_import_apply'
|
||||
| 'schedule_graph_import_emit'
|
||||
| 'graph_import_emit'
|
||||
| 'start_import_log'
|
||||
| 'finalize_import_log'
|
||||
|
||||
type GraphSyncDiagnosticEvent = {
|
||||
timestamp: number
|
||||
appId: string | null
|
||||
stage: GraphSyncDiagnosticStage
|
||||
status: 'triggered' | 'skipped' | 'applied' | 'queued' | 'emitted' | 'snapshot'
|
||||
reason?: string
|
||||
details?: Record<string, unknown>
|
||||
meta: {
|
||||
leaderId: string | null
|
||||
isLeader: boolean
|
||||
isUndoRedoInProgress: boolean
|
||||
pendingInitialSync: boolean
|
||||
pendingGraphImportEmit: boolean
|
||||
isConnected: boolean
|
||||
}
|
||||
}
|
||||
|
||||
const GRAPH_IMPORT_LOG_LIMIT = 20
|
||||
const SET_NODES_ANOMALY_LOG_LIMIT = 100
|
||||
const GRAPH_SYNC_DIAGNOSTIC_LOG_LIMIT = 400
|
||||
|
||||
const toLoroValue = (value: unknown): Value => cloneDeep(value) as Value
|
||||
const toLoroRecord = (value: unknown): Record<string, Value> => cloneDeep(value) as Record<string, Value>
|
||||
@ -129,6 +157,7 @@ export class CollaborationManager {
|
||||
private graphViewActive: boolean | null = null
|
||||
private graphImportLogs: GraphImportLogEntry[] = []
|
||||
private setNodesAnomalyLogs: SetNodesAnomalyLogEntry[] = []
|
||||
private graphSyncDiagnostics: GraphSyncDiagnosticEvent[] = []
|
||||
private pendingImportLog: {
|
||||
timestamp: number
|
||||
sources: Set<'nodes' | 'edges'>
|
||||
@ -947,98 +976,189 @@ export class CollaborationManager {
|
||||
private setupSubscriptions(): void {
|
||||
this.nodesMap?.subscribe((event: LoroSubscribeEvent) => {
|
||||
const reactFlowStore = this.reactFlowStore
|
||||
if (event.by === 'import' && reactFlowStore) {
|
||||
// Don't update React nodes during undo/redo to prevent loops
|
||||
if (this.isUndoRedoInProgress)
|
||||
return
|
||||
const eventBy = event.by ?? 'unknown'
|
||||
this.recordGraphSyncDiagnostic(
|
||||
'nodes_subscribe',
|
||||
'triggered',
|
||||
undefined,
|
||||
{
|
||||
eventBy,
|
||||
hasReactFlowStore: Boolean(reactFlowStore),
|
||||
},
|
||||
)
|
||||
|
||||
requestAnimationFrame(() => {
|
||||
const state = reactFlowStore.getState()
|
||||
const previousNodes: Node[] = state.getNodes()
|
||||
this.startImportLog('nodes', { nodes: previousNodes, edges: state.getEdges() })
|
||||
const previousNodeMap = new Map(previousNodes.map(node => [node.id, node]))
|
||||
const selectedIds = new Set(
|
||||
previousNodes
|
||||
.filter(node => node.data?.selected)
|
||||
.map(node => node.id),
|
||||
)
|
||||
|
||||
this.pendingInitialSync = false
|
||||
|
||||
const updatedNodes = Array
|
||||
.from(this.nodesMap?.keys() || [])
|
||||
.map((nodeId) => {
|
||||
const node = this.exportNode(nodeId as string)
|
||||
const clonedNode: Node = {
|
||||
...node,
|
||||
data: {
|
||||
...(node.data || {}),
|
||||
},
|
||||
}
|
||||
const clonedNodeData = clonedNode.data as (CommonNodeType & Record<string, unknown>)
|
||||
// Keep the previous node's private data properties (starting with _)
|
||||
const previousNode = previousNodeMap.get(clonedNode.id)
|
||||
if (previousNode?.data) {
|
||||
const previousData = previousNode.data as Record<string, unknown>
|
||||
Object.entries(previousData)
|
||||
.filter(([key]) => key.startsWith('_'))
|
||||
.forEach(([key, value]) => {
|
||||
if (!(key in clonedNodeData))
|
||||
clonedNodeData[key] = value
|
||||
})
|
||||
}
|
||||
|
||||
if (selectedIds.has(clonedNode.id))
|
||||
clonedNode.data.selected = true
|
||||
|
||||
return clonedNode
|
||||
})
|
||||
|
||||
// Call ReactFlow's native setter directly to avoid triggering collaboration
|
||||
this.captureSetNodesAnomaly(previousNodes, updatedNodes, 'reactflow-native:import-nodes-map-subscribe')
|
||||
state.setNodes(updatedNodes)
|
||||
|
||||
this.scheduleGraphImportEmit()
|
||||
})
|
||||
if (eventBy !== 'import') {
|
||||
this.recordGraphSyncDiagnostic('nodes_subscribe', 'skipped', 'event_by_not_import', { eventBy })
|
||||
return
|
||||
}
|
||||
|
||||
if (!reactFlowStore) {
|
||||
this.recordGraphSyncDiagnostic('nodes_subscribe', 'skipped', 'reactflow_store_missing')
|
||||
return
|
||||
}
|
||||
|
||||
// Don't update React nodes during undo/redo to prevent loops
|
||||
if (this.isUndoRedoInProgress) {
|
||||
this.recordGraphSyncDiagnostic('nodes_subscribe', 'skipped', 'undo_redo_in_progress')
|
||||
return
|
||||
}
|
||||
|
||||
this.recordGraphSyncDiagnostic('nodes_subscribe', 'queued', 'raf_scheduled')
|
||||
requestAnimationFrame(() => {
|
||||
const state = reactFlowStore.getState()
|
||||
const previousNodes: Node[] = state.getNodes()
|
||||
const previousEdges: Edge[] = state.getEdges()
|
||||
this.startImportLog('nodes', { nodes: previousNodes, edges: previousEdges })
|
||||
const previousNodeMap = new Map(previousNodes.map(node => [node.id, node]))
|
||||
const selectedIds = new Set(
|
||||
previousNodes
|
||||
.filter(node => node.data?.selected)
|
||||
.map(node => node.id),
|
||||
)
|
||||
|
||||
this.pendingInitialSync = false
|
||||
|
||||
const updatedNodes = Array
|
||||
.from(this.nodesMap?.keys() || [])
|
||||
.map((nodeId) => {
|
||||
const node = this.exportNode(nodeId as string)
|
||||
const clonedNode: Node = {
|
||||
...node,
|
||||
data: {
|
||||
...(node.data || {}),
|
||||
},
|
||||
}
|
||||
const clonedNodeData = clonedNode.data as (CommonNodeType & Record<string, unknown>)
|
||||
// Keep the previous node's private data properties (starting with _)
|
||||
const previousNode = previousNodeMap.get(clonedNode.id)
|
||||
if (previousNode?.data) {
|
||||
const previousData = previousNode.data as Record<string, unknown>
|
||||
Object.entries(previousData)
|
||||
.filter(([key]) => key.startsWith('_'))
|
||||
.forEach(([key, value]) => {
|
||||
if (!(key in clonedNodeData))
|
||||
clonedNodeData[key] = value
|
||||
})
|
||||
}
|
||||
|
||||
if (selectedIds.has(clonedNode.id))
|
||||
clonedNode.data.selected = true
|
||||
|
||||
return clonedNode
|
||||
})
|
||||
|
||||
// Call ReactFlow's native setter directly to avoid triggering collaboration
|
||||
this.captureSetNodesAnomaly(previousNodes, updatedNodes, 'reactflow-native:import-nodes-map-subscribe')
|
||||
state.setNodes(updatedNodes)
|
||||
this.recordGraphSyncDiagnostic(
|
||||
'nodes_import_apply',
|
||||
'applied',
|
||||
undefined,
|
||||
{
|
||||
eventBy,
|
||||
previousNodeCount: previousNodes.length,
|
||||
updatedNodeCount: updatedNodes.length,
|
||||
previousEdgeCount: previousEdges.length,
|
||||
selectedCount: selectedIds.size,
|
||||
},
|
||||
)
|
||||
|
||||
this.scheduleGraphImportEmit()
|
||||
})
|
||||
})
|
||||
|
||||
this.edgesMap?.subscribe((event: LoroSubscribeEvent) => {
|
||||
const reactFlowStore = this.reactFlowStore
|
||||
if (event.by === 'import' && reactFlowStore) {
|
||||
// Don't update React edges during undo/redo to prevent loops
|
||||
if (this.isUndoRedoInProgress)
|
||||
return
|
||||
const eventBy = event.by ?? 'unknown'
|
||||
this.recordGraphSyncDiagnostic(
|
||||
'edges_subscribe',
|
||||
'triggered',
|
||||
undefined,
|
||||
{
|
||||
eventBy,
|
||||
hasReactFlowStore: Boolean(reactFlowStore),
|
||||
},
|
||||
)
|
||||
|
||||
requestAnimationFrame(() => {
|
||||
// Get ReactFlow's native setters, not the collaborative ones
|
||||
const state = reactFlowStore.getState()
|
||||
this.startImportLog('edges', { nodes: state.getNodes(), edges: state.getEdges() })
|
||||
const updatedEdges = Array.from(this.edgesMap?.values() || []) as Edge[]
|
||||
|
||||
this.pendingInitialSync = false
|
||||
|
||||
// Call ReactFlow's native setter directly to avoid triggering collaboration
|
||||
state.setEdges(updatedEdges)
|
||||
|
||||
this.scheduleGraphImportEmit()
|
||||
})
|
||||
if (eventBy !== 'import') {
|
||||
this.recordGraphSyncDiagnostic('edges_subscribe', 'skipped', 'event_by_not_import', { eventBy })
|
||||
return
|
||||
}
|
||||
|
||||
if (!reactFlowStore) {
|
||||
this.recordGraphSyncDiagnostic('edges_subscribe', 'skipped', 'reactflow_store_missing')
|
||||
return
|
||||
}
|
||||
|
||||
// Don't update React edges during undo/redo to prevent loops
|
||||
if (this.isUndoRedoInProgress) {
|
||||
this.recordGraphSyncDiagnostic('edges_subscribe', 'skipped', 'undo_redo_in_progress')
|
||||
return
|
||||
}
|
||||
|
||||
this.recordGraphSyncDiagnostic('edges_subscribe', 'queued', 'raf_scheduled')
|
||||
requestAnimationFrame(() => {
|
||||
// Get ReactFlow's native setters, not the collaborative ones
|
||||
const state = reactFlowStore.getState()
|
||||
const previousNodes = state.getNodes()
|
||||
const previousEdges = state.getEdges()
|
||||
this.startImportLog('edges', { nodes: previousNodes, edges: previousEdges })
|
||||
const updatedEdges = Array.from(this.edgesMap?.values() || []) as Edge[]
|
||||
|
||||
this.pendingInitialSync = false
|
||||
|
||||
// Call ReactFlow's native setter directly to avoid triggering collaboration
|
||||
state.setEdges(updatedEdges)
|
||||
this.recordGraphSyncDiagnostic(
|
||||
'edges_import_apply',
|
||||
'applied',
|
||||
undefined,
|
||||
{
|
||||
eventBy,
|
||||
previousNodeCount: previousNodes.length,
|
||||
previousEdgeCount: previousEdges.length,
|
||||
updatedEdgeCount: updatedEdges.length,
|
||||
},
|
||||
)
|
||||
|
||||
this.scheduleGraphImportEmit()
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
private scheduleGraphImportEmit(): void {
|
||||
if (this.pendingGraphImportEmit)
|
||||
if (this.pendingGraphImportEmit) {
|
||||
this.recordGraphSyncDiagnostic(
|
||||
'schedule_graph_import_emit',
|
||||
'skipped',
|
||||
'already_pending',
|
||||
)
|
||||
return
|
||||
}
|
||||
|
||||
this.recordGraphSyncDiagnostic('schedule_graph_import_emit', 'queued')
|
||||
this.pendingGraphImportEmit = true
|
||||
requestAnimationFrame(() => {
|
||||
const beforeFinalizeNodes = this.getNodes().length
|
||||
const beforeFinalizeEdges = this.getEdges().length
|
||||
this.pendingGraphImportEmit = false
|
||||
this.finalizeImportLog()
|
||||
const mergedNodes = this.mergeLocalNodeState(this.getNodes())
|
||||
const mergedEdges = this.getEdges()
|
||||
this.recordGraphSyncDiagnostic(
|
||||
'graph_import_emit',
|
||||
'emitted',
|
||||
undefined,
|
||||
{
|
||||
mergedNodeCount: mergedNodes.length,
|
||||
mergedEdgeCount: mergedEdges.length,
|
||||
crdtNodeCountBeforeFinalize: beforeFinalizeNodes,
|
||||
crdtEdgeCountBeforeFinalize: beforeFinalizeEdges,
|
||||
},
|
||||
)
|
||||
this.eventEmitter.emit('graphImport', {
|
||||
nodes: mergedNodes,
|
||||
edges: this.getEdges(),
|
||||
edges: mergedEdges,
|
||||
})
|
||||
})
|
||||
}
|
||||
@ -1092,6 +1212,7 @@ export class CollaborationManager {
|
||||
clearGraphImportLog(): void {
|
||||
this.graphImportLogs = []
|
||||
this.setNodesAnomalyLogs = []
|
||||
this.graphSyncDiagnostics = []
|
||||
this.pendingImportLog = null
|
||||
}
|
||||
|
||||
@ -1102,9 +1223,11 @@ export class CollaborationManager {
|
||||
generatedAt: new Date().toISOString(),
|
||||
entries: this.graphImportLogs,
|
||||
setNodesAnomalies: this.setNodesAnomalyLogs,
|
||||
syncDiagnostics: this.graphSyncDiagnostics,
|
||||
summary: {
|
||||
logCount: this.graphImportLogs.length,
|
||||
setNodesAnomalyCount: this.setNodesAnomalyLogs.length,
|
||||
syncDiagnosticCount: this.graphSyncDiagnostics.length,
|
||||
leaderId: this.leaderId,
|
||||
isLeader: this.isLeader,
|
||||
graphViewActive: this.graphViewActive,
|
||||
@ -1135,6 +1258,34 @@ export class CollaborationManager {
|
||||
URL.revokeObjectURL(url)
|
||||
}
|
||||
|
||||
private recordGraphSyncDiagnostic(
|
||||
stage: GraphSyncDiagnosticStage,
|
||||
status: GraphSyncDiagnosticEvent['status'],
|
||||
reason?: string,
|
||||
details?: Record<string, unknown>,
|
||||
): void {
|
||||
const entry: GraphSyncDiagnosticEvent = {
|
||||
timestamp: Date.now(),
|
||||
appId: this.currentAppId,
|
||||
stage,
|
||||
status,
|
||||
reason,
|
||||
details,
|
||||
meta: {
|
||||
leaderId: this.leaderId,
|
||||
isLeader: this.isLeader,
|
||||
isUndoRedoInProgress: this.isUndoRedoInProgress,
|
||||
pendingInitialSync: this.pendingInitialSync,
|
||||
pendingGraphImportEmit: this.pendingGraphImportEmit,
|
||||
isConnected: this.isConnected(),
|
||||
},
|
||||
}
|
||||
|
||||
this.graphSyncDiagnostics.push(entry)
|
||||
if (this.graphSyncDiagnostics.length > GRAPH_SYNC_DIAGNOSTIC_LOG_LIMIT)
|
||||
this.graphSyncDiagnostics.splice(0, this.graphSyncDiagnostics.length - GRAPH_SYNC_DIAGNOSTIC_LOG_LIMIT)
|
||||
}
|
||||
|
||||
private captureSetNodesAnomaly(oldNodes: Node[], newNodes: Node[], source: string): void {
|
||||
const oldNodeIds = oldNodes.map(node => node.id)
|
||||
const newNodeIds = newNodes.map(node => node.id)
|
||||
@ -1209,14 +1360,35 @@ export class CollaborationManager {
|
||||
edges: cloneDeep(snapshot.edges),
|
||||
},
|
||||
}
|
||||
this.recordGraphSyncDiagnostic(
|
||||
'start_import_log',
|
||||
'snapshot',
|
||||
'created',
|
||||
{
|
||||
source,
|
||||
beforeNodes: snapshot.nodes.length,
|
||||
beforeEdges: snapshot.edges.length,
|
||||
},
|
||||
)
|
||||
return
|
||||
}
|
||||
this.pendingImportLog.sources.add(source)
|
||||
this.recordGraphSyncDiagnostic(
|
||||
'start_import_log',
|
||||
'snapshot',
|
||||
'merged_source',
|
||||
{
|
||||
source,
|
||||
sourceCount: this.pendingImportLog.sources.size,
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
private finalizeImportLog(): void {
|
||||
if (!this.pendingImportLog)
|
||||
if (!this.pendingImportLog) {
|
||||
this.recordGraphSyncDiagnostic('finalize_import_log', 'skipped', 'no_pending_import')
|
||||
return
|
||||
}
|
||||
|
||||
const afterSnapshot = this.snapshotReactFlowGraph()
|
||||
const entry: GraphImportLogEntry = {
|
||||
@ -1240,6 +1412,18 @@ export class CollaborationManager {
|
||||
}
|
||||
|
||||
this.graphImportLogs.push(entry)
|
||||
this.recordGraphSyncDiagnostic(
|
||||
'finalize_import_log',
|
||||
'snapshot',
|
||||
undefined,
|
||||
{
|
||||
sources: entry.sources,
|
||||
beforeNodes: entry.before.nodes.length,
|
||||
beforeEdges: entry.before.edges.length,
|
||||
afterNodes: entry.after.nodes.length,
|
||||
afterEdges: entry.after.edges.length,
|
||||
},
|
||||
)
|
||||
if (this.graphImportLogs.length > GRAPH_IMPORT_LOG_LIMIT)
|
||||
this.graphImportLogs.splice(0, this.graphImportLogs.length - GRAPH_IMPORT_LOG_LIMIT)
|
||||
this.pendingImportLog = null
|
||||
|
||||
Loading…
Reference in New Issue
Block a user