Documentation ¶
Index ¶
- Constants
- Variables
- func IDMutationTouchesManagedRoute(store Store, ids []string, allowedStates map[State]struct{}, managedRoutes map[string]struct{}) (bool, error)
- type AttemptListRequest
- type AttemptListResponse
- type AttemptOutcome
- type BacklogBucket
- type BacklogTrendListRequest
- type BacklogTrendListResponse
- type BacklogTrendOperatorAction
- type BacklogTrendSample
- type BacklogTrendSignalConfig
- type BacklogTrendSignalOptions
- type BacklogTrendSignals
- type BacklogTrendStore
- type BatchEnqueuer
- type DeadDeleteRequest
- type DeadDeleteResponse
- type DeadListRequest
- type DeadListResponse
- type DeadRequeueRequest
- type DeadRequeueResponse
- type DeliveryAttempt
- type DequeueRequest
- type DequeueResponse
- type Envelope
- type HistogramBucket
- type HistogramSnapshot
- type LeaseBatchConflict
- type LeaseBatchResult
- type LeaseBatchStore
- type MemoryOption
- func WithDLQRetention(maxAge time.Duration, maxDepth int) MemoryOption
- func WithDeliveredRetention(maxAge time.Duration) MemoryOption
- func WithMemoryPressureLimits(retainedItems int, retainedBytes int64) MemoryOption
- func WithNowFunc(now func() time.Time) MemoryOption
- func WithQueueLimits(maxDepth int, dropPolicy string) MemoryOption
- func WithQueueRetention(maxAge, pruneInterval time.Duration) MemoryOption
- type MemoryPressureRuntimeMetrics
- type MemoryRuntimeMetrics
- type MemoryStore
- func (s *MemoryStore) Ack(leaseID string) error
- func (s *MemoryStore) AckBatch(leaseIDs []string) (LeaseBatchResult, error)
- func (s *MemoryStore) CancelMessages(req MessageCancelRequest) (MessageCancelResponse, error)
- func (s *MemoryStore) CancelMessagesByFilter(req MessageManageFilterRequest) (MessageCancelResponse, error)
- func (s *MemoryStore) CaptureBacklogTrendSample(at time.Time) error
- func (s *MemoryStore) DeleteDead(req DeadDeleteRequest) (DeadDeleteResponse, error)
- func (s *MemoryStore) Dequeue(req DequeueRequest) (DequeueResponse, error)
- func (s *MemoryStore) Enqueue(env Envelope) error
- func (s *MemoryStore) EnqueueBatch(items []Envelope) (int, error)
- func (s *MemoryStore) Extend(leaseID string, extendBy time.Duration) error
- func (s *MemoryStore) ListAttempts(req AttemptListRequest) (AttemptListResponse, error)
- func (s *MemoryStore) ListBacklogTrend(req BacklogTrendListRequest) (BacklogTrendListResponse, error)
- func (s *MemoryStore) ListDead(req DeadListRequest) (DeadListResponse, error)
- func (s *MemoryStore) ListMessages(req MessageListRequest) (MessageListResponse, error)
- func (s *MemoryStore) LookupMessages(req MessageLookupRequest) (MessageLookupResponse, error)
- func (s *MemoryStore) MarkDead(leaseID string, reason string) error
- func (s *MemoryStore) MarkDeadBatch(leaseIDs []string, reason string) (LeaseBatchResult, error)
- func (s *MemoryStore) Nack(leaseID string, delay time.Duration) error
- func (s *MemoryStore) NackBatch(leaseIDs []string, delay time.Duration) (LeaseBatchResult, error)
- func (s *MemoryStore) RecordAttempt(attempt DeliveryAttempt) error
- func (s *MemoryStore) RequeueDead(req DeadRequeueRequest) (DeadRequeueResponse, error)
- func (s *MemoryStore) RequeueMessages(req MessageRequeueRequest) (MessageRequeueResponse, error)
- func (s *MemoryStore) RequeueMessagesByFilter(req MessageManageFilterRequest) (MessageRequeueResponse, error)
- func (s *MemoryStore) ResumeMessages(req MessageResumeRequest) (MessageResumeResponse, error)
- func (s *MemoryStore) ResumeMessagesByFilter(req MessageManageFilterRequest) (MessageResumeResponse, error)
- func (s *MemoryStore) RuntimeMetrics() StoreRuntimeMetrics
- func (s *MemoryStore) Stats() (Stats, error)
- type MessageCancelRequest
- type MessageCancelResponse
- type MessageListRequest
- type MessageListResponse
- type MessageLookupItem
- type MessageLookupRequest
- type MessageLookupResponse
- type MessageManageFilterRequest
- type MessageRequeueRequest
- type MessageRequeueResponse
- type MessageResumeRequest
- type MessageResumeResponse
- type PostgresOption
- func WithPostgresDLQRetention(maxAge time.Duration, maxDepth int) PostgresOption
- func WithPostgresDeliveredRetention(maxAge time.Duration) PostgresOption
- func WithPostgresNowFunc(now func() time.Time) PostgresOption
- func WithPostgresPollInterval(d time.Duration) PostgresOption
- func WithPostgresQueueLimits(maxDepth int, dropPolicy string) PostgresOption
- func WithPostgresRetention(maxAge, pruneInterval time.Duration) PostgresOption
- type PostgresStore
- func (s *PostgresStore) Ack(leaseID string) error
- func (s *PostgresStore) AckBatch(leaseIDs []string) (LeaseBatchResult, error)
- func (s *PostgresStore) CancelMessages(req MessageCancelRequest) (MessageCancelResponse, error)
- func (s *PostgresStore) CancelMessagesByFilter(req MessageManageFilterRequest) (MessageCancelResponse, error)
- func (s *PostgresStore) CaptureBacklogTrendSample(at time.Time) error
- func (s *PostgresStore) Close() error
- func (s *PostgresStore) DeleteDead(req DeadDeleteRequest) (DeadDeleteResponse, error)
- func (s *PostgresStore) Dequeue(req DequeueRequest) (DequeueResponse, error)
- func (s *PostgresStore) Enqueue(env Envelope) error
- func (s *PostgresStore) Extend(leaseID string, extendBy time.Duration) error
- func (s *PostgresStore) ListAttempts(req AttemptListRequest) (AttemptListResponse, error)
- func (s *PostgresStore) ListBacklogTrend(req BacklogTrendListRequest) (BacklogTrendListResponse, error)
- func (s *PostgresStore) ListDead(req DeadListRequest) (DeadListResponse, error)
- func (s *PostgresStore) ListMessages(req MessageListRequest) (MessageListResponse, error)
- func (s *PostgresStore) LookupMessages(req MessageLookupRequest) (MessageLookupResponse, error)
- func (s *PostgresStore) MarkDead(leaseID string, reason string) error
- func (s *PostgresStore) MarkDeadBatch(leaseIDs []string, reason string) (LeaseBatchResult, error)
- func (s *PostgresStore) Nack(leaseID string, delay time.Duration) error
- func (s *PostgresStore) NackBatch(leaseIDs []string, delay time.Duration) (LeaseBatchResult, error)
- func (s *PostgresStore) RecordAttempt(attempt DeliveryAttempt) error
- func (s *PostgresStore) RequeueDead(req DeadRequeueRequest) (DeadRequeueResponse, error)
- func (s *PostgresStore) RequeueMessages(req MessageRequeueRequest) (MessageRequeueResponse, error)
- func (s *PostgresStore) RequeueMessagesByFilter(req MessageManageFilterRequest) (MessageRequeueResponse, error)
- func (s *PostgresStore) ResumeMessages(req MessageResumeRequest) (MessageResumeResponse, error)
- func (s *PostgresStore) ResumeMessagesByFilter(req MessageManageFilterRequest) (MessageResumeResponse, error)
- func (s *PostgresStore) RuntimeMetrics() StoreRuntimeMetrics
- func (s *PostgresStore) Stats() (Stats, error)
- type RuntimeMetricsProvider
- type SQLiteOption
- func WithSQLiteCheckpointInterval(d time.Duration) SQLiteOption
- func WithSQLiteDLQRetention(maxAge time.Duration, maxDepth int) SQLiteOption
- func WithSQLiteDeliveredRetention(maxAge time.Duration) SQLiteOption
- func WithSQLiteNowFunc(now func() time.Time) SQLiteOption
- func WithSQLitePollInterval(d time.Duration) SQLiteOption
- func WithSQLiteQueueLimits(maxDepth int, dropPolicy string) SQLiteOption
- func WithSQLiteRetention(maxAge, pruneInterval time.Duration) SQLiteOption
- type SQLiteRuntimeMetrics
- type SQLiteStore
- func (s *SQLiteStore) Ack(leaseID string) error
- func (s *SQLiteStore) AckBatch(leaseIDs []string) (LeaseBatchResult, error)
- func (s *SQLiteStore) CancelMessages(req MessageCancelRequest) (MessageCancelResponse, error)
- func (s *SQLiteStore) CancelMessagesByFilter(req MessageManageFilterRequest) (MessageCancelResponse, error)
- func (s *SQLiteStore) CaptureBacklogTrendSample(at time.Time) error
- func (s *SQLiteStore) Close() error
- func (s *SQLiteStore) DeleteDead(req DeadDeleteRequest) (DeadDeleteResponse, error)
- func (s *SQLiteStore) Dequeue(req DequeueRequest) (DequeueResponse, error)
- func (s *SQLiteStore) Enqueue(env Envelope) error
- func (s *SQLiteStore) EnqueueBatch(items []Envelope) (int, error)
- func (s *SQLiteStore) Extend(leaseID string, extendBy time.Duration) error
- func (s *SQLiteStore) ListAttempts(req AttemptListRequest) (AttemptListResponse, error)
- func (s *SQLiteStore) ListBacklogTrend(req BacklogTrendListRequest) (BacklogTrendListResponse, error)
- func (s *SQLiteStore) ListDead(req DeadListRequest) (DeadListResponse, error)
- func (s *SQLiteStore) ListMessages(req MessageListRequest) (MessageListResponse, error)
- func (s *SQLiteStore) LookupMessages(req MessageLookupRequest) (MessageLookupResponse, error)
- func (s *SQLiteStore) MarkDead(leaseID string, reason string) error
- func (s *SQLiteStore) MarkDeadBatch(leaseIDs []string, reason string) (LeaseBatchResult, error)
- func (s *SQLiteStore) Nack(leaseID string, delay time.Duration) error
- func (s *SQLiteStore) NackBatch(leaseIDs []string, delay time.Duration) (LeaseBatchResult, error)
- func (s *SQLiteStore) RecordAttempt(attempt DeliveryAttempt) error
- func (s *SQLiteStore) RequeueDead(req DeadRequeueRequest) (DeadRequeueResponse, error)
- func (s *SQLiteStore) RequeueMessages(req MessageRequeueRequest) (MessageRequeueResponse, error)
- func (s *SQLiteStore) RequeueMessagesByFilter(req MessageManageFilterRequest) (MessageRequeueResponse, error)
- func (s *SQLiteStore) ResumeMessages(req MessageResumeRequest) (MessageResumeResponse, error)
- func (s *SQLiteStore) ResumeMessagesByFilter(req MessageManageFilterRequest) (MessageResumeResponse, error)
- func (s *SQLiteStore) RuntimeMetrics() StoreRuntimeMetrics
- func (s *SQLiteStore) Stats() (Stats, error)
- type State
- type Stats
- type Store
- type StoreCommonRuntimeMetrics
- type StoreOperationCounterRuntimeMetric
- type StoreOperationDurationRuntimeMetric
- type StoreOperationErrorRuntimeMetric
- type StoreRuntimeMetrics
Constants ¶
const (
MessageOrderDesc = "desc"
MessageOrderAsc = "asc"
)
Variables ¶
Functions ¶
func IDMutationTouchesManagedRoute ¶
func IDMutationTouchesManagedRoute(store Store, ids []string, allowedStates map[State]struct{}, managedRoutes map[string]struct{}) (bool, error)
IDMutationTouchesManagedRoute reports whether the ID-scoped mutation request targets at least one managed route item in one of the allowed states.
Types ¶
type AttemptListRequest ¶
type AttemptListResponse ¶
type AttemptListResponse struct {
Items []DeliveryAttempt
}
type AttemptOutcome ¶
type AttemptOutcome string
const (
AttemptOutcomeAcked AttemptOutcome = "acked"
AttemptOutcomeRetry AttemptOutcome = "retry"
AttemptOutcomeDead AttemptOutcome = "dead"
)
type BacklogBucket ¶
type BacklogTrendListRequest ¶
type BacklogTrendListResponse ¶
type BacklogTrendListResponse struct {
Items []BacklogTrendSample
Truncated bool
}
type BacklogTrendOperatorAction ¶
type BacklogTrendOperatorAction struct {
ID string
Severity string
AlertRoutingKey string
Summary string
Playbook string
Alerts []string
MCPTools []string
AdminEndpoints []string
}
func (BacklogTrendOperatorAction) Map ¶
func (a BacklogTrendOperatorAction) Map() map[string]any
type BacklogTrendSample ¶
type BacklogTrendSignalConfig ¶
type BacklogTrendSignalConfig struct {
Window time.Duration
ExpectedCaptureInterval time.Duration
StaleGraceFactor int
SustainedGrowthConsecutive int
SustainedGrowthMinSamples int
SustainedGrowthMinDelta int
RecentSurgeMinTotal int
RecentSurgeMinDelta int
RecentSurgePercent int
QueuedPressureMinTotal int
QueuedPressurePercent int
QueuedPressureLeasedMultiplier int
}
func DefaultBacklogTrendSignalConfig ¶
func DefaultBacklogTrendSignalConfig() BacklogTrendSignalConfig
type BacklogTrendSignals ¶
type BacklogTrendSignals struct {
Status string
Window time.Duration
ExpectedCaptureInterval time.Duration
Since time.Time
Until time.Time
SampleCount int
Truncated bool
LatestCapturedAt time.Time
LatestQueued int
LatestLeased int
LatestDead int
LatestTotal int
BaselineTotal int
DeltaTotal int
DeltaPercent int
GrowthRatePerMinute float64
ConsecutiveIncreases int
SustainedGrowth bool
RecentSurge bool
QueuedPressure bool
FreshnessSeconds int
SamplingStale bool
ActiveAlerts []string
}
func AnalyzeBacklogTrendSignals ¶
func AnalyzeBacklogTrendSignals(samples []BacklogTrendSample, truncated bool, opts BacklogTrendSignalOptions) BacklogTrendSignals
func (BacklogTrendSignals) Map ¶
func (s BacklogTrendSignals) Map() map[string]any
func (BacklogTrendSignals) OperatorActions ¶
func (s BacklogTrendSignals) OperatorActions() []BacklogTrendOperatorAction
type BacklogTrendStore ¶
type BacklogTrendStore interface {
CaptureBacklogTrendSample(at time.Time) error
ListBacklogTrend(req BacklogTrendListRequest) (BacklogTrendListResponse, error)
}
BacklogTrendStore is an optional extension implemented by stores that can persist and query backlog trend snapshots.
type BatchEnqueuer ¶
BatchEnqueuer is an optional extension for transactional batch enqueue. When supported, all items are committed atomically (all-or-nothing).
type DeadDeleteRequest ¶
type DeadDeleteRequest struct {
IDs []string
}
type DeadDeleteResponse ¶
type DeadDeleteResponse struct {
Deleted int
}
type DeadListRequest ¶
type DeadListResponse ¶
type DeadListResponse struct {
Items []Envelope
}
type DeadRequeueRequest ¶
type DeadRequeueRequest struct {
IDs []string
}
type DeadRequeueResponse ¶
type DeadRequeueResponse struct {
Requeued int
}
type DeliveryAttempt ¶
type DequeueRequest ¶
type DequeueResponse ¶
type DequeueResponse struct {
Items []Envelope
}
type HistogramBucket ¶ added in v1.1.0
type HistogramSnapshot ¶ added in v1.1.0
type HistogramSnapshot struct {
Buckets []HistogramBucket
Count int64
Sum float64
}
type LeaseBatchConflict ¶ added in v1.4.0
type LeaseBatchResult ¶ added in v1.4.0
type LeaseBatchResult struct {
Succeeded int
Conflicts []LeaseBatchConflict
}
type LeaseBatchStore ¶ added in v1.4.0
type LeaseBatchStore interface {
AckBatch(leaseIDs []string) (LeaseBatchResult, error)
NackBatch(leaseIDs []string, delay time.Duration) (LeaseBatchResult, error)
MarkDeadBatch(leaseIDs []string, reason string) (LeaseBatchResult, error)
}
LeaseBatchStore is an optional extension for batched lease operations. Implementations should process the whole batch in one store transaction where possible and return per-lease conflicts for not-found/expired leases.
type MemoryOption ¶
type MemoryOption func(*MemoryStore)
func WithDLQRetention ¶
func WithDLQRetention(maxAge time.Duration, maxDepth int) MemoryOption
func WithDeliveredRetention ¶
func WithDeliveredRetention(maxAge time.Duration) MemoryOption
func WithMemoryPressureLimits ¶ added in v1.2.0
func WithMemoryPressureLimits(retainedItems int, retainedBytes int64) MemoryOption
WithMemoryPressureLimits overrides memory pressure limits for the in-memory backend. Positive values enable explicit thresholds.
func WithNowFunc ¶
func WithNowFunc(now func() time.Time) MemoryOption
func WithQueueLimits ¶
func WithQueueLimits(maxDepth int, dropPolicy string) MemoryOption
func WithQueueRetention ¶
func WithQueueRetention(maxAge, pruneInterval time.Duration) MemoryOption
type MemoryPressureRuntimeMetrics ¶ added in v1.2.0
type MemoryRuntimeMetrics ¶ added in v1.2.0
type MemoryStore ¶
type MemoryStore struct {
// contains filtered or unexported fields
}
func NewMemoryStore ¶
func NewMemoryStore(opts ...MemoryOption) *MemoryStore
func (*MemoryStore) Ack ¶
func (s *MemoryStore) Ack(leaseID string) error
func (*MemoryStore) AckBatch ¶ added in v1.4.0
func (s *MemoryStore) AckBatch(leaseIDs []string) (LeaseBatchResult, error)
func (*MemoryStore) CancelMessages ¶
func (s *MemoryStore) CancelMessages(req MessageCancelRequest) (MessageCancelResponse, error)
func (*MemoryStore) CancelMessagesByFilter ¶
func (s *MemoryStore) CancelMessagesByFilter(req MessageManageFilterRequest) (MessageCancelResponse, error)
func (*MemoryStore) CaptureBacklogTrendSample ¶
func (s *MemoryStore) CaptureBacklogTrendSample(at time.Time) error
func (*MemoryStore) DeleteDead ¶
func (s *MemoryStore) DeleteDead(req DeadDeleteRequest) (DeadDeleteResponse, error)
func (*MemoryStore) Dequeue ¶
func (s *MemoryStore) Dequeue(req DequeueRequest) (DequeueResponse, error)
func (*MemoryStore) Enqueue ¶
func (s *MemoryStore) Enqueue(env Envelope) error
func (*MemoryStore) EnqueueBatch ¶
func (s *MemoryStore) EnqueueBatch(items []Envelope) (int, error)
EnqueueBatch atomically enqueues all items or none (all-or-nothing). It returns the number of items enqueued (0 or len(items)) and an error, if any.
func (*MemoryStore) Extend ¶
func (s *MemoryStore) Extend(leaseID string, extendBy time.Duration) error
func (*MemoryStore) ListAttempts ¶
func (s *MemoryStore) ListAttempts(req AttemptListRequest) (AttemptListResponse, error)
func (*MemoryStore) ListBacklogTrend ¶
func (s *MemoryStore) ListBacklogTrend(req BacklogTrendListRequest) (BacklogTrendListResponse, error)
func (*MemoryStore) ListDead ¶
func (s *MemoryStore) ListDead(req DeadListRequest) (DeadListResponse, error)
func (*MemoryStore) ListMessages ¶
func (s *MemoryStore) ListMessages(req MessageListRequest) (MessageListResponse, error)
func (*MemoryStore) LookupMessages ¶
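func (s *MemoryStore) LookupMessages(req MessageLookupRequest) (MessageLookupResponse, error)
func (*MemoryStore) MarkDead ¶
func (s *MemoryStore) MarkDead(leaseID string, reason string) error
func (*MemoryStore) MarkDeadBatch ¶ added in v1.4.0
func (s *MemoryStore) MarkDeadBatch(leaseIDs []string, reason string) (LeaseBatchResult, error)
func (*MemoryStore) Nack ¶
func (s *MemoryStore) Nack(leaseID string, delay time.Duration) error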
func (*MemoryStore) NackBatch ¶ added in v1.4.0
func (s *MemoryStore) NackBatch(leaseIDs []string, delay time.Duration) (LeaseBatchResult, error)
func (*MemoryStore) RecordAttempt ¶
func (s *MemoryStore) RecordAttempt(attempt DeliveryAttempt) error
func (*MemoryStore) RequeueDead ¶
func (s *MemoryStore) RequeueDead(req DeadRequeueRequest) (DeadRequeueResponse, error)
func (*MemoryStore) RequeueMessages ¶
func (s *MemoryStore) RequeueMessages(req MessageRequeueRequest) (MessageRequeueResponse, error)
func (*MemoryStore) RequeueMessagesByFilter ¶
func (s *MemoryStore) RequeueMessagesByFilter(req MessageManageFilterRequest) (MessageRequeueResponse, error)
func (*MemoryStore) ResumeMessages ¶
func (s *MemoryStore) ResumeMessages(req MessageResumeRequest) (MessageResumeResponse, error)
func (*MemoryStore) ResumeMessagesByFilter ¶
func (s *MemoryStore) ResumeMessagesByFilter(req MessageManageFilterRequest) (MessageResumeResponse, error)
func (*MemoryStore) RuntimeMetrics ¶ added in v1.2.0
func (s *MemoryStore) RuntimeMetrics() StoreRuntimeMetrics
func (*MemoryStore) Stats ¶
func (s *MemoryStore) Stats() (Stats, error)
type MessageCancelRequest ¶
type MessageCancelRequest struct {
IDs []string
}
type MessageCancelResponse ¶
type MessageListRequest ¶
type MessageListResponse ¶
type MessageListResponse struct {
Items []Envelope
}
type MessageLookupItem ¶
type MessageLookupRequest ¶
type MessageLookupRequest struct {
IDs []string
}
type MessageLookupResponse ¶
type MessageLookupResponse struct {
Items []MessageLookupItem
}
type MessageRequeueRequest ¶
type MessageRequeueRequest struct {
IDs []string
}
type MessageRequeueResponse ¶
type MessageResumeRequest ¶
type MessageResumeRequest struct {
IDs []string
}
type MessageResumeResponse ¶
type PostgresOption ¶ added in v1.5.0
type PostgresOption func(*PostgresStore)
func WithPostgresDLQRetention ¶ added in v1.5.0
func WithPostgresDLQRetention(maxAge time.Duration, maxDepth int) PostgresOption
func WithPostgresDeliveredRetention ¶ added in v1.5.0
func WithPostgresDeliveredRetention(maxAge time.Duration) PostgresOption
func WithPostgresNowFunc ¶ added in v1.5.0
func WithPostgresNowFunc(now func() time.Time) PostgresOption
func WithPostgresPollInterval ¶ added in v1.5.0
func WithPostgresPollInterval(d time.Duration) PostgresOption
func WithPostgresQueueLimits ¶ added in v1.5.0
func WithPostgresQueueLimits(maxDepth int, dropPolicy string) PostgresOption
func WithPostgresRetention ¶ added in v1.5.0
func WithPostgresRetention(maxAge, pruneInterval time.Duration) PostgresOption
type PostgresStore ¶ added in v1.5.0
type PostgresStore struct {
// contains filtered or unexported fields
}
func NewPostgresStore ¶ added in v1.5.0
func NewPostgresStore(dsn string, opts ...PostgresOption) (*PostgresStore, error)
func (*PostgresStore) Ack ¶ added in v1.5.0
func (s *PostgresStore) Ack(leaseID string) error
func (*PostgresStore) AckBatch ¶ added in v1.5.0
func (s *PostgresStore) AckBatch(leaseIDs []string) (LeaseBatchResult, error)
func (*PostgresStore) CancelMessages ¶ added in v1.5.0
func (s *PostgresStore) CancelMessages(req MessageCancelRequest) (MessageCancelResponse, error)
func (*PostgresStore) CancelMessagesByFilter ¶ added in v1.5.0
func (s *PostgresStore) CancelMessagesByFilter(req MessageManageFilterRequest) (MessageCancelResponse, error)
func (*PostgresStore) CaptureBacklogTrendSample ¶ added in v1.5.1
func (s *PostgresStore) CaptureBacklogTrendSample(at time.Time) error
func (*PostgresStore) Close ¶ added in v1.5.0
func (s *PostgresStore) Close() error
func (*PostgresStore) DeleteDead ¶ added in v1.5.0
func (s *PostgresStore) DeleteDead(req DeadDeleteRequest) (DeadDeleteResponse, error)
func (*PostgresStore) Dequeue ¶ added in v1.5.0
func (s *PostgresStore) Dequeue(req DequeueRequest) (DequeueResponse, error)
func (*PostgresStore) Enqueue ¶ added in v1.5.0
func (s *PostgresStore) Enqueue(env Envelope) error
func (*PostgresStore) Extend ¶ added in v1.5.0
func (s *PostgresStore) Extend(leaseID string, extendBy time.Duration) error
func (*PostgresStore) ListAttempts ¶ added in v1.5.0
func (s *PostgresStore) ListAttempts(req AttemptListRequest) (AttemptListResponse, error)
func (*PostgresStore) ListBacklogTrend ¶ added in v1.5.1
func (s *PostgresStore) ListBacklogTrend(req BacklogTrendListRequest) (BacklogTrendListResponse, error)
func (*PostgresStore) ListDead ¶ added in v1.5.0
func (s *PostgresStore) ListDead(req DeadListRequest) (DeadListResponse, error)
func (*PostgresStore) ListMessages ¶ added in v1.5.0
func (s *PostgresStore) ListMessages(req MessageListRequest) (MessageListResponse, error)
func (*PostgresStore) LookupMessages ¶ added in v1.5.0
func (s *PostgresStore) LookupMessages(req MessageLookupRequest) (MessageLookupResponse, error)
func (*PostgresStore) MarkDead ¶ added in v1.5.0
func (s *PostgresStore) MarkDead(leaseID string, reason string) error
func (*PostgresStore) MarkDeadBatch ¶ added in v1.5.0
func (s *PostgresStore) MarkDeadBatch(leaseIDs []string, reason string) (LeaseBatchResult, error)
func (*PostgresStore) Nack ¶ added in v1.5.0
func (s *PostgresStore) Nack(leaseID string, delay time.Duration) error
func (*PostgresStore) NackBatch ¶ added in v1.5.0
func (s *PostgresStore) NackBatch(leaseIDs []string, delay time.Duration) (LeaseBatchResult, error)
func (*PostgresStore) RecordAttempt ¶ added in v1.5.0
func (s *PostgresStore) RecordAttempt(attempt DeliveryAttempt) error
func (*PostgresStore) RequeueDead ¶ added in v1.5.0
func (s *PostgresStore) RequeueDead(req DeadRequeueRequest) (DeadRequeueResponse, error)
func (*PostgresStore) RequeueMessages ¶ added in v1.5.0
func (s *PostgresStore) RequeueMessages(req MessageRequeueRequest) (MessageRequeueResponse, error)
func (*PostgresStore) RequeueMessagesByFilter ¶ added in v1.5.0
func (s *PostgresStore) RequeueMessagesByFilter(req MessageManageFilterRequest) (MessageRequeueResponse, error)
func (*PostgresStore) ResumeMessages ¶ added in v1.5.0
func (s *PostgresStore) ResumeMessages(req MessageResumeRequest) (MessageResumeResponse, error)
func (*PostgresStore) ResumeMessagesByFilter ¶ added in v1.5.0
func (s *PostgresStore) ResumeMessagesByFilter(req MessageManageFilterRequest) (MessageResumeResponse, error)
func (*PostgresStore) RuntimeMetrics ¶ added in v1.5.0
func (s *PostgresStore) RuntimeMetrics() StoreRuntimeMetrics
func (*PostgresStore) Stats ¶ added in v1.5.0
func (s *PostgresStore) Stats() (Stats, error)
type RuntimeMetricsProvider ¶ added in v1.1.0
type RuntimeMetricsProvider interface {
RuntimeMetrics() StoreRuntimeMetrics
}
type SQLiteOption ¶
type SQLiteOption func(*SQLiteStore)
func WithSQLiteCheckpointInterval ¶ added in v1.1.0
func WithSQLiteCheckpointInterval(d time.Duration) SQLiteOption
func WithSQLiteDLQRetention ¶
func WithSQLiteDLQRetention(maxAge time.Duration, maxDepth int) SQLiteOption
func WithSQLiteDeliveredRetention ¶
func WithSQLiteDeliveredRetention(maxAge time.Duration) SQLiteOption
func WithSQLiteNowFunc ¶
func WithSQLiteNowFunc(now func() time.Time) SQLiteOption
func WithSQLitePollInterval ¶
func WithSQLitePollInterval(d time.Duration) SQLiteOption
func WithSQLiteQueueLimits ¶
func WithSQLiteQueueLimits(maxDepth int, dropPolicy string) SQLiteOption
func WithSQLiteRetention ¶
func WithSQLiteRetention(maxAge, pruneInterval time.Duration) SQLiteOption
type SQLiteRuntimeMetrics ¶ added in v1.1.0
type SQLiteRuntimeMetrics struct {
WriteDurationSeconds HistogramSnapshot
DequeueDurationSeconds HistogramSnapshot
CheckpointDurationSeconds HistogramSnapshot
BusyTotal int64
RetryTotal int64
TxCommitTotal int64
TxRollbackTotal int64
CheckpointTotal int64
CheckpointErrorTotal int64
}
type SQLiteStore ¶
type SQLiteStore struct {
// contains filtered or unexported fields
}
func NewSQLiteStore ¶
func NewSQLiteStore(dbPath string, opts ...SQLiteOption) (*SQLiteStore, error)
func (*SQLiteStore) Ack ¶
func (s *SQLiteStore) Ack(leaseID string) error
func (*SQLiteStore) AckBatch ¶ added in v1.4.0
func (s *SQLiteStore) AckBatch(leaseIDs []string) (LeaseBatchResult, error)
func (*SQLiteStore) CancelMessages ¶
func (s *SQLiteStore) CancelMessages(req MessageCancelRequest) (MessageCancelResponse, error)
func (*SQLiteStore) CancelMessagesByFilter ¶
func (s *SQLiteStore) CancelMessagesByFilter(req MessageManageFilterRequest) (MessageCancelResponse, error)
func (*SQLiteStore) CaptureBacklogTrendSample ¶
func (s *SQLiteStore) CaptureBacklogTrendSample(at time.Time) error
func (*SQLiteStore) Close ¶
func (s *SQLiteStore) Close() error
func (*SQLiteStore) DeleteDead ¶
func (s *SQLiteStore) DeleteDead(req DeadDeleteRequest) (DeadDeleteResponse, error)
func (*SQLiteStore) Dequeue ¶
func (s *SQLiteStore) Dequeue(req DequeueRequest) (DequeueResponse, error)
func (*SQLiteStore) Enqueue ¶
func (s *SQLiteStore) Enqueue(env Envelope) error
func (*SQLiteStore) EnqueueBatch ¶
func (s *SQLiteStore) EnqueueBatch(items []Envelope) (int, error)
EnqueueBatch atomically enqueues all items or none (all-or-nothing) in a single transaction. It returns the number of items enqueued and an error, if any.
func (*SQLiteStore) Extend ¶
func (s *SQLiteStore) Extend(leaseID string, extendBy time.Duration) error
func (*SQLiteStore) ListAttempts ¶
func (s *SQLiteStore) ListAttempts(req AttemptListRequest) (AttemptListResponse, error)
func (*SQLiteStore) ListBacklogTrend ¶
func (s *SQLiteStore) ListBacklogTrend(req BacklogTrendListRequest) (BacklogTrendListResponse, error)
func (*SQLiteStore) ListDead ¶
func (s *SQLiteStore) ListDead(req DeadListRequest) (DeadListResponse, error)
func (*SQLiteStore) ListMessages ¶
func (s *SQLiteStore) ListMessages(req MessageListRequest) (MessageListResponse, error)
func (*SQLiteStore) LookupMessages ¶
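func (s *SQLiteStore) LookupMessages(req MessageLookupRequest) (MessageLookupResponse, error)
func (*SQLiteStore) MarkDead ¶
func (s *SQLiteStore) MarkDead(leaseID string, reason string) error
func (*SQLiteStore) MarkDeadBatch ¶ added in v1.4.0
func (s *SQLiteStore) MarkDeadBatch(leaseIDs []string, reason string) (LeaseBatchResult, error)
func (*SQLiteStore) Nack ¶
func (s *SQLiteStore) Nack(leaseID string, delay time.Duration) error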
func (*SQLiteStore) NackBatch ¶ added in v1.4.0
func (s *SQLiteStore) NackBatch(leaseIDs []string, delay time.Duration) (LeaseBatchResult, error)
func (*SQLiteStore) RecordAttempt ¶
func (s *SQLiteStore) RecordAttempt(attempt DeliveryAttempt) error
func (*SQLiteStore) RequeueDead ¶
func (s *SQLiteStore) RequeueDead(req DeadRequeueRequest) (DeadRequeueResponse, error)
func (*SQLiteStore) RequeueMessages ¶
func (s *SQLiteStore) RequeueMessages(req MessageRequeueRequest) (MessageRequeueResponse, error)
func (*SQLiteStore) RequeueMessagesByFilter ¶
func (s *SQLiteStore) RequeueMessagesByFilter(req MessageManageFilterRequest) (MessageRequeueResponse, error)
func (*SQLiteStore) ResumeMessages ¶
func (s *SQLiteStore) ResumeMessages(req MessageResumeRequest) (MessageResumeResponse, error)
func (*SQLiteStore) ResumeMessagesByFilter ¶
func (s *SQLiteStore) ResumeMessagesByFilter(req MessageManageFilterRequest) (MessageResumeResponse, error)
func (*SQLiteStore) RuntimeMetrics ¶ added in v1.1.0
func (s *SQLiteStore) RuntimeMetrics() StoreRuntimeMetrics
func (*SQLiteStore) Stats ¶
func (s *SQLiteStore) Stats() (Stats, error)
type Store ¶
type Store interface {
Enqueue(env Envelope) error
Dequeue(req DequeueRequest) (DequeueResponse, error)
Ack(leaseID string) error
Nack(leaseID string, delay time.Duration) error
Extend(leaseID string, extendBy time.Duration) error
MarkDead(leaseID string, reason string) error
ListDead(req DeadListRequest) (DeadListResponse, error)
RequeueDead(req DeadRequeueRequest) (DeadRequeueResponse, error)
DeleteDead(req DeadDeleteRequest) (DeadDeleteResponse, error)
ListMessages(req MessageListRequest) (MessageListResponse, error)
LookupMessages(req MessageLookupRequest) (MessageLookupResponse, error)
CancelMessages(req MessageCancelRequest) (MessageCancelResponse, error)
RequeueMessages(req MessageRequeueRequest) (MessageRequeueResponse, error)
ResumeMessages(req MessageResumeRequest) (MessageResumeResponse, error)
CancelMessagesByFilter(req MessageManageFilterRequest) (MessageCancelResponse, error)
RequeueMessagesByFilter(req MessageManageFilterRequest) (MessageRequeueResponse, error)
ResumeMessagesByFilter(req MessageManageFilterRequest) (MessageResumeResponse, error)
Stats() (Stats, error)
RecordAttempt(attempt DeliveryAttempt) error
ListAttempts(req AttemptListRequest) (AttemptListResponse, error)
}
type StoreCommonRuntimeMetrics ¶ added in v1.5.0
type StoreCommonRuntimeMetrics struct {
OperationDurationSeconds []StoreOperationDurationRuntimeMetric
OperationTotal []StoreOperationCounterRuntimeMetric
ErrorsTotal []StoreOperationErrorRuntimeMetric
}
type StoreOperationCounterRuntimeMetric ¶ added in v1.5.0
type StoreOperationDurationRuntimeMetric ¶ added in v1.5.0
type StoreOperationDurationRuntimeMetric struct {
Operation string
DurationSeconds HistogramSnapshot
}
type StoreOperationErrorRuntimeMetric ¶ added in v1.5.0
type StoreRuntimeMetrics ¶ added in v1.1.0
type StoreRuntimeMetrics struct {
Backend string
Common StoreCommonRuntimeMetrics
SQLite *SQLiteRuntimeMetrics
Memory *MemoryRuntimeMetrics
}