Fix #3680: MaskErrors does not mask errors for subscriptions#4301
Fix #3680: MaskErrors does not mask errors for subscriptions#4301Ladol wants to merge 6 commits intostrawberry-graphql:mainfrom
MaskErrors does not mask errors for subscriptions#4301Conversation
Reviewer's GuideEnsures WebSocket subscription results are passed through schema extensions (e.g., MaskErrors) before being sent to clients, by invoking extension _process_result hooks for graphql-ws and graphql-transport-ws subscription payloads and adding regression tests and release notes. File-Level Changes
Possibly linked issues
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
|
Thanks for adding the Here's a preview of the changelog: Fixes an issue where schema extensions (like DescriptionFixes an issue where schema extensions (such as Previously, standard Queries and Mutations would pass their results through the extension pipeline, but Subscriptions would send raw Migration guideNo migration required. Types of Changes
Checklist
Here's the tweet text: |
|
Thanks for adding the Here's a preview of the changelog: Fixes an issue where schema extensions (like DescriptionFixes an issue where schema extensions (such as Previously, standard Queries and Mutations would pass their results through the extension pipeline, but Subscriptions would send raw Migration guideNo migration required. Types of Changes
Checklist
Here's the tweet text: |
There was a problem hiding this comment.
Hey - I've found 1 issue, and left some high level feedback:
- The extension processing logic is duplicated between
graphql_transport_ws(_process_extensions) andgraphql_ws.send_data_message; consider extracting a shared helper or aligning them through a common abstraction so the two protocols stay in sync as behavior evolves. - The current approach instantiates extensions (
ext()whenisinstance(ext, type)) on each result, which may diverge from how extensions are normally constructed and maintain state in the HTTP pipeline; it would be safer to reuse the same extension instances and lifecycle that are used for non-subscription operations. - In
send_nextyou only run extensions whenexecution_result.errorsis truthy, but other code paths may run_process_resultfor all results; consider matching the existing extension pipeline behavior so subscriptions are processed consistently with queries/mutations.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The extension processing logic is duplicated between `graphql_transport_ws` (`_process_extensions`) and `graphql_ws.send_data_message`; consider extracting a shared helper or aligning them through a common abstraction so the two protocols stay in sync as behavior evolves.
- The current approach instantiates extensions (`ext()` when `isinstance(ext, type)`) on each result, which may diverge from how extensions are normally constructed and maintain state in the HTTP pipeline; it would be safer to reuse the same extension instances and lifecycle that are used for non-subscription operations.
- In `send_next` you only run extensions when `execution_result.errors` is truthy, but other code paths may run `_process_result` for all results; consider matching the existing extension pipeline behavior so subscriptions are processed consistently with queries/mutations.
## Individual Comments
### Comment 1
<location path="strawberry/subscriptions/protocols/graphql_transport_ws/handlers.py" line_range="375-377" />
<code_context>
self.completed = False
self.task: asyncio.Task | None = None
+ def _process_extensions(self, execution_result: ExecutionResult) -> None:
+ """Run the execution result through any active schema extensions."""
+ if not execution_result.errors:
+ return
+
</code_context>
<issue_to_address>
**suggestion (bug_risk):** The early-return on `execution_result.errors` doesn’t match the docstring and may skip useful extension hooks for successful results.
Right now, extensions won’t run for successful results, which contradicts the stated behavior and may break extensions that rely on seeing all results (e.g., logging/metrics/tracing). Consider either removing the `if not execution_result.errors` check or updating the docstring/contract to clarify that extensions only run on error cases.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
strawberry/subscriptions/protocols/graphql_transport_ws/handlers.py
Outdated
Show resolved
Hide resolved
Greptile SummaryThis PR fixes a long-standing bug where schema extensions such as What changed:
Remaining concerns (not fully resolved):
Confidence Score: 3/5
Important Files Changed
Sequence DiagramsequenceDiagram
participant Client
participant WSHandler as WS Handler<br/>(graphql_transport_ws / graphql_ws)
participant Schema as schema.subscribe()
participant ExtRunner as SchemaExtensionsRunner
participant OpExt as operation_extensions<br/>(build_operation_extensions)
participant MaskErrors
Client->>WSHandler: subscribe / start message
WSHandler->>OpExt: build_operation_extensions(schema.extensions)
Note over OpExt: Class-based: new instance (execution_context=None)<br/>Instance-based: same object as ExtRunner uses
WSHandler->>Schema: schema.subscribe(query, ...)
Schema->>ExtRunner: extensions_runner.operation() begin
loop Each subscription event
Schema-->>WSHandler: yield ExecutionResult
WSHandler->>OpExt: ext._process_result(result) [NEW]
OpExt->>MaskErrors: _process_result(result) — masks errors ✓
WSHandler->>Client: send next/data message (masked)
end
Schema->>ExtRunner: extensions_runner.operation() end (post-yield)
ExtRunner->>MaskErrors: on_operation post-yield → _process_result(last result) [DUPLICATE for instance-based]
Note over MaskErrors: Harmless for MaskErrors (idempotent),<br/>but a trap for stateful extensions
Client->>WSHandler: stop / close
WSHandler->>OpExt: cleanup — pop operation_extensions
Reviews (6): Last reviewed commit: "refactor: scope extension instantiation ..." | Re-trigger Greptile |
| def _process_extensions(self, execution_result: ExecutionResult) -> None: | ||
| """Run the execution result through any active schema extensions.""" | ||
| if not execution_result.errors: | ||
| return | ||
|
|
||
| extensions = getattr(self.handler.schema, "extensions", []) | ||
| for ext in extensions: | ||
| extension_instance = ext() if isinstance(ext, type) else ext | ||
| if hasattr(extension_instance, "_process_result"): | ||
| extension_instance._process_result(execution_result) |
There was a problem hiding this comment.
Freshly created instances never receive execution_context
The ext() call creates a brand-new extension instance, but the normal execution path (see schema.py line 591) explicitly sets extension.execution_context = execution_context on every instance after creation. This step is skipped here, so any extension whose _process_result accesses self.execution_context will raise an AttributeError at runtime.
MaskErrors._process_result happens to not use self.execution_context, which is why it works in the tests, but this is fragile for other extensions.
The correct approach is to reuse the already-configured extension instances from the current request's SchemaExtensionsRunner, or — at minimum — assign execution_context to each freshly created instance before calling _process_result.
def _process_extensions(self, execution_result: ExecutionResult) -> None:
"""Run the execution result through any active schema extensions."""
if not execution_result.errors:
return
extensions = getattr(self.handler.schema, "_async_extensions", [])
for extension_instance in extensions:
if hasattr(extension_instance, "_process_result"):
extension_instance._process_result(execution_result)| if execution_result.errors: | ||
| extensions = getattr(self.schema, "extensions", []) | ||
| for ext in extensions: | ||
| extension_instance = ext() if isinstance(ext, type) else ext | ||
| if hasattr(extension_instance, "_process_result"): | ||
| extension_instance._process_result(execution_result) |
There was a problem hiding this comment.
Same execution_context omission as in graphql_transport_ws
This inline block has the same problem: ext() creates a fresh extension instance with no execution_context, while the standard path sets extension.execution_context = execution_context before any lifecycle method is called. Extensions that access self.execution_context inside _process_result will fail with AttributeError.
Additionally, the logic is duplicated inline here rather than extracted to a shared helper (as was done in the graphql_transport_ws handler with _process_extensions), making future maintenance harder.
| if not execution_result.errors: | ||
| return |
There was a problem hiding this comment.
Early return silently skips non-error results for all extensions
The guard if not execution_result.errors: return prevents _process_result from ever being called on results that have no errors. While MaskErrors._process_result itself has the same guard internally, this outer early exit means any other extension whose _process_result needs to inspect or transform non-error results will be silently skipped.
The normal execution path (via on_operation) calls _process_result unconditionally and lets each extension decide what to do. Consider removing the guard here and letting each extension handle its own filtering, for consistency.
|
|
||
| @patch.object(MyExtension, "_process_result", create=True) | ||
| async def test_subscription_errors_trigger_extension_process_result( | ||
| mock: Mock, ws: WebSocketClient | ||
| ): | ||
| """Test that schema extensions are called to process results when a subscription yields an error.""" | ||
| await ws.send_message( | ||
| { | ||
| "id": "sub1", | ||
| "type": "subscribe", | ||
| "payload": { | ||
| "query": 'subscription { exception(message: "TEST EXC") }', | ||
| }, | ||
| } | ||
| ) | ||
|
|
||
| next_message: NextMessage = await ws.receive_json() | ||
|
|
||
| assert next_message["type"] == "next" | ||
| assert next_message["id"] == "sub1" | ||
| assert "errors" in next_message["payload"] | ||
|
|
||
| # Error intercepted and extension called |
There was a problem hiding this comment.
Test validates call count but not actual masking behaviour
The test patches _process_result with a no-op mock and asserts it was called once. This confirms the hook fires, but does not verify that error masking actually works end-to-end (e.g., that the error message is replaced with "Unexpected error." and that original exception details are not leaked to the client).
Consider adding a complementary integration test that uses a real MaskErrors extension and asserts the response contains the masked message rather than the raw exception text. This would catch regressions like the configuration-loss issue described in the handler comment.
Also note there is a missing blank line before the @patch.object decorator (PEP 8 E302).
| @patch.object(MyExtension, "_process_result", create=True) | |
| async def test_subscription_errors_trigger_extension_process_result( | |
| mock: Mock, ws: WebSocketClient | |
| ): | |
| """Test that schema extensions are called to process results when a subscription yields an error.""" | |
| await ws.send_message( | |
| { | |
| "id": "sub1", | |
| "type": "subscribe", | |
| "payload": { | |
| "query": 'subscription { exception(message: "TEST EXC") }', | |
| }, | |
| } | |
| ) | |
| next_message: NextMessage = await ws.receive_json() | |
| assert next_message["type"] == "next" | |
| assert next_message["id"] == "sub1" | |
| assert "errors" in next_message["payload"] | |
| # Error intercepted and extension called | |
| @patch.object(MyExtension, "_process_result", create=True) |
needs two blank lines after the previous test function body.
|
I have updated the PR, addressing some of the issues pointed out by the automatic reviews. |
…bscriptions Fixes an issue where schema extensions (like `MaskErrors`) were bypassed during WebSocket subscriptions. The extensions' `_process_result` hooks are now properly triggered for each yielded result in both `graphql-transport-ws` and `graphql-ws` protocols, ensuring errors are correctly formatted before being sent to the client.
|
@sourcery-ai review |
strawberry/subscriptions/utils.py
Outdated
| ) -> None: | ||
| """Run the execution result through active schema extensions.""" | ||
| for ext in extensions: | ||
| extension_instance = ext() if isinstance(ext, type) else ext |
There was a problem hiding this comment.
ext() call raises TypeError for extensions with required constructor args
process_extensions calls ext() with no arguments when the extension is a class. However, schema.get_extensions() (the authoritative path) calls ext(execution_context=None) to account for extensions like ApolloTracingExtension that declare execution_context as a required positional parameter:
class ApolloTracingExtension(SchemaExtension):
def __init__(self, execution_context: ExecutionContext) -> None:
...Calling ext() on such an extension raises TypeError: __init__() missing 1 required positional argument: 'execution_context', crashing the subscription handler entirely. This is a regression: before this PR, subscriptions simply bypassed extensions; now they can hard-crash for any user with ApolloTracingExtension (or a custom extension with a required constructor arg) in their schema.
The fix is to align with schema.get_extensions() and pass execution_context=None:
| extension_instance = ext() if isinstance(ext, type) else ext | |
| extension_instance = ext(execution_context=None) if isinstance(ext, type) else ext |
strawberry/subscriptions/utils.py
Outdated
| for ext in extensions: | ||
| extension_instance = ( | ||
| ext(execution_context=None) if isinstance(ext, type) else ext | ||
| ) | ||
|
|
||
| if hasattr(extension_instance, "_process_result"): | ||
| extension_instance._process_result(execution_result) |
There was a problem hiding this comment.
Class-type extensions with custom
__init__ will crash at runtime
When an extension is provided as a class (rather than an instance), the code calls ext(execution_context=None). This silently fails for any extension that overrides __init__ without an execution_context parameter. A concrete example is MaskErrors itself:
class MaskErrors(SchemaExtension):
def __init__(self, should_mask_error=..., error_message=...) -> None: ...Calling MaskErrors(execution_context=None) raises:
TypeError: __init__() got an unexpected keyword argument 'execution_context'
The end-to-end test avoids this by always passing a pre-constructed instance (extensions=[MaskErrors(...)]), so the bug path (isinstance(ext, type) → True) is never exercised by the current tests. Any user who passes extensions=[MaskErrors] (i.e. the class, without parentheses) will get an unhandled exception on the first subscription event.
The proper fix is to skip class-type extensions entirely here and only process instance-type extensions — or, better, reuse the already-instantiated extensions from the SchemaExtensionsRunner for the current request.
strawberry/subscriptions/utils.py
Outdated
| for ext in extensions: | ||
| if isinstance(ext, type): | ||
| try: | ||
| # Try passing the context for extensions like ApolloTracing | ||
| extension_instance = ext(execution_context=None) | ||
| except TypeError: | ||
| # Fallback for extensions like MaskErrors that don't want it | ||
| extension_instance = ext() | ||
|
|
||
| # Explicitly set this ONLY for newly constructed instances | ||
| extension_instance.execution_context = None | ||
| else: | ||
| extension_instance = ext |
There was a problem hiding this comment.
Instance-based extensions are double-processed on the last subscription event
When an extension is stored as an instance (the else branch, e.g. extensions=[MaskErrors(error_message="Custom")]), process_extensions reuses the same object that SchemaExtensionsRunner also holds, because schema.get_extensions() returns instances as-is.
Tracing the subscription lifecycle in schema.py:
async with extensions_runner.operation(): # ← wraps the whole subscription
async for result in aiter_or_result:
yield await self._handle_execution_result(...) # sets context.result = resultextensions_runner.operation() does not exit between yield calls — it only exits after the generator is exhausted. At that point, each extension's on_operation post-yield runs. For MaskErrors that means:
def on_operation(self):
yield
result = self.execution_context.result # ← the last result
self._process_result(result) # ← 2nd call on the last resultThe sequence for the final subscription event is therefore:
send_next/send_data_message→process_extensions→_process_result(last_result)(first call)- Subscription ends →
operation()exits →on_operationpost-yield →_process_result(context.result)(second call on the same object)
For MaskErrors the double-call is idempotent (re-masking an already-masked error is a no-op). But any extension that has observable side-effects in _process_result — writing to a log, incrementing a counter, appending audit data — will fire twice for the final event and once for every preceding event, producing an inconsistent per-event record.
The safest fix is to call _process_result directly through the existing extensions_runner (which already has properly initialised instances) rather than re-reading schema.extensions and constructing separate instances.
| if isinstance(ext, type): | ||
| try: | ||
| # Try passing the context for extensions like ApolloTracing | ||
| extension_instance = ext(execution_context=None) | ||
| except TypeError: | ||
| # Fallback for extensions like MaskErrors that don't want it | ||
| extension_instance = ext() |
There was a problem hiding this comment.
Broad
except TypeError can swallow unrelated constructor failures
The outer try/except TypeError is intended only to detect extensions whose __init__ does not accept an execution_context keyword argument. However, it also silently catches any TypeError raised inside the constructor body (e.g. a type mismatch in dependency injection logic, a wrong internal call, etc.).
When that happens the code falls back to ext(), which may also raise — or worse, may succeed but silently produce a misconfigured instance. The real constructor error is never surfaced.
A tighter guard would inspect the signature before calling, rather than catching the exception after:
import inspect
sig = inspect.signature(ext.__init__)
if "execution_context" in sig.parameters:
extension_instance = ext(execution_context=None)
else:
extension_instance = ext()This keeps the intent explicit and lets any other TypeError propagate normally.
| if isinstance(ext, type): | ||
| # Inspect the constructor to see if it requires execution_context | ||
| sig = inspect.signature(ext.__init__) | ||
| if "execution_context" in sig.parameters: | ||
| extension_instance = ext(execution_context=None) | ||
| else: | ||
| extension_instance = ext() | ||
|
|
||
| # Explicitly set this ONLY for newly constructed instances | ||
| extension_instance.execution_context = None |
There was a problem hiding this comment.
Extension instantiated on every subscription event
For class-based extensions, inspect.signature(ext.__init__) is called and a fresh instance is constructed on every invocation of process_extensions — i.e., for every individual subscription result. This means:
inspect.signatureintrospection runs on each subscription event, which is non-trivial for high-frequency streams.- Any class-based extension that accumulates mutable state (e.g., a counter, a timer, a deduplication set) will silently lose that state between events because a fresh object is handed each event.
The resolved extension instances (with proper execution_context) are already available inside schema._subscribe via the extensions_runner. A more robust and efficient fix would be to call _process_result on those already-configured instances rather than re-resolving and re-constructing from schema.extensions on every event.
If the current approach is kept, consider at minimum caching the per-schema extension list once (not per event) to avoid repeated signature inspection:
# Computed once per subscription start, not once per event
extension_instances = _build_extension_instances(schema.extensions)| async def send_next(self, execution_result: ExecutionResult) -> None: | ||
| extensions = getattr(self.handler.schema, "extensions", []) | ||
| process_extensions(execution_result, extensions) |
There was a problem hiding this comment.
schema.extensions does not reflect the per-request extension runner
getattr(self.handler.schema, "extensions", []) returns the raw list that was originally passed to strawberry.Schema(extensions=[...]). However, schema.subscribe() builds its extensions_runner via schema._async_extensions, which:
- Instantiates class-based extensions fresh and sets
execution_contexton them. - Also adds
DirectivesExtensionwhenschema.directivesis non-empty — an extension that is completely absent fromschema.extensions.
Because process_extensions bypasses the per-request runner, any extension added only inside get_extensions() (like DirectivesExtension) will never have its _process_result called, and class-based extensions receive a freshly constructed instance rather than the one already wired to the current ExecutionContext.
The structurally correct fix is to thread the SchemaExtensionsRunner that _subscribe already has through to send_next, so that _process_result is called on the same, already-configured instances that the rest of the operation lifecycle uses.
| else: | ||
| extension_instance = ext() |
There was a problem hiding this comment.
Class-based extensions with constructor args silently drop configuration
When a class-based extension is instantiated here via ext() (the branch where execution_context is absent from its signature), any constructor parameters that the extension normally accepts are silently omitted.
A concrete case: suppose a user registers a custom extension as a class with a default-valued argument that controls behaviour:
class RateLimitExtension(SchemaExtension):
def __init__(self, max_errors: int = 10) -> None:
self.max_errors = max_errorsbuild_operation_extensions will call RateLimitExtension() and get an instance configured with max_errors=10, regardless of what the original entry in schema.extensions was. More critically, schema.extensions stores the class itself — not a configured instance — so there is no way to recover the user-intended configuration from the class alone.
The safest fix for the general case is to avoid re-instantiation entirely: run schema.get_extensions() once per operation (as the normal query/mutation path does) so that Strawberry's own instantiation logic is reused, and then call _process_result on those instances instead of creating a separate set. If per-operation instantiation must stay, a clarifying note that only zero-argument (or execution_context-only) class-based extensions are supported would prevent silent misconfiguration.
| else: | ||
| instances.append(ext) |
There was a problem hiding this comment.
Instance-based extensions get
_process_result called from two places
When an extension is registered as a pre-constructed instance (e.g. extensions=[MaskErrors(error_message="Custom")]), build_operation_extensions returns the same object that schema._async_extensions also returns. That same object is passed to SchemaExtensionsRunner inside schema.subscribe(), whose operation() context manager's post-yield code calls _process_result again (via MaskErrors.on_operation) once the subscription generator is exhausted.
The result:
- For every subscription event except the last, only the new code path calls
_process_result— correct. - For the last event,
_process_resultis called once here (before the result is sent — correct) and then a second time byon_operation's post-yield after the result has already been transmitted — harmless forMaskErrors(idempotent masking), but incorrect for any extension that performs a non-idempotent transformation or side-effect (e.g. incrementing a counter, writing to a log with the original message).
Because MaskErrors is stateless this passes today's tests, but it creates a latent correctness trap for custom extensions. The cleanest resolution is to reuse the SchemaExtensionsRunner that schema.subscribe() already holds, so there is a single, authoritative call site for _process_result.
…erations Extensions are now built once per operation and stored on the operation itself.
|
After working through the bot's feedback, I've updated the code to set up extensions for each individual WebSocket operation. The bot pointed out two problems with this PR:
I believe fixing these problems is beyond the scope of this PR, as it would require changes to the core schema.subscribe generator. Instead of hardcoding a special rule just for MaskErrors (as the framework shouldn't care about what extensions exist), I added a comment in utils.py documenting these limitations. |
|
Subsumed by #4330 |

Release type: patch
Fixes an issue where schema extensions (like
MaskErrors) were bypassed during WebSocket subscriptions. The extensions'_process_resulthooks are now properly triggered for each yielded result in bothgraphql-transport-wsandgraphql-wsprotocols, ensuring errors are correctly formatted before being sent to the client.Description
Fixes an issue where schema extensions (such as
MaskErrors) were being bypassed when streaming data over WebSockets.Previously, standard Queries and Mutations would pass their results through the extension pipeline, but Subscriptions would send raw
ExecutionResultobjects directly over the WebSocket. This caused internal/unmasked errors to leak to the client. This PR manually triggers_process_resulton active extensions right beforesend_nextandsend_data_messagedispatch the payload.Migration guide
No migration required.
Types of Changes
Checklist
Summary by Sourcery
Ensure WebSocket GraphQL subscriptions pass execution results through schema extensions so errors are consistently processed before being sent to clients.
Bug Fixes:
Documentation:
Tests: