Documentation ¶
Overview ¶
Package projection provides event projection and materialized view support.
Projections derive read-optimized data models from event streams. They support eventual consistency and multiple backend implementations (in-memory, BadgerDB).
See also:
- README: https://github.com/c0deZ3R0/go-sync-kit#readme
- Architecture overview: https://github.com/c0deZ3R0/go-sync-kit/blob/main/docs/overview.md
The package provides read-model building capabilities for go-sync-kit. It enables CQRS, event sourcing, and offline-first architectures by letting applications build deterministic, idempotent projections from event streams.
It also provides observability and metrics for projection operations, integrated with the main observability system following synckit conventions.
Index ¶
- Constants
- func InitDefaultProjectionMetrics(serviceName string, opts ...ProjectionMetricsOption)
- func RecordProjectionApplied(name string, count int, duration time.Duration, operation string)
- func RecordProjectionError(name string, errorType string)
- func SetProjectionHealth(name string, healthy bool)
- func UpdateProjectionLag(name string, lag time.Duration)
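- type OffsetStore
- type ProjectionMetrics
- func GetDefaultProjectionMetrics() *ProjectionMetrics
- func NewProjectionMetrics(serviceName string, opts ...ProjectionMetricsOption) *ProjectionMetrics
- func (m *ProjectionMetrics) GetProjectionMetrics(projection string) map[string]float64
- func (m *ProjectionMetrics) RecordProjectionError(projection, operation, errorType string)
- func (m *ProjectionMetrics) RecordProjectionOperation(projection, operation string, duration time.Duration, success bool, ...)
- func (m *ProjectionMetrics) Registry() *prometheus.Registry
- func (m *ProjectionMetrics) ResetProjectionMetrics(projection string)
- func (m *ProjectionMetrics) SetProjectionHealth(projection string, healthy bool)
- func (m *ProjectionMetrics) UpdateProjectionLag(projection string, lag time.Duration)
- type ProjectionMetricsOption
- func WithProjectionLabels(labels prometheus.Labels) ProjectionMetricsOption
- func WithProjectionRegistry(registry *prometheus.Registry) ProjectionMetricsOption
- type Projector
- type Runner
- func NewRunner(store synckit.EventStore, offsets OffsetStore, proj Projector, opts ...RunnerOption) Runner
- type RunnerOption
- func WithBatchSize(n int) RunnerOption
- func WithLogger(logger *slog.Logger) RunnerOption
- func WithMetrics(metricsCollector *metrics.SyncKitMetrics) RunnerOption
- func WithMetricsEnabled(enabled bool) RunnerOption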
Constants ¶
const (
	ErrorTypeApply   = "apply_error"
	ErrorTypeOffset  = "offset_error"
	ErrorTypeLoad    = "load_error"
	ErrorTypeTimeout = "timeout_error"
	ErrorTypeContext = "context_error"
)
Error type constants for consistent error reporting
const (
	OperationApplySince = "apply_since"
	OperationApplyBatch = "apply_batch"
)
Operation type constants for metrics labeling
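The error-type constants are plain strings used as labels. One way a caller might choose a label for an error is sketched below. The helper `classifyError` is hypothetical and not part of the package; three of the constants are redeclared locally so the sketch compiles on its own, and the mapping from context errors to labels is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Local copies of three package constants so this sketch is self-contained.
const (
	ErrorTypeApply   = "apply_error"
	ErrorTypeTimeout = "timeout_error"
	ErrorTypeContext = "context_error"
)

// classifyError is a hypothetical helper (not part of the package API).
// It picks a label for an error returned while applying a batch.
func classifyError(err error) string {
	switch {
	case errors.Is(err, context.DeadlineExceeded):
		return ErrorTypeTimeout
	case errors.Is(err, context.Canceled):
		return ErrorTypeContext
	default:
		return ErrorTypeApply
	}
}

// classifyExamples returns the labels chosen for three sample errors.
func classifyExamples() []string {
	wrapped := fmt.Errorf("apply batch: %w", context.DeadlineExceeded)
	return []string{
		classifyError(wrapped),
		classifyError(context.Canceled),
		classifyError(errors.New("constraint violation")),
	}
}

func main() {
	fmt.Println(classifyExamples())
}
```

Using `errors.Is` rather than direct comparison keeps the classification correct when the context error has been wrapped.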
Variables ¶
This section is empty.
Functions ¶
func InitDefaultProjectionMetrics ¶
func InitDefaultProjectionMetrics(serviceName string, opts ...ProjectionMetricsOption)
InitDefaultProjectionMetrics initializes the default projection metrics instance. This should be called during application startup.
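func RecordProjectionApplied ¶
func RecordProjectionApplied(name string, count int, duration time.Duration, operation string)
RecordProjectionApplied records a successful projection application using the default metrics instance.
func RecordProjectionError ¶
func RecordProjectionError(name string, errorType string)
RecordProjectionError records a projection error using the default metrics instance.
func SetProjectionHealth ¶
func SetProjectionHealth(name string, healthy bool)
SetProjectionHealth manually sets the health status using the default metrics instance.
func UpdateProjectionLag ¶
func UpdateProjectionLag(name string, lag time.Duration)
UpdateProjectionLag updates the lag metric using the default metrics instance.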
Types ¶
type OffsetStore ¶
type OffsetStore interface {
	// Get retrieves the last applied version for a projection.
	// Returns nil if no offset has been stored yet (start from beginning).
	Get(ctx context.Context, name string) (synckit.Version, error)
	// Set updates the last applied version for a projection.
	// This should be called atomically after successfully applying events.
	Set(ctx context.Context, name string, v synckit.Version) error
}
OffsetStore persists the last applied authoritative version per projection name. This enables resumable projection processing and idempotent operations.
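As an illustration of the contract, here is a minimal in-memory store with the same method shapes. It is a sketch only: `Version` and `seqVersion` are stand-ins for `synckit.Version`, whose definition is not shown on this page, and `memoryOffsetStore` is a hypothetical name rather than one of the package's backends.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
	"sync"
)

// Version stands in for synckit.Version, which this page does not define.
type Version interface {
	String() string
}

// seqVersion is a hypothetical integer-backed version used only in this sketch.
type seqVersion uint64

func (v seqVersion) String() string { return strconv.FormatUint(uint64(v), 10) }

// memoryOffsetStore keeps one offset per projection name, guarded by a mutex.
type memoryOffsetStore struct {
	mu      sync.RWMutex
	offsets map[string]Version
}

func newMemoryOffsetStore() *memoryOffsetStore {
	return &memoryOffsetStore{offsets: make(map[string]Version)}
}

// Get returns the stored version, or nil if the projection has no offset yet.
func (s *memoryOffsetStore) Get(ctx context.Context, name string) (Version, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.offsets[name], nil
}

// Set overwrites the stored version for the projection.
func (s *memoryOffsetStore) Set(ctx context.Context, name string, v Version) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.offsets[name] = v
	return nil
}

// offsetRoundTrip reports whether a fresh store returns nil, then the value read back after Set.
func offsetRoundTrip() (bool, string, error) {
	ctx := context.Background()
	store := newMemoryOffsetStore()

	first, err := store.Get(ctx, "orders")
	if err != nil {
		return false, "", err
	}
	if err := store.Set(ctx, "orders", seqVersion(42)); err != nil {
		return false, "", err
	}
	second, err := store.Get(ctx, "orders")
	if err != nil {
		return false, "", err
	}
	return first == nil, second.String(), nil
}

func main() {
	fresh, stored, err := offsetRoundTrip()
	fmt.Println(fresh, stored, err)
}
```

An in-memory store loses its offsets on restart, so a projection backed by it would be rebuilt from the beginning each time the process starts.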
type ProjectionMetrics ¶
type ProjectionMetrics struct {
	// contains filtered or unexported fields
}
ProjectionMetrics holds all projection-related Prometheus metrics. This follows the same pattern as SyncKitMetrics in observability/metrics.
func GetDefaultProjectionMetrics ¶
func GetDefaultProjectionMetrics() *ProjectionMetrics
GetDefaultProjectionMetrics returns the default projection metrics instance.
func NewProjectionMetrics ¶
func NewProjectionMetrics(serviceName string, opts ...ProjectionMetricsOption) *ProjectionMetrics
NewProjectionMetrics creates a new ProjectionMetrics collector following synckit conventions.
func (*ProjectionMetrics) GetProjectionMetrics ¶
func (m *ProjectionMetrics) GetProjectionMetrics(projection string) map[string]float64
GetProjectionMetrics returns placeholder metric values intended for debugging and testing. It is a simplified implementation; in production, metrics should be accessed via the Prometheus HTTP endpoint.
func (*ProjectionMetrics) RecordProjectionError ¶
func (m *ProjectionMetrics) RecordProjectionError(projection, operation, errorType string)
RecordProjectionError records a projection operation error.
func (*ProjectionMetrics) RecordProjectionOperation ¶
func (m *ProjectionMetrics) RecordProjectionOperation(projection, operation string, duration time.Duration, success bool, eventsProcessed int)
RecordProjectionOperation records metrics for a projection operation.
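A caller might wrap an operation so that its duration, outcome, and event count are always reported. The sketch below assumes only the method signature shown above; `operationRecorder`, `fakeRecorder`, and `instrument` are hypothetical names introduced for illustration, and the fake stands in for a real *ProjectionMetrics.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// operationRecorder captures the one method this sketch needs.
// Its signature matches RecordProjectionOperation as documented above.
type operationRecorder interface {
	RecordProjectionOperation(projection, operation string, duration time.Duration, success bool, eventsProcessed int)
}

// recordedCall and fakeRecorder are test doubles, not package types.
type recordedCall struct {
	Projection string
	Operation  string
	Success    bool
	Events     int
}

type fakeRecorder struct{ calls []recordedCall }

func (r *fakeRecorder) RecordProjectionOperation(projection, operation string, duration time.Duration, success bool, eventsProcessed int) {
	r.calls = append(r.calls, recordedCall{
		Projection: projection,
		Operation:  operation,
		Success:    success,
		Events:     eventsProcessed,
	})
}

// instrument times fn and reports the result, treating a nil error as success.
func instrument(rec operationRecorder, projection, operation string, fn func() (int, error)) error {
	start := time.Now()
	n, err := fn()
	rec.RecordProjectionOperation(projection, operation, time.Since(start), err == nil, n)
	return err
}

// instrumentExample records one successful and one failed operation.
func instrumentExample() (int, bool, bool) {
	rec := &fakeRecorder{}
	_ = instrument(rec, "orders", "apply_batch", func() (int, error) { return 3, nil })
	_ = instrument(rec, "orders", "apply_batch", func() (int, error) { return 0, errors.New("apply failed") })
	return len(rec.calls), rec.calls[0].Success, rec.calls[1].Success
}

func main() {
	calls, firstOK, secondOK := instrumentExample()
	fmt.Println(calls, firstOK, secondOK)
}
```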
func (*ProjectionMetrics) Registry ¶
func (m *ProjectionMetrics) Registry() *prometheus.Registry
Registry returns the Prometheus registry containing all projection metrics.
func (*ProjectionMetrics) ResetProjectionMetrics ¶
func (m *ProjectionMetrics) ResetProjectionMetrics(projection string)
ResetProjectionMetrics resets all metrics for a given projection (useful for testing).
func (*ProjectionMetrics) SetProjectionHealth ¶
func (m *ProjectionMetrics) SetProjectionHealth(projection string, healthy bool)
SetProjectionHealth manually sets the health status of a projection.
func (*ProjectionMetrics) UpdateProjectionLag ¶
func (m *ProjectionMetrics) UpdateProjectionLag(projection string, lag time.Duration)
UpdateProjectionLag updates the lag metric.
type ProjectionMetricsOption ¶
type ProjectionMetricsOption func(*ProjectionMetrics)
ProjectionMetricsOption allows for functional configuration of ProjectionMetrics.
func WithProjectionLabels ¶
func WithProjectionLabels(labels prometheus.Labels) ProjectionMetricsOption
WithProjectionLabels adds custom labels to all projection metrics.
func WithProjectionRegistry ¶
func WithProjectionRegistry(registry *prometheus.Registry) ProjectionMetricsOption
WithProjectionRegistry sets a custom Prometheus registry.
type Projector ¶
type Projector interface {
	// Name returns a stable identifier used for offset bookkeeping.
	// This name should be unique across all projectors in an application.
	Name() string
	// Apply applies a batch of events to the read model.
	// Must be idempotent - applying the same events multiple times should be safe.
	// Events are provided in order and should be processed sequentially.
	Apply(ctx context.Context, batch []synckit.EventWithVersion) error
}
Projector applies domain changes from events to a read model. Implementations must be idempotent - applying the same events multiple times should produce the same result.
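One common way to meet the idempotency requirement is to remember the highest version already applied and skip anything at or below it. The sketch below illustrates that idea under stated assumptions: `eventWithVersion` is a stand-in for `synckit.EventWithVersion` with a hypothetical integer `Seq` field, and `balanceProjector` is an invented read model.

```go
package main

import (
	"context"
	"fmt"
)

// eventWithVersion stands in for synckit.EventWithVersion.
// Seq is a hypothetical, strictly increasing version number.
type eventWithVersion struct {
	Seq     uint64
	Account string
	Amount  int
}

// balanceProjector maintains per-account balances as its read model.
type balanceProjector struct {
	lastSeq  uint64
	balances map[string]int
}

// Name returns the stable identifier used for offset bookkeeping.
func (p *balanceProjector) Name() string { return "account_balances" }

// Apply processes events in order and skips any it has already seen,
// so re-applying the same batch leaves the balances unchanged.
func (p *balanceProjector) Apply(ctx context.Context, batch []eventWithVersion) error {
	for _, ev := range batch {
		if err := ctx.Err(); err != nil {
			return err
		}
		if ev.Seq <= p.lastSeq {
			continue // already applied
		}
		p.balances[ev.Account] += ev.Amount
		p.lastSeq = ev.Seq
	}
	return nil
}

// applyTwice applies the same batch two times and returns the final balance.
func applyTwice() (int, error) {
	p := &balanceProjector{balances: make(map[string]int)}
	batch := []eventWithVersion{
		{Seq: 1, Account: "alice", Amount: 100},
		{Seq: 2, Account: "alice", Amount: -30},
	}
	for i := 0; i < 2; i++ {
		if err := p.Apply(context.Background(), batch); err != nil {
			return 0, err
		}
	}
	return p.balances["alice"], nil
}

func main() {
	balance, err := applyTwice()
	fmt.Println(balance, err)
}
```

Without the `Seq` check, the second application would double-count both events; with it, the balance stays at 70.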
type Runner ¶
type Runner interface {
	// ApplySince applies all events since the last saved offset.
	// Returns the number of events applied, the last processed version, and any error.
	// This method is idempotent and can be called multiple times safely.
	ApplySince(ctx context.Context) (applied int, last synckit.Version, err error)
	// ApplyBatch applies a specific batch of events directly.
	// This is useful for server-side hooks that want to apply events immediately
	// after they are committed to storage.
	ApplyBatch(ctx context.Context, batch []synckit.EventWithVersion) error
}
Runner coordinates loading events from EventStore since the last offset and applying them via a Projector. It handles batching, error recovery, and progress tracking.
func NewRunner ¶
func NewRunner(store synckit.EventStore, offsets OffsetStore, proj Projector, opts ...RunnerOption) Runner
NewRunner creates a new projection runner with the given components and options. The runner will coordinate loading events from the EventStore since the last offset stored in OffsetStore and applying them via the Projector.
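The coordination described above (read the offset, load a batch, apply it, save the new offset) can be sketched as a simple loop. This is an illustration under stated assumptions, not the package's implementation: `event`, `eventLog`, `countingProjector`, and `applySince` are hypothetical stand-ins, versions are modeled as integers, and offsets live in a plain map.

```go
package main

import (
	"context"
	"fmt"
)

// event stands in for synckit.EventWithVersion; Seq is a hypothetical integer version.
type event struct {
	Seq     uint64
	Payload string
}

// eventLog is a hypothetical in-memory event store, ordered by Seq.
type eventLog []event

// loadSince returns up to limit events whose Seq is greater than since.
func (l eventLog) loadSince(since uint64, limit int) []event {
	var out []event
	for _, ev := range l {
		if ev.Seq > since && len(out) < limit {
			out = append(out, ev)
		}
	}
	return out
}

// countingProjector counts the events it has been given.
type countingProjector struct{ seen int }

func (p *countingProjector) Name() string { return "event_count" }

func (p *countingProjector) Apply(ctx context.Context, batch []event) error {
	p.seen += len(batch)
	return nil
}

// applySince resumes from the saved offset and saves progress after each
// successfully applied batch, so a failure loses at most one batch of work.
func applySince(ctx context.Context, log eventLog, offsets map[string]uint64, p *countingProjector, batchSize int) (int, uint64, error) {
	applied := 0
	last := offsets[p.Name()]
	for {
		if err := ctx.Err(); err != nil {
			return applied, last, err
		}
		batch := log.loadSince(last, batchSize)
		if len(batch) == 0 {
			return applied, last, nil
		}
		if err := p.Apply(ctx, batch); err != nil {
			return applied, last, err
		}
		last = batch[len(batch)-1].Seq
		offsets[p.Name()] = last
		applied += len(batch)
	}
}

// runTwice applies five events in batches of two, then runs again with nothing new.
func runTwice() (int, uint64, int, error) {
	ctx := context.Background()
	log := eventLog{{Seq: 1}, {Seq: 2}, {Seq: 3}, {Seq: 4}, {Seq: 5}}
	offsets := map[string]uint64{}
	p := &countingProjector{}

	first, last, err := applySince(ctx, log, offsets, p, 2)
	if err != nil {
		return 0, 0, 0, err
	}
	second, _, err := applySince(ctx, log, offsets, p, 2)
	return first, last, second, err
}

func main() {
	first, last, second, err := runTwice()
	fmt.Println(first, last, second, err)
}
```

Because the offset is saved only after a batch is applied, a crash between the two steps replays that batch on the next run, which is why the Projector must be idempotent.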
type RunnerOption ¶
type RunnerOption func(*runner)
RunnerOption configures a Runner using the functional options pattern.
func WithBatchSize ¶
func WithBatchSize(n int) RunnerOption
WithBatchSize sets the batch size for processing events. Default is 500 events per batch.
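For readers unfamiliar with functional options, the pattern behind RunnerOption can be sketched as follows. The types here are local stand-ins (`runnerConfig` replaces the unexported `runner`), and ignoring non-positive sizes is an assumption of this sketch, not documented behavior; only the default of 500 comes from the description above.

```go
package main

import "fmt"

// runnerConfig stands in for the package's unexported runner type.
type runnerConfig struct {
	batchSize int
}

// option mirrors the shape of RunnerOption: a function that mutates the config.
type option func(*runnerConfig)

// withBatchSize mirrors the shape of WithBatchSize.
// Ignoring non-positive values is an assumption made for this sketch.
func withBatchSize(n int) option {
	return func(c *runnerConfig) {
		if n > 0 {
			c.batchSize = n
		}
	}
}

// newRunnerConfig starts from the documented default and applies options in order.
func newRunnerConfig(opts ...option) *runnerConfig {
	c := &runnerConfig{batchSize: 500}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

// batchSizes returns the default batch size and an overridden one.
func batchSizes() (int, int) {
	return newRunnerConfig().batchSize, newRunnerConfig(withBatchSize(100)).batchSize
}

func main() {
	def, custom := batchSizes()
	fmt.Println(def, custom)
}
```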
func WithLogger ¶
func WithLogger(logger *slog.Logger) RunnerOption
WithLogger sets a custom structured logger for the runner. If none is provided, the runner uses the default logger from the logging package.
func WithMetrics ¶
func WithMetrics(metricsCollector *metrics.SyncKitMetrics) RunnerOption
WithMetrics enables metrics collection using the provided SyncKitMetrics instance. This integrates the runner with the unified observability system.
func WithMetricsEnabled ¶
func WithMetricsEnabled(enabled bool) RunnerOption
WithMetricsEnabled enables basic metrics collection using the legacy system. For new code, prefer WithMetrics() with SyncKitMetrics for better integration.