Documentation
¶
Overview ¶
Package pipeline defines the Pipeline domain entities for scan orchestration.
Index ¶
- Constants
- type AgentPreference
- type Condition
- type ConditionType
- type Run
- func (r *Run) AddStepRun(stepRun *StepRun)
- func (r *Run) Cancel()
- func (r *Run) Complete()
- func (r *Run) Duration() time.Duration
- func (r *Run) Fail(message string)
- func (r *Run) GetProgress() int
- func (r *Run) GetStepRun(stepKey string) *StepRun
- func (r *Run) HasFailedSteps() bool
- func (r *Run) IsComplete() bool
- func (r *Run) IsPending() bool
- func (r *Run) IsRunning() bool
- func (r *Run) QualityGatePassed() bool
- func (r *Run) SetQualityGateResult(result *scanprofile.QualityGateResult)
- func (r *Run) SetScanProfile(profileID shared.ID)
- func (r *Run) SetTotalSteps(total int)
- func (r *Run) Start()
- func (r *Run) Timeout()
- func (r *Run) UpdateStats(completed, failed, skipped, findings int)
- type RunFilter
- type RunRepository
- type RunStats
- type RunStatus
- type Settings
- type Step
- func (s *Step) AddDependency(stepKey string)
- func (s *Step) Clone() *Step
- func (s *Step) HasDependencies() bool
- func (s *Step) IsDisabled() bool
- func (s *Step) SetCondition(condition Condition) error
- func (s *Step) SetConfig(config map[string]any)
- func (s *Step) SetDependencies(stepKeys []string)
- func (s *Step) SetRetry(maxRetries, delaySeconds int)
- func (s *Step) SetTimeout(seconds int) error
- func (s *Step) SetTool(tool string)
- func (s *Step) SetToolID(id *shared.ID)
- func (s *Step) SetUIPosition(x, y float64)
- func (s *Step) ShouldAlwaysRun() bool
- type StepRepository
- type StepRun
- func (sr *StepRun) CanRetry() bool
- func (sr *StepRun) Cancel()
- func (sr *StepRun) Complete(findingsCount int, output map[string]any)
- func (sr *StepRun) Duration() time.Duration
- func (sr *StepRun) Fail(errorMessage, errorCode string)
- func (sr *StepRun) IsComplete() bool
- func (sr *StepRun) IsPending() bool
- func (sr *StepRun) IsQueued() bool
- func (sr *StepRun) IsRunning() bool
- func (sr *StepRun) IsSuccess() bool
- func (sr *StepRun) PrepareRetry()
- func (sr *StepRun) Queue()
- func (sr *StepRun) SetConditionResult(result bool)
- func (sr *StepRun) ShouldSkip() bool
- func (sr *StepRun) Skip(reason string)
- func (sr *StepRun) Start(agentID, commandID shared.ID)
- func (sr *StepRun) Timeout()
- func (sr *StepRun) WaitTime() time.Duration
- type StepRunFilter
- type StepRunRepository
- type StepRunStatus
- type Template
- func (t *Template) Activate()
- func (t *Template) AddStep(step *Step)
- func (t *Template) AddTrigger(trigger Trigger)
- func (t *Template) Clone(newName string) *Template
- func (t *Template) Deactivate()
- func (t *Template) GetRunnableSteps(completedSteps map[string]bool) []*Step
- func (t *Template) GetStepByKey(key string) *Step
- func (t *Template) IncrementVersion()
- func (t *Template) SetCreatedBy(userID shared.ID)
- func (t *Template) SetSettings(settings Settings)
- func (t *Template) SetUIEndPosition(pos *UIPosition)
- func (t *Template) SetUIStartPosition(pos *UIPosition)
- func (t *Template) ValidateSteps() error
- type TemplateFilter
- type TemplateRepository
- type Trigger
- type TriggerType
- type UIPosition
Constants ¶
const ( // MinTimeoutSeconds is the minimum allowed timeout (1 minute). MinTimeoutSeconds = 60 // MaxTimeoutSeconds is the maximum allowed timeout (24 hours). MaxTimeoutSeconds = 86400 // DefaultTimeoutSeconds is the default timeout (30 minutes). DefaultTimeoutSeconds = 1800 )
Step timeout limits.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AgentPreference ¶
type AgentPreference string
AgentPreference determines which agents can execute the pipeline.
const ( // AgentPreferenceAuto tries tenant agents first, falls back to platform. AgentPreferenceAuto AgentPreference = "auto" // AgentPreferenceTenant only uses tenant's own agents. AgentPreferenceTenant AgentPreference = "tenant" // AgentPreferencePlatform only uses platform agents. AgentPreferencePlatform AgentPreference = "platform" )
type Condition ¶
type Condition struct {
Type ConditionType `json:"type"`
Value string `json:"value,omitempty"`
}
Condition represents a step execution condition.
func AlwaysCondition ¶
func AlwaysCondition() Condition
AlwaysCondition creates an always-run condition.
func AssetTypeCondition ¶
AssetTypeCondition creates an asset-type condition.
func ExpressionCondition ¶
ExpressionCondition creates an expression-based condition.
type ConditionType ¶
type ConditionType string
ConditionType represents the type of condition for step execution.
const ( ConditionTypeAlways ConditionType = "always" // Always run ConditionTypeNever ConditionType = "never" // Never run (disabled) ConditionTypeExpression ConditionType = "expression" // Custom expression (NOT YET IMPLEMENTED) ConditionTypeAssetType ConditionType = "asset_type" // Based on asset type ConditionTypeStepResult ConditionType = "step_result" // Based on previous step result )
func (ConditionType) IsValid ¶
func (c ConditionType) IsValid() bool
IsValid checks if the condition type is valid.
type Run ¶
type Run struct {
ID shared.ID
PipelineID shared.ID
TenantID shared.ID
AssetID *shared.ID
ScanID *shared.ID // Reference to the scan that triggered this run
// Trigger info
TriggerType TriggerType
TriggeredBy string // User email, system, webhook name, etc.
// Status
Status RunStatus
// Context (inputs for the pipeline)
Context map[string]any
// Results summary
TotalSteps int
CompletedSteps int
FailedSteps int
SkippedSteps int
TotalFindings int
// Timing
StartedAt *time.Time
CompletedAt *time.Time
// Error info
ErrorMessage string
// Scan Profile and Quality Gate
ScanProfileID *shared.ID // Reference to the scan profile used
QualityGateResult *scanprofile.QualityGateResult // Quality gate evaluation result
// Step runs (loaded separately)
StepRuns []*StepRun
// Timestamps
CreatedAt time.Time
}
Run represents an execution of a pipeline.
func NewRun ¶
func NewRun( pipelineID shared.ID, tenantID shared.ID, assetID *shared.ID, triggerType TriggerType, triggeredBy string, context map[string]any, ) (*Run, error)
NewRun creates a new pipeline run.
func (*Run) GetProgress ¶
GetProgress returns the progress percentage.
func (*Run) GetStepRun ¶
GetStepRun returns a step run by step key.
func (*Run) HasFailedSteps ¶
HasFailedSteps checks if any steps failed.
func (*Run) IsComplete ¶
IsComplete checks if the run is complete (terminal state).
func (*Run) QualityGatePassed ¶
QualityGatePassed returns true if quality gate passed or was not evaluated.
func (*Run) SetQualityGateResult ¶
func (r *Run) SetQualityGateResult(result *scanprofile.QualityGateResult)
SetQualityGateResult stores the quality gate evaluation result.
func (*Run) SetScanProfile ¶
SetScanProfile links this run to a scan profile.
func (*Run) SetTotalSteps ¶
SetTotalSteps sets the total number of steps.
func (*Run) UpdateStats ¶
UpdateStats updates the run statistics.
type RunFilter ¶
type RunFilter struct {
TenantID *shared.ID
PipelineID *shared.ID
AssetID *shared.ID
Status *RunStatus
TriggerType *TriggerType
TriggeredBy string
StartedFrom *time.Time
StartedTo *time.Time
}
RunFilter represents filter options for listing runs.
type RunRepository ¶
type RunRepository interface {
// Create creates a new run.
Create(ctx context.Context, run *Run) error
// GetByID retrieves a run by ID.
GetByID(ctx context.Context, id shared.ID) (*Run, error)
// GetByTenantAndID retrieves a run by tenant and ID.
GetByTenantAndID(ctx context.Context, tenantID, id shared.ID) (*Run, error)
// List lists runs with filters and pagination.
List(ctx context.Context, filter RunFilter, page pagination.Pagination) (pagination.Result[*Run], error)
// ListByScanID lists runs for a specific scan with pagination.
ListByScanID(ctx context.Context, scanID shared.ID, page, perPage int) ([]*Run, int64, error)
// Update updates a run.
Update(ctx context.Context, run *Run) error
// Delete deletes a run.
Delete(ctx context.Context, id shared.ID) error
// GetWithStepRuns retrieves a run with its step runs.
GetWithStepRuns(ctx context.Context, id shared.ID) (*Run, error)
// GetActiveByPipelineID retrieves active runs for a pipeline.
GetActiveByPipelineID(ctx context.Context, pipelineID shared.ID) ([]*Run, error)
// GetActiveByAssetID retrieves active runs for an asset.
GetActiveByAssetID(ctx context.Context, assetID shared.ID) ([]*Run, error)
// CountActiveByPipelineID counts active runs (pending/running) for a pipeline.
CountActiveByPipelineID(ctx context.Context, pipelineID shared.ID) (int, error)
// CountActiveByTenantID counts active runs (pending/running) for a tenant.
CountActiveByTenantID(ctx context.Context, tenantID shared.ID) (int, error)
// CountActiveByScanID counts active runs (pending/running) for a scan config.
CountActiveByScanID(ctx context.Context, scanID shared.ID) (int, error)
// CreateRunIfUnderLimit atomically checks concurrent run limits and creates run if under limit.
// Returns ErrConcurrentLimitExceeded if scan or tenant limit is exceeded.
// This prevents race conditions where multiple triggers bypass the limit.
CreateRunIfUnderLimit(ctx context.Context, run *Run, maxPerScan, maxPerTenant int) error
// UpdateStats updates run statistics.
UpdateStats(ctx context.Context, id shared.ID, completed, failed, skipped, findings int) error
// UpdateStatus updates run status.
UpdateStatus(ctx context.Context, id shared.ID, status RunStatus, errorMessage string) error
// GetStatsByTenant returns aggregated run statistics for a tenant in a single query.
// This is optimized to avoid N+1 queries when fetching stats.
GetStatsByTenant(ctx context.Context, tenantID shared.ID) (RunStats, error)
}
RunRepository defines the interface for pipeline run persistence.
type RunStats ¶
type RunStats struct {
Total int64
Pending int64
Running int64
Completed int64
Failed int64
Canceled int64
}
RunStats represents aggregated run statistics.
type RunStatus ¶
type RunStatus string
RunStatus represents the status of a pipeline run.
func (RunStatus) IsTerminal ¶
IsTerminal checks if the status is terminal (no more state changes).
type Settings ¶
type Settings struct {
MaxParallelSteps int `json:"max_parallel_steps,omitempty"`
FailFast bool `json:"fail_fast,omitempty"`
RetryFailedSteps int `json:"retry_failed_steps,omitempty"`
TimeoutSeconds int `json:"timeout_seconds,omitempty"`
NotifyOnComplete bool `json:"notify_on_complete,omitempty"`
NotifyOnFailure bool `json:"notify_on_failure,omitempty"`
NotificationChannels []string `json:"notification_channels,omitempty"`
AgentPreference AgentPreference `json:"agent_preference,omitempty"` // Agent selection mode: auto, tenant, platform
}
Settings represents pipeline execution settings.
func DefaultSettings ¶
func DefaultSettings() Settings
DefaultSettings returns default pipeline settings.
type Step ¶
type Step struct {
ID shared.ID
PipelineID shared.ID
// Step definition
StepKey string
Name string
Description string
StepOrder int
// Visual workflow builder
UIPosition UIPosition
// Tool requirements
Tool string // Preferred tool name (optional)
ToolID *shared.ID // Preferred tool ID (FK reference to tools table, optional)
Capabilities []string // Required capabilities
// Configuration
Config map[string]any
TimeoutSeconds int
// Dependencies
DependsOn []string // Step keys this step depends on
// Conditions
Condition Condition
// Retry settings
MaxRetries int
RetryDelaySeconds int
// Timestamps
CreatedAt time.Time
}
Step represents a single step in a pipeline.
func NewStep ¶
func NewStep( pipelineID shared.ID, stepKey string, name string, order int, capabilities []string, ) (*Step, error)
NewStep creates a new pipeline step.
func (*Step) AddDependency ¶
AddDependency adds a dependency on another step.
func (*Step) HasDependencies ¶
HasDependencies checks if the step has dependencies.
func (*Step) IsDisabled ¶
IsDisabled checks if the step is disabled.
func (*Step) SetCondition ¶
SetCondition sets the execution condition with validation. Returns error if the condition type is not yet supported.
func (*Step) SetDependencies ¶
SetDependencies sets the step dependencies.
func (*Step) SetTimeout ¶
SetTimeout sets the timeout in seconds with validation. Returns error if timeout is out of valid range [60, 86400].
func (*Step) SetToolID ¶ added in v0.1.2
SetToolID sets the preferred tool ID (FK reference to tools table).
func (*Step) SetUIPosition ¶
SetUIPosition sets the visual position for the workflow builder.
func (*Step) ShouldAlwaysRun ¶
ShouldAlwaysRun checks if the step should always run.
type StepRepository ¶
type StepRepository interface {
// Create creates a new step.
Create(ctx context.Context, step *Step) error
// CreateBatch creates multiple steps.
CreateBatch(ctx context.Context, steps []*Step) error
// GetByID retrieves a step by ID.
GetByID(ctx context.Context, id shared.ID) (*Step, error)
// GetByPipelineID retrieves all steps for a pipeline.
GetByPipelineID(ctx context.Context, pipelineID shared.ID) ([]*Step, error)
// GetByKey retrieves a step by pipeline ID and step key.
GetByKey(ctx context.Context, pipelineID shared.ID, stepKey string) (*Step, error)
// Update updates a step.
Update(ctx context.Context, step *Step) error
// Delete deletes a step.
Delete(ctx context.Context, id shared.ID) error
// DeleteByPipelineID deletes all steps for a pipeline.
DeleteByPipelineID(ctx context.Context, pipelineID shared.ID) error
// DeleteByPipelineIDInTx deletes all steps for a pipeline within a transaction.
DeleteByPipelineIDInTx(ctx context.Context, tx *sql.Tx, pipelineID shared.ID) error
// Reorder updates the order of steps.
Reorder(ctx context.Context, pipelineID shared.ID, stepOrders map[string]int) error
// FindPipelineIDsByToolName finds all active pipeline IDs that use a specific tool.
// Used for cascade deactivation when a tool is deactivated or deleted.
FindPipelineIDsByToolName(ctx context.Context, toolName string) ([]shared.ID, error)
}
StepRepository defines the interface for pipeline step persistence.
type StepRun ¶
type StepRun struct {
ID shared.ID
PipelineRunID shared.ID
StepID shared.ID
// Step identification
StepKey string
StepOrder int
// Execution
Status StepRunStatus
// Agent assignment
AgentID *shared.ID
CommandID *shared.ID
// Condition evaluation
ConditionEvaluated bool
ConditionResult *bool
SkipReason string
// Results
FindingsCount int
Output map[string]any
// Retry tracking
Attempt int
MaxAttempts int
// Timing
QueuedAt *time.Time
StartedAt *time.Time
CompletedAt *time.Time
// Error info
ErrorMessage string
ErrorCode string
// Timestamps
CreatedAt time.Time
}
StepRun represents an execution of a pipeline step.
func NewStepRun ¶
func NewStepRun( pipelineRunID shared.ID, stepID shared.ID, stepKey string, order int, maxRetries int, ) *StepRun
NewStepRun creates a new step run.
func (*StepRun) IsComplete ¶
IsComplete checks if the step is complete (terminal state).
func (*StepRun) PrepareRetry ¶
func (sr *StepRun) PrepareRetry()
PrepareRetry prepares the step for retry.
func (*StepRun) SetConditionResult ¶
SetConditionResult sets the condition evaluation result.
func (*StepRun) ShouldSkip ¶
ShouldSkip checks if the step should be skipped based on condition.
type StepRunFilter ¶
type StepRunFilter struct {
PipelineRunID *shared.ID
StepID *shared.ID
AgentID *shared.ID
Status *StepRunStatus
}
StepRunFilter represents filter options for listing step runs.
type StepRunRepository ¶
type StepRunRepository interface {
// Create creates a new step run.
Create(ctx context.Context, stepRun *StepRun) error
// CreateBatch creates multiple step runs.
CreateBatch(ctx context.Context, stepRuns []*StepRun) error
// GetByID retrieves a step run by ID.
GetByID(ctx context.Context, id shared.ID) (*StepRun, error)
// GetByPipelineRunID retrieves all step runs for a pipeline run.
GetByPipelineRunID(ctx context.Context, pipelineRunID shared.ID) ([]*StepRun, error)
// GetByStepKey retrieves a step run by pipeline run ID and step key.
GetByStepKey(ctx context.Context, pipelineRunID shared.ID, stepKey string) (*StepRun, error)
// List lists step runs with filters.
List(ctx context.Context, filter StepRunFilter) ([]*StepRun, error)
// Update updates a step run.
Update(ctx context.Context, stepRun *StepRun) error
// Delete deletes a step run.
Delete(ctx context.Context, id shared.ID) error
// UpdateStatus updates step run status.
UpdateStatus(ctx context.Context, id shared.ID, status StepRunStatus, errorMessage, errorCode string) error
// AssignAgent assigns an agent and command to a step run.
AssignAgent(ctx context.Context, id shared.ID, agentID, commandID shared.ID) error
// Complete marks a step run as completed.
Complete(ctx context.Context, id shared.ID, findingsCount int, output map[string]any) error
// GetPendingByDependencies gets step runs that are pending and have their dependencies completed.
GetPendingByDependencies(ctx context.Context, pipelineRunID shared.ID, completedStepKeys []string) ([]*StepRun, error)
// GetStatsByTenant returns aggregated step run statistics for a tenant in a single query.
// This is optimized to avoid N+1 queries when fetching stats.
GetStatsByTenant(ctx context.Context, tenantID shared.ID) (RunStats, error)
}
StepRunRepository defines the interface for step run persistence.
type StepRunStatus ¶
type StepRunStatus string
StepRunStatus represents the status of a step run.
const ( StepRunStatusPending StepRunStatus = "pending" StepRunStatusQueued StepRunStatus = "queued" StepRunStatusRunning StepRunStatus = "running" StepRunStatusCompleted StepRunStatus = "completed" StepRunStatusFailed StepRunStatus = "failed" StepRunStatusSkipped StepRunStatus = "skipped" StepRunStatusCanceled StepRunStatus = "canceled" StepRunStatusTimeout StepRunStatus = "timeout" )
func (StepRunStatus) IsSuccess ¶
func (s StepRunStatus) IsSuccess() bool
IsSuccess checks if the status indicates success.
func (StepRunStatus) IsTerminal ¶
func (s StepRunStatus) IsTerminal() bool
IsTerminal checks if the status is terminal.
func (StepRunStatus) IsValid ¶
func (s StepRunStatus) IsValid() bool
IsValid checks if the step run status is valid.
type Template ¶
type Template struct {
ID shared.ID
TenantID shared.ID
Name string
Description string
Version int
// Configuration
Triggers []Trigger
Settings Settings
// Status
IsActive bool
IsSystemTemplate bool
// Metadata
Tags []string
// Visual Builder UI positions for Start/End nodes
UIStartPosition *UIPosition
UIEndPosition *UIPosition
// Steps (loaded separately)
Steps []*Step
// Audit
CreatedBy *shared.ID
CreatedAt time.Time
UpdatedAt time.Time
}
Template represents a reusable pipeline definition.
func NewTemplate ¶
NewTemplate creates a new pipeline template.
func (*Template) AddTrigger ¶
AddTrigger adds a trigger to the template.
func (*Template) GetRunnableSteps ¶
GetRunnableSteps returns steps that can be run (no pending dependencies).
func (*Template) GetStepByKey ¶
GetStepByKey returns a step by its key.
func (*Template) IncrementVersion ¶
func (t *Template) IncrementVersion()
IncrementVersion increments the template version.
func (*Template) SetCreatedBy ¶
SetCreatedBy sets the user who created the template.
func (*Template) SetSettings ¶
SetSettings sets the pipeline settings.
func (*Template) SetUIEndPosition ¶
func (t *Template) SetUIEndPosition(pos *UIPosition)
SetUIEndPosition sets the visual builder End node position.
func (*Template) SetUIStartPosition ¶
func (t *Template) SetUIStartPosition(pos *UIPosition)
SetUIStartPosition sets the visual builder Start node position.
func (*Template) ValidateSteps ¶
ValidateSteps validates the step configuration.
type TemplateFilter ¶
type TemplateFilter struct {
TenantID *shared.ID
IsActive *bool
IsSystemTemplate *bool
Tags []string
Search string
IncludeSystemTemplate bool // Include system templates in results (for tenant views)
}
TemplateFilter represents filter options for listing templates.
type TemplateRepository ¶
type TemplateRepository interface {
// Create creates a new pipeline template.
Create(ctx context.Context, template *Template) error
// GetByID retrieves a template by ID.
GetByID(ctx context.Context, id shared.ID) (*Template, error)
// GetByTenantAndID retrieves a template by tenant and ID.
GetByTenantAndID(ctx context.Context, tenantID, id shared.ID) (*Template, error)
// GetByName retrieves a template by name and version.
GetByName(ctx context.Context, tenantID shared.ID, name string, version int) (*Template, error)
// List lists templates with filters and pagination.
List(ctx context.Context, filter TemplateFilter, page pagination.Pagination) (pagination.Result[*Template], error)
// Update updates a template.
Update(ctx context.Context, template *Template) error
// Delete deletes a template.
Delete(ctx context.Context, id shared.ID) error
// DeleteInTx deletes a template within a transaction.
DeleteInTx(ctx context.Context, tx *sql.Tx, id shared.ID) error
// GetWithSteps retrieves a template with its steps.
GetWithSteps(ctx context.Context, id shared.ID) (*Template, error)
// GetSystemTemplateByID retrieves a system template by ID (for copy-on-use).
GetSystemTemplateByID(ctx context.Context, id shared.ID) (*Template, error)
// ListWithSystemTemplates lists tenant templates + system templates.
// System templates are returned with is_system_template=true flag.
ListWithSystemTemplates(ctx context.Context, tenantID shared.ID, filter TemplateFilter, page pagination.Pagination) (pagination.Result[*Template], error)
}
TemplateRepository defines the interface for pipeline template persistence.
type Trigger ¶
type Trigger struct {
Type TriggerType `json:"type"`
Schedule string `json:"schedule,omitempty"` // Cron expression
Webhook string `json:"webhook,omitempty"` // Webhook name/path
Filters map[string]any `json:"filters,omitempty"` // Asset filters
}
Trigger represents a pipeline trigger configuration.
type TriggerType ¶
type TriggerType string
TriggerType represents how a pipeline can be triggered.
const ( TriggerTypeManual TriggerType = "manual" TriggerTypeSchedule TriggerType = "schedule" TriggerTypeWebhook TriggerType = "webhook" TriggerTypeAPI TriggerType = "api" TriggerTypeOnAssetDiscovery TriggerType = "on_asset_discovery" )
type UIPosition ¶
UIPosition represents the visual position in the workflow builder.