workflow

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2026 License: GPL-3.0 Imports: 28 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// MaxConcurrentWorkflowRunsPerWorkflow is the maximum concurrent runs per workflow.
	MaxConcurrentWorkflowRunsPerWorkflow = 5

	// MaxConcurrentWorkflowRunsPerTenant is the maximum concurrent workflow runs per tenant.
	MaxConcurrentWorkflowRunsPerTenant = 50
)

Concurrent workflow run limits to prevent resource exhaustion.

Variables

This section is empty.

Functions

func RegisterAllActionHandlers

func RegisterAllActionHandlers(
	executor *WorkflowExecutor,
	vulnSvc *finding.VulnerabilityService,
	pipelineSvc *pipeline.Service,
	scanSvc *scansvc.Service,
	integrationSvc *integration.IntegrationService,
	log *logger.Logger,
)

RegisterAllActionHandlers registers all built-in action handlers.

func RegisterAllActionHandlersWithAI

func RegisterAllActionHandlersWithAI(
	executor *WorkflowExecutor,
	vulnSvc *finding.VulnerabilityService,
	pipelineSvc *pipeline.Service,
	scanSvc *scansvc.Service,
	integrationSvc *integration.IntegrationService,
	aiTriageSvc *aitriage.AITriageService,
	log *logger.Logger,
)

RegisterAllActionHandlersWithAI registers all built-in action handlers including AI triage.

func ValidateSourceFilter

func ValidateSourceFilter(ctx context.Context, config map[string]any, cacheService *finding.FindingSourceCacheService) error

ValidateSourceFilter validates that source codes in the filter are valid. Uses the finding.FindingSourceCacheService to check against active sources.

Types

type AITriageActionHandler

type AITriageActionHandler struct {
	// contains filtered or unexported fields
}

AITriageActionHandler handles AI triage triggering actions.

func NewAITriageActionHandler

func NewAITriageActionHandler(aiTriageSvc *aitriage.AITriageService, log *logger.Logger) *AITriageActionHandler

NewAITriageActionHandler creates a new AITriageActionHandler.

func (*AITriageActionHandler) Execute

func (h *AITriageActionHandler) Execute(ctx context.Context, input *ActionInput) (map[string]any, error)

Execute executes an AI triage action.

type AITriageEvent

type AITriageEvent struct {
	TenantID   shared.ID
	FindingID  shared.ID
	TriageID   shared.ID
	EventType  workflowdom.TriggerType // ai_triage_completed or ai_triage_failed
	TriageData map[string]any          // Triage result data
}

AITriageEvent represents an AI triage completion/failure event.

type ActionHandler

type ActionHandler interface {
	// Execute executes the action and returns the output.
	Execute(ctx context.Context, input *ActionInput) (map[string]any, error)
}

ActionHandler defines the interface for workflow action handlers.

type ActionInput

type ActionInput struct {
	TenantID     shared.ID
	WorkflowID   shared.ID
	RunID        shared.ID
	NodeKey      string
	ActionType   workflowdom.ActionType
	ActionConfig map[string]any
	TriggerData  map[string]any
	Context      map[string]any
}

ActionInput contains the input for an action execution.

type AddEdgeInput

type AddEdgeInput struct {
	TenantID      shared.ID
	UserID        shared.ID
	WorkflowID    shared.ID
	SourceNodeKey string
	TargetNodeKey string
	SourceHandle  string
	Label         string
}

AddEdgeInput represents input for adding an edge.

type AddNodeInput

type AddNodeInput struct {
	TenantID    shared.ID
	UserID      shared.ID
	WorkflowID  shared.ID
	NodeKey     string
	NodeType    workflowdom.NodeType
	Name        string
	Description string
	UIPositionX float64
	UIPositionY float64
	Config      workflowdom.NodeConfig
}

AddNodeInput represents input for adding a node.

type ConditionEvaluator

type ConditionEvaluator interface {
	// Evaluate evaluates a condition expression against the given data.
	Evaluate(ctx context.Context, expression string, data map[string]any) (bool, error)
}

ConditionEvaluator defines the interface for condition evaluation.

type CreateEdgeInput

type CreateEdgeInput struct {
	SourceNodeKey string
	TargetNodeKey string
	SourceHandle  string
	Label         string
}

CreateEdgeInput represents input for creating a workflow edge.

type CreateNodeInput

type CreateNodeInput struct {
	NodeKey     string
	NodeType    workflowdom.NodeType
	Name        string
	Description string
	UIPositionX float64
	UIPositionY float64
	Config      workflowdom.NodeConfig
}

CreateNodeInput represents input for creating a workflow node.

type CreateWorkflowInput

type CreateWorkflowInput struct {
	TenantID    shared.ID
	UserID      shared.ID
	Name        string
	Description string
	Tags        []string
	Nodes       []CreateNodeInput
	Edges       []CreateEdgeInput
}

CreateWorkflowInput represents input for creating a workflow.

type DefaultConditionEvaluator

type DefaultConditionEvaluator struct{}

DefaultConditionEvaluator provides a simple condition evaluator. It supports basic expressions like:

  • "trigger.severity == 'critical'"
  • "trigger.asset_type in ['server', 'database']"
  • "upstream.check_status.output.is_valid == true"

SEC-WF11: Includes expression length/complexity limits to prevent ReDoS.

func (*DefaultConditionEvaluator) Evaluate

func (e *DefaultConditionEvaluator) Evaluate(ctx context.Context, expression string, data map[string]any) (bool, error)

Evaluate evaluates a condition expression.

type DefaultNotificationHandler

type DefaultNotificationHandler struct {
	// contains filtered or unexported fields
}

DefaultNotificationHandler handles notification actions using the notification service.

func (*DefaultNotificationHandler) Send

Send sends a notification.

type ExecutionContext

type ExecutionContext struct {
	Run               *workflowdom.Run
	Workflow          *workflowdom.Workflow
	TriggerData       map[string]any
	Context           map[string]any // Shared context across nodes
	CompletedNodeKeys map[string]bool
	NodeRunsByKey     map[string]*workflowdom.NodeRun
	// contains filtered or unexported fields
}

ExecutionContext holds the state during workflow execution.

func (*ExecutionContext) GetContextValue

func (ec *ExecutionContext) GetContextValue(key string) (any, bool)

GetContextValue gets a value from the execution context.

func (*ExecutionContext) IsNodeCompleted

func (ec *ExecutionContext) IsNodeCompleted(nodeKey string) bool

IsNodeCompleted checks if a node is completed.

func (*ExecutionContext) MarkNodeCompleted

func (ec *ExecutionContext) MarkNodeCompleted(nodeKey string)

MarkNodeCompleted marks a node as completed.

func (*ExecutionContext) SetContextValue

func (ec *ExecutionContext) SetContextValue(key string, value any)

SetContextValue sets a value in the execution context.

type FindingActionHandler

type FindingActionHandler struct {
	// contains filtered or unexported fields
}

FindingActionHandler handles actions related to findings.

func NewFindingActionHandler

func NewFindingActionHandler(vulnSvc *finding.VulnerabilityService, log *logger.Logger) *FindingActionHandler

NewFindingActionHandler creates a new FindingActionHandler.

func (*FindingActionHandler) Execute

func (h *FindingActionHandler) Execute(ctx context.Context, input *ActionInput) (map[string]any, error)

Execute executes a finding-related action.

type FindingEvent

type FindingEvent struct {
	TenantID  shared.ID
	Finding   *vulnerability.Finding
	EventType workflowdom.TriggerType // finding_created, finding_updated
	Changes   map[string]any          // For finding_updated: which fields changed
}

FindingEvent represents a finding-related event.

type HTTPRequestHandler

type HTTPRequestHandler struct {
	// contains filtered or unexported fields
}

HTTPRequestHandler handles HTTP request actions. SECURITY: Includes SSRF protection via URL allowlist/denylist.

func NewHTTPRequestHandler

func NewHTTPRequestHandler(log *logger.Logger) *HTTPRequestHandler

NewHTTPRequestHandler creates a new secure HTTPRequestHandler.

func (*HTTPRequestHandler) AllowLocalhostForTesting

func (h *HTTPRequestHandler) AllowLocalhostForTesting()

AllowLocalhostForTesting disables the localhost/loopback SSRF check. This is intended for testing purposes only — never call in production.

func (*HTTPRequestHandler) Execute

func (h *HTTPRequestHandler) Execute(ctx context.Context, input *ActionInput) (map[string]any, error)

Execute executes an HTTP request action.

func (*HTTPRequestHandler) SetBlockedCIDRs

func (h *HTTPRequestHandler) SetBlockedCIDRs(cidrs []string)

SetBlockedCIDRs overrides the list of blocked CIDR ranges. This is intended for testing purposes only.

func (*HTTPRequestHandler) SetClient

func (h *HTTPRequestHandler) SetClient(client *http.Client)

SetClient overrides the default HTTP client used by the handler. This is intended for testing purposes only.

type ListWorkflowRunsInput

type ListWorkflowRunsInput struct {
	TenantID   shared.ID
	WorkflowID *shared.ID
	Status     *workflowdom.RunStatus
	Page       int
	PerPage    int
}

ListWorkflowRunsInput represents input for listing workflow runs.

type ListWorkflowsInput

type ListWorkflowsInput struct {
	TenantID shared.ID
	IsActive *bool
	Tags     []string
	Search   string
	Page     int
	PerPage  int
}

ListWorkflowsInput represents input for listing workflows.

type NotificationHandler

type NotificationHandler interface {
	// Send sends a notification and returns the result.
	Send(ctx context.Context, input *NotificationInput) (map[string]any, error)
}

NotificationHandler defines the interface for notification handlers.

type NotificationInput

type NotificationInput struct {
	TenantID           shared.ID
	WorkflowID         shared.ID
	RunID              shared.ID
	NodeKey            string
	NotificationType   workflowdom.NotificationType
	NotificationConfig map[string]any
	TriggerData        map[string]any
	Context            map[string]any
}

NotificationInput contains the input for a notification.

type PipelineTriggerHandler

type PipelineTriggerHandler struct {
	// contains filtered or unexported fields
}

PipelineTriggerHandler handles pipeline and scan triggering actions.

func NewPipelineTriggerHandler

func NewPipelineTriggerHandler(pipelineSvc *pipeline.Service, scanSvc *scansvc.Service, log *logger.Logger) *PipelineTriggerHandler

NewPipelineTriggerHandler creates a new PipelineTriggerHandler.

func (*PipelineTriggerHandler) Execute

func (h *PipelineTriggerHandler) Execute(ctx context.Context, input *ActionInput) (map[string]any, error)

Execute executes a pipeline/scan trigger action.

type ScriptRunnerHandler

type ScriptRunnerHandler struct {
	// contains filtered or unexported fields
}

ScriptRunnerHandler handles script execution actions. NOTE: This is a placeholder. In production, script execution would need proper sandboxing, resource limits, and security controls.

func NewScriptRunnerHandler

func NewScriptRunnerHandler(log *logger.Logger) *ScriptRunnerHandler

NewScriptRunnerHandler creates a new ScriptRunnerHandler.

func (*ScriptRunnerHandler) Execute

func (h *ScriptRunnerHandler) Execute(ctx context.Context, input *ActionInput) (map[string]any, error)

Execute executes a script action.

type TicketActionHandler

type TicketActionHandler struct {
	// contains filtered or unexported fields
}

TicketActionHandler handles ticket creation and update actions.

func NewTicketActionHandler

func NewTicketActionHandler(intSvc *integration.IntegrationService, log *logger.Logger) *TicketActionHandler

NewTicketActionHandler creates a new TicketActionHandler.

func (*TicketActionHandler) Execute

func (h *TicketActionHandler) Execute(ctx context.Context, input *ActionInput) (map[string]any, error)

Execute executes a ticket-related action.

type TriggerWorkflowInput

type TriggerWorkflowInput struct {
	TenantID    shared.ID
	UserID      shared.ID
	WorkflowID  shared.ID
	TriggerType workflowdom.TriggerType
	TriggerData map[string]any
}

TriggerWorkflowInput represents input for triggering a workflow.

type UpdateNodeInput

type UpdateNodeInput struct {
	TenantID    shared.ID
	UserID      shared.ID
	WorkflowID  shared.ID
	NodeID      shared.ID
	Name        *string
	Description *string
	UIPositionX *float64
	UIPositionY *float64
	Config      *workflowdom.NodeConfig
}

UpdateNodeInput represents input for updating a node.

type UpdateWorkflowGraphInput

type UpdateWorkflowGraphInput struct {
	TenantID    shared.ID
	UserID      shared.ID
	WorkflowID  shared.ID
	Name        *string           // Optional: update name
	Description *string           // Optional: update description
	Tags        []string          // Optional: update tags (nil = no change)
	Nodes       []CreateNodeInput // Required: new nodes (replaces all existing)
	Edges       []CreateEdgeInput // Required: new edges (replaces all existing)
}

UpdateWorkflowGraphInput represents input for updating a workflow's graph (nodes and edges).

type UpdateWorkflowInput

type UpdateWorkflowInput struct {
	TenantID    shared.ID
	UserID      shared.ID
	WorkflowID  shared.ID
	Name        *string
	Description *string
	Tags        []string
	IsActive    *bool
}

UpdateWorkflowInput represents input for updating a workflow.

type WorkflowEventDispatcher

type WorkflowEventDispatcher struct {
	// contains filtered or unexported fields
}

WorkflowEventDispatcher dispatches events to matching workflows. It evaluates trigger configurations to determine which workflows should run.

func NewWorkflowEventDispatcher

func NewWorkflowEventDispatcher(
	workflowRepo workflowdom.WorkflowRepository,
	nodeRepo workflowdom.NodeRepository,
	service *WorkflowService,
	log *logger.Logger,
) *WorkflowEventDispatcher

NewWorkflowEventDispatcher creates a new workflow event dispatcher.

func (*WorkflowEventDispatcher) DispatchAITriageCompleted

func (d *WorkflowEventDispatcher) DispatchAITriageCompleted(
	ctx context.Context,
	tenantID, findingID, triageID shared.ID,
	triageData map[string]any,
)

DispatchAITriageCompleted is a convenience method for dispatching ai_triage_completed events.

func (*WorkflowEventDispatcher) DispatchAITriageEvent

func (d *WorkflowEventDispatcher) DispatchAITriageEvent(ctx context.Context, event AITriageEvent) error

DispatchAITriageEvent dispatches an AI triage event to matching workflows.

func (*WorkflowEventDispatcher) DispatchAITriageFailed

func (d *WorkflowEventDispatcher) DispatchAITriageFailed(
	ctx context.Context,
	tenantID, findingID, triageID shared.ID,
	errorMessage string,
)

DispatchAITriageFailed is a convenience method for dispatching ai_triage_failed events.

func (*WorkflowEventDispatcher) DispatchFindingEvent

func (d *WorkflowEventDispatcher) DispatchFindingEvent(ctx context.Context, event FindingEvent) error

DispatchFindingEvent dispatches a finding event to matching workflows. It evaluates all active workflows with matching trigger types and filters.

func (*WorkflowEventDispatcher) DispatchFindingsCreated

func (d *WorkflowEventDispatcher) DispatchFindingsCreated(ctx context.Context, tenantID shared.ID, findings []*vulnerability.Finding)

DispatchFindingsCreated dispatches finding_created events for a batch of newly created findings. This is called by the ingest service when findings are created during ingestion. Events are dispatched asynchronously to avoid blocking the ingestion pipeline.

Optimizations applied: - Batch workflow matching: finds workflows once for all findings instead of per-finding - Limits findings per dispatch to prevent memory exhaustion - Proper goroutine recovery to prevent panics from crashing the service - Deduplication: each workflow is triggered once per batch (not per finding)

type WorkflowExecutor

type WorkflowExecutor struct {
	// contains filtered or unexported fields
}

WorkflowExecutor handles the execution of workflow runs. It processes nodes in topological order, handling conditions, actions, and notifications.

SECURITY: Includes tenant isolation, execution timeout, and rate limiting.

func NewWorkflowExecutor

func NewWorkflowExecutor(
	workflowRepo workflowdom.WorkflowRepository,
	runRepo workflowdom.RunRepository,
	nodeRunRepo workflowdom.NodeRunRepository,
	log *logger.Logger,
	opts ...WorkflowExecutorOption,
) *WorkflowExecutor

NewWorkflowExecutor creates a new WorkflowExecutor.

func (*WorkflowExecutor) Execute

func (e *WorkflowExecutor) Execute(ctx context.Context, runID shared.ID) error

Execute executes a workflow run. This is the main entry point for workflow execution. SEC-WF05: Includes tenant isolation check.

func (*WorkflowExecutor) ExecuteAsync

func (e *WorkflowExecutor) ExecuteAsync(runID shared.ID)

ExecuteAsync executes a workflow run asynchronously with rate limiting. SEC-WF07: Uses semaphore to limit concurrent executions.

func (*WorkflowExecutor) ExecuteAsyncWithTenant

func (e *WorkflowExecutor) ExecuteAsyncWithTenant(runID shared.ID, tenantID shared.ID)

ExecuteAsyncWithTenant executes a workflow run asynchronously with tenant context. SEC-WF07: Uses semaphore to limit concurrent executions and passes tenant for isolation. SEC-WF10: Also enforces per-tenant rate limiting. SEC-WF12: Includes panic recovery to prevent resource leaks.

func (*WorkflowExecutor) ExecuteWithTenant

func (e *WorkflowExecutor) ExecuteWithTenant(ctx context.Context, runID shared.ID, expectedTenantID shared.ID) error

ExecuteWithTenant executes a workflow run with explicit tenant verification. SEC-WF05: If expectedTenantID is provided, verifies the run belongs to that tenant.

func (*WorkflowExecutor) RegisterActionHandler

func (e *WorkflowExecutor) RegisterActionHandler(actionType workflowdom.ActionType, handler ActionHandler)

RegisterActionHandler registers a custom action handler.

type WorkflowExecutorConfig

type WorkflowExecutorConfig struct {
	// MaxNodeExecutionTime is the maximum time allowed for a single node execution.
	MaxNodeExecutionTime time.Duration

	// MaxConcurrentNodes is the maximum nodes that can execute concurrently.
	MaxConcurrentNodes int
}

WorkflowExecutorConfig holds configuration for the executor.

func DefaultWorkflowExecutorConfig

func DefaultWorkflowExecutorConfig() WorkflowExecutorConfig

DefaultWorkflowExecutorConfig returns default configuration.

type WorkflowExecutorOption

type WorkflowExecutorOption func(*WorkflowExecutor)

WorkflowExecutorOption is a functional option for WorkflowExecutor.

func WithExecutorAuditService

func WithExecutorAuditService(svc *auditapp.AuditService) WorkflowExecutorOption

WithExecutorAuditService sets the audit service.

func WithExecutorDB

func WithExecutorDB(db *sql.DB) WorkflowExecutorOption

WithExecutorDB sets the database connection for transactions.

func WithExecutorIntegrationService

func WithExecutorIntegrationService(svc *integration.IntegrationService) WorkflowExecutorOption

WithExecutorIntegrationService sets the integration service.

func WithExecutorOutboxService

func WithExecutorOutboxService(svc *outbox.Service) WorkflowExecutorOption

WithExecutorOutboxService sets the notification service.

type WorkflowService

type WorkflowService struct {
	// contains filtered or unexported fields
}

WorkflowService handles workflow-related business operations.

func NewWorkflowService

NewWorkflowService creates a new WorkflowService.

func (*WorkflowService) AddEdge

func (s *WorkflowService) AddEdge(ctx context.Context, input AddEdgeInput) (*workflowdom.Edge, error)

AddEdge adds an edge to a workflow.

func (*WorkflowService) AddNode

func (s *WorkflowService) AddNode(ctx context.Context, input AddNodeInput) (*workflowdom.Node, error)

AddNode adds a node to a workflow.

func (*WorkflowService) CancelRun

func (s *WorkflowService) CancelRun(ctx context.Context, tenantID, userID, runID shared.ID) error

CancelRun cancels a running workflow.

func (*WorkflowService) CreateWorkflow

func (s *WorkflowService) CreateWorkflow(ctx context.Context, input CreateWorkflowInput) (*workflowdom.Workflow, error)

CreateWorkflow creates a new workflow with its nodes and edges.

func (*WorkflowService) DeleteEdge

func (s *WorkflowService) DeleteEdge(ctx context.Context, tenantID, workflowID, edgeID shared.ID) error

DeleteEdge deletes an edge from a workflow.

func (*WorkflowService) DeleteNode

func (s *WorkflowService) DeleteNode(ctx context.Context, tenantID, workflowID, nodeID shared.ID) error

DeleteNode deletes a node from a workflow.

func (*WorkflowService) DeleteWorkflow

func (s *WorkflowService) DeleteWorkflow(ctx context.Context, tenantID, userID, workflowID shared.ID) error

DeleteWorkflow deletes a workflow and all its nodes/edges.

func (*WorkflowService) GetRun

func (s *WorkflowService) GetRun(ctx context.Context, tenantID, runID shared.ID) (*workflowdom.Run, error)

GetRun retrieves a workflow run with its node runs.

func (*WorkflowService) GetWorkflow

func (s *WorkflowService) GetWorkflow(ctx context.Context, tenantID, workflowID shared.ID) (*workflowdom.Workflow, error)

GetWorkflow retrieves a workflow by ID with its graph.

func (*WorkflowService) ListRuns

ListRuns lists workflow runs.

func (*WorkflowService) ListWorkflows

ListWorkflows lists workflows with filters.

func (*WorkflowService) TriggerWorkflow

func (s *WorkflowService) TriggerWorkflow(ctx context.Context, input TriggerWorkflowInput) (*workflowdom.Run, error)

TriggerWorkflow triggers a workflow execution.

func (*WorkflowService) UpdateNode

func (s *WorkflowService) UpdateNode(ctx context.Context, input UpdateNodeInput) (*workflowdom.Node, error)

UpdateNode updates a workflow node.

func (*WorkflowService) UpdateWorkflow

func (s *WorkflowService) UpdateWorkflow(ctx context.Context, input UpdateWorkflowInput) (*workflowdom.Workflow, error)

UpdateWorkflow updates a workflow.

func (*WorkflowService) UpdateWorkflowGraph

func (s *WorkflowService) UpdateWorkflowGraph(ctx context.Context, input UpdateWorkflowGraphInput) (*workflowdom.Workflow, error)

UpdateWorkflowGraph atomically replaces a workflow's graph (nodes and edges). This is a complete replacement operation - all existing nodes/edges are deleted and new ones are created in a single atomic operation.

type WorkflowServiceOption

type WorkflowServiceOption func(*WorkflowService)

WorkflowServiceOption is a functional option for WorkflowService.

func WithWorkflowAuditService

func WithWorkflowAuditService(auditService *auditapp.AuditService) WorkflowServiceOption

WithWorkflowAuditService sets the audit service for WorkflowService.

func WithWorkflowExecutor

func WithWorkflowExecutor(executor *WorkflowExecutor) WorkflowServiceOption

WithWorkflowExecutor sets the workflow executor for WorkflowService.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL