riverui

package module
v0.11.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 5, 2025 License: MPL-2.0 Imports: 34 Imported by: 11

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var FrontendIndex embed.FS

Functions

func NewNotFoundJob added in v0.9.0

func NewNotFoundJob(jobID int64) *apierror.NotFound

func NewNotFoundQueue added in v0.9.0

func NewNotFoundQueue(name string) *apierror.NotFound

func NewNotFoundWorkflow added in v0.9.0

func NewNotFoundWorkflow(id string) *apierror.NotFound

func NormalizePathPrefix

func NormalizePathPrefix(prefix string) string

Types

type ConcurrencyConfig added in v0.9.0

type ConcurrencyConfig struct {
	GlobalLimit int32           `json:"global_limit"`
	LocalLimit  int32           `json:"local_limit"`
	Partition   PartitionConfig `json:"partition"`
}

type DB

type DB interface {
	Begin(ctx context.Context) (pgx.Tx, error)
	Exec(ctx context.Context, query string, args ...interface{}) (pgconn.CommandTag, error)
	Query(ctx context.Context, query string, args ...interface{}) (pgx.Rows, error)
	QueryRow(ctx context.Context, query string, args ...interface{}) pgx.Row
}

DB is the interface for a pgx database connection.

type PartitionConfig added in v0.9.0

type PartitionConfig struct {
	ByArgs []string `json:"by_args"`
	ByKind bool     `json:"by_kind"`
}

type RiverJob

type RiverJob struct {
	RiverJobMinimal
	Errors   []rivertype.AttemptError `json:"errors"`
	Metadata json.RawMessage          `json:"metadata"`
}

type RiverJobMinimal added in v0.10.0

type RiverJobMinimal struct {
	ID          int64           `json:"id"`
	Args        json.RawMessage `json:"args"`
	Attempt     int             `json:"attempt"`
	AttemptedAt *time.Time      `json:"attempted_at"`
	AttemptedBy []string        `json:"attempted_by"`
	CreatedAt   time.Time       `json:"created_at"`
	FinalizedAt *time.Time      `json:"finalized_at"`
	Kind        string          `json:"kind"`
	MaxAttempts int             `json:"max_attempts"`
	Priority    int             `json:"priority"`
	Queue       string          `json:"queue"`
	ScheduledAt time.Time       `json:"scheduled_at"`
	State       string          `json:"state"`
	Tags        []string        `json:"tags"`
}

type RiverProducer added in v0.9.0

type RiverProducer struct {
	ID          int64              `json:"id"`
	ClientID    string             `json:"client_id"`
	Concurrency *ConcurrencyConfig `json:"concurrency"`
	CreatedAt   time.Time          `json:"created_at"`
	MaxWorkers  int                `json:"max_workers"`
	PausedAt    *time.Time         `json:"paused_at"`
	QueueName   string             `json:"queue_name"`
	Running     int32              `json:"running"`
	UpdatedAt   time.Time          `json:"updated_at"`
}

type RiverQueue

type RiverQueue struct {
	CountAvailable int                `json:"count_available"`
	CountRunning   int                `json:"count_running"`
	CreatedAt      time.Time          `json:"created_at"`
	Concurrency    *ConcurrencyConfig `json:"concurrency"`
	Name           string             `json:"name"`
	PausedAt       *time.Time         `json:"paused_at"`
	UpdatedAt      time.Time          `json:"updated_at"`
}

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is an HTTP server that serves the River UI and API. It must be started with Start to initialize caching and background query functionality prior to serving requests. Server implements http.Handler, so it can be directly mounted in an http.ServeMux.

func NewServer

func NewServer(opts *ServerOpts) (*Server, error)

NewServer creates a new Server that serves the River UI and API.

Example

ExampleNewServer demonstrates how to create a River UI server, embed it in an HTTP server, and make requests to its API endpoints.

package main

import (
	"context"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"net/http/httptest"
	"os"
	"strings"

	"github.com/jackc/pgx/v5/pgxpool"
	"riverqueue.com/riverui"

	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
	"github.com/riverqueue/river/rivershared/util/slogutil"
)

func main() {
	ctx := context.Background()

	// Create a PostgreSQL connection pool. In a real application, you'd use your
	// own connection string or pool config.
	connConfig, err := pgxpool.ParseConfig(os.Getenv("TEST_DATABASE_URL"))
	if err != nil {
		panic(err)
	}

	dbPool, err := pgxpool.NewWithConfig(ctx, connConfig)
	if err != nil {
		panic(err)
	}
	defer dbPool.Close()

	// Create a River client with a message-only logger for reproducible output.
	// You can use any slog.Handler implementation in your application.
	logger := slog.New(&slogutil.SlogMessageOnlyHandler{Level: slog.LevelWarn})

	// Create a River client. We don't need to start the client since we're only
	// using it to demonstrate the UI.
	client, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Logger: logger,
	})
	if err != nil {
		panic(err)
	}

	// Create the River UI server. This server implements http.Handler and can be
	// mounted in an HTTP mux
	server, err := riverui.NewServer(&riverui.ServerOpts{
		Client:  client,
		DevMode: true, // Use the live filesystem—don't use this outside tests
		DB:      dbPool,
		Logger:  logger,
		Prefix:  "/riverui", // Mount the UI under /riverui path
	})
	if err != nil {
		panic(err)
	}

	// Start the server to initialize background processes
	// This does not start an HTTP server
	if err := server.Start(ctx); err != nil {
		panic(err)
	}

	// Create an HTTP mux and mount the River UI:
	mux := http.NewServeMux()
	mux.Handle("/riverui/", server)

	// For this example, we use a test server to demonstrate API calls. In a
	// production environment, you would use http.ListenAndServe instead.
	testServer := httptest.NewServer(mux)
	defer testServer.Close()

	// Make a request to the jobs endpoint to demonstrate API usage:
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, testServer.URL+"/riverui/api/jobs?state=available", nil)
	if err != nil {
		panic(err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	fmt.Printf("Status: %s\n", resp.Status)

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Printf("Response: %s\n", strings.TrimSpace(string(body)))

}
Output:

Status: 200 OK
Response: {"data":[]}

func (*Server) ServeHTTP

func (s *Server) ServeHTTP(rw http.ResponseWriter, req *http.Request)

ServeHTTP returns an http.ServeHTTP that can be mounted to serve HTTP requests.

func (*Server) Start

func (s *Server) Start(ctx context.Context) error

Start starts the server's background services. Notably, this does _not_ cause the server to start listening for HTTP in any way. To serve HTTP requests, the Server implements `http.Handler` via a `ServeHTTP` method and can be mounted in an existing `http.ServeMux`.

type ServerOpts

type ServerOpts struct {
	// Client is the River client to use for API requests.
	Client *river.Client[pgx.Tx]
	// DB is the database to use for API requests.
	DB DB
	// DevMode is whether the server is running in development mode.
	DevMode bool
	// JobListHideArgsByDefault is whether to hide job arguments by default in the
	// job list view. This is useful for users with complex encoded arguments.
	JobListHideArgsByDefault bool
	// LiveFS is whether to use the live filesystem for the frontend.
	LiveFS bool
	// Logger is the logger to use logging errors within the handler.
	Logger *slog.Logger
	// Prefix is the path prefix to use for the API and UI HTTP requests.
	Prefix string
}

ServerOpts are the options for creating a new Server.

Source Files

  • embed.go
  • handler.go
  • handler_api_endpoint.go
  • int64_string.go
  • spa_response_writer.go

Directories

Path Synopsis
cmd
riverui command
internal
riverproui module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL