diff --git a/web/app/components/workflow/collaboration/core/collaboration-manager.ts b/web/app/components/workflow/collaboration/core/collaboration-manager.ts index 004d595a1f..dcffab9043 100644 --- a/web/app/components/workflow/collaboration/core/collaboration-manager.ts +++ b/web/app/components/workflow/collaboration/core/collaboration-manager.ts @@ -4,7 +4,21 @@ import { webSocketClient } from './websocket-manager' import { CRDTProvider } from './crdt-provider' import { EventEmitter } from './event-emitter' import type { Edge, Node } from '../../types' -import type { CollaborationState, CursorPosition, OnlineUser } from '../types/collaboration' +import type { + CollaborationState, + CursorPosition, + NodePanelPresenceMap, + NodePanelPresenceUser, + OnlineUser, +} from '../types/collaboration' + +type NodePanelPresenceEventData = { + nodeId: string + action: 'open' | 'close' + user: NodePanelPresenceUser + clientId: string + timestamp?: number +} export class CollaborationManager { private doc: LoroDoc | null = null @@ -18,9 +32,75 @@ export class CollaborationManager { private isLeader = false private leaderId: string | null = null private cursors: Record = {} + private nodePanelPresence: NodePanelPresenceMap = {} private activeConnections = new Set() private isUndoRedoInProgress = false + private getNodePanelPresenceSnapshot(): NodePanelPresenceMap { + const snapshot: NodePanelPresenceMap = {} + Object.entries(this.nodePanelPresence).forEach(([nodeId, viewers]) => { + snapshot[nodeId] = { ...viewers } + }) + return snapshot + } + + private applyNodePanelPresenceUpdate(update: NodePanelPresenceEventData): void { + const { nodeId, action, clientId, user, timestamp } = update + + if (action === 'open') { + // ensure a client only appears on a single node at a time + Object.entries(this.nodePanelPresence).forEach(([id, viewers]) => { + if (viewers[clientId]) { + delete viewers[clientId] + if (Object.keys(viewers).length === 0) + delete this.nodePanelPresence[id] + } + }) + + if (!this.nodePanelPresence[nodeId]) + this.nodePanelPresence[nodeId] = {} + + this.nodePanelPresence[nodeId][clientId] = { + ...user, + clientId, + timestamp: timestamp || Date.now(), + } + } + else { + const viewers = this.nodePanelPresence[nodeId] + if (viewers) { + delete viewers[clientId] + if (Object.keys(viewers).length === 0) + delete this.nodePanelPresence[nodeId] + } + } + + this.eventEmitter.emit('nodePanelPresence', this.getNodePanelPresenceSnapshot()) + } + + private cleanupNodePanelPresence(activeClientIds: Set, activeUserIds: Set): void { + let hasChanges = false + + Object.entries(this.nodePanelPresence).forEach(([nodeId, viewers]) => { + Object.keys(viewers).forEach((clientId) => { + const viewer = viewers[clientId] + const clientActive = activeClientIds.has(clientId) + const userActive = viewer?.userId ? activeUserIds.has(viewer.userId) : false + + if (!clientActive && !userActive) { + delete viewers[clientId] + hasChanges = true + } + }) + + if (Object.keys(viewers).length === 0) + delete this.nodePanelPresence[nodeId] + }) + + if (hasChanges) + this.eventEmitter.emit('nodePanelPresence', this.getNodePanelPresenceSnapshot()) + } + init = (appId: string, reactFlowStore: any): void => { if (!reactFlowStore) { console.warn('CollaborationManager.init called without reactFlowStore, deferring to connect()') @@ -172,6 +252,7 @@ export class CollaborationManager { this.currentAppId = null this.reactFlowStore = null this.cursors = {} + this.nodePanelPresence = {} this.isUndoRedoInProgress = false // Only reset leader status when actually disconnecting @@ -240,6 +321,29 @@ export class CollaborationManager { } } + emitNodePanelPresence(nodeId: string, isOpen: boolean, user: NodePanelPresenceUser): void { + if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return + + const socket = webSocketClient.getSocket(this.currentAppId) + if (!socket || !nodeId || !user?.userId) return + + const payload: NodePanelPresenceEventData = { + nodeId, + action: isOpen ? 'open' : 'close', + user, + clientId: socket.id as string, + timestamp: Date.now(), + } + + socket.emit('collaboration_event', { + type: 'nodePanelPresence', + data: payload, + timestamp: payload.timestamp, + }) + + this.applyNodePanelPresenceUpdate(payload) + } + onSyncRequest(callback: () => void): () => void { return this.eventEmitter.on('syncRequest', callback) } @@ -272,6 +376,12 @@ export class CollaborationManager { return this.eventEmitter.on('mcpServerUpdate', callback) } + onNodePanelPresenceUpdate(callback: (presence: NodePanelPresenceMap) => void): () => void { + const off = this.eventEmitter.on('nodePanelPresence', callback) + callback(this.getNodePanelPresenceSnapshot()) + return off + } + onLeaderChange(callback: (isLeader: boolean) => void): () => void { return this.eventEmitter.on('leaderChange', callback) } @@ -636,6 +746,10 @@ export class CollaborationManager { console.log('Processing commentsUpdate event:', update) this.eventEmitter.emit('commentsUpdate', update.data) } + else if (update.type === 'nodePanelPresence') { + console.log('Processing nodePanelPresence event:', update) + this.applyNodePanelPresenceUpdate(update.data as NodePanelPresenceEventData) + } else if (update.type === 'syncRequest') { console.log('Received sync request from another user') // Only process if we are the leader @@ -654,6 +768,11 @@ export class CollaborationManager { } const onlineUserIds = new Set(data.users.map((user: OnlineUser) => user.user_id)) + const onlineClientIds = new Set( + data.users + .map((user: OnlineUser) => user.sid) + .filter((sid): sid is string => typeof sid === 'string' && sid.length > 0), + ) // Remove cursors for offline users Object.keys(this.cursors).forEach((userId) => { @@ -661,6 +780,8 @@ export class CollaborationManager { delete this.cursors[userId] }) + this.cleanupNodePanelPresence(onlineClientIds, onlineUserIds) + // Update leader information if (data.leader && typeof data.leader === 'string') this.leaderId = data.leader diff --git a/web/app/components/workflow/collaboration/hooks/use-collaboration.ts b/web/app/components/workflow/collaboration/hooks/use-collaboration.ts index 42cff00ebb..31aad6e943 100644 --- a/web/app/components/workflow/collaboration/hooks/use-collaboration.ts +++ b/web/app/components/workflow/collaboration/hooks/use-collaboration.ts @@ -9,6 +9,7 @@ export function useCollaboration(appId: string, reactFlowStore?: any) { isConnected: false, onlineUsers: [], cursors: {}, + nodePanelPresence: {}, isLeader: false, }) @@ -43,6 +44,10 @@ export function useCollaboration(appId: string, reactFlowStore?: any) { setState((prev: any) => ({ ...prev, onlineUsers: users })) }) + const unsubscribeNodePanelPresence = collaborationManager.onNodePanelPresenceUpdate((presence) => { + setState((prev: any) => ({ ...prev, nodePanelPresence: presence })) + }) + const unsubscribeLeaderChange = collaborationManager.onLeaderChange((isLeader: boolean) => { console.log('Leader status changed:', isLeader) setState((prev: any) => ({ ...prev, isLeader })) @@ -52,6 +57,7 @@ export function useCollaboration(appId: string, reactFlowStore?: any) { unsubscribeStateChange() unsubscribeCursors() unsubscribeUsers() + unsubscribeNodePanelPresence() unsubscribeLeaderChange() cursorServiceRef.current?.stopTracking() if (connectionId) @@ -75,6 +81,7 @@ export function useCollaboration(appId: string, reactFlowStore?: any) { isConnected: state.isConnected || false, onlineUsers: state.onlineUsers || [], cursors: state.cursors || {}, + nodePanelPresence: state.nodePanelPresence || {}, isLeader: state.isLeader || false, leaderId: collaborationManager.getLeaderId(), startCursorTracking, diff --git a/web/app/components/workflow/collaboration/types/collaboration.ts b/web/app/components/workflow/collaboration/types/collaboration.ts index 7b69897b63..beb9ad2648 100644 --- a/web/app/components/workflow/collaboration/types/collaboration.ts +++ b/web/app/components/workflow/collaboration/types/collaboration.ts @@ -23,11 +23,25 @@ export type CursorPosition = { timestamp: number } +export type NodePanelPresenceUser = { + userId: string + username: string + avatar?: string | null +} + +export type NodePanelPresenceInfo = NodePanelPresenceUser & { + clientId: string + timestamp: number +} + +export type NodePanelPresenceMap = Record> + export type CollaborationState = { appId: string isConnected: boolean onlineUsers: OnlineUser[] cursors: Record + nodePanelPresence: NodePanelPresenceMap } export type GraphSyncData = { diff --git a/web/app/components/workflow/nodes/_base/components/workflow-panel/index.tsx b/web/app/components/workflow/nodes/_base/components/workflow-panel/index.tsx index 03b142ba43..06b0fd8ee6 100644 --- a/web/app/components/workflow/nodes/_base/components/workflow-panel/index.tsx +++ b/web/app/components/workflow/nodes/_base/components/workflow-panel/index.tsx @@ -75,6 +75,10 @@ import { DataSourceClassification } from '@/app/components/workflow/nodes/data-s import { useModalContext } from '@/context/modal-context' import DataSourceBeforeRunForm from '@/app/components/workflow/nodes/data-source/before-run-form' import useInspectVarsCrud from '@/app/components/workflow/hooks/use-inspect-vars-crud' +import { useCollaboration } from '@/app/components/workflow/collaboration/hooks/use-collaboration' +import { collaborationManager } from '@/app/components/workflow/collaboration/core/collaboration-manager' +import { useAppContext } from '@/context/app-context' +import { UserAvatarList } from '@/app/components/base/user-avatar-list' const getCustomRunForm = (params: CustomRunFormProps): React.JSX.Element => { const nodeType = params.payload.type @@ -97,11 +101,51 @@ const BasePanel: FC = ({ children, }) => { const { t } = useTranslation() + const appId = useStore(s => s.appId) + const { userProfile } = useAppContext() + const { isConnected, nodePanelPresence } = useCollaboration(appId as string) const { showMessageLogModal } = useAppStore(useShallow(state => ({ showMessageLogModal: state.showMessageLogModal, }))) const isSingleRunning = data._singleRunningStatus === NodeRunningStatus.Running + const currentUserPresence = useMemo(() => { + const userId = userProfile?.id || '' + const username = userProfile?.name || userProfile?.email || 'User' + const avatar = userProfile?.avatar_url || userProfile?.avatar || null + + return { + userId, + username, + avatar, + } + }, [userProfile?.avatar, userProfile?.avatar_url, userProfile?.email, userProfile?.id, userProfile?.name]) + + useEffect(() => { + if (!isConnected || !currentUserPresence.userId) + return + + collaborationManager.emitNodePanelPresence(id, true, currentUserPresence) + + return () => { + collaborationManager.emitNodePanelPresence(id, false, currentUserPresence) + } + }, [id, isConnected, currentUserPresence]) + + const viewingUsers = useMemo(() => { + const presence = nodePanelPresence?.[id] + if (!presence) + return [] + + return Object.values(presence) + .filter(viewer => viewer.userId && viewer.userId !== currentUserPresence.userId) + .map(viewer => ({ + id: viewer.clientId, + name: viewer.username, + avatar_url: viewer.avatar || null, + })) + }, [currentUserPresence.userId, id, nodePanelPresence]) + const showSingleRunPanel = useStore(s => s.showSingleRunPanel) const workflowCanvasWidth = useStore(s => s.workflowCanvasWidth) const nodePanelWidth = useStore(s => s.nodePanelWidth) @@ -393,6 +437,15 @@ const BasePanel: FC = ({ value={data.title || ''} onBlur={handleTitleBlur} /> + {viewingUsers.length > 0 && ( +
+ +
+ )}
{ isSupportSingleRun && !nodesReadOnly && (