Documentation
¶
Overview ¶
Package riverqueue implements attempt-scoped background signing jobs using River, a PostgreSQL-native durable queue.
Index ¶
- type AttemptFailpoints
- type AttemptJobArgs
- type CleanupProviderAttemptArgs
- type CleanupProviderAttemptWorker
- type Dependencies
- type DispatchAttemptCompletionArgs
- type DispatchAttemptCompletionWorker
- type ReconcileProviderSubmissionArgs
- type ReconcileProviderSubmissionWorker
- type RefreshAttemptProviderStatusArgs
- type RefreshAttemptProviderStatusWorker
- type RenderAttemptPDFArgs
- type RenderAttemptPDFWorker
- type RiverService
- type SigningAttemptExecutor
- func (e *SigningAttemptExecutor) CleanupProviderAttempt(ctx context.Context, attemptID string) error
- func (e *SigningAttemptExecutor) DispatchAttemptCompletion(ctx context.Context, attemptID string) error
- func (e *SigningAttemptExecutor) ReconcileProviderSubmission(ctx context.Context, attemptID string) error
- func (e *SigningAttemptExecutor) RefreshAttemptProviderStatus(ctx context.Context, attemptID string) error
- func (e *SigningAttemptExecutor) RenderAttemptPDF(ctx context.Context, attemptID string) error
- func (e *SigningAttemptExecutor) SubmitAttemptToProvider(ctx context.Context, attemptID string) error
- type SigningAttemptExecutorConfig
- type SigningExecutionUnitOfWork
- func (u *SigningExecutionUnitOfWork) CreateAttemptAndEnqueueRender(ctx context.Context, documentID string, recipients []*entity.DocumentRecipient, ...) (*entity.SigningAttempt, error)
- func (u *SigningExecutionUnitOfWork) SupersedeActiveAndCreateAttempt(ctx context.Context, documentID, expectedOldAttemptID, reason string, ...) (*entity.SigningAttempt, error)
- func (u *SigningExecutionUnitOfWork) TerminateActiveAttempt(ctx context.Context, attempt *entity.SigningAttempt, ...) error
- func (u *SigningExecutionUnitOfWork) Transition(ctx context.Context, attempt *entity.SigningAttempt, eventType string) error
- func (u *SigningExecutionUnitOfWork) TransitionAndEnqueue(ctx context.Context, attempt *entity.SigningAttempt, ...) error
- type SubmitAttemptToProviderArgs
- type SubmitAttemptToProviderWorker
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AttemptFailpoints ¶
AttemptFailpoints contains non-production failure-injection toggles for live DoD verification. It is intentionally inert unless explicitly enabled by WorkerConfig, and it is blocked for production runtime config in riverqueue.New.
func (AttemptFailpoints) Enabled ¶
func (f AttemptFailpoints) Enabled(name string) bool
type AttemptJobArgs ¶
type AttemptJobArgs struct {
AttemptID string `json:"attempt_id"`
}
AttemptJobArgs carries attempt-scoped signing work. River jobs are keyed by attempt and phase, so regeneration never deduplicates work across attempts.
type CleanupProviderAttemptArgs ¶
type CleanupProviderAttemptArgs AttemptJobArgs
func (CleanupProviderAttemptArgs) InsertOpts ¶
func (a CleanupProviderAttemptArgs) InsertOpts() river.InsertOpts
func (CleanupProviderAttemptArgs) Kind ¶
func (CleanupProviderAttemptArgs) Kind() string
type CleanupProviderAttemptWorker ¶
type CleanupProviderAttemptWorker struct {
river.WorkerDefaults[CleanupProviderAttemptArgs]
// contains filtered or unexported fields
}
func (*CleanupProviderAttemptWorker) Timeout ¶
func (w *CleanupProviderAttemptWorker) Timeout(_ *river.Job[CleanupProviderAttemptArgs]) time.Duration
func (*CleanupProviderAttemptWorker) Work ¶
func (w *CleanupProviderAttemptWorker) Work(ctx context.Context, job *river.Job[CleanupProviderAttemptArgs]) error
type Dependencies ¶
type Dependencies struct {
DocumentRepo port.DocumentRepository
RecipientRepo port.DocumentRecipientRepository
AttemptRepo port.SigningAttemptRepository
VersionRepo port.TemplateVersionRepository
SignerRoleRepo port.TemplateVersionSignerRoleRepository
FieldResponseRepo port.DocumentFieldResponseRepository
PDFRenderer port.PDFRenderer
SigningProvider port.SigningProvider
StorageAdapter port.StorageAdapter
StorageEnabled bool
CompletionHandler port.DocumentCompletedHandler
}
type DispatchAttemptCompletionArgs ¶
type DispatchAttemptCompletionArgs AttemptJobArgs
func (DispatchAttemptCompletionArgs) InsertOpts ¶
func (a DispatchAttemptCompletionArgs) InsertOpts() river.InsertOpts
func (DispatchAttemptCompletionArgs) Kind ¶
func (DispatchAttemptCompletionArgs) Kind() string
type DispatchAttemptCompletionWorker ¶
type DispatchAttemptCompletionWorker struct {
river.WorkerDefaults[DispatchAttemptCompletionArgs]
// contains filtered or unexported fields
}
func (*DispatchAttemptCompletionWorker) Timeout ¶
func (w *DispatchAttemptCompletionWorker) Timeout(_ *river.Job[DispatchAttemptCompletionArgs]) time.Duration
func (*DispatchAttemptCompletionWorker) Work ¶
func (w *DispatchAttemptCompletionWorker) Work(ctx context.Context, job *river.Job[DispatchAttemptCompletionArgs]) error
type ReconcileProviderSubmissionArgs ¶
type ReconcileProviderSubmissionArgs AttemptJobArgs
func (ReconcileProviderSubmissionArgs) InsertOpts ¶
func (a ReconcileProviderSubmissionArgs) InsertOpts() river.InsertOpts
func (ReconcileProviderSubmissionArgs) Kind ¶
func (ReconcileProviderSubmissionArgs) Kind() string
type ReconcileProviderSubmissionWorker ¶
type ReconcileProviderSubmissionWorker struct {
river.WorkerDefaults[ReconcileProviderSubmissionArgs]
// contains filtered or unexported fields
}
func (*ReconcileProviderSubmissionWorker) Timeout ¶
func (w *ReconcileProviderSubmissionWorker) Timeout(_ *river.Job[ReconcileProviderSubmissionArgs]) time.Duration
func (*ReconcileProviderSubmissionWorker) Work ¶
func (w *ReconcileProviderSubmissionWorker) Work(ctx context.Context, job *river.Job[ReconcileProviderSubmissionArgs]) error
type RefreshAttemptProviderStatusArgs ¶
type RefreshAttemptProviderStatusArgs AttemptJobArgs
func (RefreshAttemptProviderStatusArgs) InsertOpts ¶
func (a RefreshAttemptProviderStatusArgs) InsertOpts() river.InsertOpts
func (RefreshAttemptProviderStatusArgs) Kind ¶
func (RefreshAttemptProviderStatusArgs) Kind() string
type RefreshAttemptProviderStatusWorker ¶
type RefreshAttemptProviderStatusWorker struct {
river.WorkerDefaults[RefreshAttemptProviderStatusArgs]
// contains filtered or unexported fields
}
func (*RefreshAttemptProviderStatusWorker) Timeout ¶
func (w *RefreshAttemptProviderStatusWorker) Timeout(_ *river.Job[RefreshAttemptProviderStatusArgs]) time.Duration
func (*RefreshAttemptProviderStatusWorker) Work ¶
func (w *RefreshAttemptProviderStatusWorker) Work(ctx context.Context, job *river.Job[RefreshAttemptProviderStatusArgs]) error
type RenderAttemptPDFArgs ¶
type RenderAttemptPDFArgs AttemptJobArgs
func (RenderAttemptPDFArgs) InsertOpts ¶
func (a RenderAttemptPDFArgs) InsertOpts() river.InsertOpts
func (RenderAttemptPDFArgs) Kind ¶
func (RenderAttemptPDFArgs) Kind() string
type RenderAttemptPDFWorker ¶
type RenderAttemptPDFWorker struct {
river.WorkerDefaults[RenderAttemptPDFArgs]
// contains filtered or unexported fields
}
func (*RenderAttemptPDFWorker) Timeout ¶
func (w *RenderAttemptPDFWorker) Timeout(_ *river.Job[RenderAttemptPDFArgs]) time.Duration
func (*RenderAttemptPDFWorker) Work ¶
func (w *RenderAttemptPDFWorker) Work(ctx context.Context, job *river.Job[RenderAttemptPDFArgs]) error
type RiverService ¶
type RiverService struct {
// contains filtered or unexported fields
}
RiverService manages the River client lifecycle and exposes the signing attempt unit of work (UoW).
func New ¶
func New(ctx context.Context, pool *pgxpool.Pool, cfg config.WorkerConfig, deps Dependencies) (*RiverService, error)
New creates a RiverService. When cfg.Enabled is false, the client operates in insert-only mode: jobs can be transactionally enqueued, but not processed.
func (*RiverService) SigningExecutionUOW ¶
func (r *RiverService) SigningExecutionUOW() port.SigningExecutionUnitOfWork
type SigningAttemptExecutor ¶
type SigningAttemptExecutor struct {
// contains filtered or unexported fields
}
SigningAttemptExecutor contains the attempt-aware worker implementation.
func NewSigningAttemptExecutor ¶
func NewSigningAttemptExecutor(cfg SigningAttemptExecutorConfig) *SigningAttemptExecutor
func (*SigningAttemptExecutor) CleanupProviderAttempt ¶
func (e *SigningAttemptExecutor) CleanupProviderAttempt(ctx context.Context, attemptID string) error
func (*SigningAttemptExecutor) DispatchAttemptCompletion ¶
func (e *SigningAttemptExecutor) DispatchAttemptCompletion(ctx context.Context, attemptID string) error
func (*SigningAttemptExecutor) ReconcileProviderSubmission ¶
func (e *SigningAttemptExecutor) ReconcileProviderSubmission(ctx context.Context, attemptID string) error
func (*SigningAttemptExecutor) RefreshAttemptProviderStatus ¶
func (e *SigningAttemptExecutor) RefreshAttemptProviderStatus(ctx context.Context, attemptID string) error
func (*SigningAttemptExecutor) RenderAttemptPDF ¶
func (e *SigningAttemptExecutor) RenderAttemptPDF(ctx context.Context, attemptID string) error
func (*SigningAttemptExecutor) SubmitAttemptToProvider ¶
func (e *SigningAttemptExecutor) SubmitAttemptToProvider(ctx context.Context, attemptID string) error
type SigningAttemptExecutorConfig ¶
type SigningAttemptExecutorConfig struct {
Pool *pgxpool.Pool
Client *river.Client[pgx.Tx]
DocumentRepo port.DocumentRepository
RecipientRepo port.DocumentRecipientRepository
AttemptRepo port.SigningAttemptRepository
VersionRepo port.TemplateVersionRepository
SignerRoleRepo port.TemplateVersionSignerRoleRepository
FieldResponseRepo port.DocumentFieldResponseRepository
PDFRenderer port.PDFRenderer
SigningProvider port.SigningProvider
StorageAdapter port.StorageAdapter
StorageEnabled bool
CompletionHandler port.DocumentCompletedHandler
Failpoints AttemptFailpoints
}
type SigningExecutionUnitOfWork ¶
type SigningExecutionUnitOfWork struct {
// contains filtered or unexported fields
}
SigningExecutionUnitOfWork persists signing attempt transitions and River jobs in the same PostgreSQL transaction.
func NewSigningExecutionUnitOfWork ¶
func NewSigningExecutionUnitOfWork( pool *pgxpool.Pool, client *river.Client[pgx.Tx], attemptRepo port.SigningAttemptRepository, ) *SigningExecutionUnitOfWork
func (*SigningExecutionUnitOfWork) CreateAttemptAndEnqueueRender ¶
func (u *SigningExecutionUnitOfWork) CreateAttemptAndEnqueueRender( ctx context.Context, documentID string, recipients []*entity.DocumentRecipient, signerOrders map[string]int, ) (*entity.SigningAttempt, error)
func (*SigningExecutionUnitOfWork) SupersedeActiveAndCreateAttempt ¶
func (u *SigningExecutionUnitOfWork) SupersedeActiveAndCreateAttempt( ctx context.Context, documentID, expectedOldAttemptID, reason string, recipients []*entity.DocumentRecipient, signerOrders map[string]int, ) (*entity.SigningAttempt, error)
func (*SigningExecutionUnitOfWork) TerminateActiveAttempt ¶
func (u *SigningExecutionUnitOfWork) TerminateActiveAttempt( ctx context.Context, attempt *entity.SigningAttempt, status entity.SigningAttemptStatus, reason, eventType string, ) error
func (*SigningExecutionUnitOfWork) Transition ¶
func (u *SigningExecutionUnitOfWork) Transition(ctx context.Context, attempt *entity.SigningAttempt, eventType string) error
func (*SigningExecutionUnitOfWork) TransitionAndEnqueue ¶
func (u *SigningExecutionUnitOfWork) TransitionAndEnqueue(ctx context.Context, attempt *entity.SigningAttempt, nextPhase port.SigningJobPhase, eventType string) error
type SubmitAttemptToProviderArgs ¶
type SubmitAttemptToProviderArgs AttemptJobArgs
func (SubmitAttemptToProviderArgs) InsertOpts ¶
func (a SubmitAttemptToProviderArgs) InsertOpts() river.InsertOpts
func (SubmitAttemptToProviderArgs) Kind ¶
func (SubmitAttemptToProviderArgs) Kind() string
type SubmitAttemptToProviderWorker ¶
type SubmitAttemptToProviderWorker struct {
river.WorkerDefaults[SubmitAttemptToProviderArgs]
// contains filtered or unexported fields
}
func (*SubmitAttemptToProviderWorker) Timeout ¶
func (w *SubmitAttemptToProviderWorker) Timeout(_ *river.Job[SubmitAttemptToProviderArgs]) time.Duration
func (*SubmitAttemptToProviderWorker) Work ¶
func (w *SubmitAttemptToProviderWorker) Work(ctx context.Context, job *river.Job[SubmitAttemptToProviderArgs]) error