batch

package
v1.1.16 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 21, 2026 License: AGPL-3.0 Imports: 12 Imported by: 0

Documentation

Overview

Package batch provides efficient batch processing capabilities for DynamoDB operations. This package implements patterns for batching multiple database operations together to improve performance and reduce costs in serverless environments.

Index

Constants

View Source
const (
	MaxBatchWriteSize = 25  // DynamoDB limit for batch write operations
	MaxBatchReadSize  = 100 // DynamoDB limit for batch get operations
	DefaultBatchSize  = 25  // Default batch size for write operations
	DefaultWorkers    = 5   // Default number of worker goroutines
)

DynamoDB batch operation limits

Variables

This section is empty.

Functions

func OptimalBatchSize

func OptimalBatchSize(items []any) int

OptimalBatchSize calculates the optimal batch size based on item size estimation

Types

type BatchDeleteResult

type BatchDeleteResult struct {
	TotalItems     int
	ProcessedItems int
	FailedItems    int
	Errors         []BatchError
	Duration       time.Duration
	ConsumedWCU    int64
}

BatchDeleteResult contains the results of a batch delete operation

func BatchDelete

func BatchDelete(ctx context.Context, client core.DB, keys []any) (*BatchDeleteResult, error)

BatchDelete is a convenience function for simple batch deletes

func BatchDeleteParallel

func BatchDeleteParallel(ctx context.Context, client core.DB, keys []any, workers int) (*BatchDeleteResult, error)

BatchDeleteParallel is a convenience function for parallel batch deletes

func BatchDeleteWithCostTracking

func BatchDeleteWithCostTracking(ctx context.Context, client core.DB, keys []any, tracker CostTracker, logger *zap.Logger) (*BatchDeleteResult, error)

BatchDeleteWithCostTracking performs batch delete with cost tracking

func BatchDeleteWithRetry

func BatchDeleteWithRetry(ctx context.Context, client core.DB, keys []any, maxRetries int) (*BatchDeleteResult, error)

BatchDeleteWithRetry is a convenience function for batch deletes with retry logic

type BatchDeleter

type BatchDeleter struct {
	*Processor
}

BatchDeleter provides efficient batch delete operations

func NewBatchDeleter

func NewBatchDeleter(client core.DB, config BatchDeleterConfig) *BatchDeleter

NewBatchDeleter creates a new BatchDeleter with the specified configuration

func NewDefaultBatchDeleter

func NewDefaultBatchDeleter(client core.DB) *BatchDeleter

NewDefaultBatchDeleter creates a BatchDeleter with default settings

func (*BatchDeleter) DeleteItems

func (bd *BatchDeleter) DeleteItems(ctx context.Context, keys []any) (*BatchDeleteResult, error)

DeleteItems deletes items in batches, processing them sequentially

func (*BatchDeleter) DeleteItemsParallel

func (bd *BatchDeleter) DeleteItemsParallel(ctx context.Context, keys []any, workers int) (*BatchDeleteResult, error)

DeleteItemsParallel deletes items in parallel using worker pools

func (*BatchDeleter) DeleteItemsWithRetry

func (bd *BatchDeleter) DeleteItemsWithRetry(ctx context.Context, keys []any, maxRetries int) (*BatchDeleteResult, error)

DeleteItemsWithRetry deletes items with exponential backoff retry logic

type BatchDeleterConfig

type BatchDeleterConfig struct {
	BatchSize int
	Logger    *zap.Logger
	Tracker   CostTracker
}

BatchDeleterConfig holds configuration for BatchDeleter

type BatchError

type BatchError struct {
	Index int
	Item  any
	Error error
}

BatchError represents an error that occurred during batch processing

type BatchMessage

type BatchMessage struct {
	Operation string         `json:"operation"` // "create", "update", "delete"
	Items     []any          `json:"items"`
	TableName string         `json:"table_name,omitempty"`
	Metadata  map[string]any `json:"metadata,omitempty"`
}

BatchMessage represents a message payload containing items to batch process

func CreateBatchDeleteMessage

func CreateBatchDeleteMessage(keys []any, tableName string) *BatchMessage

CreateBatchDeleteMessage creates a batch delete message

func CreateNotificationMessage

func CreateNotificationMessage(userIDs []string, statusID, authorID, notifType, targetType string) *BatchMessage

CreateNotificationMessage creates a notification batch message

func CreateTimelineMessage

func CreateTimelineMessage(followerIDs []string, statusID, authorID string, createdAt time.Time) *BatchMessage

CreateTimelineMessage creates a timeline batch message

type BatchReadResult

type BatchReadResult struct {
	TotalKeys      int
	RetrievedItems int
	NotFoundItems  int
	Errors         []BatchError
	Duration       time.Duration
	ConsumedRCU    int64
}

BatchReadResult contains the results of a batch read operation

func BatchRead

func BatchRead(ctx context.Context, client core.DB, keys []any, dest any) (*BatchReadResult, error)

BatchRead is a convenience function for simple batch reads

func BatchReadWithCostTracking

func BatchReadWithCostTracking(ctx context.Context, client core.DB, keys []any, dest any, tracker CostTracker, logger *zap.Logger) (*BatchReadResult, error)

BatchReadWithCostTracking performs batch read with cost tracking

type BatchReader

type BatchReader struct {
	// contains filtered or unexported fields
}

BatchReader provides efficient batch read operations

func NewBatchReader

func NewBatchReader(client core.DB, config BatchReaderConfig) *BatchReader

NewBatchReader creates a new BatchReader with the specified configuration

func NewDefaultBatchReader

func NewDefaultBatchReader(client core.DB) *BatchReader

NewDefaultBatchReader creates a BatchReader with default settings

func (*BatchReader) ReadItems

func (br *BatchReader) ReadItems(ctx context.Context, keys []any, dest any) (*BatchReadResult, error)

ReadItems reads items in batches using their keys

type BatchReaderConfig

type BatchReaderConfig struct {
	BatchSize int
	Logger    *zap.Logger
	Tracker   CostTracker
}

BatchReaderConfig holds configuration for BatchReader

type BatchWriteResult

type BatchWriteResult struct {
	TotalItems     int
	ProcessedItems int
	FailedItems    int
	Errors         []BatchError
	Duration       time.Duration
	ConsumedWCU    int64
}

BatchWriteResult contains the results of a batch write operation

func BatchWrite

func BatchWrite(ctx context.Context, client core.DB, items []any) (*BatchWriteResult, error)

BatchWrite is a convenience function for simple batch writes

func BatchWriteParallel

func BatchWriteParallel(ctx context.Context, client core.DB, items []any, workers int) (*BatchWriteResult, error)

BatchWriteParallel is a convenience function for parallel batch writes

func BatchWriteWithCostTracking

func BatchWriteWithCostTracking(ctx context.Context, client core.DB, items []any, tracker CostTracker, logger *zap.Logger) (*BatchWriteResult, error)

BatchWriteWithCostTracking performs batch write with cost tracking

type BatchWriter

type BatchWriter struct {
	*Processor
}

BatchWriter provides efficient batch write operations with configurable batch sizes

func NewBatchWriter

func NewBatchWriter(client core.DB, config BatchWriterConfig) *BatchWriter

NewBatchWriter creates a new BatchWriter with the specified configuration

func NewDefaultBatchWriter

func NewDefaultBatchWriter(client core.DB) *BatchWriter

NewDefaultBatchWriter creates a BatchWriter with default settings

func (*BatchWriter) WriteItems

func (bw *BatchWriter) WriteItems(ctx context.Context, items []any) (*BatchWriteResult, error)

WriteItems writes items in batches, processing them sequentially

func (*BatchWriter) WriteItemsParallel

func (bw *BatchWriter) WriteItemsParallel(ctx context.Context, items []any, workers int) (*BatchWriteResult, error)

WriteItemsParallel writes items in parallel using worker pools

type BatchWriterConfig

type BatchWriterConfig struct {
	BatchSize int
	Logger    *zap.Logger
	Tracker   CostTracker
}

BatchWriterConfig holds configuration for BatchWriter

type BatchWriterWithProgress

type BatchWriterWithProgress struct {
	*BatchWriter
	// contains filtered or unexported fields
}

BatchWriterWithProgress wraps BatchWriter with progress tracking

func NewBatchWriterWithProgress

func NewBatchWriterWithProgress(client core.DB, config BatchWriterConfig, totalItems int64) *BatchWriterWithProgress

NewBatchWriterWithProgress creates a BatchWriter with progress tracking

func (*BatchWriterWithProgress) GetProgressTracker

func (bwp *BatchWriterWithProgress) GetProgressTracker() *ProgressTracker

GetProgressTracker returns the progress tracker

func (*BatchWriterWithProgress) WriteItemsParallelWithProgress

func (bwp *BatchWriterWithProgress) WriteItemsParallelWithProgress(ctx context.Context, items []any, workers int) (*BatchWriteResult, error)

WriteItemsParallelWithProgress writes items in parallel with progress tracking

func (*BatchWriterWithProgress) WriteItemsWithProgress

func (bwp *BatchWriterWithProgress) WriteItemsWithProgress(ctx context.Context, items []any) (*BatchWriteResult, error)

WriteItemsWithProgress writes items with progress tracking

type CostMetrics

type CostMetrics struct {
	DynamoDBReads  int64
	DynamoDBWrites int64
}

CostMetrics represents the cost metrics at a point in time

type CostTracker

type CostTracker interface {
	// CalculateCost returns the current cost metrics
	CalculateCost() CostMetrics
	// TrackDynamoWrite tracks DynamoDB write operations
	TrackDynamoWrite(items int)
	// TrackDynamoRead tracks DynamoDB read operations
	TrackDynamoRead(items int)
}

CostTracker interface defines the methods needed for cost tracking

type Operation

type Operation int

Operation represents the type of batch operation being performed. It defines whether the batch processor should handle write, delete, or read operations.

const (
	// OperationWrite indicates a batch write operation for creating or updating items
	OperationWrite Operation = iota
	// OperationDelete indicates a batch delete operation for removing items
	OperationDelete
	// OperationRead indicates a batch read operation for retrieving items
	OperationRead
)

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor provides efficient batch operations with configurable batch sizes and operation types. It handles write, delete, and read operations in batches while tracking costs and providing parallel processing capabilities for improved performance.

func NewProcessor

func NewProcessor(client core.DB, config ProcessorConfig) *Processor

NewProcessor creates a new Processor with the specified configuration

func (*Processor) ProcessItems

func (bp *Processor) ProcessItems(ctx context.Context, items []any) (*BatchWriteResult, error)

ProcessItems processes items in batches, handling different operation types

func (*Processor) ProcessItemsParallel

func (bp *Processor) ProcessItemsParallel(ctx context.Context, items []any, workers int) (*BatchWriteResult, error)

ProcessItemsParallel processes items in parallel using worker pools

type ProcessorConfig

type ProcessorConfig struct {
	BatchSize int
	Logger    *zap.Logger
	Tracker   CostTracker
	Operation Operation
}

ProcessorConfig holds configuration for Processor instances. It defines the batch size, logging, cost tracking, and operation type settings for batch processing operations.

type ProgressCallback

type ProgressCallback func(total, processed, failed int64)

ProgressCallback is called when progress is updated

type ProgressTracker

type ProgressTracker struct {
	// contains filtered or unexported fields
}

ProgressTracker tracks the progress of batch operations

func NewProgressTracker

func NewProgressTracker(total int64) *ProgressTracker

NewProgressTracker creates a new progress tracker

func (*ProgressTracker) AddCallback

func (pt *ProgressTracker) AddCallback(callback ProgressCallback)

AddCallback adds a progress callback

func (*ProgressTracker) GetPercentComplete

func (pt *ProgressTracker) GetPercentComplete() float64

GetPercentComplete returns the percentage of completion

func (*ProgressTracker) GetProgress

func (pt *ProgressTracker) GetProgress() (total, processed, failed int64)

GetProgress returns the current progress

func (*ProgressTracker) IsComplete

func (pt *ProgressTracker) IsComplete() bool

IsComplete returns true if all items have been processed

func (*ProgressTracker) UpdateFailed

func (pt *ProgressTracker) UpdateFailed(count int64)

UpdateFailed updates the number of failed items

func (*ProgressTracker) UpdateProcessed

func (pt *ProgressTracker) UpdateProcessed(count int64)

UpdateProcessed updates the number of processed items

type SQSBatchProcessor

type SQSBatchProcessor struct {
	// contains filtered or unexported fields
}

SQSBatchProcessor processes SQS message batches in Lambda

func NewSQSBatchProcessor

func NewSQSBatchProcessor(db core.DB, config SQSBatchProcessorConfig) *SQSBatchProcessor

NewSQSBatchProcessor creates a new SQS batch processor

func (*SQSBatchProcessor) ProcessBatch

func (p *SQSBatchProcessor) ProcessBatch(ctx context.Context, event events.SQSEvent) (events.SQSEventResponse, error)

ProcessBatch processes an SQS event batch and returns response with failures

func (*SQSBatchProcessor) ProcessNotifications

func (p *SQSBatchProcessor) ProcessNotifications(ctx context.Context, event events.SQSEvent) (events.SQSEventResponse, error)

ProcessNotifications processes notification batches from SQS messages

func (*SQSBatchProcessor) ProcessTimelineEntries

func (p *SQSBatchProcessor) ProcessTimelineEntries(ctx context.Context, event events.SQSEvent) (events.SQSEventResponse, error)

ProcessTimelineEntries processes timeline entries from SQS messages

type SQSBatchProcessorConfig

type SQSBatchProcessorConfig struct {
	Logger       *zap.Logger
	Tracker      CostTracker
	MaxBatchSize int // Maximum items to process in a single batch (default: 25)
}

SQSBatchProcessorConfig holds configuration for SQSBatchProcessor

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL