streaming

package
v0.1.17
Published: Jan 20, 2026 License: Apache-2.0 Imports: 7 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type MemoryStreamBroker

type MemoryStreamBroker struct {
	// contains filtered or unexported fields
}

MemoryStreamBroker is an in-memory implementation of StreamBroker. It's suitable for testing and local development, or when all components run in the same process.

Note: This broker does not persist across restarts. For production deployments with separate processes, use RedisStreamBroker.

func NewMemoryStreamBroker

func NewMemoryStreamBroker() *MemoryStreamBroker

NewMemoryStreamBroker creates a new in-memory stream broker.

func (*MemoryStreamBroker) Close

func (b *MemoryStreamBroker) Close(ctx context.Context, channel string) error

Close signals that no more chunks will be published to the channel. This closes all subscriber channels for the given channel.

func (*MemoryStreamBroker) Publish

func (b *MemoryStreamBroker) Publish(ctx context.Context, channel string, chunk *responses.ResponseChunk) error

Publish sends a response chunk to all subscribers of the given channel.

func (*MemoryStreamBroker) Reset

func (b *MemoryStreamBroker) Reset()

Reset clears all subscribers and closed state. Useful for testing.

func (*MemoryStreamBroker) Subscribe

func (b *MemoryStreamBroker) Subscribe(ctx context.Context, channel string) (<-chan *responses.ResponseChunk, error)

Subscribe returns a channel that receives response chunks for the given channel. The returned channel is buffered with a capacity of 100 chunks to absorb bursts.
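
The fan-out behaviour described for MemoryStreamBroker can be modelled with the standard library alone. The following is a minimal sketch, not the package's implementation: the `fanout` and `chunk` types and the `collect` helper are hypothetical, `chunk` stands in for `*responses.ResponseChunk`, and the context arguments are left out. The choice to drop a chunk when a subscriber's buffer is full, and to hand late subscribers an already-closed channel, are assumptions that the documentation above does not confirm.

```go
package main

import (
	"fmt"
	"sync"
)

// chunk is a hypothetical placeholder for *responses.ResponseChunk.
type chunk struct{ Text string }

// fanout is a hypothetical in-memory broker sketch.
type fanout struct {
	mu     sync.Mutex
	subs   map[string][]chan *chunk
	closed map[string]bool
}

func newFanout() *fanout {
	return &fanout{subs: map[string][]chan *chunk{}, closed: map[string]bool{}}
}

// Subscribe registers a buffered channel (capacity 100, as documented).
func (f *fanout) Subscribe(name string) <-chan *chunk {
	f.mu.Lock()
	defer f.mu.Unlock()
	ch := make(chan *chunk, 100)
	if f.closed[name] {
		close(ch) // assumption: late subscribers see an already-closed channel
		return ch
	}
	f.subs[name] = append(f.subs[name], ch)
	return ch
}

// Publish hands the same chunk pointer to every current subscriber.
// Assumption: a full buffer drops the chunk instead of blocking the publisher.
func (f *fanout) Publish(name string, c *chunk) {
	f.mu.Lock()
	defer f.mu.Unlock()
	for _, ch := range f.subs[name] {
		select {
		case ch <- c:
		default:
		}
	}
}

// Close closes every subscriber channel so that range loops terminate.
func (f *fanout) Close(name string) {
	f.mu.Lock()
	defer f.mu.Unlock()
	if f.closed[name] {
		return
	}
	f.closed[name] = true
	for _, ch := range f.subs[name] {
		close(ch)
	}
	delete(f.subs, name)
}

// collect drains a subscription until it is closed.
func collect(ch <-chan *chunk) []string {
	var out []string
	for c := range ch {
		out = append(out, c.Text)
	}
	return out
}

func main() {
	b := newFanout()
	s1 := b.Subscribe("run-1")
	s2 := b.Subscribe("run-1")
	b.Publish("run-1", &chunk{Text: "hello"})
	b.Publish("run-1", &chunk{Text: "world"})
	b.Close("run-1")
	fmt.Println(collect(s1), collect(s2)) // [hello world] [hello world]
}
```

Because each subscriber owns its own buffered channel, a slow reader does not delay the others; the cost is that bursts larger than the buffer need some policy (dropping, as assumed here, or blocking).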

type RedisStreamBroker

type RedisStreamBroker struct {
	// contains filtered or unexported fields
}

RedisStreamBroker implements StreamBroker using Redis Pub/Sub. This is the recommended broker for production deployments where activities run in separate processes from the client.

Redis Pub/Sub provides:

  - Cross-process communication
  - Low-latency message delivery
  - Automatic cleanup when subscribers disconnect

Note: Redis Pub/Sub is fire-and-forget. If no subscribers are connected when a message is published, the message is lost. For guaranteed delivery, consider using Redis Streams instead.
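
The fire-and-forget caveat is easiest to see in a toy model. The sketch below does not talk to Redis and is not part of this package: `pubsub`, `subscribe`, and `publish` are hypothetical names, and the model is single-goroutine for brevity. Its `publish` returns the number of subscribers that received the message, mirroring the integer reply of the Redis `PUBLISH` command.

```go
package main

import "fmt"

// pubsub is a hypothetical, single-goroutine model of fire-and-forget delivery.
type pubsub struct {
	subs map[string][]chan string
}

func newPubSub() *pubsub { return &pubsub{subs: map[string][]chan string{}} }

// subscribe attaches a listener; it only sees messages published afterwards.
func (p *pubsub) subscribe(channel string) <-chan string {
	ch := make(chan string, 16)
	p.subs[channel] = append(p.subs[channel], ch)
	return ch
}

// publish delivers msg to the current subscribers and returns how many
// received it. Nothing is stored for subscribers that arrive later.
func (p *pubsub) publish(channel, msg string) int {
	n := 0
	for _, ch := range p.subs[channel] {
		select {
		case ch <- msg:
			n++
		default: // listener buffer full: the message is dropped for it
		}
	}
	return n
}

func main() {
	p := newPubSub()
	fmt.Println(p.publish("uno:stream:run-1", "early")) // 0: nobody is listening, message lost
	sub := p.subscribe("uno:stream:run-1")
	fmt.Println(p.publish("uno:stream:run-1", "late")) // 1
	fmt.Println(<-sub)                                 // late
}
```

In practice this means a consumer should subscribe before the producer starts publishing, or the first chunks of a stream may never arrive.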

func NewRedisStreamBroker

func NewRedisStreamBroker(opts RedisStreamBrokerOptions) (*RedisStreamBroker, error)

NewRedisStreamBroker creates a new Redis-backed stream broker.

func (*RedisStreamBroker) Cleanup

func (b *RedisStreamBroker) Cleanup(ctx context.Context, channel string) error

Cleanup removes the closed marker for a channel. Call this to allow re-subscribing to a previously closed channel.

func (*RedisStreamBroker) Close

func (b *RedisStreamBroker) Close(ctx context.Context, channel string) error

Close signals that no more chunks will be published to the channel. This sets a key in Redis to indicate the channel is closed, and publishes a close signal to any active subscribers.
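
The interplay between Close and Cleanup can be pictured as a small key lifecycle. This is a sketch under stated assumptions rather than the broker's actual logic: `closedMarkers` is a hypothetical in-memory stand-in for the Redis keyspace, and the `":closed"` key suffix is invented for illustration, since the documentation does not state the key layout.

```go
package main

import "fmt"

// closedMarkers is a hypothetical stand-in for the Redis keys that
// record which channels have been closed.
type closedMarkers struct {
	prefix string
	keys   map[string]bool
}

func newClosedMarkers(prefix string) *closedMarkers {
	return &closedMarkers{prefix: prefix, keys: map[string]bool{}}
}

// key builds the marker key. The ":closed" suffix is an assumption.
func (m *closedMarkers) key(channel string) string {
	return m.prefix + channel + ":closed"
}

// Close records that no more chunks will be published to the channel.
func (m *closedMarkers) Close(channel string) { m.keys[m.key(channel)] = true }

// Cleanup removes the marker so the channel name can be used again.
func (m *closedMarkers) Cleanup(channel string) { delete(m.keys, m.key(channel)) }

// IsClosed reports whether a closed marker exists for the channel.
func (m *closedMarkers) IsClosed(channel string) bool { return m.keys[m.key(channel)] }

func main() {
	m := newClosedMarkers("uno:stream:")
	fmt.Println(m.IsClosed("run-1")) // false
	m.Close("run-1")
	fmt.Println(m.IsClosed("run-1")) // true
	m.Cleanup("run-1")
	fmt.Println(m.IsClosed("run-1")) // false
}
```

A persistent marker lets a subscriber that connects after Close learn that the stream has already ended, which the fire-and-forget close signal alone could not tell it.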

func (*RedisStreamBroker) GetClient

func (b *RedisStreamBroker) GetClient() *redis.Client

GetClient returns the underlying Redis client. Useful for advanced operations or sharing the client.

func (*RedisStreamBroker) Publish

func (b *RedisStreamBroker) Publish(ctx context.Context, channel string, chunk *responses.ResponseChunk) error

Publish sends a response chunk to all subscribers of the given channel.

func (*RedisStreamBroker) Subscribe

func (b *RedisStreamBroker) Subscribe(ctx context.Context, channel string) (<-chan *responses.ResponseChunk, error)

Subscribe returns a channel that receives response chunks for the given channel.

type RedisStreamBrokerOptions

type RedisStreamBrokerOptions struct {
	// Addr is the Redis server address (e.g., "localhost:6379").
	Addr string

	// Password is the Redis password (optional).
	Password string

	// DB is the Redis database number (default 0).
	DB int

	// Prefix is prepended to all channel names (default "uno:stream:").
	// This allows multiple applications to share the same Redis instance.
	Prefix string

	// Client is an existing Redis client to use instead of creating a new one.
	// If provided, Addr/Password/DB are ignored.
	Client *redis.Client
}

RedisStreamBrokerOptions configures the Redis stream broker.
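
The effect of the Prefix default can be illustrated without a Redis dependency. In this sketch the `options` struct mirrors only the documented scalar fields (the Client field is left out), and `withDefaults` and `channelKey` are hypothetical helpers; how the real broker applies the prefix internally is an assumption.

```go
package main

import "fmt"

// options mirrors the documented scalar fields of RedisStreamBrokerOptions.
// The Client field is omitted so the sketch needs no Redis import.
type options struct {
	Addr, Password string
	DB             int
	Prefix         string
}

// defaultPrefix is the documented default for Prefix.
const defaultPrefix = "uno:stream:"

// withDefaults fills in the default prefix when none is set.
func withDefaults(o options) options {
	if o.Prefix == "" {
		o.Prefix = defaultPrefix
	}
	return o
}

// channelKey shows how a prefix namespaces a channel name (hypothetical helper).
func channelKey(o options, channel string) string {
	return withDefaults(o).Prefix + channel
}

func main() {
	a := options{Addr: "localhost:6379"}
	b := options{Addr: "localhost:6379", Prefix: "billing:stream:"}
	fmt.Println(channelKey(a, "run-1")) // uno:stream:run-1
	fmt.Println(channelKey(b, "run-1")) // billing:stream:run-1
}
```

Two applications that share one Redis instance but use different prefixes end up on disjoint channel names, so their streams do not interfere.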
