schema

package
v1.3.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 18, 2026 License: Apache-2.0 Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
// Package defaults: names, list limits, partition sizing and timer periods.
//
// The trailing comments on the partition and period entries describe those
// values. The remaining entries carry no description in this declaration.
// NOTE(review): judging only by their names, DefaultSchema and
// DefaultNotifyChannel are a schema name and a notification channel name,
// QueueListLimit and TickerListLimit cap the size of list results, and
// DefaultMaintenanceTickerName / DefaultCleanupTickerName name built-in
// tickers — confirm against the code that consumes them.
const (
	DefaultSchema                = "pgqueue"
	DefaultNotifyChannel         = "queue_notify"
	QueueListLimit               = 100
	TickerListLimit              = 100
	DefaultPartitionSize         = 100_000 // tasks per partition
	DefaultPartitionThreshold    = 0.5     // create partition(s) when sequence reaches this fraction of the highest partition end
	DefaultPartitionAhead        = 1       // number of partition(s) to create when threshold is reached
	DefaultMaintenanceTickerName = "$maintenance$"
	DefaultCleanupTickerName     = "$cleanup$"
	DefaultTickerPeriod          = 5 * time.Second  // how often to look for matured tickers
	DefaultQueuePeriod           = 10 * time.Second // how often to poll queues for retries and missed notifications
	DefaultCleanupPeriod         = 15 * time.Minute // how often to delete expired tasks
	DefaultMaintenancePeriod     = 10 * time.Minute // create and drop partitions
)

Variables

View Source
// Objects is a package-level string; its value is assigned outside this
// declaration.
// NOTE(review): presumably the SQL that creates the schema objects — confirm
// where it is populated.
var Objects string
View Source
// Queries is a package-level string; its value is assigned outside this
// declaration.
// NOTE(review): presumably the named SQL queries used by the Select/Insert/
// Update methods in this package — confirm where it is populated.
var Queries string

Functions

This section is empty.

Types

type Partition

// Partition is a PartitionMeta combined with a Count value.
//
// Count is exposed as "count" in both its json and db tags.
// NOTE(review): Count presumably holds the number of tasks stored in the
// partition — confirm against the Scan implementation.
type Partition struct {
	PartitionMeta
	Count uint64 `json:"count" db:"count"`
}

func (Partition) Insert

func (p Partition) Insert(bind *pg.Bind) (string, error)

func (*Partition) Scan

func (p *Partition) Scan(row pg.Row) error

func (Partition) String

func (p Partition) String() string

func (Partition) Update

func (p Partition) Update(bind *pg.Bind) error

type PartitionList

// PartitionList holds a slice of Partition values in Body. Body is omitted
// from JSON output when it is empty.
type PartitionList struct {
	Body []Partition `json:"body,omitempty"`
}

func (*PartitionList) Scan

func (l *PartitionList) Scan(row pg.Row) error

func (PartitionList) String

func (p PartitionList) String() string

type PartitionListRequest

// PartitionListRequest is a field-less request type; it carries no
// parameters.
// NOTE(review): presumably the selector used to fill a PartitionList —
// confirm against its Select method.
type PartitionListRequest struct{}

func (PartitionListRequest) Select

func (p PartitionListRequest) Select(bind *pg.Bind, op pg.Op) (string, error)

func (PartitionListRequest) String

func (p PartitionListRequest) String() string

type PartitionMeta

// PartitionMeta describes a partition by its name (Partition) and two
// unsigned bounds (Start and End). Each field uses the same key for its json
// and db tags.
// NOTE(review): Start and End presumably delimit the range of task sequence
// values covered by the partition; whether End is inclusive is not visible
// in this declaration — confirm.
type PartitionMeta struct {
	Partition string `json:"partition" db:"partition"`
	Start     uint64 `json:"start" db:"start"`
	End       uint64 `json:"end" db:"end"`
}

func (PartitionMeta) Insert

func (p PartitionMeta) Insert(bind *pg.Bind) (string, error)

func (PartitionMeta) String

func (p PartitionMeta) String() string

func (PartitionMeta) Update

func (p PartitionMeta) Update(bind *pg.Bind) error

type PartitionName

// PartitionName is a string-based type.
// NOTE(review): presumably the name of a single partition, used as a
// selector — confirm against its Select method.
type PartitionName string

func (PartitionName) Select

func (p PartitionName) Select(bind *pg.Bind, op pg.Op) (string, error)

func (PartitionName) String

func (p PartitionName) String() string

type PartitionSeq

// PartitionSeq is a uint64-based type.
// NOTE(review): presumably the current value of the task sequence that
// partition creation is measured against — confirm against its Scan method.
type PartitionSeq uint64

func (*PartitionSeq) Scan

func (p *PartitionSeq) Scan(row pg.Row) error

func (PartitionSeq) String

func (p PartitionSeq) String() string

type PartitionSeqRequest

// PartitionSeqRequest is a field-less request type; it carries no
// parameters.
// NOTE(review): presumably the selector used to read a PartitionSeq —
// confirm against its Select method.
type PartitionSeqRequest struct{}

func (PartitionSeqRequest) Select

func (p PartitionSeqRequest) Select(bind *pg.Bind, op pg.Op) (string, error)

func (PartitionSeqRequest) String

func (p PartitionSeqRequest) String() string

type Queue

// Queue is a queue name together with the embedded QueueMeta settings.
//
// The Queue field is omitted from JSON when empty and carries arg/help tags
// with the help text "Queue name".
// NOTE(review): the arg/help tags are presumably consumed by a command-line
// parser — confirm which one.
type Queue struct {
	Queue string `json:"queue,omitempty" arg:"" help:"Queue name"`
	QueueMeta
}

func (Queue) Insert

func (q Queue) Insert(bind *pg.Bind) (string, error)

func (*Queue) Scan

func (q *Queue) Scan(row pg.Row) error

func (Queue) String

func (q Queue) String() string

func (Queue) Update

func (q Queue) Update(bind *pg.Bind) error

type QueueCleanRequest

// QueueCleanRequest names the queue a clean operation applies to. The Queue
// field is omitted from JSON when empty and carries the help text
// "Queue name".
type QueueCleanRequest struct {
	Queue string `json:"queue,omitempty" arg:"" help:"Queue name"`
}

func (QueueCleanRequest) Select

func (q QueueCleanRequest) Select(bind *pg.Bind, op pg.Op) (string, error)

func (QueueCleanRequest) String

func (q QueueCleanRequest) String() string

type QueueCleanResponse

// QueueCleanResponse embeds the originating QueueCleanRequest and carries a
// slice of Task values in Body, which is omitted from JSON when empty.
// NOTE(review): Body presumably lists the tasks removed by the clean
// operation — confirm against the Scan implementation.
type QueueCleanResponse struct {
	QueueCleanRequest
	Body []Task `json:"body,omitempty"`
}

func (*QueueCleanResponse) Scan

func (l *QueueCleanResponse) Scan(row pg.Row) error

func (QueueCleanResponse) String

func (q QueueCleanResponse) String() string

type QueueList

// QueueList embeds the QueueListRequest it answers, plus a Count and a slice
// of Queue values in Body. Count is always present in JSON output; Body is
// omitted when empty.
// NOTE(review): Count presumably is the total number of matching queues
// rather than len(Body) — confirm against ScanCount.
type QueueList struct {
	QueueListRequest
	Count uint64  `json:"count"`
	Body  []Queue `json:"body,omitempty"`
}

func (*QueueList) Scan

func (l *QueueList) Scan(row pg.Row) error

func (*QueueList) ScanCount

func (l *QueueList) ScanCount(row pg.Row) error

func (QueueList) String

func (q QueueList) String() string

type QueueListRequest

// QueueListRequest is a list request whose only content is the embedded
// pg.OffsetLimit.
type QueueListRequest struct {
	pg.OffsetLimit
}

func (QueueListRequest) Select

func (l QueueListRequest) Select(bind *pg.Bind, op pg.Op) (string, error)

func (QueueListRequest) String

func (q QueueListRequest) String() string

type QueueMeta

// QueueMeta holds the per-queue settings described by the help tags:
// message time-to-live (TTL), number of retries before failing (Retries),
// backoff delay (RetryDelay) and number of concurrent workers (Concurrency).
//
// Every field is a pointer. Only TTL has omitempty, so a nil TTL is left out
// of JSON output while the other fields encode as null when nil.
// NOTE(review): a nil field presumably means "not set / leave unchanged" —
// confirm against the Update implementation.
type QueueMeta struct {
	TTL         *time.Duration `json:"ttl,omitempty" help:"Time-to-live for queue messages"`
	Retries     *uint64        `json:"retries" help:"Number of retries before failing"`
	RetryDelay  *time.Duration `json:"retry_delay" help:"Backoff delay"`
	Concurrency *uint64        `json:"concurrency" help:"Number of concurrent workers"`
}

func (QueueMeta) String

func (q QueueMeta) String() string

func (QueueMeta) Update

func (q QueueMeta) Update(bind *pg.Bind) error

type QueueName

// QueueName is a string-based type.
// NOTE(review): presumably the name of a single queue, used as a selector —
// confirm against its Select method.
type QueueName string

func (QueueName) Select

func (q QueueName) Select(bind *pg.Bind, op pg.Op) (string, error)

type QueueStatus

// QueueStatus is one (Queue, Status, Count) triple. All three fields are
// always present in JSON output.
// NOTE(review): Count presumably is the number of tasks in the named queue
// that are in the given status — confirm against the Scan implementation.
type QueueStatus struct {
	Queue  string `json:"queue"`
	Status string `json:"status"`
	Count  uint64 `json:"count"`
}

