envelope

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 5, 2026 License: MIT Imports: 6 Imported by: 0

Documentation

Overview

Package envelope is the wire-format contract every Parsec subscriber and publisher uses. The shape is locked here; all higher-level Parsec components (schema registry, client library, token broker, cache, telemetry) depend on it.

An Envelope wraps an application payload with the metadata Parsec needs for ordered processing, schema introspection, and causation tracking:

  • Channel — the channel this envelope was published on
  • Sequence — monotonic per (channel, producer) counter assigned by the publishing client (NOT the server)
  • ProducedAt — wall-clock production time (millisecond precision)
  • Producer — identity + kind (user/agent/service) of the source
  • Aspect — routing key within a channel; subscribers filter on it
  • SchemaRef — schema-registry identifier for the payload
  • Causation — backward reference to the parent envelope (DAG edge)
  • Payload — application content, schema-conformant

The recommended size ceiling is MaxEnvelopeSize (64 KB). Larger payloads use the data_ref pattern: include a URL in the payload and have subscribers fetch the actual bytes over HTTP.

Index

Constants

View Source
const AspectDataRef = "data_ref"

AspectDataRef is the conventional aspect name for envelopes whose payload is a DataRefPayload.

View Source
const MaxEnvelopeSize = 64 * 1024

MaxEnvelopeSize is the recommended ceiling for an encoded envelope on the wire. Producers that exceed this size SHOULD switch to the data_ref pattern (publish a small envelope whose payload carries a URL, fetch the payload bytes over HTTP).

Variables

View Source
var (
	ErrChannelRequired     = errors.New("envelope: channel required")
	ErrAspectRequired      = errors.New("envelope: aspect required")
	ErrProducerRequired    = errors.New("envelope: producer required")
	ErrProducerKindUnknown = errors.New("envelope: unknown producer kind")
	ErrSequenceNonPositive = errors.New("envelope: sequence must be positive")
	ErrProducedAtZero      = errors.New("envelope: produced_at must be set")
	ErrTooLarge            = errors.New("envelope: encoded size exceeds MaxEnvelopeSize")
)

Sentinel errors. Callers errors.Is against these.

Functions

This section is empty.

Types

type Causation

type Causation struct {
	ParentChannel  string `json:"parent_channel,omitempty"`
	ParentSequence int64  `json:"parent_sequence,omitempty"`
}

Causation is a backward reference to the envelope that caused this one. The zero value (both fields empty/zero) means "this is a root event."

func (Causation) IsRoot

func (c Causation) IsRoot() bool

IsRoot reports whether the causation reference is unset (root event).

type DataRefPayload

type DataRefPayload struct {
	URL         string `json:"url"`
	SizeBytes   int64  `json:"size_bytes"`
	ContentHash string `json:"content_hash,omitempty"`
}

DataRefPayload is the canonical shape for the data_ref aspect: a small envelope whose payload points to bytes the subscriber must fetch over HTTP. Producers SHOULD use this shape any time the payload would otherwise exceed MaxEnvelopeSize.

type Envelope

type Envelope struct {
	Channel    string          `json:"channel"`
	Sequence   int64           `json:"sequence"`
	ProducedAt time.Time       `json:"produced_at"`
	Producer   Producer        `json:"producer"`
	Aspect     string          `json:"aspect"`
	SchemaRef  string          `json:"schema_ref,omitempty"`
	Causation  Causation       `json:"causation,omitzero"`
	Payload    json.RawMessage `json:"payload,omitempty"`
}

Envelope is the Parsec wire envelope.

All fields except ProducedAt round-trip cleanly through encoding/json. ProducedAt is encoded as RFC3339 with millisecond precision via custom MarshalJSON / UnmarshalJSON so it matches the spec's "2026-05-27T14:22:09.331Z" form.

func Decode

func Decode(b []byte) (Envelope, error)

Decode parses an envelope from its JSON wire form. It does NOT enforce the size ceiling — that is a producer-side guarantee, and subscribers MUST tolerate receiving any size their transport delivered.

func (Envelope) Encode

func (e Envelope) Encode() ([]byte, error)

Encode marshals the envelope to JSON and enforces the size ceiling. Use this at the publisher boundary; subscribers should decode with json.Unmarshal and call Validate.

func (Envelope) MarshalJSON

func (e Envelope) MarshalJSON() ([]byte, error)

MarshalJSON encodes the envelope so ProducedAt uses millisecond precision regardless of the underlying time.Time's resolution.

func (*Envelope) UnmarshalJSON

func (e *Envelope) UnmarshalJSON(b []byte) error

UnmarshalJSON decodes the envelope. ProducedAt parses against both the canonical (ms precision) layout and RFC3339Nano so subscribers tolerate producers that emit higher-resolution timestamps.

func (Envelope) Validate

func (e Envelope) Validate() error

Validate reports whether the envelope is structurally well-formed. It does NOT validate the payload against a schema (that lives in the schema package); it only checks the envelope-level invariants.

type GapDetector

type GapDetector struct {
	// contains filtered or unexported fields
}

GapDetector tracks the highest sequence seen per (channel, producer) pair and reports gaps when an out-of-order or skipped envelope arrives. Subscribers use it for at-least-once delivery semantics: a positive Gap value means messages were missed.

func NewGapDetector

func NewGapDetector() *GapDetector

NewGapDetector constructs an empty detector.

func (*GapDetector) HighWater

func (d *GapDetector) HighWater(channel, producer string) int64

HighWater returns the highest sequence observed for the (channel, producer) pair, or 0 if none.

func (*GapDetector) Observe

func (d *GapDetector) Observe(channel, producer string, sequence int64) Result

Observe records the (channel, producer, sequence) tuple and reports the delivery state. Safe for concurrent use.

type Producer

type Producer struct {
	ID   string       `json:"id"`
	Kind ProducerKind `json:"kind"`
}

Producer identifies the source of an envelope.

type ProducerKind

type ProducerKind string

ProducerKind discriminates humans from agents from backend services.

const (
	ProducerUser    ProducerKind = "user"
	ProducerAgent   ProducerKind = "agent"
	ProducerService ProducerKind = "service"
)

func (ProducerKind) Valid

func (k ProducerKind) Valid() bool

Valid reports whether k is one of the recognized producer kinds.

type Result

type Result struct {
	// Gap is the number of envelopes missed BEFORE the one just
	// observed. Zero means perfectly in-order; a positive value means
	// the subscriber should treat the (channel, producer) stream as
	// having a hole.
	Gap int64
	// Duplicate is true when the observed sequence is <= the highest
	// seen for this (channel, producer). The detector does NOT update
	// its high-water-mark in that case.
	Duplicate bool
	// First is true when this is the first envelope ever observed for
	// the (channel, producer) pair.
	First bool
}

Result describes what Observe found.

type SequenceTracker

type SequenceTracker struct {
	// contains filtered or unexported fields
}

SequenceTracker assigns monotonic per-channel sequences for a single producer. It is the publishing client's source of next-sequence numbers and is safe for concurrent use. Sequences start at 1.

Persistence: the tracker exposes Snapshot / Restore so a publishing client can survive a process restart without resetting to zero. The publishing client owns the durable storage (typically a small boltdb / sqlite file or a Redis key) — the tracker itself is in-memory.

func NewSequenceTracker

func NewSequenceTracker() *SequenceTracker

NewSequenceTracker constructs an empty tracker.

func (*SequenceTracker) Next

func (t *SequenceTracker) Next(channel string) int64

Next returns the next sequence number for channel and advances the counter. The first call for a channel returns 1.

func (*SequenceTracker) Peek

func (t *SequenceTracker) Peek(channel string) int64

Peek returns the most recently issued sequence number for channel without advancing the counter. Returns 0 if no sequence has been issued.

func (*SequenceTracker) Restore

func (t *SequenceTracker) Restore(state map[string]int64)

Restore replaces the tracker's state with the supplied snapshot. Subsequent Next calls advance from the restored values.

func (*SequenceTracker) Snapshot

func (t *SequenceTracker) Snapshot() map[string]int64

Snapshot returns the current per-channel sequence state. The map is a copy; mutating it does not affect the tracker.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL