queue

package
v0.2.6 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 27, 2024 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultCreatedAtField = "created_at"
View Source
const DefaultErrorField = "error"
View Source
const DefaultIdField = "id"
View Source
const DefaultIsProcessedField = "is_processed"
View Source
const DefaultIsSuccessField = "is_success_flg"
View Source
const DefaultNameField = "name"
View Source
const DefaultPayloadField = "payload"
View Source
const DefaultProcessedAtField = "processed_at"
View Source
const DefaultTableName = "barn_queue"

Variables

This section is empty.

Functions

This section is empty.

Types

type Message

type Message struct {
	Id          int
	CreatedAt   time.Time
	Name        string
	Payload     string
	IsProcessed bool
	ProcessedAt *time.Time
	IsSuccess   *bool
	Error       *string
}

func (Message) LogValue

func (e Message) LogValue() slog.Value

type MessageHandler

type MessageHandler func(tx *sql.Tx, message *Message) error

type PostgresQueueRepository added in v0.2.6

type PostgresQueueRepository struct {
	// contains filtered or unexported fields
}

func (*PostgresQueueRepository) Create added in v0.2.6

func (r *PostgresQueueRepository) Create(tx *sql.Tx, m *Message) error

func (*PostgresQueueRepository) CreateTable added in v0.2.6

func (r *PostgresQueueRepository) CreateTable(tx *sql.Tx) error

func (*PostgresQueueRepository) DeleteAll added in v0.2.6

func (r *PostgresQueueRepository) DeleteAll(tx *sql.Tx) error

func (*PostgresQueueRepository) DeleteOld added in v0.2.6

func (r *PostgresQueueRepository) DeleteOld(tx *sql.Tx, moment time.Time) (int, error)

func (*PostgresQueueRepository) FindNext added in v0.2.6

func (r *PostgresQueueRepository) FindNext(tx *sql.Tx) (*Message, error)

func (*PostgresQueueRepository) Save added in v0.2.6

func (r *PostgresQueueRepository) Save(tx *sql.Tx, m *Message) error

type QueueQueryConfig added in v0.2.6

type QueueQueryConfig struct {
	TableName        string
	IdField          string
	CreatedAtField   string
	NameField        string
	PayloadField     string
	IsProcessedField string
	ProcessedAtField string
	IsSuccessField   string
	ErrorField       string
}

type QueueRepository added in v0.2.6

type QueueRepository interface {
	FindNext(tx *sql.Tx) (*Message, error)
	Create(tx *sql.Tx, task *Message) error
	Save(tx *sql.Tx, task *Message) error
	DeleteOld(tx *sql.Tx, t time.Time) (int, error)
}

func NewPostgresQueueRepository added in v0.2.6

func NewPostgresQueueRepository(config ...QueueQueryConfig) QueueRepository

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(db *sql.DB, config *WorkerConfig) *Worker

func (*Worker) Start

func (s *Worker) Start()

func (*Worker) StartContext

func (s *Worker) StartContext(ctx context.Context)

func (*Worker) Stop

func (s *Worker) Stop()

type WorkerConfig

type WorkerConfig struct {
	Log        *slog.Logger
	Repository QueueRepository
	Cron       string
	Handler    MessageHandler
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL