sinks

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 27, 2026 License: MIT Imports: 17 Imported by: 0

Documentation

Overview

Classification helpers for the sink retry loop. The default IsTransient implementation recognises common network / protocol "blip" errors that are worth retrying with backoff. Sinks may also wrap an error in a value that implements the transientHinter interface to override the heuristic.

Package sinks defines the small Sink interface that publishers can use when a realtime channel has no live subscriber and a message must be delivered out-of-band. Parsec ships three reference sinks (email, slack, webhook); embedders can register their own.

Sinks are intentionally minimal — they accept a typed Message and a Recipient and either succeed or return a coded error. Backpressure, retry, and queueing are the embedder's responsibility.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func FactoryKinds

func FactoryKinds() []string

FactoryKinds returns every registered kind for diagnostic + manifest purposes. Order is unspecified.

func IsTransient

func IsTransient(err error) bool

IsTransient returns true when err looks like a "try again" failure.

Decision tree:

  1. err == nil → false
  2. context.Canceled / context.DeadlineExceeded → false (terminal, abort).
  3. an unwrapped value that implements Transient() bool → use its answer.
  4. *url.Error / net.OpError / net.Error.Timeout → true (network blip).
  5. ECONNREFUSED / ECONNRESET / EPIPE → true.
  6. *errors.Error with code SinkUnavailable and a recognisable HTTP/SMTP transient prefix in the message → true.
  7. anything else → false.

func RegisterFactory

func RegisterFactory(kind string, f Factory)

RegisterFactory associates kind with a factory. Calling twice for the same kind overrides; intended for tests. Built-in sinks call this from their package init() so importing the sub-package is enough.

func Terminal

func Terminal(err error) error

Terminal marks err as not worth retrying.

func Transient

func Transient(err error) error

Transient marks err as worth retrying. Preserves wrapped *errors.Error codes so RPC mapping continues to work.

Types

type DLQ

type DLQ interface {
	// Push records item. The implementation assigns item.ID.
	Push(ctx context.Context, item DLQItem) error
	// Items returns up to limit items for sink, newest-first.
	Items(ctx context.Context, sink string, limit int) ([]DLQItem, error)
	// Count returns the live size of the DLQ for sink. -1 means "unknown".
	Count(ctx context.Context, sink string) (int, error)
	// Discard removes id. Missing ids are not an error.
	Discard(ctx context.Context, id string) error
	// Replay re-runs id through the original sink. Backends look the sink
	// up via the registry attached at construction time. Implementations
	// MUST NOT remove the item on a successful replay — replay through a
	// Retrier may push the item back into the DLQ; the operator decides
	// what to do next.
	Replay(ctx context.Context, id string) error
}

DLQ is the persistence surface a Retrier uses when retries are exhausted (or when a terminal error is observed immediately). Implementations must be safe for concurrent use.

type DLQItem

type DLQItem struct {
	// ID is assigned by the DLQ implementation on Push.
	ID string `json:"id"`
	// Sink is the registered name of the sink that owns this item.
	Sink string `json:"sink"`
	// At is the wall-clock instant the failure was recorded.
	At time.Time `json:"at"`
	// Recipient is the typed-per-sink target. Persisted as JSON; consumers
	// that need the original Go type should call sink-specific decoders.
	Recipient Recipient `json:"recipient,omitempty"`
	// Message is the payload that failed to deliver.
	Message Message `json:"message"`
	// Attempts is how many Send calls were made before giving up.
	Attempts int `json:"attempts"`
	// LastError is the stringified final error.
	LastError string `json:"last_error"`
}

DLQItem is one failed-delivery record. Recipient is kept as `any` because each sink defines its own concrete recipient shape; persistence layers must round-trip through JSON so RegistryLookup can dispatch back to the original sink during Replay.

type Factory

type Factory func(name string, raw map[string]any) (Sink, error)

Factory builds a Sink instance from a YAML config block keyed by user- chosen name (which becomes the sink's Name()) and a raw map of the remaining config fields. Each built-in sink package registers a factory via its init() so that simply importing the package wires up the factory.

func LookupFactory

func LookupFactory(kind string) (Factory, bool)

LookupFactory returns the factory for kind, or false if unregistered.

type MemoryDLQ

type MemoryDLQ struct {
	// contains filtered or unexported fields
}

MemoryDLQ is an in-process DLQ implementation backed by a slice + mutex. It is the default when Options.RedisClient is nil. Persistence is not promised — a process restart loses everything in the queue.

func NewMemoryDLQ

func NewMemoryDLQ() *MemoryDLQ

NewMemoryDLQ returns an empty in-process DLQ.

func (*MemoryDLQ) Count

func (m *MemoryDLQ) Count(_ context.Context, sink string) (int, error)

Count returns the size of the DLQ for sink.

func (*MemoryDLQ) Discard

func (m *MemoryDLQ) Discard(_ context.Context, id string) error

Discard removes the item with id. Missing ids are no-ops.

func (*MemoryDLQ) Items

func (m *MemoryDLQ) Items(_ context.Context, sink string, limit int) ([]DLQItem, error)

Items returns up to limit items for sink, newest first.

func (*MemoryDLQ) Push

func (m *MemoryDLQ) Push(_ context.Context, item DLQItem) error

Push appends item. ID is assigned if empty.

func (*MemoryDLQ) Replay

func (m *MemoryDLQ) Replay(ctx context.Context, id string) error

Replay looks up the original sink and re-runs Send. The DLQ entry stays in the queue — if the replay fails through a Retrier it will land back here as a new item.

func (*MemoryDLQ) SetRegistryLookup

func (m *MemoryDLQ) SetRegistryLookup(fn RegistryLookup)

SetRegistryLookup wires the Replay callback so a stored item can resolve back to its sink. parsec.New calls this after assembling the registry.

type Message

type Message struct {
	Subject string
	Body    string
	// Metadata is attached verbatim by sinks that support it (e.g. slack
	// blocks, webhook headers, email X-* headers).
	Metadata map[string]string
}

Message is what a sink delivers. Body is interpreted by the sink (HTML for email, mrkdwn for slack, JSON for webhook).

type Recipient

type Recipient any

Recipient is the per-call target. The shape varies per sink — see each sink package for the concrete type it expects.

type RecipientDecoder

type RecipientDecoder interface {
	DecodeRecipient(raw []byte) (Recipient, error)
}

RecipientDecoder is an optional capability a Sink (or its inner sink when wrapped by Retrier) can implement so the Redis DLQ can rebuild the typed Recipient from its persisted JSON form during Replay. Sinks that store state purely in their Config (e.g. slack — no per-call recipient) can return nil from DecodeRecipient.

Reference sinks email and webhook implement this; slack returns nil.

type RedisDLQ

type RedisDLQ struct {
	// contains filtered or unexported fields
}

RedisDLQ persists DLQ items as Redis Stream entries. One stream per sink, keyed by `<prefix>:dlq:<sink>`. IDs are the native stream IDs so that Discard / Replay can route directly via XRANGE / XDEL.

Operators tune retention via MaxLen (XADD MAXLEN ~).

func NewRedisDLQ

func NewRedisDLQ(client redis.UniversalClient) *RedisDLQ

NewRedisDLQ constructs a Redis-backed DLQ. The default key prefix is "parsec" and the default MAXLEN cap is 10000 entries per sink stream.

func (*RedisDLQ) Count

func (d *RedisDLQ) Count(ctx context.Context, sink string) (int, error)

Count returns XLEN for sink, or the sum across all known streams when sink is empty.

func (*RedisDLQ) Discard

func (d *RedisDLQ) Discard(ctx context.Context, id string) error

Discard removes id from its stream. We resolve the stream by scanning every known sink stream — Redis IDs encode `<ms>-<seq>` and do not embed the sink, so a per-sink lookup is unavoidable. Memory cost is O(#known sinks), which is small in practice.

func (*RedisDLQ) Items

func (d *RedisDLQ) Items(ctx context.Context, sink string, limit int) ([]DLQItem, error)

Items returns up to limit newest-first DLQItems for sink. If sink is empty, items across every known stream are merged.

func (*RedisDLQ) Push

func (d *RedisDLQ) Push(ctx context.Context, item DLQItem) error

Push XADDs the encoded item. The returned ID is the Redis stream ID.

func (*RedisDLQ) Replay

func (d *RedisDLQ) Replay(ctx context.Context, id string) error

Replay fetches id, locates the owning sink, and re-sends. The original stream entry is not removed — replayed items that fail again will land back in the DLQ via the Retrier.

func (*RedisDLQ) SetRegistryLookup

func (d *RedisDLQ) SetRegistryLookup(fn RegistryLookup)

SetRegistryLookup attaches the Sink lookup used by Replay.

func (*RedisDLQ) WithKeyPrefix

func (d *RedisDLQ) WithKeyPrefix(p string) *RedisDLQ

WithKeyPrefix changes the Redis namespace.

func (*RedisDLQ) WithMaxLen

func (d *RedisDLQ) WithMaxLen(n int64) *RedisDLQ

WithMaxLen overrides the per-stream MAXLEN cap. <=0 disables capping.

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry is a name-to-Sink lookup table. Embedders register Sinks at boot and publishers resolve them by name.

func NewRegistry

func NewRegistry() *Registry

NewRegistry returns an empty Registry.

func (*Registry) Get

func (r *Registry) Get(name string) Sink

Get returns the sink registered as name, or nil if absent.

func (*Registry) Names

func (r *Registry) Names() []string

Names returns the registered names in arbitrary order. Used by the manifest.

func (*Registry) Register

func (r *Registry) Register(s Sink)

Register adds s under its declared name. Last write wins.

type RegistryLookup

type RegistryLookup func(name string) Sink

RegistryLookup is the function a DLQ uses to resolve a sink name back to its (already retry-wrapped) Sink for Replay. Implementations accept this via a setter so the registry doesn't have to leak into the constructor.

type Retrier

type Retrier struct {
	Inner Sink
	Cfg   RetryConfig
	DLQ   DLQ // optional; nil disables DLQ push
	// IsTransientFn overrides the default classifier. Optional.
	IsTransientFn func(error) bool
}

Retrier wraps a Sink with the retry-then-DLQ semantics described in the phase-12 spec. PublishOrSink stays synchronous: the caller blocks until the retry loop completes (success, DLQ push, or context cancellation).

The Retrier is itself a Sink — the wrapped sink's Name is preserved so callers and the registry behave identically.

func NewRetrier

func NewRetrier(inner Sink, cfg RetryConfig, dlq DLQ) *Retrier

NewRetrier wraps inner with the supplied config + DLQ.

func (*Retrier) IsRetrier

func (r *Retrier) IsRetrier() bool

IsRetrier is a marker method used by the wrap-once gate in parsec.New to avoid double-wrapping a registered sink.

func (*Retrier) Name

func (r *Retrier) Name() string

Name returns the inner sink's name. Callers must not depend on whether a sink is wrapped.

func (*Retrier) Send

func (r *Retrier) Send(ctx context.Context, to Recipient, msg Message) error

Send drives the retry loop. On terminal failure it pushes a DLQItem and returns nil (the failure has been recorded). It returns an error only when the DLQ push itself fails OR when no DLQ is configured.

func (*Retrier) Unwrap

func (r *Retrier) Unwrap() Sink

Unwrap exposes the wrapped sink so tests and tools can reach it without reflection.

type RetryConfig

type RetryConfig struct {
	// MaxAttempts is the total number of Send calls (initial + retries).
	// Default 5. A value of 1 disables retry.
	MaxAttempts int

	// BaseBackoff is the first wait after a failure. Default 1s.
	BaseBackoff time.Duration

	// MaxBackoff caps the exponential growth. Default 30s.
	MaxBackoff time.Duration

	// JitterFraction is the +/- proportion of jitter applied to each wait.
	// Default 0.2 (i.e. wait * uniform(0.8, 1.2)).
	JitterFraction float64
}

RetryConfig governs the sink retry loop. Zero values are interpreted by Normalize.

func (RetryConfig) BackoffFor

func (c RetryConfig) BackoffFor(attempt int) time.Duration

BackoffFor computes the wait before attempt (1-indexed). Attempt 1 returns 0 (no wait before the first try); attempt N returns base*2^(N-2) capped at MaxBackoff, with +/- JitterFraction multiplied in.

func (RetryConfig) Normalize

func (c RetryConfig) Normalize() RetryConfig

Normalize fills in defaults for any zero / out-of-range field. The returned value is safe to pass to a Retrier.

type Sink

type Sink interface {
	// Name is a short identifier used by the registry and the manifest.
	Name() string
	// Send delivers the message. Returns a coded error on failure.
	Send(ctx context.Context, to Recipient, m Message) error
}

Sink delivers a Message to a Recipient. Implementations must be safe for concurrent use.

func BuildSink

func BuildSink(kind, name string, raw map[string]any) (Sink, error)

BuildSink invokes the registered factory and validates the result. Returns a user-facing error if kind is unregistered or the factory rejects raw.

func WrapSink

func WrapSink(inner Sink, cfg RetryConfig, dlq DLQ) Sink

WrapSink returns inner wrapped in a Retrier when cfg.MaxAttempts > 1 or a DLQ is configured. Already-wrapped sinks are returned unchanged so the helper is idempotent.

Directories

Path Synopsis
Package email is a thin SMTP sink.
Package email is a thin SMTP sink.
Package slack is a Slack incoming-webhook sink.
Package slack is a Slack incoming-webhook sink.
Package webhook posts an arbitrary JSON envelope to a configured URL.
Package webhook posts an arbitrary JSON envelope to a configured URL.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL