Documentation
¶
Index ¶
- Constants
- Variables
- func FromMicros(us int64) time.Time
- func IsGzipped(data []byte) bool
- func MaybeCompress(data []byte, threshold int) ([]byte, error)
- func MaybeDecompress(data []byte) ([]byte, error)
- func NowMicros() int64
- type Aggregate
- type AggregateStore
- type CORSConfig
- type CommandEvaluator
- type Commander
- type Event
- type EventEmitter
- type EventTTLer
- type HandlerFunc
- type Logger
- type Option
- type PostApplyHook
- type Projector
- type ProtoValidater
- type Repo
- type Repository
- func (r *Repository) Apply(ctx context.Context, command Commander) (int64, error)
- func (r *Repository) History(ctx context.Context, aggregateID string) (*historyv1.History, error)
- func (r *Repository) Load(ctx context.Context, aggregateId string) (Aggregate, error)
- func (r *Repository) Save(ctx context.Context, events ...Event) error
- type Request
- type Response
- type RouteRegistrar
- type Router
- type Serializer
- type SnapshotTailStore
- type Snapshoter
- type StateGuard
- type Store
- type VersionValidator
Constants ¶
const DefaultCompressThreshold = 300
DefaultThreshold is the default compression threshold in bytes.
Variables ¶
var ( // Error variables for common repository errors ErrAggregateNotFound = errors.New("aggregate not found") ErrUnhandledCommand = errors.New("unhandled command") ErrUnhandledEvent = errors.New("unhandled event") ErrAlreadyCreated = errors.New("aggregate already created") ErrNotCreatedYet = errors.New("aggregate not created yet") ErrNilCommand = errors.New("command is nil") ErrEmptyAggregateId = errors.New("aggregate id is empty") ErrValidationFailed = errors.New("command validation failed") ErrStateNotAllowed = errors.New("command not allowed in current aggregate state") ErrSkip = errors.New("command skipped") )
Functions ¶
func FromMicros ¶
FromMicros converts Unix microseconds back to a time.Time object
func MaybeCompress ¶
MaybeCompress gzip-compresses data if its length meets or exceeds the threshold. A threshold <= 0 disables compression (returns data as-is).
func MaybeDecompress ¶
MaybeDecompress decompresses gzip data (detected via magic bytes). Non-gzip data is returned unchanged.
Types ¶
type Aggregate ¶
type Aggregate interface {
proto.Message
// On will be called for each event; returns err if the event could not be
// applied due to a systemic failure (not based on business rules)
On(event Event) error
// GetVersion returns the current version of the aggregate
GetVersion() int64
}
Aggregate represents a domain object in the bounded context. It encapsulates the state of the domain object at a specific point in time, typically described as the sum of all events that have occurred on this object.
Aggregates process and apply events via the On method. Events should never be rejected based on business rules, as they represent past occurrences. Errors from the On method indicate systemic failures in applying event data to the aggregate.
type AggregateStore ¶
type AggregateStore interface {
// SaveAggregate persists the materialized aggregate state.
// The store owns serialization and key computation.
//
// This is a write-only interface — the repository does not read materialized
// aggregates back (it always rebuilds from events via Load). The persisted
// state is intended for external consumers (dashboards, APIs, projections)
// that query the store directly. A LoadAggregate read path may be added in
// the future.
SaveAggregate(ctx context.Context, aggregate proto.Message) error
}
AggregateStore is an optional interface that stores can implement to persist the materialized aggregate state after each successful Apply. This provides a read-optimized view of the current aggregate without needing to replay events.
Repository checks for this interface via type assertion after persisting events. If the store implements it, the fully materialized aggregate is passed directly, letting the store decide how to serialize, compress, and index it. Stores backed by NoSQL databases (DynamoDB, Cosmos, Firestore) can type-assert the aggregate to AutoPKSK for GSI-indexed single-table storage via opaquedata.
type CORSConfig ¶ added in v0.0.6
type CORSConfig struct {
AllowOrigin string // e.g. "*" or "https://example.com"
AllowMethods string // e.g. "GET,POST,PUT,DELETE,OPTIONS"
AllowHeaders string // e.g. "Content-Type,X-Actor"
}
CORSConfig configures Cross-Origin Resource Sharing headers on the router. When set, Dispatch automatically handles OPTIONS preflight requests and injects CORS headers into every response.
type CommandEvaluator ¶
CommandEvaluator is an optional interface that command types can implement to inspect the current aggregate state before events are emitted. This is the extension point for custom business logic such as duplicate detection, idempotency checks, or conditional no-ops.
Return nil to proceed with event emission, ErrSkip to silently skip (no events persisted, no error returned to caller), or any other error to abort the command.
type Commander ¶
type Commander interface {
proto.Message
// GetId returns the ID of the aggregate to which this command should be applied
GetId() string
// GetActor returns the identifier of the actor responsible for this change
GetActor() string
}
Commander represents a command that specifies a desired change to an aggregate. It includes not only the action but also all necessary data required for that change.
Commands can be rejected based on business logic rules. For example, a "Create" command would be rejected if the aggregate already exists.
type Event ¶
type Event interface {
proto.Message
// GetId returns the ID of the aggregate referenced by this event
GetId() string
// GetVersion returns the version number of this event
GetVersion() int64
// GetAt indicates when the event occurred in Unix microseconds (timestamp)
GetAt() int64
// GetActor returns the identifier of the actor responsible for this change
GetActor() string
}
Event represents a change that has occurred in the past. It should be described using past tense to indicate that it's an event that already happened.
Events encapsulate the details of changes, including the aggregate ID, version number, timestamp, and responsible actor.
type EventEmitter ¶
EventEmitter is implemented by generated command types that can produce events. The aggregate is passed in so that EmitEvents can read the current version and, if the aggregate implements Snapshoter, automatically append snapshots.
type EventTTLer ¶
type EventTTLer interface {
EventTTLSeconds() int64
}
EventTTLer is an optional interface that aggregates implement when they have an event_ttl_seconds annotation. The Repository uses this to stamp records with a TTL before passing them to the Store.
type HandlerFunc ¶
HandlerFunc is the signature for provider-agnostic request handlers.
type Logger ¶
Logger is the interface for repository diagnostic logging. Materialization and projection failures are logged here rather than returned to the caller (events are the source of truth; materialization is best-effort).
type Option ¶
type Option func(*Repository)
Option provides functional configuration options for the Repository
func WithCompression ¶
func WithLogger ¶
WithCompression enables gzip compression for event record data stored by the repository. Record data at or above the threshold (in bytes) is compressed before writing to the store. Data below the threshold is stored uncompressed. Decompression is automatic on read (detected via gzip magic bytes), so compressed and uncompressed data can coexist safely.
Note: this only affects event records (Save/Load). Materialized aggregate state is passed directly to AggregateStore, which owns its own serialization.
Use 300 as a sensible starting threshold. Pass 0 or any negative value to disable compression (the default). WithLogger sets the logger used for diagnostic messages (materialization and projection failures). By default, no logging occurs.
type PostApplyHook ¶
type PostApplyHook interface {
AfterOn()
}
PostApplyHook is an optional interface that aggregates can implement to compute derived fields after events are applied. Invocation points:
- Load (event replay): called once after all events are replayed.
- Apply (materialization): called once after all new events are applied, before the aggregate is persisted.
- Generated EmitEvents: called on the cloned aggregate before snapshotting, only when a snapshot will actually be emitted (version on interval).
This is the extension point for computing values from collections (totals, counts, etc.).
type Projector ¶
Projector is an optional interface that aggregates implement when they have projection messages defined in their proto file. The generated Projections() method returns all projection views derived from the current aggregate state. The Repository persists each projection via SaveAggregate after materializing the aggregate itself. Like materialization, projection is best-effort.
Each returned proto.Message must implement opaquedata.AutoPKSK (which is guaranteed by the code generator for projection message types).
type ProtoValidater ¶
type ProtoValidater interface {
ProtoValidate() error
}
ProtoValidater validates a command using buf/protovalidate annotations. Generated automatically on every command message; calls protovalidate.Validate to enforce field constraints declared in the proto schema.
type Repo ¶
type Repo interface {
// Apply processes a command and returns the current version of the aggregate
Apply(ctx context.Context, command Commander) (int64, error)
// Load retrieves an aggregate by its ID from storage
Load(ctx context.Context, aggregateID string) (Aggregate, error)
// History returns the full event history for an aggregate
History(ctx context.Context, aggregateID string) (*historyv1.History, error)
}
Repo represents the interface for a repository that can handle both commands and queries. It provides methods to apply commands and load aggregates.
type Repository ¶
type Repository struct {
// contains filtered or unexported fields
}
Repository provides the primary abstraction for saving and loading events. It uses a store to persist data and a serializer to convert between events and records.
func New ¶
func New(prototype Aggregate, store Store, serializer Serializer, opts ...Option) *Repository
New creates a new Repository with the given prototype, store, and serializer. All three parameters are required; passing nil for any of them panics immediately with a descriptive message rather than deferring to an opaque nil-pointer dereference at call time.
func (*Repository) Apply ¶
Apply processes the given command and returns the current version of the aggregate. It runs the command through the generated validation pipeline:
- VersionValidator — lifecycle gate (create requires version==0, mutation requires version>0)
- ProtoValidater — annotation-driven field and cross-field constraints via buf/protovalidate
- StateGuard — gate command on current aggregate state (state-machine transitions)
- EventEmitter check — verify command can emit events (fail fast before custom logic)
- CommandEvaluator — optional custom business logic (duplicate detection, idempotency, conditional no-ops)
- EventEmitter — emit events
- Persist — save events to store
func (*Repository) History ¶
History returns the full event history for an aggregate, bypassing any snapshot tail optimization. This is intended for query endpoints that need the complete stream. Record data is transparently decompressed when compression is enabled.
func (*Repository) Load ¶
Load retrieves the aggregate with the specified ID from storage. It loads the event history and applies each event to rebuild the aggregate state.
func (*Repository) Save ¶
func (r *Repository) Save(ctx context.Context, events ...Event) error
Save persists the given events into the underlying Store. It serializes each event and saves them under the aggregate ID. When compression is enabled, record data at or above the threshold is gzip-compressed before writing.
type Request ¶
type Request struct {
Body string
PathParameters map[string]string
QueryParameters map[string]string
Headers map[string]string
Actor string
}
Request is a provider-agnostic representation of an incoming HTTP request. Cloud-specific adapters convert their native request types into this struct before calling generated handlers. The Actor field is pre-populated by the adapter's ActorExtractor — generated handlers never parse auth context.
type Response ¶
Response is a provider-agnostic representation of an HTTP response. Cloud-specific adapters convert this back to their native response type.
type RouteRegistrar ¶
type RouteRegistrar interface {
RegisterRoutes(router *Router)
}
RouteRegistrar is implemented by types that register routes on a Router. Generated Handler types satisfy this interface.
type Router ¶
type Router struct {
// contains filtered or unexported fields
}
Router maps HTTP method + path patterns to HandlerFunc handlers. Path patterns support {param} segments for parameter extraction.
func NewRouter ¶
func NewRouter(registrars ...RouteRegistrar) *Router
NewRouter creates a new Router. If registrars are provided, their routes are registered immediately.
func (*Router) Dispatch ¶
Dispatch finds a matching route and invokes its handler. Path parameters extracted from the pattern are merged into request.PathParameters. Returns 404 for no path match, 405 for path match with wrong method.
func (*Router) Handle ¶
func (r *Router) Handle(method, pattern string, handler HandlerFunc)
Handle registers a handler for the given HTTP method and path pattern. Patterns use {name} for path parameters (e.g., "example/app/sample/v1/{id}").
func (*Router) SetCORS ¶ added in v0.0.6
func (r *Router) SetCORS(cfg CORSConfig)
SetCORS enables CORS handling on the router. When set, Dispatch responds to OPTIONS requests with a 204 preflight response and adds CORS headers to all responses.
type Serializer ¶
type Serializer interface {
// MarshalEvent converts an Event to a Record
MarshalEvent(event Event) (*recordv1.Record, error)
MarshalEventAsData(event Event) ([]byte, error)
// UnmarshalEvent converts a Record back into an Event
UnmarshalEvent(record *recordv1.Record) (Event, error)
UnmarshalEventFromData(data []byte) (Event, error)
}
Serializer provides methods to convert between Events and Records. This includes marshaling events to records or data bytes, and unmarshaling records or data bytes back into events.
type SnapshotTailStore ¶
type SnapshotTailStore interface {
// LoadTail returns the last n events for the given aggregate, ordered by
// version ascending. If the aggregate has fewer than n events, all events
// are returned. If the aggregate has no events, an empty History is returned.
LoadTail(ctx context.Context, aggregateID string, n int) (*historyv1.History, error)
}
SnapshotTailStore is an optional interface that stores can implement to support efficient partial event loading. When the aggregate has a snapshot interval, the repository calls LoadTail to retrieve only the last N events instead of the entire history. This is a single-call operation that every common backend (SQL, DynamoDB, Firestore, Cosmos DB, BoltDB) can implement natively and efficiently.
type Snapshoter ¶
Snapshoter interface enables aggregates to create snapshots at specific versions. A snapshot represents the state of an aggregate at a given point in time. SnapshotInterval returns the frequency (in events) at which snapshots are taken. A return value of 0 means snapshots are disabled.
type StateGuard ¶ added in v0.1.1
StateGuard gates a command on the aggregate's current state. This is a state-machine precondition, not an authorization (AAA) hook: identity and permissions belong in the transport layer or in CommandEvaluator. A StateGuard asks only "is this command legal given the aggregate's current state?" — typically driven by the allowed_states proto annotation, which generates GuardState automatically. Hand-written GuardState methods are also supported for guards that need to inspect fields beyond State.
type Store ¶
type Store interface {
// Save the provided serialized records to the store
Save(ctx context.Context, aggregateID string, records ...*recordv1.Record) error
// Load the history of events for this aggregateId
Load(ctx context.Context, aggregateId string) (*historyv1.History, error)
}
Store provides an abstraction for the Repository to save and load data. It defines methods for saving serialized records and loading event histories.
type VersionValidator ¶
VersionValidator is implemented by generated command types with lifecycle constraints. CREATION commands require version == 0; MUTATION commands require version > 0.
Directories
¶
| Path | Synopsis |
|---|---|
|
adapters
|
|
|
awslambda
Package awslambda provides an adapter that bridges protosource's provider-agnostic handlers to AWS API Gateway Lambda proxy integration.
|
Package awslambda provides an adapter that bridges protosource's provider-agnostic handlers to AWS API Gateway Lambda proxy integration. |
|
httpstandard
Package httpstandard provides an adapter that bridges protosource's provider-agnostic handlers to standard net/http.
|
Package httpstandard provides an adapter that bridges protosource's provider-agnostic handlers to standard net/http. |
|
Package authz defines the Authorizer contract that every generated command handler invokes before its pipeline runs.
|
Package authz defines the Authorizer contract that every generated command handler invokes before its pipeline runs. |
|
allowall
Package allowall provides a no-op authz.Authorizer that permits every request.
|
Package allowall provides a no-op authz.Authorizer that permits every request. |
|
aws
|
|
|
dynamoclient
Package dynamoclient defines a shared DynamoDB client interface used across the protosource framework.
|
Package dynamoclient defines a shared DynamoDB client interface used across the protosource framework. |
|
cmd
|
|
|
protoc-gen-protosource
command
|
|
|
protoc-gen-protosource-ts
command
|
|
|
sample
command
|
|
|
testcli
command
|
|
|
testdynamo
command
|
|
|
testdynamo-setup
command
|
|
|
example
|
|
|
app/order/v1/ordermgr
command
Code generated by protoc-gen-protosource.
|
Code generated by protoc-gen-protosource. |
|
app/sample/v1/samplemgr
command
Code generated by protoc-gen-protosource.
|
Code generated by protoc-gen-protosource. |
|
app/samplenosnapshot/v1/samplemgr
command
Code generated by protoc-gen-protosource.
|
Code generated by protoc-gen-protosource. |
|
app/test/v1/testmgr
command
Code generated by protoc-gen-protosource.
|
Code generated by protoc-gen-protosource. |
|
history
|
|
|
Package httpclient provides a generic HTTP client for protosource aggregates with protobuf-first content negotiation.
|
Package httpclient provides a generic HTTP client for protosource aggregates with protobuf-first content negotiation. |
|
options
|
|
|
record
|
|
|
response
|
|
|
serializers
|
|
|
stores
|
|