Documentation
¶
Overview ¶
Package worker is forge's background-job runtime. It composes with app.RunWorker (sibling to app.Run with HTTP disabled) and provides three primitives:
- Cron: interval-based recurring jobs
- Queue[T]: typed pub/sub topics with pluggable backends
- Outbox: drain the kit's outbox table (planned follow-up)
Wave 3 ships Cron + Queue against an in-memory backend. The Postgres LISTEN/NOTIFY and NATS JetStream backends slot behind the same QueueBackend interface in a follow-up.
// services/billing-worker/main.go
app.RunWorker(
app.WithName("billing"),
worker.FxModule(),
worker.Cron(2*time.Hour, "daily-rollup", dailyRollup),
worker.Queue[CreateInvoiceCmd]("invoices.create", createInvoice),
internal.FxModule(),
)
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func FxModule ¶
FxModule wires the worker runtime into the fx graph. Cron jobs start on app.Start; Queue subscriptions start their loops and drop on app.Stop.
The module provides:
- worker.QueueBackend (the configured backend, default InMemoryQueue)
- worker.Publisher (typed-message-aware producer handle)
Types ¶
type InMemoryQueue ¶
type InMemoryQueue struct {
// contains filtered or unexported fields
}
InMemoryQueue is a process-local QueueBackend: useful for tests, single-instance services, and the worker package's own examples. Delivery is best-effort: if no subscriber is registered when a Publish happens, the message is dropped. Buffer is unbounded — fine for tests, never use in production.
func NewInMemoryQueue ¶
func NewInMemoryQueue() *InMemoryQueue
NewInMemoryQueue returns a fresh process-local queue.
func (*InMemoryQueue) Close ¶
func (q *InMemoryQueue) Close()
Close stops every active Subscribe loop.
type Option ¶
type Option func(*config)
Option configures the worker FxModule.
func Cron ¶
Cron registers an interval-based job. Wraps kit/cron.
worker.Cron(2*time.Hour, "daily-rollup", dailyRollup)
func Queue ¶
Queue subscribes handler to topic. Messages on the wire are JSON; the handler receives them decoded as T. Marshal/unmarshal errors are logged but don't crash the consumer.
type CreateInvoiceCmd struct { CustomerID string `json:"customer_id"` }
worker.Queue[CreateInvoiceCmd]("invoices.create", createInvoice)
func WithBackend ¶
func WithBackend(b QueueBackend) Option
WithBackend selects the queue backend. Defaults to NewInMemoryQueue().
type Publisher ¶
Publisher is the producer-side handle services inject into usecases that emit work. It's the same shape as QueueBackend.Publish but scoped to the publish capability so usecases don't see the subscribe surface.
func NewPublisher ¶
func NewPublisher(b QueueBackend) Publisher
NewPublisher adapts a QueueBackend (typed-message-aware via JSON codec) for usecase injection.
type QueueBackend ¶
type QueueBackend interface {
// Subscribe registers handler for topic. Implementations call
// handler concurrently per message; handler errors should trigger
// backend retry / DLQ per its own policy. Subscribe MUST return
// when ctx is cancelled.
Subscribe(ctx context.Context, topic string, handler func(context.Context, []byte) error) error
// Publish ships payload to topic. Synchronous from the caller's
// perspective — once Publish returns nil the message is durable
// (for backends that promise that; in-mem is best-effort).
Publish(ctx context.Context, topic string, payload []byte) error
}
QueueBackend is the byte-level pub/sub the typed Queue[T] sugar wraps. Backends are responsible for delivery semantics (at-most- once / at-least-once), retry, and ack — the typed wrapper above only cares about codec + handler dispatch.