Documentation ¶
Overview ¶
Package client is the envelope-aware Go client for Parsec.
The client wraps a pluggable Transport (WebSocket via centrifuge-go, HTTP-SSE for probes, or any other implementation) with the convention layer the upgrade spec calls for: typed aspect handlers, sequence tracking with gap detection, schema validation, causation propagation, and transparent reconnect-with-resume.
Subscribers parse envelopes once at the transport boundary; handlers see typed payloads. Publishers stamp the sequence, producer, and production time automatically, and PublishDerived adds causation; application code only supplies the aspect and payload.
Index ¶
- Variables
- func OnAspectTyped[T any](s *Subscription, aspect string, handler func(T, envelope.Envelope))
- type Client
- func (c *Client) Close() error
- func (c *Client) Connect(ctx context.Context, token string) error
- func (c *Client) History(ctx context.Context, channel string, opts HistoryOptions) ([]envelope.Envelope, error)
- func (c *Client) Publish(ctx context.Context, channel string, env envelope.Envelope) error
- func (c *Client) Subscribe(ctx context.Context, channel string, opts ...SubscribeOption) (*Subscription, error)
- type ClientOption
- type FileIO
- type FileSnapshotter
- type HistoryOptions
- type MemoryTransport
- func (m *MemoryTransport) Close() error
- func (m *MemoryTransport) Connect(context.Context, string) error
- func (m *MemoryTransport) History(_ context.Context, channel string, opts HistoryOptions) ([][]byte, error)
- func (m *MemoryTransport) Presence(_ context.Context, channel string) ([]PresenceEntry, error)
- func (m *MemoryTransport) Publish(_ context.Context, channel string, payload []byte) error
- func (m *MemoryTransport) Subscribe(_ context.Context, channel string, opts SubscribeOptions) (<-chan []byte, func(), error)
- type PresenceEntry
- type SequenceSnapshotter
- type SubscribeOption
- type SubscribeOptions
- type Subscription
- func (s *Subscription) Channel() string
- func (s *Subscription) History(ctx context.Context, opts HistoryOptions) ([]envelope.Envelope, error)
- func (s *Subscription) OnAspect(aspect string, handler func(envelope.Envelope))
- func (s *Subscription) OnEnvelope(handler func(envelope.Envelope))
- func (s *Subscription) OnGap(handler func(channel string, gap int64))
- func (s *Subscription) Presence(ctx context.Context) ([]PresenceEntry, error)
- func (s *Subscription) PublishDerived(ctx context.Context, aspect string, payload []byte) error
- func (s *Subscription) Unsubscribe() error
- type Transport
Constants ¶
This section is empty.
Variables ¶
var ErrNotImplemented = errors.New("transport: not implemented")
ErrNotImplemented is returned by transports whose feature surface is incomplete (e.g. a publish-only transport's Subscribe).
Functions ¶
func OnAspectTyped ¶
func OnAspectTyped[T any](s *Subscription, aspect string, handler func(T, envelope.Envelope))
OnAspectTyped is the typed-payload variant of OnAspect. The payload is unmarshaled into T; on failure the handler is skipped and the error is silently dropped (use the Subscription's Validator to surface payload errors).
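The behavior described above can be sketched as a generic adapter that turns a typed handler into an untyped one. This is a minimal illustration, not the package's implementation: the Envelope struct is a stand-in for envelope.Envelope, JSON decoding is assumed, and typedHandler, Price, and deliveredSymbols are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Envelope is a hypothetical stand-in for envelope.Envelope; only the two
// fields this sketch needs are modelled.
type Envelope struct {
	Aspect  string
	Payload json.RawMessage
}

// typedHandler adapts a typed handler into an untyped one. The payload is
// unmarshaled into T; on failure the handler is skipped and the error dropped.
func typedHandler[T any](handler func(T, Envelope)) func(Envelope) {
	return func(env Envelope) {
		var v T
		if err := json.Unmarshal(env.Payload, &v); err != nil {
			return // dropped silently, as the doc comment describes
		}
		handler(v, env)
	}
}

// Price is an example payload type.
type Price struct {
	Symbol string  `json:"symbol"`
	Value  float64 `json:"value"`
}

// deliveredSymbols feeds raw payloads through a typed handler and returns
// the symbols that actually reached it.
func deliveredSymbols(payloads ...string) []string {
	var got []string
	h := typedHandler(func(p Price, _ Envelope) { got = append(got, p.Symbol) })
	for _, raw := range payloads {
		h(Envelope{Aspect: "price", Payload: json.RawMessage(raw)})
	}
	return got
}

func main() {
	// The malformed middle payload never reaches the handler.
	fmt.Println(deliveredSymbols(`{"symbol":"ABC","value":1.5}`, `not json`, `{"symbol":"XYZ","value":2}`))
}
```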
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is the top-level envelope-aware client.
func (*Client) History ¶
func (c *Client) History(ctx context.Context, channel string, opts HistoryOptions) ([]envelope.Envelope, error)
History fetches prior envelopes for channel.
func (*Client) Publish ¶
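func (c *Client) Publish(ctx context.Context, channel string, env envelope.Envelope) error
Publish stamps env with the channel, the next sequence, ProducedAt, and Producer, then sends it over the transport.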
Caller-supplied fields:
- Aspect — required
- Payload — payload bytes (already JSON-encoded)
- Causation — optional; set by clients that want explicit DAG edges (PublishDerived on a Subscription does this automatically)
- SchemaRef — optional
Stamped automatically:
- Channel — set to the supplied channel
- Sequence — next from the tracker
- ProducedAt — time.Now().UTC()
- Producer — from the client config
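The split between caller-supplied and stamped fields listed above might look roughly like the following. The Envelope, Producer, and tracker types are simplified stand-ins, and starting sequences at 1 and rejecting an empty Aspect with this particular error are assumptions of the sketch rather than documented behavior.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Producer and Envelope are hypothetical stand-ins for the envelope package types.
type Producer struct{ ID string }

type Envelope struct {
	Channel    string
	Aspect     string
	Payload    []byte
	Sequence   int64
	ProducedAt time.Time
	Producer   Producer
}

// tracker hands out per-channel sequence numbers. Starting at 1 is an
// assumption of this sketch, and it is not safe for concurrent use.
type tracker struct{ last map[string]int64 }

func (t *tracker) next(channel string) int64 {
	t.last[channel]++
	return t.last[channel]
}

// stamp fills the automatically stamped fields and leaves the
// caller-supplied ones (Aspect, Payload) untouched.
func stamp(env Envelope, channel string, t *tracker, p Producer) (Envelope, error) {
	if env.Aspect == "" {
		return Envelope{}, errors.New("publish: aspect is required")
	}
	env.Channel = channel
	env.Sequence = t.next(channel)
	env.ProducedAt = time.Now().UTC()
	env.Producer = p
	return env, nil
}

func main() {
	t := &tracker{last: map[string]int64{}}
	p := Producer{ID: "svc-a"}
	for i := 0; i < 2; i++ {
		env, err := stamp(Envelope{Aspect: "order.created", Payload: []byte(`{}`)}, "orders", t, p)
		fmt.Println(env.Channel, env.Sequence, env.Producer.ID, err)
	}
}
```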
func (*Client) Subscribe ¶
func (c *Client) Subscribe(ctx context.Context, channel string, opts ...SubscribeOption) (*Subscription, error)
Subscribe opens a Subscription on channel. The Client owns the reconnect loop — the application registers handlers and drives them to completion; the Subscription survives transport-level disconnects.
type ClientOption ¶
type ClientOption func(*clientConfig)
ClientOption configures a Client at construction.
func WithProducer ¶
func WithProducer(p envelope.Producer) ClientOption
WithProducer sets the publisher identity stamped on every outgoing envelope.
func WithSequenceTracker ¶
func WithSequenceTracker(t *envelope.SequenceTracker) ClientOption
WithSequenceTracker overrides the default sequence tracker. Inject a shared tracker when multiple Clients in one process publish to overlapping channels.
func WithSnapshotter ¶
func WithSnapshotter(s SequenceSnapshotter) ClientOption
WithSnapshotter enables sequence persistence.
func WithTransport ¶
func WithTransport(t Transport) ClientOption
WithTransport injects the Transport. Required.
func WithValidator ¶
func WithValidator(v *schema.Validator) ClientOption
WithValidator wires a schema.Validator that runs on every incoming envelope (per the Validator.Mode policy).
type FileIO ¶
FileIO is the minimal filesystem surface FileSnapshotter consumes. Tests substitute an in-memory implementation.
type FileSnapshotter ¶
type FileSnapshotter struct {
Path string
// contains filtered or unexported fields
}
FileSnapshotter persists the sequence tracker state to a JSON file. Suitable for single-process clients that need to survive a restart without losing per-channel sequence counters.
func NewFileSnapshotter ¶
func NewFileSnapshotter(path string) *FileSnapshotter
NewFileSnapshotter constructs a snapshotter writing to path.
func (*FileSnapshotter) Load ¶
func (f *FileSnapshotter) Load() (map[string]int64, error)
Load reads the persisted per-channel sequence state from the file at Path.
func (*FileSnapshotter) Save ¶
func (f *FileSnapshotter) Save(state map[string]int64) error
Save writes the per-channel sequence state to the file at Path.
func (*FileSnapshotter) WithFileIO ¶
func (f *FileSnapshotter) WithFileIO(io FileIO) *FileSnapshotter
WithFileIO swaps the underlying FileIO implementation; intended for tests.
type HistoryOptions ¶
type HistoryOptions struct {
Limit int
Since int64 // since sequence (exclusive)
Until int64 // until sequence (inclusive); 0 = unbounded
}
HistoryOptions controls a history fetch.
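To make the bounds concrete, the sketch below applies the documented semantics (Since exclusive, Until inclusive with 0 meaning unbounded) to a list of sequence numbers. Treating Limit <= 0 as "no limit" and returning the oldest matches first are assumptions; selectSequences is a hypothetical helper.

```go
package main

import "fmt"

// HistoryOptions mirrors the struct documented above.
type HistoryOptions struct {
	Limit int
	Since int64 // since sequence (exclusive)
	Until int64 // until sequence (inclusive); 0 = unbounded
}

// selectSequences applies opts to ascending sequence numbers.
// Assumption: Limit <= 0 means no limit, and the oldest matches win.
func selectSequences(seqs []int64, opts HistoryOptions) []int64 {
	var out []int64
	for _, s := range seqs {
		if s <= opts.Since {
			continue // Since is exclusive
		}
		if opts.Until != 0 && s > opts.Until {
			continue // Until is inclusive; 0 disables the upper bound
		}
		if opts.Limit > 0 && len(out) == opts.Limit {
			break
		}
		out = append(out, s)
	}
	return out
}

func main() {
	seqs := []int64{1, 2, 3, 4, 5, 6}
	fmt.Println(selectSequences(seqs, HistoryOptions{Since: 2, Until: 5})) // [3 4 5]
	fmt.Println(selectSequences(seqs, HistoryOptions{Since: 2, Limit: 2})) // [3 4]
}
```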
type MemoryTransport ¶
type MemoryTransport struct {
// contains filtered or unexported fields
}
MemoryTransport is an in-process Transport useful for tests, examples, and same-process publisher/subscriber wiring. Publish hands the bytes to every subscriber on the same channel; history is unbounded; presence is empty.
MemoryTransport is NOT a production transport — it does not survive a process restart, has no auth, and broadcasts inside one process. Use it for envelope-level tests; swap in a WebSocketTransport in production.
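The in-process fan-out described above can be pictured with a small broker like the one below. It is not the package's MemoryTransport: the broker type, its buffer size of 16, and the choice to drop a message when a subscriber's buffer is full are all assumptions made for the sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// broker is a hypothetical in-process fan-out keyed by channel name.
type broker struct {
	mu      sync.Mutex
	subs    map[string][]chan []byte
	history map[string][][]byte
}

func newBroker() *broker {
	return &broker{subs: map[string][]chan []byte{}, history: map[string][][]byte{}}
}

// subscribe registers a buffered stream for channel.
func (b *broker) subscribe(channel string) <-chan []byte {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan []byte, 16)
	b.subs[channel] = append(b.subs[channel], ch)
	return ch
}

// publish appends the payload to unbounded history and hands it to every
// subscriber on the same channel. A full subscriber buffer drops the
// message; that policy is an assumption of this sketch.
func (b *broker) publish(channel string, payload []byte) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.history[channel] = append(b.history[channel], payload)
	for _, ch := range b.subs[channel] {
		select {
		case ch <- payload:
		default:
		}
	}
}

// fanOut subscribes n readers, publishes once, and returns what each received.
func fanOut(n int, msg string) []string {
	b := newBroker()
	streams := make([]<-chan []byte, n)
	for i := range streams {
		streams[i] = b.subscribe("orders")
	}
	b.publish("orders", []byte(msg))
	got := make([]string, 0, n)
	for _, s := range streams {
		got = append(got, string(<-s))
	}
	return got
}

func main() {
	fmt.Println(fanOut(3, "hello")) // [hello hello hello]
}
```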
func NewMemoryTransport ¶
func NewMemoryTransport() *MemoryTransport
NewMemoryTransport constructs an empty transport.
func (*MemoryTransport) Close ¶
func (m *MemoryTransport) Close() error
Close cancels every active subscription.
func (*MemoryTransport) Connect ¶
func (m *MemoryTransport) Connect(context.Context, string) error
Connect is a no-op.
func (*MemoryTransport) History ¶
func (m *MemoryTransport) History(_ context.Context, channel string, opts HistoryOptions) ([][]byte, error)
History returns every envelope ever published to channel, optionally bounded by opts.Since (exclusive) and opts.Limit.
func (*MemoryTransport) Presence ¶
func (m *MemoryTransport) Presence(_ context.Context, channel string) ([]PresenceEntry, error)
Presence returns the count of subscribers as a single anonymous entry. Memory transport has no auth and therefore no user identities to report.
func (*MemoryTransport) Subscribe ¶
func (m *MemoryTransport) Subscribe(_ context.Context, channel string, opts SubscribeOptions) (<-chan []byte, func(), error)
Subscribe opens a stream of bytes. When FromSequence is set, Subscribe replays history above the given sequence number by decoding and filtering the stored envelopes, which is useful for resuming after a disconnect.
type PresenceEntry ¶
type PresenceEntry struct {
UserID string `json:"user_id"`
ClientID string `json:"client_id"`
ConnInfo json.RawMessage `json:"conn_info,omitempty"`
}
PresenceEntry is one entry in the presence list.
type SequenceSnapshotter ¶
type SequenceSnapshotter interface {
Save(state map[string]int64) error
Load() (map[string]int64, error)
}
SequenceSnapshotter persists per-channel sequence state for crash recovery. The client snapshots after every publish; the snapshotter's implementation chooses the durability tier (BoltDB, SQLite, file, in-memory).
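A custom durability tier only has to satisfy the two-method interface. The sketch below shows an in-memory implementation and a helper that mimics the snapshot-after-publish cycle across a simulated restart. The memorySnapshotter type and nextAfterRestart helper are hypothetical, and incrementing the counter by one per publish is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// SequenceSnapshotter mirrors the interface documented above.
type SequenceSnapshotter interface {
	Save(state map[string]int64) error
	Load() (map[string]int64, error)
}

// memorySnapshotter is a hypothetical in-memory tier. It copies maps on the
// way in and out so callers cannot mutate the stored state by accident.
type memorySnapshotter struct {
	mu    sync.Mutex
	state map[string]int64
}

func (m *memorySnapshotter) Save(state map[string]int64) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.state = make(map[string]int64, len(state))
	for k, v := range state {
		m.state[k] = v
	}
	return nil
}

func (m *memorySnapshotter) Load() (map[string]int64, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	out := make(map[string]int64, len(m.state))
	for k, v := range m.state {
		out[k] = v
	}
	return out, nil
}

// nextAfterRestart simulates a fresh process: load the persisted counters,
// take the next sequence for channel, and snapshot again.
func nextAfterRestart(s SequenceSnapshotter, channel string) (int64, error) {
	state, err := s.Load()
	if err != nil {
		return 0, err
	}
	state[channel]++
	if err := s.Save(state); err != nil {
		return 0, err
	}
	return state[channel], nil
}

func main() {
	s := &memorySnapshotter{}
	for i := 0; i < 3; i++ {
		seq, err := nextAfterRestart(s, "orders")
		fmt.Println(seq, err)
	}
}
```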
type SubscribeOption ¶
type SubscribeOption func(*SubscribeOptions)
SubscribeOption is the public option type for Subscribe.
func WithAspectFilter ¶
func WithAspectFilter(aspects ...string) SubscribeOption
WithAspectFilter restricts the subscription to envelopes whose Aspect is in the supplied list. The transport may push the filter down to the server (when supported); otherwise the client filters locally.
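When the transport cannot push the filter down, local filtering could look something like this. The Envelope stand-in and the aspectFilter and filterAspects helpers are hypothetical, and treating an empty aspect list as "accept everything" is an assumption.

```go
package main

import "fmt"

// Envelope is a hypothetical stand-in carrying only the Aspect field.
type Envelope struct{ Aspect string }

// aspectFilter returns a predicate that accepts envelopes whose Aspect is in
// the supplied list. Assumption: an empty list disables filtering.
func aspectFilter(aspects ...string) func(Envelope) bool {
	if len(aspects) == 0 {
		return func(Envelope) bool { return true }
	}
	allowed := make(map[string]struct{}, len(aspects))
	for _, a := range aspects {
		allowed[a] = struct{}{}
	}
	return func(env Envelope) bool {
		_, ok := allowed[env.Aspect]
		return ok
	}
}

// filterAspects returns the aspects of the envelopes that pass the filter.
func filterAspects(envs []Envelope, aspects ...string) []string {
	keep := aspectFilter(aspects...)
	var out []string
	for _, env := range envs {
		if keep(env) {
			out = append(out, env.Aspect)
		}
	}
	return out
}

func main() {
	envs := []Envelope{{"price"}, {"trade"}, {"heartbeat"}, {"price"}}
	fmt.Println(filterAspects(envs, "price", "trade")) // [price trade price]
}
```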
func WithFromSequence ¶
func WithFromSequence(seq int64) SubscribeOption
WithFromSequence resumes from the given sequence number. Used by the reconnect path; explicit consumers can also supply it for catch-up.
type SubscribeOptions ¶
SubscribeOptions controls the wire-level subscribe call. The Client fills FromSequence from its persisted sequence state on reconnect.
type Subscription ¶
type Subscription struct {
// contains filtered or unexported fields
}
Subscription is one envelope-aware subscription. Handlers are dispatched in registration order; OnEnvelope handlers see every envelope, OnAspect handlers see only envelopes matching their aspect.
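The dispatch rule above (registration order, catch-all versus aspect-specific handlers) can be modelled in a few lines. The dispatcher and handler types and the trace helper are hypothetical; the real Subscription keeps its fields unexported.

```go
package main

import "fmt"

// Envelope is a hypothetical stand-in carrying only the Aspect field.
type Envelope struct{ Aspect string }

// handler is one registered callback; all marks an OnEnvelope registration.
type handler struct {
	all    bool
	aspect string
	fn     func(Envelope)
}

// dispatcher keeps handlers in the order they were registered.
type dispatcher struct{ handlers []handler }

func (d *dispatcher) OnEnvelope(fn func(Envelope)) {
	d.handlers = append(d.handlers, handler{all: true, fn: fn})
}

func (d *dispatcher) OnAspect(aspect string, fn func(Envelope)) {
	d.handlers = append(d.handlers, handler{aspect: aspect, fn: fn})
}

// dispatch walks handlers in registration order and calls those that match.
func (d *dispatcher) dispatch(env Envelope) {
	for _, h := range d.handlers {
		if h.all || h.aspect == env.Aspect {
			h.fn(env)
		}
	}
}

// trace registers three handlers and reports which ran, in order, for aspect.
func trace(aspect string) []string {
	var calls []string
	d := &dispatcher{}
	d.OnAspect("price", func(Envelope) { calls = append(calls, "price-handler") })
	d.OnEnvelope(func(Envelope) { calls = append(calls, "catch-all") })
	d.OnAspect("trade", func(Envelope) { calls = append(calls, "trade-handler") })
	d.dispatch(Envelope{Aspect: aspect})
	return calls
}

func main() {
	fmt.Println(trace("price")) // [price-handler catch-all]
	fmt.Println(trace("trade")) // [catch-all trade-handler]
}
```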
func (*Subscription) Channel ¶
func (s *Subscription) Channel() string
Channel returns the subscribed channel name.
func (*Subscription) History ¶
func (s *Subscription) History(ctx context.Context, opts HistoryOptions) ([]envelope.Envelope, error)
History is a convenience that delegates to Client.History.
func (*Subscription) OnAspect ¶
func (s *Subscription) OnAspect(aspect string, handler func(envelope.Envelope))
OnAspect registers a handler that receives only envelopes whose Aspect equals the supplied name.
func (*Subscription) OnEnvelope ¶
func (s *Subscription) OnEnvelope(handler func(envelope.Envelope))
OnEnvelope registers a handler that receives every envelope.
func (*Subscription) OnGap ¶
func (s *Subscription) OnGap(handler func(channel string, gap int64))
OnGap registers a handler that fires when a sequence gap is detected on the subscription. Apps typically use this to reload state from a durable source.
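Gap detection can be sketched as a per-channel "last seen" counter that fires a callback when an incoming sequence skips ahead. This is only an illustration: the gapDetector type is hypothetical, and interpreting the handler's gap argument as the number of missing sequence numbers is an assumption, since the documentation does not define it.

```go
package main

import "fmt"

// gapDetector tracks the last sequence seen per channel and reports skips.
type gapDetector struct {
	last  map[string]int64
	onGap func(channel string, gap int64)
}

// observe records seq for channel. If seq jumps past last+1, onGap fires
// with the count of missing sequence numbers (an assumed interpretation).
func (g *gapDetector) observe(channel string, seq int64) {
	last, seen := g.last[channel]
	if seen && seq > last+1 && g.onGap != nil {
		g.onGap(channel, seq-last-1)
	}
	if !seen || seq > last {
		g.last[channel] = seq
	}
}

// totalMissing feeds seqs through a detector and sums the reported gaps.
func totalMissing(seqs ...int64) int64 {
	var total int64
	g := &gapDetector{
		last:  map[string]int64{},
		onGap: func(_ string, gap int64) { total += gap },
	}
	for _, s := range seqs {
		g.observe("orders", s)
	}
	return total
}

func main() {
	// 3 and 4 are missing, then 7 and 8: four sequence numbers in total.
	fmt.Println(totalMissing(1, 2, 5, 6, 9)) // 4
}
```

A handler wired this way would be the natural place to trigger a History fetch or a reload from a durable source.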
func (*Subscription) Presence ¶
func (s *Subscription) Presence(ctx context.Context) ([]PresenceEntry, error)
Presence delegates to the Transport.
func (*Subscription) PublishDerived ¶
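func (s *Subscription) PublishDerived(ctx context.Context, aspect string, payload []byte) error
PublishDerived publishes an envelope on the subscription's channel whose Causation references the most recently dispatched envelope. This is the programmatic surface for the "auto-propagated causation" behavior the upgrade spec describes.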
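One way to picture the causation link is a subscription that remembers the last envelope it dispatched and references it when building a derived envelope. The shape of Causation is not documented here, so the Ref type (channel plus sequence), the Envelope stand-in, and the derivedParents helper are all assumptions of this sketch.

```go
package main

import "fmt"

// Ref is an assumed shape for a causation edge: the parent's channel and sequence.
type Ref struct {
	Channel  string
	Sequence int64
}

// Envelope is a hypothetical stand-in for envelope.Envelope.
type Envelope struct {
	Channel   string
	Aspect    string
	Sequence  int64
	Payload   []byte
	Causation []Ref
}

// subscription remembers the most recently dispatched envelope.
type subscription struct {
	channel string
	last    *Envelope
}

// dispatch records env as the latest envelope handed to handlers.
func (s *subscription) dispatch(env Envelope) { s.last = &env }

// derive builds an envelope on the subscription's channel whose Causation
// points at the most recently dispatched envelope, if there is one.
func (s *subscription) derive(aspect string, payload []byte) Envelope {
	out := Envelope{Channel: s.channel, Aspect: aspect, Payload: payload}
	if s.last != nil {
		out.Causation = []Ref{{Channel: s.last.Channel, Sequence: s.last.Sequence}}
	}
	return out
}

// derivedParents dispatches envelopes with the given sequences, then returns
// the causation of an envelope derived afterwards.
func derivedParents(seqs ...int64) []Ref {
	s := &subscription{channel: "orders"}
	for _, seq := range seqs {
		s.dispatch(Envelope{Channel: "orders", Aspect: "order.created", Sequence: seq})
	}
	return s.derive("order.validated", []byte(`{}`)).Causation
}

func main() {
	fmt.Println(derivedParents(6, 7)) // [{orders 7}]
}
```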
func (*Subscription) Unsubscribe ¶
func (s *Subscription) Unsubscribe() error
Unsubscribe terminates the subscription.
type Transport ¶
type Transport interface {
// Connect opens the underlying transport. Idempotent.
Connect(ctx context.Context, token string) error
// Close terminates the connection. Pending subscriptions are
// canceled.
Close() error
// Publish writes one encoded envelope to channel. The Transport
// is responsible for delivering the bytes; sequencing was already
// stamped by the Client.
Publish(ctx context.Context, channel string, payload []byte) error
// Subscribe opens a subscription. The returned channel emits raw
// envelope bytes; the Client decodes and dispatches. cancelFn
// terminates the subscription.
Subscribe(ctx context.Context, channel string, opts SubscribeOptions) (<-chan []byte, func(), error)
// History returns prior envelopes for channel. Empty slice when
// the transport does not implement history.
History(ctx context.Context, channel string, opts HistoryOptions) ([][]byte, error)
// Presence returns the active presence list. Empty slice when
// the transport does not implement presence.
Presence(ctx context.Context, channel string) ([]PresenceEntry, error)
}
Transport is the wire-level surface the Client sits on top of. Each Transport implementation handles one connection lifecycle — authenticate, publish, subscribe, replay-from-sequence, presence, history.
Implementations:
- SSETransport (this package) — HTTP-based publish + SSE subscribe, suitable for probes and tests
- WebSocketTransport (centrifuge-go) — left as a stub the consumer supplies in production