Documentation ¶
Overview ¶
Package process manages plugin process lifecycle, respawn, and event delivery pipelines.
Index ¶
- Constants
- Variables
- type EventDelivery
- type EventResult
- type Process
- func (p *Process) AddRegisteredCommand(name string)
- func (p *Process) Bridge() *rpc.DirectBridge
- func (p *Process) Capabilities() *plugin.PluginCapabilities
- func (p *Process) ClearConn()
- func (p *Process) CloseConn()
- func (p *Process) Cmd() *exec.Cmd
- func (p *Process) Config() plugin.PluginConfig
- func (p *Process) Conn() *ipc.PluginConn
- func (p *Process) Deliver(d EventDelivery) bool
- func (p *Process) Encoding() string
- func (p *Process) Format() string
- func (p *Process) FormatCacheKey() string
- func (p *Process) HasStructuredHandler() bool
- func (p *Process) Index() int
- func (p *Process) InitConns() error
- func (p *Process) IsCacheConsumer() bool
- func (p *Process) Name() string
- func (p *Process) RegisteredCommands() []string
- func (p *Process) Registration() *plugin.PluginRegistration
- func (p *Process) RemoveRegisteredCommand(name string)
- func (p *Process) Running() bool
- func (p *Process) SendShutdown() bool
- func (p *Process) SetAcceptor(a *ipc.PluginAcceptor)
- func (p *Process) SetCacheConsumer(enabled bool)
- func (p *Process) SetCapabilities(caps *plugin.PluginCapabilities)
- func (p *Process) SetConn(conn *ipc.PluginConn)
- func (p *Process) SetEncoding(enc string)
- func (p *Process) SetFormat(format string)
- func (p *Process) SetIndex(i int)
- func (p *Process) SetRegistration(reg *plugin.PluginRegistration)
- func (p *Process) SetRunning(running bool)
- func (p *Process) SetStage(stage plugin.PluginStage)
- func (p *Process) SetSync(enabled bool)
- func (p *Process) SetWireEncoding(enc plugin.WireEncoding)
- func (p *Process) SetWireEncodingIn(enc plugin.WireEncoding)
- func (p *Process) SetWireEncodingOut(enc plugin.WireEncoding)
- func (p *Process) Stage() plugin.PluginStage
- func (p *Process) Start() error
- func (p *Process) StartDelivery(ctx context.Context)
- func (p *Process) StartWithContext(ctx context.Context) error
- func (p *Process) Stop()
- func (p *Process) SyncEnabled() bool
- func (p *Process) Wait(ctx context.Context) error
- func (p *Process) WaitForStage(ctx context.Context, stage plugin.PluginStage) error
- func (p *Process) WireEncodingIn() plugin.WireEncoding
- func (p *Process) WireEncodingOut() plugin.WireEncoding
- type ProcessManager
- func (pm *ProcessManager) AddProcess(name string, proc *Process)
- func (pm *ProcessManager) AllProcesses() []*Process
- func (pm *ProcessManager) GetProcess(name string) *Process
- func (pm *ProcessManager) IsDisabled(name string) bool
- func (pm *ProcessManager) IsRunning(name string) bool
- func (pm *ProcessManager) ProcessCount() int
- func (pm *ProcessManager) RemoveProcess(name string)
- func (pm *ProcessManager) Respawn(name string) error
- func (pm *ProcessManager) SetAcceptor(a *ipc.PluginAcceptor)
- func (pm *ProcessManager) SetMetricsRegistry(reg metrics.Registry)
- func (pm *ProcessManager) Start() error
- func (pm *ProcessManager) StartMore(configs []plugin.PluginConfig) error
- func (pm *ProcessManager) StartWithContext(ctx context.Context) error
- func (pm *ProcessManager) Stop()
- func (pm *ProcessManager) Wait(ctx context.Context) error
Constants ¶
const (
	// RespawnLimit is max respawns per RespawnWindow before disabling.
	// ExaBGP: respawn_number=5 per ~63 seconds.
	RespawnLimit = 5
	// RespawnWindow is the time window for respawn limit tracking.
	RespawnWindow = 60 * time.Second
	// MaxTotalRespawns is the cumulative respawn limit before permanent disable.
	// Prevents a permanently broken plugin from cycling indefinitely across windows.
	MaxTotalRespawns = 20
)
Variables ¶
var (
	ErrRespawnLimitExceeded = errors.New("respawn limit exceeded")
	ErrProcessDisabled      = errors.New("process disabled due to respawn limit")
	ErrProcessNotFound      = errors.New("process not found")
)
Respawn errors.
var ErrConnectionClosed = errors.New("connection closed")
ErrConnectionClosed is returned when the plugin connection is closed during event delivery.
Functions ¶
This section is empty.
Types ¶
type EventDelivery ¶
type EventDelivery struct {
Output string // Pre-formatted event payload (text/JSON consumers)
Event any // Structured event for DirectBridge consumers (nil for text/JSON)
Result chan<- EventResult // Caller-provided result channel (nil if fire-and-forget)
}
EventDelivery represents a work item for the per-process delivery goroutine. The long-lived goroutine reads these from Process.eventChan and calls SendDeliverEvent. For DirectBridge consumers, Event is set (structured delivery, no text formatting). For text/JSON consumers, Output is set (pre-formatted at observation time).
type EventResult ¶
type EventResult struct {
ProcName string // Process name (for logging)
Err error // nil on success
CacheConsumer bool // true if delivery succeeded AND process is a cache consumer
}
EventResult is sent back to the caller after delivery completes.
type Process ¶
type Process struct {
// contains filtered or unexported fields
}
Process represents a plugin subprocess (internal goroutine or external fork).
Lifecycle: Start (or StartWithContext) -> Stop -> Wait. Stop signals the process to exit; Wait blocks until all goroutines finish. Callers MUST call Wait after Stop to avoid leaking goroutines.
func NewProcess ¶
func NewProcess(config plugin.PluginConfig) *Process
NewProcess creates a new process with the given configuration.
func (*Process) AddRegisteredCommand ¶
func (p *Process) AddRegisteredCommand(name string)
AddRegisteredCommand tracks a command registered by this process.
func (*Process) Bridge ¶
func (p *Process) Bridge() *rpc.DirectBridge
Bridge returns the direct transport bridge for internal plugins (nil for external).
func (*Process) Capabilities ¶
func (p *Process) Capabilities() *plugin.PluginCapabilities
Capabilities returns the plugin capability declarations (Stage 3).
func (*Process) ClearConn ¶
func (p *Process) ClearConn()
ClearConn nils the connection pointer without closing the underlying connection. Used in tests to simulate a process dying between verify and apply phases.
func (*Process) CloseConn ¶
func (p *Process) CloseConn()
CloseConn closes and nils the RPC connection under the mutex.
func (*Process) Cmd ¶
func (p *Process) Cmd() *exec.Cmd
Cmd returns the underlying exec.Cmd for external plugins (nil for internal). Protected by mu since startExternal() writes p.cmd under the same lock.
func (*Process) Config ¶
func (p *Process) Config() plugin.PluginConfig
Config returns the plugin configuration.
func (*Process) Conn ¶
func (p *Process) Conn() *ipc.PluginConn
Conn returns the plugin RPC connection under the mutex. Returns nil if the connection has been closed (e.g. by Stop() or monitor()). Callers must check for nil before use to avoid racing with shutdown.
func (*Process) Deliver ¶
func (p *Process) Deliver(d EventDelivery) bool
Deliver enqueues an event for the long-lived delivery goroutine. Returns true if the event was enqueued, false if the process is stopping. Thread-safe: uses RLock to allow parallel sends from multiple callers.
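One common way to get the behavior described for Deliver is a read-write mutex guarding a stopping flag. The sketch below is an assumption about the pattern, not the actual implementation: `queue`, `deliver`, and `stop` are hypothetical names, and returning false on a full queue is a simplification chosen so the example never blocks while holding the lock.

```go
package main

import (
	"fmt"
	"sync"
)

// queue is a hypothetical stand-in for the per-process event queue.
type queue struct {
	mu       sync.RWMutex
	stopping bool
	events   chan string
}

func newQueue(size int) *queue {
	return &queue{events: make(chan string, size)}
}

// deliver enqueues an event unless the queue is stopping.
// Holding the read lock lets many senders proceed in parallel while still
// excluding stop, which needs the write lock before closing the channel.
func (q *queue) deliver(ev string) bool {
	q.mu.RLock()
	defer q.mu.RUnlock()
	if q.stopping {
		return false
	}
	select {
	case q.events <- ev:
		return true
	default:
		return false // queue full (simplification for this sketch)
	}
}

// stop marks the queue as stopping and closes the channel exactly once.
func (q *queue) stop() {
	q.mu.Lock()
	defer q.mu.Unlock()
	if !q.stopping {
		q.stopping = true
		close(q.events)
	}
}

func main() {
	q := newQueue(4)
	fmt.Println("before stop:", q.deliver("update"))
	q.stop()
	fmt.Println("after stop:", q.deliver("update"))
}
```

Because stop takes the write lock, no sender can be between the flag check and the channel send when the channel is closed, so a send on a closed channel cannot occur.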
func (*Process) FormatCacheKey ¶
func (p *Process) FormatCacheKey() string
FormatCacheKey returns the precomputed "format+encoding" string for event dispatch cache lookup. Avoids per-event string concatenation on the hot path.
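The idea of precomputing the key can be sketched as follows. Everything here is illustrative: `proc` is a hypothetical stand-in, the literal "+" separator is an assumption read from the "format+encoding" wording, and the values "parsed" and "json" are example inputs only.

```go
package main

import "fmt"

// proc is a hypothetical stand-in holding only the fields relevant here.
type proc struct {
	format   string
	encoding string
	cacheKey string // rebuilt whenever format or encoding changes
}

// rebuildKey joins the two settings once, at configuration time.
func (p *proc) rebuildKey() { p.cacheKey = p.format + "+" + p.encoding }

func (p *proc) SetFormat(f string) {
	p.format = f
	p.rebuildKey()
}

func (p *proc) SetEncoding(e string) {
	p.encoding = e
	p.rebuildKey()
}

// FormatCacheKey returns the stored key without concatenating per call.
func (p *proc) FormatCacheKey() string { return p.cacheKey }

func main() {
	p := &proc{}
	p.SetFormat("parsed")
	p.SetEncoding("json")

	// A dispatcher could format each event once per distinct key and reuse it.
	formatted := map[string]string{p.FormatCacheKey(): `{"type":"update"}`}
	fmt.Println(p.FormatCacheKey(), formatted[p.FormatCacheKey()])
}
```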
func (*Process) HasStructuredHandler ¶
func (p *Process) HasStructuredHandler() bool
HasStructuredHandler reports whether this process supports structured event delivery. True when the process has a DirectBridge with a registered structured handler.
func (*Process) InitConns ¶
func (p *Process) InitConns() error
InitConns creates PluginConn connections from the raw engine-side connections. If PluginConns already exist (set directly by tests), returns immediately. Must be called exactly once before any reads from the connections.
InitConns creates a MuxPluginConn from the raw connection. If already initialized (conn set by test), returns immediately.
func (*Process) IsCacheConsumer ¶
func (p *Process) IsCacheConsumer() bool
IsCacheConsumer returns whether this plugin participates in cache consumer tracking. Cache consumers must forward or release each UPDATE they receive.
func (*Process) RegisteredCommands ¶
func (p *Process) RegisteredCommands() []string
RegisteredCommands returns a copy of the registered command names.
func (*Process) Registration ¶
func (p *Process) Registration() *plugin.PluginRegistration
Registration returns the plugin registration data (Stage 1).
func (*Process) RemoveRegisteredCommand ¶
func (p *Process) RemoveRegisteredCommand(name string)
RemoveRegisteredCommand removes a command from tracking.
func (*Process) SendShutdown ¶
func (p *Process) SendShutdown() bool
SendShutdown sends a graceful shutdown signal (bye RPC) to the plugin. Returns true if the process was running. The bye RPC gives the plugin a chance to clean up before Stop() closes connections and kills the process.
func (*Process) SetAcceptor ¶
func (p *Process) SetAcceptor(a *ipc.PluginAcceptor)
SetAcceptor sets the TLS acceptor for external plugin connect-back.
func (*Process) SetCacheConsumer ¶
func (p *Process) SetCacheConsumer(enabled bool)
SetCacheConsumer marks whether this plugin participates in cache consumer tracking.
func (*Process) SetCapabilities ¶
func (p *Process) SetCapabilities(caps *plugin.PluginCapabilities)
SetCapabilities sets the plugin capability declarations (Stage 3).
func (*Process) SetConn ¶
func (p *Process) SetConn(conn *ipc.PluginConn)
SetConn sets the plugin RPC connection. Used by test code.
func (*Process) SetEncoding ¶
func (p *Process) SetEncoding(enc string)
SetEncoding sets the high-level encoding (json or text).
func (*Process) SetRegistration ¶
func (p *Process) SetRegistration(reg *plugin.PluginRegistration)
SetRegistration sets the plugin registration data (Stage 1).
func (*Process) SetRunning ¶
func (p *Process) SetRunning(running bool)
SetRunning sets the running state of the process.
func (*Process) SetStage ¶
func (p *Process) SetStage(stage plugin.PluginStage)
SetStage sets the current stage and notifies waiters. If an onStageChange callback is set (by ProcessManager metrics), it is called after the stage is stored and waiters are notified, outside the lock.
func (*Process) SetWireEncoding ¶
func (p *Process) SetWireEncoding(enc plugin.WireEncoding)
SetWireEncoding sets both inbound and outbound wire encoding.
func (*Process) SetWireEncodingIn ¶
func (p *Process) SetWireEncodingIn(enc plugin.WireEncoding)
SetWireEncodingIn sets the inbound wire encoding.
func (*Process) SetWireEncodingOut ¶
func (p *Process) SetWireEncodingOut(enc plugin.WireEncoding)
SetWireEncodingOut sets the outbound wire encoding.
func (*Process) Stage ¶
func (p *Process) Stage() plugin.PluginStage
Stage returns the current plugin startup stage.
func (*Process) StartDelivery ¶
func (p *Process) StartDelivery(ctx context.Context)
StartDelivery starts only the event delivery goroutine. Used by tests that inject connections via SetConn without starting a real process.
func (*Process) StartWithContext ¶
func (p *Process) StartWithContext(ctx context.Context) error
StartWithContext spawns the process with the given context. For internal plugins (config.Internal=true), runs in-process via goroutine. For external plugins, forks via exec.Command.
func (*Process) Stop ¶
func (p *Process) Stop()
Stop signals the process to terminate. Does not block. For external plugins, canceling context kills the process via exec.CommandContext. For internal plugins, closing RPC connections unblocks the plugin's reads and causes it to exit. Callers MUST call Wait after Stop to ensure all goroutines have exited.
func (*Process) SyncEnabled ¶
func (p *Process) SyncEnabled() bool
SyncEnabled returns true if sync mode is enabled for this process. When enabled, announce/withdraw waits for wire transmission before ACK.
func (*Process) Wait ¶
func (p *Process) Wait(ctx context.Context) error
Wait blocks until all process goroutines have exited, or ctx expires. Must be called after Stop to avoid goroutine leaks.
func (*Process) WaitForStage ¶
func (p *Process) WaitForStage(ctx context.Context, stage plugin.PluginStage) error
WaitForStage waits for the process to reach or pass the given stage. Returns error on context cancellation or timeout.
func (*Process) WireEncodingIn ¶
func (p *Process) WireEncodingIn() plugin.WireEncoding
WireEncodingIn returns the inbound wire encoding (events ze→Process).
func (*Process) WireEncodingOut ¶
func (p *Process) WireEncodingOut() plugin.WireEncoding
WireEncodingOut returns the outbound wire encoding (commands Process→ze).
type ProcessManager ¶
type ProcessManager struct {
// contains filtered or unexported fields
}
ProcessManager manages multiple external processes.
func NewProcessManager ¶
func NewProcessManager(configs []plugin.PluginConfig) *ProcessManager
NewProcessManager creates a new process manager.
func (*ProcessManager) AddProcess ¶
func (pm *ProcessManager) AddProcess(name string, proc *Process)
AddProcess registers a process by name. Used by tests to inject mock processes. Wires metrics callbacks if a metrics registry is configured.
func (*ProcessManager) AllProcesses ¶
func (pm *ProcessManager) AllProcesses() []*Process
AllProcesses returns a snapshot of all processes. Caller may iterate and filter the returned slice without holding the lock.
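The snapshot behavior can be sketched as a copy taken under a read lock. The types below are hypothetical stand-ins (`registry`, `item`), and the real method may order or store processes differently.

```go
package main

import (
	"fmt"
	"sync"
)

// item and registry are hypothetical stand-ins for Process and ProcessManager.
type item struct {
	name    string
	running bool
}

type registry struct {
	mu    sync.RWMutex
	procs map[string]*item
}

// all copies the current set of pointers into a new slice under the read lock.
// The caller owns the slice, so later map changes do not affect iteration.
func (r *registry) all() []*item {
	r.mu.RLock()
	defer r.mu.RUnlock()
	out := make([]*item, 0, len(r.procs))
	for _, p := range r.procs {
		out = append(out, p)
	}
	return out
}

func main() {
	r := &registry{procs: map[string]*item{
		"rib":  {name: "rib", running: true},
		"peer": {name: "peer", running: false},
	}}
	running := 0
	for _, p := range r.all() { // no lock held while filtering
		if p.running {
			running++
		}
	}
	fmt.Println("running:", running)
}
```

The slice is a snapshot of membership only; the elements are still shared pointers, so per-process state must be read through its own synchronized accessors.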
func (*ProcessManager) GetProcess ¶
func (pm *ProcessManager) GetProcess(name string) *Process
GetProcess returns a process by name, or nil if not found.
func (*ProcessManager) IsDisabled ¶
func (pm *ProcessManager) IsDisabled(name string) bool
IsDisabled returns true if the named process is disabled due to respawn limit.
func (*ProcessManager) IsRunning ¶
func (pm *ProcessManager) IsRunning(name string) bool
IsRunning returns true if the named process is running.
func (*ProcessManager) ProcessCount ¶
func (pm *ProcessManager) ProcessCount() int
ProcessCount returns the number of running processes.
func (*ProcessManager) RemoveProcess ¶
func (pm *ProcessManager) RemoveProcess(name string)
RemoveProcess unregisters a stopped process by name. Used during config reload to clean up auto-stopped plugins.
func (*ProcessManager) Respawn ¶
func (pm *ProcessManager) Respawn(name string) error
Respawn restarts a process, enforcing respawn limits. Returns ErrRespawnLimitExceeded if limit exceeded within window. Returns ErrProcessDisabled if process was previously disabled. Returns ErrProcessNotFound if process name not in configuration. Returns error if ProcessManager was not started (ctx is nil).
func (*ProcessManager) SetAcceptor ¶
func (pm *ProcessManager) SetAcceptor(a *ipc.PluginAcceptor)
SetAcceptor sets the TLS acceptor for external plugin connect-back. Must be called before StartWithContext.
func (*ProcessManager) SetMetricsRegistry ¶
func (pm *ProcessManager) SetMetricsRegistry(reg metrics.Registry)
SetMetricsRegistry creates plugin health metrics from the given registry. Must be called before StartWithContext. Nil registry disables metrics. Idempotent: second call is a no-op.
func (*ProcessManager) Start ¶
func (pm *ProcessManager) Start() error
Start starts all configured processes.
func (*ProcessManager) StartMore ¶
func (pm *ProcessManager) StartMore(configs []plugin.PluginConfig) error
StartMore starts the additional configs under the existing context. Must be called after StartWithContext (the context is captured then). Returns an error if the manager has not been started yet.
func (*ProcessManager) StartWithContext ¶
func (pm *ProcessManager) StartWithContext(ctx context.Context) error
StartWithContext starts all configured processes with the given context. On the first call it captures the context for use by later StartMore calls. Subsequent calls re-spawn pm.configs (legacy single-shot behavior); use StartMore to add additional configs while keeping the originals running.
func (*ProcessManager) Stop ¶
func (pm *ProcessManager) Stop()
Stop stops all processes. It cancels the context and closes connections immediately, which unblocks plugin reads on net.Pipe and causes prompt exit. There is no bye round-trip: closing the connection is the shutdown signal for internal plugins, and context cancellation kills external plugins via exec.CommandContext.