Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type CalendarService ¶
type CalendarService struct {
// contains filtered or unexported fields
}
CalendarService triggers calendar-based schedules when their next_run arrives.
func NewCalendarService ¶
func NewCalendarService(base *sqlbase.BaseSQL, engine calendar.Engine, interval time.Duration) *CalendarService
NewCalendarService creates a new calendar background processor.
func (*CalendarService) RunOnce ¶
func (c *CalendarService) RunOnce(ctx context.Context) error
RunOnce processes due schedules once (used in tests).
func (*CalendarService) Start ¶
func (c *CalendarService) Start(ctx context.Context)
Start begins the processing loop.
func (*CalendarService) StopGracefully ¶
func (c *CalendarService) StopGracefully(ctx context.Context) error
StopGracefully stops the service and waits for shutdown.
type CleanupService ¶
type CleanupService struct {
// contains filtered or unexported fields
}
CleanupService handles permanent deletion of soft-deleted messages after their retention period expires.
func NewCleanupService ¶
func NewCleanupService(base *sqlbase.BaseSQL, interval time.Duration) *CleanupService
NewCleanupService creates a new cleanup service. Recommended interval: 1 hour (cleanup is not time-sensitive).
func (*CleanupService) Start ¶
func (s *CleanupService) Start(ctx context.Context)
Start begins the cleanup service loop.
func (*CleanupService) StopGracefully ¶
func (s *CleanupService) StopGracefully(ctx context.Context) error
StopGracefully signals the service to stop and waits for graceful shutdown.
type CronProcessorService ¶
type CronProcessorService struct {
// contains filtered or unexported fields
}
CronProcessorService triggers cron-based schedules when their cron expressions are due.
func NewCronProcessorService ¶
func NewCronProcessorService(base *sqlbase.BaseSQL, interval time.Duration) *CronProcessorService
NewCronProcessorService creates a new cron background processor.
func (*CronProcessorService) RunOnce ¶
func (c *CronProcessorService) RunOnce(ctx context.Context) error
RunOnce processes cron schedules once (useful for tests).
func (*CronProcessorService) Start ¶
func (c *CronProcessorService) Start(ctx context.Context)
Start begins the processing loop.
func (*CronProcessorService) StopGracefully ¶
func (c *CronProcessorService) StopGracefully(ctx context.Context) error
StopGracefully stops the service and waits for shutdown.
type MetricsReporterService ¶
type MetricsReporterService struct {
// contains filtered or unexported fields
}
MetricsReporterService periodically reports queue state metrics to Prometheus
func NewMetricsReporterService ¶
func NewMetricsReporterService(baseSQL *repositorysql.BaseSQL, interval time.Duration) *MetricsReporterService
NewMetricsReporterService creates a new metrics reporter service interval specifies how often to refresh queue state metrics (recommended: 30s-60s)
func (*MetricsReporterService) LastReportedAt ¶
func (s *MetricsReporterService) LastReportedAt() time.Time
LastReportedAt returns when metrics were last reported
func (*MetricsReporterService) Start ¶
func (s *MetricsReporterService) Start(ctx context.Context)
Start begins the periodic metrics reporting
func (*MetricsReporterService) StopGracefully ¶
func (s *MetricsReporterService) StopGracefully(ctx context.Context) error
StopGracefully stops the metrics reporter and waits for completion
type ReclaimService ¶
type ReclaimService struct {
// contains filtered or unexported fields
}
ReclaimService handles expired lease reclamation for SQL storage backends. It periodically scans for messages with expired leases or heartbeat timeouts and either requeues them (if attempts remain) or moves them to DLQ.
This service is generic and works with any SQL backend (SQLite, Postgres, etc.) by using the BaseSQL abstraction layer and the ReclaimableBackend interface.
func NewReclaimService ¶
func NewReclaimService(backend ReclaimableBackend, base *sqlbase.BaseSQL, interval time.Duration) *ReclaimService
NewReclaimService creates a new reclaim service. The backend parameter must implement ReclaimableBackend (Postgres/SQLite Storage do this).
func (*ReclaimService) Start ¶
func (r *ReclaimService) Start(ctx context.Context)
Start begins the reclaim service loop.
func (*ReclaimService) Stop ¶
func (r *ReclaimService) Stop()
Stop signals the service to stop (legacy method for backward compatibility).
func (*ReclaimService) StopGracefully ¶
func (r *ReclaimService) StopGracefully(ctx context.Context) error
StopGracefully signals the service to stop and waits for graceful shutdown. Returns when the service has fully stopped or context times out.
type ReclaimableBackend ¶ added in v1.2.1
type ReclaimableBackend interface {
// FindExpiredMessages locates messages with expired leases or heartbeats.
//
// Returns up to 'limit' messages from the specified queue that have:
// - lease_expiry < current time, OR
// - heartbeat_expiry < current time
//
// Used by: ReclaimService to identify messages that need reprocessing
FindExpiredMessages(ctx context.Context, queueName string, limit int32) ([]*messagepb.Message, error)
// ReclaimExpiredMessage processes an expired message.
//
// Behavior depends on remaining attempts:
// - If attempts remain: Decrements attempts_left, moves to PENDING (requeue)
// - If no attempts: Moves to ERRORED state and pushes to DLQ
//
// This method handles:
// - State transitions
// - Attempt counting
// - Lease cleanup
// - Metrics recording
//
// Used by: ReclaimService during periodic scans
ReclaimExpiredMessage(ctx context.Context, queueName string, message *messagepb.Message) error
}
ReclaimableBackend defines internal operations needed for lease reclamation.
Design Note: This is an INTERNAL interface, NOT part of the public BackendStorage interface.
Why keep it separate?
Implementation-Specific: The find-and-update pattern is SQL-specific. Other backends (Redis, in-memory) might use different mechanisms: - Redis: ZRANGEBYSCORE on sorted set by expiry time - In-memory: Timer-based callbacks - Message broker: Native TTL handling
Prevents Interface Pollution: BackendStorage defines public operations (EnqueueMessage, ClaimMessage, etc.). Reclaim operations are internal background maintenance, not part of the user-facing API.
Flexible Evolution: We can change reclaim logic without affecting the core storage interface contract.
Only SQL-based backends (Postgres, SQLite) need to implement this interface. The ReclaimService uses this interface to remain type-safe while avoiding tight coupling to the main storage interface.
type SchedulerService ¶
type SchedulerService struct {
// contains filtered or unexported fields
}
SchedulerService handles moving INVISIBLE messages to PENDING when their scheduled time arrives. This service is generic and works with any SQL backend (SQLite, Postgres, etc.) by using the BaseSQL abstraction layer.
func NewSchedulerService ¶
func NewSchedulerService(base *sqlbase.BaseSQL, interval time.Duration) *SchedulerService
NewSchedulerService creates a new scheduler service.
func (*SchedulerService) Start ¶
func (s *SchedulerService) Start(ctx context.Context)
Start begins the scheduler service loop.
func (*SchedulerService) Stop ¶
func (s *SchedulerService) Stop()
Stop signals the service to stop (legacy method for backward compatibility).
func (*SchedulerService) StopGracefully ¶
func (s *SchedulerService) StopGracefully(ctx context.Context) error
Stop signals the service to stop and waits for graceful shutdown. Returns when the service has fully stopped or context times out.