process

package
v0.0.0-...-5bc8300
Warning

This package is not in the latest version of its module.

Published: Apr 10, 2026 License: AGPL-3.0 Imports: 22 Imported by: 0

Documentation

Overview

Package process manages plugin process lifecycle, respawn, and event delivery pipelines.


Constants

const (
	// RespawnLimit is max respawns per RespawnWindow before disabling.
	// ExaBGP: respawn_number=5 per ~63 seconds.
	RespawnLimit = 5

	// RespawnWindow is the time window for respawn limit tracking.
	RespawnWindow = 60 * time.Second

	// MaxTotalRespawns is the cumulative respawn limit before permanent disable.
	// Prevents a permanently broken plugin from cycling indefinitely across windows.
	MaxTotalRespawns = 20
)
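
The three constants describe two separate limits: a per-window cap and a cumulative cap. The sketch below shows one way such limits could be tracked. The respawnTracker type and its allow method are hypothetical names introduced for illustration, and treating the sixth respawn inside a window as the one that gets refused is an assumption; the package's real bookkeeping is unexported and may differ.

```go
package main

import (
	"fmt"
	"time"
)

// Values copied from the package constants above.
const (
	RespawnLimit     = 5
	RespawnWindow    = 60 * time.Second
	MaxTotalRespawns = 20
)

// respawnTracker is a hypothetical helper, not part of the package API.
type respawnTracker struct {
	recent []time.Time // respawn timestamps still inside the window
	total  int         // cumulative respawns since creation
}

// allow records a respawn attempt at time now and reports whether it stays
// within both the per-window limit and the cumulative limit.
func (t *respawnTracker) allow(now time.Time) bool {
	// Drop timestamps that have aged out of the window.
	cutoff := now.Add(-RespawnWindow)
	kept := t.recent[:0]
	for _, ts := range t.recent {
		if ts.After(cutoff) {
			kept = append(kept, ts)
		}
	}
	t.recent = kept

	if len(t.recent) >= RespawnLimit || t.total >= MaxTotalRespawns {
		return false
	}
	t.recent = append(t.recent, now)
	t.total++
	return true
}

// countAllowed simulates n respawn attempts spaced stepSeconds apart and
// returns how many of them were allowed.
func countAllowed(n, stepSeconds int) int {
	var t respawnTracker
	now := time.Unix(0, 0)
	allowed := 0
	for i := 0; i < n; i++ {
		if t.allow(now) {
			allowed++
		}
		now = now.Add(time.Duration(stepSeconds) * time.Second)
	}
	return allowed
}

func main() {
	fmt.Println(countAllowed(10, 1))  // fast crash loop, capped by RespawnLimit: 5
	fmt.Println(countAllowed(30, 30)) // slow cycling, capped by MaxTotalRespawns: 20
}
```

The second call shows why the cumulative limit exists: a plugin that fails every 30 seconds never trips the window limit, so only the total stops it.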

Variables

var (
	ErrRespawnLimitExceeded = errors.New("respawn limit exceeded")
	ErrProcessDisabled      = errors.New("process disabled due to respawn limit")
	ErrProcessNotFound      = errors.New("process not found")
)

Respawn errors.

var ErrConnectionClosed = errors.New("connection closed")

ErrConnectionClosed is returned when the plugin connection is closed during event delivery.

Functions

This section is empty.

Types

type EventDelivery

type EventDelivery struct {
	Output string             // Pre-formatted event payload (text/JSON consumers)
	Event  any                // Structured event for DirectBridge consumers (nil for text/JSON)
	Result chan<- EventResult // Caller-provided result channel (nil if fire-and-forget)
}

EventDelivery represents a work item for the per-process delivery goroutine. The long-lived goroutine reads these from Process.eventChan and calls SendDeliverEvent. For DirectBridge consumers, Event is set (structured delivery, no text formatting). For text/JSON consumers, Output is set (pre-formatted at observation time).
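
The work-item pattern described above can be sketched as follows. The two structs are local copies of the documented types so that the example compiles on its own. The deliveryLoop function and its send callback are hypothetical stand-ins for the unexported goroutine and SendDeliverEvent, whose real signatures are not shown in this documentation.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented types.
type EventResult struct {
	ProcName      string
	Err           error
	CacheConsumer bool
}

type EventDelivery struct {
	Output string
	Event  any
	Result chan<- EventResult
}

// deliveryLoop is a hypothetical per-process loop. send is an assumed
// callback that plays the role of SendDeliverEvent.
func deliveryLoop(name string, cacheConsumer bool, in <-chan EventDelivery, send func(EventDelivery) error) {
	for d := range in {
		err := send(d)
		if d.Result == nil {
			continue // fire-and-forget: nobody is waiting for a result
		}
		d.Result <- EventResult{
			ProcName:      name,
			Err:           err,
			CacheConsumer: err == nil && cacheConsumer,
		}
	}
}

// deliverOnce pushes one text payload through the loop and returns the result.
func deliverOnce(payload string) EventResult {
	in := make(chan EventDelivery, 1)
	results := make(chan EventResult, 1) // buffered so the loop never blocks on the caller
	send := func(d EventDelivery) error {
		if d.Output == "" && d.Event == nil {
			return errors.New("empty delivery")
		}
		return nil
	}
	go deliveryLoop("demo", true, in, send)
	in <- EventDelivery{Output: payload, Result: results}
	res := <-results
	close(in) // lets the loop goroutine exit
	return res
}

func main() {
	ok := deliverOnce(`{"type":"update"}`)
	fmt.Println(ok.ProcName, ok.Err == nil, ok.CacheConsumer) // demo true true
	bad := deliverOnce("")
	fmt.Println(bad.Err != nil, bad.CacheConsumer) // true false
}
```

Giving the result channel a buffer of at least one is a deliberate choice in this sketch: the delivery goroutine can report and move on even if the caller is slow to read.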

type EventResult

type EventResult struct {
	ProcName      string // Process name (for logging)
	Err           error  // nil on success
	CacheConsumer bool   // true if delivery succeeded AND process is a cache consumer
}

EventResult is sent back to the caller after delivery completes.

type Process

type Process struct {
	// contains filtered or unexported fields
}

Process represents a plugin subprocess (internal goroutine or external fork).

Lifecycle: Start (or StartWithContext) -> Stop -> Wait. Stop signals the process to exit; Wait blocks until all goroutines finish. Callers MUST call Wait after Stop to avoid leaking goroutines.
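
The Start -> Stop -> Wait contract can be illustrated with a small stand-in. The worker type below is hypothetical and only mimics the documented behavior (Stop does not block, Wait honors a context); it is not the package's implementation.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// worker is a hypothetical type that mimics the documented lifecycle.
type worker struct {
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

// Start launches n goroutines that run until the internal context is canceled.
func (w *worker) Start(n int) {
	ctx, cancel := context.WithCancel(context.Background())
	w.cancel = cancel
	for i := 0; i < n; i++ {
		w.wg.Add(1)
		go func() {
			defer w.wg.Done()
			<-ctx.Done()
		}()
	}
}

// Stop signals the goroutines to exit and returns immediately.
func (w *worker) Stop() { w.cancel() }

// Wait blocks until all goroutines have exited or ctx expires.
func (w *worker) Wait(ctx context.Context) error {
	done := make(chan struct{})
	go func() {
		w.wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// runLifecycle runs Start, optionally Stop, then Wait with a short timeout.
// Passing stop=false shows Wait giving up when Stop was never called.
func runLifecycle(stop bool) error {
	var w worker
	w.Start(3)
	if stop {
		w.Stop()
	}
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	err := w.Wait(ctx)
	w.Stop() // always release the goroutines before returning
	return err
}

func main() {
	fmt.Println(runLifecycle(true))  // <nil>
	fmt.Println(runLifecycle(false)) // context deadline exceeded
}
```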

func NewProcess

func NewProcess(config plugin.PluginConfig) *Process

NewProcess creates a new process with the given configuration.

func (*Process) AddRegisteredCommand

func (p *Process) AddRegisteredCommand(name string)

AddRegisteredCommand tracks a command registered by this process.

func (*Process) Bridge

func (p *Process) Bridge() *rpc.DirectBridge

Bridge returns the direct transport bridge for internal plugins (nil for external).

func (*Process) Capabilities

func (p *Process) Capabilities() *plugin.PluginCapabilities

Capabilities returns the plugin capability declarations (Stage 3).

func (*Process) ClearConn

func (p *Process) ClearConn()

ClearConn nils the connection pointer without closing the underlying connection. Used in tests to simulate a process dying between verify and apply phases.

func (*Process) CloseConn

func (p *Process) CloseConn()

CloseConn closes and nils the RPC connection under the mutex.

func (*Process) Cmd

func (p *Process) Cmd() *exec.Cmd

Cmd returns the underlying exec.Cmd for external plugins (nil for internal). Protected by mu since startExternal() writes p.cmd under the same lock.

func (*Process) Config

func (p *Process) Config() plugin.PluginConfig

Config returns the plugin configuration.

func (*Process) Conn

func (p *Process) Conn() *ipc.PluginConn

Conn returns the plugin RPC connection under the mutex. Returns nil if the connection has been closed (e.g. by Stop() or monitor()). Callers must check for nil before use to avoid racing with shutdown.

func (*Process) Deliver

func (p *Process) Deliver(d EventDelivery) bool

Deliver enqueues an event for the long-lived delivery goroutine. Returns true if the event was enqueued, false if the process is stopping. Thread-safe: uses RLock to allow parallel sends from multiple callers.
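
A minimal sketch of the read-lock enqueue pattern follows. The queue type, its stopping flag, and the stop method are hypothetical; the real Process fields are unexported. The sketch assumes the channel has spare capacity, since a blocked send would hold the read lock and delay stop.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a hypothetical type illustrating the enqueue pattern.
type queue struct {
	mu       sync.RWMutex
	stopping bool
	events   chan string
}

func newQueue(size int) *queue {
	return &queue{events: make(chan string, size)}
}

// deliver enqueues ev unless the queue is stopping. Holding only the read
// lock lets many senders run in parallel while still excluding stop.
func (q *queue) deliver(ev string) bool {
	q.mu.RLock()
	defer q.mu.RUnlock()
	if q.stopping {
		return false
	}
	q.events <- ev
	return true
}

// stop takes the write lock, so it waits for in-flight deliver calls, then
// closes the channel exactly once. No send can race with the close.
func (q *queue) stop() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.stopping {
		return
	}
	q.stopping = true
	close(q.events)
}

func main() {
	q := newQueue(4)
	fmt.Println(q.deliver("a")) // true
	q.stop()
	fmt.Println(q.deliver("b")) // false
}
```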

func (*Process) Encoding

func (p *Process) Encoding() string

Encoding returns the high-level encoding (json or text).

func (*Process) Format

func (p *Process) Format() string

Format returns the wire format (hex, base64, parsed, full).

func (*Process) FormatCacheKey

func (p *Process) FormatCacheKey() string

FormatCacheKey returns the precomputed "format+encoding" string for event dispatch cache lookup. Avoids per-event string concatenation on the hot path.
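
The benefit of a precomputed key can be sketched as below. The proc type is hypothetical, and reading "format+encoding" as the two values joined by a literal "+" is an assumption; the actual key layout is not documented here.

```go
package main

import "fmt"

// proc is a hypothetical type that rebuilds its key only when a setter runs.
type proc struct {
	format   string
	encoding string
	cacheKey string
}

func newProc(format, encoding string) *proc {
	p := &proc{format: format, encoding: encoding}
	p.rebuildKey()
	return p
}

func (p *proc) setFormat(f string)   { p.format = f; p.rebuildKey() }
func (p *proc) setEncoding(e string) { p.encoding = e; p.rebuildKey() }

// rebuildKey does the string concatenation once, off the hot path.
// The "+" separator is an assumption.
func (p *proc) rebuildKey() { p.cacheKey = p.format + "+" + p.encoding }

// formatCacheKey is a plain field read, with no allocation per event.
func (p *proc) formatCacheKey() string { return p.cacheKey }

// formatOnce renders an event once per distinct key and returns how many
// renders were needed for the given consumers.
func formatOnce(procs []*proc) int {
	cache := map[string]string{}
	renders := 0
	for _, p := range procs {
		key := p.formatCacheKey()
		if _, ok := cache[key]; !ok {
			cache[key] = "event rendered as " + key
			renders++
		}
	}
	return renders
}

func main() {
	procs := []*proc{
		newProc("hex", "json"),
		newProc("hex", "json"),
		newProc("parsed", "text"),
	}
	fmt.Println(procs[0].formatCacheKey()) // hex+json
	fmt.Println(formatOnce(procs))         // 2
}
```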

func (*Process) HasStructuredHandler

func (p *Process) HasStructuredHandler() bool

HasStructuredHandler reports whether this process supports structured event delivery. True when the process has a DirectBridge with a registered structured handler.

func (*Process) Index

func (p *Process) Index() int

Index returns the plugin index for coordinator tracking.

func (*Process) InitConns

func (p *Process) InitConns() error

InitConns creates the plugin connection (a MuxPluginConn) from the raw engine-side connection. If the connection already exists (set directly by tests), it returns immediately. Must be called exactly once before any reads from the connection.

func (*Process) IsCacheConsumer

func (p *Process) IsCacheConsumer() bool

IsCacheConsumer returns whether this plugin participates in cache consumer tracking. Cache consumers must forward or release each UPDATE they receive.

func (*Process) Name

func (p *Process) Name() string

Name returns the process name from config.

func (*Process) RegisteredCommands

func (p *Process) RegisteredCommands() []string

RegisteredCommands returns a copy of the registered command names.

func (*Process) Registration

func (p *Process) Registration() *plugin.PluginRegistration

Registration returns the plugin registration data (Stage 1).

func (*Process) RemoveRegisteredCommand

func (p *Process) RemoveRegisteredCommand(name string)

RemoveRegisteredCommand removes a command from tracking.

func (*Process) Running

func (p *Process) Running() bool

Running returns true if the process is running.

func (*Process) SendShutdown

func (p *Process) SendShutdown() bool

SendShutdown sends a graceful shutdown signal (bye RPC) to the plugin. Returns true if the process was running. The bye RPC gives the plugin a chance to clean up before Stop() closes connections and kills the process.

func (*Process) SetAcceptor

func (p *Process) SetAcceptor(a *ipc.PluginAcceptor)

SetAcceptor sets the TLS acceptor for external plugin connect-back.

func (*Process) SetCacheConsumer

func (p *Process) SetCacheConsumer(enabled bool)

SetCacheConsumer marks whether this plugin participates in cache consumer tracking.

func (*Process) SetCapabilities

func (p *Process) SetCapabilities(caps *plugin.PluginCapabilities)

SetCapabilities sets the plugin capability declarations (Stage 3).

func (*Process) SetConn

func (p *Process) SetConn(conn *ipc.PluginConn)

SetConn sets the plugin RPC connection. Used by test code.

func (*Process) SetEncoding

func (p *Process) SetEncoding(enc string)

SetEncoding sets the high-level encoding (json or text).

func (*Process) SetFormat

func (p *Process) SetFormat(format string)

SetFormat sets the wire format (hex, base64, parsed, full, summary).

func (*Process) SetIndex

func (p *Process) SetIndex(i int)

SetIndex sets the plugin index for coordinator tracking.

func (*Process) SetRegistration

func (p *Process) SetRegistration(reg *plugin.PluginRegistration)

SetRegistration sets the plugin registration data (Stage 1).

func (*Process) SetRunning

func (p *Process) SetRunning(running bool)

SetRunning sets the running state of the process.

func (*Process) SetStage

func (p *Process) SetStage(stage plugin.PluginStage)

SetStage sets the current stage and notifies waiters. If an onStageChange callback is set (by ProcessManager metrics), it is called after the stage is stored and waiters are notified, outside the lock.

func (*Process) SetSync

func (p *Process) SetSync(enabled bool)

SetSync enables or disables sync mode for this process.

func (*Process) SetWireEncoding

func (p *Process) SetWireEncoding(enc plugin.WireEncoding)

SetWireEncoding sets both inbound and outbound wire encoding.

func (*Process) SetWireEncodingIn

func (p *Process) SetWireEncodingIn(enc plugin.WireEncoding)

SetWireEncodingIn sets the inbound wire encoding.

func (*Process) SetWireEncodingOut

func (p *Process) SetWireEncodingOut(enc plugin.WireEncoding)

SetWireEncodingOut sets the outbound wire encoding.

func (*Process) Stage

func (p *Process) Stage() plugin.PluginStage

Stage returns the current plugin startup stage.

func (*Process) Start

func (p *Process) Start() error

Start spawns the process.

func (*Process) StartDelivery

func (p *Process) StartDelivery(ctx context.Context)

StartDelivery starts only the event delivery goroutine. Used by tests that inject connections via SetConn without starting a real process.

func (*Process) StartWithContext

func (p *Process) StartWithContext(ctx context.Context) error

StartWithContext spawns the process with the given context. For internal plugins (config.Internal=true), runs in-process via goroutine. For external plugins, forks via exec.Command.

func (*Process) Stop

func (p *Process) Stop()

Stop signals the process to terminate. Does not block. For external plugins, canceling context kills the process via exec.CommandContext. For internal plugins, closing RPC connections unblocks the plugin's reads and causes it to exit. Callers MUST call Wait after Stop to ensure all goroutines have exited.

func (*Process) SyncEnabled

func (p *Process) SyncEnabled() bool

SyncEnabled returns true if sync mode is enabled for this process. When enabled, announce/withdraw waits for wire transmission before ACK.

func (*Process) Wait

func (p *Process) Wait(ctx context.Context) error

Wait blocks until all process goroutines have exited, or ctx expires. Must be called after Stop to avoid goroutine leaks.

func (*Process) WaitForStage

func (p *Process) WaitForStage(ctx context.Context, stage plugin.PluginStage) error

WaitForStage waits for the process to reach or pass the given stage. Returns error on context cancellation or timeout.
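
One common way to implement "reach or pass" waiting is a channel that is closed and replaced on every change. The sketch below uses that technique. The stage and stageTracker types are hypothetical, and it assumes stages are ordered integers, which the documentation implies but does not state.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type stage int // hypothetical stand-in for plugin.PluginStage

type stageTracker struct {
	mu      sync.Mutex
	current stage
	changed chan struct{} // closed and replaced on every stage change
}

func newStageTracker() *stageTracker {
	return &stageTracker{changed: make(chan struct{})}
}

// set stores the new stage and wakes every waiter by closing the channel.
func (t *stageTracker) set(s stage) {
	t.mu.Lock()
	t.current = s
	close(t.changed)
	t.changed = make(chan struct{})
	t.mu.Unlock()
}

// waitFor returns nil once the current stage is at or past want, or the
// context error if ctx is canceled or times out first.
func (t *stageTracker) waitFor(ctx context.Context, want stage) error {
	for {
		t.mu.Lock()
		cur, ch := t.current, t.changed
		t.mu.Unlock()
		if cur >= want {
			return nil
		}
		select {
		case <-ch: // stage changed, re-check
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

// waitDemo advances the stage up to reach in the background and waits for
// want with a short timeout.
func waitDemo(reach, want int) error {
	t := newStageTracker()
	go func() {
		for s := 1; s <= reach; s++ {
			t.set(stage(s))
		}
	}()
	ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()
	return t.waitFor(ctx, stage(want))
}

func main() {
	fmt.Println(waitDemo(3, 2)) // <nil>
	fmt.Println(waitDemo(1, 2)) // context deadline exceeded
}
```

Reading the stage and the channel under the same lock is what prevents a missed wakeup: a change that lands between the check and the select has already closed the channel the waiter is holding.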

func (*Process) WireEncodingIn

func (p *Process) WireEncodingIn() plugin.WireEncoding

WireEncodingIn returns the inbound wire encoding (events ze→Process).

func (*Process) WireEncodingOut

func (p *Process) WireEncodingOut() plugin.WireEncoding

WireEncodingOut returns the outbound wire encoding (commands Process→ze).

type ProcessManager

type ProcessManager struct {
	// contains filtered or unexported fields
}

ProcessManager manages multiple plugin processes, internal and external.

func NewProcessManager

func NewProcessManager(configs []plugin.PluginConfig) *ProcessManager

NewProcessManager creates a new process manager.

func (*ProcessManager) AddProcess

func (pm *ProcessManager) AddProcess(name string, proc *Process)

AddProcess registers a process by name. Used by tests to inject mock processes. Wires metrics callbacks if a metrics registry is configured.

func (*ProcessManager) AllProcesses

func (pm *ProcessManager) AllProcesses() []*Process

AllProcesses returns a snapshot of all processes. Caller may iterate and filter the returned slice without holding the lock.
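
The snapshot idea can be sketched as follows: copy under the lock, then let the caller work on the copy. The registry type is hypothetical and stores strings where the real manager holds *Process values.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// registry is a hypothetical stand-in for the manager's process table.
type registry struct {
	mu    sync.RWMutex
	procs map[string]string // name -> state; stand-in for map[string]*Process
}

func (r *registry) add(name, state string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.procs == nil {
		r.procs = map[string]string{}
	}
	r.procs[name] = state
}

// snapshot copies the names into a fresh slice while holding the read lock.
// The caller owns the result and needs no lock to iterate or filter it.
func (r *registry) snapshot() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]string, 0, len(r.procs))
	for name := range r.procs {
		out = append(out, name)
	}
	sort.Strings(out) // map iteration order is random; sort for stable output
	return out
}

func main() {
	var r registry
	r.add("rib", "running")
	r.add("bmp", "running")
	snap := r.snapshot()
	r.add("late", "running") // later changes do not affect snap
	fmt.Println(snap, len(r.snapshot())) // [bmp rib] 3
}
```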

func (*ProcessManager) GetProcess

func (pm *ProcessManager) GetProcess(name string) *Process

GetProcess returns a process by name, or nil if not found.

func (*ProcessManager) IsDisabled

func (pm *ProcessManager) IsDisabled(name string) bool

IsDisabled returns true if the named process is disabled due to respawn limit.

func (*ProcessManager) IsRunning

func (pm *ProcessManager) IsRunning(name string) bool

IsRunning returns true if the named process is running.

func (*ProcessManager) ProcessCount

func (pm *ProcessManager) ProcessCount() int

ProcessCount returns the number of running processes.

func (*ProcessManager) RemoveProcess

func (pm *ProcessManager) RemoveProcess(name string)

RemoveProcess unregisters a stopped process by name. Used during config reload to clean up auto-stopped plugins.

func (*ProcessManager) Respawn

func (pm *ProcessManager) Respawn(name string) error

Respawn restarts a process, enforcing respawn limits. It returns ErrRespawnLimitExceeded if the limit is exceeded within the window, ErrProcessDisabled if the process was previously disabled, and ErrProcessNotFound if the process name is not in the configuration. It also returns an error if the ProcessManager was not started (ctx is nil).
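
A caller can distinguish these outcomes with errors.Is. The sketch below declares local copies of the sentinel errors so that it compiles on its own. The classify and wrapErr helpers and the action strings are hypothetical, and whether Respawn wraps its errors is not stated, so errors.Is is used to cover both cases.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copies of the documented sentinel errors.
var (
	ErrRespawnLimitExceeded = errors.New("respawn limit exceeded")
	ErrProcessDisabled      = errors.New("process disabled due to respawn limit")
	ErrProcessNotFound      = errors.New("process not found")
)

// classify maps a Respawn error to a hypothetical caller action.
func classify(err error) string {
	switch {
	case err == nil:
		return "restarted"
	case errors.Is(err, ErrRespawnLimitExceeded):
		return "limit hit: stop retrying"
	case errors.Is(err, ErrProcessDisabled):
		return "disabled: needs operator action"
	case errors.Is(err, ErrProcessNotFound):
		return "unknown name: check configuration"
	default:
		return "other failure"
	}
}

// wrapErr adds context the way a caller higher up the stack might.
func wrapErr(name string, err error) error {
	return fmt.Errorf("respawn %q: %w", name, err)
}

func main() {
	fmt.Println(classify(nil))                                // restarted
	fmt.Println(classify(wrapErr("rib", ErrProcessDisabled))) // disabled: needs operator action
	fmt.Println(classify(errors.New("manager not started")))  // other failure
}
```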

func (*ProcessManager) SetAcceptor

func (pm *ProcessManager) SetAcceptor(a *ipc.PluginAcceptor)

SetAcceptor sets the TLS acceptor for external plugin connect-back. Must be called before StartWithContext.

func (*ProcessManager) SetMetricsRegistry

func (pm *ProcessManager) SetMetricsRegistry(reg metrics.Registry)

SetMetricsRegistry creates plugin health metrics from the given registry. Must be called before StartWithContext. Nil registry disables metrics. Idempotent: second call is a no-op.

func (*ProcessManager) Start

func (pm *ProcessManager) Start() error

Start starts all configured processes.

func (*ProcessManager) StartMore

func (pm *ProcessManager) StartMore(configs []plugin.PluginConfig) error

StartMore starts the additional configs under the existing context. Must be called after StartWithContext (the context is captured then). Returns an error if the manager has not been started yet.

func (*ProcessManager) StartWithContext

func (pm *ProcessManager) StartWithContext(ctx context.Context) error

StartWithContext starts all configured processes with the given context. On the first call it captures the context for use by later StartMore calls. Subsequent calls re-spawn pm.configs (legacy single-shot behavior); use StartMore to add additional configs while keeping the originals running.

func (*ProcessManager) Stop

func (pm *ProcessManager) Stop()

Stop stops all processes. Cancels context and closes connections immediately, which unblocks plugin reads on net.Pipe and causes prompt exit. No bye round-trip — closing the connection is the shutdown signal for internal plugins, and context cancellation kills external plugins via exec.CommandContext.

func (*ProcessManager) Wait

func (pm *ProcessManager) Wait(ctx context.Context) error

Wait waits for all processes to stop.
