Documentation
¶
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AttemptMismatchError ¶ added in v1.2.0
// AttemptMismatchError describes an attempt-ID mismatch: it pairs the attempt
// ID that was expected with the one that was actually seen.
type AttemptMismatchError struct {
// MessageID and QueueName presumably identify the affected message and the
// queue it belongs to (inferred from the field names — confirm against Error).
MessageID string
QueueName string
// ExpectedAttempt and ActualAttempt hold the two attempt IDs that differed.
ExpectedAttempt string
ActualAttempt string
}
AttemptMismatchError is returned when a message's actual attempt ID does not match the expected one.
func (*AttemptMismatchError) Error ¶ added in v1.2.0
func (e *AttemptMismatchError) Error() string
type BackendStorage ¶ added in v1.2.0
// BackendStorage is the low-level contract a storage backend implements.
// Methods take plain arguments (names, IDs, protobuf entities) and every
// method except Close receives a context.Context as its first parameter.
type BackendStorage interface {
// Lifecycle
// Close takes no context and reports only an error.
Close() error
// Queue Operations
// Queues are exchanged as *queuepb.Queue values and addressed by name.
CreateQueue(ctx context.Context, queue *queuepb.Queue) error
GetQueue(ctx context.Context, name string) (*queuepb.Queue, error)
GetQueueMetadata(ctx context.Context, name string) (*queuepb.QueueMetadata, error)
ListQueues(ctx context.Context) ([]*queuepb.Queue, error)
DeleteQueue(ctx context.Context, name string) error
// Message Operations
// ClaimMessage receives both a workerId and an attemptId and returns a message.
// The follow-up calls (acknowledge, nack, heartbeat, lease extension) all take
// a messageId plus an attemptId; presumably the attemptId must match the one
// used when claiming (cf. AttemptMismatchError) — confirm in the backends.
EnqueueMessage(ctx context.Context, queueName string, message *messagepb.Message) error
ClaimMessage(ctx context.Context, queueName string, workerId string, attemptId string) (*messagepb.Message, error)
AcknowledgeMessage(ctx context.Context, queueName string, messageId string, attemptId string) error
NackMessage(ctx context.Context, queueName string, messageId string, attemptId string) error
HeartbeatMessage(ctx context.Context, queueName string, messageId string, attemptId string) error
// NOTE(review): extensionMs is presumably a duration in milliseconds, per its name — confirm.
ExtendMessageLease(ctx context.Context, queueName string, messageId string, attemptId string, extensionMs int64) error
PeekMessages(ctx context.Context, queueName string, limit int32) ([]*messagepb.Message, error)
// Schedule Operations
// Schedules are addressed by scheduleId; ListSchedules is scoped by queueName.
CreateSchedule(ctx context.Context, schedule *schedulepb.Schedule) error
GetSchedule(ctx context.Context, scheduleId string) (*schedulepb.Schedule, error)
ListSchedules(ctx context.Context, queueName string) ([]*schedulepb.Schedule, error)
DeleteSchedule(ctx context.Context, scheduleId string) error
PauseSchedule(ctx context.Context, scheduleId string) error
ResumeSchedule(ctx context.Context, scheduleId string) error
// executionTime is an int64 whose unit is not visible here — TODO confirm.
RecordScheduleExecution(ctx context.Context, scheduleId string, messageId string, executionTime int64) error
// Note that limit is int64 here, while PeekMessages and GetDLQMessages use int32.
GetScheduleHistory(ctx context.Context, scheduleId string, limit int64) (*schedulepb.ScheduleHistory, error)
// DLQ Operations
// All DLQ methods are keyed by queueName.
GetDLQMessages(ctx context.Context, queueName string, limit int32) ([]*messagepb.Message, error)
RetryDLQMessage(ctx context.Context, queueName string, messageId string) error
DeleteDLQMessage(ctx context.Context, queueName string, messageId string) error
// PurgeDLQ returns an int64 alongside the error — presumably the number of
// messages removed; confirm against an implementation.
PurgeDLQ(ctx context.Context, queueName string) (int64, error)
}
BackendStorage defines the low-level interface that storage backends must implement. This interface uses simple method signatures (unlike the high-level Storage interface, which matches gRPC service signatures). Both sqlite.Storage and postgres.Storage implement this interface.
type BackendType ¶ added in v1.2.0
// BackendType represents the type of storage backend.
type BackendType string

// Storage backend identifiers declared for BackendType.
const (
	// BackendSQLite is the BackendType value "sqlite".
	BackendSQLite BackendType = "sqlite"
	// BackendPostgres is the BackendType value "postgres".
	BackendPostgres BackendType = "postgres"
)
type DLQStats ¶
// DLQStats is a statistics record for a Dead Letter Queue. Every field carries
// a snake_case JSON tag.
type DLQStats struct {
// Name is serialized as "name"; presumably the DLQ's name — confirm.
Name string `json:"name"`
// MessageCount is serialized as "message_count".
MessageCount int64 `json:"message_count"`
// CreatedAt and UpdatedAt are int64 values, presumably timestamps; their
// unit (seconds vs. milliseconds) is not visible here — TODO confirm.
CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"`
}
DLQStats represents statistics about a Dead Letter Queue.
type InvalidStateTransitionError ¶ added in v1.2.0
// InvalidStateTransitionError describes a rejected state transition: the kind
// of entity involved and the states it was moving between.
type InvalidStateTransitionError struct {
// Entity names the kind of object; the listed values are "message", "queue" and "schedule".
Entity string // "message", "queue", "schedule"
// ID presumably identifies the specific entity instance — confirm.
ID string
// CurrentState and TargetState name the state before the transition and the
// state that was requested.
CurrentState string
TargetState string
}
InvalidStateTransitionError is returned when a state transition is invalid.
func (*InvalidStateTransitionError) Error ¶ added in v1.2.0
func (e *InvalidStateTransitionError) Error() string
type MessageNotFoundError ¶ added in v1.2.0
MessageNotFoundError is returned when a message does not exist.
func (*MessageNotFoundError) Error ¶ added in v1.2.0
func (e *MessageNotFoundError) Error() string
type QueueNotFoundError ¶ added in v1.2.0
// QueueNotFoundError reports that a queue does not exist.
type QueueNotFoundError struct {
// QueueName is presumably the name that was looked up — confirm against Error.
QueueName string
}
QueueNotFoundError is returned when a queue does not exist.
func (*QueueNotFoundError) Error ¶ added in v1.2.0
func (e *QueueNotFoundError) Error() string
type ScheduleNotFoundError ¶ added in v1.2.0
ScheduleNotFoundError is returned when a schedule does not exist.
func (*ScheduleNotFoundError) Error ¶ added in v1.2.0
func (e *ScheduleNotFoundError) Error() string
type Storage ¶
// Storage is the high-level persistence interface. Most methods accept a
// queueservicepb request message and return the matching response message;
// a few take plain arguments instead, as noted below.
type Storage interface {
// Queue Operations
// All of these use request/response messages except GetQueueMetadata, which
// takes a plain queue name and returns *queuepb.QueueMetadata.
CreateQueue(ctx context.Context, request *queueservicepb.CreateQueueRequest) (*queueservicepb.CreateQueueResponse, error)
GetQueueMetadata(ctx context.Context, queueName string) (*queuepb.QueueMetadata, error)
DeleteQueue(ctx context.Context, request *queueservicepb.DeleteQueueRequest) (*queueservicepb.DeleteQueueResponse, error)
ListQueues(ctx context.Context, request *queueservicepb.ListQueuesRequest) (*queueservicepb.ListQueuesResponse, error)
GetQueueState(ctx context.Context, request *queueservicepb.GetQueueStateRequest) (*queueservicepb.GetQueueStateResponse, error)
// Message Lifecycle
// CreateQueueMessage additionally receives a validator.Validator; how it is
// applied to the request is not visible from this declaration.
CreateQueueMessage(ctx context.Context, request *queueservicepb.PostMessageRequest, validator validator.Validator) (*queueservicepb.PostMessageResponse, error)
GetQueueMessage(ctx context.Context, request *queueservicepb.GetNextMessageRequest) (*queueservicepb.GetNextMessageResponse, error)
AcknowledgeMessage(ctx context.Context, request *queueservicepb.AcknowledgeMessageRequest) (*queueservicepb.AcknowledgeMessageResponse, error)
// DeleteQueueMessage is the one message method that takes plain strings and
// returns only an error.
DeleteQueueMessage(ctx context.Context, queueName string, messageID string) error
SendMessageHeartBeat(ctx context.Context, request *queueservicepb.SendMessageHeartBeatRequest) (*queueservicepb.SendMessageHeartBeatResponse, error)
RenewMessageLease(ctx context.Context, request *queueservicepb.RenewMessageLeaseRequest) (*queueservicepb.RenewMessageLeaseResponse, error)
PeekQueueMessages(ctx context.Context, request *queueservicepb.PeekQueueMessagesRequest) (*queueservicepb.PeekQueueMessagesResponse, error)
// Schedule Operations
// Every schedule method uses a request/response message pair.
CreateSchedule(ctx context.Context, request *queueservicepb.CreateScheduleRequest) (*queueservicepb.CreateScheduleResponse, error)
DeleteSchedule(ctx context.Context, request *queueservicepb.DeleteScheduleRequest) (*queueservicepb.DeleteScheduleResponse, error)
GetSchedule(ctx context.Context, request *queueservicepb.GetScheduleRequest) (*queueservicepb.GetScheduleResponse, error)
ListSchedules(ctx context.Context, request *queueservicepb.ListSchedulesRequest) (*queueservicepb.ListSchedulesResponse, error)
GetScheduleHistory(ctx context.Context, request *queueservicepb.GetScheduleHistoryRequest) (*queueservicepb.GetScheduleHistoryResponse, error)
PauseSchedule(ctx context.Context, request *queueservicepb.PauseScheduleRequest) (*queueservicepb.PauseScheduleResponse, error)
ResumeSchedule(ctx context.Context, request *queueservicepb.ResumeScheduleRequest) (*queueservicepb.ResumeScheduleResponse, error)
// Calendar Schedule Operations
// Both methods take a *schedulepb.CalendarSchedule directly rather than a request message.
ValidateCalendarSchedule(ctx context.Context, calendarSchedule *schedulepb.CalendarSchedule) error
// NOTE(review): count is a plain int, presumably the number of occurrences to preview — confirm.
GetCalendarSchedulePreview(ctx context.Context, calendarSchedule *schedulepb.CalendarSchedule, count int) (*queueservicepb.PreviewCalendarScheduleResponse, error)
// DLQ Management
// These methods use plain arguments keyed by dlqName.
GetDLQMessages(ctx context.Context, dlqName string, limit int32) ([]*messagepb.Message, error)
// RequeueFromDLQ takes a targetQueueName and a resetRetries flag in addition
// to the DLQ name and message ID.
RequeueFromDLQ(ctx context.Context, dlqName string, messageID string, targetQueueName string, resetRetries bool) error
DeleteFromDLQ(ctx context.Context, dlqName string, messageID string) error
// PurgeDLQ returns only an error (no count).
PurgeDLQ(ctx context.Context, dlqName string) error
GetDLQStats(ctx context.Context, dlqName string) (*DLQStats, error)
}
Storage defines the interface for the ChronoQueue persistence layer. All storage backends (SQLite, Postgres, Redis) must implement this interface.
func NewPostgresStorage ¶ added in v1.2.0
NewPostgresStorage creates a new storage implementation using the PostgreSQL backend.