services

package
v0.12.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 26, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type DynamoDBEventBus

// DynamoDBEventBus implements EventBus using TableTheory-backed DynamoDB
// storage. Construct it with NewDynamoDBEventBus; all of its state is
// unexported.
type DynamoDBEventBus struct {
	// contains filtered or unexported fields
}

DynamoDBEventBus implements EventBus using TableTheory-backed DynamoDB storage.

This implementation is serverless-safe and survives Lambda container recycling.

func NewDynamoDBEventBus

func NewDynamoDBEventBus(db tablecore.DB, config EventBusConfig) *DynamoDBEventBus

NewDynamoDBEventBus creates a new DynamoDB-backed event bus using TableTheory.

func (*DynamoDBEventBus) DeleteEvent

func (d *DynamoDBEventBus) DeleteEvent(ctx context.Context, eventID string) error

func (*DynamoDBEventBus) GetEvent

func (d *DynamoDBEventBus) GetEvent(ctx context.Context, eventID string) (*Event, error)

func (*DynamoDBEventBus) Publish

func (d *DynamoDBEventBus) Publish(ctx context.Context, event *Event) (string, error)

func (*DynamoDBEventBus) Query

func (d *DynamoDBEventBus) Query(ctx context.Context, query *EventQuery) ([]*Event, error)

func (*DynamoDBEventBus) Subscribe

func (d *DynamoDBEventBus) Subscribe(_ context.Context, eventType string, handler EventHandler) error

type Event

// Event represents a single durable event.
//
// The json tags give the wire names of each field. The theorydb tags carry
// the storage mapping (primary key, secondary indexes, TTL attribute) —
// presumably interpreted by TableTheory; confirm tag semantics against its
// documentation before changing any of them.
type Event struct {

	// Primary identifiers and timestamps (8-byte aligned)
	// PublishedAt is tagged as the sort key of tenant-timestamp-index.
	PublishedAt time.Time `json:"published_at" theorydb:"index:tenant-timestamp-index,sk"`
	// CreatedAt carries the theorydb "created_at" tag — presumably populated
	// by the storage layer on write; TODO confirm.
	CreatedAt   time.Time `json:"created_at" theorydb:"created_at"`
	// NOTE(review): encoding/json's omitempty has no effect on struct values
	// such as time.Time, so a zero ExpiresAt is still emitted as
	// "expires_at" unless custom marshaling exists in the unexported part of
	// this package — verify.
	ExpiresAt   time.Time `json:"expires_at,omitempty" theorydb:"omitempty"`

	// String fields (16 bytes each)
	// ID is tagged as the partition key of event-id-index.
	ID            string `json:"id" theorydb:"index:event-id-index,pk"`
	EventType     string `json:"event_type"`
	// TenantID is tagged as the partition key of tenant-timestamp-index.
	TenantID      string `json:"tenant_id" theorydb:"index:tenant-timestamp-index,pk"`
	SourceID      string `json:"source_id"`
	// PartitionKey and SortKey are tagged as the table's primary key and are
	// stored under the attribute names "pk" and "sk" respectively.
	PartitionKey  string `json:"partition_key" theorydb:"pk,attr:pk"`
	SortKey       string `json:"sort_key" theorydb:"sk,attr:sk"`
	CorrelationID string `json:"correlation_id,omitempty" theorydb:"omitempty"`

	// Complex types
	// Payload holds the event body as raw, undecoded JSON.
	Payload  json.RawMessage   `json:"payload"`
	Metadata map[string]string `json:"metadata,omitempty" theorydb:"omitempty"`
	// Tags carries the theorydb "set" tag — presumably persisted as a
	// DynamoDB string set; TODO confirm.
	Tags     []string          `json:"tags,omitempty" theorydb:"set,omitempty"`

	// TTL is stored in the DynamoDB TTL attribute ("ttl") as a Unix timestamp in seconds.
	// The json:"-" tag excludes it from JSON output.
	TTL int64 `json:"-" theorydb:"ttl,omitempty"`

	// Smaller numeric types
	Version    int `json:"version"`
	RetryCount int `json:"retry_count"`
	// contains filtered or unexported fields
}

Event represents a single durable event.

func NewEvent

func NewEvent(eventType, tenantID, sourceID string, payload any) (*Event, error)

NewEvent creates a new event with generated ID and timestamps.

func (*Event) TableName

func (e *Event) TableName() string

func (*Event) UnmarshalPayload

func (e *Event) UnmarshalPayload(v any) error

UnmarshalPayload unmarshals the event payload into the provided struct.

func (*Event) WithCorrelationID

func (e *Event) WithCorrelationID(correlationID string) *Event

WithCorrelationID sets a correlation ID for tracing related events.

func (*Event) WithMetadata

func (e *Event) WithMetadata(key, value string) *Event

WithMetadata adds metadata to the event.

func (*Event) WithTTL

func (e *Event) WithTTL(ttl time.Duration) *Event

WithTTL sets an expiration time for the event.

func (*Event) WithTags

func (e *Event) WithTags(tags ...string) *Event

WithTags adds tags to the event.

type EventBus

// EventBus defines the interface for publishing and consuming events.
//
// Every method takes a context.Context as its first parameter.
type EventBus interface {
	// Publish publishes an event to the bus and returns the event ID.
	Publish(ctx context.Context, event *Event) (string, error)

	// Query retrieves events based on filters. Implementations may mutate
	// query.NextKey to return a pagination cursor.
	Query(ctx context.Context, query *EventQuery) ([]*Event, error)

	// Subscribe registers a handler for specific event types (for stream processing).
	Subscribe(ctx context.Context, eventType string, handler EventHandler) error

	// GetEvent retrieves a specific event by ID.
	GetEvent(ctx context.Context, eventID string) (*Event, error)

	// DeleteEvent removes an event (for cleanup/GDPR).
	DeleteEvent(ctx context.Context, eventID string) error
}

EventBus defines the interface for publishing and consuming events.

This interface is intentionally Lift-compatible to minimize migration risk for Pay Theory services (Autheory/K3). AppTheory's DynamoDB implementation uses TableTheory as the data layer.

type EventBusConfig

// EventBusConfig configures the event bus behavior.
//
// NOTE(review): the per-field semantics below are inferred from names and
// types only; confirm against the implementation before relying on them.
type EventBusConfig struct {
	// TableName — presumably the backing table name; TODO confirm.
	TableName        string
	// MetricsNamespace — presumably the Namespace placed on emitted
	// MetricRecord values; TODO confirm.
	MetricsNamespace string
	// TTL — presumably a default event lifetime; TODO confirm.
	TTL              time.Duration
	RetryBaseDelay   time.Duration
	RetryAttempts    int
	MaxBatchSize     int
	EnableMetrics    bool
	// EmitMetric is a callback that receives a MetricRecord. Presumably it
	// is only invoked when EnableMetrics is true; TODO confirm.
	EmitMetric       func(MetricRecord)
}

EventBusConfig configures the event bus behavior.

func DefaultEventBusConfig

func DefaultEventBusConfig() EventBusConfig

DefaultEventBusConfig returns sensible defaults.

type EventHandler

// EventHandler is a function that processes events. It receives a context
// and the event, and reports failure through its error return.
type EventHandler func(ctx context.Context, event *Event) error

EventHandler is a function that processes events.

type EventQuery

// EventQuery defines parameters for querying events.
type EventQuery struct {
	// Lift-compatible cursor shape.
	// LastEvaluatedKey — presumably the cursor from a previous page, passed
	// in to resume a query; TODO confirm.
	LastEvaluatedKey map[string]any
	NextKey          map[string]any // Returned pagination token for next query

	// StartTime and EndTime are pointers; presumably nil leaves that side of
	// the time range unbounded — TODO confirm.
	StartTime *time.Time
	EndTime   *time.Time

	TenantID  string
	EventType string
	Tags      []string
	// Limit — presumably the maximum number of events to return; TODO confirm.
	Limit     int
}

EventQuery defines parameters for querying events.

type MemoryEventBus

// MemoryEventBus provides an in-memory implementation for testing and
// development. It is not suitable for production Lambda environments.
// Construct it with NewMemoryEventBus; all of its state is unexported.
type MemoryEventBus struct {
	// contains filtered or unexported fields
}

MemoryEventBus provides an in-memory implementation for testing and development.

WARNING: This is NOT suitable for production Lambda environments, because events are lost when the Lambda container is scaled down or recycled.

func NewMemoryEventBus

func NewMemoryEventBus() *MemoryEventBus

NewMemoryEventBus creates a new in-memory event bus. This should only be used for testing or local development.

func (*MemoryEventBus) DeleteEvent

func (m *MemoryEventBus) DeleteEvent(_ context.Context, eventID string) error

func (*MemoryEventBus) GetEvent

func (m *MemoryEventBus) GetEvent(_ context.Context, eventID string) (*Event, error)

func (*MemoryEventBus) Publish

func (m *MemoryEventBus) Publish(ctx context.Context, event *Event) (string, error)

func (*MemoryEventBus) Query

func (m *MemoryEventBus) Query(_ context.Context, query *EventQuery) ([]*Event, error)

func (*MemoryEventBus) Subscribe

func (m *MemoryEventBus) Subscribe(_ context.Context, eventType string, handler EventHandler) error

type MetricRecord

// MetricRecord is a minimal, portable metric payload used by the EventBus.
// It holds a namespace, a metric name, a float64 value, and string tags.
type MetricRecord struct {
	Namespace string
	Name      string
	Value     float64
	Tags      map[string]string
}

MetricRecord is a minimal, portable metric payload used by the EventBus.

AppTheory does not wrap CloudWatch in core packages; callers can bridge this to their metrics backend (CloudWatch, OTEL, etc.).

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL