stream

package
v1.1.11 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 16, 2026 License: AGPL-3.0 Imports: 15 Imported by: 0

Documentation

Overview

Package stream provides DynamoDB stream event handlers for real-time data processing with TableTheory.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func CreateStreamHandler

func CreateStreamHandler(_ *tabletheory.LambdaDB, processor EventProcessor) func(ctx context.Context, event events.DynamoDBEvent) error

CreateStreamHandler creates a new Lambda handler function for DynamoDB streams

func ExtractEntityIDFromPK

func ExtractEntityIDFromPK(pk string) (string, error)

ExtractEntityIDFromPK extracts the entity ID from a PK attribute

func FilterRecordsByEntityType

func FilterRecordsByEntityType(records []events.DynamoDBEventRecord, entityType string) []events.DynamoDBEventRecord

FilterRecordsByEntityType filters DynamoDB stream records by entity type

func FilterRecordsByEventName

func FilterRecordsByEventName(records []events.DynamoDBEventRecord, eventNames ...string) []events.DynamoDBEventRecord

FilterRecordsByEventName filters DynamoDB stream records by event name

func GetBooleanAttribute

func GetBooleanAttribute(record events.DynamoDBEventRecord, key string) (bool, error)

GetBooleanAttribute extracts a boolean attribute from a DynamoDB stream record

func GetEventType

func GetEventType(record events.DynamoDBEventRecord) (string, error)

GetEventType extracts the entity type from a DynamoDB stream record

func GetNumberAttribute

func GetNumberAttribute(record events.DynamoDBEventRecord, key string) (string, error)

GetNumberAttribute extracts a number attribute from a DynamoDB stream record

func GetStringAttribute

func GetStringAttribute(record events.DynamoDBEventRecord, key string) (string, error)

GetStringAttribute extracts a string attribute from a DynamoDB stream record

func ProcessStreamEvent

func ProcessStreamEvent(ctx context.Context, event events.DynamoDBEvent, processor EventProcessor) error

ProcessStreamEvent processes a DynamoDB stream event with the given processor function

func ProcessStreamRecords

func ProcessStreamRecords(ctx context.Context, records []events.DynamoDBEventRecord, handler func(ctx context.Context, record events.DynamoDBEventRecord) error) error

ProcessStreamRecords processes DynamoDB stream records with the provided handler function

func UnmarshalItem

func UnmarshalItem(record events.DynamoDBEventRecord, out any) error

UnmarshalItem unmarshals a single DynamoDB stream record into a struct

func UnmarshalItems

func UnmarshalItems(records []events.DynamoDBEventRecord, outType any) (any, error)

UnmarshalItems unmarshals multiple DynamoDB stream records into a slice of structs

Types

type AIAnalysisStreamHandler

type AIAnalysisStreamHandler struct {
	// contains filtered or unexported fields
}

AIAnalysisStreamHandler processes DynamoDB stream events for AI analysis results

func NewAIAnalysisStreamHandler

func NewAIAnalysisStreamHandler(logger *zap.Logger, moderationRepo ModerationRepository) *AIAnalysisStreamHandler

NewAIAnalysisStreamHandler creates a new AI analysis stream handler

func (*AIAnalysisStreamHandler) HandleStreamEvent

func (h *AIAnalysisStreamHandler) HandleStreamEvent(_ context.Context, record events.DynamoDBEventRecord) error

HandleStreamEvent processes a DynamoDB stream event for AI analysis

func (*AIAnalysisStreamHandler) ProcessStreamRecords

func (h *AIAnalysisStreamHandler) ProcessStreamRecords(ctx context.Context, records []events.DynamoDBEventRecord) error

ProcessStreamRecords processes multiple AI analysis stream records

type BaseHandler

type BaseHandler struct {
	DB        *tabletheory.LambdaDB
	TableName string
	Logger    *zap.Logger
}

BaseHandler provides common functionality for DynamoDB stream handlers

func NewBaseHandler

func NewBaseHandler(db *tabletheory.LambdaDB, tableName string) *BaseHandler

NewBaseHandler creates a new BaseHandler

func (*BaseHandler) HandleDynamoDBStream

func (h *BaseHandler) HandleDynamoDBStream(ctx context.Context, event events.DynamoDBEvent) error

HandleDynamoDBStream processes DynamoDB stream events

type EventProcessor

type EventProcessor func(ctx context.Context, record events.DynamoDBEventRecord) error

EventProcessor is a function type for processing DynamoDB stream events

type Handler

type Handler interface {
	HandleDynamoDBStream(ctx context.Context, event events.DynamoDBEvent) error
}

Handler is a generic interface for DynamoDB stream handlers

type ModerationRepository

type ModerationRepository interface {
	CreateFlag(ctx context.Context, flag *storage.Flag) error
	CreateModerationEvent(ctx context.Context, event *storage.ModerationEvent) error
	CreateModerationDecision(ctx context.Context, decision *storage.ModerationDecision) error
}

ModerationRepository defines the interface for moderation actions

type ProcessingConfig

type ProcessingConfig struct {
	EnableMetrics        bool
	EnableErrorRecovery  bool
	MaxRetryAttempts     int
	RetryBackoffInitial  time.Duration
	RetryBackoffMax      time.Duration
	EnableDLQ            bool
	ParallelProcessing   bool
	MaxConcurrentRecords int
}

ProcessingConfig holds the configuration for stream processing

func DefaultProcessingConfig

func DefaultProcessingConfig() *ProcessingConfig

DefaultProcessingConfig returns production-ready default configuration

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor handles stream processing with enhanced error handling and metrics

func NewProcessor

func NewProcessor(config *ProcessingConfig, logger *zap.Logger) *Processor

NewProcessor creates a new enhanced stream processor

func (*Processor) ProcessStreamRecordsWithRetry

func (sp *Processor) ProcessStreamRecordsWithRetry(
	ctx context.Context,
	records []events.DynamoDBEventRecord,
	handler func(ctx context.Context, record events.DynamoDBEventRecord) error,
) error

ProcessStreamRecordsWithRetry processes DynamoDB stream records with production features

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL