Documentation ¶
Overview ¶
Package projection provides storage-agnostic projection processing for event-sourced systems. It defines the Projection and PartitionStrategy interfaces, processor configuration, and run modes used to build read models and publish persisted events to other destinations.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type HashPartitionStrategy ¶
type HashPartitionStrategy struct{}
HashPartitionStrategy implements deterministic hash-based partitioning. Events are distributed across partitions based on a hash of the aggregate ID. This ensures:

- All events for the same aggregate go to the same partition
- Even distribution across partitions
- Deterministic assignment (the same aggregate always goes to the same partition)
This strategy enables horizontal scaling of projection processing while maintaining ordering guarantees within each aggregate.
func (HashPartitionStrategy) ShouldProcess ¶
func (HashPartitionStrategy) ShouldProcess(aggregateID string, partitionKey, totalPartitions int) bool
ShouldProcess implements PartitionStrategy using FNV-1a hashing.
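For illustration, a minimal sketch of FNV-1a based partition assignment built on the standard library's hash/fnv package; the package's internal hashing may differ in detail:

import "hash/fnv"

func shouldProcess(aggregateID string, partitionKey, totalPartitions int) bool {
    if totalPartitions <= 1 {
        return true // a single instance processes every event
    }
    h := fnv.New32a()
    _, _ = h.Write([]byte(aggregateID)) // Write on an FNV hash never returns an error
    // The same aggregate ID always hashes to the same partition.
    return h.Sum32()%uint32(totalPartitions) == uint32(partitionKey)
}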
type PartitionStrategy ¶
type PartitionStrategy interface {
// ShouldProcess returns true if this projection instance should process the given event.
// aggregateID is the aggregate ID of the event.
// partitionKey identifies this projection instance (e.g., 0 for the first of 4 workers).
// totalPartitions is the total number of projection instances.
ShouldProcess(aggregateID string, partitionKey int, totalPartitions int) bool
}
PartitionStrategy defines how events are partitioned across projection instances.
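Any type with a matching ShouldProcess method satisfies the interface. As a hedged example, a hypothetical strategy for single-instance deployments that accepts every event:

// SingleInstanceStrategy is a hypothetical strategy for deployments that run
// exactly one projection processor: every event is processed locally.
type SingleInstanceStrategy struct{}

func (SingleInstanceStrategy) ShouldProcess(aggregateID string, partitionKey int, totalPartitions int) bool {
    return true
}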
type ProcessorConfig ¶
type ProcessorConfig struct {
// PartitionStrategy determines which events this processor handles
PartitionStrategy PartitionStrategy
// Logger is an optional logger for observability.
// If nil, logging is disabled (zero overhead).
Logger es.Logger
// BatchSize is the number of events to read per batch
BatchSize int
// PartitionKey identifies this processor instance (0-indexed)
PartitionKey int
// TotalPartitions is the total number of processor instances
TotalPartitions int
// PollInterval is the duration to wait when no events are available.
// This prevents tight polling loops that consume excessive CPU.
// A value of 0 means no delay (busy polling - not recommended).
// Default is 100ms, which provides a good balance between latency and CPU usage.
PollInterval time.Duration
// RunMode determines processing behavior.
// Default: RunModeContinuous
RunMode RunMode
}
ProcessorConfig configures a projection processor.
func DefaultProcessorConfig ¶
func DefaultProcessorConfig() ProcessorConfig
DefaultProcessorConfig returns the default configuration.
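A sketch of customizing the defaults for a four-way partitioned deployment; the specific values (partition 0 of 4, batch size 100, 100ms poll interval) are illustrative only:

cfg := projection.DefaultProcessorConfig()
cfg.PartitionStrategy = projection.HashPartitionStrategy{}
cfg.PartitionKey = 0 // this instance is partition 0 of 4
cfg.TotalPartitions = 4
cfg.BatchSize = 100
cfg.PollInterval = 100 * time.Millisecond
cfg.RunMode = projection.RunModeContinuous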
type ProcessorRunner ¶
type ProcessorRunner interface {
// Run processes events for the given projection until the context is canceled.
// Returns an error if the projection handler fails.
Run(ctx context.Context, projection Projection) error
}
ProcessorRunner is the interface that adapter-specific processors must implement. This allows the Runner to orchestrate projections regardless of the underlying storage implementation (SQL, NoSQL, message brokers, etc.).
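A sketch of driving a processor until shutdown, reusing cfg from the configuration example above; newSQLProcessor and ordersProjection are hypothetical names, only the Run method comes from this interface:

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var runner projection.ProcessorRunner = newSQLProcessor(cfg) // hypothetical adapter constructor
if err := runner.Run(ctx, &ordersProjection{}); err != nil {
    log.Printf("projection stopped: %v", err)
}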
type Projection ¶
type Projection interface {
// Name returns the unique name of this projection.
// This name is used for checkpoint tracking.
Name() string
// Handle processes a single event.
// Return an error to stop projection processing.
//
// The tx parameter is the processor's transaction used for checkpoint management.
// SQL projections can use this transaction to ensure atomic updates of both
// the read model and the checkpoint. This eliminates inconsistencies where
// a projection succeeds but the checkpoint update fails (or vice versa).
//
// The transaction will be committed by the processor after Handle returns successfully.
// Projections should NEVER call Commit() or Rollback() on the provided transaction.
//
// For non-SQL projections (Elasticsearch, Redis, message brokers), the tx parameter
// should be ignored and projections should manage their own connections as before.
//
// Event is passed by value to enforce immutability (events are value objects).
// Large data (Payload, Metadata byte slices) share references to their backing arrays,
// so the actual payload/metadata data is not deep-copied.
//
//nolint:gocritic // hugeParam: Intentionally pass by value to enforce immutability
Handle(ctx context.Context, tx *sql.Tx, event es.PersistedEvent) error
}
Projection defines the interface for event projection handlers. Projections are storage-agnostic and can write to any destination (SQL databases, NoSQL stores, message brokers, search engines, etc.).
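A minimal sketch of a SQL-backed projection that relies on the processor's transaction for atomicity; the projection name and table are illustrative, and no fields of es.PersistedEvent are assumed:

type eventCounterProjection struct{}

func (p *eventCounterProjection) Name() string {
    return "event_counter"
}

func (p *eventCounterProjection) Handle(ctx context.Context, tx *sql.Tx, event es.PersistedEvent) error {
    // The write shares the processor's transaction, so the read model and the
    // checkpoint commit (or roll back) together. Never call Commit or Rollback here.
    _, err := tx.ExecContext(ctx, `UPDATE event_stats SET events_seen = events_seen + 1`)
    return err
}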
type RunMode ¶ added in v1.1.0
type RunMode int
RunMode determines how the processor handles event processing.
const (
    // RunModeContinuous runs forever, continuously polling for new events.
    // This is the default mode for production use.
    RunModeContinuous RunMode = iota

    // RunModeOneOff processes all available events and exits cleanly.
    // This mode is useful for:
    //  - Integration tests that need synchronous projection processing
    //  - One-time catch-up operations
    //  - Backfilling projections
    RunModeOneOff
)
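A hedged sketch of using RunModeOneOff in an integration test so assertions can run after all pending events have been projected; processor and proj are assumed to be set up elsewhere:

cfg := projection.DefaultProcessorConfig()
cfg.RunMode = projection.RunModeOneOff

// Run returns once every available event has been processed,
// instead of polling forever as in RunModeContinuous.
if err := processor.Run(ctx, proj); err != nil {
    t.Fatalf("catch-up failed: %v", err)
}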
type ScopedProjection ¶
type ScopedProjection interface {
Projection
// AggregateTypes returns the list of aggregate types this projection cares about.
// If empty, the projection receives events from all aggregate types (still filtered by BoundedContexts if specified).
// If non-empty, only events matching one of these aggregate types are passed to Handle.
AggregateTypes() []string
// BoundedContexts returns the list of bounded contexts this projection cares about.
// If empty, the projection receives events from all bounded contexts (still filtered by AggregateTypes if specified).
// If non-empty, only events matching one of these bounded contexts are passed to Handle.
BoundedContexts() []string
}
ScopedProjection is an optional interface that projections can implement to filter events by aggregate type and/or bounded context. This is useful for read model projections that only care about specific aggregate types within specific bounded contexts.
By default, projections implementing only the Projection interface receive all events. This ensures that global projections (e.g., integration publishers, audit logs) continue to work without modification.
Example - Read model projection scoped to User aggregate in Identity context:
type UserReadModelProjection struct{}

func (p *UserReadModelProjection) Name() string {
    return "user_read_model"
}

func (p *UserReadModelProjection) AggregateTypes() []string {
    return []string{"User"}
}

func (p *UserReadModelProjection) BoundedContexts() []string {
    return []string{"Identity"}
}

func (p *UserReadModelProjection) Handle(ctx context.Context, tx *sql.Tx, event es.PersistedEvent) error {
    // Only receives User aggregate events from the Identity bounded context.
    // Use tx for atomic read model updates together with the checkpoint.
    return nil
}
Example - Read model projection scoped to multiple contexts:
type OrderRevenueProjection struct{}

func (p *OrderRevenueProjection) Name() string {
    return "order_revenue"
}

func (p *OrderRevenueProjection) AggregateTypes() []string {
    return []string{"Order"}
}

func (p *OrderRevenueProjection) BoundedContexts() []string {
    return []string{"Sales", "Billing"}
}

func (p *OrderRevenueProjection) Handle(ctx context.Context, tx *sql.Tx, event es.PersistedEvent) error {
    // Receives Order events from both the Sales and Billing contexts.
    return nil
}
Example - Global integration publisher:
type WatermillPublisher struct{}

func (p *WatermillPublisher) Name() string {
    return "system.integration.watermill.v1"
}

func (p *WatermillPublisher) Handle(ctx context.Context, tx *sql.Tx, event es.PersistedEvent) error {
    // Receives ALL events from all contexts for publishing to a message broker.
    // Ignore the tx parameter - use the message broker client instead.
    _ = tx
    return nil
}