Documentation ¶
Overview ¶
Package stream provides DynamoDB stream event handlers for real-time data processing with TableTheory.
Index ¶
- func CreateStreamHandler(_ *tabletheory.LambdaDB, processor EventProcessor) func(ctx context.Context, event events.DynamoDBEvent) error
- func ExtractEntityIDFromPK(pk string) (string, error)
- func FilterRecordsByEntityType(records []events.DynamoDBEventRecord, entityType string) []events.DynamoDBEventRecord
- func FilterRecordsByEventName(records []events.DynamoDBEventRecord, eventNames ...string) []events.DynamoDBEventRecord
- func GetBooleanAttribute(record events.DynamoDBEventRecord, key string) (bool, error)
- func GetEventType(record events.DynamoDBEventRecord) (string, error)
- func GetNumberAttribute(record events.DynamoDBEventRecord, key string) (string, error)
- func GetStringAttribute(record events.DynamoDBEventRecord, key string) (string, error)
- func ProcessStreamEvent(ctx context.Context, event events.DynamoDBEvent, processor EventProcessor) error
- func ProcessStreamRecords(ctx context.Context, records []events.DynamoDBEventRecord, handler func(ctx context.Context, record events.DynamoDBEventRecord) error) error
- func UnmarshalItem(record events.DynamoDBEventRecord, out any) error
- func UnmarshalItems(records []events.DynamoDBEventRecord, outType any) (any, error)
- type AIAnalysisStreamHandler
- type BaseHandler
- type EventProcessor
- type Handler
- type ModerationRepository
- type ProcessingConfig
- type Processor
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func CreateStreamHandler ¶
func CreateStreamHandler(_ *tabletheory.LambdaDB, processor EventProcessor) func(ctx context.Context, event events.DynamoDBEvent) error
CreateStreamHandler creates a new Lambda handler function for DynamoDB streams
func ExtractEntityIDFromPK ¶
func ExtractEntityIDFromPK(pk string) (string, error)
ExtractEntityIDFromPK extracts the entity ID from a PK attribute
func FilterRecordsByEntityType ¶
func FilterRecordsByEntityType(records []events.DynamoDBEventRecord, entityType string) []events.DynamoDBEventRecord
FilterRecordsByEntityType filters DynamoDB stream records by entity type
func FilterRecordsByEventName ¶
func FilterRecordsByEventName(records []events.DynamoDBEventRecord, eventNames ...string) []events.DynamoDBEventRecord
FilterRecordsByEventName filters DynamoDB stream records by event name
func GetBooleanAttribute ¶
func GetBooleanAttribute(record events.DynamoDBEventRecord, key string) (bool, error)
GetBooleanAttribute extracts a boolean attribute from a DynamoDB stream record
func GetEventType ¶
func GetEventType(record events.DynamoDBEventRecord) (string, error)
GetEventType extracts the entity type from a DynamoDB stream record
func GetNumberAttribute ¶
func GetNumberAttribute(record events.DynamoDBEventRecord, key string) (string, error)
GetNumberAttribute extracts a number attribute from a DynamoDB stream record
func GetStringAttribute ¶
func GetStringAttribute(record events.DynamoDBEventRecord, key string) (string, error)
GetStringAttribute extracts a string attribute from a DynamoDB stream record
func ProcessStreamEvent ¶
func ProcessStreamEvent(ctx context.Context, event events.DynamoDBEvent, processor EventProcessor) error
ProcessStreamEvent processes a DynamoDB stream event with the given processor function
func ProcessStreamRecords ¶
func ProcessStreamRecords(ctx context.Context, records []events.DynamoDBEventRecord, handler func(ctx context.Context, record events.DynamoDBEventRecord) error) error
ProcessStreamRecords processes DynamoDB stream records with the provided handler function
func UnmarshalItem ¶
func UnmarshalItem(record events.DynamoDBEventRecord, out any) error
UnmarshalItem unmarshals a single DynamoDB stream record into a struct
func UnmarshalItems ¶
func UnmarshalItems(records []events.DynamoDBEventRecord, outType any) (any, error)
UnmarshalItems unmarshals multiple DynamoDB stream records into a slice of structs
Types ¶
type AIAnalysisStreamHandler ¶
type AIAnalysisStreamHandler struct {
// contains filtered or unexported fields
}
AIAnalysisStreamHandler processes DynamoDB stream events for AI analysis results
func NewAIAnalysisStreamHandler ¶
func NewAIAnalysisStreamHandler(logger *zap.Logger, moderationRepo ModerationRepository) *AIAnalysisStreamHandler
NewAIAnalysisStreamHandler creates a new AI analysis stream handler
func (*AIAnalysisStreamHandler) HandleStreamEvent ¶
func (h *AIAnalysisStreamHandler) HandleStreamEvent(_ context.Context, record events.DynamoDBEventRecord) error
HandleStreamEvent processes a DynamoDB stream event for AI analysis
func (*AIAnalysisStreamHandler) ProcessStreamRecords ¶
func (h *AIAnalysisStreamHandler) ProcessStreamRecords(ctx context.Context, records []events.DynamoDBEventRecord) error
ProcessStreamRecords processes multiple AI analysis stream records
type BaseHandler ¶
type BaseHandler struct {
DB *tabletheory.LambdaDB
TableName string
Logger *zap.Logger
}
BaseHandler provides common functionality for DynamoDB stream handlers
func NewBaseHandler ¶
func NewBaseHandler(db *tabletheory.LambdaDB, tableName string) *BaseHandler
NewBaseHandler creates a new BaseHandler
func (*BaseHandler) HandleDynamoDBStream ¶
func (h *BaseHandler) HandleDynamoDBStream(ctx context.Context, event events.DynamoDBEvent) error
HandleDynamoDBStream processes DynamoDB stream events
type EventProcessor ¶
type EventProcessor func(ctx context.Context, record events.DynamoDBEventRecord) error
EventProcessor is a function type for processing DynamoDB stream events
type Handler ¶
type Handler interface {
HandleDynamoDBStream(ctx context.Context, event events.DynamoDBEvent) error
}
Handler is a generic interface for DynamoDB stream handlers
type ModerationRepository ¶
type ModerationRepository interface {
CreateFlag(ctx context.Context, flag *storage.Flag) error
CreateModerationEvent(ctx context.Context, event *storage.ModerationEvent) error
CreateModerationDecision(ctx context.Context, decision *storage.ModerationDecision) error
}
ModerationRepository defines the interface for moderation actions
type ProcessingConfig ¶
type ProcessingConfig struct {
EnableMetrics bool
EnableErrorRecovery bool
MaxRetryAttempts int
RetryBackoffInitial time.Duration
RetryBackoffMax time.Duration
EnableDLQ bool
ParallelProcessing bool
MaxConcurrentRecords int
}
ProcessingConfig holds the configuration for stream processing
func DefaultProcessingConfig ¶
func DefaultProcessingConfig() *ProcessingConfig
DefaultProcessingConfig returns production-ready default configuration
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor handles stream processing with enhanced error handling and metrics
func NewProcessor ¶
func NewProcessor(config *ProcessingConfig, logger *zap.Logger) *Processor
NewProcessor creates a new enhanced stream processor
func (*Processor) ProcessStreamRecordsWithRetry ¶
func (sp *Processor) ProcessStreamRecordsWithRetry(ctx context.Context, records []events.DynamoDBEventRecord, handler func(ctx context.Context, record events.DynamoDBEventRecord) error) error
ProcessStreamRecordsWithRetry processes DynamoDB stream records with production features