repository

package
v1.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 13, 2025 License: MIT Imports: 30 Imported by: 0

Documentation

Index

Constants

View Source
// Values of the unexported transitionState type. iota numbers them
// consecutively in declaration order: UNDEFINED = 0, INVISIBLE = 1,
// PENDING = 2, RUNNING = 3, COMPLETED = 4, CANCELED = 5, ERRORED = 6.
// UNDEFINED is therefore also the zero value of transitionState.
// NOTE(review): Go convention is MixedCaps for constants; these names are
// exported, so renaming them would be a breaking change.
const (
	UNDEFINED transitionState = iota
	INVISIBLE
	PENDING
	RUNNING
	COMPLETED
	CANCELED
	ERRORED
)
View Source
// MaxPriority and MinPriority are the upper (100) and lower (0) priority
// limits exported by this package.
// NOTE(review): presumably inclusive bounds on message priority; what they
// apply to and where they are enforced is not visible here — confirm.
const (
	MaxPriority = 100
	MinPriority = 0
)
View Source
// Default lease-renewal settings.
// NOTE(review): the name DefaultThresholdPercentage and its value 0.1 suggest
// a fraction of the lease duration, while its inline comment describes the
// threshold in seconds — confirm the intended unit against the code that
// consumes these constants.
const (
	DefaultThresholdPercentage = 0.1 // Default threshold (x seconds) to renew the lease
	DefaultLeaseRenewal        = 3   // Default lease renewal time (y seconds)
)

Variables

This section is empty.

Functions

This section is empty.

Types

type ChronoHandlerFunc

// ChronoHandlerFunc is the signature of a handler function: it receives a
// context and an untyped request value, and returns an untyped response
// value together with an error.
type ChronoHandlerFunc func(ctx context.Context, req interface{}) (interface{}, error)

func ErrorHandler

func ErrorHandler(defaultResp interface{}, msg string) ChronoHandlerFunc

type DLQStats

// DLQStats represents statistics about a Dead Letter Queue.
// Each field is encoded to JSON under the snake_case key given in its tag.
type DLQStats struct {
	// Name is presumably the name of the Dead Letter Queue — confirm.
	Name string `json:"name"`
	// MessageCount is presumably the number of messages held in the queue — confirm.
	MessageCount int64 `json:"message_count"`
	// CreatedAt is an integer timestamp.
	// NOTE(review): unit (seconds vs. milliseconds) is not visible here — confirm.
	CreatedAt int64 `json:"created_at"`
	// UpdatedAt is an integer timestamp.
	// NOTE(review): unit (seconds vs. milliseconds) is not visible here — confirm.
	UpdatedAt int64 `json:"updated_at"`
}

DLQStats represents statistics about a Dead Letter Queue.

type KeyChecker

type KeyChecker struct {
	// contains filtered or unexported fields
}

func NewKeyChecker

func NewKeyChecker(rdb *redis.Client, batchSize int) *KeyChecker

func (*KeyChecker) Add

func (c *KeyChecker) Add(key string)

func (*KeyChecker) Start

func (c *KeyChecker) Start(ctx context.Context)

func (*KeyChecker) Stop

func (c *KeyChecker) Stop() int

type ReclaimService

type ReclaimService struct {
	// contains filtered or unexported fields
}

func NewReclaimService

func NewReclaimService(storage *storage, interval time.Duration) *ReclaimService

func (*ReclaimService) Start

func (r *ReclaimService) Start(ctx context.Context)

func (*ReclaimService) Stop

func (r *ReclaimService) Stop()

type Scheduler

type Scheduler struct {
	// contains filtered or unexported fields
}

func NewScheduler

func NewScheduler(storage *storage, interval time.Duration) *Scheduler

func (*Scheduler) Start

func (s *Scheduler) Start(ctx context.Context)

func (*Scheduler) Stop

func (s *Scheduler) Stop()

type Storage

// Storage is the interface returned by NewQueueStorage,
// NewQueueStorageForTesting and NewQueueStorageWithIntervals. It groups the
// queue, message, schedule, calendar-schedule and Dead Letter Queue (DLQ)
// operations of this package. Every method takes a context.Context as its
// first parameter.
type Storage interface {
	// Queue and message operations
	CreateQueue(ctx context.Context, request *queueservice_pb.CreateQueueRequest) (*queueservice_pb.CreateQueueResponse, error)
	GetQueueMetadata(ctx context.Context, queueName string) (*queue_pb.QueueMetadata, error)
	DeleteQueue(ctx context.Context, request *queueservice_pb.DeleteQueueRequest) (*queueservice_pb.DeleteQueueResponse, error)
	CreateQueueMessage(ctx context.Context, request *queueservice_pb.PostMessageRequest, validator validator.Validator) (*queueservice_pb.PostMessageResponse, error)
	GetQueueMessage(ctx context.Context, request *queueservice_pb.GetNextMessageRequest) (*queueservice_pb.GetNextMessageResponse, error)
	DeleteQueueMessage(ctx context.Context, queueName string, messageID string) error
	AcknowledgeMessage(ctx context.Context, request *queueservice_pb.AcknowledgeMessageRequest) (*queueservice_pb.AcknowledgeMessageResponse, error)
	RenewMessageLease(ctx context.Context, request *queueservice_pb.RenewMessageLeaseRequest) (*queueservice_pb.RenewMessageLeaseResponse, error)
	PeekQueueMessages(ctx context.Context, request *queueservice_pb.PeekQueueMessagesRequest) (*queueservice_pb.PeekQueueMessagesResponse, error)
	GetQueueState(ctx context.Context, request *queueservice_pb.GetQueueStateRequest) (*queueservice_pb.GetQueueStateResponse, error)
	SendMessageHeartBeat(ctx context.Context, request *queueservice_pb.SendMessageHeartBeatRequest) (*queueservice_pb.SendMessageHeartBeatResponse, error)
	ListQueues(ctx context.Context, request *queueservice_pb.ListQueuesRequest) (*queueservice_pb.ListQueuesResponse, error)

	// Schedule operations
	CreateSchedule(ctx context.Context, request *queueservice_pb.CreateScheduleRequest) (*queueservice_pb.CreateScheduleResponse, error)
	DeleteSchedule(ctx context.Context, request *queueservice_pb.DeleteScheduleRequest) (*queueservice_pb.DeleteScheduleResponse, error)
	GetSchedule(ctx context.Context, request *queueservice_pb.GetScheduleRequest) (*queueservice_pb.GetScheduleResponse, error)
	ListSchedules(ctx context.Context, request *queueservice_pb.ListSchedulesRequest) (*queueservice_pb.ListSchedulesResponse, error)
	GetScheduleHistory(ctx context.Context, request *queueservice_pb.GetScheduleHistoryRequest) (*queueservice_pb.GetScheduleHistoryResponse, error)
	PauseSchedule(ctx context.Context, request *queueservice_pb.PauseScheduleRequest) (*queueservice_pb.PauseScheduleResponse, error)
	ResumeSchedule(ctx context.Context, request *queueservice_pb.ResumeScheduleRequest) (*queueservice_pb.ResumeScheduleResponse, error)

	// Calendar schedule operations
	ValidateCalendarSchedule(ctx context.Context, calendarSchedule *schedule_pb.CalendarSchedule) error
	GetCalendarSchedulePreview(ctx context.Context, calendarSchedule *schedule_pb.CalendarSchedule, count int) (*queueservice_pb.PreviewCalendarScheduleResponse, error)

	// DLQ management methods
	GetDLQMessages(ctx context.Context, dlqName string, limit int32) ([]*message_pb.Message, error)
	RequeueFromDLQ(ctx context.Context, dlqName string, messageID string, targetQueueName string, resetRetries bool) error
	DeleteFromDLQ(ctx context.Context, dlqName string, messageID string) error
	PurgeDLQ(ctx context.Context, dlqName string) error
	GetDLQStats(ctx context.Context, dlqName string) (*DLQStats, error)
}

func NewQueueStorage

func NewQueueStorage(ctx context.Context, redisClient *redis.Client, encryptionKeyManager *keymanager.EncryptionKeyManager, logger *log.Logger) Storage

func NewQueueStorageForTesting

func NewQueueStorageForTesting(redisClient *redis.Client, encryptionKeyManager *keymanager.EncryptionKeyManager, logger *log.Logger) Storage

NewQueueStorageForTesting creates a storage instance without background workers, for use in tests.

func NewQueueStorageWithIntervals

func NewQueueStorageWithIntervals(ctx context.Context, redisClient *redis.Client, encryptionKeyManager *keymanager.EncryptionKeyManager, logger *log.Logger, schedulerInterval, reclaimInterval time.Duration) Storage

NewQueueStorageWithIntervals creates a storage instance with custom scheduler and reclaim intervals.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL