otelriver

package module
v0.6.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 21, 2025 License: MPL-2.0 Imports: 11 Imported by: 1

README

otelriver Build Status Go Reference

OpenTelemetry utilities for the River job queue.

See example_middleware_test.go for usage details.

Options

The middleware supports these options:

middleware := otelriver.NewMiddleware(&MiddlewareConfig{
    DurationUnit:                "ms",
    EnableSemanticMetrics:       true,
    EnableWorkSpanJobKindSuffix: true,
    MeterProvider:               meterProvider,
    TracerProvider:              tracerProvider,
})
  • DurationUnit: The unit which durations are emitted as, either "ms" (milliseconds) or "s" (seconds). Defaults to seconds.
  • EnableSemanticMetrics: Causes the middleware to emit metrics compliant with OpenTelemetry's "semantic conventions" for message clients. This has the effect of having all messaging systems share the same common metric names, with attributes differentiating them.
  • EnableWorkSpanJobKindSuffix: Appends the job kind a suffix to work spans so they look like river.work/my_job instead of river.work.
  • MeterProvider: Injected OpenTelemetry meter provider. The global meter provider is used by default.
  • TracerProvider: Injected OpenTelemetry tracer provider. The global tracer provider is used by default.

Use with DataDog

See using the OpenTelemetry API with DataDog and the examples in datadogriver for how to configure a DataDog OpenTelemetry tracer provider.

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Middleware

type Middleware struct {
	river.MiddlewareDefaults
	// contains filtered or unexported fields
}

Middleware is a River middleware that emits OpenTelemetry metrics when jobs are inserted or worked.

Example
package main

import (
	"log/slog"
	"os"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
	"github.com/riverqueue/river/rivershared/util/slogutil"
	"github.com/riverqueue/river/rivertype"
	"github.com/riverqueue/rivercontrib/otelriver"
)

func main() {
	_, err := river.NewClient(riverpgxv5.New(nil), &river.Config{
		Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelWarn, ReplaceAttr: slogutil.NoLevelTime})),
		Middleware: []rivertype.Middleware{
			// Install the OpenTelemetry middleware to run for all jobs inserted
			// or worked by this River client.
			otelriver.NewMiddleware(nil),
		},
		TestOnly: true, // suitable only for use in tests; remove for live environments
	})
	if err != nil {
		panic(err)
	}

}

func NewMiddleware

func NewMiddleware(config *MiddlewareConfig) *Middleware

NewMiddleware initializes a new River OpenTelemetry middleware.

config may be nil.

func (*Middleware) InsertMany

func (m *Middleware) InsertMany(ctx context.Context, manyParams []*rivertype.JobInsertParams, doInner func(ctx context.Context) ([]*rivertype.JobInsertResult, error)) ([]*rivertype.JobInsertResult, error)

func (*Middleware) Work

func (m *Middleware) Work(ctx context.Context, job *rivertype.JobRow, doInner func(context.Context) error) error

type MiddlewareConfig

type MiddlewareConfig struct {
	// DurationUnit selects the unit in which duration metrics like
	// `river.work_duration` are emitted.
	//
	// Must be one of "ms" (milliseconds) or "s" (seconds). Defaults to seconds.
	//
	// Does not modify metrics emitted by EnableSemanticMetrics because those
	// are constrained to seconds by specification.
	DurationUnit string

	// EnableSemanticMetrics emits metrics compliant with OpenTelemetry's
	// "semantic conventions" for messaging clients:
	//
	// https://opentelemetry.io/docs/specs/semconv/messaging/messaging-metrics/
	//
	// This has the effect of having all messaging systems share the same common
	// metric names, with attributes differentiating them.
	EnableSemanticMetrics bool

	// EnableWorkSpanJobKindSuffix appends the job kind a suffix to work spans
	// so they look like `river.work/my_job` instead of `river.work`.
	EnableWorkSpanJobKindSuffix bool

	// MeterProvider is a MeterProvider to base metrics on. May be left as nil
	// to use the default global provider.
	MeterProvider metric.MeterProvider

	// TracerProvider is a TracerProvider to base traces on. May be left as nil
	// to use the default global provider.
	TracerProvider trace.TracerProvider
}

MiddlewareConfig is configuration for River's OpenTelemetry middleware.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL