Documentation
¶
Index ¶
Constants ¶
View Source
const ( UNDEFINED transitionState = iota INVISIBLE PENDING RUNNING COMPLETED CANCELED ERRORED )
View Source
// Bounds of the priority range.
// NOTE(review): presumably these bound message priority — this listing does
// not show where they are enforced; confirm against the callers.
const (
	// MaxPriority is the upper bound of the priority range.
	MaxPriority = 100
	// MinPriority is the lower bound of the priority range.
	MinPriority = 0
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type ChronoHandlerFunc ¶
func ErrorHandler ¶
func ErrorHandler(defaultResp interface{}, msg string) ChronoHandlerFunc
type DLQStats ¶
type DLQStats struct {
// Name is serialized under the JSON key "name"; presumably the DLQ's name — confirm.
Name string `json:"name"`
// MessageCount is serialized under the JSON key "message_count".
MessageCount int64 `json:"message_count"`
// CreatedAt is serialized under the JSON key "created_at".
// NOTE(review): encoded as int64; the time unit (seconds vs. milliseconds) is not visible here — confirm.
CreatedAt int64 `json:"created_at"`
// UpdatedAt is serialized under the JSON key "updated_at".
// NOTE(review): same int64 encoding as CreatedAt; unit not visible here — confirm.
UpdatedAt int64 `json:"updated_at"`
}
DLQStats represents statistics about a Dead Letter Queue (DLQ): its name, its message count, and its creation and last-update timestamps.
type KeyChecker ¶
type KeyChecker struct {
// contains filtered or unexported fields
}
func NewKeyChecker ¶
func NewKeyChecker(rdb *redis.Client, batchSize int) *KeyChecker
func (*KeyChecker) Add ¶
func (c *KeyChecker) Add(key string)
func (*KeyChecker) Start ¶
func (c *KeyChecker) Start(ctx context.Context)
func (*KeyChecker) Stop ¶
func (c *KeyChecker) Stop() int
type ReclaimService ¶
type ReclaimService struct {
// contains filtered or unexported fields
}
func NewReclaimService ¶
func NewReclaimService(storage *storage, interval time.Duration) *ReclaimService
func (*ReclaimService) Start ¶
func (r *ReclaimService) Start(ctx context.Context)
func (*ReclaimService) Stop ¶
func (r *ReclaimService) Stop()
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
func NewScheduler ¶
type Storage ¶
type Storage interface {
// Queue and message operations
CreateQueue(ctx context.Context, request *queueservice_pb.CreateQueueRequest) (*queueservice_pb.CreateQueueResponse, error)
GetQueueMetadata(ctx context.Context, queueName string) (*queue_pb.QueueMetadata, error)
DeleteQueue(ctx context.Context, request *queueservice_pb.DeleteQueueRequest) (*queueservice_pb.DeleteQueueResponse, error)
// CreateQueueMessage takes a validator.Validator in addition to the request.
// NOTE(review): presumably used to validate the posted message — confirm against the implementation.
CreateQueueMessage(ctx context.Context, request *queueservice_pb.PostMessageRequest, validator validator.Validator) (*queueservice_pb.PostMessageResponse, error)
GetQueueMessage(ctx context.Context, request *queueservice_pb.GetNextMessageRequest) (*queueservice_pb.GetNextMessageResponse, error)
DeleteQueueMessage(ctx context.Context, queueName string, messageID string) error
AcknowledgeMessage(ctx context.Context, request *queueservice_pb.AcknowledgeMessageRequest) (*queueservice_pb.AcknowledgeMessageResponse, error)
RenewMessageLease(ctx context.Context, request *queueservice_pb.RenewMessageLeaseRequest) (*queueservice_pb.RenewMessageLeaseResponse, error)
PeekQueueMessages(ctx context.Context, request *queueservice_pb.PeekQueueMessagesRequest) (*queueservice_pb.PeekQueueMessagesResponse, error)
GetQueueState(ctx context.Context, request *queueservice_pb.GetQueueStateRequest) (*queueservice_pb.GetQueueStateResponse, error)
SendMessageHeartBeat(ctx context.Context, request *queueservice_pb.SendMessageHeartBeatRequest) (*queueservice_pb.SendMessageHeartBeatResponse, error)
ListQueues(ctx context.Context, request *queueservice_pb.ListQueuesRequest) (*queueservice_pb.ListQueuesResponse, error)
// Schedule operations
CreateSchedule(ctx context.Context, request *queueservice_pb.CreateScheduleRequest) (*queueservice_pb.CreateScheduleResponse, error)
DeleteSchedule(ctx context.Context, request *queueservice_pb.DeleteScheduleRequest) (*queueservice_pb.DeleteScheduleResponse, error)
GetSchedule(ctx context.Context, request *queueservice_pb.GetScheduleRequest) (*queueservice_pb.GetScheduleResponse, error)
ListSchedules(ctx context.Context, request *queueservice_pb.ListSchedulesRequest) (*queueservice_pb.ListSchedulesResponse, error)
GetScheduleHistory(ctx context.Context, request *queueservice_pb.GetScheduleHistoryRequest) (*queueservice_pb.GetScheduleHistoryResponse, error)
PauseSchedule(ctx context.Context, request *queueservice_pb.PauseScheduleRequest) (*queueservice_pb.PauseScheduleResponse, error)
ResumeSchedule(ctx context.Context, request *queueservice_pb.ResumeScheduleRequest) (*queueservice_pb.ResumeScheduleResponse, error)
// Calendar schedule operations
ValidateCalendarSchedule(ctx context.Context, calendarSchedule *schedule_pb.CalendarSchedule) error
// NOTE(review): count is presumably the number of upcoming occurrences to preview — confirm.
GetCalendarSchedulePreview(ctx context.Context, calendarSchedule *schedule_pb.CalendarSchedule, count int) (*queueservice_pb.PreviewCalendarScheduleResponse, error)
// DLQ management methods
// Each method below addresses a Dead Letter Queue by its dlqName argument.
GetDLQMessages(ctx context.Context, dlqName string, limit int32) ([]*message_pb.Message, error)
RequeueFromDLQ(ctx context.Context, dlqName string, messageID string, targetQueueName string, resetRetries bool) error
DeleteFromDLQ(ctx context.Context, dlqName string, messageID string) error
PurgeDLQ(ctx context.Context, dlqName string) error
GetDLQStats(ctx context.Context, dlqName string) (*DLQStats, error)
}
func NewQueueStorage ¶
func NewQueueStorage(ctx context.Context, redisClient *redis.Client, encryptionKeyManager *keymanager.EncryptionKeyManager, logger *log.Logger) Storage
func NewQueueStorageWithIntervals ¶
func NewQueueStorageWithIntervals(ctx context.Context, redisClient *redis.Client, encryptionKeyManager *keymanager.EncryptionKeyManager, logger *log.Logger, schedulerInterval, reclaimInterval time.Duration) Storage
NewQueueStorageWithIntervals creates a Storage instance with custom scheduler and reclaim intervals.
Source Files
¶
Click to show internal directories.
Click to hide internal directories.