cloudcompute

v0.0.0-...-9d8e1fe

Warning: This package is not in the latest version of its module.
Published: Oct 14, 2025 License: MIT Imports: 15 Imported by: 0

README

Cloud Compute

Cloud Compute is a framework designed for large-scale computing, drawing significant inspiration from AWS Batch. It structures compute operations into the following key components:

  • Compute Environments – Define the execution environment for jobs, including resources and constraints.
  • Job Definitions – Specify job configurations such as container images, resource requirements, and execution parameters.
  • Job Queues – Manage the prioritization and scheduling of jobs across compute environments.
  • Jobs – The actual units of work submitted to the system for execution.

Core Principles

The key definitions and principles of Cloud Compute are listed below:

  • Plugins

    • Compute Plugins: The containerized code executed as jobs within the environment.

    Plugins and Execution Model

    Plugins are central to Cloud Compute. They function as externally developed software packages integrated into the framework. Key characteristics:

    • Input Handling – Plugins accept an input payload or environment variables defining execution parameters.
    • Execution Model – A plugin runs once its required inputs are available, as determined by the DAG structure.
    • Computational Scope – A plugin's functionality can range from simple tasks, such as generating random numbers, to complex models, such as physics-based watershed simulations.

    Plugin Principles

    • Compute Plugins are fully containerized and operate independently without awareness of other plugins.
    • Plugins can be written in any programming language.
    • Each Compute Plugin must define a Compute Manifest Schema specifying its input and output requirements. Details can be found in the Plugins documentation.
    • Plugins undergo an approval process before deployment, which varies by environment but addresses computational, licensing, Department of Defense (DoD) policy, and cybersecurity concerns.
    • Plugins communicate execution status and results via logging to STDOUT and STDERR or MQTT messaging.
    • Plugins can only execute on resources provisioned by Cloud Compute and cannot spawn external Compute Plugins.
    • Resource Access:
      • Compute Plugins can read input from any data source they are authorized to access.
      • Compute Plugins can write output to the data sources they are authorized to access.
    • Compute Plugins are responsible for identifying error conditions within the compute job, dumping debug information to a file store, and reporting the error.
  • Cloud Compute:

    • Manages job execution by:
      • Provisioning plugin resources
      • Pushing jobs to an internal queue for submission to a compute provider
    • Is responsible for scaling events horizontally across compute environments.

Events and Computational Flow

Cloud Compute introduces the concept of an Event, which represents a single run through the directed acyclic graph (DAG). The DAG defines the computational sequence for a given event. Key properties include:

  • Parallel Execution – Events can be executed in parallel, and nodes within the DAG can run concurrently subject to dependency constraints.
  • Manifests – A single plugin execution within an event is called a Manifest. An event consists of multiple manifests that can depend on one another.
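To make the dependency rule concrete, here is a minimal Go sketch that groups an event's manifests into "waves": every manifest in a wave depends only on manifests in earlier waves, so the members of one wave could run concurrently. The `manifest` type with string IDs is a hypothetical stand-in for ComputeManifest (which uses uuid.UUID), and the wave grouping illustrates the constraint only; it is not how Cloud Compute actually dispatches jobs.

```go
package main

import (
	"errors"
	"sort"
)

// manifest is a hypothetical, simplified stand-in for ComputeManifest:
// an ID plus the IDs of the manifests it depends on.
type manifest struct {
	ID   string
	Deps []string
}

// executionWaves places each manifest in a wave after all of its
// dependencies. Manifests in the same wave do not depend on each other.
func executionWaves(ms []manifest) ([][]string, error) {
	remaining := make(map[string]int, len(ms)) // unmet dependency count per manifest
	dependents := make(map[string][]string)    // dependency ID -> manifests waiting on it
	for _, m := range ms {
		remaining[m.ID] = len(m.Deps)
	}
	for _, m := range ms {
		for _, d := range m.Deps {
			if _, ok := remaining[d]; !ok {
				return nil, errors.New("unknown dependency: " + d)
			}
			dependents[d] = append(dependents[d], m.ID)
		}
	}
	var current []string
	for id, n := range remaining {
		if n == 0 {
			current = append(current, id)
		}
	}
	var waves [][]string
	placed := 0
	for len(current) > 0 {
		sort.Strings(current) // deterministic ordering within a wave
		waves = append(waves, current)
		placed += len(current)
		var next []string
		for _, id := range current {
			for _, dep := range dependents[id] {
				remaining[dep]--
				if remaining[dep] == 0 {
					next = append(next, dep)
				}
			}
		}
		current = next
	}
	if placed != len(ms) {
		return nil, errors.New("dependency cycle detected")
	}
	return waves, nil
}
```

A manifest that never reaches zero unmet dependencies is part of a cycle, which is why the count of placed manifests doubles as the cycle check.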

Implementations

Cloud Compute currently has two implementations:

  • AWS Batch Integration – Designed for scalable, cloud-based compute workloads.
  • Local Docker Compute – Primarily used for plugin development and testing.

Development Guidelines

  • Cloud Compute is licensed under the MIT License.
  • Core libraries are written in Go; however, compute plugins can be written in any language.
  • Plugins should avoid proprietary licensing; some licensing terms may prevent a plugin from being deployed within the cloud environment.
  • Plugins are subject to a vetting process.
  • Plugins cannot contain Personally Identifiable Information (PII), Personal Health Information (PHI), or Controlled Unclassified Information (CUI).
  • Cloud Compute Events (DAGs) can only be constructed from approved plugins.

License

MIT License

Software Development Kits

Each software development kit (SDK) provides the essential data structures and a handful of utility services that provide the consistency Cloud Compute requires.

Go: https://github.com/USACE/cc-go-sdk

Java: https://github.com/USACE/cc-java-sdk

Python: https://github.com/USACE/cc-python-sdk

DotNet: https://github.com/USACE/cc-dotnet-sdk

Documentation

Constants

const (
	ResourceTypeGpu             ResourceType = "GPU"
	ResourceTypeVcpu            ResourceType = "VCPU"
	ResourceTypeMemory          ResourceType = "MEMORY"
	ResourceTypeAttachedStorage ResourceType = "ATTACHEDSTORAGE"

	SUMMARY_COMPUTE string = "COMPUTE"
	SUMMARY_EVENT   string = "EVENT"
	//SUMMARY_MANIFEST string = "MANIFEST"
	SUMMARY_JOB string = "JOB"

	AWSBATCH    ComputeProviderType = 0
	LOCALDOCKER ComputeProviderType = 1
	KUBERNETES  ComputeProviderType = 2
)

Variables

This section is empty.

Functions

func TopologicalSort

func TopologicalSort[T comparable](digraph map[T][]T) ([]T, error)

Generic topological sort function. It supports any type that satisfies the 'comparable' constraint.
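A sketch of what a function with this signature could look like, using depth-first search with cycle detection. The edge direction is an assumption: here each map key lists the nodes it depends on, so dependencies appear before their dependents in the result. The package documentation does not state the direction, and this is not the package's implementation.

```go
package main

import "fmt"

// topologicalSort mirrors the shape of TopologicalSort.
// ASSUMPTION: digraph maps each node to the nodes it depends on.
func topologicalSort[T comparable](digraph map[T][]T) ([]T, error) {
	const (
		unvisited = iota
		inProgress
		done
	)
	state := make(map[T]int) // missing keys read as unvisited
	order := make([]T, 0, len(digraph))

	var visit func(n T) error
	visit = func(n T) error {
		switch state[n] {
		case inProgress:
			return fmt.Errorf("cycle detected at node %v", n)
		case done:
			return nil
		}
		state[n] = inProgress
		for _, dep := range digraph[n] {
			if err := visit(dep); err != nil {
				return err
			}
		}
		state[n] = done
		order = append(order, n) // post-order: every dependency is already in order
		return nil
	}

	// Map iteration order is random, so the result is one valid order, not a fixed one.
	for n := range digraph {
		if err := visit(n); err != nil {
			return nil, err
		}
	}
	return order, nil
}
```

Nodes that appear only as dependencies (never as keys) are still visited and included in the output.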

Types

type ArrayEventGenerator

type ArrayEventGenerator struct {
	// contains filtered or unexported fields
}

func NewArrayEventGenerator

func NewArrayEventGenerator(event Event, perEventLoopData []map[string]string, start int64, end int64) (*ArrayEventGenerator, error)

func (*ArrayEventGenerator) NextEvent

func (aeg *ArrayEventGenerator) NextEvent() (Event, bool, error)

type BatchEvent

type BatchEvent struct {
	EventIdentifier   string          `json:"eventIdentifier"`
	ManifestOverrides ComputeManifest `json:"manifest"`
}

type BatchEventGenerator

type BatchEventGenerator struct {
	// contains filtered or unexported fields
}

func NewBatchEventGenerator

func NewBatchEventGenerator(event Event, batch []BatchEvent) *BatchEventGenerator

func (*BatchEventGenerator) NextEvent

func (beg *BatchEventGenerator) NextEvent() (Event, bool, error)

@TODO how to handle error in set of events?

type CcJobStore

type CcJobStore interface {
	SaveJob(computeId uuid.UUID, payloadId uuid.UUID, event string, job *Job) error
}

type CcMessageQueue

type CcMessageQueue interface {
	SendMessage(channel string, message []byte) error
	Subscribe(channel string) (<-chan any, error) //amqp.Delivery
}
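Because CcMessageQueue is a small interface, an in-memory implementation can stand in for a real broker during tests. The sketch below is hypothetical: `memoryQueue`, its buffer size, and the fail-when-full policy are illustrative choices, and it delivers raw []byte values where the source comment hints that a real implementation may deliver amqp.Delivery values.

```go
package main

import (
	"errors"
	"sync"
)

// CcMessageQueue mirrors the interface documented above.
type CcMessageQueue interface {
	SendMessage(channel string, message []byte) error
	Subscribe(channel string) (<-chan any, error)
}

// memoryQueue is a hypothetical in-memory CcMessageQueue for tests.
// Each subscriber gets its own buffered Go channel.
type memoryQueue struct {
	mu     sync.Mutex
	subs   map[string][]chan any
	buffer int
}

func newMemoryQueue(buffer int) *memoryQueue {
	return &memoryQueue{subs: make(map[string][]chan any), buffer: buffer}
}

func (q *memoryQueue) Subscribe(channel string) (<-chan any, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	ch := make(chan any, q.buffer)
	q.subs[channel] = append(q.subs[channel], ch)
	return ch, nil
}

// SendMessage delivers a copy of message to every subscriber of channel.
// It returns an error instead of blocking when a subscriber's buffer is full.
func (q *memoryQueue) SendMessage(channel string, message []byte) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	for _, ch := range q.subs[channel] {
		msg := append([]byte(nil), message...) // subscribers never share the caller's slice
		select {
		case ch <- msg:
		default:
			return errors.New("subscriber buffer full on channel " + channel)
		}
	}
	return nil
}

var _ CcMessageQueue = (*memoryQueue)(nil) // compile-time interface check
```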

type CloudCompute

type CloudCompute struct {
	//Compute Identifier
	ID uuid.UUID `json:"id"`

	//User friendly Name for the compute
	Name string `json:"name"`

	//JobQueue to push the events to
	JobQueue string `json:"jobQueue"`

	//Event generator
	Events EventGenerator `json:"events"`

	//compute provider for the compute (typically AwsBatchProvider)
	ComputeProvider ComputeProvider `json:"computeProvider"`

	//Job store is for saving the set of jobs being sent to the compute environment to a
	//user defined storage location.  Typically something like a relational database
	JobStore CcJobStore

	//Function that will postprocess an event after it is generated by the event generator
	EventProcessor EventProcessor
}

CloudCompute is a compute submission for a single DAG over a set of events. The compute environment, Job Queue, and Job Definitions must exist before a CloudCompute can be initiated.

func (*CloudCompute) Cancel

func (cc *CloudCompute) Cancel(reason string) error

Cancels the jobs submitted to the compute environment.

func (*CloudCompute) Run

func (cc *CloudCompute) Run() error

Runs a Compute on the ComputeProvider, submitting jobs sequentially. Currently, an event number of -1 represents an invalid event received from the event generator; such events are skipped.

func (*CloudCompute) RunParallel

func (cc *CloudCompute) RunParallel(concurrency int) error

Runs a Compute on the ComputeProvider, submitting jobs in parallel up to the requested concurrency. Currently, an event number of -1 represents an invalid event received from the event generator; such events are skipped.

func (*CloudCompute) Status

func (cc *CloudCompute) Status(query JobsSummaryQuery) error

Requests the status of a given compute at the COMPUTE, EVENT, or JOB level. A JobSummaryFunction is required to process the status results.

type ComputeManifest

type ComputeManifest struct {
	ManifestName         string                `json:"manifest_name,omitempty" jsonschema:"title=Manifest Name,description=The name for the compute manifest"`
	ManifestID           uuid.UUID             `json:"manifest_id,omitempty" jsonschema:"-"`
	Command              []string              `json:"command,omitempty" jsonschema:"title=Command Override,description=An optional command override for the plugin"`
	Dependencies         []uuid.UUID           `json:"dependencies,omitempty" jsonschema:"-"`
	Stores               []DataStore           `json:"stores,omitempty" jsonschema:"title=Stores"`
	Inputs               PluginInputs          `json:"inputs,omitempty" jsonschema:"title=Inputs"`
	Outputs              []DataSource          `json:"outputs,omitempty" jsonschema:"title=Outputs"`
	Actions              []Action              `json:"actions,omitempty" jsonschema:"title=Actions"`
	PluginDefinition     string                `json:"plugin_definition,omitempty" jsonschema:"title=Plugin Definition"` //plugin resource name. "name:version"
	Tags                 map[string]string     `json:"tags,omitempty" jsonschema:"title=Tags"`
	RetryAttemts         int32                 `json:"retry_attempts,omitempty" jsonschema:"title=Retry Attempts Override"`
	JobTimeout           int32                 `json:"job_timeout,omitempty" jsonschema:"title=Job Timeout Override"`
	ResourceRequirements []ResourceRequirement `json:"resource_requirements,omitempty" jsonschema:"title=Resource Requirement Overrides"`
	// contains filtered or unexported fields
}

ComputeManifest holds the information necessary to execute a single job in an event.

func (ComputeManifest) Deps

func (m ComputeManifest) Deps() []uuid.UUID

Deps returns the manifest's dependencies as a slice of UUIDs, for use in topological sorting.

func (*ComputeManifest) GetPayload

func (cm *ComputeManifest) GetPayload() uuid.UUID

Deprecated: This is a transitional method that will be removed in a future version. It is intended to facilitate running manifests written before payloadId was introduced.

func (ComputeManifest) Node

func (m ComputeManifest) Node() uuid.UUID

Node returns the manifest's UUID, identifying it as a node for topological sorting.

func (*ComputeManifest) WritePayload

func (cm *ComputeManifest) WritePayload() error

Writes a payload to the compute store.

type ComputeManifests

type ComputeManifests []ComputeManifest

func (*ComputeManifests) GetManifest

func (cm *ComputeManifests) GetManifest(id uuid.UUID, deepcopy bool) (int, *ComputeManifest, error)

Gets a reference to a manifest by manifest ID in a slice of compute manifests. Optionally, the caller can request a deep copy of the manifest. The method returns the index position of the manifest, a reference to the manifest (or a copy of it), and any error.

func (*ComputeManifests) GetManifestByIndex

func (cm *ComputeManifests) GetManifestByIndex(index int, deepcopy bool) (*ComputeManifest, error)

Gets a reference to a manifest by index in a slice of compute manifests. Optionally, the caller can request a deep copy of the manifest. The method returns a reference to the manifest (or a copy of it) and any error.

func (*ComputeManifests) Len

func (cm *ComputeManifests) Len() int
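The reference-versus-copy choice in GetManifest and GetManifestByIndex matters because a manifest holds slices and maps: a shallow struct copy would still share them. Below is a minimal sketch of that behavior using a simplified, hypothetical `manifest` type with string IDs; which fields the real methods copy deeply is not documented here.

```go
package main

import "errors"

// manifest is a hypothetical stand-in for ComputeManifest.
type manifest struct {
	ID      string
	Command []string
	Tags    map[string]string
}

type manifests []manifest

// getManifest returns the index, then either a pointer into the slice
// (mutations are visible to the caller's slice) or a deep copy (they are not).
func (ms *manifests) getManifest(id string, deepcopy bool) (int, *manifest, error) {
	for i := range *ms {
		m := &(*ms)[i]
		if m.ID != id {
			continue
		}
		if !deepcopy {
			return i, m, nil
		}
		c := *m // copies scalar fields; slices and maps still shared at this point
		c.Command = append([]string(nil), m.Command...)
		c.Tags = make(map[string]string, len(m.Tags))
		for k, v := range m.Tags {
			c.Tags[k] = v
		}
		return i, &c, nil
	}
	return -1, nil, errors.New("manifest not found: " + id)
}
```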

type ComputeProvider

type ComputeProvider interface {
	SubmitJob(job *Job) error
	TerminateJobs(input TerminateJobInput) error
	Status(jobQueue string, query JobsSummaryQuery) error
	JobLog(submittedJobId string, token *string) (JobLogOutput, error)
	RegisterPlugin(plugin *Plugin) (PluginRegistrationOutput, error)
	UnregisterPlugin(nameAndRevision string) error
}

Interface for a compute provider.

type ComputeProviderType

type ComputeProviderType int

type ConcurrentRunner

type ConcurrentRunner struct {
	// contains filtered or unexported fields
}

func NewConcurrentRunner

func NewConcurrentRunner(limit int) *ConcurrentRunner

func (*ConcurrentRunner) Run

func (cr *ConcurrentRunner) Run(cf func())

func (*ConcurrentRunner) Wait

func (cr *ConcurrentRunner) Wait()

type ContainerOverrides

type ContainerOverrides struct {
	Command              []string
	Environment          KeyValuePairs
	ResourceRequirements []ResourceRequirement
}

Overrides the container command, environment, or resource requirements from the base values provided in the job definition.

type Event

type Event struct {
	ID              uuid.UUID         `json:"id"`
	EventIdentifier string            `json:"event"` //RULES: ONLY NUMBERS, LETTERS, DASHES, AND UNDERSCORES
	Manifests       []ComputeManifest `json:"manifests"`

	AdditionalEventEnvVars KeyValuePairs
	// contains filtered or unexported fields
}

Event is a single run through the DAG.
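The field comment on EventIdentifier restricts the characters an identifier may use. A hypothetical validation helper is sketched below; it reads the comment's "STRINGS" as ASCII letters and treats an empty identifier as invalid, both of which are assumptions rather than documented rules.

```go
package main

import (
	"fmt"
	"regexp"
)

// eventIdentifierPattern: letters, digits, '-' and '_' only.
// ASSUMPTION: "STRINGS" in the field comment means ASCII letters.
var eventIdentifierPattern = regexp.MustCompile(`^[A-Za-z0-9_-]+$`)

// validateEventIdentifier is a hypothetical helper, not part of the package.
func validateEventIdentifier(id string) error {
	if !eventIdentifierPattern.MatchString(id) {
		return fmt.Errorf("invalid event identifier %q: use only letters, digits, '-' and '_'", id)
	}
	return nil
}
```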

func (*Event) AddManifest

func (e *Event) AddManifest(m ComputeManifest)

Adds a manifest to the Event

func (*Event) AddManifestAt

func (e *Event) AddManifestAt(m ComputeManifest, i int)

Adds a manifest at a specific ordinal position in the event.

func (*Event) SortManifests

func (e *Event) SortManifests() error

func (*Event) TopoSort

func (e *Event) TopoSort() ([]uuid.UUID, error)

TopoSort performs a topological sort of the Event's manifests and returns an ordered list of manifest IDs.

type EventGenerator

type EventGenerator interface {
	NextEvent() (Event, bool, error)
}

EventGenerators provide an iterator type interface to work with sets of events for a Compute.

type EventGeneratorOld

type EventGeneratorOld interface {
	HasNextEvent() bool
	NextEvent() (Event, error)
}

EventGeneratorOld is the previous iterator-style interface for working with sets of events for a Compute. It was too difficult to synchronize under concurrent access, so it was simplified to the single NextEvent call of EventGenerator, which can be guarded by one mutex. List-type event generators return new and separate manifests for each event. All others (array, streaming) return the same manifests with a unique event identifier for each event, which allows for shared payloads (write once and use for every event).

type EventList

type EventList struct {
	// contains filtered or unexported fields
}

EventList is an EventGenerator composed of a slice of events. Events are enumerated in the order they were placed in the slice.

func NewEventList

func NewEventList(events []Event) *EventList

Instantiates a new EventList

func (*EventList) NextEvent

func (el *EventList) NextEvent() (Event, bool, error)

Retrieves the next event. Attempts to perform a topological sort on the manifest slice before returning. If sort fails it will log the issue and return the unsorted manifest slice

type EventProcessor

type EventProcessor interface {
	Process(event Event) (Event, error)
}

type Job

type Job struct {
	ID                 uuid.UUID
	EventID            uuid.UUID
	ManifestID         uuid.UUID
	JobName            string
	JobQueue           string
	JobDefinition      string
	ContainerOverrides ContainerOverrides
	DependsOn          []string //compute provider dependencies
	Parameters         map[string]string
	Tags               map[string]string
	RetryAttemts       int32
	JobTimeout         int32            //duration in seconds
	SubmittedJob       *SubmitJobResult //reference to the job information from the compute environment
}

Job is a single unit of compute for a ComputeProvider. Essentially, it is a mapping of a single Manifest.

type JobLogOutput

type JobLogOutput struct {
	Logs  []string
	Token *string
}

type JobNameParts

type JobNameParts struct {
	Compute string
	Event   string
	Job     string
}

func (*JobNameParts) Parse

func (jnp *JobNameParts) Parse(jobname string) error

type JobSummary

type JobSummary struct {
	//compute environment identifier for the job, e.g. the AWS Batch Job ID
	JobId string

	//cloud compute job name
	JobName string

	//unix timestamp in milliseconds for when the job was created
	CreatedAt *int64

	//unix timestamp in milliseconds for when the job was started
	StartedAt *int64

	//status string value
	Status string

	//human readable string of the status
	StatusDetail *string

	//unix timestamp in milliseconds for when the job was stopped
	StoppedAt *int64

	//Compute Vendor resource name for the job.  e.g. the Job ARN for AWS
	ResourceName string
}

func (JobSummary) ID

func (js JobSummary) ID() string

func (JobSummary) Name

func (js JobSummary) Name() string

type JobSummaryFunction

type JobSummaryFunction func(summaries []JobSummary)

JobSummaryFunction processes the results of a JobSummary request. Summaries are delivered in batches, and processing continues until all jobs have been reported. In AWS, this processes the slice of summaries for the initial request and for all subsequent continuation tokens.
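The batching described above follows the usual continuation-token loop. The sketch below shows that loop with trimmed, hypothetical types: `fetch` stands in for a vendor API call (for example, a paged job listing), and `page` is an invented response shape, not a type from this package.

```go
package main

// jobSummary is a trimmed stand-in for JobSummary.
type jobSummary struct {
	JobName string
	Status  string
}

// page is a hypothetical provider response: one batch of summaries plus a
// continuation token that is nil on the last page.
type page struct {
	Summaries []jobSummary
	Next      *string
}

// reportAll requests pages until no continuation token is returned,
// handing each batch to fn as it arrives.
func reportAll(fetch func(token *string) (page, error), fn func([]jobSummary)) error {
	var token *string // nil requests the first page
	for {
		p, err := fetch(token)
		if err != nil {
			return err
		}
		fn(p.Summaries)
		if p.Next == nil {
			return nil
		}
		token = p.Next
	}
}
```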

type JobsSummaryQuery

type JobsSummaryQuery struct {
	//The level to request. Must be one of three values,
	//COMPUTE/EVENT/JOB, as represented by the SUMMARY_{level} constants
	QueryLevel string

	//the GUIDs representing the referenced levels
	//The value must include preceding levels, so the COMPUTE level must have at least the compute GUID,
	//the EVENT level must have the compute and event GUIDs, and the JOB level has all three GUIDs
	QueryValue JobNameParts

	//a required function to process each job returned in the query
	JobSummaryFunction JobSummaryFunction
}

type KeyValuePair

type KeyValuePair struct {
	Name  string `json:"name"`
	Value string `json:"value"`
}

type KeyValuePairs

type KeyValuePairs []KeyValuePair

func MapToKeyValuePairs

func MapToKeyValuePairs(mapdata map[string]string) KeyValuePairs

func (*KeyValuePairs) GetVal

func (kvps *KeyValuePairs) GetVal(key string) string

GetVal is designed to behave like a system environment lookup: the value is returned if the key is found; otherwise, an empty string is returned.

func (*KeyValuePairs) HasKey

func (kvps *KeyValuePairs) HasKey(key string) bool

func (*KeyValuePairs) Merge

func (kvps *KeyValuePairs) Merge(newKvps *KeyValuePairs)

func (*KeyValuePairs) SetVal

func (kvps *KeyValuePairs) SetVal(key string, val string)
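A minimal sketch of how these KeyValuePairs methods could behave, assuming SetVal updates an existing key in place (otherwise appending) and Merge lets incoming values win on conflict. Only GetVal's empty-string behavior is documented; the other semantics here are assumptions, not the package's implementation.

```go
package main

// KeyValuePair and KeyValuePairs mirror the types documented above.
type KeyValuePair struct {
	Name  string `json:"name"`
	Value string `json:"value"`
}

type KeyValuePairs []KeyValuePair

// GetVal behaves like an environment lookup: the value if present, "" otherwise.
func (kvps *KeyValuePairs) GetVal(key string) string {
	for _, kv := range *kvps {
		if kv.Name == key {
			return kv.Value
		}
	}
	return ""
}

func (kvps *KeyValuePairs) HasKey(key string) bool {
	for _, kv := range *kvps {
		if kv.Name == key {
			return true
		}
	}
	return false
}

// SetVal updates the first matching key or appends a new pair (ASSUMPTION).
func (kvps *KeyValuePairs) SetVal(key, val string) {
	for i := range *kvps {
		if (*kvps)[i].Name == key {
			(*kvps)[i].Value = val
			return
		}
	}
	*kvps = append(*kvps, KeyValuePair{Name: key, Value: val})
}

// Merge copies every pair from newKvps; incoming values win (ASSUMPTION).
func (kvps *KeyValuePairs) Merge(newKvps *KeyValuePairs) {
	for _, kv := range *newKvps {
		kvps.SetVal(kv.Name, kv.Value)
	}
}
```

A slice of pairs rather than a map keeps insertion order, which is convenient when the pairs become a container's environment list.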

type LinuxDevice

type LinuxDevice struct {
	HostPath      *string `json:"host_path" jsonschema:"title=Host Path"`
	ContainerPath *string `json:"container_path" jsonschema:"title=Container Path"`
}

type LinuxParameters

type LinuxParameters struct {
	Devices []LinuxDevice `json:"devices" jsonschema:"title=Devices"`
}

type MountPoint

type MountPoint struct {
	ContainerPath string
	ReadOnly      bool
	SourceVolume  string
}

type PerEventLooper

type PerEventLooper struct {
	// contains filtered or unexported fields
}

func NewPerEventLooper

func NewPerEventLooper(pel []map[string]string) *PerEventLooper

func (*PerEventLooper) Next

func (pel *PerEventLooper) Next() (envVars map[string]string, incrementEvent bool)

type Plugin

type Plugin struct {
	Name string `json:"name" jsonschema:"title=Name"`
	//Revision           string                   `json:"revision" yaml:"revision"`
	ImageAndTag        string                   `json:"image_and_tag" jsonschema:"title=Image and Tag,description=The docker image and tag"`
	Description        string                   `json:"description" jsonschema:"title=Description"`
	Command            []string                 `json:"command" jsonschema:"title=Command,description=The docker command and arguments to run"`
	ComputeEnvironment PluginComputeEnvironment `json:"compute_environment" jsonschema:"title=Compute Environment,description=CPU and Memory runtime requirements"`
	DefaultEnvironment KeyValuePairs            `json:"environment" jsonschema:"title=Default Environment Variables,description=The list of default environment variables"` //default values for the container environment
	Volumes            []PluginComputeVolumes   ``                                                                                                                          /* 128-byte string literal not displayed */
	Credentials        KeyValuePairs            ``                                                                                                                          /* 213-byte string literal not displayed */
	Parameters         map[string]string        `json:"parameters" jsonschema:"title=Parameters"`
	RetryAttemts       int32                    `json:"retry_attempts" jsonschema:"title=Retry Attempts"`
	ExecutionTimeout   *int32                   `json:"execution_timeout" jsonschema:"title=Execution Timeout (sec)"`
	Privileged         bool                     `json:"privileged" jsonschema:"title=Requires Privileged Execution"` //assign container privileged execution.  for example to mount linux devices
	LinuxParameters    LinuxParameters          `json:"linux_parameters" jsonschema:"title=Linux Parameters"`
	MountPoints        []MountPoint             `json:"mountpoints" jsonschema:"title=MountPoints"`
}

The Plugin struct is used to interact with the compute environment and create a Job Definition. It is likely to be moved to the CCAPI. When entering credentials, use the format of the compute provider. For example, when using AWS Batch: "AWS_ACCESS_KEY_ID", "arn:aws:secretsmanager:us-east-1:01010101010:secret:mysecret:AWS_ACCESS_KEY_ID::"

type PluginComputeEnvironment

type PluginComputeEnvironment struct {
	VCPU       string   `json:"vcpu" jsonschema:"title=Virtual CPUs"`
	Memory     string   `json:"memory" jsonschema:"title=Memory in MB"`
	ExtraHosts []string `json:"extraHosts" jsonschema:"title=Extra Hosts for the Docker API"`
}

type PluginComputeVolumes

type PluginComputeVolumes struct {
	Name         string `json:"name" jsonschema:"title=Name"`
	ResourceName string `json:"resource_name" jsonschema:"title=ResourceName,description=The vendor resource name, e.g. ARN for AWS"`
	ReadOnly     bool   `json:"read_only" jsonschema:"title=Read Only,description=Check to make this resource read-only"`
	MountPoint   string `json:"mount_point" jsonschema:"title=Mount Point,description=Path in the running container to mount the volume"` //default is "/data"
}

type PluginInputs

type PluginInputs struct {
	Environment       KeyValuePairs     `json:"environment,omitempty"`
	Parameters        map[string]string `json:"parameters,omitempty"`
	DataSources       []DataSource      `json:"data_sources,omitempty"`
	PayloadAttributes PayloadAttributes `json:"payload_attributes,omitempty"`
}

Job-level inputs that can be injected into a container.

type PluginRegistrationOutput

type PluginRegistrationOutput struct {
	Name         string `json:"name"`
	ResourceName string `json:"resourceName"`
	Revision     int32  `json:"revision"`
}

type ResourceRequirement

type ResourceRequirement struct {
	Type  ResourceType `json:"resource_type"`
	Value string       `json:"value"`
}

type ResourceType

type ResourceType string

type StreamingEventGenerator

type StreamingEventGenerator struct {
	// contains filtered or unexported fields
}

func NewStreamingEventGenerator

func NewStreamingEventGenerator(event Event, perEventLoopData []map[string]string, scanner *bufio.Scanner) (*StreamingEventGenerator, error)

func NewStreamingEventGeneratorForReader

func NewStreamingEventGeneratorForReader(event Event, perEventLoopData []map[string]string, reader io.Reader, delimiter string) (*StreamingEventGenerator, error)

func (*StreamingEventGenerator) NextEvent

func (seg *StreamingEventGenerator) NextEvent() (Event, bool, error)

type SubmitJobResult

type SubmitJobResult struct {

	//Vendor ID
	JobId *string

	//Vendor Resource Name
	ResourceName *string //ARN in AWS
}

Vendor job information

type TerminateJobFunction

type TerminateJobFunction func(output TerminateJobOutput)

Function to process the result of each job termination.

type TerminateJobInput

type TerminateJobInput struct {
	//User's reason for terminating the job
	Reason string

	//The Vendor Job Queue the job was submitted to
	JobQueue string

	//Optional. A jobs summary query that will generate a list of jobs to terminate
	//if this value is provided the JobSummaryQuery JobSummaryFunction is ignored
	//and should be left empty
	Query JobsSummaryQuery

	//Optional. A list of VendorJobs to terminate
	VendorJobs VendorJobs

	//Optional.  A function to process the results of each terminated job
	TerminateJobFunction TerminateJobFunction
}

Input for terminating jobs submitted to a queue. The list of jobs to terminate is determined either by a status query (Query), in which case every job returned by the query is terminated, or by a list of VendorJobs.

type TerminateJobOutput

type TerminateJobOutput struct {

	//CloudCompute Job Name
	JobName string

	//Error if returned from the terminate operation
	Err error

	//Vendor Job ID
	JobId string
}

type TopoSortable

type TopoSortable[T comparable] interface {
	Node() T
	Deps() []T
}

Interface for supporting Topological sort in Manifests (or other structs that would use a toposort)
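ComputeManifest's Node() and Deps() methods match this interface with T = uuid.UUID. Any such values can be flattened into the map[T][]T form that TopologicalSort takes. The adapter below is a hypothetical sketch (`step` and `toDigraph` are illustrative names), and the node-to-dependencies direction of the map is an assumption.

```go
package main

// TopoSortable mirrors the interface documented above.
type TopoSortable[T comparable] interface {
	Node() T
	Deps() []T
}

// step is a hypothetical type that satisfies TopoSortable[string].
type step struct {
	id   string
	deps []string
}

func (s step) Node() string   { return s.id }
func (s step) Deps() []string { return s.deps }

// toDigraph converts sortable items into a node -> dependencies map.
// Every node becomes a key, even when it has no dependencies.
func toDigraph[T comparable, S TopoSortable[T]](items []S) map[T][]T {
	g := make(map[T][]T, len(items))
	for _, it := range items {
		g[it.Node()] = append(g[it.Node()], it.Deps()...)
	}
	return g
}
```

Calling it as `toDigraph[string](steps)` supplies T explicitly and lets Go infer S from the argument.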

type VendorJob

type VendorJob interface {
	ID() string
	Name() string
}

Vendor job data used for terminating jobs on the vendor's compute environment @TODO, why can't this be replaced by SubmitJobResult?

type VendorJobs

type VendorJobs []VendorJob

func (VendorJobs) IncludesJob

func (vj VendorJobs) IncludesJob(id string) bool
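JobSummary defines ID() string and Name() string, so it satisfies VendorJob, which means summaries returned by a status query can be collected directly into VendorJobs for a TerminateJobInput. The sketch below uses a trimmed stand-in for JobSummary whose ID() and Name() are assumed to return JobId and JobName. IncludesJob is sketched as a linear scan by vendor ID, which is also an assumption about the real method.

```go
package main

// VendorJob and VendorJobs mirror the types documented above.
type VendorJob interface {
	ID() string
	Name() string
}

type VendorJobs []VendorJob

// IncludesJob reports whether any job has the given vendor ID (linear scan).
func (vj VendorJobs) IncludesJob(id string) bool {
	for _, j := range vj {
		if j.ID() == id {
			return true
		}
	}
	return false
}

// jobSummary is a trimmed stand-in for JobSummary.
type jobSummary struct {
	JobId   string
	JobName string
}

func (js jobSummary) ID() string   { return js.JobId }   // assumed to return JobId
func (js jobSummary) Name() string { return js.JobName } // assumed to return JobName
```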

Directories

Path Synopsis
providers
