Documentation
¶
Overview ¶
Package riverlog provides a context logging middleware for workers that collates output and stores it to job records.
Example (Middleware) ¶
Example_middleware demonstrates the use of riverlog middleware to inject a logger into context that'll persist its output onto the job record.
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdbtest"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
"github.com/riverqueue/river/riverlog"
"github.com/riverqueue/river/rivershared/riversharedtest"
"github.com/riverqueue/river/rivershared/util/slogutil"
"github.com/riverqueue/river/rivershared/util/testutil"
"github.com/riverqueue/river/rivertype"
)
// LoggingArgs are the job arguments for the logging example. The job carries
// no payload, so the struct is empty.
type LoggingArgs struct{}

// Kind reports the kind string for LoggingArgs jobs, which is "logging".
func (LoggingArgs) Kind() string {
	return "logging"
}
// LoggingWorker is the example worker for LoggingArgs jobs. It declares no
// fields of its own and only embeds river.WorkerDefaults for LoggingArgs.
type LoggingWorker struct {
river.WorkerDefaults[LoggingArgs]
}
// Work writes two info-level lines through the logger that riverlog exposes
// on ctx, then reports success.
func (w *LoggingWorker) Work(ctx context.Context, job *river.Job[LoggingArgs]) error {
	logger := riverlog.Logger(ctx)

	logger.InfoContext(ctx, "Logged from worker")
	logger.InfoContext(ctx, "Another line logged from worker")

	return nil
}
// main demonstrates the use of riverlog middleware to inject a logger into
// context that'll persist its output onto the job record. It works a single
// LoggingArgs job and prints the log output found in the job's metadata.
func main() {
	ctx := context.Background()

	pool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
	if err != nil {
		panic(err)
	}
	defer pool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, &LoggingWorker{})

	// We have to use a specialized handler without timestamps to make test
	// output reproducible, but in reality this would be as simple as
	// something like:
	//
	//     return slog.NewJSONHandler(w, nil)
	newHandler := func(w io.Writer) slog.Handler {
		return &slogutil.SlogMessageOnlyHandler{Out: w}
	}

	client, err := river.NewClient(riverpgxv5.New(pool), &river.Config{
		Logger: slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Middleware: []rivertype.Middleware{
			riverlog.NewMiddleware(newHandler, nil),
		},
		Schema:   riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(pool), nil), // only necessary for the example test
		TestOnly: true,                                                                       // suitable only for use in tests; remove for live environments
		Workers:  workers,
	})
	if err != nil {
		panic(err)
	}

	// Out of example scope, but used to wait until a job is worked. The
	// subscription is set up before the client starts.
	completed, cancelSubscription := client.Subscribe(river.EventKindJobCompleted)
	defer cancelSubscription()

	if err := client.Start(ctx); err != nil {
		panic(err)
	}

	if _, err := client.Insert(ctx, LoggingArgs{}, nil); err != nil {
		panic(err)
	}

	// attemptLog is one element of the "river:log" array in job metadata.
	type attemptLog struct {
		Log string `json:"log"`
	}

	// jobMetadata picks only the "river:log" key out of job metadata.
	type jobMetadata struct {
		RiverLog []attemptLog `json:"river:log"`
	}

	// Wait for job to complete, extract log data out of metadata, and print it.
	events := riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), completed, 1)
	for _, event := range events {
		var meta jobMetadata
		if err := json.Unmarshal(event.Job.Metadata, &meta); err != nil {
			panic(err)
		}

		for _, attempt := range meta.RiverLog {
			fmt.Print(attempt.Log)
		}
	}

	if err := client.Stop(ctx); err != nil {
		panic(err)
	}
}
Output: Logged from worker Another line logged from worker
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Middleware ¶
// Middleware injects a context logger into the Work function of workers it's
// installed on (or workers of the client it's installed on). The logger is
// accessible with Logger, and all log output is collated and stored to job
// metadata after the job finishes execution.
type Middleware struct {
baseservice.BaseService
rivertype.Middleware
// contains filtered or unexported fields
}
Middleware injects a context logger into the Work function of workers it's installed on (or workers of the client it's installed on) which is accessible with Logger, and which collates all log output to store it to metadata after the job finishes execution. This output is then viewable from River UI.
func NewMiddleware ¶
func NewMiddleware(newHandler func(w io.Writer) slog.Handler, config *MiddlewareConfig) *Middleware
NewMiddleware initializes a new Middleware with the given handler function and configuration.
newHandler is a function which is invoked on every Work execution to generate a new slog.Handler for a work-specific slog.Logger. It should take an io.Writer and return a slog.Handler of choice that's configured to suit the caller.
For example:
riverlog.NewMiddleware(func(w io.Writer) slog.Handler {
return slog.NewJSONHandler(w, nil)
}, nil)
type MiddlewareConfig ¶
// MiddlewareConfig is configuration for Middleware.
type MiddlewareConfig struct {
// MaxSizeBytes is the maximum size of log data that'll be persisted in
// bytes per job attempt. Anything larger will be truncated down to
// MaxSizeBytes.
//
// Be careful with this number because the maximum total log size is equal
// to maximum number of attempts multiplied by this number (each attempt's
// logs are kept separately). For example, 25 * 2 MB = 50 MB maximum
// theoretical log size. Log data goes into metadata which is a JSONB field,
// and JSONB fields have a maximum size of 255 MB, so any number larger than
// 255 MB divided by maximum number of attempts may cause serious
// operational problems.
//
// Defaults to 2 MB (which is per job attempt).
MaxSizeBytes int
}
MiddlewareConfig is configuration for Middleware.