server

package
v0.0.0-...-e3a76c1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 17, 2026 License: MIT Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var KnownServiceTypes = map[string]bool{
	"container": true,
	"process":   true,
	"script":    true,
	"go":        true,
	"client":    true,
	"postgres":  true,
	"temporal":  true,
	"redis":     true,
	"s3":        true,
	"sqs":       true,
	"kafka":     true,
	"custom":    true,
	"proxy":     true,
	"test":      true,
}

KnownServiceTypes is the set of service types built into rigd. Custom client-side types are declared with the "custom" type.

Functions

func BuildInitHookEnv

func BuildInitHookEnv(
	serviceName string,
	ingresses map[string]spec.Endpoint,
	tempDir string,
	envDir string,
	hostEnv map[string]string,
) (map[string]string, error)

BuildInitHookEnv builds the environment variable map for an init hook. Init hooks receive only the service's own ingress attributes — no egresses. Default ingress is unprefixed, named ingresses are prefixed.

func BuildPrestartHookEnv

func BuildPrestartHookEnv(
	serviceName string,
	ingresses map[string]spec.Endpoint,
	egresses map[string]spec.Endpoint,
	tempDir string,
	envDir string,
	hostEnv map[string]string,
) (map[string]string, error)

BuildPrestartHookEnv builds the environment variable map for a prestart hook. Prestart hooks receive full wiring: own ingresses + resolved egresses.

func BuildServiceEnv

func BuildServiceEnv(
	serviceName string,
	ingresses map[string]spec.Endpoint,
	egresses map[string]spec.Endpoint,
	tempDir string,
	envDir string,
	hostEnv map[string]string,
) (map[string]string, error)

BuildServiceEnv builds the full environment variable map for a service during its start phase. This includes:

  • RIG_WIRING: full wiring as JSON for rig-aware services
  • Service-level attributes (RIG_TEMP_DIR, RIG_ENV_DIR, etc.)
  • Own ingress attributes (HOST/PORT for default, prefixed for named)
  • Egress attributes (always prefixed by egress name)

Rig-aware services should read RIG_WIRING. The flat env vars are a convenience fallback for services that don't know about rig.

func DefaultRigDir

func DefaultRigDir() string

DefaultRigDir returns the base rig directory. It checks RIG_DIR first, then falls back to ~/.rig, then $TMPDIR/rig.

func ExpandTemplate

func ExpandTemplate(tmpl string, attrs map[string]string) string

ExpandTemplate expands $VAR and ${VAR} references in a single string.

func ExpandTemplates

func ExpandTemplates(templates []string, attrs map[string]string) []string

ExpandTemplates expands $VAR and ${VAR} references in a list of strings against the given attribute map.

func InsertTestNode

func InsertTestNode(env *spec.Environment)

InsertTestNode adds a virtual ~test service node to the environment. The ~test node has an egress to every real service's every ingress, using a naming convention that maps back to service/ingress pairs:

  • default ingress: egress name = service name
  • non-default ingress: egress name = "service~ingress"

The ~test node has no ingresses. Its waitForEgressesStep gates on all real services being READY, and emitEnvironmentUp fires from its lifecycle.

func ResolveDefaults

func ResolveDefaults(env *spec.Environment)

ResolveDefaults fills in default values on the environment spec. Called automatically by ValidateEnvironment.

func TransformObserve

func TransformObserve(env *spec.Environment)

TransformObserve inserts proxy service nodes on every egress edge in the graph when observe mode is enabled. Each proxy node sits between a source service and its target, transparently forwarding traffic while capturing events.

For each egress edge (source → target.ingress):

  1. A proxy node is inserted with name "{target}~proxy~{source}" (or "{target}~{ingress}~proxy~{source}" for non-default ingresses)
  2. The proxy has ingress "default" (protocol from target's ingress), egress "target" pointing at the real target, and a ProxyConfig
  3. The source's egress is retargeted to the proxy node's "default" ingress — the egress name (map key) is unchanged, making the proxy transparent

func ValidateEnvironment

func ValidateEnvironment(env *spec.Environment) []string

ValidateEnvironment checks an environment spec for structural errors. It calls ResolveDefaults first to fill in default values, then validates. Returns all errors found (not just the first) so the user can fix them in one pass.

Types

type CallbackRequest

type CallbackRequest struct {
	RequestID string         `json:"request_id"`
	Name      string         `json:"name"`
	Type      string         `json:"type"` // "hook", "publish", "start", "ready"
	Wiring    *WiringContext `json:"wiring,omitempty"`
}

CallbackRequest is published when the server needs the client to execute a function (hook or custom service type callback).

type CallbackResponse

type CallbackResponse struct {
	RequestID string         `json:"request_id"`
	Error     string         `json:"error,omitempty"`
	Data      map[string]any `json:"data,omitempty"`
}

CallbackResponse is posted by the client after handling a callback request.

type ConnectionInfo

type ConnectionInfo struct {
	Source     string  `json:"source"`
	Target     string  `json:"target"`
	Ingress    string  `json:"ingress"`
	BytesIn    int64   `json:"bytes_in"`
	BytesOut   int64   `json:"bytes_out"`
	DurationMs float64 `json:"duration_ms"`
}

ConnectionInfo captures an observed TCP connection.

type DiagnosticSnapshot

type DiagnosticSnapshot struct {
	StalledFor string            `json:"stalled_for"`
	Services   []ServiceSnapshot `json:"services"`
}

DiagnosticSnapshot captures the state of all services when a progress stall is detected. Published as part of a progress.stall event.

type Event

type Event struct {
	Seq          uint64              `json:"seq"`
	Type         EventType           `json:"type"`
	Environment  string              `json:"environment,omitempty"`
	Service      string              `json:"service,omitempty"`
	Ingress      string              `json:"ingress,omitempty"`
	Endpoint     *spec.Endpoint      `json:"endpoint,omitempty"`
	Artifact     string              `json:"artifact,omitempty"`
	Log          *LogEntry           `json:"log,omitempty"`
	Callback     *CallbackRequest    `json:"callback,omitempty"`
	Result       *CallbackResponse   `json:"result,omitempty"`
	Error        string              `json:"error,omitempty"`
	Request      *RequestInfo        `json:"request,omitempty"`
	Connection   *ConnectionInfo     `json:"connection,omitempty"`
	GRPCCall     *GRPCCallInfo       `json:"grpc_call,omitempty"`
	KafkaRequest *KafkaRequestInfo   `json:"kafka_request,omitempty"`
	Diagnostic   *DiagnosticSnapshot `json:"diagnostic,omitempty"`
	EnvDir       string              `json:"env_dir,omitempty"`
	Message      string              `json:"message,omitempty"`
	// Ingresses is populated on environment.up. It maps service name to a
	// map of ingress name to resolved endpoint, giving clients everything
	// they need to connect to any service without a follow-up GET request.
	Ingresses map[string]map[string]spec.ResolvedEndpoint `json:"ingresses,omitempty"`
	Timestamp time.Time                                   `json:"timestamp"`
}

Event is a single entry in the event log.

type EventLog

type EventLog struct {
	// contains filtered or unexported fields
}

EventLog is a persistent, ordered event log. Events are stored in two separate slices — lifecycle events and log events (service.log) — sharing a single monotonically increasing sequence counter. This keeps hot-path scans (WaitFor, buildResolvedEnvironment) fast by avoiding high-volume log output. When the full timeline is needed (Events, Subscribe, log dump), both slices are zip-merged by sequence number.

func NewEventLog

func NewEventLog() *EventLog

NewEventLog creates an empty event log.

func (*EventLog) Events

func (l *EventLog) Events() []Event

Events returns a snapshot of all events (lifecycle + log) merged by sequence number.

func (*EventLog) LifecycleEvents

func (l *EventLog) LifecycleEvents() []Event

LifecycleEvents returns a snapshot of lifecycle events only, excluding high-volume service.log events. Use this for building resolved state or scanning for specific lifecycle transitions.

func (*EventLog) Publish

func (l *EventLog) Publish(event Event)

Publish appends an event to the log with the next sequence number and the current timestamp, then wakes all waiters.

func (*EventLog) ServiceLogTail

func (l *EventLog) ServiceLogTail(service string, n int) string

ServiceLogTail returns the last n log lines for the named service, formatted with " | " prefixes. Returns "" if there are no log events.

func (*EventLog) Since

func (l *EventLog) Since(seq uint64) []Event

Since returns all events (lifecycle + log) with sequence number > seq, merged by sequence number.

func (*EventLog) Subscribe

func (l *EventLog) Subscribe(ctx context.Context, fromSeq uint64, filter func(Event) bool) <-chan Event

Subscribe returns a channel that receives events starting from fromSeq. It replays all existing events with Seq > fromSeq, then streams new events as they arrive. The channel is closed when ctx is cancelled.

The channel is buffered (256). If a subscriber falls behind and the buffer fills, new events are dropped for that subscriber (publishers never block).

func (*EventLog) WaitFor

func (l *EventLog) WaitFor(ctx context.Context, match func(Event) bool) (Event, error)

WaitFor scans lifecycle events for a matching event. If found, returns it immediately. Otherwise blocks until a matching lifecycle event is published or the context is cancelled. Log events (service.log) are not scanned.

type EventType

type EventType string

EventType identifies the kind of lifecycle event.

const (
	// Artifact phase.
	EventArtifactStarted   EventType = "artifact.started"
	EventArtifactCompleted EventType = "artifact.completed"
	EventArtifactFailed    EventType = "artifact.failed"
	EventArtifactCached    EventType = "artifact.cached"

	// Service lifecycle.
	EventIngressPublished EventType = "ingress.published"
	EventWiringResolved   EventType = "wiring.resolved"
	EventServicePrestart  EventType = "service.prestart"
	EventServiceStarting  EventType = "service.starting"
	EventServiceHealthy   EventType = "service.healthy"
	EventServiceInit      EventType = "service.init"
	EventServiceReady     EventType = "service.ready"
	EventServiceFailed    EventType = "service.failed"
	EventServiceStopping  EventType = "service.stopping"
	EventServiceStopped   EventType = "service.stopped"
	EventServiceLog       EventType = "service.log"

	// Client-side callbacks.
	EventCallbackRequest  EventType = "callback.request"
	EventCallbackResponse EventType = "callback.response"

	// Environment lifecycle.
	EventEnvironmentFailing    EventType = "environment.failing"
	EventEnvironmentDestroying EventType = "environment.destroying"
	EventEnvironmentUp         EventType = "environment.up"
	EventEnvironmentDown       EventType = "environment.down"

	// Client-side test events.
	EventTestNote EventType = "test.note"

	// Health checks.
	EventHealthCheckFailed EventType = "health.check_failed"

	// Progress diagnostics.
	EventProgressStall EventType = "progress.stall"

	// Traffic observation.
	EventRequestCompleted      EventType = "request.completed"
	EventConnectionOpened      EventType = "connection.opened"
	EventConnectionClosed      EventType = "connection.closed"
	EventGRPCCallCompleted     EventType = "grpc.call.completed"
	EventKafkaRequestCompleted EventType = "kafka.request.completed"
)

type GRPCCallInfo

type GRPCCallInfo struct {
	Source           string              `json:"source"`
	Target           string              `json:"target"`
	Ingress          string              `json:"ingress"`
	Service          string              `json:"service"`      // "pkg.ServiceName"
	Method           string              `json:"method"`       // "MethodName"
	GRPCStatus       string              `json:"grpc_status"`  // "0" (OK), "5" (NOT_FOUND), etc.
	GRPCMessage      string              `json:"grpc_message"` // status message
	LatencyMs        float64             `json:"latency_ms"`
	RequestSize      int64               `json:"request_size"`
	ResponseSize     int64               `json:"response_size"`
	RequestMetadata  map[string][]string `json:"request_metadata,omitempty"`
	ResponseMetadata map[string][]string `json:"response_metadata,omitempty"`

	RequestBody           []byte          `json:"request_body,omitempty"`
	RequestBodyTruncated  bool            `json:"request_body_truncated,omitempty"`
	ResponseBody          []byte          `json:"response_body,omitempty"`
	ResponseBodyTruncated bool            `json:"response_body_truncated,omitempty"`
	RequestBodyDecoded    json.RawMessage `json:"request_body_decoded,omitempty"`
	ResponseBodyDecoded   json.RawMessage `json:"response_body_decoded,omitempty"`
}

GRPCCallInfo captures an observed gRPC call.

type IdleTimer

type IdleTimer struct {
	// contains filtered or unexported fields
}

IdleTimer fires a shutdown signal after a configurable period with no active environments. EnvironmentCreated/EnvironmentDestroyed keep the count; once the count returns to zero the countdown restarts.

func NewIdleTimer

func NewIdleTimer(timeout time.Duration) *IdleTimer

NewIdleTimer creates an IdleTimer that will fire after timeout if no environments are created first. Pass zero to disable (the timer never fires).

func (*IdleTimer) EnvironmentCreated

func (t *IdleTimer) EnvironmentCreated()

EnvironmentCreated records a new active environment and stops the countdown.

func (*IdleTimer) EnvironmentDestroyed

func (t *IdleTimer) EnvironmentDestroyed()

EnvironmentDestroyed records an environment teardown. If no environments remain the countdown restarts.

func (*IdleTimer) ShutdownCh

func (t *IdleTimer) ShutdownCh() <-chan struct{}

ShutdownCh returns a channel that is closed when the idle timeout fires.

type KafkaRequestInfo

type KafkaRequestInfo struct {
	Source        string  `json:"source"`
	Target        string  `json:"target"`
	Ingress       string  `json:"ingress"`
	APIKey        int16   `json:"api_key"`
	APIName       string  `json:"api_name"`
	APIVersion    int16   `json:"api_version"`
	CorrelationID int32   `json:"correlation_id"`
	LatencyMs     float64 `json:"latency_ms"`
	RequestSize   int64   `json:"request_size"`
	ResponseSize  int64   `json:"response_size"`
}

KafkaRequestInfo captures an observed Kafka request/response pair.

type LogEntry

type LogEntry struct {
	Stream string `json:"stream"` // "stdout" or "stderr"
	Data   string `json:"data"`
}

LogEntry holds a line of service output.

type Orchestrator

type Orchestrator struct {
	Ports    *PortAllocator
	Registry *service.Registry
	Log      *EventLog
	TempBase string          // base directory for temp dirs (default os.TempDir()/rig)
	Cache    *artifact.Cache // artifact cache (shared with background refresher)
	Preserve *bool           // when non-nil and true, skip temp dir cleanup on exit
}

Orchestrator coordinates the lifecycle of all services in an environment.

func (*Orchestrator) Orchestrate

func (o *Orchestrator) Orchestrate(env *spec.Environment) (run.Runner, string, string, error)

Orchestrate builds a run.Runner that manages the full lifecycle of the given environment. The runner executes two phases sequentially:

  1. Artifact phase: resolves all required artifacts (compiled binaries, etc.) in parallel, using a content-addressable cache.
  2. Service phase: starts all services concurrently. Dependency ordering emerges from services blocking on the event log until their egress targets are ready. On first failure, the server cancels all remaining services and emits environment.failing with the root cause.

If either phase fails, the runner emits environment.failing with the root cause before returning. The results map is safe to share because the artifact phase completes before the service phase begins.

type PortAllocator

type PortAllocator struct {
	// contains filtered or unexported fields
}

PortAllocator allocates ports using a prime-stepping strategy that spreads allocations across the port range, minimising collisions in parallel tests. Ports are returned as open net.Listeners — callers that need zero TOCTOU (proxies) can use the listener directly; callers that need a raw port number close the listener and use the port.

func NewPortAllocator

func NewPortAllocator() *PortAllocator

NewPortAllocator creates an empty port allocator.

func (*PortAllocator) Allocate

func (a *PortAllocator) Allocate(instanceID string, n int) ([]net.Listener, error)

Allocate reserves n ports for the given instance. It steps through the port range by a random prime, trying net.Listen on each candidate. Listeners are returned open — the caller decides whether to keep them (proxy) or close them (service port).

func (*PortAllocator) Allocated

func (a *PortAllocator) Allocated() int

Allocated returns the number of currently tracked ports.

func (*PortAllocator) Release

func (a *PortAllocator) Release(instanceID string)

Release removes all port tracking for the given instance.

type RequestInfo

type RequestInfo struct {
	Source       string  `json:"source"`
	Target       string  `json:"target"`
	Ingress      string  `json:"ingress"`
	Method       string  `json:"method"`
	Path         string  `json:"path"`
	StatusCode   int     `json:"status_code"`
	LatencyMs    float64 `json:"latency_ms"`
	RequestSize  int64   `json:"request_size"`
	ResponseSize int64   `json:"response_size"`

	RequestHeaders        map[string][]string `json:"request_headers,omitempty"`
	RequestBody           []byte              `json:"request_body,omitempty"`
	RequestBodyTruncated  bool                `json:"request_body_truncated,omitempty"`
	ResponseHeaders       map[string][]string `json:"response_headers,omitempty"`
	ResponseBody          []byte              `json:"response_body,omitempty"`
	ResponseBodyTruncated bool                `json:"response_body_truncated,omitempty"`
}

RequestInfo captures an observed HTTP request/response pair.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the rig HTTP API server. It manages the lifecycle of one or more concurrent environments, each with its own event log and run.Runner.

func NewServer

func NewServer(
	ports *PortAllocator,
	registry *service.Registry,
	tempBase string,
	idleTimeout time.Duration,
	rigDir string,
) *Server

NewServer creates a Server and registers all HTTP routes. Pass idleTimeout = 0 to disable automatic shutdown. Pass rigDir = "" to use the default (~/.rig via DefaultRigDir()). Cache lives at {rigDir}/cache/, event logs at {rigDir}/logs/.

func (*Server) ServeHTTP

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements http.Handler.

func (*Server) ShutdownCh

func (s *Server) ShutdownCh() <-chan struct{}

ShutdownCh returns a channel that is closed when the idle timer fires.

func (*Server) StartBackgroundTasks

func (s *Server) StartBackgroundTasks(ctx context.Context)

StartBackgroundTasks runs a polling loop that checks for server idleness every 30 seconds and triggers maintenance tasks (e.g. Docker image cache refresh) when no environments are active. Blocks until ctx is cancelled; call it in its own goroutine.

type ServiceSnapshot

type ServiceSnapshot struct {
	Name      string   `json:"name"`
	Phase     string   `json:"phase"`
	WaitingOn []string `json:"waiting_on,omitempty"`
}

ServiceSnapshot describes a single service's state in a diagnostic snapshot.

type WiringContext

type WiringContext struct {
	Ingresses map[string]spec.ResolvedEndpoint `json:"ingresses,omitempty"`
	Egresses  map[string]spec.ResolvedEndpoint `json:"egresses,omitempty"`
	TempDir   string                           `json:"temp_dir,omitempty"`
	EnvDir    string                           `json:"env_dir,omitempty"`
}

WiringContext provides resolved endpoint information to callbacks. All hooks and start callbacks receive the full wiring: Ingresses, Egresses, TempDir, and EnvDir. Endpoints are resolved — all attribute templates have been expanded to concrete values.

Directories

Path Synopsis
Package artifact provides the artifact resolution system for rig.
Package artifact provides the artifact resolution system for rig.
Package dockerutil provides a shared Docker client with automatic socket discovery for common Docker Desktop installations.
Package dockerutil provides a shared Docker client with automatic socket discovery for common Docker Desktop installations.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL