Documentation ¶
Index ¶
- Constants
- func JobInsertOptionsForWorkflowNotification() *river.InsertOpts
- func JobInsertOptionsForWorkflowTaskAssignedNotification() *river.InsertOpts
- func JobInsertOptionsWithRetry(queue string, maxAttempts int) *river.InsertOpts
- func NewDigestPeriodicJob(cronSchedule string, logger *zap.SugaredLogger) *river.PeriodicJob
- func NewDueSoonCheckerPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob
- func NewWorkflowSchedulerPeriodicJob(cronSchedule string, logger *zap.SugaredLogger) *river.PeriodicJob
- func NewWorkflowTaskDigestPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob
- func Workers(emailService EmailService, digestService DigestService, ...) *river.Workers
- type DigestService
- type DigestTask
- type DueSoonCheckerArgs
- type DueSoonCheckerWorker
- type EmailService
- type GORMUserRepository
- type NotificationUser
- type SendEmailArgs
- type SendEmailFromArgs
- type SendEmailFromWorker
- type SendEmailWorker
- type SendGlobalDigestArgs
- type SendGlobalDigestWorker
- type Service
- func (s *Service) EnqueueSendEmail(ctx context.Context, args *SendEmailArgs) error
- func (s *Service) EnqueueSendEmailFrom(ctx context.Context, args *SendEmailFromArgs) error
- func (s *Service) EnqueueWorkflowExecutionFailed(ctx context.Context, execution *workflows.WorkflowExecution) error
- func (s *Service) EnqueueWorkflowTaskAssigned(ctx context.Context, stepExecution *workflows.StepExecution) error
- func (s *Service) GetClient() *river.Client[pgx.Tx]
- func (s *Service) GetDAGExecutor() *workflow.DAGExecutor
- func (s *Service) IsStarted() bool
- func (s *Service) Migrate(ctx context.Context) error
- func (s *Service) Start(ctx context.Context) error
- func (s *Service) Stop(ctx context.Context) error
- type UserRepository
- type WorkflowExecutionFailedArgs
- type WorkflowExecutionFailedWorker
- type WorkflowTaskAssignedArgs
- type WorkflowTaskAssignedWorker
- type WorkflowTaskDigestArgs
- type WorkflowTaskDigestCheckerArgs
- type WorkflowTaskDigestCheckerWorker
- type WorkflowTaskDigestWorker
- type WorkflowTaskDueSoonArgs
- type WorkflowTaskDueSoonWorker
Constants ¶
const (
	JobTypeSendEmail        = "send_email"
	JobTypeSendEmailFrom    = "send_email_from"
	JobTypeSendGlobalDigest = "send_global_digest"
)
Job types for email processing
const (
	JobTypeWorkflowTaskAssigned    = "workflow_task_assigned"
	JobTypeWorkflowTaskDueSoon     = "workflow_task_due_soon"
	JobTypeWorkflowTaskDigest      = "workflow_task_digest"
	JobTypeWorkflowExecutionFailed = "workflow_execution_failed"
)
Job types for workflow notifications
Variables ¶
This section is empty.
Functions ¶
func JobInsertOptionsForWorkflowNotification ¶ added in v0.12.0
func JobInsertOptionsForWorkflowNotification() *river.InsertOpts
JobInsertOptionsForWorkflowNotification returns insert options for workflow notification email jobs
func JobInsertOptionsForWorkflowTaskAssignedNotification ¶ added in v0.12.0
func JobInsertOptionsForWorkflowTaskAssignedNotification() *river.InsertOpts
func JobInsertOptionsWithRetry ¶
func JobInsertOptionsWithRetry(queue string, maxAttempts int) *river.InsertOpts
JobInsertOptionsWithRetry returns insert options for jobs with custom retry policy
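As a rough illustration of what such a helper might look like, the sketch below builds options from a queue name and an attempt limit. `insertOpts` is a local stand-in that mirrors the `Queue` and `MaxAttempts` fields of `river.InsertOpts`, and the body of `jobInsertOptionsWithRetry` and the queue name are assumptions, not the package's actual implementation.

```go
package main

import "fmt"

// insertOpts is a local stand-in mirroring two fields of river.InsertOpts,
// so the sketch runs without the River dependency.
type insertOpts struct {
	Queue       string
	MaxAttempts int
}

// jobInsertOptionsWithRetry is a hypothetical equivalent of
// JobInsertOptionsWithRetry: it only copies its arguments into the options.
func jobInsertOptionsWithRetry(queue string, maxAttempts int) *insertOpts {
	return &insertOpts{Queue: queue, MaxAttempts: maxAttempts}
}

func main() {
	// "email" is an assumed queue name used only for this example.
	opts := jobInsertOptionsWithRetry("email", 5)
	fmt.Printf("queue=%s maxAttempts=%d\n", opts.Queue, opts.MaxAttempts)
}
```

In real code the returned options would be passed as the last argument when inserting a job through the River client.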
func NewDigestPeriodicJob ¶
func NewDigestPeriodicJob(cronSchedule string, logger *zap.SugaredLogger) *river.PeriodicJob
NewDigestPeriodicJob creates a periodic job for digest scheduling
func NewDueSoonCheckerPeriodicJob ¶ added in v0.12.0
func NewDueSoonCheckerPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob
func NewWorkflowSchedulerPeriodicJob ¶ added in v0.12.0
func NewWorkflowSchedulerPeriodicJob(cronSchedule string, logger *zap.SugaredLogger) *river.PeriodicJob
func NewWorkflowTaskDigestPeriodicJob ¶ added in v0.12.0
func NewWorkflowTaskDigestPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob
func Workers ¶
func Workers(emailService EmailService, digestService DigestService, userRepo UserRepository, db *gorm.DB, webBaseURL string, logger *zap.SugaredLogger) *river.Workers
Workers returns all workers as work functions with dependencies injected
Types ¶
type DigestService ¶
DigestService is the digest service interface used for dependency injection
type DigestTask ¶ added in v0.12.0
type DigestTask struct {
	StepTitle             string
	WorkflowTitle         string
	WorkflowInstanceTitle string
	DueDate               *string
	StepURL               string
}
DigestTask represents a single task entry in the digest email
type DueSoonCheckerArgs ¶ added in v0.12.0
type DueSoonCheckerArgs struct{}
DueSoonCheckerArgs represents the arguments for the periodic due-soon checker job
func (DueSoonCheckerArgs) Kind ¶ added in v0.12.0
func (DueSoonCheckerArgs) Kind() string
Kind returns the job kind for River
func (DueSoonCheckerArgs) Timeout ¶ added in v0.12.0
func (DueSoonCheckerArgs) Timeout() time.Duration
Timeout returns the timeout for the due-soon checker job
type DueSoonCheckerWorker ¶ added in v0.12.0
type DueSoonCheckerWorker struct {
	// contains filtered or unexported fields
}
DueSoonCheckerWorker scans for step executions due in a week and enqueues reminder emails
func NewDueSoonCheckerWorker ¶ added in v0.12.0
func NewDueSoonCheckerWorker(db *gorm.DB, client workflow.RiverClient, logger *zap.SugaredLogger) *DueSoonCheckerWorker
NewDueSoonCheckerWorker creates a new DueSoonCheckerWorker
func (*DueSoonCheckerWorker) Work ¶ added in v0.12.0
func (w *DueSoonCheckerWorker) Work(ctx context.Context, job *river.Job[DueSoonCheckerArgs]) error
Work scans for step executions due in ~1 week and enqueues WorkflowTaskDueSoonArgs jobs
type EmailService ¶
type EmailService interface {
	Send(ctx context.Context, message *types.Message) (*types.SendResult, error)
	SendWithProvider(ctx context.Context, providerName string, message *types.Message) (*types.SendResult, error)
	UseTemplate(templateName string, data map[string]interface{}) (htmlContent, textContent string, err error)
	GetDefaultFromAddress() string
}
EmailService is the email service interface used for dependency injection
type GORMUserRepository ¶ added in v0.12.0
type GORMUserRepository struct {
	// contains filtered or unexported fields
}
GORMUserRepository implements UserRepository using GORM
func NewGORMUserRepository ¶ added in v0.12.0
func NewGORMUserRepository(db *gorm.DB) *GORMUserRepository
NewGORMUserRepository creates a new GORMUserRepository
func (*GORMUserRepository) FindUserByID ¶ added in v0.12.0
func (r *GORMUserRepository) FindUserByID(ctx context.Context, userID string) (NotificationUser, error)
FindUserByID looks up a user by UUID string and returns a NotificationUser
type NotificationUser ¶ added in v0.12.0
type NotificationUser struct {
	ID                           string
	Email                        string
	FirstName                    string
	LastName                     string
	TaskAvailableEmailSubscribed bool
	TaskDailyDigestSubscribed    bool
}
NotificationUser holds the user fields needed for sending notification emails
func (NotificationUser) FullName ¶ added in v0.12.0
func (u NotificationUser) FullName() string
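FullName carries no description in this reference. One plausible reading, offered purely as an assumption, is that it joins FirstName and LastName with a space. The sketch below uses a local stand-in type and trims the result so that a missing name part does not leave stray whitespace.

```go
package main

import (
	"fmt"
	"strings"
)

// notificationUser is a stand-in with only the fields this sketch needs.
type notificationUser struct {
	FirstName string
	LastName  string
}

// FullName is a hypothetical implementation: first and last name joined by a
// space, with surrounding whitespace trimmed when either part is empty.
func (u notificationUser) FullName() string {
	return strings.TrimSpace(u.FirstName + " " + u.LastName)
}

func main() {
	u := notificationUser{FirstName: "Ada", LastName: "Lovelace"}
	fmt.Println(u.FullName())
}
```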
type SendEmailArgs ¶
type SendEmailArgs struct {
	// Email message fields
	From        string             `json:"from"`
	To          []string           `json:"to"`
	Cc          []string           `json:"cc,omitempty"`
	Bcc         []string           `json:"bcc,omitempty"`
	Subject     string             `json:"subject"`
	HTMLBody    string             `json:"html_body,omitempty"`
	TextBody    string             `json:"text_body,omitempty"`
	Attachments []types.Attachment `json:"attachments,omitempty"`
	Headers     map[string]string  `json:"headers,omitempty"`
}
SendEmailArgs represents the arguments for sending an email
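Job arguments are stored as JSON, so the struct tags above determine the payload. The sketch below shows the effect of the `omitempty` tags using a local copy of the struct; the `Attachments` field is left out because `types.Attachment` is not defined in this reference, and the addresses are placeholder values.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sendEmailArgs copies the JSON tags of SendEmailArgs, minus Attachments.
type sendEmailArgs struct {
	From     string            `json:"from"`
	To       []string          `json:"to"`
	Cc       []string          `json:"cc,omitempty"`
	Bcc      []string          `json:"bcc,omitempty"`
	Subject  string            `json:"subject"`
	HTMLBody string            `json:"html_body,omitempty"`
	TextBody string            `json:"text_body,omitempty"`
	Headers  map[string]string `json:"headers,omitempty"`
}

// encode returns the JSON payload for the given arguments.
func encode(args sendEmailArgs) (string, error) {
	b, err := json.Marshal(args)
	return string(b), err
}

func main() {
	out, err := encode(sendEmailArgs{
		From:     "noreply@example.com",
		To:       []string{"user@example.com"},
		Subject:  "Hello",
		TextBody: "Hi there",
	})
	if err != nil {
		panic(err)
	}
	// Empty Cc, Bcc, HTMLBody, and Headers are dropped from the output.
	fmt.Println(out)
}
```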
func (SendEmailArgs) Timeout ¶
func (SendEmailArgs) Timeout() time.Duration
Timeout returns the timeout for email jobs
type SendEmailFromArgs ¶
type SendEmailFromArgs struct {
	// Provider to use for sending
	Provider string `json:"provider"`
	// Email message fields
	From        string             `json:"from"`
	To          []string           `json:"to"`
	Cc          []string           `json:"cc,omitempty"`
	Bcc         []string           `json:"bcc,omitempty"`
	Subject     string             `json:"subject"`
	HTMLBody    string             `json:"html_body,omitempty"`
	TextBody    string             `json:"text_body,omitempty"`
	Attachments []types.Attachment `json:"attachments,omitempty"`
	Headers     map[string]string  `json:"headers,omitempty"`
}
SendEmailFromArgs represents the arguments for sending an email from a specific provider
func (SendEmailFromArgs) Kind ¶
func (SendEmailFromArgs) Kind() string
Kind returns the job kind for River
func (SendEmailFromArgs) Timeout ¶
func (SendEmailFromArgs) Timeout() time.Duration
Timeout returns the timeout for email jobs
type SendEmailFromWorker ¶
type SendEmailFromWorker struct {
	// contains filtered or unexported fields
}
SendEmailFromWorker handles jobs that send email through a specific provider
func NewSendEmailFromWorker ¶
func NewSendEmailFromWorker(emailService EmailService, logger *zap.SugaredLogger) *SendEmailFromWorker
NewSendEmailFromWorker creates a new SendEmailFromWorker
func (*SendEmailFromWorker) Work ¶
func (w *SendEmailFromWorker) Work(ctx context.Context, job *river.Job[SendEmailFromArgs]) error
Work is the River work function for sending emails through a specific provider
type SendEmailWorker ¶
type SendEmailWorker struct {
	// contains filtered or unexported fields
}
SendEmailWorker handles sending email jobs
func NewSendEmailWorker ¶
func NewSendEmailWorker(emailService EmailService, logger *zap.SugaredLogger) *SendEmailWorker
NewSendEmailWorker creates a new SendEmailWorker
func (*SendEmailWorker) Work ¶
func (w *SendEmailWorker) Work(ctx context.Context, job *river.Job[SendEmailArgs]) error
Work is the River work function for sending emails
type SendGlobalDigestArgs ¶
type SendGlobalDigestArgs struct {
}
SendGlobalDigestArgs represents the arguments for sending the global digest
func (SendGlobalDigestArgs) Kind ¶
func (SendGlobalDigestArgs) Kind() string
Kind returns the job kind for River
func (SendGlobalDigestArgs) Timeout ¶
func (SendGlobalDigestArgs) Timeout() time.Duration
Timeout returns the timeout for digest jobs (longer due to multiple emails)
type SendGlobalDigestWorker ¶
type SendGlobalDigestWorker struct {
	// contains filtered or unexported fields
}
SendGlobalDigestWorker handles jobs that send the global digest
func NewSendGlobalDigestWorker ¶
func NewSendGlobalDigestWorker(digestService DigestService, logger *zap.SugaredLogger) *SendGlobalDigestWorker
NewSendGlobalDigestWorker creates a new SendGlobalDigestWorker
func (*SendGlobalDigestWorker) Work ¶
func (w *SendGlobalDigestWorker) Work(ctx context.Context, job *river.Job[SendGlobalDigestArgs]) error
Work is the River work function for sending the global digest
type Service ¶
type Service struct {
	// contains filtered or unexported fields
}
Service manages the River client and workers
func NewServiceWithDigest ¶
func NewServiceWithDigest(
	cfg *config.WorkerConfig,
	db *gorm.DB,
	emailSvc *email.Service,
	digestSvc DigestService,
	digestCfg *config.Config,
	logger *zap.SugaredLogger,
) (*Service, error)
NewServiceWithDigest creates a new worker service with digest support
func (*Service) EnqueueSendEmail ¶
func (s *Service) EnqueueSendEmail(ctx context.Context, args *SendEmailArgs) error
EnqueueSendEmail enqueues a send-email job
func (*Service) EnqueueSendEmailFrom ¶
func (s *Service) EnqueueSendEmailFrom(ctx context.Context, args *SendEmailFromArgs) error
EnqueueSendEmailFrom enqueues a job that sends an email through a specific provider
func (*Service) EnqueueWorkflowExecutionFailed ¶ added in v0.12.0
func (s *Service) EnqueueWorkflowExecutionFailed(ctx context.Context, execution *workflows.WorkflowExecution) error
EnqueueWorkflowExecutionFailed enqueues a workflow-execution-failed notification email job. Implements the workflow.NotificationEnqueuer interface.
func (*Service) EnqueueWorkflowTaskAssigned ¶ added in v0.12.0
func (s *Service) EnqueueWorkflowTaskAssigned(ctx context.Context, stepExecution *workflows.StepExecution) error
EnqueueWorkflowTaskAssigned enqueues a workflow-task-assigned notification email job. Implements the workflow.NotificationEnqueuer interface.
func (*Service) GetDAGExecutor ¶ added in v0.12.0
func (s *Service) GetDAGExecutor() *workflow.DAGExecutor
GetDAGExecutor returns the shared DAG executor used by workflow River workers. Returns nil when the worker service is disabled.
type UserRepository ¶ added in v0.12.0
type UserRepository interface {
	FindUserByID(ctx context.Context, userID string) (NotificationUser, error)
}
UserRepository is the minimal DB interface needed by notification workers
type WorkflowExecutionFailedArgs ¶ added in v0.12.0
type WorkflowExecutionFailedArgs struct {
	WorkflowExecutionID string `json:"workflow_execution_id"`
}
WorkflowExecutionFailedArgs represents the arguments for a workflow-execution-failed notification email
func (WorkflowExecutionFailedArgs) Kind ¶ added in v0.12.0
func (WorkflowExecutionFailedArgs) Kind() string
Kind returns the job kind for River
func (WorkflowExecutionFailedArgs) Timeout ¶ added in v0.12.0
func (WorkflowExecutionFailedArgs) Timeout() time.Duration
Timeout returns the timeout for workflow execution failed jobs
type WorkflowExecutionFailedWorker ¶ added in v0.12.0
type WorkflowExecutionFailedWorker struct {
	// contains filtered or unexported fields
}
WorkflowExecutionFailedWorker sends a failure notification email to the workflow instance creator
func NewWorkflowExecutionFailedWorker ¶ added in v0.12.0
func NewWorkflowExecutionFailedWorker(db *gorm.DB, emailService EmailService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger) *WorkflowExecutionFailedWorker
NewWorkflowExecutionFailedWorker creates a new WorkflowExecutionFailedWorker
func (*WorkflowExecutionFailedWorker) Work ¶ added in v0.12.0
func (w *WorkflowExecutionFailedWorker) Work(ctx context.Context, job *river.Job[WorkflowExecutionFailedArgs]) error
Work sends a failure notification email for the workflow execution identified by job.Args.WorkflowExecutionID
type WorkflowTaskAssignedArgs ¶ added in v0.12.0
type WorkflowTaskAssignedArgs struct {
	AssignedToType        string     `json:"assigned_to_type"`
	UserID                string     `json:"user_id"`
	StepExecutionID       string     `json:"step_execution_id"`
	StepTitle             string     `json:"step_title"`
	WorkflowTitle         string     `json:"workflow_title"`
	WorkflowInstanceTitle string     `json:"workflow_instance_title"`
	StepURL               string     `json:"step_url"`
	DueDate               *time.Time `json:"due_date,omitempty"`
}
WorkflowTaskAssignedArgs represents the arguments for a new-task-assigned notification email
func (WorkflowTaskAssignedArgs) Kind ¶ added in v0.12.0
func (WorkflowTaskAssignedArgs) Kind() string
Kind returns the job kind for River
func (WorkflowTaskAssignedArgs) Timeout ¶ added in v0.12.0
func (WorkflowTaskAssignedArgs) Timeout() time.Duration
Timeout returns the timeout for workflow task assigned jobs
type WorkflowTaskAssignedWorker ¶ added in v0.12.0
type WorkflowTaskAssignedWorker struct {
	// contains filtered or unexported fields
}
WorkflowTaskAssignedWorker handles new-task-assigned notification email jobs
func NewWorkflowTaskAssignedWorker ¶ added in v0.12.0
func NewWorkflowTaskAssignedWorker(emailService EmailService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger) *WorkflowTaskAssignedWorker
NewWorkflowTaskAssignedWorker creates a new WorkflowTaskAssignedWorker
func (*WorkflowTaskAssignedWorker) Work ¶ added in v0.12.0
func (w *WorkflowTaskAssignedWorker) Work(ctx context.Context, job *river.Job[WorkflowTaskAssignedArgs]) error
Work is the River work function for sending new-task-assigned notification emails
type WorkflowTaskDigestArgs ¶ added in v0.12.0
type WorkflowTaskDigestArgs struct {
	UserID string `json:"user_id"`
}
WorkflowTaskDigestArgs represents the arguments for a per-user task digest email
func (WorkflowTaskDigestArgs) Kind ¶ added in v0.12.0
func (WorkflowTaskDigestArgs) Kind() string
Kind returns the job kind for River
func (WorkflowTaskDigestArgs) Timeout ¶ added in v0.12.0
func (WorkflowTaskDigestArgs) Timeout() time.Duration
Timeout returns the timeout for workflow task digest jobs
type WorkflowTaskDigestCheckerArgs ¶ added in v0.12.0
type WorkflowTaskDigestCheckerArgs struct{}
WorkflowTaskDigestCheckerArgs represents the arguments for the periodic digest checker job
func (WorkflowTaskDigestCheckerArgs) Kind ¶ added in v0.12.0
func (WorkflowTaskDigestCheckerArgs) Kind() string
Kind returns the job kind for River
func (WorkflowTaskDigestCheckerArgs) Timeout ¶ added in v0.12.0
func (WorkflowTaskDigestCheckerArgs) Timeout() time.Duration
Timeout returns the timeout for the digest checker job
type WorkflowTaskDigestCheckerWorker ¶ added in v0.12.0
type WorkflowTaskDigestCheckerWorker struct {
	// contains filtered or unexported fields
}
WorkflowTaskDigestCheckerWorker queries all subscribed users and enqueues per-user digest jobs
func NewWorkflowTaskDigestCheckerWorker ¶ added in v0.12.0
func NewWorkflowTaskDigestCheckerWorker(db *gorm.DB, client workflow.RiverClient, logger *zap.SugaredLogger) *WorkflowTaskDigestCheckerWorker
NewWorkflowTaskDigestCheckerWorker creates a new WorkflowTaskDigestCheckerWorker
func (*WorkflowTaskDigestCheckerWorker) Work ¶ added in v0.12.0
func (w *WorkflowTaskDigestCheckerWorker) Work(ctx context.Context, job *river.Job[WorkflowTaskDigestCheckerArgs]) error
Work queries all users with TaskDailyDigestSubscribed=true and enqueues a WorkflowTaskDigestArgs job for each
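The fan-out described above can be sketched in memory as a filter followed by a map from users to job arguments. The real worker reads users from the database and inserts jobs through the River client; `digestJobsFor` and the stand-in types here are hypothetical.

```go
package main

import "fmt"

// user is a stand-in with the two fields the fan-out needs.
type user struct {
	ID                        string
	TaskDailyDigestSubscribed bool
}

// workflowTaskDigestArgs mirrors the WorkflowTaskDigestArgs struct.
type workflowTaskDigestArgs struct {
	UserID string `json:"user_id"`
}

// digestJobsFor returns one digest job per subscribed user, in input order.
func digestJobsFor(users []user) []workflowTaskDigestArgs {
	jobs := make([]workflowTaskDigestArgs, 0, len(users))
	for _, u := range users {
		if u.TaskDailyDigestSubscribed {
			jobs = append(jobs, workflowTaskDigestArgs{UserID: u.ID})
		}
	}
	return jobs
}

func main() {
	jobs := digestJobsFor([]user{
		{ID: "u1", TaskDailyDigestSubscribed: true},
		{ID: "u2"},
		{ID: "u3", TaskDailyDigestSubscribed: true},
	})
	fmt.Println(len(jobs), jobs)
}
```

Splitting the work into one job per user means a failure for one recipient can be retried without resending digests to everyone else.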
type WorkflowTaskDigestWorker ¶ added in v0.12.0
type WorkflowTaskDigestWorker struct {
	// contains filtered or unexported fields
}
WorkflowTaskDigestWorker sends a per-user digest of pending and overdue workflow tasks
func NewWorkflowTaskDigestWorker ¶ added in v0.12.0
func NewWorkflowTaskDigestWorker(db *gorm.DB, emailService EmailService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger) *WorkflowTaskDigestWorker
NewWorkflowTaskDigestWorker creates a new WorkflowTaskDigestWorker
func (*WorkflowTaskDigestWorker) Work ¶ added in v0.12.0
func (w *WorkflowTaskDigestWorker) Work(ctx context.Context, job *river.Job[WorkflowTaskDigestArgs]) error
Work sends a digest email for the user identified by job.Args.UserID
type WorkflowTaskDueSoonArgs ¶ added in v0.12.0
type WorkflowTaskDueSoonArgs struct {
	UserID                string    `json:"user_id"`
	StepExecutionID       string    `json:"step_execution_id"`
	StepTitle             string    `json:"step_title"`
	WorkflowTitle         string    `json:"workflow_title"`
	WorkflowInstanceTitle string    `json:"workflow_instance_title"`
	StepURL               string    `json:"step_url"`
	DueDate               time.Time `json:"due_date"`
}
WorkflowTaskDueSoonArgs represents the arguments for a task-due-in-1-day reminder email
func (WorkflowTaskDueSoonArgs) Kind ¶ added in v0.12.0
func (WorkflowTaskDueSoonArgs) Kind() string
Kind returns the job kind for River
func (WorkflowTaskDueSoonArgs) Timeout ¶ added in v0.12.0
func (WorkflowTaskDueSoonArgs) Timeout() time.Duration
Timeout returns the timeout for workflow task due soon jobs
type WorkflowTaskDueSoonWorker ¶ added in v0.12.0
type WorkflowTaskDueSoonWorker struct {
	// contains filtered or unexported fields
}
WorkflowTaskDueSoonWorker handles task-due-soon reminder email jobs
func NewWorkflowTaskDueSoonWorker ¶ added in v0.12.0
func NewWorkflowTaskDueSoonWorker(emailService EmailService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger) *WorkflowTaskDueSoonWorker
NewWorkflowTaskDueSoonWorker creates a new WorkflowTaskDueSoonWorker
func (*WorkflowTaskDueSoonWorker) Work ¶ added in v0.12.0
func (w *WorkflowTaskDueSoonWorker) Work(ctx context.Context, job *river.Job[WorkflowTaskDueSoonArgs]) error
Work is the River work function for sending task-due-in-1-day reminder emails