Documentation
¶
Overview ¶
Package platform provides components for running agents in platform mode.
Platform agents are centrally managed by OpenCTEM and execute jobs on behalf of tenants. Unlike tenant agents that run within a tenant's infrastructure, platform agents are deployed and operated by the OpenCTEM platform itself.
Key components:
- LeaseManager: Handles K8s-style lease renewal for health monitoring
- Bootstrapper: Handles agent registration using bootstrap tokens
- JobPoller: Long-polls for jobs using /platform/poll endpoint
- Client: Extended client with platform-specific endpoints
Usage:
// Bootstrap a new platform agent
bootstrapper := platform.NewBootstrapper(baseURL, bootstrapToken, &platform.BootstrapConfig{Timeout: platform.DefaultBootstrapTimeout})
creds, err := bootstrapper.Register(ctx, &platform.RegistrationRequest{
Name: "scanner-001",
Capabilities: []string{"sast", "sca"},
})
// Create platform client
client := platform.NewPlatformClient(&platform.ClientConfig{
BaseURL: baseURL,
APIKey: creds.APIKey,
AgentID: creds.AgentID,
})
// Start lease manager
leaseManager := platform.NewLeaseManager(client, &platform.LeaseConfig{
LeaseDuration: 60 * time.Second,
RenewInterval: 20 * time.Second,
})
go leaseManager.Start(ctx)
// Start job poller
poller := platform.NewJobPoller(client, executor, &platform.PollerConfig{
MaxConcurrentJobs: 5,
PollTimeout: 30 * time.Second,
})
poller.Start(ctx)
Index ¶
- Constants
- func ValidateJob(job *JobInfo, config *PollerConfig) error
- type AgentBuilder
- func (b *AgentBuilder) Build() (*PlatformAgent, error)
- func (b *AgentBuilder) OnJobCompleted(fn func(*JobInfo, *JobResult)) *AgentBuilder
- func (b *AgentBuilder) OnJobStarted(fn func(*JobInfo)) *AgentBuilder
- func (b *AgentBuilder) OnLeaseExpired(fn func()) *AgentBuilder
- func (b *AgentBuilder) WithAuditLogger(config *audit.LoggerConfig) *AgentBuilder
- func (b *AgentBuilder) WithCapabilities(caps ...string) *AgentBuilder
- func (b *AgentBuilder) WithChunkManager(config *chunk.Config, uploader chunk.Uploader) *AgentBuilder
- func (b *AgentBuilder) WithCredentials(baseURL, apiKey, agentID string) *AgentBuilder
- func (b *AgentBuilder) WithExecutor(executor JobExecutor) *AgentBuilder
- func (b *AgentBuilder) WithLeaseDuration(d time.Duration) *AgentBuilder
- func (b *AgentBuilder) WithMaxJobs(n int) *AgentBuilder
- func (b *AgentBuilder) WithMetricsCollector(collector MetricsCollector) *AgentBuilder
- func (b *AgentBuilder) WithPipeline(config *pipeline.PipelineConfig, uploader pipeline.Uploader) *AgentBuilder
- func (b *AgentBuilder) WithPollTimeout(d time.Duration) *AgentBuilder
- func (b *AgentBuilder) WithRenewInterval(d time.Duration) *AgentBuilder
- func (b *AgentBuilder) WithResourceController(config *resource.ControllerConfig) *AgentBuilder
- func (b *AgentBuilder) WithVerbose(v bool) *AgentBuilder
- type AgentCredentials
- type AgentInfo
- type AgentStatus
- type AuditLogger
- type BootstrapConfig
- type Bootstrapper
- type ClientConfig
- type CredentialStore
- type EnsureRegisteredConfig
- type FileCredentialStore
- type JobClient
- type JobExecutor
- type JobInfo
- type JobPoller
- func (p *JobPoller) CurrentJobCount() int
- func (p *JobPoller) SetAuditLogger(logger AuditLogger)
- func (p *JobPoller) SetLeaseManager(lm *LeaseManager)
- func (p *JobPoller) SetResourceController(rc ResourceController)
- func (p *JobPoller) Start(ctx context.Context) error
- func (p *JobPoller) Stop(timeout time.Duration) error
- type JobResult
- type JobValidationError
- type LeaseClient
- type LeaseConfig
- type LeaseInfo
- type LeaseManager
- type LeaseRenewRequest
- type LeaseRenewResponse
- type LeaseStatus
- type MetricsCollector
- type PlatformAgent
- func (a *PlatformAgent) AuditLogger() *audit.Logger
- func (a *PlatformAgent) ChunkManager() *chunk.Manager
- func (a *PlatformAgent) ExtendedStatus() *AgentStatus
- func (a *PlatformAgent) FlushPipeline(ctx context.Context) error
- func (a *PlatformAgent) NeedsChunking(report *ctis.Report) bool
- func (a *PlatformAgent) Pipeline() *pipeline.Pipeline
- func (a *PlatformAgent) PipelineStats() *pipeline.Stats
- func (a *PlatformAgent) ResourceController() *resource.Controller
- func (a *PlatformAgent) SmartSubmitReport(ctx context.Context, report *ctis.Report, opts ...pipeline.SubmitOption) (string, *chunk.Report, error)
- func (a *PlatformAgent) Start(ctx context.Context) error
- func (a *PlatformAgent) Status() *AgentStatus
- func (a *PlatformAgent) Stop(ctx context.Context, timeout time.Duration) error
- func (a *PlatformAgent) SubmitChunkedReport(ctx context.Context, report *ctis.Report) (*chunk.Report, error)
- func (a *PlatformAgent) SubmitReport(report *ctis.Report, opts ...pipeline.SubmitOption) (string, error)
- type PlatformClient
- func (c *PlatformClient) AcknowledgeJob(ctx context.Context, jobID string) error
- func (c *PlatformClient) Poll(ctx context.Context, req *PollRequest) (*PollResponse, error)
- func (c *PlatformClient) ReleaseLease(ctx context.Context) error
- func (c *PlatformClient) RenewLease(ctx context.Context, req *LeaseRenewRequest) (*LeaseRenewResponse, error)
- func (c *PlatformClient) ReportJobProgress(ctx context.Context, jobID string, progress int, message string) error
- func (c *PlatformClient) ReportJobResult(ctx context.Context, result *JobResult) error
- type PollRequest
- type PollResponse
- type PollerConfig
- type RegistrationRequest
- type RegistrationResponse
- type ResourceController
- type SimpleMetricsCollector
- type SystemMetrics
- type WorkflowContext
Constants ¶
const (
	DefaultLeaseDuration     = 60 * time.Second
	DefaultRenewInterval     = 20 * time.Second
	DefaultPollTimeout       = 30 * time.Second
	DefaultMaxConcurrentJobs = 5
	DefaultBootstrapTimeout  = 30 * time.Second
)
Default configuration values.
const Version = "1.0.0"
Version is the platform package version.
Variables ¶
This section is empty.
Functions ¶
func ValidateJob ¶
func ValidateJob(job *JobInfo, config *PollerConfig) error
ValidateJob validates a job before execution. SECURITY: This prevents processing of malformed or unauthorized jobs.
Types ¶
type AgentBuilder ¶
type AgentBuilder struct {
// contains filtered or unexported fields
}
AgentBuilder provides a fluent API for building a platform agent.
func NewAgentBuilder ¶
func NewAgentBuilder() *AgentBuilder
NewAgentBuilder creates a new AgentBuilder.
func (*AgentBuilder) Build ¶
func (b *AgentBuilder) Build() (*PlatformAgent, error)
Build creates a PlatformAgent from the builder configuration.
func (*AgentBuilder) OnJobCompleted ¶
func (b *AgentBuilder) OnJobCompleted(fn func(*JobInfo, *JobResult)) *AgentBuilder
OnJobCompleted sets the callback for job completion.
func (*AgentBuilder) OnJobStarted ¶
func (b *AgentBuilder) OnJobStarted(fn func(*JobInfo)) *AgentBuilder
OnJobStarted sets the callback for job start.
func (*AgentBuilder) OnLeaseExpired ¶
func (b *AgentBuilder) OnLeaseExpired(fn func()) *AgentBuilder
OnLeaseExpired sets the callback for lease expiration.
func (*AgentBuilder) WithAuditLogger ¶
func (b *AgentBuilder) WithAuditLogger(config *audit.LoggerConfig) *AgentBuilder
WithAuditLogger enables audit logging with the given config. When enabled, all job lifecycle events will be logged.
func (*AgentBuilder) WithCapabilities ¶
func (b *AgentBuilder) WithCapabilities(caps ...string) *AgentBuilder
WithCapabilities sets the agent capabilities.
func (*AgentBuilder) WithChunkManager ¶
func (b *AgentBuilder) WithChunkManager(config *chunk.Config, uploader chunk.Uploader) *AgentBuilder
WithChunkManager enables chunked uploads for large reports. When enabled, large reports are automatically detected and split into chunks for efficient upload. The chunk manager handles compression, storage, retry, and background upload.
func (*AgentBuilder) WithCredentials ¶
func (b *AgentBuilder) WithCredentials(baseURL, apiKey, agentID string) *AgentBuilder
WithCredentials sets the agent credentials.
func (*AgentBuilder) WithExecutor ¶
func (b *AgentBuilder) WithExecutor(executor JobExecutor) *AgentBuilder
WithExecutor sets the job executor.
func (*AgentBuilder) WithLeaseDuration ¶
func (b *AgentBuilder) WithLeaseDuration(d time.Duration) *AgentBuilder
WithLeaseDuration sets the lease duration.
func (*AgentBuilder) WithMaxJobs ¶
func (b *AgentBuilder) WithMaxJobs(n int) *AgentBuilder
WithMaxJobs sets the maximum concurrent jobs.
func (*AgentBuilder) WithMetricsCollector ¶
func (b *AgentBuilder) WithMetricsCollector(collector MetricsCollector) *AgentBuilder
WithMetricsCollector sets the metrics collector.
func (*AgentBuilder) WithPipeline ¶
func (b *AgentBuilder) WithPipeline(config *pipeline.PipelineConfig, uploader pipeline.Uploader) *AgentBuilder
WithPipeline enables the async upload pipeline. The pipeline allows scan results to be uploaded asynchronously in the background, so scans can complete immediately without waiting for uploads.
func (*AgentBuilder) WithPollTimeout ¶
func (b *AgentBuilder) WithPollTimeout(d time.Duration) *AgentBuilder
WithPollTimeout sets the poll timeout.
func (*AgentBuilder) WithRenewInterval ¶
func (b *AgentBuilder) WithRenewInterval(d time.Duration) *AgentBuilder
WithRenewInterval sets the lease renewal interval.
func (*AgentBuilder) WithResourceController ¶
func (b *AgentBuilder) WithResourceController(config *resource.ControllerConfig) *AgentBuilder
WithResourceController enables resource throttling with the given config. When enabled, jobs will only be accepted when CPU/memory are below thresholds.
func (*AgentBuilder) WithVerbose ¶
func (b *AgentBuilder) WithVerbose(v bool) *AgentBuilder
WithVerbose enables verbose logging.
type AgentCredentials ¶
type AgentCredentials struct {
AgentID string `json:"agent_id"`
APIKey string `json:"api_key"`
APIPrefix string `json:"api_prefix"`
}
AgentCredentials contains the credentials returned after agent registration.
func EnsureRegistered ¶
func EnsureRegistered(ctx context.Context, config *EnsureRegisteredConfig) (*AgentCredentials, error)
EnsureRegistered ensures the agent is registered, either by loading existing credentials or registering with a bootstrap token.
This is the recommended way to initialize a platform agent:
creds, err := platform.EnsureRegistered(ctx, &platform.EnsureRegisteredConfig{
BaseURL: "http://localhost:8080",
BootstrapToken: os.Getenv("BOOTSTRAP_TOKEN"),
CredentialsFile: "/etc/openctem/credentials.json",
Registration: &platform.RegistrationRequest{
Name: "scanner-001",
Capabilities: []string{"sast", "sca"},
},
})
type AgentInfo ¶
type AgentInfo struct {
ID string `json:"id"`
Name string `json:"name"`
Capabilities []string `json:"capabilities"`
Region string `json:"region"`
Status string `json:"status"`
CreatedAt time.Time `json:"created_at"`
}
AgentInfo contains information about a registered platform agent.
type AgentStatus ¶
type AgentStatus struct {
AgentID string
Running bool
Healthy bool
CurrentJobs int
LastRenew time.Time
LastError error
// Resource status (if controller is enabled)
ResourceStatus *resource.ControllerStatus
}
AgentStatus represents the current agent status.
type AuditLogger ¶
type AuditLogger interface {
// JobStarted logs a job start event.
JobStarted(jobID, jobType string, details map[string]interface{})
// JobCompleted logs a job completion event.
JobCompleted(jobID string, duration time.Duration, details map[string]interface{})
// JobFailed logs a job failure event.
JobFailed(jobID string, err error, details map[string]interface{})
// ResourceThrottle logs a resource throttle event.
ResourceThrottle(reason string, metrics map[string]interface{})
}
AuditLogger is an optional interface for audit logging. If set on JobPoller, job lifecycle events will be logged.
type BootstrapConfig ¶
type BootstrapConfig struct {
// Timeout for the registration request.
Timeout time.Duration
// RetryAttempts is the number of times to retry on failure.
RetryAttempts int
// RetryDelay is the delay between retry attempts.
RetryDelay time.Duration
// Verbose enables debug logging.
Verbose bool
}
BootstrapConfig configures the Bootstrapper.
type Bootstrapper ¶
type Bootstrapper struct {
// contains filtered or unexported fields
}
Bootstrapper handles platform agent registration using bootstrap tokens.
Bootstrap tokens are short-lived tokens that allow new agents to register themselves with the platform. The flow is:
- Admin creates a bootstrap token via CLI or API
- Token is provided to the agent deployment (e.g., via environment variable)
- Agent uses Bootstrapper to register and receive permanent API credentials
- Agent stores credentials securely and uses them for all future API calls
Example:
bootstrapper := platform.NewBootstrapper(baseURL, os.Getenv("BOOTSTRAP_TOKEN"), &platform.BootstrapConfig{Timeout: platform.DefaultBootstrapTimeout})
creds, err := bootstrapper.Register(ctx, &platform.RegistrationRequest{
Name: "scanner-001",
Capabilities: []string{"sast", "sca"},
Region: "us-east-1",
})
func NewBootstrapper ¶
func NewBootstrapper(baseURL, bootstrapToken string, config *BootstrapConfig) *Bootstrapper
NewBootstrapper creates a new Bootstrapper.
func (*Bootstrapper) Register ¶
func (b *Bootstrapper) Register(ctx context.Context, req *RegistrationRequest) (*RegistrationResponse, error)
Register registers a new platform agent and returns the API credentials.
IMPORTANT: The returned API key is only provided once. Store it securely (e.g., in a secrets manager or encrypted file). If lost, the agent must be deleted and re-registered with a new bootstrap token.
type ClientConfig ¶
type ClientConfig struct {
// BaseURL is the API base URL.
BaseURL string
// APIKey is the agent's API key.
APIKey string
// AgentID is the agent's ID.
AgentID string
// PollTimeout is the long-poll timeout for job polling.
PollTimeout time.Duration
// Verbose enables debug logging.
Verbose bool
}
ClientConfig configures the PlatformClient.
type CredentialStore ¶
type CredentialStore interface {
Save(creds *AgentCredentials) error
Load() (*AgentCredentials, error)
Exists() bool
}
CredentialStore is the interface for storing and loading agent credentials.
type EnsureRegisteredConfig ¶
type EnsureRegisteredConfig struct {
// BaseURL is the API base URL.
BaseURL string
// BootstrapToken is the bootstrap token for registration.
// Only needed if credentials don't exist.
BootstrapToken string
// CredentialsFile is the path to store/load credentials.
CredentialsFile string
// Registration is the registration request.
// Only needed if credentials don't exist.
Registration *RegistrationRequest
// Verbose enables debug logging.
Verbose bool
}
EnsureRegisteredConfig configures EnsureRegistered.
type FileCredentialStore ¶
type FileCredentialStore struct {
Path string
}
FileCredentialStore stores credentials in a file.
func NewFileCredentialStore ¶
func NewFileCredentialStore(path string) *FileCredentialStore
NewFileCredentialStore creates a new file-based credential store.
func (*FileCredentialStore) Exists ¶
func (s *FileCredentialStore) Exists() bool
Exists checks if the credentials file exists.
func (*FileCredentialStore) Load ¶
func (s *FileCredentialStore) Load() (*AgentCredentials, error)
Load loads credentials from the file.
func (*FileCredentialStore) Save ¶
func (s *FileCredentialStore) Save(creds *AgentCredentials) error
Save saves credentials to the file.
type JobClient ¶
type JobClient interface {
// Poll polls for new jobs using long-polling.
Poll(ctx context.Context, req *PollRequest) (*PollResponse, error)
// AcknowledgeJob acknowledges receipt of a job.
AcknowledgeJob(ctx context.Context, jobID string) error
// ReportJobResult reports the result of a completed job.
ReportJobResult(ctx context.Context, result *JobResult) error
// ReportJobProgress reports progress on a running job.
ReportJobProgress(ctx context.Context, jobID string, progress int, message string) error
}
JobClient defines the interface for job operations.
type JobExecutor ¶
JobExecutor executes platform jobs.
type JobInfo ¶
type JobInfo struct {
ID string `json:"id"`
Type string `json:"type"`
Priority int `json:"priority"`
TenantID string `json:"tenant_id"`
Payload map[string]interface{} `json:"payload"`
AuthToken string `json:"auth_token"` // JWT for tenant data access
CreatedAt time.Time `json:"created_at"`
TimeoutSec int `json:"timeout_seconds"`
// WorkflowContext is set when the job was triggered by a workflow.
// It allows correlating job execution with workflow runs.
WorkflowContext *WorkflowContext `json:"workflow_context,omitempty"`
}
JobInfo contains information about a platform job.
func (*JobInfo) HasWorkflowContext ¶
HasWorkflowContext returns true if the job was triggered by a workflow.
type JobPoller ¶
type JobPoller struct {
// contains filtered or unexported fields
}
JobPoller polls for and executes platform jobs.
The JobPoller uses long-polling to efficiently wait for jobs. When no jobs are available, the server holds the connection open until a job arrives or the timeout expires. This provides near-real-time job dispatch with minimal network overhead.
Optional integrations:
- ResourceController: Throttle job acceptance based on CPU/memory
- AuditLogger: Log job lifecycle events for debugging and compliance
func NewJobPoller ¶
func NewJobPoller(client JobClient, executor JobExecutor, config *PollerConfig) *JobPoller
NewJobPoller creates a new JobPoller.
func (*JobPoller) CurrentJobCount ¶
func (p *JobPoller) CurrentJobCount() int
CurrentJobCount returns the current number of running jobs.
func (*JobPoller) SetAuditLogger ¶
func (p *JobPoller) SetAuditLogger(logger AuditLogger)
SetAuditLogger sets the optional audit logger for event logging. When set, job lifecycle events will be logged.
func (*JobPoller) SetLeaseManager ¶
func (p *JobPoller) SetLeaseManager(lm *LeaseManager)
SetLeaseManager sets the lease manager for job count reporting. It also registers a callback to cancel all running jobs when the lease expires.
func (*JobPoller) SetResourceController ¶
func (p *JobPoller) SetResourceController(rc ResourceController)
SetResourceController sets the optional resource controller for throttling. When set, jobs will only be accepted when resources are available.
type JobResult ¶
type JobResult struct {
JobID string `json:"job_id"`
Status string `json:"status"` // completed, failed, canceled
CompletedAt time.Time `json:"completed_at"`
DurationMs int64 `json:"duration_ms"`
FindingsCount int `json:"findings_count"`
Error string `json:"error,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
// WorkflowContext is echoed back if the job was triggered by a workflow.
// This allows the Workflow Executor to correlate results with workflow runs.
WorkflowContext *WorkflowContext `json:"workflow_context,omitempty"`
}
JobResult contains the result of a completed job.
func (*JobResult) HasWorkflowContext ¶
HasWorkflowContext returns true if the result includes workflow context.
type JobValidationError ¶
JobValidationError represents a job validation failure.
func (*JobValidationError) Error ¶
func (e *JobValidationError) Error() string
type LeaseClient ¶
type LeaseClient interface {
RenewLease(ctx context.Context, req *LeaseRenewRequest) (*LeaseRenewResponse, error)
ReleaseLease(ctx context.Context) error
}
LeaseClient defines the interface for lease operations.
func NewHTTPLeaseClient ¶
func NewHTTPLeaseClient(baseURL, apiKey, agentID string) LeaseClient
NewHTTPLeaseClient creates a new HTTP-based lease client.
type LeaseConfig ¶
type LeaseConfig struct {
// LeaseDuration is how long the lease is valid for.
// Default: 60 seconds.
LeaseDuration time.Duration
// RenewInterval is how often to renew the lease.
// Should be less than LeaseDuration (typically 1/3).
// Default: 20 seconds.
RenewInterval time.Duration
// GracePeriod is how long to wait after lease expiry before considering agent dead.
// Default: 15 seconds.
GracePeriod time.Duration
// MaxJobs is the maximum concurrent jobs this agent can handle.
MaxJobs int
// MetricsCollector provides system metrics for lease renewal.
// If nil, metrics are not reported.
MetricsCollector MetricsCollector
// OnLeaseExpired is called when the lease expires (agent should shutdown).
OnLeaseExpired func()
// Verbose enables debug logging.
Verbose bool
// UseSecureIdentity enables cryptographically secure holder identity.
// When true, the holder identity includes a random nonce that cannot be guessed.
// Default: true (should only be false for testing).
UseSecureIdentity *bool
// IdentityPrefix is an optional prefix for the holder identity.
// Useful for identifying agent type (e.g., "scanner", "collector").
IdentityPrefix string
}
LeaseConfig configures the LeaseManager.
type LeaseInfo ¶
type LeaseInfo struct {
AgentID string `json:"agent_id"`
HolderIdentity string `json:"holder_identity"`
LeaseDurationSec int `json:"lease_duration_seconds"`
AcquireTime time.Time `json:"acquire_time"`
RenewTime time.Time `json:"renew_time"`
ResourceVersion int `json:"resource_version"`
}
LeaseInfo contains information about the current lease.
type LeaseManager ¶
type LeaseManager struct {
// contains filtered or unexported fields
}
LeaseManager manages the agent's lease with the control plane. It periodically renews the lease to indicate the agent is healthy.
func NewLeaseManager ¶
func NewLeaseManager(client LeaseClient, config *LeaseConfig) *LeaseManager
NewLeaseManager creates a new LeaseManager.
func (*LeaseManager) DecrementJobs ¶
func (m *LeaseManager) DecrementJobs(failed bool)
DecrementJobs decrements the current job count by 1 and increments completed.
func (*LeaseManager) GetStatus ¶
func (m *LeaseManager) GetStatus() *LeaseStatus
GetStatus returns the current lease status.
func (*LeaseManager) IncrementJobs ¶
func (m *LeaseManager) IncrementJobs()
IncrementJobs increments the current job count by 1.
func (*LeaseManager) SetCurrentJobs ¶
func (m *LeaseManager) SetCurrentJobs(count int)
SetCurrentJobs updates the current number of jobs being processed.
type LeaseRenewRequest ¶
type LeaseRenewRequest struct {
HolderIdentity string `json:"holder_identity"`
LeaseDurationSeconds int `json:"lease_duration_seconds"`
CurrentJobs int `json:"current_jobs"`
MaxJobs int `json:"max_jobs"`
CPUPercent *float64 `json:"cpu_percent,omitempty"`
MemoryPercent *float64 `json:"memory_percent,omitempty"`
DiskPercent *float64 `json:"disk_percent,omitempty"`
JobsCompletedTotal int `json:"jobs_completed_total,omitempty"`
JobsFailedTotal int `json:"jobs_failed_total,omitempty"`
}
LeaseRenewRequest contains the data for renewing a lease.
type LeaseRenewResponse ¶
type LeaseRenewResponse struct {
Success bool `json:"success"`
Message string `json:"message,omitempty"`
ResourceVersion int `json:"resource_version"`
RenewTime time.Time `json:"renew_time"`
}
LeaseRenewResponse contains the response from lease renewal.
type LeaseStatus ¶
type LeaseStatus struct {
Running bool
Healthy bool
LastRenewTime time.Time
ResourceVersion int
CurrentJobs int
LastError error
}
LeaseStatus represents the current status of the lease.
type MetricsCollector ¶
type MetricsCollector interface {
Collect() (*SystemMetrics, error)
}
MetricsCollector collects system metrics.
type PlatformAgent ¶
type PlatformAgent struct {
// contains filtered or unexported fields
}
PlatformAgent represents a fully configured platform agent.
func (*PlatformAgent) AuditLogger ¶
func (a *PlatformAgent) AuditLogger() *audit.Logger
AuditLogger returns the audit logger if configured.
func (*PlatformAgent) ChunkManager ¶
func (a *PlatformAgent) ChunkManager() *chunk.Manager
ChunkManager returns the chunk manager if configured.
func (*PlatformAgent) ExtendedStatus ¶
func (a *PlatformAgent) ExtendedStatus() *AgentStatus
ExtendedStatus returns the full agent status including resource metrics.
func (*PlatformAgent) FlushPipeline ¶
func (a *PlatformAgent) FlushPipeline(ctx context.Context) error
FlushPipeline waits for all pending uploads to complete. Returns an error if the pipeline is not configured or if the context is canceled.
func (*PlatformAgent) NeedsChunking ¶
func (a *PlatformAgent) NeedsChunking(report *ctis.Report) bool
NeedsChunking checks if a report should be uploaded via chunking. Returns false if chunk manager is not configured.
func (*PlatformAgent) Pipeline ¶
func (a *PlatformAgent) Pipeline() *pipeline.Pipeline
Pipeline returns the upload pipeline if configured.
func (*PlatformAgent) PipelineStats ¶
func (a *PlatformAgent) PipelineStats() *pipeline.Stats
PipelineStats returns the current pipeline statistics. Returns nil if the pipeline is not configured.
func (*PlatformAgent) ResourceController ¶
func (a *PlatformAgent) ResourceController() *resource.Controller
ResourceController returns the resource controller if configured.
func (*PlatformAgent) SmartSubmitReport ¶
func (a *PlatformAgent) SmartSubmitReport(ctx context.Context, report *ctis.Report, opts ...pipeline.SubmitOption) (string, *chunk.Report, error)
SmartSubmitReport automatically chooses between regular upload, pipeline, or chunked upload:
- Small reports: uploaded directly via pipeline (if configured) or returned for manual upload
- Large reports: uploaded via chunk manager (if configured)
Returns:
- For pipeline submissions: (pipelineItemID, nil, nil)
- For chunked submissions: ("", chunkReport, nil)
- If neither is configured: ("", nil, error)
func (*PlatformAgent) Start ¶
func (a *PlatformAgent) Start(ctx context.Context) error
Start starts the platform agent (lease manager + job poller).
func (*PlatformAgent) Status ¶
func (a *PlatformAgent) Status() *AgentStatus
Status returns the current agent status.
func (*PlatformAgent) SubmitChunkedReport ¶
func (a *PlatformAgent) SubmitChunkedReport(ctx context.Context, report *ctis.Report) (*chunk.Report, error)
SubmitChunkedReport queues a large report for chunked upload. The report will be split into chunks, compressed, and uploaded in the background. Returns an error if the chunk manager is not configured.
func (*PlatformAgent) SubmitReport ¶
func (a *PlatformAgent) SubmitReport(report *ctis.Report, opts ...pipeline.SubmitOption) (string, error)
SubmitReport queues a report for async upload via the pipeline. Returns immediately after queueing. Use PipelineStats() (or Pipeline().GetStats()) to monitor progress. Returns an error if the pipeline is not configured.
type PlatformClient ¶
type PlatformClient struct {
// contains filtered or unexported fields
}
PlatformClient provides a unified interface for platform agent operations. It implements LeaseClient and JobClient interfaces.
func NewPlatformClient ¶
func NewPlatformClient(config *ClientConfig) *PlatformClient
NewPlatformClient creates a new PlatformClient.
func (*PlatformClient) AcknowledgeJob ¶
func (c *PlatformClient) AcknowledgeJob(ctx context.Context, jobID string) error
AcknowledgeJob implements JobClient.
func (*PlatformClient) Poll ¶
func (c *PlatformClient) Poll(ctx context.Context, req *PollRequest) (*PollResponse, error)
Poll implements JobClient.
func (*PlatformClient) ReleaseLease ¶
func (c *PlatformClient) ReleaseLease(ctx context.Context) error
ReleaseLease implements LeaseClient.
func (*PlatformClient) RenewLease ¶
func (c *PlatformClient) RenewLease(ctx context.Context, req *LeaseRenewRequest) (*LeaseRenewResponse, error)
RenewLease implements LeaseClient.
func (*PlatformClient) ReportJobProgress ¶
func (c *PlatformClient) ReportJobProgress(ctx context.Context, jobID string, progress int, message string) error
ReportJobProgress implements JobClient.
func (*PlatformClient) ReportJobResult ¶
func (c *PlatformClient) ReportJobResult(ctx context.Context, result *JobResult) error
ReportJobResult implements JobClient.
type PollRequest ¶
type PollRequest struct {
MaxJobs int `json:"max_jobs"`
Capabilities []string `json:"capabilities,omitempty"`
TimeoutSeconds int `json:"timeout_seconds,omitempty"`
}
PollRequest contains the data for polling jobs.
type PollResponse ¶
type PollResponse struct {
Jobs []*JobInfo `json:"jobs"`
PollIntervalHint int `json:"poll_interval_hint,omitempty"` // Suggested wait before next poll
QueueDepth int `json:"queue_depth,omitempty"` // Total pending jobs
}
PollResponse contains the response from job polling.
type PollerConfig ¶
type PollerConfig struct {
// MaxConcurrentJobs is the maximum number of concurrent jobs.
MaxConcurrentJobs int
// PollTimeout is the long-poll timeout (how long to wait for jobs).
PollTimeout time.Duration
// RetryDelay is the delay between poll attempts on error.
RetryDelay time.Duration
// Capabilities to advertise when polling.
Capabilities []string
// OnJobStarted is called when a job starts executing.
OnJobStarted func(job *JobInfo)
// OnJobCompleted is called when a job completes (success or failure).
OnJobCompleted func(job *JobInfo, result *JobResult)
// Verbose enables debug logging.
Verbose bool
// AllowedJobTypes restricts which job types can be executed.
// If empty, all job types are allowed.
AllowedJobTypes []string
// MaxPayloadSize limits the maximum payload size for jobs (default: 10MB).
MaxPayloadSize int
// RequireAuthToken requires jobs to have a valid auth token.
// When true, jobs without AuthToken will be rejected.
RequireAuthToken bool
// ValidateTokenClaims enables JWT claims validation.
// When true, the AuthToken's tenant_id claim must match job's TenantID.
ValidateTokenClaims bool
}
PollerConfig configures the JobPoller.
type RegistrationRequest ¶
type RegistrationRequest struct {
// Name is the agent's display name.
Name string `json:"name"`
// Capabilities are the scanner/collector capabilities (e.g., "sast", "sca", "dast").
Capabilities []string `json:"capabilities"`
// Tools are the specific tools available (e.g., "semgrep", "trivy", "nuclei").
Tools []string `json:"tools,omitempty"`
// Region is the deployment region (e.g., "us-east-1", "ap-southeast-1").
Region string `json:"region,omitempty"`
// Labels are optional key-value labels for filtering.
Labels map[string]string `json:"labels,omitempty"`
// MaxConcurrentJobs is the maximum number of concurrent jobs.
MaxConcurrentJobs int `json:"max_concurrent_jobs,omitempty"`
}
RegistrationRequest contains the data for registering a new platform agent.
type RegistrationResponse ¶
type RegistrationResponse struct {
AgentID string `json:"agent_id"`
APIKey string `json:"api_key"` // Only returned once - store securely!
APIPrefix string `json:"api_prefix"` // Prefix for display/logging (safe to log)
Message string `json:"message,omitempty"`
}
RegistrationResponse contains the response from agent registration.
type ResourceController ¶
type ResourceController interface {
// AcquireSlot attempts to acquire a job execution slot.
// Returns true if acquired, false if throttled or at capacity.
AcquireSlot(ctx context.Context) bool
// ReleaseSlot releases a previously acquired slot.
ReleaseSlot()
// IsThrottled returns true if resource limits are exceeded.
IsThrottled() bool
}
ResourceController is an optional interface for resource-based job throttling. If set on JobPoller, jobs will only be accepted when resources are available.
type SimpleMetricsCollector ¶
type SimpleMetricsCollector struct{}
SimpleMetricsCollector provides basic system metrics collection.
func (*SimpleMetricsCollector) Collect ¶
func (c *SimpleMetricsCollector) Collect() (*SystemMetrics, error)
Collect collects system metrics. This is a simplified implementation - production should use proper system calls.
type SystemMetrics ¶
type SystemMetrics struct {
CPUPercent float64 `json:"cpu_percent"`
MemoryPercent float64 `json:"memory_percent"`
DiskPercent float64 `json:"disk_percent"`
CurrentJobs int `json:"current_jobs"`
MaxJobs int `json:"max_jobs"`
}
SystemMetrics contains agent system metrics for health reporting.
type WorkflowContext ¶
type WorkflowContext struct {
// WorkflowID is the UUID of the workflow definition.
WorkflowID string `json:"workflow_id"`
// WorkflowRunID is the UUID of the specific workflow execution.
WorkflowRunID string `json:"workflow_run_id"`
// TriggerType indicates what triggered the workflow (e.g., "finding_created", "schedule", "manual").
TriggerType string `json:"trigger_type"`
// ActionNodeID is the UUID of the action node that triggered this job.
ActionNodeID string `json:"action_node_id"`
// ActionNodeKey is the node_key of the action node (e.g., "run_scan_1").
ActionNodeKey string `json:"action_node_key"`
}
WorkflowContext contains context information when a job is triggered by the Workflow Executor as part of an automation workflow.