events

package
v0.1.9
Published: Jan 11, 2026 License: MIT Imports: 6 Imported by: 0

README

Events Package

Real-time event streaming system for processing LLM responses with buffering, backpressure handling, and concurrent stream management.

Overview

The events package provides a robust, thread-safe event streaming system designed for real-time LLM response processing. It supports:

  • Buffered Event Streams: Configurable buffer sizes with automatic backpressure handling
  • Multiple Event Types: Text deltas, content blocks, messages, usage stats, thinking, and errors
  • Stream Management: Concurrent management of multiple event streams
  • Context Awareness: Support for context cancellation and timeouts
  • Error Handling: Comprehensive error types with sentinel errors
  • JSON Serialization: Full support for event marshaling/unmarshaling

Architecture

┌─────────────────────────────────────────────────────────────┐
│                      StreamManager                           │
│  ┌───────────────────────────────────────────────────────┐  │
│  │  streams: map[string]*managedStream                   │  │
│  │  - session-1 -> EventStream (buffer: 100)            │  │
│  │  - session-2 -> EventStream (buffer: 100)            │  │
│  │  - session-3 -> EventStream (buffer: 100)            │  │
│  └───────────────────────────────────────────────────────┘  │
│                                                               │
│  Operations:                                                  │
│  - CreateStream(id) -> EventStream                           │
│  - GetStream(id) -> EventStream                              │
│  - GetOrCreate(id) -> EventStream, created                   │
│  - CloseStream(id)                                           │
│  - CleanupInactive(threshold)                                │
└─────────────────────────────────────────────────────────────┘
                            │
                            │ manages
                            ▼
┌─────────────────────────────────────────────────────────────┐
│                      EventStream                             │
│  ┌───────────────────────────────────────────────────────┐  │
│  │  events: chan *Event (buffered)                       │  │
│  │  backpressurePolicy: Block | Drop                     │  │
│  │  closed: bool                                         │  │
│  └───────────────────────────────────────────────────────┘  │
│                                                               │
│  Operations:                                                  │
│  - Send(event) -> error                                      │
│  - SendWithContext(ctx, event) -> error                      │
│  - Receive() -> <-chan *Event                                │
│  - Close() -> error                                          │
└─────────────────────────────────────────────────────────────┘
                            │
                            │ transports
                            ▼
┌─────────────────────────────────────────────────────────────┐
│                         Event                                │
│  ┌───────────────────────────────────────────────────────┐  │
│  │  Type: EventType                                      │  │
│  │  Data: map[string]interface{}                         │  │
│  │  Timestamp: time.Time                                 │  │
│  └───────────────────────────────────────────────────────┘  │
│                                                               │
│  Event Types:                                                │
│  - EventTextDelta      - Incremental text chunks            │
│  - EventContentStart   - Content block start                │
│  - EventContentEnd     - Content block end                  │
│  - EventMessageStart   - Message start                      │
│  - EventMessageStop    - Message stop                       │
│  - EventError          - Error events                       │
│  - EventUsage          - Token usage stats                  │
│  - EventThinking       - Extended thinking                  │
└─────────────────────────────────────────────────────────────┘

Event Flow Diagram

LLM Provider                EventStream              Client Application
     │                           │                            │
     │──── MessageStart ────────>│                            │
     │                           │────── Receive() ──────────>│
     │                           │                            │
     │──── ContentStart ────────>│                            │
     │                           │────── Receive() ──────────>│
     │                           │                            │
     │──── TextDelta("The") ────>│                            │
     │                           │────── Receive() ──────────>│
     │                           │                            │
     │──── TextDelta("answer")──>│                            │
     │                           │────── Receive() ──────────>│
     │                           │                            │
     │──── TextDelta("is") ─────>│                            │
     │                           │────── Receive() ──────────>│
     │                           │                            │
     │──── TextDelta("42") ─────>│                            │
     │                           │────── Receive() ──────────>│
     │                           │                            │
     │──── ContentEnd ──────────>│                            │
     │                           │────── Receive() ──────────>│
     │                           │                            │
     │──── Usage(10,4,14) ──────>│                            │
     │                           │────── Receive() ──────────>│
     │                           │                            │
     │──── MessageStop ─────────>│                            │
     │                           │────── Receive() ──────────>│
     │                           │                            │
     │                           │<──── Close() ───────────────│
     │                           │                            │
     │                           │ (channel closed)           │
     │                           │                            │
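
The flow above can be sketched with a plain buffered channel. This is a minimal standalone illustration, not the package's implementation: the `kind` and `event` types and the `produce` and `collect` functions are hypothetical stand-ins for EventType, Event, and the provider and client roles. One difference from the diagram is that the producer closes the channel here, which is a common arrangement when there is a single sender.

```go
package main

import (
	"fmt"
	"strings"
)

// kind and event are hypothetical stand-ins for EventType and Event.
type kind int

const (
	messageStart kind = iota
	contentStart
	textDelta
	contentEnd
	usage
	messageStop
)

type event struct {
	kind kind
	text string
}

// produce plays the provider role: it sends the sequence from the diagram,
// then closes the channel so the consumer's range loop terminates.
func produce(ch chan<- event, words []string) {
	defer close(ch)
	ch <- event{kind: messageStart}
	ch <- event{kind: contentStart}
	for _, w := range words {
		ch <- event{kind: textDelta, text: w}
	}
	ch <- event{kind: contentEnd}
	ch <- event{kind: usage}
	ch <- event{kind: messageStop}
}

// collect plays the client role: it joins the text deltas and reports
// whether a message-stop event arrived before the channel closed.
func collect(ch <-chan event) (string, bool) {
	var parts []string
	stopped := false
	for ev := range ch {
		switch ev.kind {
		case textDelta:
			parts = append(parts, ev.text)
		case messageStop:
			stopped = true
		}
	}
	return strings.Join(parts, " "), stopped
}

func main() {
	ch := make(chan event, 4)
	go produce(ch, []string{"The", "answer", "is", "42"})
	text, stopped := collect(ch)
	fmt.Println(text, stopped)
}
```

Because the buffer (4) is smaller than the number of events (9), the producer blocks whenever the consumer falls behind, which is the blocking form of backpressure.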

Usage Examples

Basic Event Streaming
// Create a new event stream
stream := events.NewEventStream(100)

// Send events
stream.Send(events.MessageStartEvent("msg_123"))
stream.Send(events.TextDeltaEvent("Hello "))
stream.Send(events.TextDeltaEvent("World!"))
stream.Send(events.MessageStopEvent("msg_123", "end_turn"))

// Close the stream so the range loop below ends once the buffer is drained.
// (A deferred Close would run only after the loop, so the loop would never end.)
stream.Close()

// Receive events
for event := range stream.Receive() {
    switch event.Type {
    case events.EventTextDelta:
        fmt.Print(event.Data["text"])
    case events.EventMessageStop:
        fmt.Println("\nDone!")
    }
}
Context-Aware Streaming
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

stream := events.NewEventStream(10)
defer stream.Close()

// Send with context
err := stream.SendWithContext(ctx, events.TextDeltaEvent("test"))
if err != nil {
    if errors.Is(err, context.DeadlineExceeded) {
        fmt.Println("Timeout!")
    }
}
Managing Multiple Streams
manager := events.NewStreamManager(100)
defer manager.CloseAll()

// Create streams for different sessions
stream1, _ := manager.CreateStream("session-user-123")
stream2, _ := manager.CreateStream("session-user-456")

// Send events to specific streams
stream1.Send(events.TextDeltaEvent("Hello user 123"))
stream2.Send(events.TextDeltaEvent("Hello user 456"))

// Get or create pattern
stream, created, _ := manager.GetOrCreate("session-user-789")
if created {
    fmt.Println("New stream created")
}
Backpressure Handling
stream := events.NewEventStream(5) // Small buffer
defer stream.Close()

// Set policy to drop events when buffer is full
stream.SetBackpressurePolicy(events.BackpressureDrop)

err := stream.Send(event)
if events.IsStreamFull(err) {
    fmt.Println("Buffer full, event dropped")
}

// Alternative: Block until space is available (default)
stream.SetBackpressurePolicy(events.BackpressureBlock)
Error Handling
// Check for specific errors
_, err := manager.GetStream("nonexistent") // GetStream returns (*EventStream, error)
if events.IsStreamNotFound(err) {
    fmt.Println("Stream not found")
}

err = stream.Send(event)
if events.IsStreamClosed(err) {
    fmt.Println("Stream is closed")
}

if events.IsStreamFull(err) {
    fmt.Println("Buffer is full")
}

if events.IsInvalidEvent(err) {
    fmt.Println("Invalid event")
}
JSON Serialization
// Create event
event := events.UsageEvent(100, 50, 150)

// Marshal to JSON
jsonData, _ := json.Marshal(event)
// Output: {"type":"Usage","data":{"prompt_tokens":100,...},"timestamp":"..."}

// Unmarshal from JSON
var newEvent events.Event
json.Unmarshal(jsonData, &newEvent)

Event Types

EventTextDelta

Incremental text chunks from the LLM.

event := events.TextDeltaEvent("Hello ")
// Data: {"text": "Hello "}
EventContentStart / EventContentEnd

Marks the beginning/end of a content block.

startEvent := events.ContentStartEvent(0)
// Data: {"index": 0}

endEvent := events.ContentEndEvent(0)
// Data: {"index": 0}
EventMessageStart / EventMessageStop

Marks the start/end of a message.

startEvent := events.MessageStartEvent("msg_123")
// Data: {"message_id": "msg_123"}

stopEvent := events.MessageStopEvent("msg_123", "end_turn")
// Data: {"message_id": "msg_123", "stop_reason": "end_turn"}
EventError

Error events during streaming.

event := events.ErrorEvent("Connection timeout")
// Data: {"error": "Connection timeout"}
EventUsage

Token usage statistics.

event := events.UsageEvent(100, 50, 150)
// Data: {
//   "prompt_tokens": 100,
//   "completion_tokens": 50,
//   "total_tokens": 150
// }
EventThinking

Extended thinking events (e.g., Claude's reasoning process).

event := events.ThinkingEvent("Analyzing the problem...")
// Data: {"thinking": "Analyzing the problem..."}

Error Types

Sentinel Errors
  • ErrClosed - Stream is closed
  • ErrNotFound - Stream not found
  • ErrFull - Stream buffer is full (backpressure)
  • ErrInvalid - Invalid event
Helper Functions
ErrStreamClosed(streamID) error
ErrStreamNotFound(streamID) error
ErrStreamFull(streamID, bufferSize) error
ErrInvalidEvent(reason) error
Error Checking
IsStreamClosed(err) bool
IsStreamNotFound(err) bool
IsStreamFull(err) bool
IsInvalidEvent(err) bool
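
How the sentinel errors, helper constructors, and checkers fit together can be sketched as follows. This is an assumption about the wiring, not the package's source: `streamError`, `newStreamFull`, and `isStreamFull` are hypothetical lowercase counterparts of StreamError, ErrStreamFull, and IsStreamFull, and the error message format is invented for the example. The sketch relies only on the standard `errors.Is` and `errors.As` behavior with `Unwrap`.

```go
package main

import (
	"errors"
	"fmt"
)

// errFull plays the role of the ErrFull sentinel.
var errFull = errors.New("stream buffer is full")

// streamError mirrors the documented StreamError fields.
type streamError struct {
	StreamID string
	Op       string
	Err      error
}

// Error formats the wrapped error; the exact format is an assumption.
func (e *streamError) Error() string {
	return fmt.Sprintf("stream %q: %s: %v", e.StreamID, e.Op, e.Err)
}

// Unwrap exposes the sentinel to errors.Is and errors.As.
func (e *streamError) Unwrap() error { return e.Err }

// newStreamFull is a hypothetical counterpart to ErrStreamFull.
func newStreamFull(streamID string) error {
	return &streamError{StreamID: streamID, Op: "send", Err: errFull}
}

// isStreamFull is a hypothetical counterpart to IsStreamFull.
func isStreamFull(err error) bool { return errors.Is(err, errFull) }

func main() {
	// Wrapping the error again does not hide the sentinel.
	err := fmt.Errorf("handling request: %w", newStreamFull("session-1"))
	fmt.Println(isStreamFull(err))

	// errors.As recovers the stream ID and operation for logging.
	var se *streamError
	if errors.As(err, &se) {
		fmt.Println(se.StreamID, se.Op)
	}
}
```

With this layout, callers can match on the sentinel with a checker function while still recovering the stream ID and operation when they need more detail.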

Best Practices

1. Always Close Streams
stream := events.NewEventStream(100)
defer stream.Close() // Always defer close
2. Use Context for Long-Running Operations
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

err := stream.SendWithContext(ctx, event)
3. Handle Backpressure Appropriately
// For real-time UI updates: use Drop policy
stream.SetBackpressurePolicy(events.BackpressureDrop)

// For critical events: use Block policy (default)
stream.SetBackpressurePolicy(events.BackpressureBlock)
4. Clean Up Inactive Streams
manager := events.NewStreamManager(100)

// Periodically cleanup
go func() {
    ticker := time.NewTicker(1 * time.Hour)
    defer ticker.Stop()

    for range ticker.C {
        cleaned := manager.CleanupInactive(24 * time.Hour)
        log.Printf("Cleaned up %d inactive streams", cleaned)
    }
}()
5. Use GetOrCreate for Idempotent Operations
stream, created, err := manager.GetOrCreate(sessionID)
if err != nil {
    return err
}

if created {
    log.Printf("Created new stream for session: %s", sessionID)
}

Thread Safety

All operations are thread-safe:

  • EventStream: Uses sync.RWMutex for state protection
  • StreamManager: Uses sync.RWMutex for map protection
  • Channels: Go channels provide inherent thread safety
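
As an illustration of map protection with sync.RWMutex, the sketch below shows one common way to write a get-or-create operation: a read-locked fast path, then a write-locked slow path that re-checks the map. The `registry` type and its methods are hypothetical and are not taken from the package source, which may lock differently.

```go
package main

import (
	"fmt"
	"sync"
)

// registry is a hypothetical stand-in for a manager's stream map.
type registry struct {
	mu      sync.RWMutex
	streams map[string]chan string
	bufSize int
}

func newRegistry(bufSize int) *registry {
	return &registry{streams: make(map[string]chan string), bufSize: bufSize}
}

// getOrCreate returns the channel for id and whether this call created it.
func (r *registry) getOrCreate(id string) (chan string, bool) {
	// Fast path: many readers may hold the read lock at once.
	r.mu.RLock()
	ch, ok := r.streams[id]
	r.mu.RUnlock()
	if ok {
		return ch, false
	}

	// Slow path: take the write lock and re-check, because another
	// goroutine may have created the entry between the two locks.
	r.mu.Lock()
	defer r.mu.Unlock()
	if existing, ok := r.streams[id]; ok {
		return existing, false
	}
	ch = make(chan string, r.bufSize)
	r.streams[id] = ch
	return ch, true
}

// countCreated calls getOrCreate for one id from n goroutines and
// returns how many of those calls reported created == true.
func countCreated(r *registry, id string, n int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	created := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, ok := r.getOrCreate(id); ok {
				mu.Lock()
				created++
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return created
}

func main() {
	r := newRegistry(100)
	fmt.Println(countCreated(r, "session-1", 50)) // 1
}
```

The re-check under the write lock is what makes the operation idempotent under concurrency: exactly one caller creates the entry and every other caller receives the same channel.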

Performance Considerations

Buffer Sizing
  • Small buffers (10-50): Lower memory, higher backpressure risk
  • Medium buffers (100-500): Balanced for most use cases
  • Large buffers (1000+): High throughput, higher memory usage
Backpressure Policies
  • Block: Guarantees delivery but may block senders
  • Drop: Never blocks but may lose events under load
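
The difference between the two policies can be shown with a plain buffered channel. This is a sketch of the general technique (a `select` with a `default` branch for non-blocking sends), not the package's code; `policy`, `send`, `fill`, and `errFull` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

type policy int

const (
	block policy = iota // wait for buffer space
	drop                // give up immediately when the buffer is full
)

var errFull = errors.New("stream buffer is full")

// send delivers v according to p. With drop, a full buffer yields errFull
// at once; with block, the call waits until a receiver frees a slot.
func send(ch chan<- int, v int, p policy) error {
	if p == drop {
		select {
		case ch <- v:
			return nil
		default:
			return errFull
		}
	}
	ch <- v
	return nil
}

// fill sends n values with the drop policy and counts the dropped ones.
func fill(ch chan<- int, n int) int {
	dropped := 0
	for i := 0; i < n; i++ {
		if err := send(ch, i, drop); errors.Is(err, errFull) {
			dropped++
		}
	}
	return dropped
}

func main() {
	ch := make(chan int, 5)
	fmt.Println(fill(ch, 8)) // 3: the buffer holds 5, nobody is receiving
}
```

With no receiver running, the same eight sends under the block policy would stall on the sixth value, which is the trade-off the two bullets describe.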
Cleanup

Regular cleanup of inactive streams prevents memory leaks:

// Cleanup streams inactive for > 1 hour
manager.CleanupInactive(1 * time.Hour)
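
What an inactivity sweep might do internally can be sketched as below. The `tracked` and `manager` types are hypothetical, and the rule used here (close and delete every entry whose last activity is older than the threshold) is an assumption based on the description of CleanupInactive, not a copy of its implementation. The current time is passed in explicitly so the behavior is easy to test.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// tracked pairs a channel with the time it was last used.
type tracked struct {
	ch           chan string
	lastActivity time.Time
}

type manager struct {
	mu      sync.Mutex
	streams map[string]*tracked
}

// cleanupInactive closes and removes entries idle for longer than
// threshold, measured against now, and returns how many were removed.
func (m *manager) cleanupInactive(now time.Time, threshold time.Duration) int {
	m.mu.Lock()
	defer m.mu.Unlock()
	removed := 0
	for id, s := range m.streams {
		if now.Sub(s.lastActivity) > threshold {
			close(s.ch)
			delete(m.streams, id) // deleting during range is allowed in Go
			removed++
		}
	}
	return removed
}

// runCleanupDemo builds one fresh and one stale entry, sweeps with a
// one-hour threshold, and returns (removed, remaining).
func runCleanupDemo() (int, int) {
	now := time.Now()
	m := &manager{streams: map[string]*tracked{
		"fresh": {ch: make(chan string, 1), lastActivity: now.Add(-10 * time.Minute)},
		"stale": {ch: make(chan string, 1), lastActivity: now.Add(-2 * time.Hour)},
	}}
	removed := m.cleanupInactive(now, time.Hour)
	return removed, len(m.streams)
}

func main() {
	fmt.Println(runCleanupDemo()) // 1 1
}
```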

Testing

Run tests with coverage:

go test -v -race -cover ./internal/events/

Current coverage: 84.3%

Integration

The events package integrates with:

  • internal/provider - LLM provider streaming
  • internal/client - Client-side event consumption
  • internal/tui - Terminal UI updates

License

Part of the AINative-Code project.

Documentation

Overview

Package events provides a real-time event streaming system for processing LLM responses.

The events package implements a robust, thread-safe event streaming infrastructure designed for real-time LLM response processing. It supports buffered event streams, backpressure handling, concurrent stream management, and context-aware operations.

Core Components

The package consists of four main components:

1. Event Types (types.go) - Defines event structures and event types
2. EventStream (stream.go) - Manages individual event streams with buffering
3. StreamManager (manager.go) - Manages multiple concurrent streams
4. Error Types (errors.go) - Provides comprehensive error handling

Event Types

The package supports eight event types for LLM streaming:

  • EventTextDelta: Incremental text chunks from the LLM
  • EventContentStart: Beginning of a content block
  • EventContentEnd: End of a content block
  • EventMessageStart: Start of a message
  • EventMessageStop: End of a message
  • EventError: Error events during streaming
  • EventUsage: Token usage statistics
  • EventThinking: Extended thinking events (e.g., Claude's reasoning)

Basic Usage

Create and use a simple event stream:

stream := events.NewEventStream(100)

// Send events
stream.Send(events.TextDeltaEvent("Hello "))
stream.Send(events.TextDeltaEvent("World!"))

// Close the stream so the range loop ends once the buffer is drained
stream.Close()

// Receive events
for event := range stream.Receive() {
    fmt.Print(event.Data["text"])
}

Stream Management

Manage multiple concurrent streams:

manager := events.NewStreamManager(100)
defer manager.CloseAll()

// Create streams for different sessions
stream1, _ := manager.CreateStream("session-1")
stream2, _ := manager.CreateStream("session-2")

// Get or create pattern
stream, created, _ := manager.GetOrCreate("session-3")

Context Awareness

Support for context cancellation and timeouts:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

err := stream.SendWithContext(ctx, event)
if err != nil {
    if errors.Is(err, context.DeadlineExceeded) {
        // Handle timeout
    }
}

Backpressure Handling

Two backpressure policies are supported:

// Block until space is available (default)
stream.SetBackpressurePolicy(events.BackpressureBlock)

// Drop events when buffer is full
stream.SetBackpressurePolicy(events.BackpressureDrop)

Error Handling

Comprehensive error types with sentinel errors:

_, err := manager.GetStream("nonexistent") // GetStream returns (*EventStream, error)
if events.IsStreamNotFound(err) {
    // Handle stream not found
}

err = stream.Send(event)
if events.IsStreamClosed(err) {
    // Handle closed stream
}

if events.IsStreamFull(err) {
    // Handle backpressure
}

Thread Safety

All operations are thread-safe. The package uses sync.RWMutex for state protection and Go channels for event delivery, ensuring safe concurrent access.

Performance

Buffer sizes affect performance and memory usage:

  • Small buffers (10-50): Lower memory, higher backpressure risk
  • Medium buffers (100-500): Balanced for most use cases
  • Large buffers (1000+): High throughput, higher memory usage

JSON Serialization

Events support JSON marshaling and unmarshaling:

event := events.UsageEvent(100, 50, 150)
jsonData, _ := json.Marshal(event)

var newEvent events.Event
json.Unmarshal(jsonData, &newEvent)

Integration

The events package is designed to integrate with:

  • internal/provider: LLM provider streaming implementations
  • internal/client: Client-side event consumption
  • internal/tui: Terminal UI real-time updates

For more examples and detailed documentation, see the README.md and examples.go files.

Index

Constants

This section is empty.

Variables

var (
	// ErrClosed indicates the stream has been closed
	ErrClosed = errors.New("stream is closed")

	// ErrNotFound indicates the stream was not found
	ErrNotFound = errors.New("stream not found")

	// ErrFull indicates the stream buffer is full (backpressure)
	ErrFull = errors.New("stream buffer is full")

	// ErrInvalid indicates an invalid event
	ErrInvalid = errors.New("invalid event")
)

Sentinel errors for stream operations

Functions

func ErrInvalidEvent

func ErrInvalidEvent(reason string) error

ErrInvalidEvent creates a new invalid event error

func ErrStreamClosed

func ErrStreamClosed(streamID string) error

ErrStreamClosed creates a new stream closed error

func ErrStreamFull

func ErrStreamFull(streamID string, bufferSize int) error

ErrStreamFull creates a new stream full error (backpressure)

func ErrStreamNotFound

func ErrStreamNotFound(streamID string) error

ErrStreamNotFound creates a new stream not found error

func Example_backpressureHandling

func Example_backpressureHandling()

Example_backpressureHandling demonstrates backpressure handling

func Example_basicEventStream

func Example_basicEventStream()

Example_basicEventStream demonstrates basic event stream usage

func Example_contextAwareStreaming

func Example_contextAwareStreaming()

Example_contextAwareStreaming demonstrates context-aware event streaming

func Example_errorHandling

func Example_errorHandling()

Example_errorHandling demonstrates proper error handling

func Example_getOrCreate

func Example_getOrCreate()

Example_getOrCreate demonstrates the GetOrCreate pattern

func Example_jsonSerialization

func Example_jsonSerialization()

Example_jsonSerialization demonstrates event JSON serialization

func Example_realTimeLLMStreaming

func Example_realTimeLLMStreaming()

Example_realTimeLLMStreaming demonstrates real-time LLM response streaming

func Example_streamManager

func Example_streamManager()

Example_streamManager demonstrates managing multiple concurrent streams

func IsInvalidEvent

func IsInvalidEvent(err error) bool

IsInvalidEvent checks if an error is an invalid event error

func IsStreamClosed

func IsStreamClosed(err error) bool

IsStreamClosed checks if an error is a stream closed error

func IsStreamFull

func IsStreamFull(err error) bool

IsStreamFull checks if an error is a stream full error

func IsStreamNotFound

func IsStreamNotFound(err error) bool

IsStreamNotFound checks if an error is a stream not found error

Types

type BackpressurePolicy

type BackpressurePolicy int

BackpressurePolicy defines how the stream handles backpressure when the buffer is full

const (
	// BackpressureBlock blocks the sender until space is available
	BackpressureBlock BackpressurePolicy = iota

	// BackpressureDrop drops the event and returns an error
	BackpressureDrop
)

type Event

type Event struct {
	// Type specifies the kind of event (TextDelta, ContentStart, etc.)
	Type EventType

	// Data contains the event-specific payload as a flexible map
	// The structure depends on the event type
	Data map[string]interface{}

	// Timestamp records when the event was created
	Timestamp time.Time
}

Event represents a streaming event from an LLM provider. It contains type information, event data, and a timestamp.

func ContentEndEvent

func ContentEndEvent(index int) *Event

ContentEndEvent creates a content end event with the given index

func ContentStartEvent

func ContentStartEvent(index int) *Event

ContentStartEvent creates a content start event with the given index

func ErrorEvent

func ErrorEvent(errorMsg string) *Event

ErrorEvent creates an error event with the given error message

func MessageStartEvent

func MessageStartEvent(messageID string) *Event

MessageStartEvent creates a message start event with the given message ID

func MessageStopEvent

func MessageStopEvent(messageID, stopReason string) *Event

MessageStopEvent creates a message stop event with the given message ID and reason

func NewEvent

func NewEvent(eventType EventType, data map[string]interface{}) (*Event, error)

NewEvent creates a new event with the given type and data. It automatically sets the timestamp to the current time.

func TextDeltaEvent

func TextDeltaEvent(text string) *Event

TextDeltaEvent creates a text delta event with the given text

func ThinkingEvent

func ThinkingEvent(thinking string) *Event

ThinkingEvent creates a thinking event with extended thinking text

func UsageEvent

func UsageEvent(promptTokens, completionTokens, totalTokens int) *Event

UsageEvent creates a usage event with token usage statistics

func (Event) MarshalJSON

func (e Event) MarshalJSON() ([]byte, error)

MarshalJSON implements json.Marshaler interface for Event

func (*Event) UnmarshalJSON

func (e *Event) UnmarshalJSON(data []byte) error

UnmarshalJSON implements json.Unmarshaler interface for Event
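
One way a custom MarshalJSON and UnmarshalJSON pair can produce the `{"type": ..., "data": ..., "timestamp": ...}` shape shown in the README is to go through a separate wire struct. The sketch below is an assumption about the approach, not the package's code: `event` and `wireEvent` are hypothetical, and the type is kept as a plain string instead of the EventType enum to keep the example short.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// event is a simplified, hypothetical stand-in for Event.
type event struct {
	Type      string
	Data      map[string]interface{}
	Timestamp time.Time
}

// wireEvent is the assumed JSON shape with lowercase keys.
type wireEvent struct {
	Type      string                 `json:"type"`
	Data      map[string]interface{} `json:"data"`
	Timestamp time.Time              `json:"timestamp"`
}

// MarshalJSON encodes through wireEvent, which has no custom marshaler,
// so there is no recursion back into this method.
func (e event) MarshalJSON() ([]byte, error) {
	return json.Marshal(wireEvent{Type: e.Type, Data: e.Data, Timestamp: e.Timestamp})
}

// UnmarshalJSON decodes into wireEvent and copies the fields over.
func (e *event) UnmarshalJSON(b []byte) error {
	var w wireEvent
	if err := json.Unmarshal(b, &w); err != nil {
		return err
	}
	e.Type, e.Data, e.Timestamp = w.Type, w.Data, w.Timestamp
	return nil
}

// roundTrip marshals an event and decodes it into a fresh value.
func roundTrip(in event) (event, error) {
	b, err := json.Marshal(in)
	if err != nil {
		return event{}, err
	}
	var out event
	err = json.Unmarshal(b, &out)
	return out, err
}

func main() {
	in := event{
		Type:      "Usage",
		Data:      map[string]interface{}{"total_tokens": 150},
		Timestamp: time.Now().UTC(),
	}
	out, err := roundTrip(in)
	fmt.Println(out.Type, out.Data["total_tokens"], err)
}
```

A detail worth knowing for any `map[string]interface{}` payload: after a round trip through encoding/json, numeric values come back as float64, so `out.Data["total_tokens"]` is `float64(150)` and an `.(int)` type assertion on it would fail.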

func (*Event) Validate

func (e *Event) Validate() error

Validate checks if the event is valid

type EventStream

type EventStream struct {
	// contains filtered or unexported fields
}

EventStream manages a stream of events with buffering and backpressure handling. It provides thread-safe operations for sending and receiving events.

func NewEventStream

func NewEventStream(bufferSize int) *EventStream

NewEventStream creates a new event stream with the specified buffer size. If bufferSize is 0, defaultBufferSize is used.

func (*EventStream) BufferSize

func (s *EventStream) BufferSize() int

BufferSize returns the capacity of the event buffer

func (*EventStream) Close

func (s *EventStream) Close() error

Close gracefully shuts down the stream. It closes the event channel, allowing consumers to drain remaining events. Returns an error if the stream is already closed.

func (*EventStream) IsClosed

func (s *EventStream) IsClosed() bool

IsClosed returns true if the stream has been closed

func (*EventStream) Len

func (s *EventStream) Len() int

Len returns the current number of events in the buffer

func (*EventStream) Receive

func (s *EventStream) Receive() <-chan *Event

Receive returns the receive-only channel for consuming events. The channel will be closed when the stream is closed.

func (*EventStream) Send

func (s *EventStream) Send(event *Event) error

Send sends an event to the stream. Returns an error if the stream is closed or if the event is invalid. Behavior when the buffer is full depends on the backpressure policy.

func (*EventStream) SendWithContext

func (s *EventStream) SendWithContext(ctx context.Context, event *Event) error

SendWithContext sends an event to the stream with context support. Returns an error if the context is cancelled, the stream is closed, or the event is invalid.
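
A context-aware send is typically built from a `select` over the channel send and `ctx.Done()`. The sketch below shows that technique on a plain channel; `sendWithContext`, `timedOutOnFullBuffer`, and `sentWithFreeSlot` are hypothetical helpers, and the real method may additionally check for a closed stream and validate the event.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// sendWithContext waits for buffer space until ctx is done.
func sendWithContext(ctx context.Context, ch chan<- string, v string) error {
	select {
	case ch <- v:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// timedOutOnFullBuffer fills a one-slot buffer, then attempts a second
// send that can only end when the 20ms timeout fires.
func timedOutOnFullBuffer() bool {
	ch := make(chan string, 1)
	ch <- "first" // the buffer is now full and nobody is receiving

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Millisecond)
	defer cancel()

	err := sendWithContext(ctx, ch, "second")
	return errors.Is(err, context.DeadlineExceeded)
}

// sentWithFreeSlot shows the success path: with space in the buffer the
// send completes immediately and the context is never consulted.
func sentWithFreeSlot() bool {
	ch := make(chan string, 1)
	return sendWithContext(context.Background(), ch, "only") == nil && len(ch) == 1
}

func main() {
	fmt.Println(timedOutOnFullBuffer(), sentWithFreeSlot()) // true true
}
```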

func (*EventStream) SetBackpressurePolicy

func (s *EventStream) SetBackpressurePolicy(policy BackpressurePolicy)

SetBackpressurePolicy sets the backpressure handling policy. This should be called before sending events.

type EventType

type EventType int

EventType represents the type of streaming event

const (
	// EventTextDelta represents incremental text chunks from the LLM
	EventTextDelta EventType = iota

	// EventContentStart marks the beginning of a content block
	EventContentStart

	// EventContentEnd marks the end of a content block
	EventContentEnd

	// EventMessageStart marks the start of a message
	EventMessageStart

	// EventMessageStop marks the end of a message
	EventMessageStop

	// EventError represents an error event
	EventError

	// EventUsage represents token usage statistics
	EventUsage

	// EventThinking represents extended thinking events (e.g., Claude's thinking process)
	EventThinking
)

func ParseEventType

func ParseEventType(s string) (EventType, error)

ParseEventType converts a string to EventType

func (EventType) String

func (e EventType) String() string

String returns the string representation of EventType
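
A String and parse pair for an iota-based enum is often backed by a single name table. The sketch below illustrates that pattern only. The name spellings are assumed ("Usage" matches the JSON sample in the README, the others are guesses), and `eventType`, `names`, and `parseEventType` are hypothetical counterparts of the package's identifiers.

```go
package main

import "fmt"

type eventType int

const (
	textDelta eventType = iota
	contentStart
	contentEnd
	messageStart
	messageStop
	errorEvent
	usage
	thinking
)

// names is indexed by eventType. The spellings are assumptions.
var names = [...]string{
	"TextDelta", "ContentStart", "ContentEnd", "MessageStart",
	"MessageStop", "Error", "Usage", "Thinking",
}

// String returns the table entry, or a placeholder for unknown values.
func (e eventType) String() string {
	if e < 0 || int(e) >= len(names) {
		return fmt.Sprintf("Unknown(%d)", int(e))
	}
	return names[e]
}

// parseEventType is the inverse lookup over the same table.
func parseEventType(s string) (eventType, error) {
	for i, n := range names {
		if n == s {
			return eventType(i), nil
		}
	}
	return 0, fmt.Errorf("unknown event type %q", s)
}

func main() {
	parsed, err := parseEventType("Usage")
	fmt.Println(parsed, int(parsed), err)

	_, err = parseEventType("Bogus")
	fmt.Println(err)
}
```

Keeping both directions on one table means a newly added constant needs only one new table entry for String and the parser to stay in sync.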

type StreamError

type StreamError struct {
	// StreamID identifies the stream where the error occurred
	StreamID string

	// Op is the operation that failed (e.g., "send", "receive", "close")
	Op string

	// Err is the underlying error
	Err error
}

StreamError represents an error that occurred during stream operations. It wraps the underlying error and provides context about the stream and operation.

func (*StreamError) Error

func (e *StreamError) Error() string

Error implements the error interface

func (*StreamError) Unwrap

func (e *StreamError) Unwrap() error

Unwrap returns the underlying error for error chain support

type StreamInfo

type StreamInfo struct {
	ID           string
	BufferSize   int
	CurrentLoad  int
	IsClosed     bool
	LastActivity time.Time
}

StreamInfo contains information about a stream

type StreamManager

type StreamManager struct {
	// contains filtered or unexported fields
}

StreamManager manages multiple concurrent event streams. It provides thread-safe operations for creating, retrieving, and closing streams.

func NewStreamManager

func NewStreamManager(bufferSize int) *StreamManager

NewStreamManager creates a new stream manager with the specified default buffer size

func (*StreamManager) CleanupInactive

func (m *StreamManager) CleanupInactive(threshold time.Duration) int

CleanupInactive removes streams that have been inactive for longer than the threshold. Returns the number of streams cleaned up.

func (*StreamManager) CloseAll

func (m *StreamManager) CloseAll()

CloseAll closes all active streams and clears the manager

func (*StreamManager) CloseStream

func (m *StreamManager) CloseStream(streamID string) error

CloseStream closes and removes a stream by ID. Returns an error if the stream ID is empty or if the stream does not exist.

func (*StreamManager) CreateStream

func (m *StreamManager) CreateStream(streamID string) (*EventStream, error)

CreateStream creates a new event stream with the given ID. Returns an error if the stream ID is empty or if a stream with the same ID already exists.

func (*StreamManager) GetOrCreate

func (m *StreamManager) GetOrCreate(streamID string) (*EventStream, bool, error)

GetOrCreate retrieves an existing stream or creates a new one if it doesn't exist. Returns the stream, a boolean indicating if it was created, and any error.

func (*StreamManager) GetStream

func (m *StreamManager) GetStream(streamID string) (*EventStream, error)

GetStream retrieves an existing stream by ID. Returns an error if the stream ID is empty or if the stream does not exist.

func (*StreamManager) GetStreamInfo

func (m *StreamManager) GetStreamInfo(streamID string) (*StreamInfo, error)

GetStreamInfo returns detailed information about a stream

func (*StreamManager) ListStreamInfo

func (m *StreamManager) ListStreamInfo() []*StreamInfo

ListStreamInfo returns detailed information about all streams

func (*StreamManager) ListStreams

func (m *StreamManager) ListStreams() []string

ListStreams returns a list of all active stream IDs

func (*StreamManager) StreamCount

func (m *StreamManager) StreamCount() int

StreamCount returns the number of active streams
