schema

package
v0.3.1 Latest
Published: Jun 16, 2026 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package schema is the Parsec schema registry. Channel patterns are registered at application startup; the registry resolves concrete channel names back to the pattern that owns them and exposes the payload schemas (per aspect) that envelopes published on those channels must conform to.

The package ships two things:

  • A pure-Go Registry library used in-process by publishers and subscribers.
  • An HTTP handler at /parsec/schemas (mounted by internal/server) that serves a JSON snapshot of the registry so out-of-process clients can cache schemas locally on startup.

Changes to the registry (Register / Update / Deregister) are emitted on a parsec channel — "public:parsec.registry.schemas" by default — so subscribers can hot-reload without polling the HTTP endpoint.

Constants

const ChangeChannel = "public:parsec.registry.schemas"

ChangeChannel is the default broadcast channel for registry changes. Apps may override it by setting PublisherOptions.Channel when constructing the Publisher; the wire shape stays the same.

Variables

var (
	ErrPatternConflict   = errors.New("schema: pattern conflicts with existing registration")
	ErrPatternNotFound   = errors.New("schema: pattern not found")
	ErrNoMatch           = errors.New("schema: no pattern matches channel")
	ErrAspectNotFound    = errors.New("schema: aspect not registered for pattern")
	ErrUnsupportedAspect = errors.New("schema: unsupported aspect")
)

Sentinel errors.

var ErrPayloadInvalid = errors.New("schema: payload does not conform")

ErrPayloadInvalid is returned by Check in ModeStrict when the payload fails schema validation.

Functions

func Handler

func Handler(r Registry) http.Handler

Handler returns an http.Handler that serves registry snapshots and individual pattern lookups.

Routes:

GET  <prefix>            — full Snapshot (all current patterns)
GET  <prefix>?channel=X  — resolve channel X to its owning pattern
GET  <prefix>?pattern=P  — fetch the named pattern (and full history
                           when &history=true)

The handler does NOT enforce auth; mount it behind the bearer middleware when restricting access (the upgrade spec calls /parsec/schemas a public discovery surface, so default deployments expose it unguarded).
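As an illustration of the three routes, the sketch below builds the matching request URLs with net/url. The host and port in the base URL and the helper name schemaURL are assumptions made for the example; only the /parsec/schemas path and the channel, pattern, and history parameters come from the route list above.

```go
package main

import (
	"fmt"
	"net/url"
)

// schemaURL appends query parameters to base and returns the encoded URL.
// It is a hypothetical helper, not part of the schema package.
func schemaURL(base string, params map[string]string) (string, error) {
	u, err := url.Parse(base)
	if err != nil {
		return "", err
	}
	q := u.Query()
	for k, v := range params {
		q.Set(k, v)
	}
	// Encode sorts keys and percent-escapes ':', '{' and '}'.
	u.RawQuery = q.Encode()
	return u.String(), nil
}

func main() {
	base := "http://localhost:8080/parsec/schemas" // assumed host and port
	requests := []map[string]string{
		nil, // full snapshot
		{"channel": "public:sessions.app.id"},
		{"pattern": "sessions:{id}", "history": "true"},
	}
	for _, params := range requests {
		u, err := schemaURL(base, params)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println("GET", u)
	}
}
```

Channel names and patterns contain ':' and braces, so letting url.Values do the escaping is safer than concatenating the query string by hand.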

func ResolveAspect

func ResolveAspect(r Registry, channel, aspect string) (*JSONSchema, ChannelPattern, map[string]string, error)

ResolveAspect is a convenience wrapper: it resolves channel to its owning pattern and looks up the named aspect's payload schema. It returns ErrAspectNotFound if the aspect is not declared on the matched pattern. The returned map holds the values bound to any {name} placeholders.

Types

type ACLPolicy

type ACLPolicy struct {
	PublishRoles   []string `json:"publish_roles,omitempty"`
	SubscribeRoles []string `json:"subscribe_roles,omitempty"`
	Notes          string   `json:"notes,omitempty"`
}

ACLPolicy is metadata describing which roles or scopes may subscribe / publish to channels matching the pattern. The schema registry does NOT enforce ACL — enforcement lives in the auth package + token broker. This struct is documentation that travels with the pattern so client libraries and the admin UI can render it.

type Aspect

type Aspect struct {
	Name          string      `json:"name"`
	Description   string      `json:"description,omitempty"`
	PayloadSchema *JSONSchema `json:"payload_schema,omitempty"`
	Required      bool        `json:"required,omitempty"`
}

Aspect is one named payload contract within a channel pattern.

type Change

type Change struct {
	Kind    ChangeKind     `json:"kind"`
	Pattern string         `json:"pattern"`
	Schema  ChannelPattern `json:"schema,omitzero"`
	At      time.Time      `json:"at"`
}

Change is one registry update broadcast to subscribers.

type ChangeKind

type ChangeKind string

ChangeKind discriminates registry change events.

const (
	ChangeRegistered   ChangeKind = "registered"
	ChangeUpdated      ChangeKind = "updated"
	ChangeDeregistered ChangeKind = "deregistered"
)
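A subscriber-side sketch of consuming one broadcast Change follows. It uses a local stand-in struct that mirrors the documented JSON field names, and the "store" / "evict" actions are illustrative assumptions about what a client cache might do, not behavior defined by this package.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// change mirrors the documented Change fields. Schema stays raw because
// this sketch does not inspect the pattern definition.
type change struct {
	Kind    string          `json:"kind"`
	Pattern string          `json:"pattern"`
	Schema  json.RawMessage `json:"schema"`
	At      time.Time       `json:"at"`
}

// describe decodes one broadcast payload and reports the cache action a
// subscriber might take. The action names are hypothetical.
func describe(raw []byte) (string, error) {
	var c change
	if err := json.Unmarshal(raw, &c); err != nil {
		return "", err
	}
	switch c.Kind {
	case "registered", "updated":
		return "store " + c.Pattern, nil
	case "deregistered":
		return "evict " + c.Pattern, nil
	default:
		return "", fmt.Errorf("unknown change kind %q", c.Kind)
	}
}

func main() {
	raw := []byte(`{"kind":"deregistered","pattern":"sessions:{id}","at":"2026-06-16T00:00:00Z"}`)
	action, err := describe(raw)
	fmt.Println(action, err)
}
```

Rejecting unknown kinds explicitly lets a client notice a newer wire format instead of silently ignoring events.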

type ChannelPattern

type ChannelPattern struct {
	Pattern     string            `json:"pattern"`
	Description string            `json:"description,omitempty"`
	Version     int               `json:"version"`
	Aspects     map[string]Aspect `json:"aspects"`
	ACL         ACLPolicy         `json:"acl,omitzero"`
	// contains filtered or unexported fields
}

ChannelPattern declares the schema contract for a family of channels. The Pattern is matched against incoming channel names; on a match, the per-aspect PayloadSchema applies.

type JSONSchema

type JSONSchema struct {
	Type                 string                 `json:"type,omitempty"`
	Required             []string               `json:"required,omitempty"`
	Properties           map[string]*JSONSchema `json:"properties,omitempty"`
	Items                *JSONSchema            `json:"items,omitempty"`
	AdditionalProperties *bool                  `json:"additionalProperties,omitempty"`
	Enum                 []any                  `json:"enum,omitempty"`
	Description          string                 `json:"description,omitempty"`
}

JSONSchema is a lightweight subset of JSON Schema (draft 2020-12) sufficient for envelope payload validation in Parsec. It supports:

  • type: string | number | integer | boolean | object | array | null
  • required: list of object property names
  • properties: per-property nested JSONSchema
  • items: schema for array elements
  • additionalProperties: whether unknown object fields are allowed
  • enum: list of allowed literal values

Anything richer (oneOf, allOf, $ref, regex patterns) is intentionally out of scope — Parsec subscribers should treat unknown fields as opaque and decode strictly via Go/TS code generation when stronger guarantees are required. A full JSON Schema validator can be substituted later without changing the Registry API.

func (*JSONSchema) Validate

func (s *JSONSchema) Validate(v any) error

Validate reports whether v conforms to s. The error is nil on success and a human-readable explanation otherwise. v is typically the result of json.Unmarshal on the envelope's payload.
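To make the supported subset concrete, here is a standalone sketch of how such a validator could walk a decoded payload. It is not the package's implementation: the validate, validateBytes, and sessionSchema names are hypothetical, the struct repeats only four of the documented fields, and only the string, number, and object types are handled.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// JSONSchema repeats only the documented fields this sketch uses.
type JSONSchema struct {
	Type                 string                 `json:"type,omitempty"`
	Required             []string               `json:"required,omitempty"`
	Properties           map[string]*JSONSchema `json:"properties,omitempty"`
	AdditionalProperties *bool                  `json:"additionalProperties,omitempty"`
}

// validate is a hypothetical subset validator, not the package's Validate.
func validate(s *JSONSchema, v any) error {
	if s == nil {
		return nil
	}
	switch s.Type {
	case "string":
		if _, ok := v.(string); !ok {
			return fmt.Errorf("want string, got %T", v)
		}
	case "number":
		// json.Unmarshal decodes every JSON number into float64.
		if _, ok := v.(float64); !ok {
			return fmt.Errorf("want number, got %T", v)
		}
	case "object":
		obj, ok := v.(map[string]any)
		if !ok {
			return fmt.Errorf("want object, got %T", v)
		}
		for _, name := range s.Required {
			if _, present := obj[name]; !present {
				return fmt.Errorf("missing required property %q", name)
			}
		}
		for name, val := range obj {
			sub, known := s.Properties[name]
			if !known {
				// Unknown fields pass unless additionalProperties is false.
				if s.AdditionalProperties != nil && !*s.AdditionalProperties {
					return fmt.Errorf("unexpected property %q", name)
				}
				continue
			}
			if err := validate(sub, val); err != nil {
				return fmt.Errorf("%s: %w", name, err)
			}
		}
	}
	return nil
}

// validateBytes unmarshals raw and runs validate, like ValidateBytes.
func validateBytes(s *JSONSchema, raw []byte) error {
	var v any
	if err := json.Unmarshal(raw, &v); err != nil {
		return err
	}
	return validate(s, v)
}

// sessionSchema is an example schema: a closed object with a string "id".
func sessionSchema() *JSONSchema {
	closed := false
	return &JSONSchema{
		Type:                 "object",
		Required:             []string{"id"},
		Properties:           map[string]*JSONSchema{"id": {Type: "string"}},
		AdditionalProperties: &closed,
	}
}

func main() {
	s := sessionSchema()
	fmt.Println(validateBytes(s, []byte(`{"id":"abc"}`)))
	fmt.Println(validateBytes(s, []byte(`{"id":42}`)))
	fmt.Println(validateBytes(s, []byte(`{"id":"abc","extra":true}`)))
}
```

AdditionalProperties is a *bool so that an absent keyword (nil, unknown fields allowed in this sketch) can be told apart from an explicit false.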

func (*JSONSchema) ValidateBytes

func (s *JSONSchema) ValidateBytes(raw []byte) error

ValidateBytes is a convenience that unmarshals raw and runs Validate.

type MemoryRegistry

type MemoryRegistry struct {
	// contains filtered or unexported fields
}

MemoryRegistry is the default in-process Registry. It holds patterns in a map keyed by pattern source string and a per-pattern history of prior versions for schema-evolution support.

func NewMemoryRegistry

func NewMemoryRegistry() *MemoryRegistry

NewMemoryRegistry constructs an empty registry.

func (*MemoryRegistry) Deregister

func (r *MemoryRegistry) Deregister(pattern string) error

Deregister removes pattern. Returns ErrPatternNotFound if it does not exist. Version history is dropped.

func (*MemoryRegistry) Get

func (r *MemoryRegistry) Get(pattern string) (ChannelPattern, error)

Get returns the current registration for pattern.

func (*MemoryRegistry) History

func (r *MemoryRegistry) History(pattern string) ([]ChannelPattern, error)

History returns every registered version of pattern, oldest first. Useful for resolving an explicit schema_ref carrying a version suffix.

func (*MemoryRegistry) List

func (r *MemoryRegistry) List() []ChannelPattern

List returns every current pattern in lexicographic order.

func (*MemoryRegistry) LoadSnapshot

func (r *MemoryRegistry) LoadSnapshot(b []byte) error

LoadSnapshot bulk-loads patterns from b (the wire form Snapshot produces). Existing patterns are replaced. Useful for warm-restart scenarios where an on-disk snapshot precedes any in-process registrations.

func (*MemoryRegistry) Register

func (r *MemoryRegistry) Register(p ChannelPattern) error

Register adds p. The pattern must compile; duplicate patterns return ErrPatternConflict. The first registration is version 1 unless the caller set Version explicitly.

func (*MemoryRegistry) Resolve

func (r *MemoryRegistry) Resolve(channel string) (ChannelPattern, map[string]string, error)

Resolve finds the pattern owning channel. The returned bindings map reflects any {name} placeholder values extracted during match. Returns ErrNoMatch when no pattern matches. When multiple patterns match, the one with the most literal (least wildcard) tokens wins — most-specific match. Ties are broken by pattern string in lexicographic order so resolution is deterministic.
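The ordering rule can be sketched as a sort over the patterns that already matched a channel. The helper names literalCount and pickMostSpecific are hypothetical, and the sketch assumes that the lexicographically smallest pattern wins a tie, which the description above implies but does not state outright.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// literalCount counts tokens that are neither {name}, * nor **.
func literalCount(pattern string) int {
	tokens := strings.FieldsFunc(pattern, func(r rune) bool {
		return r == '.' || r == ':'
	})
	n := 0
	for _, tok := range tokens {
		placeholder := strings.HasPrefix(tok, "{") && strings.HasSuffix(tok, "}")
		if tok == "*" || tok == "**" || placeholder {
			continue
		}
		n++
	}
	return n
}

// pickMostSpecific chooses among patterns that all matched one channel:
// most literal tokens first, then lexicographic order (assumed ascending).
// It returns "" when matched is empty.
func pickMostSpecific(matched []string) string {
	if len(matched) == 0 {
		return ""
	}
	sorted := append([]string(nil), matched...)
	sort.Slice(sorted, func(i, j int) bool {
		li, lj := literalCount(sorted[i]), literalCount(sorted[j])
		if li != lj {
			return li > lj
		}
		return sorted[i] < sorted[j]
	})
	return sorted[0]
}

func main() {
	// All three patterns match "public:sessions.app.id".
	matched := []string{
		"public:sessions.**",
		"public:sessions.{app}.id",
		"public:sessions.app.*",
	}
	fmt.Println(pickMostSpecific(matched))
}
```

Here the last two candidates both have three literal tokens, so the tie-break decides; 'a' sorts before '{', which selects "public:sessions.app.*".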

func (*MemoryRegistry) Snapshot

func (r *MemoryRegistry) Snapshot() Snapshot

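Snapshot returns a point-in-time Snapshot of the current patterns. Marshaling the result yields the deterministic JSON wire form that LoadSnapshot accepts.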

func (*MemoryRegistry) Subscribe

func (r *MemoryRegistry) Subscribe() (<-chan Change, func())

Subscribe returns a channel that receives every Change going forward and a cancel func that unsubscribes. Sends are non-blocking, so a slow consumer drops events once its buffer is full. The channel buffers 64 events; drain it promptly, and re-sync from a snapshot if a gap is suspected.
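The drop-on-full behavior corresponds to Go's select-with-default idiom. The sketch below is a standalone illustration with hypothetical names (change, trySend, floodAndCount); it shows what happens when 100 events are sent to a 64-slot buffer that nobody drains.

```go
package main

import "fmt"

// change is a stand-in for the package's Change type.
type change struct{ Pattern string }

// trySend delivers c without blocking and reports false when the
// buffer is full, in which case the event is dropped.
func trySend(ch chan<- change, c change) bool {
	select {
	case ch <- c:
		return true
	default:
		return false
	}
}

// floodAndCount sends n events into a buffer of the given capacity with
// no consumer attached and returns how many were dropped.
func floodAndCount(capacity, n int) int {
	ch := make(chan change, capacity)
	dropped := 0
	for i := 0; i < n; i++ {
		if !trySend(ch, change{Pattern: fmt.Sprintf("p%d", i)}) {
			dropped++
		}
	}
	return dropped
}

func main() {
	fmt.Println(floodAndCount(64, 100)) // prints 36
}
```

The trade-off is that the registry never stalls on a stuck subscriber, at the cost of subscribers having to tolerate gaps.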

func (*MemoryRegistry) Update

func (r *MemoryRegistry) Update(p ChannelPattern) error

Update replaces the entry for p.Pattern with the new definition. The previous version is appended to history. If Version is zero on input, the new version is the previous version + 1.
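The version arithmetic for Register and Update can be sketched with a toy registry. Everything here is a stand-in: miniRegistry and entry are hypothetical types, the sketch is not safe for concurrent use, and returning a not-found error from update for an unknown pattern is an assumption rather than documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errConflict = errors.New("schema: pattern conflicts with existing registration")
	errNotFound = errors.New("schema: pattern not found")
)

// entry keeps only the two ChannelPattern fields the sketch needs.
type entry struct {
	Pattern string
	Version int
}

type miniRegistry struct {
	current map[string]entry
	history map[string][]entry // prior versions, oldest first
}

func newMiniRegistry() *miniRegistry {
	return &miniRegistry{current: map[string]entry{}, history: map[string][]entry{}}
}

// register adds e; a zero Version becomes 1.
func (r *miniRegistry) register(e entry) error {
	if _, exists := r.current[e.Pattern]; exists {
		return errConflict
	}
	if e.Version == 0 {
		e.Version = 1
	}
	r.current[e.Pattern] = e
	return nil
}

// update replaces the entry and archives the previous version; a zero
// Version becomes previous + 1.
func (r *miniRegistry) update(e entry) error {
	prev, ok := r.current[e.Pattern]
	if !ok {
		return errNotFound // assumed behavior for unknown patterns
	}
	r.history[e.Pattern] = append(r.history[e.Pattern], prev)
	if e.Version == 0 {
		e.Version = prev.Version + 1
	}
	r.current[e.Pattern] = e
	return nil
}

// versionAfter registers one pattern, applies n updates with Version left
// at zero, and returns the current version and the history length.
func versionAfter(n int) (int, int) {
	r := newMiniRegistry()
	_ = r.register(entry{Pattern: "sessions:{id}"})
	for i := 0; i < n; i++ {
		_ = r.update(entry{Pattern: "sessions:{id}"})
	}
	return r.current["sessions:{id}"].Version, len(r.history["sessions:{id}"])
}

func main() {
	v, h := versionAfter(2)
	fmt.Println(v, h) // prints 3 2
}
```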

type Mode

type Mode string

Mode controls how a Validator handles envelopes whose payload does not conform to the registered schema.

const (
	// ModeStrict drops failing envelopes and returns an error to the
	// caller. The validator's logger receives a WARN line.
	ModeStrict Mode = "strict"
	// ModeWarn logs a WARN line and lets the envelope through.
	ModeWarn Mode = "warn"
	// ModeOff disables validation entirely. The hot path performs a
	// single nil check.
	ModeOff Mode = "off"
)
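The three modes amount to a small decision table over the validation result. The sketch below restates that table as a standalone function; applyMode and errSample are hypothetical names, the Mode constants and sentinel are redeclared locally, and treating an unrecognized mode like ModeWarn is an assumption of the sketch. In the real ModeOff path validation would not run at all.

```go
package main

import (
	"errors"
	"fmt"
	"log/slog"
)

type Mode string

const (
	ModeStrict Mode = "strict"
	ModeWarn   Mode = "warn"
	ModeOff    Mode = "off"
)

// ErrPayloadInvalid is redeclared locally for the sketch.
var ErrPayloadInvalid = errors.New("schema: payload does not conform")

// errSample stands in for a validation failure.
var errSample = errors.New(`missing required property "id"`)

// applyMode maps a validation result to the outcome described for each
// mode: off ignores it, warn logs it, strict logs and returns it wrapped.
func applyMode(m Mode, validationErr error) error {
	if m == ModeOff || validationErr == nil {
		return nil
	}
	slog.Warn("payload failed schema validation", "mode", string(m), "err", validationErr)
	if m == ModeStrict {
		return fmt.Errorf("%w: %v", ErrPayloadInvalid, validationErr)
	}
	return nil
}

func main() {
	for _, m := range []Mode{ModeOff, ModeWarn, ModeStrict} {
		err := applyMode(m, errSample)
		fmt.Printf("%s: err=%v invalid=%t\n", m, err, errors.Is(err, ErrPayloadInvalid))
	}
}
```

Wrapping with %w keeps the sentinel reachable through errors.Is, so callers can branch on ErrPayloadInvalid without parsing the message.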

type Pattern

type Pattern struct {
	// contains filtered or unexported fields
}

Pattern is a compiled channel-name pattern. The source grammar mixes literal segments with three wildcard tokens:

  • {name} — matches exactly one segment; the matched value is bound to the placeholder name.
  • * — matches exactly one segment, anonymous.
  • ** — matches one or more trailing segments greedily. Only valid as the final token. Anonymous.

Segments are split on the channel-name delimiter set: '.' and ':'. This lets one matcher address both Parsec's canonical wire form ("public:sessions.app.id") and the upgrade spec's shorthand ("sessions:{id}").

Pattern is the schema registry's matching primitive. It does not depend on the channels grammar — names that fail channels.ParseName can still match a pattern (the schema registry is intentionally more permissive than the channel grammar).
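The grammar above can be illustrated with a small standalone matcher. This is a sketch under stated assumptions, not the package's ParsePattern or Match: splitSegments and matchPattern are hypothetical names, patterns are not validated up front, and empty segments (as in "a..b") are silently dropped by strings.FieldsFunc.

```go
package main

import (
	"fmt"
	"strings"
)

// splitSegments splits a channel name or pattern on '.' and ':'.
func splitSegments(s string) []string {
	return strings.FieldsFunc(s, func(r rune) bool { return r == '.' || r == ':' })
}

// matchPattern is a hypothetical stand-in for Pattern.Match.
func matchPattern(pattern, channel string) (map[string]string, bool) {
	pt, ch := splitSegments(pattern), splitSegments(channel)
	bindings := map[string]string{}
	for i, tok := range pt {
		if tok == "**" {
			// Must be the final token and consume at least one segment.
			if i != len(pt)-1 || len(ch) <= i {
				return nil, false
			}
			return bindings, true
		}
		if i >= len(ch) {
			return nil, false
		}
		switch {
		case tok == "*":
			// Anonymous single-segment wildcard: nothing to bind.
		case len(tok) > 2 && strings.HasPrefix(tok, "{") && strings.HasSuffix(tok, "}"):
			bindings[tok[1:len(tok)-1]] = ch[i]
		case tok != ch[i]:
			return nil, false
		}
	}
	// Without a trailing **, segment counts must agree exactly.
	if len(ch) != len(pt) {
		return nil, false
	}
	return bindings, true
}

func main() {
	fmt.Println(matchPattern("sessions:{id}", "sessions:42"))
	fmt.Println(matchPattern("public:sessions.**", "public:sessions.app.id"))
	fmt.Println(matchPattern("public:*.id", "public:sessions.app.id"))
}
```

The last call fails because * consumes exactly one segment, so a three-token pattern cannot match a four-segment channel.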

func ParsePattern

func ParsePattern(src string) (Pattern, error)

ParsePattern compiles src.

func (Pattern) Match

func (p Pattern) Match(channel string) (map[string]string, bool)

Match tests channel against p. On a match, the returned map binds any {name} placeholders to the segments they consumed. Returns (nil, false) when the pattern does not match.

func (Pattern) Raw

func (p Pattern) Raw() string

Raw returns the source string the pattern was compiled from.

type PublishFunc added in v0.3.0

type PublishFunc func(ctx context.Context, channel string, data []byte) error

PublishFunc is the broker-side primitive the Publisher calls for every Change. Mirrors the shape of parsec.Parsec.Publish minus the PublishResult: the broadcaster does not care about delivery metadata.

type Publisher added in v0.3.0

type Publisher struct {
	// contains filtered or unexported fields
}

Publisher bridges a schema Registry to a parsec channel. Each registry Change (Register / Update / Deregister) is JSON-marshaled and published on the configured channel so subscribers can hot-reload without polling the HTTP snapshot endpoint.

The wire shape of each publication is exactly schema.Change as emitted by the registry; future revisions may wrap it in an envelope, at which point the format_version of the descriptor will be bumped.

func NewPublisher added in v0.3.0

func NewPublisher(reg Registry, publish PublishFunc, opts PublisherOptions) *Publisher

NewPublisher constructs a Publisher. The Registry and PublishFunc are required; nil for either is a programmer error and panics so the misuse surfaces at boot, not in production.

func (*Publisher) Channel added in v0.3.0

func (p *Publisher) Channel() string

Channel returns the resolved broadcast channel name.

func (*Publisher) Run added in v0.3.0

func (p *Publisher) Run(ctx context.Context) error

Run subscribes to the Registry and republishes every Change until ctx is done. Returns nil on graceful shutdown (ctx.Done received or the underlying subscriber channel closed); returns the EnsureChannel error when the pre-flight hook fails.

Marshal / publish errors are logged at WARN and the loop continues: schema broadcasts are best-effort, and a transient broker failure must not block the registry. Subscribers re-sync via the HTTP snapshot endpoint when they detect a gap.
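The best-effort loop described above can be sketched as follows. The relay function, the reduced change struct, and the deliveredDespiteFailure demo are hypothetical stand-ins written for this example; the EnsureChannel pre-flight step is omitted.

```go
package main

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
)

// change is a reduced stand-in for the package's Change type.
type change struct {
	Kind    string `json:"kind"`
	Pattern string `json:"pattern"`
}

// publishFunc has the same shape as the documented PublishFunc.
type publishFunc func(ctx context.Context, channel string, data []byte) error

// relay republishes every change until ctx is done or changes is closed.
// Marshal and publish failures are logged and skipped, never returned.
func relay(ctx context.Context, changes <-chan change, channel string, publish publishFunc) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case c, ok := <-changes:
			if !ok {
				return nil
			}
			data, err := json.Marshal(c)
			if err != nil {
				slog.Warn("schema: marshal change failed", "err", err)
				continue
			}
			if err := publish(ctx, channel, data); err != nil {
				slog.Warn("schema: publish change failed", "err", err)
			}
		}
	}
}

// deliveredDespiteFailure feeds three changes through relay with a publish
// function that fails once, and returns how many were delivered.
func deliveredDespiteFailure() int {
	changes := make(chan change, 3)
	for _, p := range []string{"a", "b", "c"} {
		changes <- change{Kind: "updated", Pattern: p}
	}
	close(changes)

	calls, delivered := 0, 0
	publish := func(ctx context.Context, channel string, data []byte) error {
		calls++
		if calls == 1 {
			return errors.New("broker unavailable")
		}
		delivered++
		return nil
	}
	_ = relay(context.Background(), changes, "public:parsec.registry.schemas", publish)
	return delivered
}

func main() {
	fmt.Println(deliveredDespiteFailure()) // prints 2
}
```

The first change is lost when the simulated broker fails, which is the kind of gap a subscriber would repair by re-fetching the HTTP snapshot.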

type PublisherOptions added in v0.3.0

type PublisherOptions struct {
	// Channel is the parsec channel name onto which Change events are
	// pushed. When empty, ChangeChannel is used.
	Channel string

	// Logger receives WARN entries for marshal / publish failures.
	// nil uses slog.Default.
	Logger *slog.Logger

	// EnsureChannel, when non-nil, is invoked once at Run start with
	// the resolved channel name. Use this to OpenPublic the channel
	// on the broker before publication. Returning an error aborts
	// Run before the subscribe loop starts.
	EnsureChannel func(channel string) error
}

PublisherOptions configures the broadcast loop.

type Registry

type Registry interface {
	Register(p ChannelPattern) error
	Update(p ChannelPattern) error
	Deregister(pattern string) error
	Resolve(channel string) (ChannelPattern, map[string]string, error)
	Get(pattern string) (ChannelPattern, error)
	List() []ChannelPattern
	Subscribe() (<-chan Change, func())
}

Registry is the in-memory schema store. Safe for concurrent use.

The interface is intentionally minimal — third-party adapters (a SQL store, a config-file loader) can satisfy it without depending on internal Parsec types.

type Snapshot

type Snapshot struct {
	Patterns []ChannelPattern `json:"patterns"`
	At       time.Time        `json:"at"`
}

Snapshot is the JSON shape returned by the HTTP endpoint and accepted by LoadSnapshot. Registry-change broadcasts carry individual Change values instead (see Publisher).

type Validator

type Validator struct {
	Registry Registry
	Mode     Mode
	Logger   *slog.Logger
}

Validator pairs a Registry with a Mode. Client libraries embed one in every Subscription so each received envelope is checked according to the operator's chosen policy.

func (*Validator) Check

func (v *Validator) Check(env envelope.Envelope) error

Check runs the validation policy against env. In ModeOff the call is a no-op. In ModeWarn validation errors are logged but the function returns nil. In ModeStrict a failure returns wrapped ErrPayloadInvalid.

Envelopes whose channel has no registered pattern are accepted in all modes — the registry is intentionally an opt-in surface. Operators who want closed-world enforcement should wrap Check with a "no pattern = reject" rule outside this package.
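A closed-world wrapper of the kind suggested above might look like the following sketch. Because the envelope and registry types are not reproduced here, the wrapper takes two hypothetical function values: resolveFunc reduces Registry.Resolve to its error result, and checkFunc stands in for a bound call to Check. The sentinel is redeclared locally.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoMatch is a local copy of the documented ErrNoMatch sentinel.
var errNoMatch = errors.New("schema: no pattern matches channel")

// resolveFunc is a hypothetical reduction of Registry.Resolve.
type resolveFunc func(channel string) error

// checkFunc is a hypothetical stand-in for a call to (*Validator).Check.
type checkFunc func() error

// closedWorld rejects channels with no registered pattern and otherwise
// defers to the normal validation policy.
func closedWorld(channel string, resolve resolveFunc, check checkFunc) error {
	if err := resolve(channel); err != nil {
		if errors.Is(err, errNoMatch) {
			return fmt.Errorf("rejecting %q: %w", channel, err)
		}
		return err
	}
	return check()
}

// knownChannels builds a demo resolver over a fixed set of channel names.
func knownChannels(names ...string) resolveFunc {
	set := map[string]bool{}
	for _, n := range names {
		set[n] = true
	}
	return func(channel string) error {
		if set[channel] {
			return nil
		}
		return errNoMatch
	}
}

func main() {
	resolve := knownChannels("sessions:42")
	accept := func() error { return nil }
	fmt.Println(closedWorld("sessions:42", resolve, accept))
	fmt.Println(closedWorld("unknown:1", resolve, accept))
}
```

Keeping this rule outside the package matches the opt-in design: deployments that want open-world behavior simply call Check directly.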
