riverqueue

package
v0.2.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 5, 2026 License: MIT Imports: 20 Imported by: 0

Documentation

Overview

Package riverqueue implements attempt-scoped background signing jobs using River, a PostgreSQL-native durable queue.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AttemptFailpoints

type AttemptFailpoints map[string]bool

AttemptFailpoints contains non-production failure-injection toggles for live DoD verification. It is intentionally inert unless explicitly enabled through WorkerConfig, and riverqueue.New blocks it for production runtime config.

func (AttemptFailpoints) Enabled

func (f AttemptFailpoints) Enabled(name string) bool

type AttemptJobArgs

type AttemptJobArgs struct {
	AttemptID string `json:"attempt_id"`
}

AttemptJobArgs carries attempt-scoped signing work. River jobs are keyed by attempt and phase, so regeneration never deduplicates work across attempts.

type CleanupProviderAttemptArgs

type CleanupProviderAttemptArgs AttemptJobArgs

func (CleanupProviderAttemptArgs) InsertOpts

func (CleanupProviderAttemptArgs) Kind

type CleanupProviderAttemptWorker

type CleanupProviderAttemptWorker struct {
	river.WorkerDefaults[CleanupProviderAttemptArgs]
	// contains filtered or unexported fields
}

func (*CleanupProviderAttemptWorker) Timeout

func (*CleanupProviderAttemptWorker) Work

type Dependencies

type Dependencies struct {
	DocumentRepo      port.DocumentRepository
	RecipientRepo     port.DocumentRecipientRepository
	AttemptRepo       port.SigningAttemptRepository
	VersionRepo       port.TemplateVersionRepository
	SignerRoleRepo    port.TemplateVersionSignerRoleRepository
	FieldResponseRepo port.DocumentFieldResponseRepository
	PDFRenderer       port.PDFRenderer
	SigningProvider   port.SigningProvider
	StorageAdapter    port.StorageAdapter
	StorageEnabled    bool
	CompletionHandler port.DocumentCompletedHandler
}

type DispatchAttemptCompletionArgs

type DispatchAttemptCompletionArgs AttemptJobArgs

func (DispatchAttemptCompletionArgs) InsertOpts

func (DispatchAttemptCompletionArgs) Kind

type DispatchAttemptCompletionWorker

type DispatchAttemptCompletionWorker struct {
	river.WorkerDefaults[DispatchAttemptCompletionArgs]
	// contains filtered or unexported fields
}

func (*DispatchAttemptCompletionWorker) Timeout

func (*DispatchAttemptCompletionWorker) Work

type ReconcileProviderSubmissionArgs

type ReconcileProviderSubmissionArgs AttemptJobArgs

func (ReconcileProviderSubmissionArgs) InsertOpts

func (ReconcileProviderSubmissionArgs) Kind

type ReconcileProviderSubmissionWorker

type ReconcileProviderSubmissionWorker struct {
	river.WorkerDefaults[ReconcileProviderSubmissionArgs]
	// contains filtered or unexported fields
}

func (*ReconcileProviderSubmissionWorker) Timeout

func (*ReconcileProviderSubmissionWorker) Work

type RefreshAttemptProviderStatusArgs

type RefreshAttemptProviderStatusArgs AttemptJobArgs

func (RefreshAttemptProviderStatusArgs) InsertOpts

func (RefreshAttemptProviderStatusArgs) Kind

type RefreshAttemptProviderStatusWorker

type RefreshAttemptProviderStatusWorker struct {
	river.WorkerDefaults[RefreshAttemptProviderStatusArgs]
	// contains filtered or unexported fields
}

func (*RefreshAttemptProviderStatusWorker) Timeout

func (*RefreshAttemptProviderStatusWorker) Work

type RenderAttemptPDFArgs

type RenderAttemptPDFArgs AttemptJobArgs

func (RenderAttemptPDFArgs) InsertOpts

func (a RenderAttemptPDFArgs) InsertOpts() river.InsertOpts

func (RenderAttemptPDFArgs) Kind

type RenderAttemptPDFWorker

type RenderAttemptPDFWorker struct {
	river.WorkerDefaults[RenderAttemptPDFArgs]
	// contains filtered or unexported fields
}

func (*RenderAttemptPDFWorker) Timeout

func (*RenderAttemptPDFWorker) Work

type RiverService

type RiverService struct {
	// contains filtered or unexported fields
}

RiverService manages the River client lifecycle and exposes the signing attempt unit of work (UoW).

func New

New creates a RiverService. When cfg.Enabled is false, the client operates in insert-only mode: jobs can be transactionally enqueued but not processed.

func (*RiverService) SigningExecutionUOW

func (r *RiverService) SigningExecutionUOW() port.SigningExecutionUnitOfWork

func (*RiverService) Start

func (r *RiverService) Start(ctx context.Context) error

func (*RiverService) Stop

func (r *RiverService) Stop(ctx context.Context) error

type SigningAttemptExecutor

type SigningAttemptExecutor struct {
	// contains filtered or unexported fields
}

SigningAttemptExecutor contains the attempt-aware worker implementation.

func (*SigningAttemptExecutor) CleanupProviderAttempt

func (e *SigningAttemptExecutor) CleanupProviderAttempt(ctx context.Context, attemptID string) error

func (*SigningAttemptExecutor) DispatchAttemptCompletion

func (e *SigningAttemptExecutor) DispatchAttemptCompletion(ctx context.Context, attemptID string) error

func (*SigningAttemptExecutor) ReconcileProviderSubmission

func (e *SigningAttemptExecutor) ReconcileProviderSubmission(ctx context.Context, attemptID string) error

func (*SigningAttemptExecutor) RefreshAttemptProviderStatus

func (e *SigningAttemptExecutor) RefreshAttemptProviderStatus(ctx context.Context, attemptID string) error

func (*SigningAttemptExecutor) RenderAttemptPDF

func (e *SigningAttemptExecutor) RenderAttemptPDF(ctx context.Context, attemptID string) error

func (*SigningAttemptExecutor) SubmitAttemptToProvider

func (e *SigningAttemptExecutor) SubmitAttemptToProvider(ctx context.Context, attemptID string) error

type SigningAttemptExecutorConfig

type SigningAttemptExecutorConfig struct {
	Pool              *pgxpool.Pool
	Client            *river.Client[pgx.Tx]
	DocumentRepo      port.DocumentRepository
	RecipientRepo     port.DocumentRecipientRepository
	AttemptRepo       port.SigningAttemptRepository
	VersionRepo       port.TemplateVersionRepository
	SignerRoleRepo    port.TemplateVersionSignerRoleRepository
	FieldResponseRepo port.DocumentFieldResponseRepository
	PDFRenderer       port.PDFRenderer
	SigningProvider   port.SigningProvider
	StorageAdapter    port.StorageAdapter
	StorageEnabled    bool
	CompletionHandler port.DocumentCompletedHandler
	Failpoints        AttemptFailpoints
}

type SigningExecutionUnitOfWork

type SigningExecutionUnitOfWork struct {
	// contains filtered or unexported fields
}

SigningExecutionUnitOfWork persists signing attempt transitions and River jobs in the same PostgreSQL transaction.

func NewSigningExecutionUnitOfWork

func NewSigningExecutionUnitOfWork(
	pool *pgxpool.Pool,
	client *river.Client[pgx.Tx],
	attemptRepo port.SigningAttemptRepository,
) *SigningExecutionUnitOfWork

func (*SigningExecutionUnitOfWork) CreateAttemptAndEnqueueRender

func (u *SigningExecutionUnitOfWork) CreateAttemptAndEnqueueRender(
	ctx context.Context,
	documentID string,
	recipients []*entity.DocumentRecipient,
	signerOrders map[string]int,
) (*entity.SigningAttempt, error)

func (*SigningExecutionUnitOfWork) SupersedeActiveAndCreateAttempt

func (u *SigningExecutionUnitOfWork) SupersedeActiveAndCreateAttempt(
	ctx context.Context,
	documentID, expectedOldAttemptID, reason string,
	recipients []*entity.DocumentRecipient,
	signerOrders map[string]int,
) (*entity.SigningAttempt, error)

func (*SigningExecutionUnitOfWork) TerminateActiveAttempt

func (u *SigningExecutionUnitOfWork) TerminateActiveAttempt(
	ctx context.Context,
	attempt *entity.SigningAttempt,
	status entity.SigningAttemptStatus,
	reason, eventType string,
) error

func (*SigningExecutionUnitOfWork) Transition

func (u *SigningExecutionUnitOfWork) Transition(ctx context.Context, attempt *entity.SigningAttempt, eventType string) error

func (*SigningExecutionUnitOfWork) TransitionAndEnqueue

func (u *SigningExecutionUnitOfWork) TransitionAndEnqueue(ctx context.Context, attempt *entity.SigningAttempt, nextPhase port.SigningJobPhase, eventType string) error

type SubmitAttemptToProviderArgs

type SubmitAttemptToProviderArgs AttemptJobArgs

func (SubmitAttemptToProviderArgs) InsertOpts

func (SubmitAttemptToProviderArgs) Kind

type SubmitAttemptToProviderWorker

type SubmitAttemptToProviderWorker struct {
	river.WorkerDefaults[SubmitAttemptToProviderArgs]
	// contains filtered or unexported fields
}

func (*SubmitAttemptToProviderWorker) Timeout

func (*SubmitAttemptToProviderWorker) Work

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL