client

package
v0.2.0
Published: Jun 5, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package client is the envelope-aware Go client for Parsec.

The client wraps a pluggable Transport (WebSocket via centrifuge-go, HTTP-SSE for probes, or any other implementation) with the convention layer the upgrade spec calls for: typed aspect handlers, sequence tracking with gap detection, schema validation, causation propagation, and transparent reconnect-with-resume.

Subscribers parse envelopes once at the transport boundary; handlers see typed payloads. Publishers stamp sequence, producer, and production time automatically, and PublishDerived fills in causation; application code supplies only the aspect and payload.

Constants

This section is empty.

Variables

var ErrNotImplemented = errors.New("transport: not implemented")

ErrNotImplemented is returned by transports whose feature surface is incomplete (e.g. a publish-only transport's Subscribe).

Functions

func OnAspectTyped

func OnAspectTyped[T any](s *Subscription, aspect string, handler func(T, envelope.Envelope))

OnAspectTyped is the typed-payload variant of OnAspect. The payload is unmarshaled into T; on failure the handler is skipped and the error is silently dropped. To surface payload errors, wire a schema.Validator with WithValidator.
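The skip-on-failure behavior described above can be sketched as a small generic adapter. This is a minimal sketch, not the package's implementation: Envelope is a local stand-in for envelope.Envelope with only two assumed fields, and typedHandler, Price, and countCalls are hypothetical names introduced for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Envelope is a local stand-in for envelope.Envelope; only the two fields
// this sketch needs are modeled (an assumption, not the real type).
type Envelope struct {
	Aspect  string
	Payload json.RawMessage
}

// typedHandler adapts a typed handler into an untyped one. If the payload
// does not unmarshal into T, the handler is skipped and the error dropped,
// mirroring the behavior described for OnAspectTyped.
func typedHandler[T any](handler func(T, Envelope)) func(Envelope) {
	return func(env Envelope) {
		var v T
		if err := json.Unmarshal(env.Payload, &v); err != nil {
			return // silently dropped
		}
		handler(v, env)
	}
}

// Price is a hypothetical payload type.
type Price struct {
	Symbol string  `json:"symbol"`
	Value  float64 `json:"value"`
}

// countCalls feeds payloads through a typed handler and reports how many
// of them actually reached it.
func countCalls(payloads ...string) int {
	calls := 0
	h := typedHandler(func(p Price, _ Envelope) { calls++ })
	for _, p := range payloads {
		h(Envelope{Aspect: "price", Payload: json.RawMessage(p)})
	}
	return calls
}

func main() {
	// The second payload has a number where a string is expected, so it is skipped.
	fmt.Println(countCalls(`{"symbol":"ABC","value":1.5}`, `{"symbol":42}`))
}
```

Because the error never reaches the handler, a malformed payload is indistinguishable from no message at all, which is why a validator is the place to observe such failures.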

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is the top-level envelope-aware client.

func New

func New(opts ...ClientOption) (*Client, error)

New constructs a Client.

func (*Client) Close

func (c *Client) Close() error

Close terminates the transport and every active subscription.

func (*Client) Connect

func (c *Client) Connect(ctx context.Context, token string) error

Connect opens the underlying transport.

func (*Client) History

func (c *Client) History(ctx context.Context, channel string, opts HistoryOptions) ([]envelope.Envelope, error)

History fetches prior envelopes for channel.

func (*Client) Publish

func (c *Client) Publish(ctx context.Context, channel string, env envelope.Envelope) error

Publish stamps env with Channel, the next Sequence, ProducedAt, and Producer, then sends it over the transport.

Caller-supplied fields:

  • Aspect — required
  • Payload — payload bytes (already JSON-encoded)
  • Causation — optional; set by clients that want explicit DAG edges (PublishDerived on a Subscription does this automatically)
  • SchemaRef — optional

Stamped automatically:

  • Channel — set to the supplied channel
  • Sequence — next from the tracker
  • ProducedAt — time.Now().UTC()
  • Producer — from the client config
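The split between caller-supplied and stamped fields can be illustrated roughly as follows. This is a sketch under stated assumptions: Envelope is a local stand-in whose field types are guesses (Producer is modeled as a plain string), tracker is a hypothetical substitute for envelope.SequenceTracker, and starting each channel at sequence 1 is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// Envelope is a local stand-in for envelope.Envelope. Field names follow the
// lists above; the types (notably Producer as a string) are assumptions.
type Envelope struct {
	Channel    string
	Aspect     string
	Payload    []byte
	Sequence   int64
	ProducedAt time.Time
	Producer   string
}

// tracker is a hypothetical per-channel counter standing in for
// envelope.SequenceTracker. Starting each channel at 1 is an assumption.
type tracker struct {
	mu   sync.Mutex
	last map[string]int64
}

func newTracker() *tracker { return &tracker{last: make(map[string]int64)} }

// Next returns the next sequence number for channel.
func (t *tracker) Next(channel string) int64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.last[channel]++
	return t.last[channel]
}

// stamp fills the automatically stamped fields and leaves the
// caller-supplied ones (Aspect, Payload) untouched.
func stamp(t *tracker, producer, channel string, env Envelope) (Envelope, error) {
	if env.Aspect == "" {
		return Envelope{}, errors.New("publish: aspect is required")
	}
	env.Channel = channel
	env.Sequence = t.Next(channel)
	env.ProducedAt = time.Now().UTC()
	env.Producer = producer
	return env, nil
}

func main() {
	tr := newTracker()
	for i := 0; i < 2; i++ {
		env, err := stamp(tr, "pricing-service", "prices", Envelope{Aspect: "price", Payload: []byte(`{"v":1}`)})
		if err != nil {
			panic(err)
		}
		fmt.Println(env.Channel, env.Sequence, env.Producer)
	}
}
```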

func (*Client) Subscribe

func (c *Client) Subscribe(ctx context.Context, channel string, opts ...SubscribeOption) (*Subscription, error)

Subscribe opens a Subscription on channel. The Client owns the reconnect loop; the application only registers handlers. The Subscription survives transport-level disconnects.

type ClientOption

type ClientOption func(*clientConfig)

ClientOption configures a Client at construction.

func WithProducer

func WithProducer(p envelope.Producer) ClientOption

WithProducer sets the publisher identity stamped on every outgoing envelope.

func WithSequenceTracker

func WithSequenceTracker(t *envelope.SequenceTracker) ClientOption

WithSequenceTracker overrides the default sequence tracker. Inject a shared tracker when multiple Clients in one process publish to overlapping channels.

func WithSnapshotter

func WithSnapshotter(s SequenceSnapshotter) ClientOption

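WithSnapshotter enables sequence persistence: the client saves per-channel sequence state through s after every publish (see SequenceSnapshotter).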

func WithTransport

func WithTransport(t Transport) ClientOption

WithTransport injects the Transport. Required.

func WithValidator

func WithValidator(v *schema.Validator) ClientOption

WithValidator wires a schema.Validator that runs on every incoming envelope (per the Validator.Mode policy).

type FileIO

type FileIO interface {
	Read(path string) ([]byte, error)
	Write(path string, data []byte) error
}

FileIO is the minimal filesystem surface FileSnapshotter consumes. Tests substitute an in-memory implementation.

type FileSnapshotter

type FileSnapshotter struct {
	Path string
	// contains filtered or unexported fields
}

FileSnapshotter persists the sequence tracker state to a JSON file. Suitable for single-process clients that need to survive a restart without losing per-channel sequence counters.

func NewFileSnapshotter

func NewFileSnapshotter(path string) *FileSnapshotter

NewFileSnapshotter constructs a snapshotter writing to path.

func (*FileSnapshotter) Load

func (f *FileSnapshotter) Load() (map[string]int64, error)

Load reads the persisted per-channel sequence state from Path.

func (*FileSnapshotter) Save

func (f *FileSnapshotter) Save(state map[string]int64) error

Save writes the per-channel sequence state to Path as JSON.

func (*FileSnapshotter) WithFileIO

func (f *FileSnapshotter) WithFileIO(io FileIO) *FileSnapshotter

WithFileIO replaces the underlying FileIO implementation; intended for tests.
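An in-memory FileIO of the kind tests might substitute could look like the sketch below. The FileIO interface is copied from this page; memIO, save, and load are hypothetical names, and the on-disk format (a flat JSON object mapping channel to sequence) is an assumption about what FileSnapshotter writes.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// FileIO mirrors the interface documented above.
type FileIO interface {
	Read(path string) ([]byte, error)
	Write(path string, data []byte) error
}

// memIO is a hypothetical in-memory FileIO for tests.
type memIO struct{ files map[string][]byte }

func (m *memIO) Read(path string) ([]byte, error) {
	data, ok := m.files[path]
	if !ok {
		return nil, errors.New("memIO: no such file: " + path)
	}
	return data, nil
}

func (m *memIO) Write(path string, data []byte) error {
	// Copy so later changes to the caller's slice do not alter the "file".
	m.files[path] = append([]byte(nil), data...)
	return nil
}

// save and load sketch what a JSON-backed snapshotter might do with a FileIO.
// The flat {"channel": sequence} layout is an assumption.
func save(fio FileIO, path string, state map[string]int64) error {
	data, err := json.Marshal(state)
	if err != nil {
		return err
	}
	return fio.Write(path, data)
}

func load(fio FileIO, path string) (map[string]int64, error) {
	data, err := fio.Read(path)
	if err != nil {
		return nil, err
	}
	state := make(map[string]int64)
	if err := json.Unmarshal(data, &state); err != nil {
		return nil, err
	}
	return state, nil
}

func main() {
	fs := &memIO{files: make(map[string][]byte)}
	if err := save(fs, "seq.json", map[string]int64{"prices": 42}); err != nil {
		panic(err)
	}
	state, err := load(fs, "seq.json")
	fmt.Println(state["prices"], err)
}
```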

type HistoryOptions

type HistoryOptions struct {
	Limit int
	Since int64 // since sequence (exclusive)
	Until int64 // until sequence (inclusive); 0 = unbounded
}

HistoryOptions controls a history fetch.
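The bounds documented on the struct (Since exclusive, Until inclusive, 0 meaning unbounded) can be read as the filter sketched below. selectSequences is a hypothetical helper; treating Limit <= 0 as "no limit" and keeping the oldest matches first are assumptions, since the page does not state them.

```go
package main

import "fmt"

// HistoryOptions mirrors the struct documented above.
type HistoryOptions struct {
	Limit int
	Since int64 // since sequence (exclusive)
	Until int64 // until sequence (inclusive); 0 = unbounded
}

// selectSequences returns the sequence numbers from seqs (assumed ascending)
// that fall inside the window described by opts.
// Assumption: Limit <= 0 means "no limit"; the page does not say.
func selectSequences(seqs []int64, opts HistoryOptions) []int64 {
	var out []int64
	for _, s := range seqs {
		if s <= opts.Since {
			continue // Since is exclusive
		}
		if opts.Until != 0 && s > opts.Until {
			continue // Until is inclusive; 0 disables the upper bound
		}
		if opts.Limit > 0 && len(out) == opts.Limit {
			break
		}
		out = append(out, s)
	}
	return out
}

func main() {
	seqs := []int64{1, 2, 3, 4, 5, 6}
	fmt.Println(selectSequences(seqs, HistoryOptions{Since: 2, Until: 5}))           // [3 4 5]
	fmt.Println(selectSequences(seqs, HistoryOptions{Since: 2, Until: 5, Limit: 2})) // [3 4]
	fmt.Println(selectSequences(seqs, HistoryOptions{Since: 4}))                     // [5 6]
}
```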

type MemoryTransport

type MemoryTransport struct {
	// contains filtered or unexported fields
}

MemoryTransport is an in-process Transport useful for tests, examples, and same-process publisher/subscriber wiring. Publish hands the bytes to every subscriber on the same channel; history is unbounded; presence carries no user identities.

MemoryTransport is NOT a production transport — it does not survive a process restart, has no auth, and broadcasts inside one process. Use it for envelope-level tests; swap in a WebSocketTransport in production.

func NewMemoryTransport

func NewMemoryTransport() *MemoryTransport

NewMemoryTransport constructs an empty transport.

func (*MemoryTransport) Close

func (m *MemoryTransport) Close() error

Close cancels every active subscription.

func (*MemoryTransport) Connect

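Connect is a no-op. It exists so that MemoryTransport satisfies Transport, whose Connect method has the signature Connect(ctx context.Context, token string) error.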

func (*MemoryTransport) History

func (m *MemoryTransport) History(_ context.Context, channel string, opts HistoryOptions) ([][]byte, error)

History returns every envelope ever published to channel, optionally bounded by opts.Since (exclusive) and opts.Limit.

func (*MemoryTransport) Presence

func (m *MemoryTransport) Presence(_ context.Context, channel string) ([]PresenceEntry, error)

Presence returns the count of subscribers as a single anonymous entry. Memory transport has no auth and therefore no user identities to report.

func (*MemoryTransport) Publish

func (m *MemoryTransport) Publish(_ context.Context, channel string, payload []byte) error

Publish broadcasts payload to every subscriber on channel.

func (*MemoryTransport) Subscribe

func (m *MemoryTransport) Subscribe(_ context.Context, channel string, opts SubscribeOptions) (<-chan []byte, func(), error)

Subscribe opens a stream of bytes. FromSequence replays history above the given sequence number by decoding and filtering stored envelopes, which is useful for resuming after a disconnect.

type PresenceEntry

type PresenceEntry struct {
	UserID   string          `json:"user_id"`
	ClientID string          `json:"client_id"`
	ConnInfo json.RawMessage `json:"conn_info,omitempty"`
}

PresenceEntry is one entry in the presence list.

type SequenceSnapshotter

type SequenceSnapshotter interface {
	Save(state map[string]int64) error
	Load() (map[string]int64, error)
}

SequenceSnapshotter persists per-channel sequence state for crash recovery. The client snapshots after every publish; the snapshotter's implementation chooses the durability tier (BoltDB, SQLite, file, in-memory).
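As an illustration of the simplest durability tier mentioned above, an in-memory implementation might look like this. The interface is copied from this page; memorySnapshotter is a hypothetical type, and copying the map on both Save and Load is a design choice of the sketch, not documented behavior.

```go
package main

import (
	"fmt"
	"sync"
)

// SequenceSnapshotter mirrors the interface documented above.
type SequenceSnapshotter interface {
	Save(state map[string]int64) error
	Load() (map[string]int64, error)
}

// memorySnapshotter is a hypothetical in-memory tier: it survives nothing
// beyond the process, but it is handy in tests.
type memorySnapshotter struct {
	mu    sync.Mutex
	state map[string]int64
}

// Save stores a copy so later mutations by the caller do not leak in.
func (m *memorySnapshotter) Save(state map[string]int64) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.state = make(map[string]int64, len(state))
	for ch, seq := range state {
		m.state[ch] = seq
	}
	return nil
}

// Load returns a copy so callers cannot mutate the stored snapshot.
func (m *memorySnapshotter) Load() (map[string]int64, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	out := make(map[string]int64, len(m.state))
	for ch, seq := range m.state {
		out[ch] = seq
	}
	return out, nil
}

// Compile-time check that the sketch satisfies the interface.
var _ SequenceSnapshotter = (*memorySnapshotter)(nil)

func main() {
	var s SequenceSnapshotter = &memorySnapshotter{}
	live := map[string]int64{"prices": 3}
	_ = s.Save(live)
	live["prices"] = 4 // a later publish must not alter the saved snapshot
	restored, _ := s.Load()
	fmt.Println(restored["prices"]) // prints 3
}
```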

type SubscribeOption

type SubscribeOption func(*SubscribeOptions)

SubscribeOption is the public option type for Subscribe.

func WithAspectFilter

func WithAspectFilter(aspects ...string) SubscribeOption

WithAspectFilter restricts the subscription to envelopes whose Aspect is in the supplied list. The transport may push the filter down to the server (when supported); otherwise the client filters locally.

func WithFromSequence

func WithFromSequence(seq int64) SubscribeOption

WithFromSequence resumes from the given sequence number. Used by the reconnect path; explicit consumers can also supply it for catch-up.
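The two options follow the functional-options pattern. The sketch below reuses the documented type names and signatures, but the function bodies are assumptions, as are the hypothetical helpers resolve and matches; treating an empty Aspects list as "no filter" is also an assumption.

```go
package main

import "fmt"

// SubscribeOptions and SubscribeOption mirror the types documented on this page.
type SubscribeOptions struct {
	FromSequence int64
	Aspects      []string
}

type SubscribeOption func(*SubscribeOptions)

// The bodies below are assumptions; only the signatures come from the page.
func WithAspectFilter(aspects ...string) SubscribeOption {
	return func(o *SubscribeOptions) { o.Aspects = append(o.Aspects, aspects...) }
}

func WithFromSequence(seq int64) SubscribeOption {
	return func(o *SubscribeOptions) { o.FromSequence = seq }
}

// resolve applies options in order to a zero-valued SubscribeOptions.
func resolve(opts ...SubscribeOption) SubscribeOptions {
	var o SubscribeOptions
	for _, opt := range opts {
		opt(&o)
	}
	return o
}

// matches is the client-side fallback filter used when the transport cannot
// push the aspect filter down. Assumption: no aspects means no filtering.
func matches(o SubscribeOptions, aspect string) bool {
	if len(o.Aspects) == 0 {
		return true
	}
	for _, a := range o.Aspects {
		if a == aspect {
			return true
		}
	}
	return false
}

func main() {
	o := resolve(WithFromSequence(42), WithAspectFilter("price", "trade"))
	fmt.Println(o.FromSequence, matches(o, "price"), matches(o, "quote"))
}
```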

type SubscribeOptions

type SubscribeOptions struct {
	FromSequence int64
	Aspects      []string
}

SubscribeOptions controls the wire-level subscribe call. The Client fills FromSequence from its persisted sequence state on reconnect.

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription is one envelope-aware subscription. Handlers are dispatched in registration order; OnEnvelope handlers see every envelope, OnAspect handlers see only envelopes matching their aspect.
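The dispatch rule (registration order, OnEnvelope sees everything, OnAspect sees only its aspect) can be sketched with a small dispatcher. Envelope, registration, dispatcher, and trace are hypothetical stand-ins for illustration; the real Subscription's internals are not shown on this page.

```go
package main

import "fmt"

// Envelope is a local stand-in carrying only the field dispatch depends on.
type Envelope struct {
	Aspect string
}

// registration records one handler; all is true for OnEnvelope handlers.
type registration struct {
	all     bool
	aspect  string
	handler func(Envelope)
}

// dispatcher is a hypothetical model of a Subscription's handler list.
type dispatcher struct{ regs []registration }

func (d *dispatcher) OnEnvelope(h func(Envelope)) {
	d.regs = append(d.regs, registration{all: true, handler: h})
}

func (d *dispatcher) OnAspect(aspect string, h func(Envelope)) {
	d.regs = append(d.regs, registration{aspect: aspect, handler: h})
}

// Dispatch walks the handlers in registration order.
func (d *dispatcher) Dispatch(env Envelope) {
	for _, r := range d.regs {
		if r.all || r.aspect == env.Aspect {
			r.handler(env)
		}
	}
}

// trace registers three handlers and returns the order in which they ran
// for the given aspects.
func trace(aspects ...string) []string {
	var log []string
	d := &dispatcher{}
	d.OnAspect("price", func(Envelope) { log = append(log, "price") })
	d.OnEnvelope(func(Envelope) { log = append(log, "any") })
	d.OnAspect("order", func(Envelope) { log = append(log, "order") })
	for _, a := range aspects {
		d.Dispatch(Envelope{Aspect: a})
	}
	return log
}

func main() {
	fmt.Println(trace("price", "order")) // [price any any order]
}
```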

func (*Subscription) Channel

func (s *Subscription) Channel() string

Channel returns the subscribed channel name.

func (*Subscription) History

func (s *Subscription) History(ctx context.Context, opts HistoryOptions) ([]envelope.Envelope, error)

History is a convenience that delegates to Client.History.

func (*Subscription) OnAspect

func (s *Subscription) OnAspect(aspect string, handler func(envelope.Envelope))

OnAspect registers a handler that receives only envelopes whose Aspect equals the supplied name.

func (*Subscription) OnEnvelope

func (s *Subscription) OnEnvelope(handler func(envelope.Envelope))

OnEnvelope registers a handler that receives every envelope.

func (*Subscription) OnGap

func (s *Subscription) OnGap(handler func(channel string, gap int64))

OnGap registers a handler that fires when a sequence gap is detected on the subscription. Apps typically use this to reload state from a durable source.
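Gap detection of this kind usually amounts to comparing each incoming sequence number with the last one seen. The sketch below is one plausible reading, not the package's logic: gapDetector is a hypothetical type, and interpreting the gap argument as the number of missing sequence numbers is an assumption.

```go
package main

import "fmt"

// gapDetector is a hypothetical model of per-channel sequence tracking.
type gapDetector struct {
	last  map[string]int64
	onGap func(channel string, gap int64)
}

func newGapDetector(onGap func(channel string, gap int64)) *gapDetector {
	return &gapDetector{last: make(map[string]int64), onGap: onGap}
}

// Observe records seq for channel and fires onGap when sequence numbers were
// skipped. Assumption: gap is the count of missing sequence numbers.
func (d *gapDetector) Observe(channel string, seq int64) {
	last, seen := d.last[channel]
	if seen && seq > last+1 {
		d.onGap(channel, seq-last-1)
	}
	if !seen || seq > last {
		d.last[channel] = seq // ignore duplicates and out-of-order replays
	}
}

func main() {
	d := newGapDetector(func(channel string, gap int64) {
		fmt.Printf("gap on %s: %d missing\n", channel, gap)
	})
	for _, seq := range []int64{1, 2, 5, 6} {
		d.Observe("prices", seq) // reports one gap of 2 between 2 and 5
	}
}
```

A handler registered this way would typically respond by reloading state from a durable source, as the description above suggests.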

func (*Subscription) Presence

func (s *Subscription) Presence(ctx context.Context) ([]PresenceEntry, error)

Presence delegates to the Transport.

func (*Subscription) PublishDerived

func (s *Subscription) PublishDerived(ctx context.Context, aspect string, payload []byte) error

PublishDerived publishes an envelope on s.channel whose Causation references the most recently dispatched envelope. This is the programmatic surface for the "auto-propagated causation" behavior the upgrade spec describes.

func (*Subscription) Unsubscribe

func (s *Subscription) Unsubscribe() error

Unsubscribe terminates the subscription.

type Transport

type Transport interface {
	// Connect opens the underlying transport. Idempotent.
	Connect(ctx context.Context, token string) error
	// Close terminates the connection. Pending subscriptions are
	// canceled.
	Close() error
	// Publish writes one encoded envelope to channel. The Transport
	// is responsible for delivering the bytes; sequencing was already
	// stamped by the Client.
	Publish(ctx context.Context, channel string, payload []byte) error
	// Subscribe opens a subscription. The returned channel emits raw
	// envelope bytes; the Client decodes and dispatches. cancelFn
	// terminates the subscription.
	Subscribe(ctx context.Context, channel string, opts SubscribeOptions) (<-chan []byte, func(), error)
	// History returns prior envelopes for channel. Empty slice when
	// the transport does not implement history.
	History(ctx context.Context, channel string, opts HistoryOptions) ([][]byte, error)
	// Presence returns the active presence list. Empty slice when
	// the transport does not implement presence.
	Presence(ctx context.Context, channel string) ([]PresenceEntry, error)
}

Transport is the wire-level surface the Client sits on top of. Each Transport implementation handles one connection lifecycle — authenticate, publish, subscribe, replay-from-sequence, presence, history.

Implementations:

  • SSETransport (this package) — HTTP-based publish + SSE subscribe, suitable for probes and tests
  • MemoryTransport (this package), an in-process transport for tests, examples, and same-process wiring
  • WebSocketTransport (centrifuge-go) — left as a stub the consumer supplies in production
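A custom implementation only has to satisfy the six methods of the interface. The sketch below shows a publish-only transport whose Subscribe returns ErrNotImplemented, the case that error's description mentions. The interface and supporting types are copied from this page; publishOnlyTransport and demo are hypothetical names, and the stub bodies are assumptions.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
)

// The declarations below mirror the ones documented on this page.
var ErrNotImplemented = errors.New("transport: not implemented")

type SubscribeOptions struct {
	FromSequence int64
	Aspects      []string
}

type HistoryOptions struct {
	Limit int
	Since int64
	Until int64
}

type PresenceEntry struct {
	UserID   string          `json:"user_id"`
	ClientID string          `json:"client_id"`
	ConnInfo json.RawMessage `json:"conn_info,omitempty"`
}

type Transport interface {
	Connect(ctx context.Context, token string) error
	Close() error
	Publish(ctx context.Context, channel string, payload []byte) error
	Subscribe(ctx context.Context, channel string, opts SubscribeOptions) (<-chan []byte, func(), error)
	History(ctx context.Context, channel string, opts HistoryOptions) ([][]byte, error)
	Presence(ctx context.Context, channel string) ([]PresenceEntry, error)
}

// publishOnlyTransport is a hypothetical transport that records published
// bytes and supports nothing else.
type publishOnlyTransport struct {
	sent map[string][][]byte
}

func (t *publishOnlyTransport) Connect(context.Context, string) error { return nil }
func (t *publishOnlyTransport) Close() error                          { return nil }

func (t *publishOnlyTransport) Publish(_ context.Context, channel string, payload []byte) error {
	t.sent[channel] = append(t.sent[channel], append([]byte(nil), payload...))
	return nil
}

func (t *publishOnlyTransport) Subscribe(context.Context, string, SubscribeOptions) (<-chan []byte, func(), error) {
	return nil, nil, ErrNotImplemented
}

// History and Presence return empty slices, as the interface comments
// prescribe for transports that do not implement them.
func (t *publishOnlyTransport) History(context.Context, string, HistoryOptions) ([][]byte, error) {
	return [][]byte{}, nil
}

func (t *publishOnlyTransport) Presence(context.Context, string) ([]PresenceEntry, error) {
	return []PresenceEntry{}, nil
}

// Compile-time check that the stub satisfies Transport.
var _ Transport = (*publishOnlyTransport)(nil)

// demo publishes once, then tries to subscribe.
func demo() (int, error) {
	ctx := context.Background()
	p := &publishOnlyTransport{sent: make(map[string][][]byte)}
	var tr Transport = p
	if err := tr.Publish(ctx, "prices", []byte(`{"aspect":"price"}`)); err != nil {
		return 0, err
	}
	_, _, err := tr.Subscribe(ctx, "prices", SubscribeOptions{})
	return len(p.sent["prices"]), err
}

func main() {
	n, err := demo()
	fmt.Println(n, errors.Is(err, ErrNotImplemented)) // prints: 1 true
}
```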
