chore: improve py codecov

This commit is contained in:
hjlarry 2026-04-11 21:57:20 +08:00
parent 6209038918
commit a29e831ce5
3 changed files with 385 additions and 0 deletions

View File

@ -0,0 +1,79 @@
from unittest.mock import Mock, patch
from models.comment import WorkflowComment, WorkflowCommentMention, WorkflowCommentReply
def test_workflow_comment_account_properties_and_cache() -> None:
comment = WorkflowComment(created_by="user-1", resolved_by="user-2", content="hello", position_x=1, position_y=2)
created_account = Mock(id="user-1")
resolved_account = Mock(id="user-2")
with patch("models.comment.db.session.get", side_effect=[created_account, resolved_account]) as get_mock:
assert comment.created_by_account is created_account
assert comment.resolved_by_account is resolved_account
assert get_mock.call_count == 2
comment.cache_created_by_account(created_account)
comment.cache_resolved_by_account(resolved_account)
with patch("models.comment.db.session.get") as get_mock:
assert comment.created_by_account is created_account
assert comment.resolved_by_account is resolved_account
get_mock.assert_not_called()
comment_without_resolver = WorkflowComment(
created_by="user-1",
resolved_by=None,
content="hello",
position_x=1,
position_y=2,
)
with patch("models.comment.db.session.get") as get_mock:
assert comment_without_resolver.resolved_by_account is None
get_mock.assert_not_called()
def test_workflow_comment_counts_and_participants() -> None:
reply_1 = WorkflowCommentReply(comment_id="comment-1", content="reply-1", created_by="user-2")
reply_2 = WorkflowCommentReply(comment_id="comment-1", content="reply-2", created_by="user-2")
mention_1 = WorkflowCommentMention(comment_id="comment-1", mentioned_user_id="user-3")
mention_2 = WorkflowCommentMention(comment_id="comment-1", mentioned_user_id="user-4")
comment = WorkflowComment(created_by="user-1", resolved_by=None, content="hello", position_x=1, position_y=2)
comment.replies = [reply_1, reply_2]
comment.mentions = [mention_1, mention_2]
account_1 = Mock(id="user-1")
account_2 = Mock(id="user-2")
account_3 = Mock(id="user-3")
account_map = {
"user-1": account_1,
"user-2": account_2,
"user-3": account_3,
"user-4": None,
}
with patch("models.comment.db.session.get", side_effect=lambda _model, user_id: account_map[user_id]) as get_mock:
participants = comment.participants
assert comment.reply_count == 2
assert comment.mention_count == 2
assert set(participants) == {account_1, account_2, account_3}
assert get_mock.call_count == 4
def test_reply_and_mention_account_properties_and_cache() -> None:
reply = WorkflowCommentReply(comment_id="comment-1", content="reply", created_by="user-1")
mention = WorkflowCommentMention(comment_id="comment-1", mentioned_user_id="user-2")
reply_account = Mock(id="user-1")
mention_account = Mock(id="user-2")
with patch("models.comment.db.session.get", side_effect=[reply_account, mention_account]) as get_mock:
assert reply.created_by_account is reply_account
assert mention.mentioned_user_account is mention_account
assert get_mock.call_count == 2
reply.cache_created_by_account(reply_account)
mention.cache_mentioned_user_account(mention_account)
with patch("models.comment.db.session.get") as get_mock:
assert reply.created_by_account is reply_account
assert mention.mentioned_user_account is mention_account
get_mock.assert_not_called()

View File

@ -46,6 +46,22 @@ class TestWorkflowCollaborationService:
# Assert
assert result is None
def test_repr_and_save_session(self, service: tuple[WorkflowCollaborationService, Mock, Mock]) -> None:
collaboration_service, _repository, socketio = service
user = Mock()
user.id = "u-1"
user.name = "Jane"
user.avatar = "avatar.png"
assert "WorkflowCollaborationService" in repr(collaboration_service)
collaboration_service.save_session("sid-1", user)
socketio.save_session.assert_called_once_with(
"sid-1",
{"user_id": "u-1", "username": "Jane", "avatar": "avatar.png"},
)
def test_relay_collaboration_event_unauthorized(
self, service: tuple[WorkflowCollaborationService, Mock, Mock]
) -> None:
@ -79,6 +95,16 @@ class TestWorkflowCollaborationService:
skip_sid="sid-1",
)
def test_relay_collaboration_event_requires_event_type(
self, service: tuple[WorkflowCollaborationService, Mock, Mock]
) -> None:
collaboration_service, repository, _socketio = service
repository.get_sid_mapping.return_value = {"workflow_id": "wf-1", "user_id": "u-1"}
result = collaboration_service.relay_collaboration_event("sid-1", {"data": {"x": 1}})
assert result == ({"msg": "invalid event type"}, 400)
def test_relay_collaboration_event_sync_request_forwards_to_active_leader(
self, service: tuple[WorkflowCollaborationService, Mock, Mock]
) -> None:
@ -275,6 +301,17 @@ class TestWorkflowCollaborationService:
# Assert
assert result == "sid-3"
def test_get_or_set_leader_returns_sid_when_leader_still_missing(
self, service: tuple[WorkflowCollaborationService, Mock, Mock]
) -> None:
collaboration_service, repository, _socketio = service
repository.get_current_leader.side_effect = [None, None]
repository.set_leader_if_absent.return_value = False
result = collaboration_service.get_or_set_leader("wf-1", "sid-2")
assert result == "sid-2"
def test_handle_leader_disconnect_elects_new(
self, service: tuple[WorkflowCollaborationService, Mock, Mock]
) -> None:
@ -317,6 +354,32 @@ class TestWorkflowCollaborationService:
# Assert
repository.delete_leader.assert_called_once_with("wf-1")
def test_handle_leader_disconnect_ignores_non_leader_or_missing_leader(
self, service: tuple[WorkflowCollaborationService, Mock, Mock]
) -> None:
collaboration_service, repository, _socketio = service
repository.get_current_leader.return_value = None
collaboration_service.handle_leader_disconnect("wf-1", "sid-1")
repository.get_current_leader.return_value = "sid-leader"
collaboration_service.handle_leader_disconnect("wf-1", "sid-other")
repository.set_leader.assert_not_called()
repository.delete_leader.assert_not_called()
def test_broadcast_leader_change_logs_emit_errors(
self, service: tuple[WorkflowCollaborationService, Mock, Mock]
) -> None:
collaboration_service, repository, socketio = service
repository.get_session_sids.return_value = ["sid-1", "sid-2"]
socketio.emit.side_effect = [RuntimeError("boom"), None]
with patch("services.workflow_collaboration_service.logging.exception") as exception_mock:
collaboration_service.broadcast_leader_change("wf-1", "sid-2")
assert exception_mock.call_count == 1
def test_broadcast_online_users_sorts_and_emits(
self, service: tuple[WorkflowCollaborationService, Mock, Mock]
) -> None:
@ -346,6 +409,29 @@ class TestWorkflowCollaborationService:
room="wf-1",
)
def test_broadcast_online_users_reassigns_missing_leader(
self, service: tuple[WorkflowCollaborationService, Mock, Mock]
) -> None:
collaboration_service, repository, socketio = service
users = [{"user_id": "u-2", "username": "B", "avatar": None, "sid": "sid-2", "connected_at": 1}]
repository.get_current_leader.return_value = "sid-old"
with (
patch.object(collaboration_service, "_prune_inactive_sessions", return_value=users),
patch.object(collaboration_service, "_select_graph_leader", return_value="sid-2"),
patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change,
):
collaboration_service.broadcast_online_users("wf-1")
repository.delete_leader.assert_called_once_with("wf-1")
repository.set_leader.assert_called_once_with("wf-1", "sid-2")
broadcast_leader_change.assert_called_once_with("wf-1", "sid-2")
socketio.emit.assert_called_once_with(
"online_users",
{"workflow_id": "wf-1", "users": users, "leader": "sid-2"},
room="wf-1",
)
def test_refresh_session_state_expires_active_leader(
self, service: tuple[WorkflowCollaborationService, Mock, Mock]
) -> None:
@ -390,6 +476,22 @@ class TestWorkflowCollaborationService:
repository.set_leader.assert_called_once_with("wf-1", "sid-2")
broadcast_leader_change.assert_called_once_with("wf-1", "sid-2")
def test_refresh_session_state_replaces_inactive_existing_leader(
self, service: tuple[WorkflowCollaborationService, Mock, Mock]
) -> None:
collaboration_service, repository, _socketio = service
repository.get_current_leader.return_value = "sid-old"
with (
patch.object(collaboration_service, "is_session_active", return_value=False),
patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change,
):
collaboration_service.refresh_session_state("wf-1", "sid-new")
repository.delete_leader.assert_called_once_with("wf-1")
repository.set_leader.assert_called_once_with("wf-1", "sid-new")
broadcast_leader_change.assert_called_once_with("wf-1", "sid-new")
def test_relay_graph_event_emits_update(self, service: tuple[WorkflowCollaborationService, Mock, Mock]) -> None:
# Arrange
collaboration_service, repository, socketio = service
@ -402,3 +504,47 @@ class TestWorkflowCollaborationService:
assert result == ({"msg": "graph_update_broadcasted"}, 200)
repository.refresh_session_state.assert_called_once_with("wf-1", "sid-1")
socketio.emit.assert_called_once_with("graph_update", {"nodes": []}, room="wf-1", skip_sid="sid-1")
def test_prune_inactive_sessions_handles_empty_and_removes_stale(
self, service: tuple[WorkflowCollaborationService, Mock, Mock]
) -> None:
collaboration_service, repository, _socketio = service
repository.list_sessions.return_value = []
assert collaboration_service._prune_inactive_sessions("wf-1") == []
active = {"sid": "sid-1", "user_id": "u-1", "connected_at": 1}
stale = {"sid": "sid-2", "user_id": "u-2", "connected_at": 2}
repository.list_sessions.return_value = [active, stale]
with patch.object(
collaboration_service,
"is_session_active",
side_effect=lambda _workflow_id, sid: sid == "sid-1",
):
users = collaboration_service._prune_inactive_sessions("wf-1")
assert users == [active]
repository.delete_session.assert_called_with("wf-1", "sid-2")
def test_is_session_active_guard_branches(self, service: tuple[WorkflowCollaborationService, Mock, Mock]) -> None:
collaboration_service, repository, socketio = service
socketio.manager.is_connected.return_value = True
repository.session_exists.return_value = True
repository.sid_mapping_exists.return_value = True
assert collaboration_service.is_session_active("wf-1", "") is False
socketio.manager.is_connected.return_value = False
assert collaboration_service.is_session_active("wf-1", "sid-1") is False
socketio.manager.is_connected.side_effect = AttributeError("missing manager")
assert collaboration_service.is_session_active("wf-1", "sid-1") is False
socketio.manager.is_connected.side_effect = None
socketio.manager.is_connected.return_value = True
repository.session_exists.return_value = False
assert collaboration_service.is_session_active("wf-1", "sid-1") is False
repository.session_exists.return_value = True
repository.sid_mapping_exists.return_value = False
assert collaboration_service.is_session_active("wf-1", "sid-1") is False

View File

@ -39,6 +39,63 @@ class TestWorkflowCommentService:
with pytest.raises(ValueError):
WorkflowCommentService._validate_content("a" * 1001)
def test_filter_valid_mentioned_user_ids_deduplicates_and_preserves_order(self) -> None:
result = WorkflowCommentService._filter_valid_mentioned_user_ids(
[
"123e4567-e89b-12d3-a456-426614174000",
"",
123, # type: ignore[list-item]
"123e4567-e89b-12d3-a456-426614174000",
"123e4567-e89b-12d3-a456-426614174001",
]
)
assert result == [
"123e4567-e89b-12d3-a456-426614174000",
"123e4567-e89b-12d3-a456-426614174001",
]
def test_format_comment_excerpt_handles_short_and_long_limits(self) -> None:
assert WorkflowCommentService._format_comment_excerpt(" hello ", max_length=10) == "hello"
assert WorkflowCommentService._format_comment_excerpt("abcdefghijk", max_length=3) == "abc"
assert WorkflowCommentService._format_comment_excerpt(" abcdefghijk ", max_length=8) == "abcde..."
def test_build_mention_email_payloads_returns_empty_for_no_candidates(self, mock_session: Mock) -> None:
assert (
WorkflowCommentService._build_mention_email_payloads(
session=mock_session,
tenant_id="tenant-1",
app_id="app-1",
mentioner_id="user-1",
mentioned_user_ids=[],
content="hello",
)
== []
)
assert (
WorkflowCommentService._build_mention_email_payloads(
session=mock_session,
tenant_id="tenant-1",
app_id="app-1",
mentioner_id="user-1",
mentioned_user_ids=["user-1"],
content="hello",
)
== []
)
def test_dispatch_mention_emails_enqueues_each_payload(self) -> None:
delay_mock = Mock()
with patch.object(service_module.send_workflow_comment_mention_email_task, "delay", delay_mock):
WorkflowCommentService._dispatch_mention_emails(
[
{"to": "a@example.com"},
{"to": "b@example.com"},
]
)
assert delay_mock.call_count == 2
def test_build_mention_email_payloads_skips_accounts_without_email(self, mock_session: Mock) -> None:
account_without_email = Mock()
account_without_email.email = None
@ -181,6 +238,72 @@ class TestWorkflowCommentService:
assert build_payloads_mock.call_args.kwargs["mentioned_user_ids"] == ["user-3"]
dispatch_mock.assert_called_once_with([])
def test_get_comments_preloads_related_accounts(self, mock_session: Mock) -> None:
comment = Mock()
comment.created_by = "user-1"
comment.resolved_by = "user-2"
reply = Mock()
reply.created_by = "user-3"
mention = Mock()
mention.mentioned_user_id = "user-4"
comment.replies = [reply]
comment.mentions = [mention]
comment.cache_created_by_account = Mock()
comment.cache_resolved_by_account = Mock()
reply.cache_created_by_account = Mock()
mention.cache_mentioned_user_account = Mock()
account_1 = Mock()
account_1.id = "user-1"
account_2 = Mock()
account_2.id = "user-2"
account_3 = Mock()
account_3.id = "user-3"
account_4 = Mock()
account_4.id = "user-4"
mock_session.scalars.side_effect = [
_mock_scalars([comment]),
_mock_scalars([account_1, account_2, account_3, account_4]),
]
result = WorkflowCommentService.get_comments("tenant-1", "app-1")
assert result == [comment]
comment.cache_created_by_account.assert_called_once_with(account_1)
comment.cache_resolved_by_account.assert_called_once_with(account_2)
reply.cache_created_by_account.assert_called_once_with(account_3)
mention.cache_mentioned_user_account.assert_called_once_with(account_4)
def test_preload_accounts_returns_early_for_empty_comments(self, mock_session: Mock) -> None:
WorkflowCommentService._preload_accounts(mock_session, [])
mock_session.scalars.assert_not_called()
def test_get_comment_raises_not_found_with_provided_session(self) -> None:
session = Mock()
session.scalar.return_value = None
with pytest.raises(NotFound):
WorkflowCommentService.get_comment("tenant-1", "app-1", "comment-1", session=session)
def test_get_comment_uses_context_manager_when_session_not_provided(self, mock_session: Mock) -> None:
comment = Mock()
comment.created_by = "user-1"
comment.resolved_by = None
comment.replies = []
comment.mentions = []
comment.cache_created_by_account = Mock()
comment.cache_resolved_by_account = Mock()
mock_session.scalar.return_value = comment
mock_session.scalars.return_value = _mock_scalars([])
result = WorkflowCommentService.get_comment("tenant-1", "app-1", "comment-1")
assert result is comment
comment.cache_created_by_account.assert_called_once()
comment.cache_resolved_by_account.assert_called_once_with(None)
def test_delete_comment_raises_forbidden(self, mock_session: Mock) -> None:
comment = Mock()
comment.created_by = "owner"
@ -296,6 +419,29 @@ class TestWorkflowCommentService:
mock_session.commit.assert_called_once()
mock_session.refresh.assert_called_once_with(reply)
def test_update_comment_updates_position_coordinates_when_provided(self, mock_session: Mock) -> None:
comment = Mock()
comment.id = "comment-1"
comment.created_by = "owner"
comment.position_x = 1.0
comment.position_y = 2.0
mock_session.scalar.return_value = comment
mock_session.scalars.return_value = _mock_scalars([])
WorkflowCommentService.update_comment(
tenant_id="tenant-1",
app_id="app-1",
comment_id="comment-1",
user_id="owner",
content="updated",
position_x=10.5,
position_y=20.5,
mentioned_user_ids=[],
)
assert comment.position_x == 10.5
assert comment.position_y == 20.5
def test_delete_reply_raises_forbidden(self, mock_session: Mock) -> None:
reply = Mock()
reply.created_by = "owner"
@ -304,6 +450,12 @@ class TestWorkflowCommentService:
with pytest.raises(Forbidden):
WorkflowCommentService.delete_reply("reply-1", "intruder")
def test_delete_reply_raises_not_found(self, mock_session: Mock) -> None:
mock_session.get.return_value = None
with pytest.raises(NotFound):
WorkflowCommentService.delete_reply("reply-1", "owner")
def test_delete_reply_removes_mentions(self, mock_session: Mock) -> None:
reply = Mock()
reply.created_by = "owner"
@ -314,3 +466,11 @@ class TestWorkflowCommentService:
assert mock_session.delete.call_count == 3
mock_session.commit.assert_called_once()
def test_validate_comment_access_delegates_to_get_comment(self) -> None:
comment = Mock()
with patch.object(WorkflowCommentService, "get_comment", return_value=comment) as get_comment_mock:
result = WorkflowCommentService.validate_comment_access("comment-1", "tenant-1", "app-1")
assert result is comment
get_comment_mock.assert_called_once_with("tenant-1", "app-1", "comment-1")