pipeline

package
v0.1.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 17, 2026 License: GPL-3.0 Imports: 17 Imported by: 0

Documentation

Overview

Package pipeline provides adapters to bridge app types with pipeline interfaces.

Package pipeline provides pipeline management services.

Index

Constants

View Source
const (
	// MaxConcurrentRunsPerPipeline is the maximum concurrent runs per pipeline template.
	MaxConcurrentRunsPerPipeline = 5

	// MaxConcurrentRunsPerTenant is the maximum concurrent runs per tenant.
	MaxConcurrentRunsPerTenant = 50
)

Concurrent run limits to prevent resource exhaustion.

Variables

This section is empty.

Functions

This section is empty.

Types

type AddStepInput

type AddStepInput struct {
	TenantID          string              `json:"tenant_id" validate:"required,uuid"`
	TemplateID        string              `json:"template_id" validate:"required,uuid"`
	StepKey           string              `json:"step_key" validate:"required,min=1,max=100"`
	Name              string              `json:"name" validate:"required,min=1,max=255"`
	Description       string              `json:"description" validate:"max=1000"`
	Order             int                 `json:"order"`
	UIPositionX       *float64            `json:"ui_position_x"`
	UIPositionY       *float64            `json:"ui_position_y"`
	Tool              string              `json:"tool" validate:"max=100"`
	Capabilities      []string            `json:"capabilities" validate:"omitempty,max=10"`
	Config            map[string]any      `json:"config"`
	TimeoutSeconds    int                 `json:"timeout_seconds"`
	DependsOn         []string            `json:"depends_on"`
	Condition         *pipeline.Condition `json:"condition"`
	MaxRetries        int                 `json:"max_retries"`
	RetryDelaySeconds int                 `json:"retry_delay_seconds"`
}

AddStepInput represents the input for adding a step. Capabilities are optional - if not provided and tool is specified, they will be derived from the tool.

type AgentSelector

type AgentSelector interface {
	SelectAgent(ctx context.Context, req SelectAgentRequest) (*SelectAgentResult, error)
	CanUsePlatformAgents(ctx context.Context, tenantID shared.ID) (bool, string)
}

AgentSelector interface for agent selection.

type AgentSelectorAdapter

type AgentSelectorAdapter struct {
	SelectAgentFunc          func(ctx context.Context, req SelectAgentRequest) (*SelectAgentResult, error)
	CanUsePlatformAgentsFunc func(ctx context.Context, tenantID shared.ID) (bool, string)
}

AgentSelectorAdapter adapts app.AgentSelector to pipeline.AgentSelector.

func (*AgentSelectorAdapter) CanUsePlatformAgents

func (a *AgentSelectorAdapter) CanUsePlatformAgents(ctx context.Context, tenantID shared.ID) (bool, string)

func (*AgentSelectorAdapter) SelectAgent

type AuditContext

type AuditContext struct {
	TenantID string
	ActorID  string
}

AuditContext contains context for audit logging.

type AuditEvent

type AuditEvent struct {
	Action       audit.Action
	ResourceType audit.ResourceType
	ResourceID   string
	ResourceName string
	Message      string
	Success      bool
	Error        error
	Metadata     map[string]any
}

AuditEvent represents an audit event.

func NewFailureEvent

func NewFailureEvent(action audit.Action, resourceType audit.ResourceType, resourceID string, err error) AuditEvent

NewFailureEvent creates a failure audit event.

func NewSuccessEvent

func NewSuccessEvent(action audit.Action, resourceType audit.ResourceType, resourceID string) AuditEvent

NewSuccessEvent creates a success audit event.

func (AuditEvent) WithMessage

func (e AuditEvent) WithMessage(msg string) AuditEvent

WithMessage sets the message.

func (AuditEvent) WithMetadata

func (e AuditEvent) WithMetadata(key string, value any) AuditEvent

WithMetadata adds metadata.

func (AuditEvent) WithResourceName

func (e AuditEvent) WithResourceName(name string) AuditEvent

WithResourceName sets the resource name.

type AuditService

type AuditService interface {
	LogEvent(ctx context.Context, actx AuditContext, event AuditEvent) error
}

AuditService interface for audit logging.

type AuditServiceAdapter

type AuditServiceAdapter struct {
	LogEventFunc func(ctx context.Context, tenantID, actorID string, action audit.Action, resourceType audit.ResourceType, resourceID, resourceName, message string, success bool, err error, metadata map[string]any) error
}

AuditServiceAdapter adapts app.AuditService to pipeline.AuditService.

func (*AuditServiceAdapter) LogEvent

func (a *AuditServiceAdapter) LogEvent(ctx context.Context, actx AuditContext, event AuditEvent) error

type AuditServiceFunc

type AuditServiceFunc func(ctx context.Context, actx AuditContext, event AuditEvent) error

AuditServiceFunc wraps a function to implement AuditService.

func (AuditServiceFunc) LogEvent

func (f AuditServiceFunc) LogEvent(ctx context.Context, actx AuditContext, event AuditEvent) error

type CloneSystemTemplateInput

type CloneSystemTemplateInput struct {
	TenantID         string `json:"tenant_id" validate:"required,uuid"`
	SystemTemplateID string `json:"system_template_id" validate:"required,uuid"`
	NewName          string `json:"new_name" validate:"omitempty,min=1,max=255"`
	CreatedBy        string `json:"created_by" validate:"omitempty,uuid"`
}

CloneSystemTemplateInput represents the input for cloning a system template.

type CloneTemplateInput

type CloneTemplateInput struct {
	TenantID   string `json:"tenant_id" validate:"required,uuid"`
	TemplateID string `json:"template_id" validate:"required,uuid"`
	NewName    string `json:"new_name" validate:"required,min=1,max=255"`
	ClonedBy   string `json:"cloned_by" validate:"omitempty,uuid"`
}

CloneTemplateInput represents the input for cloning a template.

type CreateTemplateInput

type CreateTemplateInput struct {
	TenantID    string             `json:"tenant_id" validate:"required,uuid"`
	Name        string             `json:"name" validate:"required,min=1,max=255"`
	Description string             `json:"description" validate:"max=1000"`
	Triggers    []pipeline.Trigger `json:"triggers"`
	Settings    *pipeline.Settings `json:"settings"`
	Tags        []string           `json:"tags" validate:"max=10,dive,max=50"`
	CreatedBy   string             `json:"created_by" validate:"omitempty,uuid"`
}

CreateTemplateInput represents the input for creating a template.

type ListRunsInput

type ListRunsInput struct {
	TenantID   string `json:"tenant_id" validate:"required,uuid"`
	PipelineID string `json:"pipeline_id" validate:"omitempty,uuid"`
	AssetID    string `json:"asset_id" validate:"omitempty,uuid"`
	Status     string `json:"status" validate:"omitempty,oneof=pending running completed failed canceled timeout"`
	Page       int    `json:"page"`
	PerPage    int    `json:"per_page"`
}

ListRunsInput represents the input for listing runs.

type ListTemplatesInput

type ListTemplatesInput struct {
	TenantID string   `json:"tenant_id" validate:"required,uuid"`
	IsActive *bool    `json:"is_active"`
	Tags     []string `json:"tags"`
	Search   string   `json:"search" validate:"max=255"`
	Page     int      `json:"page"`
	PerPage  int      `json:"per_page"`
}

ListTemplatesInput represents the input for listing templates.

type Option

type Option func(*Service)

Option is a functional option for Service.

func WithAgentSelector

func WithAgentSelector(selector AgentSelector) Option

WithAgentSelector sets the agent selector for platform agent support.

func WithAuditService

func WithAuditService(auditService AuditService) Option

WithAuditService sets the audit service for Service.

func WithDB

func WithDB(db TransactionDB) Option

WithDB sets the database for transaction support.

func WithQualityGate

func WithQualityGate(profileRepo scanprofile.Repository, findingRepo vulnerability.FindingRepository) Option

WithQualityGate sets the dependencies for quality gate evaluation.

func WithScanDeactivator

func WithScanDeactivator(deactivator ScanDeactivator) Option

WithScanDeactivator sets the scan deactivator for cascade deactivation.

func WithToolRepo

func WithToolRepo(toolRepo tool.Repository) Option

WithToolRepo sets the tool repository for deriving capabilities from tools.

type ScanDeactivator

type ScanDeactivator interface {
	DeactivateScansByPipeline(ctx context.Context, pipelineID shared.ID) (int, error)
}

ScanDeactivator interface for cascade deactivation when pipelines are disabled.

type SecurityValidator

type SecurityValidator interface {
	ValidateIdentifier(value string, maxLen int, fieldName string) *ValidationResult
	ValidateIdentifiers(values []string, maxLen int, fieldName string) *ValidationResult
	ValidateStepConfig(ctx context.Context, tenantID shared.ID, tool string, capabilities []string, config map[string]any) *ValidationResult
	ValidateCommandPayload(ctx context.Context, tenantID shared.ID, payload map[string]any) *ValidationResult
}

SecurityValidator interface for security validation.

type SecurityValidatorFunc

type SecurityValidatorFunc struct {
	ValidateIdentifierFunc     func(value string, maxLen int, fieldName string) *ValidationResult
	ValidateIdentifiersFunc    func(values []string, maxLen int, fieldName string) *ValidationResult
	ValidateStepConfigFunc     func(ctx context.Context, tenantID shared.ID, tool string, capabilities []string, config map[string]any) *ValidationResult
	ValidateCommandPayloadFunc func(ctx context.Context, tenantID shared.ID, payload map[string]any) *ValidationResult
}

SecurityValidatorFunc is a function that implements SecurityValidator.

func (*SecurityValidatorFunc) ValidateCommandPayload

func (f *SecurityValidatorFunc) ValidateCommandPayload(ctx context.Context, tenantID shared.ID, payload map[string]any) *ValidationResult

func (*SecurityValidatorFunc) ValidateIdentifier

func (f *SecurityValidatorFunc) ValidateIdentifier(value string, maxLen int, fieldName string) *ValidationResult

func (*SecurityValidatorFunc) ValidateIdentifiers

func (f *SecurityValidatorFunc) ValidateIdentifiers(values []string, maxLen int, fieldName string) *ValidationResult

func (*SecurityValidatorFunc) ValidateStepConfig

func (f *SecurityValidatorFunc) ValidateStepConfig(ctx context.Context, tenantID shared.ID, tool string, capabilities []string, config map[string]any) *ValidationResult

type SelectAgentRequest

type SelectAgentRequest struct {
	TenantID     shared.ID
	Capabilities []string
	Tool         string
	Mode         SelectMode
	AllowQueue   bool
}

SelectAgentRequest represents a request to select an agent.

type SelectAgentResult

type SelectAgentResult struct {
	Agent      *agent.Agent
	IsPlatform bool
}

SelectAgentResult represents the result of agent selection.

type SelectMode

type SelectMode int

SelectMode represents the agent selection mode.

const (
	// SelectTenantFirst tries tenant agents first, then platform.
	SelectTenantFirst SelectMode = iota
)

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service handles pipeline-related business operations.

func NewService

func NewService(
	templateRepo pipeline.TemplateRepository,
	stepRepo pipeline.StepRepository,
	runRepo pipeline.RunRepository,
	stepRunRepo pipeline.StepRunRepository,
	agentRepo agent.Repository,
	commandRepo command.Repository,
	securityValidator SecurityValidator,
	log *logger.Logger,
	opts ...Option,
) *Service

NewService creates a new Service.

func (*Service) AddStep

func (s *Service) AddStep(ctx context.Context, input AddStepInput) (*pipeline.Step, error)

AddStep adds a step to a template.

func (*Service) CancelRun

func (s *Service) CancelRun(ctx context.Context, tenantID, runID string) error

CancelRun cancels a pipeline run.

func (*Service) CloneSystemTemplate

func (s *Service) CloneSystemTemplate(ctx context.Context, input CloneSystemTemplateInput) (*pipeline.Template, error)

CloneSystemTemplate clones a system template for a tenant. This is the "copy-on-use" mechanism for system templates.

func (*Service) CloneTemplate

func (s *Service) CloneTemplate(ctx context.Context, input CloneTemplateInput) (*pipeline.Template, error)

CloneTemplate creates a copy of an existing template with all its steps. This supports cloning both tenant templates AND system templates.

func (*Service) CompleteStepRun

func (s *Service) CompleteStepRun(ctx context.Context, stepRunID string, findingsCount int, output map[string]any) error

CompleteStepRun marks a step run as completed (called by agent).

func (*Service) CreateTemplate

func (s *Service) CreateTemplate(ctx context.Context, input CreateTemplateInput) (*pipeline.Template, error)

CreateTemplate creates a new pipeline template.

func (*Service) DeactivatePipelinesByTool

func (s *Service) DeactivatePipelinesByTool(ctx context.Context, toolName string) (int, []shared.ID, error)

DeactivatePipelinesByTool deactivates all active pipelines that use a specific tool. This is called when a tool is deactivated or deleted to ensure data consistency. Returns the count of deactivated pipelines and list of affected pipeline IDs.

func (*Service) DeleteStep

func (s *Service) DeleteStep(ctx context.Context, tenantID, stepID string) error

DeleteStep deletes a step.

func (*Service) DeleteStepsByPipelineID

func (s *Service) DeleteStepsByPipelineID(ctx context.Context, tenantID, pipelineID string) error

DeleteStepsByPipelineID deletes all steps for a pipeline.

func (*Service) DeleteTemplate

func (s *Service) DeleteTemplate(ctx context.Context, tenantID, templateID string) error

DeleteTemplate deletes a template.

func (*Service) FailStepRun

func (s *Service) FailStepRun(ctx context.Context, stepRunID, errorMessage, errorCode string) error

FailStepRun marks a step run as failed (called by agent).

func (*Service) GetPipelinesUsingTool

func (s *Service) GetPipelinesUsingTool(ctx context.Context, toolName string) ([]shared.ID, error)

GetPipelinesUsingTool returns all active pipeline IDs that use a specific tool. This can be used to check if a tool can be safely deleted.

func (*Service) GetRun

func (s *Service) GetRun(ctx context.Context, tenantID, runID string) (*pipeline.Run, error)

GetRun retrieves a pipeline run by ID.

func (*Service) GetRunWithSteps

func (s *Service) GetRunWithSteps(ctx context.Context, runID string) (*pipeline.Run, error)

GetRunWithSteps retrieves a pipeline run with its step runs.

func (*Service) GetSteps

func (s *Service) GetSteps(ctx context.Context, templateID string) ([]*pipeline.Step, error)

GetSteps retrieves all steps for a template.

func (*Service) GetTemplate

func (s *Service) GetTemplate(ctx context.Context, tenantID, templateID string) (*pipeline.Template, error)

GetTemplate retrieves a template by ID. For system templates, this returns the template as read-only (for viewing). Use CloneSystemTemplate to create an editable copy for a tenant.

func (*Service) GetTemplateWithSteps

func (s *Service) GetTemplateWithSteps(ctx context.Context, templateID string) (*pipeline.Template, error)

GetTemplateWithSteps retrieves a template with its steps.

func (*Service) ListRuns

func (s *Service) ListRuns(ctx context.Context, input ListRunsInput) (pagination.Result[*pipeline.Run], error)

ListRuns lists pipeline runs with filters.

func (*Service) ListTemplates

func (s *Service) ListTemplates(ctx context.Context, input ListTemplatesInput) (pagination.Result[*pipeline.Template], error)

ListTemplates lists templates with filters. Returns both tenant-specific templates AND system templates (available to all tenants).

func (*Service) OnStepCompleted

func (s *Service) OnStepCompleted(ctx context.Context, runID, stepKey string, findingsCount int, output map[string]any) error

OnStepCompleted is called when an agent reports step completion. This triggers scheduling of dependent steps.

func (*Service) OnStepFailed

func (s *Service) OnStepFailed(ctx context.Context, runID, stepKey, errorMessage, errorCode string) error

OnStepFailed is called when an agent reports step failure.

func (*Service) TriggerPipeline

func (s *Service) TriggerPipeline(ctx context.Context, input TriggerPipelineInput) (*pipeline.Run, error)

TriggerPipeline starts a new pipeline run. Uses atomic CreateRunIfUnderLimit to prevent race conditions in concurrent run limits. If the template is a system template, it will be auto-cloned for the tenant first.

func (*Service) UpdateStep

func (s *Service) UpdateStep(ctx context.Context, stepID string, input AddStepInput) (*pipeline.Step, error)

UpdateStep updates a step.

func (*Service) UpdateTemplate

func (s *Service) UpdateTemplate(ctx context.Context, input UpdateTemplateInput) (*pipeline.Template, error)

UpdateTemplate updates a template. Note: System templates cannot be updated directly. They must be cloned first.

func (*Service) ValidateSteps

func (s *Service) ValidateSteps(ctx context.Context, inputs []AddStepInput) error

ValidateSteps validates a list of step inputs without creating them. This is used to pre-validate steps before deleting existing ones during update.

func (*Service) ValidateToolReferences

func (s *Service) ValidateToolReferences(ctx context.Context, template *pipeline.Template, tenantID shared.ID) error

ValidateToolReferences validates that all tools referenced by pipeline steps are available and active. This should be called before triggering a pipeline or activating it to ensure all required tools are present. Returns an error with details if any tool is missing or inactive.

Validation rules: 1. If step has Tool specified → Tool must exist and be active 2. If step has no Tool but has Capabilities → At least one active tool must match those capabilities 3. If step has no Tool AND no Capabilities → Step is invalid (cannot execute)

type TransactionDB

type TransactionDB interface {
	BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx, error)
}

TransactionDB defines the interface for database transaction support.

type TriggerPipelineInput

type TriggerPipelineInput struct {
	TenantID    string         `json:"tenant_id" validate:"required,uuid"`
	TemplateID  string         `json:"template_id" validate:"required,uuid"`
	AssetID     string         `json:"asset_id" validate:"omitempty,uuid"`
	TriggerType string         `json:"trigger_type" validate:"omitempty,oneof=manual schedule webhook api"`
	TriggeredBy string         `json:"triggered_by"`
	Context     map[string]any `json:"context"`
}

TriggerPipelineInput represents the input for triggering a pipeline.

type UpdateTemplateInput

type UpdateTemplateInput struct {
	TenantID        string               `json:"tenant_id" validate:"required,uuid"`
	TemplateID      string               `json:"template_id" validate:"required,uuid"`
	Name            string               `json:"name" validate:"omitempty,min=1,max=255"`
	Description     string               `json:"description" validate:"max=1000"`
	Triggers        []pipeline.Trigger   `json:"triggers"`
	Settings        *pipeline.Settings   `json:"settings"`
	Tags            []string             `json:"tags" validate:"max=10,dive,max=50"`
	IsActive        *bool                `json:"is_active"`
	UIStartPosition *pipeline.UIPosition `json:"ui_start_position"`
	UIEndPosition   *pipeline.UIPosition `json:"ui_end_position"`
}

UpdateTemplateInput represents the input for updating a template.

type ValidationError

type ValidationError struct {
	Field   string
	Message string
	Code    string
}

ValidationError represents a validation error.

type ValidationResult

type ValidationResult struct {
	Valid  bool
	Errors []ValidationError
}

ValidationResult represents the result of a validation.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL