workers

package
v0.0.0-...-0f4e36f
Warning

This package is not in the latest version of its module.

Published: Jun 26, 2026 | License: Apache-2.0 | Imports: 15 | Imported by: 0

Documentation

Overview

Package workers provides River job workers (e.g. webhook delivery, feedback embedding).

Constants

const WebhookDeliveryTimeoutBuffer = 5 * time.Second

WebhookDeliveryTimeoutBuffer is added to the webhook HTTP timeout to obtain the River job timeout.

Functions

func NewRiverWorkersAndQueues

func NewRiverWorkersAndQueues(
	cfg *config.Config, deps RiverDeps, placeholderMaxWorkers int,
) (*river.Workers, map[string]river.QueueConfig)

NewRiverWorkersAndQueues builds River workers and queue config from cfg and deps. When deps.EmbeddingClient is nil, the embedding worker is not registered and the embeddings queue is not added. When placeholderMaxWorkers > 0 (e.g. 1 for an insert-only API process), every queue's MaxWorkers is set to that value; otherwise the values from cfg are used.
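The queue-selection rules described above can be sketched with standard-library types only. This is an illustration under assumptions, not the package's implementation: `queueConfig` stands in for `river.QueueConfig`, and the queue names, `effectiveMaxWorkers`, and `buildQueues` are hypothetical.

```go
package main

import "fmt"

// queueConfig is a hypothetical stand-in for river.QueueConfig.
type queueConfig struct {
	MaxWorkers int
}

// effectiveMaxWorkers returns the placeholder when it is positive,
// otherwise the configured value.
func effectiveMaxWorkers(placeholder, configured int) int {
	if placeholder > 0 {
		return placeholder
	}
	return configured
}

// buildQueues always adds a webhooks queue and adds an embeddings queue
// only when an embedding client is available. Queue names are assumed.
func buildQueues(webhookWorkers, embeddingWorkers, placeholder int, hasEmbeddingClient bool) map[string]queueConfig {
	queues := map[string]queueConfig{
		"webhooks": {MaxWorkers: effectiveMaxWorkers(placeholder, webhookWorkers)},
	}
	if hasEmbeddingClient {
		queues["embeddings"] = queueConfig{
			MaxWorkers: effectiveMaxWorkers(placeholder, embeddingWorkers),
		}
	}
	return queues
}

func main() {
	// Insert-only process: placeholder of 1, no embedding client.
	q := buildQueues(10, 4, 1, false)
	fmt.Println(len(q), q["webhooks"].MaxWorkers)
}
```

A placeholder is useful for a process that only inserts jobs: the queue configuration stays valid without reserving real worker capacity.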

Types

type FeedbackEmbeddingWorker

type FeedbackEmbeddingWorker struct {
	river.WorkerDefaults[service.FeedbackEmbeddingArgs]
	// contains filtered or unexported fields
}

FeedbackEmbeddingWorker generates and stores embeddings for feedback records.

func NewFeedbackEmbeddingWorker

func NewFeedbackEmbeddingWorker(
	embeddingService feedbackEmbeddingService,
	embeddingClient service.EmbeddingClient,
	docPrefix string,
	metrics observability.EmbeddingMetrics,
) *FeedbackEmbeddingWorker

NewFeedbackEmbeddingWorker creates a worker that fetches the record, calls the embedding client, and stores the result. docPrefix is the prefix for the document text and may be empty for some providers. metrics may be nil when metrics are disabled.
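Because metrics may be nil, every call site that records a metric needs a guard. One common way to centralize that guard is sketched below. The `embeddingMetrics` interface, its `RecordJob` method, and the `worker` type are hypothetical and do not reflect the real `observability.EmbeddingMetrics` API.

```go
package main

import "fmt"

// embeddingMetrics is a hypothetical metrics interface.
type embeddingMetrics interface {
	RecordJob(outcome string)
}

// countingMetrics is a test double that counts recorded jobs.
type countingMetrics struct {
	count int
}

func (c *countingMetrics) RecordJob(string) { c.count++ }

// worker holds an optional metrics dependency.
type worker struct {
	metrics embeddingMetrics // nil when metrics are disabled
}

// record forwards to the metrics implementation only when one is set,
// so callers never have to check for nil themselves.
func (w *worker) record(outcome string) {
	if w.metrics == nil {
		return
	}
	w.metrics.RecordJob(outcome)
}

func main() {
	m := &countingMetrics{}
	(&worker{metrics: m}).record("success")
	(&worker{}).record("success") // metrics disabled: no-op, no panic
	fmt.Println(m.count)
}
```

Note that the guard only catches a nil interface value. A nil pointer of a concrete type stored in the interface would pass the check, so callers should pass a literal nil to disable metrics.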

func (*FeedbackEmbeddingWorker) Timeout

Timeout limits how long a single embedding job can run.

func (*FeedbackEmbeddingWorker) Work

Work loads the record, generates or clears the embedding, and persists it.

type FeedbackTranslationWorker

type FeedbackTranslationWorker struct {
	river.WorkerDefaults[service.FeedbackTranslationArgs]
	// contains filtered or unexported fields
}

FeedbackTranslationWorker translates a feedback record's value_text into the tenant's target language and stores it, mirroring the embedding worker's error handling.

func NewFeedbackTranslationWorker

func NewFeedbackTranslationWorker(
	svc translationWorkerService, client service.TranslationClient, metrics observability.TranslationMetrics,
) *FeedbackTranslationWorker

NewFeedbackTranslationWorker creates a worker that fetches the record, translates its value_text, and stores the result. metrics may be nil when metrics are disabled.

func (*FeedbackTranslationWorker) Timeout

Timeout limits how long a single translation job can run.

func (*FeedbackTranslationWorker) Work

Work loads the record, translates value_text into the target language (or copies it when the source already matches), and persists the result.

type RiverDeps

type RiverDeps struct {
	// Webhook worker
	WebhooksRepo       webhookDispatchRepo
	WebhookSender      service.WebhookSender
	WebhookHTTPTimeout time.Duration
	WebhookMetrics     observability.WebhookMetrics

	// Embedding worker (optional; if EmbeddingClient is nil, embedding worker is not registered)
	EmbeddingService   feedbackEmbeddingService
	EmbeddingClient    service.EmbeddingClient
	EmbeddingDocPrefix string
	EmbeddingMetrics   observability.EmbeddingMetrics

	// Translation worker (optional; if TranslationClient is nil, translation worker is not registered)
	TranslationService translationWorkerService
	TranslationClient  service.TranslationClient
	TranslationMetrics observability.TranslationMetrics
	// Per-tenant translation backfill worker (registered alongside the translation worker).
	TranslationBackfillService tenantTranslationBackfillService
	TranslationMaxAttempts     int
}

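RiverDeps holds the dependencies required to build River workers and queue config. The embedding and translation workers are optional: when EmbeddingClient is nil the embedding worker is not registered, and when TranslationClient is nil the translation worker is not registered.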

type TenantTranslationBackfillWorker

type TenantTranslationBackfillWorker struct {
	river.WorkerDefaults[service.TenantTranslationBackfillArgs]
	// contains filtered or unexported fields
}

TenantTranslationBackfillWorker fans out a per-tenant re-translation: it lists the tenant's stale text records and enqueues a FeedbackTranslationArgs job for each. It is enqueued when a tenant's translation settings change (see service.TenantTranslationBackfillArgs) and runs off the request path.

func NewTenantTranslationBackfillWorker

func NewTenantTranslationBackfillWorker(
	svc tenantTranslationBackfillService, maxAttempts int,
) *TenantTranslationBackfillWorker

NewTenantTranslationBackfillWorker creates the worker. maxAttempts is applied to the per-record translation jobs it enqueues.

func (*TenantTranslationBackfillWorker) Timeout

Timeout limits how long a single tenant backfill fan-out can run.

func (*TenantTranslationBackfillWorker) Work

Work lists the tenant's stale records and enqueues per-record translation jobs onto the translations queue. The River client is obtained from the context (the only place River sets it) and handed to the service as the inserter, preserving the injected-inserter seam.

type WebhookDispatchWorker

type WebhookDispatchWorker struct {
	river.WorkerDefaults[service.WebhookDispatchArgs]
	// contains filtered or unexported fields
}

WebhookDispatchWorker delivers one event to one webhook endpoint.

func NewWebhookDispatchWorker

func NewWebhookDispatchWorker(
	repo webhookDispatchRepo, sender service.WebhookSender, httpTimeout time.Duration,
	metrics observability.WebhookMetrics,
) *WebhookDispatchWorker

NewWebhookDispatchWorker creates a worker that uses the given repo and sender. httpTimeout is the webhook HTTP client timeout; the job timeout is httpTimeout + WebhookDeliveryTimeoutBuffer. metrics may be nil when metrics are disabled.

func (*WebhookDispatchWorker) Timeout

Timeout limits how long a single delivery can run (HTTP timeout + buffer).

func (*WebhookDispatchWorker) Work

Work loads the webhook, builds the payload, and sends once.
