Documentation
¶
Overview ¶
Package workers provides River job workers (e.g. webhook delivery, feedback embedding).
Index ¶
Constants ¶
const WebhookDeliveryTimeoutBuffer = 5 * time.Second
WebhookDeliveryTimeoutBuffer is added to HTTP timeout for the River job timeout.
Variables ¶
This section is empty.
Functions ¶
func NewRiverWorkersAndQueues ¶
func NewRiverWorkersAndQueues( cfg *config.Config, deps RiverDeps, placeholderMaxWorkers int, ) (*river.Workers, map[string]river.QueueConfig)
NewRiverWorkersAndQueues builds River workers and queue config from cfg and deps. When deps.EmbeddingClient is nil, only webhook workers are registered and the embeddings queue is not added. When placeholderMaxWorkers > 0 (e.g. 1 for insert-only API), all queue MaxWorkers use it; otherwise use cfg.
Types ¶
type FeedbackEmbeddingWorker ¶
type FeedbackEmbeddingWorker struct {
river.WorkerDefaults[service.FeedbackEmbeddingArgs]
// contains filtered or unexported fields
}
FeedbackEmbeddingWorker generates and stores embeddings for feedback records.
func NewFeedbackEmbeddingWorker ¶
func NewFeedbackEmbeddingWorker( embeddingService feedbackEmbeddingService, embeddingClient service.EmbeddingClient, docPrefix string, metrics observability.EmbeddingMetrics, ) *FeedbackEmbeddingWorker
NewFeedbackEmbeddingWorker creates a worker that fetches the record, calls the embedding client, and stores the result. docPrefix is the prefix for document text. Can be empty for some providers. metrics may be nil when metrics are disabled.
func (*FeedbackEmbeddingWorker) Timeout ¶
func (w *FeedbackEmbeddingWorker) Timeout(*river.Job[service.FeedbackEmbeddingArgs]) time.Duration
Timeout limits how long a single embedding job can run.
func (*FeedbackEmbeddingWorker) Work ¶
func (w *FeedbackEmbeddingWorker) Work(ctx context.Context, job *river.Job[service.FeedbackEmbeddingArgs]) error
Work loads the record, generates or clears the embedding, and persists it.
type FeedbackTranslationWorker ¶
type FeedbackTranslationWorker struct {
river.WorkerDefaults[service.FeedbackTranslationArgs]
// contains filtered or unexported fields
}
FeedbackTranslationWorker translates a feedback record's value_text into the tenant's target language and stores it, mirroring the embedding worker's error handling.
func NewFeedbackTranslationWorker ¶
func NewFeedbackTranslationWorker( svc translationWorkerService, client service.TranslationClient, metrics observability.TranslationMetrics, ) *FeedbackTranslationWorker
NewFeedbackTranslationWorker creates a worker that fetches the record, translates its value_text, and stores the result. metrics may be nil when metrics are disabled.
func (*FeedbackTranslationWorker) Timeout ¶
func (w *FeedbackTranslationWorker) Timeout(*river.Job[service.FeedbackTranslationArgs]) time.Duration
Timeout limits how long a single translation job can run.
func (*FeedbackTranslationWorker) Work ¶
func (w *FeedbackTranslationWorker) Work(ctx context.Context, job *river.Job[service.FeedbackTranslationArgs]) error
Work loads the record, translates value_text into the target language (or copies it when the source already matches), and persists the result.
type RiverDeps ¶
type RiverDeps struct {
// Webhook worker
WebhooksRepo webhookDispatchRepo
WebhookSender service.WebhookSender
WebhookHTTPTimeout time.Duration
WebhookMetrics observability.WebhookMetrics
// Embedding worker (optional; if EmbeddingClient is nil, embedding worker is not registered)
EmbeddingService feedbackEmbeddingService
EmbeddingClient service.EmbeddingClient
EmbeddingDocPrefix string
EmbeddingMetrics observability.EmbeddingMetrics
// Translation worker (optional; if TranslationClient is nil, translation worker is not registered)
TranslationService translationWorkerService
TranslationClient service.TranslationClient
TranslationMetrics observability.TranslationMetrics
// Per-tenant translation backfill worker (registered alongside the translation worker).
TranslationBackfillService tenantTranslationBackfillService
TranslationMaxAttempts int
}
RiverDeps holds dependencies required to build River workers and queue config. When EmbeddingClient is nil, only webhook workers are registered.
type TenantTranslationBackfillWorker ¶
type TenantTranslationBackfillWorker struct {
river.WorkerDefaults[service.TenantTranslationBackfillArgs]
// contains filtered or unexported fields
}
TenantTranslationBackfillWorker fans out a per-tenant re-translation: it lists the tenant's stale text records and enqueues a FeedbackTranslationArgs job for each. It is enqueued when a tenant's translation settings change (see service.TenantTranslationBackfillArgs) and runs off the request path.
func NewTenantTranslationBackfillWorker ¶
func NewTenantTranslationBackfillWorker( svc tenantTranslationBackfillService, maxAttempts int, ) *TenantTranslationBackfillWorker
NewTenantTranslationBackfillWorker creates the worker. maxAttempts is applied to the per-record translation jobs it enqueues.
func (*TenantTranslationBackfillWorker) Timeout ¶
func (w *TenantTranslationBackfillWorker) Timeout(*river.Job[service.TenantTranslationBackfillArgs]) time.Duration
Timeout limits how long a single tenant backfill fan-out can run.
func (*TenantTranslationBackfillWorker) Work ¶
func (w *TenantTranslationBackfillWorker) Work( ctx context.Context, job *river.Job[service.TenantTranslationBackfillArgs], ) error
Work lists the tenant's stale records and enqueues per-record translation jobs onto the translations queue. The River client is obtained from the context (the only place River sets it) and handed to the service as the inserter, preserving the injected-inserter seam.
type WebhookDispatchWorker ¶
type WebhookDispatchWorker struct {
river.WorkerDefaults[service.WebhookDispatchArgs]
// contains filtered or unexported fields
}
WebhookDispatchWorker delivers one event to one webhook endpoint.
func NewWebhookDispatchWorker ¶
func NewWebhookDispatchWorker( repo webhookDispatchRepo, sender service.WebhookSender, httpTimeout time.Duration, metrics observability.WebhookMetrics, ) *WebhookDispatchWorker
NewWebhookDispatchWorker creates a worker that uses the given repo and sender. httpTimeout is the webhook HTTP client timeout; job timeout is httpTimeout + WebhookDeliveryTimeoutBuffer. metrics may be nil when metrics are disabled.
func (*WebhookDispatchWorker) Timeout ¶
func (w *WebhookDispatchWorker) Timeout(*river.Job[service.WebhookDispatchArgs]) time.Duration
Timeout limits how long a single delivery can run (HTTP timeout + buffer).
func (*WebhookDispatchWorker) Work ¶
func (w *WebhookDispatchWorker) Work(ctx context.Context, job *river.Job[service.WebhookDispatchArgs]) error
Work loads the webhook, builds the payload, and sends once.