worker

package
v0.12.0 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 23, 2026 License: AGPL-3.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

View Source
const (
	JobTypeSendEmail        = "send_email"
	JobTypeSendEmailFrom    = "send_email_from"
	JobTypeSendGlobalDigest = "send_global_digest"
)

Job types for email processing

View Source
const (
	JobTypeWorkflowTaskAssigned    = "workflow_task_assigned"
	JobTypeWorkflowTaskDueSoon     = "workflow_task_due_soon"
	JobTypeWorkflowTaskDigest      = "workflow_task_digest"
	JobTypeWorkflowExecutionFailed = "workflow_execution_failed"
)

Job types for workflow notifications

Variables

This section is empty.

Functions

func JobInsertOptionsForWorkflowNotification added in v0.12.0

func JobInsertOptionsForWorkflowNotification() *river.InsertOpts

JobInsertOptionsForWorkflowNotification returns insert options for workflow notification email jobs

func JobInsertOptionsForWorkflowTaskAssignedNotification added in v0.12.0

func JobInsertOptionsForWorkflowTaskAssignedNotification() *river.InsertOpts

func JobInsertOptionsWithRetry

func JobInsertOptionsWithRetry(queue string, maxAttempts int) *river.InsertOpts

JobInsertOptionsWithRetry returns insert options for jobs with custom retry policy

func NewDigestPeriodicJob

func NewDigestPeriodicJob(cronSchedule string, logger *zap.SugaredLogger) *river.PeriodicJob

NewDigestPeriodicJob creates a periodic job for digest scheduling

func NewDueSoonCheckerPeriodicJob added in v0.12.0

func NewDueSoonCheckerPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob

func NewWorkflowSchedulerPeriodicJob added in v0.12.0

func NewWorkflowSchedulerPeriodicJob(cronSchedule string, logger *zap.SugaredLogger) *river.PeriodicJob

func NewWorkflowTaskDigestPeriodicJob added in v0.12.0

func NewWorkflowTaskDigestPeriodicJob(schedule string, logger *zap.SugaredLogger) *river.PeriodicJob

func Workers

func Workers(emailService EmailService, digestService DigestService, userRepo UserRepository, db *gorm.DB, webBaseURL string, logger *zap.SugaredLogger) *river.Workers

Workers returns all workers as work functions with dependencies injected

Types

type DigestService

type DigestService interface {
	SendGlobalDigest(ctx context.Context) error
}

DigestService interface for dependency injection

type DigestTask added in v0.12.0

type DigestTask struct {
	StepTitle             string
	WorkflowTitle         string
	WorkflowInstanceTitle string
	DueDate               *string
	StepURL               string
}

DigestTask represents a single task entry in the digest email

type DueSoonCheckerArgs added in v0.12.0

type DueSoonCheckerArgs struct{}

DueSoonCheckerArgs represents the arguments for the periodic due-soon checker job

func (DueSoonCheckerArgs) Kind added in v0.12.0

func (DueSoonCheckerArgs) Kind() string

Kind returns the job kind for River

func (DueSoonCheckerArgs) Timeout added in v0.12.0

func (DueSoonCheckerArgs) Timeout() time.Duration

Timeout returns the timeout for the due-soon checker job

type DueSoonCheckerWorker added in v0.12.0

type DueSoonCheckerWorker struct {
	// contains filtered or unexported fields
}

DueSoonCheckerWorker scans for step executions due in a week and enqueues reminder emails

func NewDueSoonCheckerWorker added in v0.12.0

func NewDueSoonCheckerWorker(db *gorm.DB, client workflow.RiverClient, logger *zap.SugaredLogger) *DueSoonCheckerWorker

NewDueSoonCheckerWorker creates a new DueSoonCheckerWorker

func (*DueSoonCheckerWorker) Work added in v0.12.0

Work scans for step executions due in ~1 week and enqueues WorkflowTaskDueSoonArgs jobs

type EmailService

type EmailService interface {
	Send(ctx context.Context, message *types.Message) (*types.SendResult, error)
	SendWithProvider(ctx context.Context, providerName string, message *types.Message) (*types.SendResult, error)
	UseTemplate(templateName string, data map[string]interface{}) (htmlContent, textContent string, err error)
	GetDefaultFromAddress() string
}

EmailService interface for dependency injection

type GORMUserRepository added in v0.12.0

type GORMUserRepository struct {
	// contains filtered or unexported fields
}

GORMUserRepository implements UserRepository using GORM

func NewGORMUserRepository added in v0.12.0

func NewGORMUserRepository(db *gorm.DB) *GORMUserRepository

NewGORMUserRepository creates a new GORMUserRepository

func (*GORMUserRepository) FindUserByID added in v0.12.0

func (r *GORMUserRepository) FindUserByID(ctx context.Context, userID string) (NotificationUser, error)

FindUserByID looks up a user by UUID string and returns a NotificationUser

type NotificationUser added in v0.12.0

type NotificationUser struct {
	ID                           string
	Email                        string
	FirstName                    string
	LastName                     string
	TaskAvailableEmailSubscribed bool
	TaskDailyDigestSubscribed    bool
}

NotificationUser holds the user fields needed for sending notification emails

func (NotificationUser) FullName added in v0.12.0

func (u NotificationUser) FullName() string

type SendEmailArgs

type SendEmailArgs struct {
	// Email message fields
	From        string             `json:"from"`
	To          []string           `json:"to"`
	Cc          []string           `json:"cc,omitempty"`
	Bcc         []string           `json:"bcc,omitempty"`
	Subject     string             `json:"subject"`
	HTMLBody    string             `json:"html_body,omitempty"`
	TextBody    string             `json:"text_body,omitempty"`
	Attachments []types.Attachment `json:"attachments,omitempty"`
	Headers     map[string]string  `json:"headers,omitempty"`
}

SendEmailArgs represents the arguments for sending an email

func (SendEmailArgs) Kind

func (SendEmailArgs) Kind() string

Kind returns the job kind for River

func (SendEmailArgs) Timeout

func (SendEmailArgs) Timeout() time.Duration

Timeout returns the timeout for email jobs

type SendEmailFromArgs

type SendEmailFromArgs struct {
	// Provider to use for sending
	Provider string `json:"provider"`

	// Email message fields
	From        string             `json:"from"`
	To          []string           `json:"to"`
	Cc          []string           `json:"cc,omitempty"`
	Bcc         []string           `json:"bcc,omitempty"`
	Subject     string             `json:"subject"`
	HTMLBody    string             `json:"html_body,omitempty"`
	TextBody    string             `json:"text_body,omitempty"`
	Attachments []types.Attachment `json:"attachments,omitempty"`
	Headers     map[string]string  `json:"headers,omitempty"`
}

SendEmailFromArgs represents the arguments for sending an email from a specific provider

func (SendEmailFromArgs) Kind

func (SendEmailFromArgs) Kind() string

Kind returns the job kind for River

func (SendEmailFromArgs) Timeout

func (SendEmailFromArgs) Timeout() time.Duration

Timeout returns the timeout for email jobs

type SendEmailFromWorker

type SendEmailFromWorker struct {
	// contains filtered or unexported fields
}

SendEmailFromWorker handles jobs that send email from a specific provider

func NewSendEmailFromWorker

func NewSendEmailFromWorker(emailService EmailService, logger *zap.SugaredLogger) *SendEmailFromWorker

NewSendEmailFromWorker creates a new SendEmailFromWorker

func (*SendEmailFromWorker) Work

Work is the River work function for sending emails from a provider

type SendEmailWorker

type SendEmailWorker struct {
	// contains filtered or unexported fields
}

SendEmailWorker handles jobs that send email

func NewSendEmailWorker

func NewSendEmailWorker(emailService EmailService, logger *zap.SugaredLogger) *SendEmailWorker

NewSendEmailWorker creates a new SendEmailWorker

func (*SendEmailWorker) Work

Work is the River work function for sending emails

type SendGlobalDigestArgs

type SendGlobalDigestArgs struct {
}

SendGlobalDigestArgs represents the arguments for sending the global digest

func (SendGlobalDigestArgs) Kind

Kind returns the job kind for River

func (SendGlobalDigestArgs) Timeout

Timeout returns the timeout for digest jobs (longer due to multiple emails)

type SendGlobalDigestWorker

type SendGlobalDigestWorker struct {
	// contains filtered or unexported fields
}

SendGlobalDigestWorker handles jobs that send the global digest

func NewSendGlobalDigestWorker

func NewSendGlobalDigestWorker(digestService DigestService, logger *zap.SugaredLogger) *SendGlobalDigestWorker

NewSendGlobalDigestWorker creates a new SendGlobalDigestWorker

func (*SendGlobalDigestWorker) Work

Work is the River work function for sending the global digest

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service manages the River client and workers

func NewServiceWithDigest

func NewServiceWithDigest(
	cfg *config.WorkerConfig,
	db *gorm.DB,
	emailSvc *email.Service,
	digestSvc DigestService,
	digestCfg *config.Config,
	logger *zap.SugaredLogger,
) (*Service, error)

NewServiceWithDigest creates a new worker service with digest support

func (*Service) EnqueueSendEmail

func (s *Service) EnqueueSendEmail(ctx context.Context, args *SendEmailArgs) error

EnqueueSendEmail enqueues a send email job

func (*Service) EnqueueSendEmailFrom

func (s *Service) EnqueueSendEmailFrom(ctx context.Context, args *SendEmailFromArgs) error

EnqueueSendEmailFrom enqueues a job that sends an email from a specific provider

func (*Service) EnqueueWorkflowExecutionFailed added in v0.12.0

func (s *Service) EnqueueWorkflowExecutionFailed(ctx context.Context, execution *workflows.WorkflowExecution) error

EnqueueWorkflowExecutionFailed enqueues a workflow-execution-failed notification email job. Implements the workflow.NotificationEnqueuer interface.

func (*Service) EnqueueWorkflowTaskAssigned added in v0.12.0

func (s *Service) EnqueueWorkflowTaskAssigned(ctx context.Context, stepExecution *workflows.StepExecution) error

EnqueueWorkflowTaskAssigned enqueues a workflow-task-assigned notification email job. Implements the workflow.NotificationEnqueuer interface.

func (*Service) GetClient

func (s *Service) GetClient() *river.Client[pgx.Tx]

GetClient returns the River client for job insertion

func (*Service) GetDAGExecutor added in v0.12.0

func (s *Service) GetDAGExecutor() *workflow.DAGExecutor

GetDAGExecutor returns the shared DAG executor used by workflow River workers. Returns nil when the worker service is disabled.

func (*Service) IsStarted

func (s *Service) IsStarted() bool

IsStarted returns true if the worker service is started

func (*Service) Migrate

func (s *Service) Migrate(ctx context.Context) error

Migrate runs River migrations

func (*Service) Start

func (s *Service) Start(ctx context.Context) error

Start starts the worker service

func (*Service) Stop

func (s *Service) Stop(ctx context.Context) error

Stop stops the worker service

type UserRepository added in v0.12.0

type UserRepository interface {
	FindUserByID(ctx context.Context, userID string) (NotificationUser, error)
}

UserRepository is the minimal DB interface needed by notification workers

type WorkflowExecutionFailedArgs added in v0.12.0

type WorkflowExecutionFailedArgs struct {
	WorkflowExecutionID string `json:"workflow_execution_id"`
}

WorkflowExecutionFailedArgs represents the arguments for a workflow-execution-failed notification email

func (WorkflowExecutionFailedArgs) Kind added in v0.12.0

Kind returns the job kind for River

func (WorkflowExecutionFailedArgs) Timeout added in v0.12.0

Timeout returns the timeout for workflow execution failed jobs

type WorkflowExecutionFailedWorker added in v0.12.0

type WorkflowExecutionFailedWorker struct {
	// contains filtered or unexported fields
}

WorkflowExecutionFailedWorker sends a failure notification email to the workflow instance creator

func NewWorkflowExecutionFailedWorker added in v0.12.0

func NewWorkflowExecutionFailedWorker(db *gorm.DB, emailService EmailService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger) *WorkflowExecutionFailedWorker

NewWorkflowExecutionFailedWorker creates a new WorkflowExecutionFailedWorker

func (*WorkflowExecutionFailedWorker) Work added in v0.12.0

Work sends a failure notification email for the workflow execution identified by job.Args.WorkflowExecutionID

type WorkflowTaskAssignedArgs added in v0.12.0

type WorkflowTaskAssignedArgs struct {
	AssignedToType        string     `json:"assigned_to_type"`
	UserID                string     `json:"user_id"`
	StepExecutionID       string     `json:"step_execution_id"`
	StepTitle             string     `json:"step_title"`
	WorkflowTitle         string     `json:"workflow_title"`
	WorkflowInstanceTitle string     `json:"workflow_instance_title"`
	StepURL               string     `json:"step_url"`
	DueDate               *time.Time `json:"due_date,omitempty"`
}

WorkflowTaskAssignedArgs represents the arguments for a new-task-assigned notification email

func (WorkflowTaskAssignedArgs) Kind added in v0.12.0

Kind returns the job kind for River

func (WorkflowTaskAssignedArgs) Timeout added in v0.12.0

Timeout returns the timeout for workflow task assigned jobs

type WorkflowTaskAssignedWorker added in v0.12.0

type WorkflowTaskAssignedWorker struct {
	// contains filtered or unexported fields
}

WorkflowTaskAssignedWorker handles new-task-assigned notification email jobs

func NewWorkflowTaskAssignedWorker added in v0.12.0

func NewWorkflowTaskAssignedWorker(emailService EmailService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger) *WorkflowTaskAssignedWorker

NewWorkflowTaskAssignedWorker creates a new WorkflowTaskAssignedWorker

func (*WorkflowTaskAssignedWorker) Work added in v0.12.0

Work is the River work function for sending new-task-assigned notification emails

type WorkflowTaskDigestArgs added in v0.12.0

type WorkflowTaskDigestArgs struct {
	UserID string `json:"user_id"`
}

WorkflowTaskDigestArgs represents the arguments for a per-user task digest email

func (WorkflowTaskDigestArgs) Kind added in v0.12.0

Kind returns the job kind for River

func (WorkflowTaskDigestArgs) Timeout added in v0.12.0

Timeout returns the timeout for workflow task digest jobs

type WorkflowTaskDigestCheckerArgs added in v0.12.0

type WorkflowTaskDigestCheckerArgs struct{}

WorkflowTaskDigestCheckerArgs represents the arguments for the periodic digest checker job

func (WorkflowTaskDigestCheckerArgs) Kind added in v0.12.0

Kind returns the job kind for River

func (WorkflowTaskDigestCheckerArgs) Timeout added in v0.12.0

Timeout returns the timeout for the digest checker job

type WorkflowTaskDigestCheckerWorker added in v0.12.0

type WorkflowTaskDigestCheckerWorker struct {
	// contains filtered or unexported fields
}

WorkflowTaskDigestCheckerWorker queries all subscribed users and enqueues per-user digest jobs

func NewWorkflowTaskDigestCheckerWorker added in v0.12.0

func NewWorkflowTaskDigestCheckerWorker(db *gorm.DB, client workflow.RiverClient, logger *zap.SugaredLogger) *WorkflowTaskDigestCheckerWorker

NewWorkflowTaskDigestCheckerWorker creates a new WorkflowTaskDigestCheckerWorker

func (*WorkflowTaskDigestCheckerWorker) Work added in v0.12.0

Work queries all users with TaskDailyDigestSubscribed=true and enqueues a WorkflowTaskDigestArgs job for each

type WorkflowTaskDigestWorker added in v0.12.0

type WorkflowTaskDigestWorker struct {
	// contains filtered or unexported fields
}

WorkflowTaskDigestWorker sends a per-user digest of pending and overdue workflow tasks

func NewWorkflowTaskDigestWorker added in v0.12.0

func NewWorkflowTaskDigestWorker(db *gorm.DB, emailService EmailService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger) *WorkflowTaskDigestWorker

NewWorkflowTaskDigestWorker creates a new WorkflowTaskDigestWorker

func (*WorkflowTaskDigestWorker) Work added in v0.12.0

Work sends a digest email for the user identified by job.Args.UserID

type WorkflowTaskDueSoonArgs added in v0.12.0

type WorkflowTaskDueSoonArgs struct {
	UserID                string    `json:"user_id"`
	StepExecutionID       string    `json:"step_execution_id"`
	StepTitle             string    `json:"step_title"`
	WorkflowTitle         string    `json:"workflow_title"`
	WorkflowInstanceTitle string    `json:"workflow_instance_title"`
	StepURL               string    `json:"step_url"`
	DueDate               time.Time `json:"due_date"`
}

WorkflowTaskDueSoonArgs represents the arguments for a task-due-soon reminder email

func (WorkflowTaskDueSoonArgs) Kind added in v0.12.0

Kind returns the job kind for River

func (WorkflowTaskDueSoonArgs) Timeout added in v0.12.0

Timeout returns the timeout for workflow task due soon jobs

type WorkflowTaskDueSoonWorker added in v0.12.0

type WorkflowTaskDueSoonWorker struct {
	// contains filtered or unexported fields
}

WorkflowTaskDueSoonWorker handles task-due-soon reminder email jobs

func NewWorkflowTaskDueSoonWorker added in v0.12.0

func NewWorkflowTaskDueSoonWorker(emailService EmailService, userRepo UserRepository, webBaseURL string, logger *zap.SugaredLogger) *WorkflowTaskDueSoonWorker

NewWorkflowTaskDueSoonWorker creates a new WorkflowTaskDueSoonWorker

func (*WorkflowTaskDueSoonWorker) Work added in v0.12.0

Work is the River work function for sending task-due-soon reminder emails

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL