Documentation
¶
Index ¶
- type MemoryStreamBroker
- func (b *MemoryStreamBroker) Close(ctx context.Context, channel string) error
- func (b *MemoryStreamBroker) Publish(ctx context.Context, channel string, chunk *responses.ResponseChunk) error
- func (b *MemoryStreamBroker) Reset()
- func (b *MemoryStreamBroker) Subscribe(ctx context.Context, channel string) (<-chan *responses.ResponseChunk, error)
- type RedisStreamBroker
- func (b *RedisStreamBroker) Cleanup(ctx context.Context, channel string) error
- func (b *RedisStreamBroker) Close(ctx context.Context, channel string) error
- func (b *RedisStreamBroker) GetClient() *redis.Client
- func (b *RedisStreamBroker) Publish(ctx context.Context, channel string, chunk *responses.ResponseChunk) error
- func (b *RedisStreamBroker) Subscribe(ctx context.Context, channel string) (<-chan *responses.ResponseChunk, error)
- type RedisStreamBrokerOptions
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type MemoryStreamBroker ¶
type MemoryStreamBroker struct {
// contains filtered or unexported fields
}
MemoryStreamBroker is an in-memory implementation of StreamBroker. It's suitable for testing and local development, or when all components run in the same process.
Note: This broker does not persist across restarts. For production deployments with separate processes, use RedisStreamBroker.
func NewMemoryStreamBroker ¶
func NewMemoryStreamBroker() *MemoryStreamBroker
NewMemoryStreamBroker creates a new in-memory stream broker.
func (*MemoryStreamBroker) Close ¶
func (b *MemoryStreamBroker) Close(ctx context.Context, channel string) error
Close signals that no more chunks will be published to the channel. This closes all subscriber channels for the given channel.
func (*MemoryStreamBroker) Publish ¶
func (b *MemoryStreamBroker) Publish(ctx context.Context, channel string, chunk *responses.ResponseChunk) error
Publish sends a response chunk to all subscribers of the given channel.
func (*MemoryStreamBroker) Reset ¶
func (b *MemoryStreamBroker) Reset()
Reset clears all subscribers and closed state. Useful for testing.
func (*MemoryStreamBroker) Subscribe ¶
func (b *MemoryStreamBroker) Subscribe(ctx context.Context, channel string) (<-chan *responses.ResponseChunk, error)
Subscribe returns a channel that receives response chunks for the given channel. The buffer size is 100 chunks to handle bursts.
type RedisStreamBroker ¶
type RedisStreamBroker struct {
// contains filtered or unexported fields
}
RedisStreamBroker implements StreamBroker using Redis Pub/Sub. This is the recommended broker for production deployments where activities run in separate processes from the client.
Redis Pub/Sub provides: - Cross-process communication - Low latency message delivery - Automatic cleanup when subscribers disconnect
Note: Redis Pub/Sub is fire-and-forget. If no subscribers are connected when a message is published, the message is lost. For guaranteed delivery, consider using Redis Streams instead.
func NewRedisStreamBroker ¶
func NewRedisStreamBroker(opts RedisStreamBrokerOptions) (*RedisStreamBroker, error)
NewRedisStreamBroker creates a new Redis-backed stream broker.
func (*RedisStreamBroker) Cleanup ¶
func (b *RedisStreamBroker) Cleanup(ctx context.Context, channel string) error
Cleanup removes the closed marker for a channel. Call this to allow re-subscribing to a previously closed channel.
func (*RedisStreamBroker) Close ¶
func (b *RedisStreamBroker) Close(ctx context.Context, channel string) error
Close signals that no more chunks will be published to the channel. This sets a key in Redis to indicate the channel is closed, and publishes a close signal to any active subscribers.
func (*RedisStreamBroker) GetClient ¶
func (b *RedisStreamBroker) GetClient() *redis.Client
GetClient returns the underlying Redis client. Useful for advanced operations or sharing the client.
func (*RedisStreamBroker) Publish ¶
func (b *RedisStreamBroker) Publish(ctx context.Context, channel string, chunk *responses.ResponseChunk) error
Publish sends a response chunk to all subscribers of the given channel.
func (*RedisStreamBroker) Subscribe ¶
func (b *RedisStreamBroker) Subscribe(ctx context.Context, channel string) (<-chan *responses.ResponseChunk, error)
Subscribe returns a channel that receives response chunks for the given channel.
type RedisStreamBrokerOptions ¶
type RedisStreamBrokerOptions struct {
// Addr is the Redis server address (e.g., "localhost:6379").
Addr string
// Password is the Redis password (optional).
Password string
// DB is the Redis database number (default 0).
DB int
// Prefix is prepended to all channel names (default "uno:stream:").
// This allows multiple applications to share the same Redis instance.
Prefix string
// Client is an existing Redis client to use instead of creating a new one.
// If provided, Addr/Password/DB are ignored.
Client *redis.Client
}
RedisStreamBrokerOptions configures the Redis stream broker.