func (*QueueStatus) Scan

func (s *QueueStatus) Scan(row pg.Row) error

func (QueueStatus) String

func (q QueueStatus) String() string

type QueueStatusRequest

// QueueStatusRequest is a field-less request type; it carries no parameters.
// NOTE(review): presumably the selector used to fill a QueueStatusResponse —
// confirm against its Select method.
type QueueStatusRequest struct{}

func (QueueStatusRequest) Select

func (l QueueStatusRequest) Select(bind *pg.Bind, op pg.Op) (string, error)

type QueueStatusResponse

// QueueStatusResponse embeds the originating QueueStatusRequest and carries
// a slice of QueueStatus values in Body, which is omitted from JSON when
// empty.
type QueueStatusResponse struct {
	QueueStatusRequest
	Body []QueueStatus `json:"body,omitempty"`
}

func (*QueueStatusResponse) Scan

func (l *QueueStatusResponse) Scan(row pg.Row) error

func (QueueStatusResponse) String

func (q QueueStatusResponse) String() string

type Task

// Task is a single task record: its Id and Queue, an optional Worker, a raw
// JSON Result, the embedded TaskMeta (payload and delay), four optional
// timestamps and an optional Retries counter.
//
// Every tagged field uses omitempty, so zero values and nil pointers are
// left out of JSON output.
// NOTE(review): the precise meaning of the timestamps (in particular DiesAt)
// and of Retries (remaining vs. used) is not visible in this declaration —
// confirm against the Scan implementation and the SQL it reads.
type Task struct {
	Id     uint64          `json:"id,omitempty"`
	Queue  string          `json:"queue,omitempty"`
	Worker *string         `json:"worker,omitempty"`
	Result json.RawMessage `json:"result,omitempty"`
	TaskMeta
	CreatedAt  *time.Time `json:"created_at,omitempty"`
	StartedAt  *time.Time `json:"started_at,omitempty"`
	FinishedAt *time.Time `json:"finished_at,omitempty"`
	DiesAt     *time.Time `json:"dies_at,omitempty"`
	Retries    *uint64    `json:"retries,omitempty"`
}

func (*Task) Scan

func (t *Task) Scan(row pg.Row) error

func (Task) String

func (t Task) String() string

type TaskFunc

// TaskFunc is a function that receives a context and a raw JSON message and
// returns an arbitrary value together with an error.
// NOTE(review): presumably the handler invoked for a task, with the task
// payload as input and the returned value recorded as the task result —
// confirm against the caller.
type TaskFunc func(context.Context, json.RawMessage) (any, error)

type TaskId

// TaskId is a uint64-based type.
// NOTE(review): presumably the identifier of a single task (matching
// Task.Id), used as a selector — confirm against its Select method.
type TaskId uint64

func (*TaskId) Scan

func (t *TaskId) Scan(row pg.Row) error

func (TaskId) Select

func (t TaskId) Select(bind *pg.Bind, op pg.Op) (string, error)

type TaskList

// TaskList embeds the TaskListRequest it answers, plus a Count and a slice
// of TaskWithStatus values in Body. Count is always present in JSON output;
// Body is omitted when empty.
// NOTE(review): Count presumably is the total number of matching tasks
// rather than len(Body) — confirm against ScanCount.
type TaskList struct {
	TaskListRequest
	Count uint64           `json:"count"`
	Body  []TaskWithStatus `json:"body,omitempty"`
}

func (*TaskList) Scan

func (l *TaskList) Scan(row pg.Row) error

func (*TaskList) ScanCount

func (l *TaskList) ScanCount(row pg.Row) error

func (TaskList) String

func (t TaskList) String() string

type TaskListRequest

// TaskListRequest is a list request made of the embedded pg.OffsetLimit and
// a Status string, which is omitted from JSON when empty.
// NOTE(review): an empty Status presumably means "do not filter by status" —
// confirm against the Select implementation.
type TaskListRequest struct {
	pg.OffsetLimit
	Status string `json:"status,omitempty"`
}

func (TaskListRequest) Select

func (l TaskListRequest) Select(bind *pg.Bind, op pg.Op) (string, error)

type TaskMeta

// TaskMeta holds a task's raw JSON Payload and an optional DelayedAt
// timestamp. Both fields are omitted from JSON output when empty or nil.
// NOTE(review): DelayedAt presumably is the earliest time the task may be
// run — confirm against the Insert implementation.
type TaskMeta struct {
	Payload   json.RawMessage `json:"payload,omitempty"`
	DelayedAt *time.Time      `json:"delayed_at,omitempty"`
}

func (TaskMeta) Insert

func (t TaskMeta) Insert(bind *pg.Bind) (string, error)

func (TaskMeta) String

func (t TaskMeta) String() string

func (TaskMeta) Update

func (t TaskMeta) Update(bind *pg.Bind) error

type TaskRelease

// TaskRelease carries a task Id, a Fail flag and a raw JSON Result. All
// three fields are omitted from JSON output when zero or empty.
// NOTE(review): presumably releases a previously retained task, with Fail
// marking it as failed rather than completed — confirm against the Select
// implementation.
type TaskRelease struct {
	Id     uint64          `json:"id,omitempty"`
	Fail   bool            `json:"fail,omitempty"`
	Result json.RawMessage `json:"result,omitempty"`
}

func (TaskRelease) Select

func (t TaskRelease) Select(bind *pg.Bind, op pg.Op) (string, error)

type TaskRetain

// TaskRetain carries a list of queue names (Queues) and a Worker name. Both
// fields are omitted from JSON output when empty.
// NOTE(review): presumably claims the next available task from any of the
// listed queues on behalf of Worker — confirm against the Select
// implementation.
type TaskRetain struct {
	Queues []string `json:"queues,omitempty"`
	Worker string   `json:"worker,omitempty"`
}

func (TaskRetain) Select

func (t TaskRetain) Select(bind *pg.Bind, op pg.Op) (string, error)

type TaskWithStatus

// TaskWithStatus is a Task extended with a Status string, which is omitted
// from JSON output when empty.
type TaskWithStatus struct {
	Task
	Status string `json:"status,omitempty"`
}

func (*TaskWithStatus) Scan

func (t *TaskWithStatus) Scan(row pg.Row) error

func (TaskWithStatus) String

func (t TaskWithStatus) String() string

type Ticker

// Ticker is a ticker name together with the embedded TickerMeta and an
// optional LastAt timestamp.
//
// The Ticker field is always present in JSON output and carries arg/help
// tags with the help text "Ticker name"; LastAt is omitted when nil.
// NOTE(review): LastAt presumably records when the ticker last fired —
// confirm against the Scan implementation.
type Ticker struct {
	Ticker string `json:"ticker" arg:"" help:"Ticker name"`
	TickerMeta
	LastAt *time.Time `json:"last_at,omitempty"`
}

func (*Ticker) Scan

func (t *Ticker) Scan(row pg.Row) error

func (Ticker) String

func (t Ticker) String() string

type TickerList

// TickerList embeds the TickerListRequest it answers, plus a Count and a
// slice of Ticker values in Body. Count is always present in JSON output;
// Body is omitted when empty.
// NOTE(review): Count presumably is the total number of matching tickers
// rather than len(Body) — confirm against ScanCount.
type TickerList struct {
	TickerListRequest
	Count uint64   `json:"count"`
	Body  []Ticker `json:"body,omitempty"`
}

func (*TickerList) Scan

func (l *TickerList) Scan(row pg.Row) error

func (*TickerList) ScanCount

func (l *TickerList) ScanCount(row pg.Row) error

func (TickerList) String

func (t TickerList) String() string

type TickerListRequest

// TickerListRequest is a list request whose only content is the embedded
// pg.OffsetLimit.
type TickerListRequest struct {
	pg.OffsetLimit
}

func (TickerListRequest) Select

func (t TickerListRequest) Select(bind *pg.Bind, op pg.Op) (string, error)

func (TickerListRequest) String

func (t TickerListRequest) String() string

type TickerMeta

// TickerMeta holds a ticker's raw JSON Payload and an optional Interval.
// Both fields are omitted from JSON output when empty or nil.
//
// The help text on Interval states a default of 1 minute; where that default
// is applied is not visible in this declaration.
type TickerMeta struct {
	Payload  json.RawMessage `json:"payload,omitempty"`
	Interval *time.Duration  `json:"interval,omitempty" help:"Interval (default 1 minute)"`
}

func (TickerMeta) Insert

func (t TickerMeta) Insert(bind *pg.Bind) (string, error)

func (TickerMeta) String

func (t TickerMeta) String() string

func (TickerMeta) Update

func (t TickerMeta) Update(bind *pg.Bind) error

type TickerName

// TickerName is a string-based type.
// NOTE(review): presumably the name of a single ticker, used as a selector;
// the rules applied by its Validate method are not visible here — confirm.
type TickerName string

func (TickerName) Select

func (t TickerName) Select(bind *pg.Bind, op pg.Op) (string, error)

func (TickerName) Validate

func (t TickerName) Validate() (string, error)

type TickerNext

// TickerNext is a field-less request type; it carries no parameters.
// NOTE(review): presumably selects the next matured ticker — confirm against
// its Select method.
type TickerNext struct{}

func (TickerNext) Select

func (t TickerNext) Select(bind *pg.Bind, op pg.Op) (string, error)

func (TickerNext) String

func (t TickerNext) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL