Streaming Requests

Streaming enables real-time, incremental object generation where you receive field values as they're generated, rather than waiting for the entire object to complete. This provides a better user experience for applications that need to display content progressively.

Overview

What is Streaming?

Streaming allows you to:

  • Receive data incrementally as each field completes generation
  • Display content immediately rather than waiting for all fields to finish
  • Provide real-time feedback to users about generation progress
  • Handle large objects efficiently without blocking the entire response

When to Use Streaming

| Use Case | Recommended | Reason |
|---|---|---|
| Real-time chat interfaces | Yes | Users expect immediate, progressive responses |
| Live content generation | Yes | Show content as it's created for better UX |
| Large objects with many fields | Yes | Display fields as they complete |
| Interactive applications | Yes | Reduce perceived latency |
| Background tasks | No | Use batch processing instead (50% cheaper) |
| Bulk operations | No | Standard requests are more efficient |
Important: Streaming is incompatible with batch processing. All fields in a streaming request must have priority >= 0 to ensure immediate processing. Fields with negative priorities are queued for batch processing and will not stream results back.

How Streaming Works

gRPC Streaming

Proto Definition

service JSONSchemaService {
  // Server-side streaming RPC
  rpc StreamGeneratedObjects(RequestBody) returns (stream StreamingResponse);
}

message StreamingResponse {
  google.protobuf.Struct data = 1; // Progressive field data
  double usdCost = 2;              // Accumulated cost
  string status = 3;               // "processing" or "complete"
}

message RequestBody {
  string prompt = 1;
  Definition definition = 2;
}

gRPC Client Example (Go)

package main

import (
	"context"
	"fmt"
	"io"
	"log"

	pb "your-project/grpc"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	// Connect to server
	conn, err := grpc.Dial("localhost:50051", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatalf("Failed to connect: %v", err)
	}
	defer conn.Close()

	client := pb.NewJSONSchemaServiceClient(conn)

	// Define your schema
	definition := &pb.Definition{
		Type:        "object",
		Instruction: "Generate a compelling blog post",
		Model:       "gpt-4o-mini",
		Properties: map[string]*pb.Definition{
			"title": {
				Type:        "string",
				Instruction: "Create an engaging title (max 60 characters)",
				Priority:    3, // High priority for immediate response
			},
			"excerpt": {
				Type:        "string",
				Instruction: "Write a compelling excerpt (150-200 words)",
				Priority:    2,
			},
			"mainContent": {
				Type:        "string",
				Instruction: "Write the main blog post (800-1200 words)",
				Priority:    1,
			},
			"tags": {
				Type:        "array",
				Instruction: "Generate 5-10 relevant tags",
				Priority:    0, // Lower priority but still real-time
				Items: &pb.Definition{
					Type: "string",
				},
			},
		},
	}

	// Create streaming request
	request := &pb.RequestBody{
		Prompt:     "Write a blog post about artificial intelligence",
		Definition: definition,
	}

	// Open stream
	stream, err := client.StreamGeneratedObjects(context.Background(), request)
	if err != nil {
		log.Fatalf("Failed to start stream: %v", err)
	}

	// Receive and process streamed responses
	accumulatedData := make(map[string]interface{})
	totalCost := 0.0

	for {
		response, err := stream.Recv()
		if err == io.EOF {
			// Stream completed
			break
		}
		if err != nil {
			log.Fatalf("Stream error: %v", err)
		}

		// Extract data from protobuf Struct
		for key, value := range response.Data.AsMap() {
			accumulatedData[key] = value
			fmt.Printf("✓ Received field '%s': %v\n", key, value)
		}

		totalCost = response.UsdCost

		if response.Status == "complete" {
			fmt.Println("\n✅ Generation complete!")
			break
		}
	}

	fmt.Printf("\n📊 Final object: %+v\n", accumulatedData)
	fmt.Printf("💰 Total cost: $%.4f\n", totalCost)
}

HTTP/REST Streaming (Server-Sent Events)

If you're using the HTTP API, streaming uses Server-Sent Events (SSE) for progressive updates.

Endpoint

POST /api/objectGen
Content-Type: application/json
Authorization: Bearer your-token

Request Format

curl -N -X POST http://localhost:2008/api/objectGen \
  -H "Authorization: Bearer your-token" \
  -H "Content-Type: application/json" \
  -d '{
    "prompt": "Write a blog post about artificial intelligence",
    "stream": true,
    "definition": {
      "type": "object",
      "instruction": "Generate a compelling blog post",
      "model": "gpt-4o-mini",
      "properties": {
        "title": {
          "type": "string",
          "instruction": "Create an engaging title (max 60 characters)",
          "priority": 3
        },
        "excerpt": {
          "type": "string",
          "instruction": "Write a compelling excerpt (150-200 words)",
          "priority": 2
        },
        "mainContent": {
          "type": "string",
          "instruction": "Write the main blog post (800-1200 words)",
          "priority": 1
        },
        "tags": {
          "type": "array",
          "instruction": "Generate 5-10 relevant tags",
          "priority": 0,
          "items": {
            "type": "string"
          }
        }
      }
    }
  }'

Note: The -N flag is crucial - it disables buffering so you receive data immediately.

Response Format

Stream Chunks

Each streamed response contains:

{
  "data": {
    "fieldName": "fieldValue"
  },
  "usdCost": 0.0023,
  "status": "processing"
}

Final Response

The last chunk includes:

{
  "data": {
    "lastField": "lastValue"
  },
  "usdCost": 0.0156,
  "status": "complete"
}

Best Practices

1. Set Appropriate Priorities

For streaming requests, ensure all fields have priority >= 0:

{
  "properties": {
    "urgentField": {
      "priority": 3,  // Highest priority - returns first
      "type": "string"
    },
    "importantField": {
      "priority": 1,  // Returns soon
      "type": "string"
    },
    "backgroundField": {
      "priority": -1  // WRONG - will be batched and block the stream
    }
  }
}
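A quick pre-flight check can catch this mistake before a request is sent. The sketch below walks a schema definition and rejects any negative priority; `Definition` here is a minimal local mirror of the schema shape, and `validateStreamPriorities` is a hypothetical helper, not part of the generated client:

```go
package main

import "fmt"

// Definition is a minimal local mirror of the schema definition,
// not the generated protobuf type - just enough to show the check.
type Definition struct {
	Type       string
	Priority   int
	Properties map[string]*Definition
	Items      *Definition
}

// validateStreamPriorities returns an error naming a field whose priority
// is negative, since such fields would be batched and block the stream.
func validateStreamPriorities(def *Definition, path string) error {
	if def == nil {
		return nil
	}
	if def.Priority < 0 {
		return fmt.Errorf("field %q has priority %d; streaming requires priority >= 0", path, def.Priority)
	}
	for name, child := range def.Properties {
		childPath := name
		if path != "" {
			childPath = path + "." + name
		}
		if err := validateStreamPriorities(child, childPath); err != nil {
			return err
		}
	}
	return validateStreamPriorities(def.Items, path+"[]")
}

func main() {
	def := &Definition{
		Type: "object",
		Properties: map[string]*Definition{
			"urgentField":     {Type: "string", Priority: 3},
			"backgroundField": {Type: "string", Priority: -1},
		},
	}
	fmt.Println(validateStreamPriorities(def, ""))
}
```

Running the check against the example above reports the `backgroundField` priority before any tokens are spent.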

2. Handle Connection Errors

// Requires the "time" and "google.golang.org/grpc/status" imports.

// Use context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

stream, err := client.StreamGeneratedObjects(ctx, request)
if err != nil {
	log.Fatalf("Failed to start stream: %v", err)
}

for {
	response, err := stream.Recv()
	if err == io.EOF {
		break
	}
	if err != nil {
		// Handle specific gRPC errors
		if st, ok := status.FromError(err); ok {
			log.Printf("gRPC error: %v - %v", st.Code(), st.Message())
		}
		return
	}
	// Process response...
}
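For transient failures (e.g. a server restart mid-stream), it can help to wrap stream setup in a retry loop with exponential backoff. This is a minimal sketch: `isTransient` stands in for a real check against gRPC status codes such as Unavailable, and the sentinel error keeps the example self-contained:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errUnavailable is a stand-in for a transient gRPC error; a real client
// would inspect status.Code(err) for codes.Unavailable or ResourceExhausted.
var errUnavailable = errors.New("unavailable")

func isTransient(err error) bool { return errors.Is(err, errUnavailable) }

// withRetry retries fn with exponential backoff while the error is transient.
func withRetry(attempts int, base time.Duration, fn func() error) error {
	var err error
	for i := 0; i < attempts; i++ {
		if err = fn(); err == nil || !isTransient(err) {
			return err
		}
		time.Sleep(base << uint(i)) // base, 2*base, 4*base, ...
	}
	return err
}

func main() {
	calls := 0
	err := withRetry(3, time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errUnavailable // fail twice, then succeed
		}
		return nil
	})
	fmt.Println(calls, err)
}
```

In practice the retried function would open the stream and replay `Recv` from scratch, since a broken server stream cannot be resumed mid-way.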

3. Progressive UI Updates

Update your UI as each field arrives:

// React example
function StreamingContent() {
  const [data, setData] = useState({});
  // Start every streamed field in the loading state so skeletons
  // render until its value arrives
  const [loading, setLoading] = useState({ title: true, excerpt: true });

  useEffect(() => {
    const stream = startStream();

    stream.on('data', (chunk) => {
      setData(prev => ({ ...prev, ...chunk.data }));

      // Mark fields as loaded
      Object.keys(chunk.data).forEach(key => {
        setLoading(prev => ({ ...prev, [key]: false }));
      });
    });

    // Clean up on unmount (assuming the stream object exposes close())
    return () => stream.close();
  }, []);

  return (
    <div>
      <h1>
        {loading.title ? <Skeleton /> : data.title}
      </h1>
      <p>
        {loading.excerpt ? <Skeleton /> : data.excerpt}
      </p>
      {/* ... */}
    </div>
  );
}

Performance Considerations

Latency Optimization

| Strategy | Impact | Use When |
|---|---|---|
| Higher priorities | Fields return faster | User-visible content |
| Smaller field instructions | Faster generation | Short-form content |
| Faster models (e.g., GPT-4o-mini) | Lower latency | Speed matters more than quality |
| Parallel field generation | Fields process simultaneously | Independent fields |