platform

package
v0.2.2 Latest
Warning: This package is not in the latest version of its module.
Published: Apr 15, 2026 License: GPL-3.0 Imports: 19 Imported by: 0

Documentation

Overview

Package platform provides components for running agents in platform mode.

Platform agents are centrally managed by OpenCTEM and execute jobs on behalf of tenants. Unlike tenant agents that run within a tenant's infrastructure, platform agents are deployed and operated by the OpenCTEM platform itself.

Key components:

  • LeaseManager: Handles K8s-style lease renewal for health monitoring
  • Bootstrapper: Handles agent registration using bootstrap tokens
  • JobPoller: Long-polls for jobs using the /platform/poll endpoint
  • PlatformClient: Extended client with platform-specific endpoints

Usage:

// Bootstrap a new platform agent
bootstrapper := platform.NewBootstrapper(baseURL, bootstrapToken, &platform.BootstrapConfig{
    Timeout: platform.DefaultBootstrapTimeout,
})
creds, err := bootstrapper.Register(ctx, &platform.RegistrationRequest{
    Name: "scanner-001",
    Capabilities: []string{"sast", "sca"},
})
if err != nil {
    log.Fatalf("register agent: %v", err)
}

// Create platform client (implements both LeaseClient and JobClient)
client := platform.NewPlatformClient(&platform.ClientConfig{
    BaseURL: baseURL,
    APIKey:  creds.APIKey,
    AgentID: creds.AgentID,
})

// Start lease manager
leaseManager := platform.NewLeaseManager(client, &platform.LeaseConfig{
    LeaseDuration: 60 * time.Second,
    RenewInterval: 20 * time.Second,
})
go leaseManager.Start(ctx)

// Start job poller
poller := platform.NewJobPoller(client, executor, &platform.PollerConfig{
    MaxConcurrentJobs: 5,
    PollTimeout:       30 * time.Second,
})
if err := poller.Start(ctx); err != nil {
    log.Fatalf("start job poller: %v", err)
}

Constants

const (
	DefaultLeaseDuration     = 60 * time.Second
	DefaultRenewInterval     = 20 * time.Second
	DefaultPollTimeout       = 30 * time.Second
	DefaultMaxConcurrentJobs = 5
	DefaultBootstrapTimeout  = 30 * time.Second
)

Default configuration values.

const Version = "1.0.0"

Version is the platform package version.

Variables

This section is empty.

Functions

func ValidateJob

func ValidateJob(job *JobInfo, config *PollerConfig) error

ValidateJob validates a job before execution. SECURITY: This prevents processing of malformed or unauthorized jobs.
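The kind of checks described here can be sketched with the PollerConfig fields documented further down (AllowedJobTypes, MaxPayloadSize, RequireAuthToken). This is a minimal illustration under stated assumptions, not the package's ValidateJob: the types are local stand-ins, validateJobSketch is a hypothetical name, the error message format is invented, and measuring payload size as JSON-encoded bytes is an assumption.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local stand-ins that mirror only the documented fields used below.
type JobInfo struct {
	ID        string
	Type      string
	Payload   map[string]interface{}
	AuthToken string
}

type PollerConfig struct {
	AllowedJobTypes  []string
	MaxPayloadSize   int
	RequireAuthToken bool
}

type JobValidationError struct {
	JobID  string
	Reason string
}

// Error uses a hypothetical message format.
func (e *JobValidationError) Error() string {
	return fmt.Sprintf("job %q rejected: %s", e.JobID, e.Reason)
}

// validateJobSketch is an illustrative re-implementation, not the package's ValidateJob.
func validateJobSketch(job *JobInfo, cfg *PollerConfig) error {
	if job == nil {
		return &JobValidationError{Reason: "job is nil"}
	}
	if job.ID == "" {
		return &JobValidationError{Reason: "job has no ID"}
	}
	// An empty allow-list means every job type is accepted.
	if len(cfg.AllowedJobTypes) > 0 {
		allowed := false
		for _, t := range cfg.AllowedJobTypes {
			if t == job.Type {
				allowed = true
				break
			}
		}
		if !allowed {
			return &JobValidationError{JobID: job.ID, Reason: "job type is not allowed"}
		}
	}
	// Assumption: payload size is measured on the JSON encoding.
	if cfg.MaxPayloadSize > 0 {
		raw, err := json.Marshal(job.Payload)
		if err != nil {
			return &JobValidationError{JobID: job.ID, Reason: "payload cannot be encoded"}
		}
		if len(raw) > cfg.MaxPayloadSize {
			return &JobValidationError{JobID: job.ID, Reason: "payload exceeds size limit"}
		}
	}
	if cfg.RequireAuthToken && job.AuthToken == "" {
		return &JobValidationError{JobID: job.ID, Reason: "auth token is missing"}
	}
	return nil
}

func main() {
	cfg := &PollerConfig{AllowedJobTypes: []string{"sast", "sca"}, MaxPayloadSize: 1 << 20, RequireAuthToken: true}
	fmt.Println(validateJobSketch(&JobInfo{ID: "job-1", Type: "sast", AuthToken: "token"}, cfg))
	fmt.Println(validateJobSketch(&JobInfo{ID: "job-2", Type: "dast", AuthToken: "token"}, cfg))
}
```

Rejecting a job before it reaches the executor keeps untrusted input away from scanner tooling, which is presumably the point of the SECURITY note.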

Types

type AgentBuilder

type AgentBuilder struct {
	// contains filtered or unexported fields
}

AgentBuilder provides a fluent API for building a platform agent.

func NewAgentBuilder

func NewAgentBuilder() *AgentBuilder

NewAgentBuilder creates a new AgentBuilder.

func (*AgentBuilder) Build

func (b *AgentBuilder) Build() (*PlatformAgent, error)

Build creates a PlatformAgent from the builder configuration.

func (*AgentBuilder) OnJobCompleted

func (b *AgentBuilder) OnJobCompleted(fn func(*JobInfo, *JobResult)) *AgentBuilder

OnJobCompleted sets the callback for job completion.

func (*AgentBuilder) OnJobStarted

func (b *AgentBuilder) OnJobStarted(fn func(*JobInfo)) *AgentBuilder

OnJobStarted sets the callback for job start.

func (*AgentBuilder) OnLeaseExpired

func (b *AgentBuilder) OnLeaseExpired(fn func()) *AgentBuilder

OnLeaseExpired sets the callback for lease expiration.

func (*AgentBuilder) WithAuditLogger

func (b *AgentBuilder) WithAuditLogger(config *audit.LoggerConfig) *AgentBuilder

WithAuditLogger enables audit logging with the given config. When enabled, all job lifecycle events will be logged.

func (*AgentBuilder) WithCapabilities

func (b *AgentBuilder) WithCapabilities(caps ...string) *AgentBuilder

WithCapabilities sets the agent capabilities.

func (*AgentBuilder) WithChunkManager

func (b *AgentBuilder) WithChunkManager(config *chunk.Config, uploader chunk.Uploader) *AgentBuilder

WithChunkManager enables chunked uploads for large reports. When enabled, large reports are automatically detected and split into chunks for efficient upload. The chunk manager handles compression, storage, retry, and background upload.

func (*AgentBuilder) WithCredentials

func (b *AgentBuilder) WithCredentials(baseURL, apiKey, agentID string) *AgentBuilder

WithCredentials sets the agent credentials.

func (*AgentBuilder) WithExecutor

func (b *AgentBuilder) WithExecutor(executor JobExecutor) *AgentBuilder

WithExecutor sets the job executor.

func (*AgentBuilder) WithLeaseDuration

func (b *AgentBuilder) WithLeaseDuration(d time.Duration) *AgentBuilder

WithLeaseDuration sets the lease duration.

func (*AgentBuilder) WithMaxJobs

func (b *AgentBuilder) WithMaxJobs(n int) *AgentBuilder

WithMaxJobs sets the maximum concurrent jobs.

func (*AgentBuilder) WithMetricsCollector

func (b *AgentBuilder) WithMetricsCollector(collector MetricsCollector) *AgentBuilder

WithMetricsCollector sets the metrics collector.

func (*AgentBuilder) WithPipeline

func (b *AgentBuilder) WithPipeline(config *pipeline.PipelineConfig, uploader pipeline.Uploader) *AgentBuilder

WithPipeline enables the async upload pipeline. The pipeline allows scan results to be uploaded asynchronously in the background, so scans can complete immediately without waiting for uploads.

func (*AgentBuilder) WithPollTimeout

func (b *AgentBuilder) WithPollTimeout(d time.Duration) *AgentBuilder

WithPollTimeout sets the poll timeout.

func (*AgentBuilder) WithRenewInterval

func (b *AgentBuilder) WithRenewInterval(d time.Duration) *AgentBuilder

WithRenewInterval sets the lease renewal interval.

func (*AgentBuilder) WithResourceController

func (b *AgentBuilder) WithResourceController(config *resource.ControllerConfig) *AgentBuilder

WithResourceController enables resource throttling with the given config. When enabled, jobs will only be accepted when CPU/memory are below thresholds.

func (*AgentBuilder) WithVerbose

func (b *AgentBuilder) WithVerbose(v bool) *AgentBuilder

WithVerbose enables verbose logging.

type AgentCredentials

type AgentCredentials struct {
	AgentID   string `json:"agent_id"`
	APIKey    string `json:"api_key"`
	APIPrefix string `json:"api_prefix"`
}

AgentCredentials contains the credentials returned after agent registration.

func EnsureRegistered

func EnsureRegistered(ctx context.Context, config *EnsureRegisteredConfig) (*AgentCredentials, error)

EnsureRegistered ensures the agent is registered, either by loading existing credentials or registering with a bootstrap token.

This is the recommended way to initialize a platform agent:

creds, err := platform.EnsureRegistered(ctx, &platform.EnsureRegisteredConfig{
    BaseURL: "http://localhost:8080",
    BootstrapToken: os.Getenv("BOOTSTRAP_TOKEN"),
    CredentialsFile: "/etc/openctem/credentials.json",
    Registration: &platform.RegistrationRequest{
        Name: "scanner-001",
        Capabilities: []string{"sast", "sca"},
    },
})
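The load-or-register behavior described above can be sketched as follows. This is an assumption-laden illustration rather than the package's implementation: memStore and ensureRegistered are hypothetical names, the register callback stands in for the Bootstrapper call, and the types are local copies of the documented shapes.

```go
package main

import (
	"errors"
	"fmt"
)

type AgentCredentials struct {
	AgentID   string
	APIKey    string
	APIPrefix string
}

// CredentialStore mirrors the documented interface.
type CredentialStore interface {
	Save(creds *AgentCredentials) error
	Load() (*AgentCredentials, error)
	Exists() bool
}

// memStore is a hypothetical in-memory store used only for this sketch.
type memStore struct{ creds *AgentCredentials }

func (m *memStore) Save(c *AgentCredentials) error { m.creds = c; return nil }
func (m *memStore) Load() (*AgentCredentials, error) {
	if m.creds == nil {
		return nil, errors.New("no credentials stored")
	}
	return m.creds, nil
}
func (m *memStore) Exists() bool { return m.creds != nil }

// ensureRegistered reuses stored credentials, or registers once and persists the result.
func ensureRegistered(store CredentialStore, token string,
	register func(token string) (*AgentCredentials, error)) (*AgentCredentials, error) {
	if store.Exists() {
		return store.Load()
	}
	if token == "" {
		return nil, errors.New("no stored credentials and no bootstrap token")
	}
	creds, err := register(token)
	if err != nil {
		return nil, fmt.Errorf("register: %w", err)
	}
	// The API key is only returned once, so persist it before returning.
	if err := store.Save(creds); err != nil {
		return nil, fmt.Errorf("save credentials: %w", err)
	}
	return creds, nil
}

func main() {
	calls := 0
	register := func(string) (*AgentCredentials, error) {
		calls++
		return &AgentCredentials{AgentID: "agent-1", APIKey: "secret", APIPrefix: "sec"}, nil
	}
	store := &memStore{}
	first, _ := ensureRegistered(store, "bootstrap-token", register)
	second, _ := ensureRegistered(store, "", register) // no token needed on restart
	fmt.Println(first.AgentID, second.AgentID, "register calls:", calls)
}
```

The second call succeeds without a token, which matches the note that BootstrapToken is only needed when credentials don't exist.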

type AgentInfo

type AgentInfo struct {
	ID           string    `json:"id"`
	Name         string    `json:"name"`
	Capabilities []string  `json:"capabilities"`
	Region       string    `json:"region"`
	Status       string    `json:"status"`
	CreatedAt    time.Time `json:"created_at"`
}

AgentInfo contains information about a registered platform agent.

type AgentStatus

type AgentStatus struct {
	AgentID     string
	Running     bool
	Healthy     bool
	CurrentJobs int
	LastRenew   time.Time
	LastError   error

	// Resource status (if controller is enabled)
	ResourceStatus *resource.ControllerStatus
}

AgentStatus represents the current agent status.

type AuditLogger

type AuditLogger interface {
	// JobStarted logs a job start event.
	JobStarted(jobID, jobType string, details map[string]interface{})

	// JobCompleted logs a job completion event.
	JobCompleted(jobID string, duration time.Duration, details map[string]interface{})

	// JobFailed logs a job failure event.
	JobFailed(jobID string, err error, details map[string]interface{})

	// ResourceThrottle logs a resource throttle event.
	ResourceThrottle(reason string, metrics map[string]interface{})
}

AuditLogger is an optional interface for audit logging. If set on JobPoller, job lifecycle events will be logged.
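As an illustration, a small implementation of this interface that records events in memory might look like the sketch below. The memoryAuditLogger type and its message formats are hypothetical; only the four method signatures come from the interface above.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// AuditLogger mirrors the documented interface.
type AuditLogger interface {
	JobStarted(jobID, jobType string, details map[string]interface{})
	JobCompleted(jobID string, duration time.Duration, details map[string]interface{})
	JobFailed(jobID string, err error, details map[string]interface{})
	ResourceThrottle(reason string, metrics map[string]interface{})
}

// memoryAuditLogger is a hypothetical implementation that keeps events in memory.
type memoryAuditLogger struct {
	mu     sync.Mutex
	events []string
}

// record appends one formatted event; the mutex makes it safe for concurrent jobs.
func (l *memoryAuditLogger) record(format string, args ...interface{}) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.events = append(l.events, fmt.Sprintf(format, args...))
}

func (l *memoryAuditLogger) JobStarted(jobID, jobType string, _ map[string]interface{}) {
	l.record("started %s (%s)", jobID, jobType)
}

func (l *memoryAuditLogger) JobCompleted(jobID string, d time.Duration, _ map[string]interface{}) {
	l.record("completed %s in %s", jobID, d)
}

func (l *memoryAuditLogger) JobFailed(jobID string, err error, _ map[string]interface{}) {
	l.record("failed %s: %v", jobID, err)
}

func (l *memoryAuditLogger) ResourceThrottle(reason string, _ map[string]interface{}) {
	l.record("throttled: %s", reason)
}

// Compile-time check that the sketch satisfies the interface.
var _ AuditLogger = (*memoryAuditLogger)(nil)

func main() {
	logger := &memoryAuditLogger{}
	logger.JobStarted("job-1", "sast", nil)
	logger.JobCompleted("job-1", 2*time.Second, nil)
	logger.JobFailed("job-2", errors.New("timeout"), nil)
	for _, e := range logger.events {
		fmt.Println(e)
	}
}
```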

type BootstrapConfig

type BootstrapConfig struct {
	// Timeout for the registration request.
	Timeout time.Duration

	// RetryAttempts is the number of times to retry on failure.
	RetryAttempts int

	// RetryDelay is the delay between retry attempts.
	RetryDelay time.Duration

	// Verbose enables debug logging.
	Verbose bool
}

BootstrapConfig configures the Bootstrapper.

type Bootstrapper

type Bootstrapper struct {
	// contains filtered or unexported fields
}

Bootstrapper handles platform agent registration using bootstrap tokens.

Bootstrap tokens are short-lived tokens that allow new agents to register themselves with the platform. The flow is:

  1. Admin creates a bootstrap token via CLI or API
  2. Token is provided to the agent deployment (e.g., via environment variable)
  3. Agent uses Bootstrapper to register and receive permanent API credentials
  4. Agent stores credentials securely and uses them for all future API calls

Example:

bootstrapper := platform.NewBootstrapper(baseURL, os.Getenv("BOOTSTRAP_TOKEN"), &platform.BootstrapConfig{
    Timeout: platform.DefaultBootstrapTimeout,
})
creds, err := bootstrapper.Register(ctx, &platform.RegistrationRequest{
    Name: "scanner-001",
    Capabilities: []string{"sast", "sca"},
    Region: "us-east-1",
})

func NewBootstrapper

func NewBootstrapper(baseURL, bootstrapToken string, config *BootstrapConfig) *Bootstrapper

NewBootstrapper creates a new Bootstrapper.

func (*Bootstrapper) Register

Register registers a new platform agent and returns the API credentials.

IMPORTANT: The returned API key is only provided once. Store it securely (e.g., in a secrets manager or encrypted file). If lost, the agent must be deleted and re-registered with a new bootstrap token.

type ClientConfig

type ClientConfig struct {
	// BaseURL is the API base URL.
	BaseURL string

	// APIKey is the agent's API key.
	APIKey string

	// AgentID is the agent's ID.
	AgentID string

	// PollTimeout is the long-poll timeout for job polling.
	PollTimeout time.Duration

	// Verbose enables debug logging.
	Verbose bool
}

ClientConfig configures the PlatformClient.

type CredentialStore

type CredentialStore interface {
	Save(creds *AgentCredentials) error
	Load() (*AgentCredentials, error)
	Exists() bool
}

CredentialStore is the interface for storing and loading agent credentials.

type EnsureRegisteredConfig

type EnsureRegisteredConfig struct {
	// BaseURL is the API base URL.
	BaseURL string

	// BootstrapToken is the bootstrap token for registration.
	// Only needed if credentials don't exist.
	BootstrapToken string

	// CredentialsFile is the path to store/load credentials.
	CredentialsFile string

	// Registration is the registration request.
	// Only needed if credentials don't exist.
	Registration *RegistrationRequest

	// Verbose enables debug logging.
	Verbose bool
}

EnsureRegisteredConfig configures EnsureRegistered.

type FileCredentialStore

type FileCredentialStore struct {
	Path string
}

FileCredentialStore stores credentials in a file.

func NewFileCredentialStore

func NewFileCredentialStore(path string) *FileCredentialStore

NewFileCredentialStore creates a new file-based credential store.

func (*FileCredentialStore) Exists

func (s *FileCredentialStore) Exists() bool

Exists checks if the credentials file exists.

func (*FileCredentialStore) Load

func (s *FileCredentialStore) Load() (*AgentCredentials, error)

Load loads credentials from the file.

func (*FileCredentialStore) Save

func (s *FileCredentialStore) Save(creds *AgentCredentials) error

Save saves credentials to the file.

type JobClient

type JobClient interface {
	// Poll polls for new jobs using long-polling.
	Poll(ctx context.Context, req *PollRequest) (*PollResponse, error)

	// AcknowledgeJob acknowledges receipt of a job.
	AcknowledgeJob(ctx context.Context, jobID string) error

	// ReportJobResult reports the result of a completed job.
	ReportJobResult(ctx context.Context, result *JobResult) error

	// ReportJobProgress reports progress on a running job.
	ReportJobProgress(ctx context.Context, jobID string, progress int, message string) error
}

JobClient defines the interface for job operations.

func NewHTTPJobClient

func NewHTTPJobClient(baseURL, apiKey, agentID string, pollTimeout time.Duration) JobClient

NewHTTPJobClient creates a new HTTP-based job client.

type JobExecutor

type JobExecutor interface {
	Execute(ctx context.Context, job *JobInfo) (*JobResult, error)
}

JobExecutor executes platform jobs.

type JobInfo

type JobInfo struct {
	ID         string                 `json:"id"`
	Type       string                 `json:"type"`
	Priority   int                    `json:"priority"`
	TenantID   string                 `json:"tenant_id"`
	Payload    map[string]interface{} `json:"payload"`
	AuthToken  string                 `json:"auth_token"` // JWT for tenant data access
	CreatedAt  time.Time              `json:"created_at"`
	TimeoutSec int                    `json:"timeout_seconds"`

	// WorkflowContext is set when the job was triggered by a workflow.
	// It allows correlating job execution with workflow runs.
	WorkflowContext *WorkflowContext `json:"workflow_context,omitempty"`
}

JobInfo contains information about a platform job.

func (*JobInfo) HasWorkflowContext

func (j *JobInfo) HasWorkflowContext() bool

HasWorkflowContext returns true if the job was triggered by a workflow.

type JobPoller

type JobPoller struct {
	// contains filtered or unexported fields
}

JobPoller polls for and executes platform jobs.

The JobPoller uses long-polling to efficiently wait for jobs. When no jobs are available, the server holds the connection open until a job arrives or the timeout expires. This provides near-real-time job dispatch with minimal network overhead.

Optional integrations:

  • ResourceController: Throttle job acceptance based on CPU/memory
  • AuditLogger: Log job lifecycle events for debugging and compliance

func NewJobPoller

func NewJobPoller(client JobClient, executor JobExecutor, config *PollerConfig) *JobPoller

NewJobPoller creates a new JobPoller.

func (*JobPoller) CurrentJobCount

func (p *JobPoller) CurrentJobCount() int

CurrentJobCount returns the current number of running jobs.

func (*JobPoller) SetAuditLogger

func (p *JobPoller) SetAuditLogger(logger AuditLogger)

SetAuditLogger sets the optional audit logger for event logging. When set, job lifecycle events will be logged.

func (*JobPoller) SetLeaseManager

func (p *JobPoller) SetLeaseManager(lm *LeaseManager)

SetLeaseManager sets the lease manager for job count reporting. It also registers a callback to cancel all running jobs when the lease expires.

func (*JobPoller) SetResourceController

func (p *JobPoller) SetResourceController(rc ResourceController)

SetResourceController sets the optional resource controller for throttling. When set, jobs will only be accepted when resources are available.

func (*JobPoller) Start

func (p *JobPoller) Start(ctx context.Context) error

Start starts the job poller.

func (*JobPoller) Stop

func (p *JobPoller) Stop(timeout time.Duration) error

Stop stops the job poller and waits for active jobs to complete.

type JobResult

type JobResult struct {
	JobID         string                 `json:"job_id"`
	Status        string                 `json:"status"` // completed, failed, canceled
	CompletedAt   time.Time              `json:"completed_at"`
	DurationMs    int64                  `json:"duration_ms"`
	FindingsCount int                    `json:"findings_count"`
	Error         string                 `json:"error,omitempty"`
	Metadata      map[string]interface{} `json:"metadata,omitempty"`

	// WorkflowContext is echoed back if the job was triggered by a workflow.
	// This allows the Workflow Executor to correlate results with workflow runs.
	WorkflowContext *WorkflowContext `json:"workflow_context,omitempty"`
}

JobResult contains the result of a completed job.

func (*JobResult) HasWorkflowContext

func (r *JobResult) HasWorkflowContext() bool

HasWorkflowContext returns true if the result includes workflow context.

type JobValidationError

type JobValidationError struct {
	JobID  string
	Reason string
}

JobValidationError represents a job validation failure.

func (*JobValidationError) Error

func (e *JobValidationError) Error() string

type LeaseClient

type LeaseClient interface {
	RenewLease(ctx context.Context, req *LeaseRenewRequest) (*LeaseRenewResponse, error)
	ReleaseLease(ctx context.Context) error
}

LeaseClient defines the interface for lease operations.

func NewHTTPLeaseClient

func NewHTTPLeaseClient(baseURL, apiKey, agentID string) LeaseClient

NewHTTPLeaseClient creates a new HTTP-based lease client.

type LeaseConfig

type LeaseConfig struct {
	// LeaseDuration is how long the lease is valid for.
	// Default: 60 seconds.
	LeaseDuration time.Duration

	// RenewInterval is how often to renew the lease.
	// Should be less than LeaseDuration (typically 1/3).
	// Default: 20 seconds.
	RenewInterval time.Duration

	// GracePeriod is how long to wait after lease expiry before considering agent dead.
	// Default: 15 seconds.
	GracePeriod time.Duration

	// MaxJobs is the maximum concurrent jobs this agent can handle.
	MaxJobs int

	// MetricsCollector provides system metrics for lease renewal.
	// If nil, metrics are not reported.
	MetricsCollector MetricsCollector

	// OnLeaseExpired is called when the lease expires (agent should shutdown).
	OnLeaseExpired func()

	// Verbose enables debug logging.
	Verbose bool

	// UseSecureIdentity enables cryptographically secure holder identity.
	// When true, the holder identity includes a random nonce that cannot be guessed.
	// Default: true (should only be false for testing).
	UseSecureIdentity *bool

	// IdentityPrefix is an optional prefix for the holder identity.
	// Useful for identifying agent type (e.g., "scanner", "collector").
	IdentityPrefix string
}

LeaseConfig configures the LeaseManager.

type LeaseInfo

type LeaseInfo struct {
	AgentID          string    `json:"agent_id"`
	HolderIdentity   string    `json:"holder_identity"`
	LeaseDurationSec int       `json:"lease_duration_seconds"`
	AcquireTime      time.Time `json:"acquire_time"`
	RenewTime        time.Time `json:"renew_time"`
	ResourceVersion  int       `json:"resource_version"`
}

LeaseInfo contains information about the current lease.

type LeaseManager

type LeaseManager struct {
	// contains filtered or unexported fields
}

LeaseManager manages the agent's lease with the control plane. It periodically renews the lease to indicate the agent is healthy.

func NewLeaseManager

func NewLeaseManager(client LeaseClient, config *LeaseConfig) *LeaseManager

NewLeaseManager creates a new LeaseManager.

func (*LeaseManager) DecrementJobs

func (m *LeaseManager) DecrementJobs(failed bool)

DecrementJobs decrements the current job count by 1 and increments the completed-jobs count.

func (*LeaseManager) GetStatus

func (m *LeaseManager) GetStatus() *LeaseStatus

GetStatus returns the current lease status.

func (*LeaseManager) IncrementJobs

func (m *LeaseManager) IncrementJobs()

IncrementJobs increments the current job count by 1.

func (*LeaseManager) SetCurrentJobs

func (m *LeaseManager) SetCurrentJobs(count int)

SetCurrentJobs updates the current number of jobs being processed.

func (*LeaseManager) Start

func (m *LeaseManager) Start(ctx context.Context) error

Start starts the lease renewal loop.

func (*LeaseManager) Stop

func (m *LeaseManager) Stop(ctx context.Context) error

Stop stops the lease manager and releases the lease.

type LeaseRenewRequest

type LeaseRenewRequest struct {
	HolderIdentity       string   `json:"holder_identity"`
	LeaseDurationSeconds int      `json:"lease_duration_seconds"`
	CurrentJobs          int      `json:"current_jobs"`
	MaxJobs              int      `json:"max_jobs"`
	CPUPercent           *float64 `json:"cpu_percent,omitempty"`
	MemoryPercent        *float64 `json:"memory_percent,omitempty"`
	DiskPercent          *float64 `json:"disk_percent,omitempty"`
	JobsCompletedTotal   int      `json:"jobs_completed_total,omitempty"`
	JobsFailedTotal      int      `json:"jobs_failed_total,omitempty"`
}

LeaseRenewRequest contains the data for renewing a lease.

type LeaseRenewResponse

type LeaseRenewResponse struct {
	Success         bool      `json:"success"`
	Message         string    `json:"message,omitempty"`
	ResourceVersion int       `json:"resource_version"`
	RenewTime       time.Time `json:"renew_time"`
}

LeaseRenewResponse contains the response from lease renewal.

type LeaseStatus

type LeaseStatus struct {
	Running         bool
	Healthy         bool
	LastRenewTime   time.Time
	ResourceVersion int
	CurrentJobs     int
	LastError       error
}

LeaseStatus represents the current status of the lease.

type MetricsCollector

type MetricsCollector interface {
	Collect() (*SystemMetrics, error)
}

MetricsCollector collects system metrics.

type PlatformAgent

type PlatformAgent struct {
	// contains filtered or unexported fields
}

PlatformAgent represents a fully configured platform agent.

func (*PlatformAgent) AuditLogger

func (a *PlatformAgent) AuditLogger() *audit.Logger

AuditLogger returns the audit logger if configured.

func (*PlatformAgent) ChunkManager

func (a *PlatformAgent) ChunkManager() *chunk.Manager

ChunkManager returns the chunk manager if configured.

func (*PlatformAgent) ExtendedStatus

func (a *PlatformAgent) ExtendedStatus() *AgentStatus

ExtendedStatus returns the full agent status including resource metrics.

func (*PlatformAgent) FlushPipeline

func (a *PlatformAgent) FlushPipeline(ctx context.Context) error

FlushPipeline waits for all pending uploads to complete. Returns an error if the pipeline is not configured or if the context is canceled.

func (*PlatformAgent) NeedsChunking

func (a *PlatformAgent) NeedsChunking(report *ctis.Report) bool

NeedsChunking checks if a report should be uploaded via chunking. Returns false if the chunk manager is not configured.

func (*PlatformAgent) Pipeline

func (a *PlatformAgent) Pipeline() *pipeline.Pipeline

Pipeline returns the upload pipeline if configured.

func (*PlatformAgent) PipelineStats

func (a *PlatformAgent) PipelineStats() *pipeline.Stats

PipelineStats returns the current pipeline statistics. Returns nil if the pipeline is not configured.

func (*PlatformAgent) ResourceController

func (a *PlatformAgent) ResourceController() *resource.Controller

ResourceController returns the resource controller if configured.

func (*PlatformAgent) SmartSubmitReport

func (a *PlatformAgent) SmartSubmitReport(ctx context.Context, report *ctis.Report, opts ...pipeline.SubmitOption) (string, *chunk.Report, error)

SmartSubmitReport automatically chooses between regular upload, pipeline, or chunked upload.

  • Small reports: uploaded directly via pipeline (if configured) or returned for manual upload
  • Large reports: uploaded via chunk manager (if configured)

Returns:

  • For pipeline submissions: (pipelineItemID, nil, nil)
  • For chunked submissions: ("", chunkReport, nil)
  • If neither is configured: ("", nil, error)

func (*PlatformAgent) Start

func (a *PlatformAgent) Start(ctx context.Context) error

Start starts the platform agent (lease manager + job poller).

func (*PlatformAgent) Status

func (a *PlatformAgent) Status() *AgentStatus

Status returns the current agent status.

func (*PlatformAgent) Stop

func (a *PlatformAgent) Stop(ctx context.Context, timeout time.Duration) error

Stop stops the platform agent gracefully.

func (*PlatformAgent) SubmitChunkedReport

func (a *PlatformAgent) SubmitChunkedReport(ctx context.Context, report *ctis.Report) (*chunk.Report, error)

SubmitChunkedReport queues a large report for chunked upload. The report will be split into chunks, compressed, and uploaded in the background. Returns an error if the chunk manager is not configured.

func (*PlatformAgent) SubmitReport

func (a *PlatformAgent) SubmitReport(report *ctis.Report, opts ...pipeline.SubmitOption) (string, error)

SubmitReport queues a report for async upload via the pipeline. Returns immediately after queueing. Use Pipeline().GetStats() to monitor progress. Returns an error if the pipeline is not configured.

type PlatformClient

type PlatformClient struct {
	// contains filtered or unexported fields
}

PlatformClient provides a unified interface for platform agent operations. It implements LeaseClient and JobClient interfaces.

func NewPlatformClient

func NewPlatformClient(config *ClientConfig) *PlatformClient

NewPlatformClient creates a new PlatformClient.

func (*PlatformClient) AcknowledgeJob

func (c *PlatformClient) AcknowledgeJob(ctx context.Context, jobID string) error

AcknowledgeJob implements JobClient.

func (*PlatformClient) Poll

func (c *PlatformClient) Poll(ctx context.Context, req *PollRequest) (*PollResponse, error)

Poll implements JobClient.

func (*PlatformClient) ReleaseLease

func (c *PlatformClient) ReleaseLease(ctx context.Context) error

ReleaseLease implements LeaseClient.

func (*PlatformClient) RenewLease

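func (c *PlatformClient) RenewLease(ctx context.Context, req *LeaseRenewRequest) (*LeaseRenewResponse, error)

RenewLease implements LeaseClient.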

func (*PlatformClient) ReportJobProgress

func (c *PlatformClient) ReportJobProgress(ctx context.Context, jobID string, progress int, message string) error

ReportJobProgress implements JobClient.

func (*PlatformClient) ReportJobResult

func (c *PlatformClient) ReportJobResult(ctx context.Context, result *JobResult) error

ReportJobResult implements JobClient.

type PollRequest

type PollRequest struct {
	MaxJobs        int      `json:"max_jobs"`
	Capabilities   []string `json:"capabilities,omitempty"`
	TimeoutSeconds int      `json:"timeout_seconds,omitempty"`
}

PollRequest contains the data for polling jobs.

type PollResponse

type PollResponse struct {
	Jobs             []*JobInfo `json:"jobs"`
	PollIntervalHint int        `json:"poll_interval_hint,omitempty"` // Suggested wait before next poll
	QueueDepth       int        `json:"queue_depth,omitempty"`        // Total pending jobs
}

PollResponse contains the response from job polling.

type PollerConfig

type PollerConfig struct {
	// MaxConcurrentJobs is the maximum number of concurrent jobs.
	MaxConcurrentJobs int

	// PollTimeout is the long-poll timeout (how long to wait for jobs).
	PollTimeout time.Duration

	// RetryDelay is the delay between poll attempts on error.
	RetryDelay time.Duration

	// Capabilities to advertise when polling.
	Capabilities []string

	// OnJobStarted is called when a job starts executing.
	OnJobStarted func(job *JobInfo)

	// OnJobCompleted is called when a job completes (success or failure).
	OnJobCompleted func(job *JobInfo, result *JobResult)

	// Verbose enables debug logging.
	Verbose bool

	// AllowedJobTypes restricts which job types can be executed.
	// If empty, all job types are allowed.
	AllowedJobTypes []string

	// MaxPayloadSize limits the maximum payload size for jobs (default: 10MB).
	MaxPayloadSize int

	// RequireAuthToken requires jobs to have a valid auth token.
	// When true, jobs without AuthToken will be rejected.
	RequireAuthToken bool

	// ValidateTokenClaims enables JWT claims validation.
	// When true, the AuthToken's tenant_id claim must match job's TenantID.
	ValidateTokenClaims bool
}

PollerConfig configures the JobPoller.

type RegistrationRequest

type RegistrationRequest struct {
	// Name is the agent's display name.
	Name string `json:"name"`

	// Capabilities are the scanner/collector capabilities (e.g., "sast", "sca", "dast").
	Capabilities []string `json:"capabilities"`

	// Tools are the specific tools available (e.g., "semgrep", "trivy", "nuclei").
	Tools []string `json:"tools,omitempty"`

	// Region is the deployment region (e.g., "us-east-1", "ap-southeast-1").
	Region string `json:"region,omitempty"`

	// Labels are optional key-value labels for filtering.
	Labels map[string]string `json:"labels,omitempty"`

	// MaxConcurrentJobs is the maximum number of concurrent jobs.
	MaxConcurrentJobs int `json:"max_concurrent_jobs,omitempty"`
}

RegistrationRequest contains the data for registering a new platform agent.

type RegistrationResponse

type RegistrationResponse struct {
	AgentID   string `json:"agent_id"`
	APIKey    string `json:"api_key"`    // Only returned once - store securely!
	APIPrefix string `json:"api_prefix"` // Prefix for display/logging (safe to log)
	Message   string `json:"message,omitempty"`
}

RegistrationResponse contains the response from agent registration.

type ResourceController

type ResourceController interface {
	// AcquireSlot attempts to acquire a job execution slot.
	// Returns true if acquired, false if throttled or at capacity.
	AcquireSlot(ctx context.Context) bool

	// ReleaseSlot releases a previously acquired slot.
	ReleaseSlot()

	// IsThrottled returns true if resource limits are exceeded.
	IsThrottled() bool
}

ResourceController is an optional interface for resource-based job throttling. If set on JobPoller, jobs will only be accepted when resources are available.

type SimpleMetricsCollector

type SimpleMetricsCollector struct{}

SimpleMetricsCollector provides basic system metrics collection.

func (*SimpleMetricsCollector) Collect

func (c *SimpleMetricsCollector) Collect() (*SystemMetrics, error)

Collect collects system metrics. This is a simplified implementation; production deployments should use proper system calls.

type SystemMetrics

type SystemMetrics struct {
	CPUPercent    float64 `json:"cpu_percent"`
	MemoryPercent float64 `json:"memory_percent"`
	DiskPercent   float64 `json:"disk_percent"`
	CurrentJobs   int     `json:"current_jobs"`
	MaxJobs       int     `json:"max_jobs"`
}

SystemMetrics contains agent system metrics for health reporting.

type WorkflowContext

type WorkflowContext struct {
	// WorkflowID is the UUID of the workflow definition.
	WorkflowID string `json:"workflow_id"`

	// WorkflowRunID is the UUID of the specific workflow execution.
	WorkflowRunID string `json:"workflow_run_id"`

	// TriggerType indicates what triggered the workflow (e.g., "finding_created", "schedule", "manual").
	TriggerType string `json:"trigger_type"`

	// ActionNodeID is the UUID of the action node that triggered this job.
	ActionNodeID string `json:"action_node_id"`

	// ActionNodeKey is the node_key of the action node (e.g., "run_scan_1").
	ActionNodeKey string `json:"action_node_key"`
}

WorkflowContext contains context information when a job is triggered by the Workflow Executor as part of an automation workflow.
