worker

package
v0.16.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 7, 2026 License: AGPL-3.0 Imports: 42 Imported by: 0

Documentation

Index

Constants

View Source
const (
	JobTypeSendEmail        = "send_email"
	JobTypeSendEmailFrom    = "send_email_from"
	JobTypeSendSlackChannel = "send_channel"
	JobTypeSendSlackDM      = "send_dm"
	JobTypeSendGlobalDigest = "send_global_digest"
)

Job types for email processing

View Source
const (
	// Scanner job kinds — one per periodic trigger.
	JobTypePoamDeadlineReminderScanner    = "poam_deadline_reminder_scanner"
	JobTypePoamOverdueTransitionScanner   = "poam_overdue_transition_scanner"
	JobTypeMilestoneOverdueScannerScanner = "poam_milestone_overdue_scanner"

	// Notification / action job kinds — one per item per recipient.
	JobTypePoamDeadlineReminder     = "poam_deadline_reminder"
	JobTypePoamOverdueNotification  = "poam_overdue_notification"
	JobTypeMilestoneOverdueReminder = "poam_milestone_overdue_reminder"

	// Digest job kinds.
	JobTypePoamOpenDigestScheduler = "poam_open_digest_scheduler"
	JobTypePoamOpenDigest          = "poam_open_digest"
)
View Source
const (
	JobTypeRiskReviewDeadlineReminderScanner  = "risk_review_deadline_reminder_scanner"
	JobTypeRiskReviewOverdueEscalationScanner = "risk_review_overdue_escalation_scanner"
	JobTypeRiskStaleRiskScanner               = "risk_stale_risk_scanner"
	JobTypeRiskEvidenceReconciliationScanner  = "risk_evidence_reconciliation_scanner"
	JobTypeRiskOpenDigestScheduler            = "risk_open_digest_scheduler"

	JobTypeRiskReviewDueReminder       = "risk_review_due_reminder"
	JobTypeRiskReviewOverdueEscalation = "risk_review_overdue_escalation"
	JobTypeRiskStaleOpenReminder       = "risk_stale_open_reminder"
	JobTypeRiskReconcileDuplicates     = "risk_reconcile_duplicates"
	JobTypeRiskReviewOverdueReopen     = "risk_review_overdue_reopen"
	JobTypeRiskOpenDigest              = "risk_open_digest"
	JobTypeRiskOrphanedCleanup         = "risk_orphaned_cleanup"
)
View Source
const (
	JobTypeWorkflowTaskAssigned    = "workflow_task_assigned"
	JobTypeWorkflowTaskDueSoon     = "workflow_task_due_soon"
	JobTypeWorkflowTaskDigest      = "workflow_task_digest"
	JobTypeWorkflowExecutionFailed = "workflow_execution_failed"
)

Job types for workflow notifications.

View Source
const (
	JobTypeRiskProcessEvidence = "risk_process_evidence"
)

Job types for risk processing

Variables

This section is empty.

Functions

func JobInsertOptionsForPoamDigest added in v0.15.0

func JobInsertOptionsForPoamDigest(byPeriod time.Duration) *river.InsertOpts

JobInsertOptionsForPoamDigest returns insert options for POAM digest jobs on the "digest" queue. Idempotency is enforced via ByArgs + ByPeriod.

func JobInsertOptionsForPoamNotification added in v0.15.0

func JobInsertOptionsForPoamNotification(byPeriod time.Duration) *river.InsertOpts

JobInsertOptionsForPoamNotification returns insert options for POAM notification email jobs with 24-hour idempotency window (daily scanners).

func JobInsertOptionsForPoamWorker added in v0.15.0

func JobInsertOptionsForPoamWorker(byPeriod time.Duration) *river.InsertOpts

JobInsertOptionsForPoamWorker returns insert options for POAM scanner/worker jobs on the "poam" queue with idempotency.

func JobInsertOptionsForRiskDigest added in v0.14.0

func JobInsertOptionsForRiskDigest(byPeriod time.Duration) *river.InsertOpts

func JobInsertOptionsForRiskNotification added in v0.13.0

func JobInsertOptionsForRiskNotification(byPeriod time.Duration) *river.InsertOpts

func JobInsertOptionsForRiskOrphanedCleanup added in v0.15.0

func JobInsertOptionsForRiskOrphanedCleanup() *river.InsertOpts

JobInsertOptionsForRiskOrphanedCleanup returns insert options for the orphaned risk cleanup job. ByArgs deduplication uses the river:"unique" fields on RiskOrphanedCleanupArgs, so active jobs are unique by (ssp_id, old_profile_id, new_profile_id). Repeated equivalent changes are collapsed while an equivalent job is active; changes involving different profiles can enqueue independent jobs.

ByState is explicitly set to exclude JobStateCompleted and JobStateCancelled so that a second profile change to the same target profile re-inserts a fresh cleanup job even if the previous one completed but has not yet been removed by River's job-cleaner maintenance process.

func JobInsertOptionsForRiskProcessEvidence added in v0.13.0

func JobInsertOptionsForRiskProcessEvidence() *river.InsertOpts

JobInsertOptionsForRiskProcessEvidence returns insert options for risk processing jobs

func JobInsertOptionsForRiskWorkerUnique added in v0.13.0

func JobInsertOptionsForRiskWorkerUnique(byPeriod time.Duration) *river.InsertOpts

func JobInsertOptionsForWorkflowNotification added in v0.12.0

func JobInsertOptionsForWorkflowNotification() *river.InsertOpts

JobInsertOptionsForWorkflowNotification returns insert options for workflow notification jobs.

func JobInsertOptionsForWorkflowTaskAssignedNotification added in v0.12.0

func JobInsertOptionsForWorkflowTaskAssignedNotification() *river.InsertOpts

JobInsertOptionsForWorkflowTaskAssignedNotification returns insert options for workflow task assignment jobs.

func JobInsertOptionsWithRetry

func JobInsertOptionsWithRetry(queue string, maxAttempts int) *river.InsertOpts

JobInsertOptionsWithRetry returns insert options for jobs with custom retry policy

func NewDigestPeriodicJob

func NewDigestPeriodicJob(cronSchedule string, logger *zap.SugaredLogger) *river.PeriodicJob

NewDigestPeriodicJob creates a periodic job for digest scheduling

func NewDueSoonCheckerPeriodicJob added in v0.12.0

func NewDueSoonCheckerPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob

func NewMilestoneOverduePeriodicJob added in v0.15.0

func NewMilestoneOverduePeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob

NewMilestoneOverduePeriodicJob creates the River PeriodicJob for the weekly incomplete milestone scanner. Default cron: "0 0 10 * * 1" (Monday 10:00 UTC).

func NewPoamDeadlineReminderPeriodicJob added in v0.15.0

func NewPoamDeadlineReminderPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob

NewPoamDeadlineReminderPeriodicJob creates the River PeriodicJob for the daily POAM deadline reminder scanner. Default cron: "0 0 8 * * *" (08:00 UTC).

func NewPoamOpenDigestPeriodicJob added in v0.15.0

func NewPoamOpenDigestPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob

NewPoamOpenDigestPeriodicJob creates the River PeriodicJob for the daily POAM open digest scheduler. Default cron: "0 0 7 * * *" (07:00 UTC).

func NewPoamOverdueTransitionPeriodicJob added in v0.15.0

func NewPoamOverdueTransitionPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob

NewPoamOverdueTransitionPeriodicJob creates the River PeriodicJob for the daily POAM overdue transition scanner. Default cron: "0 0 9 * * *" (09:00 UTC).

func NewRiskEvidenceReconciliationPeriodicJob added in v0.13.0

func NewRiskEvidenceReconciliationPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob

func NewRiskOpenDigestPeriodicJob added in v0.14.0

func NewRiskOpenDigestPeriodicJob(schedule string, windowKind string, logger *zap.SugaredLogger) *river.PeriodicJob

func NewRiskReviewDeadlineReminderPeriodicJob added in v0.13.0

func NewRiskReviewDeadlineReminderPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob

func NewRiskReviewOverdueEscalationPeriodicJob added in v0.13.0

func NewRiskReviewOverdueEscalationPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob

func NewRiskStaleScannerPeriodicJob added in v0.13.0

func NewRiskStaleScannerPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob

func NewWorkflowSchedulerPeriodicJob added in v0.12.0

func NewWorkflowSchedulerPeriodicJob(cronSchedule string, logger *zap.SugaredLogger) *river.PeriodicJob

func NewWorkflowTaskDigestPeriodicJob added in v0.12.0

func NewWorkflowTaskDigestPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob

func Workers

func Workers(
	emailService EmailService,
	digestService DigestService,
	slackService SlackService,
	userRepo UserRepository,
	db *gorm.DB,
	webBaseURL string,
	notificationWorkerEnqueuer notification.WorkerEnqueuer,
	logger *zap.SugaredLogger,
) *river.Workers

Workers returns all workers as work functions with dependencies injected

Types

type DigestService

type DigestService interface {
	SendGlobalDigest(ctx context.Context) error
}

DigestService interface for dependency injection

type DigestTask added in v0.12.0

type DigestTask struct {
	StepTitle             string
	WorkflowTitle         string
	WorkflowInstanceTitle string
	DueDate               *string
	StepURL               string
}

DigestTask represents a single task entry in the digest email

type DueSoonCheckerArgs added in v0.12.0

type DueSoonCheckerArgs struct{}

DueSoonCheckerArgs represents the arguments for the periodic due-soon checker job

func (DueSoonCheckerArgs) Kind added in v0.12.0

func (DueSoonCheckerArgs) Kind() string

Kind returns the job kind for River

func (DueSoonCheckerArgs) Timeout added in v0.12.0

func (DueSoonCheckerArgs) Timeout() time.Duration

Timeout returns the timeout for the due-soon checker job

type DueSoonCheckerWorker added in v0.12.0

type DueSoonCheckerWorker struct {
	// contains filtered or unexported fields
}

DueSoonCheckerWorker scans for step executions due in a week and dispatches reminder notifications.

func NewDueSoonCheckerWorker added in v0.12.0

func NewDueSoonCheckerWorker(
	db *gorm.DB,
	webBaseURL string,
	notificationRuntime notification.RuntimeProvider,
	logger *zap.SugaredLogger,
) *DueSoonCheckerWorker

NewDueSoonCheckerWorker creates a new DueSoonCheckerWorker with an injected runtime provider.

func (*DueSoonCheckerWorker) Work added in v0.12.0

Work scans for step executions due in ~1 week and dispatches task-available notifications.

type EmailService

type EmailService interface {
	Send(ctx context.Context, message *types.Message) (*types.SendResult, error)
	SendWithProvider(ctx context.Context, providerName string, message *types.Message) (*types.SendResult, error)
	UseTemplate(templateName string, data map[string]interface{}) (htmlContent, textContent string, err error)
	GetDefaultFromAddress() string
}

EmailService interface for dependency injection

type GORMUserRepository added in v0.12.0

type GORMUserRepository struct {
	// contains filtered or unexported fields
}

GORMUserRepository implements UserRepository using GORM

func NewGORMUserRepository added in v0.12.0

func NewGORMUserRepository(db *gorm.DB) *GORMUserRepository

NewGORMUserRepository creates a new GORMUserRepository

func (*GORMUserRepository) FindUserByID added in v0.12.0

func (r *GORMUserRepository) FindUserByID(ctx context.Context, userID string) (NotificationUser, error)

FindUserByID looks up a user by UUID string and returns a NotificationUser

type MilestoneOverdueReminderArgs added in v0.15.0

type MilestoneOverdueReminderArgs struct {
	MilestoneID     uuid.UUID `json:"milestone_id"`
	PoamItemID      uuid.UUID `json:"poam_item_id"`
	RecipientUserID uuid.UUID `json:"recipient_user_id"`
	MilestoneTitle  string    `json:"milestone_title"`
	PoamTitle       string    `json:"poam_title"`
	SspID           uuid.UUID `json:"ssp_id"`
	SspDisplayName  string    `json:"ssp_display_name"`
	DueDate         string    `json:"due_date"` // RFC3339
	PoamURL         string    `json:"poam_url"`
	WeeklyBucket    string    `json:"weekly_bucket"` // e.g. "2026-W14"
}

MilestoneOverdueReminderArgs carries the data needed to send a single incomplete milestone overdue reminder notification to one recipient. Idempotency key: MilestoneID + DueDate + WeeklyBucket (ByArgs + ByPeriod 7 days).

func (MilestoneOverdueReminderArgs) Kind added in v0.15.0

func (MilestoneOverdueReminderArgs) Timeout added in v0.15.0

type MilestoneOverdueReminderWorker added in v0.15.0

type MilestoneOverdueReminderWorker struct {
	// contains filtered or unexported fields
}

MilestoneOverdueReminderWorker sends a single incomplete milestone overdue reminder to one recipient.

func NewMilestoneOverdueReminderWorker added in v0.15.0

func NewMilestoneOverdueReminderWorker(
	userRepo UserRepository,
	webBaseURL string,
	notificationServiceFactory *PoamNotificationServiceFactory,
	logger *zap.SugaredLogger,
) *MilestoneOverdueReminderWorker

NewMilestoneOverdueReminderWorker constructs a MilestoneOverdueReminderWorker.

func (*MilestoneOverdueReminderWorker) Work added in v0.15.0

Work sends the milestone overdue reminder for a single milestone × recipient.

type MilestoneOverdueScannerArgs added in v0.15.0

type MilestoneOverdueScannerArgs struct{}

MilestoneOverdueScannerArgs is the args type for the weekly incomplete milestone scanner job (cron: 0 0 10 * * 1, i.e. Monday 10:00 UTC). The scanner queries for milestones in "planned" status whose due_date has passed and whose parent POAM item is not completed, then enqueues a MilestoneOverdueReminderArgs job per milestone per recipient.

func (MilestoneOverdueScannerArgs) Kind added in v0.15.0

func (MilestoneOverdueScannerArgs) Timeout added in v0.15.0

type MilestoneOverdueScannerWorker added in v0.15.0

type MilestoneOverdueScannerWorker struct {
	// contains filtered or unexported fields
}

MilestoneOverdueScannerWorker scans weekly for POAM item milestones that are in "open" or "in-progress" status, whose planned_completion_date has passed, and whose parent POAM item is not yet completed. For each such milestone it enqueues a MilestoneOverdueReminderArgs job per milestone per recipient.

func NewMilestoneOverdueScannerWorker added in v0.15.0

func NewMilestoneOverdueScannerWorker(
	db *gorm.DB,
	client workflow.RiverClient,
	userRepo UserRepository,
	webBaseURL string,
	logger *zap.SugaredLogger,
) *MilestoneOverdueScannerWorker

NewMilestoneOverdueScannerWorker constructs a MilestoneOverdueScannerWorker.

func (*MilestoneOverdueScannerWorker) Work added in v0.15.0

Work queries for overdue milestones on non-completed POAM items and enqueues per-milestone per-recipient reminder jobs.

type NotificationSubscription added in v0.15.0

type NotificationSubscription struct {
	NotificationType string
	Channels         []string
}

NotificationSubscription holds worker-facing notification subscription data.

type NotificationUser added in v0.12.0

type NotificationUser struct {
	ID                        string
	Email                     string
	FirstName                 string
	LastName                  string
	SlackUserID               string
	NotificationSubscriptions []NotificationSubscription
}

NotificationUser holds the user fields needed for sending notification emails

func (NotificationUser) FullName added in v0.12.0

func (u NotificationUser) FullName() string

func (NotificationUser) NotificationChannels added in v0.15.0

func (u NotificationUser) NotificationChannels(subscriptionGate string) []string

type PoamDeadlineReminderArgs added in v0.15.0

type PoamDeadlineReminderArgs struct {
	PoamItemID           uuid.UUID `json:"poam_item_id"`
	RecipientUserID      uuid.UUID `json:"recipient_user_id"`
	PoamTitle            string    `json:"poam_title"`
	SspID                uuid.UUID `json:"ssp_id"`
	SspDisplayName       string    `json:"ssp_display_name"`
	CurrentStatus        string    `json:"current_status"`
	Deadline             string    `json:"deadline"` // RFC3339
	MilestoneCount       int       `json:"milestone_count"`
	PoamURL              string    `json:"poam_url"`
	ReminderWindowBucket string    `json:"reminder_window_bucket"` // e.g. "2026-03-31"
}

PoamDeadlineReminderArgs carries the data needed to send a single POAM deadline approaching reminder notification to one recipient. Idempotency key: PoamItemID + Deadline + ReminderWindowBucket (ByArgs + ByPeriod 24h).

func (PoamDeadlineReminderArgs) Kind added in v0.15.0

func (PoamDeadlineReminderArgs) Timeout added in v0.15.0

type PoamDeadlineReminderScannerArgs added in v0.15.0

type PoamDeadlineReminderScannerArgs struct{}

PoamDeadlineReminderScannerArgs is the args type for the daily POAM deadline reminder scanner job (cron: 0 0 8 * * *, i.e. 08:00 UTC daily). The scanner queries for open/in-progress POAM items whose deadline falls within the configured reminder window and enqueues a PoamDeadlineReminderArgs job per item per recipient.

func (PoamDeadlineReminderScannerArgs) Kind added in v0.15.0

func (PoamDeadlineReminderScannerArgs) Timeout added in v0.15.0

type PoamDeadlineReminderScannerWorker added in v0.15.0

type PoamDeadlineReminderScannerWorker struct {
	// contains filtered or unexported fields
}

PoamDeadlineReminderScannerWorker scans daily for POAM items whose deadline is approaching within the configured window and enqueues per-item per-recipient reminder jobs. Mirrors the Risk Review Deadline Reminder scanner pattern.

func NewPoamDeadlineReminderScannerWorker added in v0.15.0

func NewPoamDeadlineReminderScannerWorker(
	db *gorm.DB,
	client workflow.RiverClient,
	userRepo UserRepository,
	webBaseURL string,
	reminderWindow time.Duration,
	logger *zap.SugaredLogger,
) *PoamDeadlineReminderScannerWorker

NewPoamDeadlineReminderScannerWorker constructs a PoamDeadlineReminderScannerWorker. reminderWindow is the look-ahead horizon; items with deadline within this duration of now will receive a reminder. Pass a positive value (e.g. 30 * 24 * time.Hour).

func (*PoamDeadlineReminderScannerWorker) Work added in v0.15.0

Work queries for POAM items with deadlines within the configured reminder window and enqueues PoamDeadlineReminderArgs jobs for each item × recipient pair.

type PoamDeadlineReminderWorker added in v0.15.0

type PoamDeadlineReminderWorker struct {
	// contains filtered or unexported fields
}

PoamDeadlineReminderWorker sends a single POAM deadline approaching reminder notification to one recipient.

func NewPoamDeadlineReminderWorker added in v0.15.0

func NewPoamDeadlineReminderWorker(
	userRepo UserRepository,
	webBaseURL string,
	notificationServiceFactory *PoamNotificationServiceFactory,
	logger *zap.SugaredLogger,
) *PoamDeadlineReminderWorker

NewPoamDeadlineReminderWorker constructs a PoamDeadlineReminderWorker.

func (*PoamDeadlineReminderWorker) Work added in v0.15.0

Work sends the POAM deadline reminder notification for a single item × recipient.

type PoamDigestEmailItem added in v0.15.0

type PoamDigestEmailItem struct {
	PoamItemID     uuid.UUID // used for test assertions; not rendered in template
	Title          string
	SSPName        string
	Status         string
	Deadline       string // formatted date or "—"
	POCName        string
	MilestoneCount int
	PoamURL        string
}

PoamDigestEmailItem represents a single POAM item row in the digest email. PoamItemID is carried for test assertions; it is not rendered in the template.

type PoamMilestoneDigestEmailItem added in v0.15.0

type PoamMilestoneDigestEmailItem struct {
	MilestoneTitle string
	PoamTitle      string
	SSPName        string
	DueDate        string
	PoamURL        string
}

PoamMilestoneDigestEmailItem represents a single milestone row in the digest.

type PoamNotificationServiceFactory added in v0.16.0

type PoamNotificationServiceFactory struct {
	// contains filtered or unexported fields
}

func NewPoamNotificationServiceFactory added in v0.16.0

func NewPoamNotificationServiceFactory(
	notificationRuntime notification.RuntimeProvider,
) *PoamNotificationServiceFactory

func (*PoamNotificationServiceFactory) New added in v0.16.0

type PoamOpenDigestArgs added in v0.15.0

type PoamOpenDigestArgs struct {
	RecipientUserID uuid.UUID `json:"recipient_user_id"`
	WindowStart     string    `json:"window_start"` // RFC3339
	WindowEnd       string    `json:"window_end"`   // RFC3339
	WindowKind      string    `json:"window_kind"`  // "daily" | "weekly"
}

PoamOpenDigestArgs carries the data needed to build and send the grouped POAM digest email for a single recipient. Idempotency key: RecipientUserID + WindowStart + WindowEnd (ByArgs + ByPeriod).

func (PoamOpenDigestArgs) Kind added in v0.15.0

func (PoamOpenDigestArgs) Kind() string

func (PoamOpenDigestArgs) Timeout added in v0.15.0

func (PoamOpenDigestArgs) Timeout() time.Duration

type PoamOpenDigestSchedulerArgs added in v0.15.0

type PoamOpenDigestSchedulerArgs struct{}

PoamOpenDigestSchedulerArgs is the args type for the periodic POAM digest scheduler job. It has no payload — the scheduler resolves recipients from DB.

func (PoamOpenDigestSchedulerArgs) Kind added in v0.15.0

func (PoamOpenDigestSchedulerArgs) Timeout added in v0.15.0

type PoamOpenDigestSchedulerWorker added in v0.15.0

type PoamOpenDigestSchedulerWorker struct {
	// contains filtered or unexported fields
}

PoamOpenDigestSchedulerWorker is the periodic River job that resolves all POAM digest recipients and enqueues one PoamOpenDigestArgs job per recipient.

func NewPoamOpenDigestSchedulerWorker added in v0.15.0

func NewPoamOpenDigestSchedulerWorker(
	db *gorm.DB,
	client workflow.RiverClient,
	windowKind string,
	logger *zap.SugaredLogger,
) *PoamOpenDigestSchedulerWorker

func (*PoamOpenDigestSchedulerWorker) Work added in v0.15.0

type PoamOpenDigestWorker added in v0.15.0

type PoamOpenDigestWorker struct {
	// contains filtered or unexported fields
}

PoamOpenDigestWorker builds and dispatches the grouped POAM digest notification for a single recipient.

func NewPoamOpenDigestWorker added in v0.15.0

func NewPoamOpenDigestWorker(
	db *gorm.DB,
	userRepo UserRepository,
	webBaseURL string,
	notificationServiceFactory *PoamNotificationServiceFactory,
	logger *zap.SugaredLogger,
) *PoamOpenDigestWorker

func (*PoamOpenDigestWorker) Work added in v0.15.0

type PoamOverdueNotificationArgs added in v0.15.0

type PoamOverdueNotificationArgs struct {
	PoamItemID      uuid.UUID `json:"poam_item_id"`
	RecipientUserID uuid.UUID `json:"recipient_user_id"`
	PoamTitle       string    `json:"poam_title"`
	SspID           uuid.UUID `json:"ssp_id"`
	SspDisplayName  string    `json:"ssp_display_name"`
	Deadline        string    `json:"deadline"` // RFC3339
	PoamURL         string    `json:"poam_url"`
	OverdueWindow   string    `json:"overdue_window"` // e.g. "2026-03-31"
}

PoamOverdueNotificationArgs carries the data needed to send a single POAM overdue notification to one recipient. Idempotency key: PoamItemID + Deadline + OverdueWindow (ByArgs + ByPeriod 24h).

func (PoamOverdueNotificationArgs) Kind added in v0.15.0

func (PoamOverdueNotificationArgs) Timeout added in v0.15.0

type PoamOverdueNotificationWorker added in v0.15.0

type PoamOverdueNotificationWorker struct {
	// contains filtered or unexported fields
}

PoamOverdueNotificationWorker sends a single POAM overdue notification to one recipient.

func NewPoamOverdueNotificationWorker added in v0.15.0

func NewPoamOverdueNotificationWorker(
	userRepo UserRepository,
	webBaseURL string,
	notificationServiceFactory *PoamNotificationServiceFactory,
	logger *zap.SugaredLogger,
) *PoamOverdueNotificationWorker

NewPoamOverdueNotificationWorker constructs a PoamOverdueNotificationWorker.

func (*PoamOverdueNotificationWorker) Work added in v0.15.0

Work sends the POAM overdue notification for a single item × recipient.

type PoamOverdueTransitionScannerArgs added in v0.15.0

type PoamOverdueTransitionScannerArgs struct{}

PoamOverdueTransitionScannerArgs is the args type for the daily POAM overdue transition scanner job (cron: 0 0 9 * * *, i.e. 09:00 UTC daily). The scanner queries for open/in-progress POAM items whose deadline has already passed, transitions their status to "overdue" in the DB, and enqueues a PoamOverdueNotificationArgs job per item per recipient.

func (PoamOverdueTransitionScannerArgs) Kind added in v0.15.0

func (PoamOverdueTransitionScannerArgs) Timeout added in v0.15.0

type PoamOverdueTransitionScannerWorker added in v0.15.0

type PoamOverdueTransitionScannerWorker struct {
	// contains filtered or unexported fields
}

PoamOverdueTransitionScannerWorker scans daily for POAM items whose deadline has passed and whose status is still open or in-progress. For each such item it:

  1. Transitions the status to "overdue" in the database (with updated_at).
  2. Enqueues a PoamOverdueNotificationArgs job per item per recipient.

"overdue" is a terminal state that can only be reversed by a manual PUT /poam-items/:id status update.

func NewPoamOverdueTransitionScannerWorker added in v0.15.0

func NewPoamOverdueTransitionScannerWorker(
	db *gorm.DB,
	client workflow.RiverClient,
	userRepo UserRepository,
	webBaseURL string,
	logger *zap.SugaredLogger,
) *PoamOverdueTransitionScannerWorker

NewPoamOverdueTransitionScannerWorker constructs a PoamOverdueTransitionScannerWorker.

func (*PoamOverdueTransitionScannerWorker) Work added in v0.15.0

Work queries for overdue POAM items, transitions their status, and enqueues notification jobs.

type ProfileControlResolver added in v0.15.0

type ProfileControlResolver interface {
	ResolveProfileControlKeys(ctx context.Context, profileID uuid.UUID) ([]riskrel.ControlKey, error)
}

ProfileControlResolver resolves the full set of (catalogID, controlID) pairs for a profile. Defined as an interface to avoid a circular import between the worker and oscal handler packages. The implementation is injected at wire-up time in cmd/run.go using a closure over oscalhandler.FindFullProfile and oscalhandler.GetControlIDsMapFromProfile.

type RiskDigestEmailItem added in v0.14.0

type RiskDigestEmailItem struct {
	Title          string
	SSPName        string
	Status         string
	Severity       string
	OwnerName      string
	ReviewDeadline string
	RiskURL        string
}

type RiskEvidenceReconciliationScannerArgs added in v0.13.0

type RiskEvidenceReconciliationScannerArgs struct{}

func (RiskEvidenceReconciliationScannerArgs) Kind added in v0.13.0

func (RiskEvidenceReconciliationScannerArgs) Timeout added in v0.13.0

type RiskEvidenceReconciliationScannerWorker added in v0.13.0

type RiskEvidenceReconciliationScannerWorker struct {
	// contains filtered or unexported fields
}

func NewRiskEvidenceReconciliationScannerWorker added in v0.13.0

func NewRiskEvidenceReconciliationScannerWorker(db *gorm.DB, client workflow.RiverClient, logger *zap.SugaredLogger) *RiskEvidenceReconciliationScannerWorker

func (*RiskEvidenceReconciliationScannerWorker) Work added in v0.13.0

type RiskEvidenceWorker added in v0.13.0

type RiskEvidenceWorker struct {
	// contains filtered or unexported fields
}

RiskEvidenceWorker handles processing evidence and creating risks

func NewRiskEvidenceWorker added in v0.13.0

func NewRiskEvidenceWorker(db *gorm.DB, logger *zap.SugaredLogger) *RiskEvidenceWorker

NewRiskEvidenceWorker creates a new RiskEvidenceWorker

func (*RiskEvidenceWorker) Work added in v0.13.0

Work is the River work function for processing evidence and creating risks

type RiskNotificationServiceFactory added in v0.16.0

type RiskNotificationServiceFactory struct {
	// contains filtered or unexported fields
}

func NewDirectRiskNotificationServiceFactory added in v0.16.0

func NewDirectRiskNotificationServiceFactory(
	emailService EmailService,
	slackService SlackService,
) *RiskNotificationServiceFactory

func NewRiskNotificationServiceFactory added in v0.16.0

func NewRiskNotificationServiceFactory(
	notificationRuntime notification.RuntimeProvider,
) *RiskNotificationServiceFactory

func (*RiskNotificationServiceFactory) New added in v0.16.0

type RiskOpenDigestArgs added in v0.14.0

type RiskOpenDigestArgs struct {
	RecipientUserID uuid.UUID `json:"recipient_user_id"`
	Channel         string    `json:"channel,omitempty"`
	WindowStart     string    `json:"window_start"`
	WindowEnd       string    `json:"window_end"`
	WindowKind      string    `json:"window_kind"`
}

func (RiskOpenDigestArgs) Kind added in v0.14.0

func (RiskOpenDigestArgs) Kind() string

func (RiskOpenDigestArgs) Timeout added in v0.14.0

func (RiskOpenDigestArgs) Timeout() time.Duration

type RiskOpenDigestPlan added in v0.15.0

type RiskOpenDigestPlan struct {
	WindowKind       string
	WindowStart      time.Time
	WindowEnd        time.Time
	WindowByPeriod   time.Duration
	RecipientUserIDs []uuid.UUID
}

func BuildRiskOpenDigestPlan added in v0.15.0

func BuildRiskOpenDigestPlan(
	ctx context.Context,
	db *gorm.DB,
	windowKind string,
	now time.Time,
	logger *zap.SugaredLogger,
) (*RiskOpenDigestPlan, error)

type RiskOpenDigestSchedulerArgs added in v0.14.0

type RiskOpenDigestSchedulerArgs struct{}

func (RiskOpenDigestSchedulerArgs) Kind added in v0.14.0

func (RiskOpenDigestSchedulerArgs) Timeout added in v0.14.0

type RiskOpenDigestSchedulerWorker added in v0.14.0

type RiskOpenDigestSchedulerWorker struct {
	// contains filtered or unexported fields
}

func NewRiskOpenDigestSchedulerWorker added in v0.14.0

func NewRiskOpenDigestSchedulerWorker(db *gorm.DB, client workflow.RiverClient, windowKind string, logger *zap.SugaredLogger) *RiskOpenDigestSchedulerWorker

func (*RiskOpenDigestSchedulerWorker) Work added in v0.14.0

type RiskOpenDigestWorker added in v0.14.0

type RiskOpenDigestWorker struct {
	// contains filtered or unexported fields
}

func NewRiskOpenDigestWorker added in v0.14.0

func NewRiskOpenDigestWorker(
	db *gorm.DB,
	userRepo UserRepository,
	webBaseURL string,
	notificationServiceFactory *RiskNotificationServiceFactory,
	logger *zap.SugaredLogger,
) *RiskOpenDigestWorker

func (*RiskOpenDigestWorker) Work added in v0.14.0

type RiskOrphanedCleanupArgs added in v0.15.0

type RiskOrphanedCleanupArgs struct {
	SSPID        uuid.UUID  `json:"ssp_id"                  river:"unique"`
	OldProfileID *uuid.UUID `json:"old_profile_id,omitempty" river:"unique"`
	NewProfileID *uuid.UUID `json:"new_profile_id,omitempty" river:"unique"`
}

RiskOrphanedCleanupArgs is enqueued by the SSP profile attach endpoint whenever the profile binding changes. The worker resolves the current profile's control set and transitions any non-terminal auto-generated risks whose controls are no longer present to remediated.

Deduplication uses river:"unique" tags on ssp_id, old_profile_id, and new_profile_id. This means:

  • Two rapid equivalent changes → one job (correct: second is a no-op)
  • Two rapid changes involving different profiles → two independent jobs (correct)

func (RiskOrphanedCleanupArgs) Kind added in v0.15.0

func (RiskOrphanedCleanupArgs) Timeout added in v0.15.0

type RiskOrphanedCleanupWorker added in v0.15.0

type RiskOrphanedCleanupWorker struct {
	// contains filtered or unexported fields
}

RiskOrphanedCleanupWorker is enqueued by the SSP profile attach endpoint whenever the profile binding changes. At execution time, it resolves the SSP's current profile control set and transitions any non-terminal auto-generated risks whose linked controls are no longer present in that set to the "remediated" status.

Design rationale:

  • The handler enqueues a job rather than calling RemediateOrphanedRisks inline so that the HTTP response is not blocked by potentially expensive profile resolution.
  • River's ByArgs deduplication is scoped to ssp_id + old_profile_id + new_profile_id. Active jobs for repeated equivalent changes are collapsed, while changes involving different profiles can enqueue independent jobs.
  • The worker reloads the current SSP profile at execution time so stale jobs remediate against the latest committed binding.

func NewRiskOrphanedCleanupWorker added in v0.15.0

func NewRiskOrphanedCleanupWorker(db *gorm.DB, riskService *riskrel.RiskService, profileResolver ProfileControlResolver, logger *zap.SugaredLogger) *RiskOrphanedCleanupWorker

NewRiskOrphanedCleanupWorker constructs a RiskOrphanedCleanupWorker. profileResolver must be non-nil; it is injected from cmd/run.go to avoid a circular import.

func (*RiskOrphanedCleanupWorker) Work added in v0.15.0

Work implements river.Worker. It resolves the current profile's control set and delegates to RiskService.RemediateOrphanedRisks.

type RiskProcessEvidenceArgs added in v0.13.0

type RiskProcessEvidenceArgs struct {
	EvidenceID  uuid.UUID `json:"evidence_id"`
	EvidenceEnd string    `json:"evidence_end"`
	Status      string    `json:"status"`
}

RiskProcessEvidenceArgs represents the arguments for processing evidence and creating risks. EvidenceEnd and Status are included alongside EvidenceID intentionally: River uses ByArgs uniqueness to deduplicate jobs within the 5-minute window. Including the end time and status ensures that two different evidence records for the same stream (different end times or states) each get their own independent deduplication window, preventing the second record from being silently dropped.

func (RiskProcessEvidenceArgs) Kind added in v0.13.0

Kind returns the job kind for River

func (RiskProcessEvidenceArgs) Timeout added in v0.13.0

Timeout returns the timeout for risk process evidence jobs

type RiskReconcileDuplicatesArgs added in v0.13.0

type RiskReconcileDuplicatesArgs struct {
	DedupeKey string `json:"dedupe_key"`
}

func (RiskReconcileDuplicatesArgs) Kind added in v0.13.0

func (RiskReconcileDuplicatesArgs) Timeout added in v0.13.0

type RiskReconcileDuplicatesWorker added in v0.13.0

type RiskReconcileDuplicatesWorker struct {
	// contains filtered or unexported fields
}

func NewRiskReconcileDuplicatesWorker added in v0.13.0

func NewRiskReconcileDuplicatesWorker(db *gorm.DB, logger *zap.SugaredLogger) *RiskReconcileDuplicatesWorker

func (*RiskReconcileDuplicatesWorker) Work added in v0.13.0

type RiskReviewDeadlineReminderScannerArgs added in v0.13.0

type RiskReviewDeadlineReminderScannerArgs struct{}

func (RiskReviewDeadlineReminderScannerArgs) Kind added in v0.13.0

func (RiskReviewDeadlineReminderScannerArgs) Timeout added in v0.13.0

type RiskReviewDeadlineReminderScannerWorker added in v0.13.0

type RiskReviewDeadlineReminderScannerWorker struct {
	// contains filtered or unexported fields
}

func NewRiskReviewDeadlineReminderScannerWorker added in v0.13.0

func NewRiskReviewDeadlineReminderScannerWorker(db *gorm.DB, client workflow.RiverClient, logger *zap.SugaredLogger) *RiskReviewDeadlineReminderScannerWorker

func (*RiskReviewDeadlineReminderScannerWorker) Work added in v0.13.0

type RiskReviewDueReminderArgs added in v0.13.0

type RiskReviewDueReminderArgs struct {
	RiskID         uuid.UUID `json:"risk_id"`
	OwnerUserID    uuid.UUID `json:"owner_user_id"`
	Channel        string    `json:"channel,omitempty"`
	ReviewDeadline string    `json:"review_deadline"`
	ReminderWindow string    `json:"reminder_window"`
}

func (RiskReviewDueReminderArgs) Kind added in v0.13.0

func (RiskReviewDueReminderArgs) Timeout added in v0.13.0

type RiskReviewDueReminderWorker added in v0.13.0

type RiskReviewDueReminderWorker struct {
	// contains filtered or unexported fields
}

func NewRiskReviewDueReminderWorker added in v0.13.0

func NewRiskReviewDueReminderWorker(
	db *gorm.DB,
	userRepo UserRepository,
	webBaseURL string,
	notificationServiceFactory *RiskNotificationServiceFactory,
	logger *zap.SugaredLogger,
) *RiskReviewDueReminderWorker

func (*RiskReviewDueReminderWorker) Work added in v0.13.0

type RiskReviewOverdueEscalationArgs added in v0.13.0

type RiskReviewOverdueEscalationArgs struct {
	RiskID         uuid.UUID `json:"risk_id"`
	OwnerUserID    uuid.UUID `json:"owner_user_id"`
	Channel        string    `json:"channel,omitempty"`
	ReviewDeadline string    `json:"review_deadline"`
	OverdueWindow  string    `json:"overdue_window"`
}

func (RiskReviewOverdueEscalationArgs) Kind added in v0.13.0

func (RiskReviewOverdueEscalationArgs) Timeout added in v0.13.0

type RiskReviewOverdueEscalationScannerArgs added in v0.13.0

type RiskReviewOverdueEscalationScannerArgs struct{}

func (RiskReviewOverdueEscalationScannerArgs) Kind added in v0.13.0

func (RiskReviewOverdueEscalationScannerArgs) Timeout added in v0.13.0

type RiskReviewOverdueEscalationScannerWorker added in v0.13.0

type RiskReviewOverdueEscalationScannerWorker struct {
	// contains filtered or unexported fields
}

func NewRiskReviewOverdueEscalationScannerWorker added in v0.13.0

func NewRiskReviewOverdueEscalationScannerWorker(
	db *gorm.DB,
	client workflow.RiverClient,
	logger *zap.SugaredLogger,
	autoReopenEnabled bool,
	autoReopenThresholdDays int,
) *RiskReviewOverdueEscalationScannerWorker

func (*RiskReviewOverdueEscalationScannerWorker) Work added in v0.13.0

type RiskReviewOverdueEscalationWorker added in v0.13.0

type RiskReviewOverdueEscalationWorker struct {
	// contains filtered or unexported fields
}

func NewRiskReviewOverdueEscalationWorker added in v0.13.0

func NewRiskReviewOverdueEscalationWorker(
	db *gorm.DB,
	userRepo UserRepository,
	webBaseURL string,
	notificationServiceFactory *RiskNotificationServiceFactory,
	logger *zap.SugaredLogger,
) *RiskReviewOverdueEscalationWorker

func (*RiskReviewOverdueEscalationWorker) Work added in v0.13.0

type RiskReviewOverdueReopenArgs added in v0.13.0

type RiskReviewOverdueReopenArgs struct {
	RiskID         uuid.UUID `json:"risk_id"`
	ReviewDeadline string    `json:"review_deadline"`
	ThresholdDays  int       `json:"threshold_days"`
}

func (RiskReviewOverdueReopenArgs) Kind added in v0.13.0

func (RiskReviewOverdueReopenArgs) Timeout added in v0.13.0

type RiskReviewOverdueReopenWorker added in v0.13.0

type RiskReviewOverdueReopenWorker struct {
	// contains filtered or unexported fields
}

func NewRiskReviewOverdueReopenWorker added in v0.13.0

func NewRiskReviewOverdueReopenWorker(db *gorm.DB, logger *zap.SugaredLogger) *RiskReviewOverdueReopenWorker

func (*RiskReviewOverdueReopenWorker) Work added in v0.13.0

type RiskStaleOpenReminderArgs added in v0.13.0

type RiskStaleOpenReminderArgs struct {
	RiskID          uuid.UUID `json:"risk_id"`
	OwnerUserID     uuid.UUID `json:"owner_user_id"`
	Channel         string    `json:"channel,omitempty"`
	LastSeenAt      string    `json:"last_seen_at"`
	StaleBucketDate string    `json:"stale_bucket_date"`
}

func (RiskStaleOpenReminderArgs) Kind added in v0.13.0

func (RiskStaleOpenReminderArgs) Timeout added in v0.13.0

type RiskStaleOpenReminderWorker added in v0.13.0

type RiskStaleOpenReminderWorker struct {
	// contains filtered or unexported fields
}

func NewRiskStaleOpenReminderWorker added in v0.13.0

func NewRiskStaleOpenReminderWorker(
	db *gorm.DB,
	userRepo UserRepository,
	webBaseURL string,
	notificationServiceFactory *RiskNotificationServiceFactory,
	logger *zap.SugaredLogger,
) *RiskStaleOpenReminderWorker

func (*RiskStaleOpenReminderWorker) Work added in v0.13.0

type RiskStaleRiskScannerArgs added in v0.13.0

type RiskStaleRiskScannerArgs struct{}

func (RiskStaleRiskScannerArgs) Kind added in v0.13.0

func (RiskStaleRiskScannerArgs) Timeout added in v0.13.0

type RiskStaleRiskScannerWorker added in v0.13.0

type RiskStaleRiskScannerWorker struct {
	// contains filtered or unexported fields
}

func NewRiskStaleRiskScannerWorker added in v0.13.0

func NewRiskStaleRiskScannerWorker(db *gorm.DB, client workflow.RiverClient, logger *zap.SugaredLogger) *RiskStaleRiskScannerWorker

func (*RiskStaleRiskScannerWorker) Work added in v0.13.0

type SendEmailArgs

type SendEmailArgs struct {
	// Email message fields
	From        string             `json:"from"`
	To          []string           `json:"to" river:"unique"`
	Cc          []string           `json:"cc,omitempty"`
	Bcc         []string           `json:"bcc,omitempty"`
	Subject     string             `json:"subject"`
	HTMLBody    string             `json:"html_body,omitempty"`
	TextBody    string             `json:"text_body,omitempty"`
	Attachments []types.Attachment `json:"attachments,omitempty"`
	Headers     map[string]string  `json:"headers,omitempty"`

	// Optional notification metadata for generic notification dispatches.
	NotificationKind string `json:"notification_kind,omitempty"`
	RecipientUserID  string `json:"recipient_user_id,omitempty"`
	CorrelationID    string `json:"correlation_id,omitempty" river:"unique"`
	SourceJobKind    string `json:"source_job_kind,omitempty"`
	SourceJobID      string `json:"source_job_id,omitempty"`
}

SendEmailArgs represents the arguments for sending an email

func (SendEmailArgs) Kind

func (SendEmailArgs) Kind() string

Kind returns the job kind for River

func (SendEmailArgs) Timeout

func (SendEmailArgs) Timeout() time.Duration

Timeout returns the timeout for email jobs

type SendEmailFromArgs

type SendEmailFromArgs struct {
	// Provider to use for sending
	Provider string `json:"provider"`

	// Email message fields
	From        string             `json:"from"`
	To          []string           `json:"to"`
	Cc          []string           `json:"cc,omitempty"`
	Bcc         []string           `json:"bcc,omitempty"`
	Subject     string             `json:"subject"`
	HTMLBody    string             `json:"html_body,omitempty"`
	TextBody    string             `json:"text_body,omitempty"`
	Attachments []types.Attachment `json:"attachments,omitempty"`
	Headers     map[string]string  `json:"headers,omitempty"`
}

SendEmailFromArgs represents the arguments for sending an email from a specific provider

func (SendEmailFromArgs) Kind

func (SendEmailFromArgs) Kind() string

Kind returns the job kind for River

func (SendEmailFromArgs) Timeout

func (SendEmailFromArgs) Timeout() time.Duration

Timeout returns the timeout for email jobs

type SendEmailFromWorker

type SendEmailFromWorker struct {
	// contains filtered or unexported fields
}

SendEmailFromWorker handles sending email from provider jobs

func NewSendEmailFromWorker

func NewSendEmailFromWorker(emailService EmailService, logger *zap.SugaredLogger) *SendEmailFromWorker

NewSendEmailFromWorker creates a new SendEmailFromWorker

func (*SendEmailFromWorker) Work

Work is the River work function for sending emails from a provider

type SendEmailWorker

type SendEmailWorker struct {
	// contains filtered or unexported fields
}

SendEmailWorker handles sending email jobs

func NewSendEmailWorker

func NewSendEmailWorker(emailService EmailService, logger *zap.SugaredLogger) *SendEmailWorker

NewSendEmailWorker creates a new SendEmailWorker

func (*SendEmailWorker) Work

Work is the River work function for sending emails

type SendGlobalDigestArgs

type SendGlobalDigestArgs struct {
}

SendGlobalDigestArgs represents the arguments for sending global digest

func (SendGlobalDigestArgs) Kind

Kind returns the job kind for River

func (SendGlobalDigestArgs) Timeout

Timeout returns the timeout for digest jobs (longer due to multiple emails)

type SendGlobalDigestWorker

type SendGlobalDigestWorker struct {
	// contains filtered or unexported fields
}

SendGlobalDigestWorker handles scheduling global digest deliveries.

func NewSendGlobalDigestWorker

func NewSendGlobalDigestWorker(digestService DigestService, logger *zap.SugaredLogger) *SendGlobalDigestWorker

NewSendGlobalDigestWorker creates a new SendGlobalDigestWorker

func (*SendGlobalDigestWorker) Work

Work is the River work function for scheduling global digest deliveries.

type SendSlackArgs added in v0.16.0

type SendSlackArgs struct {
	Channel    string       `json:"channel" river:"unique"`
	TargetType string       `json:"target_type" river:"unique"`
	Text       string       `json:"text"`
	Blocks     slack.Blocks `json:"blocks,omitempty"`

	// Optional notification metadata for generic notification dispatches.
	NotificationKind string `json:"notification_kind,omitempty"`
	RecipientUserID  string `json:"recipient_user_id,omitempty"`
	CorrelationID    string `json:"correlation_id,omitempty" river:"unique"`
	SourceJobKind    string `json:"source_job_kind,omitempty"`
	SourceJobID      string `json:"source_job_id,omitempty"`
}

SendSlackArgs represents a Slack message before it is routed to a specific River job kind for channel or direct-message delivery.

type SendSlackChannelArgs added in v0.16.0

type SendSlackChannelArgs SendSlackArgs

func (SendSlackChannelArgs) Kind added in v0.16.0

Kind returns the job kind for River

func (SendSlackChannelArgs) Timeout added in v0.16.0

Timeout returns the timeout for slack channel jobs

type SendSlackDMArgs added in v0.16.0

type SendSlackDMArgs SendSlackArgs

func (SendSlackDMArgs) Kind added in v0.16.0

func (SendSlackDMArgs) Kind() string

Kind returns the job kind for River

func (SendSlackDMArgs) Timeout added in v0.16.0

func (SendSlackDMArgs) Timeout() time.Duration

Timeout returns the timeout for slack direct-message jobs

type SendSlackWorker added in v0.16.0

type SendSlackWorker struct {
	// contains filtered or unexported fields
}

SendSlackWorker handles sending Slack message jobs.

func NewSendSlackWorker added in v0.16.0

func NewSendSlackWorker(slackService SlackService, logger *zap.SugaredLogger) *SendSlackWorker

NewSendSlackWorker creates a new SendSlackWorker.

func (*SendSlackWorker) WorkChannel added in v0.16.0

func (w *SendSlackWorker) WorkChannel(ctx context.Context, job *river.Job[SendSlackChannelArgs]) error

WorkChannel is the River work function for sending Slack channel messages.

func (*SendSlackWorker) WorkDM added in v0.16.0

WorkDM is the River work function for sending Slack direct messages.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service manages the River client and workers

func NewServiceWithDigest

func NewServiceWithDigest(
	cfg *config.WorkerConfig,
	db *gorm.DB,
	emailSvc *email.Service,
	digestSvc DigestService,
	digestCfg *config.Config,
	profileResolver ProfileControlResolver,
	logger *zap.SugaredLogger,
) (*Service, error)

NewServiceWithDigest creates a new worker service with digest support. profileResolver is injected to resolve profile control keys for the orphaned risk cleanup worker; it must implement ProfileControlResolver and is typically a closure over oscal handler functions.

func (*Service) EnqueueNotificationEmail added in v0.16.0

func (s *Service) EnqueueNotificationEmail(ctx context.Context, delivery emailprovider.Delivery) error

EnqueueNotificationEmail enqueues a provider-ready notification email delivery.

func (*Service) EnqueueNotificationSlack added in v0.16.0

func (s *Service) EnqueueNotificationSlack(ctx context.Context, delivery slackprovider.Delivery) error

EnqueueNotificationSlack enqueues a provider-ready notification Slack delivery.

func (*Service) EnqueueOrphanedRiskCleanup added in v0.15.0

func (s *Service) EnqueueOrphanedRiskCleanup(ctx context.Context, sspID uuid.UUID, oldProfileID, newProfileID *uuid.UUID) error

EnqueueOrphanedRiskCleanup enqueues a job to remediate orphaned risks when an SSP's profile binding changes. Active jobs are deduped by (ssp_id, old_profile_id, new_profile_id): repeated equivalent changes collapse to one job, while changes involving different profiles can produce independent jobs.

func (*Service) EnqueueRiskProcessEvidence added in v0.13.0

func (s *Service) EnqueueRiskProcessEvidence(ctx context.Context, evidenceID uuid.UUID, evidenceEnd, status string) error

EnqueueRiskProcessEvidence enqueues a risk process evidence job

func (*Service) EnqueueSendEmail

func (s *Service) EnqueueSendEmail(ctx context.Context, args *SendEmailArgs) error

EnqueueSendEmail enqueues a send email job

func (*Service) EnqueueSendEmailFrom

func (s *Service) EnqueueSendEmailFrom(ctx context.Context, args *SendEmailFromArgs) error

EnqueueSendEmailFrom enqueues a send email from provider job

func (*Service) EnqueueWorkflowExecutionFailed added in v0.12.0

func (s *Service) EnqueueWorkflowExecutionFailed(ctx context.Context, execution *workflows.WorkflowExecution) error

EnqueueWorkflowExecutionFailed enqueues a workflow-execution-failed notification job. Implements the workflow.NotificationEnqueuer interface.

func (*Service) EnqueueWorkflowTaskAssigned added in v0.12.0

func (s *Service) EnqueueWorkflowTaskAssigned(ctx context.Context, stepExecution *workflows.StepExecution) error

EnqueueWorkflowTaskAssigned enqueues a single workflow-task-assigned wrapper job. That worker resolves channels and fans out to provider-ready downstream jobs. Implements the workflow.NotificationEnqueuer interface.

func (*Service) GetClient

func (s *Service) GetClient() *river.Client[pgx.Tx]

GetClient returns the River client for job insertion

func (*Service) GetDAGExecutor added in v0.12.0

func (s *Service) GetDAGExecutor() *workflow.DAGExecutor

GetDAGExecutor returns the shared DAG executor used by workflow River workers. Returns nil when the worker service is disabled.

func (*Service) IsStarted

func (s *Service) IsStarted() bool

IsStarted returns true if the worker service is started

func (*Service) Migrate

func (s *Service) Migrate(ctx context.Context) error

Migrate runs River migrations

func (*Service) Start

func (s *Service) Start(ctx context.Context) error

Start starts the worker service

func (*Service) Stop

func (s *Service) Stop(ctx context.Context) error

Stop stops the worker service

type SlackService added in v0.15.0

type SlackService interface {
	SendMessage(ctx context.Context, channel string, message *slacktypes.Message) (*slacktypes.SendResult, error)
	IsEnabled() bool
}

type UserRepository added in v0.12.0

type UserRepository interface {
	FindUserByID(ctx context.Context, userID string) (NotificationUser, error)
}

UserRepository is the minimal DB interface needed by notification workers

type WorkflowExecutionFailedArgs added in v0.12.0

type WorkflowExecutionFailedArgs struct {
	WorkflowExecutionID string `json:"workflow_execution_id"`
}

WorkflowExecutionFailedArgs represents the arguments for a workflow-execution-failed notification.

func (WorkflowExecutionFailedArgs) Kind added in v0.12.0

Kind returns the job kind for River.

func (WorkflowExecutionFailedArgs) Timeout added in v0.12.0

Timeout returns the timeout for workflow execution failed jobs.

type WorkflowExecutionFailedWorker added in v0.12.0

type WorkflowExecutionFailedWorker struct {
	// contains filtered or unexported fields
}

WorkflowExecutionFailedWorker sends workflow execution failure notifications to the workflow instance creator using supported delivery targets.

func NewWorkflowExecutionFailedWorker added in v0.12.0

func NewWorkflowExecutionFailedWorker(db *gorm.DB, userRepo UserRepository, webBaseURL string, notificationRuntime notification.RuntimeProvider, logger *zap.SugaredLogger) *WorkflowExecutionFailedWorker

NewWorkflowExecutionFailedWorker creates a new WorkflowExecutionFailedWorker

func (*WorkflowExecutionFailedWorker) Work added in v0.12.0

Work sends a workflow execution failure notification for the execution identified by job.Args.WorkflowExecutionID.

type WorkflowTaskAssignedArgs added in v0.12.0

type WorkflowTaskAssignedArgs struct {
	AssignedToType        string     `json:"assigned_to_type"`
	Channel               string     `json:"channel,omitempty"`
	UserID                string     `json:"user_id"`
	StepExecutionID       string     `json:"step_execution_id"`
	StepTitle             string     `json:"step_title,omitempty"`
	WorkflowTitle         string     `json:"workflow_title,omitempty"`
	WorkflowInstanceTitle string     `json:"workflow_instance_title,omitempty"`
	StepURL               string     `json:"step_url,omitempty"`
	DueDate               *time.Time `json:"due_date,omitempty"`
}

WorkflowTaskAssignedArgs represents the arguments for a new-task-assigned notification job.

func (WorkflowTaskAssignedArgs) Kind added in v0.12.0

Kind returns the job kind for River.

func (WorkflowTaskAssignedArgs) Timeout added in v0.12.0

Timeout returns the timeout for workflow task assigned jobs.

type WorkflowTaskAssignedWorker added in v0.12.0

type WorkflowTaskAssignedWorker struct {
	// contains filtered or unexported fields
}

WorkflowTaskAssignedWorker handles new-task-assigned notification jobs.

func NewWorkflowTaskAssignedWorker added in v0.12.0

func NewWorkflowTaskAssignedWorker(
	userRepo UserRepository,
	webBaseURL string,
	notificationRuntime notification.RuntimeProvider,
	logger *zap.SugaredLogger,
) *WorkflowTaskAssignedWorker

NewWorkflowTaskAssignedWorker creates a new WorkflowTaskAssignedWorker with an injected runtime provider.

func (*WorkflowTaskAssignedWorker) Work added in v0.12.0

Work is the River work function for sending new-task-assigned notifications.

type WorkflowTaskDigestArgs added in v0.12.0

type WorkflowTaskDigestArgs struct {
	Channel string `json:"channel,omitempty"`
	UserID  string `json:"user_id"`
}

WorkflowTaskDigestArgs represents the arguments for a per-user task digest notification job.

func (WorkflowTaskDigestArgs) Kind added in v0.12.0

Kind returns the job kind for River.

func (WorkflowTaskDigestArgs) Timeout added in v0.12.0

Timeout returns the timeout for workflow task digest jobs.

type WorkflowTaskDigestCheckerArgs added in v0.12.0

type WorkflowTaskDigestCheckerArgs struct{}

WorkflowTaskDigestCheckerArgs represents the arguments for the periodic digest checker job

func (WorkflowTaskDigestCheckerArgs) Kind added in v0.12.0

Kind returns the job kind for River

func (WorkflowTaskDigestCheckerArgs) Timeout added in v0.12.0

Timeout returns the timeout for the digest checker job

type WorkflowTaskDigestCheckerWorker added in v0.12.0

type WorkflowTaskDigestCheckerWorker struct {
	// contains filtered or unexported fields
}

WorkflowTaskDigestCheckerWorker queries all subscribed users and enqueues one digest job per user.

func NewWorkflowTaskDigestCheckerWorker added in v0.12.0

func NewWorkflowTaskDigestCheckerWorker(db *gorm.DB, client workflow.RiverClient, logger *zap.SugaredLogger) *WorkflowTaskDigestCheckerWorker

NewWorkflowTaskDigestCheckerWorker creates a new WorkflowTaskDigestCheckerWorker

func (*WorkflowTaskDigestCheckerWorker) Work added in v0.12.0

Work queries all users subscribed to task daily digest and enqueues a WorkflowTaskDigestArgs job for each user.

type WorkflowTaskDigestWorker added in v0.12.0

type WorkflowTaskDigestWorker struct {
	// contains filtered or unexported fields
}

WorkflowTaskDigestWorker sends a per-user digest of pending and overdue workflow tasks

func NewWorkflowTaskDigestWorker added in v0.12.0

func NewWorkflowTaskDigestWorker(
	db *gorm.DB,
	userRepo UserRepository,
	webBaseURL string,
	notificationRuntime notification.RuntimeProvider,
	logger *zap.SugaredLogger,
) *WorkflowTaskDigestWorker

NewWorkflowTaskDigestWorker creates a new WorkflowTaskDigestWorker with an injected runtime provider.

func (*WorkflowTaskDigestWorker) Work added in v0.12.0

Work sends digest notifications for the user identified by job.Args.UserID.

type WorkflowTaskDueSoonArgs added in v0.12.0

type WorkflowTaskDueSoonArgs struct {
	Channel               string    `json:"channel,omitempty"`
	UserID                string    `json:"user_id"`
	StepExecutionID       string    `json:"step_execution_id"`
	StepTitle             string    `json:"step_title"`
	WorkflowTitle         string    `json:"workflow_title"`
	WorkflowInstanceTitle string    `json:"workflow_instance_title"`
	StepURL               string    `json:"step_url"`
	DueDate               time.Time `json:"due_date"`
}

WorkflowTaskDueSoonArgs represents the arguments for a task-due-soon reminder notification job.

func (WorkflowTaskDueSoonArgs) Kind added in v0.12.0

Kind returns the job kind for River.

func (WorkflowTaskDueSoonArgs) Timeout added in v0.12.0

Timeout returns the timeout for workflow task due soon jobs.

type WorkflowTaskDueSoonWorker added in v0.12.0

type WorkflowTaskDueSoonWorker struct {
	// contains filtered or unexported fields
}

WorkflowTaskDueSoonWorker handles task-due-soon reminder notification jobs.

func NewWorkflowTaskDueSoonWorker added in v0.12.0

func NewWorkflowTaskDueSoonWorker(
	userRepo UserRepository,
	webBaseURL string,
	notificationRuntime notification.RuntimeProvider,
	logger *zap.SugaredLogger,
) *WorkflowTaskDueSoonWorker

NewWorkflowTaskDueSoonWorker creates a new WorkflowTaskDueSoonWorker with an injected runtime provider.

func (*WorkflowTaskDueSoonWorker) Work added in v0.12.0

Work is the River work function for sending task-due-soon reminder notifications.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL