stream

package
v0.11.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 18, 2026 License: Apache-2.0 Imports: 4 Imported by: 0

Documentation

Overview

Package stream provides a Kafka/Redis-Streams-style durable event stream with consumer groups (shared work), at-least-once delivery, and replay.

Reading model

Every Subscribe takes a (stream, group) pair. Members of the same group share work — each message is delivered to exactly one member. Different groups each get their own copy of every message. To "fan out" to many independent consumers, use a unique group name per consumer.

Delivery and idempotency

Delivery is at-least-once. A message MAY be delivered more than once when:

  • A handler runs longer than the configured visibility timeout. Another worker in the same group reclaims the message and runs the handler again concurrently. Both invocations may succeed.
  • A handler succeeds but the backend acknowledgement fails. The message is reclaimed after the visibility timeout expires.
  • A worker crashes mid-handler. The message becomes claimable again after the visibility timeout.

Make handler effects idempotent (use unique keys, upserts, or check-then-act inside a transaction). Inspect Message.Attempt to detect retries.

Retries and dead-lettering

Each handler error or visibility-timeout reclaim increments the message's Attempt counter. Once Attempt would exceed MaxRetries+1 on the next reclaim, or the handler returns an error matching ErrSkipRetry, the message is moved to the DLQ stream — by default "<stream>.dead", a normal stream you can subscribe to with the same primitive.

Trim policy

Streams retain everything by default. Pass WithMaxLen / WithMaxAge on Publish (or WithDefaultMaxLen / WithDefaultMaxAge on the service) to cap growth. Trim policy is reasserted on every publish (last-write-wins), matching Redis Streams' XADD MAXLEN semantics.

Starting position

WithStartFrom and WithStartFromID apply only when the consumer group is first created. After that, the group's durable cursor wins; both options are ignored on subsequent Subscribe calls to the same group. Default is StartLatest — only see messages appended after the group is created.

Shutdown semantics

Consumer.Close stops claiming new messages and waits up to WithShutdownTimeout for in-flight handlers. Handlers exceeding the shutdown timeout continue as orphan goroutines until they return on their own; Go provides no mechanism to interrupt a running goroutine. Their messages will be reclaimed after the visibility timeout.

stream/errors.go

Index

Constants

View Source
const (
	// DefaultAppName matches queue.DefaultAppName / pubsub.DefaultAppName so
	// apps can share namespacing across messaging packages.
	DefaultAppName = "app"
	// DefaultNamespace matches queue.DefaultNamespace / pubsub.DefaultNamespace.
	DefaultNamespace = "default"

	// DefaultMaxRetries matches queue.DefaultMaxRetries. With this value a
	// message gets up to 4 total delivery attempts (Attempt 1..4) before
	// being routed to the DLQ on what would be Attempt 5.
	DefaultMaxRetries = 3

	// DefaultPollInterval is the fallback wake-up cadence for backends that
	// poll. Push-based wake-ups (sync.Cond, LISTEN/NOTIFY, XREADGROUP BLOCK)
	// preempt this where supported.
	DefaultPollInterval = time.Second

	// DefaultVisibilityTimeout is how long a claimed message stays hidden
	// from other workers in the same group before becoming re-claimable.
	DefaultVisibilityTimeout = 30 * time.Second

	// DefaultShutdownTimeout is how long Consumer.Close waits for in-flight
	// handlers before returning.
	DefaultShutdownTimeout = 30 * time.Second

	// DefaultJanitorInterval is the trim/reclaim janitor cadence used by
	// durable backends (sqlite, pgx, redis). The inmem backend reclaims
	// inline on each worker tick.
	DefaultJanitorInterval = 30 * time.Second
)

Variables

View Source
var ErrSkipRetry = errors.New("stream: skip retry")

ErrSkipRetry when returned (or wrapped) from a Handler signals that the message is permanently failed and should be routed to the dead letter stream immediately, regardless of remaining retry budget.

Functions

func DeadLetterStream

func DeadLetterStream(formatted string) string

DeadLetterStream returns the default DLQ stream name for a given already-formatted stream name. A DLQ stream is just another stream; subscribe to it to inspect, alert on, or reprocess failed messages.

func FormatStream

func FormatStream(app, ns, name string) string

FormatStream formats a stream name as "<app>.<ns>.<name>". This matches queue.FormatTopic and pubsub.FormatTopic so an application can share namespace configuration across all three packages.

func ResolveServiceConfig

func ResolveServiceConfig(cfg *ServiceConfig)

ResolveServiceConfig fills package defaults into any zero-valued fields on cfg. Backend constructors should call this once after applying user-supplied options.

func ResolveSubscribeConfig

func ResolveSubscribeConfig(cfg *SubscribeConfig, formattedStream string)

ResolveSubscribeConfig fills package defaults for any zero-valued field in cfg. Backend implementations call this once per Subscribe after applying user options. formattedStream is the result of FormatStream applied to the caller-supplied stream name.

func SkipRetry

func SkipRetry(err error) error

SkipRetry wraps err so it satisfies errors.Is(err, ErrSkipRetry) while still preserving the wrapped error chain. Returning stream.SkipRetry(myErr) from a handler routes the message to the DLQ on this attempt.

Types

type Consumer

type Consumer interface {
	Close() error
}

Consumer is the lifetime handle for a subscription. Close stops claiming new messages and waits for in-flight handlers up to the configured shutdown timeout.

type Handler

type Handler func(ctx context.Context, msg *Message) error

Handler processes a single message. Returning nil acks the message and the backend removes it from the consumer group's pending list. Returning a non-nil error leaves the message pending; it becomes available for re-claim by another worker after the configured visibility timeout. If Attempt would exceed MaxRetries+1 on the next reclaim, or the error matches ErrSkipRetry, the message is routed to the DLQ stream instead.

type Message

type Message struct {
	// ID is the backend-assigned identifier, monotonic per stream. Always
	// non-empty when delivered to a Handler.
	ID string
	// Stream is the fully formatted stream name as it appears in the backend
	// (after FormatStream).
	Stream string
	// Payload is the opaque body supplied to Publish.
	Payload []byte
	// Headers are the caller-supplied key-value metadata. May be nil if no
	// headers were supplied.
	Headers map[string]string
	// CreatedAt is the time the message was first appended to the stream.
	CreatedAt time.Time
	// Attempt is 1 on first delivery, 2+ on every redelivery (visibility
	// timeout reclaim or handler-error retry).
	Attempt int
}

Message is a single entry delivered to a Handler.

type Producer

type Producer interface {
	Publish(ctx context.Context, stream string, payload []byte,
		opts ...PublishOption) error
}

Producer publishes messages to a stream.

type PublishConfig

type PublishConfig struct {
	App       string
	Namespace string
	Headers   map[string]string
	MaxLen    int64
	MaxAge    time.Duration
}

PublishConfig is the resolved configuration for a single Publish call. Backends populate App, Namespace, MaxLen, MaxAge defaults from their service-level config, then apply PublishOptions to overlay caller intent.

type PublishOption

type PublishOption interface {
	Apply(*PublishConfig)
}

PublishOption configures a single Publish call.

func WithHeaders

func WithHeaders(h map[string]string) PublishOption

WithHeaders attaches a metadata map to the message. The map is shallow- copied by the backend; nil is allowed and equivalent to "no headers".

func WithMaxAge

func WithMaxAge(d time.Duration) PublishOption

WithMaxAge overrides the service-level DefaultMaxAge for this publish. Zero means "no trim" for this call.

func WithMaxLen

func WithMaxLen(n int64) PublishOption

WithMaxLen overrides the service-level DefaultMaxLen for this publish. Zero means "no trim" for this call. Trim policy is reasserted by every publisher (last-write-wins per Redis Streams XADD MAXLEN semantics).

func WithPublishApp

func WithPublishApp(app string) PublishOption

WithPublishApp overrides the topic app prefix for this publish only.

func WithPublishNamespace

func WithPublishNamespace(ns string) PublishOption

WithPublishNamespace overrides the topic namespace for this publish only.

type PublishOptionFunc

type PublishOptionFunc func(*PublishConfig)

PublishOptionFunc adapts a function to PublishOption.

func (PublishOptionFunc) Apply

func (f PublishOptionFunc) Apply(cfg *PublishConfig)

Apply calls f(cfg).

type Service

type Service interface {
	Producer
	Subscriber
	// Close releases backend resources and shuts down all live consumers.
	// Safe to call once; subsequent calls return nil.
	Close(ctx context.Context) error
}

Service is the union of Producer and Subscriber plus a backend Close. Backend implementations expose this as their public type.

type ServiceConfig

type ServiceConfig struct {
	App           string
	Namespace     string
	DefaultMaxLen int64
	DefaultMaxAge time.Duration
}

ServiceConfig is the service-level configuration shared by all Publish and Subscribe calls on a Service.

type ServiceOption

type ServiceOption interface {
	Apply(*ServiceConfig)
}

ServiceOption configures a backend constructor.

func WithApp

func WithApp(app string) ServiceOption

WithApp sets the service-level app name. Empty string is ignored so callers can pass values from optional config without clobbering presets.

func WithDefaultMaxAge

func WithDefaultMaxAge(d time.Duration) ServiceOption

WithDefaultMaxAge sets the default per-publish MaxAge used when a Publish call doesn't provide one. Zero means no trimming by age.

func WithDefaultMaxLen

func WithDefaultMaxLen(n int64) ServiceOption

WithDefaultMaxLen sets the default per-publish MaxLen used when a Publish call doesn't provide one. Zero means no trimming by length.

func WithNamespace

func WithNamespace(ns string) ServiceOption

WithNamespace sets the service-level namespace. Empty string is ignored.

type ServiceOptionFunc

type ServiceOptionFunc func(*ServiceConfig)

ServiceOptionFunc adapts a function to ServiceOption.

