rivertest

package
v0.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 13, 2023 License: LGPL-3.0 Imports: 11 Imported by: 0

Documentation

Overview

Package rivertest contains test assertions that can be used in a project's tests to verify that certain actions occurred from the main river package.

Example (RequireInserted)

Example_requireInserted demonstrates the use of the RequireInserted test assertion, which verifies that a single job was inserted.

package main

import (
	"context"
	"fmt"
	"log/slog"
	"testing"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/internal/riverinternaltest"
	"github.com/riverqueue/river/internal/util/slogutil"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
	"github.com/riverqueue/river/rivertest"
)

// RequiredArgs are the arguments of the job inserted by this example. The
// struct tag stores Message under the "message" key when encoded as JSON.
type RequiredArgs struct {
	Message string `json:"message"`
}

// Kind reports the kind name of jobs that carry RequiredArgs.
func (RequiredArgs) Kind() string {
	const kind = "required"
	return kind
}

// RequiredWorker is the worker this example registers for RequiredArgs jobs.
// It embeds river.WorkerDefaults and only supplies its own Work method.
type RequiredWorker struct {
	river.WorkerDefaults[RequiredArgs]
}

// Work does nothing and always returns nil; the example only needs a worker
// to register, not one that performs real work.
func (*RequiredWorker) Work(ctx context.Context, job *river.Job[RequiredArgs]) error {
	return nil
}

// main is the body of Example_requireInserted. It inserts one RequiredArgs
// job and then uses the RequireInserted test assertion to verify that a
// single job of that kind was inserted.
func main() {
	ctx := context.Background()

	pool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
	if err != nil {
		panic(err)
	}
	defer pool.Close()

	// Truncate the River tables first. This is needed only for this example
	// and is not necessary in real usage.
	if truncateErr := riverinternaltest.TruncateRiverTables(ctx, pool); truncateErr != nil {
		panic(truncateErr)
	}

	registry := river.NewWorkers()
	river.AddWorker(registry, &RequiredWorker{})

	driver := riverpgxv5.New(pool)
	config := &river.Config{
		Logger:  slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		Workers: registry,
	}
	client, err := river.NewClient(driver, config)
	if err != nil {
		panic(err)
	}

	if _, insertErr := client.Insert(ctx, RequiredArgs{Message: "Hello."}, nil); insertErr != nil {
		panic(insertErr)
	}

	// A freshly constructed testing.T stands in here; in an actual test, t is
	// the *testing.T passed in as the test function's argument.
	t := &testing.T{}

	inserted := rivertest.RequireInserted(ctx, t, pool, &RequiredArgs{}, nil)
	fmt.Printf("Test passed with message: %s\n", inserted.Args.Message)

	// Assert on the same job once more, this time also requiring that it was
	// inserted at the default priority and on the default queue.
	expected := &rivertest.RequireInsertedOpts{
		Priority: 1,
		Queue:    river.DefaultQueue,
	}
	_ = rivertest.RequireInserted(ctx, t, pool, &RequiredArgs{}, expected)
}
Output:

Test passed with message: Hello.
Example (RequireManyInserted)

Example_requireManyInserted demonstrates the use of the RequireManyInserted test assertion, which requires that multiple jobs of the specified kinds were inserted.

package main

import (
	"context"
	"fmt"
	"log/slog"
	"testing"

	"github.com/jackc/pgx/v5/pgxpool"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/internal/riverinternaltest"
	"github.com/riverqueue/river/internal/util/slogutil"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
	"github.com/riverqueue/river/rivertest"
)

// FirstRequiredArgs are the arguments of the first kind of job inserted by
// this example. The struct tag stores Message under the "message" key when
// encoded as JSON.
type FirstRequiredArgs struct {
	Message string `json:"message"`
}

// Kind reports the kind name of jobs that carry FirstRequiredArgs.
func (FirstRequiredArgs) Kind() string {
	const kind = "first_required"
	return kind
}

// FirstRequiredWorker is the worker this example registers for
// FirstRequiredArgs jobs. It embeds river.WorkerDefaults and only supplies
// its own Work method.
type FirstRequiredWorker struct {
	river.WorkerDefaults[FirstRequiredArgs]
}

// Work does nothing and always returns nil; the example only needs a worker
// to register, not one that performs real work.
func (*FirstRequiredWorker) Work(ctx context.Context, job *river.Job[FirstRequiredArgs]) error {
	return nil
}

// SecondRequiredArgs are the arguments of the second kind of job inserted by
// this example. The struct tag stores Message under the "message" key when
// encoded as JSON.
type SecondRequiredArgs struct {
	Message string `json:"message"`
}

// Kind reports the kind name of jobs that carry SecondRequiredArgs.
func (SecondRequiredArgs) Kind() string {
	const kind = "second_required"
	return kind
}

// SecondRequiredWorker is the worker this example registers for
// SecondRequiredArgs jobs. It embeds river.WorkerDefaults and only supplies
// its own Work method.
type SecondRequiredWorker struct {
	river.WorkerDefaults[SecondRequiredArgs]
}

// Work does nothing and always returns nil; the example only needs a worker
// to register, not one that performs real work.
func (*SecondRequiredWorker) Work(ctx context.Context, job *river.Job[SecondRequiredArgs]) error {
	return nil
}

// main is the body of Example_requireManyInserted. It inserts three jobs of
// two different kinds and then uses the RequireManyInserted test assertion,
// which requires that multiple jobs of the specified kinds were inserted.
func main() {
	ctx := context.Background()

	pool, err := pgxpool.NewWithConfig(ctx, riverinternaltest.DatabaseConfig("river_testdb_example"))
	if err != nil {
		panic(err)
	}
	defer pool.Close()

	// Truncate the River tables first. This is needed only for this example
	// and is not necessary in real usage.
	if truncateErr := riverinternaltest.TruncateRiverTables(ctx, pool); truncateErr != nil {
		panic(truncateErr)
	}

	registry := river.NewWorkers()
	river.AddWorker(registry, &FirstRequiredWorker{})
	river.AddWorker(registry, &SecondRequiredWorker{})

	driver := riverpgxv5.New(pool)
	config := &river.Config{
		Logger:  slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn}),
		Workers: registry,
	}
	client, err := river.NewClient(driver, config)
	if err != nil {
		panic(err)
	}

	// Insert the jobs one at a time, in the same order the assertion below
	// lists them.
	toInsert := []river.JobArgs{
		FirstRequiredArgs{Message: "Hello from first."},
		SecondRequiredArgs{Message: "Hello from second."},
		FirstRequiredArgs{Message: "Hello from first (again)."},
	}
	for _, args := range toInsert {
		if _, insertErr := client.Insert(ctx, args, nil); insertErr != nil {
			panic(insertErr)
		}
	}

	// A freshly constructed testing.T stands in here; in an actual test, t is
	// the *testing.T passed in as the test function's argument.
	t := &testing.T{}

	expectedAll := []rivertest.ExpectedJob{
		{Args: &FirstRequiredArgs{}},
		{Args: &SecondRequiredArgs{}},
		{Args: &FirstRequiredArgs{}},
	}
	inserted := rivertest.RequireManyInserted(ctx, t, pool, expectedAll)
	for idx := range inserted {
		fmt.Printf("Job %d args: %s\n", idx, string(inserted[idx].EncodedArgs))
	}

	// Assert once more, this time also requiring that the second job was
	// inserted at the default priority and on the default queue.
	secondOpts := &rivertest.RequireInsertedOpts{
		Priority: 1,
		Queue:    river.DefaultQueue,
	}
	_ = rivertest.RequireManyInserted(ctx, t, pool, []rivertest.ExpectedJob{
		{Args: &SecondRequiredArgs{}, Opts: secondOpts},
	})
}
Output:

Job 0 args: {"message": "Hello from first."}
Job 1 args: {"message": "Hello from second."}
Job 2 args: {"message": "Hello from first (again)."}

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func RequireInserted

func RequireInserted[T river.JobArgs](ctx context.Context, tb testing.TB, dbtx DBTX, expectedJob T, opts *RequireInsertedOpts) *river.Job[T]

RequireInserted is a test helper that verifies that a job of the given kind was inserted for work, failing the test if it wasn't. The dbtx argument can be any of a Pgx connection pool, connection, or transaction. If found, the inserted job is returned so that further assertions can be made against it.

func TestInsert(t *testing.T) {
	job := RequireInserted(ctx, t, poolOrConnOrTx, &Job1Args{}, nil)
	...

A RequireInsertedOpts struct can be provided as the last argument, and if it is, its properties (e.g. max attempts, priority, queue name) will act as required assertions in the inserted job row. UniqueOpts is ignored.

The assertion will fail if more than one job of the given kind was found because at that point the job to return is ambiguous. Use RequireManyInserted to cover that case instead.

func RequireManyInserted

func RequireManyInserted(ctx context.Context, tb testing.TB, dbtx DBTX, expectedJobs []ExpectedJob) []*river.JobRow

RequireManyInserted is a test helper that verifies that jobs of the given kinds were inserted for work, failing the test if they weren't, or were inserted in the wrong order. The dbtx argument can be any of a Pgx connection pool, connection, or transaction. If found, the inserted jobs are returned so that further assertions can be made against them.

func TestInsertMany(t *testing.T) {
	jobs := RequireManyInserted(ctx, t, poolOrConnOrTx, []ExpectedJob{
		{Args: &Job1Args{}},
	})
	...

A RequireInsertedOpts struct can be provided for each expected job, and if it is, its properties (e.g. max attempts, priority, queue name) will act as required assertions for the corresponding inserted job row. UniqueOpts is ignored.

The assertion expects jobs to have been inserted in exactly the order and number specified, and will fail if this expectation isn't met. So if a job of a certain kind is inserted multiple times, it must be expected multiple times.

Types

type DBTX

// DBTX is the database-like executor accepted by this package's assertions.
// Parameters are named the same way throughout for readability; the method
// set itself is unchanged.
type DBTX interface {
	CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error)
	Exec(ctx context.Context, sql string, args ...any) (pgconn.CommandTag, error)
	Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error)
	QueryRow(ctx context.Context, sql string, args ...any) pgx.Row
}

DBTX is a database-like executor which is implemented by all of pgxpool.Pool, pgx.Conn, and pgx.Tx. It's used to let this package's assertions be as flexible as possible in what database argument they can take.

type ExpectedJob

// ExpectedJob is a single job to expect, combining the job args with
// optional insertion options to assert against. It is the element type of
// the expectedJobs slice taken by RequireManyInserted.
type ExpectedJob struct {
	// Args are job arguments to expect.
	Args river.JobArgs

	// Opts are options for the specific required job including insertion
	// options to assert against. The package examples leave it nil when only
	// the job's kind is being asserted.
	Opts *RequireInsertedOpts
}

ExpectedJob is a single job to expect encapsulating job args and possible insertion options.

type RequireInsertedOpts

// RequireInsertedOpts are options for RequireInserted or RequireManyInserted.
// Each field is an expectation for a property of the inserted job; a field
// left as its zero value is not asserted on.
type RequireInsertedOpts struct {
	// MaxAttempts is the expected maximum number of total attempts for the
	// inserted job.
	//
	// No assertion is made if left as the zero value.
	MaxAttempts int

	// Priority is the expected priority for the inserted job.
	//
	// No assertion is made if left as the zero value.
	Priority int

	// Queue is the expected queue name of the inserted job.
	//
	// No assertion is made if left as the zero value.
	Queue string

	// ScheduledAt is the expected scheduled at time of the inserted job. Times
	// are truncated to the microsecond level for comparison to account for the
	// difference between Go storing times to nanoseconds and Postgres storing
	// only to microsecond precision.
	//
	// No assertion is made if left as the zero value.
	ScheduledAt time.Time

	// State is the expected state of the inserted job.
	//
	// No assertion is made if left as the zero value.
	State river.JobState

	// Tags are the expected tags of the inserted job.
	//
	// No assertion is made if left as the zero value.
	Tags []string
}

RequireInsertedOpts are options for RequireInserted or RequireManyInserted, including expectations for various queuing properties that stem from InsertOpts.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL