projection

package
v0.24.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 16, 2025 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package projection provides event projection and materialized view support.

Projections allow deriving read-optimized data models from event streams, supporting eventual consistency and multiple backend implementations (in-memory, BadgerDB).

See also:

Package projection provides read-model building capabilities for go-sync-kit. It enables CQRS, event sourcing, and offline-first architectures by allowing applications to build deterministic, idempotent projections from event streams.

Package projection provides observability and metrics for projection operations. This package integrates with the main observability system following synckit conventions.

Index

Constants

View Source
const (
	ErrorTypeApply   = "apply_error"
	ErrorTypeOffset  = "offset_error"
	ErrorTypeLoad    = "load_error"
	ErrorTypeTimeout = "timeout_error"
	ErrorTypeContext = "context_error"
)

Error type constants for consistent error reporting

View Source
const (
	OperationApplySince = "apply_since"
	OperationApplyBatch = "apply_batch"
)

Operation type constants for metrics labeling

Variables

This section is empty.

Functions

func InitDefaultProjectionMetrics

func InitDefaultProjectionMetrics(serviceName string, opts ...ProjectionMetricsOption)

InitDefaultProjectionMetrics initializes the default projection metrics instance. This should be called during application startup.

func RecordProjectionApplied

func RecordProjectionApplied(name string, count int, duration time.Duration, operation string)

RecordProjectionApplied records successful projection application using default metrics.

func RecordProjectionError

func RecordProjectionError(name string, errorType string)

RecordProjectionError records a projection error using default metrics.

func SetProjectionHealth

func SetProjectionHealth(name string, healthy bool)

SetProjectionHealth manually sets the health status using default metrics.

func UpdateProjectionLag

func UpdateProjectionLag(name string, lag time.Duration)

UpdateProjectionLag updates the lag metric using default metrics.

Types

type OffsetStore

type OffsetStore interface {
	// Get retrieves the last applied version for a projection.
	// Returns nil if no offset has been stored yet (start from beginning).
	Get(ctx context.Context, name string) (synckit.Version, error)

	// Set updates the last applied version for a projection.
	// This should be called atomically after successfully applying events.
	Set(ctx context.Context, name string, v synckit.Version) error
}

OffsetStore persists the last applied authoritative version per projection name. This enables resumable projection processing and idempotent operations.

type ProjectionMetrics

type ProjectionMetrics struct {
	// contains filtered or unexported fields
}

ProjectionMetrics holds all projection-related Prometheus metrics. This follows the same pattern as SyncKitMetrics in observability/metrics.

func GetDefaultProjectionMetrics

func GetDefaultProjectionMetrics() *ProjectionMetrics

GetDefaultProjectionMetrics returns the default projection metrics instance.

func NewProjectionMetrics

func NewProjectionMetrics(serviceName string, opts ...ProjectionMetricsOption) *ProjectionMetrics

NewProjectionMetrics creates a new ProjectionMetrics collector following synckit conventions.

func (*ProjectionMetrics) GetProjectionMetrics

func (m *ProjectionMetrics) GetProjectionMetrics(projection string) map[string]float64

GetProjectionMetrics returns placeholder metric values for debugging/testing. This is a simplified implementation for testing purposes. In production, metrics should be accessed via the Prometheus HTTP endpoint.

func (*ProjectionMetrics) RecordProjectionError

func (m *ProjectionMetrics) RecordProjectionError(projection, operation, errorType string)

RecordProjectionError records a projection operation error.

func (*ProjectionMetrics) RecordProjectionOperation

func (m *ProjectionMetrics) RecordProjectionOperation(projection, operation string, duration time.Duration, success bool, eventsProcessed int)

RecordProjectionOperation records metrics for a projection operation.

func (*ProjectionMetrics) Registry

func (m *ProjectionMetrics) Registry() *prometheus.Registry

Registry returns the Prometheus registry containing all projection metrics.

func (*ProjectionMetrics) ResetProjectionMetrics

func (m *ProjectionMetrics) ResetProjectionMetrics(projection string)

ResetProjectionMetrics resets all metrics for a given projection (useful for testing).

func (*ProjectionMetrics) SetProjectionHealth

func (m *ProjectionMetrics) SetProjectionHealth(projection string, healthy bool)

SetProjectionHealth manually sets the health status of a projection.

func (*ProjectionMetrics) UpdateProjectionLag

func (m *ProjectionMetrics) UpdateProjectionLag(projection string, lag time.Duration)

UpdateProjectionLag updates the lag metric.

type ProjectionMetricsOption

type ProjectionMetricsOption func(*ProjectionMetrics)

ProjectionMetricsOption allows for functional configuration of ProjectionMetrics.

func WithProjectionLabels

func WithProjectionLabels(labels prometheus.Labels) ProjectionMetricsOption

WithProjectionLabels adds custom labels to all projection metrics.

func WithProjectionRegistry

func WithProjectionRegistry(registry *prometheus.Registry) ProjectionMetricsOption

WithProjectionRegistry sets a custom Prometheus registry.

type Projector

type Projector interface {
	// Name returns a stable identifier used for offset bookkeeping.
	// This name should be unique across all projectors in an application.
	Name() string

	// Apply applies a batch of events to the read model.
	// Must be idempotent - applying the same events multiple times should be safe.
	// Events are provided in order and should be processed sequentially.
	Apply(ctx context.Context, batch []synckit.EventWithVersion) error
}

Projector applies domain changes from events to a read model. Implementations must be idempotent - applying the same events multiple times should produce the same result.

type Runner

type Runner interface {
	// ApplySince applies all events since the last saved offset.
	// Returns the number of events applied, the last processed version, and any error.
	// This method is idempotent and can be called multiple times safely.
	ApplySince(ctx context.Context) (applied int, last synckit.Version, err error)

	// ApplyBatch applies a specific batch of events directly.
	// This is useful for server-side hooks that want to apply events immediately
	// after they are committed to storage.
	ApplyBatch(ctx context.Context, batch []synckit.EventWithVersion) error
}

Runner coordinates loading events from EventStore since the last offset and applying them via a Projector. It handles batching, error recovery, and progress tracking.

func NewRunner

func NewRunner(store synckit.EventStore, offsets OffsetStore, proj Projector, opts ...RunnerOption) Runner

NewRunner creates a new projection runner with the given components and options. The runner will coordinate loading events from the EventStore since the last offset stored in OffsetStore and applying them via the Projector.

type RunnerOption

type RunnerOption func(*runner)

RunnerOption configures a Runner using the functional options pattern.

func WithBatchSize

func WithBatchSize(n int) RunnerOption

WithBatchSize sets the batch size for processing events. Default is 500 events per batch.

func WithLogger

func WithLogger(logger *slog.Logger) RunnerOption

WithLogger sets a custom structured logger for the runner. If not provided, uses the default logger from the logging package.

func WithMetrics

func WithMetrics(metricsCollector *metrics.SyncKitMetrics) RunnerOption

WithMetrics enables metrics collection using the provided SyncKitMetrics instance. This integrates the runner with the unified observability system.

func WithMetricsEnabled

func WithMetricsEnabled(enabled bool) RunnerOption

WithMetricsEnabled enables basic metrics collection using the legacy system. For new code, prefer WithMetrics() with SyncKitMetrics for better integration.

Directories

Path Synopsis
Package badger provides a BadgerDB-backed projection store.
Package badger provides a BadgerDB-backed projection store.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL