From be74b760791ddeaa5d7f219406c904f9f1b7488c Mon Sep 17 00:00:00 2001 From: hjlarry Date: Thu, 7 Aug 2025 17:31:12 +0800 Subject: [PATCH] refactor websocket init --- .../(appDetailLayout)/[appId]/layout-main.tsx | 5 --- .../workflow-app/components/workflow-main.tsx | 2 +- web/app/components/workflow-app/index.tsx | 12 ----- .../core/collaboration-manager.ts | 44 ++++++++++++++----- .../collaboration/hooks/use-collaboration.ts | 3 ++ 5 files changed, 38 insertions(+), 28 deletions(-) diff --git a/web/app/(commonLayout)/app/(appDetailLayout)/[appId]/layout-main.tsx b/web/app/(commonLayout)/app/(appDetailLayout)/[appId]/layout-main.tsx index b27f842f12..7d5d4cb52d 100644 --- a/web/app/(commonLayout)/app/(appDetailLayout)/[appId]/layout-main.tsx +++ b/web/app/(commonLayout)/app/(appDetailLayout)/[appId]/layout-main.tsx @@ -26,7 +26,6 @@ import Loading from '@/app/components/base/loading' import useBreakpoints, { MediaType } from '@/hooks/use-breakpoints' import type { App } from '@/types/app' import useDocumentTitle from '@/hooks/use-document-title' -import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-manager' export type IAppDetailLayoutProps = { children: React.ReactNode @@ -115,9 +114,6 @@ const AppDetailLayout: FC = (props) => { setIsLoadingAppDetail(true) fetchAppDetail({ url: '/apps', id: appId }).then((res) => { setAppDetailRes(res) - // Only connect for workflow/advanced-chat apps and if not already connected - if ((res.mode === 'workflow' || res.mode === 'advanced-chat') && !webSocketClient.isConnected(appId)) - webSocketClient.connect(appId) }).catch((e: any) => { if (e.status === 404) router.replace('/apps') @@ -152,7 +148,6 @@ const AppDetailLayout: FC = (props) => { useUnmount(() => { setAppDetail() - webSocketClient.disconnect(appId) }) if (!appDetail) { diff --git a/web/app/components/workflow-app/components/workflow-main.tsx b/web/app/components/workflow-app/components/workflow-main.tsx index 15ef50a3a0..694636af91 100644 --- a/web/app/components/workflow-app/components/workflow-main.tsx +++ b/web/app/components/workflow-app/components/workflow-main.tsx @@ -107,7 +107,7 @@ const WorkflowMain = ({ const response = await fetchWorkflowDraft(`/apps/${appId}/workflows/draft`) handleWorkflowDataUpdate(response) } - catch (error) { + catch (error) { console.error('workflow vars and features update failed:', error) } }) diff --git a/web/app/components/workflow-app/index.tsx b/web/app/components/workflow-app/index.tsx index 3348f905a8..393858d73b 100644 --- a/web/app/components/workflow-app/index.tsx +++ b/web/app/components/workflow-app/index.tsx @@ -1,5 +1,4 @@ import { - useEffect, useMemo, } from 'react' import useSWR from 'swr' @@ -25,26 +24,15 @@ import { import { createWorkflowSlice } from './store/workflow/workflow-slice' import WorkflowAppMain from './components/workflow-main' import { collaborationManager } from '@/app/components/workflow/collaboration/core/collaboration-manager' -import { useStore } from '@/app/components/workflow/store' const WorkflowAppWithAdditionalContext = () => { const { data, isLoading, } = useWorkflowInit() - const appId = useStore(s => s.appId) const { data: fileUploadConfigResponse } = useSWR({ url: '/files/upload' }, fetchFileUploadConfig) - useEffect(() => { - if (appId && data) - collaborationManager.init(appId, null) - - return () => { - collaborationManager.destroy() - } - }, [appId, data]) - const nodesData = useMemo(() => { if (data) { const processedNodes = initialNodes(data.graph.nodes, data.graph.edges) diff --git a/web/app/components/workflow/collaboration/core/collaboration-manager.ts b/web/app/components/workflow/collaboration/core/collaboration-manager.ts index 09278ded8a..8e565bd700 100644 --- a/web/app/components/workflow/collaboration/core/collaboration-manager.ts +++ b/web/app/components/workflow/collaboration/core/collaboration-manager.ts @@ -14,8 +14,13 @@ export class CollaborationManager { private eventEmitter = new EventEmitter() private currentAppId: string | null = null private reactFlowStore: any = null + private cursors: Record = {} init = (appId: string, reactFlowStore: any): void => { + if (!reactFlowStore) { + console.warn('CollaborationManager.init called without reactFlowStore, deferring to connect()') + return + } this.connect(appId, reactFlowStore) } @@ -64,6 +69,7 @@ export class CollaborationManager { this.edgesMap = null this.currentAppId = null this.reactFlowStore = null + this.cursors = {} this.eventEmitter.removeAllListeners() } @@ -125,7 +131,7 @@ export class CollaborationManager { if (!oldNode) { this.nodesMap.set(newNode.id, newNode) } - else { + else { const oldPersistentData = this.getPersistentNodeData(oldNode) const newPersistentData = this.getPersistentNodeData(newNode) if (!isEqual(oldPersistentData, newPersistentData)) @@ -185,24 +191,42 @@ export class CollaborationManager { } private setupSocketEventListeners(socket: any): void { + console.log('Setting up socket event listeners for collaboration') + socket.on('collaboration_update', (update: any) => { if (update.type === 'mouseMove') { - this.eventEmitter.emit('cursors', { - [update.userId]: { - x: update.data.x, - y: update.data.y, - userId: update.userId, - timestamp: update.timestamp, - }, - }) + console.log('Processing mouseMove event:', update) + + // Update cursor state for this user + this.cursors[update.userId] = { + x: update.data.x, + y: update.data.y, + userId: update.userId, + timestamp: update.timestamp, + } + + // Emit the complete cursor state + console.log('Emitting complete cursor state:', this.cursors) + this.eventEmitter.emit('cursors', { ...this.cursors }) } - else if (update.type === 'varsAndFeaturesUpdate') { + else if (update.type === 'varsAndFeaturesUpdate') { + console.log('Processing varsAndFeaturesUpdate event:', update) this.eventEmitter.emit('varsAndFeaturesUpdate', update) } }) socket.on('online_users', (data: { users: OnlineUser[] }) => { + const onlineUserIds = new Set(data.users.map(user => user.user_id)) + + // Remove cursors for offline users + Object.keys(this.cursors).forEach((userId) => { + if (!onlineUserIds.has(userId)) + delete this.cursors[userId] + }) + + console.log('Updated online users and cleaned offline cursors:', data.users) this.eventEmitter.emit('onlineUsers', data.users) + this.eventEmitter.emit('cursors', { ...this.cursors }) }) socket.on('connect', () => { diff --git a/web/app/components/workflow/collaboration/hooks/use-collaboration.ts b/web/app/components/workflow/collaboration/hooks/use-collaboration.ts index 5924afc0c5..bd570508a9 100644 --- a/web/app/components/workflow/collaboration/hooks/use-collaboration.ts +++ b/web/app/components/workflow/collaboration/hooks/use-collaboration.ts @@ -30,14 +30,17 @@ export function useCollaboration(appId: string, reactFlowStore?: any) { initCollaboration() const unsubscribeStateChange = collaborationManager.onStateChange((newState: any) => { + console.log('Collaboration state change:', newState) setState((prev: any) => ({ ...prev, ...newState })) }) const unsubscribeCursors = collaborationManager.onCursorUpdate((cursors: any) => { + console.log('Cursor update received:', cursors) setState((prev: any) => ({ ...prev, cursors })) }) const unsubscribeUsers = collaborationManager.onOnlineUsersUpdate((users: any) => { + console.log('Online users update:', users) setState((prev: any) => ({ ...prev, onlineUsers: users })) })