Documentation ¶
Overview ¶
Package events provides a real-time event streaming system for processing LLM responses.
The implementation is thread-safe and supports buffered event streams, backpressure handling, concurrent stream management, and context-aware operations.
Core Components ¶
The package consists of four main components:
1. Event Types (types.go) - Defines event structures and event types
2. EventStream (stream.go) - Manages individual event streams with buffering
3. StreamManager (manager.go) - Manages multiple concurrent streams
4. Error Types (errors.go) - Provides comprehensive error handling
Event Types ¶
The package supports eight event types for LLM streaming:
- EventTextDelta: Incremental text chunks from the LLM
- EventContentStart: Beginning of a content block
- EventContentEnd: End of a content block
- EventMessageStart: Start of a message
- EventMessageStop: End of a message
- EventError: Error events during streaming
- EventUsage: Token usage statistics
- EventThinking: Extended thinking events (e.g., Claude's reasoning)
Basic Usage ¶
Create and use a simple event stream:
stream := events.NewEventStream(100)
// Send events
stream.Send(events.TextDeltaEvent("Hello "))
stream.Send(events.TextDeltaEvent("World!"))
// Close the stream so the range loop below ends once the buffer is drained
stream.Close()
// Receive events
for event := range stream.Receive() {
	fmt.Print(event.Data["text"])
}
Stream Management ¶
Manage multiple concurrent streams:
manager := events.NewStreamManager(100)
defer manager.CloseAll()
// Create streams for different sessions
stream1, _ := manager.CreateStream("session-1")
stream2, _ := manager.CreateStream("session-2")
// Get or create pattern
stream, created, _ := manager.GetOrCreate("session-3")
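The get-or-create pattern can be sketched with a map guarded by a sync.RWMutex. This is a minimal illustration, not the package's actual code: the manager and stream types and the getOrCreate method below are hypothetical stand-ins for StreamManager, EventStream, and GetOrCreate.

```go
package main

import (
	"fmt"
	"sync"
)

// stream is a hypothetical stand-in for events.EventStream.
type stream struct {
	ch chan string
}

// manager is a hypothetical stand-in for events.StreamManager.
type manager struct {
	mu         sync.RWMutex
	streams    map[string]*stream
	bufferSize int
}

func newManager(bufferSize int) *manager {
	return &manager{streams: make(map[string]*stream), bufferSize: bufferSize}
}

// getOrCreate returns the stream registered under id, creating it if needed.
// The boolean reports whether this call created the stream.
func (m *manager) getOrCreate(id string) (*stream, bool) {
	// Fast path: a read lock is enough when the stream already exists.
	m.mu.RLock()
	s, ok := m.streams[id]
	m.mu.RUnlock()
	if ok {
		return s, false
	}

	// Slow path: take the write lock and check again, because another
	// goroutine may have created the stream in the meantime.
	m.mu.Lock()
	defer m.mu.Unlock()
	if s, ok := m.streams[id]; ok {
		return s, false
	}
	s = &stream{ch: make(chan string, m.bufferSize)}
	m.streams[id] = s
	return s, true
}

func main() {
	m := newManager(100)
	_, created := m.getOrCreate("session-3")
	fmt.Println(created) // true
	_, created = m.getOrCreate("session-3")
	fmt.Println(created) // false
}
```

The second lookup under the write lock is what keeps two concurrent callers from each creating a stream for the same ID.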
Context Awareness ¶
Support for context cancellation and timeouts:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
err := stream.SendWithContext(ctx, event)
if err != nil {
	if errors.Is(err, context.DeadlineExceeded) {
		// Handle timeout
	}
}
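A context-aware send on a buffered channel is commonly written as a select over the send and ctx.Done(). The sketch below assumes that approach; sendWithContext and sendToFullBuffer are hypothetical helpers, and the real SendWithContext may differ.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// sendWithContext tries to put v on ch and gives up as soon as ctx is done.
func sendWithContext(ctx context.Context, ch chan<- string, v string) error {
	select {
	case ch <- v:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// sendToFullBuffer fills a one-slot buffer and then attempts a second send
// with a short timeout. Nothing receives, so the second send cannot proceed.
func sendToFullBuffer() error {
	ch := make(chan string, 1)
	ch <- "fills the buffer"

	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	return sendWithContext(ctx, ch, "blocked")
}

func main() {
	err := sendToFullBuffer()
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}
```

Returning ctx.Err() unchanged lets callers tell a timeout (context.DeadlineExceeded) from an explicit cancellation (context.Canceled) with errors.Is.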
Backpressure Handling ¶
Two backpressure policies are supported:
// Block until space is available (default)
stream.SetBackpressurePolicy(events.BackpressureBlock)
// Drop events when buffer is full
stream.SetBackpressurePolicy(events.BackpressureDrop)
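The difference between the two policies can be sketched on a plain buffered channel: a blocking send waits for space, while a select with a default case returns immediately when the buffer is full. The policy type, errFull, and send below are hypothetical stand-ins, not the package's implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// policy is a hypothetical stand-in for events.BackpressurePolicy.
type policy int

const (
	policyBlock policy = iota // wait until the buffer has free space
	policyDrop                // discard the event and report an error
)

// errFull is a hypothetical stand-in for events.ErrFull.
var errFull = errors.New("stream buffer is full")

// send delivers v to ch according to the given policy.
func send(ch chan string, p policy, v string) error {
	if p == policyDrop {
		select {
		case ch <- v:
			return nil
		default:
			// The buffer is full: drop v instead of waiting.
			return errFull
		}
	}
	ch <- v // blocks until a receiver frees a slot
	return nil
}

func main() {
	ch := make(chan string, 1)
	fmt.Println(send(ch, policyDrop, "first"))  // <nil>
	fmt.Println(send(ch, policyDrop, "second")) // stream buffer is full
}
```

Dropping suits consumers that can tolerate gaps, such as a progress display. Blocking preserves every event at the cost of stalling the producer.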
Error Handling ¶
Comprehensive error types with sentinel errors:
_, err := manager.GetStream("nonexistent")
if events.IsStreamNotFound(err) {
	// Handle stream not found
}
err = stream.Send(event)
if events.IsStreamClosed(err) {
	// Handle closed stream
}
if events.IsStreamFull(err) {
	// Handle backpressure
}
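Helpers such as IsStreamClosed can work on wrapped errors because the wrapper exposes an Unwrap method, which errors.Is and errors.As follow. The sketch below assumes that design; streamError, errClosed, and isStreamClosed are hypothetical stand-ins, and the message format is invented for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errClosed is a hypothetical stand-in for events.ErrClosed.
var errClosed = errors.New("stream is closed")

// streamError is a hypothetical stand-in for events.StreamError.
type streamError struct {
	StreamID string
	Op       string
	Err      error
}

// Error formats the stream ID, the operation, and the cause.
// The exact wording here is an assumption.
func (e *streamError) Error() string {
	return fmt.Sprintf("stream %s: %s: %v", e.StreamID, e.Op, e.Err)
}

// Unwrap exposes the underlying error to errors.Is and errors.As.
func (e *streamError) Unwrap() error { return e.Err }

// isStreamClosed reports whether err wraps errClosed anywhere in its chain.
func isStreamClosed(err error) bool { return errors.Is(err, errClosed) }

func main() {
	var err error = &streamError{StreamID: "session-1", Op: "send", Err: errClosed}
	fmt.Println(isStreamClosed(err)) // true

	// errors.As recovers the wrapper when the stream ID or operation is needed.
	var se *streamError
	if errors.As(err, &se) {
		fmt.Println(se.Op) // send
	}
}
```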
Thread Safety ¶
All operations are thread-safe. The package uses sync.RWMutex for state protection and Go channels for event delivery, ensuring safe concurrent access.
Performance ¶
Buffer sizes affect performance and memory usage:
- Small buffers (10-50): Lower memory, higher backpressure risk
- Medium buffers (100-500): Balanced for most use cases
- Large buffers (1000+): High throughput, higher memory usage
JSON Serialization ¶
Events support JSON marshaling and unmarshaling:
event := events.UsageEvent(100, 50, 150)
jsonData, _ := json.Marshal(event)
var newEvent events.Event
json.Unmarshal(jsonData, &newEvent)
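One thing to watch for after a round trip: when encoding/json decodes into a map[string]interface{}, JSON numbers become float64. The sketch below uses a hypothetical event struct with assumed field names and assumes the payload is decoded with the standard library defaults; the package's own UnmarshalJSON may behave differently.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// event is a hypothetical stand-in for events.Event.
// The JSON field names are assumptions made for this sketch.
type event struct {
	Type      string                 `json:"type"`
	Data      map[string]interface{} `json:"data"`
	Timestamp time.Time              `json:"timestamp"`
}

// roundTrip marshals in to JSON and decodes the result into a new event.
func roundTrip(in event) (event, error) {
	raw, err := json.Marshal(in)
	if err != nil {
		return event{}, err
	}
	var out event
	err = json.Unmarshal(raw, &out)
	return out, err
}

func main() {
	in := event{
		Type:      "usage",
		Data:      map[string]interface{}{"total_tokens": 150},
		Timestamp: time.Now(),
	}
	out, err := roundTrip(in)
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	// The int stored before marshaling comes back as a float64.
	fmt.Printf("%T -> %T\n", in.Data["total_tokens"], out.Data["total_tokens"]) // int -> float64
}
```

Code that reads numeric fields from a decoded event should therefore assert float64, or check the type, rather than assume int.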
Integration ¶
The events package is designed to integrate with:
- internal/provider: LLM provider streaming implementations
- internal/client: Client-side event consumption
- internal/tui: Terminal UI real-time updates
For more examples and detailed documentation, see the README.md and examples.go files.
Index ¶
- Variables
- func ErrInvalidEvent(reason string) error
- func ErrStreamClosed(streamID string) error
- func ErrStreamFull(streamID string, bufferSize int) error
- func ErrStreamNotFound(streamID string) error
- func Example_backpressureHandling()
- func Example_basicEventStream()
- func Example_contextAwareStreaming()
- func Example_errorHandling()
- func Example_getOrCreate()
- func Example_jsonSerialization()
- func Example_realTimeLLMStreaming()
- func Example_streamManager()
- func IsInvalidEvent(err error) bool
- func IsStreamClosed(err error) bool
- func IsStreamFull(err error) bool
- func IsStreamNotFound(err error) bool
- type BackpressurePolicy
- type Event
- func ContentEndEvent(index int) *Event
- func ContentStartEvent(index int) *Event
- func ErrorEvent(errorMsg string) *Event
- func MessageStartEvent(messageID string) *Event
- func MessageStopEvent(messageID, stopReason string) *Event
- func NewEvent(eventType EventType, data map[string]interface{}) (*Event, error)
- func TextDeltaEvent(text string) *Event
- func ThinkingEvent(thinking string) *Event
- func UsageEvent(promptTokens, completionTokens, totalTokens int) *Event
- type EventStream
- func (s *EventStream) BufferSize() int
- func (s *EventStream) Close() error
- func (s *EventStream) IsClosed() bool
- func (s *EventStream) Len() int
- func (s *EventStream) Receive() <-chan *Event
- func (s *EventStream) Send(event *Event) error
- func (s *EventStream) SendWithContext(ctx context.Context, event *Event) error
- func (s *EventStream) SetBackpressurePolicy(policy BackpressurePolicy)
- type EventType
- type StreamError
- type StreamInfo
- type StreamManager
- func (m *StreamManager) CleanupInactive(threshold time.Duration) int
- func (m *StreamManager) CloseAll()
- func (m *StreamManager) CloseStream(streamID string) error
- func (m *StreamManager) CreateStream(streamID string) (*EventStream, error)
- func (m *StreamManager) GetOrCreate(streamID string) (*EventStream, bool, error)
- func (m *StreamManager) GetStream(streamID string) (*EventStream, error)
- func (m *StreamManager) GetStreamInfo(streamID string) (*StreamInfo, error)
- func (m *StreamManager) ListStreamInfo() []*StreamInfo
- func (m *StreamManager) ListStreams() []string
- func (m *StreamManager) StreamCount() int
Constants ¶
This section is empty.
Variables ¶
var (
	// ErrClosed indicates the stream has been closed
	ErrClosed = errors.New("stream is closed")
	// ErrNotFound indicates the stream was not found
	ErrNotFound = errors.New("stream not found")
	// ErrFull indicates the stream buffer is full (backpressure)
	ErrFull = errors.New("stream buffer is full")
	// ErrInvalid indicates an invalid event
	ErrInvalid = errors.New("invalid event")
)
Sentinel errors for stream operations
Functions ¶
func ErrInvalidEvent ¶
ErrInvalidEvent creates a new invalid event error
func ErrStreamClosed ¶
ErrStreamClosed creates a new stream closed error
func ErrStreamFull ¶
ErrStreamFull creates a new stream full error (backpressure)
func ErrStreamNotFound ¶
ErrStreamNotFound creates a new stream not found error
func Example_backpressureHandling ¶
func Example_backpressureHandling()
Example_backpressureHandling demonstrates backpressure handling
func Example_basicEventStream ¶
func Example_basicEventStream()
Example_basicEventStream demonstrates basic event stream usage
func Example_contextAwareStreaming ¶
func Example_contextAwareStreaming()
Example_contextAwareStreaming demonstrates context-aware event streaming
func Example_errorHandling ¶
func Example_errorHandling()
Example_errorHandling demonstrates proper error handling
func Example_getOrCreate ¶
func Example_getOrCreate()
Example_getOrCreate demonstrates the GetOrCreate pattern
func Example_jsonSerialization ¶
func Example_jsonSerialization()
Example_jsonSerialization demonstrates event JSON serialization
func Example_realTimeLLMStreaming ¶
func Example_realTimeLLMStreaming()
Example_realTimeLLMStreaming demonstrates real-time LLM response streaming
func Example_streamManager ¶
func Example_streamManager()
Example_streamManager demonstrates managing multiple concurrent streams
func IsInvalidEvent ¶
IsInvalidEvent checks if an error is an invalid event error
func IsStreamClosed ¶
IsStreamClosed checks if an error is a stream closed error
func IsStreamFull ¶
IsStreamFull checks if an error is a stream full error
func IsStreamNotFound ¶
IsStreamNotFound checks if an error is a stream not found error
Types ¶
type BackpressurePolicy ¶
type BackpressurePolicy int
BackpressurePolicy defines how the stream handles backpressure when the buffer is full
const (
	// BackpressureBlock blocks the sender until space is available
	BackpressureBlock BackpressurePolicy = iota
	// BackpressureDrop drops the event and returns an error
	BackpressureDrop
)
type Event ¶
type Event struct {
	// Type specifies the kind of event (TextDelta, ContentStart, etc.)
	Type EventType
	// Data contains the event-specific payload as a flexible map
	// The structure depends on the event type
	Data map[string]interface{}
	// Timestamp records when the event was created
	Timestamp time.Time
}
Event represents a streaming event from an LLM provider. It contains type information, event data, and a timestamp.
func ContentEndEvent ¶
ContentEndEvent creates a content end event with the given index
func ContentStartEvent ¶
ContentStartEvent creates a content start event with the given index
func ErrorEvent ¶
ErrorEvent creates an error event with the given error message
func MessageStartEvent ¶
MessageStartEvent creates a message start event with the given message ID
func MessageStopEvent ¶
MessageStopEvent creates a message stop event with the given message ID and reason
func NewEvent ¶
NewEvent creates a new event with the given type and data. It automatically sets the timestamp to the current time.
func TextDeltaEvent ¶
TextDeltaEvent creates a text delta event with the given text
func ThinkingEvent ¶
ThinkingEvent creates a thinking event with extended thinking text
func UsageEvent ¶
UsageEvent creates a usage event with token usage statistics
func (Event) MarshalJSON ¶
MarshalJSON implements json.Marshaler interface for Event
func (*Event) UnmarshalJSON ¶
UnmarshalJSON implements json.Unmarshaler interface for Event
type EventStream ¶
type EventStream struct {
	// contains filtered or unexported fields
}
EventStream manages a stream of events with buffering and backpressure handling. It provides thread-safe operations for sending and receiving events.
func NewEventStream ¶
func NewEventStream(bufferSize int) *EventStream
NewEventStream creates a new event stream with the specified buffer size. If bufferSize is 0, defaultBufferSize is used.
func (*EventStream) BufferSize ¶
func (s *EventStream) BufferSize() int
BufferSize returns the capacity of the event buffer
func (*EventStream) Close ¶
func (s *EventStream) Close() error
Close gracefully shuts down the stream. It closes the event channel, allowing consumers to drain remaining events. Returns an error if the stream is already closed.
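Draining after close relies on a property of Go channels: values still buffered when a channel is closed remain receivable, and a range loop ends only once they are consumed. A minimal sketch on a plain channel, where drain is a hypothetical helper rather than part of the package:

```go
package main

import "fmt"

// drain closes ch and then collects every value still buffered in it.
func drain(ch chan string) []string {
	close(ch)
	var got []string
	// Ranging over a closed channel yields the buffered values, then stops.
	for v := range ch {
		got = append(got, v)
	}
	return got
}

func main() {
	ch := make(chan string, 4)
	ch <- "Hello "
	ch <- "World!"
	fmt.Printf("%q\n", drain(ch)) // ["Hello " "World!"]
}
```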
func (*EventStream) IsClosed ¶
func (s *EventStream) IsClosed() bool
IsClosed returns true if the stream has been closed
func (*EventStream) Len ¶
func (s *EventStream) Len() int
Len returns the current number of events in the buffer
func (*EventStream) Receive ¶
func (s *EventStream) Receive() <-chan *Event
Receive returns the receive-only channel for consuming events. The channel will be closed when the stream is closed.
func (*EventStream) Send ¶
func (s *EventStream) Send(event *Event) error
Send sends an event to the stream. Returns an error if the stream is closed or if the event is invalid. Behavior when the buffer is full depends on the backpressure policy.
func (*EventStream) SendWithContext ¶
func (s *EventStream) SendWithContext(ctx context.Context, event *Event) error
SendWithContext sends an event to the stream with context support. Returns an error if the context is cancelled, the stream is closed, or the event is invalid.
func (*EventStream) SetBackpressurePolicy ¶
func (s *EventStream) SetBackpressurePolicy(policy BackpressurePolicy)
SetBackpressurePolicy sets the backpressure handling policy. This should be called before sending events.
type EventType ¶
type EventType int
EventType represents the type of streaming event
const (
	// EventTextDelta represents incremental text chunks from the LLM
	EventTextDelta EventType = iota
	// EventContentStart marks the beginning of a content block
	EventContentStart
	// EventContentEnd marks the end of a content block
	EventContentEnd
	// EventMessageStart marks the start of a message
	EventMessageStart
	// EventMessageStop marks the end of a message
	EventMessageStop
	// EventError represents an error event
	EventError
	// EventUsage represents token usage statistics
	EventUsage
	// EventThinking represents extended thinking events (e.g., Claude's thinking process)
	EventThinking
)
func ParseEventType ¶
ParseEventType converts a string to EventType
type StreamError ¶
type StreamError struct {
	// StreamID identifies the stream where the error occurred
	StreamID string
	// Op is the operation that failed (e.g., "send", "receive", "close")
	Op string
	// Err is the underlying error
	Err error
}
StreamError represents an error that occurred during stream operations. It wraps the underlying error and provides context about the stream and operation.
func (*StreamError) Error ¶
func (e *StreamError) Error() string
Error implements the error interface
func (*StreamError) Unwrap ¶
func (e *StreamError) Unwrap() error
Unwrap returns the underlying error for error chain support
type StreamInfo ¶
type StreamInfo struct {
	ID           string
	BufferSize   int
	CurrentLoad  int
	IsClosed     bool
	LastActivity time.Time
}
StreamInfo contains information about a stream
type StreamManager ¶
type StreamManager struct {
	// contains filtered or unexported fields
}
StreamManager manages multiple concurrent event streams. It provides thread-safe operations for creating, retrieving, and closing streams.
func NewStreamManager ¶
func NewStreamManager(bufferSize int) *StreamManager
NewStreamManager creates a new stream manager with the specified default buffer size
func (*StreamManager) CleanupInactive ¶
func (m *StreamManager) CleanupInactive(threshold time.Duration) int
CleanupInactive removes streams that have been inactive for longer than the threshold. Returns the number of streams cleaned up.
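The bookkeeping behind such a cleanup can be sketched as a sweep over last-activity timestamps. The registry type and its methods below are hypothetical, and the sketch only removes map entries; whether the real method also closes the streams it removes is not stated here.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// registry is a hypothetical stand-in that tracks only last-activity times.
type registry struct {
	mu           sync.Mutex
	lastActivity map[string]time.Time
}

// cleanupInactive removes every entry idle for longer than threshold,
// measured against now, and returns how many entries were removed.
func (r *registry) cleanupInactive(now time.Time, threshold time.Duration) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	removed := 0
	for id, last := range r.lastActivity {
		if now.Sub(last) > threshold {
			delete(r.lastActivity, id) // deleting while ranging over a map is safe in Go
			removed++
		}
	}
	return removed
}

// runDemo sweeps a registry holding one idle and one active entry and
// returns the number removed and the number remaining.
func runDemo() (removed, remaining int) {
	now := time.Now()
	r := &registry{lastActivity: map[string]time.Time{
		"idle":   now.Add(-10 * time.Minute),
		"active": now.Add(-10 * time.Second),
	}}
	removed = r.cleanupInactive(now, 5*time.Minute)
	return removed, len(r.lastActivity)
}

func main() {
	removed, remaining := runDemo()
	fmt.Println(removed, remaining) // 1 1
}
```

Passing now as a parameter keeps the sweep deterministic, which makes the threshold logic easy to test.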
func (*StreamManager) CloseAll ¶
func (m *StreamManager) CloseAll()
CloseAll closes all active streams and clears the manager
func (*StreamManager) CloseStream ¶
func (m *StreamManager) CloseStream(streamID string) error
CloseStream closes and removes a stream by ID. Returns an error if the stream ID is empty or if the stream does not exist.
func (*StreamManager) CreateStream ¶
func (m *StreamManager) CreateStream(streamID string) (*EventStream, error)
CreateStream creates a new event stream with the given ID. Returns an error if the stream ID is empty or if a stream with the same ID already exists.
func (*StreamManager) GetOrCreate ¶
func (m *StreamManager) GetOrCreate(streamID string) (*EventStream, bool, error)
GetOrCreate retrieves an existing stream or creates a new one if it doesn't exist. Returns the stream, a boolean indicating if it was created, and any error.
func (*StreamManager) GetStream ¶
func (m *StreamManager) GetStream(streamID string) (*EventStream, error)
GetStream retrieves an existing stream by ID. Returns an error if the stream ID is empty or if the stream does not exist.
func (*StreamManager) GetStreamInfo ¶
func (m *StreamManager) GetStreamInfo(streamID string) (*StreamInfo, error)
GetStreamInfo returns detailed information about a stream
func (*StreamManager) ListStreamInfo ¶
func (m *StreamManager) ListStreamInfo() []*StreamInfo
ListStreamInfo returns detailed information about all streams
func (*StreamManager) ListStreams ¶
func (m *StreamManager) ListStreams() []string
ListStreams returns a list of all active stream IDs
func (*StreamManager) StreamCount ¶
func (m *StreamManager) StreamCount() int
StreamCount returns the number of active streams