stdriverfx

package
v0.0.195
Published: Oct 25, 2025 License: MIT Imports: 23 Imported by: 0

Documentation

Overview

Package stdriverfx provides cross-functional logic and types for job working.

Constants

This section is empty.

Variables

This section is empty.

Functions

func Log

func Log(ctx context.Context) *zap.Logger

func NewUIServer added in v0.0.128

func NewUIServer(par struct {
	fx.In
	Config
	RiverConfig river.Config
	RUI         *pgxpool.Pool `name:"rui"` // dedicated pool so we can use a different schema (search_path)
	Logs        *zap.Logger
},
) (*riverui.Server, error)

NewUIServer initializes a River UI server.

func Provide

func Provide(cbf ...ClientBuilderFunc) fx.Option

Provide returns an fx option that provides the components of this package.

func ProvideEnqueuer

func ProvideEnqueuer[T JobArgs](opts river.InsertOpts) fx.Option

ProvideEnqueuer can be called by work packages to easily provide an enqueuer for their args. When inserting, the enqueuer always uses the provided insert options.

func WithWorker

func WithWorker[T JobArgs]() fx.Option

WithWorker can be provided as an fx option in a worker package to add its worker to the River worker set.

Types

type Client

type Client interface {
	InsertTx(
		ctx context.Context, tx pgx.Tx, args river.JobArgs, opts *river.InsertOpts) (*rivertype.JobInsertResult, error)
	JobList(ctx context.Context, params *river.JobListParams) (*river.JobListResult, error)
	JobListTx(ctx context.Context, tx pgx.Tx, params *river.JobListParams) (*river.JobListResult, error)
	Start(ctx context.Context) error
	Stop(ctx context.Context) error
	StopAndCancel(ctx context.Context) error
}

Client describes the parts of the River interface we use. It is implemented by both the Pro and the community edition.

func NewRegularClient

func NewRegularClient(rw *pgxpool.Pool, cfg river.Config) (Client, error)

NewRegularClient initializes a regular River client. It implements ClientBuilderFunc.

type ClientBuilderFunc

type ClientBuilderFunc func(rw *pgxpool.Pool, cfg river.Config) (Client, error)

ClientBuilderFunc initializes a River client. This can be a regular client or a Pro client.

type Config

type Config struct {
	// Time to wait for running jobs to finish. The worker does not accept new jobs during this period.
	SoftStopTimeout time.Duration `env:"SOFT_STOP_TIMEOUT" envDefault:"5s"`
	// If the soft stop fails, we cancel all remaining jobs and then wait for this timeout to let them clean up.
	HardStopTimeout time.Duration `env:"HARD_STOP_TIMEOUT" envDefault:"5s"`
}

Config configures the components.
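
The two timeouts suggest a two-phase shutdown: a soft stop bounded by SoftStopTimeout, followed by a cancelling stop bounded by HardStopTimeout if the soft stop fails. The following is a minimal sketch of that sequence, assuming only the Stop and StopAndCancel methods of Client. The stopper interface, the shutdown helper and the slowClient fake are hypothetical names for illustration; the package's actual lifecycle hook may be structured differently.

```go
package main

import (
	"context"
	"fmt"
	"strings"
	"time"
)

// stopper is a hypothetical subset of the Client interface.
type stopper interface {
	Stop(ctx context.Context) error
	StopAndCancel(ctx context.Context) error
}

// shutdown tries a soft stop first and falls back to a hard stop if it fails.
func shutdown(ctx context.Context, c stopper, soft, hard time.Duration) error {
	softCtx, cancelSoft := context.WithTimeout(ctx, soft)
	defer cancelSoft()

	if err := c.Stop(softCtx); err == nil {
		return nil // all jobs finished within the soft timeout
	}

	hardCtx, cancelHard := context.WithTimeout(ctx, hard)
	defer cancelHard()

	if err := c.StopAndCancel(hardCtx); err != nil {
		return fmt.Errorf("hard stop: %w", err)
	}
	return nil
}

// slowClient is a fake whose jobs never finish until they are cancelled.
type slowClient struct{ calls []string }

func (s *slowClient) Stop(ctx context.Context) error {
	s.calls = append(s.calls, "Stop")
	<-ctx.Done() // block until the soft timeout expires
	return ctx.Err()
}

func (s *slowClient) StopAndCancel(ctx context.Context) error {
	s.calls = append(s.calls, "StopAndCancel")
	return nil
}

// demoShutdown runs the sequence against the fake and reports the calls made.
func demoShutdown() string {
	c := &slowClient{}
	err := shutdown(context.Background(), c, 10*time.Millisecond, 10*time.Millisecond)
	if err != nil {
		return "error: " + err.Error()
	}
	return strings.Join(c.calls, ",")
}

func main() {
	fmt.Println(demoShutdown())
}
```

With the fake above, the soft stop times out and the hard stop runs second. A client whose jobs finish in time would return from Stop with nil and never reach StopAndCancel.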

type Enqueuer

type Enqueuer[T JobArgs] interface {
	Enqueue(ctx context.Context, tx pgx.Tx, args T) error
}

Enqueuer describes a type that can enqueue work for the workers given certain arguments.
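
One way to picture an Enqueuer is as a thin wrapper that binds fixed insert options to a client, so callers pass only the transaction and the arguments. The sketch below is an assumption about that shape, not the package's implementation. The tx, insertOpts, jobArgs and inserter types are hypothetical stand-ins for pgx.Tx, river.InsertOpts, river.JobArgs and the InsertTx method of Client, so that the example compiles with the standard library alone.

```go
package main

import (
	"context"
	"fmt"
)

// Hypothetical stand-ins for pgx.Tx, river.InsertOpts and river.JobArgs.
type tx struct{}
type insertOpts struct{ Queue string }
type jobArgs interface{ Kind() string }

// inserter is a stand-in for the InsertTx part of the Client interface.
type inserter interface {
	InsertTx(ctx context.Context, t tx, args jobArgs, opts *insertOpts) error
}

// enqueuer mirrors the shape of Enqueuer[T]: the options are fixed at
// construction time and reused for every insert.
type enqueuer[T jobArgs] struct {
	client inserter
	opts   insertOpts
}

// Enqueue inserts args inside the caller's transaction with the fixed options.
func (e enqueuer[T]) Enqueue(ctx context.Context, t tx, args T) error {
	return e.client.InsertTx(ctx, t, args, &e.opts)
}

// heartbeatArgs is an illustrative argument type.
type heartbeatArgs struct{}

func (heartbeatArgs) Kind() string { return "heartbeat" }

// recordingClient is a fake that records what would have been inserted.
type recordingClient struct{ log []string }

func (r *recordingClient) InsertTx(_ context.Context, _ tx, args jobArgs, opts *insertOpts) error {
	r.log = append(r.log, args.Kind()+"@"+opts.Queue)
	return nil
}

// demoEnqueue enqueues one job and returns the recorded "kind@queue" entry.
func demoEnqueue() string {
	rc := &recordingClient{}
	enq := enqueuer[heartbeatArgs]{client: rc, opts: insertOpts{Queue: "maintenance"}}
	if err := enq.Enqueue(context.Background(), tx{}, heartbeatArgs{}); err != nil {
		return "error: " + err.Error()
	}
	return rc.log[0]
}

func main() {
	fmt.Println(demoEnqueue())
}
```

Because Enqueue takes the transaction as a parameter, the job insert can commit or roll back together with the caller's other writes.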

type JobArgs

type JobArgs interface {
	river.JobArgs
	json.Marshaler
	json.Unmarshaler
	proto.Message
}

JobArgs declares the shape of job arguments for our purpose. We require the arguments to always be a protobuf message, and we require that message to implement json.Marshaler and json.Unmarshaler so it can be encoded in the database using json.Marshal.
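
To illustrate why the JSON interfaces matter, the sketch below round-trips an argument type through json.Marshal and json.Unmarshal using custom MarshalJSON and UnmarshalJSON methods. It is a simplified, standard-library-only example: argsLike is a hypothetical stand-in for JobArgs that leaves out proto.Message, and heartbeatArgs is a hand-written struct rather than a generated protobuf message, whose JSON methods would more likely delegate to a protobuf JSON encoder.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// argsLike is a hypothetical stand-in for JobArgs without proto.Message.
type argsLike interface {
	Kind() string // the method required by river.JobArgs
	json.Marshaler
	json.Unmarshaler
}

// heartbeatArgs is an illustrative argument type.
type heartbeatArgs struct{ Note string }

// heartbeatWire is the JSON layout; a separate type avoids infinite
// recursion when MarshalJSON calls json.Marshal.
type heartbeatWire struct {
	Note string `json:"note"`
}

func (a *heartbeatArgs) Kind() string { return "heartbeat" }

func (a *heartbeatArgs) MarshalJSON() ([]byte, error) {
	return json.Marshal(heartbeatWire{Note: a.Note})
}

func (a *heartbeatArgs) UnmarshalJSON(b []byte) error {
	var w heartbeatWire
	if err := json.Unmarshal(b, &w); err != nil {
		return err
	}
	a.Note = w.Note
	return nil
}

// Compile-time check that the pointer type satisfies the interface.
var _ argsLike = (*heartbeatArgs)(nil)

// roundTrip encodes the arguments the way a JSON column would store them
// and decodes them again, returning the encoded form and the decoded note.
func roundTrip(note string) (string, string, error) {
	raw, err := json.Marshal(&heartbeatArgs{Note: note})
	if err != nil {
		return "", "", err
	}
	var out heartbeatArgs
	if err := json.Unmarshal(raw, &out); err != nil {
		return "", "", err
	}
	return string(raw), out.Note, nil
}

func main() {
	encoded, decoded, err := roundTrip("ping")
	fmt.Println(encoded, decoded, err)
}
```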

type JobOutput

type JobOutput interface {
	json.Marshaler
	json.Unmarshaler
	proto.Message
}

JobOutput describes the shape of Job outputs.

type PeriodicWorker added in v0.0.129

type PeriodicWorker interface {
	PeriodicJobs() []*river.PeriodicJob
}

PeriodicWorker can be implemented by workers that need periodic jobs. The jobs returned by PeriodicJobs are added to the River configuration.

type TransactWorker

type TransactWorker[A JobArgs, O JobOutput] struct {
	// contains filtered or unexported fields
}

TransactWorker can be embedded into River workers so that the work runs in a database transaction.

func NewTransactWorker

func NewTransactWorker[A JobArgs, O JobOutput](
	txr *stdtx.Transactor[pgx.Tx],
	val protovalidate.Validator,
	hdlr func(_ context.Context, _ pgx.Tx, job *river.Job[A]) (O, error),
) *TransactWorker[A, O]

func (TransactWorker[A, O]) Work

func (w TransactWorker[A, O]) Work(ctx context.Context, job *river.Job[A]) error

Work implements the river.Worker interface.
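
The general pattern behind a transactional worker can be sketched as: begin a transaction, run the handler with it, commit on success and roll back on failure. The example below shows only that pattern under stated assumptions. The txn interface is a hypothetical stand-in for pgx.Tx, inTx and fakeTx are illustrative names, and the sketch omits whatever TransactWorker does with the protovalidate validator and with the handler's output.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// txn is a hypothetical stand-in for the parts of pgx.Tx used here.
type txn interface {
	Commit(ctx context.Context) error
	Rollback(ctx context.Context) error
}

// inTx runs hdlr inside a transaction obtained from begin. It commits when
// the handler succeeds and rolls back when the handler fails.
func inTx[A, O any](
	ctx context.Context,
	begin func(context.Context) (txn, error),
	args A,
	hdlr func(context.Context, txn, A) (O, error),
) (O, error) {
	var zero O

	tx, err := begin(ctx)
	if err != nil {
		return zero, fmt.Errorf("begin: %w", err)
	}

	out, err := hdlr(ctx, tx, args)
	if err != nil {
		// Keep the handler error and attach a rollback failure, if any.
		return zero, errors.Join(err, tx.Rollback(ctx))
	}

	if err := tx.Commit(ctx); err != nil {
		return zero, fmt.Errorf("commit: %w", err)
	}
	return out, nil
}

// fakeTx records how the transaction ended.
type fakeTx struct{ ended string }

func (f *fakeTx) Commit(context.Context) error   { f.ended = "commit"; return nil }
func (f *fakeTx) Rollback(context.Context) error { f.ended = "rollback"; return nil }

// demoInTx runs a handler that optionally fails and reports the outcome.
func demoInTx(fail bool) string {
	ftx := &fakeTx{}
	begin := func(context.Context) (txn, error) { return ftx, nil }

	_, err := inTx(context.Background(), begin, "payload",
		func(_ context.Context, _ txn, in string) (int, error) {
			if fail {
				return 0, errors.New("handler failed")
			}
			return len(in), nil
		})
	if err != nil {
		return "error:" + ftx.ended
	}
	return "ok:" + ftx.ended
}

func main() {
	fmt.Println(demoInTx(false))
	fmt.Println(demoInTx(true))
}
```

Passing the transaction to the handler, as the hdlr parameter of NewTransactWorker does, lets the handler's database writes succeed or fail as one unit with the job.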

type Workers

type Workers struct {
	// contains filtered or unexported fields
}

Workers represents the collection of workers.

func New

func New(par struct {
	fx.In
	fx.Lifecycle
	Config
	Logs   *zap.Logger
	Client Client
},
) (res *Workers, err error)

New initializes the main workers component. It also takes the enqueuers so they can be embedded for easier access from all other parts of the codebase.

func (Workers) GetJobByKinds

func (w Workers) GetJobByKinds(
	ctx context.Context, tx pgx.Tx, kind string, moreKinds ...string,
) (*river.JobListResult, error)

GetJobByKinds returns all jobs that match any of the given kinds.
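
The signature takes one required kind followed by a variadic list, which forces callers to pass at least one kind at compile time. A small sketch of how such arguments can be merged into a single slice before building a job list query; allKinds is a hypothetical helper, not part of the package.

```go
package main

import "fmt"

// allKinds merges the required first kind with the optional extra kinds.
func allKinds(kind string, moreKinds ...string) []string {
	kinds := make([]string, 0, 1+len(moreKinds))
	kinds = append(kinds, kind)
	return append(kinds, moreKinds...)
}

func main() {
	fmt.Println(allKinds("heartbeat"))
	fmt.Println(allKinds("heartbeat", "cleanup"))
}
```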

func (Workers) Ping

func (w Workers) Ping(ctx context.Context) error

Ping provides a check to see if the worker can access the required tables.

Directories

Path                Synopsis
internal
  testsnapshot      Package testsnapshot provides migration based on a snapshot.
workheartbeat       Package workheartbeat provides working logic for a heartbeat job.
workheartbeat/v1    Package workheartbeatv1 provides proto-generated code.
                    Package stdrivertest provides test utilities for our River abstraction.