mink

package module
v1.0.13
Published: Apr 23, 2026 License: Apache-2.0 Imports: 17 Imported by: 0

README

go-mink 🦫

A Comprehensive Event Sourcing & CQRS Toolkit for Go

v1.0.0 — Stable Release

go-mink includes everything you need to build production event-sourced systems in Go:

  • Event Store with optimistic concurrency, PostgreSQL & in-memory adapters
  • Command Bus with middleware pipeline (validation, idempotency, correlation, recovery)
  • Projection Engine with inline, async, and live projections
  • Saga / Process Manager with compensation handling
  • Outbox Pattern for reliable messaging (Webhook, Kafka, SNS publishers)
  • Event Versioning with schema evolution via upcasting (zero DB migration)
  • Field-Level Encryption with AWS KMS, HashiCorp Vault, and local AES-256-GCM
  • GDPR Compliance via crypto-shredding (key revocation) and data export (right to access)
  • Observability with Prometheus metrics and OpenTelemetry tracing
  • Testing Utilities with BDD fixtures, assertions, and test containers
  • CLI Tool for code generation, migrations, and diagnostics

What is go-mink?

go-mink is a batteries-included Event Sourcing and CQRS (Command Query Responsibility Segregation) library for Go. Inspired by MartenDB for .NET, go-mink brings the same developer-friendly experience to the Go ecosystem.

Why "go-mink"? Just as Marten (the animal) inspired the .NET library name, we chose go-mink, another member of the Mustelidae family, for our Go counterpart.

Vision

"Make Event Sourcing in Go as simple as using a traditional ORM"

go-mink aims to eliminate the boilerplate code typically required when implementing Event Sourcing in Go, while providing a pluggable architecture that allows teams to choose their preferred storage backends.

Key Features

Feature Description
🎯 Event Store Append-only event storage with optimistic concurrency
🔌 PostgreSQL Adapter Production-ready PostgreSQL support
🧪 Memory Adapter In-memory adapter for testing
🧱 Aggregates Base implementation with event application
📋 Command Bus Full CQRS with command handlers and middleware
🔐 Idempotency Prevent duplicate command processing
🔗 Correlation/Causation Distributed tracing support
📖 Projections Inline, async, and live projection engine
📊 Read Models Generic repository with query builder
📡 Subscriptions Catch-up and polling event subscriptions
🧪 Testing Utilities BDD fixtures, assertions, test containers
📊 Observability Prometheus metrics & OpenTelemetry tracing
📦 MessagePack Alternative serializer for performance
🛠️ CLI Tool Code generation, migrations, diagnostics
Sagas Process manager for long-running workflows
🔄 Event Versioning Schema evolution with upcasting (zero DB migration)
🔐 Security Field-level encryption and GDPR compliance
📦 Data Export GDPR right to access / data portability
📤 Outbox Pattern Reliable event publishing to external systems

Quick Example

package main

import (
    "context"
    
    "go-mink.dev"
    "go-mink.dev/adapters/postgres"
)

func main() {
    ctx := context.Background()
    
    // Initialize PostgreSQL adapter
    adapter, err := postgres.NewAdapter(ctx, "postgres://localhost/mydb")
    if err != nil {
        panic(err)
    }
    defer adapter.Close()
    
    // Create event store
    store := mink.New(adapter)
    
    // Create and populate an aggregate
    order := NewOrder("order-123")
    order.Create("customer-456")
    order.AddItem("SKU-001", 2, 29.99)
    
    // Save aggregate (events are persisted)
    store.SaveAggregate(ctx, order)
    
    // Load aggregate (events are replayed)
    loaded := NewOrder("order-123")
    store.LoadAggregate(ctx, loaded)
}

CQRS with Command Bus

package main

import (
    "context"
    "fmt"

    "go-mink.dev"
    "go-mink.dev/adapters/memory"
)

// Define a command
type CreateOrder struct {
    mink.CommandBase
    CustomerID string `json:"customerId"`
}

func (c CreateOrder) CommandType() string { return "CreateOrder" }
func (c CreateOrder) Validate() error {
    if c.CustomerID == "" {
        return mink.NewValidationError("CreateOrder", "CustomerID", "required")
    }
    return nil
}

func main() {
    ctx := context.Background()
    
    // Create command bus with middleware
    bus := mink.NewCommandBus()
    bus.Use(mink.ValidationMiddleware())
    bus.Use(mink.RecoveryMiddleware())
    bus.Use(mink.CorrelationIDMiddleware(nil))
    
    // Add idempotency (prevents duplicate processing)
    idempotencyStore := memory.NewIdempotencyStore()
    bus.Use(mink.IdempotencyMiddleware(mink.DefaultIdempotencyConfig(idempotencyStore)))
    
    // Register command handler
    bus.RegisterFunc("CreateOrder", func(ctx context.Context, cmd mink.Command) (mink.CommandResult, error) {
        c := cmd.(CreateOrder)
        // Process the command using c.CustomerID...
        _ = c
        return mink.NewSuccessResult("order-123", 1), nil
    })
    
    // Dispatch command
    result, err := bus.Dispatch(ctx, CreateOrder{CustomerID: "cust-456"})
    if err != nil {
        panic(err)
    }
    
    fmt.Printf("Created order: %s (version %d)\n", result.AggregateID, result.Version)
}

Projections & Read Models

package main

import (
    "context"
    "time"

    "go-mink.dev"
    "go-mink.dev/adapters/memory"
)

// Define a read model
type OrderSummary struct {
    OrderID    string
    CustomerID string
    Status     string
    ItemCount  int
    Total      float64
}

// Define a projection
type OrderSummaryProjection struct {
    mink.ProjectionBase
    repo *mink.InMemoryRepository[OrderSummary]
}

func (p *OrderSummaryProjection) Apply(ctx context.Context, event mink.StoredEvent) error {
    // Transform events into read model updates
    // ...
    return nil
}

func main() {
    ctx := context.Background()

    // Create projection engine (store is an event store created as in the Quick Example)
    checkpointStore := memory.NewCheckpointStore()
    engine := mink.NewProjectionEngine(store,
        mink.WithCheckpointStore(checkpointStore),
    )

    // Register projections
    repo := mink.NewInMemoryRepository[OrderSummary](func(o *OrderSummary) string {
        return o.OrderID
    })
    projection := &OrderSummaryProjection{repo: repo}
    engine.RegisterInline(projection)

    // Start engine
    engine.Start(ctx)
    defer engine.Stop(ctx)

    // Query read models with fluent API
    orders, _ := repo.Query(ctx, mink.NewQuery().
        Where("Status", mink.Eq, "Pending").
        OrderByDesc("Total").
        WithLimit(10))

    // Rebuild projections when needed
    rebuilder := mink.NewProjectionRebuilder(store, checkpointStore)
    rebuilder.RebuildInline(ctx, projection, mink.RebuildOptions{BatchSize: 1000})
}

Testing Utilities

import (
    "go-mink.dev/testing/bdd"
    "go-mink.dev/testing/assertions"
    "go-mink.dev/testing/containers"
)

// BDD-style aggregate testing
func TestOrderCreation(t *testing.T) {
    order := NewOrder("order-123")

    bdd.Given(t, order).
        When(func() error {
            return order.Create("customer-456")
        }).
        Then(OrderCreated{OrderID: "order-123", CustomerID: "customer-456"})
}

// Event assertions
assertions.AssertEventTypes(t, events, "OrderCreated", "ItemAdded")

// PostgreSQL test containers
container := containers.StartPostgres(t)
db := container.MustDB(ctx)

Observability

import (
    "go-mink.dev/middleware/metrics"
    "go-mink.dev/middleware/tracing"
)

// Prometheus metrics
m := metrics.New(metrics.WithMetricsServiceName("order-service"))
m.MustRegister()
bus.Use(m.CommandMiddleware())

// OpenTelemetry tracing
tracer := tracing.NewTracer(tracing.WithServiceName("order-service"))
bus.Use(tracer.CommandMiddleware())

Outbox Pattern

import (
    "time"

    "go-mink.dev"
    "go-mink.dev/adapters/memory"
    "go-mink.dev/outbox/webhook"
)

// Wrap event store with outbox for reliable publishing
outboxStore := memory.NewOutboxStore()
esWithOutbox := mink.NewEventStoreWithOutbox(store, outboxStore, []mink.OutboxRoute{
    {EventTypes: []string{"OrderCreated"}, Destination: "webhook:https://partner.example.com/events"},
    {Destination: "kafka:all-events"}, // All events
})

// Append events - outbox messages scheduled automatically
esWithOutbox.Append(ctx, "Order-123", []interface{}{OrderCreated{OrderID: "123"}})

// Start background processor with publishers
processor := mink.NewOutboxProcessor(outboxStore,
    mink.WithPublisher(webhook.New()),
    mink.WithPollInterval(time.Second),
)
processor.Start(ctx)
defer processor.Stop(ctx)

Event Versioning & Upcasting

import (
    "encoding/json"
    "go-mink.dev"
)

// Define upcaster: v1 → v2 (add Currency field)
type orderCreatedV1ToV2 struct{}

func (u orderCreatedV1ToV2) EventType() string { return "OrderCreated" }
func (u orderCreatedV1ToV2) FromVersion() int  { return 1 }
func (u orderCreatedV1ToV2) ToVersion() int    { return 2 }
func (u orderCreatedV1ToV2) Upcast(data []byte, m mink.Metadata) ([]byte, error) {
    var obj map[string]interface{}
    if err := json.Unmarshal(data, &obj); err != nil {
        return nil, err
    }
    obj["currency"] = "USD" // default for old events
    return json.Marshal(obj)
}

// Register with event store — old events upcasted transparently on load
chain := mink.NewUpcasterChain()
chain.Register(orderCreatedV1ToV2{})

store := mink.New(adapter, mink.WithUpcasters(chain))

// Schema compatibility checking
registry := mink.NewSchemaRegistry()
registry.Register("OrderCreated", mink.SchemaDefinition{Version: 1, Fields: v1Fields})
registry.Register("OrderCreated", mink.SchemaDefinition{Version: 2, Fields: v2Fields})
compat, _ := registry.CheckCompatibility("OrderCreated", 1, 2)
// compat == SchemaBackwardCompatible

Field-Level Encryption

import (
    "errors"

    "go-mink.dev"
    "go-mink.dev/encryption"
    "go-mink.dev/encryption/local"
)

// Set up encryption provider (local for dev, KMS/Vault for production)
provider := local.New(local.WithKey("master-1", myKey))
defer provider.Close()

// Configure which fields to encrypt per event type
encConfig := mink.NewFieldEncryptionConfig(
    mink.WithEncryptionProvider(provider),
    mink.WithDefaultKeyID("master-1"),
    mink.WithEncryptedFields("CustomerCreated", "email", "phone", "ssn"),
    // Per-tenant keys for multi-tenant apps
    mink.WithTenantKeyResolver(func(tenantID string) string {
        return "tenant-" + tenantID
    }),
    // Crypto-shredding: graceful degradation when key is revoked
    mink.WithDecryptionErrorHandler(func(err error, eventType string, meta mink.Metadata) error {
        if errors.Is(err, encryption.ErrKeyRevoked) {
            return nil // Return encrypted data as-is
        }
        return err
    }),
)

// Create event store with encryption
store := mink.New(adapter, mink.WithFieldEncryption(encConfig))

// PII fields are encrypted at rest, decrypted transparently on load
// Revoking a key makes that tenant's data permanently unrecoverable (GDPR)

GDPR Data Export

import "go-mink.dev"

exporter := mink.NewDataExporter(store)

// Export by known stream IDs (efficient)
result, _ := exporter.Export(ctx, mink.ExportRequest{
    SubjectID: "user-123",
    Streams:   []string{"Customer-user-123", "Order-ord-456"},
})
// result.Events, result.TotalEvents, result.RedactedCount

// Or scan all events with filters
result, _ = exporter.Export(ctx, mink.ExportRequest{
    SubjectID: "tenant-A",
    Filter: mink.CombineFilters(
        mink.FilterByTenantID("A"),
        mink.FilterByEventTypes("CustomerCreated", "OrderPlaced"),
    ),
})

// Stream for large exports (no memory accumulation)
exporter.ExportStream(ctx, mink.ExportRequest{
    SubjectID: "user-123",
    Streams:   []string{"Customer-user-123"},
}, func(ctx context.Context, event mink.ExportedEvent) error {
    if event.Redacted {
        return nil // Key revoked — crypto-shredded
    }
    return writeToFile(event)
})

Performance

go-mink is benchmarked on every commit with a shared adapter benchmark suite. Key metrics from CI (ubuntu-latest):

Adapter      Append (single)    Append (batch 100)   Load (1000 events)   Concurrent (8 workers)
Memory       ~2.5M events/sec   ~6M events/sec       ~60M events/sec      ~1.5M events/sec
PostgreSQL   ~5K events/sec     ~30K events/sec      ~300K events/sec     ~15K events/sec

Scale tests: Memory adapter processes 1M events in under 1 second. PostgreSQL adapter handles 100K events with p99 append latency under 2ms.

Run benchmarks locally:

make benchmark-adapters      # Memory adapter (no infra)
make benchmark-adapters-pg   # PostgreSQL adapter (needs infra)

See full benchmark results for detailed throughput, latency percentiles, and instructions for adding benchmarks to new adapters.

Installation

go get go-mink.dev
go get go-mink.dev/adapters/postgres

Documentation

Document Description
Introduction Problem statement and goals
Architecture System design and components
Event Store Event storage design
Read Models Projection system
Adapters Database adapter system
CLI Command-line tooling
API Design Public API reference
Roadmap Future development plans
Advanced Patterns Commands, Sagas, Outbox, Encryption, Data Export
Event Versioning Schema evolution & upcasting
Security Encryption, GDPR compliance
Testing BDD fixtures and test utilities
Benchmarks Performance results and benchmark guide

License

Apache License 2.0 - See LICENSE for details.


go-mink - Event Sourcing for Go, Done Right.

Documentation

Overview

Package mink provides event sourcing and CQRS primitives for Go applications. It offers a simple, flexible API for building event-sourced systems with support for multiple database backends.

go-mink is an Event Sourcing library for Go that makes it easy to build applications using event sourcing patterns. It provides a simple API for storing events, loading aggregates, and projecting read models.

Quick Start

Create an event store with the in-memory adapter for development:

import (
    "go-mink.dev"
    "go-mink.dev/adapters/memory"
)

store := mink.New(memory.NewAdapter())

For production, use the PostgreSQL adapter:

import (
    "go-mink.dev"
    "go-mink.dev/adapters/postgres"
)

adapter, err := postgres.NewAdapter(ctx, connStr)
if err != nil {
    log.Fatal(err)
}
store := mink.New(adapter)

Defining Events

Events are simple structs that represent something that happened in your domain:

type OrderCreated struct {
    OrderID    string `json:"orderId"`
    CustomerID string `json:"customerId"`
}

type ItemAdded struct {
    OrderID  string  `json:"orderId"`
    SKU      string  `json:"sku"`
    Quantity int     `json:"quantity"`
    Price    float64 `json:"price"`
}

Register events with the store so they can be serialized and deserialized:

store.RegisterEvents(OrderCreated{}, ItemAdded{})

Defining Aggregates

Aggregates are domain objects that encapsulate business logic and generate events:

type Order struct {
    mink.AggregateBase
    CustomerID string
    Items      []OrderItem
    Status     string
}

func NewOrder(id string) *Order {
    return &Order{
        AggregateBase: mink.NewAggregateBase(id, "Order"),
    }
}

func (o *Order) Create(customerID string) {
    o.Apply(OrderCreated{OrderID: o.AggregateID(), CustomerID: customerID})
    o.CustomerID = customerID
    o.Status = "Created"
}

func (o *Order) ApplyEvent(event interface{}) error {
    switch e := event.(type) {
    case OrderCreated:
        o.CustomerID = e.CustomerID
        o.Status = "Created"
    case ItemAdded:
        o.Items = append(o.Items, OrderItem{SKU: e.SKU, Quantity: e.Quantity, Price: e.Price})
    }
    // NOTE: Version is managed automatically by LoadAggregate and SaveAggregate.
    // You do NOT need to call IncrementVersion() here.
    return nil
}

Saving and Loading Aggregates

Save aggregates to persist their uncommitted events:

order := NewOrder("order-123")
order.Create("customer-456")
order.AddItem("SKU-001", 2, 29.99)

err := store.SaveAggregate(ctx, order)

Load aggregates to rebuild state from events:

loaded := NewOrder("order-123")
err := store.LoadAggregate(ctx, loaded)
// loaded.Status == "Created"
// loaded.Items contains the added item
// loaded.Version() returns the number of events in the stream

Low-Level Event Operations

Append events directly to a stream:

events := []interface{}{
    OrderCreated{OrderID: "123", CustomerID: "456"},
    ItemAdded{OrderID: "123", SKU: "SKU-001", Quantity: 2, Price: 29.99},
}
err := store.Append(ctx, "Order-123", events)

Load events from a stream:

events, err := store.Load(ctx, "Order-123")

Optimistic Concurrency

Use expected versions to prevent concurrent modifications:

// Create new stream (must not exist)
err := store.Append(ctx, "Order-123", events, mink.ExpectVersion(mink.NoStream))

// Append to existing stream at specific version
err := store.Append(ctx, "Order-123", events, mink.ExpectVersion(1))

Version constants:

  • AnyVersion (-1): Skip version check
  • NoStream (0): Stream must not exist
  • StreamExists (-2): Stream must exist

Metadata

Add metadata to events for tracing and multi-tenancy:

metadata := mink.Metadata{}.
    WithUserID("user-123").
    WithCorrelationID("corr-456").
    WithTenantID("tenant-789")

err := store.Append(ctx, "Order-123", events, mink.WithAppendMetadata(metadata))

Commands and CQRS (v0.2.0)

Define commands to encapsulate user intentions:

type CreateOrder struct {
    mink.CommandBase
    CustomerID string `json:"customerId"`
}

func (c CreateOrder) CommandType() string { return "CreateOrder" }
func (c CreateOrder) Validate() error {
    if c.CustomerID == "" {
        return mink.NewValidationError("CustomerID", "required")
    }
    return nil
}

Create a command bus with middleware:

bus := mink.NewCommandBus()
bus.Use(mink.ValidationMiddleware())
bus.Use(mink.RecoveryMiddleware(func(err error) { log.Printf("recovered: %v", err) }))
bus.Use(mink.LoggingMiddleware(logger, nil))

Register command handlers:

bus.Register("CreateOrder", func(ctx context.Context, cmd mink.Command) (mink.CommandResult, error) {
    c := cmd.(CreateOrder)
    order := NewOrder(uuid.New().String())
    order.Create(c.CustomerID)
    if err := store.SaveAggregate(ctx, order); err != nil {
        return mink.NewErrorResult(err), err
    }
    return mink.NewSuccessResult(order.AggregateID(), order.Version()), nil
})

Dispatch commands:

result, err := bus.Dispatch(ctx, CreateOrder{CustomerID: "cust-123"})

Idempotency

Prevent duplicate command processing with idempotency:

idempotencyStore := memory.NewIdempotencyStore()
config := mink.DefaultIdempotencyConfig(idempotencyStore)
bus.Use(mink.IdempotencyMiddleware(config))

Make commands idempotent by implementing IdempotentCommand:

// RequestID is a client-supplied field on the command struct (not shown above).
func (c CreateOrder) IdempotencyKey() string { return c.RequestID }

Index

Constants

View Source
const (
	// AnyVersion skips version checking, allowing append regardless of current version.
	AnyVersion int64 = -1

	// NoStream indicates the stream must not exist (for creating new streams).
	NoStream int64 = 0

	// StreamExists indicates the stream must exist (for appending to existing streams).
	StreamExists int64 = -2
)

Version constants for optimistic concurrency control.

View Source
const (
	OutboxPending    = adapters.OutboxPending
	OutboxProcessing = adapters.OutboxProcessing
	OutboxCompleted  = adapters.OutboxCompleted
	OutboxFailed     = adapters.OutboxFailed
	OutboxDeadLetter = adapters.OutboxDeadLetter
)

Outbox status constants.

View Source
const (
	SagaStatusStarted            = adapters.SagaStatusStarted
	SagaStatusRunning            = adapters.SagaStatusRunning
	SagaStatusCompleted          = adapters.SagaStatusCompleted
	SagaStatusFailed             = adapters.SagaStatusFailed
	SagaStatusCompensating       = adapters.SagaStatusCompensating
	SagaStatusCompensated        = adapters.SagaStatusCompensated
	SagaStatusCompensationFailed = adapters.SagaStatusCompensationFailed
)

Re-export saga status constants from adapters.

View Source
const (
	SagaStepPending     = adapters.SagaStepPending
	SagaStepRunning     = adapters.SagaStepRunning
	SagaStepCompleted   = adapters.SagaStepCompleted
	SagaStepFailed      = adapters.SagaStepFailed
	SagaStepCompensated = adapters.SagaStepCompensated
)

Re-export saga step status constants from adapters.

View Source
const (

	// DefaultSchemaVersion is the schema version assumed for events without an explicit version.
	// All events stored before versioning was introduced are treated as version 1.
	DefaultSchemaVersion = 1
)

Variables

View Source
var (
	// ErrEncryptionFailed indicates a field encryption operation failed.
	ErrEncryptionFailed = encryption.ErrEncryptionFailed

	// ErrDecryptionFailed indicates a field decryption operation failed.
	ErrDecryptionFailed = encryption.ErrDecryptionFailed

	// ErrKeyNotFound indicates the requested encryption key does not exist.
	ErrKeyNotFound = encryption.ErrKeyNotFound

	// ErrKeyRevoked indicates the encryption key has been revoked (crypto-shredding).
	ErrKeyRevoked = encryption.ErrKeyRevoked

	// ErrProviderClosed indicates the encryption provider has been closed.
	ErrProviderClosed = encryption.ErrProviderClosed
)

Encryption-related sentinel errors. These are aliases to the encryption package errors for compatibility.

View Source
var (
	// ErrStreamNotFound indicates the requested stream does not exist.
	ErrStreamNotFound = adapters.ErrStreamNotFound

	// ErrConcurrencyConflict indicates an optimistic concurrency violation.
	ErrConcurrencyConflict = adapters.ErrConcurrencyConflict

	// ErrEventNotFound indicates the requested event does not exist.
	ErrEventNotFound = errors.New("mink: event not found")

	// ErrSerializationFailed indicates event serialization/deserialization failed.
	ErrSerializationFailed = errors.New("mink: serialization failed")

	// ErrEventTypeNotRegistered indicates an unknown event type was encountered.
	ErrEventTypeNotRegistered = errors.New("mink: event type not registered")

	// ErrNilAggregate indicates a nil aggregate was passed.
	ErrNilAggregate = errors.New("mink: nil aggregate")

	// ErrNilStore indicates a nil event store was passed.
	ErrNilStore = errors.New("mink: nil event store")

	// ErrEmptyStreamID indicates an empty stream ID was provided.
	ErrEmptyStreamID = adapters.ErrEmptyStreamID

	// ErrNoEvents indicates no events were provided for append.
	ErrNoEvents = adapters.ErrNoEvents

	// ErrInvalidVersion indicates an invalid version number was provided.
	ErrInvalidVersion = adapters.ErrInvalidVersion

	// ErrAdapterClosed indicates the adapter has been closed.
	ErrAdapterClosed = adapters.ErrAdapterClosed

	// ErrSubscriptionNotSupported indicates the adapter does not support subscriptions.
	ErrSubscriptionNotSupported = errors.New("mink: adapter does not support subscriptions")

	// ErrHandlerNotFound indicates no handler is registered for a command type.
	ErrHandlerNotFound = errors.New("mink: handler not found")

	// ErrValidationFailed indicates command validation failed.
	ErrValidationFailed = errors.New("mink: validation failed")

	// ErrCommandAlreadyProcessed indicates an idempotent command was already processed.
	ErrCommandAlreadyProcessed = errors.New("mink: command already processed")

	// ErrNilCommand indicates a nil command was passed.
	ErrNilCommand = errors.New("mink: nil command")

	// ErrHandlerPanicked indicates a handler panicked during execution.
	ErrHandlerPanicked = errors.New("mink: handler panicked")

	// ErrCommandBusClosed indicates the command bus has been closed.
	ErrCommandBusClosed = errors.New("mink: command bus closed")

	// ErrOutboxMessageNotFound indicates the requested outbox message does not exist.
	ErrOutboxMessageNotFound = adapters.ErrOutboxMessageNotFound

	// ErrOutboxStoreClosed indicates the outbox store has been closed.
	ErrOutboxStoreClosed = errors.New("mink: outbox store closed")

	// ErrPublisherNotFound indicates no publisher is registered for a destination.
	ErrPublisherNotFound = errors.New("mink: publisher not found for destination")

	// ErrOutboxProcessorRunning indicates the outbox processor is already running.
	ErrOutboxProcessorRunning = errors.New("mink: outbox processor already running")
)

Sentinel errors for common error conditions. Use errors.Is() to check for these errors. These errors are aliases to the adapters package errors for compatibility.

View Source
var (
	// ErrNilProjection indicates a nil projection was passed.
	ErrNilProjection = errors.New("mink: nil projection")

	// ErrEmptyProjectionName indicates a projection has no name.
	ErrEmptyProjectionName = errors.New("mink: projection name is required")

	// ErrProjectionNotFound indicates the requested projection does not exist.
	ErrProjectionNotFound = errors.New("mink: projection not found")

	// ErrProjectionAlreadyRegistered indicates a projection with the same name is already registered.
	ErrProjectionAlreadyRegistered = errors.New("mink: projection already registered")

	// ErrProjectionEngineAlreadyRunning indicates the projection engine is already running.
	ErrProjectionEngineAlreadyRunning = errors.New("mink: projection engine already running")

	// ErrProjectionEngineStopped indicates the projection engine has been stopped.
	ErrProjectionEngineStopped = errors.New("mink: projection engine stopped")

	// ErrNoCheckpointStore indicates no checkpoint store was configured.
	ErrNoCheckpointStore = errors.New("mink: checkpoint store is required")

	// ErrNotImplemented indicates a method is not implemented.
	ErrNotImplemented = errors.New("mink: not implemented")

	// ErrProjectionFailed indicates a projection failed to process an event.
	ErrProjectionFailed = errors.New("mink: projection failed")
)

View Source
var (
	// ErrExportFailed indicates a data export operation failed.
	ErrExportFailed = errors.New("mink: export failed")

	// ErrSubjectIDRequired indicates the subject ID was not provided in the export request.
	ErrSubjectIDRequired = errors.New("mink: subject ID is required for export")

	// ErrNoExportSources indicates neither streams nor a filter was provided.
	ErrNoExportSources = errors.New("mink: either streams or filter is required for export")

	// ErrExportScanNotSupported indicates the adapter does not support event scanning.
	// Provide explicit stream IDs in the ExportRequest instead.
	ErrExportScanNotSupported = errors.New("mink: adapter does not support event scanning; provide explicit stream IDs")
)

Export-related sentinel errors.

View Source
var (
	// ErrNotFound indicates the requested entity was not found.
	ErrNotFound = errors.New("mink: not found")

	// ErrAlreadyExists indicates the entity already exists.
	ErrAlreadyExists = errors.New("mink: already exists")

	// ErrInvalidQuery indicates the query is invalid.
	ErrInvalidQuery = errors.New("mink: invalid query")
)

Repository errors

View Source
var (
	// ErrSagaNotFound indicates the requested saga does not exist.
	ErrSagaNotFound = adapters.ErrSagaNotFound

	// ErrSagaAlreadyExists indicates a saga with the same ID already exists.
	ErrSagaAlreadyExists = adapters.ErrSagaAlreadyExists

	// ErrSagaCompleted indicates the saga has already completed.
	ErrSagaCompleted = errors.New("mink: saga already completed")

	// ErrSagaFailed indicates the saga has failed.
	ErrSagaFailed = errors.New("mink: saga failed")

	// ErrSagaCompensating indicates the saga is currently compensating.
	ErrSagaCompensating = errors.New("mink: saga is compensating")

	// ErrNoSagaHandler indicates no handler is registered for the event type.
	ErrNoSagaHandler = errors.New("mink: no saga handler for event")
)

Saga-related sentinel errors.

View Source
var (
	// ErrUpcastFailed indicates an upcaster failed to transform event data.
	ErrUpcastFailed = errors.New("mink: upcast failed")

	// ErrSchemaVersionGap indicates a gap in the upcaster chain for an event type.
	ErrSchemaVersionGap = errors.New("mink: schema version gap")

	// ErrIncompatibleSchema indicates a schema change is not backward compatible.
	ErrIncompatibleSchema = errors.New("mink: incompatible schema")

	// ErrSchemaNotFound indicates the requested schema was not found.
	ErrSchemaNotFound = errors.New("mink: schema not found")
)

Sentinel errors for event versioning and upcasting.

View Source
var NewDecryptionError = encryption.NewDecryptionError

NewDecryptionError creates a new EncryptionError for a decrypt operation.

View Source
var NewEncryptionError = encryption.NewEncryptionError

NewEncryptionError creates a new EncryptionError for an encrypt operation.

View Source
var NewKeyNotFoundError = encryption.NewKeyNotFoundError

NewKeyNotFoundError creates a new KeyNotFoundError.

View Source
var NewKeyRevokedError = encryption.NewKeyRevokedError

NewKeyRevokedError creates a new KeyRevokedError.

Functions

func BuildStreamID

func BuildStreamID(aggregateType, aggregateID string) string

BuildStreamID creates a stream ID from an aggregate type and ID. This follows the convention: "{Type}-{ID}"

func CausationIDFromContext

func CausationIDFromContext(ctx context.Context) string

CausationIDFromContext returns the causation ID from context.

func CorrelationIDFromContext

func CorrelationIDFromContext(ctx context.Context) string

CorrelationIDFromContext returns the correlation ID from context.

func GenerateIdempotencyKey

func GenerateIdempotencyKey(cmd Command) string

GenerateIdempotencyKey generates an idempotency key from a command. The key is based on the command type and its JSON-serialized content.

func GetCommandType

func GetCommandType(cmd interface{}) string

GetCommandType returns the type name of a command using reflection. This is useful for commands that don't embed CommandBase.

func GetEncryptedFields

func GetEncryptedFields(m Metadata) []string

GetEncryptedFields extracts the list of encrypted field names from event metadata.

func GetEncryptionKeyID

func GetEncryptionKeyID(m Metadata) string

GetEncryptionKeyID extracts the encryption key ID from event metadata.

func GetEventType

func GetEventType(event interface{}) string

GetEventType returns the event type name for the given event. It uses the struct name as the type name.

func GetIdempotencyKey

func GetIdempotencyKey(cmd Command) string

GetIdempotencyKey returns the idempotency key for a command. If the command implements IdempotentCommand, it uses that key. Otherwise, it generates a key from the command content.

func GetSchemaVersion

func GetSchemaVersion(m Metadata) int

GetSchemaVersion extracts the schema version from event metadata. Returns DefaultSchemaVersion if the version is not set or cannot be parsed.

func IdempotencyKeyFromField

func IdempotencyKeyFromField(fieldGetter func(Command) string) func(Command) string

IdempotencyKeyFromField extracts the idempotency key from a field in the command. If the field is empty, it falls back to GenerateIdempotencyKey.

func IdempotencyKeyPrefix

func IdempotencyKeyPrefix(prefix string) func(Command) string

IdempotencyKeyPrefix is a convenience function to create a prefixed idempotency key.

func IsEncrypted

func IsEncrypted(m Metadata) bool

IsEncrypted reports whether the event has encrypted fields.

func RegisterGenericHandler

func RegisterGenericHandler[C Command](registry *HandlerRegistry, handler func(ctx context.Context, cmd C) (CommandResult, error))

RegisterGenericHandler is a convenience function to register a generic handler.

func SagaStateToJSON

func SagaStateToJSON(state *SagaState) ([]byte, error)

SagaStateToJSON converts saga state to JSON for persistence.

func ShouldHandleEventType

func ShouldHandleEventType(handledEvents []string, eventType string) bool

ShouldHandleEventType checks if an event type should be handled given a list of handled events. Returns true if handledEvents is empty (meaning all events are handled) or if eventType is in the list. This is a utility function used by projection engine and rebuilder to filter events.

func TenantIDFromContext

func TenantIDFromContext(ctx context.Context) string

TenantIDFromContext returns the tenant ID from context.

func Version

func Version() string

Version returns the library version string.

func WithCausationID

func WithCausationID(ctx context.Context, causationID string) context.Context

WithCausationID returns a context with the causation ID set.

func WithTenantID

func WithTenantID(ctx context.Context, tenantID string) context.Context

WithTenantID returns a context with the tenant ID set.

Types

type Aggregate

type Aggregate interface {
	// AggregateID returns the unique identifier for this aggregate instance.
	AggregateID() string

	// AggregateType returns the type/category of this aggregate (e.g., "Order", "Customer").
	AggregateType() string

	// Version returns the current version of the aggregate.
	// This is the number of events that have been applied.
	Version() int64

	// ApplyEvent applies an event to update the aggregate's state.
	// This method should be idempotent and deterministic.
	ApplyEvent(event interface{}) error

	// UncommittedEvents returns events that have been applied but not yet persisted.
	UncommittedEvents() []interface{}

	// ClearUncommittedEvents removes all uncommitted events after successful persistence.
	ClearUncommittedEvents()
}

Aggregate defines the interface for event-sourced aggregates. An aggregate is a domain object whose state is derived from a sequence of events.

type AggregateBase

type AggregateBase struct {
	// contains filtered or unexported fields
}

AggregateBase provides a default partial implementation of the Aggregate interface. Embed this struct in your aggregate types to get default behavior.

func NewAggregateBase

func NewAggregateBase(id, aggregateType string) AggregateBase

NewAggregateBase creates a new AggregateBase with the given ID and type.

func (*AggregateBase) AggregateID

func (a *AggregateBase) AggregateID() string

AggregateID returns the aggregate's unique identifier.

func (*AggregateBase) AggregateType

func (a *AggregateBase) AggregateType() string

AggregateType returns the aggregate type.

func (*AggregateBase) Apply

func (a *AggregateBase) Apply(event interface{})

Apply records an event as uncommitted. This should be called by the aggregate after creating a new event. The aggregate should also update its internal state based on the event.

func (*AggregateBase) ClearUncommittedEvents

func (a *AggregateBase) ClearUncommittedEvents()

ClearUncommittedEvents removes all uncommitted events.

func (*AggregateBase) HasUncommittedEvents

func (a *AggregateBase) HasUncommittedEvents() bool

HasUncommittedEvents returns true if there are events waiting to be persisted.

func (*AggregateBase) IncrementVersion

func (a *AggregateBase) IncrementVersion()

IncrementVersion increments the aggregate version by 1.

func (*AggregateBase) SetID

func (a *AggregateBase) SetID(id string)

SetID sets the aggregate's ID.

func (*AggregateBase) SetType

func (a *AggregateBase) SetType(t string)

SetType sets the aggregate type.

func (*AggregateBase) SetVersion

func (a *AggregateBase) SetVersion(v int64)

SetVersion sets the aggregate version.

func (*AggregateBase) StreamID

func (a *AggregateBase) StreamID() StreamID

StreamID returns the stream ID for this aggregate. The stream ID is composed of the aggregate type and ID.

func (*AggregateBase) UncommittedEvents

func (a *AggregateBase) UncommittedEvents() []interface{}

UncommittedEvents returns events that haven't been persisted yet.

func (*AggregateBase) Version

func (a *AggregateBase) Version() int64

Version returns the current version of the aggregate.
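The embed-the-base pattern can be sketched with a simplified local base (lowercase names here are illustrative stand-ins, not the real AggregateBase): the aggregate updates its own state, then records the event as uncommitted via Apply.

```go
package main

import "fmt"

// aggregateBase is a simplified local stand-in for the embedded base:
// it tracks identity and the events not yet persisted.
type aggregateBase struct {
	id, typ     string
	uncommitted []interface{}
}

func (a *aggregateBase) Apply(event interface{})          { a.uncommitted = append(a.uncommitted, event) }
func (a *aggregateBase) UncommittedEvents() []interface{} { return a.uncommitted }

type OrderPlaced struct{ OrderID string }

// Order embeds the base; its command method mutates state and records the event.
type Order struct {
	aggregateBase
	Placed bool
}

func (o *Order) Place() {
	e := OrderPlaced{OrderID: o.id}
	o.Placed = true // update in-memory state...
	o.Apply(e)      // ...and record the event as uncommitted
}

func main() {
	o := &Order{aggregateBase: aggregateBase{id: "o-1", typ: "Order"}}
	o.Place()
	fmt.Println(o.Placed, len(o.UncommittedEvents())) // true 1
}
```

After a successful save, the store would clear the uncommitted events, mirroring ClearUncommittedEvents above.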

type AggregateCommand

type AggregateCommand interface {
	Command

	// AggregateID returns the ID of the aggregate this command targets.
	// Returns empty string for commands that create new aggregates.
	AggregateID() string
}

AggregateCommand is a command that targets a specific aggregate.

type AggregateFactory

type AggregateFactory func(id string) Aggregate

AggregateFactory creates new aggregate instances.

type AggregateHandler

type AggregateHandler[C AggregateCommand, A Aggregate] struct {
	// contains filtered or unexported fields
}

AggregateHandler is a handler that works with aggregates and an event store. It loads the aggregate, executes the command, and saves the results.

func NewAggregateHandler

func NewAggregateHandler[C AggregateCommand, A Aggregate](config AggregateHandlerConfig[C, A]) *AggregateHandler[C, A]

NewAggregateHandler creates a new AggregateHandler.

func (*AggregateHandler[C, A]) CommandType

func (h *AggregateHandler[C, A]) CommandType() string

CommandType returns the command type this handler processes.

func (*AggregateHandler[C, A]) Handle

func (h *AggregateHandler[C, A]) Handle(ctx context.Context, cmd Command) (CommandResult, error)

Handle loads the aggregate, executes the command, and saves the aggregate.

type AggregateHandlerConfig

type AggregateHandlerConfig[C AggregateCommand, A Aggregate] struct {
	Store     *EventStore
	Factory   func(id string) A
	Executor  func(ctx context.Context, agg A, cmd C) error
	NewIDFunc func() string
}

AggregateHandlerConfig configures an AggregateHandler.

type AggregateRoot

type AggregateRoot interface {
	Aggregate

	// GetID is an alias for AggregateID, following DDD naming conventions.
	GetID() string
}

AggregateRoot is an extended interface that includes domain-driven design patterns.

type AppendOption

type AppendOption func(*appendConfig)

AppendOption configures an append operation.

func ExpectVersion

func ExpectVersion(v int64) AppendOption

ExpectVersion sets the expected stream version for optimistic concurrency.

func WithAppendMetadata

func WithAppendMetadata(m Metadata) AppendOption

WithAppendMetadata sets metadata for all events in the append operation.

type AsyncOptions

type AsyncOptions struct {
	// BatchSize is the maximum number of events to process in a batch.
	// Default: 100
	BatchSize int

	// BatchTimeout is the maximum time to wait for a full batch.
	// Default: 1 second
	BatchTimeout time.Duration

	// PollInterval is how often to poll for new events when idle.
	// Default: 100ms
	PollInterval time.Duration

	// RetryPolicy defines how to handle errors.
	RetryPolicy RetryPolicy

	// MaxRetries is the maximum number of retries for failed events.
	// Default: 3
	MaxRetries int

	// StartFromBeginning starts processing from the beginning of the event stream.
	// If false, starts from the last checkpoint.
	// Default: false
	StartFromBeginning bool
}

AsyncOptions configures async projection behavior.

func DefaultAsyncOptions

func DefaultAsyncOptions() AsyncOptions

DefaultAsyncOptions returns the default async projection options.

type AsyncProjection

type AsyncProjection interface {
	Projection

	// Apply processes a single event asynchronously.
	Apply(ctx context.Context, event StoredEvent) error

	// ApplyBatch processes multiple events in a single batch for efficiency.
	// If not supported, implementations should process events sequentially.
	ApplyBatch(ctx context.Context, events []StoredEvent) error
}

AsyncProjection processes events asynchronously in the background. This provides eventual consistency but better write performance and scalability.

type AsyncProjectionBase

type AsyncProjectionBase struct {
	ProjectionBase
}

AsyncProjectionBase provides a default implementation of AsyncProjection. Embed this struct and override Apply to create async projections.

func NewAsyncProjectionBase

func NewAsyncProjectionBase(name string, handledEvents ...string) AsyncProjectionBase

NewAsyncProjectionBase creates a new AsyncProjectionBase.

func (*AsyncProjectionBase) ApplyBatch

func (p *AsyncProjectionBase) ApplyBatch(ctx context.Context, events []StoredEvent) error

ApplyBatch provides a default implementation that processes events sequentially. Override this method for custom batch processing logic.

type AsyncResult

type AsyncResult struct {
	// contains filtered or unexported fields
}

AsyncResult represents the result of an asynchronous operation. It provides methods to wait for completion and check the result.

func (*AsyncResult) Cancel

func (r *AsyncResult) Cancel()

Cancel cancels the async operation's context. This can be used to stop a long-running operation early.

func (*AsyncResult) Context

func (r *AsyncResult) Context() context.Context

Context returns the context for this async operation.

func (*AsyncResult) Done

func (r *AsyncResult) Done() <-chan struct{}

Done returns a channel that is closed when the operation completes. Use this in select statements for non-blocking wait patterns.

Example:

select {
case <-result.Done():
    if err := result.Err(); err != nil {
        log.Printf("Failed: %v", err)
    }
case <-time.After(5 * time.Second):
    result.Cancel()
    log.Println("Timed out")
}

func (*AsyncResult) Err

func (r *AsyncResult) Err() error

Err returns the error from the completed operation, or nil if not yet complete or successful.

func (*AsyncResult) IsComplete

func (r *AsyncResult) IsComplete() bool

IsComplete returns true if the operation has completed (successfully or with an error).

func (*AsyncResult) Wait

func (r *AsyncResult) Wait() error

Wait blocks until the operation completes and returns any error. This is a convenience method equivalent to <-result.Done(); return result.Err()

func (*AsyncResult) WaitWithTimeout

func (r *AsyncResult) WaitWithTimeout(timeout time.Duration) error

WaitWithTimeout blocks until the operation completes or the timeout expires. Returns context.DeadlineExceeded if the timeout is reached before completion.

type CatchupSubscription

type CatchupSubscription struct {
	// contains filtered or unexported fields
}

CatchupSubscription provides catch-up subscription functionality. It first reads historical events from the event store, then switches to polling for new events. This ensures no events are missed during the transition.

func NewCatchupSubscription

func NewCatchupSubscription(
	store *EventStore,
	fromPosition uint64,
	opts ...SubscriptionOptions,
) (*CatchupSubscription, error)

NewCatchupSubscription creates a new catch-up subscription. Call Start() to begin receiving events from the specified position.

func (*CatchupSubscription) Close

func (s *CatchupSubscription) Close() error

Close stops the subscription.

func (*CatchupSubscription) Err

func (s *CatchupSubscription) Err() error

Err returns any error that caused the subscription to close.

func (*CatchupSubscription) Events

func (s *CatchupSubscription) Events() <-chan StoredEvent

Events returns the channel for receiving events.

func (*CatchupSubscription) Position

func (s *CatchupSubscription) Position() uint64

Position returns the current position of the subscription.

func (*CatchupSubscription) Start

func (s *CatchupSubscription) Start(ctx context.Context, pollInterval time.Duration) error

Start begins the catch-up subscription with the specified poll interval. It first catches up on historical events, then polls for new events.

type CategoryFilter

type CategoryFilter struct {
	// contains filtered or unexported fields
}

CategoryFilter filters events by stream category.

func NewCategoryFilter

func NewCategoryFilter(category string) *CategoryFilter

NewCategoryFilter creates a filter that only matches events from streams in the category.

func (*CategoryFilter) Matches

func (f *CategoryFilter) Matches(event StoredEvent) bool

Matches returns true if the event's stream is in the category.
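A sketch of category matching, assuming the common convention that a stream ID is "<Category>-<ID>" (e.g. "Order-123"); the separator go-mink actually uses may differ:

```go
package main

import (
	"fmt"
	"strings"
)

// inCategory reports whether a stream belongs to a category, assuming
// the "<Category>-<ID>" stream-naming convention.
func inCategory(streamID, category string) bool {
	return strings.HasPrefix(streamID, category+"-")
}

func main() {
	fmt.Println(inCategory("Order-123", "Order"))   // true
	fmt.Println(inCategory("Customer-42", "Order")) // false
}
```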

type Checkpoint

type Checkpoint struct {
	// ProjectionName is the name of the projection.
	ProjectionName string

	// Position is the global position of the last processed event.
	Position uint64

	// UpdatedAt is when the checkpoint was last updated.
	UpdatedAt time.Time
}

Checkpoint represents a stored checkpoint record.

type CheckpointStore

type CheckpointStore interface {
	// GetCheckpoint returns the last processed position for a projection.
	// Returns 0 if no checkpoint exists.
	GetCheckpoint(ctx context.Context, projectionName string) (uint64, error)

	// SetCheckpoint stores the last processed position for a projection.
	SetCheckpoint(ctx context.Context, projectionName string, position uint64) error

	// DeleteCheckpoint removes the checkpoint for a projection.
	DeleteCheckpoint(ctx context.Context, projectionName string) error

	// GetAllCheckpoints returns checkpoints for all projections.
	GetAllCheckpoints(ctx context.Context) (map[string]uint64, error)
}

CheckpointStore manages projection checkpoints. Checkpoints track the last processed position for each projection.
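A minimal in-memory sketch of the checkpoint contract (local names, not the library's implementation): a missing checkpoint reads as position 0, per GetCheckpoint's documentation.

```go
package main

import (
	"fmt"
	"sync"
)

// memCheckpoints is a toy in-memory checkpoint store.
type memCheckpoints struct {
	mu sync.Mutex
	m  map[string]uint64
}

func (s *memCheckpoints) Get(name string) uint64 {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.m[name] // map zero value 0 when absent, matching the contract
}

func (s *memCheckpoints) Set(name string, pos uint64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.m[name] = pos
}

func main() {
	s := &memCheckpoints{m: map[string]uint64{}}
	fmt.Println(s.Get("orders")) // 0 (no checkpoint yet)
	s.Set("orders", 42)
	fmt.Println(s.Get("orders")) // 42
}
```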

type Clearable

type Clearable interface {
	// Clear removes all data from the read model.
	Clear(ctx context.Context) error
}

Clearable is an interface for projections that can clear their read model.

type Command

type Command interface {
	// CommandType returns the type identifier for this command (e.g., "CreateOrder").
	CommandType() string

	// Validate checks if the command is valid.
	// Returns nil if valid, or an error describing validation failures.
	Validate() error
}

Command represents an intent to change state in the system. Commands are the write side of CQRS and should be validated before execution.

type CommandBase

type CommandBase struct {
	// CommandID is an optional unique identifier for this command instance.
	CommandID string `json:"commandId,omitempty"`

	// CorrelationID links related commands and events for distributed tracing.
	CorrelationID string `json:"correlationId,omitempty"`

	// CausationID identifies the event or command that caused this command.
	CausationID string `json:"causationId,omitempty"`

	// Metadata contains arbitrary key-value pairs for application-specific data.
	Metadata map[string]string `json:"metadata,omitempty"`
}

CommandBase provides a default partial implementation of Command. Embed this struct in your command types to get common functionality.

func (CommandBase) GetCausationID

func (c CommandBase) GetCausationID() string

GetCausationID returns the causation ID.

func (CommandBase) GetCommandID

func (c CommandBase) GetCommandID() string

GetCommandID returns the command ID.

func (CommandBase) GetCorrelationID

func (c CommandBase) GetCorrelationID() string

GetCorrelationID returns the correlation ID.

func (CommandBase) GetMetadata

func (c CommandBase) GetMetadata(key string) string

GetMetadata returns the value for a metadata key, or empty string if not found.

func (CommandBase) WithCausationID

func (c CommandBase) WithCausationID(id string) CommandBase

WithCausationID returns a copy of CommandBase with the causation ID set.

func (CommandBase) WithCommandID

func (c CommandBase) WithCommandID(id string) CommandBase

WithCommandID returns a copy of CommandBase with the command ID set.

func (CommandBase) WithCorrelationID

func (c CommandBase) WithCorrelationID(id string) CommandBase

WithCorrelationID returns a copy of CommandBase with the correlation ID set.

func (CommandBase) WithMetadata

func (c CommandBase) WithMetadata(key, value string) CommandBase

WithMetadata returns a copy of CommandBase with a metadata key-value pair added.
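The With* methods use value receivers, so each call returns a modified copy and never mutates the original. A simplified local sketch of that builder pattern:

```go
package main

import "fmt"

// commandBase is a local stand-in demonstrating value-receiver builders.
type commandBase struct {
	CorrelationID string
}

// WithCorrelationID modifies and returns a copy; the caller's value is unchanged.
func (c commandBase) WithCorrelationID(id string) commandBase {
	c.CorrelationID = id
	return c
}

func main() {
	base := commandBase{}
	withID := base.WithCorrelationID("corr-1")
	fmt.Println(base.CorrelationID == "", withID.CorrelationID) // true corr-1
}
```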

type CommandBus

type CommandBus struct {
	// contains filtered or unexported fields
}

CommandBus orchestrates command dispatching with middleware support. It routes commands to their handlers through a configurable middleware pipeline.

func NewCommandBus

func NewCommandBus(opts ...CommandBusOption) *CommandBus

NewCommandBus creates a new CommandBus with the given options.

func (*CommandBus) Close

func (b *CommandBus) Close() error

Close closes the command bus, preventing further dispatch operations.

func (*CommandBus) Dispatch

func (b *CommandBus) Dispatch(ctx context.Context, cmd Command) (CommandResult, error)

Dispatch sends a command through the middleware pipeline to its handler.

func (*CommandBus) DispatchAll

func (b *CommandBus) DispatchAll(ctx context.Context, cmds ...Command) ([]DispatchResult, error)

DispatchAll dispatches multiple commands and returns all results. Commands are dispatched sequentially in order.

func (*CommandBus) DispatchAsync

func (b *CommandBus) DispatchAsync(ctx context.Context, cmd Command) <-chan DispatchResult

DispatchAsync sends a command asynchronously and returns immediately. The result can be retrieved through the returned channel.
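A sketch of the channel-based async-dispatch pattern (simplified local types, not the real bus): run the handler in a goroutine and deliver the result on a buffered channel so the sender never blocks.

```go
package main

import "fmt"

// result pairs a version with an error, loosely mirroring DispatchResult.
type result struct {
	Version int64
	Err     error
}

// dispatchAsync runs the handler in a goroutine and returns a channel
// that will receive exactly one result.
func dispatchAsync(handle func() (int64, error)) <-chan result {
	ch := make(chan result, 1) // buffered: the goroutine never blocks on send
	go func() {
		v, err := handle()
		ch <- result{Version: v, Err: err}
	}()
	return ch
}

func main() {
	res := <-dispatchAsync(func() (int64, error) { return 7, nil })
	fmt.Println(res.Version, res.Err) // 7 <nil>
}
```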

func (*CommandBus) HandlerCount

func (b *CommandBus) HandlerCount() int

HandlerCount returns the number of registered handlers.

func (*CommandBus) HasHandler

func (b *CommandBus) HasHandler(cmdType string) bool

HasHandler returns true if a handler is registered for the command type.

func (*CommandBus) IsClosed

func (b *CommandBus) IsClosed() bool

IsClosed returns true if the command bus has been closed.

func (*CommandBus) MiddlewareCount

func (b *CommandBus) MiddlewareCount() int

MiddlewareCount returns the number of registered middleware.

func (*CommandBus) Register

func (b *CommandBus) Register(handler CommandHandler)

Register adds a handler to the command bus.

func (*CommandBus) RegisterFunc

func (b *CommandBus) RegisterFunc(cmdType string, fn func(ctx context.Context, cmd Command) (CommandResult, error))

RegisterFunc registers a handler function for a command type.

func (*CommandBus) Use

func (b *CommandBus) Use(middleware ...Middleware)

Use adds middleware to the command bus. Middleware is executed in the order it was added.
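Because each middleware wraps the next handler, middleware added first runs outermost. A minimal sketch of such a pipeline (simplified string-based types, not the real Middleware signature):

```go
package main

import "fmt"

type handler func(cmd string) string
type middleware func(next handler) handler

// chain wraps h in the middlewares back-to-front, so mws[0] ends up outermost.
func chain(h handler, mws ...middleware) handler {
	for i := len(mws) - 1; i >= 0; i-- {
		h = mws[i](h)
	}
	return h
}

// tag marks the command with the middleware's name to make ordering visible.
func tag(name string) middleware {
	return func(next handler) handler {
		return func(cmd string) string { return name + "(" + next(cmd) + ")" }
	}
}

func main() {
	h := chain(func(cmd string) string { return cmd }, tag("logging"), tag("validation"))
	fmt.Println(h("CreateOrder")) // logging(validation(CreateOrder))
}
```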

type CommandBusOption

type CommandBusOption func(*CommandBus)

CommandBusOption configures a CommandBus.

func WithHandlerRegistry

func WithHandlerRegistry(registry *HandlerRegistry) CommandBusOption

WithHandlerRegistry sets a custom handler registry.

func WithMiddleware

func WithMiddleware(middleware ...Middleware) CommandBusOption

WithMiddleware adds middleware to the command bus.

type CommandContext

type CommandContext struct {
	// Context is the standard Go context.
	Context context.Context

	// Command is the command being executed.
	Command Command

	// Result is the command execution result (set by handler).
	Result CommandResult

	// Metadata contains additional context data that can be set by middleware.
	Metadata map[string]interface{}
}

CommandContext carries command execution context through the middleware chain.

func NewCommandContext

func NewCommandContext(ctx context.Context, cmd Command) *CommandContext

NewCommandContext creates a new CommandContext.

func (*CommandContext) Get

func (c *CommandContext) Get(key string) (interface{}, bool)

Get retrieves a value from the context metadata.

func (*CommandContext) GetString

func (c *CommandContext) GetString(key string) string

GetString retrieves a string value from the context metadata.

func (*CommandContext) Set

func (c *CommandContext) Set(key string, value interface{})

Set stores a value in the context metadata.

func (*CommandContext) SetError

func (c *CommandContext) SetError(err error)

SetError sets an error result.

func (*CommandContext) SetResult

func (c *CommandContext) SetResult(result CommandResult)

SetResult sets the command execution result.

func (*CommandContext) SetSuccess

func (c *CommandContext) SetSuccess(aggregateID string, version int64)

SetSuccess sets a successful result.

type CommandDispatcher

type CommandDispatcher interface {
	// Dispatch sends a command to its handler and returns the result.
	Dispatch(ctx context.Context, cmd Command) (CommandResult, error)
}

CommandDispatcher can dispatch commands to handlers.

type CommandHandler

type CommandHandler interface {
	// CommandType returns the type of command this handler processes.
	CommandType() string

	// Handle processes the command and returns a result.
	Handle(ctx context.Context, cmd Command) (CommandResult, error)
}

CommandHandler is the interface for handling a specific command type. Handlers contain the business logic for processing commands.

type CommandHandlerFunc

type CommandHandlerFunc struct {
	// contains filtered or unexported fields
}

CommandHandlerFunc is a function type that implements CommandHandler.

func NewCommandHandlerFunc

func NewCommandHandlerFunc(cmdType string, fn func(ctx context.Context, cmd Command) (CommandResult, error)) *CommandHandlerFunc

NewCommandHandlerFunc creates a new CommandHandlerFunc.

func (*CommandHandlerFunc) CommandType

func (h *CommandHandlerFunc) CommandType() string

CommandType returns the command type this handler processes.

func (*CommandHandlerFunc) Handle

func (h *CommandHandlerFunc) Handle(ctx context.Context, cmd Command) (CommandResult, error)

Handle processes the command by invoking the wrapped function.

type CommandResult

type CommandResult struct {
	// Success indicates whether the command executed successfully.
	Success bool

	// AggregateID is the ID of the aggregate affected by the command.
	// For create commands, this is the ID of the newly created aggregate.
	AggregateID string

	// Version is the new version of the aggregate after command execution.
	Version int64

	// Data contains any additional result data.
	Data interface{}

	// Error contains the error if the command failed.
	Error error
}

CommandResult represents the result of command execution. It can contain either a successful result or an error.

func IdempotencyRecordToResult

func IdempotencyRecordToResult(r *IdempotencyRecord) CommandResult

IdempotencyRecordToResult converts the record to a CommandResult.

func NewErrorResult

func NewErrorResult(err error) CommandResult

NewErrorResult creates a failed CommandResult.

func NewSuccessResult

func NewSuccessResult(aggregateID string, version int64) CommandResult

NewSuccessResult creates a successful CommandResult.

func NewSuccessResultWithData

func NewSuccessResultWithData(aggregateID string, version int64, data interface{}) CommandResult

NewSuccessResultWithData creates a successful CommandResult with additional data.

func (CommandResult) IsError

func (r CommandResult) IsError() bool

IsError returns true if the command failed.

func (CommandResult) IsSuccess

func (r CommandResult) IsSuccess() bool

IsSuccess returns true if the command executed successfully.

type CompositeFilter

type CompositeFilter struct {
	// contains filtered or unexported fields
}

CompositeFilter combines multiple filters with AND logic.

func NewCompositeFilter

func NewCompositeFilter(filters ...EventFilter) *CompositeFilter

NewCompositeFilter creates a filter that matches only if all filters match.

func (*CompositeFilter) Matches

func (f *CompositeFilter) Matches(event StoredEvent) bool

Matches returns true if all filters match.

type ConcurrencyError

type ConcurrencyError struct {
	StreamID        string
	ExpectedVersion int64
	ActualVersion   int64
}

ConcurrencyError provides detailed information about a concurrency conflict.

func NewConcurrencyError

func NewConcurrencyError(streamID string, expected, actual int64) *ConcurrencyError

NewConcurrencyError creates a new ConcurrencyError.

func (*ConcurrencyError) Error

func (e *ConcurrencyError) Error() string

Error returns the error message.

func (*ConcurrencyError) Is

func (e *ConcurrencyError) Is(target error) bool

Is reports whether this error matches the target error.

func (*ConcurrencyError) Unwrap

func (e *ConcurrencyError) Unwrap() error

Unwrap returns the underlying error for errors.Unwrap().

type ContextValueMiddleware

type ContextValueMiddleware struct {
	// contains filtered or unexported fields
}

ContextValueMiddleware adds values to the context.

func NewContextValueMiddleware

func NewContextValueMiddleware(key, value interface{}) *ContextValueMiddleware

NewContextValueMiddleware creates middleware that adds a value to the context.

func (*ContextValueMiddleware) Middleware

func (m *ContextValueMiddleware) Middleware() Middleware

Middleware returns the middleware function.

type DataExporter

type DataExporter struct {
	// contains filtered or unexported fields
}

DataExporter handles GDPR data export (right to access / right to data portability). It collects events belonging to a data subject from the event store, decrypts encrypted fields, and returns them in a portable format.

When encrypted fields cannot be decrypted (e.g., key revoked via crypto-shredding), those events are included with Redacted=true and nil Data.

There are two enumeration strategies:

  • Stream-based: provide explicit stream IDs in ExportRequest.Streams.
  • Scan-based: provide an ExportFilter and the exporter scans all events. Requires the adapter to implement SubscriptionAdapter.

func NewDataExporter

func NewDataExporter(store *EventStore, opts ...DataExporterOption) *DataExporter

NewDataExporter creates a new DataExporter for the given event store.

func (*DataExporter) Export

func (e *DataExporter) Export(ctx context.Context, req ExportRequest) (*ExportResult, error)

Export collects all matching events for a data subject and returns them. Use ExportStream for large exports that should not be held in memory.

func (*DataExporter) ExportStream

func (e *DataExporter) ExportStream(ctx context.Context, req ExportRequest, handler ExportHandler) error

ExportStream calls handler for each matching event, without holding all events in memory. This is suitable for large exports. Events are yielded in stream order for stream-based export, or global position order for scan-based export. Return a non-nil error from the handler to stop the export early.

type DataExporterOption

type DataExporterOption func(*DataExporter)

DataExporterOption configures a DataExporter.

func WithExportBatchSize

func WithExportBatchSize(size int) DataExporterOption

WithExportBatchSize sets the number of events loaded per batch during scan-based export. Default is 1000.

func WithExportLogger

func WithExportLogger(l Logger) DataExporterOption

WithExportLogger sets the logger for the data exporter.

type DispatchResult

type DispatchResult struct {
	CommandResult
	Error error
}

DispatchResult pairs a CommandResult with any dispatch-level error. It is returned by DispatchAll and DispatchAsync.

func (DispatchResult) IsSuccess

func (r DispatchResult) IsSuccess() bool

IsSuccess returns true if the dispatch was successful.

type EncryptionError

type EncryptionError = encryption.EncryptionError

EncryptionError provides detailed information about an encryption or decryption failure.

type EncryptionOption

type EncryptionOption func(*FieldEncryptionConfig)

EncryptionOption configures a FieldEncryptionConfig.

func WithDecryptionErrorHandler

func WithDecryptionErrorHandler(handler func(err error, eventType string, metadata Metadata) error) EncryptionOption

WithDecryptionErrorHandler sets a handler for decryption errors. This is used for crypto-shredding: when a key has been deleted, the handler can return nil to skip the event or return a custom error. If the handler returns nil, the event data is returned as-is (still encrypted).

func WithDefaultKeyID

func WithDefaultKeyID(keyID string) EncryptionOption

WithDefaultKeyID sets the default master key ID used when no tenant key resolver is configured or when the tenant ID is empty.

func WithEncryptedFields

func WithEncryptedFields(eventType string, fields ...string) EncryptionOption

WithEncryptedFields registers field paths to encrypt for a given event type. Field paths are dot-separated JSON field names (e.g., "email", "address.street").
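To make the path syntax concrete, here is a sketch of how a dot-separated path like "address.street" addresses a nested JSON field; the encryption layer would replace the value found at that path with ciphertext (the lookup helper is illustrative, not the library's code):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// lookup walks a decoded JSON document along a field path, one segment
// per nesting level, and reports whether the path resolved.
func lookup(doc map[string]interface{}, path []string) (interface{}, bool) {
	var cur interface{} = doc
	for _, p := range path {
		m, ok := cur.(map[string]interface{})
		if !ok {
			return nil, false
		}
		cur, ok = m[p]
		if !ok {
			return nil, false
		}
	}
	return cur, true
}

func main() {
	var doc map[string]interface{}
	json.Unmarshal([]byte(`{"email":"a@b.c","address":{"street":"Main St"}}`), &doc)
	v, ok := lookup(doc, []string{"address", "street"}) // "address.street"
	fmt.Println(ok, v)                                  // true Main St
}
```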

func WithEncryptionProvider

func WithEncryptionProvider(p encryption.Provider) EncryptionOption

WithEncryptionProvider sets the encryption provider.

func WithTenantKeyResolver

func WithTenantKeyResolver(resolver func(tenantID string) string) EncryptionOption

WithTenantKeyResolver sets a function that maps tenant IDs to master key IDs. This enables per-tenant encryption keys for multi-tenant applications.

type Event

type Event struct {
	// ID is the globally unique event identifier.
	ID string

	// StreamID identifies the stream this event belongs to.
	StreamID string

	// Type is the event type identifier.
	Type string

	// Data is the deserialized event payload.
	Data interface{}

	// Metadata contains contextual information.
	Metadata Metadata

	// Version is the position within the stream (1-based).
	Version int64

	// GlobalPosition is the position across all streams.
	GlobalPosition uint64

	// Timestamp is when the event was stored.
	Timestamp time.Time
}

Event represents a deserialized event with its data as a Go type. This is the high-level representation used by applications.

func DeserializeEvent

func DeserializeEvent(serializer Serializer, stored StoredEvent) (Event, error)

DeserializeEvent is a convenience function that deserializes a StoredEvent to an Event.

func EventFromStored

func EventFromStored(stored StoredEvent, data interface{}) Event

EventFromStored creates an Event from a StoredEvent with deserialized data.

type EventApplier

type EventApplier func(aggregate interface{}, event interface{}) error

EventApplier is a function type for applying events to aggregates.

type EventData

type EventData struct {
	// Type is the event type identifier (e.g., "OrderCreated").
	Type string

	// Data is the serialized event payload.
	Data []byte

	// Metadata contains optional contextual information.
	Metadata Metadata
}

EventData represents an event to be stored. It contains the event type, serialized payload, and optional metadata.

func NewEventData

func NewEventData(eventType string, data []byte) EventData

NewEventData creates a new EventData with the given type and data.

func SerializeEvent

func SerializeEvent(serializer Serializer, event interface{}, metadata Metadata) (EventData, error)

SerializeEvent is a convenience function that serializes an event and returns EventData.

func SerializeEventWithVersion

func SerializeEventWithVersion(serializer Serializer, event interface{}, metadata Metadata, schemaVersion int) (EventData, error)

SerializeEventWithVersion is a convenience function that serializes an event and stamps the metadata with the given schema version.

func (EventData) Validate

func (e EventData) Validate() error

Validate checks if the EventData is valid.

func (EventData) WithMetadata

func (e EventData) WithMetadata(m Metadata) EventData

WithMetadata returns a copy of EventData with the metadata set.

type EventFilter

type EventFilter interface {
	// Matches returns true if the event should be delivered.
	Matches(event StoredEvent) bool
}

EventFilter determines which events should be delivered.

type EventRegistry

type EventRegistry struct {
	// contains filtered or unexported fields
}

EventRegistry maps event type names to Go types. It is used by the JSONSerializer to deserialize events to the correct type.

func NewEventRegistry

func NewEventRegistry() *EventRegistry

NewEventRegistry creates a new empty EventRegistry.

func (*EventRegistry) Count

func (r *EventRegistry) Count() int

Count returns the number of registered event types.

func (*EventRegistry) Lookup

func (r *EventRegistry) Lookup(eventType string) (reflect.Type, bool)

Lookup returns the Go type for the given event type name. Returns nil and false if the type is not registered.

func (*EventRegistry) Register

func (r *EventRegistry) Register(eventType string, example interface{})

Register adds a mapping from eventType to the Go type of the example. The example should be a value (not a pointer) of the event type.

func (*EventRegistry) RegisterAll

func (r *EventRegistry) RegisterAll(examples ...interface{})

RegisterAll registers multiple events using their struct names as type names. Each example should be a value (not a pointer) of the event type.

func (*EventRegistry) RegisteredTypes

func (r *EventRegistry) RegisteredTypes() []string

RegisteredTypes returns a slice of all registered event type names.

type EventStore

type EventStore struct {
	// contains filtered or unexported fields
}

EventStore is the main entry point for event sourcing operations. It provides methods for appending events, loading aggregates, and managing streams.

func New

func New(adapter adapters.EventStoreAdapter, opts ...Option) *EventStore

New creates a new EventStore with the given adapter and options.

func (*EventStore) Adapter

func (s *EventStore) Adapter() adapters.EventStoreAdapter

Adapter returns the underlying adapter.

func (*EventStore) Append

func (s *EventStore) Append(ctx context.Context, streamID string, events []interface{}, opts ...AppendOption) error

Append stores events to the specified stream. Events can be Go structs which will be serialized using the configured serializer.

func (*EventStore) Close

func (s *EventStore) Close() error

Close releases resources held by the event store.

func (*EventStore) GetLastPosition

func (s *EventStore) GetLastPosition(ctx context.Context) (uint64, error)

GetLastPosition returns the global position of the last stored event.

func (*EventStore) GetStreamInfo

func (s *EventStore) GetStreamInfo(ctx context.Context, streamID string) (*StreamInfo, error)

GetStreamInfo returns metadata about a stream.

func (*EventStore) Initialize

func (s *EventStore) Initialize(ctx context.Context) error

Initialize sets up the required storage schema.

func (*EventStore) Load

func (s *EventStore) Load(ctx context.Context, streamID string) ([]Event, error)

Load retrieves all events from a stream.

func (*EventStore) LoadAggregate

func (s *EventStore) LoadAggregate(ctx context.Context, agg Aggregate) error

LoadAggregate loads an aggregate's state by replaying its events. The aggregate should be a new instance with its ID and type already set.

If the aggregate implements VersionSetter, the version will be set to the number of events loaded. This is required for proper optimistic concurrency control when saving the aggregate later.

Note: AggregateBase implements VersionSetter, so aggregates embedding AggregateBase will automatically have their version set correctly.

func (*EventStore) LoadEventsFromPosition

func (s *EventStore) LoadEventsFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]StoredEvent, error)

LoadEventsFromPosition loads events starting from a global position. Returns ErrSubscriptionNotSupported if the adapter does not implement SubscriptionAdapter. This is a helper method used by ProjectionEngine and ProjectionRebuilder.

func (*EventStore) LoadFrom

func (s *EventStore) LoadFrom(ctx context.Context, streamID string, fromVersion int64) ([]Event, error)

LoadFrom retrieves events from a stream starting from the specified version.

func (*EventStore) LoadRaw

func (s *EventStore) LoadRaw(ctx context.Context, streamID string, fromVersion int64) ([]StoredEvent, error)

LoadRaw retrieves raw (non-deserialized) events from a stream.

func (*EventStore) ProcessStoredEvent

func (s *EventStore) ProcessStoredEvent(ctx context.Context, stored StoredEvent) (Event, error)

ProcessStoredEvent applies decryption, upcasting, and deserialization to a stored event. This is used by components that work with raw StoredEvents (e.g., DataExporter) and need the full processing pipeline.

func (*EventStore) RegisterEvents

func (s *EventStore) RegisterEvents(events ...interface{})

RegisterEvents registers event types with the serializer. This is required for deserializing events back to their original types.

func (*EventStore) RegisterUpcasters

func (s *EventStore) RegisterUpcasters(upcasters ...Upcaster) error

RegisterUpcasters is a convenience method that registers upcasters with the event store. If no UpcasterChain has been configured, a new one is created automatically.

func (*EventStore) SaveAggregate

func (s *EventStore) SaveAggregate(ctx context.Context, agg Aggregate) error

SaveAggregate persists uncommitted events from an aggregate. The aggregate's version is used for optimistic concurrency control.

After a successful save, if the aggregate implements VersionSetter, the version will be updated to reflect the new stream version. This allows for subsequent modifications without reloading.

func (*EventStore) Serializer

func (s *EventStore) Serializer() Serializer

Serializer returns the event store's serializer.

type EventStoreWithOutbox

type EventStoreWithOutbox struct {
	// contains filtered or unexported fields
}

EventStoreWithOutbox wraps an EventStore to automatically schedule outbox messages when events are appended. If the adapter implements OutboxAppender, events and outbox messages are written atomically in the same transaction.

func NewEventStoreWithOutbox

func NewEventStoreWithOutbox(store *EventStore, outboxStore OutboxStore, routes []OutboxRoute, opts ...OutboxOption) *EventStoreWithOutbox

NewEventStoreWithOutbox creates a new EventStoreWithOutbox wrapper.

func (*EventStoreWithOutbox) Append

func (es *EventStoreWithOutbox) Append(ctx context.Context, streamID string, events []interface{}, opts ...AppendOption) error

Append stores events and schedules outbox messages.

func (*EventStoreWithOutbox) OutboxStore

func (es *EventStoreWithOutbox) OutboxStore() OutboxStore

OutboxStore returns the underlying OutboxStore.

func (*EventStoreWithOutbox) SaveAggregate

func (es *EventStoreWithOutbox) SaveAggregate(ctx context.Context, agg Aggregate) error

SaveAggregate persists uncommitted events and schedules outbox messages.

func (*EventStoreWithOutbox) Store

func (es *EventStoreWithOutbox) Store() *EventStore

Store returns the underlying EventStore.
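
The atomicity guarantee of the outbox pattern can be sketched with an in-memory stand-in: events and outbox messages are staged together and either both become visible or neither does. All names here are hypothetical, and a real adapter would use a database transaction.

```go
package main

import "fmt"

type tx struct {
	events []string
	outbox []string
}

// store sketches a store whose appends stage an event plus its
// outbox message and commit both in one step.
type store struct {
	events []string
	outbox []string
}

func (s *store) AppendWithOutbox(event, message string, fail bool) error {
	staged := tx{events: []string{event}, outbox: []string{message}}
	if fail {
		return fmt.Errorf("rolled back") // nothing staged was applied
	}
	s.events = append(s.events, staged.events...)
	s.outbox = append(s.outbox, staged.outbox...)
	return nil
}

func main() {
	s := &store{}
	_ = s.AppendWithOutbox("OrderPlaced", "webhook:order-placed", false)
	_ = s.AppendWithOutbox("OrderShipped", "webhook:order-shipped", true)
	fmt.Println(len(s.events), len(s.outbox)) // the two lists never diverge
}
```

Keeping both writes in one transaction is what prevents the classic dual-write failure mode, where an event is stored but its notification is lost.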

type EventSubscriber

type EventSubscriber interface {
	// SubscribeAll subscribes to all events starting from the given position.
	SubscribeAll(ctx context.Context, fromPosition uint64, opts ...SubscriptionOptions) (Subscription, error)

	// SubscribeStream subscribes to events from a specific stream.
	SubscribeStream(ctx context.Context, streamID string, fromVersion int64, opts ...SubscriptionOptions) (Subscription, error)

	// SubscribeCategory subscribes to events from all streams in a category.
	SubscribeCategory(ctx context.Context, category string, fromPosition uint64, opts ...SubscriptionOptions) (Subscription, error)
}

EventSubscriber provides event subscription capabilities.

type EventTypeFilter

type EventTypeFilter struct {
	// contains filtered or unexported fields
}

EventTypeFilter filters events by type.

func NewEventTypeFilter

func NewEventTypeFilter(eventTypes ...string) *EventTypeFilter

NewEventTypeFilter creates a filter that only matches the specified event types.

func (*EventTypeFilter) Matches

func (f *EventTypeFilter) Matches(event StoredEvent) bool

Matches returns true if the event type is in the filter.

type EventTypeNotRegisteredError

type EventTypeNotRegisteredError struct {
	EventType string
}

EventTypeNotRegisteredError provides detailed information about an unregistered event type.

func NewEventTypeNotRegisteredError

func NewEventTypeNotRegisteredError(eventType string) *EventTypeNotRegisteredError

NewEventTypeNotRegisteredError creates a new EventTypeNotRegisteredError.

func (*EventTypeNotRegisteredError) Error

func (e *EventTypeNotRegisteredError) Error() string

Error returns the error message.

func (*EventTypeNotRegisteredError) Is

func (e *EventTypeNotRegisteredError) Is(target error) bool

Is reports whether this error matches the target error.

func (*EventTypeNotRegisteredError) Unwrap

func (e *EventTypeNotRegisteredError) Unwrap() error

Unwrap returns the underlying error for errors.Unwrap().

type ExportError

type ExportError struct {
	SubjectID string
	Cause     error
}

ExportError provides detailed information about a data export failure.

func NewExportError

func NewExportError(subjectID string, cause error) *ExportError

NewExportError creates a new ExportError.

func (*ExportError) Error

func (e *ExportError) Error() string

Error returns the error message.

func (*ExportError) Is

func (e *ExportError) Is(target error) bool

Is reports whether this error matches the target error.

func (*ExportError) Unwrap

func (e *ExportError) Unwrap() error

Unwrap returns the underlying cause for errors.Unwrap().

type ExportFilter

type ExportFilter func(event StoredEvent) bool

ExportFilter determines whether a stored event should be included in a data export.

func CombineFilters

func CombineFilters(filters ...ExportFilter) ExportFilter

CombineFilters returns a filter that matches events passing ALL provided filters (AND logic).

func FilterByEventTypes

func FilterByEventTypes(types ...string) ExportFilter

FilterByEventTypes returns a filter that matches events of any of the given types.

func FilterByMetadata

func FilterByMetadata(key, value string) ExportFilter

FilterByMetadata returns a filter that matches events with a specific custom metadata key-value pair.

func FilterByStreamPrefix

func FilterByStreamPrefix(prefix string) ExportFilter

FilterByStreamPrefix returns a filter that matches events from streams whose ID starts with the given prefix.

func FilterByTenantID

func FilterByTenantID(tenantID string) ExportFilter

FilterByTenantID returns a filter that matches events with the given tenant ID.

func FilterByUserID

func FilterByUserID(userID string) ExportFilter

FilterByUserID returns a filter that matches events with the given user ID.
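
Because an ExportFilter is just a predicate, combinators like CombineFilters reduce to function composition. The sketch below uses a local event type as a stand-in for StoredEvent; names are illustrative.

```go
package main

import (
	"fmt"
	"strings"
)

type event struct {
	StreamID string
	UserID   string
}

type filter func(event) bool

// combine ANDs the given filters: one miss rejects the event.
func combine(filters ...filter) filter {
	return func(e event) bool {
		for _, f := range filters {
			if !f(e) {
				return false
			}
		}
		return true
	}
}

func byStreamPrefix(prefix string) filter {
	return func(e event) bool { return strings.HasPrefix(e.StreamID, prefix) }
}

func byUserID(id string) filter {
	return func(e event) bool { return e.UserID == id }
}

func main() {
	f := combine(byStreamPrefix("order-"), byUserID("u-1"))
	fmt.Println(f(event{StreamID: "order-42", UserID: "u-1"})) // true
	fmt.Println(f(event{StreamID: "cart-42", UserID: "u-1"}))  // false
}
```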

type ExportHandler

type ExportHandler func(ctx context.Context, event ExportedEvent) error

ExportHandler is called for each exported event during streaming export. Return a non-nil error to stop the export.

type ExportRequest

type ExportRequest struct {
	// SubjectID identifies the data subject (required).
	SubjectID string

	// Streams lists specific stream IDs to export.
	// When provided, only these streams are loaded (efficient, no full scan).
	Streams []string

	// Filter selects which events to include.
	// When Streams is empty, the exporter scans all events and applies this filter
	// (requires the adapter to implement SubscriptionAdapter).
	// When Streams is provided, the filter is applied within each stream.
	Filter ExportFilter

	// FromTime limits export to events stored at or after this time.
	FromTime *time.Time

	// ToTime limits export to events stored at or before this time.
	ToTime *time.Time
}

ExportRequest describes what data to export for a data subject.

type ExportResult

type ExportResult struct {
	// SubjectID is the data subject identifier from the request.
	SubjectID string

	// Events contains all exported events, ordered by stream then version.
	Events []ExportedEvent

	// Streams lists all unique stream IDs that contained matching events.
	Streams []string

	// TotalEvents is the total number of events included (including redacted).
	TotalEvents int

	// RedactedCount is the number of events whose PII could not be decrypted
	// (e.g., due to crypto-shredding / key revocation).
	RedactedCount int

	// ExportedAt is the timestamp when the export was generated.
	ExportedAt time.Time
}

ExportResult contains all exported data for a data subject.

type ExportedEvent

type ExportedEvent struct {
	// StreamID identifies the stream this event belongs to.
	StreamID string

	// EventType is the event type identifier.
	EventType string

	// Data is the deserialized event payload.
	// When Redacted is true, Data is nil.
	Data interface{}

	// RawData is the serialized event payload after decryption (JSON bytes).
	// When Redacted is true, RawData contains the original (encrypted) bytes.
	RawData []byte

	// Metadata contains non-PII contextual information about the event.
	Metadata ExportedMetadata

	// Version is the position within the stream (1-based).
	Version int64

	// GlobalPosition is the position across all streams.
	GlobalPosition uint64

	// Timestamp is when the event was stored.
	Timestamp time.Time

	// Redacted indicates the event's encrypted fields could not be decrypted
	// (e.g., because the encryption key was revoked via crypto-shredding).
	Redacted bool
}

ExportedEvent represents a single event in the data export.

type ExportedMetadata

type ExportedMetadata struct {
	CorrelationID string
	CausationID   string
	TenantID      string
	SchemaVersion int
}

ExportedMetadata contains non-PII metadata included in the export.

type FieldDefinition

type FieldDefinition struct {
	// Name is the field name.
	Name string

	// Type is the field type (e.g., "string", "int", "bool").
	Type string

	// Required indicates whether the field must be present.
	Required bool
}

FieldDefinition describes a single field in an event schema.

type FieldEncryptionConfig

type FieldEncryptionConfig struct {
	// contains filtered or unexported fields
}

FieldEncryptionConfig configures per-event-type field encryption. It uses envelope encryption: a data encryption key (DEK) is generated per event, encrypted with the master key, and stored in metadata. Individual fields are encrypted locally with the DEK for performance.

func NewFieldEncryptionConfig

func NewFieldEncryptionConfig(opts ...EncryptionOption) *FieldEncryptionConfig

NewFieldEncryptionConfig creates a new FieldEncryptionConfig with the given options.

func (*FieldEncryptionConfig) HasEncryptedFields

func (c *FieldEncryptionConfig) HasEncryptedFields(eventType string) bool

HasEncryptedFields reports whether any fields are configured for encryption for the given event type.

type Filter

type Filter struct {
	// Field is the field name to filter on.
	Field string

	// Op is the comparison operator.
	Op FilterOp

	// Value is the value to compare against.
	Value interface{}
}

Filter represents a query filter condition.

type FilterOp

type FilterOp string

FilterOp represents a filter operation.

const (
	// FilterOpEq matches equal values.
	FilterOpEq FilterOp = "="

	// FilterOpNe matches not equal values.
	FilterOpNe FilterOp = "!="

	// FilterOpGt matches greater than values.
	FilterOpGt FilterOp = ">"

	// FilterOpGte matches greater than or equal values.
	FilterOpGte FilterOp = ">="

	// FilterOpLt matches less than values.
	FilterOpLt FilterOp = "<"

	// FilterOpLte matches less than or equal values.
	FilterOpLte FilterOp = "<="

	// FilterOpIn matches any value in a list.
	FilterOpIn FilterOp = "IN"

	// FilterOpNotIn matches no value in a list.
	FilterOpNotIn FilterOp = "NOT IN"

	// FilterOpLike matches using SQL LIKE pattern.
	FilterOpLike FilterOp = "LIKE"

	// FilterOpIsNull matches null values.
	FilterOpIsNull FilterOp = "IS NULL"

	// FilterOpIsNotNull matches non-null values.
	FilterOpIsNotNull FilterOp = "IS NOT NULL"

	// FilterOpContains matches arrays containing a value.
	FilterOpContains FilterOp = "CONTAINS"

	// FilterOpBetween matches values between two bounds.
	FilterOpBetween FilterOp = "BETWEEN"
)

type GenericHandler

type GenericHandler[C Command] struct {
	// contains filtered or unexported fields
}

GenericHandler is a type-safe command handler for a specific command type. Use this to create handlers with compile-time type checking.

func NewGenericHandler

func NewGenericHandler[C Command](handler func(ctx context.Context, cmd C) (CommandResult, error)) *GenericHandler[C]

NewGenericHandler creates a new GenericHandler for the specified command type.

func (*GenericHandler[C]) CommandType

func (h *GenericHandler[C]) CommandType() string

CommandType returns the command type this handler processes.

func (*GenericHandler[C]) Handle

func (h *GenericHandler[C]) Handle(ctx context.Context, cmd Command) (CommandResult, error)

Handle processes the command with type checking.

type HandlerNotFoundError

type HandlerNotFoundError struct {
	CommandType string
}

HandlerNotFoundError provides detailed information about a missing handler.

func NewHandlerNotFoundError

func NewHandlerNotFoundError(cmdType string) *HandlerNotFoundError

NewHandlerNotFoundError creates a new HandlerNotFoundError.

func (*HandlerNotFoundError) Error

func (e *HandlerNotFoundError) Error() string

Error returns the error message.

func (*HandlerNotFoundError) Is

func (e *HandlerNotFoundError) Is(target error) bool

Is reports whether this error matches the target error.

func (*HandlerNotFoundError) Unwrap

func (e *HandlerNotFoundError) Unwrap() error

Unwrap returns the underlying error for errors.Unwrap().

type HandlerRegistry

type HandlerRegistry struct {
	// contains filtered or unexported fields
}

HandlerRegistry manages command handler registration and lookup.

func NewHandlerRegistry

func NewHandlerRegistry() *HandlerRegistry

NewHandlerRegistry creates a new HandlerRegistry.

func (*HandlerRegistry) Clear

func (r *HandlerRegistry) Clear()

Clear removes all handlers.

func (*HandlerRegistry) CommandTypes

func (r *HandlerRegistry) CommandTypes() []string

CommandTypes returns all registered command types.

func (*HandlerRegistry) Count

func (r *HandlerRegistry) Count() int

Count returns the number of registered handlers.

func (*HandlerRegistry) Get

func (r *HandlerRegistry) Get(cmdType string) CommandHandler

Get returns the handler for a command type. Returns nil if no handler is registered.

func (*HandlerRegistry) Has

func (r *HandlerRegistry) Has(cmdType string) bool

Has returns true if a handler is registered for the command type.

func (*HandlerRegistry) Register

func (r *HandlerRegistry) Register(handler CommandHandler)

Register adds a handler for a command type. If a handler is already registered for this type, it will be replaced.

func (*HandlerRegistry) RegisterFunc

func (r *HandlerRegistry) RegisterFunc(cmdType string, fn func(ctx context.Context, cmd Command) (CommandResult, error))

RegisterFunc registers a handler function for a command type.

func (*HandlerRegistry) Remove

func (r *HandlerRegistry) Remove(cmdType string)

Remove removes a handler for a command type.

type IdempotencyConfig

type IdempotencyConfig struct {
	// Store is the idempotency store to use.
	Store IdempotencyStore

	// TTL is how long to keep idempotency records.
	// Default is 24 hours.
	TTL time.Duration

	// KeyGenerator generates idempotency keys from commands.
	// If nil, GetIdempotencyKey is used.
	KeyGenerator func(Command) string

	// StoreErrors determines if failed commands should be stored.
	// If true, replaying a failed command returns the same error.
	// If false, failed commands can be retried.
	// Default is false.
	StoreErrors bool

	// SkipCommands is a list of command types to skip idempotency checking.
	SkipCommands []string
}

IdempotencyConfig configures the idempotency middleware.

func DefaultIdempotencyConfig

func DefaultIdempotencyConfig(store IdempotencyStore) IdempotencyConfig

DefaultIdempotencyConfig returns a default idempotency configuration.

type IdempotencyRecord

type IdempotencyRecord = adapters.IdempotencyRecord

IdempotencyRecord stores information about a processed command.

func NewIdempotencyRecord

func NewIdempotencyRecord(key, cmdType string, result CommandResult, ttl time.Duration) *IdempotencyRecord

NewIdempotencyRecord creates a new IdempotencyRecord from a CommandResult.

type IdempotencyReplayError

type IdempotencyReplayError struct {
	Key     string
	Message string
}

IdempotencyReplayError indicates a command was already processed.

func (*IdempotencyReplayError) Error

func (e *IdempotencyReplayError) Error() string

func (*IdempotencyReplayError) Is

func (e *IdempotencyReplayError) Is(target error) bool

func (*IdempotencyReplayError) Unwrap

func (e *IdempotencyReplayError) Unwrap() error

type IdempotencyStore

type IdempotencyStore = adapters.IdempotencyStore

IdempotencyStore tracks processed commands to prevent duplicate processing.

type IdempotentCommand

type IdempotentCommand interface {
	Command

	// IdempotencyKey returns a unique key for deduplication.
	// Commands with the same key will only be processed once.
	IdempotencyKey() string
}

IdempotentCommand is a command that supports idempotency.
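
The deduplication contract can be sketched in miniature: results are cached by idempotency key, so replaying a command with the same key returns the stored result instead of re-executing the handler. The idempotentBus type and its fields are illustrative, not go-mink's API.

```go
package main

import "fmt"

// idempotentBus sketches key-based deduplication of command handling.
type idempotentBus struct {
	seen    map[string]string
	handler func(payload string) string
}

// Dispatch runs the handler once per key; later calls with the same
// key return the cached result and report replay = true.
func (b *idempotentBus) Dispatch(key, payload string) (result string, replay bool) {
	if cached, ok := b.seen[key]; ok {
		return cached, true // replay: handler is not invoked again
	}
	result = b.handler(payload)
	b.seen[key] = result
	return result, false
}

func main() {
	calls := 0
	bus := &idempotentBus{
		seen: map[string]string{},
		handler: func(p string) string {
			calls++
			return "charged " + p
		},
	}
	r1, replay1 := bus.Dispatch("pay-42", "$10")
	r2, replay2 := bus.Dispatch("pay-42", "$10")
	fmt.Println(r1, replay1, r2, replay2, calls)
}
```

This also motivates IdempotencyConfig.StoreErrors: whether a *failed* execution is cached decides if a retry re-runs the handler or replays the failure.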

type InMemoryRepository

type InMemoryRepository[T any] struct {
	// contains filtered or unexported fields
}

InMemoryRepository provides an in-memory implementation of ReadModelRepository. Useful for testing and prototyping.

func NewInMemoryRepository

func NewInMemoryRepository[T any](getID func(*T) string) *InMemoryRepository[T]

NewInMemoryRepository creates a new in-memory repository. The getID function extracts the ID from a read model.

func (*InMemoryRepository[T]) Clear

func (r *InMemoryRepository[T]) Clear(ctx context.Context) error

Clear removes all read models.

func (*InMemoryRepository[T]) Count

func (r *InMemoryRepository[T]) Count(ctx context.Context, query Query) (int64, error)

Count returns the number of read models matching the query. Note: This basic implementation ignores query filters and returns total count. For production use with filtering, implement a database-backed repository.

func (*InMemoryRepository[T]) Delete

func (r *InMemoryRepository[T]) Delete(ctx context.Context, id string) error

Delete removes a read model by ID.

func (*InMemoryRepository[T]) DeleteMany

func (r *InMemoryRepository[T]) DeleteMany(ctx context.Context, query Query) (int64, error)

DeleteMany removes all read models matching the query. Note: This basic implementation ignores query filters and deletes all items when filters are provided. For production use with filtering, implement a database-backed repository.

func (*InMemoryRepository[T]) Exists

func (r *InMemoryRepository[T]) Exists(ctx context.Context, id string) (bool, error)

Exists checks if a read model with the given ID exists.

func (*InMemoryRepository[T]) Find

func (r *InMemoryRepository[T]) Find(ctx context.Context, query Query) ([]*T, error)

Find queries read models with the given criteria. Note: This is a basic implementation that doesn't support all filter operations.

func (*InMemoryRepository[T]) FindOne

func (r *InMemoryRepository[T]) FindOne(ctx context.Context, query Query) (*T, error)

FindOne returns the first read model matching the query.

func (*InMemoryRepository[T]) Get

func (r *InMemoryRepository[T]) Get(ctx context.Context, id string) (*T, error)

Get retrieves a read model by ID.

func (*InMemoryRepository[T]) GetAll

func (r *InMemoryRepository[T]) GetAll(ctx context.Context) ([]*T, error)

GetAll returns all read models in the repository.

func (*InMemoryRepository[T]) GetMany

func (r *InMemoryRepository[T]) GetMany(ctx context.Context, ids []string) ([]*T, error)

GetMany retrieves multiple read models by their IDs.

func (*InMemoryRepository[T]) Insert

func (r *InMemoryRepository[T]) Insert(ctx context.Context, model *T) error

Insert creates a new read model.

func (*InMemoryRepository[T]) Len

func (r *InMemoryRepository[T]) Len() int

Len returns the number of items in the repository.

func (*InMemoryRepository[T]) Update

func (r *InMemoryRepository[T]) Update(ctx context.Context, id string, updateFn func(*T)) error

Update modifies an existing read model.

func (*InMemoryRepository[T]) Upsert

func (r *InMemoryRepository[T]) Upsert(ctx context.Context, model *T) error

Upsert creates or updates a read model.
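
The shape of a generic, getID-keyed repository can be sketched with a map. This simplified stand-in mirrors the constructor signature above but is not go-mink's implementation; all names are illustrative.

```go
package main

import "fmt"

// repo sketches a generic in-memory read-model store keyed by a
// caller-supplied ID extractor.
type repo[T any] struct {
	items map[string]*T
	getID func(*T) string
}

func newRepo[T any](getID func(*T) string) *repo[T] {
	return &repo[T]{items: map[string]*T{}, getID: getID}
}

// Upsert inserts or replaces the model under its extracted ID.
func (r *repo[T]) Upsert(m *T) { r.items[r.getID(m)] = m }

// Get returns the model for id and whether it exists.
func (r *repo[T]) Get(id string) (*T, bool) {
	m, ok := r.items[id]
	return m, ok
}

// Len reports the number of stored models.
func (r *repo[T]) Len() int { return len(r.items) }

type orderView struct {
	ID    string
	Total int
}

func main() {
	r := newRepo(func(o *orderView) string { return o.ID })
	r.Upsert(&orderView{ID: "o-1", Total: 100})
	r.Upsert(&orderView{ID: "o-1", Total: 150}) // second upsert replaces
	v, _ := r.Get("o-1")
	fmt.Println(r.Len(), v.Total)
}
```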

type IncompatibleSchemaError

type IncompatibleSchemaError struct {
	EventType     string
	OldVersion    int
	NewVersion    int
	Compatibility SchemaCompatibility
	Reason        string
}

IncompatibleSchemaError provides detailed information about a schema incompatibility.

func NewIncompatibleSchemaError

func NewIncompatibleSchemaError(eventType string, oldVersion, newVersion int, compatibility SchemaCompatibility, reason string) *IncompatibleSchemaError

NewIncompatibleSchemaError creates a new IncompatibleSchemaError.

func (*IncompatibleSchemaError) Error

func (e *IncompatibleSchemaError) Error() string

Error returns the error message.

func (*IncompatibleSchemaError) Is

func (e *IncompatibleSchemaError) Is(target error) bool

Is reports whether this error matches the target error.

func (*IncompatibleSchemaError) Unwrap

func (e *IncompatibleSchemaError) Unwrap() error

Unwrap returns the underlying error for errors.Unwrap().

type InlineProjection

type InlineProjection interface {
	Projection

	// Apply processes a single event within the event store transaction.
	// The projection should update its read model based on the event.
	Apply(ctx context.Context, event StoredEvent) error
}

InlineProjection processes events in the same transaction as the event store append. This provides strong consistency but may impact write performance.

type JSONSerializer

type JSONSerializer struct {
	// contains filtered or unexported fields
}

JSONSerializer is the default Serializer implementation using JSON encoding.

func NewJSONSerializer

func NewJSONSerializer() *JSONSerializer

NewJSONSerializer creates a new JSONSerializer with an empty registry.

func NewJSONSerializerWithRegistry

func NewJSONSerializerWithRegistry(registry *EventRegistry) *JSONSerializer

NewJSONSerializerWithRegistry creates a new JSONSerializer with the given registry.

func (*JSONSerializer) Deserialize

func (s *JSONSerializer) Deserialize(data []byte, eventType string) (interface{}, error)

Deserialize converts JSON bytes back to an event. If the event type is registered, returns a value of that type. Otherwise, returns a map[string]interface{}.

func (*JSONSerializer) Register

func (s *JSONSerializer) Register(eventType string, example interface{})

Register adds an event type to the serializer's registry.

func (*JSONSerializer) RegisterAll

func (s *JSONSerializer) RegisterAll(examples ...interface{})

RegisterAll registers multiple events using their struct names as type names.

func (*JSONSerializer) Registry

func (s *JSONSerializer) Registry() *EventRegistry

Registry returns the underlying EventRegistry.

func (*JSONSerializer) Serialize

func (s *JSONSerializer) Serialize(event interface{}) ([]byte, error)

Serialize converts an event to JSON bytes.

type KeyNotFoundError

type KeyNotFoundError = encryption.KeyNotFoundError

KeyNotFoundError provides detailed information about a missing encryption key.

type KeyRevokedError

type KeyRevokedError = encryption.KeyRevokedError

KeyRevokedError provides detailed information about a revoked encryption key.

type LiveOptions

type LiveOptions struct {
	// BufferSize is the size of the event channel buffer.
	// Default: 1000
	BufferSize int
}

LiveOptions configures live projection behavior.

func DefaultLiveOptions

func DefaultLiveOptions() LiveOptions

DefaultLiveOptions returns the default live projection options.

type LiveProjection

type LiveProjection interface {
	Projection

	// OnEvent is called for each event in real-time.
	// This method should not block for long periods.
	OnEvent(ctx context.Context, event StoredEvent)

	// IsTransient returns true if this projection doesn't persist state.
	// Transient projections are not checkpointed.
	IsTransient() bool
}

LiveProjection receives events in real-time for dashboards and notifications. These projections are transient and don't persist state.

type LiveProjectionBase

type LiveProjectionBase struct {
	ProjectionBase
	// contains filtered or unexported fields
}

LiveProjectionBase provides a default implementation of LiveProjection. Embed this struct and override OnEvent to create live projections.

func NewLiveProjectionBase

func NewLiveProjectionBase(name string, transient bool, handledEvents ...string) LiveProjectionBase

NewLiveProjectionBase creates a new LiveProjectionBase.

func (*LiveProjectionBase) IsTransient

func (p *LiveProjectionBase) IsTransient() bool

IsTransient returns whether this projection is transient.

type Logger

type Logger interface {
	Debug(msg string, args ...interface{})
	Info(msg string, args ...interface{})
	Warn(msg string, args ...interface{})
	Error(msg string, args ...interface{})
}

Logger defines the logging interface for the event store.

type LoggingMiddleware

type LoggingMiddleware struct {
	// contains filtered or unexported fields
}

LoggingMiddleware logs command execution.

func NewLoggingMiddleware

func NewLoggingMiddleware(logger Logger) *LoggingMiddleware

NewLoggingMiddleware creates a new LoggingMiddleware.

func (*LoggingMiddleware) Middleware

func (m *LoggingMiddleware) Middleware() Middleware

Middleware returns the middleware function.

type Metadata

type Metadata struct {
	// CorrelationID links related events across services for distributed tracing.
	CorrelationID string `json:"correlationId,omitempty"`

	// CausationID identifies the event or command that caused this event.
	CausationID string `json:"causationId,omitempty"`

	// UserID identifies the user who triggered this event.
	UserID string `json:"userId,omitempty"`

	// TenantID identifies the tenant for multi-tenant applications.
	TenantID string `json:"tenantId,omitempty"`

	// Custom contains arbitrary key-value pairs for application-specific metadata.
	Custom map[string]string `json:"custom,omitempty"`
}

Metadata contains contextual information about an event. It supports distributed tracing, multi-tenancy, and custom key-value pairs.

func SetSchemaVersion

func SetSchemaVersion(m Metadata, version int) Metadata

SetSchemaVersion returns a copy of Metadata with the schema version set.

func (Metadata) IsEmpty

func (m Metadata) IsEmpty() bool

IsEmpty reports whether the Metadata has no values set.

func (Metadata) WithCausationID

func (m Metadata) WithCausationID(id string) Metadata

WithCausationID returns a copy of Metadata with the causation ID set.

func (Metadata) WithCorrelationID

func (m Metadata) WithCorrelationID(id string) Metadata

WithCorrelationID returns a copy of Metadata with the correlation ID set.

func (Metadata) WithCustom

func (m Metadata) WithCustom(key, value string) Metadata

WithCustom returns a copy of Metadata with a custom key-value pair added.

func (Metadata) WithTenantID

func (m Metadata) WithTenantID(id string) Metadata

WithTenantID returns a copy of Metadata with the tenant ID set.

func (Metadata) WithUserID

func (m Metadata) WithUserID(id string) Metadata

WithUserID returns a copy of Metadata with the user ID set.
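
The value-receiver builder style used by the With* methods guarantees the original Metadata is never mutated. The sketch below demonstrates the idiom with a local stand-in type (meta), including the map copy that the Custom field requires.

```go
package main

import "fmt"

type meta struct {
	CorrelationID string
	Custom        map[string]string
}

// WithCorrelationID mutates only the receiver's copy.
func (m meta) WithCorrelationID(id string) meta {
	m.CorrelationID = id
	return m
}

// WithCustom copies the map before writing, so the original's map
// (shared by the struct copy) is left untouched.
func (m meta) WithCustom(key, value string) meta {
	c := make(map[string]string, len(m.Custom)+1)
	for k, v := range m.Custom {
		c[k] = v
	}
	c[key] = value
	m.Custom = c
	return m
}

func main() {
	base := meta{}
	derived := base.WithCorrelationID("corr-1").WithCustom("source", "api")
	fmt.Println(base.CorrelationID == "", derived.CorrelationID, derived.Custom["source"])
}
```

Copying the map is the subtle part: a struct copy alone would still share the underlying map, so a plain write would leak into every other copy.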

type MetricsCollector

type MetricsCollector interface {
	// RecordCommand records a command execution.
	RecordCommand(cmdType string, duration time.Duration, success bool, err error)
}

MetricsCollector receives metrics about command execution.

type Middleware

type Middleware func(next MiddlewareFunc) MiddlewareFunc

Middleware wraps a handler function with additional functionality.

func CausationIDMiddleware

func CausationIDMiddleware() Middleware

CausationIDMiddleware creates middleware that propagates causation IDs. The causation ID links events/commands to the command that caused them. This is essential for tracking the chain of causality in event sourcing.

func ChainMiddleware

func ChainMiddleware(middleware ...Middleware) Middleware

ChainMiddleware creates a single middleware from multiple middleware.

func CommandTypeMiddleware

func CommandTypeMiddleware(types []string, middleware Middleware) Middleware

CommandTypeMiddleware applies middleware only for specific command types.

func ConditionalMiddleware

func ConditionalMiddleware(condition func(Command) bool, middleware Middleware) Middleware

ConditionalMiddleware applies middleware only if the condition is true.

func CorrelationIDMiddleware

func CorrelationIDMiddleware(generator func() string) Middleware

CorrelationIDMiddleware creates middleware that propagates correlation IDs.

func IdempotencyMiddleware

func IdempotencyMiddleware(config IdempotencyConfig) Middleware

IdempotencyMiddleware creates middleware that prevents duplicate command processing.

func MetricsMiddleware

func MetricsMiddleware(collector MetricsCollector) Middleware

MetricsMiddleware creates middleware that records metrics.

func RecoveryMiddleware

func RecoveryMiddleware() Middleware

RecoveryMiddleware recovers from panics in handlers and returns them as errors. It captures a sanitized representation of the command data for debugging.

func RetryMiddleware

func RetryMiddleware(config RetryConfig) Middleware

RetryMiddleware creates middleware that retries failed commands.

func TenantMiddleware

func TenantMiddleware(extractor func(Command) string, required bool) Middleware

TenantMiddleware extracts and validates tenant ID.

func TimeoutMiddleware

func TimeoutMiddleware(timeout time.Duration) Middleware

TimeoutMiddleware adds a timeout to command execution.

func ValidationMiddleware

func ValidationMiddleware() Middleware

ValidationMiddleware validates commands before they reach the handler. If validation fails, the command is not dispatched.

type MiddlewareFunc

type MiddlewareFunc func(ctx context.Context, cmd Command) (CommandResult, error)

MiddlewareFunc is the function signature for command middleware.

type MultiValidationError

type MultiValidationError struct {
	// CommandType is the type of command that failed validation.
	CommandType string

	// Errors contains all validation errors.
	Errors []*ValidationError
}

MultiValidationError contains multiple validation errors.

func NewMultiValidationError

func NewMultiValidationError(cmdType string) *MultiValidationError

NewMultiValidationError creates a new MultiValidationError.

func (*MultiValidationError) Add

func (e *MultiValidationError) Add(err *ValidationError)

Add adds a validation error.

func (*MultiValidationError) AddField

func (e *MultiValidationError) AddField(field, message string)

AddField adds a validation error for a specific field.

func (*MultiValidationError) Error

func (e *MultiValidationError) Error() string

Error returns the error message.

func (*MultiValidationError) HasErrors

func (e *MultiValidationError) HasErrors() bool

HasErrors returns true if there are any validation errors.

func (*MultiValidationError) Is

func (e *MultiValidationError) Is(target error) bool

Is reports whether this error matches the target error.

func (*MultiValidationError) Unwrap

func (e *MultiValidationError) Unwrap() error

Unwrap returns the first error for errors.Unwrap().

type Option

type Option func(*EventStore)

Option configures an EventStore.

func WithFieldEncryption

func WithFieldEncryption(config *FieldEncryptionConfig) Option

WithFieldEncryption configures the event store with field-level encryption. When set, configured fields are automatically encrypted during appending and decrypted during loading. Uses envelope encryption for performance.

func WithLogger

func WithLogger(l Logger) Option

WithLogger sets a custom logger.

func WithSerializer

func WithSerializer(s Serializer) Option

WithSerializer sets a custom serializer.

func WithUpcasters

func WithUpcasters(chain *UpcasterChain) Option

WithUpcasters configures the event store with an upcaster chain for transparent schema evolution. When set, events are automatically upcasted to the latest schema version during loading and stamped with the latest version during appending.

type OrderBy

type OrderBy struct {
	// Field is the field name to sort by.
	Field string

	// Desc specifies descending order.
	Desc bool
}

OrderBy represents a sort order.

type OutboxMessage

type OutboxMessage = adapters.OutboxMessage

OutboxMessage represents a message in the transactional outbox.

type OutboxMetrics

type OutboxMetrics interface {
	RecordMessageProcessed(destination string, success bool)
	RecordMessageFailed(destination string)
	RecordMessageDeadLettered()
	RecordBatchDuration(duration time.Duration)
	RecordPendingMessages(count int64)
}

OutboxMetrics collects metrics about outbox processing.

type OutboxOption

type OutboxOption func(*EventStoreWithOutbox)

OutboxOption configures an EventStoreWithOutbox.

func WithOutboxLogger

func WithOutboxLogger(l Logger) OutboxOption

WithOutboxLogger sets a logger for the outbox wrapper.

func WithOutboxMaxAttempts

func WithOutboxMaxAttempts(n int) OutboxOption

WithOutboxMaxAttempts sets the default max attempts for outbox messages.

type OutboxProcessor

type OutboxProcessor struct {
	// contains filtered or unexported fields
}

OutboxProcessor polls the outbox store for pending messages and publishes them via registered publishers. It handles retries, dead-lettering, and cleanup.

func NewOutboxProcessor

func NewOutboxProcessor(store OutboxStore, opts ...ProcessorOption) *OutboxProcessor

NewOutboxProcessor creates a new OutboxProcessor.

func (*OutboxProcessor) IsRunning

func (p *OutboxProcessor) IsRunning() bool

IsRunning returns true if the processor is running.

func (*OutboxProcessor) Start

func (p *OutboxProcessor) Start(ctx context.Context) error

Start begins the background processing loop.

func (*OutboxProcessor) Stop

func (p *OutboxProcessor) Stop(ctx context.Context) error

Stop gracefully stops the processor, draining in-flight work.

type OutboxRoute

type OutboxRoute struct {
	// EventTypes is the list of event types this route matches. Empty matches all.
	EventTypes []string

	// Destination is the target (e.g., "webhook:https://example.com/events", "kafka:orders").
	Destination string

	// Transform optionally transforms the event payload before outbox scheduling.
	// Note: the event parameter is always nil because the outbox operates on raw serialized data.
	// Use the stored parameter to access stream ID, event type, raw data, and metadata.
	Transform func(event interface{}, stored StoredEvent) ([]byte, error)

	// Filter optionally filters events. Return true to include the event.
	// Note: the event parameter is always nil because the outbox operates on raw serialized data.
	// Use the stored parameter to access stream ID, event type, raw data, and metadata.
	Filter func(event interface{}, stored StoredEvent) bool
}

OutboxRoute defines routing rules for outbox messages.

type OutboxStatus

type OutboxStatus = adapters.OutboxStatus

OutboxStatus represents the current status of an outbox message.

type OutboxStore

type OutboxStore = adapters.OutboxStore

OutboxStore defines the interface for outbox message persistence.

type PanicError

type PanicError struct {
	CommandType string
	Value       interface{}
	Stack       string
	// CommandData contains a sanitized JSON representation of the command for debugging.
	// Sensitive fields should be masked by the caller before setting this field.
	CommandData string
}

PanicError provides detailed information about a handler panic.

func NewPanicError

func NewPanicError(cmdType string, value interface{}, stack string) *PanicError

NewPanicError creates a new PanicError.

func NewPanicErrorWithCommand

func NewPanicErrorWithCommand(cmdType string, value interface{}, stack string, commandData string) *PanicError

NewPanicErrorWithCommand creates a new PanicError with command data for debugging. The commandData should be a sanitized representation of the command (sensitive fields masked).

func (*PanicError) Error

func (e *PanicError) Error() string

Error returns the error message.

func (*PanicError) Is

func (e *PanicError) Is(target error) bool

Is reports whether this error matches the target error.

func (*PanicError) Unwrap

func (e *PanicError) Unwrap() error

Unwrap returns the underlying error for errors.Unwrap().

type ParallelRebuilder

type ParallelRebuilder struct {
	// contains filtered or unexported fields
}

ParallelRebuilder rebuilds multiple projections in parallel.

func NewParallelRebuilder

func NewParallelRebuilder(rebuilder *ProjectionRebuilder, concurrency int) *ParallelRebuilder

NewParallelRebuilder creates a new parallel rebuilder.

func (*ParallelRebuilder) RebuildAll

func (pr *ParallelRebuilder) RebuildAll(ctx context.Context, projections []AsyncProjection, opts ...RebuildOptions) error

RebuildAll rebuilds multiple async projections in parallel.

type PollingSubscription

type PollingSubscription struct {
	// contains filtered or unexported fields
}

PollingSubscription polls the event store for new events. This is a fallback when push-based subscriptions aren't available.

func NewPollingSubscription

func NewPollingSubscription(
	store *EventStore,
	fromPosition uint64,
	opts ...SubscriptionOptions,
) *PollingSubscription

NewPollingSubscription creates a new polling subscription.

func (*PollingSubscription) Close

func (s *PollingSubscription) Close() error

Close stops the subscription.

func (*PollingSubscription) Err

func (s *PollingSubscription) Err() error

Err returns any error that caused the subscription to close.

func (*PollingSubscription) Events

func (s *PollingSubscription) Events() <-chan StoredEvent

Events returns the channel for receiving events.

func (*PollingSubscription) Start

func (s *PollingSubscription) Start(ctx context.Context, pollInterval time.Duration)

Start begins polling for events.

type ProcessorOption

type ProcessorOption func(*OutboxProcessor)

ProcessorOption configures an OutboxProcessor.

func WithBatchSize

func WithBatchSize(n int) ProcessorOption

WithBatchSize sets the maximum number of messages to process in a single batch.

func WithCleanupAge

func WithCleanupAge(d time.Duration) ProcessorOption

WithCleanupAge sets the age threshold for cleaning up completed messages.

func WithCleanupInterval

func WithCleanupInterval(d time.Duration) ProcessorOption

WithCleanupInterval sets how often completed messages are cleaned up.

func WithMaxRetries

func WithMaxRetries(n int) ProcessorOption

WithMaxRetries sets the maximum number of delivery attempts.

func WithOutboxMetrics

func WithOutboxMetrics(metrics OutboxMetrics) ProcessorOption

WithOutboxMetrics sets the metrics collector for the processor.

func WithPollInterval

func WithPollInterval(d time.Duration) ProcessorOption

WithPollInterval sets how often the processor polls for pending messages.

func WithProcessorLogger

func WithProcessorLogger(logger Logger) ProcessorOption

WithProcessorLogger sets the logger for the processor.

func WithPublisher

func WithPublisher(publisher Publisher) ProcessorOption

WithPublisher registers a publisher for a given destination prefix.

func WithRetryBackoff

func WithRetryBackoff(d time.Duration) ProcessorOption

WithRetryBackoff sets the duration between retry cycles.

type ProgressCallback

type ProgressCallback func(progress RebuildProgress)

ProgressCallback is called periodically during rebuild with progress updates.

type Projection

type Projection interface {
	// Name returns the unique identifier for this projection.
	// This name is used for checkpointing and management.
	Name() string

	// HandledEvents returns the list of event types this projection handles.
	// An empty list means the projection handles all event types.
	HandledEvents() []string
}

Projection is the base interface for all projection types. Projections transform events into optimized read models.

type ProjectionBase

type ProjectionBase struct {
	// contains filtered or unexported fields
}

ProjectionBase provides a default partial implementation of Projection. Embed this struct in your projection types to get common functionality.

func NewProjectionBase

func NewProjectionBase(name string, handledEvents ...string) ProjectionBase

NewProjectionBase creates a new ProjectionBase.

func (*ProjectionBase) HandledEvents

func (p *ProjectionBase) HandledEvents() []string

HandledEvents returns the list of event types this projection handles.

func (*ProjectionBase) HandlesEvent

func (p *ProjectionBase) HandlesEvent(eventType string) bool

HandlesEvent returns true if this projection handles the given event type.

func (*ProjectionBase) Name

func (p *ProjectionBase) Name() string

Name returns the projection name.
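Embedding can be sketched with a simplified local ProjectionBase that mirrors the documented behavior, including the rule that an empty handled-events list matches all event types; the real type lives in the mink package:

```go
package main

import "fmt"

// Simplified local ProjectionBase; the real one is unexported-field-based
// and lives in the mink package.
type ProjectionBase struct {
	name          string
	handledEvents []string
}

func NewProjectionBase(name string, handledEvents ...string) ProjectionBase {
	return ProjectionBase{name: name, handledEvents: handledEvents}
}

func (p *ProjectionBase) Name() string            { return p.name }
func (p *ProjectionBase) HandledEvents() []string { return p.handledEvents }

func (p *ProjectionBase) HandlesEvent(eventType string) bool {
	if len(p.handledEvents) == 0 {
		return true // empty list means the projection handles all event types
	}
	for _, h := range p.handledEvents {
		if h == eventType {
			return true
		}
	}
	return false
}

// OrderCountProjection embeds ProjectionBase so Name/HandledEvents come
// for free; it only adds its own read-model state.
type OrderCountProjection struct {
	ProjectionBase
	Count int
}

func main() {
	p := OrderCountProjection{
		ProjectionBase: NewProjectionBase("order_counts", "OrderPlaced"),
	}
	fmt.Println(p.Name(), p.HandlesEvent("OrderPlaced"), p.HandlesEvent("OrderShipped"))
}
```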

type ProjectionEngine

type ProjectionEngine struct {
	// contains filtered or unexported fields
}

ProjectionEngine manages the lifecycle of projections. It handles registration, starting, stopping, and monitoring projections.

func NewProjectionEngine

func NewProjectionEngine(store *EventStore, opts ...ProjectionEngineOption) *ProjectionEngine

NewProjectionEngine creates a new ProjectionEngine.

func (*ProjectionEngine) GetAllStatuses

func (e *ProjectionEngine) GetAllStatuses() []*ProjectionStatus

GetAllStatuses returns the status of all registered projections.

func (*ProjectionEngine) GetStatus

func (e *ProjectionEngine) GetStatus(name string) (*ProjectionStatus, error)

GetStatus returns the status of a projection by name.

func (*ProjectionEngine) IsRunning

func (e *ProjectionEngine) IsRunning() bool

IsRunning returns true if the engine is running.

func (*ProjectionEngine) NotifyLiveProjections

func (e *ProjectionEngine) NotifyLiveProjections(ctx context.Context, events []StoredEvent)

NotifyLiveProjections notifies all live projections of new events.

func (*ProjectionEngine) ProcessInlineProjections

func (e *ProjectionEngine) ProcessInlineProjections(ctx context.Context, events []StoredEvent) error

ProcessInlineProjections processes all inline projections for the given events. This is called by the event store after appending events.

func (*ProjectionEngine) RegisterAsync

func (e *ProjectionEngine) RegisterAsync(projection AsyncProjection, opts ...AsyncOptions) error

RegisterAsync registers an async projection with the given options. Async projections are processed in background workers.

func (*ProjectionEngine) RegisterInline

func (e *ProjectionEngine) RegisterInline(projection InlineProjection) error

RegisterInline registers an inline projection. Inline projections are processed synchronously with event appends.

func (*ProjectionEngine) RegisterLive

func (e *ProjectionEngine) RegisterLive(projection LiveProjection, opts ...LiveOptions) error

RegisterLive registers a live projection with optional configuration. Live projections receive events in real-time.

func (*ProjectionEngine) Start

func (e *ProjectionEngine) Start(ctx context.Context) error

Start starts the projection engine and all registered projections.

func (*ProjectionEngine) Stop

func (e *ProjectionEngine) Stop(ctx context.Context) error

Stop gracefully stops the projection engine.

func (*ProjectionEngine) Unregister

func (e *ProjectionEngine) Unregister(name string) error

Unregister removes a projection by name.

type ProjectionEngineOption

type ProjectionEngineOption func(*ProjectionEngine)

ProjectionEngineOption configures a ProjectionEngine.

func WithCheckpointStore

func WithCheckpointStore(store CheckpointStore) ProjectionEngineOption

WithCheckpointStore sets the checkpoint store for the engine.

func WithProjectionLogger

func WithProjectionLogger(logger Logger) ProjectionEngineOption

WithProjectionLogger sets the logger for the engine.

func WithProjectionMetrics

func WithProjectionMetrics(metrics ProjectionMetrics) ProjectionEngineOption

WithProjectionMetrics sets the metrics collector for the engine.

type ProjectionError

type ProjectionError struct {
	ProjectionName string
	EventType      string
	Position       uint64
	Cause          error
}

ProjectionError provides detailed information about a projection failure.

func NewProjectionError

func NewProjectionError(projectionName, eventType string, position uint64, cause error) *ProjectionError

NewProjectionError creates a new ProjectionError.

func (*ProjectionError) Error

func (e *ProjectionError) Error() string

Error returns the error message.

func (*ProjectionError) Is

func (e *ProjectionError) Is(target error) bool

Is reports whether this error matches the target error.

func (*ProjectionError) Unwrap

func (e *ProjectionError) Unwrap() error

Unwrap returns the underlying cause for errors.Unwrap().

type ProjectionMetrics

type ProjectionMetrics interface {
	// RecordEventProcessed records that an event was processed.
	RecordEventProcessed(projectionName, eventType string, duration time.Duration, success bool)

	// RecordBatchProcessed records that a batch of events was processed.
	RecordBatchProcessed(projectionName string, count int, duration time.Duration, success bool)

	// RecordCheckpoint records a checkpoint update.
	RecordCheckpoint(projectionName string, position uint64)

	// RecordError records a projection error.
	RecordError(projectionName string, err error)
}

ProjectionMetrics collects metrics about projection processing.

type ProjectionRebuilder

type ProjectionRebuilder struct {
	// contains filtered or unexported fields
}

ProjectionRebuilder rebuilds projections from scratch. It replays all events through a projection to reconstruct its read model.

func NewProjectionRebuilder

func NewProjectionRebuilder(store *EventStore, checkpointStore CheckpointStore, opts ...ProjectionRebuilderOption) *ProjectionRebuilder

NewProjectionRebuilder creates a new projection rebuilder.

func (*ProjectionRebuilder) RebuildAsync

func (r *ProjectionRebuilder) RebuildAsync(ctx context.Context, projection AsyncProjection, opts ...RebuildOptions) error

RebuildAsync rebuilds an async projection from scratch.

func (*ProjectionRebuilder) RebuildInline

func (r *ProjectionRebuilder) RebuildInline(ctx context.Context, projection InlineProjection, opts ...RebuildOptions) error

RebuildInline rebuilds an inline projection from scratch.

type ProjectionRebuilderOption

type ProjectionRebuilderOption func(*ProjectionRebuilder)

ProjectionRebuilderOption configures a ProjectionRebuilder.

func WithRebuilderBatchSize

func WithRebuilderBatchSize(size int) ProjectionRebuilderOption

WithRebuilderBatchSize sets the batch size for rebuilding.

func WithRebuilderLogger

func WithRebuilderLogger(logger Logger) ProjectionRebuilderOption

WithRebuilderLogger sets the logger for the rebuilder.

func WithRebuilderMetrics

func WithRebuilderMetrics(metrics ProjectionMetrics) ProjectionRebuilderOption

WithRebuilderMetrics sets the metrics collector for the rebuilder.

type ProjectionState

type ProjectionState string

ProjectionState represents the current state of a projection.

const (
	// ProjectionStateStopped indicates the projection is not running.
	ProjectionStateStopped ProjectionState = "stopped"

	// ProjectionStateRunning indicates the projection is actively processing events.
	ProjectionStateRunning ProjectionState = "running"

	// ProjectionStatePaused indicates the projection is paused.
	ProjectionStatePaused ProjectionState = "paused"

	// ProjectionStateFaulted indicates the projection has encountered an error.
	ProjectionStateFaulted ProjectionState = "faulted"

	// ProjectionStateRebuilding indicates the projection is being rebuilt.
	ProjectionStateRebuilding ProjectionState = "rebuilding"

	// ProjectionStateCatchingUp indicates the projection is catching up to current events.
	ProjectionStateCatchingUp ProjectionState = "catching_up"
)

type ProjectionStatus

type ProjectionStatus struct {
	// Name is the projection name.
	Name string

	// State is the current state of the projection.
	State ProjectionState

	// LastPosition is the global position of the last processed event.
	LastPosition uint64

	// EventsProcessed is the total number of events processed.
	EventsProcessed uint64

	// LastProcessedAt is when the last event was processed.
	LastProcessedAt time.Time

	// Error contains the error message if the projection is faulted.
	Error string

	// Lag is the number of events behind the head of the event store.
	Lag uint64

	// AverageLatency is the average time to process an event.
	AverageLatency time.Duration
}

ProjectionStatus provides detailed information about a projection's current state.

type Publisher

type Publisher interface {
	// Publish sends one or more messages to the external system.
	Publish(ctx context.Context, messages []*OutboxMessage) error

	// Destination returns the destination prefix this publisher handles (e.g., "webhook", "kafka", "sns").
	Destination() string
}

Publisher publishes outbox messages to an external system.

type Query

type Query struct {
	// Filters to apply.
	Filters []Filter

	// Ordering criteria.
	OrderBy []OrderBy

	// Maximum number of results to return.
	// 0 means no limit.
	Limit int

	// Number of results to skip.
	Offset int

	// IncludeCount includes the total count in paginated results.
	IncludeCount bool
}

Query represents a query for read models.

func NewQuery

func NewQuery() *Query

NewQuery creates a new empty Query.

func (*Query) And

func (q *Query) And(field string, op FilterOp, value interface{}) *Query

And is an alias for Where for readability.

func (*Query) Build

func (q *Query) Build() Query

Build returns a copy of the query (useful for chaining).

func (*Query) OrderByAsc

func (q *Query) OrderByAsc(field string) *Query

OrderByAsc adds ascending order.

func (*Query) OrderByDesc

func (q *Query) OrderByDesc(field string) *Query

OrderByDesc adds descending order.

func (*Query) Where

func (q *Query) Where(field string, op FilterOp, value interface{}) *Query

Where adds a filter condition.

func (*Query) WithCount

func (q *Query) WithCount() *Query

WithCount includes total count in results.

func (*Query) WithLimit

func (q *Query) WithLimit(limit int) *Query

WithLimit sets the maximum number of results.

func (*Query) WithOffset

func (q *Query) WithOffset(offset int) *Query

WithOffset sets the number of results to skip.

func (*Query) WithPagination

func (q *Query) WithPagination(page, pageSize int) *Query

WithPagination sets limit and offset for pagination.

type QueryResult

type QueryResult[T any] struct {
	// Items contains the matching read models.
	Items []*T

	// TotalCount is the total number of matching items (before pagination).
	// Only populated if IncludeCount was true in the query.
	TotalCount int64

	// HasMore indicates if there are more results beyond the limit.
	HasMore bool
}

QueryResult contains query results with optional count.

type ReadModelID

type ReadModelID interface {
	// GetID returns the read model's unique identifier.
	GetID() string
}

ReadModelID is an interface for read models that can return their ID.

type ReadModelRepository

type ReadModelRepository[T any] interface {
	// Get retrieves a read model by ID.
	// Returns ErrNotFound if not found.
	Get(ctx context.Context, id string) (*T, error)

	// GetMany retrieves multiple read models by their IDs.
	// Missing IDs are silently skipped.
	GetMany(ctx context.Context, ids []string) ([]*T, error)

	// Find queries read models with the given criteria.
	Find(ctx context.Context, query Query) ([]*T, error)

	// FindOne returns the first read model matching the query.
	// Returns ErrNotFound if no match.
	FindOne(ctx context.Context, query Query) (*T, error)

	// Count returns the number of read models matching the query.
	Count(ctx context.Context, query Query) (int64, error)

	// Insert creates a new read model.
	// Returns ErrAlreadyExists if ID already exists.
	Insert(ctx context.Context, model *T) error

	// Update modifies an existing read model.
	// Returns ErrNotFound if not found.
	Update(ctx context.Context, id string, updateFn func(*T)) error

	// Upsert creates or updates a read model.
	Upsert(ctx context.Context, model *T) error

	// Delete removes a read model by ID.
	// Returns ErrNotFound if not found.
	Delete(ctx context.Context, id string) error

	// DeleteMany removes all read models matching the query.
	// Returns the number of deleted models.
	DeleteMany(ctx context.Context, query Query) (int64, error)

	// Clear removes all read models.
	Clear(ctx context.Context) error
}

ReadModelRepository provides generic CRUD operations for read models. T is the read model type.

type RebuildOptions

type RebuildOptions struct {
	// DeleteCheckpoint deletes the existing checkpoint before rebuilding.
	// Default: true
	DeleteCheckpoint bool

	// ClearReadModel calls the projection's Clear method before rebuilding.
	// Only applicable for projections that implement Clearable.
	// Default: true
	ClearReadModel bool

	// ProgressCallback is called periodically with progress updates.
	ProgressCallback ProgressCallback

	// ProgressInterval is how often to call the progress callback.
	// Default: 1 second
	ProgressInterval time.Duration

	// FromPosition starts rebuilding from a specific position.
	// Default: 0 (from beginning)
	FromPosition uint64

	// ToPosition stops rebuilding at a specific position.
	// Default: 0 (to end)
	ToPosition uint64
}

RebuildOptions configures a projection rebuild.

func DefaultRebuildOptions

func DefaultRebuildOptions() RebuildOptions

DefaultRebuildOptions returns the default rebuild options.

type RebuildProgress

type RebuildProgress struct {
	// ProjectionName is the name of the projection being rebuilt.
	ProjectionName string

	// TotalEvents is the total number of events to process.
	TotalEvents uint64

	// ProcessedEvents is the number of events processed so far.
	ProcessedEvents uint64

	// CurrentPosition is the current global position.
	CurrentPosition uint64

	// StartedAt is when the rebuild started.
	StartedAt time.Time

	// Duration is the elapsed time.
	Duration time.Duration

	// EventsPerSecond is the processing rate.
	EventsPerSecond float64

	// EstimatedRemaining is the estimated time remaining.
	EstimatedRemaining time.Duration

	// Completed indicates if the rebuild is complete.
	Completed bool

	// Error contains any error that occurred.
	Error error
}

RebuildProgress tracks the progress of a projection rebuild.

type RetryConfig

type RetryConfig struct {
	// MaxAttempts is the maximum number of attempts (including the first one).
	MaxAttempts int

	// InitialDelay is the initial delay between retries.
	InitialDelay time.Duration

	// MaxDelay is the maximum delay between retries.
	MaxDelay time.Duration

	// Multiplier is the factor by which the delay increases on each retry.
	Multiplier float64

	// ShouldRetry determines if an error should be retried.
	// If nil, all errors are retried.
	ShouldRetry func(err error) bool
}

RetryConfig configures the retry behavior of RetryMiddleware.

func DefaultRetryConfig

func DefaultRetryConfig() RetryConfig

DefaultRetryConfig returns a default retry configuration.

type RetryPolicy

type RetryPolicy interface {
	// ShouldRetry returns true if the operation should be retried.
	ShouldRetry(attempt int, err error) bool

	// Delay returns the duration to wait before the next retry.
	Delay(attempt int) time.Duration
}

RetryPolicy defines how to handle retries for failed operations.

func ExponentialBackoffRetry

func ExponentialBackoffRetry(maxRetries int, baseDelay, maxDelay time.Duration) RetryPolicy

ExponentialBackoffRetry creates a new retry policy with exponential backoff.

func NoRetry

func NoRetry() RetryPolicy

NoRetry returns a retry policy that never retries.

type Saga

type Saga interface {
	// SagaID returns the unique identifier for this saga instance.
	SagaID() string

	// SagaType returns the type of this saga (e.g., "OrderFulfillment").
	SagaType() string

	// Status returns the current status of the saga.
	Status() SagaStatus

	// SetStatus sets the saga status.
	SetStatus(status SagaStatus)

	// CurrentStep returns the current step number (0-based).
	CurrentStep() int

	// SetCurrentStep sets the current step number.
	SetCurrentStep(step int)

	// CorrelationID returns the correlation ID for this saga.
	// Used to correlate events to this saga instance.
	CorrelationID() string

	// SetCorrelationID sets the correlation ID.
	SetCorrelationID(id string)

	// HandledEvents returns the list of event types this saga handles.
	HandledEvents() []string

	// HandleEvent processes an event and returns commands to dispatch.
	// The returned commands will be executed by the saga manager.
	HandleEvent(ctx context.Context, event StoredEvent) ([]Command, error)

	// Compensate is called when the saga needs to rollback.
	// It returns compensating commands to undo previous steps.
	Compensate(ctx context.Context, failedStep int, failureReason error) ([]Command, error)

	// IsComplete returns true if the saga has completed successfully.
	IsComplete() bool

	// StartedAt returns when the saga started.
	StartedAt() time.Time

	// SetStartedAt sets when the saga started.
	SetStartedAt(t time.Time)

	// CompletedAt returns when the saga completed (nil if not completed).
	CompletedAt() *time.Time

	// SetCompletedAt sets when the saga completed.
	SetCompletedAt(t *time.Time)

	// Data returns the saga's internal state as a map.
	// This is serialized and stored in the saga store.
	Data() map[string]interface{}

	// SetData restores the saga's internal state from a map.
	SetData(data map[string]interface{})

	// Version returns the saga version for optimistic concurrency.
	Version() int64

	// SetVersion sets the saga version.
	SetVersion(v int64)

	// IncrementVersion increments the saga version.
	IncrementVersion()
}

Saga defines the interface for saga implementations. A saga coordinates long-running business processes across multiple aggregates.

type SagaBase

type SagaBase struct {
	// contains filtered or unexported fields
}

SagaBase provides a default partial implementation of the Saga interface. Embed this struct in your saga types to get default behavior.

func NewSagaBase

func NewSagaBase(id, sagaType string) SagaBase

NewSagaBase creates a new SagaBase with the given ID and type.

func (*SagaBase) Complete

func (s *SagaBase) Complete()

Complete marks the saga as completed.

func (*SagaBase) CompletedAt

func (s *SagaBase) CompletedAt() *time.Time

CompletedAt returns when the saga completed.

func (*SagaBase) CorrelationID

func (s *SagaBase) CorrelationID() string

CorrelationID returns the correlation ID.

func (*SagaBase) CurrentStep

func (s *SagaBase) CurrentStep() int

CurrentStep returns the current step number.

func (*SagaBase) Fail

func (s *SagaBase) Fail()

Fail marks the saga as failed.

func (*SagaBase) IncrementVersion

func (s *SagaBase) IncrementVersion()

IncrementVersion increments the saga version.

func (*SagaBase) MarkCompensated

func (s *SagaBase) MarkCompensated()

MarkCompensated marks the saga as compensated.

func (*SagaBase) SagaID

func (s *SagaBase) SagaID() string

SagaID returns the saga's unique identifier.

func (*SagaBase) SagaType

func (s *SagaBase) SagaType() string

SagaType returns the saga type.

func (*SagaBase) SetCompletedAt

func (s *SagaBase) SetCompletedAt(t *time.Time)

SetCompletedAt sets when the saga completed.

func (*SagaBase) SetCorrelationID

func (s *SagaBase) SetCorrelationID(id string)

SetCorrelationID sets the correlation ID.

func (*SagaBase) SetCurrentStep

func (s *SagaBase) SetCurrentStep(step int)

SetCurrentStep sets the current step number.

func (*SagaBase) SetID

func (s *SagaBase) SetID(id string)

SetID sets the saga's ID.

func (*SagaBase) SetStartedAt

func (s *SagaBase) SetStartedAt(t time.Time)

SetStartedAt sets when the saga started.

func (*SagaBase) SetStatus

func (s *SagaBase) SetStatus(status SagaStatus)

SetStatus sets the saga status.

func (*SagaBase) SetType

func (s *SagaBase) SetType(t string)

SetType sets the saga type.

func (*SagaBase) SetVersion

func (s *SagaBase) SetVersion(v int64)

SetVersion sets the saga version.

func (*SagaBase) StartCompensation

func (s *SagaBase) StartCompensation()

StartCompensation marks the saga as compensating.

func (*SagaBase) StartedAt

func (s *SagaBase) StartedAt() time.Time

StartedAt returns when the saga started.

func (*SagaBase) Status

func (s *SagaBase) Status() SagaStatus

Status returns the current status.

func (*SagaBase) Version

func (s *SagaBase) Version() int64

Version returns the saga version.

type SagaCorrelation

type SagaCorrelation struct {
	// SagaType is the type of saga this correlation applies to.
	SagaType string

	// StartingEvents are the event types that can start this saga.
	StartingEvents []string

	// CorrelationIDFunc extracts the correlation ID from an event.
	// This is used to find existing sagas or create new ones.
	CorrelationIDFunc func(event StoredEvent) string
}

SagaCorrelation provides strategies for correlating events to sagas.

type SagaFactory

type SagaFactory func(id string) Saga

SagaFactory creates new saga instances.

type SagaFailedError

type SagaFailedError struct {
	SagaID      string
	SagaType    string
	FailedStep  int
	Reason      string
	Recoverable bool
}

SagaFailedError provides detailed information about a saga failure.

func NewSagaFailedError

func NewSagaFailedError(sagaID, sagaType string, failedStep int, reason string, recoverable bool) *SagaFailedError

NewSagaFailedError creates a new SagaFailedError.

func (*SagaFailedError) Error

func (e *SagaFailedError) Error() string

Error returns the error message.

func (*SagaFailedError) Is

func (e *SagaFailedError) Is(target error) bool

Is reports whether this error matches the target error.

type SagaManager

type SagaManager struct {
	// contains filtered or unexported fields
}

SagaManager orchestrates saga lifecycle and event processing. It subscribes to events, routes them to appropriate sagas, and dispatches resulting commands.

Concurrency and Idempotency

SagaManager provides several mechanisms to ensure correct saga processing under concurrent access:

  1. Per-Saga Locking: Each saga ID has an associated mutex that serializes access. This prevents race conditions when the same event is delivered from multiple sources (e.g., pg_notify + polling) or when multiple events for the same saga arrive simultaneously.

  2. Fresh State Loading: Before processing each event, the saga state is loaded fresh from the store. This ensures terminal status checks and idempotency checks see the latest state.

  3. Event Idempotency: Processed events are tracked in the SagaState.ProcessedEvents field (not in the saga's Data map) to prevent duplicate processing on retries. This is handled transparently by the SagaManager - saga implementations don't need to preserve these internal tracking fields.

  4. Optimistic Concurrency: The saga store uses version-based optimistic concurrency control. On conflict, the event is retried with fresh state.
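The per-saga locking in point 1 can be sketched as a mutex-per-ID map. This is an illustration of the pattern, not go-mink's internals:

```go
package main

import (
	"fmt"
	"sync"
)

// sagaLocks hands out one mutex per saga ID so that all event handling
// for the same saga is serialized while different sagas run in parallel.
type sagaLocks struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

func newSagaLocks() *sagaLocks {
	return &sagaLocks{locks: make(map[string]*sync.Mutex)}
}

// lockFor returns the mutex dedicated to one saga ID, creating it on
// first use.
func (s *sagaLocks) lockFor(id string) *sync.Mutex {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.locks[id]; !ok {
		s.locks[id] = &sync.Mutex{}
	}
	return s.locks[id]
}

func main() {
	locks := newSagaLocks()
	var wg sync.WaitGroup
	steps := 0
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			m := locks.lockFor("saga-1")
			m.Lock()
			defer m.Unlock()
			steps++ // safe: serialized per saga ID
		}()
	}
	wg.Wait()
	fmt.Println(steps)
}
```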

func NewSagaManager

func NewSagaManager(eventStore *EventStore, opts ...SagaManagerOption) *SagaManager

NewSagaManager creates a new SagaManager.

func (*SagaManager) FindSagaByCorrelationID

func (m *SagaManager) FindSagaByCorrelationID(ctx context.Context, correlationID string) (*SagaState, error)

FindSagaByCorrelationID finds a saga by its correlation ID.

func (*SagaManager) GetSaga

func (m *SagaManager) GetSaga(ctx context.Context, sagaID string) (*SagaState, error)

GetSaga retrieves a saga by its ID.

func (*SagaManager) IsRunning

func (m *SagaManager) IsRunning() bool

IsRunning returns true if the saga manager is running.

func (*SagaManager) Position

func (m *SagaManager) Position() uint64

Position returns the current event position.

func (*SagaManager) ProcessEvent

func (m *SagaManager) ProcessEvent(ctx context.Context, event StoredEvent) error

ProcessEvent manually processes a single event (for testing or manual replay).

func (*SagaManager) Register

func (m *SagaManager) Register(sagaType string, factory SagaFactory, correlation SagaCorrelation)

Register registers a saga type with its factory and correlation configuration.

func (*SagaManager) RegisterSimple

func (m *SagaManager) RegisterSimple(sagaType string, factory SagaFactory, startingEvents ...string)

RegisterSimple registers a saga with a simple correlation based on event stream ID.

func (*SagaManager) SetPosition

func (m *SagaManager) SetPosition(pos uint64)

SetPosition sets the starting position for event processing.

func (*SagaManager) Start

func (m *SagaManager) Start(ctx context.Context) error

Start begins processing events and routing them to sagas. This method blocks until the context is cancelled.

func (*SagaManager) StartAsync

func (m *SagaManager) StartAsync(ctx context.Context) *AsyncResult

StartAsync begins processing events and routing them to sagas in a background goroutine. It returns immediately with an AsyncResult that can be used to:

  • Wait for the saga manager to stop: result.Wait()
  • Wait with timeout: result.WaitWithTimeout(5 * time.Second)
  • Check if stopped: result.IsComplete()
  • Cancel the manager: result.Cancel()
  • Get the error: result.Err()

The saga manager will continue processing until:

  • The provided context is cancelled
  • result.Cancel() is called
  • An unrecoverable error occurs

Example:

result := manager.StartAsync(ctx)

// Do other work while saga manager runs in background...

// Later, when shutting down:
result.Cancel()
if err := result.WaitWithTimeout(10 * time.Second); err != nil {
    log.Printf("Saga manager shutdown: %v", err)
}

func (*SagaManager) StartSaga

func (m *SagaManager) StartSaga(ctx context.Context, sagaType string, triggerEvent StoredEvent) error

StartSaga manually triggers a new saga instance synchronously. The saga will be created and the trigger event will be processed immediately.

This is useful when you want to start a saga based on an external trigger rather than waiting for an event to flow through the event store subscription.

func (*SagaManager) StartSagaAsync

func (m *SagaManager) StartSagaAsync(ctx context.Context, sagaType string, triggerEvent StoredEvent) *AsyncResult

StartSagaAsync manually triggers a new saga instance asynchronously. The saga will be started and the first event will be processed in a background goroutine. Returns an AsyncResult that can be used to wait for the initial processing to complete.

This is useful when you want to start a saga based on an external trigger rather than waiting for an event to flow through the event store subscription.

Example:

result := manager.StartSagaAsync(ctx, "OrderFulfillment", initialEvent)
if err := result.WaitWithTimeout(5 * time.Second); err != nil {
    log.Printf("Failed to start saga: %v", err)
}

func (*SagaManager) Stop

func (m *SagaManager) Stop()

Stop gracefully stops the saga manager.

type SagaManagerOption

type SagaManagerOption func(*SagaManager)

SagaManagerOption configures a SagaManager.

func WithCommandBus

func WithCommandBus(bus *CommandBus) SagaManagerOption

WithCommandBus sets the command bus for dispatching commands.

func WithSagaLogger

func WithSagaLogger(logger Logger) SagaManagerOption

WithSagaLogger sets the logger.

func WithSagaPollInterval

func WithSagaPollInterval(d time.Duration) SagaManagerOption

WithSagaPollInterval sets the polling interval for event subscription.

func WithSagaRetryAttempts

func WithSagaRetryAttempts(attempts int) SagaManagerOption

WithSagaRetryAttempts sets the number of retry attempts for failed commands.

func WithSagaRetryDelay

func WithSagaRetryDelay(d time.Duration) SagaManagerOption

WithSagaRetryDelay sets the delay between retry attempts.

func WithSagaSerializer

func WithSagaSerializer(serializer Serializer) SagaManagerOption

WithSagaSerializer sets the serializer for saga data.

func WithSagaStore

func WithSagaStore(store SagaStore) SagaManagerOption

WithSagaStore sets the saga store.

type SagaNotFoundError

type SagaNotFoundError = adapters.SagaNotFoundError

SagaNotFoundError provides detailed information about a missing saga. This is a type alias to adapters.SagaNotFoundError for consistency.

type SagaState

type SagaState = adapters.SagaState

SagaState represents the persisted state of a saga.

func SagaStateFromJSON

func SagaStateFromJSON(data []byte) (*SagaState, error)

SagaStateFromJSON parses saga state from JSON.

type SagaStatus

type SagaStatus = adapters.SagaStatus

SagaStatus represents the current status of a saga.

type SagaStep

type SagaStep = adapters.SagaStep

SagaStep represents a single step in a saga.

type SagaStepStatus

type SagaStepStatus = adapters.SagaStepStatus

SagaStepStatus represents the status of a saga step.

type SagaStore

type SagaStore = adapters.SagaStore

SagaStore defines the interface for saga persistence.

type SchemaCompatibility

type SchemaCompatibility int

SchemaCompatibility represents the level of backward compatibility between schema versions.

const (
	// SchemaFullyCompatible indicates the schema change is fully backward and forward compatible.
	// No fields were added, removed, or changed — only documentation or ordering changes.
	SchemaFullyCompatible SchemaCompatibility = iota

	// SchemaBackwardCompatible indicates old data can be read by new code.
	// Fields may have been added (with defaults) but none removed or changed.
	SchemaBackwardCompatible

	// SchemaForwardCompatible indicates new data can be read by old code.
	// Fields may have been removed but none added or changed.
	SchemaForwardCompatible

	// SchemaBreaking indicates the schema change breaks compatibility.
	// Fields were changed, renamed, or removed without migration support.
	SchemaBreaking
)
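The levels can be derived from a field-set diff: additions only are backward compatible, removals only are forward compatible, a mix or a type change is breaking. The classifier below is a sketch of that rule, not go-mink's CheckCompatibility:

```go
package main

import "fmt"

type SchemaCompatibility int

const (
	SchemaFullyCompatible SchemaCompatibility = iota
	SchemaBackwardCompatible
	SchemaForwardCompatible
	SchemaBreaking
)

// classify compares two field sets (name -> type) and maps the diff to
// a compatibility level as described above.
func classify(oldFields, newFields map[string]string) SchemaCompatibility {
	added, removed := false, false
	for name, typ := range oldFields {
		newTyp, ok := newFields[name]
		if !ok {
			removed = true
		} else if newTyp != typ {
			return SchemaBreaking // a changed field type always breaks
		}
	}
	for name := range newFields {
		if _, ok := oldFields[name]; !ok {
			added = true
		}
	}
	switch {
	case added && removed:
		return SchemaBreaking
	case added:
		return SchemaBackwardCompatible
	case removed:
		return SchemaForwardCompatible
	default:
		return SchemaFullyCompatible
	}
}

func main() {
	v1 := map[string]string{"ID": "string", "Amount": "int"}
	v2 := map[string]string{"ID": "string", "Amount": "int", "Currency": "string"}
	fmt.Println(classify(v1, v2) == SchemaBackwardCompatible)
}
```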

func (SchemaCompatibility) String

func (c SchemaCompatibility) String() string

String returns a human-readable name for the compatibility level.

type SchemaDefinition

type SchemaDefinition struct {
	// Version is the schema version number.
	Version int

	// Fields describes the fields in this schema version.
	Fields []FieldDefinition

	// JSONSchema is an optional JSON Schema document for validation.
	JSONSchema json.RawMessage

	// RegisteredAt is when this schema version was registered.
	RegisteredAt time.Time
}

SchemaDefinition describes the schema for a specific version of an event type.

type SchemaRegistry

type SchemaRegistry struct {
	// contains filtered or unexported fields
}

SchemaRegistry is an in-memory registry that tracks event schemas and their versions. It provides compatibility checking between schema versions.

func NewSchemaRegistry

func NewSchemaRegistry() *SchemaRegistry

NewSchemaRegistry creates a new empty SchemaRegistry.

func (*SchemaRegistry) CheckCompatibility

func (r *SchemaRegistry) CheckCompatibility(eventType string, oldVersion, newVersion int) (SchemaCompatibility, error)

CheckCompatibility determines the compatibility level between two schema versions. It compares the field definitions to classify the change.

func (*SchemaRegistry) GetLatestVersion

func (r *SchemaRegistry) GetLatestVersion(eventType string) (int, error)

GetLatestVersion returns the highest registered schema version for an event type.

func (*SchemaRegistry) GetSchema

func (r *SchemaRegistry) GetSchema(eventType string, version int) (*SchemaDefinition, error)

GetSchema retrieves a specific schema version for an event type.

func (*SchemaRegistry) Register

func (r *SchemaRegistry) Register(eventType string, schema SchemaDefinition) error

Register adds a schema definition for an event type. Returns an error if the version is < 1 or already registered.

func (*SchemaRegistry) RegisteredEventTypes

func (r *SchemaRegistry) RegisteredEventTypes() []string

RegisteredEventTypes returns a sorted list of event types that have schemas registered.

type SchemaVersionGapError

type SchemaVersionGapError struct {
	EventType       string
	MissingVersion  int
	ExpectedVersion int
}

SchemaVersionGapError provides detailed information about a gap in the upcaster chain.

func NewSchemaVersionGapError

func NewSchemaVersionGapError(eventType string, missingVersion, expectedVersion int) *SchemaVersionGapError

NewSchemaVersionGapError creates a new SchemaVersionGapError.

func (*SchemaVersionGapError) Error

func (e *SchemaVersionGapError) Error() string

Error returns the error message.

func (*SchemaVersionGapError) Is

func (e *SchemaVersionGapError) Is(target error) bool

Is reports whether this error matches the target error.

func (*SchemaVersionGapError) Unwrap

func (e *SchemaVersionGapError) Unwrap() error

Unwrap returns the underlying error for errors.Unwrap().

type SerializationError

type SerializationError struct {
	EventType string
	Operation string // "serialize" or "deserialize"
	Cause     error
}

SerializationError provides detailed information about a serialization failure.

func NewSerializationError

func NewSerializationError(eventType, operation string, cause error) *SerializationError

NewSerializationError creates a new SerializationError.

func (*SerializationError) Error

func (e *SerializationError) Error() string

Error returns the error message.

func (*SerializationError) Is

func (e *SerializationError) Is(target error) bool

Is reports whether this error matches the target error.

func (*SerializationError) Unwrap

func (e *SerializationError) Unwrap() error

Unwrap returns the underlying cause for errors.Unwrap().

type Serializer

type Serializer interface {
	// Serialize converts an event to bytes.
	Serialize(event interface{}) ([]byte, error)

	// Deserialize converts bytes back to an event.
	// The eventType is used to determine the target type.
	Deserialize(data []byte, eventType string) (interface{}, error)
}

Serializer handles event payload serialization and deserialization.
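A minimal implementation keeps a registry from event type names to Go types so Deserialize knows the target. This sketch illustrates the contract; it is not go-mink's built-in serializer:

```go
package main

import (
	"encoding/json"
	"fmt"
	"reflect"
)

// Serializer matches the documented interface.
type Serializer interface {
	Serialize(event interface{}) ([]byte, error)
	Deserialize(data []byte, eventType string) (interface{}, error)
}

// jsonSerializer maps event type names to registered Go types.
type jsonSerializer struct {
	types map[string]reflect.Type
}

func newJSONSerializer() *jsonSerializer {
	return &jsonSerializer{types: make(map[string]reflect.Type)}
}

func (s *jsonSerializer) RegisterType(name string, prototype interface{}) {
	s.types[name] = reflect.TypeOf(prototype)
}

func (s *jsonSerializer) Serialize(event interface{}) ([]byte, error) {
	return json.Marshal(event)
}

func (s *jsonSerializer) Deserialize(data []byte, eventType string) (interface{}, error) {
	t, ok := s.types[eventType]
	if !ok {
		return nil, fmt.Errorf("unknown event type %q", eventType)
	}
	// Allocate a new value of the registered type and unmarshal into it.
	v := reflect.New(t).Interface()
	if err := json.Unmarshal(data, v); err != nil {
		return nil, err
	}
	return v, nil
}

type OrderPlaced struct {
	OrderID string
	Amount  int
}

func main() {
	js := newJSONSerializer()
	js.RegisterType("OrderPlaced", OrderPlaced{})
	data, _ := js.Serialize(OrderPlaced{OrderID: "42", Amount: 100})
	evt, _ := js.Deserialize(data, "OrderPlaced")
	fmt.Printf("%+v\n", evt.(*OrderPlaced))
}
```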

type SimpleDispatcher

type SimpleDispatcher struct {
	// contains filtered or unexported fields
}

SimpleDispatcher is a basic dispatcher that forwards commands to handlers.

func NewSimpleDispatcher

func NewSimpleDispatcher(registry *HandlerRegistry) *SimpleDispatcher

NewSimpleDispatcher creates a new SimpleDispatcher.

func (*SimpleDispatcher) Dispatch

func (d *SimpleDispatcher) Dispatch(ctx context.Context, cmd Command) (CommandResult, error)

Dispatch sends a command to its handler.

type StoredEvent

type StoredEvent struct {
	// ID is the globally unique event identifier.
	ID string

	// StreamID identifies the stream this event belongs to.
	StreamID string

	// Type is the event type identifier.
	Type string

	// Data is the serialized event payload.
	Data []byte

	// Metadata contains contextual information.
	Metadata Metadata

	// Version is the position within the stream (1-based).
	Version int64

	// GlobalPosition is the position across all streams.
	GlobalPosition uint64

	// Timestamp is when the event was stored.
	Timestamp time.Time
}

StoredEvent represents a persisted event with all storage metadata.

type StreamID

type StreamID struct {
	// Category represents the aggregate type (e.g., "Order", "Customer").
	Category string

	// ID is the unique identifier within the category (e.g., "order-123").
	ID string
}

StreamID uniquely identifies an event stream. It consists of a category (aggregate type) and an instance ID.

func NewStreamID

func NewStreamID(category, id string) StreamID

NewStreamID creates a new StreamID from category and ID.

func ParseStreamID

func ParseStreamID(s string) (StreamID, error)

ParseStreamID parses a stream ID string in the format "Category-ID". Returns an error if the format is invalid.
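A sketch of the parse (splitting at the first hyphen, so IDs that themselves contain hyphens survive; whether go-mink splits at the first hyphen is an assumption of this sketch):

```go
package main

import (
	"fmt"
	"strings"
)

// StreamID mirrors the documented struct.
type StreamID struct {
	Category string
	ID       string
}

// parseStreamID splits "Category-ID" at the first hyphen, so a stream
// like "Order-order-123" yields Category "Order" and ID "order-123".
func parseStreamID(s string) (StreamID, error) {
	category, id, ok := strings.Cut(s, "-")
	if !ok || category == "" || id == "" {
		return StreamID{}, fmt.Errorf("invalid stream ID %q: want Category-ID", s)
	}
	return StreamID{Category: category, ID: id}, nil
}

func (s StreamID) String() string { return s.Category + "-" + s.ID }

func main() {
	sid, err := parseStreamID("Order-order-123")
	if err != nil {
		panic(err)
	}
	fmt.Println(sid.Category, sid.ID)
}
```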

func (StreamID) IsZero

func (s StreamID) IsZero() bool

IsZero reports whether the StreamID is empty.

func (StreamID) String

func (s StreamID) String() string

String returns the stream ID as "Category-ID".

func (StreamID) Validate

func (s StreamID) Validate() error

Validate checks if the StreamID is valid.

type StreamInfo

type StreamInfo struct {
	// StreamID is the stream identifier.
	StreamID string

	// Category is the stream category (aggregate type).
	Category string

	// Version is the current stream version.
	Version int64

	// EventCount is the number of events in the stream.
	EventCount int64

	// CreatedAt is when the stream was created.
	CreatedAt time.Time

	// UpdatedAt is when the stream was last modified.
	UpdatedAt time.Time
}

StreamInfo contains metadata about an event stream.

type StreamNotFoundError

type StreamNotFoundError struct {
	StreamID string
}

StreamNotFoundError provides detailed information about a missing stream.

func NewStreamNotFoundError

func NewStreamNotFoundError(streamID string) *StreamNotFoundError

NewStreamNotFoundError creates a new StreamNotFoundError.

func (*StreamNotFoundError) Error

func (e *StreamNotFoundError) Error() string

Error returns the error message.

func (*StreamNotFoundError) Is

func (e *StreamNotFoundError) Is(target error) bool

Is reports whether this error matches the target error.

func (*StreamNotFoundError) Unwrap

func (e *StreamNotFoundError) Unwrap() error

Unwrap returns the underlying error for errors.Unwrap().

type Subscription

type Subscription interface {
	// Events returns the channel for receiving events.
	Events() <-chan StoredEvent

	// Close stops the subscription.
	Close() error

	// Err returns any error that caused the subscription to close.
	Err() error
}

Subscription represents an active event subscription.

type SubscriptionAdapter

type SubscriptionAdapter interface {
	// LoadFromPosition loads events starting from a global position.
	LoadFromPosition(ctx context.Context, fromPosition uint64, limit int) ([]StoredEvent, error)

	// SubscribeAll subscribes to all events across all streams.
	SubscribeAll(ctx context.Context, fromPosition uint64) (<-chan StoredEvent, error)

	// SubscribeStream subscribes to events from a specific stream.
	SubscribeStream(ctx context.Context, streamID string, fromVersion int64) (<-chan StoredEvent, error)

	// SubscribeCategory subscribes to all events from streams in a category.
	SubscribeCategory(ctx context.Context, category string, fromPosition uint64) (<-chan StoredEvent, error)
}

SubscriptionAdapter provides methods for subscribing to event streams. This interface extends the basic EventStoreAdapter for subscription capabilities.

type SubscriptionOptions

type SubscriptionOptions struct {
	// BufferSize is the size of the event channel buffer.
	// Default: 256
	BufferSize int

	// Filter optionally filters which events are delivered.
	Filter EventFilter

	// RetryOnError determines whether to retry on transient errors.
	// Default: true
	RetryOnError bool

	// RetryInterval is the time to wait between retries.
	// Default: 1 second
	RetryInterval time.Duration

	// MaxRetries is the maximum number of retry attempts.
	// Default: 5
	MaxRetries int
}

SubscriptionOptions configures a subscription.

func DefaultSubscriptionOptions

func DefaultSubscriptionOptions() SubscriptionOptions

DefaultSubscriptionOptions returns the default subscription options.

type UpcastError

type UpcastError struct {
	EventType   string
	FromVersion int
	ToVersion   int
	Cause       error
}

UpcastError provides detailed information about an upcasting failure.

func NewUpcastError

func NewUpcastError(eventType string, fromVersion, toVersion int, cause error) *UpcastError

NewUpcastError creates a new UpcastError.

func (*UpcastError) Error

func (e *UpcastError) Error() string

Error returns the error message.

func (*UpcastError) Is

func (e *UpcastError) Is(target error) bool

Is reports whether this error matches the target error.

func (*UpcastError) Unwrap

func (e *UpcastError) Unwrap() error

Unwrap returns the underlying cause for errors.Unwrap().

type Upcaster

type Upcaster interface {
	// EventType returns the event type this upcaster handles.
	EventType() string

	// FromVersion returns the source schema version.
	FromVersion() int

	// ToVersion returns the target schema version. Must equal FromVersion() + 1.
	ToVersion() int

	// Upcast transforms event data from FromVersion to ToVersion.
	// Metadata is provided as read-only context (e.g., for tenant-specific defaults).
	Upcast(data []byte, metadata Metadata) ([]byte, error)
}

Upcaster transforms event data from one schema version to the next. Upcasters operate on raw bytes and are serializer-agnostic. Each upcaster handles exactly one version transition (FromVersion → ToVersion).

type UpcasterChain

type UpcasterChain struct {
	// contains filtered or unexported fields
}

UpcasterChain is a thread-safe registry of upcasters that applies them in sequence. It validates that there are no gaps or duplicates in the version chain.

func NewUpcasterChain

func NewUpcasterChain() *UpcasterChain

NewUpcasterChain creates a new empty UpcasterChain.

func (*UpcasterChain) HasUpcasters

func (c *UpcasterChain) HasUpcasters(eventType string) bool

HasUpcasters reports whether any upcasters are registered for the given event type.

func (*UpcasterChain) LatestVersion

func (c *UpcasterChain) LatestVersion(eventType string) int

LatestVersion returns the latest schema version for the given event type. Returns DefaultSchemaVersion if no upcasters are registered for the type.

func (*UpcasterChain) Register

func (c *UpcasterChain) Register(u Upcaster) error

Register adds an upcaster to the chain. Returns an error if the upcaster's ToVersion != FromVersion + 1 or if an upcaster for the same event type and version transition already exists.

func (*UpcasterChain) RegisteredEventTypes

func (c *UpcasterChain) RegisteredEventTypes() []string

RegisteredEventTypes returns a sorted list of event types that have upcasters registered.

func (*UpcasterChain) Upcast

func (c *UpcasterChain) Upcast(eventType string, fromVersion int, data []byte, metadata Metadata) ([]byte, int, error)

Upcast transforms event data from fromVersion to the latest version. Returns the transformed data, the final version, and any error. If no upcasters exist for the event type or the data is already at the latest version, the original data is returned unchanged.

func (*UpcasterChain) Validate

func (c *UpcasterChain) Validate() error

Validate checks the entire chain for gaps. For each event type, the upcasters must form a contiguous chain from the lowest FromVersion to the highest ToVersion.
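The contiguity invariant can be sketched as a check over the registered version transitions for one event type (FromVersion → ToVersion pairs). This illustrates the rule, not go-mink's implementation:

```go
package main

import (
	"fmt"
	"sort"
)

// validateChain checks that upcaster transitions form a contiguous
// chain (1→2, 2→3, ...). transitions maps FromVersion to ToVersion.
func validateChain(transitions map[int]int) error {
	if len(transitions) == 0 {
		return nil
	}
	froms := make([]int, 0, len(transitions))
	for from, to := range transitions {
		if to != from+1 {
			return fmt.Errorf("upcaster %d→%d must step by exactly one version", from, to)
		}
		froms = append(froms, from)
	}
	sort.Ints(froms)
	for i := 1; i < len(froms); i++ {
		if froms[i] != froms[i-1]+1 {
			return fmt.Errorf("gap in chain: no upcaster from version %d", froms[i-1]+1)
		}
	}
	return nil
}

func main() {
	// 1→2 and 3→4 registered, but 2→3 missing: validation should fail.
	fmt.Println(validateChain(map[int]int{1: 2, 3: 4}))
	fmt.Println(validateChain(map[int]int{1: 2, 2: 3, 3: 4}))
}
```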

type UpcastingSerializer

type UpcastingSerializer struct {
	// contains filtered or unexported fields
}

UpcastingSerializer is a decorator that wraps any Serializer and applies upcasting during deserialization. Serialization passes through unchanged.

func NewUpcastingSerializer

func NewUpcastingSerializer(inner Serializer, chain *UpcasterChain) *UpcastingSerializer

NewUpcastingSerializer creates a new UpcastingSerializer wrapping the given serializer.

func (*UpcastingSerializer) Chain

func (s *UpcastingSerializer) Chain() *UpcasterChain

Chain returns the upcaster chain.

func (*UpcastingSerializer) Deserialize

func (s *UpcastingSerializer) Deserialize(data []byte, eventType string) (interface{}, error)

Deserialize converts bytes back to an event. If upcasters are registered for the event type, the data is upcasted from DefaultSchemaVersion before deserialization.

func (*UpcastingSerializer) DeserializeWithVersion

func (s *UpcastingSerializer) DeserializeWithVersion(data []byte, eventType string, schemaVersion int, metadata Metadata) (interface{}, error)

DeserializeWithVersion converts bytes back to an event, upcasting from the specified schema version. Metadata is provided as read-only context to upcasters.

func (*UpcastingSerializer) Inner

func (s *UpcastingSerializer) Inner() Serializer

Inner returns the wrapped serializer.

func (*UpcastingSerializer) Serialize

func (s *UpcastingSerializer) Serialize(event interface{}) ([]byte, error)

Serialize converts an event to bytes. Pass-through to the inner serializer.

type ValidationError

type ValidationError struct {
	// CommandType is the type of command that failed validation.
	CommandType string

	// Field is the field that failed validation (optional).
	Field string

	// Message describes the validation failure.
	Message string

	// Cause is the underlying error (optional).
	Cause error
}

ValidationError represents a command validation failure.

func NewValidationError

func NewValidationError(cmdType, field, message string) *ValidationError

NewValidationError creates a new ValidationError.

func NewValidationErrorWithCause

func NewValidationErrorWithCause(cmdType, field, message string, cause error) *ValidationError

NewValidationErrorWithCause creates a new ValidationError with an underlying cause.

func (*ValidationError) Error

func (e *ValidationError) Error() string

Error returns the error message.

func (*ValidationError) Is

func (e *ValidationError) Is(target error) bool

Is reports whether this error matches the target error.

func (*ValidationError) Unwrap

func (e *ValidationError) Unwrap() error

Unwrap returns the underlying cause for errors.Unwrap().

type Validator

type Validator interface {
	// Validate validates a command and returns validation errors.
	Validate(cmd Command) error
}

Validator provides command validation functionality.

type ValidatorFunc

type ValidatorFunc func(cmd Command) error

ValidatorFunc is a function that implements Validator.

func (ValidatorFunc) Validate

func (f ValidatorFunc) Validate(cmd Command) error

Validate implements Validator.
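The adapter lets a plain function serve as a Validator, the same pattern as http.HandlerFunc. The sketch below uses local stand-ins for Command and ValidatorFunc; the PlaceOrder command and its fields are hypothetical:

```go
package main

import (
	"errors"
	"fmt"
)

// Command and ValidatorFunc are local stand-ins matching the
// documented shapes.
type Command interface{ CommandType() string }

type ValidatorFunc func(cmd Command) error

func (f ValidatorFunc) Validate(cmd Command) error { return f(cmd) }

type PlaceOrder struct {
	OrderID string
	Amount  int
}

func (PlaceOrder) CommandType() string { return "PlaceOrder" }

// validatePlaceOrder is an ordinary function used as a Validator via
// the ValidatorFunc adapter.
func validatePlaceOrder(cmd Command) error {
	po, ok := cmd.(PlaceOrder)
	if !ok {
		return nil // not ours to validate
	}
	if po.OrderID == "" {
		return errors.New("PlaceOrder: OrderID is required")
	}
	if po.Amount <= 0 {
		return errors.New("PlaceOrder: Amount must be positive")
	}
	return nil
}

func main() {
	v := ValidatorFunc(validatePlaceOrder)
	fmt.Println(v.Validate(PlaceOrder{OrderID: "42", Amount: 100}))
	fmt.Println(v.Validate(PlaceOrder{Amount: -1}))
}
```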

type VersionSetter

type VersionSetter interface {
	SetVersion(v int64)
}

VersionSetter is an optional interface that aggregates can implement to allow the EventStore to set their version during loading. This is used for optimistic concurrency control in SaveAggregate. AggregateBase implements this interface.

type VersionedAggregate

type VersionedAggregate interface {
	Aggregate

	// OriginalVersion returns the version when the aggregate was loaded.
	OriginalVersion() int64
}

VersionedAggregate provides versioning information for optimistic concurrency.

Directories

Path Synopsis
Package adapters provides interfaces for event store backends.
memory
Package memory provides an in-memory implementation of the event store adapter.
postgres
Package postgres provides a PostgreSQL implementation of the event store adapter.
cli
commands
Package commands provides the CLI command implementations for mink.
config
Package config provides configuration management for the mink CLI.
styles
Package styles provides consistent styling for the go-mink CLI.
ui
Package ui provides reusable UI components for the go-mink CLI.
cmd
mink command
mink is the command-line interface for the go-mink event sourcing library.
Package encryption defines interfaces and types for field-level encryption in event sourcing.
kms
Package kms provides an AWS KMS encryption provider for field-level encryption.
local
Package local provides an in-memory AES-256-GCM encryption provider for testing.
providertest
Package providertest provides shared test helpers for encryption.Provider implementations.
vault
Package vault provides a HashiCorp Vault Transit encryption provider for field-level encryption.
examples
basic command
Package main demonstrates the basic usage of go-mink for event sourcing.
cqrs command
Package main demonstrates the CQRS and Command Bus features of go-mink (Phase 2).
cqrs-postgres command
Package main demonstrates Phase 2 CQRS & Command Bus features with PostgreSQL.
encryption command
Package main demonstrates field-level encryption in go-mink.
export command
Package main demonstrates GDPR data export (right to access / data portability) in go-mink.
full-ecommerce command
Package main demonstrates a complete e-commerce order fulfillment system using go-mink.
metrics command
Example: Metrics Middleware
msgpack command
Example: MessagePack Serializer
projections command
Package main demonstrates the projection and read model features of go-mink.
protobuf command
Example: Protocol Buffers Serializer
sagas command
Example: Saga (Process Manager) Pattern
tracing command
Example: Distributed Tracing with OpenTelemetry
versioning command
Package main demonstrates event versioning and upcasting in go-mink.
middleware
metrics
Package metrics provides Prometheus metrics integration for mink.
tracing
Package tracing provides OpenTelemetry integration for mink.
outbox
kafka
Package kafka provides a Kafka publisher for the outbox pattern.
sns
Package sns provides an AWS SNS publisher for the outbox pattern.
webhook
Package webhook provides a webhook publisher for the outbox pattern.
serializer
msgpack
Package msgpack provides a MessagePack serializer implementation for mink.
protobuf
Package protobuf provides a Protocol Buffers serializer for mink events.
testing
assertions
Package assertions provides event assertion utilities for testing event-sourced systems.
bdd
Package bdd provides BDD-style test fixtures for event-sourced aggregates.
benchmarks
Package benchmarks provides a shared benchmark suite for EventStoreAdapter implementations.
containers
Package containers provides test container utilities for integration testing.
projections
Package projections provides testing utilities for projection development.
sagas
Package sagas provides testing utilities for saga (process manager) development.
testutil
Package testutil provides test utilities and fixtures for go-mink.
