queue

package
v0.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 25, 2024 License: MIT Imports: 10 Imported by: 0

Documentation

Index

Constants

View Source
const DefaultCreatedTsField = "created_ts"
View Source
const DefaultErrorField = "error"
View Source
const DefaultIdField = "id"
View Source
const DefaultIsProcessedField = "is_processed"
View Source
const DefaultIsSuccessField = "is_success"
View Source
const DefaultPayloadField = "payload"
View Source
const DefaultProcessedTsField = "processed_ts"
View Source
const DefaultQueueField = "queue"
View Source
const DefaultTableName = "barn_message"

Variables

This section is empty.

Functions

This section is empty.

Types

type Message

type Message struct {
	Id          int
	Queue       *string
	CreatedTs   time.Time
	Payload     string
	IsProcessed bool
	ProcessedTs *time.Time
	IsSuccess   *bool
	Error       *string
}

func (Message) LogValue

func (e Message) LogValue() slog.Value

type MessageHandler

type MessageHandler func(tx *sql.Tx, message *Message) error

type MessageQueryConfig

type MessageQueryConfig struct {
	TableName        string
	IdField          string
	QueueField       string
	CreatedTsField   string
	PayloadField     string
	IsProcessedField string
	ProcessedTsField string
	IsSuccessField   string
	ErrorField       string
}

type MessageRepository

type MessageRepository interface {
	FindNext(tx *sql.Tx) (*Message, error)
	Create(tx *sql.Tx, m *Message) error
	Save(tx *sql.Tx, m *Message) error
	DeleteProcessed(tx *sql.Tx, t time.Time) (int, error)
}

func NewDefaultPostgresMessageRepository

func NewDefaultPostgresMessageRepository() MessageRepository

func NewPostgresMessageRepository

func NewPostgresMessageRepository(conig *MessageQueryConfig) MessageRepository

type PostgresMessageRepository

type PostgresMessageRepository struct {
	Config *MessageQueryConfig
}

func (*PostgresMessageRepository) Create

func (r *PostgresMessageRepository) Create(tx *sql.Tx, m *Message) error

func (*PostgresMessageRepository) CreateTable

func (r *PostgresMessageRepository) CreateTable(tx *sql.Tx) error

func (*PostgresMessageRepository) DeleteAll

func (r *PostgresMessageRepository) DeleteAll(tx *sql.Tx) error

func (*PostgresMessageRepository) DeleteProcessed

func (r *PostgresMessageRepository) DeleteProcessed(tx *sql.Tx, t time.Time) (int, error)

func (*PostgresMessageRepository) FindNext

func (r *PostgresMessageRepository) FindNext(tx *sql.Tx) (*Message, error)

func (*PostgresMessageRepository) Save

func (r *PostgresMessageRepository) Save(tx *sql.Tx, m *Message) error

type Worker

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(db *sql.DB, config *WorkerConfig) *Worker

func (*Worker) Start

func (s *Worker) Start()

func (*Worker) StartContext

func (s *Worker) StartContext(ctx context.Context)

func (*Worker) Stop

func (s *Worker) Stop()

type WorkerConfig

type WorkerConfig struct {
	Log        *slog.Logger
	Repository MessageRepository
	Cron       string
	Handler    MessageHandler
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL