repository

package
v1.2.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 25, 2026 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AttemptMismatchError added in v1.2.0

// AttemptMismatchError reports an attempt ID mismatch. It carries the
// message and queue identifiers together with the expected and actual
// attempt values.
type AttemptMismatchError struct {
	// MessageID identifies the message involved in the mismatch.
	MessageID       string
	// QueueName names the queue involved (presumably the one holding the message — confirm).
	QueueName       string
	// ExpectedAttempt is the attempt ID that was expected.
	ExpectedAttempt string
	// ActualAttempt is the attempt ID that was actually encountered.
	ActualAttempt   string
}

AttemptMismatchError is returned when the actual attempt ID for a message does not match the expected attempt ID.

func (*AttemptMismatchError) Error added in v1.2.0

func (e *AttemptMismatchError) Error() string

type BackendStorage added in v1.2.0

// BackendStorage is the low-level interface that storage backends must
// implement. Its methods take plain arguments (context, names, IDs, proto
// messages) and are grouped into lifecycle, queue, message, schedule and
// DLQ operations.
//
// NOTE(review): ID-like parameters are spelled workerId/attemptId/messageId/
// scheduleId here, whereas the Storage interface uses messageID. Go
// convention keeps initialisms upper-case (ID); parameter names are not part
// of the method set, so they could be aligned without breaking implementers.
type BackendStorage interface {
	// Lifecycle
	Close() error

	// Queue Operations
	CreateQueue(ctx context.Context, queue *queuepb.Queue) error
	GetQueue(ctx context.Context, name string) (*queuepb.Queue, error)
	GetQueueMetadata(ctx context.Context, name string) (*queuepb.QueueMetadata, error)
	ListQueues(ctx context.Context) ([]*queuepb.Queue, error)
	DeleteQueue(ctx context.Context, name string) error

	// Message Operations
	// Every per-message operation below is keyed by queue name plus an
	// attempt ID; claiming additionally takes a worker ID.
	EnqueueMessage(ctx context.Context, queueName string, message *messagepb.Message) error
	ClaimMessage(ctx context.Context, queueName string, workerId string, attemptId string) (*messagepb.Message, error)
	AcknowledgeMessage(ctx context.Context, queueName string, messageId string, attemptId string) error
	NackMessage(ctx context.Context, queueName string, messageId string, attemptId string) error
	HeartbeatMessage(ctx context.Context, queueName string, messageId string, attemptId string) error
	// extensionMs is presumably a duration in milliseconds — confirm against implementations.
	ExtendMessageLease(ctx context.Context, queueName string, messageId string, attemptId string, extensionMs int64) error
	PeekMessages(ctx context.Context, queueName string, limit int32) ([]*messagepb.Message, error)

	// Schedule Operations
	CreateSchedule(ctx context.Context, schedule *schedulepb.Schedule) error
	GetSchedule(ctx context.Context, scheduleId string) (*schedulepb.Schedule, error)
	ListSchedules(ctx context.Context, queueName string) ([]*schedulepb.Schedule, error)
	DeleteSchedule(ctx context.Context, scheduleId string) error
	PauseSchedule(ctx context.Context, scheduleId string) error
	ResumeSchedule(ctx context.Context, scheduleId string) error
	// executionTime is an int64 timestamp; its unit is not shown here — TODO confirm.
	RecordScheduleExecution(ctx context.Context, scheduleId string, messageId string, executionTime int64) error
	// Note: limit is int64 here, while the message/DLQ listing methods use int32.
	GetScheduleHistory(ctx context.Context, scheduleId string, limit int64) (*schedulepb.ScheduleHistory, error)

	// DLQ Operations
	GetDLQMessages(ctx context.Context, queueName string, limit int32) ([]*messagepb.Message, error)
	RetryDLQMessage(ctx context.Context, queueName string, messageId string) error
	DeleteDLQMessage(ctx context.Context, queueName string, messageId string) error
	// The int64 result is presumably the number of purged messages — confirm.
	PurgeDLQ(ctx context.Context, queueName string) (int64, error)
}

BackendStorage defines the low-level interface that storage backends must implement. This interface uses simple method signatures (unlike the high-level Storage interface which matches gRPC service signatures). Both sqlite.Storage and postgres.Storage implement this interface.

type BackendType added in v1.2.0

// BackendType is a string-based identifier for a storage backend kind.
type BackendType string

BackendType represents the type of storage backend.

// Known BackendType values.
const (
	// BackendSQLite selects the SQLite backend ("sqlite").
	BackendSQLite   BackendType = "sqlite"
	// BackendPostgres selects the PostgreSQL backend ("postgres").
	BackendPostgres BackendType = "postgres"
)

type DLQStats

// DLQStats holds statistics about a Dead Letter Queue. Fields are serialized
// to JSON under the snake_case keys given in the struct tags.
type DLQStats struct {
	// Name is the DLQ's name.
	Name         string `json:"name"`
	// MessageCount is the number of messages counted for the DLQ.
	MessageCount int64  `json:"message_count"`
	// CreatedAt is an int64 timestamp; presumably Unix time, unit not shown — TODO confirm.
	CreatedAt    int64  `json:"created_at"`
	// UpdatedAt is an int64 timestamp; presumably Unix time, unit not shown — TODO confirm.
	UpdatedAt    int64  `json:"updated_at"`
}

DLQStats represents statistics about a Dead Letter Queue.

type InvalidStateTransitionError added in v1.2.0

// InvalidStateTransitionError describes a rejected state transition: which
// kind of entity was involved, its ID, and the current and target states.
type InvalidStateTransitionError struct {
	// Entity is the kind of object whose transition was rejected.
	Entity       string // "message", "queue", "schedule"
	// ID identifies the entity.
	ID           string
	// CurrentState is the state the entity was in.
	CurrentState string
	// TargetState is the state that was requested.
	TargetState  string
}

InvalidStateTransitionError is returned when a state transition is invalid.

func (*InvalidStateTransitionError) Error added in v1.2.0

func (e *InvalidStateTransitionError) Error() string

type MessageNotFoundError added in v1.2.0

// MessageNotFoundError identifies a message that does not exist, by message
// ID and queue name.
type MessageNotFoundError struct {
	// MessageID is the ID of the message that was not found.
	MessageID string
	// QueueName is the queue associated with the lookup.
	QueueName string
}

MessageNotFoundError is returned when a message does not exist.

func (*MessageNotFoundError) Error added in v1.2.0

func (e *MessageNotFoundError) Error() string

type QueueNotFoundError added in v1.2.0

// QueueNotFoundError identifies a queue that does not exist.
type QueueNotFoundError struct {
	// QueueName is the name of the queue that was not found.
	QueueName string
}

QueueNotFoundError is returned when a queue does not exist.

func (*QueueNotFoundError) Error added in v1.2.0

func (e *QueueNotFoundError) Error() string

type ScheduleNotFoundError added in v1.2.0

// ScheduleNotFoundError identifies a schedule that does not exist, by
// schedule ID and queue name.
type ScheduleNotFoundError struct {
	// ScheduleID is the ID of the schedule that was not found.
	ScheduleID string
	// QueueName is the queue associated with the lookup.
	QueueName  string
}

ScheduleNotFoundError is returned when a schedule does not exist.

func (*ScheduleNotFoundError) Error added in v1.2.0

func (e *ScheduleNotFoundError) Error() string

type Storage

// Storage is the high-level persistence interface. Most methods take a
// queueservicepb request message and return the corresponding response
// message; a few (metadata lookup, message deletion, calendar-schedule
// helpers and DLQ management) take plain arguments instead.
type Storage interface {
	// Queue Operations
	CreateQueue(ctx context.Context, request *queueservicepb.CreateQueueRequest) (*queueservicepb.CreateQueueResponse, error)
	GetQueueMetadata(ctx context.Context, queueName string) (*queuepb.QueueMetadata, error)
	DeleteQueue(ctx context.Context, request *queueservicepb.DeleteQueueRequest) (*queueservicepb.DeleteQueueResponse, error)
	ListQueues(ctx context.Context, request *queueservicepb.ListQueuesRequest) (*queueservicepb.ListQueuesResponse, error)
	GetQueueState(ctx context.Context, request *queueservicepb.GetQueueStateRequest) (*queueservicepb.GetQueueStateResponse, error)

	// Message Lifecycle
	// CreateQueueMessage additionally receives a validator.Validator
	// (presumably used to validate the posted message — confirm).
	CreateQueueMessage(ctx context.Context, request *queueservicepb.PostMessageRequest, validator validator.Validator) (*queueservicepb.PostMessageResponse, error)
	GetQueueMessage(ctx context.Context, request *queueservicepb.GetNextMessageRequest) (*queueservicepb.GetNextMessageResponse, error)
	AcknowledgeMessage(ctx context.Context, request *queueservicepb.AcknowledgeMessageRequest) (*queueservicepb.AcknowledgeMessageResponse, error)
	DeleteQueueMessage(ctx context.Context, queueName string, messageID string) error
	SendMessageHeartBeat(ctx context.Context, request *queueservicepb.SendMessageHeartBeatRequest) (*queueservicepb.SendMessageHeartBeatResponse, error)
	RenewMessageLease(ctx context.Context, request *queueservicepb.RenewMessageLeaseRequest) (*queueservicepb.RenewMessageLeaseResponse, error)
	PeekQueueMessages(ctx context.Context, request *queueservicepb.PeekQueueMessagesRequest) (*queueservicepb.PeekQueueMessagesResponse, error)

	// Schedule Operations
	CreateSchedule(ctx context.Context, request *queueservicepb.CreateScheduleRequest) (*queueservicepb.CreateScheduleResponse, error)
	DeleteSchedule(ctx context.Context, request *queueservicepb.DeleteScheduleRequest) (*queueservicepb.DeleteScheduleResponse, error)
	GetSchedule(ctx context.Context, request *queueservicepb.GetScheduleRequest) (*queueservicepb.GetScheduleResponse, error)
	ListSchedules(ctx context.Context, request *queueservicepb.ListSchedulesRequest) (*queueservicepb.ListSchedulesResponse, error)
	GetScheduleHistory(ctx context.Context, request *queueservicepb.GetScheduleHistoryRequest) (*queueservicepb.GetScheduleHistoryResponse, error)
	PauseSchedule(ctx context.Context, request *queueservicepb.PauseScheduleRequest) (*queueservicepb.PauseScheduleResponse, error)
	ResumeSchedule(ctx context.Context, request *queueservicepb.ResumeScheduleRequest) (*queueservicepb.ResumeScheduleResponse, error)

	// Calendar Schedule Operations
	ValidateCalendarSchedule(ctx context.Context, calendarSchedule *schedulepb.CalendarSchedule) error
	// count is presumably the number of upcoming occurrences to preview — confirm.
	GetCalendarSchedulePreview(ctx context.Context, calendarSchedule *schedulepb.CalendarSchedule, count int) (*queueservicepb.PreviewCalendarScheduleResponse, error)

	// DLQ Management
	// These methods are keyed by DLQ name (dlqName) rather than by a request message.
	GetDLQMessages(ctx context.Context, dlqName string, limit int32) ([]*messagepb.Message, error)
	// RequeueFromDLQ names a target queue and carries a resetRetries flag
	// (presumably resets the message's retry counter when true — confirm).
	RequeueFromDLQ(ctx context.Context, dlqName string, messageID string, targetQueueName string, resetRetries bool) error
	DeleteFromDLQ(ctx context.Context, dlqName string, messageID string) error
	// Note: unlike BackendStorage.PurgeDLQ, this returns only an error (no count).
	PurgeDLQ(ctx context.Context, dlqName string) error
	GetDLQStats(ctx context.Context, dlqName string) (*DLQStats, error)
}

Storage defines the interface for the ChronoQueue persistence layer. All storage backends (SQLite, Postgres, Redis) must implement this interface.

func NewPostgresStorage added in v1.2.0

func NewPostgresStorage(ctx context.Context, config *postgres.Config) (Storage, error)

NewPostgresStorage creates a new Storage implementation using the PostgreSQL backend.

func NewSQLiteStorage added in v1.2.0

func NewSQLiteStorage(ctx context.Context, config *sqlite.Config) (Storage, error)

NewSQLiteStorage creates a new Storage implementation using the SQLite backend.

Directories

Path Synopsis
sql

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL