docker

package
v0.0.0-...-bb46c97 Latest
Published: Mar 4, 2026 License: MIT Imports: 17 Imported by: 0

README

Cloud Compute Docker Provider

The Cloud Compute Docker Provider is a local development implementation that allows running compute jobs using Docker containers. It's designed to provide a lightweight, local alternative to cloud-based compute providers like AWS Batch, making it ideal for development, testing, and local execution of compute jobs.

Overview

The Docker compute provider enables developers to run compute jobs locally using Docker containers. It supports:

  • Concurrent execution of jobs with configurable limits
  • Plugin-based job definitions using Docker images
  • Job dependency management
  • Job monitoring and status tracking
  • Secrets management for secure environment variables
  • Integration with the Cloud Compute framework's standard interfaces

Architecture

The Docker compute provider is built on several core components:

  1. DockerComputeProvider: The main entry point, which implements the ComputeProvider interface
  2. DockerComputeManager: Manages job queuing, execution, and concurrency control
  3. DockerJobRunner: Executes individual jobs using Docker
  4. PluginRegistry: Stores and retrieves job definitions (plugins)
  5. SecretsManager: Handles secure environment variable management

Usage

Creating a Docker Compute Provider

// Create a provider with default configuration
computeProvider := NewDockerComputeProvider(DockerComputeProviderConfig{})

// Create a provider with custom concurrency
// (plain assignment: computeProvider is already declared above)
computeProvider = NewDockerComputeProvider(DockerComputeProviderConfig{
    Concurrency: 4,
})

// Create a provider with an in-memory secrets manager
sm := NewInMemorySecretsManager("")
sm.AddSecret("arn:local:secretsmanager:secret:prod/CloudCompute/ASDFASDF:AWS_ACCESS_KEY_ID::", "ASFASDFASDFASDF")
sm.AddSecret("arn:local:secretsmanager:secret:prod/CloudCompute/ASDFASDF:AWS_SECRET_ACCESS_KEY::", "ASDFFASDFASDFASDF")

computeProvider2 := NewDockerComputeProvider(DockerComputeProviderConfig{
    Concurrency:    1,
    SecretsManager: sm,
})

Registering Plugins

Plugins define the Docker images and execution parameters for jobs:

// Register a plugin
plugin := &Plugin{
    Name:        "TestPlugin",
    ImageAndTag: "hello-world:latest",
    Command:     []string{"/hello"},
    ComputeEnvironment: PluginComputeEnvironment{
        VCPU:   "1",
        Memory: "512",
    },
}

_, err := provider.RegisterPlugin(plugin)

Submitting Jobs

Jobs are submitted using the SubmitJobs method:

// Create a job
job := &Job{
    ID:            uuid.New(),
    EventID:       uuid.New(),
    ManifestID:    manifestId,
    PayloadID:     uuid.New(),
    JobName:       "test-job",
    JobQueue:      "docker-local",
    JobDefinition: "TestPlugin",
    ContainerOverrides: ContainerOverrides{
        Environment: KeyValuePairs{
            {Name: "TEST_VAR", Value: "test_value"},
        },
    },
}

// Submit the job
input := SubmitJobsInput{
    Jobs: []*Job{job},
}

err := provider.SubmitJobs(input)

Job Dependencies

Jobs can depend on other jobs using the DependsOn field:

job2 := &Job{
    ID:            uuid.New(),
    EventID:       uuid.New(),
    ManifestID:    manifestId2,
    PayloadID:     uuid.New(),
    JobName:       "test-job-2",
    JobQueue:      "docker-local",
    JobDefinition: "TestPlugin2",
    DependsOn:     []string{manifestId1.String()}, // job2 depends on job1
    ContainerOverrides: ContainerOverrides{
        Environment: KeyValuePairs{
            {Name: "TEST_VAR", Value: "test_value_2"},
        },
    },
}

Configuration Options

The DockerComputeProviderConfig struct accepts the following configuration options:

  • Concurrency: Number of concurrent containers to allow on the host (default: 1)
  • StartMonitor: If greater than 0, starts a monitor job with the specified interval in seconds
  • MonitorFunction: Optional custom monitor function (the default prints the status counts to stdout)
  • SecretsManager: Optional in-memory secrets manager for secure environment variables
  • DockerPullProgressFactory: Optional factory for Docker pull progress UI instances

Features

Concurrency Control

The provider supports configurable concurrency limits to control how many Docker containers run simultaneously. This helps manage system resources and prevent overloading.
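
One common way to picture such a limit is a counting semaphore built from a buffered channel. The sketch below is an illustration only and is not taken from the package's source: runLimited is a hypothetical helper, and it assumes a limit of at least 1.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited is a hypothetical helper, not part of the package. It runs every
// task, allows at most `limit` of them to execute at the same time, and
// returns the highest number of tasks observed running together.
// Assumption: limit >= 1 (a limit of 0 would block forever).
func runLimited(limit int, tasks []func()) int {
	sem := make(chan struct{}, limit) // counting semaphore with `limit` slots
	var wg sync.WaitGroup
	var running, peak int32

	for _, task := range tasks {
		wg.Add(1)
		sem <- struct{}{} // blocks while `limit` tasks are already in flight
		go func(t func()) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when the task ends

			n := atomic.AddInt32(&running, 1)
			for { // record a new peak if this is the highest count so far
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			t()
			atomic.AddInt32(&running, -1)
		}(task)
	}
	wg.Wait()
	return int(atomic.LoadInt32(&peak))
}

func main() {
	tasks := make([]func(), 6)
	for i := range tasks {
		tasks[i] = func() { time.Sleep(10 * time.Millisecond) }
	}
	peak := runLimited(2, tasks)
	fmt.Println("peak concurrency within limit:", peak <= 2)
}
```

In the provider itself the equivalent knob is the Concurrency field of DockerComputeProviderConfig; how the manager enforces it internally is not documented here.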

Job Monitoring

A built-in monitoring system can track job status and provide periodic updates. You can customize the monitoring interval and function.
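
A custom monitor is any function matching the documented MonitorFunction type, func(statuses map[JobStatus]int). The sketch below redeclares JobStatus and MonitorFunction locally so it compiles on its own; formatCounts and statusOrder are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"strings"
)

// Local copies of the documented types so the sketch is self-contained.
type JobStatus string

const (
	Submitted JobStatus = "SUBMITTED"
	Pending   JobStatus = "PENDING"
	Runnable  JobStatus = "RUNNABLE"
	Starting  JobStatus = "STARTING"
	Running   JobStatus = "RUNNING"
	Succeeded JobStatus = "SUCCEEDED"
	Failed    JobStatus = "FAILED"
)

type MonitorFunction func(statuses map[JobStatus]int)

// statusOrder fixes the print order, since Go map iteration order is random.
var statusOrder = []JobStatus{Submitted, Pending, Runnable, Starting, Running, Succeeded, Failed}

// formatCounts (hypothetical) renders the counts as a single summary line.
// Statuses missing from the map are reported as 0.
func formatCounts(statuses map[JobStatus]int) string {
	parts := make([]string, 0, len(statusOrder))
	for _, s := range statusOrder {
		parts = append(parts, fmt.Sprintf("%s=%d", s, statuses[s]))
	}
	return strings.Join(parts, " ")
}

func main() {
	var monitor MonitorFunction = func(statuses map[JobStatus]int) {
		fmt.Println(formatCounts(statuses))
	}
	monitor(map[JobStatus]int{Running: 2, Succeeded: 5})
	// prints: SUBMITTED=0 PENDING=0 RUNNABLE=0 STARTING=0 RUNNING=2 SUCCEEDED=5 FAILED=0
}
```

With the real package, a function like this would be passed as MonitorFunction in DockerComputeProviderConfig, together with a StartMonitor interval greater than 0.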

Secrets Management

The provider supports secrets management for secure environment variable handling, allowing you to inject sensitive information into containers.
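
Any type with the documented AddSecret and GetSecret methods satisfies the SecretsManager interface. As a rough sketch, a map-backed implementation could look like the following. mapSecrets is a hypothetical type, and returning an empty string for an unknown key is an assumption, not documented behavior of the package's own managers.

```go
package main

import (
	"fmt"
	"sync"
)

// SecretsManager is a local copy of the documented interface.
type SecretsManager interface {
	AddSecret(key string, val string)
	GetSecret(key string) string
}

// mapSecrets is a hypothetical, map-backed implementation for illustration.
type mapSecrets struct {
	mu     sync.RWMutex
	values map[string]string
}

func newMapSecrets() *mapSecrets {
	return &mapSecrets{values: make(map[string]string)}
}

// AddSecret stores or overwrites the value for key.
func (m *mapSecrets) AddSecret(key string, val string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.values[key] = val
}

// GetSecret returns the stored value, or "" when the key is unknown
// (an assumption made for this sketch).
func (m *mapSecrets) GetSecret(key string) string {
	m.mu.RLock()
	defer m.mu.RUnlock()
	return m.values[key]
}

// Compile-time check that mapSecrets satisfies the interface.
var _ SecretsManager = (*mapSecrets)(nil)

func main() {
	sm := newMapSecrets()
	sm.AddSecret("arn:local:secretsmanager:secret:example:API_TOKEN::", "not-a-real-token")
	fmt.Println(sm.GetSecret("arn:local:secretsmanager:secret:example:API_TOKEN::"))
	fmt.Println(sm.GetSecret("missing") == "")
}
```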

Plugin System

Plugins define the Docker images and execution parameters for jobs, enabling flexible job definitions that can be reused across multiple jobs.
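
The registry behind this can be pictured as a name-to-definition map. The sketch below implements the documented PluginRegistry method set on a hypothetical mapRegistry type, with a trimmed local Plugin struct. The error cases (empty name, unknown name) and the overwrite-on-reregister behavior are assumptions chosen for the example.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Plugin is a trimmed local stand-in for the package's Plugin type.
type Plugin struct {
	Name        string
	ImageAndTag string
	Command     []string
}

// PluginRegistry is a local copy of the documented interface.
type PluginRegistry interface {
	Register(plugin *Plugin) error
	Deregister(name string) error
	Get(name string) (*Plugin, error)
}

// mapRegistry is a hypothetical map-backed registry.
type mapRegistry struct {
	mu      sync.Mutex
	plugins map[string]*Plugin
}

func newMapRegistry() *mapRegistry {
	return &mapRegistry{plugins: make(map[string]*Plugin)}
}

// Register stores the plugin under its name, replacing any earlier entry.
func (r *mapRegistry) Register(plugin *Plugin) error {
	if plugin == nil || plugin.Name == "" {
		return errors.New("plugin must have a name")
	}
	r.mu.Lock()
	defer r.mu.Unlock()
	r.plugins[plugin.Name] = plugin
	return nil
}

// Deregister removes the plugin, reporting an error if it was never registered.
func (r *mapRegistry) Deregister(name string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.plugins[name]; !ok {
		return fmt.Errorf("plugin %q is not registered", name)
	}
	delete(r.plugins, name)
	return nil
}

// Get looks a plugin up by name.
func (r *mapRegistry) Get(name string) (*Plugin, error) {
	r.mu.Lock()
	defer r.mu.Unlock()
	p, ok := r.plugins[name]
	if !ok {
		return nil, fmt.Errorf("plugin %q is not registered", name)
	}
	return p, nil
}

var _ PluginRegistry = (*mapRegistry)(nil)

func main() {
	reg := newMapRegistry()
	_ = reg.Register(&Plugin{Name: "TestPlugin", ImageAndTag: "hello-world:latest"})
	p, err := reg.Get("TestPlugin")
	fmt.Println(p.ImageAndTag, err)
}
```

A job then refers to the plugin by name through its JobDefinition field, as in the README's examples.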

Dependency Management

Jobs can specify dependencies on other jobs, enabling complex workflows with proper execution ordering.
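
As a minimal sketch of what "proper execution ordering" can mean, the example below selects the next job whose dependencies have all succeeded. The job type, the status constants, and nextRunnable are hypothetical simplifications; the package's own queue (GetNextRunnable) may differ, for example in how it treats failed dependencies.

```go
package main

import "fmt"

type status string

const (
	pending   status = "PENDING"
	succeeded status = "SUCCEEDED"
)

// job is a simplified stand-in: ID plays the role of the identifier that
// other jobs list in DependsOn.
type job struct {
	ID        string
	DependsOn []string
	Status    status
}

// nextRunnable returns the first pending job whose dependencies have all
// succeeded, or nil when no job is ready.
func nextRunnable(jobs []*job) *job {
	done := make(map[string]bool)
	for _, j := range jobs {
		if j.Status == succeeded {
			done[j.ID] = true
		}
	}
	for _, j := range jobs {
		if j.Status != pending {
			continue
		}
		ready := true
		for _, dep := range j.DependsOn {
			if !done[dep] {
				ready = false
				break
			}
		}
		if ready {
			return j
		}
	}
	return nil
}

func main() {
	a := &job{ID: "a", Status: pending}
	b := &job{ID: "b", DependsOn: []string{"a"}, Status: pending}
	jobs := []*job{b, a} // b is listed first but must wait for a

	fmt.Println(nextRunnable(jobs).ID) // prints: a
	a.Status = succeeded
	fmt.Println(nextRunnable(jobs).ID) // prints: b
}
```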

Methods

SubmitJobs

Submits one or more jobs for execution. The jobs are queued and executed according to the configured concurrency limit.

TerminateJobs

Terminates jobs based on a query or list of job identifiers.

Status

Retrieves job summaries based on a query, providing information about job status and execution.

JobLog

Retrieves logs for a specific job (not currently implemented in the Docker provider).

RegisterPlugin

Registers a plugin definition that can be referenced by jobs.

UnregisterPlugin

Removes a plugin from the registry.

Example Usage

package main

import (
    "github.com/usace-cloud-compute/cloudcompute"
    "github.com/google/uuid"
)

func main() {
    // Create a Docker compute provider with concurrency of 2
    provider := cloudcompute.NewDockerComputeProvider(cloudcompute.DockerComputeProviderConfig{
        Concurrency: 2,
    })

    // Register a plugin
    plugin := &cloudcompute.Plugin{
        Name:        "HelloWorldPlugin",
        ImageAndTag: "hello-world:latest",
        Command:     []string{"/hello"},
        ComputeEnvironment: cloudcompute.PluginComputeEnvironment{
            VCPU:   "1",
            Memory: "512",
        },
    }

    _, err := provider.RegisterPlugin(plugin)
    if err != nil {
        panic(err)
    }

    // Create and submit a job
    manifestId := uuid.New()
    job := &cloudcompute.Job{
        ID:            uuid.New(),
        EventID:       uuid.New(),
        ManifestID:    manifestId,
        PayloadID:     uuid.New(),
        JobName:       "hello-world-job",
        JobQueue:      "docker-local",
        JobDefinition: "HelloWorldPlugin",
        ContainerOverrides: cloudcompute.ContainerOverrides{
            Environment: cloudcompute.KeyValuePairs{
                {Name: "ENV_VAR", Value: "value"},
            },
        },
    }

    input := cloudcompute.SubmitJobsInput{
        Jobs: []*cloudcompute.Job{job},
    }

    err = provider.SubmitJobs(input)
    if err != nil {
        panic(err)
    }
}

Testing

The Docker compute provider includes comprehensive tests covering:

  • Provider creation and configuration
  • Job submission with and without dependencies
  • Plugin registration
  • Job termination
  • Status queries
  • Concurrent job execution

Tests can be run using standard Go testing commands:

cd providers/docker
go test -v

Limitations

  • This provider is designed for local development and testing
  • Not suitable for production workloads
  • Limited to Docker container execution
  • Some methods (like JobLog) are not fully implemented

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func KvpToEnv

func KvpToEnv(kvp []KeyValuePair) []string
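
KvpToEnv carries no doc comment. Judging by its name and signature, it presumably converts key-value pairs into the NAME=value strings that Docker accepts as container environment entries. The sketch below shows that conversion under this assumption; kvpToEnvSketch and the local KeyValuePair type are stand-ins, not the package's code.

```go
package main

import "fmt"

// KeyValuePair is a local stand-in; the Name and Value fields match the
// README's job examples.
type KeyValuePair struct {
	Name  string
	Value string
}

// kvpToEnvSketch (hypothetical) renders each pair as "NAME=value".
func kvpToEnvSketch(kvp []KeyValuePair) []string {
	env := make([]string, 0, len(kvp))
	for _, kv := range kvp {
		env = append(env, kv.Name+"="+kv.Value)
	}
	return env
}

func main() {
	env := kvpToEnvSketch([]KeyValuePair{{Name: "TEST_VAR", Value: "test_value"}})
	fmt.Println(env) // prints: [TEST_VAR=test_value]
}
```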

Types

type DockerComputeManager

type DockerComputeManager struct {
	// contains filtered or unexported fields
}

func NewDockerComputeManager

func NewDockerComputeManager(config DockerComputeManagerConfig) *DockerComputeManager

func (*DockerComputeManager) AddJob

func (dcm *DockerComputeManager) AddJob(djob *DockerJob)

func (*DockerComputeManager) StartMonitor

func (dcm *DockerComputeManager) StartMonitor(interval int, monitorFunc MonitorFunction)

func (*DockerComputeManager) TerminateJobs

func (dcm *DockerComputeManager) TerminateJobs(input TerminateJobInput)

type DockerComputeManagerConfig

type DockerComputeManagerConfig struct {
	Concurrency               int
	DockerPullProgressFactory DockerPullProgressFactory
}

type DockerComputeProvider

type DockerComputeProvider struct {
	// contains filtered or unexported fields
}

func NewDockerComputeProvider

func NewDockerComputeProvider(config DockerComputeProviderConfig) *DockerComputeProvider

NewDockerComputeProvider creates a new DockerComputeProvider based on the given configuration.

func (*DockerComputeProvider) JobLog

func (dcp *DockerComputeProvider) JobLog(input JobLogInput) (JobLogOutput, error)

func (*DockerComputeProvider) RegisterPlugin

func (dcp *DockerComputeProvider) RegisterPlugin(plugin *Plugin) (PluginRegistrationOutput, error)

RegisterPlugin registers a plugin with the DockerComputeProvider.

func (*DockerComputeProvider) Status

func (dcp *DockerComputeProvider) Status(jobQueue string, query JobsSummaryQuery) error

Status retrieves and returns job summaries based on the query.

func (*DockerComputeProvider) SubmitJobs

func (dcp *DockerComputeProvider) SubmitJobs(event SubmitJobsInput) error

SubmitJobs submits one or more jobs for execution.

func (*DockerComputeProvider) TerminateJobs

func (dcp *DockerComputeProvider) TerminateJobs(input TerminateJobInput) error

TerminateJobs terminates jobs based on the provided input.

func (*DockerComputeProvider) UnregisterPlugin

func (dcp *DockerComputeProvider) UnregisterPlugin(nameAndRevision string) error

type DockerComputeProviderConfig

type DockerComputeProviderConfig struct {
	//number of concurrent containers to allow on the host
	Concurrency int

	//starts a monitor job which provides job summary counts similar to the AWS Batch dashboard.
	//If the value provided is greater than 0, the monitor will start using the value as a monitoring interval in seconds
	StartMonitor int

	//optional monitor function.  The default monitor will print the status counts to StdOut
	MonitorFunction MonitorFunction

	//optional in memory secrets manager if a secrets mock is necessary
	SecretsManager SecretsManager

	//docker pull progress ui instance generator
	DockerPullProgressFactory DockerPullProgressFactory
}

DockerComputeProviderConfig holds the configuration for DockerComputeProvider.

type DockerEvent

type DockerEvent struct {
	ID             string `json:"id"`
	Status         string `json:"status"`
	Error          string `json:"error"`
	Progress       string `json:"progress"`
	ProgressDetail struct {
		Current int64 `json:"current"`
		Total   int64 `json:"total"`
	} `json:"progressDetail"`
}
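
The field tags match the JSON progress messages that Docker streams during an image pull. As an illustration, the sketch below decodes a short, hand-written stream into a local copy of the struct using encoding/json. decodeEvents is a hypothetical helper and the sample messages are made up for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// DockerEvent is a local copy of the documented struct.
type DockerEvent struct {
	ID             string `json:"id"`
	Status         string `json:"status"`
	Error          string `json:"error"`
	Progress       string `json:"progress"`
	ProgressDetail struct {
		Current int64 `json:"current"`
		Total   int64 `json:"total"`
	} `json:"progressDetail"`
}

// decodeEvents (hypothetical) reads a stream of concatenated JSON objects
// and returns them as DockerEvent values.
func decodeEvents(stream string) ([]DockerEvent, error) {
	dec := json.NewDecoder(strings.NewReader(stream))
	var events []DockerEvent
	for {
		var ev DockerEvent
		err := dec.Decode(&ev)
		if err == io.EOF {
			return events, nil // clean end of stream
		}
		if err != nil {
			return events, err
		}
		events = append(events, ev)
	}
}

func main() {
	// Hand-written sample resembling pull progress output.
	stream := `{"status":"Pulling fs layer","id":"abc123"}
{"status":"Downloading","id":"abc123","progressDetail":{"current":512,"total":2048}}`

	events, err := decodeEvents(stream)
	if err != nil {
		panic(err)
	}
	for _, ev := range events {
		if ev.ProgressDetail.Total > 0 {
			pct := ev.ProgressDetail.Current * 100 / ev.ProgressDetail.Total
			fmt.Printf("%s %s %d%%\n", ev.ID, ev.Status, pct)
		} else {
			fmt.Printf("%s %s\n", ev.ID, ev.Status)
		}
	}
}
```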

type DockerJob

type DockerJob struct {
	Job    *Job
	Plugin *Plugin
	Status JobStatus
	Log    string

	SecretsManager SecretsManager
	// contains filtered or unexported fields
}

type DockerJobRunner

type DockerJobRunner struct {
	// contains filtered or unexported fields
}

func NewRunner

func NewRunner(job *DockerJob) (*DockerJobRunner, error)

func (*DockerJobRunner) Close

func (dr *DockerJobRunner) Close()

func (*DockerJobRunner) Run

func (dr *DockerJobRunner) Run() error

func (*DockerJobRunner) Terminate

func (dr *DockerJobRunner) Terminate() error

type DockerMonitorWriter

type DockerMonitorWriter struct {
	// contains filtered or unexported fields
}

DockerMonitorWriter is a custom writer that formats output with the job ID.

func (*DockerMonitorWriter) Write

func (cw *DockerMonitorWriter) Write(p []byte) (n int, err error)

type DockerPullProgress

type DockerPullProgress interface {
	Update(msg DockerEvent)
	Close()
}

type DockerPullProgressFactory

type DockerPullProgressFactory interface {
	New() DockerPullProgress
}

type DockerRunMonitor

type DockerRunMonitor struct {
	// contains filtered or unexported fields
}

func NewDockerRunMonitor

func NewDockerRunMonitor(dr *DockerJobRunner, containerId string) *DockerRunMonitor

func (*DockerRunMonitor) Wait

func (drm *DockerRunMonitor) Wait()

type EnvironmentSecretManager

type EnvironmentSecretManager struct {
	// contains filtered or unexported fields
}

func (*EnvironmentSecretManager) AddSecret

func (sm *EnvironmentSecretManager) AddSecret(key string, val string)

func (*EnvironmentSecretManager) GetSecret

func (sm *EnvironmentSecretManager) GetSecret(key string) string

type InMemoryJobQueue

type InMemoryJobQueue struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func (*InMemoryJobQueue) Add

func (jq *InMemoryJobQueue) Add(job *DockerJob)

func (*InMemoryJobQueue) GetJob

func (jq *InMemoryJobQueue) GetJob(id uuid.UUID) *DockerJob

func (*InMemoryJobQueue) GetNextRunnable

func (jq *InMemoryJobQueue) GetNextRunnable() *DockerJob

func (*InMemoryJobQueue) Jobs

func (jq *InMemoryJobQueue) Jobs(statuses ...JobStatus) []*DockerJob

func (*InMemoryJobQueue) UpdateJobs

func (jq *InMemoryJobQueue) UpdateJobs() []uuid.UUID

type InMemoryPluginRegistry

type InMemoryPluginRegistry struct {
	// contains filtered or unexported fields
}

func (*InMemoryPluginRegistry) Deregister

func (pr *InMemoryPluginRegistry) Deregister(name string) error

func (*InMemoryPluginRegistry) Get

func (pr *InMemoryPluginRegistry) Get(name string) (*Plugin, error)

func (*InMemoryPluginRegistry) Register

func (pr *InMemoryPluginRegistry) Register(plugin *Plugin) error

type InMemorySecretsManager

type InMemorySecretsManager struct {
	// contains filtered or unexported fields
}

func (*InMemorySecretsManager) AddSecret

func (sm *InMemorySecretsManager) AddSecret(key string, val string)

func (*InMemorySecretsManager) GetSecret

func (sm *InMemorySecretsManager) GetSecret(key string) string

type JobDeps

type JobDeps []*DockerJob

type JobQueue

type JobQueue interface {
	Add(job *DockerJob)

	//Gets the next runnable job and sets the status to Starting
	GetNextRunnable() *DockerJob

	//Gets all jobs matching the set of statuses
	//Calling with no status argument will return all jobs
	Jobs(statuses ...JobStatus) []*DockerJob

	//Get a job by ID
	GetJob(jobId uuid.UUID) *DockerJob

	UpdateJobs() []uuid.UUID
}

func NewInMemoryJobQueue

func NewInMemoryJobQueue() JobQueue

type JobStatus

type JobStatus string

const (
	Submitted JobStatus = "SUBMITTED"
	Pending   JobStatus = "PENDING"
	Runnable  JobStatus = "RUNNABLE"
	Starting  JobStatus = "STARTING"
	Running   JobStatus = "RUNNING"
	Succeeded JobStatus = "SUCCEEDED"
	Failed    JobStatus = "FAILED"
)

type MonitorFunction

type MonitorFunction func(statuses map[JobStatus]int)

type PluginRegistry

type PluginRegistry interface {
	Register(plugin *Plugin) error
	Deregister(name string) error
	Get(name string) (*Plugin, error)
}

func NewInMemoryPluginRegistry

func NewInMemoryPluginRegistry() PluginRegistry

type SecretsManager

type SecretsManager interface {
	AddSecret(key string, val string)
	GetSecret(key string) string
}

func NewEnvironmentSecretsManager

func NewEnvironmentSecretsManager() SecretsManager

func NewInMemorySecretsManager

func NewInMemorySecretsManager(key string) SecretsManager

NewInMemorySecretsManager creates a new in-memory secrets manager. The key parameter is reserved for an encryption key, should encryption be added later.
