
feat(langgraph): add streaming transformer infrastructure and tests #7519

Open
Nick Hollon (nick-hollon-lc) wants to merge 11 commits into main from nh/streaming-transformer

Conversation

Nick Hollon (nick-hollon-lc) commented Apr 15, 2026

Introduces the StreamingHandler, StreamMux, EventLog, StreamChannel, and StreamTransformer abstractions for ergonomic streaming projections over compiled graphs. Includes ValuesTransformer and MessagesTransformer as built-in native projections, plus support for user-defined custom transformers.

examples/streaming-v2/steaming_comparison.py shows a side-by-side comparison of the new streaming approach and the old one.

Introduces the StreamingHandler, StreamMux, EventLog, StreamChannel, and StreamTransformer abstractions for ergonomic streaming projections over compiled graphs. Includes ValuesTransformer and MessagesTransformer as built-in native projections, plus support for user-defined custom transformers.

Add timestamp (ms epoch) to event params and eventId to the event envelope, aligning the in-process event shape with the protocol spec. Update tests to include timestamps and verify their presence.

Side-by-side comparison of token-level LLM streaming using v1 graph.stream() and v2 StreamingHandler, both sync and async. Includes a TokenMetrics custom transformer to demonstrate extensibility vs the equivalent inline bookkeeping in v1.

Separate EventLog (sync, __iter__) and AsyncEventLog (async, __aiter__) with a shared _EventLogBase for the producer API. Thread is_async through StreamMux, StreamChannel, and transformers so the right log type is created based on whether stream() or astream() is called. Prevents accidentally mixing sync and async iteration on the same log.

Replace the daemon thread pump with a pull-based model where the caller's iteration on any projection drives the graph forward. EventLog uses a _request_more callback instead of threading.Condition. Matches v1's model where the caller's for loop is the pump. Async path is unchanged (background task on the event loop).

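The pull-based model above can be illustrated with a toy version. PullLog and make_producer are invented names for illustration only; the real EventLog described in this PR is more involved:

```python
from collections import deque


class PullLog:
    """Toy log where consumer iteration drives the producer via a callback."""

    def __init__(self, request_more):
        self._buf = deque()
        self._done = False
        self._request_more = request_more  # pulls one step from the producer

    def append(self, item):
        self._buf.append(item)

    def close(self):
        self._done = True

    def __iter__(self):
        while True:
            while not self._buf:
                if self._done:
                    return
                self._request_more()  # the caller's for loop is the pump
            yield self._buf.popleft()


def make_producer(log, steps=3):
    """Toy producer: emits `steps` integers, then closes the log."""
    state = {"i": 0}

    def step():
        if state["i"] < steps:
            log.append(state["i"])
            state["i"] += 1
        else:
            log.close()

    return step


log = PullLog(lambda: None)          # placeholder callback; rebound below
log._request_more = make_producer(log)
assert list(log) == [0, 1, 2]        # iteration alone pulled the producer forward
```

No background thread exists here: if nobody iterates, nothing runs, which is the property the commit message attributes to v1's model.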
Merge EventLog and AsyncEventLog into a single class with a _bind()
mechanism. EventLog starts unbound; the StreamMux calls _bind(is_async)
after transformer registration so only the correct iteration protocol
is available. This removes the is_async parameter from EventLog,
StreamChannel, and all transformer constructors — transformers just
create EventLog() and never need to know whether they run in sync or
async context.
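The _bind() mechanism described in the last commit can be sketched roughly like this. The class shape and names are inferred from the commit message, not taken from the actual diff:

```python
from collections import deque
from typing import Any, Optional


class EventLog:
    """Sketch: append-only log whose iteration protocol is chosen at _bind() time."""

    def __init__(self) -> None:
        self._buf: deque = deque()
        self._is_async: Optional[bool] = None  # unbound until the mux calls _bind()

    def _bind(self, is_async: bool) -> None:
        # Called once by the mux after transformer registration.
        if self._is_async is not None:
            raise RuntimeError("EventLog is already bound")
        self._is_async = is_async

    def append(self, event: Any) -> None:
        self._buf.append(event)

    def __iter__(self):
        # Sync iteration is only legal once the log is bound for sync use.
        if self._is_async is not False:
            raise RuntimeError("EventLog not bound for sync iteration")
        while self._buf:
            yield self._buf.popleft()
```

In this shape a transformer just constructs EventLog(), and the mux calls _bind(False) from stream() or _bind(True) from astream(), so only the correct iteration protocol is ever reachable.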
@@ -0,0 +1,499 @@
"""Compare token-level LLM streaming: v1 graph.stream() vs v2 StreamingHandler.
Contributor


This seems out of place in the examples folder. Better suited to docs? Or something condensed in PR description?

Author


Good call, yeah, this can go in the PR description!

def _get_llm() -> ChatOpenAI:
    global _llm
    if _llm is None:
        _llm = ChatOpenAI(model="gpt-4o-mini", streaming=True)
Contributor


Should verify that streaming works without streaming=True

also nitpick but gpt-4o-mini is an old model

"""Wake waiting consumers. Overridden by subclasses."""


class EventLog(_EventLogBase[T]):
Contributor


We discussed using built-in abstractions for in-process, do we like EventLog in this context still?

"""Wake waiting consumers. Overridden by subclasses."""


class EventLog(_EventLogBase[T]):
Contributor


if EventLog gets big, can we / do we flush it?

Author


Not yet, but we could add some buffering to it.
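One toy shape such buffering could take (purely hypothetical, not part of this PR): bound the log with a fixed capacity, evict the oldest entries, and count evictions so consumers can detect gaps:

```python
from collections import deque


class BoundedLog:
    """Hypothetical bounded event buffer: evicts oldest entries past capacity."""

    def __init__(self, maxlen: int = 1024):
        self._buf = deque(maxlen=maxlen)
        self._dropped = 0

    def append(self, event):
        if len(self._buf) == self._buf.maxlen:
            self._dropped += 1  # track evictions so consumers can detect gaps
        self._buf.append(event)
```

The trade-off is that slow consumers can observe gaps, so the real design would likely only evict entries every cursor has already passed.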

run = handler.stream(input_data)

# run.messages yields (chunk, metadata) tuples as the LLM produces them
for chunk, metadata in run.messages:
Contributor


how does parallel consumption look? if we want to get tokens but also state updates relative to each other during a stream

Author


don't have this yet but maybe something like:

# All root-namespace events, interleaved
for method, data in run.events():
    if method == "values":
        ...
    elif method == "messages":
        ...

# Or filtered to just the modes you care about
for method, data in run.events("values", "messages"):
    ...

@@ -0,0 +1,21 @@
"""Streaming infrastructure for LangGraph.

Provides a ``StreamingHandler`` that wraps a compiled graph and exposes
Contributor


(here and elsewhere) check intended formatting for docstrings, I don't think we use sphinx anymore?

…rrupted/interrupts

Three usability fixes:

- Mux now auto-closes/fails EventLogs in projections (like StreamChannels),
  so transformers no longer need finalize/fail boilerplate
- StreamingHandler._setup() raises ValueError if a user transformer
  returns projection keys that collide with already-registered keys
- AsyncGraphRunStream.interrupted and .interrupts now await the pump
  task before returning, matching the output property's behavior
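The collision check described above might look roughly like this (a sketch; the function name and dict-based registry shape are assumptions, not the actual code):

```python
def register_projections(existing: dict, new: dict) -> dict:
    """Fail fast if a user transformer reuses an already-registered projection key."""
    collisions = existing.keys() & new.keys()
    if collisions:
        raise ValueError(
            f"projection key(s) already registered: {sorted(collisions)}"
        )
    existing.update(new)
    return existing
```

Raising at setup time means a transformer that shadows a built-in key like "values" or "messages" fails loudly instead of silently replacing the built-in projection.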

Remove threading.Lock and call_soon_threadsafe. Both sync and async paths are single-threaded (caller-driven sync, event-loop-bound async), so there is no concurrent access to the buffer. Direct fut.set_result() replaces call_soon_threadsafe for async notification since the producer always runs on the event loop thread.

Replace the _async_waiters list and manual future management with a single shared asyncio.Event. Simpler notification (just event.set()), no per-cursor future allocation, no get_running_loop/create_future in our code.
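The shared-asyncio.Event notification pattern in the last commit, in miniature. Class and method names here are assumed for illustration; the waiters re-check their cursor after every wakeup, so one shared event is enough:

```python
import asyncio


class AsyncLog:
    """Sketch: producer appends and sets one shared Event; consumers wait and re-check."""

    def __init__(self):
        self._items = []
        self._done = False
        self._event = asyncio.Event()

    def append(self, item):
        self._items.append(item)
        self._event.set()  # a single shared event wakes every waiting cursor

    def close(self):
        self._done = True
        self._event.set()

    async def __aiter__(self):
        cursor = 0
        while True:
            while cursor >= len(self._items):
                if self._done:
                    return
                self._event.clear()
                await self._event.wait()
            yield self._items[cursor]
            cursor += 1


async def demo():
    log = AsyncLog()

    async def produce():
        for i in range(3):
            log.append(i)
            await asyncio.sleep(0)  # yield to the consumer between appends
        log.close()

    task = asyncio.create_task(produce())
    got = [x async for x in log]
    await task
    return got
```

Because producer and consumers all run on the same event loop, there is no window between the clear() and the wait() in which a set() can be lost, which is what makes the Lock removal in the previous commit safe.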