docker

package
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 15, 2026 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Overview

Copyright 2026 Teradata

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. Package docker implements Docker-based execution backend with MCP server support.

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. Package docker implements trace collection from Docker containers.

Index

Constants

View Source
const (
	// BaggageKeyTenantID is the W3C baggage key for tenant identification.
	BaggageKeyTenantID = "tenant_id"
	// BaggageKeyOrgID is the W3C baggage key for organization identification.
	BaggageKeyOrgID = "org_id"
)

Variables

This section is empty.

Functions

func DefaultDockerSocketPaths

func DefaultDockerSocketPaths() []string

DefaultDockerSocketPaths returns the default Docker socket paths for the current platform. Can be overridden via LOOM_DOCKER_SOCKET_PATHS environment variable (comma-separated). Default: OrbStack on macOS (recommended for performance and resource efficiency).

Types

type ContainerScheduler

type ContainerScheduler interface {
	// Schedule decides where to run a container based on requirements.
	// Returns scheduling decision with node_id, reason, and cost estimate.
	//
	// For LocalScheduler: Always returns node_id="localhost"
	// For DistributedScheduler: Evaluates multiple nodes and selects best
	Schedule(ctx context.Context, req *loomv1.ScheduleRequest) (*loomv1.ScheduleDecision, error)

	// GetOrCreateContainer retrieves an existing container from the pool or creates a new one.
	// Handles container lifecycle: creation, health checks, rotation.
	//
	// Returns container ID and whether it was newly created.
	GetOrCreateContainer(ctx context.Context, req *loomv1.ContainerRequest) (string, bool, error)

	// ListContainers returns all containers managed by this scheduler.
	// Supports filtering by tenant, runtime type, labels, status.
	ListContainers(ctx context.Context, filters map[string]string) ([]*loomv1.Container, error)

	// RemoveContainer removes a container from the pool and deletes it.
	// Used for explicit cleanup, rotation, or when container becomes unhealthy.
	RemoveContainer(ctx context.Context, containerID string) error

	// GetNodeInfo retrieves information about a specific node.
	// For LocalScheduler: Returns localhost capacity/availability
	// For DistributedScheduler: Queries node metrics
	GetNodeInfo(ctx context.Context, nodeID string) (*loomv1.NodeInfo, error)

	// Close releases scheduler resources (connection pools, goroutines, etc.)
	Close() error
}

ContainerScheduler decides where to schedule containers. This interface enables future distributed scheduling across multiple Docker daemons without refactoring the Docker backend.

For v1.0: LocalScheduler schedules all containers to "localhost" (single daemon) For future: DistributedScheduler can schedule across nodes based on:

  • Resource availability (CPU, memory)
  • Data locality (Teradata node affinity)
  • Tenant isolation (dedicated nodes per tenant)
  • Cost optimization (spot instances, preemptible VMs)

type DockerBackend

type DockerBackend struct {
	// contains filtered or unexported fields
}

DockerBackend implements fabric.ExecutionBackend for Docker-based execution.

Enables running arbitrary code in isolated Docker containers:

  • Python scripts (with pip packages)
  • Node.js applications (with npm packages)
  • Custom binaries (Rust, Go, Ruby, etc.)
  • MCP servers (Teradata, GitHub, filesystem, etc.)

Features:

  • Container pooling and reuse (50-100ms warm start vs 1-3s cold start)
  • Resource limits (CPU, memory, storage, PIDs)
  • Security hardening (read-only rootfs, capability dropping, non-root)
  • Package caching (pip, npm) for fast installs
  • Container rotation (time-based and execution-based)
  • Health checks and auto-cleanup

Use Cases:

  1. **Python Agent**: Run Python code generated by LLM - Install packages: numpy, pandas, requests - Execute: data analysis, API calls, file processing

  2. **MCP Server Host**: Run MCP servers in containers - Teradata MCP: Database queries via JSON-RPC - GitHub MCP: GitHub API access - Filesystem MCP: Safe file operations

  3. **Multi-Language Toolchain**: Support non-SQL domains - Rust: Compile and run Rust code - Node.js: Execute JavaScript/TypeScript - Ruby: Run Ruby scripts

Thread Safety: All methods are thread-safe.

func NewDockerBackend

func NewDockerBackend(ctx context.Context, config DockerBackendConfig) (*DockerBackend, error)

NewDockerBackend creates a new Docker backend.

func (*DockerBackend) Capabilities

func (db *DockerBackend) Capabilities() *fabric.Capabilities

Capabilities implements fabric.ExecutionBackend.Capabilities.

func (*DockerBackend) Close

func (db *DockerBackend) Close() error

Close implements fabric.ExecutionBackend.Close.

func (*DockerBackend) ExecuteCustomOperation

func (db *DockerBackend) ExecuteCustomOperation(ctx context.Context, op string, params map[string]interface{}) (interface{}, error)

ExecuteCustomOperation implements fabric.ExecutionBackend.ExecuteCustomOperation.

Supported operations:

  • "install_packages": Install runtime-specific packages
  • "get_logs": Get container logs
  • "list_containers": List all managed containers

func (*DockerBackend) ExecuteQuery

func (db *DockerBackend) ExecuteQuery(ctx context.Context, query string) (*fabric.QueryResult, error)

ExecuteQuery implements fabric.ExecutionBackend.ExecuteQuery.

For Docker backend:

  • query: Command to execute (e.g., "python script.py", "node app.js")
  • result: Command output (stdout/stderr, exit code)

Query Format:

  • Simple command: "python -c 'print(2+2)'"
  • Script file: "python /workspace/script.py"
  • With args: "node app.js --input data.json"

Result Format:

  • Type: "command_output"
  • Data: {"stdout": "...", "stderr": "...", "exit_code": 0}
  • ExecutionStats: Duration, resource usage

func (*DockerBackend) GetMetadata

func (db *DockerBackend) GetMetadata(ctx context.Context, resource string) (map[string]interface{}, error)

GetMetadata implements fabric.ExecutionBackend.GetMetadata.

For Docker backend:

  • Returns runtime metadata (Python version, installed packages, etc.)

func (*DockerBackend) GetSchema

func (db *DockerBackend) GetSchema(ctx context.Context, resource string) (*fabric.Schema, error)

GetSchema implements fabric.ExecutionBackend.GetSchema.

For Docker backend:

  • resource: "python_packages", "node_packages", "container_info"
  • schema: Available packages, container configuration

func (*DockerBackend) ListResources

func (db *DockerBackend) ListResources(ctx context.Context, filters map[string]string) ([]fabric.Resource, error)

ListResources implements fabric.ExecutionBackend.ListResources.

For Docker backend:

  • Lists available resources (runtime_info, container_config)

func (*DockerBackend) Name

func (db *DockerBackend) Name() string

Name implements fabric.ExecutionBackend.Name.

func (*DockerBackend) Ping

func (db *DockerBackend) Ping(ctx context.Context) error

Ping implements fabric.ExecutionBackend.Ping.

type DockerBackendConfig

type DockerBackendConfig struct {
	// Config is the proto configuration
	Config *loomv1.DockerBackendConfig

	// DockerHost is the Docker daemon endpoint (default: "unix:///var/run/docker.sock")
	DockerHost string

	// Scheduler is the container scheduler (optional, created if nil)
	Scheduler ContainerScheduler

	// Logger is the zap logger (optional, created if nil)
	Logger *zap.Logger
}

DockerBackendConfig wraps proto config and adds Go-specific fields.

type DockerExecutor

type DockerExecutor struct {
	// contains filtered or unexported fields
}

DockerExecutor handles Docker container lifecycle and execution.

Responsibilities:

  • Container creation using runtime strategies
  • Command execution in containers
  • stdout/stderr capture
  • Container health checks
  • Container rotation (time-based and execution-based)
  • Resource cleanup

Container Lifecycle:

  1. GetOrCreateContainer() via scheduler
  2. If new container: CreateContainer() with runtime config
  3. Execute() command in container
  4. Check rotation criteria (executions >= max OR time >= interval)
  5. If rotation needed: RemoveContainer(), create new on next execution

Thread Safety: All methods are thread-safe.

func NewDockerExecutor

func NewDockerExecutor(ctx context.Context, config DockerExecutorConfig) (*DockerExecutor, error)

NewDockerExecutor creates a new DockerExecutor instance.

func (*DockerExecutor) Close

func (de *DockerExecutor) Close() error

Close releases executor resources.

func (*DockerExecutor) Execute

Execute executes a command in a Docker container.

Flow:

  1. GetOrCreateContainer() from scheduler
  2. If new container: CreateContainer() with runtime config
  3. StartContainer() if not running
  4. Execute command with stdin/stdout/stderr handling
  5. Check rotation criteria
  6. Return ExecuteResponse with output and metadata

func (*DockerExecutor) GetContainerLogs

func (de *DockerExecutor) GetContainerLogs(ctx context.Context, containerID string, tail int, timestamps bool) (string, error)

GetContainerLogs retrieves logs from a container.

func (*DockerExecutor) Health

func (de *DockerExecutor) Health(ctx context.Context) error

Health checks the health of the Docker executor and its dependencies.

type DockerExecutorConfig

type DockerExecutorConfig struct {
	// DockerHost is the Docker daemon endpoint (default: "unix:///var/run/docker.sock")
	DockerHost string

	// Scheduler is the container scheduler (required)
	Scheduler ContainerScheduler

	// Logger is the zap logger (required)
	Logger *zap.Logger

	// Tracer is the observability tracer (optional - if nil, tracing is disabled)
	Tracer observability.Tracer
}

DockerExecutorConfig configures DockerExecutor.

type FilteringWriter

type FilteringWriter struct {
	// contains filtered or unexported fields
}

FilteringWriter wraps an io.Writer and filters out trace lines before writing. This is used to remove __LOOM_TRACE__ lines from stderr output.

func NewFilteringWriter

func NewFilteringWriter(w io.Writer) *FilteringWriter

NewFilteringWriter creates a new filtering writer.

func (*FilteringWriter) Flush

func (fw *FilteringWriter) Flush() error

Flush writes any remaining buffered data (called at end of stream).

func (*FilteringWriter) Write

func (fw *FilteringWriter) Write(p []byte) (n int, err error)

Write implements io.Writer. It filters out lines starting with __LOOM_TRACE__.

type LocalScheduler

type LocalScheduler struct {
	// contains filtered or unexported fields
}

LocalScheduler schedules containers to a single local Docker daemon. All containers run on "localhost" (node_id="localhost").

Thread Safety: All methods are thread-safe (can be called from multiple goroutines).

Container Lifecycle:

  1. GetOrCreateContainer() -> Container created or retrieved from pool
  2. Container used for 0-1000 executions over 0-4 hours
  3. Rotation triggered by max_executions OR rotation_interval_hours
  4. Old container removed, new container created for next execution

Multi-Tenancy (Future):

  • Tenant-scoped container pools (tenant_id -> []containerID)
  • Resource accounting per tenant (CPU seconds, memory GB-hours)
  • Quota enforcement (reject if tenant exceeds quota)

Currently: All containers share a global pool (no tenant isolation)

func NewLocalScheduler

func NewLocalScheduler(ctx context.Context, config LocalSchedulerConfig) (*LocalScheduler, error)

NewLocalScheduler creates a new LocalScheduler instance.

func (*LocalScheduler) Close

func (ls *LocalScheduler) Close() error

Close implements ContainerScheduler.Close.

func (*LocalScheduler) GetNodeInfo

func (ls *LocalScheduler) GetNodeInfo(ctx context.Context, nodeID string) (*loomv1.NodeInfo, error)

GetNodeInfo implements ContainerScheduler.GetNodeInfo.

func (*LocalScheduler) GetOrCreateContainer

func (ls *LocalScheduler) GetOrCreateContainer(ctx context.Context, req *loomv1.ContainerRequest) (string, bool, error)

GetOrCreateContainer implements ContainerScheduler.GetOrCreateContainer.

func (*LocalScheduler) ListContainers

func (ls *LocalScheduler) ListContainers(ctx context.Context, filters map[string]string) ([]*loomv1.Container, error)

ListContainers implements ContainerScheduler.ListContainers.

func (*LocalScheduler) RemoveContainer

func (ls *LocalScheduler) RemoveContainer(ctx context.Context, containerID string) error

RemoveContainer implements ContainerScheduler.RemoveContainer.

func (*LocalScheduler) Schedule

Schedule implements ContainerScheduler.Schedule. For LocalScheduler: Always schedules to "localhost" with zero cost.

type LocalSchedulerConfig

type LocalSchedulerConfig struct {
	// DockerHost is the Docker daemon endpoint (default: "unix:///var/run/docker.sock")
	DockerHost string

	// SocketPaths is a list of Docker socket paths to try in order (default: platform-specific)
	// This allows configuration via LOOM_DOCKER_SOCKET_PATHS environment variable (comma-separated)
	SocketPaths []string

	// NodeID is the identifier for this node (default: "localhost")
	NodeID string

	// CleanupInterval is how often to run health checks and rotation (default: 5 minutes)
	CleanupInterval time.Duration

	// DefaultRotationInterval is the default rotation interval if not specified in config (default: 4 hours)
	DefaultRotationInterval time.Duration

	// DefaultMaxExecutions is the default max executions before rotation (default: 1000)
	DefaultMaxExecutions int32

	// Logger is the zap logger (optional, created if nil)
	Logger *zap.Logger
}

LocalSchedulerConfig configures LocalScheduler.

type MCPManagerConfig

type MCPManagerConfig struct {
	Executor *DockerExecutor
	Logger   *zap.Logger
}

MCPManagerConfig configures the MCP server manager.

type MCPServerInfo

type MCPServerInfo struct {
	Name         string
	ContainerID  string
	Healthy      bool
	CreatedAt    time.Time
	RestartCount int
	LastRestart  time.Time
}

MCPServerInfo provides information about a managed MCP server.

func (MCPServerInfo) String

func (info MCPServerInfo) String() string

String implements fmt.Stringer for MCPServerInfo.

type MCPServerManager

type MCPServerManager struct {
	// contains filtered or unexported fields
}

MCPServerManager manages MCP servers running inside Docker containers.

Unlike the standard pkg/mcp/manager.Manager which runs MCP servers as host processes, this manager runs MCP servers inside Docker containers for: - Isolation (separate Python/Node environments per MCP server) - Observability (trace propagation into MCP server execution) - Resource limits (CPU, memory constraints per MCP server) - Security (sandboxed execution of untrusted MCP servers)

func NewMCPServerManager

func NewMCPServerManager(config MCPManagerConfig) (*MCPServerManager, error)

NewMCPServerManager creates a new Docker-based MCP server manager.

func (*MCPServerManager) Close

func (msm *MCPServerManager) Close(ctx context.Context) error

Close stops all MCP servers and cleans up resources.

func (*MCPServerManager) GetServerInfo

func (msm *MCPServerManager) GetServerInfo(serverName string) (*MCPServerInfo, error)

GetServerInfo returns information about a managed MCP server.

func (*MCPServerManager) HealthCheck

func (msm *MCPServerManager) HealthCheck(ctx context.Context, serverName string) error

HealthCheck checks the health of an MCP server by pinging it.

func (*MCPServerManager) InvokeTool

func (msm *MCPServerManager) InvokeTool(
	ctx context.Context,
	serverName string,
	toolName string,
	arguments map[string]interface{},
) (*protocol.CallToolResult, error)

InvokeTool invokes a tool on an MCP server running in a Docker container.

This method: 1. Looks up the managed MCP server by name 2. Validates the tool exists 3. Calls the tool via JSON-RPC over stdin/stdout 4. Returns the result

Example:

result, err := manager.InvokeTool(ctx, "teradata", "query", map[string]interface{}{
    "sql": "SELECT * FROM users LIMIT 10",
})

func (*MCPServerManager) ListMCPServers

func (msm *MCPServerManager) ListMCPServers() []MCPServerInfo

ListMCPServers returns information about all managed MCP servers.

func (*MCPServerManager) ListTools

func (msm *MCPServerManager) ListTools(ctx context.Context, serverName string) ([]protocol.Tool, error)

ListTools returns all available tools from an MCP server.

func (*MCPServerManager) StartMCPServer

func (msm *MCPServerManager) StartMCPServer(
	ctx context.Context,
	serverName string,
	mcpConfig *loomv1.MCPServerConfig,
	runtimeType loomv1.RuntimeType,
	dockerConfig *loomv1.DockerBackendConfig,
) error

StartMCPServer starts an MCP server inside a Docker container.

The MCP server runs as a long-lived process inside the container, with stdio transport for JSON-RPC communication. The server is kept alive for multiple tool invocations.

Example:

config := &loomv1.MCPServerConfig{
    Enabled: true,
    Transport: "stdio",
    Command: "python",
    Args: []string{"-m", "vantage_mcp.server"},
    Env: map[string]string{
        "TD_USER": "dbc",
        "TD_HOST": "vantage.teradata.com",
    },
}
err := manager.StartMCPServer(ctx, "teradata", config, runtimeType, dockerConfig)

func (*MCPServerManager) StopMCPServer

func (msm *MCPServerManager) StopMCPServer(ctx context.Context, serverName string) error

StopMCPServer stops an MCP server and cleans up its container.

type TraceCollector

type TraceCollector struct {
	// contains filtered or unexported fields
}

TraceCollector collects trace spans from container stderr output.

Container-side trace libraries (loom_trace.py, loom-trace.js) write spans to stderr with the prefix "__LOOM_TRACE__:". This collector parses those lines, deserializes the spans, and forwards them to the host tracer for export to Hawk.

Thread Safety: Safe for concurrent use (can read from multiple containers).

func NewTraceCollector

func NewTraceCollector(config TraceCollectorConfig) (*TraceCollector, error)

NewTraceCollector creates a new trace collector.

func (*TraceCollector) CollectFromReader

func (tc *TraceCollector) CollectFromReader(ctx context.Context, reader io.Reader, containerID string) error

CollectFromReader reads from an io.Reader (typically container stderr) and collects trace spans written by container-side trace libraries.

This method blocks until EOF or context cancellation. It's designed to be run in a goroutine per container execution.

Format: __LOOM_TRACE__:{"trace_id":"...","span_id":"...","name":"..."}

Example:

go collector.CollectFromReader(ctx, stderrPipe, containerID)

func (*TraceCollector) GetStats

func (tc *TraceCollector) GetStats() (spansCollected int64, parseErrors int64)

GetStats returns collector statistics.

func (*TraceCollector) Reset

func (tc *TraceCollector) Reset()

Reset resets collector statistics (for testing).

type TraceCollectorConfig

type TraceCollectorConfig struct {
	Tracer observability.Tracer
	Logger *zap.Logger
}

TraceCollectorConfig configures the trace collector.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL