Documentation
¶
Overview ¶
Classification helpers for the sink retry loop. The default IsTransient implementation recognises common network / protocol "blip" errors that are worth retrying with backoff. Sinks may also wrap an error in a value that implements the transientHinter interface to override the heuristic.
Package sinks defines the small Sink interface that publishers can use when a realtime channel has no live subscriber and a message must be delivered out-of-band. Parsec ships three reference sinks (email, slack, webhook); embedders can register their own.
Sinks are intentionally minimal — they accept a typed Message and a Recipient and either succeed or return a coded error. Backpressure, retry, and queueing are the embedder's responsibility.
Index ¶
- func FactoryKinds() []string
- func IsTransient(err error) bool
- func RegisterFactory(kind string, f Factory)
- func Terminal(err error) error
- func Transient(err error) error
- type DLQ
- type DLQItem
- type Factory
- type MemoryDLQ
- func (m *MemoryDLQ) Count(_ context.Context, sink string) (int, error)
- func (m *MemoryDLQ) Discard(_ context.Context, id string) error
- func (m *MemoryDLQ) Items(_ context.Context, sink string, limit int) ([]DLQItem, error)
- func (m *MemoryDLQ) Push(_ context.Context, item DLQItem) error
- func (m *MemoryDLQ) Replay(ctx context.Context, id string) error
- func (m *MemoryDLQ) SetRegistryLookup(fn RegistryLookup)
- type Message
- type Recipient
- type RecipientDecoder
- type RedisDLQ
- func (d *RedisDLQ) Count(ctx context.Context, sink string) (int, error)
- func (d *RedisDLQ) Discard(ctx context.Context, id string) error
- func (d *RedisDLQ) Items(ctx context.Context, sink string, limit int) ([]DLQItem, error)
- func (d *RedisDLQ) Push(ctx context.Context, item DLQItem) error
- func (d *RedisDLQ) Replay(ctx context.Context, id string) error
- func (d *RedisDLQ) SetRegistryLookup(fn RegistryLookup)
- func (d *RedisDLQ) WithKeyPrefix(p string) *RedisDLQ
- func (d *RedisDLQ) WithMaxLen(n int64) *RedisDLQ
- type Registry
- type RegistryLookup
- type Retrier
- type RetryConfig
- type Sink
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func FactoryKinds ¶
func FactoryKinds() []string
FactoryKinds returns every registered kind for diagnostic + manifest purposes. Order is unspecified.
func IsTransient ¶
IsTransient returns true when err looks like a "try again" failure.
Decision tree:
- err == nil → false
- context.Canceled / context.DeadlineExceeded → false (terminal, abort).
- an unwrapped value that implements Transient() bool → use its answer.
- *url.Error / net.OpError / net.Error.Timeout → true (network blip).
- ECONNREFUSED / ECONNRESET / EPIPE → true.
- *errors.Error with code SinkUnavailable and a recognisable HTTP/SMTP transient prefix in the message → true.
- anything else → false.
func RegisterFactory ¶
RegisterFactory associates kind with a factory. Calling twice for the same kind overrides; intended for tests. Built-in sinks call this from their package init() so importing the sub-package is enough.
Types ¶
type DLQ ¶
type DLQ interface {
// Push records item. The implementation assigns item.ID.
Push(ctx context.Context, item DLQItem) error
// Items returns up to limit items for sink, newest-first.
Items(ctx context.Context, sink string, limit int) ([]DLQItem, error)
// Count returns the live size of the DLQ for sink. -1 means "unknown".
Count(ctx context.Context, sink string) (int, error)
// Discard removes id. Missing ids are not an error.
Discard(ctx context.Context, id string) error
// Replay re-runs id through the original sink. Backends look the sink
// up via the registry attached at construction time. Implementations
// MUST NOT remove the item on a successful replay — replay through a
// Retrier may push the item back into the DLQ; the operator decides
// what to do next.
Replay(ctx context.Context, id string) error
}
DLQ is the persistence surface a Retrier uses when retries are exhausted (or when a terminal error is observed immediately). Implementations must be safe for concurrent use.
type DLQItem ¶
type DLQItem struct {
// ID is assigned by the DLQ implementation on Push.
ID string `json:"id"`
// Sink is the registered name of the sink that owns this item.
Sink string `json:"sink"`
// At is the wall-clock instant the failure was recorded.
At time.Time `json:"at"`
// Recipient is the typed-per-sink target. Persisted as JSON; consumers
// that need the original Go type should call sink-specific decoders.
Recipient Recipient `json:"recipient,omitempty"`
// Message is the payload that failed to deliver.
Message Message `json:"message"`
// Attempts is how many Send calls were made before giving up.
Attempts int `json:"attempts"`
// LastError is the stringified final error.
LastError string `json:"last_error"`
}
DLQItem is one failed-delivery record. Recipient is kept as `any` because each sink defines its own concrete recipient shape; persistence layers must round-trip through JSON so RegistryLookup can dispatch back to the original sink during Replay.
type Factory ¶
Factory builds a Sink instance from a YAML config block keyed by user- chosen name (which becomes the sink's Name()) and a raw map of the remaining config fields. Each built-in sink package registers a factory via its init() so that simply importing the package wires up the factory.
func LookupFactory ¶
LookupFactory returns the factory for kind, or false if unregistered.
type MemoryDLQ ¶
type MemoryDLQ struct {
// contains filtered or unexported fields
}
MemoryDLQ is an in-process DLQ implementation backed by a slice + mutex. It is the default when Options.RedisClient is nil. Persistence is not promised — a process restart loses everything in the queue.
func (*MemoryDLQ) Replay ¶
Replay looks up the original sink and re-runs Send. The DLQ entry stays in the queue — if the replay fails through a Retrier it will land back here as a new item.
func (*MemoryDLQ) SetRegistryLookup ¶
func (m *MemoryDLQ) SetRegistryLookup(fn RegistryLookup)
SetRegistryLookup wires the Replay callback so a stored item can resolve back to its sink. parsec.New calls this after assembling the registry.
type Message ¶
type Message struct {
Subject string
Body string
// Metadata is attached verbatim by sinks that support it (e.g. slack
// blocks, webhook headers, email X-* headers).
Metadata map[string]string
}
Message is what a sink delivers. Body is interpreted by the sink (HTML for email, mrkdwn for slack, JSON for webhook).
type Recipient ¶
type Recipient any
Recipient is the per-call target. The shape varies per sink — see each sink package for the concrete type it expects.
type RecipientDecoder ¶
RecipientDecoder is an optional capability a Sink (or its inner sink when wrapped by Retrier) can implement so the Redis DLQ can rebuild the typed Recipient from its persisted JSON form during Replay. Sinks that store state purely in their Config (e.g. slack — no per-call recipient) can return nil from DecodeRecipient.
Reference sinks email and webhook implement this; slack returns nil.
type RedisDLQ ¶
type RedisDLQ struct {
// contains filtered or unexported fields
}
RedisDLQ persists DLQ items as Redis Stream entries. One stream per sink, keyed by `<prefix>:dlq:<sink>`. IDs are the native stream IDs so that Discard / Replay can route directly via XRANGE / XDEL.
Operators tune retention via MaxLen (XADD MAXLEN ~).
func NewRedisDLQ ¶
func NewRedisDLQ(client redis.UniversalClient) *RedisDLQ
NewRedisDLQ constructs a Redis-backed DLQ. The default key prefix is "parsec" and the default MAXLEN cap is 10000 entries per sink stream.
func (*RedisDLQ) Count ¶
Count returns XLEN for sink, or the sum across all known streams when sink is empty.
func (*RedisDLQ) Discard ¶
Discard removes id from its stream. We resolve the stream by scanning every known sink stream — Redis IDs encode `<ms>-<seq>` and do not embed the sink, so a per-sink lookup is unavoidable. Memory cost is O(#known sinks), which is small in practice.
func (*RedisDLQ) Items ¶
Items returns up to limit newest-first DLQItems for sink. If sink is empty, items across every known stream are merged.
func (*RedisDLQ) Replay ¶
Replay fetches id, locates the owning sink, and re-sends. The original stream entry is not removed — replayed items that fail again will land back in the DLQ via the Retrier.
func (*RedisDLQ) SetRegistryLookup ¶
func (d *RedisDLQ) SetRegistryLookup(fn RegistryLookup)
SetRegistryLookup attaches the Sink lookup used by Replay.
func (*RedisDLQ) WithKeyPrefix ¶
WithKeyPrefix changes the Redis namespace.
func (*RedisDLQ) WithMaxLen ¶
WithMaxLen overrides the per-stream MAXLEN cap. <=0 disables capping.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry is a name-to-Sink lookup table. Embedders register Sinks at boot and publishers resolve them by name.
type RegistryLookup ¶
RegistryLookup is the function a DLQ uses to resolve a sink name back to its (already retry-wrapped) Sink for Replay. Implementations accept this via a setter so the registry doesn't have to leak into the constructor.
type Retrier ¶
type Retrier struct {
Inner Sink
Cfg RetryConfig
DLQ DLQ // optional; nil disables DLQ push
// IsTransientFn overrides the default classifier. Optional.
IsTransientFn func(error) bool
}
Retrier wraps a Sink with the retry-then-DLQ semantics described in the phase-12 spec. PublishOrSink stays synchronous: the caller blocks until the retry loop completes (success, DLQ push, or context cancellation).
The Retrier is itself a Sink — the wrapped sink's Name is preserved so callers and the registry behave identically.
func NewRetrier ¶
func NewRetrier(inner Sink, cfg RetryConfig, dlq DLQ) *Retrier
NewRetrier wraps inner with the supplied config + DLQ.
func (*Retrier) IsRetrier ¶
IsRetrier is a marker method used by the wrap-once gate in parsec.New to avoid double-wrapping a registered sink.
func (*Retrier) Name ¶
Name returns the inner sink's name. Callers must not depend on whether a sink is wrapped.
type RetryConfig ¶
type RetryConfig struct {
// MaxAttempts is the total number of Send calls (initial + retries).
// Default 5. A value of 1 disables retry.
MaxAttempts int
// BaseBackoff is the first wait after a failure. Default 1s.
BaseBackoff time.Duration
// MaxBackoff caps the exponential growth. Default 30s.
MaxBackoff time.Duration
// JitterFraction is the +/- proportion of jitter applied to each wait.
// Default 0.2 (i.e. wait * uniform(0.8, 1.2)).
JitterFraction float64
}
RetryConfig governs the sink retry loop. Zero values are interpreted by Normalize.
func (RetryConfig) BackoffFor ¶
func (c RetryConfig) BackoffFor(attempt int) time.Duration
BackoffFor computes the wait before attempt (1-indexed). Attempt 1 returns 0 (no wait before the first try); attempt N returns base*2^(N-2) capped at MaxBackoff, with +/- JitterFraction multiplied in.
func (RetryConfig) Normalize ¶
func (c RetryConfig) Normalize() RetryConfig
Normalize fills in defaults for any zero / out-of-range field. The returned value is safe to pass to a Retrier.
type Sink ¶
type Sink interface {
// Name is a short identifier used by the registry and the manifest.
Name() string
// Send delivers the message. Returns a coded error on failure.
Send(ctx context.Context, to Recipient, m Message) error
}
Sink delivers a Message to a Recipient. Implementations must be safe for concurrent use.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package email is a thin SMTP sink.
|
Package email is a thin SMTP sink. |
|
Package slack is a Slack incoming-webhook sink.
|
Package slack is a Slack incoming-webhook sink. |
|
Package webhook posts an arbitrary JSON envelope to a configured URL.
|
Package webhook posts an arbitrary JSON envelope to a configured URL. |