Streaming Requests
Streaming enables real-time, incremental object generation where you receive field values as they're generated, rather than waiting for the entire object to complete. This provides a better user experience for applications that need to display content progressively.
Overview
What is Streaming?
Streaming allows you to:
- Receive data incrementally as each field completes generation
- Display content immediately rather than waiting for all fields to finish
- Provide real-time feedback to users about generation progress
- Handle large objects efficiently without blocking the entire response
When to Use Streaming
| Use Case | Recommended | Reason |
|---|---|---|
| Real-time chat interfaces | Yes | Users expect immediate, progressive responses |
| Live content generation | Yes | Show content as it's created for better UX |
| Large objects with many fields | Yes | Display fields as they complete |
| Interactive applications | Yes | Reduce perceived latency |
| Background tasks | No | Use batch processing instead (50% cheaper) |
| Bulk operations | No | Standard requests are more efficient |
Streaming is incompatible with batch processing. All fields in a streaming request must have priority >= 0 to ensure immediate processing. Fields with negative priorities will be queued for batch processing and won't stream back results.
How Streaming Works
gRPC Streaming
Proto Definition
// gRPC surface for schema-driven object generation.
service JSONSchemaService {
// Server-side streaming RPC: emits one StreamingResponse per batch of
// completed fields until status == "complete".
rpc StreamGeneratedObjects(RequestBody) returns (stream StreamingResponse);
}
// One incremental chunk of the generated object.
message StreamingResponse {
google.protobuf.Struct data = 1; // Progressive field data (only newly completed fields)
double usdCost = 2; // Accumulated cost
string status = 3; // Status: "processing" or "complete"
}
// Client request: a free-form prompt plus the generation schema.
// NOTE(review): Definition is declared elsewhere in the proto package.
message RequestBody {
string prompt = 1;
Definition definition = 2;
}
gRPC Client Examples
- Go
- Python
- JavaScript (Node.js)
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	pb "your-project/grpc"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// main dials the local gRPC server, streams a blog-post object field by
// field, and prints each field plus the accumulated cost as it arrives.
func main() {
	// Plaintext connection for local development.
	cc, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer cc.Close()

	svc := pb.NewJSONSchemaServiceClient(cc)

	// Schema: higher-priority fields stream back first.
	schema := &pb.Definition{
		Type:        "object",
		Instruction: "Generate a compelling blog post",
		Model:       "gpt-4o-mini",
		Properties: map[string]*pb.Definition{
			"title": {
				Type:        "string",
				Instruction: "Create an engaging title (max 60 characters)",
				Priority:    3, // High priority for immediate response
			},
			"excerpt": {
				Type:        "string",
				Instruction: "Write a compelling excerpt (150-200 words)",
				Priority:    2,
			},
			"mainContent": {
				Type:        "string",
				Instruction: "Write the main blog post (800-1200 words)",
				Priority:    1,
			},
			"tags": {
				Type:        "array",
				Instruction: "Generate 5-10 relevant tags",
				Priority:    0, // Lower priority but still real-time
				Items:       &pb.Definition{Type: "string"},
			},
		},
	}

	// Open the server-side stream.
	stream, err := svc.StreamGeneratedObjects(context.Background(), &pb.RequestBody{
		Prompt:     "Write a blog post about artificial intelligence",
		Definition: schema,
	})
	if err != nil {
		log.Fatalf("Failed to start stream: %v", err)
	}

	// Receive chunks until EOF or an explicit "complete" status.
	result := make(map[string]interface{})
	cost := 0.0
	for done := false; !done; {
		msg, err := stream.Recv()
		switch {
		case err == io.EOF:
			// Server closed the stream.
			done = true
		case err != nil:
			log.Fatalf("Stream error: %v", err)
		default:
			// Merge this chunk's fields into the accumulated object.
			for field, val := range msg.Data.AsMap() {
				result[field] = val
				fmt.Printf("✓ Received field '%s': %v\n", field, val)
			}
			cost = msg.UsdCost
			if msg.Status == "complete" {
				fmt.Println("\n✅ Generation complete!")
				done = true
			}
		}
	}

	fmt.Printf("\n📊 Final object: %+v\n", result)
	fmt.Printf("💰 Total cost: $%.4f\n", cost)
}
import grpc
from google.protobuf.json_format import MessageToDict
import objectweaver_pb2
import objectweaver_pb2_grpc


def stream_generate():
    """Stream a blog-post object from the gRPC server.

    Prints each field as it arrives, then the final object and the
    accumulated USD cost. Returns None.
    """
    # Fix: use the channel as a context manager so the underlying
    # connection is always closed (the previous version leaked it).
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = objectweaver_pb2_grpc.JSONSchemaServiceStub(channel)

        # Schema: higher-priority fields stream back first.
        definition = objectweaver_pb2.Definition(
            type="object",
            instruction="Generate a compelling blog post",
            model="gpt-4o-mini",
            properties={
                "title": objectweaver_pb2.Definition(
                    type="string",
                    instruction="Create an engaging title (max 60 characters)",
                    priority=3  # High priority
                ),
                "excerpt": objectweaver_pb2.Definition(
                    type="string",
                    instruction="Write a compelling excerpt (150-200 words)",
                    priority=2
                ),
                "mainContent": objectweaver_pb2.Definition(
                    type="string",
                    instruction="Write the main blog post (800-1200 words)",
                    priority=1
                ),
                "tags": objectweaver_pb2.Definition(
                    type="array",
                    instruction="Generate 5-10 relevant tags",
                    priority=0,
                    items=objectweaver_pb2.Definition(type="string")
                )
            }
        )

        request = objectweaver_pb2.RequestBody(
            prompt="Write a blog post about artificial intelligence",
            definition=definition
        )

        accumulated_data = {}
        total_cost = 0.0
        try:
            for response in stub.StreamGeneratedObjects(request):
                # Convert the protobuf Struct chunk to a plain dict.
                data = MessageToDict(response.data)
                accumulated_data.update(data)
                # Print each field as it arrives.
                for key, value in data.items():
                    print(f"✓ Received field '{key}': {value}")
                total_cost = response.usdCost
                if response.status == "complete":
                    print("\n✅ Generation complete!")
                    break
        except grpc.RpcError as e:
            print(f"❌ RPC error: {e.code()} - {e.details()}")
            return

    print(f"\n📊 Final object: {accumulated_data}")
    print(f"💰 Total cost: ${total_cost:.4f}")


if __name__ == "__main__":
    stream_generate()
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');

// Load and decode the service definition from the .proto file.
const pkgDef = protoLoader.loadSync('objectweaver.proto', {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true
});
const descriptor = grpc.loadPackageDefinition(pkgDef);
const JSONSchemaService = descriptor.JSONSchemaService;

// Streams a blog-post object and logs each field as it arrives.
async function streamGenerate() {
  // Plaintext connection for local development.
  const client = new JSONSchemaService(
    'localhost:50051',
    grpc.credentials.createInsecure()
  );

  // Schema: higher-priority fields stream back first.
  const definition = {
    type: "object",
    instruction: "Generate a compelling blog post",
    model: "gpt-4o-mini",
    properties: {
      title: {
        type: "string",
        instruction: "Create an engaging title (max 60 characters)",
        priority: 3
      },
      excerpt: {
        type: "string",
        instruction: "Write a compelling excerpt (150-200 words)",
        priority: 2
      },
      mainContent: {
        type: "string",
        instruction: "Write the main blog post (800-1200 words)",
        priority: 1
      },
      tags: {
        type: "array",
        instruction: "Generate 5-10 relevant tags",
        priority: 0,
        items: { type: "string" }
      }
    }
  };

  // Open the server-side stream.
  const call = client.StreamGeneratedObjects({
    prompt: "Write a blog post about artificial intelligence",
    definition
  });

  const assembled = {};
  let cost = 0;

  call.on('data', (response) => {
    // Merge this chunk's fields into the accumulated object.
    const chunk = response.data || {};
    Object.assign(assembled, chunk);
    Object.entries(chunk).forEach(([key, value]) => {
      console.log(`✓ Received field '${key}':`, value);
    });
    cost = response.usdCost;
    if (response.status === "complete") {
      console.log("\n✅ Generation complete!");
    }
  });

  call.on('end', () => {
    console.log("\n📊 Final object:", assembled);
    console.log(`💰 Total cost: $${cost.toFixed(4)}`);
  });

  call.on('error', (error) => {
    console.error("❌ Stream error:", error.message);
  });
}

streamGenerate();
HTTP/REST Streaming (Server-Sent Events)
If you're using the HTTP API, streaming uses Server-Sent Events (SSE) for progressive updates.
Endpoint
POST /api/objectGen
Content-Type: application/json
Authorization: Bearer your-token
Request Format
- cURL
- JavaScript (Fetch API)
- Python (requests)
- Go (net/http)
# -N disables curl's output buffering so each SSE event prints the moment
# it arrives; "stream": true in the body switches the endpoint into
# Server-Sent Events mode.
curl -N -X POST http://localhost:2008/api/objectGen \
-H "Authorization: Bearer your-token" \
-H "Content-Type: application/json" \
-d '{
"prompt": "Write a blog post about artificial intelligence",
"stream": true,
"definition": {
"type": "object",
"instruction": "Generate a compelling blog post",
"model": "gpt-4o-mini",
"properties": {
"title": {
"type": "string",
"instruction": "Create an engaging title (max 60 characters)",
"priority": 3
},
"excerpt": {
"type": "string",
"instruction": "Write a compelling excerpt (150-200 words)",
"priority": 2
},
"mainContent": {
"type": "string",
"instruction": "Write the main blog post (800-1200 words)",
"priority": 1
},
"tags": {
"type": "array",
"instruction": "Generate 5-10 relevant tags",
"priority": 0,
"items": {
"type": "string"
}
}
}
}
}'
Note: The -N flag is crucial - it disables buffering so you receive data immediately.
/**
 * Stream object generation over SSE and update the UI per field.
 *
 * Fixes vs. the previous version:
 *  - decode with { stream: true } so multi-byte UTF-8 characters split
 *    across network chunks are not corrupted;
 *  - buffer the trailing partial line between reads — a chunk can end
 *    mid-line, and the old per-chunk split('\n') silently dropped or
 *    garbled those events.
 */
async function streamGenerate() {
  const response = await fetch('http://localhost:2008/api/objectGen', {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': 'Bearer your-token'
    },
    body: JSON.stringify({
      prompt: "Write a blog post about artificial intelligence",
      stream: true,
      definition: {
        type: "object",
        instruction: "Generate a compelling blog post",
        model: "gpt-4o-mini",
        properties: {
          title: {
            type: "string",
            instruction: "Create an engaging title (max 60 characters)",
            priority: 3
          },
          excerpt: {
            type: "string",
            instruction: "Write a compelling excerpt (150-200 words)",
            priority: 2
          },
          mainContent: {
            type: "string",
            instruction: "Write the main blog post (800-1200 words)",
            priority: 1
          },
          tags: {
            type: "array",
            instruction: "Generate 5-10 relevant tags",
            priority: 0,
            items: { type: "string" }
          }
        }
      }
    })
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  const accumulatedData = {};
  let buffer = ''; // holds an incomplete trailing line between chunks

  // Parse one complete SSE line and apply its payload.
  const handleLine = (line) => {
    if (!line.startsWith('data: ')) return;
    const data = JSON.parse(line.substring(6));
    // Update UI with new field
    Object.assign(accumulatedData, data);
    console.log("✓ Received:", data);
    // Update your UI here
    updateUI(data);
  };

  while (true) {
    const { done, value } = await reader.read();
    if (done) {
      // Flush a final line that arrived without a trailing newline.
      if (buffer) handleLine(buffer);
      console.log("✅ Stream complete");
      break;
    }
    // stream:true keeps partial code points pending inside the decoder.
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop(); // last element may be a partial line
    for (const line of lines) {
      handleLine(line);
    }
  }

  console.log("📊 Final object:", accumulatedData);
}
// Reflect newly streamed fields into the DOM; each field maps to the
// element whose id equals the field name.
function updateUI(data) {
  Object.entries(data).forEach(([field, value]) => {
    const node = document.getElementById(field);
    if (!node) return; // no matching element for this field
    node.textContent = value;
    node.classList.add('loaded');
  });
}
import requests
import json


def stream_generate():
    """POST a streaming object-generation request and print each field
    from the SSE response as it arrives, then the final object."""
    url = "http://localhost:2008/api/objectGen"
    headers = {
        "Content-Type": "application/json",
        "Authorization": "Bearer your-token",
    }

    # Field schema: higher priority streams back sooner.
    properties = {
        "title": {
            "type": "string",
            "instruction": "Create an engaging title (max 60 characters)",
            "priority": 3,
        },
        "excerpt": {
            "type": "string",
            "instruction": "Write a compelling excerpt (150-200 words)",
            "priority": 2,
        },
        "mainContent": {
            "type": "string",
            "instruction": "Write the main blog post (800-1200 words)",
            "priority": 1,
        },
        "tags": {
            "type": "array",
            "instruction": "Generate 5-10 relevant tags",
            "priority": 0,
            "items": {"type": "string"},
        },
    }
    payload = {
        "prompt": "Write a blog post about artificial intelligence",
        "stream": True,
        "definition": {
            "type": "object",
            "instruction": "Generate a compelling blog post",
            "model": "gpt-4o-mini",
            "properties": properties,
        },
    }

    accumulated_data = {}
    prefix = "data: "
    with requests.post(url, json=payload, headers=headers, stream=True) as response:
        response.raise_for_status()
        for raw in response.iter_lines():
            if not raw:
                continue  # skip SSE keep-alive blank lines
            text = raw.decode('utf-8')
            if not text.startswith(prefix):
                continue  # not an SSE data event
            data = json.loads(text[len(prefix):])
            accumulated_data.update(data)
            # Print each field as it arrives.
            for key, value in data.items():
                print(f"✓ Received field '{key}': {value}")

    print(f"\n📊 Final object: {accumulated_data}")


if __name__ == "__main__":
    stream_generate()
package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

// StreamRequest is the JSON body for a streaming /api/objectGen call.
type StreamRequest struct {
	Prompt     string                 `json:"prompt"`
	Stream     bool                   `json:"stream"`
	Definition map[string]interface{} `json:"definition"`
}

// main posts a streaming generation request and prints each SSE "data:"
// event as it arrives, then the fully accumulated object.
func main() {
	request := StreamRequest{
		Prompt: "Write a blog post about artificial intelligence",
		Stream: true,
		Definition: map[string]interface{}{
			"type":        "object",
			"instruction": "Generate a compelling blog post",
			"model":       "gpt-4o-mini",
			"properties": map[string]interface{}{
				"title": map[string]interface{}{
					"type":        "string",
					"instruction": "Create an engaging title (max 60 characters)",
					"priority":    3,
				},
				"excerpt": map[string]interface{}{
					"type":        "string",
					"instruction": "Write a compelling excerpt (150-200 words)",
					"priority":    2,
				},
				"mainContent": map[string]interface{}{
					"type":        "string",
					"instruction": "Write the main blog post (800-1200 words)",
					"priority":    1,
				},
				"tags": map[string]interface{}{
					"type":        "array",
					"instruction": "Generate 5-10 relevant tags",
					"priority":    0,
					"items": map[string]interface{}{
						"type": "string",
					},
				},
			},
		},
	}

	// Fix: the marshal error was silently discarded before.
	jsonData, err := json.Marshal(request)
	if err != nil {
		panic(err)
	}

	// Create HTTP request
	req, err := http.NewRequest("POST", "http://localhost:2008/api/objectGen", bytes.NewBuffer(jsonData))
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer your-token")

	// Send request
	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// Fix: fail fast on non-success responses instead of parsing an
	// error page as SSE.
	if resp.StatusCode != http.StatusOK {
		panic(fmt.Sprintf("unexpected status: %s", resp.Status))
	}

	// Read streaming response line by line.
	accumulatedData := make(map[string]interface{})
	reader := bufio.NewReader(resp.Body)
	for {
		line, err := reader.ReadString('\n')
		// Fix: process the line BEFORE checking EOF — ReadString returns
		// a final unterminated line together with io.EOF, and the old
		// code dropped that last event.
		if len(line) > 6 && line[:6] == "data: " {
			var data map[string]interface{}
			if jsonErr := json.Unmarshal([]byte(line[6:]), &data); jsonErr == nil {
				// Merge this chunk's fields into the accumulated object.
				for key, value := range data {
					accumulatedData[key] = value
					fmt.Printf("✓ Received field '%s': %v\n", key, value)
				}
			}
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
	}
	fmt.Printf("\n📊 Final object: %+v\n", accumulatedData)
}
Response Format
Stream Chunks
Each streamed response contains:
{
"data": {
"fieldName": "fieldValue"
},
"usdCost": 0.0023,
"status": "processing"
}
Final Response
The last chunk includes:
{
"data": {
"lastField": "lastValue"
},
"usdCost": 0.0156,
"status": "complete"
}
Best Practices
1. Set Appropriate Priorities
For streaming requests, ensure all fields have priority >= 0:
{
"properties": {
"urgentField": {
"priority": 3, // Highest priority - returns first
"type": "string"
},
"importantField": {
"priority": 1, // Returns soon
"type": "string"
},
"backgroundField": {
"priority": -1 // WRONG - will be batched and block stream
}
}
}
2. Handle Connection Errors
- Go
- Python
- JavaScript
// Use a context with a deadline so a stalled stream cannot hang forever.
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

stream, err := client.StreamGeneratedObjects(ctx, request)
if err != nil {
	log.Fatalf("Failed to start stream: %v", err)
}

for {
	response, err := stream.Recv()
	if err == io.EOF {
		break
	}
	if err != nil {
		// Handle specific gRPC errors.
		// Fix: name the result `st`, not `status` — the original shadowed
		// the google.golang.org/grpc/status package it had just called,
		// making the package unusable below and tripping linters.
		if st, ok := status.FromError(err); ok {
			log.Printf("gRPC error: %v - %v", st.Code(), st.Message())
		}
		return
	}
	// Process response...
}
import grpc

# NOTE(review): `stub` and `request` are assumed to be built as in the
# full example above — confirm before running this fragment standalone.
try:
    # Per-call deadline: the stream is aborted after 60 seconds.
    for response in stub.StreamGeneratedObjects(request, timeout=60):
        # Process response
        pass
except grpc.RpcError as e:
    # Map the most common failure modes to user-facing messages.
    if e.code() == grpc.StatusCode.DEADLINE_EXCEEDED:
        print("Request timed out")
    elif e.code() == grpc.StatusCode.UNAVAILABLE:
        print("Server unavailable")
    else:
        print(f"Error: {e.details()}")
// Give the call a 60-second deadline.
// Fix: create `call` BEFORE attaching handlers — the original fragment
// referenced `call` above its `const` declaration, which throws a
// temporal-dead-zone ReferenceError when run as one script.
const deadline = new Date();
deadline.setSeconds(deadline.getSeconds() + 60);
const call = client.StreamGeneratedObjects(request, { deadline });

call.on('error', (error) => {
  switch (error.code) {
    case grpc.status.DEADLINE_EXCEEDED:
      console.error("Request timed out");
      break;
    case grpc.status.UNAVAILABLE:
      console.error("Server unavailable");
      break;
    default:
      console.error("Stream error:", error.message);
  }
});
3. Progressive UI Updates
Update your UI as each field arrives:
// React example
function StreamingContent() {
  // Fields received so far, merged chunk by chunk.
  const [data, setData] = useState({});
  // Per-field loading flags driving the skeleton placeholders.
  const [loading, setLoading] = useState({});
  useEffect(() => {
    // NOTE(review): no cleanup is returned — the stream keeps running if
    // the component unmounts. Confirm whether startStream() exposes a
    // cancel/close method and call it from an effect cleanup function.
    const stream = startStream();
    stream.on('data', (chunk) => {
      // Functional updates merge each chunk without clobbering state
      // from chunks that arrive in quick succession.
      setData(prev => ({ ...prev, ...chunk.data }));
      // Mark fields as loaded
      Object.keys(chunk.data).forEach(key => {
        setLoading(prev => ({ ...prev, [key]: false }));
      });
    });
  }, []);
  return (
    <div>
      <h1>
        {loading.title ? <Skeleton /> : data.title}
      </h1>
      <p>
        {loading.excerpt ? <Skeleton /> : data.excerpt}
      </p>
      {/* ... */}
    </div>
  );
}
Performance Considerations
Latency Optimization
| Strategy | Impact | Use When |
|---|---|---|
| Higher priorities | Fields return faster | User-visible content |
| Smaller field instructions | Faster generation | Short-form content |
| Faster models (e.g., GPT-4o-mini) | Lower latency | Speed > quality |
| Parallel field generation | Fields process simultaneously | Independent fields |