Documentation
Overview ¶
Package envelope is the wire-format contract every Parsec subscriber and publisher uses. The shape is locked here; all higher-level Parsec components (schema registry, client library, token broker, cache, telemetry) depend on it.
An Envelope wraps an application payload with the metadata Parsec needs for ordered processing, schema introspection, and causation tracking:
- Channel — the channel this envelope was published on
- Sequence — monotonic per (channel, producer) counter assigned by the publishing client (NOT the server)
- ProducedAt — wall-clock production time (millisecond precision)
- Producer — identity + kind (user/agent/service) of the source
- Aspect — routing key within a channel; subscribers filter on it
- SchemaRef — schema-registry identifier for the payload
- Causation — backward reference to the parent envelope (DAG edge)
- Payload — application content, schema-conformant
The recommended size ceiling is MaxEnvelopeSize (64 KiB, i.e. 65,536 bytes). Larger payloads use the data_ref pattern: the payload carries a URL, and subscribers fetch the actual bytes over HTTP.
Constants ¶
const AspectDataRef = "data_ref"
AspectDataRef is the conventional aspect name for envelopes whose payload is a DataRefPayload.
const MaxEnvelopeSize = 64 * 1024
MaxEnvelopeSize is the recommended ceiling, in bytes, for an encoded envelope on the wire. Producers that exceed this size SHOULD switch to the data_ref pattern: publish a small envelope whose payload carries a URL, and let subscribers fetch the payload bytes over HTTP.
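The producer-side decision described above could be sketched roughly as follows. This is not the package's implementation: the lowercase identifiers are local stand-ins so the example compiles on its own, `upload` is a hypothetical helper that stores the bytes somewhere and returns a URL, and `metadataHeadroom` is an assumed allowance for the non-payload fields of the envelope.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local stand-ins mirroring the documented constants and type, so the
// sketch compiles without importing the package.
const (
	maxEnvelopeSize = 64 * 1024
	aspectDataRef   = "data_ref"
	// metadataHeadroom is an assumed allowance for the non-payload fields.
	metadataHeadroom = 1024
)

type dataRefPayload struct {
	URL       string `json:"url"`
	SizeBytes int64  `json:"size_bytes"`
}

// choosePayload returns the aspect and payload bytes to publish. upload is a
// hypothetical helper that stores the bytes and returns a URL for them.
func choosePayload(aspect string, payload []byte, upload func([]byte) (string, error)) (string, []byte, error) {
	if len(payload)+metadataHeadroom <= maxEnvelopeSize {
		return aspect, payload, nil // small enough to travel inline
	}
	url, err := upload(payload)
	if err != nil {
		return "", nil, fmt.Errorf("upload oversized payload: %w", err)
	}
	ref, err := json.Marshal(dataRefPayload{URL: url, SizeBytes: int64(len(payload))})
	if err != nil {
		return "", nil, err
	}
	return aspectDataRef, ref, nil
}

func main() {
	upload := func(b []byte) (string, error) { return "https://example.invalid/blobs/1", nil }
	aspect, body, err := choosePayload("report", make([]byte, 70000), upload)
	fmt.Println(aspect, string(body), err)
}
```

Checking the payload length plus a fixed headroom is only an approximation; the documented ceiling applies to the whole encoded envelope, so a producer can also simply encode first and fall back to data_ref when the size check fails.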
Variables ¶
var (
	ErrChannelRequired     = errors.New("envelope: channel required")
	ErrAspectRequired      = errors.New("envelope: aspect required")
	ErrProducerRequired    = errors.New("envelope: producer required")
	ErrProducerKindUnknown = errors.New("envelope: unknown producer kind")
	ErrSequenceNonPositive = errors.New("envelope: sequence must be positive")
	ErrProducedAtZero      = errors.New("envelope: produced_at must be set")
	ErrTooLarge            = errors.New("envelope: encoded size exceeds MaxEnvelopeSize")
)
Sentinel errors. Callers should match against these with errors.Is rather than comparing error strings.
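As a minimal sketch of matching a sentinel with errors.Is, the example below uses a local `errTooLarge` as a stand-in for ErrTooLarge. `publish` and `classify` are hypothetical caller-side functions, not part of the package; the point is only that the match still succeeds after the error has been wrapped with `%w`.

```go
package main

import (
	"errors"
	"fmt"
)

// errTooLarge is a local stand-in for the package's ErrTooLarge sentinel.
var errTooLarge = errors.New("envelope: encoded size exceeds MaxEnvelopeSize")

// publish is a hypothetical caller-side function that wraps the sentinel
// with extra context when the encoded size is over the ceiling.
func publish(channel string, encodedSize int) error {
	if encodedSize > 64*1024 {
		return fmt.Errorf("publish to %q: %w", channel, errTooLarge)
	}
	return nil
}

// classify maps an error to a coarse action by matching on the sentinel.
func classify(err error) string {
	switch {
	case err == nil:
		return "ok"
	case errors.Is(err, errTooLarge):
		return "use data_ref"
	default:
		return "fail"
	}
}

func main() {
	fmt.Println(classify(publish("orders", 512)))
	fmt.Println(classify(publish("orders", 70000)))
}
```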
Functions ¶
This section is empty.
Types ¶
type Causation ¶
type Causation struct {
ParentChannel string `json:"parent_channel,omitempty"`
ParentSequence int64 `json:"parent_sequence,omitempty"`
}
Causation is a backward reference to the envelope that caused this one. The zero value (both fields empty/zero) means "this is a root event."
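The root-event convention can be checked with a plain zero-value comparison. The sketch below uses a local `causation` type that mirrors the documented fields; `isRoot` is a hypothetical helper, not something the package is known to export.

```go
package main

import "fmt"

// causation mirrors the documented Causation fields for illustration.
type causation struct {
	ParentChannel  string
	ParentSequence int64
}

// isRoot reports whether c is the zero value, i.e. the envelope has no parent.
func isRoot(c causation) bool {
	return c == (causation{})
}

func main() {
	fmt.Println(isRoot(causation{}))
	fmt.Println(isRoot(causation{ParentChannel: "orders", ParentSequence: 41}))
}
```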
type DataRefPayload ¶
type DataRefPayload struct {
URL string `json:"url"`
SizeBytes int64 `json:"size_bytes"`
ContentHash string `json:"content_hash,omitempty"`
}
DataRefPayload is the canonical shape for the data_ref aspect: a small envelope whose payload points to bytes the subscriber must fetch over HTTP. Producers SHOULD use this shape any time the payload would otherwise exceed MaxEnvelopeSize.
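On the subscriber side, the fetched bytes can be checked against the reference before use. The sketch below is one possible approach under stated assumptions: the documentation does not specify the ContentHash format, so a lowercase hex SHA-256 digest is assumed here, and `verifyFetched` is a hypothetical helper operating on a local mirror of the type.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// dataRefPayload mirrors the documented DataRefPayload fields.
type dataRefPayload struct {
	URL         string
	SizeBytes   int64
	ContentHash string // assumed here to be a lowercase hex SHA-256 digest
}

// verifyFetched checks bytes already fetched from ref.URL against the
// size and, when present, the content hash carried in the reference.
func verifyFetched(ref dataRefPayload, body []byte) error {
	if int64(len(body)) != ref.SizeBytes {
		return fmt.Errorf("size mismatch: got %d bytes, want %d", len(body), ref.SizeBytes)
	}
	if ref.ContentHash == "" {
		return nil // hash is optional on the wire (omitempty)
	}
	sum := sha256.Sum256(body)
	if got := hex.EncodeToString(sum[:]); got != ref.ContentHash {
		return fmt.Errorf("hash mismatch: got %s", got)
	}
	return nil
}

func main() {
	body := []byte("hello")
	sum := sha256.Sum256(body)
	ref := dataRefPayload{
		URL:         "https://example.invalid/blobs/1",
		SizeBytes:   int64(len(body)),
		ContentHash: hex.EncodeToString(sum[:]),
	}
	fmt.Println(verifyFetched(ref, body))
}
```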
type Envelope ¶
type Envelope struct {
Channel string `json:"channel"`
Sequence int64 `json:"sequence"`
ProducedAt time.Time `json:"produced_at"`
Producer Producer `json:"producer"`
Aspect string `json:"aspect"`
SchemaRef string `json:"schema_ref,omitempty"`
Causation Causation `json:"causation,omitzero"`
Payload json.RawMessage `json:"payload,omitempty"`
}
Envelope is the Parsec wire envelope.
All fields except ProducedAt round-trip cleanly through encoding/json. ProducedAt is encoded as RFC3339 with millisecond precision via custom MarshalJSON / UnmarshalJSON so it matches the spec's "2026-05-27T14:22:09.331Z" form.
func Decode ¶
Decode parses an envelope from its JSON wire form. It does NOT enforce the size ceiling — that is a producer-side guarantee, and subscribers MUST tolerate receiving any size their transport delivered.
func (Envelope) Encode ¶
Encode marshals the envelope to JSON and enforces the size ceiling. Use this at the publisher boundary; subscribers should decode with json.Unmarshal and call Validate.
func (Envelope) MarshalJSON ¶
MarshalJSON encodes the envelope so ProducedAt uses millisecond precision regardless of the underlying time.Time's resolution.
func (*Envelope) UnmarshalJSON ¶
UnmarshalJSON decodes the envelope. ProducedAt parses against both the canonical (ms precision) layout and RFC3339Nano so subscribers tolerate producers that emit higher-resolution timestamps.
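The timestamp handling described for MarshalJSON and UnmarshalJSON can be approximated with the standard time package. This is a sketch, not the package's code: `canonicalLayout` and the helper names are assumptions, as is normalizing to UTC before formatting.

```go
package main

import (
	"fmt"
	"time"
)

// canonicalLayout is an assumed layout for the millisecond-precision form,
// e.g. "2026-05-27T14:22:09.331Z".
const canonicalLayout = "2006-01-02T15:04:05.000Z07:00"

// parseProducedAt tries the canonical layout first and falls back to
// RFC3339Nano so higher-resolution timestamps are still accepted.
func parseProducedAt(s string) (time.Time, error) {
	if t, err := time.Parse(canonicalLayout, s); err == nil {
		return t, nil
	}
	return time.Parse(time.RFC3339Nano, s)
}

// formatProducedAt emits the millisecond form; digits beyond milliseconds
// are truncated by the layout. Converting to UTC is an assumption.
func formatProducedAt(t time.Time) string {
	return t.UTC().Format(canonicalLayout)
}

// normalize parses either accepted form and re-emits the canonical one.
func normalize(s string) (string, error) {
	t, err := parseProducedAt(s)
	if err != nil {
		return "", err
	}
	return formatProducedAt(t), nil
}

func main() {
	for _, in := range []string{
		"2026-05-27T14:22:09.331Z",
		"2026-05-27T14:22:09.331456789Z",
	} {
		out, err := normalize(in)
		fmt.Println(out, err)
	}
}
```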
type GapDetector ¶
type GapDetector struct {
// contains filtered or unexported fields
}
GapDetector tracks the highest sequence seen per (channel, producer) pair and reports gaps when an out-of-order or skipped envelope arrives. Subscribers use it for at-least-once delivery semantics: a positive Gap value means messages were missed.
func NewGapDetector ¶
func NewGapDetector() *GapDetector
NewGapDetector constructs an empty detector.
func (*GapDetector) HighWater ¶
func (d *GapDetector) HighWater(channel, producer string) int64
HighWater returns the highest sequence observed for the (channel, producer) pair, or 0 if none.
type Producer ¶
type Producer struct {
ID string `json:"id"`
Kind ProducerKind `json:"kind"`
}
Producer identifies the source of an envelope.
type ProducerKind ¶
type ProducerKind string
ProducerKind discriminates humans from agents from backend services.
const (
	ProducerUser    ProducerKind = "user"
	ProducerAgent   ProducerKind = "agent"
	ProducerService ProducerKind = "service"
)
func (ProducerKind) Valid ¶
func (k ProducerKind) Valid() bool
Valid reports whether k is one of the recognized producer kinds.
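A check of this kind is typically a switch over the known constants. The sketch below uses local lowercase stand-ins for the documented type and constants; it shows the likely shape of such a check and is not taken from the package source.

```go
package main

import "fmt"

// producerKind and its constants mirror the documented ProducerKind values.
type producerKind string

const (
	producerUser    producerKind = "user"
	producerAgent   producerKind = "agent"
	producerService producerKind = "service"
)

// valid reports whether k is one of the three recognized kinds.
func (k producerKind) valid() bool {
	switch k {
	case producerUser, producerAgent, producerService:
		return true
	}
	return false
}

func main() {
	fmt.Println(producerKind("agent").valid())
	fmt.Println(producerKind("robot").valid())
}
```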
type Result ¶
type Result struct {
// Gap is the number of envelopes missed BEFORE the one just
// observed. Zero means perfectly in-order; a positive value means
// the subscriber should treat the (channel, producer) stream as
// having a hole.
Gap int64
// Duplicate is true when the observed sequence is <= the highest
// seen for this (channel, producer). The detector does NOT update
// its high-water-mark in that case.
Duplicate bool
// First is true when this is the first envelope ever observed for
// the (channel, producer) pair.
First bool
}
Result describes what Observe found.
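The Result semantics above can be illustrated with a small stand-alone detector. This is a sketch under assumptions: the signature of Observe is not shown in this documentation, so the `observe(channel, producer, seq)` form below is hypothetical, as is the choice to report a zero Gap on the first observation. The sketch is also not synchronized for concurrent use.

```go
package main

import "fmt"

// result mirrors the documented Result fields.
type result struct {
	Gap       int64
	Duplicate bool
	First     bool
}

// streamKey identifies one (channel, producer) stream.
type streamKey struct{ channel, producer string }

// gapDetector is an illustrative stand-in for GapDetector.
type gapDetector struct {
	high map[streamKey]int64 // highest sequence seen per stream
}

func newGapDetector() *gapDetector {
	return &gapDetector{high: make(map[streamKey]int64)}
}

// observe records seq for the stream and describes how it relates to the
// current high-water mark. Its signature is an assumption.
func (d *gapDetector) observe(channel, producer string, seq int64) result {
	k := streamKey{channel, producer}
	prev, seen := d.high[k]
	if !seen {
		d.high[k] = seq
		return result{First: true} // assumed: no gap reported on first sight
	}
	if seq <= prev {
		return result{Duplicate: true} // high-water mark is left untouched
	}
	d.high[k] = seq
	return result{Gap: seq - prev - 1}
}

// highWater returns the highest sequence seen, or 0 if none.
func (d *gapDetector) highWater(channel, producer string) int64 {
	return d.high[streamKey{channel, producer}]
}

func main() {
	d := newGapDetector()
	for _, seq := range []int64{1, 2, 5, 3} {
		fmt.Printf("seq=%d %+v\n", seq, d.observe("orders", "svc-a", seq))
	}
	fmt.Println(d.highWater("orders", "svc-a"))
}
```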
type SequenceTracker ¶
type SequenceTracker struct {
// contains filtered or unexported fields
}
SequenceTracker assigns monotonic per-channel sequences for a single producer. It is the publishing client's source of next-sequence numbers and is safe for concurrent use. Sequences start at 1.
Persistence: the tracker exposes Snapshot and Restore so a publishing client can survive a process restart without resetting to zero. The publishing client owns the durable storage (typically a small BoltDB or SQLite file, or a Redis key); the tracker itself is in-memory only.
func NewSequenceTracker ¶
func NewSequenceTracker() *SequenceTracker
NewSequenceTracker constructs an empty tracker.
func (*SequenceTracker) Next ¶
func (t *SequenceTracker) Next(channel string) int64
Next returns the next sequence number for channel and advances the counter. The first call for a channel returns 1.
func (*SequenceTracker) Peek ¶
func (t *SequenceTracker) Peek(channel string) int64
Peek returns the most recently issued sequence number for channel without advancing the counter. Returns 0 if no sequence has been issued.
func (*SequenceTracker) Restore ¶
func (t *SequenceTracker) Restore(state map[string]int64)
Restore replaces the tracker's state with the supplied snapshot. Subsequent Next calls advance from the restored values.
func (*SequenceTracker) Snapshot ¶
func (t *SequenceTracker) Snapshot() map[string]int64
Snapshot returns the current per-channel sequence state. The map is a copy; mutating it does not affect the tracker.
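The documented behavior of Next, Peek, Snapshot, and Restore can be sketched with a mutex-guarded map. The lowercase type below is an illustrative stand-in, not the package's implementation; in particular, copying the map inside `restore` is an assumption (the documentation only states that Snapshot returns a copy).

```go
package main

import (
	"fmt"
	"sync"
)

// sequenceTracker is an illustrative stand-in for SequenceTracker.
type sequenceTracker struct {
	mu  sync.Mutex
	seq map[string]int64 // last issued sequence per channel
}

func newSequenceTracker() *sequenceTracker {
	return &sequenceTracker{seq: make(map[string]int64)}
}

// next advances and returns the channel's counter; the first call returns 1.
func (t *sequenceTracker) next(channel string) int64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.seq[channel]++
	return t.seq[channel]
}

// peek returns the last issued sequence without advancing (0 if none).
func (t *sequenceTracker) peek(channel string) int64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.seq[channel]
}

// snapshot returns a copy of the state, suitable for durable storage.
func (t *sequenceTracker) snapshot() map[string]int64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	out := make(map[string]int64, len(t.seq))
	for ch, n := range t.seq {
		out[ch] = n
	}
	return out
}

// restore replaces the state with a copy of the supplied snapshot.
func (t *sequenceTracker) restore(state map[string]int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.seq = make(map[string]int64, len(state))
	for ch, n := range state {
		t.seq[ch] = n
	}
}

func main() {
	before := newSequenceTracker()
	before.next("orders")
	before.next("orders")
	saved := before.snapshot() // persist this map before shutdown

	after := newSequenceTracker() // simulated process restart
	after.restore(saved)
	fmt.Println(after.next("orders"))
}
```

A client would typically persist the snapshot after publishing and call Restore once at startup, so sequences continue from the stored values instead of restarting at 1.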