worker

package
v0.3.0
Published: May 26, 2026 License: MIT Imports: 9 Imported by: 0

Documentation

Overview

Package worker is forge's background-job runtime. It composes with app.RunWorker (sibling to app.Run with HTTP disabled) and provides three primitives:

  • Cron: interval-based recurring jobs
  • Queue[T]: typed pub/sub topics with pluggable backends
  • Outbox: drain the kit's outbox table (planned follow-up)

Wave 3 ships Cron + Queue against an in-memory backend. The Postgres LISTEN/NOTIFY and NATS JetStream backends slot behind the same QueueBackend interface in a follow-up.

// services/billing-worker/main.go
app.RunWorker(
    app.WithName("billing"),
    worker.FxModule(),
    worker.Cron(24*time.Hour, "daily-rollup", dailyRollup),
    worker.Queue[CreateInvoiceCmd]("invoices.create", createInvoice),
    internal.FxModule(),
)

Constants

This section is empty.

Variables

This section is empty.

Functions

func FxModule

func FxModule(opts ...Option) fx.Option

FxModule wires the worker runtime into the fx graph. Cron jobs and Queue subscriptions start their loops on app.Start; Queue subscriptions are dropped on app.Stop.

The module provides:

  • worker.QueueBackend (the configured backend, default InMemoryQueue)
  • worker.Publisher (typed-message-aware producer handle)

Types

type InMemoryQueue

type InMemoryQueue struct {
	// contains filtered or unexported fields
}

InMemoryQueue is a process-local QueueBackend, useful for tests, single-instance services, and the worker package's own examples. Delivery is best-effort: if no subscriber is registered when Publish is called, the message is dropped. The buffer is unbounded, which is fine for tests but not suitable for production load.

func NewInMemoryQueue

func NewInMemoryQueue() *InMemoryQueue

NewInMemoryQueue returns a fresh process-local queue.

func (*InMemoryQueue) Close

func (q *InMemoryQueue) Close()

Close stops every active Subscribe loop.

func (*InMemoryQueue) Publish

func (q *InMemoryQueue) Publish(ctx context.Context, topic string, payload []byte) error

func (*InMemoryQueue) Subscribe

func (q *InMemoryQueue) Subscribe(ctx context.Context, topic string, handler func(context.Context, []byte) error) error

type Option

type Option func(*config)

Option configures the worker FxModule.

func Cron

func Cron(interval time.Duration, name string, fn func(context.Context) error) Option

Cron registers an interval-based job. Wraps kit/cron.

worker.Cron(24*time.Hour, "daily-rollup", dailyRollup)

func Queue

func Queue[T any](topic string, handler func(context.Context, T) error) Option

Queue subscribes handler to topic. Messages on the wire are JSON; the handler receives them decoded as T. Marshal/unmarshal errors are logged but don't crash the consumer.

type CreateInvoiceCmd struct { CustomerID string `json:"customer_id"` }
worker.Queue[CreateInvoiceCmd]("invoices.create", createInvoice)

func WithBackend

func WithBackend(b QueueBackend) Option

WithBackend selects the queue backend. Defaults to NewInMemoryQueue().
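
Option is a functional option over an unexported config. The sketch below shows how a default backend and an override could interact under that pattern. The config field, the backend interface and newConfig are hypothetical; only the Option shape and the in-memory default come from the documentation above.

```go
package main

import "fmt"

// backend is a stand-in for QueueBackend, reduced to a name for the demo.
type backend interface{ Name() string }

type memBackend struct{}

func (memBackend) Name() string { return "in-memory" }

type customBackend struct{}

func (customBackend) Name() string { return "custom" }

// config and its field are hypothetical; the real config is unexported.
type config struct{ backend backend }

type Option func(*config)

// WithBackend overrides the default backend.
func WithBackend(b backend) Option {
	return func(c *config) { c.backend = b }
}

// newConfig starts from the in-memory default and applies opts in order,
// so a later option wins over an earlier one.
func newConfig(opts ...Option) *config {
	c := &config{backend: memBackend{}}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	fmt.Println(newConfig().backend.Name())                             // in-memory
	fmt.Println(newConfig(WithBackend(customBackend{})).backend.Name()) // custom
}
```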

type Publisher

type Publisher interface {
	Publish(ctx context.Context, topic string, msg any) error
}

Publisher is the producer-side handle services inject into usecases that emit work. It mirrors QueueBackend.Publish, except that it takes a typed msg rather than a raw []byte payload, and it is scoped to the publish capability so usecases don't see the subscribe surface.

func NewPublisher

func NewPublisher(b QueueBackend) Publisher

NewPublisher adapts a QueueBackend (typed-message-aware via JSON codec) for usecase injection.

type QueueBackend

type QueueBackend interface {
	// Subscribe registers handler for topic. Implementations call
	// handler concurrently per message; handler errors should trigger
	// backend retry / DLQ per its own policy. Subscribe MUST return
	// when ctx is cancelled.
	Subscribe(ctx context.Context, topic string, handler func(context.Context, []byte) error) error

	// Publish ships payload to topic. Synchronous from the caller's
	// perspective — once Publish returns nil the message is durable
	// (for backends that promise that; in-mem is best-effort).
	Publish(ctx context.Context, topic string, payload []byte) error
}

QueueBackend is the byte-level pub/sub the typed Queue[T] sugar wraps. Backends are responsible for delivery semantics (at-most-once / at-least-once), retry, and ack; the typed wrapper above only cares about codec + handler dispatch.
