Documentation ¶
Overview ¶
Package schema is the Parsec schema registry. Channel patterns are registered at application startup; the registry resolves concrete channel names back to the pattern that owns them and exposes the payload schemas (per aspect) that envelopes published on those channels must conform to.
The package ships two things:
- A pure-Go Registry library used in-process by publishers and subscribers.
- An HTTP handler at /parsec/schemas (mounted by internal/server) that serves a JSON snapshot of the registry so out-of-process clients can cache schemas locally on startup.
Changes to the registry (Register / Update / Deregister) are emitted on a parsec channel — "public:parsec.registry.schemas" by default — so subscribers can hot-reload without polling the HTTP endpoint.
Index ¶
- Constants
- Variables
- func Handler(r Registry) http.Handler
- func ResolveAspect(r Registry, channel, aspect string) (*JSONSchema, ChannelPattern, map[string]string, error)
- type ACLPolicy
- type Aspect
- type Change
- type ChangeKind
- type ChannelPattern
- type JSONSchema
- type MemoryRegistry
- func (r *MemoryRegistry) Deregister(pattern string) error
- func (r *MemoryRegistry) Get(pattern string) (ChannelPattern, error)
- func (r *MemoryRegistry) History(pattern string) ([]ChannelPattern, error)
- func (r *MemoryRegistry) List() []ChannelPattern
- func (r *MemoryRegistry) LoadSnapshot(b []byte) error
- func (r *MemoryRegistry) Register(p ChannelPattern) error
- func (r *MemoryRegistry) Resolve(channel string) (ChannelPattern, map[string]string, error)
- func (r *MemoryRegistry) Snapshot() Snapshot
- func (r *MemoryRegistry) Subscribe() (<-chan Change, func())
- func (r *MemoryRegistry) Update(p ChannelPattern) error
- type Mode
- type Pattern
- type PublishFunc
- type Publisher
- type PublisherOptions
- type Registry
- type Snapshot
- type Validator
Constants ¶
const ChangeChannel = "public:parsec.registry.schemas"
ChangeChannel is the default broadcast channel for registry changes. Apps may override it by setting PublisherOptions.Channel when constructing the Publisher; the wire shape stays the same.
Variables ¶
var (
ErrPatternConflict = errors.New("schema: pattern conflicts with existing registration")
ErrPatternNotFound = errors.New("schema: pattern not found")
ErrNoMatch = errors.New("schema: no pattern matches channel")
ErrAspectNotFound = errors.New("schema: aspect not registered for pattern")
ErrUnsupportedAspect = errors.New("schema: unsupported aspect")
)
Sentinel errors.
var ErrPayloadInvalid = errors.New("schema: payload does not conform")
ErrPayloadInvalid is returned by Check in ModeStrict when the payload fails schema validation.
Functions ¶
func Handler ¶
func Handler(r Registry) http.Handler
Handler returns an http.Handler that serves registry snapshots and individual pattern lookups.
Routes:
GET <prefix> - full Snapshot (all current patterns)
GET <prefix>?channel=X - resolve channel X to its owning pattern
GET <prefix>?pattern=P - fetch the named pattern (and its full history when &history=true)
The handler does NOT enforce auth; mount it behind the bearer middleware when restricting access (the upgrade spec calls /parsec/schemas a public discovery surface, so default deployments expose it unguarded).
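As a rough illustration of how an out-of-process client might cache the snapshot on startup, the sketch below fetches the first route and decodes it. The `pattern` and `snapshot` types, `fetchSnapshot`, and `stubServer` are local stand-ins invented for this example (they mirror only JSON fields documented on this page), and the stub response body is made-up sample data so the program runs offline.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

// pattern and snapshot mirror a subset of the documented JSON fields.
// They are local stand-ins, not the package's own types.
type pattern struct {
	Pattern string `json:"pattern"`
	Version int    `json:"version"`
}

type snapshot struct {
	Patterns []pattern `json:"patterns"`
	At       time.Time `json:"at"`
}

// fetchSnapshot GETs url and decodes the response body as a snapshot.
func fetchSnapshot(c *http.Client, url string) (snapshot, error) {
	var s snapshot
	resp, err := c.Get(url)
	if err != nil {
		return s, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return s, fmt.Errorf("unexpected status %s", resp.Status)
	}
	err = json.NewDecoder(resp.Body).Decode(&s)
	return s, err
}

// stubServer stands in for a Parsec server (sample data, assumption only).
func stubServer() *httptest.Server {
	mux := http.NewServeMux()
	mux.HandleFunc("/parsec/schemas", func(w http.ResponseWriter, _ *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		fmt.Fprint(w, `{"patterns":[{"pattern":"sessions:{id}","version":1,"aspects":{}}],"at":"2024-01-01T00:00:00Z"}`)
	})
	return httptest.NewServer(mux)
}

func main() {
	srv := stubServer()
	defer srv.Close()
	snap, err := fetchSnapshot(srv.Client(), srv.URL+"/parsec/schemas")
	if err != nil {
		panic(err)
	}
	fmt.Println(len(snap.Patterns), snap.Patterns[0].Pattern)
}
```

In a real deployment the URL would point at wherever the handler is mounted, and a bearer token would be attached if the operator has placed the route behind auth middleware.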
func ResolveAspect ¶
func ResolveAspect(r Registry, channel, aspect string) (*JSONSchema, ChannelPattern, map[string]string, error)
ResolveAspect is a convenience that resolves channel to its owning pattern and then looks up the named aspect's payload schema. It returns ErrAspectNotFound if the aspect is not declared on the matched pattern. The bindings map captures any {name} placeholder values.
Types ¶
type ACLPolicy ¶
type ACLPolicy struct {
PublishRoles []string `json:"publish_roles,omitempty"`
SubscribeRoles []string `json:"subscribe_roles,omitempty"`
Notes string `json:"notes,omitempty"`
}
ACLPolicy is metadata describing which roles or scopes may subscribe / publish to channels matching the pattern. The schema registry does NOT enforce ACL — enforcement lives in the auth package + token broker. This struct is documentation that travels with the pattern so client libraries and the admin UI can render it.
type Aspect ¶
type Aspect struct {
Name string `json:"name"`
Description string `json:"description,omitempty"`
PayloadSchema *JSONSchema `json:"payload_schema,omitempty"`
Required bool `json:"required,omitempty"`
}
Aspect is one named payload contract within a channel pattern.
type Change ¶
type Change struct {
Kind ChangeKind `json:"kind"`
Pattern string `json:"pattern"`
Schema ChannelPattern `json:"schema,omitzero"`
At time.Time `json:"at"`
}
Change is one registry update broadcast to subscribers.
type ChangeKind ¶
type ChangeKind string
ChangeKind discriminates registry change events.
const (
ChangeRegistered ChangeKind = "registered"
ChangeUpdated ChangeKind = "updated"
ChangeDeregistered ChangeKind = "deregistered"
)
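One way a subscriber might hot-reload from these events is to keep a local map keyed by pattern and apply each decoded Change to it. The sketch below assumes the message body is the JSON form of Change as documented above; `change`, `schemaCache`, and `applyChange` are hypothetical names for this example, and the schema is kept as raw JSON to avoid depending on the full ChannelPattern type.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// change mirrors the documented JSON fields of Change (local stand-in).
type change struct {
	Kind    string          `json:"kind"`
	Pattern string          `json:"pattern"`
	Schema  json.RawMessage `json:"schema"`
}

// schemaCache is a hypothetical client-side cache keyed by pattern string.
type schemaCache map[string]json.RawMessage

// applyChange updates cache in place from one JSON-encoded change message.
func applyChange(cache schemaCache, raw []byte) error {
	var c change
	if err := json.Unmarshal(raw, &c); err != nil {
		return err
	}
	switch c.Kind {
	case "registered", "updated":
		cache[c.Pattern] = c.Schema
	case "deregistered":
		delete(cache, c.Pattern)
	default:
		return fmt.Errorf("unknown change kind %q", c.Kind)
	}
	return nil
}

func main() {
	cache := schemaCache{}
	msgs := []string{
		`{"kind":"registered","pattern":"sessions:{id}","schema":{"pattern":"sessions:{id}","version":1}}`,
		`{"kind":"deregistered","pattern":"sessions:{id}"}`,
	}
	for _, m := range msgs {
		if err := applyChange(cache, []byte(m)); err != nil {
			panic(err)
		}
		fmt.Println(len(cache)) // cache size after each message
	}
}
```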
type ChannelPattern ¶
type ChannelPattern struct {
Pattern string `json:"pattern"`
Description string `json:"description,omitempty"`
Version int `json:"version"`
Aspects map[string]Aspect `json:"aspects"`
ACL ACLPolicy `json:"acl,omitzero"`
// contains filtered or unexported fields
}
ChannelPattern declares the schema contract for a family of channels. The Pattern is matched against incoming channel names; on a match, the per-aspect PayloadSchema applies.
type JSONSchema ¶
type JSONSchema struct {
Type string `json:"type,omitempty"`
Required []string `json:"required,omitempty"`
Properties map[string]*JSONSchema `json:"properties,omitempty"`
Items *JSONSchema `json:"items,omitempty"`
AdditionalProperties *bool `json:"additionalProperties,omitempty"`
Enum []any `json:"enum,omitempty"`
Description string `json:"description,omitempty"`
}
JSONSchema is a lightweight subset of JSON Schema (draft 2020-12) sufficient for envelope payload validation in Parsec. It supports:
- type: string | number | integer | boolean | object | array | null
- required: list of object property names
- properties: per-property nested JSONSchema
- items: schema for array elements
- additionalProperties: whether unknown object fields are allowed
- enum: list of allowed literal values
Anything richer (oneOf, allOf, $ref, regex patterns) is intentionally out of scope — Parsec subscribers should treat unknown fields as opaque and decode strictly via Go/TS code generation when stronger guarantees are required. A full JSON Schema validator can be substituted later without changing the Registry API.
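To make the supported subset concrete, here is a minimal, self-contained sketch of a validator that handles only the six keywords listed above. It is not the package's implementation: `miniSchema`, `typeOf`, `validate`, and `validateBytes` are stand-ins written for this example, and details such as treating a whole-valued float as an "integer" are assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
	"math"
	"reflect"
	"slices"
)

// miniSchema mirrors the documented JSONSchema fields (local stand-in).
type miniSchema struct {
	Type                 string                 `json:"type,omitempty"`
	Required             []string               `json:"required,omitempty"`
	Properties           map[string]*miniSchema `json:"properties,omitempty"`
	Items                *miniSchema            `json:"items,omitempty"`
	AdditionalProperties *bool                  `json:"additionalProperties,omitempty"`
	Enum                 []any                  `json:"enum,omitempty"`
}

// typeOf names the JSON type of a value decoded by json.Unmarshal into any.
func typeOf(v any) string {
	switch x := v.(type) {
	case nil:
		return "null"
	case bool:
		return "boolean"
	case string:
		return "string"
	case float64:
		if x == math.Trunc(x) {
			return "integer"
		}
		return "number"
	case []any:
		return "array"
	case map[string]any:
		return "object"
	}
	return "unknown"
}

// validate checks v against s using only the keywords listed above.
func validate(s *miniSchema, v any) error {
	if s == nil {
		return nil // no schema declared: anything goes
	}
	got := typeOf(v)
	// An integer also satisfies "number".
	if s.Type != "" && got != s.Type && !(s.Type == "number" && got == "integer") {
		return fmt.Errorf("want %s, got %s", s.Type, got)
	}
	if len(s.Enum) > 0 && !slices.ContainsFunc(s.Enum, func(e any) bool { return reflect.DeepEqual(e, v) }) {
		return fmt.Errorf("value %v not in enum", v)
	}
	switch x := v.(type) {
	case map[string]any:
		for _, name := range s.Required {
			if _, ok := x[name]; !ok {
				return fmt.Errorf("missing required property %q", name)
			}
		}
		for name, val := range x {
			sub, declared := s.Properties[name]
			if !declared && s.AdditionalProperties != nil && !*s.AdditionalProperties {
				return fmt.Errorf("unexpected property %q", name)
			}
			if err := validate(sub, val); err != nil {
				return fmt.Errorf("%s: %w", name, err)
			}
		}
	case []any:
		for i, el := range x {
			if err := validate(s.Items, el); err != nil {
				return fmt.Errorf("[%d]: %w", i, err)
			}
		}
	}
	return nil
}

// validateBytes unmarshals raw and runs validate, like ValidateBytes.
func validateBytes(s *miniSchema, raw []byte) error {
	var v any
	if err := json.Unmarshal(raw, &v); err != nil {
		return err
	}
	return validate(s, v)
}

func main() {
	closed := false
	s := &miniSchema{Type: "object", Required: []string{"id"}, AdditionalProperties: &closed,
		Properties: map[string]*miniSchema{"id": {Type: "string"}, "state": {Enum: []any{"open", "closed"}}}}
	fmt.Println(validateBytes(s, []byte(`{"id":"a1","state":"open"}`))) // conforms
	fmt.Println(validateBytes(s, []byte(`{"state":"open"}`)))           // missing "id"
}
```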
func (*JSONSchema) Validate ¶
func (s *JSONSchema) Validate(v any) error
Validate reports whether v conforms to s. The error is nil on success and a human-readable explanation otherwise. v is typically the result of json.Unmarshal on the envelope's payload.
func (*JSONSchema) ValidateBytes ¶
func (s *JSONSchema) ValidateBytes(raw []byte) error
ValidateBytes is a convenience that unmarshals raw and runs Validate.
type MemoryRegistry ¶
type MemoryRegistry struct {
// contains filtered or unexported fields
}
MemoryRegistry is the default in-process Registry. It holds patterns in a map keyed by pattern source string and a per-pattern history of prior versions for schema-evolution support.
func NewMemoryRegistry ¶
func NewMemoryRegistry() *MemoryRegistry
NewMemoryRegistry constructs an empty registry.
func (*MemoryRegistry) Deregister ¶
func (r *MemoryRegistry) Deregister(pattern string) error
Deregister removes pattern. Returns ErrPatternNotFound if it does not exist. Version history is dropped.
func (*MemoryRegistry) Get ¶
func (r *MemoryRegistry) Get(pattern string) (ChannelPattern, error)
Get returns the current registration for pattern.
func (*MemoryRegistry) History ¶
func (r *MemoryRegistry) History(pattern string) ([]ChannelPattern, error)
History returns every registered version of pattern, oldest first. Useful for resolving an explicit schema_ref carrying a version suffix.
func (*MemoryRegistry) List ¶
func (r *MemoryRegistry) List() []ChannelPattern
List returns every current pattern in lexicographic order.
func (*MemoryRegistry) LoadSnapshot ¶
func (r *MemoryRegistry) LoadSnapshot(b []byte) error
LoadSnapshot bulk-loads patterns from b (the wire form Snapshot produces). Existing patterns are replaced. Useful for warm-restart scenarios where an on-disk snapshot precedes any in-process registrations.
func (*MemoryRegistry) Register ¶
func (r *MemoryRegistry) Register(p ChannelPattern) error
Register adds p. The pattern must compile; duplicate patterns return ErrPatternConflict. The first registration is version 1 unless the caller set Version explicitly.
func (*MemoryRegistry) Resolve ¶
func (r *MemoryRegistry) Resolve(channel string) (ChannelPattern, map[string]string, error)
Resolve finds the pattern owning channel. The returned bindings map reflects any {name} placeholder values extracted during match. Returns ErrNoMatch when no pattern matches. When multiple patterns match, the one with the most literal (least wildcard) tokens wins — most-specific match. Ties are broken by pattern string in lexicographic order so resolution is deterministic.
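The most-specific rule can be sketched as a sort over the patterns that already matched. The helper names below (`tokens`, `literalCount`, `mostSpecific`) are hypothetical, and the sketch assumes that "lexicographic order" means the smaller pattern string wins a tie.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// tokens splits a pattern on the two documented delimiters, '.' and ':'.
func tokens(p string) []string {
	return strings.FieldsFunc(p, func(r rune) bool { return r == '.' || r == ':' })
}

// literalCount counts tokens that are neither *, ** nor a {name} placeholder.
func literalCount(p string) int {
	n := 0
	for _, t := range tokens(p) {
		isPlaceholder := strings.HasPrefix(t, "{") && strings.HasSuffix(t, "}")
		if t != "*" && t != "**" && !isPlaceholder {
			n++
		}
	}
	return n
}

// mostSpecific picks the winner among patterns already known to match:
// most literal tokens first, then the lexicographically smaller pattern.
func mostSpecific(matching []string) (string, bool) {
	if len(matching) == 0 {
		return "", false
	}
	sorted := append([]string(nil), matching...) // do not reorder the caller's slice
	sort.Slice(sorted, func(i, j int) bool {
		li, lj := literalCount(sorted[i]), literalCount(sorted[j])
		if li != lj {
			return li > lj
		}
		return sorted[i] < sorted[j]
	})
	return sorted[0], true
}

func main() {
	// All three patterns would match the channel "public:sessions.app.42".
	winner, _ := mostSpecific([]string{
		"public:**",
		"public:sessions.*.{id}",
		"public:sessions.app.{id}",
	})
	fmt.Println(winner) // prints "public:sessions.app.{id}" (3 literal tokens)
}
```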
func (*MemoryRegistry) Snapshot ¶
func (r *MemoryRegistry) Snapshot() Snapshot
Snapshot returns the current patterns as a Snapshot value. Marshaled to JSON, this is the wire form that LoadSnapshot accepts.
func (*MemoryRegistry) Subscribe ¶
func (r *MemoryRegistry) Subscribe() (<-chan Change, func())
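Subscribe returns a channel that receives every Change going forward and a cancel func that unsubscribes. Sends are non-blocking, so a slow consumer drops events once its buffer is full. The channel is buffered with capacity 64; drain it promptly and re-sync from a snapshot if events may have been missed.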
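The drop-on-full behaviour described here is the usual Go idiom of a `select` with a `default` branch. The sketch below shows that idiom in isolation; `trySend` is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// trySend delivers ev without blocking. It reports false when the
// subscriber's buffer is full, in which case the event is dropped.
func trySend(ch chan<- string, ev string) bool {
	select {
	case ch <- ev:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan string, 64) // same capacity as documented above
	dropped := 0
	for i := 0; i < 100; i++ { // nobody is reading, so the buffer fills up
		if !trySend(ch, fmt.Sprintf("change-%d", i)) {
			dropped++
		}
	}
	fmt.Println(len(ch), dropped) // prints "64 36"
}
```

The trade-off is that the registry never stalls on a slow subscriber, at the cost of subscribers having to tolerate gaps.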
func (*MemoryRegistry) Update ¶
func (r *MemoryRegistry) Update(p ChannelPattern) error
Update replaces the entry for p.Pattern with the new definition. The previous version is appended to history. If Version is zero on input, the new version is the previous version + 1.
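The versioning rule can be illustrated with a small stand-alone store. Everything here (`entry`, `store`, `newStore`, `update`) is hypothetical, and returning an error for an unknown pattern is an assumption, since this page does not say how Update treats one.

```go
package main

import "fmt"

// entry is a stand-in for ChannelPattern with only the fields needed here.
type entry struct {
	Pattern string
	Version int
}

// store keeps the current entry per pattern plus a list of prior versions.
type store struct {
	current map[string]entry
	history map[string][]entry
}

func newStore() *store {
	return &store{current: map[string]entry{}, history: map[string][]entry{}}
}

// update replaces the current entry and archives the previous one.
// A zero Version on input means "previous version + 1".
func (s *store) update(e entry) error {
	prev, ok := s.current[e.Pattern]
	if !ok {
		return fmt.Errorf("pattern %q not found", e.Pattern) // assumed behaviour
	}
	if e.Version == 0 {
		e.Version = prev.Version + 1
	}
	s.history[e.Pattern] = append(s.history[e.Pattern], prev)
	s.current[e.Pattern] = e
	return nil
}

func main() {
	s := newStore()
	s.current["sessions:{id}"] = entry{Pattern: "sessions:{id}", Version: 1}
	if err := s.update(entry{Pattern: "sessions:{id}"}); err != nil {
		panic(err)
	}
	fmt.Println(s.current["sessions:{id}"].Version, len(s.history["sessions:{id}"])) // prints "2 1"
}
```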
type Mode ¶
type Mode string
Mode controls how a Validator handles envelopes whose payload does not conform to the registered schema.
const (
// ModeStrict drops failing envelopes and returns an error to the
// caller. The validator's logger receives a WARN line.
ModeStrict Mode = "strict"
// ModeWarn logs a WARN line and lets the envelope through.
ModeWarn Mode = "warn"
// ModeOff disables validation entirely. The hot path performs a
// single nil check.
ModeOff Mode = "off"
)
type Pattern ¶
type Pattern struct {
// contains filtered or unexported fields
}
Pattern is a compiled channel-name pattern. The source grammar mixes literal segments with three kinds of wildcards:
- {name} — matches exactly one segment; the matched value is bound to the placeholder name.
- * — matches exactly one segment, anonymous.
- ** — matches one or more trailing segments greedily. Only valid as the final token. Anonymous.
Segments are split on the channel-name delimiter set: '.' AND ':'. This lets a pattern address both Parsec's canonical wire form ("public:sessions.app.id") and the upgrade-spec's shorthand ("sessions:{id}") with one matcher.
Pattern is the schema registry's matching primitive. It does not depend on the channels grammar — names that fail channels.ParseName can still match a pattern (the schema registry is intentionally more permissive than the channel grammar).
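The grammar above can be sketched as a small segment-by-segment matcher. This is an illustration rather than the package's compiled Pattern: `split` and `match` are hypothetical helpers, and the sketch simplifies by treating '.' and ':' as interchangeable and by ignoring empty segments.

```go
package main

import (
	"fmt"
	"strings"
)

// split breaks a name into segments on the two delimiters, '.' and ':'.
func split(s string) []string {
	return strings.FieldsFunc(s, func(r rune) bool { return r == '.' || r == ':' })
}

// match reports whether channel matches pattern and returns {name} bindings.
func match(pattern, channel string) (map[string]string, bool) {
	pt, ct := split(pattern), split(channel)
	bindings := map[string]string{}
	for i, tok := range pt {
		if tok == "**" {
			// Only valid as the final token; needs at least one segment left.
			if i != len(pt)-1 || len(ct) <= i {
				return nil, false
			}
			return bindings, true
		}
		if i >= len(ct) {
			return nil, false // channel has fewer segments than the pattern
		}
		switch {
		case tok == "*":
			// anonymous single-segment wildcard: nothing to bind
		case len(tok) > 2 && strings.HasPrefix(tok, "{") && strings.HasSuffix(tok, "}"):
			bindings[tok[1:len(tok)-1]] = ct[i]
		case tok != ct[i]:
			return nil, false // literal mismatch
		}
	}
	if len(ct) != len(pt) {
		return nil, false // channel has extra trailing segments
	}
	return bindings, true
}

func main() {
	b, ok := match("sessions:{id}", "sessions:42")
	fmt.Println(b["id"], ok) // prints "42 true"
	_, ok = match("public:**", "public")
	fmt.Println(ok) // prints "false": ** needs one or more segments
}
```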
type PublishFunc ¶ added in v0.3.0
PublishFunc is the broker-side primitive the Publisher calls for every Change. Mirrors the shape of parsec.Parsec.Publish minus the PublishResult: the broadcaster does not care about delivery metadata.
type Publisher ¶ added in v0.3.0
type Publisher struct {
// contains filtered or unexported fields
}
Publisher bridges a schema Registry to a parsec channel. Each registry Change (Register / Update / Deregister) is JSON-marshaled and published on the configured channel so subscribers can hot-reload without polling the HTTP snapshot endpoint.
The wire shape of each publication is exactly schema.Change as emitted by the registry; future revisions may wrap it in an envelope, at which point the format_version of the descriptor will be bumped.
func NewPublisher ¶ added in v0.3.0
func NewPublisher(reg Registry, publish PublishFunc, opts PublisherOptions) *Publisher
NewPublisher constructs a Publisher. The Registry and PublishFunc are required; nil for either is a programmer error and panics so the misuse surfaces at boot, not in production.
func (*Publisher) Run ¶ added in v0.3.0
Run subscribes to the Registry and republishes every Change until ctx is done. Returns nil on graceful shutdown (ctx.Done received or the underlying subscriber channel closed); returns the EnsureChannel error when the pre-flight hook fails.
Marshal / publish errors are logged at WARN and the loop continues: schema broadcasts are best-effort, and a transient broker failure must not block the registry. Subscribers re-sync via the HTTP snapshot endpoint when they detect a gap.
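The best-effort loop described above might look roughly like the following. The real PublishFunc signature is not shown on this page, so `publishFunc` here is a guessed stand-in, as are `change`, `run`, and `demo`; only the control flow (log and continue on failure, return nil on shutdown or channel close) is taken from the text.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
)

// change mirrors two of the documented Change fields (local stand-in).
type change struct {
	Kind    string `json:"kind"`
	Pattern string `json:"pattern"`
}

// publishFunc is a hypothetical stand-in for PublishFunc.
type publishFunc func(ctx context.Context, channel string, payload []byte) error

// run forwards changes until ctx is done or the changes channel closes.
func run(ctx context.Context, changes <-chan change, channel string, publish publishFunc) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case c, ok := <-changes:
			if !ok {
				return nil // subscriber channel closed: graceful shutdown
			}
			payload, err := json.Marshal(c)
			if err != nil {
				slog.Warn("schema change marshal failed", "err", err)
				continue
			}
			if err := publish(ctx, channel, payload); err != nil {
				// Best-effort: log and keep going.
				slog.Warn("schema change publish failed", "err", err)
			}
		}
	}
}

// demo feeds two changes through run with a publisher that fails once.
func demo() (attempts int, err error) {
	changes := make(chan change, 2)
	changes <- change{Kind: "registered", Pattern: "sessions:{id}"}
	changes <- change{Kind: "updated", Pattern: "sessions:{id}"}
	close(changes)
	err = run(context.Background(), changes, "public:parsec.registry.schemas",
		func(_ context.Context, _ string, _ []byte) error {
			attempts++
			if attempts == 1 {
				return errors.New("broker unavailable")
			}
			return nil
		})
	return attempts, err
}

func main() {
	fmt.Println(demo()) // prints "2 <nil>": the first failure did not stop the loop
}
```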
type PublisherOptions ¶ added in v0.3.0
type PublisherOptions struct {
// Channel is the parsec channel name onto which Change events are
// pushed. When empty, ChangeChannel is used.
Channel string
// Logger receives WARN entries for marshal / publish failures.
// nil uses slog.Default.
Logger *slog.Logger
// EnsureChannel, when non-nil, is invoked once at Run start with
// the resolved channel name. Use this to OpenPublic the channel
// on the broker before publication. Returning an error aborts
// Run before the subscribe loop starts.
EnsureChannel func(channel string) error
}
PublisherOptions configures the broadcast loop.
type Registry ¶
type Registry interface {
Register(p ChannelPattern) error
Update(p ChannelPattern) error
Deregister(pattern string) error
Resolve(channel string) (ChannelPattern, map[string]string, error)
Get(pattern string) (ChannelPattern, error)
List() []ChannelPattern
Subscribe() (<-chan Change, func())
}
Registry is the schema store interface. Implementations must be safe for concurrent use; MemoryRegistry is the default in-memory implementation.
The interface is intentionally minimal — third-party adapters (a SQL store, a config-file loader) can satisfy it without depending on internal Parsec types.
type Snapshot ¶
type Snapshot struct {
Patterns []ChannelPattern `json:"patterns"`
At time.Time `json:"at"`
}
Snapshot is the JSON shape returned by the HTTP endpoint and the shape pushed on registry-change envelopes.
type Validator ¶
Validator pairs a Registry with a Mode. Client libraries embed one in every Subscription so each received envelope is checked according to the operator's chosen policy.
func (*Validator) Check ¶
Check runs the validation policy against env. In ModeOff the call is a no-op. In ModeWarn validation errors are logged but the function returns nil. In ModeStrict a failure returns wrapped ErrPayloadInvalid.
Envelopes whose channel has no registered pattern are accepted in all modes — the registry is intentionally an opt-in surface. Operators who want closed-world enforcement should wrap Check with a "no pattern = reject" rule outside this package.
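The mode policy can be summarised in a few lines of control flow. The sketch below is an assumption-laden illustration: the Validator type and the Check signature are not shown on this page, so `check` takes a hypothetical `validate` callback that reports whether a pattern was found and whether the payload conformed. `alwaysBad`, `unregistered`, and `isInvalid` exist only to exercise it.

```go
package main

import (
	"errors"
	"fmt"
	"log/slog"
)

type mode string

const (
	modeStrict mode = "strict"
	modeWarn   mode = "warn"
	modeOff    mode = "off"
)

// errPayloadInvalid plays the role of ErrPayloadInvalid in this sketch.
var errPayloadInvalid = errors.New("schema: payload does not conform")

var errBadPayload = errors.New(`missing required property "id"`)

// check applies the mode policy. validate is a hypothetical callback that
// returns found=false when no pattern is registered for the channel.
func check(m mode, channel string, payload []byte,
	validate func(channel string, payload []byte) (found bool, err error)) error {
	if m == modeOff {
		return nil // validation disabled
	}
	found, err := validate(channel, payload)
	if !found || err == nil {
		return nil // unregistered channels are accepted in every mode
	}
	slog.Warn("payload failed schema validation", "channel", channel, "err", err)
	if m == modeStrict {
		return fmt.Errorf("%w: %v", errPayloadInvalid, err)
	}
	return nil // warn mode: logged, envelope passes
}

// alwaysBad simulates a registered channel whose payload never conforms.
func alwaysBad(string, []byte) (bool, error) { return true, errBadPayload }

// unregistered simulates a channel with no owning pattern.
func unregistered(string, []byte) (bool, error) { return false, nil }

// isInvalid reports whether err wraps errPayloadInvalid.
func isInvalid(err error) bool { return errors.Is(err, errPayloadInvalid) }

func main() {
	fmt.Println(isInvalid(check(modeStrict, "sessions:42", nil, alwaysBad))) // prints "true"
	fmt.Println(check(modeWarn, "sessions:42", nil, alwaysBad))              // prints "<nil>"
	fmt.Println(check(modeStrict, "unknown:1", nil, unregistered))           // prints "<nil>"
}
```

A closed-world wrapper, as suggested above, would add a rejection when no pattern is found instead of returning nil.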