Documentation
¶
Index ¶
- Constants
- func JobInsertOptionsForPoamDigest(byPeriod time.Duration) *river.InsertOpts
- func JobInsertOptionsForPoamNotification(byPeriod time.Duration) *river.InsertOpts
- func JobInsertOptionsForPoamWorker(byPeriod time.Duration) *river.InsertOpts
- func JobInsertOptionsForRiskDigest(byPeriod time.Duration) *river.InsertOpts
- func JobInsertOptionsForRiskNotification(byPeriod time.Duration) *river.InsertOpts
- func JobInsertOptionsForRiskProcessEvidence() *river.InsertOpts
- func JobInsertOptionsForRiskWorkerUnique(byPeriod time.Duration) *river.InsertOpts
- func JobInsertOptionsForWorkflowNotification() *river.InsertOpts
- func JobInsertOptionsForWorkflowTaskAssignedNotification() *river.InsertOpts
- func JobInsertOptionsWithRetry(queue string, maxAttempts int) *river.InsertOpts
- func NewDigestPeriodicJob(cronSchedule string, logger *zap.SugaredLogger) *river.PeriodicJob
- func NewDueSoonCheckerPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob
- func NewMilestoneOverduePeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob
- func NewPoamDeadlineReminderPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob
- func NewPoamOpenDigestPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob
- func NewPoamOverdueTransitionPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob
- func NewRiskEvidenceReconciliationPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob
- func NewRiskOpenDigestPeriodicJob(schedule string, windowKind string, logger *zap.SugaredLogger) *river.PeriodicJob
- func NewRiskReviewDeadlineReminderPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob
- func NewRiskReviewOverdueEscalationPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob
- func NewRiskStaleScannerPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob
- func NewWorkflowSchedulerPeriodicJob(cronSchedule string, logger *zap.SugaredLogger) *river.PeriodicJob
- func NewWorkflowTaskDigestPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob
- func Workers(emailService EmailService, digestService DigestService, ...) *river.Workers
- type DigestService
- type DigestTask
- type DueSoonCheckerArgs
- type DueSoonCheckerWorker
- type EmailService
- type EvidenceDigestItem
- type EvidenceDigestSummary
- type GORMUserRepository
- type MilestoneOverdueReminderArgs
- type MilestoneOverdueReminderWorker
- type MilestoneOverdueScannerArgs
- type MilestoneOverdueScannerWorker
- type NotificationSubscription
- type NotificationUser
- type PoamDeadlineReminderArgs
- type PoamDeadlineReminderScannerArgs
- type PoamDeadlineReminderScannerWorker
- type PoamDeadlineReminderWorker
- type PoamDigestEmailItem
- type PoamMilestoneDigestEmailItem
- type PoamOpenDigestArgs
- type PoamOpenDigestSchedulerArgs
- type PoamOpenDigestSchedulerWorker
- type PoamOpenDigestWorker
- type PoamOverdueNotificationArgs
- type PoamOverdueNotificationWorker
- type PoamOverdueTransitionScannerArgs
- type PoamOverdueTransitionScannerWorker
- type RiskDigestEmailItem
- type RiskEvidenceReconciliationScannerArgs
- type RiskEvidenceReconciliationScannerWorker
- type RiskEvidenceWorker
- type RiskOpenDigestArgs
- type RiskOpenDigestSchedulerArgs
- type RiskOpenDigestSchedulerWorker
- type RiskOpenDigestWorker
- type RiskProcessEvidenceArgs
- type RiskReconcileDuplicatesArgs
- type RiskReconcileDuplicatesWorker
- type RiskReviewDeadlineReminderScannerArgs
- type RiskReviewDeadlineReminderScannerWorker
- type RiskReviewDueReminderArgs
- type RiskReviewDueReminderWorker
- type RiskReviewOverdueEscalationArgs
- type RiskReviewOverdueEscalationScannerArgs
- type RiskReviewOverdueEscalationScannerWorker
- type RiskReviewOverdueEscalationWorker
- type RiskReviewOverdueReopenArgs
- type RiskReviewOverdueReopenWorker
- type RiskStaleOpenReminderArgs
- type RiskStaleOpenReminderWorker
- type RiskStaleRiskScannerArgs
- type RiskStaleRiskScannerWorker
- type SendEmailArgs
- type SendEmailFromArgs
- type SendEmailFromWorker
- type SendEmailWorker
- type SendGlobalDigestArgs
- type SendGlobalDigestDeliveryArgs
- type SendGlobalDigestDeliveryWorker
- type SendGlobalDigestWorker
- type Service
- func (s *Service) EnqueueRiskProcessEvidence(ctx context.Context, evidenceID uuid.UUID, evidenceEnd, status string) error
- func (s *Service) EnqueueSendEmail(ctx context.Context, args *SendEmailArgs) error
- func (s *Service) EnqueueSendEmailFrom(ctx context.Context, args *SendEmailFromArgs) error
- func (s *Service) EnqueueSendGlobalDigestDeliveries(ctx context.Context, args []SendGlobalDigestDeliveryArgs) error
- func (s *Service) EnqueueWorkflowExecutionFailed(ctx context.Context, execution *workflows.WorkflowExecution) error
- func (s *Service) EnqueueWorkflowTaskAssigned(ctx context.Context, stepExecution *workflows.StepExecution) error
- func (s *Service) GetClient() *river.Client[pgx.Tx]
- func (s *Service) GetDAGExecutor() *workflow.DAGExecutor
- func (s *Service) IsStarted() bool
- func (s *Service) Migrate(ctx context.Context) error
- func (s *Service) Start(ctx context.Context) error
- func (s *Service) Stop(ctx context.Context) error
- type SlackService
- type UserRepository
- type WorkflowExecutionFailedArgs
- type WorkflowExecutionFailedWorker
- type WorkflowTaskAssignedArgs
- type WorkflowTaskAssignedWorker
- type WorkflowTaskDigestArgs
- type WorkflowTaskDigestCheckerArgs
- type WorkflowTaskDigestCheckerWorker
- type WorkflowTaskDigestWorker
- type WorkflowTaskDueSoonArgs
- type WorkflowTaskDueSoonWorker
Constants ¶
const ( JobTypeSendEmail = "send_email" JobTypeSendEmailFrom = "send_email_from" JobTypeSendGlobalDigest = "send_global_digest" JobTypeSendGlobalDigestDelivery = "send_global_digest_delivery" )
Job types for email processing
const ( JobTypeWorkflowTaskAssigned = "workflow_task_assigned" JobTypeWorkflowTaskDueSoon = "workflow_task_due_soon" JobTypeWorkflowTaskDigest = "workflow_task_digest" JobTypeWorkflowExecutionFailed = "workflow_execution_failed" )
Job types for workflow notifications
const (
	// Scanner job kinds — one per periodic trigger.
	JobTypePoamDeadlineReminderScanner = "poam_deadline_reminder_scanner"
	JobTypePoamOverdueTransitionScanner = "poam_overdue_transition_scanner"
	JobTypeMilestoneOverdueScannerScanner = "poam_milestone_overdue_scanner"
	// Notification / action job kinds — one per item per recipient.
	JobTypePoamDeadlineReminder = "poam_deadline_reminder"
	JobTypePoamOverdueNotification = "poam_overdue_notification"
	JobTypeMilestoneOverdueReminder = "poam_milestone_overdue_reminder"
	// Digest job kinds.
	JobTypePoamOpenDigestScheduler = "poam_open_digest_scheduler"
	JobTypePoamOpenDigest = "poam_open_digest"
)
const ( JobTypeRiskReviewDeadlineReminderScanner = "risk_review_deadline_reminder_scanner" JobTypeRiskReviewOverdueEscalationScanner = "risk_review_overdue_escalation_scanner" JobTypeRiskStaleRiskScanner = "risk_stale_risk_scanner" JobTypeRiskEvidenceReconciliationScanner = "risk_evidence_reconciliation_scanner" JobTypeRiskOpenDigestScheduler = "risk_open_digest_scheduler" JobTypeRiskReviewDueReminder = "risk_review_due_reminder" JobTypeRiskReviewOverdueEscalation = "risk_review_overdue_escalation" JobTypeRiskStaleOpenReminder = "risk_stale_open_reminder" JobTypeRiskReconcileDuplicates = "risk_reconcile_duplicates" JobTypeRiskReviewOverdueReopen = "risk_review_overdue_reopen" JobTypeRiskOpenDigest = "risk_open_digest" )
const (
JobTypeRiskProcessEvidence = "risk_process_evidence"
)
Job types for risk processing
Variables ¶
This section is empty.
Functions ¶
func JobInsertOptionsForPoamDigest ¶
func JobInsertOptionsForPoamDigest(byPeriod time.Duration) *river.InsertOpts
JobInsertOptionsForPoamDigest returns insert options for POAM digest jobs on the "digest" queue. Idempotency is enforced via ByArgs + ByPeriod.
func JobInsertOptionsForPoamNotification ¶
func JobInsertOptionsForPoamNotification(byPeriod time.Duration) *river.InsertOpts
JobInsertOptionsForPoamNotification returns insert options for POAM notification email jobs with a 24-hour idempotency window (daily scanners).
func JobInsertOptionsForPoamWorker ¶
func JobInsertOptionsForPoamWorker(byPeriod time.Duration) *river.InsertOpts
JobInsertOptionsForPoamWorker returns insert options for POAM scanner/worker jobs on the "poam" queue with idempotency.
func JobInsertOptionsForRiskDigest ¶ added in v0.14.0
func JobInsertOptionsForRiskDigest(byPeriod time.Duration) *river.InsertOpts
func JobInsertOptionsForRiskNotification ¶ added in v0.13.0
func JobInsertOptionsForRiskNotification(byPeriod time.Duration) *river.InsertOpts
func JobInsertOptionsForRiskProcessEvidence ¶ added in v0.13.0
func JobInsertOptionsForRiskProcessEvidence() *river.InsertOpts
JobInsertOptionsForRiskProcessEvidence returns insert options for risk processing jobs
func JobInsertOptionsForRiskWorkerUnique ¶ added in v0.13.0
func JobInsertOptionsForRiskWorkerUnique(byPeriod time.Duration) *river.InsertOpts
func JobInsertOptionsForWorkflowNotification ¶ added in v0.12.0
func JobInsertOptionsForWorkflowNotification() *river.InsertOpts
JobInsertOptionsForWorkflowNotification returns insert options for workflow notification email jobs
func JobInsertOptionsForWorkflowTaskAssignedNotification ¶ added in v0.12.0
func JobInsertOptionsForWorkflowTaskAssignedNotification() *river.InsertOpts
func JobInsertOptionsWithRetry ¶
func JobInsertOptionsWithRetry(queue string, maxAttempts int) *river.InsertOpts
JobInsertOptionsWithRetry returns insert options for jobs with custom retry policy
func NewDigestPeriodicJob ¶
func NewDigestPeriodicJob(cronSchedule string, logger *zap.SugaredLogger) *river.PeriodicJob
NewDigestPeriodicJob creates a periodic job for digest scheduling
func NewDueSoonCheckerPeriodicJob ¶ added in v0.12.0
func NewDueSoonCheckerPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob
func NewMilestoneOverduePeriodicJob ¶
func NewMilestoneOverduePeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob
NewMilestoneOverduePeriodicJob creates the River PeriodicJob for the weekly incomplete milestone scanner. Default cron: "0 0 10 * * 1" (Monday 10:00 UTC).
func NewPoamDeadlineReminderPeriodicJob ¶
func NewPoamDeadlineReminderPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob
NewPoamDeadlineReminderPeriodicJob creates the River PeriodicJob for the daily POAM deadline reminder scanner. Default cron: "0 0 8 * * *" (08:00 UTC).
func NewPoamOpenDigestPeriodicJob ¶
func NewPoamOpenDigestPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob
NewPoamOpenDigestPeriodicJob creates the River PeriodicJob for the daily POAM open digest scheduler. Default cron: "0 0 7 * * *" (07:00 UTC).
func NewPoamOverdueTransitionPeriodicJob ¶
func NewPoamOverdueTransitionPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob
NewPoamOverdueTransitionPeriodicJob creates the River PeriodicJob for the daily POAM overdue transition scanner. Default cron: "0 0 9 * * *" (09:00 UTC).
func NewRiskEvidenceReconciliationPeriodicJob ¶ added in v0.13.0
func NewRiskEvidenceReconciliationPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob
func NewRiskOpenDigestPeriodicJob ¶ added in v0.14.0
func NewRiskOpenDigestPeriodicJob(schedule string, windowKind string, logger *zap.SugaredLogger) *river.PeriodicJob
func NewRiskReviewDeadlineReminderPeriodicJob ¶ added in v0.13.0
func NewRiskReviewDeadlineReminderPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob
func NewRiskReviewOverdueEscalationPeriodicJob ¶ added in v0.13.0
func NewRiskReviewOverdueEscalationPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob
func NewRiskStaleScannerPeriodicJob ¶ added in v0.13.0
func NewRiskStaleScannerPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob
func NewWorkflowSchedulerPeriodicJob ¶ added in v0.12.0
func NewWorkflowSchedulerPeriodicJob(cronSchedule string, logger *zap.SugaredLogger) *river.PeriodicJob
func NewWorkflowTaskDigestPeriodicJob ¶ added in v0.12.0
func NewWorkflowTaskDigestPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob
func Workers ¶
func Workers(emailService EmailService, digestService DigestService, slackService SlackService, userRepo UserRepository, db *gorm.DB, webBaseURL string, logger *zap.SugaredLogger) *river.Workers
Workers returns all workers as work functions with dependencies injected
Types ¶
type DigestService ¶
type DigestService interface {
SendGlobalDigest(ctx context.Context) error
SendGlobalDigestDelivery(ctx context.Context, args SendGlobalDigestDeliveryArgs) error
}
DigestService interface for dependency injection
type DigestTask ¶ added in v0.12.0
type DigestTask struct {
StepTitle string
WorkflowTitle string
WorkflowInstanceTitle string
DueDate *string
StepURL string
}
DigestTask represents a single task entry in the digest email
type DueSoonCheckerArgs ¶ added in v0.12.0
type DueSoonCheckerArgs struct{}
DueSoonCheckerArgs represents the arguments for the periodic due-soon checker job
func (DueSoonCheckerArgs) Kind ¶ added in v0.12.0
func (DueSoonCheckerArgs) Kind() string
Kind returns the job kind for River
func (DueSoonCheckerArgs) Timeout ¶ added in v0.12.0
func (DueSoonCheckerArgs) Timeout() time.Duration
Timeout returns the timeout for the due-soon checker job
type DueSoonCheckerWorker ¶ added in v0.12.0
type DueSoonCheckerWorker struct {
// contains filtered or unexported fields
}
DueSoonCheckerWorker scans for step executions due in a week and enqueues reminder notifications.
func NewDueSoonCheckerWorker ¶ added in v0.12.0
func NewDueSoonCheckerWorker(db *gorm.DB, client workflow.RiverClient, logger *zap.SugaredLogger) *DueSoonCheckerWorker
NewDueSoonCheckerWorker creates a new DueSoonCheckerWorker
func (*DueSoonCheckerWorker) Work ¶ added in v0.12.0
func (w *DueSoonCheckerWorker) Work(ctx context.Context, job *river.Job[DueSoonCheckerArgs]) error
Work scans for step executions due in ~1 week and enqueues WorkflowTaskDueSoonArgs jobs.
type EmailService ¶
type EmailService interface {
Send(ctx context.Context, message *types.Message) (*types.SendResult, error)
SendWithProvider(ctx context.Context, providerName string, message *types.Message) (*types.SendResult, error)
UseTemplate(templateName string, data map[string]interface{}) (htmlContent, textContent string, err error)
GetDefaultFromAddress() string
}
EmailService interface for dependency injection
type EvidenceDigestItem ¶
type EvidenceDigestItem struct {
ID string `json:"id"`
UUID string `json:"uuid"`
Title string `json:"title"`
Description string `json:"description"`
Status string `json:"status"`
ExpiresAt string `json:"expires_at,omitempty"`
Labels []string `json:"labels,omitempty"`
}
EvidenceDigestItem carries one evidence row for a digest delivery job.
type EvidenceDigestSummary ¶
type EvidenceDigestSummary struct {
TotalCount int64 `json:"total_count"`
SatisfiedCount int64 `json:"satisfied_count"`
NotSatisfiedCount int64 `json:"not_satisfied_count"`
ExpiredCount int64 `json:"expired_count"`
OtherCount int64 `json:"other_count"`
TopExpired []EvidenceDigestItem `json:"top_expired,omitempty"`
TopNotSatisfied []EvidenceDigestItem `json:"top_not_satisfied,omitempty"`
}
EvidenceDigestSummary carries the evidence summary snapshot for a digest delivery job.
type GORMUserRepository ¶ added in v0.12.0
type GORMUserRepository struct {
// contains filtered or unexported fields
}
GORMUserRepository implements UserRepository using GORM
func NewGORMUserRepository ¶ added in v0.12.0
func NewGORMUserRepository(db *gorm.DB) *GORMUserRepository
NewGORMUserRepository creates a new GORMUserRepository
func (*GORMUserRepository) FindUserByID ¶ added in v0.12.0
func (r *GORMUserRepository) FindUserByID(ctx context.Context, userID string) (NotificationUser, error)
FindUserByID looks up a user by UUID string and returns a NotificationUser
type MilestoneOverdueReminderArgs ¶
type MilestoneOverdueReminderArgs struct {
MilestoneID uuid.UUID `json:"milestone_id"`
PoamItemID uuid.UUID `json:"poam_item_id"`
RecipientUserID uuid.UUID `json:"recipient_user_id"`
MilestoneTitle string `json:"milestone_title"`
PoamTitle string `json:"poam_title"`
SspID uuid.UUID `json:"ssp_id"`
SspDisplayName string `json:"ssp_display_name"`
DueDate string `json:"due_date"` // RFC3339
PoamURL string `json:"poam_url"`
WeeklyBucket string `json:"weekly_bucket"` // e.g. "2026-W14"
}
MilestoneOverdueReminderArgs carries the data needed to send a single incomplete milestone overdue reminder email to one recipient. Idempotency key: MilestoneID + DueDate + WeeklyBucket (ByArgs + ByPeriod 7 days).
func (MilestoneOverdueReminderArgs) Kind ¶
func (MilestoneOverdueReminderArgs) Kind() string
func (MilestoneOverdueReminderArgs) Timeout ¶
func (MilestoneOverdueReminderArgs) Timeout() time.Duration
type MilestoneOverdueReminderWorker ¶
type MilestoneOverdueReminderWorker struct {
// contains filtered or unexported fields
}
MilestoneOverdueReminderWorker sends a single incomplete milestone overdue reminder email to one recipient.
func NewMilestoneOverdueReminderWorker ¶
func NewMilestoneOverdueReminderWorker( emailService EmailService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger, ) *MilestoneOverdueReminderWorker
NewMilestoneOverdueReminderWorker constructs a MilestoneOverdueReminderWorker.
func (*MilestoneOverdueReminderWorker) Work ¶
func (w *MilestoneOverdueReminderWorker) Work( ctx context.Context, job *river.Job[MilestoneOverdueReminderArgs], ) error
Work sends the milestone overdue reminder email for a single milestone × recipient.
type MilestoneOverdueScannerArgs ¶
type MilestoneOverdueScannerArgs struct{}
MilestoneOverdueScannerArgs is the args type for the weekly incomplete milestone scanner job (cron: 0 0 10 * * 1, i.e. Monday 10:00 UTC). The scanner queries for milestones in "planned" status whose due_date has passed and whose parent POAM item is not completed, then enqueues a MilestoneOverdueReminderArgs job per milestone per recipient.
func (MilestoneOverdueScannerArgs) Kind ¶
func (MilestoneOverdueScannerArgs) Kind() string
func (MilestoneOverdueScannerArgs) Timeout ¶
func (MilestoneOverdueScannerArgs) Timeout() time.Duration
type MilestoneOverdueScannerWorker ¶
type MilestoneOverdueScannerWorker struct {
// contains filtered or unexported fields
}
MilestoneOverdueScannerWorker scans weekly for POAM item milestones that are in "open" or "in-progress" status, whose planned_completion_date has passed, and whose parent POAM item is not yet completed. For each such milestone it enqueues one MilestoneOverdueReminderArgs job per recipient.
func NewMilestoneOverdueScannerWorker ¶
func NewMilestoneOverdueScannerWorker( db *gorm.DB, client workflow.RiverClient, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger, ) *MilestoneOverdueScannerWorker
NewMilestoneOverdueScannerWorker constructs a MilestoneOverdueScannerWorker.
func (*MilestoneOverdueScannerWorker) Work ¶
func (w *MilestoneOverdueScannerWorker) Work( ctx context.Context, job *river.Job[MilestoneOverdueScannerArgs], ) error
Work queries for overdue milestones on non-completed POAM items and enqueues per-milestone per-recipient reminder jobs.
type NotificationSubscription ¶
NotificationSubscription holds worker-facing notification subscription data.
type NotificationUser ¶ added in v0.12.0
type NotificationUser struct {
ID string
Email string
FirstName string
LastName string
SlackUserID string
NotificationSubscriptions []NotificationSubscription
RiskNotificationsSubscribed bool
}
NotificationUser holds the user fields needed for sending notification emails
func (NotificationUser) FullName ¶ added in v0.12.0
func (u NotificationUser) FullName() string
func (NotificationUser) NotificationChannels ¶
func (u NotificationUser) NotificationChannels(notificationType string) []string
type PoamDeadlineReminderArgs ¶
type PoamDeadlineReminderArgs struct {
PoamItemID uuid.UUID `json:"poam_item_id"`
RecipientUserID uuid.UUID `json:"recipient_user_id"`
PoamTitle string `json:"poam_title"`
SspID uuid.UUID `json:"ssp_id"`
SspDisplayName string `json:"ssp_display_name"`
CurrentStatus string `json:"current_status"`
Deadline string `json:"deadline"` // RFC3339
MilestoneCount int `json:"milestone_count"`
PoamURL string `json:"poam_url"`
ReminderWindowBucket string `json:"reminder_window_bucket"` // e.g. "2026-03-31"
}
PoamDeadlineReminderArgs carries the data needed to send a single POAM deadline approaching reminder email to one recipient. Idempotency key: PoamItemID + Deadline + ReminderWindowBucket (ByArgs + ByPeriod 24h).
func (PoamDeadlineReminderArgs) Kind ¶
func (PoamDeadlineReminderArgs) Kind() string
func (PoamDeadlineReminderArgs) Timeout ¶
func (PoamDeadlineReminderArgs) Timeout() time.Duration
type PoamDeadlineReminderScannerArgs ¶
type PoamDeadlineReminderScannerArgs struct{}
PoamDeadlineReminderScannerArgs is the args type for the daily POAM deadline reminder scanner job (cron: 0 0 8 * * *, i.e. 08:00 UTC daily). The scanner queries for open/in-progress POAM items whose deadline falls within the configured reminder window and enqueues a PoamDeadlineReminderArgs job per item per recipient.
func (PoamDeadlineReminderScannerArgs) Kind ¶
func (PoamDeadlineReminderScannerArgs) Kind() string
func (PoamDeadlineReminderScannerArgs) Timeout ¶
func (PoamDeadlineReminderScannerArgs) Timeout() time.Duration
type PoamDeadlineReminderScannerWorker ¶
type PoamDeadlineReminderScannerWorker struct {
// contains filtered or unexported fields
}
PoamDeadlineReminderScannerWorker scans daily for POAM items whose deadline is approaching within the configured window and enqueues per-item per-recipient reminder jobs. Mirrors the Risk Review Deadline Reminder scanner pattern.
func NewPoamDeadlineReminderScannerWorker ¶
func NewPoamDeadlineReminderScannerWorker( db *gorm.DB, client workflow.RiverClient, userRepo UserRepository, webBaseURL string, reminderWindow time.Duration, logger *zap.SugaredLogger, ) *PoamDeadlineReminderScannerWorker
NewPoamDeadlineReminderScannerWorker constructs a PoamDeadlineReminderScannerWorker. reminderWindow is the look-ahead horizon: items whose deadline falls within this duration from now will receive a reminder. Pass a positive value (e.g. 30 * 24 * time.Hour).
func (*PoamDeadlineReminderScannerWorker) Work ¶
func (w *PoamDeadlineReminderScannerWorker) Work( ctx context.Context, job *river.Job[PoamDeadlineReminderScannerArgs], ) error
Work queries for POAM items with deadlines within the configured reminder window and enqueues PoamDeadlineReminderArgs jobs for each item × recipient pair.
type PoamDeadlineReminderWorker ¶
type PoamDeadlineReminderWorker struct {
// contains filtered or unexported fields
}
PoamDeadlineReminderWorker sends a single POAM deadline approaching reminder email to one recipient.
func NewPoamDeadlineReminderWorker ¶
func NewPoamDeadlineReminderWorker( emailService EmailService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger, ) *PoamDeadlineReminderWorker
NewPoamDeadlineReminderWorker constructs a PoamDeadlineReminderWorker.
func (*PoamDeadlineReminderWorker) Work ¶
func (w *PoamDeadlineReminderWorker) Work( ctx context.Context, job *river.Job[PoamDeadlineReminderArgs], ) error
Work sends the POAM deadline reminder email for a single item × recipient.
type PoamDigestEmailItem ¶
type PoamDigestEmailItem struct {
PoamItemID uuid.UUID // used for test assertions; not rendered in template
Title string
SSPName string
Status string
Deadline string // formatted date or "—"
POCName string
MilestoneCount int
PoamURL string
}
PoamDigestEmailItem represents a single POAM item row in the digest email. PoamItemID is carried for test assertions; it is not rendered in the template.
type PoamMilestoneDigestEmailItem ¶
type PoamMilestoneDigestEmailItem struct {
MilestoneTitle string
PoamTitle string
SSPName string
DueDate string
PoamURL string
}
PoamMilestoneDigestEmailItem represents a single milestone row in the digest.
type PoamOpenDigestArgs ¶
type PoamOpenDigestArgs struct {
RecipientUserID uuid.UUID `json:"recipient_user_id"`
WindowStart string `json:"window_start"` // RFC3339
WindowEnd string `json:"window_end"` // RFC3339
WindowKind string `json:"window_kind"` // "daily" | "weekly"
}
PoamOpenDigestArgs carries the data needed to build and send the grouped POAM digest email for a single recipient. Idempotency key: RecipientUserID + WindowStart + WindowEnd (ByArgs + ByPeriod).
func (PoamOpenDigestArgs) Kind ¶
func (PoamOpenDigestArgs) Kind() string
func (PoamOpenDigestArgs) Timeout ¶
func (PoamOpenDigestArgs) Timeout() time.Duration
type PoamOpenDigestSchedulerArgs ¶
type PoamOpenDigestSchedulerArgs struct{}
PoamOpenDigestSchedulerArgs is the args type for the periodic POAM digest scheduler job. It has no payload — the scheduler resolves recipients from the DB.
func (PoamOpenDigestSchedulerArgs) Kind ¶
func (PoamOpenDigestSchedulerArgs) Kind() string
func (PoamOpenDigestSchedulerArgs) Timeout ¶
func (PoamOpenDigestSchedulerArgs) Timeout() time.Duration
type PoamOpenDigestSchedulerWorker ¶
type PoamOpenDigestSchedulerWorker struct {
// contains filtered or unexported fields
}
PoamOpenDigestSchedulerWorker is the periodic River job that resolves all POAM digest recipients and enqueues one PoamOpenDigestArgs job per recipient.
func NewPoamOpenDigestSchedulerWorker ¶
func NewPoamOpenDigestSchedulerWorker( db *gorm.DB, client workflow.RiverClient, windowKind string, logger *zap.SugaredLogger, ) *PoamOpenDigestSchedulerWorker
func (*PoamOpenDigestSchedulerWorker) Work ¶
func (w *PoamOpenDigestSchedulerWorker) Work(ctx context.Context, _ *river.Job[PoamOpenDigestSchedulerArgs]) error
type PoamOpenDigestWorker ¶
type PoamOpenDigestWorker struct {
// contains filtered or unexported fields
}
PoamOpenDigestWorker builds and sends the grouped POAM digest email for a single recipient.
func NewPoamOpenDigestWorker ¶
func NewPoamOpenDigestWorker( db *gorm.DB, emailService EmailService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger, ) *PoamOpenDigestWorker
func (*PoamOpenDigestWorker) Work ¶
func (w *PoamOpenDigestWorker) Work(ctx context.Context, job *river.Job[PoamOpenDigestArgs]) error
type PoamOverdueNotificationArgs ¶
type PoamOverdueNotificationArgs struct {
PoamItemID uuid.UUID `json:"poam_item_id"`
RecipientUserID uuid.UUID `json:"recipient_user_id"`
PoamTitle string `json:"poam_title"`
SspID uuid.UUID `json:"ssp_id"`
SspDisplayName string `json:"ssp_display_name"`
Deadline string `json:"deadline"` // RFC3339
PoamURL string `json:"poam_url"`
OverdueWindow string `json:"overdue_window"` // e.g. "2026-03-31"
}
PoamOverdueNotificationArgs carries the data needed to send a single POAM overdue notification email to one recipient. Idempotency key: PoamItemID + Deadline + OverdueWindow (ByArgs + ByPeriod 24h).
func (PoamOverdueNotificationArgs) Kind ¶
func (PoamOverdueNotificationArgs) Kind() string
func (PoamOverdueNotificationArgs) Timeout ¶
func (PoamOverdueNotificationArgs) Timeout() time.Duration
type PoamOverdueNotificationWorker ¶
type PoamOverdueNotificationWorker struct {
// contains filtered or unexported fields
}
PoamOverdueNotificationWorker sends a single POAM overdue notification email to one recipient.
func NewPoamOverdueNotificationWorker ¶
func NewPoamOverdueNotificationWorker( emailService EmailService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger, ) *PoamOverdueNotificationWorker
NewPoamOverdueNotificationWorker constructs a PoamOverdueNotificationWorker.
func (*PoamOverdueNotificationWorker) Work ¶
func (w *PoamOverdueNotificationWorker) Work( ctx context.Context, job *river.Job[PoamOverdueNotificationArgs], ) error
Work sends the POAM overdue notification email for a single item × recipient.
type PoamOverdueTransitionScannerArgs ¶
type PoamOverdueTransitionScannerArgs struct{}
PoamOverdueTransitionScannerArgs is the args type for the daily POAM overdue transition scanner job (cron: 0 0 9 * * *, i.e. 09:00 UTC daily). The scanner queries for open/in-progress POAM items whose deadline has already passed, transitions their status to "overdue" in the DB, and enqueues a PoamOverdueNotificationArgs job per item per recipient.
func (PoamOverdueTransitionScannerArgs) Kind ¶
func (PoamOverdueTransitionScannerArgs) Kind() string
func (PoamOverdueTransitionScannerArgs) Timeout ¶
func (PoamOverdueTransitionScannerArgs) Timeout() time.Duration
type PoamOverdueTransitionScannerWorker ¶
type PoamOverdueTransitionScannerWorker struct {
// contains filtered or unexported fields
}
PoamOverdueTransitionScannerWorker scans daily for POAM items whose deadline has passed and whose status is still open or in-progress. For each such item it:
- Transitions the status to "overdue" in the database (with updated_at).
- Enqueues a PoamOverdueNotificationArgs job per item per recipient.
"overdue" is a terminal state for automated processing; it can only be reversed by a manual status update via PUT /poam-items/:id.
func NewPoamOverdueTransitionScannerWorker ¶
func NewPoamOverdueTransitionScannerWorker( db *gorm.DB, client workflow.RiverClient, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger, ) *PoamOverdueTransitionScannerWorker
NewPoamOverdueTransitionScannerWorker constructs a PoamOverdueTransitionScannerWorker.
func (*PoamOverdueTransitionScannerWorker) Work ¶
func (w *PoamOverdueTransitionScannerWorker) Work( ctx context.Context, job *river.Job[PoamOverdueTransitionScannerArgs], ) error
Work queries for overdue POAM items, transitions their status, and enqueues notification jobs.
type RiskDigestEmailItem ¶ added in v0.14.0
type RiskEvidenceReconciliationScannerArgs ¶ added in v0.13.0
type RiskEvidenceReconciliationScannerArgs struct{}
func (RiskEvidenceReconciliationScannerArgs) Kind ¶ added in v0.13.0
func (RiskEvidenceReconciliationScannerArgs) Kind() string
func (RiskEvidenceReconciliationScannerArgs) Timeout ¶ added in v0.13.0
func (RiskEvidenceReconciliationScannerArgs) Timeout() time.Duration
type RiskEvidenceReconciliationScannerWorker ¶ added in v0.13.0
type RiskEvidenceReconciliationScannerWorker struct {
// contains filtered or unexported fields
}
func NewRiskEvidenceReconciliationScannerWorker ¶ added in v0.13.0
func NewRiskEvidenceReconciliationScannerWorker(db *gorm.DB, client workflow.RiverClient, logger *zap.SugaredLogger) *RiskEvidenceReconciliationScannerWorker
func (*RiskEvidenceReconciliationScannerWorker) Work ¶ added in v0.13.0
func (w *RiskEvidenceReconciliationScannerWorker) Work(ctx context.Context, _ *river.Job[RiskEvidenceReconciliationScannerArgs]) error
type RiskEvidenceWorker ¶ added in v0.13.0
type RiskEvidenceWorker struct {
// contains filtered or unexported fields
}
RiskEvidenceWorker handles processing evidence and creating risks
func NewRiskEvidenceWorker ¶ added in v0.13.0
func NewRiskEvidenceWorker(db *gorm.DB, logger *zap.SugaredLogger) *RiskEvidenceWorker
NewRiskEvidenceWorker creates a new RiskEvidenceWorker
func (*RiskEvidenceWorker) Work ¶ added in v0.13.0
func (w *RiskEvidenceWorker) Work(ctx context.Context, job *river.Job[RiskProcessEvidenceArgs]) error
Work is the River work function for processing evidence and creating risks
type RiskOpenDigestArgs ¶ added in v0.14.0
type RiskOpenDigestArgs struct {
RecipientUserID uuid.UUID `json:"recipient_user_id"`
WindowStart string `json:"window_start"`
WindowEnd string `json:"window_end"`
WindowKind string `json:"window_kind"`
}
func (RiskOpenDigestArgs) Kind ¶ added in v0.14.0
func (RiskOpenDigestArgs) Kind() string
func (RiskOpenDigestArgs) Timeout ¶ added in v0.14.0
func (RiskOpenDigestArgs) Timeout() time.Duration
type RiskOpenDigestSchedulerArgs ¶ added in v0.14.0
type RiskOpenDigestSchedulerArgs struct{}
func (RiskOpenDigestSchedulerArgs) Kind ¶ added in v0.14.0
func (RiskOpenDigestSchedulerArgs) Kind() string
func (RiskOpenDigestSchedulerArgs) Timeout ¶ added in v0.14.0
func (RiskOpenDigestSchedulerArgs) Timeout() time.Duration
type RiskOpenDigestSchedulerWorker ¶ added in v0.14.0
type RiskOpenDigestSchedulerWorker struct {
// contains filtered or unexported fields
}
func NewRiskOpenDigestSchedulerWorker ¶ added in v0.14.0
func NewRiskOpenDigestSchedulerWorker(db *gorm.DB, client workflow.RiverClient, windowKind string, logger *zap.SugaredLogger) *RiskOpenDigestSchedulerWorker
func (*RiskOpenDigestSchedulerWorker) Work ¶ added in v0.14.0
func (w *RiskOpenDigestSchedulerWorker) Work(ctx context.Context, _ *river.Job[RiskOpenDigestSchedulerArgs]) error
type RiskOpenDigestWorker ¶ added in v0.14.0
type RiskOpenDigestWorker struct {
// contains filtered or unexported fields
}
func NewRiskOpenDigestWorker ¶ added in v0.14.0
func NewRiskOpenDigestWorker(db *gorm.DB, emailService EmailService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger) *RiskOpenDigestWorker
func (*RiskOpenDigestWorker) Work ¶ added in v0.14.0
func (w *RiskOpenDigestWorker) Work(ctx context.Context, job *river.Job[RiskOpenDigestArgs]) error
type RiskProcessEvidenceArgs ¶ added in v0.13.0
type RiskProcessEvidenceArgs struct {
EvidenceID uuid.UUID `json:"evidence_id"`
EvidenceEnd string `json:"evidence_end"`
Status string `json:"status"`
}
RiskProcessEvidenceArgs represents the arguments for processing evidence and creating risks. EvidenceEnd and Status are included alongside EvidenceID intentionally: River uses ByArgs uniqueness to deduplicate jobs within the 5-minute window. Including the end time and status ensures that two different evidence records for the same stream (different end times or states) each get their own independent deduplication window, preventing the second record from being silently dropped.
func (RiskProcessEvidenceArgs) Kind ¶ added in v0.13.0
func (RiskProcessEvidenceArgs) Kind() string
Kind returns the job kind for River
func (RiskProcessEvidenceArgs) Timeout ¶ added in v0.13.0
func (RiskProcessEvidenceArgs) Timeout() time.Duration
Timeout returns the timeout for risk process evidence jobs
type RiskReconcileDuplicatesArgs ¶ added in v0.13.0
type RiskReconcileDuplicatesArgs struct {
DedupeKey string `json:"dedupe_key"`
}
func (RiskReconcileDuplicatesArgs) Kind ¶ added in v0.13.0
func (RiskReconcileDuplicatesArgs) Kind() string
func (RiskReconcileDuplicatesArgs) Timeout ¶ added in v0.13.0
func (RiskReconcileDuplicatesArgs) Timeout() time.Duration
type RiskReconcileDuplicatesWorker ¶ added in v0.13.0
type RiskReconcileDuplicatesWorker struct {
// contains filtered or unexported fields
}
func NewRiskReconcileDuplicatesWorker ¶ added in v0.13.0
func NewRiskReconcileDuplicatesWorker(db *gorm.DB, logger *zap.SugaredLogger) *RiskReconcileDuplicatesWorker
func (*RiskReconcileDuplicatesWorker) Work ¶ added in v0.13.0
func (w *RiskReconcileDuplicatesWorker) Work(ctx context.Context, job *river.Job[RiskReconcileDuplicatesArgs]) error
type RiskReviewDeadlineReminderScannerArgs ¶ added in v0.13.0
type RiskReviewDeadlineReminderScannerArgs struct{}
func (RiskReviewDeadlineReminderScannerArgs) Kind ¶ added in v0.13.0
func (RiskReviewDeadlineReminderScannerArgs) Kind() string
func (RiskReviewDeadlineReminderScannerArgs) Timeout ¶ added in v0.13.0
func (RiskReviewDeadlineReminderScannerArgs) Timeout() time.Duration
type RiskReviewDeadlineReminderScannerWorker ¶ added in v0.13.0
type RiskReviewDeadlineReminderScannerWorker struct {
// contains filtered or unexported fields
}
func NewRiskReviewDeadlineReminderScannerWorker ¶ added in v0.13.0
func NewRiskReviewDeadlineReminderScannerWorker(db *gorm.DB, client workflow.RiverClient, logger *zap.SugaredLogger) *RiskReviewDeadlineReminderScannerWorker
func (*RiskReviewDeadlineReminderScannerWorker) Work ¶ added in v0.13.0
func (w *RiskReviewDeadlineReminderScannerWorker) Work(ctx context.Context, _ *river.Job[RiskReviewDeadlineReminderScannerArgs]) error
type RiskReviewDueReminderArgs ¶ added in v0.13.0
type RiskReviewDueReminderArgs struct {
RiskID uuid.UUID `json:"risk_id"`
OwnerUserID uuid.UUID `json:"owner_user_id"`
ReviewDeadline string `json:"review_deadline"`
ReminderWindow string `json:"reminder_window"`
}
func (RiskReviewDueReminderArgs) Kind ¶ added in v0.13.0
func (RiskReviewDueReminderArgs) Kind() string
func (RiskReviewDueReminderArgs) Timeout ¶ added in v0.13.0
func (RiskReviewDueReminderArgs) Timeout() time.Duration
type RiskReviewDueReminderWorker ¶ added in v0.13.0
type RiskReviewDueReminderWorker struct {
// contains filtered or unexported fields
}
func NewRiskReviewDueReminderWorker ¶ added in v0.13.0
func NewRiskReviewDueReminderWorker(db *gorm.DB, emailService EmailService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger) *RiskReviewDueReminderWorker
func (*RiskReviewDueReminderWorker) Work ¶ added in v0.13.0
func (w *RiskReviewDueReminderWorker) Work(ctx context.Context, job *river.Job[RiskReviewDueReminderArgs]) error
type RiskReviewOverdueEscalationArgs ¶ added in v0.13.0
type RiskReviewOverdueEscalationArgs struct {
RiskID uuid.UUID `json:"risk_id"`
OwnerUserID uuid.UUID `json:"owner_user_id"`
ReviewDeadline string `json:"review_deadline"`
OverdueWindow string `json:"overdue_window"`
}
func (RiskReviewOverdueEscalationArgs) Kind ¶ added in v0.13.0
func (RiskReviewOverdueEscalationArgs) Kind() string
func (RiskReviewOverdueEscalationArgs) Timeout ¶ added in v0.13.0
func (RiskReviewOverdueEscalationArgs) Timeout() time.Duration
type RiskReviewOverdueEscalationScannerArgs ¶ added in v0.13.0
type RiskReviewOverdueEscalationScannerArgs struct{}
func (RiskReviewOverdueEscalationScannerArgs) Kind ¶ added in v0.13.0
func (RiskReviewOverdueEscalationScannerArgs) Kind() string
func (RiskReviewOverdueEscalationScannerArgs) Timeout ¶ added in v0.13.0
func (RiskReviewOverdueEscalationScannerArgs) Timeout() time.Duration
type RiskReviewOverdueEscalationScannerWorker ¶ added in v0.13.0
type RiskReviewOverdueEscalationScannerWorker struct {
// contains filtered or unexported fields
}
func NewRiskReviewOverdueEscalationScannerWorker ¶ added in v0.13.0
func NewRiskReviewOverdueEscalationScannerWorker(db *gorm.DB, client workflow.RiverClient, logger *zap.SugaredLogger, autoReopenEnabled bool, autoReopenThresholdDays int) *RiskReviewOverdueEscalationScannerWorker
func (*RiskReviewOverdueEscalationScannerWorker) Work ¶ added in v0.13.0
func (w *RiskReviewOverdueEscalationScannerWorker) Work(ctx context.Context, _ *river.Job[RiskReviewOverdueEscalationScannerArgs]) error
type RiskReviewOverdueEscalationWorker ¶ added in v0.13.0
type RiskReviewOverdueEscalationWorker struct {
// contains filtered or unexported fields
}
func NewRiskReviewOverdueEscalationWorker ¶ added in v0.13.0
func NewRiskReviewOverdueEscalationWorker(db *gorm.DB, emailService EmailService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger) *RiskReviewOverdueEscalationWorker
func (*RiskReviewOverdueEscalationWorker) Work ¶ added in v0.13.0
func (w *RiskReviewOverdueEscalationWorker) Work(ctx context.Context, job *river.Job[RiskReviewOverdueEscalationArgs]) error
type RiskReviewOverdueReopenArgs ¶ added in v0.13.0
type RiskReviewOverdueReopenArgs struct {
RiskID uuid.UUID `json:"risk_id"`
ReviewDeadline string `json:"review_deadline"`
ThresholdDays int `json:"threshold_days"`
}
func (RiskReviewOverdueReopenArgs) Kind ¶ added in v0.13.0
func (RiskReviewOverdueReopenArgs) Kind() string
func (RiskReviewOverdueReopenArgs) Timeout ¶ added in v0.13.0
func (RiskReviewOverdueReopenArgs) Timeout() time.Duration
type RiskReviewOverdueReopenWorker ¶ added in v0.13.0
type RiskReviewOverdueReopenWorker struct {
// contains filtered or unexported fields
}
func NewRiskReviewOverdueReopenWorker ¶ added in v0.13.0
func NewRiskReviewOverdueReopenWorker(db *gorm.DB, logger *zap.SugaredLogger) *RiskReviewOverdueReopenWorker
func (*RiskReviewOverdueReopenWorker) Work ¶ added in v0.13.0
func (w *RiskReviewOverdueReopenWorker) Work(ctx context.Context, job *river.Job[RiskReviewOverdueReopenArgs]) error
type RiskStaleOpenReminderArgs ¶ added in v0.13.0
type RiskStaleOpenReminderArgs struct {
RiskID uuid.UUID `json:"risk_id"`
OwnerUserID uuid.UUID `json:"owner_user_id"`
LastSeenAt string `json:"last_seen_at"`
StaleBucketDate string `json:"stale_bucket_date"`
}
func (RiskStaleOpenReminderArgs) Kind ¶ added in v0.13.0
func (RiskStaleOpenReminderArgs) Kind() string
func (RiskStaleOpenReminderArgs) Timeout ¶ added in v0.13.0
func (RiskStaleOpenReminderArgs) Timeout() time.Duration
type RiskStaleOpenReminderWorker ¶ added in v0.13.0
type RiskStaleOpenReminderWorker struct {
// contains filtered or unexported fields
}
func NewRiskStaleOpenReminderWorker ¶ added in v0.13.0
func NewRiskStaleOpenReminderWorker(db *gorm.DB, emailService EmailService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger) *RiskStaleOpenReminderWorker
func (*RiskStaleOpenReminderWorker) Work ¶ added in v0.13.0
func (w *RiskStaleOpenReminderWorker) Work(ctx context.Context, job *river.Job[RiskStaleOpenReminderArgs]) error
type RiskStaleRiskScannerArgs ¶ added in v0.13.0
type RiskStaleRiskScannerArgs struct{}
func (RiskStaleRiskScannerArgs) Kind ¶ added in v0.13.0
func (RiskStaleRiskScannerArgs) Kind() string
func (RiskStaleRiskScannerArgs) Timeout ¶ added in v0.13.0
func (RiskStaleRiskScannerArgs) Timeout() time.Duration
type RiskStaleRiskScannerWorker ¶ added in v0.13.0
type RiskStaleRiskScannerWorker struct {
// contains filtered or unexported fields
}
func NewRiskStaleRiskScannerWorker ¶ added in v0.13.0
func NewRiskStaleRiskScannerWorker(db *gorm.DB, client workflow.RiverClient, logger *zap.SugaredLogger) *RiskStaleRiskScannerWorker
func (*RiskStaleRiskScannerWorker) Work ¶ added in v0.13.0
func (w *RiskStaleRiskScannerWorker) Work(ctx context.Context, _ *river.Job[RiskStaleRiskScannerArgs]) error
type SendEmailArgs ¶
type SendEmailArgs struct {
// Email message fields
From string `json:"from"`
To []string `json:"to"`
Cc []string `json:"cc,omitempty"`
Bcc []string `json:"bcc,omitempty"`
Subject string `json:"subject"`
HTMLBody string `json:"html_body,omitempty"`
TextBody string `json:"text_body,omitempty"`
Attachments []types.Attachment `json:"attachments,omitempty"`
Headers map[string]string `json:"headers,omitempty"`
}
SendEmailArgs represents the arguments for sending an email
func (SendEmailArgs) Timeout ¶
func (SendEmailArgs) Timeout() time.Duration
Timeout returns the timeout for email jobs
type SendEmailFromArgs ¶
type SendEmailFromArgs struct {
// Provider to use for sending
Provider string `json:"provider"`
// Email message fields
From string `json:"from"`
To []string `json:"to"`
Cc []string `json:"cc,omitempty"`
Bcc []string `json:"bcc,omitempty"`
Subject string `json:"subject"`
HTMLBody string `json:"html_body,omitempty"`
TextBody string `json:"text_body,omitempty"`
Attachments []types.Attachment `json:"attachments,omitempty"`
Headers map[string]string `json:"headers,omitempty"`
}
SendEmailFromArgs represents the arguments for sending an email from a specific provider
func (SendEmailFromArgs) Kind ¶
func (SendEmailFromArgs) Kind() string
Kind returns the job kind for River
func (SendEmailFromArgs) Timeout ¶
func (SendEmailFromArgs) Timeout() time.Duration
Timeout returns the timeout for email jobs
type SendEmailFromWorker ¶
type SendEmailFromWorker struct {
// contains filtered or unexported fields
}
SendEmailFromWorker handles sending email from provider jobs
func NewSendEmailFromWorker ¶
func NewSendEmailFromWorker(emailService EmailService, logger *zap.SugaredLogger) *SendEmailFromWorker
NewSendEmailFromWorker creates a new SendEmailFromWorker
func (*SendEmailFromWorker) Work ¶
func (w *SendEmailFromWorker) Work(ctx context.Context, job *river.Job[SendEmailFromArgs]) error
Work is the River work function for sending emails from a provider
type SendEmailWorker ¶
type SendEmailWorker struct {
// contains filtered or unexported fields
}
SendEmailWorker handles sending email jobs
func NewSendEmailWorker ¶
func NewSendEmailWorker(emailService EmailService, logger *zap.SugaredLogger) *SendEmailWorker
NewSendEmailWorker creates a new SendEmailWorker
func (*SendEmailWorker) Work ¶
func (w *SendEmailWorker) Work(ctx context.Context, job *river.Job[SendEmailArgs]) error
Work is the River work function for sending emails
type SendGlobalDigestArgs ¶
type SendGlobalDigestArgs struct {
}
SendGlobalDigestArgs represents the arguments for sending global digest
func (SendGlobalDigestArgs) Kind ¶
func (SendGlobalDigestArgs) Kind() string
Kind returns the job kind for River
func (SendGlobalDigestArgs) Timeout ¶
func (SendGlobalDigestArgs) Timeout() time.Duration
Timeout returns the timeout for digest jobs (longer due to multiple emails)
type SendGlobalDigestDeliveryArgs ¶
type SendGlobalDigestDeliveryArgs struct {
Channel string `json:"channel"`
UserID string `json:"user_id"`
UserName string `json:"user_name,omitempty"`
Email string `json:"email,omitempty"`
SlackChannel string `json:"slack_channel,omitempty"`
Summary EvidenceDigestSummary `json:"summary"`
}
SendGlobalDigestDeliveryArgs represents one recipient/channel evidence digest delivery.
func (SendGlobalDigestDeliveryArgs) Kind ¶
func (SendGlobalDigestDeliveryArgs) Kind() string
Kind returns the job kind for River
func (SendGlobalDigestDeliveryArgs) Timeout ¶
func (SendGlobalDigestDeliveryArgs) Timeout() time.Duration
Timeout returns the timeout for a single global digest delivery.
type SendGlobalDigestDeliveryWorker ¶
type SendGlobalDigestDeliveryWorker struct {
// contains filtered or unexported fields
}
SendGlobalDigestDeliveryWorker handles a single global digest delivery job.
func NewSendGlobalDigestDeliveryWorker ¶
func NewSendGlobalDigestDeliveryWorker(digestService DigestService, logger *zap.SugaredLogger) *SendGlobalDigestDeliveryWorker
NewSendGlobalDigestDeliveryWorker creates a new SendGlobalDigestDeliveryWorker.
func (*SendGlobalDigestDeliveryWorker) Work ¶
func (w *SendGlobalDigestDeliveryWorker) Work(ctx context.Context, job *river.Job[SendGlobalDigestDeliveryArgs]) error
Work is the River work function for sending a single global digest delivery.
type SendGlobalDigestWorker ¶
type SendGlobalDigestWorker struct {
// contains filtered or unexported fields
}
SendGlobalDigestWorker handles scheduling global digest deliveries.
func NewSendGlobalDigestWorker ¶
func NewSendGlobalDigestWorker(digestService DigestService, logger *zap.SugaredLogger) *SendGlobalDigestWorker
NewSendGlobalDigestWorker creates a new SendGlobalDigestWorker
func (*SendGlobalDigestWorker) Work ¶
func (w *SendGlobalDigestWorker) Work(ctx context.Context, job *river.Job[SendGlobalDigestArgs]) error
Work is the River work function for scheduling global digest deliveries.
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service manages the River client and workers
func NewServiceWithDigest ¶
func NewServiceWithDigest(cfg *config.WorkerConfig, db *gorm.DB, emailSvc *email.Service, digestSvc DigestService, digestCfg *config.Config, logger *zap.SugaredLogger) (*Service, error)
NewServiceWithDigest creates a new worker service with digest support
func (*Service) EnqueueRiskProcessEvidence ¶ added in v0.13.0
func (s *Service) EnqueueRiskProcessEvidence(ctx context.Context, evidenceID uuid.UUID, evidenceEnd, status string) error
EnqueueRiskProcessEvidence enqueues a risk process evidence job
func (*Service) EnqueueSendEmail ¶
func (s *Service) EnqueueSendEmail(ctx context.Context, args *SendEmailArgs) error
EnqueueSendEmail enqueues a send email job
func (*Service) EnqueueSendEmailFrom ¶
func (s *Service) EnqueueSendEmailFrom(ctx context.Context, args *SendEmailFromArgs) error
EnqueueSendEmailFrom enqueues a send email from provider job
func (*Service) EnqueueSendGlobalDigestDeliveries ¶
func (s *Service) EnqueueSendGlobalDigestDeliveries(ctx context.Context, args []SendGlobalDigestDeliveryArgs) error
EnqueueSendGlobalDigestDeliveries enqueues per-recipient global digest delivery jobs.
func (*Service) EnqueueWorkflowExecutionFailed ¶ added in v0.12.0
func (s *Service) EnqueueWorkflowExecutionFailed(ctx context.Context, execution *workflows.WorkflowExecution) error
EnqueueWorkflowExecutionFailed enqueues a workflow-execution-failed notification email job. Implements the workflow.NotificationEnqueuer interface.
func (*Service) EnqueueWorkflowTaskAssigned ¶ added in v0.12.0
func (s *Service) EnqueueWorkflowTaskAssigned(ctx context.Context, stepExecution *workflows.StepExecution) error
EnqueueWorkflowTaskAssigned enqueues a workflow-task-assigned notification email job. Implements the workflow.NotificationEnqueuer interface.
func (*Service) GetDAGExecutor ¶ added in v0.12.0
func (s *Service) GetDAGExecutor() *workflow.DAGExecutor
GetDAGExecutor returns the shared DAG executor used by workflow River workers. Returns nil when the worker service is disabled.
type SlackService ¶
type SlackService interface {
SendMessage(ctx context.Context, channel string, message *slacktypes.Message) (*slacktypes.SendResult, error)
IsEnabled() bool
}
type UserRepository ¶ added in v0.12.0
type UserRepository interface {
FindUserByID(ctx context.Context, userID string) (NotificationUser, error)
}
UserRepository is the minimal DB interface needed by notification workers
type WorkflowExecutionFailedArgs ¶ added in v0.12.0
type WorkflowExecutionFailedArgs struct {
WorkflowExecutionID string `json:"workflow_execution_id"`
}
WorkflowExecutionFailedArgs represents the arguments for a workflow-execution-failed notification email
func (WorkflowExecutionFailedArgs) Kind ¶ added in v0.12.0
func (WorkflowExecutionFailedArgs) Kind() string
Kind returns the job kind for River
func (WorkflowExecutionFailedArgs) Timeout ¶ added in v0.12.0
func (WorkflowExecutionFailedArgs) Timeout() time.Duration
Timeout returns the timeout for workflow execution failed jobs
type WorkflowExecutionFailedWorker ¶ added in v0.12.0
type WorkflowExecutionFailedWorker struct {
// contains filtered or unexported fields
}
WorkflowExecutionFailedWorker sends a failure notification email to the workflow instance creator
func NewWorkflowExecutionFailedWorker ¶ added in v0.12.0
func NewWorkflowExecutionFailedWorker(db *gorm.DB, emailService EmailService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger) *WorkflowExecutionFailedWorker
NewWorkflowExecutionFailedWorker creates a new WorkflowExecutionFailedWorker
func (*WorkflowExecutionFailedWorker) Work ¶ added in v0.12.0
func (w *WorkflowExecutionFailedWorker) Work(ctx context.Context, job *river.Job[WorkflowExecutionFailedArgs]) error
Work sends a failure notification email for the workflow execution identified by job.Args.WorkflowExecutionID
type WorkflowTaskAssignedArgs ¶ added in v0.12.0
type WorkflowTaskAssignedArgs struct {
AssignedToType string `json:"assigned_to_type"`
Channel string `json:"channel,omitempty"`
UserID string `json:"user_id"`
StepExecutionID string `json:"step_execution_id"`
StepTitle string `json:"step_title"`
WorkflowTitle string `json:"workflow_title"`
WorkflowInstanceTitle string `json:"workflow_instance_title"`
StepURL string `json:"step_url"`
DueDate *time.Time `json:"due_date,omitempty"`
}
WorkflowTaskAssignedArgs represents the arguments for a new-task-assigned notification job.
func (WorkflowTaskAssignedArgs) Kind ¶ added in v0.12.0
func (WorkflowTaskAssignedArgs) Kind() string
Kind returns the job kind for River
func (WorkflowTaskAssignedArgs) Timeout ¶ added in v0.12.0
func (WorkflowTaskAssignedArgs) Timeout() time.Duration
Timeout returns the timeout for workflow task assigned jobs
type WorkflowTaskAssignedWorker ¶ added in v0.12.0
type WorkflowTaskAssignedWorker struct {
// contains filtered or unexported fields
}
WorkflowTaskAssignedWorker handles new-task-assigned notification email jobs
func NewWorkflowTaskAssignedWorker ¶ added in v0.12.0
func NewWorkflowTaskAssignedWorker(emailService EmailService, slackService SlackService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger) *WorkflowTaskAssignedWorker
NewWorkflowTaskAssignedWorker creates a new WorkflowTaskAssignedWorker
func (*WorkflowTaskAssignedWorker) Work ¶ added in v0.12.0
func (w *WorkflowTaskAssignedWorker) Work(ctx context.Context, job *river.Job[WorkflowTaskAssignedArgs]) error
Work is the River work function for sending new-task-assigned notification emails
type WorkflowTaskDigestArgs ¶ added in v0.12.0
type WorkflowTaskDigestArgs struct {
Channel string `json:"channel,omitempty"`
UserID string `json:"user_id"`
}
WorkflowTaskDigestArgs represents the arguments for a per-user task digest notification job.
func (WorkflowTaskDigestArgs) Kind ¶ added in v0.12.0
func (WorkflowTaskDigestArgs) Kind() string
Kind returns the job kind for River
func (WorkflowTaskDigestArgs) Timeout ¶ added in v0.12.0
func (WorkflowTaskDigestArgs) Timeout() time.Duration
Timeout returns the timeout for workflow task digest jobs
type WorkflowTaskDigestCheckerArgs ¶ added in v0.12.0
type WorkflowTaskDigestCheckerArgs struct{}
WorkflowTaskDigestCheckerArgs represents the arguments for the periodic digest checker job
func (WorkflowTaskDigestCheckerArgs) Kind ¶ added in v0.12.0
func (WorkflowTaskDigestCheckerArgs) Kind() string
Kind returns the job kind for River
func (WorkflowTaskDigestCheckerArgs) Timeout ¶ added in v0.12.0
func (WorkflowTaskDigestCheckerArgs) Timeout() time.Duration
Timeout returns the timeout for the digest checker job
type WorkflowTaskDigestCheckerWorker ¶ added in v0.12.0
type WorkflowTaskDigestCheckerWorker struct {
// contains filtered or unexported fields
}
WorkflowTaskDigestCheckerWorker queries all subscribed users and enqueues per-user, per-channel digest jobs.
func NewWorkflowTaskDigestCheckerWorker ¶ added in v0.12.0
func NewWorkflowTaskDigestCheckerWorker(db *gorm.DB, client workflow.RiverClient, logger *zap.SugaredLogger) *WorkflowTaskDigestCheckerWorker
NewWorkflowTaskDigestCheckerWorker creates a new WorkflowTaskDigestCheckerWorker
func (*WorkflowTaskDigestCheckerWorker) Work ¶ added in v0.12.0
func (w *WorkflowTaskDigestCheckerWorker) Work(ctx context.Context, job *river.Job[WorkflowTaskDigestCheckerArgs]) error
Work queries all users subscribed to the task daily digest and enqueues one WorkflowTaskDigestArgs job per user per channel.
type WorkflowTaskDigestWorker ¶ added in v0.12.0
type WorkflowTaskDigestWorker struct {
// contains filtered or unexported fields
}
WorkflowTaskDigestWorker sends a per-user digest of pending and overdue workflow tasks
func NewWorkflowTaskDigestWorker ¶ added in v0.12.0
func NewWorkflowTaskDigestWorker(db *gorm.DB, emailService EmailService, slackService SlackService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger) *WorkflowTaskDigestWorker
NewWorkflowTaskDigestWorker creates a new WorkflowTaskDigestWorker
func (*WorkflowTaskDigestWorker) Work ¶ added in v0.12.0
func (w *WorkflowTaskDigestWorker) Work(ctx context.Context, job *river.Job[WorkflowTaskDigestArgs]) error
Work sends digest notifications for the user identified by job.Args.UserID.
type WorkflowTaskDueSoonArgs ¶ added in v0.12.0
type WorkflowTaskDueSoonArgs struct {
Channel string `json:"channel,omitempty"`
UserID string `json:"user_id"`
StepExecutionID string `json:"step_execution_id"`
StepTitle string `json:"step_title"`
WorkflowTitle string `json:"workflow_title"`
WorkflowInstanceTitle string `json:"workflow_instance_title"`
StepURL string `json:"step_url"`
DueDate time.Time `json:"due_date"`
}
WorkflowTaskDueSoonArgs represents the arguments for a task-due-soon reminder notification job.
func (WorkflowTaskDueSoonArgs) Kind ¶ added in v0.12.0
func (WorkflowTaskDueSoonArgs) Kind() string
Kind returns the job kind for River
func (WorkflowTaskDueSoonArgs) Timeout ¶ added in v0.12.0
func (WorkflowTaskDueSoonArgs) Timeout() time.Duration
Timeout returns the timeout for workflow task due soon jobs
type WorkflowTaskDueSoonWorker ¶ added in v0.12.0
type WorkflowTaskDueSoonWorker struct {
// contains filtered or unexported fields
}
WorkflowTaskDueSoonWorker handles task-due-soon reminder notification jobs
func NewWorkflowTaskDueSoonWorker ¶ added in v0.12.0
func NewWorkflowTaskDueSoonWorker(emailService EmailService, slackService SlackService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger) *WorkflowTaskDueSoonWorker
NewWorkflowTaskDueSoonWorker creates a new WorkflowTaskDueSoonWorker
func (*WorkflowTaskDueSoonWorker) Work ¶ added in v0.12.0
func (w *WorkflowTaskDueSoonWorker) Work(ctx context.Context, job *river.Job[WorkflowTaskDueSoonArgs]) error
Work is the River work function for sending task-due-in-1-day reminder emails
Source Files
¶
- due_soon_checker.go
- helpers.go
- jobs.go
- poam_deadline_reminder_worker.go
- poam_digest_worker.go
- poam_helpers.go
- poam_job_types.go
- poam_milestone_overdue_worker.go
- poam_overdue_transition_worker.go
- poam_periodic_jobs.go
- risk_digest_worker.go
- risk_evidence_worker.go
- risk_job_types.go
- risk_workers.go
- service.go
- user_repository.go
- workflow_execution_failed_worker.go
- workflow_task_digest_checker.go
- workflow_task_digest_worker.go