diff --git a/api/controllers/console/socketio/workflow.py b/api/controllers/console/socketio/workflow.py index 0bcf940018..ebc990b64e 100644 --- a/api/controllers/console/socketio/workflow.py +++ b/api/controllers/console/socketio/workflow.py @@ -96,6 +96,9 @@ def handle_collaboration_event(sid, data): 6. workflow_update 7. comments_update 8. node_panel_presence + 9. skill_file_active + 10. skill_sync_request + 11. skill_resync_request """ return collaboration_service.relay_collaboration_event(sid, data) @@ -106,3 +109,11 @@ def handle_graph_event(sid, data): Handle graph events - simple broadcast relay. """ return collaboration_service.relay_graph_event(sid, data) + + +@_sio_on("skill_event") +def handle_skill_event(sid, data): + """ + Handle skill events - simple broadcast relay. + """ + return collaboration_service.relay_skill_event(sid, data) diff --git a/api/repositories/workflow_collaboration_repository.py b/api/repositories/workflow_collaboration_repository.py index c6af15b434..75a483e156 100644 --- a/api/repositories/workflow_collaboration_repository.py +++ b/api/repositories/workflow_collaboration_repository.py @@ -8,6 +8,7 @@ from extensions.ext_redis import redis_client SESSION_STATE_TTL_SECONDS = 3600 WORKFLOW_ONLINE_USERS_PREFIX = "workflow_online_users:" WORKFLOW_LEADER_PREFIX = "workflow_leader:" +WORKFLOW_SKILL_LEADER_PREFIX = "workflow_skill_leader:" WS_SID_MAP_PREFIX = "ws_sid_map:" @@ -18,6 +19,7 @@ class WorkflowSessionInfo(TypedDict): sid: str connected_at: int graph_active: bool + active_skill_file_id: str | None class SidMapping(TypedDict): @@ -40,6 +42,10 @@ class WorkflowCollaborationRepository: def leader_key(workflow_id: str) -> str: return f"{WORKFLOW_LEADER_PREFIX}{workflow_id}" + @staticmethod + def skill_leader_key(workflow_id: str, file_id: str) -> str: + return f"{WORKFLOW_SKILL_LEADER_PREFIX}{workflow_id}:{file_id}" + @staticmethod def sid_key(sid: str) -> str: return f"{WS_SID_MAP_PREFIX}{sid}" @@ -92,6 +98,7 @@ class WorkflowCollaborationRepository: "sid": str(session_info["sid"]), "connected_at": int(session_info.get("connected_at") or 0), "graph_active": bool(session_info.get("graph_active")), + "active_skill_file_id": session_info.get("active_skill_file_id"), } def set_graph_active(self, workflow_id: str, sid: str, active: bool) -> None: @@ -108,6 +115,20 @@ class WorkflowCollaborationRepository: return False return bool(session_info.get("graph_active") or False) + def set_active_skill_file(self, workflow_id: str, sid: str, file_id: str | None) -> None: + session_info = self.get_session_info(workflow_id, sid) + if not session_info: + return + session_info["active_skill_file_id"] = file_id + self._redis.hset(self.workflow_key(workflow_id), sid, json.dumps(session_info)) + self.refresh_session_state(workflow_id, sid) + + def get_active_skill_file_id(self, workflow_id: str, sid: str) -> str | None: + session_info = self.get_session_info(workflow_id, sid) + if not session_info: + return None + return session_info.get("active_skill_file_id") + def get_sid_mapping(self, sid: str) -> SidMapping | None: raw = self._redis.get(self.sid_key(sid)) if not raw: @@ -165,6 +186,7 @@ class WorkflowCollaborationRepository: "sid": str(session_info["sid"]), "connected_at": int(session_info.get("connected_at") or 0), "graph_active": bool(session_info.get("graph_active")), + "active_skill_file_id": session_info.get("active_skill_file_id"), } ) @@ -174,14 +196,31 @@ class WorkflowCollaborationRepository: raw = self._redis.get(self.leader_key(workflow_id)) return self._decode(raw) + def get_skill_leader(self, workflow_id: str, file_id: str) -> str | None: + raw = self._redis.get(self.skill_leader_key(workflow_id, file_id)) + return self._decode(raw) + def set_leader_if_absent(self, workflow_id: str, sid: str) -> bool: return bool(self._redis.set(self.leader_key(workflow_id), sid, nx=True, ex=SESSION_STATE_TTL_SECONDS)) def set_leader(self, workflow_id: str, sid: str) -> None: self._redis.set(self.leader_key(workflow_id), sid, ex=SESSION_STATE_TTL_SECONDS) + def set_skill_leader(self, workflow_id: str, file_id: str, sid: str) -> None: + self._redis.set(self.skill_leader_key(workflow_id, file_id), sid, ex=SESSION_STATE_TTL_SECONDS) + def delete_leader(self, workflow_id: str) -> None: self._redis.delete(self.leader_key(workflow_id)) + def delete_skill_leader(self, workflow_id: str, file_id: str) -> None: + self._redis.delete(self.skill_leader_key(workflow_id, file_id)) + def expire_leader(self, workflow_id: str) -> None: self._redis.expire(self.leader_key(workflow_id), SESSION_STATE_TTL_SECONDS) + + def expire_skill_leader(self, workflow_id: str, file_id: str) -> None: + self._redis.expire(self.skill_leader_key(workflow_id, file_id), SESSION_STATE_TTL_SECONDS) + + def get_active_skill_session_sids(self, workflow_id: str, file_id: str) -> list[str]: + sessions = self.list_sessions(workflow_id) + return [session["sid"] for session in sessions if session.get("active_skill_file_id") == file_id] diff --git a/api/services/workflow_collaboration_service.py b/api/services/workflow_collaboration_service.py index d72299592b..501a4d0966 100644 --- a/api/services/workflow_collaboration_service.py +++ b/api/services/workflow_collaboration_service.py @@ -39,6 +39,7 @@ class WorkflowCollaborationService: "sid": sid, "connected_at": int(time.time()), "graph_active": True, + "active_skill_file_id": None, } self._repository.set_session_info(workflow_id, session_info) @@ -59,9 +60,12 @@ class WorkflowCollaborationService: return workflow_id = mapping["workflow_id"] + active_skill_file_id = self._repository.get_active_skill_file_id(workflow_id, sid) self._repository.delete_session(workflow_id, sid) self.handle_leader_disconnect(workflow_id, sid) + if active_skill_file_id: + self.handle_skill_leader_disconnect(workflow_id, active_skill_file_id, sid) self.broadcast_online_users(workflow_id) def relay_collaboration_event(self, sid: str, data: Mapping[str, object]) -> tuple[dict[str, str], int]: @@ -89,6 +93,33 @@ class WorkflowCollaborationService: self.broadcast_online_users(workflow_id) return {"msg": "graph_view_active_updated"}, 200 + if event_type == "skill_file_active": + file_id = None + is_active = False + if isinstance(event_data, dict): + file_id = event_data.get("file_id") + is_active = bool(event_data.get("active") or False) + + if not file_id or not isinstance(file_id, str): + return {"msg": "invalid skill_file_active payload"}, 400 + + previous_file_id = self._repository.get_active_skill_file_id(workflow_id, sid) + next_file_id = file_id if is_active else None + + if previous_file_id == next_file_id: + self.refresh_session_state(workflow_id, sid) + return {"msg": "skill_file_active_unchanged"}, 200 + + self._repository.set_active_skill_file(workflow_id, sid, next_file_id) + self.refresh_session_state(workflow_id, sid) + + if previous_file_id: + self._ensure_skill_leader(workflow_id, previous_file_id) + if next_file_id: + self._ensure_skill_leader(workflow_id, next_file_id, preferred_sid=sid) + + return {"msg": "skill_file_active_updated"}, 200 + self._socketio.emit( "collaboration_update", {"type": event_type, "userId": user_id, "data": event_data, "timestamp": timestamp}, @@ -110,6 +141,18 @@ class WorkflowCollaborationService: return {"msg": "graph_update_broadcasted"}, 200 + def relay_skill_event(self, sid: str, data: object) -> tuple[dict[str, str], int]: + mapping = self._repository.get_sid_mapping(sid) + if not mapping: + return {"msg": "unauthorized"}, 401 + + workflow_id = mapping["workflow_id"] + self.refresh_session_state(workflow_id, sid) + + self._socketio.emit("skill_update", data, room=workflow_id, skip_sid=sid) + + return {"msg": "skill_update_broadcasted"}, 200 + def get_or_set_leader(self, workflow_id: str, sid: str) -> str | None: current_leader = self._repository.get_current_leader(workflow_id) @@ -154,6 +197,22 @@ class WorkflowCollaborationService: self._repository.delete_leader(workflow_id) self.broadcast_leader_change(workflow_id, None) + def handle_skill_leader_disconnect(self, workflow_id: str, file_id: str, disconnected_sid: str) -> None: + current_leader = self._repository.get_skill_leader(workflow_id, file_id) + if not current_leader: + return + + if current_leader != disconnected_sid: + return + + new_leader_sid = self._select_skill_leader(workflow_id, file_id) + if new_leader_sid: + self._repository.set_skill_leader(workflow_id, file_id, new_leader_sid) + self.broadcast_skill_leader_change(workflow_id, file_id, new_leader_sid) + else: + self._repository.delete_skill_leader(workflow_id, file_id) + self.broadcast_skill_leader_change(workflow_id, file_id, None) + def broadcast_leader_change(self, workflow_id: str, new_leader_sid: str | None) -> None: for sid in self._repository.get_session_sids(workflow_id): try: @@ -162,6 +221,14 @@ class WorkflowCollaborationService: except Exception: logging.exception("Failed to emit leader status to session %s", sid) + def broadcast_skill_leader_change(self, workflow_id: str, file_id: str, new_leader_sid: str | None) -> None: + for sid in self._repository.get_session_sids(workflow_id): + try: + is_leader = new_leader_sid is not None and sid == new_leader_sid + self._socketio.emit("skill_status", {"file_id": file_id, "isLeader": is_leader}, room=sid) + except Exception: + logging.exception("Failed to emit skill leader status to session %s", sid) + def get_current_leader(self, workflow_id: str) -> str | None: return self._repository.get_current_leader(workflow_id) @@ -180,6 +247,9 @@ class WorkflowCollaborationService: def refresh_session_state(self, workflow_id: str, sid: str) -> None: self._repository.refresh_session_state(workflow_id, sid) self._ensure_leader(workflow_id, sid) + active_skill_file_id = self._repository.get_active_skill_file_id(workflow_id, sid) + if active_skill_file_id: + self._ensure_skill_leader(workflow_id, active_skill_file_id, preferred_sid=sid) def _ensure_leader(self, workflow_id: str, sid: str) -> None: current_leader = self._repository.get_current_leader(workflow_id) @@ -200,6 +270,25 @@ class WorkflowCollaborationService: self._repository.set_leader(workflow_id, new_leader_sid) self.broadcast_leader_change(workflow_id, new_leader_sid) + def _ensure_skill_leader(self, workflow_id: str, file_id: str, preferred_sid: str | None = None) -> None: + current_leader = self._repository.get_skill_leader(workflow_id, file_id) + active_sids = self._repository.get_active_skill_session_sids(workflow_id, file_id) + if current_leader and self.is_session_active(workflow_id, current_leader): + if current_leader in active_sids or not active_sids: + self._repository.expire_skill_leader(workflow_id, file_id) + return + + if current_leader: + self._repository.delete_skill_leader(workflow_id, file_id) + + new_leader_sid = self._select_skill_leader(workflow_id, file_id, preferred_sid=preferred_sid) + if not new_leader_sid: + self.broadcast_skill_leader_change(workflow_id, file_id, None) + return + + self._repository.set_skill_leader(workflow_id, file_id, new_leader_sid) + self.broadcast_skill_leader_change(workflow_id, file_id, new_leader_sid) + def _select_graph_leader(self, workflow_id: str, preferred_sid: str | None = None) -> str | None: session_sids = [ session["sid"] @@ -212,6 +301,20 @@ class WorkflowCollaborationService: return preferred_sid return session_sids[0] + def _select_skill_leader( + self, workflow_id: str, file_id: str, preferred_sid: str | None = None + ) -> str | None: + session_sids = [ + sid + for sid in self._repository.get_active_skill_session_sids(workflow_id, file_id) + if self.is_session_active(workflow_id, sid) + ] + if not session_sids: + return None + if preferred_sid and preferred_sid in session_sids: + return preferred_sid + return session_sids[0] + def is_session_active(self, workflow_id: str, sid: str) -> bool: if not sid: return False diff --git a/web/app/components/workflow/collaboration/skills/skill-collaboration-manager.ts b/web/app/components/workflow/collaboration/skills/skill-collaboration-manager.ts new file mode 100644 index 0000000000..2dc2ae9b44 --- /dev/null +++ b/web/app/components/workflow/collaboration/skills/skill-collaboration-manager.ts @@ -0,0 +1,298 @@ +import type { Socket } from 'socket.io-client' +import type { CollaborationUpdate } from '@/app/components/workflow/collaboration/types/collaboration' +import { LoroDoc } from 'loro-crdt' +import { emitWithAuthGuard, webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-manager' + +type SkillUpdatePayload = { + file_id: string + update: Uint8Array + is_snapshot?: boolean +} + +type SkillStatusPayload = { + file_id: string + isLeader: boolean +} + +type SkillDocEntry = { + doc: LoroDoc + text: ReturnType + subscribers: Set<(text: string, source: 'remote') => void> + suppressBroadcast: boolean +} + +class SkillCollaborationManager { + private appId: string | null = null + private socket: Socket | null = null + private docs = new Map() + private leaderByFile = new Map() + private syncHandlers = new Map void>>() + private activeFileId: string | null = null + private pendingResync = new Set() + + private handleSkillUpdate = (payload: SkillUpdatePayload) => { + if (!payload || !payload.file_id || !payload.update) + return + + const entry = this.docs.get(payload.file_id) + if (!entry) + return + + try { + entry.doc.import(new Uint8Array(payload.update)) + } + catch (error) { + console.error('Failed to import skill update:', error) + } + } + + private handleSkillStatus = (payload: SkillStatusPayload) => { + if (!payload || !payload.file_id) + return + + this.leaderByFile.set(payload.file_id, !!payload.isLeader) + } + + private handleCollaborationUpdate = (update: CollaborationUpdate) => { + if (!update || !update.type) + return + + if (update.type === 'skill_resync_request') { + const fileId = (update.data as { file_id?: string } | undefined)?.file_id + if (!fileId || !this.isLeader(fileId)) + return + this.emitSnapshot(fileId) + return + } + + if (update.type === 'skill_sync_request') { + const fileId = (update.data as { file_id?: string } | undefined)?.file_id + if (!fileId || !this.isLeader(fileId)) + return + const handlers = this.syncHandlers.get(fileId) + handlers?.forEach(handler => handler()) + } + } + + private handleConnect = () => { + if (this.activeFileId) + this.emitSkillFileActive(this.activeFileId, true) + + if (this.pendingResync.size > 0) { + Array.from(this.pendingResync).forEach(fileId => this.emitResyncRequest(fileId)) + this.pendingResync.clear() + } + } + + private ensureSocket(appId: string): Socket { + if (this.appId && this.appId !== appId) { + this.teardownSocket() + this.docs.clear() + this.leaderByFile.clear() + this.syncHandlers.clear() + this.activeFileId = null + this.pendingResync.clear() + } + + this.appId = appId + const socket = webSocketClient.connect(appId) + if (this.socket !== socket) { + this.teardownSocket() + this.socket = socket + this.bindSocketListeners(socket) + } + + return socket + } + + private bindSocketListeners(socket: Socket) { + socket.on('skill_update', this.handleSkillUpdate) + socket.on('skill_status', this.handleSkillStatus) + socket.on('collaboration_update', this.handleCollaborationUpdate) + socket.on('connect', this.handleConnect) + } + + private teardownSocket() { + if (!this.socket) + return + + this.socket.off('skill_update', this.handleSkillUpdate) + this.socket.off('skill_status', this.handleSkillStatus) + this.socket.off('collaboration_update', this.handleCollaborationUpdate) + this.socket.off('connect', this.handleConnect) + this.socket = null + } + + openFile(appId: string, fileId: string, initialContent: string): void { + if (!appId || !fileId) + return + + const socket = this.ensureSocket(appId) + + if (!this.docs.has(fileId)) { + const doc = new LoroDoc() + const text = doc.getText('content') + const entry: SkillDocEntry = { + doc, + text, + subscribers: new Set(), + suppressBroadcast: true, + } + + if (initialContent) + text.update(initialContent) + + doc.commit() + entry.suppressBroadcast = false + + doc.subscribe((event: { by?: string }) => { + if (event.by === 'local') { + if (entry.suppressBroadcast) + return + const update = doc.export({ mode: 'update' }) + this.emitUpdate(fileId, update) + return + } + + const nextText = text.toString() + entry.subscribers.forEach(callback => callback(nextText, 'remote')) + }) + + this.docs.set(fileId, entry) + } + + if (socket.connected) + this.emitResyncRequest(fileId) + else + this.pendingResync.add(fileId) + } + + closeFile(fileId: string): void { + if (!fileId) + return + + if (this.activeFileId === fileId) + this.activeFileId = null + } + + updateText(fileId: string, text: string): void { + const entry = this.docs.get(fileId) + if (!entry) + return + if (entry.text.toString() === text) + return + + entry.text.update(text) + entry.doc.commit() + } + + getText(fileId: string): string | null { + const entry = this.docs.get(fileId) + return entry ? entry.text.toString() : null + } + + subscribe(fileId: string, callback: (text: string, source: 'remote') => void): () => void { + const entry = this.docs.get(fileId) + if (!entry) + return () => {} + + entry.subscribers.add(callback) + return () => { + entry.subscribers.delete(callback) + } + } + + onSyncRequest(fileId: string, callback: () => void): () => void { + const handlers = this.syncHandlers.get(fileId) || new Set() + handlers.add(callback) + this.syncHandlers.set(fileId, handlers) + return () => { + const current = this.syncHandlers.get(fileId) + if (!current) + return + current.delete(callback) + if (current.size === 0) + this.syncHandlers.delete(fileId) + } + } + + isLeader(fileId: string): boolean { + return this.leaderByFile.get(fileId) || false + } + + isFileCollaborative(fileId: string): boolean { + return this.docs.has(fileId) + } + + requestSync(fileId: string): void { + this.emitSyncRequest(fileId) + } + + setActiveFile(appId: string, fileId: string, active: boolean): void { + if (!appId || !fileId) + return + + this.ensureSocket(appId) + + if (active) + this.activeFileId = fileId + else if (this.activeFileId === fileId) + this.activeFileId = null + + if (this.socket?.connected) + this.emitSkillFileActive(fileId, active) + } + + private emitUpdate(fileId: string, update: Uint8Array): void { + if (!this.socket || !this.socket.connected || !this.appId) + return + + const payload: SkillUpdatePayload = { file_id: fileId, update } + emitWithAuthGuard(this.socket, 'skill_event', payload) + } + + private emitSnapshot(fileId: string): void { + const entry = this.docs.get(fileId) + if (!entry || !this.socket || !this.socket.connected) + return + + const snapshot = entry.doc.export({ mode: 'snapshot' }) + const payload: SkillUpdatePayload = { file_id: fileId, update: snapshot, is_snapshot: true } + emitWithAuthGuard(this.socket, 'skill_event', payload) + } + + private emitResyncRequest(fileId: string): void { + if (!this.socket || !this.socket.connected) + return + + emitWithAuthGuard(this.socket, 'collaboration_event', { + type: 'skill_resync_request', + data: { file_id: fileId }, + timestamp: Date.now(), + }) + } + + private emitSyncRequest(fileId: string): void { + if (!this.socket || !this.socket.connected) + return + + emitWithAuthGuard(this.socket, 'collaboration_event', { + type: 'skill_sync_request', + data: { file_id: fileId }, + timestamp: Date.now(), + }) + } + + private emitSkillFileActive(fileId: string, active: boolean): void { + if (!this.socket || !this.socket.connected) + return + + emitWithAuthGuard(this.socket, 'collaboration_event', { + type: 'skill_file_active', + data: { file_id: fileId, active }, + timestamp: Date.now(), + }) + } +} + +export const skillCollaborationManager = new SkillCollaborationManager() diff --git a/web/app/components/workflow/collaboration/skills/use-skill-markdown-collaboration.ts b/web/app/components/workflow/collaboration/skills/use-skill-markdown-collaboration.ts new file mode 100644 index 0000000000..923d4666eb --- /dev/null +++ b/web/app/components/workflow/collaboration/skills/use-skill-markdown-collaboration.ts @@ -0,0 +1,85 @@ +import { useCallback, useEffect, useRef } from 'react' +import { PROMPT_EDITOR_UPDATE_VALUE_BY_EVENT_EMITTER } from '@/app/components/base/prompt-editor/plugins/update-block' +import { useWorkflowStore } from '@/app/components/workflow/store' +import { useEventEmitterContextContext } from '@/context/event-emitter' +import { skillCollaborationManager } from './skill-collaboration-manager' + +type UseSkillMarkdownCollaborationProps = { + appId: string + fileId: string | null + enabled: boolean + initialContent: string + onLocalChange: (value: string) => void + onLeaderSync: () => void +} + +export const useSkillMarkdownCollaboration = ({ + appId, + fileId, + enabled, + initialContent, + onLocalChange, + onLeaderSync, +}: UseSkillMarkdownCollaborationProps) => { + const storeApi = useWorkflowStore() + const { eventEmitter } = useEventEmitterContextContext() + const suppressNextChangeRef = useRef(null) + + useEffect(() => { + suppressNextChangeRef.current = null + }, [fileId]) + + useEffect(() => { + if (!enabled || !fileId) + return + + skillCollaborationManager.openFile(appId, fileId, initialContent) + skillCollaborationManager.setActiveFile(appId, fileId, true) + + const unsubscribe = skillCollaborationManager.subscribe(fileId, (nextText) => { + suppressNextChangeRef.current = nextText + const state = storeApi.getState() + state.setDraftContent(fileId, nextText) + state.pinTab(fileId) + eventEmitter?.emit({ + type: PROMPT_EDITOR_UPDATE_VALUE_BY_EVENT_EMITTER, + instanceId: fileId, + payload: nextText, + } as unknown as string) + }) + + const unsubscribeSync = skillCollaborationManager.onSyncRequest(fileId, onLeaderSync) + + return () => { + unsubscribe() + unsubscribeSync() + skillCollaborationManager.setActiveFile(appId, fileId, false) + skillCollaborationManager.closeFile(fileId) + } + }, [appId, enabled, eventEmitter, fileId, initialContent, onLeaderSync, storeApi]) + + const handleCollaborativeChange = useCallback((value: string | undefined) => { + const nextValue = value ?? '' + if (!fileId) { + onLocalChange(nextValue) + return + } + + if (!enabled) { + onLocalChange(nextValue) + return + } + if (suppressNextChangeRef.current === nextValue) { + suppressNextChangeRef.current = null + return + } + + skillCollaborationManager.updateText(fileId, nextValue) + onLocalChange(nextValue) + }, [enabled, fileId, onLocalChange]) + + return { + handleCollaborativeChange, + isLeader: fileId ? skillCollaborationManager.isLeader(fileId) : false, + } +} diff --git a/web/app/components/workflow/collaboration/types/collaboration.ts b/web/app/components/workflow/collaboration/types/collaboration.ts index 75d6c079dc..2809e54d52 100644 --- a/web/app/components/workflow/collaboration/types/collaboration.ts +++ b/web/app/components/workflow/collaboration/types/collaboration.ts @@ -63,6 +63,9 @@ export type CollaborationEventType | 'node_panel_presence' | 'app_publish_update' | 'graph_view_active' + | 'skill_file_active' + | 'skill_sync_request' + | 'skill_resync_request' | 'graph_resync_request' | 'workflow_restore_request' | 'workflow_restore_intent' diff --git a/web/app/components/workflow/skill/editor/markdown-file-editor.tsx b/web/app/components/workflow/skill/editor/markdown-file-editor.tsx index d0a8045f0b..6181725240 100644 --- a/web/app/components/workflow/skill/editor/markdown-file-editor.tsx +++ b/web/app/components/workflow/skill/editor/markdown-file-editor.tsx @@ -4,11 +4,12 @@ import { useTranslation } from 'react-i18next' import SkillEditor from './skill-editor' type MarkdownFileEditorProps = { + instanceId?: string value: string onChange: (value: string) => void } -const MarkdownFileEditor: FC = ({ value, onChange }) => { +const MarkdownFileEditor: FC = ({ instanceId, value, onChange }) => { const { t } = useTranslation() const handleChange = React.useCallback((val: string) => { if (val !== value) { @@ -19,6 +20,7 @@ const MarkdownFileEditor: FC = ({ value, onChange }) => return (
{ const originalContent = fileContent?.content ?? '' const currentContent = draftContent !== undefined ? draftContent : originalContent + const initialContentRegistryRef = useRef>(new Map()) + const canInitCollaboration = Boolean(appId && fileTabId && isMarkdown && isEditable && !isLoading && !error) + + if (canInitCollaboration && fileTabId && !initialContentRegistryRef.current.has(fileTabId)) + initialContentRegistryRef.current.set(fileTabId, currentContent) + + const initialCollaborativeContent = fileTabId + ? (initialContentRegistryRef.current.get(fileTabId) ?? currentContent) + : '' useEffect(() => { if (!fileTabId || !fileContent) @@ -100,6 +110,11 @@ const FileContentPanel: FC = () => { }, [fileTabId, isEditable, originalContent, storeApi]) const { saveFile, registerFallback, unregisterFallback } = useSkillSaveManager() + const handleLeaderSync = useCallback(() => { + if (!fileTabId || !isEditable) + return + void saveFile(fileTabId) + }, [fileTabId, isEditable, saveFile]) const saveFileRef = useRef(saveFile) saveFileRef.current = saveFile @@ -141,6 +156,15 @@ const FileContentPanel: FC = () => { const language = currentFileNode ? getFileLanguage(currentFileNode.name) : 'plaintext' const theme = appTheme === Theme.light ? 'light' : 'vs-dark' + const { handleCollaborativeChange } = useSkillMarkdownCollaboration({ + appId, + fileId: fileTabId, + enabled: canInitCollaboration, + initialContent: initialCollaborativeContent, + onLocalChange: handleEditorChange, + onLeaderSync: handleLeaderSync, + }) + if (isStartTab) return @@ -184,8 +208,9 @@ const FileContentPanel: FC = () => { ? ( ) : null} diff --git a/web/app/components/workflow/skill/hooks/use-skill-save-manager.tsx b/web/app/components/workflow/skill/hooks/use-skill-save-manager.tsx index c5006dbf61..970bd8de72 100644 --- a/web/app/components/workflow/skill/hooks/use-skill-save-manager.tsx +++ b/web/app/components/workflow/skill/hooks/use-skill-save-manager.tsx @@ -7,8 +7,10 @@ import { useTranslation } from 'react-i18next' import Toast from '@/app/components/base/toast' import { useWorkflowStore } from '@/app/components/workflow/store' import { extractToolConfigIds } from '@/app/components/workflow/utils' +import { useGlobalPublicStore } from '@/context/global-public-context' import { consoleQuery } from '@/service/client' import { useUpdateAppAssetFileContent } from '@/service/use-app-asset' +import { skillCollaborationManager } from '../../collaboration/skills/skill-collaboration-manager' import { START_TAB_ID } from '../constants' type SaveSnapshot = { @@ -87,6 +89,7 @@ export const SkillSaveProvider = ({ const storeApi = useWorkflowStore() const queryClient = useQueryClient() const updateContent = useUpdateAppAssetFileContent() + const isCollaborationEnabled = useGlobalPublicStore(s => s.systemFeatures.enable_collaboration_mode) const queueRef = useRef>>(new Map()) const fallbackRegistryRef = useRef>(new Map()) @@ -172,6 +175,11 @@ export const SkillSaveProvider = ({ if (!appId || !fileId || fileId === START_TAB_ID) return { saved: false } + if (isCollaborationEnabled && skillCollaborationManager.isFileCollaborative(fileId) && !skillCollaborationManager.isLeader(fileId)) { + skillCollaborationManager.requestSync(fileId) + return { saved: false } + } + const snapshot = buildSnapshot(fileId, options?.fallbackContent, options?.fallbackMetadata) if (!snapshot) return { saved: false } @@ -207,7 +215,7 @@ export const SkillSaveProvider = ({ catch (error) { return { saved: false, error } } - }, [appId, buildSnapshot, storeApi, updateCachedContent, updateContent]) + }, [appId, buildSnapshot, isCollaborationEnabled, storeApi, updateCachedContent, updateContent]) const saveFile = useCallback(async ( fileId: string,