riverlog

package
v0.29.0-rc.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 4, 2025 License: MPL-2.0 Imports: 10 Imported by: 1

Documentation

Overview

Package riverlog provides a context logging middleware for workers that collates output and stores it to job records.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Logger

func Logger(ctx context.Context) *slog.Logger

Logger extracts a logger from context from within the Work body of a worker. Middleware must be installed on either the worker or client for this function to be usable.

This variant panics if no logger was available in context.

func LoggerSafely added in v0.28.0

func LoggerSafely(ctx context.Context) (*slog.Logger, bool)

LoggerSafely extracts a logger from context from within the Work body of a worker. Middleware must be installed on either the worker or client for this function to be usable.

This variant returns a boolean that's true if a logger was available in context and false otherwise.

Types

type Middleware

// Middleware injects a context logger into the Work function of the workers
// it's installed on (or the workers of the client it's installed on) and
// collates all log output so it can be stored to job metadata after the job
// finishes execution.
type Middleware struct {
	baseservice.BaseService
	rivertype.Middleware
	// contains filtered or unexported fields
}

Middleware injects a context logger into the Work function of workers it's installed on (or workers of the client it's installed on) which is accessible with Logger, and which collates all log output to store it to metadata after the job finishes execution. This output is then viewable from River UI.

func NewMiddleware

func NewMiddleware(newSlogHandler func(w io.Writer) slog.Handler, config *MiddlewareConfig) *Middleware

NewMiddleware initializes a new Middleware with the given slog handler initialization function and configuration.

newSlogHandler is a function which is invoked on every Work execution to generate a new slog.Handler for a work-specific slog.Logger. It should take an io.Writer and return a slog.Handler of choice that's configured to suit the caller.

For example:

riverlog.NewMiddleware(func(w io.Writer) slog.Handler {
	return slog.NewJSONHandler(w, nil)
}, nil)

With the middleware in place, the logger is available in a work function's context:

func (w *MyWorker) Work(ctx context.Context, job *river.Job[MyArgs]) error {
	riverlog.Logger(ctx).InfoContext(ctx, "Hello from work")
	return nil
}
Example

ExampleNewMiddleware demonstrates the use of riverlog middleware to inject a logger into context that'll persist its output onto the job record.

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log/slog"
	"os"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdbtest"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
	"github.com/riverqueue/river/riverlog"
	"github.com/riverqueue/river/rivershared/riversharedtest"
	"github.com/riverqueue/river/rivershared/util/slogutil"
	"github.com/riverqueue/river/rivershared/util/testutil"
	"github.com/riverqueue/river/rivertype"
)

// LoggingArgs is the argument type for the example's logging job. It carries
// no fields.
type LoggingArgs struct{}

// Kind reports the job kind string for LoggingArgs, which is "logging".
func (LoggingArgs) Kind() string {
	const kind = "logging"
	return kind
}

// LoggingWorker is the example worker for LoggingArgs jobs. It embeds
// river.WorkerDefaults[LoggingArgs] and declares no fields of its own.
type LoggingWorker struct {
	river.WorkerDefaults[LoggingArgs]
}

// Work writes two info-level lines through the logger that the riverlog
// middleware placed in ctx, then reports success. riverlog.Logger panics if no
// logger is available in ctx, so the middleware must be installed on the
// worker or client.
func (w *LoggingWorker) Work(ctx context.Context, job *river.Job[LoggingArgs]) error {
	logger := riverlog.Logger(ctx)

	logger.InfoContext(ctx, "Logged from worker")
	logger.InfoContext(ctx, "Another line logged from worker")

	return nil
}

// main runs the ExampleNewMiddleware example: it installs the riverlog
// middleware on a client so that a logger is injected into each worker's
// context, works a single job, and prints the log output that was persisted
// onto the job record's metadata.
func main() {
	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, &LoggingWorker{})

	// The client's own logger only surfaces warnings and above on stdout.
	clientLogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
		Level:       slog.LevelWarn,
		ReplaceAttr: slogutil.NoLevelTime,
	}))

	// A specialized ReplaceAttr without level or timestamps is needed to make
	// the example's output reproducible, but in reality the handler function
	// could be as simple as something like:
	//
	// 	return slog.NewJSONHandler(w, nil)
	newJobLogHandler := func(w io.Writer) slog.Handler {
		return slog.NewTextHandler(w, &slog.HandlerOptions{ReplaceAttr: slogutil.NoLevelTime})
	}

	client, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger: clientLogger,
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Middleware: []rivertype.Middleware{
			riverlog.NewMiddleware(newJobLogHandler, nil),
		},
		Schema:   riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil), // only necessary for the example test
		TestOnly: true,                                                                         // suitable only for use in tests; remove for live environments
		Workers:  workers,
	})
	if err != nil {
		panic(err)
	}

	// Out of example scope, but lets us block until the job has been worked.
	completedEvents, cancelSubscription := client.Subscribe(river.EventKindJobCompleted)
	defer cancelSubscription()

	if err := client.Start(ctx); err != nil {
		panic(err)
	}

	if _, err := client.Insert(ctx, LoggingArgs{}, nil); err != nil {
		panic(err)
	}

	// Once the job completes, decode the log entries stored in its metadata
	// and print each one.
	for _, event := range riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), completedEvents, 1) {
		var metadata metadataWithLog
		err := json.Unmarshal(event.Job.Metadata, &metadata)
		if err != nil {
			panic(err)
		}
		for _, attempt := range metadata.RiverLog {
			fmt.Print(attempt.Log)
		}
	}

	if err := client.Stop(ctx); err != nil {
		panic(err)
	}
}

// metadataWithLog decodes only the "river:log" key out of a job's metadata
// JSON; any other metadata keys are ignored by the decoder.
type metadataWithLog struct {
	// RiverLog holds the array found under "river:log". Presumably there is
	// one element per job attempt — confirm against the riverlog middleware.
	RiverLog []struct {
		// Log is the string stored under the element's "log" key.
		Log string `json:"log"`
	} `json:"river:log"`
}
Output:

msg="Logged from worker"
msg="Another line logged from worker"

func NewMiddlewareCustomContext added in v0.23.0

func NewMiddlewareCustomContext(newCustomContext func(ctx context.Context, w io.Writer) context.Context, config *MiddlewareConfig) *Middleware

NewMiddlewareCustomContext initializes a new Middleware with the given arbitrary context initialization function and configuration.

newCustomContext is a function which is invoked on every Work execution to generate a new context for the worker. It's generally used to initialize a logger with the given writer and put it in context under a user-defined context key for later use.

This variant is meant to provide callers with a version of the middleware that's not tied to slog. A non-slog standard library logger, Logrus, or Zap logger could all be placed in context according to preferred convention.

For example:

riverlog.NewMiddlewareCustomContext(func(ctx context.Context, w io.Writer) context.Context {
	logger := log.New(w, "", 0)
	return context.WithValue(ctx, ArbitraryContextKey{}, logger)
}, nil),
Example

ExampleNewMiddlewareCustomContext demonstrates the use of riverlog middleware with an arbitrary new context function that can be used to inject any sort of logger into context.

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"log/slog"
	"os"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdbtest"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
	"github.com/riverqueue/river/riverlog"
	"github.com/riverqueue/river/rivershared/riversharedtest"
	"github.com/riverqueue/river/rivershared/util/slogutil"
	"github.com/riverqueue/river/rivershared/util/testutil"
	"github.com/riverqueue/river/rivertype"
)

// customContextKey is the context key under which this example stores its
// *log.Logger. Callers should define their own context key to extract their
// logger back out of work context.
type customContextKey struct{}

// CustomContextLoggingArgs is the argument type for the custom-context
// example's job. It carries no fields.
type CustomContextLoggingArgs struct{}

// Kind reports the job kind string for CustomContextLoggingArgs, which is
// "logging".
func (CustomContextLoggingArgs) Kind() string {
	const kind = "logging"
	return kind
}

// CustomContextLoggingWorker is the example worker for
// CustomContextLoggingArgs jobs. It embeds
// river.WorkerDefaults[CustomContextLoggingArgs] and declares no fields of its
// own.
type CustomContextLoggingWorker struct {
	river.WorkerDefaults[CustomContextLoggingArgs]
}

// Work pulls the *log.Logger stored in ctx under customContextKey and writes a
// single raw line through it. The type assertion is deliberately unchecked, so
// Work panics if ctx carries no *log.Logger under that key.
func (w *CustomContextLoggingWorker) Work(ctx context.Context, job *river.Job[CustomContextLoggingArgs]) error {
	stored := ctx.Value(customContextKey{})
	rawLogger := stored.(*log.Logger) //nolint:forcetypeassert

	rawLogger.Print("Raw log from worker")

	return nil
}

// main runs the ExampleNewMiddlewareCustomContext example: it installs the
// riverlog middleware with an arbitrary new context function (which can be
// used to inject any sort of logger into context), works a single job, and
// prints the log output that was persisted onto the job record's metadata.
func main() {
	// metadataWithLog decodes only the "river:log" key out of a job's metadata
	// JSON. It's declared locally so that this example program is
	// self-contained and compiles when copied on its own.
	type metadataWithLog struct {
		RiverLog []struct {
			Log string `json:"log"`
		} `json:"river:log"`
	}

	ctx := context.Background()

	dbPool, err := pgxpool.New(ctx, riversharedtest.TestDatabaseURL())
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	workers := river.NewWorkers()
	river.AddWorker(workers, &CustomContextLoggingWorker{})

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn, ReplaceAttr: slogutil.NoLevelTime})),
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Middleware: []rivertype.Middleware{
			riverlog.NewMiddlewareCustomContext(func(ctx context.Context, w io.Writer) context.Context {
				// For demonstration purposes we show the use of a built-in
				// non-slog logger, but this could be anything like Logrus or
				// Zap. Even the raw writer could be stored if so desired.
				logger := log.New(w, "", 0)
				return context.WithValue(ctx, customContextKey{}, logger)
			}, nil),
		},
		Schema:   riverdbtest.TestSchema(ctx, testutil.PanicTB(), riverpgxv5.New(dbPool), nil), // only necessary for the example test
		TestOnly: true,                                                                         // suitable only for use in tests; remove for live environments
		Workers:  workers,
	})
	if err != nil {
		panic(err)
	}

	// Out of example scope, but used to wait until a job is worked.
	subscribeChan, subscribeCancel := riverClient.Subscribe(river.EventKindJobCompleted)
	defer subscribeCancel()

	if err := riverClient.Start(ctx); err != nil {
		panic(err)
	}

	if _, err := riverClient.Insert(ctx, CustomContextLoggingArgs{}, nil); err != nil {
		panic(err)
	}

	// Wait for job to complete, extract log data out of metadata, and print it.
	for _, event := range riversharedtest.WaitOrTimeoutN(testutil.PanicTB(), subscribeChan, 1) {
		// Named differently from the type so the type stays usable in scope.
		var metadata metadataWithLog
		if err := json.Unmarshal(event.Job.Metadata, &metadata); err != nil {
			panic(err)
		}
		for _, logAttempt := range metadata.RiverLog {
			fmt.Print(logAttempt.Log)
		}
	}

	if err := riverClient.Stop(ctx); err != nil {
		panic(err)
	}
}
Output:

Raw log from worker

func (*Middleware) Work

func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error) error

type MiddlewareConfig

// MiddlewareConfig is configuration for Middleware.
type MiddlewareConfig struct {
	// MaxSizeBytes is the maximum size of log data that'll be persisted in
	// bytes per job attempt. Anything larger will be truncated down to
	// MaxSizeBytes.
	//
	// Be careful with this number because the maximum total log size is equal
	// to maximum number of attempts multiplied by this number (each attempt's
	// logs are kept separately). For example, 25 * 2 MB = 50 MB maximum
	// theoretical log size. Log data goes into metadata which is a JSONB field,
	// and JSONB fields have a maximum size of 255 MB, so any number larger than
	// 255 divided by maximum number of attempts may cause serious operational
	// problems.
	//
	// Defaults to 2 MB (which is per job attempt).
	MaxSizeBytes int
}

MiddlewareConfig is configuration for Middleware.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL