fix: await cancelled subscription tasks on ws shutdown#4319
Flamefork wants to merge 4 commits into strawberry-graphql:main
Conversation
Reviewer's Guide

Ensures that subscription tasks cancelled during GraphQL over WebSocket shutdown are explicitly awaited so their cleanup/`finally` blocks run before shared resources are torn down, and adds a regression test plus release note for this behavior.
Thanks for adding the `RELEASE.md` file! Here's a preview of the changelog:

Await cancelled subscription tasks during WebSocket shutdown so their cleanup completes before shared resources are torn down.
Hey - I've found 1 issue, and left some high-level feedback:
- In `shutdown`, you collect tasks for all operations before calling `cleanup_operation`, but you rely on `op.task` still being valid; consider iterating `self.operations.values()` directly and/or documenting the invariant that `cleanup_operation` won't replace the task reference so future maintainers don't accidentally break this ordering assumption.
- The test `test_shutdown_awaits_cancelled_subscription_tasks` uses a fixed `asyncio.sleep(0.5)` to wait for shutdown; to avoid flakiness, consider synchronizing on a concrete condition (e.g., polling `cleanup_done_at_shutdown_end` or using an event) instead of a hardcoded delay.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- In `shutdown`, you collect tasks for all operations before calling `cleanup_operation`, but you rely on `op.task` still being valid; consider iterating `self.operations.values()` directly and/or documenting the invariant that `cleanup_operation` won’t replace the task reference so future maintainers don’t accidentally break this ordering assumption.
- The test `test_shutdown_awaits_cancelled_subscription_tasks` uses a fixed `asyncio.sleep(0.5)` to wait for shutdown; to avoid flakiness, consider synchronizing on a concrete condition (e.g., polling `cleanup_done_at_shutdown_end` or using an event) instead of a hardcoded delay.
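The event-based synchronization suggested above could look like the following. This is a minimal sketch with hypothetical names: `tracked_shutdown` and `cleanup_done_at_shutdown_end` stand in for the test's actual hooks, and the 5-second timeout is illustrative.

```python
import asyncio


async def demo() -> int:
    shutdown_done = asyncio.Event()
    results: dict = {}

    async def tracked_shutdown() -> None:
        # Stand-in for the real shutdown work, including the gather
        # over cancelled subscription tasks.
        await asyncio.sleep(0.01)
        results["cleanup_done_at_shutdown_end"] = 0
        shutdown_done.set()

    asyncio.create_task(tracked_shutdown())
    # Deterministic sync point: returns as soon as shutdown finishes,
    # and fails loudly after the timeout instead of passing flakily.
    await asyncio.wait_for(shutdown_done.wait(), timeout=5)
    return results["cleanup_done_at_shutdown_end"]
```

The test then only fails when shutdown genuinely never completes, rather than whenever CI is slower than the hardcoded sleep.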
## Individual Comments
### Comment 1
<location path="strawberry/subscriptions/protocols/graphql_transport_ws/handlers.py" line_range="95-104" />
<code_context>
     with suppress(asyncio.CancelledError):
         await self.connection_init_timeout_task
+    cancelled_tasks: list[asyncio.Task] = []
     for operation_id in list(self.operations.keys()):
+        op = self.operations[operation_id]
+        if op.task:
+            cancelled_tasks.append(op.task)
         await self.cleanup_operation(operation_id)
+
     await self.reap_completed_tasks()
+    # cleanup_operation cancels but does not await tasks (would block
+    # the message loop). Safe to await here — no more messages to process.
+    await asyncio.gather(*cancelled_tasks, return_exceptions=True)

 def on_request_accepted(self) -> None:
</code_context>
<issue_to_address>
**issue (bug_risk):** Consider a timeout or bounded wait for cancelled tasks during shutdown.
Because `cleanup_operation` may cancel tasks that never finish (e.g., stuck in uninterruptible I/O or user code that ignores cancellation), this `asyncio.gather` can block shutdown indefinitely. Consider bounding the wait with `asyncio.wait_for` and a reasonable timeout, then logging and proceeding with shutdown if the timeout is hit.
</issue_to_address>
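One way to bound the wait, as the comment suggests, is `asyncio.wait` with a timeout. This is a sketch, not the PR's code; the function name and the 5-second default are illustrative.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def reap_cancelled_tasks_bounded(
    cancelled_tasks: list, timeout: float = 5.0
) -> None:
    """Await cancelled tasks so their cleanup runs, but never hang shutdown."""
    if not cancelled_tasks:
        return
    # asyncio.wait never raises the member tasks' exceptions; it just
    # partitions them into done/pending when the timeout elapses.
    done, pending = await asyncio.wait(cancelled_tasks, timeout=timeout)
    if pending:
        # Tasks stuck in uninterruptible I/O or swallowing CancelledError:
        # log and proceed with shutdown rather than blocking forever.
        logger.warning(
            "%d subscription task(s) did not finish within %.1fs; "
            "proceeding with shutdown",
            len(pending),
            timeout,
        )
```

Unlike `asyncio.gather(..., return_exceptions=True)`, this cannot block indefinitely on a task that ignores cancellation; well-behaved tasks still get their `finally` blocks run before shutdown returns.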
Greptile Summary

This patch fixes a race condition where subscription tasks' `finally` blocks could run after shared resources (DB pools, event loop) were already torn down during WebSocket shutdown.

The fix is minimal and correct: task references are collected before each `cleanup_operation` call, then awaited together via `asyncio.gather(..., return_exceptions=True)` once all operations are cleaned up.
Confidence Score: 4/5
Sequence Diagram

sequenceDiagram
participant WS as WebSocket Client
participant H as BaseGraphQLTransportWSHandler
participant T as Subscription Task(s)
participant EL as Event Loop
WS->>H: disconnect
H->>H: shutdown()
Note over H: Collect op.task refs → cancelled_tasks[]
loop For each active operation
H->>T: cleanup_operation() → task.cancel()
end
H->>H: reap_completed_tasks()<br/>(awaits already-finished tasks)
H->>EL: asyncio.gather(*cancelled_tasks,<br/>return_exceptions=True)
EL->>T: raise CancelledError
T->>T: finally: active_subscriptions -= 1
T-->>EL: done
EL-->>H: gather returns
Note over H: All finally blocks complete<br/>before shutdown() returns
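The ordering the diagram shows, with `finally` blocks completing before `shutdown()` returns, can be reproduced in isolation. This is a minimal sketch, not Strawberry's actual handler code:

```python
import asyncio


async def demo() -> list:
    order = []

    async def subscription() -> None:
        try:
            await asyncio.sleep(60)  # stand-in for a long-lived subscription
        finally:
            order.append("finally ran")  # cleanup that must precede teardown

    task = asyncio.create_task(subscription())
    await asyncio.sleep(0)  # let the subscription start
    task.cancel()           # like cleanup_operation: cancel, don't await
    # Without this gather, "teardown" would be recorded first, because
    # CancelledError is only delivered when the task next runs.
    await asyncio.gather(task, return_exceptions=True)
    order.append("teardown")
    return order
```

`return_exceptions=True` matters here: it makes `gather` return the `CancelledError` in its result list instead of re-raising it into `shutdown()`.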
Last reviewed commit: "Fix formatting"
    await asyncio.sleep(0.5)
Short sleep may cause intermittent failures
The existing similar test `test_unexpected_client_disconnects_are_gracefully_handled` uses `asyncio.sleep(1)` to wait for server-side shutdown. Using only 0.5 seconds here could make this test intermittently fail on slow CI systems, since the `tracked_shutdown` coroutine must fully complete (including the new `asyncio.gather` over cancelled tasks) within that window.
Suggested change:
-    await asyncio.sleep(0.5)
+    await asyncio.sleep(1)
Pull request overview
Fixes graphql-transport-ws shutdown behavior so cancelled subscription tasks are awaited during WebSocket teardown, ensuring their finally blocks complete before shared resources (e.g., DB pools / event loop) are torn down.
Changes:
- Track cancelled subscription operation tasks and await them during handler shutdown.
- Add a regression test asserting subscription cleanup has completed by the end of shutdown.
- Add a patch release note describing the fix.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| `strawberry/subscriptions/protocols/graphql_transport_ws/handlers.py` | Adds tracking for cancelled tasks and awaits them during `shutdown()` via a new `reap_cancelled_tasks()` method. |
| `tests/websockets/test_graphql_transport_ws.py` | Adds a regression test to ensure shutdown waits for subscription cancellation cleanup. |
| `RELEASE.md` | Documents the patch release change. |
    async def cleanup_operation(self, operation_id: str) -> None:
        if operation_id not in self.operations:
            return
        operation = self.operations.pop(operation_id)
        assert operation.task
        operation.task.cancel()
        self.cancelled_tasks.append(operation.task)
        # do not await the task here, lest we block the main message loop
cleanup_operation appends every cancelled operation task to self.cancelled_tasks, but reap_cancelled_tasks() is only called during shutdown(). For long-lived WebSocket connections that start/stop many subscriptions, this list can grow without bound (and can retain already-finished tasks that were reaped via reap_completed_tasks), causing a memory leak. Consider avoiding the instance-level list by collecting tasks locally inside shutdown() before calling cleanup_operation, or ensure cancelled_tasks is periodically drained without blocking the message loop (e.g., keep only pending tasks, and opportunistically reap task.done() ones in handle_message like reap_completed_tasks does).
This is something that I noticed with the current code. It is not an issue with cancelled_tasks only, but completed_tasks as well.
Maybe we should consider creating a periodic task that cleans up those?
Will discuss this tomorrow with @patrick91, but maybe it can be done in a follow-up PR
we could use task.add_done_callback (let's do this in a new PR)
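The `task.add_done_callback` idea mentioned above could look like this. It is a sketch with a hypothetical `TaskRegistry` class, not the follow-up PR's actual code:

```python
import asyncio


class TaskRegistry:
    """Hold strong references to in-flight tasks; drop them on completion."""

    def __init__(self) -> None:
        self._tasks: set = set()

    def track(self, task: asyncio.Task) -> None:
        self._tasks.add(task)
        # The callback fires when the task finishes (normally, with an
        # exception, or via cancellation), so the set never grows without
        # bound on long-lived connections.
        task.add_done_callback(self._tasks.discard)

    def __len__(self) -> int:
        return len(self._tasks)
```

This removes the need for explicit `reap_completed_tasks`/`reap_cancelled_tasks` sweeps: the registry prunes itself, and shutdown only has to await whatever is still tracked.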
    await asyncio.sleep(0.5)

    assert handler is not None
    assert cleanup_done_at_shutdown_end == 0
The shutdown sync point is currently a fixed await asyncio.sleep(0.5). This can be flaky on slow CI (shutdown may not have run yet, leaving cleanup_done_at_shutdown_end as None). Prefer synchronizing deterministically (e.g., set an asyncio.Event in tracked_shutdown and await asyncio.wait_for(event.wait(), timeout=...), or poll cleanup_done_at_shutdown_end is not None with wait_for) so the test fails only when the shutdown invariant is actually broken.
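A polling variant of that deterministic sync could use a small helper. The `wait_until` name is hypothetical; in the real test the predicate would check `cleanup_done_at_shutdown_end is not None`:

```python
import asyncio


async def wait_until(predicate, timeout: float = 5.0, interval: float = 0.01) -> None:
    """Poll until predicate() is truthy; raise TimeoutError at the deadline."""

    async def _poll() -> None:
        while not predicate():
            await asyncio.sleep(interval)

    # The test now fails only when the shutdown invariant is actually
    # broken, instead of whenever CI happens to be slow.
    await asyncio.wait_for(_poll(), timeout)
```
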
Hi @Flamefork, based on #4319 (comment), I ended up making this change to fix the memory leak: #4345. Maybe it also fixes the issue here? If not, could you rebase this PR to adjust it to the new code?

Description
`cleanup_operation` cancels subscription tasks but does not await them (to avoid blocking the message loop). During WebSocket shutdown this meant tasks' `finally` blocks could run after shared state (DB pools, event loop) was already torn down.

This fix collects cancelled tasks during shutdown and awaits them via `asyncio.gather` before returning.

Types of Changes
Issues Fixed or Closed by This PR
Checklist
Summary by Sourcery
Ensure WebSocket shutdown waits for cancelled subscription tasks so their cleanup completes before shared resources are torn down.
Bug Fixes:
- Await cancelled subscription tasks during WebSocket shutdown so their cleanup completes before shared resources are torn down.

Documentation:
- Add a patch release note describing the fix.

Tests:
- Add a regression test asserting subscription cleanup has completed by the end of shutdown.