codec

package
v0.0.0-...-21bda59 Latest
Note: this package is not in the latest version of its module.
Published: Feb 22, 2026 License: MIT Imports: 11 Imported by: 0

Documentation

Overview

Package codec provides a pluggable serialization system for event sourcing.

The codec system supports multiple serialization formats (JSON, Protobuf, etc.) through a common interface, with a thread-safe registry for named codec lookup.

Transport encoding adds event metadata as headers (es- prefix convention), enabling stores and message buses to route and decode events without deserializing the payload.
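
The routing idea above can be sketched as follows. This is a minimal illustration, not the package's own code: `routeKey` is a hypothetical helper, the topic format is an assumption, and `EncodedEvent` and the header names are copied locally from this page so the example compiles on its own.

```go
package main

import "fmt"

// EncodedEvent mirrors the struct documented on this page.
type EncodedEvent struct {
	Headers map[string]string
	Data    []byte
}

// Header names copied from the package constants.
const (
	HeaderEventType = "es-event-type"
	HeaderStream    = "es-stream"
	HeaderCodec     = "es-codec"
)

// routeKey is a hypothetical helper. It derives a destination key from the
// headers alone and never reads ev.Data.
func routeKey(ev *EncodedEvent) (string, error) {
	eventType, ok := ev.Headers[HeaderEventType]
	if !ok || eventType == "" {
		return "", fmt.Errorf("missing %s header", HeaderEventType)
	}
	stream := ev.Headers[HeaderStream]
	if stream == "" {
		return eventType, nil
	}
	return stream + "/" + eventType, nil
}

func main() {
	ev := &EncodedEvent{
		Headers: map[string]string{
			HeaderEventType: "OrderCreated",
			HeaderStream:    "orders",
			HeaderCodec:     "cbor",
		},
		Data: []byte{0xa1, 0x01, 0x02}, // opaque payload, not inspected here
	}
	key, err := routeKey(ev)
	if err != nil {
		panic(err)
	}
	fmt.Println(key) // orders/OrderCreated
}
```

A message bus could use such a key to pick a topic or partition while leaving the payload bytes untouched.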

Constants

const (
	HeaderEventID        = "es-event-id"
	HeaderEventType      = "es-event-type"
	HeaderTimestamp      = "es-timestamp"
	HeaderVersion        = "es-version"
	HeaderSchemaVersion  = "es-schema-version"
	HeaderStream         = "es-stream"
	HeaderCodec          = "es-codec"
	HeaderCorrelationID  = "es-correlation-id"
	HeaderCausationID    = "es-causation-id"
	HeaderInitiatorKind  = "es-initiator-kind"
	HeaderInitiatorID    = "es-initiator-id"
	HeaderOriginatorKind = "es-originator-kind"
	HeaderOriginatorID   = "es-originator-id"
	HeaderGlobalSeq      = "es-global-seq"
	HeaderMetaPrefix     = "es-meta-"

	// HeaderMapCapacity is the pre-allocation size for header maps.
	HeaderMapCapacity = 16
)

Transport header constants, following the es- prefix convention.

const CodecNameCBOR = "cbor"

CodecNameCBOR is the registered name for the CBOR codec.

const CodecNameJSON = "json"

CodecNameJSON is the registered name for the JSON codec.

const CodecNameJSONIter = "jsoniter"

CodecNameJSONIter is the registered name for the jsoniter codec.

const CodecNameProtobuf = "protobuf"

CodecNameProtobuf is the registered name for the Protocol Buffers codec.

const (
	// MaxRegisteredCodecs prevents unbounded registry growth.
	MaxRegisteredCodecs = 100
)

Bounds (TigerStyle).

Variables

var (
	// ErrCodecNotRegistered is returned when a requested codec is not in the registry.
	ErrCodecNotRegistered = errors.New("codec: codec not registered")

	// ErrCodecAlreadyRegistered is returned when registering a duplicate codec name.
	ErrCodecAlreadyRegistered = errors.New("codec: codec already registered")
)
var DefaultRegistry = func() *Registry {
	r := NewRegistry()
	_ = r.Register(NewJSONCodec())
	_ = r.Register(NewJSONIterCodec())
	_ = r.Register(NewCBORCodec())
	return r
}()

DefaultRegistry is a package-level registry pre-loaded with three codecs: "json" (stdlib), "jsoniter" (recommended), and "cbor" (binary). The Protobuf codec is not registered by default.

Functions

func DecodeEvent

func DecodeEvent[E any](encoded *EncodedEvent, registry *Registry) (eskit.Event[E], error)

DecodeEvent decodes a transport-encoded event back into a typed Event. Uses the es-codec header to select the codec from the registry.

Types

type Codec

type Codec interface {
	// Marshal converts a Go value into a byte slice.
	Marshal(v any) ([]byte, error)

	// Unmarshal converts a byte slice into a Go value.
	Unmarshal(data []byte, v any) error

	// Name returns the unique name of the codec (e.g., "json", "protobuf").
	Name() string
}

Codec defines the standard interface for encoding and decoding data.
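
To show what satisfying this interface involves, here is a minimal sketch of a custom codec built on the standard library's encoding/gob. The `gobCodec` type, its "gob" name, and the `orderCreated` event are hypothetical and not part of this package; the `Codec` interface is copied locally so the example is self-contained.

```go
package main

import (
	"bytes"
	"encoding/gob"
	"fmt"
)

// Codec is a local copy of the interface documented above.
type Codec interface {
	Marshal(v any) ([]byte, error)
	Unmarshal(data []byte, v any) error
	Name() string
}

// gobCodec is a hypothetical codec backed by encoding/gob.
type gobCodec struct{}

func (gobCodec) Marshal(v any) ([]byte, error) {
	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(v); err != nil {
		return nil, fmt.Errorf("gob marshal: %w", err)
	}
	return buf.Bytes(), nil
}

func (gobCodec) Unmarshal(data []byte, v any) error {
	if err := gob.NewDecoder(bytes.NewReader(data)).Decode(v); err != nil {
		return fmt.Errorf("gob unmarshal: %w", err)
	}
	return nil
}

// Name returns the name under which the codec would be registered.
func (gobCodec) Name() string { return "gob" }

// orderCreated is a hypothetical event payload.
type orderCreated struct {
	OrderID string
	Total   int
}

func main() {
	var c Codec = gobCodec{}

	data, err := c.Marshal(orderCreated{OrderID: "o-1", Total: 42})
	if err != nil {
		panic(err)
	}

	var out orderCreated
	if err := c.Unmarshal(data, &out); err != nil {
		panic(err)
	}
	fmt.Println(c.Name(), out.OrderID, out.Total) // gob o-1 42
}
```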

func NewCBORCodec

func NewCBORCodec() Codec

NewCBORCodec creates a CBOR codec using canonical encoding for deterministic output.

CBOR (RFC 8949) is a binary serialization format that produces compact output while supporting schema evolution through field addition/removal. The codec reads "cbor" struct tags and falls back to "json" tags when no "cbor" tag is present.

It is typically 2-4x faster than stdlib JSON and produces 30-50% smaller payloads. Use this when you need binary efficiency without compile-time schema definitions.

func NewJSONCodec

func NewJSONCodec() Codec

NewJSONCodec creates a JSON codec with buffer pooling for reduced allocations.
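
The codec's internals are not shown on this page, so the following is only a sketch of one common way to pool buffers with sync.Pool around encoding/json. The names `bufPool` and `marshalPooled` are hypothetical.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"sync"
)

// bufPool hands out reusable buffers so that each Marshal call does not
// allocate a fresh one.
var bufPool = sync.Pool{
	New: func() any { return new(bytes.Buffer) },
}

// marshalPooled encodes v as JSON using a pooled buffer.
func marshalPooled(v any) ([]byte, error) {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset()
	defer bufPool.Put(buf)

	if err := json.NewEncoder(buf).Encode(v); err != nil {
		return nil, err
	}

	// Encoder.Encode appends a trailing newline; drop it, then copy the bytes
	// so the result does not alias memory that goes back into the pool.
	b := bytes.TrimSuffix(buf.Bytes(), []byte("\n"))
	out := make([]byte, len(b))
	copy(out, b)
	return out, nil
}

func main() {
	out, err := marshalPooled(map[string]int{"a": 1})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(out)) // {"a":1}
}
```

The copy at the end is the important design point: returning `buf.Bytes()` directly would let a later caller overwrite the result once the buffer is reused.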

func NewJSONIterCodec

func NewJSONIterCodec() Codec

NewJSONIterCodec creates a jsoniter-backed JSON codec.

This is the recommended default JSON codec. It produces output identical to encoding/json but deserializes roughly 2.5-4x faster with fewer allocations, depending on payload size.

Benchmark comparison (AMD EPYC, Go 1.24):

Small payload deserialize: jsoniter 436 ns/op vs stdlib 1675 ns/op (3.8x faster)
Large payload deserialize: jsoniter 7012 ns/op vs stdlib 17610 ns/op (2.5x faster)

func NewProtobufCodec

func NewProtobufCodec() Codec

NewProtobufCodec creates a codec for Protocol Buffers serialization. Types must implement proto.Message. To avoid a hard dependency on google.golang.org/protobuf, this codec uses interface assertions instead of importing that package.

type EncodedEvent

type EncodedEvent struct {
	Headers map[string]string
	Data    []byte
}

EncodedEvent is an event encoded for transport with headers and payload.

func EncodeEvent

func EncodeEvent[E any](event eskit.Event[E], codec Codec) (*EncodedEvent, error)

EncodeEvent encodes an event into transport format with headers. The codec name is stored in the es-codec header so the receiver knows how to decode the payload.
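
The round trip between EncodeEvent and DecodeEvent can be sketched as below. Everything here is a local stand-in under stated assumptions: `event[E]` is a hypothetical replacement for eskit.Event[E] (whose fields are not shown on this page), a plain map stands in for the Registry, and `encodeEvent`/`decodeEvent` only imitate the documented behaviour of writing and reading the es-codec header.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// EncodedEvent and Codec are local copies of the documented types.
type EncodedEvent struct {
	Headers map[string]string
	Data    []byte
}

type Codec interface {
	Marshal(v any) ([]byte, error)
	Unmarshal(data []byte, v any) error
	Name() string
}

type jsonCodec struct{}

func (jsonCodec) Marshal(v any) ([]byte, error)      { return json.Marshal(v) }
func (jsonCodec) Unmarshal(data []byte, v any) error { return json.Unmarshal(data, v) }
func (jsonCodec) Name() string                       { return "json" }

// event is a hypothetical stand-in for eskit.Event[E].
type event[E any] struct {
	ID   string
	Type string
	Data E
}

// encodeEvent marshals the payload and records the codec name in a header.
func encodeEvent[E any](ev event[E], c Codec) (*EncodedEvent, error) {
	payload, err := c.Marshal(ev.Data)
	if err != nil {
		return nil, fmt.Errorf("encode payload: %w", err)
	}
	headers := make(map[string]string, 16) // mirrors HeaderMapCapacity
	headers["es-event-id"] = ev.ID
	headers["es-event-type"] = ev.Type
	headers["es-codec"] = c.Name()
	return &EncodedEvent{Headers: headers, Data: payload}, nil
}

// decodeEvent picks the codec named in the es-codec header and decodes with it.
func decodeEvent[E any](enc *EncodedEvent, codecs map[string]Codec) (event[E], error) {
	var ev event[E]
	name := enc.Headers["es-codec"]
	c, ok := codecs[name]
	if !ok {
		return ev, fmt.Errorf("codec %q not registered", name)
	}
	if err := c.Unmarshal(enc.Data, &ev.Data); err != nil {
		return ev, fmt.Errorf("decode payload: %w", err)
	}
	ev.ID = enc.Headers["es-event-id"]
	ev.Type = enc.Headers["es-event-type"]
	return ev, nil
}

// orderCreated is a hypothetical event payload.
type orderCreated struct {
	OrderID string `json:"order_id"`
}

func main() {
	codecs := map[string]Codec{"json": jsonCodec{}}
	in := event[orderCreated]{ID: "e-1", Type: "OrderCreated", Data: orderCreated{OrderID: "o-1"}}

	enc, err := encodeEvent(in, jsonCodec{})
	if err != nil {
		panic(err)
	}
	out, err := decodeEvent[orderCreated](enc, codecs)
	if err != nil {
		panic(err)
	}
	fmt.Println(enc.Headers["es-codec"], out.Data.OrderID) // json o-1
}
```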

type ProtobufSchemaCodec

type ProtobufSchemaCodec[E any] struct{}

ProtobufSchemaCodec is a SchemaCodec for Protocol Buffer messages.

Events must implement at least one of:

  • MarshalVT()/UnmarshalVT() (vtprotobuf — fastest)
  • Marshal()/Unmarshal() (standard proto)

Protobuf's field numbering provides natural schema evolution:

  • Adding fields: old readers ignore unknown field numbers
  • Removing fields: new readers see zero values for missing fields
  • Field numbers are the schema — never reuse a deleted field number
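
The method requirements listed above suggest dispatch by interface assertion. The sketch below shows that pattern under assumptions: the method signatures are guessed from the method names on this page, and `marshalProtoLike`, `fakeVT`, and `fakeStd` are hypothetical. It does not import any protobuf package.

```go
package main

import (
	"errors"
	"fmt"
)

// Assumed method sets, inferred from the names listed above.
type vtMarshaler interface {
	MarshalVT() ([]byte, error)
}

type stdMarshaler interface {
	Marshal() ([]byte, error)
}

var errNotProto = errors.New("value implements neither MarshalVT nor Marshal")

// marshalProtoLike prefers the vtprotobuf fast path and falls back to Marshal.
func marshalProtoLike(v any) ([]byte, error) {
	switch m := v.(type) {
	case vtMarshaler:
		return m.MarshalVT()
	case stdMarshaler:
		return m.Marshal()
	default:
		return nil, fmt.Errorf("%T: %w", v, errNotProto)
	}
}

// fakeVT implements both methods; the VT path should win.
type fakeVT struct{}

func (fakeVT) MarshalVT() ([]byte, error) { return []byte("vt"), nil }
func (fakeVT) Marshal() ([]byte, error)   { return []byte("std"), nil }

// fakeStd implements only the standard method.
type fakeStd struct{}

func (fakeStd) Marshal() ([]byte, error) { return []byte("std"), nil }

func main() {
	a, _ := marshalProtoLike(fakeVT{})
	b, _ := marshalProtoLike(fakeStd{})
	_, err := marshalProtoLike(42)
	fmt.Println(string(a), string(b), err != nil) // vt std true
}
```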

func (*ProtobufSchemaCodec[E]) ContentType

func (c *ProtobufSchemaCodec[E]) ContentType() string

func (*ProtobufSchemaCodec[E]) Marshal

func (c *ProtobufSchemaCodec[E]) Marshal(event E) ([]byte, error)

func (*ProtobufSchemaCodec[E]) Unmarshal

func (c *ProtobufSchemaCodec[E]) Unmarshal(data []byte) (E, error)

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry holds a thread-safe collection of named codecs.

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates a new, empty codec registry.

func (*Registry) Get

func (r *Registry) Get(name string) (Codec, error)

Get retrieves a codec by name. Thread-safe.

func (*Registry) Names

func (r *Registry) Names() []string

Names returns all registered codec names.

func (*Registry) Register

func (r *Registry) Register(c Codec) error

Register adds a codec to the registry. Thread-safe.
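
For readers who want a mental model of the registry, here is a minimal sketch of a mutex-guarded map with the same three methods. It is not the package's implementation: the lowercase names are local stand-ins, `errRegistryFull` is hypothetical, and both the way the size bound is enforced and the sorted order of Names are assumptions.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"sync"
)

type Codec interface {
	Marshal(v any) ([]byte, error)
	Unmarshal(data []byte, v any) error
	Name() string
}

var (
	errNotRegistered     = errors.New("codec: codec not registered")
	errAlreadyRegistered = errors.New("codec: codec already registered")
	errRegistryFull      = errors.New("codec: registry full") // hypothetical
)

const maxRegisteredCodecs = 100 // mirrors MaxRegisteredCodecs

type registry struct {
	mu     sync.RWMutex
	codecs map[string]Codec
}

func newRegistry() *registry {
	return &registry{codecs: make(map[string]Codec)}
}

// Register takes the write lock, rejects duplicates, and enforces the bound.
func (r *registry) Register(c Codec) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.codecs[c.Name()]; exists {
		return fmt.Errorf("%w: %s", errAlreadyRegistered, c.Name())
	}
	if len(r.codecs) >= maxRegisteredCodecs {
		return errRegistryFull
	}
	r.codecs[c.Name()] = c
	return nil
}

// Get takes only the read lock, so concurrent lookups do not block each other.
func (r *registry) Get(name string) (Codec, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	c, ok := r.codecs[name]
	if !ok {
		return nil, fmt.Errorf("%w: %s", errNotRegistered, name)
	}
	return c, nil
}

// Names returns the registered names, sorted here for stable output.
func (r *registry) Names() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	names := make([]string, 0, len(r.codecs))
	for name := range r.codecs {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

// stubCodec is a placeholder codec that only carries a name.
type stubCodec string

func (s stubCodec) Marshal(v any) ([]byte, error)      { return nil, errors.New("stub: not implemented") }
func (s stubCodec) Unmarshal(data []byte, v any) error { return errors.New("stub: not implemented") }
func (s stubCodec) Name() string                       { return string(s) }

func main() {
	r := newRegistry()
	_ = r.Register(stubCodec("json"))
	_ = r.Register(stubCodec("cbor"))

	fmt.Println(r.Names()) // [cbor json]
	_, err := r.Get("protobuf")
	fmt.Println(errors.Is(err, errNotRegistered)) // true
}
```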

type SchemaCodec

type SchemaCodec[E any] interface {
	// Marshal encodes a schema-defined event into bytes.
	Marshal(event E) ([]byte, error)

	// Unmarshal decodes bytes into a schema-defined event.
	Unmarshal(data []byte) (E, error)

	// ContentType returns the MIME type (e.g., "application/protobuf", "application/avro").
	ContentType() string
}

SchemaCodec serializes/deserializes schema-defined messages.

Unlike Codec, which uses reflection, SchemaCodec works with types that have compile-time schema definitions (protobuf, avro, etc.). The type parameter E constrains which event types the codec can handle.

For store integration, use SchemaCodecAdapter to wrap a SchemaCodec as a standard Codec, enabling backward-compatible use with existing stores.

func NewProtobufSchemaCodec

func NewProtobufSchemaCodec[E any]() SchemaCodec[E]

NewProtobufSchemaCodec creates a SchemaCodec for protobuf messages.

type SchemaCodecAdapter

type SchemaCodecAdapter[E any] struct {
	// contains filtered or unexported fields
}

SchemaCodecAdapter wraps a SchemaCodec[E] as a standard Codec for use with existing stores and transport layers. This enables stores to accept either Codec or SchemaCodec without API changes.

Usage:

pbCodec := codec.NewProtobufSchemaCodec[*pb.OrderCreated]()
adapted := codec.NewSchemaCodecAdapter(pbCodec, "protobuf")
// Use adapted anywhere a Codec is expected

func NewSchemaCodecAdapter

func NewSchemaCodecAdapter[E any](schema SchemaCodec[E], name string) *SchemaCodecAdapter[E]

NewSchemaCodecAdapter creates a Codec adapter from a SchemaCodec. The name parameter is used as the codec registry name.

func (*SchemaCodecAdapter[E]) Marshal

func (a *SchemaCodecAdapter[E]) Marshal(v any) ([]byte, error)

func (*SchemaCodecAdapter[E]) Name

func (a *SchemaCodecAdapter[E]) Name() string

func (*SchemaCodecAdapter[E]) Unmarshal

func (a *SchemaCodecAdapter[E]) Unmarshal(data []byte, v any) error

type SchemaCodecTypeError

type SchemaCodecTypeError struct {
	Expected any
	Got      any
}

SchemaCodecTypeError is returned when a type assertion fails in the adapter.

func (*SchemaCodecTypeError) Error

func (e *SchemaCodecTypeError) Error() string
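
The adapter pattern described in this section can be sketched as follows. This is an illustration under assumptions, not the package's code: `adapter`, `typeError`, `greeting`, and `greetingCodec` are hypothetical, and the choice to require a `*E` destination in Unmarshal is a guess; the real adapter may treat pointer element types differently.

```go
package main

import (
	"fmt"
	"strings"
)

// SchemaCodec is a local copy of the documented interface.
type SchemaCodec[E any] interface {
	Marshal(event E) ([]byte, error)
	Unmarshal(data []byte) (E, error)
	ContentType() string
}

// typeError is a stand-in for SchemaCodecTypeError.
type typeError struct {
	Expected any
	Got      any
}

func (e *typeError) Error() string {
	return fmt.Sprintf("schema codec: expected %T, got %T", e.Expected, e.Got)
}

// adapter exposes a typed SchemaCodec[E] through untyped Marshal/Unmarshal.
type adapter[E any] struct {
	schema SchemaCodec[E]
	name   string
}

func (a *adapter[E]) Marshal(v any) ([]byte, error) {
	ev, ok := v.(E)
	if !ok {
		var zero E
		return nil, &typeError{Expected: zero, Got: v}
	}
	return a.schema.Marshal(ev)
}

func (a *adapter[E]) Unmarshal(data []byte, v any) error {
	dst, ok := v.(*E) // assumption: callers pass a pointer to E
	if !ok {
		var zero *E
		return &typeError{Expected: zero, Got: v}
	}
	ev, err := a.schema.Unmarshal(data)
	if err != nil {
		return err
	}
	*dst = ev
	return nil
}

func (a *adapter[E]) Name() string { return a.name }

// greeting and greetingCodec form a toy schema codec for the demo.
type greeting struct{ Name string }

type greetingCodec struct{}

func (greetingCodec) Marshal(g greeting) ([]byte, error) {
	return []byte("hello:" + g.Name), nil
}

func (greetingCodec) Unmarshal(data []byte) (greeting, error) {
	s := string(data)
	if !strings.HasPrefix(s, "hello:") {
		return greeting{}, fmt.Errorf("malformed greeting %q", s)
	}
	return greeting{Name: strings.TrimPrefix(s, "hello:")}, nil
}

func (greetingCodec) ContentType() string { return "text/plain" }

func main() {
	a := &adapter[greeting]{schema: greetingCodec{}, name: "greeting"}

	data, err := a.Marshal(greeting{Name: "ada"})
	if err != nil {
		panic(err)
	}
	var out greeting
	if err := a.Unmarshal(data, &out); err != nil {
		panic(err)
	}
	fmt.Println(a.Name(), out.Name) // greeting ada

	_, err = a.Marshal(42) // wrong type: rejected before reaching the schema codec
	fmt.Println(err != nil) // true
}
```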
