Documentation ¶
Index ¶
- Constants
- func TopologicalSort[T comparable](digraph map[T][]T) ([]T, error)
- type ArrayEventGenerator
- type BatchEvent
- type BatchEventGenerator
- type CcJobStore
- type CcMessageQueue
- type CloudCompute
- type ComputeManifest
- type ComputeManifests
- type ComputeProvider
- type ComputeProviderType
- type ConcurrentRunner
- type ContainerOverrides
- type Event
- type EventGenerator
- type EventGeneratorOld
- type EventList
- type EventProcessor
- type Job
- type JobLogOutput
- type JobNameParts
- type JobSummary
- type JobSummaryFunction
- type JobsLogQuery
- type JobsSummaryQuery
- type KeyValuePair
- type KeyValuePairs
- type LinuxDevice
- type LinuxParameters
- type MountPoint
- type PerEventLooper
- type Plugin
- type PluginComputeEnvironment
- type PluginComputeVolumes
- type PluginInputs
- type PluginRegistrationOutput
- type ResourceRequirement
- type ResourceType
- type StreamingEventGenerator
- type SubmitJobResult
- type TerminateJobFunction
- type TerminateJobInput
- type TerminateJobOutput
- type TopoSortable
- type VendorJob
- type VendorJobs
Constants ¶
const (
ResourceTypeGpu ResourceType = "GPU"
ResourceTypeVcpu ResourceType = "VCPU"
ResourceTypeMemory ResourceType = "MEMORY"
ResourceTypeAttachedStorage ResourceType = "ATTACHEDSTORAGE"

SUMMARY_COMPUTE string = "COMPUTE"
SUMMARY_EVENT string = "EVENT"
//SUMMARY_MANIFEST string = "MANIFEST"
SUMMARY_JOB string = "JOB"

AWSBATCH ComputeProviderType = 0
LOCALDOCKER ComputeProviderType = 1
KUBERNETES ComputeProviderType = 2
)
Variables ¶
This section is empty.
Functions ¶
func TopologicalSort ¶
func TopologicalSort[T comparable](digraph map[T][]T) ([]T, error)
Generic topological sort function. Supports all types that implement 'comparable'.
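The listing gives only the signature of TopologicalSort, so the following is a minimal sketch of one way such a function could work, not the package's implementation. It assumes that digraph maps each node to the nodes it depends on and that a cycle is reported as an error. The name topoSort and the depth-first approach are illustrative choices.

```go
package main

import (
	"errors"
	"fmt"
)

// topoSort is a hypothetical stand-in for TopologicalSort. It assumes that
// digraph maps each node to the nodes it depends on, and it returns the
// nodes ordered so that every dependency precedes its dependents.
func topoSort[T comparable](digraph map[T][]T) ([]T, error) {
	const (
		unvisited = iota
		visiting
		done
	)
	state := make(map[T]int, len(digraph))
	order := make([]T, 0, len(digraph))

	var visit func(n T) error
	visit = func(n T) error {
		switch state[n] {
		case done:
			return nil
		case visiting:
			// Reaching a node that is still on the stack means a cycle.
			return errors.New("cycle detected in digraph")
		}
		state[n] = visiting
		for _, dep := range digraph[n] {
			if err := visit(dep); err != nil {
				return err
			}
		}
		state[n] = done
		order = append(order, n)
		return nil
	}

	for n := range digraph {
		if err := visit(n); err != nil {
			return nil, err
		}
	}
	return order, nil
}

func main() {
	// "c" depends on "b", which depends on "a".
	order, err := topoSort(map[string][]string{
		"a": {},
		"b": {"a"},
		"c": {"b"},
	})
	fmt.Println(order, err) // prints: [a b c] <nil>
}
```

For graphs with independent branches the relative order of those branches depends on map iteration order, so callers should rely only on dependencies appearing before their dependents.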
Types ¶
type ArrayEventGenerator ¶
type ArrayEventGenerator struct {
// contains filtered or unexported fields
}
func NewArrayEventGenerator ¶
type BatchEvent ¶
type BatchEvent struct {
EventIdentifier string `json:"eventIdentifier"`
ManifestOverrides ComputeManifest `json:"manifest"`
}
@TODO change this to TemplatedEvent? maybe just delete?
type BatchEventGenerator ¶
type BatchEventGenerator struct {
// contains filtered or unexported fields
}
func NewBatchEventGenerator ¶
func NewBatchEventGenerator(event Event, batch []BatchEvent) *BatchEventGenerator
type CcJobStore ¶
type CcMessageQueue ¶
type CloudCompute ¶
type CloudCompute struct {
//Compute Identifier
ID uuid.UUID `json:"id"`
//User friendly Name for the compute
Name string `json:"name"`
//JobQueue to push the events to
JobQueue string `json:"jobQueue"`
//Event generator
Events EventGenerator `json:"events"`
//compute provider for the compute (typically AwsBatchProvider)
ComputeProvider ComputeProvider `json:"computeProvider"`
//Job store is for saving the set of jobs being sent to the compute environment to a
//user defined storage location. Typically something like a relational database
JobStore CcJobStore
//Function that will postprocess an event after it is generated by the event generator
EventProcessor EventProcessor
}
CloudCompute is a compute submission for a single DAG for a set of events. The compute environment Job Queue and Job Definitions must exist before a CloudCompute can be initiated.
func (*CloudCompute) Cancel ¶
func (cc *CloudCompute) Cancel(reason string) error
Cancels jobs submitted to the compute environment.
func (*CloudCompute) Run ¶
func (cc *CloudCompute) Run() error
Runs a Compute on the ComputeProvider, submitting jobs sequentially. Currently an event number of -1 represents an invalid event received from the event generator. These events are skipped.
func (*CloudCompute) RunParallel ¶
func (cc *CloudCompute) RunParallel(concurrency int) error
Runs a Compute on the ComputeProvider, submitting jobs in parallel based on the concurrency requested. Currently an event number of -1 represents an invalid event received from the event generator. These events are skipped.
func (*CloudCompute) Status ¶
func (cc *CloudCompute) Status(query JobsSummaryQuery) error
Requests the status of a given compute at the COMPUTE, EVENT, or JOB level. A JobSummaryFunction is necessary to process the status.
type ComputeManifest ¶
type ComputeManifest struct {
ManifestName string `json:"manifest_name,omitempty" jsonschema:"title=Manifest Name,description=The name for the compute manifest"`
ManifestID uuid.UUID `json:"manifest_id,omitempty" jsonschema:"-"`
Command []string `json:"command,omitempty" jsonschema:"title=Command Override,description=An optional command override for the plugin"`
Dependencies []uuid.UUID `json:"dependencies,omitempty" jsonschema:"-"`
Stores []DataStore `json:"stores,omitempty" jsonschema:"title=Stores"`
Inputs PluginInputs `json:"inputs,omitempty" jsonschema:"title=Inputs"`
Outputs []DataSource `json:"outputs,omitempty" jsonschema:"title=Outputs"`
Actions []Action `json:"actions,omitempty" jsonschema:"title=Actions"`
PluginDefinition string `json:"plugin_definition,omitempty" jsonschema:"title=Plugin Definition"` //plugin resource name. "name:version"
Tags map[string]string `json:"tags,omitempty" jsonschema:"title=Tags"`
RetryAttemts int32 `json:"retry_attempts,omitempty" jsonschema:"title=Retry Attempts Override"`
JobTimeout int32 `json:"job_timeout,omitempty" jsonschema:"title=Job Timeout Override"`
ResourceRequirements []ResourceRequirement `json:"resource_requirements,omitempty" jsonschema:"title=Resource Requirement Overrides"`
// contains filtered or unexported fields
}
ComputeManifest is the information necessary to execute a single job in an event.
func (ComputeManifest) Deps ¶
func (m ComputeManifest) Deps() []uuid.UUID
Manifest Deps sort function for a slice of uuid dependencies
func (*ComputeManifest) GetPayload ¶
func (cm *ComputeManifest) GetPayload() uuid.UUID
This is a transitional method that will be removed in a future version. It is intended to facilitate running manifests written prior to payloadId. @Deprecated
func (ComputeManifest) Node ¶
func (m ComputeManifest) Node() uuid.UUID
Manifest Node sort function for uuid IDs
func (*ComputeManifest) WritePayload ¶
func (cm *ComputeManifest) WritePayload() error
Writes a payload to the compute store.
type ComputeManifests ¶
type ComputeManifests []ComputeManifest
func (*ComputeManifests) GetManifest ¶
func (cm *ComputeManifests) GetManifest(id uuid.UUID, deepcopy bool) (int, *ComputeManifest, error)
Gets a reference to a manifest by manifest ID in a slice of compute manifests. Optionally the caller can request a deep copy of the manifest. The method returns the index position of the manifest, a reference to the manifest or a copy of it, and any errors.
func (*ComputeManifests) GetManifestByIndex ¶
func (cm *ComputeManifests) GetManifestByIndex(index int, deepcopy bool) (*ComputeManifest, error)
Gets a reference to a manifest by manifest index in a slice of compute manifests. Optionally the caller can request a deep copy of the manifest. The method returns a reference to the manifest or a copy of it, and any errors.
func (*ComputeManifests) Len ¶
func (cm *ComputeManifests) Len() int
type ComputeProvider ¶
type ComputeProvider interface {
SubmitJob(job *Job) error
TerminateJobs(input TerminateJobInput) error
Status(jobQueue string, query JobsSummaryQuery) error
JobLog(submittedJobId string, token *string) (JobLogOutput, error)
RegisterPlugin(plugin *Plugin) (PluginRegistrationOutput, error)
UnregisterPlugin(nameAndRevision string) error
}
Interface for a compute provider.
type ComputeProviderType ¶
type ComputeProviderType int
type ConcurrentRunner ¶
type ConcurrentRunner struct {
// contains filtered or unexported fields
}
func NewConcurrentRunner ¶
func NewConcurrentRunner(limit int) *ConcurrentRunner
func (*ConcurrentRunner) Run ¶
func (cr *ConcurrentRunner) Run(cf func())
func (*ConcurrentRunner) RunSequential ¶
func (cr *ConcurrentRunner) RunSequential(cf func())
func (*ConcurrentRunner) Wait ¶
func (cr *ConcurrentRunner) Wait()
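ConcurrentRunner's fields are unexported and its methods are undocumented here, so the following is only a sketch of the general bounded-concurrency pattern that a constructor taking a limit suggests. The type boundedRunner, its fields, and the blocking behavior of Run are assumptions for illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// boundedRunner is a hypothetical stand-in for ConcurrentRunner: it runs
// functions on goroutines but never more than limit at the same time.
type boundedRunner struct {
	slots chan struct{} // buffered channel used as a counting semaphore
	wg    sync.WaitGroup
}

func newBoundedRunner(limit int) *boundedRunner {
	if limit < 1 {
		limit = 1
	}
	return &boundedRunner{slots: make(chan struct{}, limit)}
}

// Run blocks until a slot is free, then executes f on its own goroutine.
func (r *boundedRunner) Run(f func()) {
	r.slots <- struct{}{}
	r.wg.Add(1)
	go func() {
		defer r.wg.Done()
		defer func() { <-r.slots }() // release the slot when f returns
		f()
	}()
}

// Wait blocks until every function passed to Run has returned.
func (r *boundedRunner) Wait() { r.wg.Wait() }

func main() {
	runner := newBoundedRunner(3)
	var submitted int64
	for i := 0; i < 10; i++ {
		runner.Run(func() { atomic.AddInt64(&submitted, 1) })
	}
	runner.Wait()
	fmt.Println("jobs submitted:", atomic.LoadInt64(&submitted))
}
```

A buffered channel keeps the limit enforcement in one place: submission slows down naturally when all slots are busy, which is the kind of throttling a call such as RunParallel(concurrency) would need.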
type ContainerOverrides ¶
type ContainerOverrides struct {
Command []string
Environment KeyValuePairs
ResourceRequirements []ResourceRequirement
}
Overrides the container command or environment from the base values provided in the job description
type Event ¶
type Event struct {
ID uuid.UUID `json:"id"`
EventIdentifier string `json:"event"` //RULES ONLY NUMBERS, STRINGS, DASH, AND UNDERSCORE
Manifests []ComputeManifest `json:"manifests"`
AdditionalEventEnvVars KeyValuePairs
// contains filtered or unexported fields
}
Event is a single run through the DAG.
func (*Event) AddManifest ¶
func (e *Event) AddManifest(m ComputeManifest)
Adds a manifest to the Event
func (*Event) AddManifestAt ¶
func (e *Event) AddManifestAt(m ComputeManifest, i int)
Adds a manifest at a specific ordinal position in the event.
func (*Event) SortManifests ¶
type EventGenerator ¶
EventGenerators provide an iterator-style interface for working with sets of events for a Compute.
type EventGeneratorOld ¶
EventGenerators provide an iterator-style interface for working with sets of events for a Compute. The old generator interface was too difficult to synchronize under concurrent access, so it was simplified to a single call that can be managed within one mutex. List-type event generators return new and separate manifests for each event. All others (array, streaming) return the same manifests with a unique event identifier for each event, which allows shared payloads (write one and use it for every event).
type EventList ¶
type EventList struct {
// contains filtered or unexported fields
}
EventList is an EventGenerator composed of a slice of events. Events are enumerated in the order they were placed in the slice.
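The EventGenerator method set is not shown in this listing, so the following sketch only illustrates the idea described for the generators: a slice-backed generator that returns events in order through a single call guarded by one mutex. The method name NextEvent, its return values, and the trimmed-down event type are assumptions.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a hypothetical, trimmed-down stand-in for Event.
type event struct {
	EventIdentifier string
}

// eventList hands out events in slice order. NextEvent is a hypothetical
// method name; it combines "is there another event" and "fetch it" into a
// single call so that one mutex can guard the whole step.
type eventList struct {
	mu     sync.Mutex
	events []event
	next   int
}

func (el *eventList) NextEvent() (event, bool) {
	el.mu.Lock()
	defer el.mu.Unlock()
	if el.next >= len(el.events) {
		return event{}, false // generator exhausted
	}
	e := el.events[el.next]
	el.next++
	return e, true
}

func main() {
	gen := &eventList{events: []event{
		{EventIdentifier: "1"},
		{EventIdentifier: "2"},
		{EventIdentifier: "3"},
	}}
	for e, ok := gen.NextEvent(); ok; e, ok = gen.NextEvent() {
		fmt.Println("event", e.EventIdentifier)
	}
}
```

With separate "has next" and "next" calls, two goroutines could both see that an event remains and then race for it. Folding both steps into one locked call avoids that window.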
type EventProcessor ¶
type Job ¶
type Job struct {
ID uuid.UUID
EventID uuid.UUID
ManifestID uuid.UUID
JobName string
JobQueue string
JobDefinition string
ContainerOverrides ContainerOverrides
DependsOn []string //compute provider dependencies
Parameters map[string]string
Tags map[string]string
RetryAttemts int32
JobTimeout int32 //duration in seconds
SubmittedJob *SubmitJobResult //reference to the job information from the compute environment
}
This is a single "job" or unit of compute for a ComputeProvider. Essentially it is a mapping of a single Manifest.
type JobLogOutput ¶
type JobNameParts ¶
func (*JobNameParts) Parse ¶
func (jnp *JobNameParts) Parse(jobname string) error
type JobSummary ¶
type JobSummary struct {
//identifier for the compute environment being used. e.g. AWS Batch Job ID
JobId string
//cloud compute job name
JobName string
//unix timestamp in milliseconds for when the job was created
CreatedAt *int64
//unix timestamp in milliseconds for when the job was started
StartedAt *int64
//status string value
Status string
//human readable string of the status
StatusDetail *string
//unix timestamp in milliseconds for when the job was stopped
StoppedAt *int64
//Compute Vendor resource name for the job. e.g. the Job ARN for AWS
ResourceName string
}
func (JobSummary) ID ¶
func (js JobSummary) ID() string
func (JobSummary) Name ¶
func (js JobSummary) Name() string
type JobSummaryFunction ¶
type JobSummaryFunction func(summaries []JobSummary)
Function to process the results of a JobSummary request. Summaries are processed in batches of JobSummaries, and processing continues until all jobs are reported. In AWS this processes the slice of summaries for the initial request and for all subsequent continuation tokens.
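The batching behavior described above can be sketched as a loop that invokes the callback once per page and follows continuation tokens until none remain. The page type, the fetch callback, and the trimmed-down jobSummary type are hypothetical stand-ins. A real provider would call its own list API in their place.

```go
package main

import "fmt"

// jobSummary is a trimmed-down stand-in for JobSummary.
type jobSummary struct {
	JobName string
	Status  string
}

type jobSummaryFunc func(summaries []jobSummary)

// page is a hypothetical provider response: one batch of summaries plus a
// token that is nil when no further pages remain.
type page struct {
	summaries []jobSummary
	nextToken *string
}

// listAll calls fn once per page and follows continuation tokens until the
// provider reports no more results. fetch is a hypothetical callback that
// stands in for the provider's list call.
func listAll(fetch func(token *string) page, fn jobSummaryFunc) {
	var token *string
	for {
		p := fetch(token)
		fn(p.summaries)
		if p.nextToken == nil {
			return
		}
		token = p.nextToken
	}
}

func main() {
	// Fake two-page provider used only for this illustration.
	second := "page-2"
	fetch := func(token *string) page {
		if token == nil {
			return page{
				summaries: []jobSummary{{JobName: "job-a", Status: "SUCCEEDED"}},
				nextToken: &second,
			}
		}
		return page{summaries: []jobSummary{{JobName: "job-b", Status: "FAILED"}}}
	}

	counts := map[string]int{}
	listAll(fetch, func(summaries []jobSummary) {
		for _, s := range summaries {
			counts[s.Status]++
		}
	})
	fmt.Println(counts)
}
```

Because the callback may run several times for one query, it should accumulate into state it closes over (as counts does here) rather than assume a single invocation holds every job.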
type JobsLogQuery ¶
type JobsLogQuery struct {
QueryLevel string
QueryValue JobNameParts
MatchValue string //regex to match and only print matching lines
JsScriptFunction any
}
type JobsSummaryQuery ¶
type JobsSummaryQuery struct {
//The Level to request. Must be one of three values
//COMPUTE/EVENT/MANIFEST as represented by the SUMMARY_{level} constants
QueryLevel string
//the GUIDs representing the referenced levels
//The value must include preceding levels, so COMPUTE level must have at least the compute GUID
//EVENT level must have the compute and event levels and MANIFEST level has all three guids
QueryValue JobNameParts
//a required function to process each job returned in the query
JobSummaryFunction JobSummaryFunction
}
type KeyValuePair ¶
type KeyValuePairs ¶
type KeyValuePairs []KeyValuePair
func MapToKeyValuePairs ¶
func MapToKeyValuePairs(mapdata map[string]string) KeyValuePairs
func (*KeyValuePairs) GetVal ¶
func (kvps *KeyValuePairs) GetVal(key string) string
Designed to behave like a system env: the value is returned if the key is found, and an empty string is returned otherwise.
func (*KeyValuePairs) HasKey ¶
func (kvps *KeyValuePairs) HasKey(key string) bool
func (*KeyValuePairs) Merge ¶
func (kvps *KeyValuePairs) Merge(newKvps *KeyValuePairs)
func (*KeyValuePairs) SetVal ¶
func (kvps *KeyValuePairs) SetVal(key string, val string)
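The KeyValuePair definition is not shown in this listing, so the sketch below assumes a pair with Name and Value string fields and shows how the four methods could behave on a slice of pairs. Letting the incoming value win in Merge is also an assumption, as are the example keys and values.

```go
package main

import "fmt"

// kvPair and its field names are assumptions; the KeyValuePair definition
// is not shown in this listing.
type kvPair struct {
	Name  string
	Value string
}

type kvPairs []kvPair

// HasKey reports whether key is present.
func (kvps *kvPairs) HasKey(key string) bool {
	for _, kv := range *kvps {
		if kv.Name == key {
			return true
		}
	}
	return false
}

// GetVal returns the value for key, or "" when the key is missing,
// mirroring how os.Getenv treats unset variables.
func (kvps *kvPairs) GetVal(key string) string {
	for _, kv := range *kvps {
		if kv.Name == key {
			return kv.Value
		}
	}
	return ""
}

// SetVal overwrites an existing key or appends a new pair.
func (kvps *kvPairs) SetVal(key string, val string) {
	for i := range *kvps {
		if (*kvps)[i].Name == key {
			(*kvps)[i].Value = val
			return
		}
	}
	*kvps = append(*kvps, kvPair{Name: key, Value: val})
}

// Merge copies every pair from other into kvps. Letting the incoming value
// win on a key collision is an assumption about the real Merge.
func (kvps *kvPairs) Merge(other *kvPairs) {
	for _, kv := range *other {
		kvps.SetVal(kv.Name, kv.Value)
	}
}

func main() {
	env := kvPairs{}
	env.SetVal("REGION", "us-east-1")
	overrides := kvPairs{{Name: "REGION", Value: "us-west-2"}, {Name: "MODE", Value: "test"}}
	env.Merge(&overrides)
	fmt.Println(env.GetVal("REGION"), env.HasKey("MODE"), env.GetVal("MISSING") == "")
}
```

Since GetVal cannot distinguish a missing key from a key set to the empty string, HasKey is the way to tell the two cases apart.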
type LinuxDevice ¶
type LinuxParameters ¶
type LinuxParameters struct {
Devices []LinuxDevice `json:"devices" jsonschema:"title=Devices"`
}
type MountPoint ¶
type PerEventLooper ¶
type PerEventLooper struct {
// contains filtered or unexported fields
}
func NewPerEventLooper ¶
func NewPerEventLooper(pel []map[string]string) *PerEventLooper
type Plugin ¶
type Plugin struct {
Name string `json:"name" jsonschema:"title=Name"`
//Revision string `json:"revision" yaml:"revision"`
ImageAndTag string `json:"image_and_tag" jsonschema:"title=Image and Tag,description=The docker image and tag"`
Description string `json:"description" jsonschema:"title=Description"`
Command []string `json:"command" jsonschema:"title=Command,description=The docker command and arguments to run"`
ComputeEnvironment PluginComputeEnvironment `json:"compute_environment" jsonschema:"title=Compute Environment,description=CPU and Memory runtime requirements"`
DefaultEnvironment KeyValuePairs `json:"environment" jsonschema:"title=Default Environment Variables,description=The list of default environment variables"` //default values for the container environment
Volumes []PluginComputeVolumes `` //@NOTE: currently not used
/* 128-byte string literal not displayed */
Credentials KeyValuePairs `` /* 213-byte string literal not displayed */
Parameters map[string]string `json:"parameters" jsonschema:"title=Parameters"` //@NOTE: currently not being used
RetryAttemts int32 `json:"retry_attempts" jsonschema:"title=Retry Attempts"`
ExecutionTimeout *int32 `json:"execution_timeout" jsonschema:"title=Execution Timeout (sec)"`
Privileged bool `json:"privileged" jsonschema:"title=Requires Privileged Execution"` //assign container privileged execution. for example to mount linux devices
LinuxParameters LinuxParameters `json:"linux_parameters" jsonschema:"title=Linux Parameters"`
MountPoints []MountPoint `json:"mountpoints" jsonschema:"title=MountPoints"` //@NOTE: currently not being used
}
The Plugin struct is used to interact with the compute environment and create a Job Definition. This is likely going to be moved to the CCAPI. When entering credentials, use the format of the compute provider. For example, when using AWS Batch: "AWS_ACCESS_KEY_ID", "arn:aws:secretsmanager:us-east-1:01010101010:secret:mysecret:AWS_ACCESS_KEY_ID::"
type PluginComputeVolumes ¶
type PluginComputeVolumes struct {
Name string `json:"name" jsonschema:"title=Name"`
ResourceName string `json:"resource_name" jsonschema:"title=ResourceName,description=The vendor resource name, e.g. ARN for AWS"`
ReadOnly bool `json:"read_only" jsonschema:"title=Read Only,description=Check to make this resource read-only"`
MountPoint string `json:"mount_point" jsonschema:"title=Mount Point,description=Path in the running container to mount the volume"` //default is "/data"
}
type PluginInputs ¶
type PluginInputs struct {
Environment KeyValuePairs `json:"environment,omitempty"`
Parameters map[string]string `json:"parameters,omitempty"`
DataSources []DataSource `json:"data_sources,omitempty"`
PayloadAttributes PayloadAttributes `json:"payload_attributes,omitempty"`
}
Job-level inputs that can be injected into a container.
type ResourceRequirement ¶
type ResourceRequirement struct {
Type ResourceType `json:"resource_type"`
Value string `json:"value"`
}
type ResourceType ¶
type ResourceType string
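To show how the struct tags above shape the serialized form, here is a small sketch that mirrors ResourceType and ResourceRequirement with local copies and encodes a slice of overrides as JSON. The lower-case type names, the encode helper, and the example values (including any unit implied by "4096") are assumptions for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Local mirrors of ResourceType and ResourceRequirement so that the
// example runs without importing the package.
type resourceType string

const (
	resourceTypeVcpu   resourceType = "VCPU"
	resourceTypeMemory resourceType = "MEMORY"
)

type resourceRequirement struct {
	Type  resourceType `json:"resource_type"`
	Value string       `json:"value"`
}

// encode is a hypothetical helper that serializes requirement overrides.
func encode(reqs []resourceRequirement) (string, error) {
	out, err := json.Marshal(reqs)
	if err != nil {
		return "", err
	}
	return string(out), nil
}

func main() {
	s, err := encode([]resourceRequirement{
		{Type: resourceTypeVcpu, Value: "2"},
		{Type: resourceTypeMemory, Value: "4096"},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(s)
	// prints: [{"resource_type":"VCPU","value":"2"},{"resource_type":"MEMORY","value":"4096"}]
}
```

Note that Value is a string even for numeric quantities, so callers format numbers themselves before building a requirement.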
type StreamingEventGenerator ¶
type StreamingEventGenerator struct {
// contains filtered or unexported fields
}
type SubmitJobResult ¶
type SubmitJobResult struct {
//Vendor ID
JobId *string
//Vendor Resource Name
ResourceName *string //ARN in AWS
}
Vendor job information.
type TerminateJobFunction ¶
type TerminateJobFunction func(output TerminateJobOutput)
Function to process the results of each job termination.
type TerminateJobInput ¶
type TerminateJobInput struct {
//Users reason for terminating the job
Reason string
//The Vendor Job Queue the job was submitted to
JobQueue string
//Optional. A jobs summary query that will generate a list of jobs to terminate
//if this value is provided the JobSummaryQuery JobSummaryFunction is ignored
//and should be left empty
Query JobsSummaryQuery
//Optional. A list of VendorJobs to terminate
VendorJobs VendorJobs
//Optional. A function to process the results of each terminated job
TerminateJobFunction TerminateJobFunction
}
Input for terminating jobs submitted to a queue. The list of jobs to terminate is determined either by a StatusQuery, in which case each job returned by the query is terminated, or by a list of VendorJobs.
type TerminateJobOutput ¶
type TopoSortable ¶
type TopoSortable[T comparable] interface {
Node() T
Deps() []T
}
Interface for supporting Topological sort in Manifests (or other structs that would use a toposort)
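As a sketch of how a type might satisfy this interface and feed a sort, the example below mirrors TopoSortable locally and converts a slice of items into the map shape that TopologicalSort accepts. The step type and the toDigraph helper are hypothetical, and treating each map value as the node's dependency list is an assumption.

```go
package main

import "fmt"

// topoSortable mirrors the TopoSortable interface from the listing.
type topoSortable[T comparable] interface {
	Node() T
	Deps() []T
}

// step is a hypothetical type that satisfies topoSortable[string].
type step struct {
	name string
	deps []string
}

func (s step) Node() string   { return s.name }
func (s step) Deps() []string { return s.deps }

// toDigraph builds the map shape that TopologicalSort accepts. Treating
// each map value as the node's dependency list is an assumption.
func toDigraph[T comparable, S topoSortable[T]](items []S) map[T][]T {
	g := make(map[T][]T, len(items))
	for _, it := range items {
		g[it.Node()] = it.Deps()
	}
	return g
}

func main() {
	steps := []step{
		{name: "load"},
		{name: "model", deps: []string{"load"}},
		{name: "report", deps: []string{"model"}},
	}
	g := toDigraph[string](steps)
	fmt.Println(len(g), g["model"]) // prints: 3 [load]
}
```

ComputeManifest follows the same pattern in the listing, with Node and Deps both working in terms of uuid.UUID.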
type VendorJob ¶
Vendor job data used for terminating jobs on the vendor's compute environment. @TODO, why can't this be replaced by SubmitJobResult?
type VendorJobs ¶
type VendorJobs []VendorJob
func (VendorJobs) IncludesJob ¶
func (vj VendorJobs) IncludesJob(id string) bool