outbox

package
v1.2.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 14, 2025 License: Apache-2.0 Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// Sentinel errors exported by the outbox package. Callers should match them
// with errors.Is. The comments below restate each error's message; the exact
// call sites that return them are not visible from this listing.
var (
	// ErrEmptyTicker reports an empty ticker.
	ErrEmptyTicker = errors.New("empty ticker")

	// ErrEmptyDataKeeper reports an empty data keeper.
	ErrEmptyDataKeeper = errors.New("empty data keeper")

	// ErrEmptyHandlers reports empty handlers.
	ErrEmptyHandlers = errors.New("empty handlers")

	// ErrDoJobPeriod reports that the do-job period is empty.
	ErrDoJobPeriod = errors.New("do job period is empty")

	// ErrUpdateLockedPeriod reports that the update-locked period is empty.
	ErrUpdateLockedPeriod = errors.New("update locked period is empty")

	// ErrRemoveOldPeriod reports that the remove-old period is empty.
	ErrRemoveOldPeriod = errors.New("remove old period is empty")

	// ErrSetFailedPeriod reports that the set-failed period is empty.
	ErrSetFailedPeriod = errors.New("set failed period is empty")

	// ErrEmptyIsAvailable reports that the is-available callback is empty.
	ErrEmptyIsAvailable = errors.New("is available is empty")

	// ErrEmptyOutbox reports an empty outbox.
	ErrEmptyOutbox = errors.New("empty outbox")
)

Functions

func AddTypedHandler

func AddTypedHandler[T any](
	o *Outbox,
	source models.DataSource,
	h TypedHandler[T],
) error

func TypedSaveData

func TypedSaveData[T any](
	ctx context.Context,
	o *Outbox,
	source models.DataSource,
	data T,
) error

Types

type DataKeeper

// DataKeeper is the storage contract an Outbox is configured with, via
// AddDataKeeper or WithDataKeeperOutboxOpt. Every method takes a context and
// returns an error.
//
// NOTE(review): the per-method notes below are inferred from names and
// signatures only; confirm them against a concrete implementation (see the
// datakeeper directory).
type DataKeeper interface {
	// Init presumably prepares the underlying storage before use — TODO confirm.
	Init(ctx context.Context) error
	// SaveData accepts a single *models.Data record; presumably persists it — TODO confirm.
	SaveData(ctx context.Context, data *models.Data) error
	// GetData returns a slice of *models.Data; which records are selected is
	// not visible here.
	GetData(ctx context.Context) ([]*models.Data, error)
	// UpdateFailedData takes a list of string codes; presumably marks the
	// matching records as failed — TODO confirm.
	UpdateFailedData(ctx context.Context, codes []string) error
	// UpdateProcessedData takes a list of string codes; presumably marks the
	// matching records as processed — TODO confirm.
	UpdateProcessedData(ctx context.Context, codes []string) error
	// UpdateLockedData takes no record arguments; its selection criteria are
	// not visible here.
	UpdateLockedData(ctx context.Context) error
	// RemoveOldData takes no record arguments; what counts as "old" is not
	// visible here.
	RemoveOldData(ctx context.Context) error
	// SetFailedData takes no record arguments; its selection criteria are not
	// visible here.
	SetFailedData(ctx context.Context) error
}

type Handler

// Handler is a callback that receives a context and a batch of *models.Data
// and returns a slice of strings plus an error. Handlers are registered per
// models.DataSource through AddHandler, AddHandlers or WithHandlersOutboxOpt.
//
// NOTE(review): the returned strings are presumably record codes (compare the
// codes parameters on DataKeeper) — confirm against the implementation.
type Handler func(ctx context.Context, data []*models.Data) ([]string, error)

type IsAvailable

// IsAvailable is a no-argument predicate returning a bool. It is supplied to
// an Outbox through AddIsAvailable or WithAvailableOutboxOpt.
//
// NOTE(review): presumably consulted to decide whether the outbox should do
// work at a given moment — confirm against the implementation.
type IsAvailable func() bool

type Outbox

type Outbox struct {
	// contains filtered or unexported fields
}

func NewDefaultOutbox

func NewDefaultOutbox(
	logger *zap.Logger,
	db connectors.DBConnector[*sqlx.DB, *sqlx.Tx],
	opts ...OutboxOption,
) (*Outbox, error)

func NewOutbox

func NewOutbox(logger *zap.Logger, opts ...OutboxOption) (*Outbox, error)

func (*Outbox) AddDataKeeper

func (o *Outbox) AddDataKeeper(dk DataKeeper)

func (*Outbox) AddDoJobPeriod

func (o *Outbox) AddDoJobPeriod(p string)

func (*Outbox) AddHandler

func (o *Outbox) AddHandler(source models.DataSource, handler Handler)

func (*Outbox) AddHandlers

func (o *Outbox) AddHandlers(handlers map[models.DataSource]Handler)

func (*Outbox) AddIsAvailable

func (o *Outbox) AddIsAvailable(a IsAvailable)

func (*Outbox) AddRemoveOldPeriod

func (o *Outbox) AddRemoveOldPeriod(p string)

func (*Outbox) AddSetFailedPeriod

func (o *Outbox) AddSetFailedPeriod(p string)

func (*Outbox) AddTicker

func (o *Outbox) AddTicker(t Ticker)

func (*Outbox) AddUpdateLockedPeriod

func (o *Outbox) AddUpdateLockedPeriod(p string)

func (*Outbox) SaveData

func (o *Outbox) SaveData(ctx context.Context, source models.DataSource, data []byte) error

func (*Outbox) Start

func (o *Outbox) Start(ctx context.Context) error

func (*Outbox) Stop

func (o *Outbox) Stop(ctx context.Context) error

type OutboxOption

// OutboxOption is a function that receives an *Outbox. NewOutbox and
// NewDefaultOutbox accept a variadic list of options, and the With*OutboxOpt
// functions in this package return values of this type.
type OutboxOption func(o *Outbox)

func WithAvailableOutboxOpt

func WithAvailableOutboxOpt(a IsAvailable) OutboxOption

func WithDataKeeperOutboxOpt

func WithDataKeeperOutboxOpt(dk DataKeeper) OutboxOption

func WithDoJobPeriodOutboxOpt

func WithDoJobPeriodOutboxOpt(p string) OutboxOption

func WithHandlersOutboxOpt

func WithHandlersOutboxOpt(handlers map[models.DataSource]Handler) OutboxOption

func WithRemoveOldPeriodOutboxOpt

func WithRemoveOldPeriodOutboxOpt(p string) OutboxOption

func WithSetFailedPeriodOutboxOpt

func WithSetFailedPeriodOutboxOpt(p string) OutboxOption

func WithTickerOutboxOpt

func WithTickerOutboxOpt(t Ticker) OutboxOption

func WithUpdateLockedPeriodOutboxOpt

func WithUpdateLockedPeriodOutboxOpt(p string) OutboxOption

type Ticker

// Ticker is the scheduler contract an Outbox is configured with, via
// AddTicker or WithTickerOutboxOpt.
//
// NOTE(review): the per-method notes below are inferred from names and
// signatures only; confirm them against a concrete implementation.
type Ticker interface {
	// Start takes a context and returns an error; presumably begins running
	// the registered jobs — TODO confirm.
	Start(ctx context.Context) error
	// Stop takes a context and returns an error; presumably halts the ticker
	// — TODO confirm.
	Stop(ctx context.Context) error
	// AddJob registers a models.Job under a string schedule. The expected
	// format of the time string is not visible here — TODO confirm.
	AddJob(time string, job models.Job) error
}

type TypedHandler

// TypedHandler is the generic form of a handler callback: it receives a
// context and a slice of T, and returns a slice of strings plus an error.
// AddTypedHandler accepts a TypedHandler[T] for a given models.DataSource.
//
// NOTE(review): how stored data is converted into values of T is not visible
// here — confirm against AddTypedHandler's implementation.
type TypedHandler[T any] func(ctx context.Context, data []T) ([]string, error)

Directories

Path Synopsis
datakeeper

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL