Documentation
¶
Overview ¶
Package batch provides efficient batch processing capabilities for DynamoDB operations. This package implements patterns for batching multiple database operations together to improve performance and reduce costs in serverless environments.
Index ¶
- Constants
- func OptimalBatchSize(items []any) int
- type BatchDeleteResult
- func BatchDelete(ctx context.Context, client core.DB, keys []any) (*BatchDeleteResult, error)
- func BatchDeleteParallel(ctx context.Context, client core.DB, keys []any, workers int) (*BatchDeleteResult, error)
- func BatchDeleteWithCostTracking(ctx context.Context, client core.DB, keys []any, tracker CostTracker, ...) (*BatchDeleteResult, error)
- func BatchDeleteWithRetry(ctx context.Context, client core.DB, keys []any, maxRetries int) (*BatchDeleteResult, error)
- type BatchDeleter
- func (bd *BatchDeleter) DeleteItems(ctx context.Context, keys []any) (*BatchDeleteResult, error)
- func (bd *BatchDeleter) DeleteItemsParallel(ctx context.Context, keys []any, workers int) (*BatchDeleteResult, error)
- func (bd *BatchDeleter) DeleteItemsWithRetry(ctx context.Context, keys []any, maxRetries int) (*BatchDeleteResult, error)
- type BatchDeleterConfig
- type BatchError
- type BatchMessage
- type BatchReadResult
- type BatchReader
- type BatchReaderConfig
- type BatchWriteResult
- func BatchWrite(ctx context.Context, client core.DB, items []any) (*BatchWriteResult, error)
- func BatchWriteParallel(ctx context.Context, client core.DB, items []any, workers int) (*BatchWriteResult, error)
- func BatchWriteWithCostTracking(ctx context.Context, client core.DB, items []any, tracker CostTracker, ...) (*BatchWriteResult, error)
- type BatchWriter
- type BatchWriterConfig
- type BatchWriterWithProgress
- func (bwp *BatchWriterWithProgress) GetProgressTracker() *ProgressTracker
- func (bwp *BatchWriterWithProgress) WriteItemsParallelWithProgress(ctx context.Context, items []any, workers int) (*BatchWriteResult, error)
- func (bwp *BatchWriterWithProgress) WriteItemsWithProgress(ctx context.Context, items []any) (*BatchWriteResult, error)
- type CostMetrics
- type CostTracker
- type Operation
- type Processor
- type ProcessorConfig
- type ProgressCallback
- type ProgressTracker
- func (pt *ProgressTracker) AddCallback(callback ProgressCallback)
- func (pt *ProgressTracker) GetPercentComplete() float64
- func (pt *ProgressTracker) GetProgress() (total, processed, failed int64)
- func (pt *ProgressTracker) IsComplete() bool
- func (pt *ProgressTracker) UpdateFailed(count int64)
- func (pt *ProgressTracker) UpdateProcessed(count int64)
- type SQSBatchProcessor
- func (p *SQSBatchProcessor) ProcessBatch(ctx context.Context, event events.SQSEvent) (events.SQSEventResponse, error)
- func (p *SQSBatchProcessor) ProcessNotifications(ctx context.Context, event events.SQSEvent) (events.SQSEventResponse, error)
- func (p *SQSBatchProcessor) ProcessTimelineEntries(ctx context.Context, event events.SQSEvent) (events.SQSEventResponse, error)
- type SQSBatchProcessorConfig
Constants ¶
const (
	MaxBatchWriteSize = 25  // DynamoDB limit for batch write operations
	MaxBatchReadSize  = 100 // DynamoDB limit for batch get operations
	DefaultBatchSize  = 25  // Default batch size for write operations
	DefaultWorkers    = 5   // Default number of worker goroutines
)
DynamoDB batch operation limits and the package's default batch size and worker count.
Variables ¶
This section is empty.
Functions ¶
func OptimalBatchSize ¶
func OptimalBatchSize(items []any) int
OptimalBatchSize calculates the optimal batch size based on the estimated size of the items.
Types ¶
type BatchDeleteResult ¶
type BatchDeleteResult struct {
TotalItems int
ProcessedItems int
FailedItems int
Errors []BatchError
Duration time.Duration
ConsumedWCU int64
}
BatchDeleteResult contains the results of a batch delete operation
func BatchDelete ¶
func BatchDelete(ctx context.Context, client core.DB, keys []any) (*BatchDeleteResult, error)
BatchDelete is a convenience function for simple batch deletes.
func BatchDeleteParallel ¶
func BatchDeleteParallel(ctx context.Context, client core.DB, keys []any, workers int) (*BatchDeleteResult, error)
BatchDeleteParallel is a convenience function for parallel batch deletes
func BatchDeleteWithCostTracking ¶
func BatchDeleteWithCostTracking(ctx context.Context, client core.DB, keys []any, tracker CostTracker, logger *zap.Logger) (*BatchDeleteResult, error)
BatchDeleteWithCostTracking performs batch delete with cost tracking
func BatchDeleteWithRetry ¶
func BatchDeleteWithRetry(ctx context.Context, client core.DB, keys []any, maxRetries int) (*BatchDeleteResult, error)
BatchDeleteWithRetry is a convenience function for batch deletes with retry logic
type BatchDeleter ¶
type BatchDeleter struct {
*Processor
}
BatchDeleter provides efficient batch delete operations
func NewBatchDeleter ¶
func NewBatchDeleter(client core.DB, config BatchDeleterConfig) *BatchDeleter
NewBatchDeleter creates a new BatchDeleter with the specified configuration
func NewDefaultBatchDeleter ¶
func NewDefaultBatchDeleter(client core.DB) *BatchDeleter
NewDefaultBatchDeleter creates a BatchDeleter with default settings
func (*BatchDeleter) DeleteItems ¶
func (bd *BatchDeleter) DeleteItems(ctx context.Context, keys []any) (*BatchDeleteResult, error)
DeleteItems deletes items in batches, processing them sequentially
func (*BatchDeleter) DeleteItemsParallel ¶
func (bd *BatchDeleter) DeleteItemsParallel(ctx context.Context, keys []any, workers int) (*BatchDeleteResult, error)
DeleteItemsParallel deletes items in parallel using worker pools
func (*BatchDeleter) DeleteItemsWithRetry ¶
func (bd *BatchDeleter) DeleteItemsWithRetry(ctx context.Context, keys []any, maxRetries int) (*BatchDeleteResult, error)
DeleteItemsWithRetry deletes items with exponential backoff retry logic
type BatchDeleterConfig ¶
type BatchDeleterConfig struct {
BatchSize int
Logger *zap.Logger
Tracker CostTracker
}
BatchDeleterConfig holds configuration for BatchDeleter
type BatchError ¶
BatchError represents an error that occurred during batch processing
type BatchMessage ¶
type BatchMessage struct {
Operation string `json:"operation"` // "create", "update", "delete"
Items []any `json:"items"`
TableName string `json:"table_name,omitempty"`
Metadata map[string]any `json:"metadata,omitempty"`
}
BatchMessage represents a message payload containing items to batch process
func CreateBatchDeleteMessage ¶
func CreateBatchDeleteMessage(keys []any, tableName string) *BatchMessage
CreateBatchDeleteMessage creates a batch delete message
func CreateNotificationMessage ¶
func CreateNotificationMessage(userIDs []string, statusID, authorID, notifType, targetType string) *BatchMessage
CreateNotificationMessage creates a notification batch message
func CreateTimelineMessage ¶
func CreateTimelineMessage(followerIDs []string, statusID, authorID string, createdAt time.Time) *BatchMessage
CreateTimelineMessage creates a timeline batch message
type BatchReadResult ¶
type BatchReadResult struct {
TotalKeys int
RetrievedItems int
NotFoundItems int
Errors []BatchError
Duration time.Duration
ConsumedRCU int64
}
BatchReadResult contains the results of a batch read operation
func BatchReadWithCostTracking ¶
func BatchReadWithCostTracking(ctx context.Context, client core.DB, keys []any, dest any, tracker CostTracker, logger *zap.Logger) (*BatchReadResult, error)
BatchReadWithCostTracking performs batch read with cost tracking
type BatchReader ¶
type BatchReader struct {
// contains filtered or unexported fields
}
BatchReader provides efficient batch read operations
func NewBatchReader ¶
func NewBatchReader(client core.DB, config BatchReaderConfig) *BatchReader
NewBatchReader creates a new BatchReader with the specified configuration
func NewDefaultBatchReader ¶
func NewDefaultBatchReader(client core.DB) *BatchReader
NewDefaultBatchReader creates a BatchReader with default settings
func (*BatchReader) ReadItems ¶
func (br *BatchReader) ReadItems(ctx context.Context, keys []any, dest any) (*BatchReadResult, error)
ReadItems reads items in batches using their keys
type BatchReaderConfig ¶
type BatchReaderConfig struct {
BatchSize int
Logger *zap.Logger
Tracker CostTracker
}
BatchReaderConfig holds configuration for BatchReader
type BatchWriteResult ¶
type BatchWriteResult struct {
TotalItems int
ProcessedItems int
FailedItems int
Errors []BatchError
Duration time.Duration
ConsumedWCU int64
}
BatchWriteResult contains the results of a batch write operation
func BatchWrite ¶
func BatchWrite(ctx context.Context, client core.DB, items []any) (*BatchWriteResult, error)
BatchWrite is a convenience function for simple batch writes.
func BatchWriteParallel ¶
func BatchWriteParallel(ctx context.Context, client core.DB, items []any, workers int) (*BatchWriteResult, error)
BatchWriteParallel is a convenience function for parallel batch writes
func BatchWriteWithCostTracking ¶
func BatchWriteWithCostTracking(ctx context.Context, client core.DB, items []any, tracker CostTracker, logger *zap.Logger) (*BatchWriteResult, error)
BatchWriteWithCostTracking performs batch write with cost tracking
type BatchWriter ¶
type BatchWriter struct {
*Processor
}
BatchWriter provides efficient batch write operations with configurable batch sizes
func NewBatchWriter ¶
func NewBatchWriter(client core.DB, config BatchWriterConfig) *BatchWriter
NewBatchWriter creates a new BatchWriter with the specified configuration
func NewDefaultBatchWriter ¶
func NewDefaultBatchWriter(client core.DB) *BatchWriter
NewDefaultBatchWriter creates a BatchWriter with default settings
func (*BatchWriter) WriteItems ¶
func (bw *BatchWriter) WriteItems(ctx context.Context, items []any) (*BatchWriteResult, error)
WriteItems writes items in batches, processing them sequentially
func (*BatchWriter) WriteItemsParallel ¶
func (bw *BatchWriter) WriteItemsParallel(ctx context.Context, items []any, workers int) (*BatchWriteResult, error)
WriteItemsParallel writes items in parallel using worker pools
type BatchWriterConfig ¶
type BatchWriterConfig struct {
BatchSize int
Logger *zap.Logger
Tracker CostTracker
}
BatchWriterConfig holds configuration for BatchWriter
type BatchWriterWithProgress ¶
type BatchWriterWithProgress struct {
*BatchWriter
// contains filtered or unexported fields
}
BatchWriterWithProgress wraps BatchWriter with progress tracking
func NewBatchWriterWithProgress ¶
func NewBatchWriterWithProgress(client core.DB, config BatchWriterConfig, totalItems int64) *BatchWriterWithProgress
NewBatchWriterWithProgress creates a BatchWriter with progress tracking
func (*BatchWriterWithProgress) GetProgressTracker ¶
func (bwp *BatchWriterWithProgress) GetProgressTracker() *ProgressTracker
GetProgressTracker returns the progress tracker
func (*BatchWriterWithProgress) WriteItemsParallelWithProgress ¶
func (bwp *BatchWriterWithProgress) WriteItemsParallelWithProgress(ctx context.Context, items []any, workers int) (*BatchWriteResult, error)
WriteItemsParallelWithProgress writes items in parallel with progress tracking
func (*BatchWriterWithProgress) WriteItemsWithProgress ¶
func (bwp *BatchWriterWithProgress) WriteItemsWithProgress(ctx context.Context, items []any) (*BatchWriteResult, error)
WriteItemsWithProgress writes items with progress tracking
type CostMetrics ¶
CostMetrics represents the cost metrics at a point in time
type CostTracker ¶
type CostTracker interface {
// CalculateCost returns the current cost metrics
CalculateCost() CostMetrics
// TrackDynamoWrite tracks DynamoDB write operations
TrackDynamoWrite(items int)
// TrackDynamoRead tracks DynamoDB read operations
TrackDynamoRead(items int)
}
CostTracker interface defines the methods needed for cost tracking
type Operation ¶
type Operation int
Operation represents the type of batch operation being performed. It defines whether the batch processor should handle write, delete, or read operations.
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor provides efficient batch operations with configurable batch sizes and operation types. It handles write, delete, and read operations in batches while tracking costs and providing parallel processing capabilities for improved performance.
func NewProcessor ¶
func NewProcessor(client core.DB, config ProcessorConfig) *Processor
NewProcessor creates a new Processor with the specified configuration
func (*Processor) ProcessItems ¶
ProcessItems processes items in batches, handling different operation types
func (*Processor) ProcessItemsParallel ¶
func (bp *Processor) ProcessItemsParallel(ctx context.Context, items []any, workers int) (*BatchWriteResult, error)
ProcessItemsParallel processes items in parallel using worker pools
type ProcessorConfig ¶
type ProcessorConfig struct {
BatchSize int
Logger *zap.Logger
Tracker CostTracker
Operation Operation
}
ProcessorConfig holds configuration for Processor instances. It defines the batch size, logging, cost tracking, and operation type settings for batch processing operations.
type ProgressCallback ¶
type ProgressCallback func(total, processed, failed int64)
ProgressCallback is called when progress is updated
type ProgressTracker ¶
type ProgressTracker struct {
// contains filtered or unexported fields
}
ProgressTracker tracks the progress of batch operations
func NewProgressTracker ¶
func NewProgressTracker(total int64) *ProgressTracker
NewProgressTracker creates a new progress tracker
func (*ProgressTracker) AddCallback ¶
func (pt *ProgressTracker) AddCallback(callback ProgressCallback)
AddCallback adds a progress callback
func (*ProgressTracker) GetPercentComplete ¶
func (pt *ProgressTracker) GetPercentComplete() float64
GetPercentComplete returns the percentage of completion
func (*ProgressTracker) GetProgress ¶
func (pt *ProgressTracker) GetProgress() (total, processed, failed int64)
GetProgress returns the current progress
func (*ProgressTracker) IsComplete ¶
func (pt *ProgressTracker) IsComplete() bool
IsComplete returns true if all items have been processed
func (*ProgressTracker) UpdateFailed ¶
func (pt *ProgressTracker) UpdateFailed(count int64)
UpdateFailed updates the number of failed items
func (*ProgressTracker) UpdateProcessed ¶
func (pt *ProgressTracker) UpdateProcessed(count int64)
UpdateProcessed updates the number of processed items
type SQSBatchProcessor ¶
type SQSBatchProcessor struct {
// contains filtered or unexported fields
}
SQSBatchProcessor processes SQS message batches in Lambda
func NewSQSBatchProcessor ¶
func NewSQSBatchProcessor(db core.DB, config SQSBatchProcessorConfig) *SQSBatchProcessor
NewSQSBatchProcessor creates a new SQS batch processor
func (*SQSBatchProcessor) ProcessBatch ¶
func (p *SQSBatchProcessor) ProcessBatch(ctx context.Context, event events.SQSEvent) (events.SQSEventResponse, error)
ProcessBatch processes an SQS event batch and returns a response containing any failures.
func (*SQSBatchProcessor) ProcessNotifications ¶
func (p *SQSBatchProcessor) ProcessNotifications(ctx context.Context, event events.SQSEvent) (events.SQSEventResponse, error)
ProcessNotifications processes notification batches from SQS messages
func (*SQSBatchProcessor) ProcessTimelineEntries ¶
func (p *SQSBatchProcessor) ProcessTimelineEntries(ctx context.Context, event events.SQSEvent) (events.SQSEventResponse, error)
ProcessTimelineEntries processes timeline entries from SQS messages
type SQSBatchProcessorConfig ¶
type SQSBatchProcessorConfig struct {
Logger *zap.Logger
Tracker CostTracker
MaxBatchSize int // Maximum items to process in a single batch (default: 25)
}
SQSBatchProcessorConfig holds configuration for SQSBatchProcessor