Documentation
¶
Overview ¶
Package pipeline provides pipeline management services, together with adapters that bridge app types with pipeline interfaces.
Index ¶
- Constants
- type AddStepInput
- type AgentSelector
- type AgentSelectorAdapter
- type AuditContext
- type AuditEvent
- type AuditService
- type AuditServiceAdapter
- type AuditServiceFunc
- type CloneSystemTemplateInput
- type CloneTemplateInput
- type CreateTemplateInput
- type ListRunsInput
- type ListTemplatesInput
- type Option
- func WithAgentSelector(selector AgentSelector) Option
- func WithAuditService(auditService AuditService) Option
- func WithDB(db TransactionDB) Option
- func WithQualityGate(profileRepo scanprofile.Repository, ...) Option
- func WithScanDeactivator(deactivator ScanDeactivator) Option
- func WithToolRepo(toolRepo tool.Repository) Option
- type ScanDeactivator
- type SecurityValidator
- type SecurityValidatorFunc
- func (f *SecurityValidatorFunc) ValidateCommandPayload(ctx context.Context, tenantID shared.ID, payload map[string]any) *ValidationResult
- func (f *SecurityValidatorFunc) ValidateIdentifier(value string, maxLen int, fieldName string) *ValidationResult
- func (f *SecurityValidatorFunc) ValidateIdentifiers(values []string, maxLen int, fieldName string) *ValidationResult
- func (f *SecurityValidatorFunc) ValidateStepConfig(ctx context.Context, tenantID shared.ID, tool string, capabilities []string, ...) *ValidationResult
- type SelectAgentRequest
- type SelectAgentResult
- type SelectMode
- type Service
- func (s *Service) AddStep(ctx context.Context, input AddStepInput) (*pipeline.Step, error)
- func (s *Service) CancelRun(ctx context.Context, tenantID, runID string) error
- func (s *Service) CloneSystemTemplate(ctx context.Context, input CloneSystemTemplateInput) (*pipeline.Template, error)
- func (s *Service) CloneTemplate(ctx context.Context, input CloneTemplateInput) (*pipeline.Template, error)
- func (s *Service) CompleteStepRun(ctx context.Context, stepRunID string, findingsCount int, ...) error
- func (s *Service) CreateTemplate(ctx context.Context, input CreateTemplateInput) (*pipeline.Template, error)
- func (s *Service) DeactivatePipelinesByTool(ctx context.Context, toolName string) (int, []shared.ID, error)
- func (s *Service) DeleteStep(ctx context.Context, tenantID, stepID string) error
- func (s *Service) DeleteStepsByPipelineID(ctx context.Context, tenantID, pipelineID string) error
- func (s *Service) DeleteTemplate(ctx context.Context, tenantID, templateID string) error
- func (s *Service) FailStepRun(ctx context.Context, stepRunID, errorMessage, errorCode string) error
- func (s *Service) GetPipelinesUsingTool(ctx context.Context, toolName string) ([]shared.ID, error)
- func (s *Service) GetRun(ctx context.Context, tenantID, runID string) (*pipeline.Run, error)
- func (s *Service) GetRunWithSteps(ctx context.Context, runID string) (*pipeline.Run, error)
- func (s *Service) GetSteps(ctx context.Context, templateID string) ([]*pipeline.Step, error)
- func (s *Service) GetTemplate(ctx context.Context, tenantID, templateID string) (*pipeline.Template, error)
- func (s *Service) GetTemplateWithSteps(ctx context.Context, templateID string) (*pipeline.Template, error)
- func (s *Service) ListRuns(ctx context.Context, input ListRunsInput) (pagination.Result[*pipeline.Run], error)
- func (s *Service) ListTemplates(ctx context.Context, input ListTemplatesInput) (pagination.Result[*pipeline.Template], error)
- func (s *Service) OnStepCompleted(ctx context.Context, runID, stepKey string, findingsCount int, ...) error
- func (s *Service) OnStepFailed(ctx context.Context, runID, stepKey, errorMessage, errorCode string) error
- func (s *Service) TriggerPipeline(ctx context.Context, input TriggerPipelineInput) (*pipeline.Run, error)
- func (s *Service) UpdateStep(ctx context.Context, stepID string, input AddStepInput) (*pipeline.Step, error)
- func (s *Service) UpdateTemplate(ctx context.Context, input UpdateTemplateInput) (*pipeline.Template, error)
- func (s *Service) ValidateSteps(ctx context.Context, inputs []AddStepInput) error
- func (s *Service) ValidateToolReferences(ctx context.Context, template *pipeline.Template, tenantID shared.ID) error
- type TransactionDB
- type TriggerPipelineInput
- type UpdateTemplateInput
- type ValidationError
- type ValidationResult
Constants ¶
const (
	// MaxConcurrentRunsPerPipeline is the maximum concurrent runs per pipeline template.
	MaxConcurrentRunsPerPipeline = 5
	// MaxConcurrentRunsPerTenant is the maximum concurrent runs per tenant.
	MaxConcurrentRunsPerTenant = 50
)
Concurrent run limits to prevent resource exhaustion.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AddStepInput ¶
type AddStepInput struct {
TenantID string `json:"tenant_id" validate:"required,uuid"`
TemplateID string `json:"template_id" validate:"required,uuid"`
StepKey string `json:"step_key" validate:"required,min=1,max=100"`
Name string `json:"name" validate:"required,min=1,max=255"`
Description string `json:"description" validate:"max=1000"`
Order int `json:"order"`
UIPositionX *float64 `json:"ui_position_x"`
UIPositionY *float64 `json:"ui_position_y"`
Tool string `json:"tool" validate:"max=100"`
Capabilities []string `json:"capabilities" validate:"omitempty,max=10"`
Config map[string]any `json:"config"`
TimeoutSeconds int `json:"timeout_seconds"`
DependsOn []string `json:"depends_on"`
Condition *pipeline.Condition `json:"condition"`
MaxRetries int `json:"max_retries"`
RetryDelaySeconds int `json:"retry_delay_seconds"`
}
AddStepInput represents the input for adding a step. Capabilities are optional - if not provided and tool is specified, they will be derived from the tool.
type AgentSelector ¶
type AgentSelector interface {
SelectAgent(ctx context.Context, req SelectAgentRequest) (*SelectAgentResult, error)
CanUsePlatformAgents(ctx context.Context, tenantID shared.ID) (bool, string)
}
AgentSelector interface for agent selection.
type AgentSelectorAdapter ¶
type AgentSelectorAdapter struct {
SelectAgentFunc func(ctx context.Context, req SelectAgentRequest) (*SelectAgentResult, error)
CanUsePlatformAgentsFunc func(ctx context.Context, tenantID shared.ID) (bool, string)
}
AgentSelectorAdapter adapts app.AgentSelector to pipeline.AgentSelector.
func (*AgentSelectorAdapter) CanUsePlatformAgents ¶
func (*AgentSelectorAdapter) SelectAgent ¶
func (a *AgentSelectorAdapter) SelectAgent(ctx context.Context, req SelectAgentRequest) (*SelectAgentResult, error)
type AuditContext ¶
AuditContext contains context for audit logging.
type AuditEvent ¶
type AuditEvent struct {
Action audit.Action
ResourceType audit.ResourceType
ResourceID string
ResourceName string
Message string
Success bool
Error error
Metadata map[string]any
}
AuditEvent represents an audit event.
func NewFailureEvent ¶
func NewFailureEvent(action audit.Action, resourceType audit.ResourceType, resourceID string, err error) AuditEvent
NewFailureEvent creates a failure audit event.
func NewSuccessEvent ¶
func NewSuccessEvent(action audit.Action, resourceType audit.ResourceType, resourceID string) AuditEvent
NewSuccessEvent creates a success audit event.
func (AuditEvent) WithMessage ¶
func (e AuditEvent) WithMessage(msg string) AuditEvent
WithMessage sets the message.
func (AuditEvent) WithMetadata ¶
func (e AuditEvent) WithMetadata(key string, value any) AuditEvent
WithMetadata adds metadata.
func (AuditEvent) WithResourceName ¶
func (e AuditEvent) WithResourceName(name string) AuditEvent
WithResourceName sets the resource name.
type AuditService ¶
type AuditService interface {
LogEvent(ctx context.Context, actx AuditContext, event AuditEvent) error
}
AuditService interface for audit logging.
type AuditServiceAdapter ¶
type AuditServiceAdapter struct {
LogEventFunc func(ctx context.Context, tenantID, actorID string, action audit.Action, resourceType audit.ResourceType, resourceID, resourceName, message string, success bool, err error, metadata map[string]any) error
}
AuditServiceAdapter adapts app.AuditService to pipeline.AuditService.
func (*AuditServiceAdapter) LogEvent ¶
func (a *AuditServiceAdapter) LogEvent(ctx context.Context, actx AuditContext, event AuditEvent) error
type AuditServiceFunc ¶
type AuditServiceFunc func(ctx context.Context, actx AuditContext, event AuditEvent) error
AuditServiceFunc wraps a function to implement AuditService.
func (AuditServiceFunc) LogEvent ¶
func (f AuditServiceFunc) LogEvent(ctx context.Context, actx AuditContext, event AuditEvent) error
type CloneSystemTemplateInput ¶
type CloneSystemTemplateInput struct {
TenantID string `json:"tenant_id" validate:"required,uuid"`
SystemTemplateID string `json:"system_template_id" validate:"required,uuid"`
NewName string `json:"new_name" validate:"omitempty,min=1,max=255"`
CreatedBy string `json:"created_by" validate:"omitempty,uuid"`
}
CloneSystemTemplateInput represents the input for cloning a system template.
type CloneTemplateInput ¶
type CloneTemplateInput struct {
TenantID string `json:"tenant_id" validate:"required,uuid"`
TemplateID string `json:"template_id" validate:"required,uuid"`
NewName string `json:"new_name" validate:"required,min=1,max=255"`
ClonedBy string `json:"cloned_by" validate:"omitempty,uuid"`
}
CloneTemplateInput represents the input for cloning a template.
type CreateTemplateInput ¶
type CreateTemplateInput struct {
TenantID string `json:"tenant_id" validate:"required,uuid"`
Name string `json:"name" validate:"required,min=1,max=255"`
Description string `json:"description" validate:"max=1000"`
Triggers []pipeline.Trigger `json:"triggers"`
Settings *pipeline.Settings `json:"settings"`
Tags []string `json:"tags" validate:"max=10,dive,max=50"`
CreatedBy string `json:"created_by" validate:"omitempty,uuid"`
}
CreateTemplateInput represents the input for creating a template.
type ListRunsInput ¶
type ListRunsInput struct {
TenantID string `json:"tenant_id" validate:"required,uuid"`
PipelineID string `json:"pipeline_id" validate:"omitempty,uuid"`
AssetID string `json:"asset_id" validate:"omitempty,uuid"`
Status string `json:"status" validate:"omitempty,oneof=pending running completed failed canceled timeout"`
Page int `json:"page"`
PerPage int `json:"per_page"`
}
ListRunsInput represents the input for listing runs.
type ListTemplatesInput ¶
type ListTemplatesInput struct {
TenantID string `json:"tenant_id" validate:"required,uuid"`
IsActive *bool `json:"is_active"`
Tags []string `json:"tags"`
Search string `json:"search" validate:"max=255"`
Page int `json:"page"`
PerPage int `json:"per_page"`
}
ListTemplatesInput represents the input for listing templates.
type Option ¶
type Option func(*Service)
Option is a functional option for Service.
func WithAgentSelector ¶
func WithAgentSelector(selector AgentSelector) Option
WithAgentSelector sets the agent selector for platform agent support.
func WithAuditService ¶
func WithAuditService(auditService AuditService) Option
WithAuditService sets the audit service for Service.
func WithDB ¶
func WithDB(db TransactionDB) Option
WithDB sets the database for transaction support.
func WithQualityGate ¶
func WithQualityGate(profileRepo scanprofile.Repository, findingRepo vulnerability.FindingRepository) Option
WithQualityGate sets the dependencies for quality gate evaluation.
func WithScanDeactivator ¶
func WithScanDeactivator(deactivator ScanDeactivator) Option
WithScanDeactivator sets the scan deactivator for cascade deactivation.
func WithToolRepo ¶
func WithToolRepo(toolRepo tool.Repository) Option
WithToolRepo sets the tool repository for deriving capabilities from tools.
type ScanDeactivator ¶
type ScanDeactivator interface {
DeactivateScansByPipeline(ctx context.Context, pipelineID shared.ID) (int, error)
}
ScanDeactivator interface for cascade deactivation when pipelines are disabled.
type SecurityValidator ¶
type SecurityValidator interface {
ValidateIdentifier(value string, maxLen int, fieldName string) *ValidationResult
ValidateIdentifiers(values []string, maxLen int, fieldName string) *ValidationResult
ValidateStepConfig(ctx context.Context, tenantID shared.ID, tool string, capabilities []string, config map[string]any) *ValidationResult
ValidateCommandPayload(ctx context.Context, tenantID shared.ID, payload map[string]any) *ValidationResult
}
SecurityValidator interface for security validation.
type SecurityValidatorFunc ¶
type SecurityValidatorFunc struct {
ValidateIdentifierFunc func(value string, maxLen int, fieldName string) *ValidationResult
ValidateIdentifiersFunc func(values []string, maxLen int, fieldName string) *ValidationResult
ValidateStepConfigFunc func(ctx context.Context, tenantID shared.ID, tool string, capabilities []string, config map[string]any) *ValidationResult
ValidateCommandPayloadFunc func(ctx context.Context, tenantID shared.ID, payload map[string]any) *ValidationResult
}
SecurityValidatorFunc is a struct of function-typed fields that implements SecurityValidator.
func (*SecurityValidatorFunc) ValidateCommandPayload ¶
func (f *SecurityValidatorFunc) ValidateCommandPayload(ctx context.Context, tenantID shared.ID, payload map[string]any) *ValidationResult
func (*SecurityValidatorFunc) ValidateIdentifier ¶
func (f *SecurityValidatorFunc) ValidateIdentifier(value string, maxLen int, fieldName string) *ValidationResult
func (*SecurityValidatorFunc) ValidateIdentifiers ¶
func (f *SecurityValidatorFunc) ValidateIdentifiers(values []string, maxLen int, fieldName string) *ValidationResult
func (*SecurityValidatorFunc) ValidateStepConfig ¶
func (f *SecurityValidatorFunc) ValidateStepConfig(ctx context.Context, tenantID shared.ID, tool string, capabilities []string, config map[string]any) *ValidationResult
type SelectAgentRequest ¶
type SelectAgentRequest struct {
TenantID shared.ID
Capabilities []string
Tool string
Mode SelectMode
AllowQueue bool
}
SelectAgentRequest represents a request to select an agent.
type SelectAgentResult ¶
SelectAgentResult represents the result of agent selection.
type SelectMode ¶
type SelectMode int
SelectMode represents the agent selection mode.
const (
	// SelectTenantFirst tries tenant agents first, then platform.
	SelectTenantFirst SelectMode = iota
)
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service handles pipeline-related business operations.
func NewService ¶
func NewService(
	templateRepo pipeline.TemplateRepository,
	stepRepo pipeline.StepRepository,
	runRepo pipeline.RunRepository,
	stepRunRepo pipeline.StepRunRepository,
	agentRepo agent.Repository,
	commandRepo command.Repository,
	securityValidator SecurityValidator,
	log *logger.Logger,
	opts ...Option,
) *Service
NewService creates a new Service.
func (*Service) CloneSystemTemplate ¶
func (s *Service) CloneSystemTemplate(ctx context.Context, input CloneSystemTemplateInput) (*pipeline.Template, error)
CloneSystemTemplate clones a system template for a tenant. This is the "copy-on-use" mechanism for system templates.
func (*Service) CloneTemplate ¶
func (s *Service) CloneTemplate(ctx context.Context, input CloneTemplateInput) (*pipeline.Template, error)
CloneTemplate creates a copy of an existing template with all its steps. This supports cloning both tenant templates AND system templates.
func (*Service) CompleteStepRun ¶
func (s *Service) CompleteStepRun(ctx context.Context, stepRunID string, findingsCount int, output map[string]any) error
CompleteStepRun marks a step run as completed (called by agent).
func (*Service) CreateTemplate ¶
func (s *Service) CreateTemplate(ctx context.Context, input CreateTemplateInput) (*pipeline.Template, error)
CreateTemplate creates a new pipeline template.
func (*Service) DeactivatePipelinesByTool ¶
func (s *Service) DeactivatePipelinesByTool(ctx context.Context, toolName string) (int, []shared.ID, error)
DeactivatePipelinesByTool deactivates all active pipelines that use a specific tool. This is called when a tool is deactivated or deleted to ensure data consistency. Returns the count of deactivated pipelines and list of affected pipeline IDs.
func (*Service) DeleteStep ¶
DeleteStep deletes a step.
func (*Service) DeleteStepsByPipelineID ¶
DeleteStepsByPipelineID deletes all steps for a pipeline.
func (*Service) DeleteTemplate ¶
DeleteTemplate deletes a template.
func (*Service) FailStepRun ¶
FailStepRun marks a step run as failed (called by agent).
func (*Service) GetPipelinesUsingTool ¶
GetPipelinesUsingTool returns all active pipeline IDs that use a specific tool. This can be used to check if a tool can be safely deleted.
func (*Service) GetRunWithSteps ¶
GetRunWithSteps retrieves a pipeline run with its step runs.
func (*Service) GetTemplate ¶
func (s *Service) GetTemplate(ctx context.Context, tenantID, templateID string) (*pipeline.Template, error)
GetTemplate retrieves a template by ID. For system templates, this returns the template as read-only (for viewing). Use CloneSystemTemplate to create an editable copy for a tenant.
func (*Service) GetTemplateWithSteps ¶
func (s *Service) GetTemplateWithSteps(ctx context.Context, templateID string) (*pipeline.Template, error)
GetTemplateWithSteps retrieves a template with its steps.
func (*Service) ListRuns ¶
func (s *Service) ListRuns(ctx context.Context, input ListRunsInput) (pagination.Result[*pipeline.Run], error)
ListRuns lists pipeline runs with filters.
func (*Service) ListTemplates ¶
func (s *Service) ListTemplates(ctx context.Context, input ListTemplatesInput) (pagination.Result[*pipeline.Template], error)
ListTemplates lists templates with filters. Returns both tenant-specific templates AND system templates (available to all tenants).
func (*Service) OnStepCompleted ¶
func (s *Service) OnStepCompleted(ctx context.Context, runID, stepKey string, findingsCount int, output map[string]any) error
OnStepCompleted is called when an agent reports step completion. This triggers scheduling of dependent steps.
func (*Service) OnStepFailed ¶
func (s *Service) OnStepFailed(ctx context.Context, runID, stepKey, errorMessage, errorCode string) error
OnStepFailed is called when an agent reports step failure.
func (*Service) TriggerPipeline ¶
func (s *Service) TriggerPipeline(ctx context.Context, input TriggerPipelineInput) (*pipeline.Run, error)
TriggerPipeline starts a new pipeline run. Uses atomic CreateRunIfUnderLimit to prevent race conditions in concurrent run limits. If the template is a system template, it will be auto-cloned for the tenant first.
func (*Service) UpdateStep ¶
func (s *Service) UpdateStep(ctx context.Context, stepID string, input AddStepInput) (*pipeline.Step, error)
UpdateStep updates a step.
func (*Service) UpdateTemplate ¶
func (s *Service) UpdateTemplate(ctx context.Context, input UpdateTemplateInput) (*pipeline.Template, error)
UpdateTemplate updates a template. Note: System templates cannot be updated directly. They must be cloned first.
func (*Service) ValidateSteps ¶
func (s *Service) ValidateSteps(ctx context.Context, inputs []AddStepInput) error
ValidateSteps validates a list of step inputs without creating them. This is used to pre-validate steps before deleting existing ones during update.
func (*Service) ValidateToolReferences ¶
func (s *Service) ValidateToolReferences(ctx context.Context, template *pipeline.Template, tenantID shared.ID) error
ValidateToolReferences validates that all tools referenced by pipeline steps are available and active. This should be called before triggering a pipeline or activating it to ensure all required tools are present. Returns an error with details if any tool is missing or inactive.
Validation rules:
1. If a step has a Tool specified → the Tool must exist and be active.
2. If a step has no Tool but has Capabilities → at least one active tool must match those capabilities.
3. If a step has neither a Tool nor Capabilities → the step is invalid (it cannot execute).
type TransactionDB ¶
TransactionDB defines the interface for database transaction support.
type TriggerPipelineInput ¶
type TriggerPipelineInput struct {
TenantID string `json:"tenant_id" validate:"required,uuid"`
TemplateID string `json:"template_id" validate:"required,uuid"`
AssetID string `json:"asset_id" validate:"omitempty,uuid"`
TriggerType string `json:"trigger_type" validate:"omitempty,oneof=manual schedule webhook api"`
TriggeredBy string `json:"triggered_by"`
Context map[string]any `json:"context"`
}
TriggerPipelineInput represents the input for triggering a pipeline.
type UpdateTemplateInput ¶
type UpdateTemplateInput struct {
TenantID string `json:"tenant_id" validate:"required,uuid"`
TemplateID string `json:"template_id" validate:"required,uuid"`
Name string `json:"name" validate:"omitempty,min=1,max=255"`
Description string `json:"description" validate:"max=1000"`
Triggers []pipeline.Trigger `json:"triggers"`
Settings *pipeline.Settings `json:"settings"`
Tags []string `json:"tags" validate:"max=10,dive,max=50"`
IsActive *bool `json:"is_active"`
UIStartPosition *pipeline.UIPosition `json:"ui_start_position"`
UIEndPosition *pipeline.UIPosition `json:"ui_end_position"`
}
UpdateTemplateInput represents the input for updating a template.
type ValidationError ¶
ValidationError represents a validation error.
type ValidationResult ¶
type ValidationResult struct {
Valid bool
Errors []ValidationError
}
ValidationResult represents the result of a validation.