Documentation ¶
Overview ¶
Package stream provides a Kafka/Redis-Streams-style durable event stream with consumer groups (shared work), at-least-once delivery, and replay.
Reading model ¶
Every Subscribe call takes a (stream, group) pair. Members of the same group share work: each message is claimed by one member at a time, although at-least-once delivery means it may be redelivered later. Each group receives its own copy of every message. To fan out to many independent consumers, give each consumer a unique group name.
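The group semantics can be pictured with a toy dispatcher. This is only a sketch of the reading model, not the package's implementation: the `group` type and `dispatch` function are hypothetical, and round-robin selection is an assumption (a real backend hands the message to whichever worker claims it first).

```go
package main

import "fmt"

// group is a toy consumer group: a list of member names plus a counter
// that picks the next member in turn (round-robin is an assumption).
type group struct {
	members []string
	next    int
}

// dispatch models the delivery of one message: every group gets its own
// copy, and within a group one member is chosen to handle it.
func dispatch(groups map[string]*group) map[string]string {
	receivers := make(map[string]string, len(groups))
	for name, g := range groups {
		if len(g.members) == 0 {
			continue
		}
		receivers[name] = g.members[g.next%len(g.members)]
		g.next++
	}
	return receivers
}

func main() {
	groups := map[string]*group{
		"billing": {members: []string{"billing-1", "billing-2"}},
		"audit":   {members: []string{"audit-1"}},
	}
	for i := 1; i <= 2; i++ {
		r := dispatch(groups)
		fmt.Println(i, r["billing"], r["audit"])
	}
	// prints:
	// 1 billing-1 audit-1
	// 2 billing-2 audit-1
}
```

Both groups see both messages, while the two billing members split them between themselves.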
Delivery and idempotency ¶
Delivery is at-least-once. A message MAY be delivered more than once when:
- A handler runs longer than the configured visibility timeout. Another worker in the same group reclaims the message and runs the handler again concurrently. Both invocations may succeed.
- A handler succeeds but the backend acknowledgement fails. The message is reclaimed after the visibility timeout expires.
- A worker crashes mid-handler. The message becomes claimable again after the visibility timeout.
Make handler effects idempotent (use unique keys, upserts, or check-then-act inside a transaction). Inspect Message.Attempt to detect retries.
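As a minimal sketch of the "unique key" approach, the example below records each message ID before applying its effect, so a redelivery becomes a no-op. The `Ledger` type is a hypothetical in-memory stand-in for a database table with a unique constraint on the message ID, and `Message` mirrors only the fields needed here.

```go
package main

import (
	"fmt"
	"sync"
)

// Message mirrors only the fields of stream.Message used in this sketch.
type Message struct {
	ID      string
	Attempt int
}

// Ledger is a hypothetical stand-in for a store with a unique key on the
// message ID. The mutex plays the role of a transaction.
type Ledger struct {
	mu      sync.Mutex
	applied map[string]bool
	total   int
}

func NewLedger() *Ledger { return &Ledger{applied: make(map[string]bool)} }

// Apply adds amount at most once per message ID and reports whether the
// message had already been applied.
func (l *Ledger) Apply(m Message, amount int) (duplicate bool) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.applied[m.ID] {
		return true
	}
	l.applied[m.ID] = true
	l.total += amount
	return false
}

// Total returns the sum of all applied amounts.
func (l *Ledger) Total() int {
	l.mu.Lock()
	defer l.mu.Unlock()
	return l.total
}

func main() {
	l := NewLedger()
	first := Message{ID: "1-0", Attempt: 1}
	retry := Message{ID: "1-0", Attempt: 2} // same ID, redelivered
	fmt.Println(l.Apply(first, 10), l.Apply(retry, 10), l.Total())
	// prints: false true 10
}
```

Keying on the message ID rather than on Attempt matters: two concurrent invocations caused by a visibility-timeout reclaim can both run, and only the ID is shared between them.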
Retries and dead-lettering ¶
Each handler error or visibility-timeout reclaim increments the message's Attempt counter. Once Attempt would exceed MaxRetries+1 on the next reclaim, or the handler returns an error matching ErrSkipRetry, the message is moved to the DLQ stream — by default "<stream>.dead", a normal stream you can subscribe to with the same primitive.
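The routing rule above can be written as a small predicate. This is a sketch under stated assumptions, not the backends' code: `shouldDeadLetter` and `errTransient` are hypothetical names, `errSkipRetry` is a local stand-in for ErrSkipRetry, and MaxRetries of -1 is treated as "infinite" as documented for WithMaxRetries.

```go
package main

import (
	"errors"
	"fmt"
)

// errSkipRetry is a local stand-in for stream.ErrSkipRetry.
var errSkipRetry = errors.New("stream: skip retry")

// errTransient is a hypothetical retryable failure used for the demo.
var errTransient = errors.New("transient failure")

// shouldDeadLetter reports whether a message that has just finished the
// given attempt without being acked should go to the DLQ instead of being
// made claimable again.
func shouldDeadLetter(attempt, maxRetries int, handlerErr error) bool {
	if errors.Is(handlerErr, errSkipRetry) {
		return true // permanent failure: ignore the remaining budget
	}
	if maxRetries == -1 {
		return false // infinite retries
	}
	// The next delivery would be attempt+1; MaxRetries+1 is the last
	// attempt that is allowed to run.
	return attempt+1 > maxRetries+1
}

func main() {
	for attempt := 1; attempt <= 4; attempt++ {
		fmt.Println(attempt, shouldDeadLetter(attempt, 3, errTransient))
	}
	// prints:
	// 1 false
	// 2 false
	// 3 false
	// 4 true
}
```

With the default MaxRetries of 3, attempts 1 through 4 run and the message is dead-lettered on what would have been attempt 5.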
Trim policy ¶
Streams retain everything by default. Pass WithMaxLen / WithMaxAge on Publish (or WithDefaultMaxLen / WithDefaultMaxAge on the service) to cap growth. Trim policy is reasserted on every publish (last-write-wins), matching Redis Streams' XADD MAXLEN semantics.
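To make the two caps concrete, here is a rough sketch of a trim pass over an in-memory slice ordered oldest first. The `entry` type and `trim` function are hypothetical, and applying the age cap before the length cap is an assumption; actual backends may trim approximately or in a different order.

```go
package main

import (
	"fmt"
	"time"
)

// entry is a hypothetical stream entry, ordered oldest first in a slice.
type entry struct {
	ID        int
	CreatedAt time.Time
}

// trim drops entries older than maxAge, then keeps at most the newest
// maxLen entries. A zero value disables the corresponding cap.
func trim(entries []entry, maxLen int64, maxAge time.Duration, now time.Time) []entry {
	if maxAge > 0 {
		cutoff := now.Add(-maxAge)
		i := 0
		for i < len(entries) && entries[i].CreatedAt.Before(cutoff) {
			i++
		}
		entries = entries[i:]
	}
	if maxLen > 0 && int64(len(entries)) > maxLen {
		entries = entries[len(entries)-int(maxLen):]
	}
	return entries
}

// demoTrim builds five entries aged 50s, 40s, 30s, 20s and 10s and returns
// the IDs that survive MaxAge=45s followed by MaxLen=3.
func demoTrim() []int {
	now := time.Now()
	var entries []entry
	for i := 1; i <= 5; i++ {
		age := time.Duration(60-10*i) * time.Second
		entries = append(entries, entry{ID: i, CreatedAt: now.Add(-age)})
	}
	var ids []int
	for _, e := range trim(entries, 3, 45*time.Second, now) {
		ids = append(ids, e.ID)
	}
	return ids
}

func main() {
	fmt.Println(demoTrim()) // prints: [3 4 5]
}
```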
Starting position ¶
WithStartFrom and WithStartFromID apply only when the consumer group is first created. After that, the group's durable cursor wins, and both options are ignored on subsequent Subscribe calls to the same group. The default is StartLatest: the group sees only messages appended after it is created.
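The precedence between the durable cursor and the start options can be sketched as follows. The `cursors` type and its `open` method are hypothetical, and message IDs are modeled as integers (0 meaning "before any message" or "not set") purely to keep the example short; the real IDs are strings.

```go
package main

import "fmt"

// StartPosition mirrors the documented type of the same name.
type StartPosition int8

const (
	StartLatest StartPosition = iota
	StartEarliest
)

// cursors maps a group name to its durable cursor (hypothetical model).
type cursors map[string]int64

// open returns the cursor for group, creating the group if needed.
// highest is the highest message ID currently in the stream; fromID == 0
// means "not set" in this sketch.
func (c cursors) open(group string, pos StartPosition, fromID, highest int64) int64 {
	if cur, ok := c[group]; ok {
		return cur // existing group: start options are ignored
	}
	cur := highest // StartLatest: only messages appended from now on
	switch {
	case fromID != 0:
		cur = fromID // an explicit ID overrides the start position
	case pos == StartEarliest:
		cur = 0 // replay the whole remaining backlog
	}
	c[group] = cur
	return cur
}

func main() {
	c := cursors{}
	fmt.Println(c.open("reports", StartEarliest, 0, 42)) // prints: 0
	fmt.Println(c.open("reports", StartLatest, 0, 99))   // prints: 0
	fmt.Println(c.open("alerts", StartLatest, 0, 42))    // prints: 42
}
```

The second call shows the point of the paragraph above: asking for StartLatest on an existing group does not move its cursor.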
Shutdown semantics ¶
Consumer.Close stops claiming new messages and waits up to WithShutdownTimeout for in-flight handlers. Handlers exceeding the shutdown timeout continue as orphan goroutines until they return on their own; Go provides no mechanism to interrupt a running goroutine. Their messages will be reclaimed after the visibility timeout.
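A bounded wait of this kind is commonly built from a sync.WaitGroup and a timer. The sketch below illustrates the pattern only; `waitTimeout` and `demo` are hypothetical helpers and are not taken from the package.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitTimeout waits for wg for at most d and reports whether every
// in-flight handler finished in time. Handlers that are still running
// after the timeout simply keep going in their own goroutines.
func waitTimeout(wg *sync.WaitGroup, d time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(d):
		return false
	}
}

// demo runs one slow handler and waits for it twice: once with a timeout
// that is too short, once with a timeout that is long enough.
func demo() (first, second bool) {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(200 * time.Millisecond) // slow handler
	}()
	first = waitTimeout(&wg, 20*time.Millisecond)
	second = waitTimeout(&wg, time.Second)
	return first, second
}

func main() {
	fmt.Println(demo()) // prints: false true
}
```

Handlers that want to stop early on shutdown need to cooperate, for example by checking a context or a channel, since nothing outside the goroutine can stop it.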
Index ¶
- Constants
- Variables
- func DeadLetterStream(formatted string) string
- func FormatStream(app, ns, name string) string
- func ResolveServiceConfig(cfg *ServiceConfig)
- func ResolveSubscribeConfig(cfg *SubscribeConfig, formattedStream string)
- func SkipRetry(err error) error
- type Consumer
- type Handler
- type Message
- type Producer
- type PublishConfig
- type PublishOption
- type PublishOptionFunc
- type Service
- type ServiceConfig
- type ServiceOption
- type ServiceOptionFunc
- type StartPosition
- type SubscribeConfig
- type SubscribeOption
- func WithConcurrency(n int) SubscribeOption
- func WithDeadLetterStream(name string) SubscribeOption
- func WithMaxRetries(n int) SubscribeOption
- func WithPollInterval(d time.Duration) SubscribeOption
- func WithShutdownTimeout(d time.Duration) SubscribeOption
- func WithStartFrom(p StartPosition) SubscribeOption
- func WithStartFromID(id string) SubscribeOption
- func WithSubscribeApp(app string) SubscribeOption
- func WithSubscribeNamespace(ns string) SubscribeOption
- func WithVisibilityTimeout(d time.Duration) SubscribeOption
- type SubscribeOptionFunc
- type Subscriber
Constants ¶
const (
	// DefaultAppName matches queue.DefaultAppName / pubsub.DefaultAppName so
	// apps can share namespacing across messaging packages.
	DefaultAppName = "app"

	// DefaultNamespace matches queue.DefaultNamespace / pubsub.DefaultNamespace.
	DefaultNamespace = "default"

	// DefaultMaxRetries matches queue.DefaultMaxRetries. With this value a
	// message gets up to 4 total delivery attempts (Attempt 1..4) before
	// being routed to the DLQ on what would be Attempt 5.
	DefaultMaxRetries = 3

	// DefaultPollInterval is the fallback wake-up cadence for backends that
	// poll. Push-based wake-ups (sync.Cond, LISTEN/NOTIFY, XREADGROUP BLOCK)
	// preempt this where supported.
	DefaultPollInterval = time.Second

	// DefaultVisibilityTimeout is how long a claimed message stays hidden
	// from other workers in the same group before becoming re-claimable.
	DefaultVisibilityTimeout = 30 * time.Second

	// DefaultShutdownTimeout is how long Consumer.Close waits for in-flight
	// handlers before returning.
	DefaultShutdownTimeout = 30 * time.Second

	// DefaultJanitorInterval is the trim/reclaim janitor cadence used by
	// durable backends (sqlite, pgx, redis). The inmem backend reclaims
	// inline on each worker tick.
	DefaultJanitorInterval = 30 * time.Second
)
Variables ¶
var ErrSkipRetry = errors.New("stream: skip retry")
ErrSkipRetry, when returned from a Handler (directly or wrapped), signals that the message has permanently failed and should be routed to the dead letter stream immediately, regardless of the remaining retry budget.
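A typical use is rejecting input that can never succeed, such as a payload that fails to decode. The sketch below uses a local `errSkipRetry` as a stand-in for ErrSkipRetry and a hypothetical `handle` function that takes only the payload; the real Handler signature is not shown on this page, so treat the shape of `handle` as an assumption.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// errSkipRetry is a local stand-in for stream.ErrSkipRetry.
var errSkipRetry = errors.New("stream: skip retry")

// order is a hypothetical payload type.
type order struct {
	ID string `json:"id"`
}

// handle validates a payload. Malformed input will not get better on a
// retry, so it is reported as a wrapped skip-retry error.
func handle(payload []byte) error {
	var o order
	if err := json.Unmarshal(payload, &o); err != nil {
		return fmt.Errorf("decode order (%v): %w", err, errSkipRetry)
	}
	if o.ID == "" {
		return fmt.Errorf("order has no id: %w", errSkipRetry)
	}
	return nil
}

// isPermanent is how a backend could detect the sentinel through any
// number of %w wrapping layers.
func isPermanent(err error) bool { return errors.Is(err, errSkipRetry) }

func main() {
	fmt.Println(isPermanent(handle([]byte("not json"))))     // prints: true
	fmt.Println(isPermanent(handle([]byte(`{"id":"o-1"}`)))) // prints: false
}
```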
Functions ¶
func DeadLetterStream ¶
func DeadLetterStream(formatted string) string
DeadLetterStream returns the default DLQ stream name for a given already-formatted stream name. A DLQ stream is just another stream; subscribe to it to inspect, alert on, or reprocess failed messages.
func FormatStream ¶
func FormatStream(app, ns, name string) string
FormatStream formats a stream name as "<app>.<ns>.<name>". This matches queue.FormatTopic and pubsub.FormatTopic so an application can share namespace configuration across all three packages.
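The two naming rules compose as shown in this sketch. The lowercase `formatStream` and `deadLetterStream` are stand-ins written from the documented formats ("<app>.<ns>.<name>" and "<stream>.dead"); how the real functions treat empty or unusual inputs is not specified here and is not modeled.

```go
package main

import "fmt"

// formatStream is a stand-in for stream.FormatStream, written from the
// documented "<app>.<ns>.<name>" format.
func formatStream(app, ns, name string) string {
	return app + "." + ns + "." + name
}

// deadLetterStream is a stand-in for stream.DeadLetterStream, written from
// the documented default of "<stream>.dead".
func deadLetterStream(formatted string) string {
	return formatted + ".dead"
}

func main() {
	s := formatStream("app", "default", "orders")
	fmt.Println(s)                   // prints: app.default.orders
	fmt.Println(deadLetterStream(s)) // prints: app.default.orders.dead
}
```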
func ResolveServiceConfig ¶
func ResolveServiceConfig(cfg *ServiceConfig)
ResolveServiceConfig fills package defaults into any zero-valued fields on cfg. Backend constructors should call this once after applying user-supplied options.
func ResolveSubscribeConfig ¶
func ResolveSubscribeConfig(cfg *SubscribeConfig, formattedStream string)
ResolveSubscribeConfig fills package defaults for any zero-valued field in cfg. Backend implementations call this once per Subscribe after applying user options. formattedStream is the result of FormatStream applied to the caller-supplied stream name.
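Default resolution of this kind usually amounts to a series of zero-value checks. The following sketch uses a hypothetical `subscribeConfig` with a subset of the documented fields and the default values listed under Constants; it is not the package's code, and the real function may handle edge cases differently.

```go
package main

import (
	"fmt"
	"time"
)

// Local copies of the documented package defaults.
const (
	defaultMaxRetries        = 3
	defaultPollInterval      = time.Second
	defaultVisibilityTimeout = 30 * time.Second
	defaultShutdownTimeout   = 30 * time.Second
)

// subscribeConfig mirrors a subset of the documented SubscribeConfig.
type subscribeConfig struct {
	Concurrency       int
	MaxRetries        int
	PollInterval      time.Duration
	VisibilityTimeout time.Duration
	ShutdownTimeout   time.Duration
	DeadLetterStream  string
}

// resolve fills defaults into zero-valued fields. Note that in this sketch
// a MaxRetries of 0 cannot be told apart from "not set".
func resolve(cfg *subscribeConfig, formattedStream string) {
	if cfg.Concurrency < 1 {
		cfg.Concurrency = 1 // values below 1 clamp to 1
	}
	if cfg.MaxRetries == 0 {
		cfg.MaxRetries = defaultMaxRetries
	}
	if cfg.PollInterval == 0 {
		cfg.PollInterval = defaultPollInterval
	}
	if cfg.VisibilityTimeout == 0 {
		cfg.VisibilityTimeout = defaultVisibilityTimeout
	}
	if cfg.ShutdownTimeout == 0 {
		cfg.ShutdownTimeout = defaultShutdownTimeout
	}
	if cfg.DeadLetterStream == "" {
		cfg.DeadLetterStream = formattedStream + ".dead"
	}
}

func main() {
	cfg := subscribeConfig{Concurrency: 8, VisibilityTimeout: time.Minute}
	resolve(&cfg, "app.default.orders")
	fmt.Printf("%+v\n", cfg)
}
```

Values the caller set (Concurrency and VisibilityTimeout here) are left alone; only the untouched fields pick up defaults.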
Types ¶
type Consumer ¶
type Consumer interface {
Close() error
}
Consumer is the lifetime handle for a subscription. Close stops claiming new messages and waits for in-flight handlers up to the configured shutdown timeout.
type Handler ¶
Handler processes a single message. Returning nil acks the message and the backend removes it from the consumer group's pending list. Returning a non-nil error leaves the message pending; it becomes available for re-claim by another worker after the configured visibility timeout. If Attempt would exceed MaxRetries+1 on the next reclaim, or the error matches ErrSkipRetry, the message is routed to the DLQ stream instead.
type Message ¶
type Message struct {
// ID is the backend-assigned identifier, monotonic per stream. Always
// non-empty when delivered to a Handler.
ID string
// Stream is the fully formatted stream name as it appears in the backend
// (after FormatStream).
Stream string
// Payload is the opaque body supplied to Publish.
Payload []byte
// Headers are the caller-supplied key-value metadata. May be nil if no
// headers were supplied.
Headers map[string]string
// CreatedAt is the time the message was first appended to the stream.
CreatedAt time.Time
// Attempt is 1 on first delivery, 2+ on every redelivery (visibility
// timeout reclaim or handler-error retry).
Attempt int
}
Message is a single entry delivered to a Handler.
type Producer ¶
type Producer interface {
Publish(ctx context.Context, stream string, payload []byte,
opts ...PublishOption) error
}
Producer publishes messages to a stream.
type PublishConfig ¶
type PublishConfig struct {
App string
Namespace string
Headers map[string]string
MaxLen int64
MaxAge time.Duration
}
PublishConfig is the resolved configuration for a single Publish call. Backends populate App, Namespace, MaxLen, MaxAge defaults from their service-level config, then apply PublishOptions to overlay caller intent.
type PublishOption ¶
type PublishOption interface {
Apply(*PublishConfig)
}
PublishOption configures a single Publish call.
func WithHeaders ¶
func WithHeaders(h map[string]string) PublishOption
WithHeaders attaches a metadata map to the message. The map is shallow-copied by the backend; nil is allowed and equivalent to "no headers".
func WithMaxAge ¶
func WithMaxAge(d time.Duration) PublishOption
WithMaxAge overrides the service-level DefaultMaxAge for this publish. Zero means "no trim" for this call.
func WithMaxLen ¶
func WithMaxLen(n int64) PublishOption
WithMaxLen overrides the service-level DefaultMaxLen for this publish. Zero means "no trim" for this call. Trim policy is reasserted by every publisher (last-write-wins per Redis Streams XADD MAXLEN semantics).
func WithPublishApp ¶
func WithPublishApp(app string) PublishOption
WithPublishApp overrides the app prefix of the stream name for this publish only.
func WithPublishNamespace ¶
func WithPublishNamespace(ns string) PublishOption
WithPublishNamespace overrides the namespace of the stream name for this publish only.
type PublishOptionFunc ¶
type PublishOptionFunc func(*PublishConfig)
PublishOptionFunc adapts a function to PublishOption.
func (PublishOptionFunc) Apply ¶
func (f PublishOptionFunc) Apply(cfg *PublishConfig)
Apply calls f(cfg).
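The option types above follow the usual functional-options pattern: an interface with Apply, plus a func adapter so that a closure can serve as an option. The sketch below re-declares the documented types locally with a reduced PublishConfig. The option bodies and the `resolvePublish` helper are assumptions about how a backend might overlay per-call options on service defaults.

```go
package main

import "fmt"

// PublishConfig is a reduced local copy of the documented struct.
type PublishConfig struct {
	Namespace string
	MaxLen    int64
}

// PublishOption and PublishOptionFunc mirror the documented declarations.
type PublishOption interface {
	Apply(*PublishConfig)
}

type PublishOptionFunc func(*PublishConfig)

// Apply calls f(cfg), which is what lets a plain closure act as an option.
func (f PublishOptionFunc) Apply(cfg *PublishConfig) { f(cfg) }

// WithMaxLen and WithPublishNamespace have the documented signatures; the
// bodies are guesses at the obvious implementation.
func WithMaxLen(n int64) PublishOption {
	return PublishOptionFunc(func(cfg *PublishConfig) { cfg.MaxLen = n })
}

func WithPublishNamespace(ns string) PublishOption {
	return PublishOptionFunc(func(cfg *PublishConfig) { cfg.Namespace = ns })
}

// resolvePublish is a hypothetical helper: start from service-level
// defaults, then let each option overwrite fields in order.
func resolvePublish(defaults PublishConfig, opts ...PublishOption) PublishConfig {
	cfg := defaults // struct copy, so the defaults are not mutated
	for _, o := range opts {
		o.Apply(&cfg)
	}
	return cfg
}

func main() {
	defaults := PublishConfig{Namespace: "default", MaxLen: 1000}
	cfg := resolvePublish(defaults, WithMaxLen(0), WithPublishNamespace("eu"))
	fmt.Println(cfg.Namespace, cfg.MaxLen) // prints: eu 0
}
```

Because the overlay is unconditional, passing WithMaxLen(0) turns trimming off for that call even when the service default is non-zero, which matches the documented "zero means no trim for this call".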
type Service ¶
type Service interface {
Producer
Subscriber
// Close releases backend resources and shuts down all live consumers.
// Safe to call once; subsequent calls return nil.
Close(ctx context.Context) error
}
Service is the union of Producer and Subscriber plus a backend Close. Backend implementations expose this as their public type.
type ServiceConfig ¶
type ServiceConfig struct {
App string
Namespace string
DefaultMaxLen int64
DefaultMaxAge time.Duration
}
ServiceConfig is the service-level configuration shared by all Publish and Subscribe calls on a Service.
type ServiceOption ¶
type ServiceOption interface {
Apply(*ServiceConfig)
}
ServiceOption configures a backend constructor.
func WithApp ¶
func WithApp(app string) ServiceOption
WithApp sets the service-level app name. Empty string is ignored so callers can pass values from optional config without clobbering presets.
func WithDefaultMaxAge ¶
func WithDefaultMaxAge(d time.Duration) ServiceOption
WithDefaultMaxAge sets the default per-publish MaxAge used when a Publish call doesn't provide one. Zero means no trimming by age.
func WithDefaultMaxLen ¶
func WithDefaultMaxLen(n int64) ServiceOption
WithDefaultMaxLen sets the default per-publish MaxLen used when a Publish call doesn't provide one. Zero means no trimming by length.
func WithNamespace ¶
func WithNamespace(ns string) ServiceOption
WithNamespace sets the service-level namespace. Empty string is ignored.
type ServiceOptionFunc ¶
type ServiceOptionFunc func(*ServiceConfig)
ServiceOptionFunc adapts a function to ServiceOption.
func (ServiceOptionFunc) Apply ¶
func (f ServiceOptionFunc) Apply(cfg *ServiceConfig)
Apply calls f(cfg).
type StartPosition ¶
type StartPosition int8
StartPosition controls where a brand-new consumer group starts reading from. Has no effect when the group already exists; the durable cursor wins. The zero value is StartLatest.
const (
	// StartLatest sets the new group's cursor to the highest message ID at
	// the time of group creation. Group sees only messages appended after
	// this point. Default; matches Redis Streams' "$" and Kafka's
	// auto.offset.reset=latest.
	StartLatest StartPosition = iota

	// StartEarliest sets the new group's cursor to "before any message" so
	// the group replays the entire current backlog. Messages already
	// trimmed by MaxLen/MaxAge are gone; there is no recovering them.
	StartEarliest
)
type SubscribeConfig ¶
type SubscribeConfig struct {
App string
Namespace string
StartFrom StartPosition
StartFromID string // overrides StartFrom when non-empty
Concurrency int
MaxRetries int
PollInterval time.Duration
VisibilityTimeout time.Duration
ShutdownTimeout time.Duration
// DeadLetterStream is the stream that exhausted messages are routed
// to. Zero value means "<formatted-stream>.dead".
DeadLetterStream string
}
SubscribeConfig is the resolved configuration for a single Subscribe call.
type SubscribeOption ¶
type SubscribeOption interface {
Apply(*SubscribeConfig)
}
SubscribeOption configures a single Subscribe call.
func WithConcurrency ¶
func WithConcurrency(n int) SubscribeOption
WithConcurrency sets the number of worker goroutines for this consumer. Default 1. Values < 1 clamp to 1.
func WithDeadLetterStream ¶
func WithDeadLetterStream(name string) SubscribeOption
WithDeadLetterStream sets a custom DLQ stream. Default "<formatted-stream>.dead".
func WithMaxRetries ¶
func WithMaxRetries(n int) SubscribeOption
WithMaxRetries sets how many retries are allowed before routing to the DLQ. Default 3 (= 4 total attempts). -1 means infinite retries.
func WithPollInterval ¶
func WithPollInterval(d time.Duration) SubscribeOption
WithPollInterval sets the fallback wake-up cadence for backends that poll. Push wake-ups (sync.Cond, NOTIFY, XREADGROUP BLOCK) preempt this.
func WithShutdownTimeout ¶
func WithShutdownTimeout(d time.Duration) SubscribeOption
WithShutdownTimeout sets how long Consumer.Close waits for in-flight handlers before returning.
func WithStartFrom ¶
func WithStartFrom(p StartPosition) SubscribeOption
WithStartFrom controls where a brand-new consumer group starts reading. Has no effect when the group already exists.
func WithStartFromID ¶
func WithStartFromID(id string) SubscribeOption
WithStartFromID sets a brand-new group's cursor to a specific message ID. Group sees messages with ID strictly greater than id. Overrides WithStartFrom. Has no effect when the group already exists.
func WithSubscribeApp ¶
func WithSubscribeApp(app string) SubscribeOption
WithSubscribeApp overrides the app prefix of the stream name for this subscription.
func WithSubscribeNamespace ¶
func WithSubscribeNamespace(ns string) SubscribeOption
WithSubscribeNamespace overrides the namespace of the stream name for this subscription.
func WithVisibilityTimeout ¶
func WithVisibilityTimeout(d time.Duration) SubscribeOption
WithVisibilityTimeout sets how long a claimed message stays hidden from other workers in the same group. Also acts as the implicit retry delay when a handler returns an error. Handlers exceeding this timeout will see their message reclaimed by another worker.
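The reclaim rule can be sketched as a timestamp comparison on the pending entry. The `pending` type and `tryReclaim` function are hypothetical; durable backends would express the same check in SQL or through Redis commands rather than in Go.

```go
package main

import (
	"fmt"
	"time"
)

// pending is a hypothetical record of a claimed, not yet acked message.
type pending struct {
	ClaimedAt time.Time
	Attempt   int
}

// tryReclaim lets another worker take over the message once the visibility
// timeout has elapsed since the last claim. A successful reclaim restarts
// the timeout and increments the attempt counter.
func tryReclaim(p *pending, timeout time.Duration, now time.Time) bool {
	if now.Before(p.ClaimedAt.Add(timeout)) {
		return false // still hidden from other workers
	}
	p.ClaimedAt = now
	p.Attempt++
	return true
}

// demoReclaim tries to reclaim a message 10s and 31s after it was claimed
// with a 30s visibility timeout.
func demoReclaim() (early, late bool, attempt int) {
	start := time.Now()
	p := &pending{ClaimedAt: start, Attempt: 1}
	early = tryReclaim(p, 30*time.Second, start.Add(10*time.Second))
	late = tryReclaim(p, 30*time.Second, start.Add(31*time.Second))
	return early, late, p.Attempt
}

func main() {
	fmt.Println(demoReclaim()) // prints: false true 2
}
```

The same comparison explains the implicit retry delay: after a handler error the message stays pending, so the next delivery can only happen once the timeout from the original claim has passed.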
type SubscribeOptionFunc ¶
type SubscribeOptionFunc func(*SubscribeConfig)
SubscribeOptionFunc adapts a function to SubscribeOption.
func (SubscribeOptionFunc) Apply ¶
func (f SubscribeOptionFunc) Apply(cfg *SubscribeConfig)
Apply calls f(cfg).
type Subscriber ¶
type Subscriber interface {
Subscribe(ctx context.Context, stream, group string, h Handler,
opts ...SubscribeOption) (Consumer, error)
}
Subscriber registers a Handler against a (stream, consumer-group) pair. Members of the same group share work: each message is claimed by one member at a time, and may be redelivered under at-least-once delivery.
Directories ¶
| Package | Synopsis |
|---|---|
| inmem | Package inmem provides an in-process stream.Service implementation for tests and single-process development. |
| pgx | Package pgx provides a distributed stream.Service backed by Postgres. |
| redis | Package redis provides a distributed stream.Service backed by Redis Streams. |
| sqlite | Package sqlite provides a durable, embedded stream.Service backed by SQLite. |
| streamtest | Package streamtest provides a backend-agnostic conformance test suite for stream.Service implementations. |