Documentation
¶
Index ¶
- type DynamoDBEventBus
- func (d *DynamoDBEventBus) DeleteEvent(ctx context.Context, eventID string) error
- func (d *DynamoDBEventBus) GetEvent(ctx context.Context, eventID string) (*Event, error)
- func (d *DynamoDBEventBus) Publish(ctx context.Context, event *Event) (string, error)
- func (d *DynamoDBEventBus) Query(ctx context.Context, query *EventQuery) ([]*Event, error)
- func (d *DynamoDBEventBus) Subscribe(_ context.Context, eventType string, handler EventHandler) error
- type Event
- type EventBus
- type EventBusConfig
- type EventHandler
- type EventQuery
- type MemoryEventBus
- func (m *MemoryEventBus) DeleteEvent(_ context.Context, eventID string) error
- func (m *MemoryEventBus) GetEvent(_ context.Context, eventID string) (*Event, error)
- func (m *MemoryEventBus) Publish(ctx context.Context, event *Event) (string, error)
- func (m *MemoryEventBus) Query(_ context.Context, query *EventQuery) ([]*Event, error)
- func (m *MemoryEventBus) Subscribe(_ context.Context, eventType string, handler EventHandler) error
- type MetricRecord
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DynamoDBEventBus ¶
// DynamoDBEventBus is the EventBus implementation backed by DynamoDB storage
// accessed through TableTheory. Construct it with NewDynamoDBEventBus.
type DynamoDBEventBus struct {
// contains filtered or unexported fields
}
DynamoDBEventBus implements EventBus using TableTheory-backed DynamoDB storage.
This implementation is serverless-safe: events are persisted in DynamoDB, so they survive Lambda container recycling.
func NewDynamoDBEventBus ¶
func NewDynamoDBEventBus(db tablecore.DB, config EventBusConfig) *DynamoDBEventBus
NewDynamoDBEventBus creates a new DynamoDB-backed event bus using TableTheory.
func (*DynamoDBEventBus) DeleteEvent ¶
func (d *DynamoDBEventBus) DeleteEvent(ctx context.Context, eventID string) error
func (*DynamoDBEventBus) Query ¶
func (d *DynamoDBEventBus) Query(ctx context.Context, query *EventQuery) ([]*Event, error)
func (*DynamoDBEventBus) Subscribe ¶
func (d *DynamoDBEventBus) Subscribe(_ context.Context, eventType string, handler EventHandler) error
type Event ¶
// Event represents a single durable event.
// The json tags define the serialized field names. The theorydb tags are
// presumably consumed by TableTheory to map fields onto the DynamoDB table
// and its indexes — confirm against the TableTheory tag reference.
type Event struct {
// Timestamps (8-byte aligned).
// PublishedAt is tagged as the sort key of tenant-timestamp-index.
PublishedAt time.Time `json:"published_at" theorydb:"index:tenant-timestamp-index,sk"`
// CreatedAt carries the theorydb "created_at" tag.
// NOTE(review): presumably populated by the data layer on write — confirm.
CreatedAt time.Time `json:"created_at" theorydb:"created_at"`
// NOTE(review): encoding/json's omitempty never omits a struct value such as
// time.Time, so a zero ExpiresAt is still emitted as "0001-01-01T00:00:00Z".
ExpiresAt time.Time `json:"expires_at,omitempty" theorydb:"omitempty"`
// String fields (16 bytes each on 64-bit platforms)
// ID is tagged as the partition key of event-id-index.
ID string `json:"id" theorydb:"index:event-id-index,pk"`
EventType string `json:"event_type"`
// TenantID is tagged as the partition key of tenant-timestamp-index.
TenantID string `json:"tenant_id" theorydb:"index:tenant-timestamp-index,pk"`
SourceID string `json:"source_id"`
// PartitionKey and SortKey are tagged as the table's primary key, stored
// under the attribute names "pk" and "sk".
PartitionKey string `json:"partition_key" theorydb:"pk,attr:pk"`
SortKey string `json:"sort_key" theorydb:"sk,attr:sk"`
// CorrelationID links related events for tracing; see WithCorrelationID.
CorrelationID string `json:"correlation_id,omitempty" theorydb:"omitempty"`
// Complex types
// Payload holds the raw JSON body; UnmarshalPayload decodes it into a caller-provided struct.
Payload json.RawMessage `json:"payload"`
// Metadata holds string key/value pairs; see WithMetadata.
Metadata map[string]string `json:"metadata,omitempty" theorydb:"omitempty"`
// Tags is tagged for storage as a set.
Tags []string `json:"tags,omitempty" theorydb:"set,omitempty"`
// TTL is stored in the DynamoDB TTL attribute ("ttl") as a Unix timestamp in seconds.
// It is excluded from JSON output (json:"-").
TTL int64 `json:"-" theorydb:"ttl,omitempty"`
// Plain int fields
Version int `json:"version"`
RetryCount int `json:"retry_count"`
// contains filtered or unexported fields
}
Event represents a single durable event.
func (*Event) UnmarshalPayload ¶
UnmarshalPayload unmarshals the event payload into the provided struct.
func (*Event) WithCorrelationID ¶
WithCorrelationID sets a correlation ID for tracing related events.
func (*Event) WithMetadata ¶
WithMetadata adds metadata to the event.
type EventBus ¶
// EventBus defines the interface for publishing and consuming events.
// This package provides two implementations: DynamoDBEventBus and MemoryEventBus.
type EventBus interface {
// Publish publishes an event to the bus and returns the event ID.
Publish(ctx context.Context, event *Event) (string, error)
// Query retrieves events based on filters. Implementations may mutate
// query.NextKey to return a pagination cursor, so callers should read
// NextKey from the same *EventQuery after the call returns.
Query(ctx context.Context, query *EventQuery) ([]*Event, error)
// Subscribe registers a handler for specific event types (for stream processing).
// Both implementations in this package declare the ctx parameter as _ and
// therefore do not use it.
Subscribe(ctx context.Context, eventType string, handler EventHandler) error
// GetEvent retrieves a specific event by ID.
GetEvent(ctx context.Context, eventID string) (*Event, error)
// DeleteEvent removes an event (for cleanup/GDPR).
DeleteEvent(ctx context.Context, eventID string) error
}
EventBus defines the interface for publishing and consuming events.
This interface is intentionally Lift-compatible to minimize migration risk for Pay Theory services (Autheory/K3). AppTheory's DynamoDB implementation uses TableTheory as the data layer.
type EventBusConfig ¶
// EventBusConfig configures the event bus behavior. It is passed by value to
// NewDynamoDBEventBus; DefaultEventBusConfig returns a value with defaults set.
// NOTE(review): the per-field notes below are inferred from field names and
// types only — confirm each against the implementation.
type EventBusConfig struct {
// TableName presumably names the DynamoDB table backing the bus.
TableName string
// MetricsNamespace presumably labels the metrics passed to EmitMetric.
MetricsNamespace string
// TTL presumably sets how long events are retained before expiry.
TTL time.Duration
// RetryBaseDelay and RetryAttempts presumably control retry timing and count.
RetryBaseDelay time.Duration
RetryAttempts int
// MaxBatchSize presumably caps the size of batched operations.
MaxBatchSize int
// EnableMetrics presumably toggles metric emission.
EnableMetrics bool
// EmitMetric is a caller-supplied callback that receives MetricRecord values.
// NOTE(review): nil handling is not shown here — confirm before leaving it unset.
EmitMetric func(MetricRecord)
}
EventBusConfig configures the event bus behavior.
func DefaultEventBusConfig ¶
func DefaultEventBusConfig() EventBusConfig
DefaultEventBusConfig returns sensible defaults.
type EventHandler ¶
EventHandler is a function that processes events.
type EventQuery ¶
// EventQuery defines parameters for querying events. It is passed to
// EventBus.Query by pointer, and implementations may write NextKey.
type EventQuery struct {
// Lift-compatible cursor shape.
// NOTE(review): presumably the cursor to resume from (a previous NextKey) — confirm.
LastEvaluatedKey map[string]any
NextKey map[string]any // Returned pagination token for next query
// StartTime and EndTime are optional (pointer) time bounds.
// NOTE(review): nil presumably leaves that side unbounded; whether the
// bounds are inclusive is not shown here — confirm.
StartTime *time.Time
EndTime *time.Time
// Filter fields.
// NOTE(review): how empty values and multiple Tags are matched is not shown here — confirm.
TenantID string
EventType string
Tags []string
// Limit presumably caps the number of events returned — confirm how zero is treated.
Limit int
}
EventQuery defines parameters for querying events.
type MemoryEventBus ¶
// MemoryEventBus is the in-memory EventBus implementation intended for testing
// and development. Construct it with NewMemoryEventBus.
type MemoryEventBus struct {
// contains filtered or unexported fields
}
MemoryEventBus provides an in-memory implementation for testing and development.
WARNING: This is NOT suitable for production Lambda environments, because events are lost when the Lambda container is scaled down or recycled.
func NewMemoryEventBus ¶
func NewMemoryEventBus() *MemoryEventBus
NewMemoryEventBus creates a new in-memory event bus. This should only be used for testing or local development.
func (*MemoryEventBus) DeleteEvent ¶
func (m *MemoryEventBus) DeleteEvent(_ context.Context, eventID string) error
func (*MemoryEventBus) Query ¶
func (m *MemoryEventBus) Query(_ context.Context, query *EventQuery) ([]*Event, error)
func (*MemoryEventBus) Subscribe ¶
func (m *MemoryEventBus) Subscribe(_ context.Context, eventType string, handler EventHandler) error
type MetricRecord ¶
MetricRecord is a minimal, portable metric payload used by the EventBus.
AppTheory does not wrap CloudWatch in core packages; callers can bridge this to their metrics backend (CloudWatch, OpenTelemetry, etc.).