Documentation
¶
Overview ¶
Package server manages plugin process connections, the 5-stage startup protocol, and command dispatch.
Index ¶
- Constants
- Variables
- func BuiltinCount() int
- func ExtractConfigSubtree(configTree map[string]any, path string) any
- func GetSchemaNode(path string) config.Node
- func GetVersion() (string, string)
- func HandleNodeWith(ctx *CommandContext, args []string, schemaPath string, treeKey string, ...) (*plugin.Response, error)
- func IsReadOnlyPath(path string) bool
- func IsStreamingCommand(input string) bool
- func LoadBuiltins(d *Dispatcher, wireToPath, pathToDesc map[string]string)
- func LookupCommandHelp(ctx *CommandContext, name, kind string) (*plugin.Response, error)
- func MonitorEventFormatter() func(string) string
- func ParseInlineArgsForSchema(schemaPath string, args []string) (map[string]any, *plugin.Response, error)
- func PeerSubcommandKeywords(wireToPath map[string]string) map[string]bool
- func RegisterDefaultHandlers(d *Dispatcher, wireToPath, pathToDesc map[string]string)
- func RegisterMonitorEventFormatter(fn func(string) string)
- func RegisterRPCs(rpcs ...RPCRegistration)
- func RegisterStreamingHandler(prefix string, h StreamingHandler)
- func RequireReactor(ctx *CommandContext) (plugin.ReactorLifecycle, *plugin.Response, error)
- func SetVersion(v, d string)
- func StreamEventMonitor(ctx context.Context, s *Server, w io.Writer, _ string, args []string) error
- func StreamingPrefixes() []string
- type Authorizer
- type Command
- type CommandContext
- func (c *CommandContext) CommitManager() any
- func (c *CommandContext) Dispatcher() *Dispatcher
- func (c *CommandContext) PeerSelector() string
- func (c *CommandContext) ProtocolReactor(name string) any
- func (c *CommandContext) Reactor() plugin.ReactorLifecycle
- func (c *CommandContext) Subscriptions() *SubscriptionManager
- type CommandDef
- type CommandRegistry
- func (r *CommandRegistry) AddBuiltin(name string)
- func (r *CommandRegistry) All() []*RegisteredCommand
- func (r *CommandRegistry) Complete(partial string) []Completion
- func (r *CommandRegistry) Freeze()
- func (r *CommandRegistry) IsBuiltin(name string) bool
- func (r *CommandRegistry) Lookup(name string) *RegisteredCommand
- func (r *CommandRegistry) Register(proc *process.Process, defs []CommandDef) []RegisterResult
- func (r *CommandRegistry) Unregister(proc *process.Process, names []string)
- func (r *CommandRegistry) UnregisterAll(proc *process.Process)
- type Completion
- type ConfigBlock
- type ConfigEventGateway
- type ConfigLoader
- type ConfigReader
- type Dispatcher
- func (d *Dispatcher) Commands() []*Command
- func (d *Dispatcher) Dispatch(ctx *CommandContext, input string) (*plugin.Response, error)
- func (d *Dispatcher) ForwardToPlugin(command string, args []string, peerSelector string) (*plugin.Response, error)
- func (d *Dispatcher) HasCommandPrefix(input string) bool
- func (d *Dispatcher) IsAuthorized(ctx *CommandContext, input string, readOnly bool) bool
- func (d *Dispatcher) Lookup(name string) *Command
- func (d *Dispatcher) Pending() *PendingRequests
- func (d *Dispatcher) Register(name string, handler Handler, help string)
- func (d *Dispatcher) RegisterWithOptions(name string, handler Handler, help string, opts RegisterOptions)
- func (d *Dispatcher) Registry() *CommandRegistry
- func (d *Dispatcher) SetAuthorizer(a Authorizer)
- func (d *Dispatcher) SetSubsystems(sm *SubsystemManager)
- func (d *Dispatcher) Subsystems() *SubsystemManager
- type EngineEventHandler
- type EventMonitorOpts
- type Handler
- type Hub
- type ManagedConfigService
- func (s *ManagedConfigService) BuildConfigChanged(clientName string) (fleet.ConfigChanged, error)
- func (s *ManagedConfigService) HandleConfigFetch(clientName string, req fleet.ConfigFetchRequest) (fleet.ConfigFetchResponse, error)
- func (s *ManagedConfigService) RegisterClient(name string) error
- func (s *ManagedConfigService) UnregisterClient(name string)
- type MonitorClient
- type MonitorManager
- func (mm *MonitorManager) Add(mc *MonitorClient)
- func (mm *MonitorManager) Count() int
- func (mm *MonitorManager) Deliver(namespace, eventType, direction, peerAddr, peerName, output string)
- func (mm *MonitorManager) GetMatching(namespace, eventType, direction, peerAddr, peerName string) []*MonitorClient
- func (mm *MonitorManager) Remove(id string)
- type NodeApply
- type NodePrepare
- type PeerFilter
- type PendingRequest
- type PendingRequests
- func (p *PendingRequests) Add(req *PendingRequest) string
- func (p *PendingRequests) CancelAll(proc *process.Process)
- func (p *PendingRequests) Complete(serial string, resp *plugin.Response) bool
- func (p *PendingRequests) Count(proc *process.Process) int
- func (p *PendingRequests) Partial(serial string, resp *plugin.Response) bool
- func (p *PendingRequests) Total() int
- type RPCParams
- type RPCRegistration
- type RegisterOptions
- type RegisterResult
- type RegisteredCommand
- type RegisteredNotification
- type RegisteredRPC
- type Schema
- type SchemaRegistry
- func (r *SchemaRegistry) Count() int
- func (r *SchemaRegistry) FindHandler(path string) (*Schema, string)
- func (r *SchemaRegistry) FindRPC(wireMethod string) (*RegisteredRPC, error)
- func (r *SchemaRegistry) FindRPCByCommand(cliCommand string) (*RegisteredRPC, error)
- func (r *SchemaRegistry) Freeze()
- func (r *SchemaRegistry) GetByHandler(path string) (*Schema, error)
- func (r *SchemaRegistry) GetByModule(name string) (*Schema, error)
- func (r *SchemaRegistry) ListHandlers() map[string]string
- func (r *SchemaRegistry) ListModules() []string
- func (r *SchemaRegistry) ListNotifications(module string) []*RegisteredNotification
- func (r *SchemaRegistry) ListRPCs(module string) []*RegisteredRPC
- func (r *SchemaRegistry) Register(schema *Schema) error
- func (r *SchemaRegistry) RegisterCLICommand(cliCommand, wireMethod string) error
- func (r *SchemaRegistry) RegisterNotifications(module string, notifs []yang.NotificationMeta) error
- func (r *SchemaRegistry) RegisterRPCs(module string, rpcs []yang.RPCMeta) error
- type Server
- func (s *Server) AllPluginCapabilities() []plugin.InjectedCapability
- func (s *Server) CallFilterUpdate(ctx context.Context, pluginName string, input *rpc.FilterUpdateInput) (*rpc.FilterUpdateOutput, error)
- func (s *Server) CommitManager() any
- func (s *Server) ConfigPath() string
- func (s *Server) Context() context.Context
- func (s *Server) DecodeNLRI(family, hexData string) (string, error)
- func (s *Server) Dispatcher() *Dispatcher
- func (s *Server) DrainSIGHUP() bool
- func (s *Server) Emit(namespace, eventType, payload string) (int, error)
- func (s *Server) EmitEngineEvent(namespace, eventType, event string) (int, error)
- func (s *Server) EncodeNLRI(family string, args []string) ([]byte, error)
- func (s *Server) FilterInfo(pluginName, filterName string) (declaredAttrs []string, raw bool)
- func (s *Server) FilterOnError(pluginName, filterName string) string
- func (s *Server) GetDecodeFamilies() []string
- func (s *Server) GetPluginCapabilitiesForPeer(peerAddr string) []plugin.InjectedCapability
- func (s *Server) GetSchemaDeclarations() []plugin.SchemaDeclaration
- func (s *Server) HandleAdHocPluginSession(reader io.ReadCloser, writer io.WriteCloser) error
- func (s *Server) HasConfigLoader() bool
- func (s *Server) HasProcesses() bool
- func (s *Server) Monitors() *MonitorManager
- func (s *Server) ProcessManager() *process.ProcessManager
- func (s *Server) QueueSIGHUP()
- func (s *Server) Reactor() plugin.ReactorLifecycle
- func (s *Server) ReactorAny() any
- func (s *Server) ReactorFor(name string) any
- func (s *Server) ReloadConfig(ctx context.Context, newTree map[string]any) error
- func (s *Server) ReloadFromDisk(ctx context.Context) error
- func (s *Server) Running() bool
- func (s *Server) SetCommitManager(cm any)
- func (s *Server) SetConfigLoader(loader ConfigLoader)
- func (s *Server) SetProcessSpawner(sp plugin.ProcessSpawner)
- func (s *Server) Start() error
- func (s *Server) StartWithContext(ctx context.Context) error
- func (s *Server) Stop()
- func (s *Server) Subscribe(namespace, eventType string, handler func(payload string)) func()
- func (s *Server) SubscribeEngineEvent(namespace, eventType string, handler EngineEventHandler) func()
- func (s *Server) Subscriptions() *SubscriptionManager
- func (s *Server) TxLocked() bool
- func (s *Server) UpdateProtocolConfig(families, customEvents, customSendTypes []string)
- func (s *Server) Wait(ctx context.Context) error
- func (s *Server) WaitForStartupComplete(ctx context.Context) error
- type ServerConfig
- type StreamingHandler
- type Subscription
- type SubscriptionManager
- func (sm *SubscriptionManager) Add(proc *process.Process, sub *Subscription)
- func (sm *SubscriptionManager) ClearProcess(proc *process.Process)
- func (sm *SubscriptionManager) Count(proc *process.Process) int
- func (sm *SubscriptionManager) GetMatching(namespace, eventType, direction, peerAddr, peerName string) []*process.Process
- func (sm *SubscriptionManager) GetSubscriptions(proc *process.Process) []*Subscription
- func (sm *SubscriptionManager) Remove(proc *process.Process, sub *Subscription) bool
- type SubsystemConfig
- type SubsystemHandler
- func (h *SubsystemHandler) Commands() []string
- func (h *SubsystemHandler) Handle(ctx context.Context, command string) (*plugin.Response, error)
- func (h *SubsystemHandler) Name() string
- func (h *SubsystemHandler) Running() bool
- func (h *SubsystemHandler) Schema() *plugin.PluginSchemaDecl
- func (h *SubsystemHandler) Signal(sig os.Signal) error
- func (h *SubsystemHandler) Start(ctx context.Context) error
- func (h *SubsystemHandler) Stop()
- type SubsystemManager
- func (m *SubsystemManager) AllCommands() []string
- func (m *SubsystemManager) AllSchemas() []*Schema
- func (m *SubsystemManager) FindHandler(command string) *SubsystemHandler
- func (m *SubsystemManager) Freeze()
- func (m *SubsystemManager) Get(name string) *SubsystemHandler
- func (m *SubsystemManager) Names() []string
- func (m *SubsystemManager) Register(config SubsystemConfig)
- func (m *SubsystemManager) RegisterSchemas(registry *SchemaRegistry) error
- func (m *SubsystemManager) StartAll(ctx context.Context) error
- func (m *SubsystemManager) StopAll()
- func (m *SubsystemManager) Unregister(name string)
Constants ¶
const ( DefaultCommandTimeout = 30 * time.Second CompletionTimeout = 500 * time.Millisecond )
Default timeouts for plugin commands.
const ( FilterOnErrorReject = "reject" FilterOnErrorAccept = "accept" )
Filter on-error constants.
const APIVersion = "0.1.0"
APIVersion is the IPC protocol version.
const MaxPendingPerProcess = 100
MaxPendingPerProcess limits pending requests to prevent memory exhaustion.
Variables ¶
var ( ErrSchemaModuleEmpty = errors.New("schema module name is empty") ErrSchemaModuleDuplicate = errors.New("schema module already registered") ErrSchemaHandlerDuplicate = errors.New("schema handler already registered") ErrSchemaNotFound = errors.New("schema not found") ErrRPCNotFound = errors.New("RPC not found") ErrRPCDuplicate = errors.New("RPC wire method already registered") ErrNotificationDuplicate = errors.New("notification wire method already registered") )
Errors for schema registration.
var ErrClientConfigNotFound = errors.New("client config not found")
ErrClientConfigNotFound is returned when no config exists for a client name.
var ErrDuplicateClient = errors.New("duplicate client name")
ErrDuplicateClient is returned when a client with the same name is already connected.
var ErrEmptyCommand = errors.New("empty command")
ErrEmptyCommand is returned when the command is empty.
var ErrPluginConnectionClosed = errors.New("plugin connection closed")
ErrPluginConnectionClosed is returned when the plugin's connection is no longer available.
var ErrPluginProcessNotRunning = errors.New("plugin process not running")
ErrPluginProcessNotRunning is returned when a plugin command targets a non-running process.
var ErrReloadInProgress = errors.New("config reload already in progress")
ErrReloadInProgress is returned when a config reload is attempted while another is already running. Callers can check this with errors.Is to decide whether to queue the reload (SIGHUP) or reject it (CLI/API).
var ErrSilent = errors.New("silent")
ErrSilent is returned when a command should produce no response.
var ErrSubsystemConnectionClosed = errors.New("subsystem connection closed")
ErrSubsystemConnectionClosed is returned when the subsystem's connection is no longer available.
var ErrSubsystemNotRunning = errors.New("subsystem not running")
ErrSubsystemNotRunning is returned when a command targets a non-running subsystem.
ErrUnauthorized is returned when a command is denied by authorization.
var ErrUnknownCommand = errors.New("unknown command")
ErrUnknownCommand is returned when a command is not recognized.
Functions ¶
func BuiltinCount ¶
func BuiltinCount() int
BuiltinCount returns the number of registered builtin handlers.
func ExtractConfigSubtree ¶
ExtractConfigSubtree extracts a subtree from the config based on path. Always returns data wrapped in its full path structure from root. Supports:
- "*" -> entire tree
- "bgp" -> {"bgp": configTree["bgp"]}
- "bgp/peer" -> {"bgp": {"peer": configTree["bgp"]["peer"]}}
func GetSchemaNode ¶
GetSchemaNode returns a cached YANG schema node for the given config path.
func GetVersion ¶
GetVersion returns the current version and build date.
func HandleNodeWith ¶
func HandleNodeWith( ctx *CommandContext, args []string, schemaPath string, treeKey string, prepare NodePrepare, apply NodeApply, ) (*plugin.Response, error)
HandleNodeWith is the generic handler for "set <domain> <node> <selector> with <args>". It parses inline args via the YANG schema at schemaPath, runs the prepare callback for node-specific validation/defaults, then calls apply to commit the change.
func IsReadOnlyPath ¶
IsReadOnlyPath returns true if the command path starts with a read-only verb or a top-level query command not yet migrated under a verb.
func IsStreamingCommand ¶
IsStreamingCommand returns true if the input matches any registered streaming prefix.
func LoadBuiltins ¶
func LoadBuiltins(d *Dispatcher, wireToPath, pathToDesc map[string]string)
LoadBuiltins registers all builtin handlers with the dispatcher. The wireToPath map provides the dispatch key for each handler, derived from the YANG command tree (WireMethod -> CLI path). pathToDesc provides YANG descriptions for help text. Handlers without a YANG entry are skipped.
func LookupCommandHelp ¶
func LookupCommandHelp(ctx *CommandContext, name, kind string) (*plugin.Response, error)
LookupCommandHelp looks up a command by name in builtins then plugins. The kind parameter is used in error messages (e.g., "command", "bgp rib command").
func MonitorEventFormatter ¶
MonitorEventFormatter returns the registered event formatter, or nil if none is registered.
func ParseInlineArgsForSchema ¶
func ParseInlineArgsForSchema(schemaPath string, args []string) (map[string]any, *plugin.Response, error)
ParseInlineArgsForSchema parses config-syntax inline args using the YANG schema at the given path. Generic: works for any YANG node (peer, group, etc.). Returns (map, nil, nil) on success. Returns (nil, response, error) on failure.
func PeerSubcommandKeywords ¶
PeerSubcommandKeywords returns the set of first words that follow "peer" in CLI command paths. Used by config validation to reject peer names that would collide with subcommand dispatch. The wireToPath map is typically built via yang.WireMethodToPath(loader).
func RegisterDefaultHandlers ¶
func RegisterDefaultHandlers(d *Dispatcher, wireToPath, pathToDesc map[string]string)
RegisterDefaultHandlers registers all builtin handlers with the dispatcher.
func RegisterMonitorEventFormatter ¶
RegisterMonitorEventFormatter registers the function that formats raw JSON event lines into compact one-liners for monitor streaming output (both CLI and TUI). Called from the monitor plugin's init().
func RegisterRPCs ¶
func RegisterRPCs(rpcs ...RPCRegistration)
RegisterRPCs adds RPCs to the package-level registry. Called from init() in register.go files.
func RegisterStreamingHandler ¶
func RegisterStreamingHandler(prefix string, h StreamingHandler)
RegisterStreamingHandler registers a streaming command handler for a prefix. The prefix is matched case-insensitively against command input. Called from plugin init() functions.
func RequireReactor ¶
func RequireReactor(ctx *CommandContext) (plugin.ReactorLifecycle, *plugin.Response, error)
RequireReactor returns the reactor or an error response if not available.
func SetVersion ¶
func SetVersion(v, d string)
SetVersion sets the application version and build date (called from main).
func StreamEventMonitor ¶
StreamEventMonitor is the streaming handler for the "event monitor" command. It parses arguments, creates subscriptions, registers a MonitorClient, and streams events until the context is canceled. Registration: called from internal/component/bgp/plugins/cmd/monitor/monitor.go init() via RegisterStreamingHandler("event monitor", StreamEventMonitor).
func StreamingPrefixes ¶
func StreamingPrefixes() []string
StreamingPrefixes returns the registered streaming command prefixes, sorted.
Types ¶
type Authorizer ¶
Authorizer checks whether a user is allowed to execute a command.
type Command ¶
type Command struct {
Name string
Handler Handler
Help string
ReadOnly bool // True if command only reads state (safe for "ze show")
RequiresSelector bool // True if command requires explicit peer selector (not default "*")
}
Command represents a registered command with metadata.
type CommandContext ¶
type CommandContext struct {
Server *Server // Gateway to all server state (reactor, dispatcher, etc.)
Process *process.Process // The API process (for session state)
Peer string // Peer selector: "*" for all, or specific IP. Empty = "*"
Username string // Authenticated username (empty = no auth, full access)
Meta map[string]any // Route metadata from UpdateRoute RPC; nil if not set.
}
CommandContext provides access to reactor and session state. Dependencies are accessed through Server; per-request state is stored directly.
func (*CommandContext) CommitManager ¶
func (c *CommandContext) CommitManager() any
CommitManager returns the commit manager via Server. Nil-safe: returns nil if Server is nil.
func (*CommandContext) Dispatcher ¶
func (c *CommandContext) Dispatcher() *Dispatcher
Dispatcher returns the command dispatcher via Server. Nil-safe: returns nil if Server is nil.
func (*CommandContext) PeerSelector ¶
func (c *CommandContext) PeerSelector() string
PeerSelector returns the effective neighbor selector. Returns "*" if no neighbor was specified.
func (*CommandContext) ProtocolReactor ¶
func (c *CommandContext) ProtocolReactor(name string) any
ProtocolReactor returns a named protocol reactor from the Coordinator. Callers type-assert to the protocol-specific interface they need. Nil-safe: returns nil if Server is nil or protocol not registered.
func (*CommandContext) Reactor ¶
func (c *CommandContext) Reactor() plugin.ReactorLifecycle
Reactor returns the BGP reactor lifecycle interface via Server. Nil-safe: returns nil if Server is nil.
func (*CommandContext) Subscriptions ¶
func (c *CommandContext) Subscriptions() *SubscriptionManager
Subscriptions returns the subscription manager via Server. Nil-safe: returns nil if Server is nil.
type CommandDef ¶
type CommandDef struct {
Name string // Command name (e.g., "myapp status")
Description string // Help text
Args string // Usage hint (e.g., "<component>")
Completable bool // Process handles arg completion
Timeout time.Duration // Per-command timeout (0 = default 30s)
}
CommandDef describes a command to register. Passed from process to registry during registration.
type CommandRegistry ¶
type CommandRegistry struct {
// contains filtered or unexported fields
}
CommandRegistry manages plugin commands. Thread-safe for concurrent registration and lookup.
func NewCommandRegistry ¶
func NewCommandRegistry() *CommandRegistry
NewCommandRegistry creates a new command registry.
func (*CommandRegistry) AddBuiltin ¶
func (r *CommandRegistry) AddBuiltin(name string)
AddBuiltin marks a command name as builtin (cannot be shadowed). Called during dispatcher initialization.
func (*CommandRegistry) All ¶
func (r *CommandRegistry) All() []*RegisteredCommand
All returns all registered commands.
func (*CommandRegistry) Complete ¶
func (r *CommandRegistry) Complete(partial string) []Completion
Complete returns commands matching the partial input. Used for CLI command completion.
func (*CommandRegistry) Freeze ¶
func (r *CommandRegistry) Freeze()
Freeze creates an immutable snapshot of the commands map. After Freeze(), Lookup uses atomic.Load instead of RLock. MUST be called after all Register calls complete (after startup barrier). Safe to call multiple times (each call overwrites the previous snapshot).
func (*CommandRegistry) IsBuiltin ¶
func (r *CommandRegistry) IsBuiltin(name string) bool
IsBuiltin returns true if the command name is a builtin.
func (*CommandRegistry) Lookup ¶
func (r *CommandRegistry) Lookup(name string) *RegisteredCommand
Lookup finds a command by exact name (case-insensitive). After Freeze(), uses lock-free atomic.Load on the frozen snapshot.
func (*CommandRegistry) Register ¶
func (r *CommandRegistry) Register(proc *process.Process, defs []CommandDef) []RegisterResult
Register adds commands for a process. Returns results for each command (success or failure reason).
func (*CommandRegistry) Unregister ¶
func (r *CommandRegistry) Unregister(proc *process.Process, names []string)
Unregister removes commands owned by the process. Only the owning process can unregister a command. Unknown commands are silently ignored. If frozen, publishes a new snapshot reflecting the removal.
func (*CommandRegistry) UnregisterAll ¶
func (r *CommandRegistry) UnregisterAll(proc *process.Process)
UnregisterAll removes all commands owned by the process. Called when a process dies. If frozen, publishes a new snapshot reflecting the removal.
type Completion ¶
type Completion struct {
Value string `json:"value"` // The completion text
Help string `json:"help,omitempty"` // Optional description
Source string `json:"source,omitempty"` // "builtin" or process name (verbose mode)
}
Completion represents a single completion suggestion. Used for both command and argument completion.
type ConfigBlock ¶
type ConfigBlock struct {
Handler string // Handler path (e.g., "bgp/peer")
Action string // create, modify, delete
Path string // Full path (e.g., "bgp/peer[address=192.0.2.1]")
Data string // JSON data
}
ConfigBlock represents a config command to send to a plugin.
func ParseCommand ¶
func ParseCommand(line string) (*ConfigBlock, error)
ParseCommand parses a namespace command. Format: <namespace> <path> <action> {json}. Or: <namespace> <action> {json} (for namespace-level config). Or: <namespace> commit|rollback|diff.
type ConfigEventGateway ¶
type ConfigEventGateway struct {
// contains filtered or unexported fields
}
ConfigEventGateway adapts Server to the internal/component/config/transaction.EventGateway interface used by the config transaction orchestrator.
The adapter hides the namespace parameter (always plugin.NamespaceConfig) and converts between the orchestrator's []byte payloads and Server's string event payloads.
Performance note: each emit/dispatch round-trips through []byte -> string -> []byte (one copy in EmitConfigEvent, one copy in the SubscribeConfigEvent handler bridge). This is acceptable for small config transaction payloads (~hundreds of bytes per ack) and trades two small allocations against the simpler string-based deliverEvent path. If this ever becomes a hot path, the right fix is to add a []byte-native variant to deliverEvent rather than complicating the adapter.
func NewConfigEventGateway ¶
func NewConfigEventGateway(s *Server) *ConfigEventGateway
NewConfigEventGateway creates a new adapter wrapping the given Server. The Server must outlive the gateway; the gateway holds a reference but does not manage Server lifecycle.
func (*ConfigEventGateway) EmitConfigEvent ¶
func (g *ConfigEventGateway) EmitConfigEvent(eventType string, payload []byte) (int, error)
EmitConfigEvent publishes a stream event in the config namespace. Returns the number of plugin processes that received the event.
func (*ConfigEventGateway) SubscribeConfigEvent ¶
func (g *ConfigEventGateway) SubscribeConfigEvent(eventType string, handler func(payload []byte)) func()
SubscribeConfigEvent registers a handler for a config namespace event type. The handler is invoked synchronously from deliverEvent. Returns an unsubscribe function; nil handler returns a no-op unsubscribe.
type ConfigLoader ¶
ConfigLoader loads a new config tree from disk or other source. Returns the parsed config tree or an error. Set on Server.configLoader before calling ReloadFromDisk.
type ConfigReader ¶
ConfigReader reads a client's config by name from the hub's blob store. Returns the raw config bytes, or ErrClientConfigNotFound if the client has no config entry.
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher routes commands to handlers.
func NewDispatcher ¶
func NewDispatcher() *Dispatcher
NewDispatcher creates a new command dispatcher.
func (*Dispatcher) Commands ¶
func (d *Dispatcher) Commands() []*Command
Commands returns all registered commands.
func (*Dispatcher) Dispatch ¶
func (d *Dispatcher) Dispatch(ctx *CommandContext, input string) (*plugin.Response, error)
Dispatch parses and executes a command. Supports peer selector prefix: "peer <addr|name|*> <command>". If no peer prefix, defaults to all peers ("*"). Priority: 1) builtin commands, 2) forked subsystems, 3) plugin registry.
func (*Dispatcher) ForwardToPlugin ¶
func (d *Dispatcher) ForwardToPlugin(command string, args []string, peerSelector string) (*plugin.Response, error)
ForwardToPlugin routes a command to a plugin process by exact name lookup. Used by proxy handlers that bridge CLI builtins to plugin commands. Returns ErrUnknownCommand if the command is not registered (plugin may not be running).
func (*Dispatcher) HasCommandPrefix ¶
func (d *Dispatcher) HasCommandPrefix(input string) bool
HasCommandPrefix returns true if the input matches any registered command prefix (builtin or plugin). Used by dispatch routing to distinguish top-level commands from peer subcommands that need "peer <selector> " prepended.
func (*Dispatcher) IsAuthorized ¶
func (d *Dispatcher) IsAuthorized(ctx *CommandContext, input string, readOnly bool) bool
IsAuthorized checks if the user is allowed to execute the command. Exported for use by streaming handlers (e.g., monitor) that bypass the normal dispatch path.
func (*Dispatcher) Lookup ¶
func (d *Dispatcher) Lookup(name string) *Command
Lookup finds a command by exact name.
func (*Dispatcher) Pending ¶
func (d *Dispatcher) Pending() *PendingRequests
Pending returns the pending requests tracker.
func (*Dispatcher) Register ¶
func (d *Dispatcher) Register(name string, handler Handler, help string)
Register adds a builtin command handler. Also marks the command as builtin in the registry to prevent shadowing.
func (*Dispatcher) RegisterWithOptions ¶
func (d *Dispatcher) RegisterWithOptions(name string, handler Handler, help string, opts RegisterOptions)
RegisterWithOptions adds a builtin command handler with additional options.
func (*Dispatcher) Registry ¶
func (d *Dispatcher) Registry() *CommandRegistry
Registry returns the plugin command registry.
func (*Dispatcher) SetAuthorizer ¶
func (d *Dispatcher) SetAuthorizer(a Authorizer)
SetAuthorizer sets the authorization checker for the dispatcher. When set, all commands are checked against the authorizer before execution.
func (*Dispatcher) SetSubsystems ¶
func (d *Dispatcher) SetSubsystems(sm *SubsystemManager)
SetSubsystems sets the subsystem manager.
func (*Dispatcher) Subsystems ¶
func (d *Dispatcher) Subsystems() *SubsystemManager
Subsystems returns the subsystem manager.
type EngineEventHandler ¶
type EngineEventHandler func(event string)
EngineEventHandler is invoked when a stream event matches an engine subscription. The event string is the same payload that plugin subscribers receive (typically JSON). Handlers are called synchronously from deliverEvent. Handlers MUST NOT block on external I/O; push to a buffered channel and return if work is needed.
A handler that panics is recovered by the dispatch loop, logged, and the remaining handlers for the same event still fire. The panic does NOT propagate to the emitter (whoever called EmitEngineEvent or any other path that ends in deliverEvent).
type EventMonitorOpts ¶
type EventMonitorOpts struct {
IncludeTypes []string
ExcludeTypes []string
Peer string
Direction string
}
EventMonitorOpts holds parsed arguments for the event monitor command.
func ParseEventMonitorArgs ¶
func ParseEventMonitorArgs(args []string) (*EventMonitorOpts, error)
ParseEventMonitorArgs parses keyword arguments for the event monitor command.
Syntax: [include|exclude <types>] [peer <selector>] [direction received|sent] Keywords may appear in any order. Include and exclude are mutually exclusive.
type Handler ¶
type Handler func(ctx *CommandContext, args []string) (*plugin.Response, error)
Handler processes a command and returns a response.
type Hub ¶
type Hub struct {
// contains filtered or unexported fields
}
Hub orchestrates plugin communication and command routing.
func NewHub ¶
func NewHub(registry *SchemaRegistry, subsystems *SubsystemManager) *Hub
NewHub creates a new Hub with the given registry and subsystem manager.
func (*Hub) ProcessConfig ¶
func (h *Hub) ProcessConfig(ctx context.Context, blocks []ConfigBlock) error
ProcessConfig processes a configuration transaction. Sends all commands to plugins, then commits each affected namespace.
func (*Hub) RouteCommand ¶
func (h *Hub) RouteCommand(ctx context.Context, block *ConfigBlock) error
RouteCommand routes a command to the appropriate plugin. Format: <namespace> <path> <action> {json}.
func (*Hub) RouteCommit ¶
RouteCommit sends a commit command to a plugin. Format: <namespace> commit.
type ManagedConfigService ¶
type ManagedConfigService struct {
// contains filtered or unexported fields
}
ManagedConfigService handles hub-side config-fetch and config-changed operations for managed clients. It reads client configs via a ConfigReader and computes version hashes for change detection. Tracks connected clients and rejects duplicate names. Safe for concurrent use.
func NewManagedConfigService ¶
func NewManagedConfigService(reader ConfigReader) *ManagedConfigService
NewManagedConfigService creates a service that reads client configs via reader.
func (*ManagedConfigService) BuildConfigChanged ¶
func (s *ManagedConfigService) BuildConfigChanged(clientName string) (fleet.ConfigChanged, error)
BuildConfigChanged creates a config-changed notification for a client. Reads the client's current config and computes its version hash.
func (*ManagedConfigService) HandleConfigFetch ¶
func (s *ManagedConfigService) HandleConfigFetch(clientName string, req fleet.ConfigFetchRequest) (fleet.ConfigFetchResponse, error)
HandleConfigFetch processes a config-fetch request from a managed client. If the client's version matches the current config, returns status "current". Otherwise returns the full config as base64 with the new version hash.
func (*ManagedConfigService) RegisterClient ¶
func (s *ManagedConfigService) RegisterClient(name string) error
RegisterClient marks a client as connected. Returns ErrDuplicateClient if a client with the same name is already connected. Caller MUST call UnregisterClient when the client disconnects.
func (*ManagedConfigService) UnregisterClient ¶
func (s *ManagedConfigService) UnregisterClient(name string)
UnregisterClient removes a client from the connected set.
type MonitorClient ¶
type MonitorClient struct {
EventChan chan string // Buffered channel for formatted events.
Ctx context.Context // Client-scoped context for cancellation.
Dropped atomic.Uint64 // Count of events dropped due to full channel.
// contains filtered or unexported fields
}
MonitorClient represents an active monitor session.
func NewMonitorClient ¶
func NewMonitorClient(ctx context.Context, id string, subs []*Subscription, bufSize int) *MonitorClient
NewMonitorClient creates a monitor client with the given subscriptions and buffer size. Caller MUST call MonitorManager.Remove(id) when done to release resources.
type MonitorManager ¶
type MonitorManager struct {
// contains filtered or unexported fields
}
MonitorManager manages active monitor clients. Parallel to SubscriptionManager (which manages plugin process subscriptions).
func NewMonitorManager ¶
func NewMonitorManager() *MonitorManager
NewMonitorManager creates a new monitor manager.
func (*MonitorManager) Add ¶
func (mm *MonitorManager) Add(mc *MonitorClient)
Add registers a monitor client.
func (*MonitorManager) Count ¶
func (mm *MonitorManager) Count() int
Count returns the number of active monitors.
func (*MonitorManager) Deliver ¶
func (mm *MonitorManager) Deliver(namespace, eventType, direction, peerAddr, peerName, output string)
Deliver sends a formatted event to all matching monitors. Uses non-blocking send: if a monitor's channel is full, the event is dropped and the dropped counter is incremented (backpressure). peerName is the configured peer name (may be empty).
func (*MonitorManager) GetMatching ¶
func (mm *MonitorManager) GetMatching(namespace, eventType, direction, peerAddr, peerName string) []*MonitorClient
GetMatching returns monitors with subscriptions matching the event. A monitor matches if any of its subscriptions match. peerName is the configured peer name (may be empty).
func (*MonitorManager) Remove ¶
func (mm *MonitorManager) Remove(id string)
Remove unregisters a monitor client by ID.
type NodeApply ¶
NodeApply applies the validated config tree to the reactor. Called after prepare succeeds. Each node type provides its own apply (e.g., peers call AddDynamicPeer, groups would call AddDynamicGroup).
type NodePrepare ¶
NodePrepare is called after YANG parsing to apply node-specific validation and defaults. selector is the dispatcher-extracted selector (IP, name, etc.); nodeTree is the parsed config map. MUST return (nil, nil) on success or (response, error) on failure.
type PeerFilter ¶
type PeerFilter struct {
Selector string // "*", "10.0.0.1", "!10.0.0.1", "my-peer", "!my-peer"
}
PeerFilter specifies which peers to filter.
func (*PeerFilter) Matches ¶
func (pf *PeerFilter) Matches(peerAddr, peerName string) bool
Matches returns true if the peer matches this filter. Matches against both the peer address (IP) and peer name.
type PendingRequest ¶
type PendingRequest struct {
Serial string // Alpha serial (a, b, bcd, ...)
Command string // Matched command name
Process *process.Process // Target process
Timeout time.Duration // Timeout for response
RespChan chan *plugin.Response // Channel to deliver response
// contains filtered or unexported fields
}
PendingRequest represents an in-flight plugin command request.
type PendingRequests ¶
type PendingRequests struct {
// contains filtered or unexported fields
}
PendingRequests tracks in-flight plugin command requests. Thread-safe for concurrent access.
func NewPendingRequests ¶
func NewPendingRequests() *PendingRequests
NewPendingRequests creates a new pending requests tracker.
func (*PendingRequests) Add ¶
func (p *PendingRequests) Add(req *PendingRequest) string
Add registers a new pending request and starts the timeout timer. Returns the assigned alpha serial, or empty string if limit exceeded. If limit exceeded, sends error response to RespChan.
func (*PendingRequests) CancelAll ¶
func (p *PendingRequests) CancelAll(proc *process.Process)
CancelAll cancels all pending requests for a process (process death). Sends error response to all waiting clients.
func (*PendingRequests) Complete ¶
func (p *PendingRequests) Complete(serial string, resp *plugin.Response) bool
Complete delivers a final response and removes the request. Returns true if the serial was found (response delivered).
func (*PendingRequests) Count ¶
func (p *PendingRequests) Count(proc *process.Process) int
Count returns the number of pending requests for a process.
func (*PendingRequests) Partial ¶
func (p *PendingRequests) Partial(serial string, resp *plugin.Response) bool
Partial delivers a partial response (streaming) and resets the timeout. Returns true if the serial was found.
func (*PendingRequests) Total ¶
func (p *PendingRequests) Total() int
Total returns the total number of pending requests.
type RPCParams ¶
type RPCParams struct {
Selector string `json:"selector,omitempty"` // Peer selector (optional)
Args []string `json:"args,omitempty"` // Command arguments (optional)
}
RPCParams is the standard params format for JSON RPC requests from socket clients. Handlers receive Args as positional arguments and Selector as the peer filter. Identity (Username) is never accepted from client JSON -- it MUST be injected by the transport layer (SSH session, plugin process manager, TLS auth).
type RPCRegistration ¶
type RPCRegistration struct {
WireMethod string // "module:rpc-name" format (e.g., "ze-bgp:peer-list")
Handler Handler // Handler function
RequiresSelector bool // True if peer commands must have explicit selector (not default "*")
PluginCommand string // If set, this builtin proxies to a runtime plugin command (e.g., "bgp rib show")
}
RPCRegistration maps a YANG RPC wire method to its handler function. The CLI command name is derived from the YANG command tree (-cmd.yang modules) via yang.WireMethodToPath(). It is not stored in the registration. Help text comes from YANG descriptions. Read-only classification comes from the verb position in the command tree (show/validate/monitor = read-only).
func AllBuiltinRPCs ¶
func AllBuiltinRPCs() []RPCRegistration
AllBuiltinRPCs returns all RPCs registered via init() + RegisterRPCs(). Includes server, handler, and editor RPCs (when their packages are imported).
type RegisterOptions ¶
type RegisterOptions struct {
ReadOnly bool // True if command only reads state
RequiresSelector bool // True if "bgp peer <command>" must have an explicit peer selector
PluginProxy bool // True if this builtin proxies to a plugin command (allows plugin to register same name)
}
RegisterOptions holds optional settings for command registration.
type RegisterResult ¶
type RegisterResult struct {
Name string // Command that was registered
OK bool // True if registration succeeded
Error string // Error message if failed
}
RegisterResult holds the result of a single command registration.
type RegisteredCommand ¶
type RegisteredCommand struct {
Name string
LowerName string // Pre-lowercased at registration for dispatch matching (zero alloc per lookup)
Description string
Args string // Usage hint (e.g., "<component>")
Completable bool // Process handles arg completion
Timeout time.Duration // Per-command timeout
Process *process.Process // Owning process
RegisteredAt time.Time
}
RegisteredCommand represents a plugin command in the registry.
type RegisteredNotification ¶
type RegisteredNotification struct {
Module string // YANG module name
Name string // Notification name in kebab-case
WireMethod string // Wire format "module:notification-name"
Description string // From YANG description
Leaves []yang.LeafMeta // Notification data leaves
}
RegisteredNotification represents a notification indexed in the schema registry.
type RegisteredRPC ¶
type RegisteredRPC struct {
Module string // YANG module name (e.g., "ze-bgp-api")
Name string // RPC name in kebab-case (e.g., "peer-list")
WireMethod string // Wire format "module:rpc-name" (e.g., "ze-bgp:peer-list")
CLICommand string // CLI text command (e.g., "bgp peer list")
Description string // From YANG description
Input []yang.LeafMeta // Input parameter leaves
Output []yang.LeafMeta // Output parameter leaves
Handler Handler // Handler function (set during registration)
}
RegisteredRPC represents an RPC indexed in the schema registry.
type Schema ¶
type Schema struct {
Module string // YANG module name
Namespace string // YANG namespace URI
Yang string // Full YANG module text
Imports []string // Imported module names (from YANG import statements)
Handlers []string // Handler paths (e.g., "bgp", "bgp/peer")
Plugin string // Plugin that registered this schema
Priority int // Config ordering (lower = processed first, default 1000)
WantsConfig []string // Config roots plugin wants (from "declare wants config <root>")
}
Schema represents a YANG schema registered by a plugin.
type SchemaRegistry ¶
type SchemaRegistry struct {
// contains filtered or unexported fields
}
SchemaRegistry stores and manages schemas from all plugins.
func NewSchemaRegistry ¶
func NewSchemaRegistry() *SchemaRegistry
NewSchemaRegistry creates a new schema registry.
func (*SchemaRegistry) Count ¶
func (r *SchemaRegistry) Count() int
Count returns the number of registered schemas.
func (*SchemaRegistry) FindHandler ¶
func (r *SchemaRegistry) FindHandler(path string) (*Schema, string)
FindHandler returns the schema for a handler path using longest prefix match. For example, if "bgp" and "bgp/peer" are registered, FindHandler("bgp/peer/timers") returns the schema for "bgp/peer". Predicates like [address=192.0.2.1] are stripped before matching. After Freeze(), uses lock-free atomic.Load on the frozen snapshot.
func (*SchemaRegistry) FindRPC ¶
func (r *SchemaRegistry) FindRPC(wireMethod string) (*RegisteredRPC, error)
FindRPC returns the registered RPC for an exact wire method match.
func (*SchemaRegistry) FindRPCByCommand ¶
func (r *SchemaRegistry) FindRPCByCommand(cliCommand string) (*RegisteredRPC, error)
FindRPCByCommand returns the registered RPC for a CLI text command.
func (*SchemaRegistry) Freeze ¶
func (r *SchemaRegistry) Freeze()
Freeze creates an immutable snapshot of the handler and module maps. After Freeze(), FindHandler uses atomic.Load instead of RLock. MUST be called after all Register calls complete (after startup barrier). Safe to call multiple times (each call overwrites the previous snapshot).
func (*SchemaRegistry) GetByHandler ¶
func (r *SchemaRegistry) GetByHandler(path string) (*Schema, error)
GetByHandler returns a schema by exact handler path.
func (*SchemaRegistry) GetByModule ¶
func (r *SchemaRegistry) GetByModule(name string) (*Schema, error)
GetByModule returns a schema by module name.
func (*SchemaRegistry) ListHandlers ¶
func (r *SchemaRegistry) ListHandlers() map[string]string
ListHandlers returns all registered handler paths with their modules.
func (*SchemaRegistry) ListModules ¶
func (r *SchemaRegistry) ListModules() []string
ListModules returns all registered module names.
func (*SchemaRegistry) ListNotifications ¶
func (r *SchemaRegistry) ListNotifications(module string) []*RegisteredNotification
ListNotifications returns all registered notifications, optionally filtered by YANG module name. Pass empty string to list all notifications.
func (*SchemaRegistry) ListRPCs ¶
func (r *SchemaRegistry) ListRPCs(module string) []*RegisteredRPC
ListRPCs returns all registered RPCs, optionally filtered by YANG module name. Pass empty string to list all RPCs.
func (*SchemaRegistry) Register ¶
func (r *SchemaRegistry) Register(schema *Schema) error
Register adds a schema to the registry. Returns error if module name or handler paths conflict with existing registrations.
func (*SchemaRegistry) RegisterCLICommand ¶
func (r *SchemaRegistry) RegisterCLICommand(cliCommand, wireMethod string) error
RegisterCLICommand associates a CLI text command with a wire method. The wire method must already be registered via RegisterRPCs.
func (*SchemaRegistry) RegisterNotifications ¶
func (r *SchemaRegistry) RegisterNotifications(module string, notifs []yang.NotificationMeta) error
RegisterNotifications indexes notifications extracted from a YANG module.
func (*SchemaRegistry) RegisterRPCs ¶
func (r *SchemaRegistry) RegisterRPCs(module string, rpcs []yang.RPCMeta) error
RegisterRPCs indexes RPCs extracted from a YANG module. Wire methods use the stripped module prefix (e.g., "ze-bgp-api" → "ze-bgp:peer-list").
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server manages API connections and command dispatch.
func NewServer ¶
func NewServer(config *ServerConfig, reactor plugin.ReactorLifecycle) (*Server, error)
NewServer creates a new API server.
func (*Server) AllPluginCapabilities ¶
func (s *Server) AllPluginCapabilities() []plugin.InjectedCapability
AllPluginCapabilities returns all stored capabilities (global + all per-peer). Used by the restart handler to compute max restart-time for the GR marker.
func (*Server) CallFilterUpdate ¶
func (s *Server) CallFilterUpdate(ctx context.Context, pluginName string, input *rpc.FilterUpdateInput) (*rpc.FilterUpdateOutput, error)
CallFilterUpdate sends a filter-update RPC to a named plugin and returns the response. Returns an error if the plugin is not found, not connected, or the RPC fails.
func (*Server) CommitManager ¶
CommitManager returns the commit manager.
func (*Server) ConfigPath ¶
ConfigPath returns the path to the config file. Empty if not set.
func (*Server) Context ¶
Context returns the server's context. Used by RPC handlers that need a cancellable context tied to the server's lifetime (e.g., coordinator reload).
func (*Server) DecodeNLRI ¶
DecodeNLRI decodes NLRI by routing to the appropriate family plugin via RPC. Returns the JSON representation of the decoded NLRI. Returns error if no plugin registered or plugin not running.
func (*Server) Dispatcher ¶
func (s *Server) Dispatcher() *Dispatcher
Dispatcher returns the command dispatcher.
func (*Server) DrainSIGHUP ¶
DrainSIGHUP returns true if a SIGHUP was queued and clears the flag.
func (*Server) Emit ¶
Emit satisfies the pkg/ze.EventBus interface. It is a thin alias for EmitEngineEvent so engine components can depend on the public ze.EventBus type without importing this package directly.
func (*Server) EmitEngineEvent ¶
EmitEngineEvent publishes an event from the engine to the stream system. Both engine subscribers and plugin process subscribers receive it. Returns the number of plugin processes that received the event (engine handler count is intentionally not reported because engine subscribers are in-process and always receive synchronously when matching).
The event must use a registered (namespace, eventType) per plugin.IsValidEvent; unknown pairs return an error and deliver to nobody (neither engine handlers nor plugin subscribers).
func (*Server) EncodeNLRI ¶
EncodeNLRI encodes NLRI by routing to the appropriate family plugin via RPC. Returns error if no plugin registered or plugin not running.
func (*Server) FilterInfo ¶
FilterInfo returns declaration info for a named filter: declared attributes and raw flag. Returns nil attributes and false if the plugin or filter is not found.
func (*Server) FilterOnError ¶
FilterOnError returns the declared on-error mode for a named filter. Returns FilterOnErrorReject (fail-closed) if the plugin or filter is not found.
func (*Server) GetDecodeFamilies ¶
GetDecodeFamilies returns all families that have decode plugins registered. Used by Session to auto-add Multiprotocol capabilities in OPEN. Plugins that can decode a family should advertise that family to peers.
func (*Server) GetPluginCapabilitiesForPeer ¶
func (s *Server) GetPluginCapabilitiesForPeer(peerAddr string) []plugin.InjectedCapability
GetPluginCapabilitiesForPeer returns plugin-declared capabilities for a specific peer. Returns global capabilities plus any peer-specific capabilities (per-peer takes precedence).
func (*Server) GetSchemaDeclarations ¶
func (s *Server) GetSchemaDeclarations() []plugin.SchemaDeclaration
GetSchemaDeclarations returns all schema declarations from registered plugins. Used for two-phase config parsing to extend the schema before parsing peer config. Should be called after Stage 1 (Registration) completes for all plugins.
func (*Server) HandleAdHocPluginSession ¶
func (s *Server) HandleAdHocPluginSession(reader io.ReadCloser, writer io.WriteCloser) error
HandleAdHocPluginSession runs the 5-stage plugin handshake and runtime command loop over an arbitrary bidirectional stream (e.g., an SSH channel). The session uses coordinator == nil, so all stage barriers are skipped. Blocks until the connection closes or the server shuts down. Caller MUST close reader and writer after this returns.
func (*Server) HasConfigLoader ¶
HasConfigLoader reports whether a config loader has been set. Used by SIGHUP handler to decide between coordinator path and direct reload.
func (*Server) HasProcesses ¶
HasProcesses returns true if any plugin processes were loaded during startup. Used by the main loop to decide whether to listen for server-done (all processes exited). Without this, configs with no plugins cause immediate daemon exit.
func (*Server) Monitors ¶
func (s *Server) Monitors() *MonitorManager
Monitors returns the monitor manager for CLI monitor sessions.
func (*Server) ProcessManager ¶
func (s *Server) ProcessManager() *process.ProcessManager
ProcessManager returns the process manager. Used by BGP hook implementations to iterate plugin processes.
func (*Server) QueueSIGHUP ¶
func (s *Server) QueueSIGHUP()
QueueSIGHUP queues a SIGHUP for later processing if a transaction is active.
func (*Server) Reactor ¶
func (s *Server) Reactor() plugin.ReactorLifecycle
func (*Server) ReactorAny ¶
ReactorAny returns the reactor as any, satisfying registry.PluginServerAccessor.
func (*Server) ReactorFor ¶
ReactorFor returns a named protocol reactor from the Coordinator, or nil. This allows plugins to access non-BGP reactors (e.g., OSPF, IS-IS) by name.
func (*Server) ReloadConfig ¶
ReloadConfig orchestrates config reload across all config-interested plugins. Follows verify→apply protocol: all plugins must verify before any apply. Returns nil if there are no changes, or if verify→apply succeeds. Returns error if verify fails for any plugin, or if a reload is already in progress.
func (*Server) ReloadFromDisk ¶
ReloadFromDisk loads the config from the configured loader and triggers reload. Returns error if the loader is not set, parsing fails, or reload fails.
func (*Server) SetCommitManager ¶
SetCommitManager sets the commit manager. Called by the BGP plugin during configuration to inject a CommitManager created with BGP-specific types. MUST be called before any RPC dispatch (i.e., during init-time registration). NOT safe for concurrent use with CommitManager().
func (*Server) SetConfigLoader ¶
func (s *Server) SetConfigLoader(loader ConfigLoader)
SetConfigLoader sets the function used by ReloadFromDisk to load the config tree.
func (*Server) SetProcessSpawner ¶
func (s *Server) SetProcessSpawner(sp plugin.ProcessSpawner)
SetProcessSpawner sets the PluginManager as the process spawner. When set, runPluginPhase delegates process creation to the spawner instead of creating ProcessManager directly. Must be called before Start. If the spawner supports SetMetricsRegistry (e.g., PluginManager), the server's metrics registry is forwarded for plugin health metrics.
func (*Server) StartWithContext ¶
StartWithContext begins accepting connections with the given context. External access is via SSH; the plugin server handles only in-process dispatch.
func (*Server) Stop ¶
func (s *Server) Stop()
Stop signals the server to stop and cleans up resources.
func (*Server) Subscribe ¶
Subscribe satisfies the pkg/ze.EventBus interface. It is a thin alias for SubscribeEngineEvent that adapts the handler signature from EngineEventHandler (a named type) to a plain func, which is what ze.EventBus declares.
func (*Server) SubscribeEngineEvent ¶
func (s *Server) SubscribeEngineEvent(namespace, eventType string, handler EngineEventHandler) func()
SubscribeEngineEvent registers an engine-side handler for stream events matching the given namespace and event type. The returned function unregisters the handler when called; safe to call multiple times.
Handlers fire synchronously from deliverEvent. They must not block on external I/O. The handler receives the same event string that plugin process subscribers would receive.
Engine subscriptions are parallel to plugin process subscriptions managed by SubscriptionManager. Both fire on the same deliverEvent call.
Subscriptions are NOT validated against the event registry: subscribing to an unknown (namespace, eventType) pair, or to a per-plugin event type that is not yet registered, succeeds silently. Such a subscription is dead until the matching emit arrives. This avoids races with per-plugin event types like "verify-bgp" that are registered dynamically when the plugin starts.
Engine subscribers receive ALL events for the given (namespace, eventType) regardless of direction or peer address. Plugin process subscribers can filter on direction and peer; engine subscribers cannot. This is intended for engine-internal coordination (e.g. config transactions) where direction has no meaning.
A nil handler is rejected: the call returns a no-op unsubscribe function without registering anything. This catches programmer errors loudly via "the handler I just registered never fires" rather than via a nil-pointer panic at first dispatch.
func (*Server) Subscriptions ¶
func (s *Server) Subscriptions() *SubscriptionManager
Subscriptions returns the subscription manager.
func (*Server) UpdateProtocolConfig ¶
UpdateProtocolConfig sets protocol-specific auto-load configuration after the reactor has parsed settings. Called by the protocol plugin's RunEngine after creating the reactor, so that family/event/send auto-load phases have the data.
func (*Server) WaitForStartupComplete ¶
WaitForStartupComplete blocks until all plugin startup phases are done. Returns a non-nil error if a config-path plugin failed during startup (e.g., invalid BGP config) or if the context deadline is exceeded.
type ServerConfig ¶
type ServerConfig struct {
ConfigPath string // Path to config file (for peer save)
Plugins []plugin.PluginConfig // External plugins to spawn
ConfiguredFamilies []string // Families configured on peers (for deferred auto-load)
ConfiguredCustomEvents []string // Custom event types in peer receive config (for auto-load)
ConfiguredCustomSendTypes []string // Custom send types in peer send config (for auto-load)
ConfiguredPaths []string // Top-level config sections present (for config-driven auto-load)
Hub *plugin.HubConfig // TLS transport config (nil = no TLS listener)
MetricsRegistry metrics.Registry // Prometheus metrics registry (nil = metrics disabled)
}
ServerConfig holds API server configuration.
type StreamingHandler ¶
type StreamingHandler func(ctx context.Context, s *Server, w io.Writer, username string, args []string) error
StreamingHandler handles streaming commands (e.g., monitor). ctx is the session context, s is the plugin server, w is the output writer, username is the authenticated SSH user (for authorization), args are command arguments.
func GetStreamingHandlerForCommand ¶
func GetStreamingHandlerForCommand(input string) (StreamingHandler, []string)
GetStreamingHandlerForCommand returns the handler and extracted args for a command. Matches the longest registered prefix. Returns (nil, nil) if no prefix matches.
type Subscription ¶
type Subscription struct {
Namespace string // "bgp" or "bgp-rib"
EventType string // "update", "state", etc.
Direction string // "received", "sent", "both" (empty = both)
PeerFilter *PeerFilter // nil = all peers
PluginFilter string // plugin name filter (empty = all)
}
Subscription represents an event subscription.
func BuildEventMonitorSubscriptions ¶
func BuildEventMonitorSubscriptions(opts *EventMonitorOpts) []*Subscription
BuildEventMonitorSubscriptions creates subscriptions from parsed options. With no include/exclude filter, subscribes to all event types across all namespaces. With include, subscribes only to those types (in whichever namespaces they exist). With exclude, subscribes to all types except those listed.
func ParseSubscription ¶
func ParseSubscription(args []string) (*Subscription, error)
ParseSubscription parses a subscribe/unsubscribe command. Format: [peer <sel> | plugin <name>] <namespace> event <type> [direction received|sent|both].
func (*Subscription) Equals ¶
func (s *Subscription) Equals(other *Subscription) bool
Equals returns true if two subscriptions are identical.
func (*Subscription) Matches ¶
func (s *Subscription) Matches(namespace, eventType, direction, peerAddr, peerName string) bool
Matches returns true if this subscription matches the event. peerAddr is the peer's IP address; peerName is the configured peer name (may be empty).
type SubscriptionManager ¶
type SubscriptionManager struct {
// contains filtered or unexported fields
}
SubscriptionManager tracks subscriptions per process.
func NewSubscriptionManager ¶
func NewSubscriptionManager() *SubscriptionManager
NewSubscriptionManager creates a new subscription manager.
func (*SubscriptionManager) Add ¶
func (sm *SubscriptionManager) Add(proc *process.Process, sub *Subscription)
Add adds a subscription for a process.
func (*SubscriptionManager) ClearProcess ¶
func (sm *SubscriptionManager) ClearProcess(proc *process.Process)
ClearProcess removes all subscriptions for a process.
func (*SubscriptionManager) Count ¶
func (sm *SubscriptionManager) Count(proc *process.Process) int
Count returns the number of subscriptions for a process.
func (*SubscriptionManager) GetMatching ¶
func (sm *SubscriptionManager) GetMatching(namespace, eventType, direction, peerAddr, peerName string) []*process.Process
GetMatching returns all processes with subscriptions matching the event. peerName is the configured peer name (may be empty for non-BGP events or emit-event RPCs).
func (*SubscriptionManager) GetSubscriptions ¶
func (sm *SubscriptionManager) GetSubscriptions(proc *process.Process) []*Subscription
GetSubscriptions returns all subscriptions for a process.
func (*SubscriptionManager) Remove ¶
func (sm *SubscriptionManager) Remove(proc *process.Process, sub *Subscription) bool
Remove removes a subscription for a process. Returns true if the subscription was found and removed.
type SubsystemConfig ¶
type SubsystemConfig struct {
Name string // Subsystem name (cache, route, session)
Binary string // Path to binary or full command
Commands []string // Commands this subsystem handles (for pre-registration)
ConfigPath string // Config file path (passed to child process)
}
SubsystemConfig describes a forked subsystem process.
type SubsystemHandler ¶
type SubsystemHandler struct {
// contains filtered or unexported fields
}
SubsystemHandler wraps a forked process that handles a subset of commands. It spawns the subprocess, completes the 5-stage protocol, and routes commands to it via pipes.
func NewSubsystemHandler ¶
func NewSubsystemHandler(config SubsystemConfig) *SubsystemHandler
NewSubsystemHandler creates a handler backed by a forked process.
func (*SubsystemHandler) Commands ¶
func (h *SubsystemHandler) Commands() []string
Commands returns the commands this subsystem handles.
func (*SubsystemHandler) Handle ¶
Handle sends a command to the subsystem via RPC and returns the response.
func (*SubsystemHandler) Name ¶
func (h *SubsystemHandler) Name() string
Name returns the subsystem name.
func (*SubsystemHandler) Running ¶
func (h *SubsystemHandler) Running() bool
Running returns true if the subsystem process is running.
func (*SubsystemHandler) Schema ¶
func (h *SubsystemHandler) Schema() *plugin.PluginSchemaDecl
Schema returns the YANG schema declared by this subsystem, or nil if none.
func (*SubsystemHandler) Signal ¶
func (h *SubsystemHandler) Signal(sig os.Signal) error
Signal sends an OS signal to the subsystem's external process. Returns an error if the process is not running or is internal (goroutine).
func (*SubsystemHandler) Start ¶
func (h *SubsystemHandler) Start(ctx context.Context) error
Start spawns the subsystem process and completes the 5-stage protocol.
func (*SubsystemHandler) Stop ¶
func (h *SubsystemHandler) Stop()
Stop terminates the subsystem process.
type SubsystemManager ¶
type SubsystemManager struct {
// contains filtered or unexported fields
}
SubsystemManager manages multiple subsystem handlers.
func NewSubsystemManager ¶
func NewSubsystemManager() *SubsystemManager
NewSubsystemManager creates a new subsystem manager.
func (*SubsystemManager) AllCommands ¶
func (m *SubsystemManager) AllCommands() []string
AllCommands returns all commands from all subsystems.
func (*SubsystemManager) AllSchemas ¶
func (m *SubsystemManager) AllSchemas() []*Schema
AllSchemas returns all schemas from all subsystems.
func (*SubsystemManager) FindHandler ¶
func (m *SubsystemManager) FindHandler(command string) *SubsystemHandler
FindHandler returns the handler for a given command, or nil if not found. After Freeze(), uses lock-free atomic.Load on the frozen snapshot.
func (*SubsystemManager) Freeze ¶
func (m *SubsystemManager) Freeze()
Freeze creates an immutable snapshot of the handler map. After Freeze(), Get and FindHandler use atomic.Load instead of RLock. MUST be called after all Register calls complete (after startup barrier). Safe to call multiple times (each call overwrites the previous snapshot).
func (*SubsystemManager) Get ¶
func (m *SubsystemManager) Get(name string) *SubsystemHandler
Get returns a subsystem handler by name. After Freeze(), uses lock-free atomic.Load on the frozen snapshot.
func (*SubsystemManager) Names ¶
func (m *SubsystemManager) Names() []string
Names returns the names of all registered subsystems.
func (*SubsystemManager) Register ¶
func (m *SubsystemManager) Register(config SubsystemConfig)
Register adds a subsystem configuration.
func (*SubsystemManager) RegisterSchemas ¶
func (m *SubsystemManager) RegisterSchemas(registry *SchemaRegistry) error
RegisterSchemas registers all subsystem schemas with the given registry.
func (*SubsystemManager) StartAll ¶
func (m *SubsystemManager) StartAll(ctx context.Context) error
StartAll starts all registered subsystems.
func (*SubsystemManager) StopAll ¶
func (m *SubsystemManager) StopAll()
StopAll stops all subsystems.
func (*SubsystemManager) Unregister ¶
func (m *SubsystemManager) Unregister(name string)
Unregister stops and removes a subsystem by name. No-op if the name is not registered. If frozen, publishes a new snapshot reflecting the removal.
Source Files
¶
- adhoc.go
- command.go
- command_registry.go
- config.go
- config_tx_bridge.go
- dispatch.go
- doc.go
- engine_event.go
- engine_event_gateway.go
- event_monitor.go
- events.go
- handler.go
- hub.go
- managed.go
- monitor.go
- node_with.go
- pending.go
- plugin_rpc.go
- register.go
- reload.go
- reload_tx.go
- rpc_register.go
- schema.go
- server.go
- session.go
- startup.go
- startup_autoload.go
- subscribe.go
- subsystem.go
- system.go