From b462a96fa0185a843bcd0e3f4886a292abe062d6 Mon Sep 17 00:00:00 2001
From: weiguang li
Date: Sun, 1 Mar 2026 19:37:51 +0800
Subject: [PATCH] fix: serialize data_source_info with json.dumps in Notion sync task (#32747)

Document.data_source_info is a text column, so assigning a Python dict
to it makes psycopg2 fail with ProgrammingError: can't adapt type
'dict'. Serialize the updated payload with json.dumps before the write,
add a unit regression test, and drop the integration-test psycopg2 dict
adapter fixture that was masking the bug.

Fixes: https://github.com/langgenius/dify/issues/32705
---
 api/tasks/document_indexing_sync_task.py      |  3 +-
 .../tasks/test_document_indexing_sync_task.py |  8 --
 .../tasks/test_document_indexing_sync_task.py | 76 +++++++++++++++++++
 3 files changed, 78 insertions(+), 9 deletions(-)

diff --git a/api/tasks/document_indexing_sync_task.py b/api/tasks/document_indexing_sync_task.py
index 45b44438e7..fddd9199d1 100644
--- a/api/tasks/document_indexing_sync_task.py
+++ b/api/tasks/document_indexing_sync_task.py
@@ -1,3 +1,4 @@
+import json
 import logging
 import time
 
@@ -125,7 +126,7 @@ def document_indexing_sync_task(dataset_id: str, document_id: str):
         data_source_info = document.data_source_info_dict
         data_source_info["last_edited_time"] = last_edited_time
 
-        document.data_source_info = data_source_info
+        document.data_source_info = json.dumps(data_source_info)
         document.indexing_status = "parsing"
         document.processing_started_at = naive_utc_now()
 
diff --git a/api/tests/test_containers_integration_tests/tasks/test_document_indexing_sync_task.py b/api/tests/test_containers_integration_tests/tasks/test_document_indexing_sync_task.py
index 0b9e29fde9..df5c5dc54b 100644
--- a/api/tests/test_containers_integration_tests/tasks/test_document_indexing_sync_task.py
+++ b/api/tests/test_containers_integration_tests/tasks/test_document_indexing_sync_task.py
@@ -12,8 +12,6 @@
 from unittest.mock import Mock, patch
 from uuid import uuid4
 
 import pytest
-from psycopg2.extensions import register_adapter
-from psycopg2.extras import Json
 from core.indexing_runner import DocumentIsPausedError, IndexingRunner
 from models import Account, Tenant, TenantAccountJoin, TenantAccountRole
@@ -21,12 +19,6 @@
 from models.dataset import Dataset, Document, DocumentSegment
 from tasks.document_indexing_sync_task import document_indexing_sync_task
 
 
-@pytest.fixture(autouse=True)
-def _register_dict_adapter_for_psycopg2():
-    """Align test DB adapter behavior with dict payloads used in task update flow."""
-    register_adapter(dict, Json)
-
-
 class DocumentIndexingSyncTaskTestDataFactory:
     """Create real DB entities for document indexing sync integration tests."""
 
diff --git a/api/tests/unit_tests/tasks/test_document_indexing_sync_task.py b/api/tests/unit_tests/tasks/test_document_indexing_sync_task.py
index a68aad7606..3668416e36 100644
--- a/api/tests/unit_tests/tasks/test_document_indexing_sync_task.py
+++ b/api/tests/unit_tests/tasks/test_document_indexing_sync_task.py
@@ -5,6 +5,7 @@
 These tests intentionally stay in unit scope because they validate call argument
 for external collaborators rather than SQL-backed state transitions.
 """
 
+import json
 import uuid
 from unittest.mock import MagicMock, Mock, patch
@@ -196,3 +197,78 @@ class TestDocumentIndexingSyncTaskCollaboratorParams:
             provider="notion_datasource",
             plugin_id="langgenius/notion_datasource",
         )
+
+
+class TestDataSourceInfoSerialization:
+    """Regression test: data_source_info must be written as a JSON string, not a raw dict.
+
+    See https://github.com/langgenius/dify/issues/32705. psycopg2 raises
+    ``ProgrammingError: can't adapt type 'dict'`` when a Python dict is
+    passed directly to a text/LongText column.
+    """
+
+    def test_data_source_info_serialized_as_json_string(
+        self,
+        mock_document,
+        mock_dataset,
+        dataset_id,
+        document_id,
+    ):
+        """data_source_info must be serialized with json.dumps before the DB write."""
+        with (
+            patch("tasks.document_indexing_sync_task.session_factory") as mock_session_factory,
+            patch("tasks.document_indexing_sync_task.DatasourceProviderService") as mock_service_class,
+            patch("tasks.document_indexing_sync_task.NotionExtractor") as mock_extractor_class,
+            patch("tasks.document_indexing_sync_task.IndexProcessorFactory") as mock_ipf,
+            patch("tasks.document_indexing_sync_task.IndexingRunner") as mock_runner_class,
+        ):
+            # External collaborators
+            mock_service = MagicMock()
+            mock_service.get_datasource_credentials.return_value = {"integration_secret": "token"}
+            mock_service_class.return_value = mock_service
+
+            mock_extractor = MagicMock()
+            # Return a *different* timestamp so the task enters the sync/update branch
+            mock_extractor.get_notion_last_edited_time.return_value = "2024-02-01T00:00:00Z"
+            mock_extractor_class.return_value = mock_extractor
+
+            mock_ip = MagicMock()
+            mock_ipf.return_value.init_index_processor.return_value = mock_ip
+
+            mock_runner = MagicMock()
+            mock_runner_class.return_value = mock_runner
+
+            # DB session mock, shared across all ``session_factory.create_session()`` calls
+            session = MagicMock()
+            session.scalars.return_value.all.return_value = []
+            # .where() path: session 1 reads document + dataset, session 2 reads dataset
+            session.query.return_value.where.return_value.first.side_effect = [
+                mock_document,
+                mock_dataset,
+                mock_dataset,
+            ]
+            # .filter_by() path: session 3 (update), session 4 (indexing)
+            session.query.return_value.filter_by.return_value.first.side_effect = [
+                mock_document,
+                mock_document,
+            ]
+
+            begin_cm = MagicMock()
+            begin_cm.__enter__.return_value = session
+            begin_cm.__exit__.return_value = False
+            session.begin.return_value = begin_cm
+
+            session_cm = MagicMock()
+            session_cm.__enter__.return_value = session
+            session_cm.__exit__.return_value = False
+            mock_session_factory.create_session.return_value = session_cm
+
+            # Act
+            document_indexing_sync_task(dataset_id, document_id)
+
+            # Assert: data_source_info must be a JSON *string*, not a dict
+            assert isinstance(mock_document.data_source_info, str), (
+                f"data_source_info should be a JSON string, got {type(mock_document.data_source_info).__name__}"
+            )
+            parsed = json.loads(mock_document.data_source_info)
+            assert parsed["last_edited_time"] == "2024-02-01T00:00:00Z"
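
Reviewer note: below is a minimal, database-free sketch of the read-modify-write
round-trip this patch fixes. SketchDocument and its data_source_info_dict
property are hypothetical stand-ins inferred from the call sites in the diff,
not the actual models.dataset.Document implementation.

import json


class SketchDocument:
    # Hypothetical stand-in for models.dataset.Document (assumption, not Dify source).

    def __init__(self, data_source_info: str):
        # Text column: the DB driver expects a str here, never a dict.
        self.data_source_info = data_source_info

    @property
    def data_source_info_dict(self) -> dict:
        # Parsed convenience view over the JSON text column.
        return json.loads(self.data_source_info)


doc = SketchDocument('{"notion_page_id": "abc", "last_edited_time": "2024-01-01T00:00:00Z"}')

# The task's update flow: parse the stored JSON, bump the timestamp.
info = doc.data_source_info_dict
info["last_edited_time"] = "2024-02-01T00:00:00Z"

# The buggy version wrote the dict back directly (doc.data_source_info = info),
# leaving a non-str value that psycopg2 cannot adapt ("can't adapt type 'dict'").
# The fix serializes before the write:
doc.data_source_info = json.dumps(info)

assert isinstance(doc.data_source_info, str)
assert doc.data_source_info_dict["last_edited_time"] == "2024-02-01T00:00:00Z"

The unit test added above asserts exactly this post-condition: the attribute is
a str, and json.loads round-trips the updated timestamp.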