Documentation
¶
Overview ¶
Copyright 2026 Teradata
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Package docker implements a Docker-based execution backend with MCP server support.
Copyright 2026 Teradata ¶
Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
Package docker implements trace collection from Docker containers.
Index ¶
- Constants
- func DefaultDockerSocketPaths() []string
- type ContainerScheduler
- type DockerBackend
- func (db *DockerBackend) Capabilities() *fabric.Capabilities
- func (db *DockerBackend) Close() error
- func (db *DockerBackend) ExecuteCustomOperation(ctx context.Context, op string, params map[string]interface{}) (interface{}, error)
- func (db *DockerBackend) ExecuteQuery(ctx context.Context, query string) (*fabric.QueryResult, error)
- func (db *DockerBackend) GetMetadata(ctx context.Context, resource string) (map[string]interface{}, error)
- func (db *DockerBackend) GetSchema(ctx context.Context, resource string) (*fabric.Schema, error)
- func (db *DockerBackend) ListResources(ctx context.Context, filters map[string]string) ([]fabric.Resource, error)
- func (db *DockerBackend) Name() string
- func (db *DockerBackend) Ping(ctx context.Context) error
- type DockerBackendConfig
- type DockerExecutor
- func (de *DockerExecutor) Close() error
- func (de *DockerExecutor) Execute(ctx context.Context, req *loomv1.ExecuteRequest) (*loomv1.ExecuteResponse, error)
- func (de *DockerExecutor) GetContainerLogs(ctx context.Context, containerID string, tail int, timestamps bool) (string, error)
- func (de *DockerExecutor) Health(ctx context.Context) error
- type DockerExecutorConfig
- type FilteringWriter
- type LocalScheduler
- func (ls *LocalScheduler) Close() error
- func (ls *LocalScheduler) GetNodeInfo(ctx context.Context, nodeID string) (*loomv1.NodeInfo, error)
- func (ls *LocalScheduler) GetOrCreateContainer(ctx context.Context, req *loomv1.ContainerRequest) (string, bool, error)
- func (ls *LocalScheduler) ListContainers(ctx context.Context, filters map[string]string) ([]*loomv1.Container, error)
- func (ls *LocalScheduler) RemoveContainer(ctx context.Context, containerID string) error
- func (ls *LocalScheduler) Schedule(ctx context.Context, req *loomv1.ScheduleRequest) (*loomv1.ScheduleDecision, error)
- type LocalSchedulerConfig
- type MCPManagerConfig
- type MCPServerInfo
- type MCPServerManager
- func (msm *MCPServerManager) Close(ctx context.Context) error
- func (msm *MCPServerManager) GetServerInfo(serverName string) (*MCPServerInfo, error)
- func (msm *MCPServerManager) HealthCheck(ctx context.Context, serverName string) error
- func (msm *MCPServerManager) InvokeTool(ctx context.Context, serverName string, toolName string, ...) (*protocol.CallToolResult, error)
- func (msm *MCPServerManager) ListMCPServers() []MCPServerInfo
- func (msm *MCPServerManager) ListTools(ctx context.Context, serverName string) ([]protocol.Tool, error)
- func (msm *MCPServerManager) StartMCPServer(ctx context.Context, serverName string, mcpConfig *loomv1.MCPServerConfig, ...) error
- func (msm *MCPServerManager) StopMCPServer(ctx context.Context, serverName string) error
- type TraceCollector
- type TraceCollectorConfig
Constants ¶
const (
	// BaggageKeyTenantID is the W3C baggage key for tenant identification.
	BaggageKeyTenantID = "tenant_id"
	// BaggageKeyOrgID is the W3C baggage key for organization identification.
	BaggageKeyOrgID = "org_id"
)
Variables ¶
This section is empty.
Functions ¶
func DefaultDockerSocketPaths ¶
func DefaultDockerSocketPaths() []string
DefaultDockerSocketPaths returns the default Docker socket paths for the current platform. Can be overridden via LOOM_DOCKER_SOCKET_PATHS environment variable (comma-separated). Default: OrbStack on macOS (recommended for performance and resource efficiency).
Types ¶
type ContainerScheduler ¶
type ContainerScheduler interface {
// Schedule decides where to run a container based on requirements.
// Returns scheduling decision with node_id, reason, and cost estimate.
//
// For LocalScheduler: Always returns node_id="localhost"
// For DistributedScheduler: Evaluates multiple nodes and selects best
Schedule(ctx context.Context, req *loomv1.ScheduleRequest) (*loomv1.ScheduleDecision, error)
// GetOrCreateContainer retrieves an existing container from the pool or creates a new one.
// Handles container lifecycle: creation, health checks, rotation.
//
// Returns container ID and whether it was newly created.
GetOrCreateContainer(ctx context.Context, req *loomv1.ContainerRequest) (string, bool, error)
// ListContainers returns all containers managed by this scheduler.
// Supports filtering by tenant, runtime type, labels, status.
ListContainers(ctx context.Context, filters map[string]string) ([]*loomv1.Container, error)
// RemoveContainer removes a container from the pool and deletes it.
// Used for explicit cleanup, rotation, or when container becomes unhealthy.
RemoveContainer(ctx context.Context, containerID string) error
// GetNodeInfo retrieves information about a specific node.
// For LocalScheduler: Returns localhost capacity/availability
// For DistributedScheduler: Queries node metrics
GetNodeInfo(ctx context.Context, nodeID string) (*loomv1.NodeInfo, error)
// Close releases scheduler resources (connection pools, goroutines, etc.)
Close() error
}
ContainerScheduler decides where to schedule containers. This interface enables future distributed scheduling across multiple Docker daemons without refactoring the Docker backend.
For v1.0: LocalScheduler schedules all containers to "localhost" (single daemon).
For future: DistributedScheduler can schedule across nodes based on:
- Resource availability (CPU, memory)
- Data locality (Teradata node affinity)
- Tenant isolation (dedicated nodes per tenant)
- Cost optimization (spot instances, preemptible VMs)
type DockerBackend ¶
type DockerBackend struct {
// contains filtered or unexported fields
}
DockerBackend implements fabric.ExecutionBackend for Docker-based execution.
Enables running arbitrary code in isolated Docker containers:
- Python scripts (with pip packages)
- Node.js applications (with npm packages)
- Custom binaries (Rust, Go, Ruby, etc.)
- MCP servers (Teradata, GitHub, filesystem, etc.)
Features:
- Container pooling and reuse (50-100ms warm start vs 1-3s cold start)
- Resource limits (CPU, memory, storage, PIDs)
- Security hardening (read-only rootfs, capability dropping, non-root)
- Package caching (pip, npm) for fast installs
- Container rotation (time-based and execution-based)
- Health checks and auto-cleanup
Use Cases:
- **Python Agent**: Run Python code generated by LLM
  - Install packages: numpy, pandas, requests
  - Execute: data analysis, API calls, file processing
- **MCP Server Host**: Run MCP servers in containers
  - Teradata MCP: Database queries via JSON-RPC
  - GitHub MCP: GitHub API access
  - Filesystem MCP: Safe file operations
- **Multi-Language Toolchain**: Support non-SQL domains
  - Rust: Compile and run Rust code
  - Node.js: Execute JavaScript/TypeScript
  - Ruby: Run Ruby scripts
Thread Safety: All methods are thread-safe.
func NewDockerBackend ¶
func NewDockerBackend(ctx context.Context, config DockerBackendConfig) (*DockerBackend, error)
NewDockerBackend creates a new Docker backend.
func (*DockerBackend) Capabilities ¶
func (db *DockerBackend) Capabilities() *fabric.Capabilities
Capabilities implements fabric.ExecutionBackend.Capabilities.
func (*DockerBackend) Close ¶
func (db *DockerBackend) Close() error
Close implements fabric.ExecutionBackend.Close.
func (*DockerBackend) ExecuteCustomOperation ¶
func (db *DockerBackend) ExecuteCustomOperation(ctx context.Context, op string, params map[string]interface{}) (interface{}, error)
ExecuteCustomOperation implements fabric.ExecutionBackend.ExecuteCustomOperation.
Supported operations:
- "install_packages": Install runtime-specific packages
- "get_logs": Get container logs
- "list_containers": List all managed containers
func (*DockerBackend) ExecuteQuery ¶
func (db *DockerBackend) ExecuteQuery(ctx context.Context, query string) (*fabric.QueryResult, error)
ExecuteQuery implements fabric.ExecutionBackend.ExecuteQuery.
For Docker backend:
- query: Command to execute (e.g., "python script.py", "node app.js")
- result: Command output (stdout/stderr, exit code)
Query Format:
- Simple command: "python -c 'print(2+2)'"
- Script file: "python /workspace/script.py"
- With args: "node app.js --input data.json"
Result Format:
- Type: "command_output"
- Data: {"stdout": "...", "stderr": "...", "exit_code": 0}
- ExecutionStats: Duration, resource usage
func (*DockerBackend) GetMetadata ¶
func (db *DockerBackend) GetMetadata(ctx context.Context, resource string) (map[string]interface{}, error)
GetMetadata implements fabric.ExecutionBackend.GetMetadata.
For Docker backend:
- Returns runtime metadata (Python version, installed packages, etc.)
func (*DockerBackend) GetSchema ¶
func (db *DockerBackend) GetSchema(ctx context.Context, resource string) (*fabric.Schema, error)
GetSchema implements fabric.ExecutionBackend.GetSchema.
For Docker backend:
- resource: "python_packages", "node_packages", "container_info"
- schema: Available packages, container configuration
func (*DockerBackend) ListResources ¶
func (db *DockerBackend) ListResources(ctx context.Context, filters map[string]string) ([]fabric.Resource, error)
ListResources implements fabric.ExecutionBackend.ListResources.
For Docker backend:
- Lists available resources (runtime_info, container_config)
func (*DockerBackend) Name ¶
func (db *DockerBackend) Name() string
Name implements fabric.ExecutionBackend.Name.
type DockerBackendConfig ¶
type DockerBackendConfig struct {
// Config is the proto configuration
Config *loomv1.DockerBackendConfig
// DockerHost is the Docker daemon endpoint (default: "unix:///var/run/docker.sock")
DockerHost string
// Scheduler is the container scheduler (optional, created if nil)
Scheduler ContainerScheduler
// Logger is the zap logger (optional, created if nil)
Logger *zap.Logger
}
DockerBackendConfig wraps proto config and adds Go-specific fields.
type DockerExecutor ¶
type DockerExecutor struct {
// contains filtered or unexported fields
}
DockerExecutor handles Docker container lifecycle and execution.
Responsibilities:
- Container creation using runtime strategies
- Command execution in containers
- stdout/stderr capture
- Container health checks
- Container rotation (time-based and execution-based)
- Resource cleanup
Container Lifecycle:
- GetOrCreateContainer() via scheduler
- If new container: CreateContainer() with runtime config
- Execute() command in container
- Check rotation criteria (executions >= max OR time >= interval)
- If rotation needed: RemoveContainer(), create new on next execution
Thread Safety: All methods are thread-safe.
func NewDockerExecutor ¶
func NewDockerExecutor(ctx context.Context, config DockerExecutorConfig) (*DockerExecutor, error)
NewDockerExecutor creates a new DockerExecutor instance.
func (*DockerExecutor) Close ¶
func (de *DockerExecutor) Close() error
Close releases executor resources.
func (*DockerExecutor) Execute ¶
func (de *DockerExecutor) Execute(ctx context.Context, req *loomv1.ExecuteRequest) (*loomv1.ExecuteResponse, error)
Execute executes a command in a Docker container.
Flow:
- GetOrCreateContainer() from scheduler
- If new container: CreateContainer() with runtime config
- StartContainer() if not running
- Execute command with stdin/stdout/stderr handling
- Check rotation criteria
- Return ExecuteResponse with output and metadata
func (*DockerExecutor) GetContainerLogs ¶
func (de *DockerExecutor) GetContainerLogs(ctx context.Context, containerID string, tail int, timestamps bool) (string, error)
GetContainerLogs retrieves logs from a container.
type DockerExecutorConfig ¶
type DockerExecutorConfig struct {
// DockerHost is the Docker daemon endpoint (default: "unix:///var/run/docker.sock")
DockerHost string
// Scheduler is the container scheduler (required)
Scheduler ContainerScheduler
// Logger is the zap logger (required)
Logger *zap.Logger
// Tracer is the observability tracer (optional - if nil, tracing is disabled)
Tracer observability.Tracer
}
DockerExecutorConfig configures DockerExecutor.
type FilteringWriter ¶
type FilteringWriter struct {
// contains filtered or unexported fields
}
FilteringWriter wraps an io.Writer and filters out trace lines before writing. This is used to remove __LOOM_TRACE__ lines from stderr output.
func NewFilteringWriter ¶
func NewFilteringWriter(w io.Writer) *FilteringWriter
NewFilteringWriter creates a new filtering writer.
func (*FilteringWriter) Flush ¶
func (fw *FilteringWriter) Flush() error
Flush writes any remaining buffered data (called at end of stream).
type LocalScheduler ¶
type LocalScheduler struct {
// contains filtered or unexported fields
}
LocalScheduler schedules containers to a single local Docker daemon. All containers run on "localhost" (node_id="localhost").
Thread Safety: All methods are thread-safe (can be called from multiple goroutines).
Container Lifecycle:
- GetOrCreateContainer() -> Container created or retrieved from pool
- Container used for 0-1000 executions over 0-4 hours
- Rotation triggered by max_executions OR rotation_interval_hours
- Old container removed, new container created for next execution
Multi-Tenancy (Future):
- Tenant-scoped container pools (tenant_id -> []containerID)
- Resource accounting per tenant (CPU seconds, memory GB-hours)
- Quota enforcement (reject if tenant exceeds quota)
Currently: All containers share a global pool (no tenant isolation)
func NewLocalScheduler ¶
func NewLocalScheduler(ctx context.Context, config LocalSchedulerConfig) (*LocalScheduler, error)
NewLocalScheduler creates a new LocalScheduler instance.
func (*LocalScheduler) Close ¶
func (ls *LocalScheduler) Close() error
Close implements ContainerScheduler.Close.
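func (*LocalScheduler) GetNodeInfo ¶
func (ls *LocalScheduler) GetNodeInfo(ctx context.Context, nodeID string) (*loomv1.NodeInfo, error)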
GetNodeInfo implements ContainerScheduler.GetNodeInfo.
func (*LocalScheduler) GetOrCreateContainer ¶
func (ls *LocalScheduler) GetOrCreateContainer(ctx context.Context, req *loomv1.ContainerRequest) (string, bool, error)
GetOrCreateContainer implements ContainerScheduler.GetOrCreateContainer.
func (*LocalScheduler) ListContainers ¶
func (ls *LocalScheduler) ListContainers(ctx context.Context, filters map[string]string) ([]*loomv1.Container, error)
ListContainers implements ContainerScheduler.ListContainers.
func (*LocalScheduler) RemoveContainer ¶
func (ls *LocalScheduler) RemoveContainer(ctx context.Context, containerID string) error
RemoveContainer implements ContainerScheduler.RemoveContainer.
func (*LocalScheduler) Schedule ¶
func (ls *LocalScheduler) Schedule(ctx context.Context, req *loomv1.ScheduleRequest) (*loomv1.ScheduleDecision, error)
Schedule implements ContainerScheduler.Schedule. For LocalScheduler: Always schedules to "localhost" with zero cost.
type LocalSchedulerConfig ¶
type LocalSchedulerConfig struct {
// DockerHost is the Docker daemon endpoint (default: "unix:///var/run/docker.sock")
DockerHost string
// SocketPaths is a list of Docker socket paths to try in order (default: platform-specific)
// This allows configuration via LOOM_DOCKER_SOCKET_PATHS environment variable (comma-separated)
SocketPaths []string
// NodeID is the identifier for this node (default: "localhost")
NodeID string
// CleanupInterval is how often to run health checks and rotation (default: 5 minutes)
CleanupInterval time.Duration
// DefaultRotationInterval is the default rotation interval if not specified in config (default: 4 hours)
DefaultRotationInterval time.Duration
// DefaultMaxExecutions is the default max executions before rotation (default: 1000)
DefaultMaxExecutions int32
// Logger is the zap logger (optional, created if nil)
Logger *zap.Logger
}
LocalSchedulerConfig configures LocalScheduler.
type MCPManagerConfig ¶
type MCPManagerConfig struct {
Executor *DockerExecutor
Logger *zap.Logger
}
MCPManagerConfig configures the MCP server manager.
type MCPServerInfo ¶
type MCPServerInfo struct {
Name string
ContainerID string
Healthy bool
CreatedAt time.Time
RestartCount int
LastRestart time.Time
}
MCPServerInfo provides information about a managed MCP server.
func (MCPServerInfo) String ¶
func (info MCPServerInfo) String() string
String implements fmt.Stringer for MCPServerInfo.
type MCPServerManager ¶
type MCPServerManager struct {
// contains filtered or unexported fields
}
MCPServerManager manages MCP servers running inside Docker containers.
Unlike the standard pkg/mcp/manager.Manager, which runs MCP servers as host processes, this manager runs MCP servers inside Docker containers for:
- Isolation (separate Python/Node environments per MCP server)
- Observability (trace propagation into MCP server execution)
- Resource limits (CPU, memory constraints per MCP server)
- Security (sandboxed execution of untrusted MCP servers)
func NewMCPServerManager ¶
func NewMCPServerManager(config MCPManagerConfig) (*MCPServerManager, error)
NewMCPServerManager creates a new Docker-based MCP server manager.
func (*MCPServerManager) Close ¶
func (msm *MCPServerManager) Close(ctx context.Context) error
Close stops all MCP servers and cleans up resources.
func (*MCPServerManager) GetServerInfo ¶
func (msm *MCPServerManager) GetServerInfo(serverName string) (*MCPServerInfo, error)
GetServerInfo returns information about a managed MCP server.
func (*MCPServerManager) HealthCheck ¶
func (msm *MCPServerManager) HealthCheck(ctx context.Context, serverName string) error
HealthCheck checks the health of an MCP server by pinging it.
func (*MCPServerManager) InvokeTool ¶
func (msm *MCPServerManager) InvokeTool(ctx context.Context, serverName string, toolName string, arguments map[string]interface{}) (*protocol.CallToolResult, error)
InvokeTool invokes a tool on an MCP server running in a Docker container.
This method:
1. Looks up the managed MCP server by name
2. Validates the tool exists
3. Calls the tool via JSON-RPC over stdin/stdout
4. Returns the result
Example:
result, err := manager.InvokeTool(ctx, "teradata", "query", map[string]interface{}{
	"sql": "SELECT * FROM users LIMIT 10",
})
func (*MCPServerManager) ListMCPServers ¶
func (msm *MCPServerManager) ListMCPServers() []MCPServerInfo
ListMCPServers returns information about all managed MCP servers.
func (*MCPServerManager) ListTools ¶
func (msm *MCPServerManager) ListTools(ctx context.Context, serverName string) ([]protocol.Tool, error)
ListTools returns all available tools from an MCP server.
func (*MCPServerManager) StartMCPServer ¶
func (msm *MCPServerManager) StartMCPServer(ctx context.Context, serverName string, mcpConfig *loomv1.MCPServerConfig, runtimeType loomv1.RuntimeType, dockerConfig *loomv1.DockerBackendConfig) error
StartMCPServer starts an MCP server inside a Docker container.
The MCP server runs as a long-lived process inside the container, with stdio transport for JSON-RPC communication. The server is kept alive for multiple tool invocations.
Example:
config := &loomv1.MCPServerConfig{
	Enabled:   true,
	Transport: "stdio",
	Command:   "python",
	Args:      []string{"-m", "vantage_mcp.server"},
	Env: map[string]string{
		"TD_USER": "dbc",
		"TD_HOST": "vantage.teradata.com",
	},
}
err := manager.StartMCPServer(ctx, "teradata", config, runtimeType, dockerConfig)
func (*MCPServerManager) StopMCPServer ¶
func (msm *MCPServerManager) StopMCPServer(ctx context.Context, serverName string) error
StopMCPServer stops an MCP server and cleans up its container.
type TraceCollector ¶
type TraceCollector struct {
// contains filtered or unexported fields
}
TraceCollector collects trace spans from container stderr output.
Container-side trace libraries (loom_trace.py, loom-trace.js) write spans to stderr with the prefix "__LOOM_TRACE__:". This collector parses those lines, deserializes the spans, and forwards them to the host tracer for export to Hawk.
Thread Safety: Safe for concurrent use (can read from multiple containers).
func NewTraceCollector ¶
func NewTraceCollector(config TraceCollectorConfig) (*TraceCollector, error)
NewTraceCollector creates a new trace collector.
func (*TraceCollector) CollectFromReader ¶
func (tc *TraceCollector) CollectFromReader(ctx context.Context, reader io.Reader, containerID string) error
CollectFromReader reads from an io.Reader (typically container stderr) and collects trace spans written by container-side trace libraries.
This method blocks until EOF or context cancellation. It's designed to be run in a goroutine per container execution.
Format: __LOOM_TRACE__:{"trace_id":"...","span_id":"...","name":"..."}
Example:
go collector.CollectFromReader(ctx, stderrPipe, containerID)
func (*TraceCollector) GetStats ¶
func (tc *TraceCollector) GetStats() (spansCollected int64, parseErrors int64)
GetStats returns collector statistics.
func (*TraceCollector) Reset ¶
func (tc *TraceCollector) Reset()
Reset resets collector statistics (for testing).
type TraceCollectorConfig ¶
type TraceCollectorConfig struct {
Tracer observability.Tracer
Logger *zap.Logger
}
TraceCollectorConfig configures the trace collector.