protosource

package module
v0.1.5
Published: Apr 13, 2026 License: MIT Imports: 13 Imported by: 19

README

protosource

Protocol-buffers-only event sourcing

Inspiration

Reasoning

  1. Microservices do not actually solve the problem; you are just as calcified as you ever were.
  2. Protocol buffers solve quite a few long term maintenance issues.
  3. I, too, hate managing anything outside of my code.
  4. Others.

Target

  • Data storage in some sort of cost efficient store (that I do not have to manage).
  • Trivial testing, even from a laptop when I am on the plane.
  • As much as possible, have all adapter code (Lambda handlers, interface methods, etc.) auto-generated from protocol buffers.
  • I should only have to write the business logic for managing the commands, events, and the aggregate state.

Technologies

  1. Protocol buffers
  2. Buf
  3. Domain Modeling
  4. EventSourcing
  5. CQRS

Documentation

Index

Constants

const DefaultCompressThreshold = 300

DefaultCompressThreshold is the default compression threshold in bytes.

Variables

var (
	// Error variables for common repository errors
	ErrAggregateNotFound = errors.New("aggregate not found")
	ErrUnhandledCommand  = errors.New("unhandled command")
	ErrUnhandledEvent    = errors.New("unhandled event")
	ErrAlreadyCreated    = errors.New("aggregate already created")
	ErrNotCreatedYet     = errors.New("aggregate not created yet")
	ErrNilCommand        = errors.New("command is nil")
	ErrEmptyAggregateId  = errors.New("aggregate id is empty")
	ErrValidationFailed  = errors.New("command validation failed")
	ErrStateNotAllowed   = errors.New("command not allowed in current aggregate state")
	ErrSkip              = errors.New("command skipped")
)

Functions

func FromMicros

func FromMicros(us int64) time.Time

FromMicros converts Unix microseconds back to a time.Time value.

func IsGzipped

func IsGzipped(data []byte) bool

IsGzipped reports whether data starts with the gzip magic bytes.

func MaybeCompress

func MaybeCompress(data []byte, threshold int) ([]byte, error)

MaybeCompress gzip-compresses data if its length meets or exceeds the threshold. A threshold <= 0 disables compression (returns data as-is).

func MaybeDecompress

func MaybeDecompress(data []byte) ([]byte, error)

MaybeDecompress decompresses gzip data (detected via magic bytes). Non-gzip data is returned unchanged.

func NowMicros

func NowMicros() int64

NowMicros returns the current time in Unix microseconds.
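
These two helpers correspond to the standard library's time.UnixMicro and Time.UnixMicro; a standalone sketch of the round trip (lowercase names are illustrative):

```go
package main

import (
	"fmt"
	"time"
)

// nowMicros mirrors what NowMicros describes: the current time in
// Unix microseconds.
func nowMicros() int64 {
	return time.Now().UnixMicro()
}

// fromMicros mirrors FromMicros: Unix microseconds back into a time.Time.
func fromMicros(us int64) time.Time {
	return time.UnixMicro(us)
}

func main() {
	us := nowMicros()
	t := fromMicros(us)
	// the round trip is lossless at microsecond precision
	fmt.Println(t.UnixMicro() == us) // true
}
```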

Types

type Aggregate

type Aggregate interface {
	proto.Message
	// On will be called for each event; returns err if the event could not be
	// applied due to a systemic failure (not based on business rules)
	On(event Event) error
	// GetVersion returns the current version of the aggregate
	GetVersion() int64
}

Aggregate represents a domain object in the bounded context. It encapsulates the state of the domain object at a specific point in time, typically described as the sum of all events that have occurred on this object.

Aggregates process and apply events via the On method. Events should never be rejected based on business rules, as they represent past occurrences. Errors from the On method indicate systemic failures in applying event data to the aggregate.
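
A minimal sketch of the replay contract, with the proto.Message embedding omitted so it stays self-contained (the account and deposited types are hypothetical):

```go
package main

import "fmt"

// event mirrors the Event accessors used during replay.
type event interface {
	GetId() string
	GetVersion() int64
}

type deposited struct {
	id      string
	version int64
	amount  int64
}

func (e deposited) GetId() string     { return e.id }
func (e deposited) GetVersion() int64 { return e.version }

// account mirrors the Aggregate shape: On applies events, GetVersion
// reports the last applied version. Business rules never reject here;
// errors signal only systemic failures (e.g. an unknown event type).
type account struct {
	version int64
	balance int64
}

func (a *account) On(e event) error {
	switch ev := e.(type) {
	case deposited:
		a.balance += ev.amount
	default:
		return fmt.Errorf("unhandled event %T", e)
	}
	a.version = e.GetVersion()
	return nil
}

func (a *account) GetVersion() int64 { return a.version }

func main() {
	acc := &account{}
	for _, e := range []event{
		deposited{id: "a-1", version: 1, amount: 100},
		deposited{id: "a-1", version: 2, amount: 50},
	} {
		if err := acc.On(e); err != nil {
			panic(err)
		}
	}
	fmt.Println(acc.GetVersion(), acc.balance) // 2 150
}
```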

type AggregateStore

type AggregateStore interface {
	// SaveAggregate persists the materialized aggregate state.
	// The store owns serialization and key computation.
	//
	// This is a write-only interface — the repository does not read materialized
	// aggregates back (it always rebuilds from events via Load). The persisted
	// state is intended for external consumers (dashboards, APIs, projections)
	// that query the store directly. A LoadAggregate read path may be added in
	// the future.
	SaveAggregate(ctx context.Context, aggregate proto.Message) error
}

AggregateStore is an optional interface that stores can implement to persist the materialized aggregate state after each successful Apply. This provides a read-optimized view of the current aggregate without needing to replay events.

Repository checks for this interface via type assertion after persisting events. If the store implements it, the fully materialized aggregate is passed directly, letting the store decide how to serialize, compress, and index it. Stores backed by NoSQL databases (DynamoDB, Cosmos, Firestore) can type-assert the aggregate to AutoPKSK for GSI-indexed single-table storage via opaquedata.

type CORSConfig added in v0.0.6

type CORSConfig struct {
	AllowOrigin  string // e.g. "*" or "https://example.com"
	AllowMethods string // e.g. "GET,POST,PUT,DELETE,OPTIONS"
	AllowHeaders string // e.g. "Content-Type,X-Actor"
}

CORSConfig configures Cross-Origin Resource Sharing headers on the router. When set, Dispatch automatically handles OPTIONS preflight requests and injects CORS headers into every response.

type CommandEvaluator

type CommandEvaluator interface {
	Evaluate(aggregate Aggregate) error
}

CommandEvaluator is an optional interface that command types can implement to inspect the current aggregate state before events are emitted. This is the extension point for custom business logic such as duplicate detection, idempotency checks, or conditional no-ops.

Return nil to proceed with event emission, ErrSkip to silently skip (no events persisted, no error returned to caller), or any other error to abort the command.

type Commander

type Commander interface {
	proto.Message
	// GetId returns the ID of the aggregate to which this command should be applied
	GetId() string
	// GetActor returns the identifier of the actor responsible for this change
	GetActor() string
}

Commander represents a command that specifies a desired change to an aggregate. It includes not only the action but also all necessary data required for that change.

Commands can be rejected based on business logic rules. For example, a "Create" command would be rejected if the aggregate already exists.

type Event

type Event interface {
	proto.Message
	// GetId returns the ID of the aggregate referenced by this event
	GetId() string

	// GetVersion returns the version number of this event
	GetVersion() int64

	// GetAt indicates when the event occurred in Unix microseconds (timestamp)
	GetAt() int64

	// GetActor returns the identifier of the actor responsible for this change
	GetActor() string
}

Event represents a change that has occurred in the past. It should be named in the past tense to make clear that it describes something that already happened.

Events encapsulate the details of changes, including the aggregate ID, version number, timestamp, and responsible actor.

type EventEmitter

type EventEmitter interface {
	EmitEvents(aggregate Aggregate) []Event
}

EventEmitter is implemented by generated command types that can produce events. The aggregate is passed in so that EmitEvents can read the current version and, if the aggregate implements Snapshoter, automatically append snapshots.

type EventTTLer

type EventTTLer interface {
	EventTTLSeconds() int64
}

EventTTLer is an optional interface that aggregates implement when they have an event_ttl_seconds annotation. The Repository uses this to stamp records with a TTL before passing them to the Store.

type HandlerFunc

type HandlerFunc func(ctx context.Context, request Request) Response

HandlerFunc is the signature for provider-agnostic request handlers.

type Logger

type Logger interface {
	Warn(msg string, args ...any)
}

Logger is the interface for repository diagnostic logging. Materialization and projection failures are logged here rather than returned to the caller (events are the source of truth; materialization is best-effort).

type Option

type Option func(*Repository)

Option provides functional configuration options for the Repository

func WithCompression

func WithCompression(threshold int) Option

WithCompression enables gzip compression for event record data stored by the repository. Record data at or above the threshold (in bytes) is compressed before writing to the store. Data below the threshold is stored uncompressed. Decompression is automatic on read (detected via gzip magic bytes), so compressed and uncompressed data can coexist safely.

Note: this only affects event records (Save/Load). Materialized aggregate state is passed directly to AggregateStore, which owns its own serialization.

Use 300 as a sensible starting threshold. Pass 0 or any negative value to disable compression (the default).

func WithLogger

func WithLogger(logger Logger) Option

WithLogger sets the logger used for diagnostic messages (materialization and projection failures). By default, no logging occurs.

type PostApplyHook

type PostApplyHook interface {
	AfterOn()
}

PostApplyHook is an optional interface that aggregates can implement to compute derived fields after events are applied. Invocation points:

  • Load (event replay): called once after all events are replayed.
  • Apply (materialization): called once after all new events are applied, before the aggregate is persisted.
  • Generated EmitEvents: called on the cloned aggregate before snapshotting, only when a snapshot will actually be emitted (version on interval).

This is the extension point for computing values from collections (totals, counts, etc.).
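
A sketch of the derived-fields pattern, assuming a hypothetical order aggregate whose total is recomputed from its items collection:

```go
package main

import "fmt"

type lineItem struct{ qty, price int64 }

// order mirrors an aggregate implementing PostApplyHook: total is a
// derived field computed from items, never stored in events.
type order struct {
	items []lineItem
	total int64
}

// AfterOn recomputes derived fields once all events have been applied,
// matching the invocation points listed above.
func (o *order) AfterOn() {
	o.total = 0
	for _, it := range o.items {
		o.total += it.qty * it.price
	}
}

func main() {
	o := &order{items: []lineItem{{qty: 2, price: 500}, {qty: 1, price: 250}}}
	o.AfterOn()
	fmt.Println(o.total) // 1250
}
```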

type Projector

type Projector interface {
	Projections() []proto.Message
}

Projector is an optional interface that aggregates implement when they have projection messages defined in their proto file. The generated Projections() method returns all projection views derived from the current aggregate state. The Repository persists each projection via SaveAggregate after materializing the aggregate itself. Like materialization, projection is best-effort.

Each returned proto.Message must implement opaquedata.AutoPKSK (which is guaranteed by the code generator for projection message types).

type ProtoValidater

type ProtoValidater interface {
	ProtoValidate() error
}

ProtoValidater validates a command using buf/protovalidate annotations. Generated automatically on every command message; calls protovalidate.Validate to enforce field constraints declared in the proto schema.

type Repo

type Repo interface {
	// Apply processes a command and returns the current version of the aggregate
	Apply(ctx context.Context, command Commander) (int64, error)
	// Load retrieves an aggregate by its ID from storage
	Load(ctx context.Context, aggregateID string) (Aggregate, error)
	// History returns the full event history for an aggregate
	History(ctx context.Context, aggregateID string) (*historyv1.History, error)
}

Repo represents the interface for a repository that can handle both commands and queries. It provides methods to apply commands and load aggregates.

type Repository

type Repository struct {
	// contains filtered or unexported fields
}

Repository provides the primary abstraction for saving and loading events. It uses a store to persist data and a serializer to convert between events and records.

func New

func New(prototype Aggregate, store Store, serializer Serializer, opts ...Option) *Repository

New creates a new Repository with the given prototype, store, and serializer. All three parameters are required; passing nil for any of them panics immediately with a descriptive message rather than deferring to an opaque nil-pointer dereference at call time.

func (*Repository) Apply

func (r *Repository) Apply(ctx context.Context, command Commander) (int64, error)

Apply processes the given command and returns the current version of the aggregate. It runs the command through the generated validation pipeline:

  1. VersionValidator — lifecycle gate (create requires version==0, mutation requires version>0)
  2. ProtoValidater — annotation-driven field and cross-field constraints via buf/protovalidate
  3. StateGuard — gate command on current aggregate state (state-machine transitions)
  4. EventEmitter check — verify command can emit events (fail fast before custom logic)
  5. CommandEvaluator — optional custom business logic (duplicate detection, idempotency, conditional no-ops)
  6. EventEmitter — emit events
  7. Persist — save events to store
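
The optional-interface probing that drives this pipeline can be sketched as follows; only steps 1 and 5 are shown, and every name here is a local stand-in for the generated types:

```go
package main

import (
	"errors"
	"fmt"
)

var errSkip = errors.New("command skipped")

// Optional interfaces a command may implement; the pipeline probes each
// via type assertion, in the order described above.
type versionValidator interface{ ValidateVersion(v int64) error }
type evaluator interface{ Evaluate(state any) error }

type command struct{ requireNew bool }

func (c command) ValidateVersion(v int64) error {
	if c.requireNew && v != 0 {
		return errors.New("aggregate already created")
	}
	return nil
}

// runPipeline sketches the lifecycle gate (step 1) and the optional
// business-logic evaluation (step 5).
func runPipeline(cmd any, version int64, state any) error {
	if vv, ok := cmd.(versionValidator); ok { // 1. lifecycle gate
		if err := vv.ValidateVersion(version); err != nil {
			return err
		}
	}
	if ev, ok := cmd.(evaluator); ok { // 5. custom business logic
		if err := ev.Evaluate(state); err != nil {
			if errors.Is(err, errSkip) {
				return nil // silent skip: no events, no error
			}
			return err
		}
	}
	return nil // 6-7. event emission and persistence would follow
}

func main() {
	err := runPipeline(command{requireNew: true}, 3, nil)
	fmt.Println(err) // aggregate already created
}
```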

func (*Repository) History

func (r *Repository) History(ctx context.Context, aggregateID string) (*historyv1.History, error)

History returns the full event history for an aggregate, bypassing any snapshot tail optimization. This is intended for query endpoints that need the complete stream. Record data is transparently decompressed when compression is enabled.

func (*Repository) Load

func (r *Repository) Load(ctx context.Context, aggregateId string) (Aggregate, error)

Load retrieves the aggregate with the specified ID from storage. It loads the event history and applies each event to rebuild the aggregate state.

func (*Repository) Save

func (r *Repository) Save(ctx context.Context, events ...Event) error

Save persists the given events into the underlying Store. It serializes each event and saves them under the aggregate ID. When compression is enabled, record data at or above the threshold is gzip-compressed before writing.

type Request

type Request struct {
	Body            string
	PathParameters  map[string]string
	QueryParameters map[string]string
	Headers         map[string]string
	Actor           string
}

Request is a provider-agnostic representation of an incoming HTTP request. Cloud-specific adapters convert their native request types into this struct before calling generated handlers. The Actor field is pre-populated by the adapter's ActorExtractor — generated handlers never parse auth context.

type Response

type Response struct {
	StatusCode int
	Body       string
	Headers    map[string]string
}

Response is a provider-agnostic representation of an HTTP response. Cloud-specific adapters convert this back to their native response type.

type RouteRegistrar

type RouteRegistrar interface {
	RegisterRoutes(router *Router)
}

RouteRegistrar is implemented by types that register routes on a Router. Generated Handler types satisfy this interface.

type Router

type Router struct {
	// contains filtered or unexported fields
}

Router maps HTTP method + path patterns to HandlerFunc handlers. Path patterns support {param} segments for parameter extraction.

func NewRouter

func NewRouter(registrars ...RouteRegistrar) *Router

NewRouter creates a new Router. If registrars are provided, their routes are registered immediately.

func (*Router) Dispatch

func (r *Router) Dispatch(ctx context.Context, method, path string, request Request) Response

Dispatch finds a matching route and invokes its handler. Path parameters extracted from the pattern are merged into request.PathParameters. Returns 404 for no path match, 405 for path match with wrong method.

func (*Router) Handle

func (r *Router) Handle(method, pattern string, handler HandlerFunc)

Handle registers a handler for the given HTTP method and path pattern. Patterns use {name} for path parameters (e.g., "example/app/sample/v1/{id}").
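
One plausible way the {param} matching could work, sketched with the standard library (matchPattern is a local stand-in, not the router's actual implementation):

```go
package main

import (
	"fmt"
	"strings"
)

// matchPattern compares a pattern against a concrete path segment by
// segment; {name} segments match anything and are captured as parameters.
func matchPattern(pattern, path string) (map[string]string, bool) {
	ps := strings.Split(strings.Trim(pattern, "/"), "/")
	xs := strings.Split(strings.Trim(path, "/"), "/")
	if len(ps) != len(xs) {
		return nil, false
	}
	params := map[string]string{}
	for i, seg := range ps {
		if strings.HasPrefix(seg, "{") && strings.HasSuffix(seg, "}") {
			params[seg[1:len(seg)-1]] = xs[i] // capture path parameter
		} else if seg != xs[i] {
			return nil, false // literal segment mismatch
		}
	}
	return params, true
}

func main() {
	params, ok := matchPattern("example/app/sample/v1/{id}", "example/app/sample/v1/abc-123")
	fmt.Println(ok, params["id"]) // true abc-123
}
```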

func (*Router) SetCORS added in v0.0.6

func (r *Router) SetCORS(cfg CORSConfig)

SetCORS enables CORS handling on the router. When set, Dispatch responds to OPTIONS requests with a 204 preflight response and adds CORS headers to all responses.

type Serializer

type Serializer interface {
	// MarshalEvent converts an Event to a Record
	MarshalEvent(event Event) (*recordv1.Record, error)
	MarshalEventAsData(event Event) ([]byte, error)

	// UnmarshalEvent converts a Record back into an Event
	UnmarshalEvent(record *recordv1.Record) (Event, error)
	UnmarshalEventFromData(data []byte) (Event, error)
}

Serializer provides methods to convert between Events and Records. This includes marshaling events to records or data bytes, and unmarshaling records or data bytes back into events.

type SnapshotTailStore

type SnapshotTailStore interface {
	// LoadTail returns the last n events for the given aggregate, ordered by
	// version ascending. If the aggregate has fewer than n events, all events
	// are returned. If the aggregate has no events, an empty History is returned.
	LoadTail(ctx context.Context, aggregateID string, n int) (*historyv1.History, error)
}

SnapshotTailStore is an optional interface that stores can implement to support efficient partial event loading. When the aggregate has a snapshot interval, the repository calls LoadTail to retrieve only the last N events instead of the entire history. This is a single-call operation that every common backend (SQL, DynamoDB, Firestore, Cosmos DB, BoltDB) can implement natively and efficiently.
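
LoadTail's contract, sketched over an in-memory slice (loadTail is a local stand-in; real stores would query only the last n records rather than slice a full history):

```go
package main

import "fmt"

// loadTail returns the last n events in version-ascending order, or the
// whole history when fewer than n events exist.
func loadTail(events []string, n int) []string {
	if n >= len(events) {
		return events
	}
	return events[len(events)-n:]
}

func main() {
	events := []string{"v1", "v2", "v3", "v4", "v5"}
	fmt.Println(loadTail(events, 2)) // [v4 v5]
	fmt.Println(loadTail(events, 9)) // [v1 v2 v3 v4 v5]
}
```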

type Snapshoter

type Snapshoter interface {
	Snapshot(version int64) Event
	SnapshotInterval() int32
}

Snapshoter interface enables aggregates to create snapshots at specific versions. A snapshot represents the state of an aggregate at a given point in time. SnapshotInterval returns the frequency (in events) at which snapshots are taken. A return value of 0 means snapshots are disabled.
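
The interval semantics can be reduced to a single predicate (shouldSnapshot is illustrative, not a package function):

```go
package main

import "fmt"

// shouldSnapshot sketches the check implied by Snapshoter: snapshot only
// when the interval is enabled and the version lands on it.
func shouldSnapshot(version int64, interval int32) bool {
	return interval > 0 && version%int64(interval) == 0
}

func main() {
	fmt.Println(shouldSnapshot(100, 50)) // true
	fmt.Println(shouldSnapshot(101, 50)) // false
	fmt.Println(shouldSnapshot(100, 0))  // false: snapshots disabled
}
```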

type StateGuard added in v0.1.1

type StateGuard interface {
	GuardState(aggregate Aggregate) error
}

StateGuard gates a command on the aggregate's current state. This is a state-machine precondition, not an authorization (AAA) hook: identity and permissions belong in the transport layer or in CommandEvaluator. A StateGuard asks only "is this command legal given the aggregate's current state?" — typically driven by the allowed_states proto annotation, which generates GuardState automatically. Hand-written GuardState methods are also supported for guards that need to inspect fields beyond State.

type Store

type Store interface {
	// Save the provided serialized records to the store
	Save(ctx context.Context, aggregateID string, records ...*recordv1.Record) error

	// Load the history of events for this aggregateId
	Load(ctx context.Context, aggregateId string) (*historyv1.History, error)
}

Store provides an abstraction for the Repository to save and load data. It defines methods for saving serialized records and loading event histories.

type VersionValidator

type VersionValidator interface {
	ValidateVersion(version int64) error
}

VersionValidator is implemented by generated command types with lifecycle constraints. CREATION commands require version == 0; MUTATION commands require version > 0.

Directories

Path Synopsis
adapters
awslambda
Package awslambda provides an adapter that bridges protosource's provider-agnostic handlers to AWS API Gateway Lambda proxy integration.
httpstandard
Package httpstandard provides an adapter that bridges protosource's provider-agnostic handlers to standard net/http.
authz
Package authz defines the Authorizer contract that every generated command handler invokes before its pipeline runs.
allowall
Package allowall provides a no-op authz.Authorizer that permits every request.
aws
dynamoclient
Package dynamoclient defines a shared DynamoDB client interface used across the protosource framework.
cmd
sample command
testcli command
testdynamo command
example
app/order/v1/ordermgr command
Code generated by protoc-gen-protosource.
app/sample/v1/samplemgr command
Code generated by protoc-gen-protosource.
app/samplenosnapshot/v1/samplemgr command
Code generated by protoc-gen-protosource.
app/test/v1/testmgr command
Code generated by protoc-gen-protosource.
history
v1
httpclient
Package httpclient provides a generic HTTP client for protosource aggregates with protobuf-first content negotiation.
v1
options
v1
record
v1
response
v1
serializers
stores
