Documentation
¶
Index ¶
- Variables
- func BuildInitHookEnv(serviceName string, ingresses map[string]spec.Endpoint, tempDir string, ...) (map[string]string, error)
- func BuildPrestartHookEnv(serviceName string, ingresses map[string]spec.Endpoint, ...) (map[string]string, error)
- func BuildServiceEnv(serviceName string, ingresses map[string]spec.Endpoint, ...) (map[string]string, error)
- func DefaultRigDir() string
- func ExpandTemplate(tmpl string, attrs map[string]string) string
- func ExpandTemplates(templates []string, attrs map[string]string) []string
- func InsertTestNode(env *spec.Environment)
- func ResolveDefaults(env *spec.Environment)
- func TransformObserve(env *spec.Environment)
- func ValidateEnvironment(env *spec.Environment) []string
- type CallbackRequest
- type CallbackResponse
- type ConnectionInfo
- type DiagnosticSnapshot
- type Event
- type EventLog
- func (l *EventLog) Events() []Event
- func (l *EventLog) LifecycleEvents() []Event
- func (l *EventLog) Publish(event Event)
- func (l *EventLog) ServiceLogTail(service string, n int) string
- func (l *EventLog) Since(seq uint64) []Event
- func (l *EventLog) Subscribe(ctx context.Context, fromSeq uint64, filter func(Event) bool) <-chan Event
- func (l *EventLog) WaitFor(ctx context.Context, match func(Event) bool) (Event, error)
- type EventType
- type GRPCCallInfo
- type IdleTimer
- type KafkaRequestInfo
- type LogEntry
- type Orchestrator
- type PortAllocator
- type RequestInfo
- type Server
- type ServiceSnapshot
- type WiringContext
Constants ¶
This section is empty.
Variables ¶
var KnownServiceTypes = map[string]bool{ "container": true, "process": true, "script": true, "go": true, "client": true, "postgres": true, "temporal": true, "redis": true, "s3": true, "sqs": true, "kafka": true, "custom": true, "proxy": true, "test": true, }
KnownServiceTypes is the set of service types built into rigd. Custom client-side types are declared with the "custom" type.
Functions ¶
func BuildInitHookEnv ¶
func BuildInitHookEnv( serviceName string, ingresses map[string]spec.Endpoint, tempDir string, envDir string, hostEnv map[string]string, ) (map[string]string, error)
BuildInitHookEnv builds the environment variable map for an init hook. Init hooks receive only the service's own ingress attributes — no egresses. Default ingress is unprefixed, named ingresses are prefixed.
func BuildPrestartHookEnv ¶
func BuildPrestartHookEnv( serviceName string, ingresses map[string]spec.Endpoint, egresses map[string]spec.Endpoint, tempDir string, envDir string, hostEnv map[string]string, ) (map[string]string, error)
BuildPrestartHookEnv builds the environment variable map for a prestart hook. Prestart hooks receive full wiring: own ingresses + resolved egresses.
func BuildServiceEnv ¶
func BuildServiceEnv( serviceName string, ingresses map[string]spec.Endpoint, egresses map[string]spec.Endpoint, tempDir string, envDir string, hostEnv map[string]string, ) (map[string]string, error)
BuildServiceEnv builds the full environment variable map for a service during its start phase. This includes:
- RIG_WIRING: full wiring as JSON for rig-aware services
- Service-level attributes (RIG_TEMP_DIR, RIG_ENV_DIR, etc.)
- Own ingress attributes (HOST/PORT for default, prefixed for named)
- Egress attributes (always prefixed by egress name)
Rig-aware services should read RIG_WIRING. The flat env vars are a convenience fallback for services that don't know about rig.
func DefaultRigDir ¶
func DefaultRigDir() string
DefaultRigDir returns the base rig directory. It checks RIG_DIR first, then falls back to ~/.rig, then $TMPDIR/rig.
func ExpandTemplate ¶
ExpandTemplate expands $VAR and ${VAR} references in a single string.
func ExpandTemplates ¶
ExpandTemplates expands $VAR and ${VAR} references in a list of strings against the given attribute map.
func InsertTestNode ¶
func InsertTestNode(env *spec.Environment)
InsertTestNode adds a virtual ~test service node to the environment. The ~test node has an egress to every real service's every ingress, using a naming convention that maps back to service/ingress pairs:
- default ingress: egress name = service name
- non-default ingress: egress name = "service~ingress"
The ~test node has no ingresses. Its waitForEgressesStep gates on all real services being READY, and emitEnvironmentUp fires from its lifecycle.
func ResolveDefaults ¶
func ResolveDefaults(env *spec.Environment)
ResolveDefaults fills in default values on the environment spec. Called automatically by ValidateEnvironment.
func TransformObserve ¶
func TransformObserve(env *spec.Environment)
TransformObserve inserts proxy service nodes on every egress edge in the graph when observe mode is enabled. Each proxy node sits between a source service and its target, transparently forwarding traffic while capturing events.
For each egress edge (source → target.ingress):
- A proxy node is inserted with name "{target}~proxy~{source}" (or "{target}~{ingress}~proxy~{source}" for non-default ingresses)
- The proxy has ingress "default" (protocol from target's ingress), egress "target" pointing at the real target, and a ProxyConfig
- The source's egress is retargeted to the proxy node's "default" ingress — the egress name (map key) is unchanged, making the proxy transparent
func ValidateEnvironment ¶
func ValidateEnvironment(env *spec.Environment) []string
ValidateEnvironment checks an environment spec for structural errors. It calls ResolveDefaults first to fill in default values, then validates. Returns all errors found (not just the first) so the user can fix them in one pass.
Types ¶
type CallbackRequest ¶
type CallbackRequest struct {
RequestID string `json:"request_id"`
Name string `json:"name"`
Type string `json:"type"` // "hook", "publish", "start", "ready"
Wiring *WiringContext `json:"wiring,omitempty"`
}
CallbackRequest is published when the server needs the client to execute a function (hook or custom service type callback).
type CallbackResponse ¶
type CallbackResponse struct {
RequestID string `json:"request_id"`
Error string `json:"error,omitempty"`
Data map[string]any `json:"data,omitempty"`
}
CallbackResponse is posted by the client after handling a callback request.
type ConnectionInfo ¶
type ConnectionInfo struct {
Source string `json:"source"`
Target string `json:"target"`
Ingress string `json:"ingress"`
BytesIn int64 `json:"bytes_in"`
BytesOut int64 `json:"bytes_out"`
DurationMs float64 `json:"duration_ms"`
}
ConnectionInfo captures an observed TCP connection.
type DiagnosticSnapshot ¶
type DiagnosticSnapshot struct {
StalledFor string `json:"stalled_for"`
Services []ServiceSnapshot `json:"services"`
}
DiagnosticSnapshot captures the state of all services when a progress stall is detected. Published as part of a progress.stall event.
type Event ¶
type Event struct {
Seq uint64 `json:"seq"`
Type EventType `json:"type"`
Environment string `json:"environment,omitempty"`
Service string `json:"service,omitempty"`
Ingress string `json:"ingress,omitempty"`
Endpoint *spec.Endpoint `json:"endpoint,omitempty"`
Artifact string `json:"artifact,omitempty"`
Log *LogEntry `json:"log,omitempty"`
Callback *CallbackRequest `json:"callback,omitempty"`
Result *CallbackResponse `json:"result,omitempty"`
Error string `json:"error,omitempty"`
Request *RequestInfo `json:"request,omitempty"`
Connection *ConnectionInfo `json:"connection,omitempty"`
GRPCCall *GRPCCallInfo `json:"grpc_call,omitempty"`
KafkaRequest *KafkaRequestInfo `json:"kafka_request,omitempty"`
Diagnostic *DiagnosticSnapshot `json:"diagnostic,omitempty"`
EnvDir string `json:"env_dir,omitempty"`
Message string `json:"message,omitempty"`
// Ingresses is populated on environment.up. It maps service name to a
// map of ingress name to resolved endpoint, giving clients everything
// they need to connect to any service without a follow-up GET request.
Ingresses map[string]map[string]spec.ResolvedEndpoint `json:"ingresses,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
Event is a single entry in the event log.
type EventLog ¶
type EventLog struct {
// contains filtered or unexported fields
}
EventLog is a persistent, ordered event log. Events are stored in two separate slices — lifecycle events and log events (service.log) — sharing a single monotonically increasing sequence counter. This keeps hot-path scans (WaitFor, buildResolvedEnvironment) fast by avoiding high-volume log output. When the full timeline is needed (Events, Subscribe, log dump), both slices are zip-merged by sequence number.
func (*EventLog) Events ¶
Events returns a snapshot of all events (lifecycle + log) merged by sequence number.
func (*EventLog) LifecycleEvents ¶
LifecycleEvents returns a snapshot of lifecycle events only, excluding high-volume service.log events. Use this for building resolved state or scanning for specific lifecycle transitions.
func (*EventLog) Publish ¶
Publish appends an event to the log with the next sequence number and the current timestamp, then wakes all waiters.
func (*EventLog) ServiceLogTail ¶
ServiceLogTail returns the last n log lines for the named service, formatted with " | " prefixes. Returns "" if there are no log events.
func (*EventLog) Since ¶
Since returns all events (lifecycle + log) with sequence number > seq, merged by sequence number.
func (*EventLog) Subscribe ¶
func (l *EventLog) Subscribe(ctx context.Context, fromSeq uint64, filter func(Event) bool) <-chan Event
Subscribe returns a channel that receives events starting from fromSeq. It replays all existing events with Seq > fromSeq, then streams new events as they arrive. The channel is closed when ctx is cancelled.
The channel is buffered (256). If a subscriber falls behind and the buffer fills, new events are dropped for that subscriber (publishers never block).
type EventType ¶
type EventType string
EventType identifies the kind of lifecycle event.
const ( // Artifact phase. EventArtifactStarted EventType = "artifact.started" EventArtifactCompleted EventType = "artifact.completed" EventArtifactFailed EventType = "artifact.failed" EventArtifactCached EventType = "artifact.cached" // Service lifecycle. EventIngressPublished EventType = "ingress.published" EventWiringResolved EventType = "wiring.resolved" EventServicePrestart EventType = "service.prestart" EventServiceStarting EventType = "service.starting" EventServiceHealthy EventType = "service.healthy" EventServiceInit EventType = "service.init" EventServiceReady EventType = "service.ready" EventServiceFailed EventType = "service.failed" EventServiceStopping EventType = "service.stopping" EventServiceStopped EventType = "service.stopped" EventServiceLog EventType = "service.log" // Client-side callbacks. EventCallbackRequest EventType = "callback.request" EventCallbackResponse EventType = "callback.response" // Environment lifecycle. EventEnvironmentFailing EventType = "environment.failing" EventEnvironmentDestroying EventType = "environment.destroying" EventEnvironmentUp EventType = "environment.up" EventEnvironmentDown EventType = "environment.down" // Client-side test events. EventTestNote EventType = "test.note" // Health checks. EventHealthCheckFailed EventType = "health.check_failed" // Progress diagnostics. EventProgressStall EventType = "progress.stall" // Traffic observation. EventRequestCompleted EventType = "request.completed" EventConnectionOpened EventType = "connection.opened" EventConnectionClosed EventType = "connection.closed" EventGRPCCallCompleted EventType = "grpc.call.completed" EventKafkaRequestCompleted EventType = "kafka.request.completed" )
type GRPCCallInfo ¶
type GRPCCallInfo struct {
Source string `json:"source"`
Target string `json:"target"`
Ingress string `json:"ingress"`
Service string `json:"service"` // "pkg.ServiceName"
Method string `json:"method"` // "MethodName"
GRPCStatus string `json:"grpc_status"` // "0" (OK), "5" (NOT_FOUND), etc.
GRPCMessage string `json:"grpc_message"` // status message
LatencyMs float64 `json:"latency_ms"`
RequestSize int64 `json:"request_size"`
ResponseSize int64 `json:"response_size"`
RequestMetadata map[string][]string `json:"request_metadata,omitempty"`
ResponseMetadata map[string][]string `json:"response_metadata,omitempty"`
RequestBody []byte `json:"request_body,omitempty"`
RequestBodyTruncated bool `json:"request_body_truncated,omitempty"`
ResponseBody []byte `json:"response_body,omitempty"`
ResponseBodyTruncated bool `json:"response_body_truncated,omitempty"`
RequestBodyDecoded json.RawMessage `json:"request_body_decoded,omitempty"`
ResponseBodyDecoded json.RawMessage `json:"response_body_decoded,omitempty"`
}
GRPCCallInfo captures an observed gRPC call.
type IdleTimer ¶
type IdleTimer struct {
// contains filtered or unexported fields
}
IdleTimer fires a shutdown signal after a configurable period with no active environments. EnvironmentCreated/EnvironmentDestroyed keep the count; once the count returns to zero the countdown restarts.
func NewIdleTimer ¶
NewIdleTimer creates an IdleTimer that will fire after timeout if no environments are created first. Pass zero to disable (the timer never fires).
func (*IdleTimer) EnvironmentCreated ¶
func (t *IdleTimer) EnvironmentCreated()
EnvironmentCreated records a new active environment and stops the countdown.
func (*IdleTimer) EnvironmentDestroyed ¶
func (t *IdleTimer) EnvironmentDestroyed()
EnvironmentDestroyed records an environment teardown. If no environments remain the countdown restarts.
func (*IdleTimer) ShutdownCh ¶
func (t *IdleTimer) ShutdownCh() <-chan struct{}
ShutdownCh returns a channel that is closed when the idle timeout fires.
type KafkaRequestInfo ¶
type KafkaRequestInfo struct {
Source string `json:"source"`
Target string `json:"target"`
Ingress string `json:"ingress"`
APIKey int16 `json:"api_key"`
APIName string `json:"api_name"`
APIVersion int16 `json:"api_version"`
CorrelationID int32 `json:"correlation_id"`
LatencyMs float64 `json:"latency_ms"`
RequestSize int64 `json:"request_size"`
ResponseSize int64 `json:"response_size"`
}
KafkaRequestInfo captures an observed Kafka request/response pair.
type LogEntry ¶
type LogEntry struct {
Stream string `json:"stream"` // "stdout" or "stderr"
Data string `json:"data"`
}
LogEntry holds a line of service output.
type Orchestrator ¶
type Orchestrator struct {
Ports *PortAllocator
Registry *service.Registry
Log *EventLog
TempBase string // base directory for temp dirs (default os.TempDir()/rig)
Cache *artifact.Cache // artifact cache (shared with background refresher)
Preserve *bool // when non-nil and true, skip temp dir cleanup on exit
}
Orchestrator coordinates the lifecycle of all services in an environment.
func (*Orchestrator) Orchestrate ¶
func (o *Orchestrator) Orchestrate(env *spec.Environment) (run.Runner, string, string, error)
Orchestrate builds a run.Runner that manages the full lifecycle of the given environment. The runner executes two phases sequentially:
- Artifact phase: resolves all required artifacts (compiled binaries, etc.) in parallel, using a content-addressable cache.
- Service phase: starts all services concurrently. Dependency ordering emerges from services blocking on the event log until their egress targets are ready. On first failure, the server cancels all remaining services and emits environment.failing with the root cause.
If either phase fails, the runner emits environment.failing with the root cause before returning. The results map is safe to share because the artifact phase completes before the service phase begins.
type PortAllocator ¶
type PortAllocator struct {
// contains filtered or unexported fields
}
PortAllocator allocates ports using a prime-stepping strategy that spreads allocations across the port range, minimising collisions in parallel tests. Ports are returned as open net.Listeners — callers that need zero TOCTOU (proxies) can use the listener directly; callers that need a raw port number close the listener and use the port.
func NewPortAllocator ¶
func NewPortAllocator() *PortAllocator
NewPortAllocator creates an empty port allocator.
func (*PortAllocator) Allocate ¶
Allocate reserves n ports for the given instance. It steps through the port range by a random prime, trying net.Listen on each candidate. Listeners are returned open — the caller decides whether to keep them (proxy) or close them (service port).
func (*PortAllocator) Allocated ¶
func (a *PortAllocator) Allocated() int
Allocated returns the number of currently tracked ports.
func (*PortAllocator) Release ¶
func (a *PortAllocator) Release(instanceID string)
Release removes all port tracking for the given instance.
type RequestInfo ¶
type RequestInfo struct {
Source string `json:"source"`
Target string `json:"target"`
Ingress string `json:"ingress"`
Method string `json:"method"`
Path string `json:"path"`
StatusCode int `json:"status_code"`
LatencyMs float64 `json:"latency_ms"`
RequestSize int64 `json:"request_size"`
ResponseSize int64 `json:"response_size"`
RequestHeaders map[string][]string `json:"request_headers,omitempty"`
RequestBody []byte `json:"request_body,omitempty"`
RequestBodyTruncated bool `json:"request_body_truncated,omitempty"`
ResponseHeaders map[string][]string `json:"response_headers,omitempty"`
ResponseBody []byte `json:"response_body,omitempty"`
ResponseBodyTruncated bool `json:"response_body_truncated,omitempty"`
}
RequestInfo captures an observed HTTP request/response pair.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server is the rig HTTP API server. It manages the lifecycle of one or more concurrent environments, each with its own event log and run.Runner.
func NewServer ¶
func NewServer( ports *PortAllocator, registry *service.Registry, tempBase string, idleTimeout time.Duration, rigDir string, ) *Server
NewServer creates a Server and registers all HTTP routes. Pass idleTimeout = 0 to disable automatic shutdown. Pass rigDir = "" to use the default (~/.rig via DefaultRigDir()). Cache lives at {rigDir}/cache/, event logs at {rigDir}/logs/.
func (*Server) ServeHTTP ¶
func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)
ServeHTTP implements http.Handler.
func (*Server) ShutdownCh ¶
func (s *Server) ShutdownCh() <-chan struct{}
ShutdownCh returns a channel that is closed when the idle timer fires.
func (*Server) StartBackgroundTasks ¶
StartBackgroundTasks runs a polling loop that checks for server idleness every 30 seconds and triggers maintenance tasks (e.g. Docker image cache refresh) when no environments are active. Blocks until ctx is cancelled; call it in its own goroutine.
type ServiceSnapshot ¶
type ServiceSnapshot struct {
Name string `json:"name"`
Phase string `json:"phase"`
WaitingOn []string `json:"waiting_on,omitempty"`
}
ServiceSnapshot describes a single service's state in a diagnostic snapshot.
type WiringContext ¶
type WiringContext struct {
Ingresses map[string]spec.ResolvedEndpoint `json:"ingresses,omitempty"`
Egresses map[string]spec.ResolvedEndpoint `json:"egresses,omitempty"`
TempDir string `json:"temp_dir,omitempty"`
EnvDir string `json:"env_dir,omitempty"`
}
WiringContext provides resolved endpoint information to callbacks. All hooks and start callbacks receive the full wiring: Ingresses, Egresses, TempDir, and EnvDir. Endpoints are resolved — all attribute templates have been expanded to concrete values.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package artifact provides the artifact resolution system for rig.
|
Package artifact provides the artifact resolution system for rig. |
|
Package dockerutil provides a shared Docker client with automatic socket discovery for common Docker Desktop installations.
|
Package dockerutil provides a shared Docker client with automatic socket discovery for common Docker Desktop installations. |