eventstore

package
v0.0.76 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 25, 2025 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrEventTypeRequired is the sentinel error returned when an event type is
// required but was not provided. It is a fixed-message error, so it is built
// with errors.New (no formatting is involved); callers should match it with
// errors.Is rather than comparing message strings.
var ErrEventTypeRequired = errors.New("event type is required")

ErrEventTypeRequired is returned when an event type is required but not provided.

View Source
// ErrNotFound is a sentinel error with the message "not found".
//
// NOTE(review): presumably returned by lookups that find no matching record
// (e.g. mapping retrieval) — confirm against the implementation. Match it
// with errors.Is rather than comparing message strings.
var ErrNotFound = errors.New("not found")

Functions

func InitSchema

func InitSchema(ctx context.Context, exec libdbexec.Exec) error

InitSchema creates the main events table and initial partitions.

Types

type Event

// Event is the JSON-serializable record for a single stored event; it does
// not expose partition details.
//
// Each field is serialized under the snake_case key given in its `json` tag.
// The `example` tags carry sample values (presumably consumed by an API
// documentation generator — confirm).
//
// Data and Metadata are json.RawMessage, so their JSON is carried as raw
// bytes rather than being decoded into Go values by this type.
//
// NOTE(review): the meanings below are inferred from field names and example
// values only; confirm against the Store implementation:
//   - ID: string identifier of the event (the example looks like a UUID).
//   - NID: numeric identifier, presumably assigned by the database.
//   - CreatedAt: timestamp of the event.
//   - EventType / EventSource: classification and origin of the event.
//   - AggregateID / AggregateType: the aggregate the event belongs to.
//   - Version: presumably the event or aggregate version number.
type Event struct {
	ID            string          `json:"id" example:"event-uuid"`
	NID           int64           `json:"nid" example:"1"`
	CreatedAt     time.Time       `json:"created_at" example:"2023-01-01T00:00:00Z"`
	EventType     string          `json:"event_type" example:"github.pull_request"`
	EventSource   string          `json:"event_source" example:"github.com"`
	AggregateID   string          `json:"aggregate_id" example:"aggregate-uuid"`
	AggregateType string          `json:"aggregate_type" example:"github.webhook"`
	Version       int             `json:"version" example:"1"`
	Data          json.RawMessage `json:"data" example:"{}"`
	Metadata      json.RawMessage `json:"metadata" example:"{}"`
}

Event represents a stored event without exposing partition details.

type MappingConfig added in v0.0.75

// MappingConfig is a JSON-serializable mapping configuration, keyed by Path
// (the Store interface documents the path as the unique identifier).
//
// NOTE(review): presumably it describes how an incoming payload received at
// Path is turned into an Event — confirm against the code that consumes it.
// Note that, unlike Event, the JSON keys here are camelCase.
type MappingConfig struct {
	Path          string `json:"path"`
	EventType     string `json:"eventType"`
	EventSource   string `json:"eventSource"`
	AggregateType string `json:"aggregateType"`
	// Names of payload fields (JSON path or plain field name) to extract
	// values from. The original comment only described the aggregate ID;
	// NOTE(review): presumably the other *Field entries work the same way and
	// override the fixed EventType/EventSource/AggregateType values above
	// when set — confirm precedence against the implementation.
	AggregateIDField   string `json:"aggregateIDField"`
	AggregateTypeField string `json:"aggregateTypeField"`
	EventTypeField     string `json:"eventTypeField"`
	EventSourceField   string `json:"eventSourceField"`
	EventIDField       string `json:"eventIDField"`
	// Version is a fixed integer version. NOTE(review): an earlier comment
	// also mentioned extracting it from a field, but this struct has no
	// corresponding field-name setting — confirm intended behavior.
	Version int `json:"version"`
	// Metadata fields to extract from headers/payload.
	// NOTE(review): which side of the map is the metadata key and which is
	// the source header/field name is not visible here — confirm.
	MetadataMapping map[string]string `json:"metadataMapping"`
}

type Store

// Store provides methods for storing and retrieving events, managing time
// partitions, and performing CRUD operations on mapping configs.
//
// Every method takes a context.Context as its first argument. Query methods
// take a from/to time range and a limit.
// NOTE(review): whether the range bounds are inclusive, the result ordering,
// and the meaning of a non-positive limit are not visible from this
// declaration — confirm against the implementation.
type Store interface {
	// AppendEvent stores the given event.
	// NOTE(review): presumably fails with ErrEventTypeRequired when the event
	// type is empty, and may populate generated fields on event — confirm.
	AppendEvent(ctx context.Context, event *Event) error
	// GetEventsByAggregate returns events of eventType within [from, to] for
	// the given aggregateType and aggregateID, bounded by limit.
	GetEventsByAggregate(ctx context.Context, eventType string, from, to time.Time, aggregateType, aggregateID string, limit int) ([]Event, error)
	// GetEventsByType returns events of eventType within [from, to], bounded
	// by limit.
	GetEventsByType(ctx context.Context, eventType string, from, to time.Time, limit int) ([]Event, error)
	// GetEventsBySource returns events of eventType within [from, to] that
	// came from eventSource, bounded by limit.
	GetEventsBySource(ctx context.Context, eventType string, from, to time.Time, eventSource string, limit int) ([]Event, error)
	// GetEventTypesInRange returns event type names seen within [from, to],
	// bounded by limit.
	// NOTE(review): presumably the returned types are distinct — confirm.
	GetEventTypesInRange(ctx context.Context, from, to time.Time, limit int) ([]string, error)
	// DeleteEventsByTypeInRange deletes events of eventType within [from, to].
	DeleteEventsByTypeInRange(ctx context.Context, eventType string, from, to time.Time) error

	// EnsurePartitionExists takes a timestamp ts.
	// NOTE(review): presumably creates the partition covering ts when it is
	// missing and is a no-op otherwise — confirm.
	EnsurePartitionExists(ctx context.Context, ts time.Time) error

	// CreateMapping creates a new mapping config. Returns an error if it
	// already exists.
	// NOTE(review): the original comment said "if ID already exists", but
	// MappingConfig has no ID field; the unique key is presumably Path.
	CreateMapping(ctx context.Context, config *MappingConfig) error
	// GetMapping retrieves a mapping config by its path (unique identifier).
	// NOTE(review): presumably returns ErrNotFound when no mapping matches.
	GetMapping(ctx context.Context, path string) (*MappingConfig, error)
	// UpdateMapping updates an existing mapping config. Returns an error if
	// not found.
	UpdateMapping(ctx context.Context, config *MappingConfig) error
	// DeleteMapping deletes a mapping config by path.
	DeleteMapping(ctx context.Context, path string) error
	// ListMappings returns all mapping configs.
	ListMappings(ctx context.Context) ([]*MappingConfig, error)
}

Store provides methods for storing and retrieving events.

func New

func New(exec libdbexec.Exec) Store

New creates a new event store instance.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL