Streaming

Intermediate

This post explores streaming in LangGraph, including real-time state updates, token streaming, node-level streaming, and streaming LLM or tool outputs. We cover stream() vs invoke(), streaming events and modes, handling partial outputs, streaming in multi-agent systems, performance considerations, common mistakes, and best practices for building responsive AI applications.

What Is Streaming?

Streaming in LangGraph is the ability to receive and process outputs incrementally as they are generated, rather than waiting for the entire graph execution to finish. Instead of getting one final response after all nodes complete, you see results in real time: token by token, message by message, or node by node. This is essential for building responsive, production-grade AI applications.

Why Streaming Matters

Streaming provides significant benefits:
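  • Better User Experience: Users see the first output immediately instead of waiting for the whole run
  • Perceived Performance: The application feels faster because output starts arriving before the run finishes
  • Transparency: Users can follow the agent's steps as they happen
  • Error Handling: Failures surface at the step where they occur rather than at the end of the run
  • Long-running Workflows: Progress can be monitored in real time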
Without streaming, users often face long loading times, especially with complex agents or slow tool calls.

Real-Time State Updates

LangGraph allows you to stream state changes as they happen.
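from langchain_core.messages import HumanMessage

# `graph` is a StateGraph that has already been built
app = graph.compile()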

inputs = {"messages": [HumanMessage(content="Tell me about LangGraph")]}

for chunk in app.stream(inputs, stream_mode="values"):
    print("State Update:", chunk)
With stream_mode="values", each chunk is the full state as it stands after a step of the graph completes.

Token Streaming

Token streaming is the most popular form: LLM tokens are emitted as they are generated.
from langchain_core.messages import AIMessage

# Each chunk is a (message_chunk, metadata) tuple
for chunk in app.stream(inputs, stream_mode="messages"):
    message, metadata = chunk
    if isinstance(message, AIMessage) and message.content:
        print(message.content, end="", flush=True)
An alternative is astream_events, which exposes the widest range of event types:
# Run this inside an async function
async for event in app.astream_events(inputs, version="v2"):
    if event["event"] == "on_chat_model_stream":
        print(event["data"]["chunk"].content, end="", flush=True)

Node-Level Streaming

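With stream_mode="updates", each chunk is a dict keyed by node name that contains only what that node returned.
for chunk in app.stream(inputs, stream_mode="updates"):
    for node_name, update in chunk.items():
        print(f"Node '{node_name}' updated:")
        # Guard against nodes that return nothing
        if update and "messages" in update:
            for msg in update["messages"]:
                print("   →", msg.content)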

Streaming LLM Responses

# Method 1: Simple token streaming
# Each chunk is a (message_chunk, metadata) tuple, so the message is at index 0
for chunk in app.stream(inputs, stream_mode="messages"):
    if chunk[0] and isinstance(chunk[0], AIMessage):
        print(chunk[0].content, end="", flush=True)

# Method 2: Using LCEL-style streaming
# `prompt` and `llm` are a prompt template and chat model defined elsewhere
chain = prompt | llm
for token in chain.stream({"question": "What is LangGraph?"}):
    print(token.content, end="", flush=True)

Streaming Tool Outputs

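from langchain_core.messages import ToolMessage

# Assumes the tool-executing node was registered under the name "tools"
for event in app.stream(inputs, stream_mode="updates"):
    if event.get("tools"):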
        for msg in event["tools"].get("messages", []):
            if isinstance(msg, ToolMessage):
                print("Tool Result:", msg.content)

Streaming Events (astream_events)

The most powerful and flexible streaming method (the version="v2" event schema requires langchain-core 0.2 or later):
async for event in app.astream_events(inputs, version="v2"):
    kind = event["event"]
    if kind == "on_chain_start":
        print(f"Starting: {event['name']}")
    elif kind == "on_chat_model_stream":
        print(event["data"]["chunk"].content, end="", flush=True)
    elif kind == "on_tool_end":
        print(f"\nTool finished: {event['name']}")

stream() vs invoke()

Feature          | .invoke()             | .stream()
Output           | Final result only     | Incremental updates
User Experience  | Blocking              | Real-time
Use Case         | Simple scripts        | Interactive applications
Memory Usage     | Lower for short runs  | Better for long-running graphs

# Blocking
result = app.invoke(inputs)

# Streaming
for chunk in app.stream(inputs, stream_mode="values"):
    # Process live
    ...
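
The contrast can be seen with a toy stand-in that uses no LangGraph at all. In this sketch, FAKE_NODES, run_streaming, and run_blocking are hypothetical names that imitate a three-step graph; the point is only that the streaming caller can act after every step, while the blocking caller sees just the end result.

```python
from typing import Iterator

# Hypothetical stand-in for a graph: three "nodes" that each produce one result.
FAKE_NODES = ["plan", "search", "answer"]


def run_streaming(question: str) -> Iterator[dict]:
    """Yield one update per step, the way a streaming call would."""
    for name in FAKE_NODES:
        yield {name: f"{name} done for {question!r}"}


def run_blocking(question: str) -> dict:
    """Consume every step first and return only the last update."""
    last: dict = {}
    for update in run_streaming(question):
        last = update
    return last


seen = []
for update in run_streaming("What is LangGraph?"):
    seen.append(next(iter(update)))  # the caller can react after every step

final = run_blocking("What is LangGraph?")
print(seen)         # names of the steps observed while streaming
print(list(final))  # the blocking caller only sees the last step
```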

Streaming Modes in LangGraph

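Mode     | What it streams                                 | Best For
values   | Full state after each step                      | State monitoring
updates  | Only the updates returned by each node          | Debugging node outputs
messages | LLM tokens as (message chunk, metadata) tuples  | Chat UIs
custom   | Custom data emitted from inside nodes           | Specialized use cases
debug    | Detailed execution events for each step         | Advanced observability

Fine-grained events such as on_chat_model_stream and on_tool_end are not a stream_mode value; they come from astream_events().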
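
Several modes can be requested at once by passing a list to stream_mode, in which case each item arrives as a (mode, payload) tuple. The sketch below shows one way to route such items. It runs on hand-written sample data rather than a real graph, and describe is a hypothetical helper, not a LangGraph function.

```python
from types import SimpleNamespace


def describe(mode: str, payload) -> str:
    """Turn one (mode, payload) pair into a log line."""
    if mode == "updates":
        # payload maps node name -> the partial state that node returned
        return "update from " + ", ".join(payload)
    if mode == "messages":
        # payload is a (message_chunk, metadata) pair
        message, metadata = payload
        node = metadata.get("langgraph_node", "?")
        return f"token from {node}: {message.content}"
    return f"unhandled mode {mode!r}"


# Hand-written items standing in for:
#   app.stream(inputs, stream_mode=["updates", "messages"])
sample_stream = [
    ("messages", (SimpleNamespace(content="Hel"), {"langgraph_node": "agent"})),
    ("messages", (SimpleNamespace(content="lo"), {"langgraph_node": "agent"})),
    ("updates", {"agent": {"messages": ["Hello"]}}),
]

lines = [describe(mode, payload) for mode, payload in sample_stream]
for line in lines:
    print(line)
```

With a real graph, the list comprehension would be replaced by a loop over app.stream(...) that calls describe on each item.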

Handling Partial Outputs

async for event in app.astream_events(inputs, version="v2"):
    if event["event"] == "on_chat_model_stream":
        token = event["data"]["chunk"].content
        if token:
            # Send to frontend immediately
            # (`websocket` is an already-accepted FastAPI/Starlette WebSocket)
            await websocket.send_text(token)
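
Partial output is harder to handle when the model is producing structured data, because individual tokens are not valid JSON on their own. A minimal sketch, assuming tokens arrive as plain strings: buffer them and try to parse after each one. JsonAccumulator is a hypothetical helper, not part of LangGraph.

```python
import json
from typing import Optional


class JsonAccumulator:
    """Buffer streamed text fragments until they form a complete JSON document."""

    def __init__(self) -> None:
        self.buffer = ""

    def feed(self, token: str) -> Optional[dict]:
        """Append a fragment; return the parsed object once the buffer is valid JSON."""
        self.buffer += token
        try:
            return json.loads(self.buffer)
        except json.JSONDecodeError:
            return None  # still incomplete, keep buffering


acc = JsonAccumulator()
fragments = ['{"city": "Par', 'is", "tem', 'p": 21}']
results = [acc.feed(fragment) for fragment in fragments]
print(results)
```

Re-parsing the whole buffer on every token is simple but does repeated work, so it suits short payloads; for large ones an incremental parser would be a better fit.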

Streaming in Multi-Agent Systems

for chunk in app.stream(inputs, stream_mode="updates"):
    for node, update in chunk.items():
        if update and "messages" in update:  # skip nodes that returned nothing
            msg = update["messages"][-1]
            print(f"[{node}] {msg.content}")
This lets users see which agent is speaking in real time.
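
The same labelling can be wrapped in a small generator so that it is easy to test without a running graph. This sketch uses hand-written chunks in the shape produced by stream_mode="updates"; label_speakers and the node names are hypothetical, and it assumes a node that returns nothing shows up with an empty or None update.

```python
from types import SimpleNamespace
from typing import Iterable, Iterator


def label_speakers(chunks: Iterable[dict]) -> Iterator[str]:
    """Yield '[node] text' for the latest message in each node update."""
    for chunk in chunks:
        for node, update in chunk.items():
            # Skip nodes whose update is empty or carries no messages.
            if not update or not update.get("messages"):
                continue
            yield f"[{node}] {update['messages'][-1].content}"


# Hand-written chunks; SimpleNamespace stands in for a message object.
fake_chunks = [
    {"researcher": {"messages": [SimpleNamespace(content="Found 3 sources.")]}},
    {"router": None},
    {"writer": {"messages": [SimpleNamespace(content="Draft ready.")]}},
]

labelled = list(label_speakers(fake_chunks))
print(labelled)
```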

Performance Considerations for Streaming

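  • Streaming adds little overhead, since the graph does the same work whether or not its output is streamed
  • Use astream() in async applications so the event loop is not blocked
  • Be careful with very frequent small updates; rendering every token individually can cause UI lag
  • Combine with trim_messages in long conversations to keep the state that is streamed small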
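
One way to reduce the cost of very frequent small updates is to group tokens before sending them to the UI. The sketch below batches by character count; batch_tokens and the threshold of 16 characters are illustrative assumptions, and a time-based flush would work just as well.

```python
from typing import Iterable, Iterator


def batch_tokens(tokens: Iterable[str], min_chars: int = 16) -> Iterator[str]:
    """Group small fragments so the UI receives fewer, larger updates."""
    pending = ""
    for token in tokens:
        pending += token
        if len(pending) >= min_chars:
            yield pending
            pending = ""
    if pending:
        yield pending  # flush whatever is left at the end of the stream


tokens = ["Lang", "Graph ", "streams ", "tokens ", "one ", "by ", "one."]
batches = list(batch_tokens(tokens, min_chars=16))
print(batches)
```

Seven tokens become two UI updates here, and joining the batches reproduces the original text exactly.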

Common Streaming Mistakes

  • Using .invoke() in user-facing apps
  • Forgetting flush=True when printing tokens, so output appears in bursts
  • Assuming all chunks contain content
  • Not managing partial JSON or structured outputs
  • Ignoring errors during streaming
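
For the last point, a stream that fails halfway should tell the user so instead of simply stopping. A minimal sketch of that idea, with safe_stream and flaky_source as hypothetical names and a plain generator standing in for the model:

```python
from typing import Iterable, Iterator


def safe_stream(tokens: Iterable[str]) -> Iterator[str]:
    """Pass tokens through; on failure, emit a visible marker instead of stopping silently."""
    try:
        for token in tokens:
            yield token
    except Exception as exc:  # broad on purpose: any mid-stream failure ends the reply
        yield f"\n[stream interrupted: {exc}]"


def flaky_source() -> Iterator[str]:
    """Hypothetical source that fails after two tokens."""
    yield "Hello"
    yield " wor"
    raise RuntimeError("model timeout")


received = list(safe_stream(flaky_source()))
print(received)
```

In a real application the marker would usually be paired with logging the exception, so the failure is visible to both the user and the operator.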

Best Practices for Streaming

  1. Always prefer streaming for user-facing applications
  2. Use stream_mode="messages" for chat interfaces
  3. Use astream_events for maximum control
  4. Handle partial outputs gracefully
  5. Show thinking indicators during tool calls
  6. Log streaming events for debugging
  7. Combine with checkpointing for resumable streams
Example Production Pattern:
async def stream_reply(app, inputs):
    """Async generator to hand to a FastAPI streaming response or a WebSocket loop."""
    async for event in app.astream_events(inputs, version="v2"):
        if event["event"] == "on_chat_model_stream":
            content = event["data"]["chunk"].content
            if content:
                yield content
        elif event["event"] == "on_tool_start":
            # Tell the user a tool call is in progress
            yield f"\n[Using tool: {event['name']}]"