func (ServiceOptionFunc) Apply

func (f ServiceOptionFunc) Apply(cfg *ServiceConfig)

Apply calls f(cfg).

type StartPosition

type StartPosition int8

StartPosition controls where a brand-new consumer group starts reading from. Has no effect when the group already exists; the durable cursor wins. The zero value is StartLatest.

const (
	// StartLatest sets the new group's cursor to the highest message ID at
	// the time of group creation. Group sees only messages appended after
	// this point. Default; matches Redis Streams' "$" and Kafka's
	// auto.offset.reset=latest.
	StartLatest StartPosition = iota
	// StartEarliest sets the new group's cursor to "before any message" so
	// the group replays the entire current backlog. Messages already
	// trimmed by MaxLen/MaxAge are gone — there is no recovering them.
	StartEarliest
)

type SubscribeConfig

type SubscribeConfig struct {
	App       string
	Namespace string

	StartFrom   StartPosition
	StartFromID string // overrides StartFrom when non-empty

	Concurrency       int
	MaxRetries        int
	PollInterval      time.Duration
	VisibilityTimeout time.Duration
	ShutdownTimeout   time.Duration

	// DeadLetterStream is the stream that exhausted messages are routed
	// to. Zero value means "<formatted-stream>.dead".
	DeadLetterStream string
}

SubscribeConfig is the resolved configuration for a single Subscribe call.

type SubscribeOption

type SubscribeOption interface {
	Apply(*SubscribeConfig)
}

SubscribeOption configures a single Subscribe call.

func WithConcurrency

func WithConcurrency(n int) SubscribeOption

WithConcurrency sets the number of worker goroutines for this consumer. Default 1. Values < 1 clamp to 1.

func WithDeadLetterStream

func WithDeadLetterStream(name string) SubscribeOption

WithDeadLetterStream sets a custom DLQ stream. Default "<formatted-stream>.dead".

func WithMaxRetries

func WithMaxRetries(n int) SubscribeOption

WithMaxRetries sets how many retries are allowed before routing to the DLQ. Default 3 (= 4 total attempts). -1 means infinite retries.

func WithPollInterval

func WithPollInterval(d time.Duration) SubscribeOption

WithPollInterval sets the fallback wake-up cadence for backends that poll. Push wake-ups (sync.Cond, NOTIFY, XREADGROUP BLOCK) preempt this.

func WithShutdownTimeout

func WithShutdownTimeout(d time.Duration) SubscribeOption

WithShutdownTimeout sets how long Consumer.Close waits for in-flight handlers before returning.

func WithStartFrom

func WithStartFrom(p StartPosition) SubscribeOption

WithStartFrom controls where a brand-new consumer group starts reading. Has no effect when the group already exists.

func WithStartFromID

func WithStartFromID(id string) SubscribeOption

WithStartFromID sets a brand-new group's cursor to a specific message ID. Group sees messages with ID strictly greater than id. Overrides WithStartFrom. Has no effect when the group already exists.

func WithSubscribeApp

func WithSubscribeApp(app string) SubscribeOption

WithSubscribeApp overrides the topic app prefix for this subscription.

func WithSubscribeNamespace

func WithSubscribeNamespace(ns string) SubscribeOption

WithSubscribeNamespace overrides the topic namespace for this subscription.

func WithVisibilityTimeout

func WithVisibilityTimeout(d time.Duration) SubscribeOption

WithVisibilityTimeout sets how long a claimed message stays hidden from other workers in the same group. Also acts as the implicit retry delay when a handler returns an error. Handlers exceeding this timeout will see their message reclaimed by another worker.

type SubscribeOptionFunc

type SubscribeOptionFunc func(*SubscribeConfig)

SubscribeOptionFunc adapts a function to SubscribeOption.

func (SubscribeOptionFunc) Apply

func (f SubscribeOptionFunc) Apply(cfg *SubscribeConfig)

Apply calls f(cfg).

type Subscriber

type Subscriber interface {
	Subscribe(ctx context.Context, stream, group string, h Handler,
		opts ...SubscribeOption) (Consumer, error)
}

Subscriber registers a Handler against a (stream, consumer-group) pair. Members of the same group share work — each message is delivered to exactly one member.

Directories

Path Synopsis
Package inmem provides an in-process stream.Service implementation for tests and single-process development.
Package inmem provides an in-process stream.Service implementation for tests and single-process development.
Package pgx provides a distributed stream.Service backed by Postgres.
Package pgx provides a distributed stream.Service backed by Postgres.
Package redis provides a distributed stream.Service backed by Redis Streams.
Package redis provides a distributed stream.Service backed by Redis Streams.
Package sqlite provides a durable, embedded stream.Service backed by SQLite.
Package sqlite provides a durable, embedded stream.Service backed by SQLite.
Package streamtest provides a backend-agnostic conformance test suite for stream.Service implementations.
Package streamtest provides a backend-agnostic conformance test suite for stream.Service implementations.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL