workload

package
v1.5.7 Latest
Warning

This package is not in the latest version of its module.
Published: Jun 18, 2026 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Overview

Package workload models a deployable workload as a top-level resource that owns N replica Instances. It maps to Kubernetes' Deployment in role: a Workload carries the spec (image, env, resources, ReplicaCount) and ctrlplane orchestrates per-replica container/pod creation through the provider layer.

Entity relationships:

Workload 1:N Instance     (replicas; cascade delete)
Workload 1:N Release      (version history of the spec)
Workload 1:N Deployment   (rollout event log)
Release  1:N Deployment   (a release can be re-rolled out)

Replica orchestration is a ctrlplane concern. Providers stay per-Pod (one Provision call = one container/pod). When a Workload scales from N=1 to N=3, the Workload service calls instance.Service.Create twice, once per added replica, rather than asking the provider to "make 3 of these"; the docker provider ends up running 3 independent containers, and the kubernetes provider 3 independent Pods. This keeps the provider interface trivial and makes rollout strategies (rolling/blue-green/canary) uniform across providers.
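
The per-replica fan-out described above can be sketched as a loop that issues one create call per missing replica. This is only an illustration under stated assumptions: instanceCreator, countingCreator, scaleUp and createdIndexes are hypothetical names, and the Create signature is simplified rather than taken from instance.Service.

```go
package main

import (
	"context"
	"fmt"
)

// instanceCreator is a hypothetical stand-in for the single method of
// instance.Service that this sketch needs.
type instanceCreator interface {
	Create(ctx context.Context, workloadID string, replicaIndex int) error
}

// countingCreator records the replica indexes it was asked to create.
type countingCreator struct{ created []int }

func (c *countingCreator) Create(_ context.Context, _ string, replicaIndex int) error {
	c.created = append(c.created, replicaIndex)
	return nil
}

// scaleUp issues one Create call per replica that does not exist yet.
// The provider is never asked to "make N of these".
func scaleUp(ctx context.Context, svc instanceCreator, workloadID string, current, desired int) error {
	for i := current; i < desired; i++ {
		if err := svc.Create(ctx, workloadID, i); err != nil {
			return fmt.Errorf("create replica %d: %w", i, err)
		}
	}
	return nil
}

// createdIndexes runs scaleUp against a counting fake and returns the
// replica indexes that were created.
func createdIndexes(current, desired int) []int {
	c := &countingCreator{}
	if err := scaleUp(context.Background(), c, "wl-1", current, desired); err != nil {
		panic(err)
	}
	return c.created
}

func main() {
	fmt.Println(createdIndexes(1, 3)) // [1 2]
}
```

Growing from one replica to three issues two calls; creating a three-replica workload from scratch (current = 0) would issue three.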

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewService

func NewService(store Store, instances instance.Service, deploys deploy.Service, templates template.Service, healthSvc health.Service, metricsSvc metrics.Service, networkSvc network.Service, events event.Bus, authProvider auth.Provider) *service

NewService wires the workload service. The instance + deploy + health services are mandatory; templates, metrics + network are optional (passing nil yields workload-level read methods that return empty results for those subsystems, and FromTemplateID is rejected). The composition layer is expected to call templates.SetWorkloadReader(NewSpecReader(...)) so the template service can fork from existing workloads.

func ValidateTransition

func ValidateTransition(from, to State) error

ValidateTransition reports nil when (from → to) is allowed, or wraps ctrlplane.ErrInvalidState with detail.
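
A caller would typically match the wrapped sentinel with errors.Is. The sketch below shows that pattern with local stand-ins: the transition table is hypothetical (the package does not document which edges it allows), and ErrInvalidState here is a local substitute for ctrlplane.ErrInvalidState.

```go
package main

import (
	"errors"
	"fmt"
)

type State string

const (
	StateProvisioning State = "provisioning"
	StateActive       State = "active"
	StatePaused       State = "paused"
)

// ErrInvalidState is a local stand-in for ctrlplane.ErrInvalidState.
var ErrInvalidState = errors.New("invalid state")

// allowed is a hypothetical transition table; the real package may
// permit a different set of edges.
var allowed = map[State][]State{
	StateProvisioning: {StateActive},
	StateActive:       {StatePaused},
	StatePaused:       {StateActive},
}

// validateTransition returns nil for a permitted edge and otherwise
// wraps ErrInvalidState so callers can match it with errors.Is.
func validateTransition(from, to State) error {
	for _, next := range allowed[from] {
		if next == to {
			return nil
		}
	}
	return fmt.Errorf("%w: %s -> %s", ErrInvalidState, from, to)
}

// isInvalidState reports whether err wraps ErrInvalidState.
func isInvalidState(err error) bool { return errors.Is(err, ErrInvalidState) }

func main() {
	fmt.Println(validateTransition(StateActive, StatePaused))                       // <nil>
	fmt.Println(isInvalidState(validateTransition(StatePaused, StateProvisioning))) // true
}
```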

Types

type AggregatedSample

type AggregatedSample struct {
	At                    time.Time `json:"at"`
	ReplicaCount          int       `json:"replica_count"`
	CPUPercent            float64   `json:"cpu_percent"`
	MemoryUsedMB          int       `json:"memory_used_mb"`
	MemoryLimitMB         int       `json:"memory_limit_mb"`
	NetworkInBytesPerSec  float64   `json:"network_in_bytes_per_sec"`
	NetworkOutBytesPerSec float64   `json:"network_out_bytes_per_sec"`
	RequestsPerSec        float64   `json:"requests_per_sec,omitempty"`
	LatencyP95Ms          float64   `json:"latency_p95_ms,omitempty"`
}

AggregatedSample is a per-bucket roll-up across replicas. It is shaped like a single metrics.Sample so dashboards can render it the same way as per-instance series.

type CreateRequest

type CreateRequest struct {
	Name           string                 `json:"name"                      validate:"required"`
	DatacenterID   id.ID                  `json:"datacenter_id,omitzero"`
	Region         string                 `json:"region,omitempty"`
	ProviderName   string                 `json:"provider_name,omitempty"`
	FromTemplateID id.ID                  `json:"from_template_id,omitzero"`
	Kind           provider.WorkloadKind  `json:"kind,omitempty"`
	Services       []provider.ServiceSpec `json:"services,omitempty"`
	Labels         map[string]string      `json:"labels,omitempty"`

	// Replicas is the desired replica count. Defaults to 1 when zero
	// or negative.
	Replicas int `json:"replicas,omitempty"`
}

CreateRequest holds the parameters for creating a Workload. The service runs Workload.Create followed by N Instance.Create calls (one per replica) so the caller's experience is single-shot: "give me a workload running 3 replicas of [main + sidecar]" → one API call.

FromTemplateID, when non-zero, instructs the service to read the referenced template and use its fields as defaults; any field also non-zero on this request overrides the template's value.
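
The override rule can be pictured as "start from the template, let non-zero request fields win". The sketch below assumes a hypothetical spec type with three fields; which fields a real template carries is not stated in this documentation.

```go
package main

import "fmt"

// spec is a hypothetical subset of fields shared by a template and a
// CreateRequest.
type spec struct {
	Region       string
	ProviderName string
	Replicas     int
}

// applyTemplate starts from the template's values and lets every
// non-zero field on the request override them.
func applyTemplate(tpl, req spec) spec {
	out := tpl
	if req.Region != "" {
		out.Region = req.Region
	}
	if req.ProviderName != "" {
		out.ProviderName = req.ProviderName
	}
	if req.Replicas != 0 {
		out.Replicas = req.Replicas
	}
	return out
}

func main() {
	tpl := spec{Region: "eu-west", ProviderName: "docker", Replicas: 2}
	req := spec{Region: "us-east"}
	fmt.Printf("%+v\n", applyTemplate(tpl, req))
	// {Region:us-east ProviderName:docker Replicas:2}
}
```

One consequence of zero-value detection is that a request cannot use this mechanism to reset a template field back to its zero value.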

type DeployRequest

type DeployRequest struct {
	Services []provider.ServiceDeploySpec `json:"services"           validate:"required,min=1"`
	Strategy string                       `json:"strategy,omitempty"` // "rolling" (default), "recreate", "blue_green", "canary"
	Notes    string                       `json:"notes,omitempty"`
}

DeployRequest kicks off a new release rollout. Services lists only the services being changed in this rollout — services not listed inherit their snapshot from the prior Release.
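
The inheritance rule can be sketched as an overlay keyed by service name. serviceSnapshot and nextRelease are hypothetical names, and matching by name is an assumption; the actual fields of provider.ServiceDeploySpec are not shown on this page.

```go
package main

import "fmt"

// serviceSnapshot is a hypothetical reduction of a per-service release
// snapshot to a name and an image.
type serviceSnapshot struct {
	Name  string
	Image string
}

// nextRelease copies the prior release's services and overlays the ones
// listed in the deploy request, matched by name. Services the request
// does not mention keep their prior snapshot. Services that are new in
// the request are ignored in this sketch.
func nextRelease(prior, changed []serviceSnapshot) []serviceSnapshot {
	byName := make(map[string]serviceSnapshot, len(changed))
	for _, s := range changed {
		byName[s.Name] = s
	}
	out := make([]serviceSnapshot, 0, len(prior))
	for _, s := range prior {
		if c, ok := byName[s.Name]; ok {
			s = c
		}
		out = append(out, s)
	}
	return out
}

func main() {
	prior := []serviceSnapshot{{"web", "app:v1"}, {"proxy", "envoy:v1"}}
	changed := []serviceSnapshot{{"web", "app:v2"}}
	fmt.Println(nextRelease(prior, changed)) // [{web app:v2} {proxy envoy:v1}]
}
```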

type HealthEvent

type HealthEvent struct {
	WorkloadID   id.ID `json:"workload_id"`
	InstanceID   id.ID `json:"instance_id"`
	ReplicaIndex int   `json:"replica_index"`
	Result       any   `json:"result"` // *health.HealthResult — kept opaque to avoid an import cycle
}

HealthEvent is one HealthResult tagged with the replica metadata the consumer needs to render per-replica state in a UI.

type ListOptions

type ListOptions struct {
	State        State  `json:"state,omitempty"`
	ProviderName string `json:"provider_name,omitempty"`
	Region       string `json:"region,omitempty"`
	Limit        int    `json:"limit,omitempty"`
}

ListOptions filters the Workload list endpoint.

type ListResult

type ListResult struct {
	Items []*Workload `json:"items"`
	Total int         `json:"total"`
}

ListResult holds a page of Workloads.

type LogEvent

type LogEvent struct {
	WorkloadID   id.ID  `json:"workload_id"`
	InstanceID   id.ID  `json:"instance_id"`
	ReplicaIndex int    `json:"replica_index"`
	Line         []byte `json:"line"`
}

LogEvent wraps one line of log output with the replica metadata. The Line field is the raw demuxed JSON object emitted by the docker provider (or whatever the underlying provider produces), kept as bytes so the consumer can stream it through without an extra unmarshal.

type LogsOptions

type LogsOptions struct {
	Follow bool      `json:"follow"`
	Since  time.Time `json:"since,omitzero"`
	Tail   int       `json:"tail,omitempty"`
}

LogsOptions mirrors instance.LogsOptions on the workload service. Defined here to avoid the workload package importing the instance LogsOptions struct directly (cycle prevention).

type MetricsEvent

type MetricsEvent struct {
	WorkloadID   id.ID `json:"workload_id"`
	InstanceID   id.ID `json:"instance_id"`
	ReplicaIndex int   `json:"replica_index"`
	Sample       any   `json:"sample"` // metrics.Sample
}

MetricsEvent is one metric sample tagged with replica metadata. Sample is kept opaque (any) so this package doesn't import the metrics package directly — the upstream metrics.Sample type is what gets attached at runtime.

type MetricsRange

type MetricsRange struct {
	Since      time.Time     `json:"since"`
	Until      time.Time     `json:"until"`
	Resolution time.Duration `json:"resolution,omitempty"`
}

MetricsRange mirrors metrics.RangeQuery without the import.

type MetricsSeries

type MetricsSeries []AggregatedSample

MetricsSeries is the aggregated workload-level series. Each AggregatedSample sums replica CPU/Memory/Network and averages latency-style fields across replicas that had a sample in the bucket.
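
As a rough illustration of the roll-up, the sketch below sums CPU and memory and averages P95 latency over the replicas that reported in one bucket. The sample and aggregated types are hypothetical reductions, and treating ReplicaCount as "replicas that reported in this bucket" is an assumption.

```go
package main

import "fmt"

// sample is a hypothetical per-replica reading for one time bucket.
type sample struct {
	CPUPercent   float64
	MemoryUsedMB int
	LatencyP95Ms float64
}

// aggregated mirrors a few AggregatedSample fields.
type aggregated struct {
	ReplicaCount int
	CPUPercent   float64
	MemoryUsedMB int
	LatencyP95Ms float64
}

// aggregate sums CPU and memory and averages latency across the
// replicas that reported in the bucket. An empty bucket yields the
// zero value.
func aggregate(bucket []sample) aggregated {
	var out aggregated
	if len(bucket) == 0 {
		return out
	}
	for _, s := range bucket {
		out.CPUPercent += s.CPUPercent
		out.MemoryUsedMB += s.MemoryUsedMB
		out.LatencyP95Ms += s.LatencyP95Ms
	}
	out.ReplicaCount = len(bucket)
	out.LatencyP95Ms /= float64(len(bucket))
	return out
}

func main() {
	bucket := []sample{
		{CPUPercent: 10.5, MemoryUsedMB: 256, LatencyP95Ms: 100},
		{CPUPercent: 20.5, MemoryUsedMB: 512, LatencyP95Ms: 200},
	}
	fmt.Printf("%+v\n", aggregate(bucket))
	// {ReplicaCount:2 CPUPercent:31 MemoryUsedMB:768 LatencyP95Ms:150}
}
```

Averaging per-replica P95 values is a display convenience rather than a true workload-wide percentile, which would need the underlying distributions.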

type Service

type Service interface {
	// Create allocates a Workload and provisions Replicas Instances
	// in one shot. Returns the Workload; List the Workload's
	// instances separately if you need them.
	Create(ctx context.Context, req CreateRequest) (*Workload, error)

	Get(ctx context.Context, workloadID id.ID) (*Workload, error)
	GetBySlug(ctx context.Context, slug string) (*Workload, error)
	List(ctx context.Context, opts ListOptions) (*ListResult, error)

	// Update mutates the Workload spec (image, env, resources,
	// labels). For changes that need a deploy (image swap),
	// callers should use Deploy instead — Update sets the spec but
	// doesn't push it to running replicas.
	Update(ctx context.Context, workloadID id.ID, req UpdateRequest) (*Workload, error)

	// Scale adjusts the replica count. Adds new Instances when
	// growing, deprovisions trailing Instances when shrinking. Does
	// not touch existing replicas' images or env.
	Scale(ctx context.Context, workloadID id.ID, replicas int) (*Workload, error)

	// Pause scales to zero while retaining the spec. Resume scales
	// back to the previously-set replica count.
	Pause(ctx context.Context, workloadID id.ID) error
	Resume(ctx context.Context, workloadID id.ID) error

	// Restart performs an in-place restart of every replica. Each
	// replica's container is stopped and started again by the
	// underlying provider (docker ContainerRestart, k8s Pod
	// restart, etc.) — no deprovision, no replica-count change,
	// no new container IDs. The Workload state stays Active
	// throughout.
	Restart(ctx context.Context, workloadID id.ID) error

	// Deploy creates a new Release from the Workload's current spec
	// (or from req overrides) and rolls it out to all replicas via
	// the chosen strategy. Returns the Deployment record so callers
	// can poll its state.
	Deploy(ctx context.Context, workloadID id.ID, req DeployRequest) (*deploy.Deployment, error)

	// Delete tears down the Workload and all its replicas.
	Delete(ctx context.Context, workloadID id.ID) error

	// ListInstances returns every Instance owned by the Workload,
	// ordered by ReplicaIndex.
	ListInstances(ctx context.Context, workloadID id.ID) ([]*instance.Instance, error)

	// ListDeployments returns every Deployment whose target instance
	// is one of the workload's replicas. Each rollout is represented
	// once (deployments are inherently per-replica today; if a
	// future Deploy strategy emits a single workload-level record,
	// this aggregator collapses to a passthrough).
	ListDeployments(ctx context.Context, workloadID id.ID, opts deploy.ListOptions) (*deploy.DeployListResult, error)

	// ListReleases returns the union of releases across the
	// workload's replicas, deduplicated by Release.ID. Sorted by
	// CreatedAt descending so the most recent release is first.
	ListReleases(ctx context.Context, workloadID id.ID, opts deploy.ListOptions) (*deploy.ReleaseListResult, error)

	// ListDomains returns every Domain currently bound to any of
	// the workload's replicas, deduplicated by Domain.ID.
	ListDomains(ctx context.Context, workloadID id.ID) ([]network.Domain, error)

	// ListRoutes returns every Route currently bound to any of the
	// workload's replicas, deduplicated by Route.ID.
	ListRoutes(ctx context.Context, workloadID id.ID) ([]network.Route, error)

	// WatchHealth returns a channel that fans in HealthResults from
	// every replica in the workload. Each event carries the source
	// instance ID + replica index. The fan-in re-lists replicas
	// every 30s so a Scale that adds new replicas mid-stream picks
	// them up automatically. Channel is closed when ctx is cancelled.
	WatchHealth(ctx context.Context, workloadID id.ID) (<-chan *HealthEvent, error)

	// StreamLogs returns a channel of log events fanned in across
	// every replica's stdout/stderr. Each event carries the source
	// instance ID + replica index alongside the original log line.
	// Closed when ctx is cancelled or every replica's underlying
	// log stream has terminated.
	StreamLogs(ctx context.Context, workloadID id.ID, opts LogsOptions) (<-chan *LogEvent, error)

	// GetHealth returns the worst-of-replicas health status. Useful
	// for the workspace dashboard badge — aggregates per-replica
	// InstanceHealth and takes the most severe state. Replica counts
	// (healthy/degraded/unhealthy/unknown) are returned alongside so
	// the badge can show "2/3 healthy" detail.
	GetHealth(ctx context.Context, workloadID id.ID) (*WorkloadHealth, error)

	// RangeMetrics returns aggregated resource metrics summed across
	// the workload's replicas at each bucket. CPU/Memory/Network are
	// summed; LatencyP95 and RequestsPerSec are averaged (a per-
	// replica P95 isn't meaningfully summable). Empty when no
	// replicas have any samples in the window.
	RangeMetrics(ctx context.Context, workloadID id.ID, q MetricsRange) (MetricsSeries, error)

	// WatchMetrics emits one MetricsEvent per replica per sample.
	// Aggregating into a workload total is the consumer's job — the
	// dashboard wants per-replica spark lines next to the aggregate.
	WatchMetrics(ctx context.Context, workloadID id.ID) (<-chan *MetricsEvent, error)
}

Service is the public interface for managing Workloads. The service orchestrates per-replica Instance lifecycle through instance.Service — callers should not call instance.Service.Create directly except for one-off debugging needs.
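
The fan-in behaviour documented for WatchHealth and StreamLogs (one output channel, events tagged with the replica index, closed on cancellation or when every source ends) can be sketched as follows. This is a generic channel fan-in, not the package's code; event, fanIn and countPerReplica are hypothetical names, and the periodic re-listing of replicas is left out.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// event is a hypothetical reduction of HealthEvent / LogEvent.
type event struct {
	ReplicaIndex int
	Payload      string
}

// fanIn merges one channel per replica into a single tagged stream.
// The output closes once every source has closed or ctx is cancelled.
func fanIn(ctx context.Context, sources []<-chan string) <-chan event {
	out := make(chan event)
	var wg sync.WaitGroup
	for i, src := range sources {
		wg.Add(1)
		go func(idx int, src <-chan string) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case p, ok := <-src:
					if !ok {
						return
					}
					select {
					case out <- event{ReplicaIndex: idx, Payload: p}:
					case <-ctx.Done():
						return
					}
				}
			}
		}(i, src)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// countPerReplica feeds fixed lines through fanIn and counts the events
// received per replica index.
func countPerReplica(lines [][]string) map[int]int {
	sources := make([]<-chan string, 0, len(lines))
	for _, replicaLines := range lines {
		ch := make(chan string, len(replicaLines))
		for _, l := range replicaLines {
			ch <- l
		}
		close(ch)
		sources = append(sources, ch)
	}
	counts := make(map[int]int)
	for ev := range fanIn(context.Background(), sources) {
		counts[ev.ReplicaIndex]++
	}
	return counts
}

func main() {
	fmt.Println(countPerReplica([][]string{{"a", "b"}, {"c"}})) // map[0:2 1:1]
}
```

Ordering across replicas is not guaranteed in a fan-in like this; only the order within a single replica's stream is preserved.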

type SpecReader

type SpecReader struct {
	// contains filtered or unexported fields
}

SpecReader adapts a workload service onto template.WorkloadSpecReader so the template service can fork a Template from an existing Workload's spec without importing the workload package directly (which would create an import cycle).

Wire it at composition time:

wlSvc := workload.NewService(...)
tplSvc := template.NewService(store, events)
tplSvc.SetWorkloadReader(workload.NewSpecReader(wlSvc))

func NewSpecReader

func NewSpecReader(source workloadGetter) *SpecReader

NewSpecReader returns a reader backed by the given workload service.

func (*SpecReader) ReadWorkloadSpec

func (r *SpecReader) ReadWorkloadSpec(ctx context.Context, tenantID string, workloadID id.ID) (*template.WorkloadSpec, error)

ReadWorkloadSpec implements template.WorkloadSpecReader. It loads the workload via the embedded service and projects its blueprint-relevant fields onto a template.WorkloadSpec.

type State

type State string

State is the lifecycle of a Workload as an aggregate.

const (
	// StateProvisioning — initial replicas being created.
	StateProvisioning State = "provisioning"

	// StateActive — desired replica count met and all replicas Running.
	StateActive State = "active"

	// StateScaling — a Scale call is mid-flight.
	StateScaling State = "scaling"

	// StateDeploying — a Deploy is mid-rollout.
	StateDeploying State = "deploying"

	// StatePaused — explicitly scaled to zero. Spec retained.
	StatePaused State = "paused"

	// StateFailed — non-recoverable error. Manual intervention required.
	StateFailed State = "failed"

	StateDestroying State = "destroying"
	StateDestroyed  State = "destroyed"
)

type Store

type Store interface {
	InsertWorkload(ctx context.Context, w *Workload) error
	GetWorkloadByID(ctx context.Context, tenantID string, workloadID id.ID) (*Workload, error)
	GetWorkloadBySlug(ctx context.Context, tenantID, slug string) (*Workload, error)

	// ListWorkloads returns workloads visible to tenantID. Empty
	// tenantID is the cross-tenant convention used by admin views
	// (matches the instance + deploy stores).
	ListWorkloads(ctx context.Context, tenantID string, opts ListOptions) (*ListResult, error)

	UpdateWorkload(ctx context.Context, w *Workload) error
	DeleteWorkload(ctx context.Context, tenantID string, workloadID id.ID) error
}

Store is the persistence interface for Workload entities. Method names are suffixed with `Workload` so a single concrete store type can implement Workload + Instance + Deploy + Network etc. without method-name collisions on `Insert`/`Get`/`List`/`Update`. (Instance.Store has dibs on the bare names.)
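
The naming convention can be illustrated with a single in-memory type that satisfies two store interfaces at once. Both interfaces below are hypothetical, heavily simplified stand-ins (the real ones take a context and entity pointers); the point is only that a suffixed method and a bare method can coexist on one type.

```go
package main

import "fmt"

// workloadStore is a simplified stand-in for workload.Store.
type workloadStore interface {
	InsertWorkload(name string) error
}

// instanceStore is a simplified stand-in for a store that uses the
// bare method names.
type instanceStore interface {
	Insert(name string) error
}

// memStore implements both interfaces without a name collision.
type memStore struct {
	workloads []string
	instances []string
}

func (m *memStore) InsertWorkload(name string) error {
	m.workloads = append(m.workloads, name)
	return nil
}

func (m *memStore) Insert(name string) error {
	m.instances = append(m.instances, name)
	return nil
}

// Compile-time checks that memStore satisfies both interfaces.
var (
	_ workloadStore = (*memStore)(nil)
	_ instanceStore = (*memStore)(nil)
)

func main() {
	s := &memStore{}
	_ = s.InsertWorkload("api")
	_ = s.Insert("api-0")
	fmt.Println(len(s.workloads), len(s.instances)) // 1 1
}
```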

type UpdateRequest

type UpdateRequest struct {
	Name     *string                `json:"name,omitempty"`
	Services []provider.ServiceSpec `json:"services,omitempty"`
	Labels   map[string]string      `json:"labels,omitempty"`
}

UpdateRequest mutates a Workload's spec. Replicas are changed through Scale, not here. Kind is immutable after creation; changing it requires recreating the Workload rather than calling Update.

type Workload

type Workload struct {
	ctrlplane.Entity

	TenantID     string `db:"tenant_id"     json:"tenant_id"`
	Name         string `db:"name"          json:"name"`
	Slug         string `db:"slug"          json:"slug"`
	DatacenterID id.ID  `db:"datacenter_id" json:"datacenter_id,omitzero"`
	ProviderName string `db:"provider_name" json:"provider_name"`
	Region       string `db:"region"        json:"region"`

	// Kind selects the runtime topology — KindDeployment for stateless
	// replicas (default), KindStatefulSet for stable per-replica
	// identity + storage. Cannot change after creation; mutating this
	// field through Update is rejected.
	Kind provider.WorkloadKind `db:"kind" json:"kind"`

	// Services is the per-service spec applied to every replica. One
	// Workload spawns N replicas; each replica spawns every service in
	// this slice as a co-scheduled container/task.
	Services []provider.ServiceSpec `db:"services" json:"services"`

	// Labels is workload-level metadata applied to every container in
	// every replica. ctrlplane reserves keys prefixed `ctrlplane.` for
	// internal discovery (e.g. `ctrlplane.workload=<id>`).
	Labels map[string]string `db:"labels" json:"labels,omitempty"`

	// CurrentReleaseID points at the Release whose snapshot is in effect.
	// Bumped by Deploy. Empty until the first Deploy succeeds.
	CurrentReleaseID id.ID `db:"current_release_id" json:"current_release_id,omitzero"`

	// ReplicaCount is the desired number of running replicas.
	ReplicaCount int `db:"replica_count" json:"replica_count"`

	// State is the lifecycle of the Workload as an aggregate.
	State State `db:"state" json:"state"`

	PausedAt *time.Time `db:"paused_at" json:"paused_at,omitempty"`

	// PreviousReplicas remembers the desired replica count just
	// before the most recent Pause. Resume reads this so a 3-replica
	// workload paused-and-resumed comes back as 3 replicas, not 1.
	PreviousReplicas int `db:"previous_replicas" json:"previous_replicas,omitempty"`

	// TemplateID records the template (if any) the Workload was forked
	// from. Empty for workloads created from raw fields.
	TemplateID id.ID `db:"template_id" json:"template_id,omitzero"`
}

Workload is the top-level deployable unit. It owns N Instance replicas (the actual running co-scheduling units — k8s Pods, Nomad allocations, Docker Compose projects). Workload itself never runs anything; provider interactions happen via Instance lifecycle.

Multi-service: Services is the per-service spec slice. Every replica runs every service co-scheduled together. Workload-level fields (ReplicaCount, Kind, Labels, State) apply across all services.
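
The interplay between ReplicaCount, PreviousReplicas and State across a Pause/Resume cycle can be sketched as below. The workload type is a hypothetical reduction of Workload, the functions are not the package's implementation, and the fallback to 1 replica when nothing was remembered is an assumption.

```go
package main

import "fmt"

// workload is a hypothetical reduction of Workload to the fields that
// Pause and Resume touch.
type workload struct {
	ReplicaCount     int
	PreviousReplicas int
	State            string
}

// pause remembers the desired replica count, then scales to zero.
func pause(w *workload) {
	w.PreviousReplicas = w.ReplicaCount
	w.ReplicaCount = 0
	w.State = "paused"
}

// resume restores the remembered replica count. Falling back to 1 when
// nothing was remembered is an assumption of this sketch.
func resume(w *workload) {
	w.ReplicaCount = w.PreviousReplicas
	if w.ReplicaCount <= 0 {
		w.ReplicaCount = 1
	}
	w.State = "active"
}

func main() {
	w := &workload{ReplicaCount: 3, State: "active"}
	pause(w)
	fmt.Println(w.ReplicaCount, w.PreviousReplicas, w.State) // 0 3 paused
	resume(w)
	fmt.Println(w.ReplicaCount, w.State) // 3 active
}
```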

func NewWorkload

func NewWorkload() *Workload

NewWorkload allocates a Workload with a fresh ID and timestamps. Defaults to StateProvisioning + KindDeployment so callers don't have to remember to set them.

func (*Workload) MainService

func (w *Workload) MainService() *provider.ServiceSpec

MainService returns the workload's primary (RoleMain) service, or nil when no Main is configured. Used by callers that need a single representative service — health, default network endpoint, "the workload's image" displays.
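
Because MainService can return nil, callers need a nil check before dereferencing. The lookup itself might resemble the sketch below; serviceSpec, its Role field and the "main" value are hypothetical, since provider.ServiceSpec is not shown on this page.

```go
package main

import "fmt"

// serviceSpec is a hypothetical reduction of provider.ServiceSpec.
type serviceSpec struct {
	Name  string
	Role  string
	Image string
}

// mainService returns a pointer to the first service with the main
// role, or nil when none is configured.
func mainService(services []serviceSpec) *serviceSpec {
	for i := range services {
		if services[i].Role == "main" {
			return &services[i]
		}
	}
	return nil
}

func main() {
	services := []serviceSpec{
		{Name: "proxy", Role: "sidecar", Image: "envoy:v1"},
		{Name: "web", Role: "main", Image: "app:v2"},
	}
	if m := mainService(services); m != nil {
		fmt.Println(m.Image) // app:v2
	}
	fmt.Println(mainService(nil) == nil) // true
}
```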

type WorkloadHealth

type WorkloadHealth struct {
	WorkloadID    id.ID  `json:"workload_id"`
	Status        string `json:"status"` // healthy / degraded / unhealthy / unknown — uses health.Status string values
	ReplicaCount  int    `json:"replica_count"`
	HealthyCount  int    `json:"healthy_count"`
	DegradedCount int    `json:"degraded_count"`
	UnhealthyCnt  int    `json:"unhealthy_count"`
	UnknownCount  int    `json:"unknown_count"`
}

WorkloadHealth is the aggregate health view across a workload's replicas. Status is the worst-of-replicas — a single unhealthy replica is enough to flip the workload to "unhealthy". When no replicas have any health checks configured the whole workload reports "unknown".
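
A worst-of-replicas reduction can be sketched with a severity ranking. The status strings come from the field comment above; the ranking healthy < unknown < degraded < unhealthy, and in particular where "unknown" sits, is an assumption of this sketch, as are the summary type and summarize function.

```go
package main

import "fmt"

// severity ranks statuses from least to most severe. The position of
// "unknown" is an assumption of this sketch.
var severity = map[string]int{
	"healthy":   0,
	"unknown":   1,
	"degraded":  2,
	"unhealthy": 3,
}

// summary is a hypothetical reduction of WorkloadHealth.
type summary struct {
	Status                                string
	Healthy, Degraded, Unhealthy, Unknown int
}

// summarize counts replicas per status and reports the most severe one.
// With no replicas, or only unrecognised statuses, it reports "unknown".
func summarize(statuses []string) summary {
	out := summary{Status: "unknown"}
	worst := -1
	for _, s := range statuses {
		switch s {
		case "healthy":
			out.Healthy++
		case "degraded":
			out.Degraded++
		case "unhealthy":
			out.Unhealthy++
		default:
			s = "unknown"
			out.Unknown++
		}
		if severity[s] > worst {
			worst = severity[s]
			out.Status = s
		}
	}
	return out
}

func main() {
	got := summarize([]string{"healthy", "healthy", "unhealthy"})
	fmt.Println(got.Status, got.Healthy, got.Unhealthy) // unhealthy 2 1
	fmt.Println(summarize([]string{"unknown", "unknown"}).Status) // unknown
}
```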
