server

package
v0.0.0-...-7fcc808 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 11, 2026 License: AGPL-3.0 Imports: 37 Imported by: 0

Documentation

Overview

Package server manages plugin process connections, the 5-stage startup protocol, and command dispatch.

Index

Constants

View Source
const (
	DefaultCommandTimeout = 30 * time.Second
	CompletionTimeout     = 500 * time.Millisecond
)

Default timeouts for plugin commands.

View Source
const (
	FilterOnErrorReject = "reject"
	FilterOnErrorAccept = "accept"
)

Filter on-error constants.

View Source
const APIVersion = "0.1.0"

APIVersion is the IPC protocol version.

View Source
const MaxPendingPerProcess = 100

MaxPendingPerProcess limits pending requests to prevent memory exhaustion.

Variables

View Source
var (
	ErrSchemaModuleEmpty      = errors.New("schema module name is empty")
	ErrSchemaModuleDuplicate  = errors.New("schema module already registered")
	ErrSchemaHandlerDuplicate = errors.New("schema handler already registered")
	ErrSchemaNotFound         = errors.New("schema not found")
	ErrRPCNotFound            = errors.New("RPC not found")
	ErrRPCDuplicate           = errors.New("RPC wire method already registered")
	ErrNotificationDuplicate  = errors.New("notification wire method already registered")
)

Errors for schema registration.

View Source
var ErrClientConfigNotFound = errors.New("client config not found")

ErrClientConfigNotFound is returned when no config exists for a client name.

View Source
var ErrDuplicateClient = errors.New("duplicate client name")

ErrDuplicateClient is returned when a client with the same name is already connected.

View Source
var ErrEmptyCommand = errors.New("empty command")

ErrEmptyCommand is returned when the command is empty.

View Source
var ErrPluginConnectionClosed = errors.New("plugin connection closed")

ErrPluginConnectionClosed is returned when the plugin's connection is no longer available.

View Source
var ErrPluginProcessNotRunning = errors.New("plugin process not running")

ErrPluginProcessNotRunning is returned when a plugin command targets a non-running process.

View Source
var ErrReloadInProgress = errors.New("config reload already in progress")

ErrReloadInProgress is returned when a config reload is attempted while another is already running. Callers can check this with errors.Is to decide whether to queue the reload (SIGHUP) or reject it (CLI/API).

View Source
var ErrSilent = errors.New("silent")

ErrSilent is returned when a command should produce no response.

View Source
var ErrSubsystemConnectionClosed = errors.New("subsystem connection closed")

ErrSubsystemConnectionClosed is returned when the subsystem's connection is no longer available.

View Source
var ErrSubsystemNotRunning = errors.New("subsystem not running")

ErrSubsystemNotRunning is returned when a command targets a non-running subsystem.

View Source
var ErrUnauthorized = errors.New("unauthorized")

ErrUnauthorized is returned when a command is denied by authorization.

View Source
var ErrUnknownCommand = errors.New("unknown command")

ErrUnknownCommand is returned when a command is not recognized.

Functions

func BuiltinCount

func BuiltinCount() int

BuiltinCount returns the number of registered builtin handlers.

func ExtractConfigSubtree

func ExtractConfigSubtree(configTree map[string]any, path string) any

ExtractConfigSubtree extracts a subtree from the config based on path. Always returns data wrapped in its full path structure from root. Supports:

  • "*" -> entire tree
  • "bgp" -> {"bgp": configTree["bgp"]}
  • "bgp/peer" -> {"bgp": {"peer": configTree["bgp"]["peer"]}}

func GetSchemaNode

func GetSchemaNode(path string) config.Node

GetSchemaNode returns a cached YANG schema node for the given config path.

func GetVersion

func GetVersion() (string, string)

GetVersion returns the current version and build date.

func HandleNodeWith

func HandleNodeWith(
	ctx *CommandContext,
	args []string,
	schemaPath string,
	treeKey string,
	prepare NodePrepare,
	apply NodeApply,
) (*plugin.Response, error)

HandleNodeWith is the generic handler for "set <domain> <node> <selector> with <args>". It parses inline args via the YANG schema at schemaPath, runs the prepare callback for node-specific validation/defaults, then calls apply to commit the change.

func IsReadOnlyPath

func IsReadOnlyPath(path string) bool

IsReadOnlyPath returns true if the command path starts with a read-only verb or a top-level query command not yet migrated under a verb.

func IsStreamingCommand

func IsStreamingCommand(input string) bool

IsStreamingCommand returns true if the input matches any registered streaming prefix.

func LoadBuiltins

func LoadBuiltins(d *Dispatcher, wireToPath, pathToDesc map[string]string)

LoadBuiltins registers all builtin handlers with the dispatcher. The wireToPath map provides the dispatch key for each handler, derived from the YANG command tree (WireMethod -> CLI path). pathToDesc provides YANG descriptions for help text. Handlers without a YANG entry are skipped.

func LookupCommandHelp

func LookupCommandHelp(ctx *CommandContext, name, kind string) (*plugin.Response, error)

LookupCommandHelp looks up a command by name in builtins then plugins. The kind parameter is used in error messages (e.g., "command", "bgp rib command").

func MonitorEventFormatter

func MonitorEventFormatter() func(string) string

MonitorEventFormatter returns the registered event formatter, or nil if none is registered.

func ParseInlineArgsForSchema

func ParseInlineArgsForSchema(schemaPath string, args []string) (map[string]any, *plugin.Response, error)

ParseInlineArgsForSchema parses config-syntax inline args using the YANG schema at the given path. Generic: works for any YANG node (peer, group, etc.). Returns (map, nil, nil) on success. Returns (nil, response, error) on failure.

func PeerSubcommandKeywords

func PeerSubcommandKeywords(wireToPath map[string]string) map[string]bool

PeerSubcommandKeywords returns the set of first words that follow "peer" in CLI command paths. Used by config validation to reject peer names that would collide with subcommand dispatch. The wireToPath map is typically built via yang.WireMethodToPath(loader).

func RegisterDefaultHandlers

func RegisterDefaultHandlers(d *Dispatcher, wireToPath, pathToDesc map[string]string)

RegisterDefaultHandlers registers all builtin handlers with the dispatcher.

func RegisterMonitorEventFormatter

func RegisterMonitorEventFormatter(fn func(string) string)

RegisterMonitorEventFormatter registers the function that formats raw JSON event lines into compact one-liners for monitor streaming output (both CLI and TUI). Called from the monitor plugin's init().

func RegisterRPCs

func RegisterRPCs(rpcs ...RPCRegistration)

RegisterRPCs adds RPCs to the package-level registry. Called from init() in register.go files.

func RegisterStreamingHandler

func RegisterStreamingHandler(prefix string, h StreamingHandler)

RegisterStreamingHandler registers a streaming command handler for a prefix. The prefix is matched case-insensitively against command input. Called from plugin init() functions.

func RequireReactor

func RequireReactor(ctx *CommandContext) (plugin.ReactorLifecycle, *plugin.Response, error)

RequireReactor returns the reactor or an error response if not available.

func SetVersion

func SetVersion(v, d string)

SetVersion sets the application version and build date (called from main).

func StreamEventMonitor

func StreamEventMonitor(ctx context.Context, s *Server, w io.Writer, _ string, args []string) error

StreamEventMonitor is the streaming handler for the "event monitor" command. It parses arguments, creates subscriptions, registers a MonitorClient, and streams events until the context is canceled. Registration: called from internal/component/bgp/plugins/cmd/monitor/monitor.go init() via RegisterStreamingHandler("event monitor", StreamEventMonitor).

func StreamingPrefixes

func StreamingPrefixes() []string

StreamingPrefixes returns the registered streaming command prefixes, sorted.

Types

type Authorizer

type Authorizer interface {
	Authorize(username, command string, isReadOnly bool) authz.Action
}

Authorizer checks whether a user is allowed to execute a command.

type Command

type Command struct {
	Name             string
	Handler          Handler
	Help             string
	ReadOnly         bool // True if command only reads state (safe for "ze show")
	RequiresSelector bool // True if command requires explicit peer selector (not default "*")
}

Command represents a registered command with metadata.

type CommandContext

type CommandContext struct {
	Server   *Server          // Gateway to all server state (reactor, dispatcher, etc.)
	Process  *process.Process // The API process (for session state)
	Peer     string           // Peer selector: "*" for all, or specific IP. Empty = "*"
	Username string           // Authenticated username (empty = no auth, full access)
	Meta     map[string]any   // Route metadata from UpdateRoute RPC; nil if not set.
}

CommandContext provides access to reactor and session state. Dependencies are accessed through Server; per-request state is stored directly.

func (*CommandContext) CommitManager

func (c *CommandContext) CommitManager() any

CommitManager returns the commit manager via Server. Nil-safe: returns nil if Server is nil.

func (*CommandContext) Dispatcher

func (c *CommandContext) Dispatcher() *Dispatcher

Dispatcher returns the command dispatcher via Server. Nil-safe: returns nil if Server is nil.

func (*CommandContext) PeerSelector

func (c *CommandContext) PeerSelector() string

PeerSelector returns the effective neighbor selector. Returns "*" if no neighbor was specified.

func (*CommandContext) ProtocolReactor

func (c *CommandContext) ProtocolReactor(name string) any

ProtocolReactor returns a named protocol reactor from the Coordinator. Callers type-assert to the protocol-specific interface they need. Nil-safe: returns nil if Server is nil or protocol not registered.

func (*CommandContext) Reactor

func (c *CommandContext) Reactor() plugin.ReactorLifecycle

Reactor returns the BGP reactor lifecycle interface via Server. Nil-safe: returns nil if Server is nil.

func (*CommandContext) Subscriptions

func (c *CommandContext) Subscriptions() *SubscriptionManager

Subscriptions returns the subscription manager via Server. Nil-safe: returns nil if Server is nil.

type CommandDef

type CommandDef struct {
	Name        string        // Command name (e.g., "myapp status")
	Description string        // Help text
	Args        string        // Usage hint (e.g., "<component>")
	Completable bool          // Process handles arg completion
	Timeout     time.Duration // Per-command timeout (0 = default 30s)
}

CommandDef describes a command to register. Passed from process to registry during registration.

type CommandRegistry

type CommandRegistry struct {
	// contains filtered or unexported fields
}

CommandRegistry manages plugin commands. Thread-safe for concurrent registration and lookup.

func NewCommandRegistry

func NewCommandRegistry() *CommandRegistry

NewCommandRegistry creates a new command registry.

func (*CommandRegistry) AddBuiltin

func (r *CommandRegistry) AddBuiltin(name string)

AddBuiltin marks a command name as builtin (cannot be shadowed). Called during dispatcher initialization.

func (*CommandRegistry) All

func (r *CommandRegistry) All() []*RegisteredCommand

All returns all registered commands.

func (*CommandRegistry) Complete

func (r *CommandRegistry) Complete(partial string) []Completion

Complete returns commands matching the partial input. Used for CLI command completion.

func (*CommandRegistry) Freeze

func (r *CommandRegistry) Freeze()

Freeze creates an immutable snapshot of the commands map. After Freeze(), Lookup uses atomic.Load instead of RLock. MUST be called after all Register calls complete (after startup barrier). Safe to call multiple times (each call overwrites the previous snapshot).

func (*CommandRegistry) IsBuiltin

func (r *CommandRegistry) IsBuiltin(name string) bool

IsBuiltin returns true if the command name is a builtin.

func (*CommandRegistry) Lookup

func (r *CommandRegistry) Lookup(name string) *RegisteredCommand

Lookup finds a command by exact name (case-insensitive). After Freeze(), uses lock-free atomic.Load on the frozen snapshot.

func (*CommandRegistry) Register

func (r *CommandRegistry) Register(proc *process.Process, defs []CommandDef) []RegisterResult

Register adds commands for a process. Returns results for each command (success or failure reason).

func (*CommandRegistry) Unregister

func (r *CommandRegistry) Unregister(proc *process.Process, names []string)

Unregister removes commands owned by the process. Only the owning process can unregister a command. Unknown commands are silently ignored. If frozen, publishes a new snapshot reflecting the removal.

func (*CommandRegistry) UnregisterAll

func (r *CommandRegistry) UnregisterAll(proc *process.Process)

UnregisterAll removes all commands owned by the process. Called when a process dies. If frozen, publishes a new snapshot reflecting the removal.

type Completion

type Completion struct {
	Value  string `json:"value"`            // The completion text
	Help   string `json:"help,omitempty"`   // Optional description
	Source string `json:"source,omitempty"` // "builtin" or process name (verbose mode)
}

Completion represents a single completion suggestion. Used for both command and argument completion.

type ConfigBlock

type ConfigBlock struct {
	Handler string // Handler path (e.g., "bgp/peer")
	Action  string // create, modify, delete
	Path    string // Full path (e.g., "bgp/peer[address=192.0.2.1]")
	Data    string // JSON data
}

ConfigBlock represents a config command to send to a plugin.

func ParseCommand

func ParseCommand(line string) (*ConfigBlock, error)

ParseCommand parses a namespace command. Format: <namespace> <path> <action> {json}. Or: <namespace> <action> {json} (for namespace-level config). Or: <namespace> commit|rollback|diff.

type ConfigEventGateway

type ConfigEventGateway struct {
	// contains filtered or unexported fields
}

ConfigEventGateway adapts Server to the internal/component/config/transaction.EventGateway interface used by the config transaction orchestrator.

The adapter hides the namespace parameter (always plugin.NamespaceConfig) and converts between the orchestrator's []byte payloads and Server's string event payloads.

Performance note: each emit/dispatch round-trips through []byte -> string -> []byte (one copy in EmitConfigEvent, one copy in the SubscribeConfigEvent handler bridge). This is acceptable for small config transaction payloads (~hundreds of bytes per ack) and trades two small allocations against the simpler string-based deliverEvent path. If this ever becomes a hot path, the right fix is to add a []byte-native variant to deliverEvent rather than complicating the adapter.

func NewConfigEventGateway

func NewConfigEventGateway(s *Server) *ConfigEventGateway

NewConfigEventGateway creates a new adapter wrapping the given Server. The Server must outlive the gateway; the gateway holds a reference but does not manage Server lifecycle.

func (*ConfigEventGateway) EmitConfigEvent

func (g *ConfigEventGateway) EmitConfigEvent(eventType string, payload []byte) (int, error)

EmitConfigEvent publishes a stream event in the config namespace. Returns the number of plugin processes that received the event.

func (*ConfigEventGateway) SubscribeConfigEvent

func (g *ConfigEventGateway) SubscribeConfigEvent(eventType string, handler func(payload []byte)) func()

SubscribeConfigEvent registers a handler for a config namespace event type. The handler is invoked synchronously from deliverEvent. Returns an unsubscribe function; nil handler returns a no-op unsubscribe.

type ConfigLoader

type ConfigLoader func() (map[string]any, error)

ConfigLoader loads a new config tree from disk or other source. Returns the parsed config tree or an error. Set on Server.configLoader before calling ReloadFromDisk.

type ConfigReader

type ConfigReader func(name string) ([]byte, error)

ConfigReader reads a client's config by name from the hub's blob store. Returns the raw config bytes, or ErrClientConfigNotFound if the client has no config entry.

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher routes commands to handlers.

func NewDispatcher

func NewDispatcher() *Dispatcher

NewDispatcher creates a new command dispatcher.

func (*Dispatcher) Commands

func (d *Dispatcher) Commands() []*Command

Commands returns all registered commands.

func (*Dispatcher) Dispatch

func (d *Dispatcher) Dispatch(ctx *CommandContext, input string) (*plugin.Response, error)

Dispatch parses and executes a command. Supports peer selector prefix: "peer <addr|name|*> <command>". If no peer prefix, defaults to all peers ("*"). Priority: 1) builtin commands, 2) forked subsystems, 3) plugin registry.

func (*Dispatcher) ForwardToPlugin

func (d *Dispatcher) ForwardToPlugin(command string, args []string, peerSelector string) (*plugin.Response, error)

ForwardToPlugin routes a command to a plugin process by exact name lookup. Used by proxy handlers that bridge CLI builtins to plugin commands. Returns ErrUnknownCommand if the command is not registered (plugin may not be running).

func (*Dispatcher) HasCommandPrefix

func (d *Dispatcher) HasCommandPrefix(input string) bool

HasCommandPrefix returns true if the input matches any registered command prefix (builtin or plugin). Used by dispatch routing to distinguish top-level commands from peer subcommands that need "peer <selector> " prepended.

func (*Dispatcher) IsAuthorized

func (d *Dispatcher) IsAuthorized(ctx *CommandContext, input string, readOnly bool) bool

IsAuthorized checks if the user is allowed to execute the command. Exported for use by streaming handlers (e.g., monitor) that bypass the normal dispatch path.

func (*Dispatcher) Lookup

func (d *Dispatcher) Lookup(name string) *Command

Lookup finds a command by exact name.

func (*Dispatcher) Pending

func (d *Dispatcher) Pending() *PendingRequests

Pending returns the pending requests tracker.

func (*Dispatcher) Register

func (d *Dispatcher) Register(name string, handler Handler, help string)

Register adds a builtin command handler. Also marks the command as builtin in the registry to prevent shadowing.

func (*Dispatcher) RegisterWithOptions

func (d *Dispatcher) RegisterWithOptions(name string, handler Handler, help string, opts RegisterOptions)

RegisterWithOptions adds a builtin command handler with additional options.

func (*Dispatcher) Registry

func (d *Dispatcher) Registry() *CommandRegistry

Registry returns the plugin command registry.

func (*Dispatcher) SetAuthorizer

func (d *Dispatcher) SetAuthorizer(a Authorizer)

SetAuthorizer sets the authorization checker for the dispatcher. When set, all commands are checked against the authorizer before execution.

func (*Dispatcher) SetSubsystems

func (d *Dispatcher) SetSubsystems(sm *SubsystemManager)

SetSubsystems sets the subsystem manager.

func (*Dispatcher) Subsystems

func (d *Dispatcher) Subsystems() *SubsystemManager

Subsystems returns the subsystem manager.

type EngineEventHandler

type EngineEventHandler func(event string)

EngineEventHandler is invoked when a stream event matches an engine subscription. The event string is the same payload that plugin subscribers receive (typically JSON). Handlers are called synchronously from deliverEvent. Handlers MUST NOT block on external I/O; push to a buffered channel and return if work is needed.

A handler that panics is recovered by the dispatch loop, logged, and the remaining handlers for the same event still fire. The panic does NOT propagate to the emitter (whoever called EmitEngineEvent or any other path that ends in deliverEvent).

type EventMonitorOpts

type EventMonitorOpts struct {
	IncludeTypes []string
	ExcludeTypes []string
	Peer         string
	Direction    string
}

EventMonitorOpts holds parsed arguments for the event monitor command.

func ParseEventMonitorArgs

func ParseEventMonitorArgs(args []string) (*EventMonitorOpts, error)

ParseEventMonitorArgs parses keyword arguments for the event monitor command.

Syntax: [include|exclude <types>] [peer <selector>] [direction received|sent] Keywords may appear in any order. Include and exclude are mutually exclusive.

type Handler

type Handler func(ctx *CommandContext, args []string) (*plugin.Response, error)

Handler processes a command and returns a response.

type Hub

type Hub struct {
	// contains filtered or unexported fields
}

Hub orchestrates plugin communication and command routing.

func NewHub

func NewHub(registry *SchemaRegistry, subsystems *SubsystemManager) *Hub

NewHub creates a new Hub with the given registry and subsystem manager.

func (*Hub) ProcessConfig

func (h *Hub) ProcessConfig(ctx context.Context, blocks []ConfigBlock) error

ProcessConfig processes a configuration transaction. Sends all commands to plugins, then commits each affected namespace.

func (*Hub) RouteCommand

func (h *Hub) RouteCommand(ctx context.Context, block *ConfigBlock) error

RouteCommand routes a command to the appropriate plugin. Format: <namespace> <path> <action> {json}.

func (*Hub) RouteCommit

func (h *Hub) RouteCommit(ctx context.Context, namespace string) error

RouteCommit sends a commit command to a plugin. Format: <namespace> commit.

func (*Hub) RouteRollback

func (h *Hub) RouteRollback(ctx context.Context, namespace string) error

RouteRollback sends a rollback command to a plugin. Format: <namespace> rollback.

type ManagedConfigService

type ManagedConfigService struct {
	// contains filtered or unexported fields
}

ManagedConfigService handles hub-side config-fetch and config-changed operations for managed clients. It reads client configs via a ConfigReader and computes version hashes for change detection. Tracks connected clients and rejects duplicate names. Safe for concurrent use.

func NewManagedConfigService

func NewManagedConfigService(reader ConfigReader) *ManagedConfigService

NewManagedConfigService creates a service that reads client configs via reader.

func (*ManagedConfigService) BuildConfigChanged

func (s *ManagedConfigService) BuildConfigChanged(clientName string) (fleet.ConfigChanged, error)

BuildConfigChanged creates a config-changed notification for a client. Reads the client's current config and computes its version hash.

func (*ManagedConfigService) HandleConfigFetch

func (s *ManagedConfigService) HandleConfigFetch(clientName string, req fleet.ConfigFetchRequest) (fleet.ConfigFetchResponse, error)

HandleConfigFetch processes a config-fetch request from a managed client. If the client's version matches the current config, returns status "current". Otherwise returns the full config as base64 with the new version hash.

func (*ManagedConfigService) RegisterClient

func (s *ManagedConfigService) RegisterClient(name string) error

RegisterClient marks a client as connected. Returns ErrDuplicateClient if a client with the same name is already connected. Caller MUST call UnregisterClient when the client disconnects.

func (*ManagedConfigService) UnregisterClient

func (s *ManagedConfigService) UnregisterClient(name string)

UnregisterClient removes a client from the connected set.

type MonitorClient

type MonitorClient struct {
	EventChan chan string     // Buffered channel for formatted events.
	Ctx       context.Context // Client-scoped context for cancellation.
	Dropped   atomic.Uint64   // Count of events dropped due to full channel.
	// contains filtered or unexported fields
}

MonitorClient represents an active monitor session.

func NewMonitorClient

func NewMonitorClient(ctx context.Context, id string, subs []*Subscription, bufSize int) *MonitorClient

NewMonitorClient creates a monitor client with the given subscriptions and buffer size. Caller MUST call MonitorManager.Remove(id) when done to release resources.

type MonitorManager

type MonitorManager struct {
	// contains filtered or unexported fields
}

MonitorManager manages active monitor clients. Parallel to SubscriptionManager (which manages plugin process subscriptions).

func NewMonitorManager

func NewMonitorManager() *MonitorManager

NewMonitorManager creates a new monitor manager.

func (*MonitorManager) Add

func (mm *MonitorManager) Add(mc *MonitorClient)

Add registers a monitor client.

func (*MonitorManager) Count

func (mm *MonitorManager) Count() int

Count returns the number of active monitors.

func (*MonitorManager) Deliver

func (mm *MonitorManager) Deliver(namespace, eventType, direction, peerAddr, peerName, output string)

Deliver sends a formatted event to all matching monitors. Uses non-blocking send: if a monitor's channel is full, the event is dropped and the dropped counter is incremented (backpressure). peerName is the configured peer name (may be empty).

func (*MonitorManager) GetMatching

func (mm *MonitorManager) GetMatching(namespace, eventType, direction, peerAddr, peerName string) []*MonitorClient

GetMatching returns monitors with subscriptions matching the event. A monitor matches if any of its subscriptions match. peerName is the configured peer name (may be empty).

func (*MonitorManager) Remove

func (mm *MonitorManager) Remove(id string)

Remove unregisters a monitor client by ID.

type NodeApply

type NodeApply func(selector string, nodeTree map[string]any) error

NodeApply applies the validated config tree to the reactor. Called after prepare succeeds. Each node type provides its own apply (e.g., peers call AddDynamicPeer, groups would call AddDynamicGroup).

type NodePrepare

type NodePrepare func(selector string, nodeTree map[string]any) (*plugin.Response, error)

NodePrepare is called after YANG parsing to apply node-specific validation and defaults. selector is the dispatcher-extracted selector (IP, name, etc.); nodeTree is the parsed config map. MUST return (nil, nil) on success or (response, error) on failure.

type PeerFilter

type PeerFilter struct {
	Selector string // "*", "10.0.0.1", "!10.0.0.1", "my-peer", "!my-peer"
}

PeerFilter specifies which peers to filter.

func (*PeerFilter) Matches

func (pf *PeerFilter) Matches(peerAddr, peerName string) bool

Matches returns true if the peer matches this filter. Matches against both the peer address (IP) and peer name.

type PendingRequest

type PendingRequest struct {
	Serial   string                // Alpha serial (a, b, bcd, ...)
	Command  string                // Matched command name
	Process  *process.Process      // Target process
	Timeout  time.Duration         // Timeout for response
	RespChan chan *plugin.Response // Channel to deliver response
	// contains filtered or unexported fields
}

PendingRequest represents an in-flight plugin command request.

type PendingRequests

type PendingRequests struct {
	// contains filtered or unexported fields
}

PendingRequests tracks in-flight plugin command requests. Thread-safe for concurrent access.

func NewPendingRequests

func NewPendingRequests() *PendingRequests

NewPendingRequests creates a new pending requests tracker.

func (*PendingRequests) Add

func (p *PendingRequests) Add(req *PendingRequest) string

Add registers a new pending request and starts the timeout timer. Returns the assigned alpha serial, or empty string if limit exceeded. If limit exceeded, sends error response to RespChan.

func (*PendingRequests) CancelAll

func (p *PendingRequests) CancelAll(proc *process.Process)

CancelAll cancels all pending requests for a process (process death). Sends error response to all waiting clients.

func (*PendingRequests) Complete

func (p *PendingRequests) Complete(serial string, resp *plugin.Response) bool

Complete delivers a final response and removes the request. Returns true if the serial was found (response delivered).

func (*PendingRequests) Count

func (p *PendingRequests) Count(proc *process.Process) int

Count returns the number of pending requests for a process.

func (*PendingRequests) Partial

func (p *PendingRequests) Partial(serial string, resp *plugin.Response) bool

Partial delivers a partial response (streaming) and resets the timeout. Returns true if the serial was found.

func (*PendingRequests) Total

func (p *PendingRequests) Total() int

Total returns the total number of pending requests.

type RPCParams

type RPCParams struct {
	Selector string   `json:"selector,omitempty"` // Peer selector (optional)
	Args     []string `json:"args,omitempty"`     // Command arguments (optional)
}

RPCParams is the standard params format for JSON RPC requests from socket clients. Handlers receive Args as positional arguments and Selector as the peer filter. Identity (Username) is never accepted from client JSON -- it MUST be injected by the transport layer (SSH session, plugin process manager, TLS auth).

type RPCRegistration

type RPCRegistration struct {
	WireMethod       string  // "module:rpc-name" format (e.g., "ze-bgp:peer-list")
	Handler          Handler // Handler function
	RequiresSelector bool    // True if peer commands must have explicit selector (not default "*")
	PluginCommand    string  // If set, this builtin proxies to a runtime plugin command (e.g., "bgp rib show")
}

RPCRegistration maps a YANG RPC wire method to its handler function. The CLI command name is derived from the YANG command tree (-cmd.yang modules) via yang.WireMethodToPath(). It is not stored in the registration. Help text comes from YANG descriptions. Read-only classification comes from the verb position in the command tree (show/validate/monitor = read-only).

func AllBuiltinRPCs

func AllBuiltinRPCs() []RPCRegistration

AllBuiltinRPCs returns all RPCs registered via init() + RegisterRPCs(). Includes server, handler, and editor RPCs (when their packages are imported).

type RegisterOptions

type RegisterOptions struct {
	ReadOnly         bool // True if command only reads state
	RequiresSelector bool // True if "bgp peer <command>" must have an explicit peer selector
	PluginProxy      bool // True if this builtin proxies to a plugin command (allows plugin to register same name)
}

RegisterOptions holds optional settings for command registration.

type RegisterResult

type RegisterResult struct {
	Name  string // Command that was registered
	OK    bool   // True if registration succeeded
	Error string // Error message if failed
}

RegisterResult holds the result of a single command registration.

type RegisteredCommand

type RegisteredCommand struct {
	Name         string
	LowerName    string // Pre-lowercased at registration for dispatch matching (zero alloc per lookup)
	Description  string
	Args         string           // Usage hint (e.g., "<component>")
	Completable  bool             // Process handles arg completion
	Timeout      time.Duration    // Per-command timeout
	Process      *process.Process // Owning process
	RegisteredAt time.Time
}

RegisteredCommand represents a plugin command in the registry.

type RegisteredNotification

type RegisteredNotification struct {
	Module      string          // YANG module name
	Name        string          // Notification name in kebab-case
	WireMethod  string          // Wire format "module:notification-name"
	Description string          // From YANG description
	Leaves      []yang.LeafMeta // Notification data leaves
}

RegisteredNotification represents a notification indexed in the schema registry.

type RegisteredRPC

type RegisteredRPC struct {
	Module      string          // YANG module name (e.g., "ze-bgp-api")
	Name        string          // RPC name in kebab-case (e.g., "peer-list")
	WireMethod  string          // Wire format "module:rpc-name" (e.g., "ze-bgp:peer-list")
	CLICommand  string          // CLI text command (e.g., "bgp peer list")
	Description string          // From YANG description
	Input       []yang.LeafMeta // Input parameter leaves
	Output      []yang.LeafMeta // Output parameter leaves
	Handler     Handler         // Handler function (set during registration)
}

RegisteredRPC represents an RPC indexed in the schema registry.

type Schema

type Schema struct {
	Module      string   // YANG module name
	Namespace   string   // YANG namespace URI
	Yang        string   // Full YANG module text
	Imports     []string // Imported module names (from YANG import statements)
	Handlers    []string // Handler paths (e.g., "bgp", "bgp/peer")
	Plugin      string   // Plugin that registered this schema
	Priority    int      // Config ordering (lower = processed first, default 1000)
	WantsConfig []string // Config roots plugin wants (from "declare wants config <root>")
}

Schema represents a YANG schema registered by a plugin.

type SchemaRegistry

type SchemaRegistry struct {
	// contains filtered or unexported fields
}

SchemaRegistry stores and manages schemas from all plugins.

func NewSchemaRegistry

func NewSchemaRegistry() *SchemaRegistry

NewSchemaRegistry creates a new schema registry.

func (*SchemaRegistry) Count

func (r *SchemaRegistry) Count() int

Count returns the number of registered schemas.

func (*SchemaRegistry) FindHandler

func (r *SchemaRegistry) FindHandler(path string) (*Schema, string)

FindHandler returns the schema for a handler path using longest prefix match. For example, if "bgp" and "bgp/peer" are registered, FindHandler("bgp/peer/timers") returns the schema for "bgp/peer". Predicates like [address=192.0.2.1] are stripped before matching. After Freeze(), uses lock-free atomic.Load on the frozen snapshot.

func (*SchemaRegistry) FindRPC

func (r *SchemaRegistry) FindRPC(wireMethod string) (*RegisteredRPC, error)

FindRPC returns the registered RPC for an exact wire method match.

func (*SchemaRegistry) FindRPCByCommand

func (r *SchemaRegistry) FindRPCByCommand(cliCommand string) (*RegisteredRPC, error)

FindRPCByCommand returns the registered RPC for a CLI text command.

func (*SchemaRegistry) Freeze

func (r *SchemaRegistry) Freeze()

Freeze creates an immutable snapshot of the handler and module maps. After Freeze(), FindHandler uses atomic.Load instead of RLock. MUST be called after all Register calls complete (after startup barrier). Safe to call multiple times (each call overwrites the previous snapshot).

func (*SchemaRegistry) GetByHandler

func (r *SchemaRegistry) GetByHandler(path string) (*Schema, error)

GetByHandler returns a schema by exact handler path.

func (*SchemaRegistry) GetByModule

func (r *SchemaRegistry) GetByModule(name string) (*Schema, error)

GetByModule returns a schema by module name.

func (*SchemaRegistry) ListHandlers

func (r *SchemaRegistry) ListHandlers() map[string]string

ListHandlers returns all registered handler paths with their modules.

func (*SchemaRegistry) ListModules

func (r *SchemaRegistry) ListModules() []string

ListModules returns all registered module names.

func (*SchemaRegistry) ListNotifications

func (r *SchemaRegistry) ListNotifications(module string) []*RegisteredNotification

ListNotifications returns all registered notifications, optionally filtered by YANG module name. Pass empty string to list all notifications.

func (*SchemaRegistry) ListRPCs

func (r *SchemaRegistry) ListRPCs(module string) []*RegisteredRPC

ListRPCs returns all registered RPCs, optionally filtered by YANG module name. Pass empty string to list all RPCs.

func (*SchemaRegistry) Register

func (r *SchemaRegistry) Register(schema *Schema) error

Register adds a schema to the registry. Returns error if module name or handler paths conflict with existing registrations.

func (*SchemaRegistry) RegisterCLICommand

func (r *SchemaRegistry) RegisterCLICommand(cliCommand, wireMethod string) error

RegisterCLICommand associates a CLI text command with a wire method. The wire method must already be registered via RegisterRPCs.

func (*SchemaRegistry) RegisterNotifications

func (r *SchemaRegistry) RegisterNotifications(module string, notifs []yang.NotificationMeta) error

RegisterNotifications indexes notifications extracted from a YANG module.

func (*SchemaRegistry) RegisterRPCs

func (r *SchemaRegistry) RegisterRPCs(module string, rpcs []yang.RPCMeta) error

RegisterRPCs indexes RPCs extracted from a YANG module. Wire methods use the stripped module prefix (e.g., "ze-bgp-api" → "ze-bgp:peer-list").

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server manages API connections and command dispatch.

func NewServer

func NewServer(config *ServerConfig, reactor plugin.ReactorLifecycle) (*Server, error)

NewServer creates a new API server.

func (*Server) AllPluginCapabilities

func (s *Server) AllPluginCapabilities() []plugin.InjectedCapability

AllPluginCapabilities returns all stored capabilities (global + all per-peer). Used by the restart handler to compute max restart-time for the GR marker.

func (*Server) CallFilterUpdate

func (s *Server) CallFilterUpdate(ctx context.Context, pluginName string, input *rpc.FilterUpdateInput) (*rpc.FilterUpdateOutput, error)

CallFilterUpdate sends a filter-update RPC to a named plugin and returns the response. Returns an error if the plugin is not found, not connected, or the RPC fails.

func (*Server) CommitManager

func (s *Server) CommitManager() any

CommitManager returns the commit manager.

func (*Server) ConfigPath

func (s *Server) ConfigPath() string

ConfigPath returns the path to the config file. Empty if not set.

func (*Server) Context

func (s *Server) Context() context.Context

Context returns the server's context. Used by RPC handlers that need a cancellable context tied to the server's lifetime (e.g., coordinator reload).

func (*Server) DecodeNLRI

func (s *Server) DecodeNLRI(family, hexData string) (string, error)

DecodeNLRI decodes NLRI by routing to the appropriate family plugin via RPC. Returns the JSON representation of the decoded NLRI. Returns error if no plugin registered or plugin not running.

func (*Server) Dispatcher

func (s *Server) Dispatcher() *Dispatcher

Dispatcher returns the command dispatcher.

func (*Server) DrainSIGHUP

func (s *Server) DrainSIGHUP() bool

DrainSIGHUP returns true if a SIGHUP was queued and clears the flag.

func (*Server) Emit

func (s *Server) Emit(namespace, eventType, payload string) (int, error)

Emit satisfies the pkg/ze.EventBus interface. It is a thin alias for EmitEngineEvent so engine components can depend on the public ze.EventBus type without importing this package directly.

func (*Server) EmitEngineEvent

func (s *Server) EmitEngineEvent(namespace, eventType, event string) (int, error)

EmitEngineEvent publishes an event from the engine to the stream system. Both engine subscribers and plugin process subscribers receive it. Returns the number of plugin processes that received the event (engine handler count is intentionally not reported because engine subscribers are in-process and always receive synchronously when matching).

The event must use a registered (namespace, eventType) per plugin.IsValidEvent; unknown pairs return an error and deliver to nobody (neither engine handlers nor plugin subscribers).

func (*Server) EncodeNLRI

func (s *Server) EncodeNLRI(family string, args []string) ([]byte, error)

EncodeNLRI encodes NLRI by routing to the appropriate family plugin via RPC. Returns error if no plugin registered or plugin not running.

func (*Server) FilterInfo

func (s *Server) FilterInfo(pluginName, filterName string) (declaredAttrs []string, raw bool)

FilterInfo returns declaration info for a named filter: declared attributes and raw flag. Returns nil attributes and false if the plugin or filter is not found.

func (*Server) FilterOnError

func (s *Server) FilterOnError(pluginName, filterName string) string

FilterOnError returns the declared on-error mode for a named filter. Returns FilterOnErrorReject (fail-closed) if the plugin or filter is not found.

func (*Server) GetDecodeFamilies

func (s *Server) GetDecodeFamilies() []string

GetDecodeFamilies returns all families that have decode plugins registered. Used by Session to auto-add Multiprotocol capabilities in OPEN. Plugins that can decode a family should advertise that family to peers.

func (*Server) GetPluginCapabilitiesForPeer

func (s *Server) GetPluginCapabilitiesForPeer(peerAddr string) []plugin.InjectedCapability

GetPluginCapabilitiesForPeer returns plugin-declared capabilities for a specific peer. Returns global capabilities plus any peer-specific capabilities (per-peer takes precedence).

func (*Server) GetSchemaDeclarations

func (s *Server) GetSchemaDeclarations() []plugin.SchemaDeclaration

GetSchemaDeclarations returns all schema declarations from registered plugins. Used for two-phase config parsing to extend the schema before parsing peer config. Should be called after Stage 1 (Registration) completes for all plugins.

func (*Server) HandleAdHocPluginSession

func (s *Server) HandleAdHocPluginSession(reader io.ReadCloser, writer io.WriteCloser) error

HandleAdHocPluginSession runs the 5-stage plugin handshake and runtime command loop over an arbitrary bidirectional stream (e.g., an SSH channel). The session uses coordinator == nil, so all stage barriers are skipped. Blocks until the connection closes or the server shuts down. Caller MUST close reader and writer after this returns.

func (*Server) HasConfigLoader

func (s *Server) HasConfigLoader() bool

HasConfigLoader reports whether a config loader has been set. Used by SIGHUP handler to decide between coordinator path and direct reload.

func (*Server) HasProcesses

func (s *Server) HasProcesses() bool

HasProcesses returns true if any plugin processes were loaded during startup. Used by the main loop to decide whether to listen for server-done (all processes exited). Without this, configs with no plugins cause immediate daemon exit.

func (*Server) Monitors

func (s *Server) Monitors() *MonitorManager

Monitors returns the monitor manager for CLI monitor sessions.

func (*Server) ProcessManager

func (s *Server) ProcessManager() *process.ProcessManager

ProcessManager returns the process manager. Used by BGP hook implementations to iterate plugin processes.

func (*Server) QueueSIGHUP

func (s *Server) QueueSIGHUP()

QueueSIGHUP queues a SIGHUP for later processing if a transaction is active.

func (*Server) Reactor

func (s *Server) Reactor() plugin.ReactorLifecycle

func (*Server) ReactorAny

func (s *Server) ReactorAny() any

ReactorAny returns the reactor as any, satisfying registry.PluginServerAccessor.

func (*Server) ReactorFor

func (s *Server) ReactorFor(name string) any

ReactorFor returns a named protocol reactor from the Coordinator, or nil. This allows plugins to access non-BGP reactors (e.g., OSPF, IS-IS) by name.

func (*Server) ReloadConfig

func (s *Server) ReloadConfig(ctx context.Context, newTree map[string]any) error

ReloadConfig orchestrates config reload across all config-interested plugins. Follows verify→apply protocol: all plugins must verify before any apply. Returns nil if there are no changes, or if verify→apply succeeds. Returns error if verify fails for any plugin, or if a reload is already in progress.

func (*Server) ReloadFromDisk

func (s *Server) ReloadFromDisk(ctx context.Context) error

ReloadFromDisk loads the config from the configured loader and triggers reload. Returns error if the loader is not set, parsing fails, or reload fails.

func (*Server) Running

func (s *Server) Running() bool

Running returns true if the server is running.

func (*Server) SetCommitManager

func (s *Server) SetCommitManager(cm any)

SetCommitManager sets the commit manager. Called by the BGP plugin during configuration to inject a CommitManager created with BGP-specific types. MUST be called before any RPC dispatch (i.e., during init-time registration). NOT safe for concurrent use with CommitManager().

func (*Server) SetConfigLoader

func (s *Server) SetConfigLoader(loader ConfigLoader)

SetConfigLoader sets the function used by ReloadFromDisk to load the config tree.

func (*Server) SetProcessSpawner

func (s *Server) SetProcessSpawner(sp plugin.ProcessSpawner)

SetProcessSpawner sets the PluginManager as the process spawner. When set, runPluginPhase delegates process creation to the spawner instead of creating ProcessManager directly. Must be called before Start. If the spawner supports SetMetricsRegistry (e.g., PluginManager), the server's metrics registry is forwarded for plugin health metrics.

func (*Server) Start

func (s *Server) Start() error

Start begins accepting connections.

func (*Server) StartWithContext

func (s *Server) StartWithContext(ctx context.Context) error

StartWithContext begins accepting connections with the given context. External access is via SSH; the plugin server handles only in-process dispatch.

func (*Server) Stop

func (s *Server) Stop()

Stop signals the server to stop and cleans up resources.

func (*Server) Subscribe

func (s *Server) Subscribe(namespace, eventType string, handler func(payload string)) func()

Subscribe satisfies the pkg/ze.EventBus interface. It is a thin alias for SubscribeEngineEvent that adapts the handler signature from EngineEventHandler (a named type) to a plain func, which is what ze.EventBus declares.

func (*Server) SubscribeEngineEvent

func (s *Server) SubscribeEngineEvent(namespace, eventType string, handler EngineEventHandler) func()

SubscribeEngineEvent registers an engine-side handler for stream events matching the given namespace and event type. The returned function unregisters the handler when called; safe to call multiple times.

Handlers fire synchronously from deliverEvent. They must not block on external I/O. The handler receives the same event string that plugin process subscribers would receive.

Engine subscriptions are parallel to plugin process subscriptions managed by SubscriptionManager. Both fire on the same deliverEvent call.

Subscriptions are NOT validated against the event registry: subscribing to an unknown (namespace, eventType) pair, or to a per-plugin event type that is not yet registered, succeeds silently. Such a subscription is dead until the matching emit arrives. This avoids races with per-plugin event types like "verify-bgp" that are registered dynamically when the plugin starts.

Engine subscribers receive ALL events for the given (namespace, eventType) regardless of direction or peer address. Plugin process subscribers can filter on direction and peer; engine subscribers cannot. This is intended for engine-internal coordination (e.g. config transactions) where direction has no meaning.

A nil handler is rejected: the call returns a no-op unsubscribe function without registering anything. This catches programmer errors loudly via "the handler I just registered never fires" rather than via a nil-pointer panic at first dispatch.

func (*Server) Subscriptions

func (s *Server) Subscriptions() *SubscriptionManager

Subscriptions returns the subscription manager.

func (*Server) TxLocked

func (s *Server) TxLocked() bool

TxLocked reports whether a config transaction is in progress.

func (*Server) UpdateProtocolConfig

func (s *Server) UpdateProtocolConfig(families, customEvents, customSendTypes []string)

UpdateProtocolConfig sets protocol-specific auto-load configuration after the reactor has parsed settings. Called by the protocol plugin's RunEngine after creating the reactor, so that family/event/send auto-load phases have the data.

func (*Server) Wait

func (s *Server) Wait(ctx context.Context) error

Wait waits for the server to stop.

func (*Server) WaitForStartupComplete

func (s *Server) WaitForStartupComplete(ctx context.Context) error

WaitForStartupComplete blocks until all plugin startup phases are done. Returns a non-nil error if a config-path plugin failed during startup (e.g., invalid BGP config) or if the context deadline is exceeded.

type ServerConfig

type ServerConfig struct {
	ConfigPath                string                // Path to config file (for peer save)
	Plugins                   []plugin.PluginConfig // External plugins to spawn
	ConfiguredFamilies        []string              // Families configured on peers (for deferred auto-load)
	ConfiguredCustomEvents    []string              // Custom event types in peer receive config (for auto-load)
	ConfiguredCustomSendTypes []string              // Custom send types in peer send config (for auto-load)
	ConfiguredPaths           []string              // Top-level config sections present (for config-driven auto-load)
	Hub                       *plugin.HubConfig     // TLS transport config (nil = no TLS listener)
	MetricsRegistry           metrics.Registry      // Prometheus metrics registry (nil = metrics disabled)
}

ServerConfig holds API server configuration.

type StreamingHandler

type StreamingHandler func(ctx context.Context, s *Server, w io.Writer, username string, args []string) error

StreamingHandler handles streaming commands (e.g., monitor). ctx is the session context, s is the plugin server, w is the output writer, username is the authenticated SSH user (for authorization), args are command arguments.

func GetStreamingHandlerForCommand

func GetStreamingHandlerForCommand(input string) (StreamingHandler, []string)

GetStreamingHandlerForCommand returns the handler and extracted args for a command. Matches the longest registered prefix. Returns (nil, nil) if no prefix matches.

type Subscription

type Subscription struct {
	Namespace    string      // "bgp" or "bgp-rib"
	EventType    string      // "update", "state", etc.
	Direction    string      // "received", "sent", "both" (empty = both)
	PeerFilter   *PeerFilter // nil = all peers
	PluginFilter string      // plugin name filter (empty = all)
}

Subscription represents an event subscription.

func BuildEventMonitorSubscriptions

func BuildEventMonitorSubscriptions(opts *EventMonitorOpts) []*Subscription

BuildEventMonitorSubscriptions creates subscriptions from parsed options. With no include/exclude filter, subscribes to all event types across all namespaces. With include, subscribes only to those types (in whichever namespaces they exist). With exclude, subscribes to all types except those listed.

func ParseSubscription

func ParseSubscription(args []string) (*Subscription, error)

ParseSubscription parses a subscribe/unsubscribe command. Format: [peer <sel> | plugin <name>] <namespace> event <type> [direction received|sent|both].

func (*Subscription) Equals

func (s *Subscription) Equals(other *Subscription) bool

Equals returns true if two subscriptions are identical.

func (*Subscription) Matches

func (s *Subscription) Matches(namespace, eventType, direction, peerAddr, peerName string) bool

Matches returns true if this subscription matches the event. peerAddr is the peer's IP address; peerName is the configured peer name (may be empty).

type SubscriptionManager

type SubscriptionManager struct {
	// contains filtered or unexported fields
}

SubscriptionManager tracks subscriptions per process.

func NewSubscriptionManager

func NewSubscriptionManager() *SubscriptionManager

NewSubscriptionManager creates a new subscription manager.

func (*SubscriptionManager) Add

func (sm *SubscriptionManager) Add(proc *process.Process, sub *Subscription)

Add adds a subscription for a process.

func (*SubscriptionManager) ClearProcess

func (sm *SubscriptionManager) ClearProcess(proc *process.Process)

ClearProcess removes all subscriptions for a process.

func (*SubscriptionManager) Count

func (sm *SubscriptionManager) Count(proc *process.Process) int

Count returns the number of subscriptions for a process.

func (*SubscriptionManager) GetMatching

func (sm *SubscriptionManager) GetMatching(namespace, eventType, direction, peerAddr, peerName string) []*process.Process

GetMatching returns all processes with subscriptions matching the event. peerName is the configured peer name (may be empty for non-BGP events or emit-event RPCs).

func (*SubscriptionManager) GetSubscriptions

func (sm *SubscriptionManager) GetSubscriptions(proc *process.Process) []*Subscription

GetSubscriptions returns all subscriptions for a process.

func (*SubscriptionManager) Remove

func (sm *SubscriptionManager) Remove(proc *process.Process, sub *Subscription) bool

Remove removes a subscription for a process. Returns true if the subscription was found and removed.

type SubsystemConfig

type SubsystemConfig struct {
	Name       string   // Subsystem name (cache, route, session)
	Binary     string   // Path to binary or full command
	Commands   []string // Commands this subsystem handles (for pre-registration)
	ConfigPath string   // Config file path (passed to child process)
}

SubsystemConfig describes a forked subsystem process.

type SubsystemHandler

type SubsystemHandler struct {
	// contains filtered or unexported fields
}

SubsystemHandler wraps a forked process that handles a subset of commands. It spawns the subprocess, completes the 5-stage protocol, and routes commands to it via pipes.

func NewSubsystemHandler

func NewSubsystemHandler(config SubsystemConfig) *SubsystemHandler

NewSubsystemHandler creates a handler backed by a forked process.

func (*SubsystemHandler) Commands

func (h *SubsystemHandler) Commands() []string

Commands returns the commands this subsystem handles.

func (*SubsystemHandler) Handle

func (h *SubsystemHandler) Handle(ctx context.Context, command string) (*plugin.Response, error)

Handle sends a command to the subsystem via RPC and returns the response.

func (*SubsystemHandler) Name

func (h *SubsystemHandler) Name() string

Name returns the subsystem name.

func (*SubsystemHandler) Running

func (h *SubsystemHandler) Running() bool

Running returns true if the subsystem process is running.

func (*SubsystemHandler) Schema

Schema returns the YANG schema declared by this subsystem, or nil if none.

func (*SubsystemHandler) Signal

func (h *SubsystemHandler) Signal(sig os.Signal) error

Signal sends an OS signal to the subsystem's external process. Returns an error if the process is not running or is internal (goroutine).

func (*SubsystemHandler) Start

func (h *SubsystemHandler) Start(ctx context.Context) error

Start spawns the subsystem process and completes the 5-stage protocol.

func (*SubsystemHandler) Stop

func (h *SubsystemHandler) Stop()

Stop terminates the subsystem process.

type SubsystemManager

type SubsystemManager struct {
	// contains filtered or unexported fields
}

SubsystemManager manages multiple subsystem handlers.

func NewSubsystemManager

func NewSubsystemManager() *SubsystemManager

NewSubsystemManager creates a new subsystem manager.

func (*SubsystemManager) AllCommands

func (m *SubsystemManager) AllCommands() []string

AllCommands returns all commands from all subsystems.

func (*SubsystemManager) AllSchemas

func (m *SubsystemManager) AllSchemas() []*Schema

AllSchemas returns all schemas from all subsystems.

func (*SubsystemManager) FindHandler

func (m *SubsystemManager) FindHandler(command string) *SubsystemHandler

FindHandler returns the handler for a given command, or nil if not found. After Freeze(), uses lock-free atomic.Load on the frozen snapshot.

func (*SubsystemManager) Freeze

func (m *SubsystemManager) Freeze()

Freeze creates an immutable snapshot of the handler map. After Freeze(), Get and FindHandler use atomic.Load instead of RLock. MUST be called after all Register calls complete (after startup barrier). Safe to call multiple times (each call overwrites the previous snapshot).

func (*SubsystemManager) Get

Get returns a subsystem handler by name. After Freeze(), uses lock-free atomic.Load on the frozen snapshot.

func (*SubsystemManager) Names

func (m *SubsystemManager) Names() []string

Names returns the names of all registered subsystems.

func (*SubsystemManager) Register

func (m *SubsystemManager) Register(config SubsystemConfig)

Register adds a subsystem configuration.

func (*SubsystemManager) RegisterSchemas

func (m *SubsystemManager) RegisterSchemas(registry *SchemaRegistry) error

RegisterSchemas registers all subsystem schemas with the given registry.

func (*SubsystemManager) StartAll

func (m *SubsystemManager) StartAll(ctx context.Context) error

StartAll starts all registered subsystems.

func (*SubsystemManager) StopAll

func (m *SubsystemManager) StopAll()

StopAll stops all subsystems.

func (*SubsystemManager) Unregister

func (m *SubsystemManager) Unregister(name string)

Unregister stops and removes a subsystem by name. No-op if the name is not registered. If frozen, publishes a new snapshot reflecting the removal.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL