Documentation
¶
Overview ¶
Package pluginmgr is CVT's plugin lifecycle + pipeline manager. It is core-only; plugin authors never import this package.
Index ¶
- Constants
- func DefaultConfigPath() (string, error)
- func DefaultPluginRoot() (string, error)
- func DefaultStatePath() (string, error)
- func Remove(name, pluginRoot, statePath string) error
- func VerifyInstalled(entry InstalledPlugin) error
- func WriteState(path string, state *StateFile) error
- type AuditKind
- type AuditRecord
- type AuditSink
- type Config
- type HandleInfo
- type HookBindings
- type InstalledPlugin
- type Manager
- func (m *Manager) Audit() AuditSink
- func (m *Manager) Cfg() *Config
- func (m *Manager) Events(name string) eventspb.EventHandlerClient
- func (m *Manager) Handle(name string) (HandleInfo, bool)
- func (m *Manager) InjectClientsForTest(name string, reg registrypb.RegistryProviderClient, ...)
- func (m *Manager) Metrics() *Metrics
- func (m *Manager) Registry(name string) registrypb.RegistryProviderClient
- func (m *Manager) Start(ctx context.Context) error
- func (m *Manager) Stop()
- type Metrics
- type NullAuditSink
- type Options
- type PluginConfig
- type RecordingAuditSink
- type StateFile
- type ZapAuditSink
Constants ¶
const ( OutcomeOK = "ok" OutcomeError = "error" OutcomeTimeout = "timeout" )
Outcome values for AuditRecord.Outcome.
const ( OnErrorFailClosed = "fail_closed" OnErrorFailOpen = "fail_open" )
Supported values for PluginConfig.OnError.
const ( HookFetchSchema = "fetch_schema" HookRegisterConsumerUsage = "register_consumer_usage" HookOnBreakingChangeDetected = "on_breaking_change_detected" HookOnValidationFailed = "on_validation_failed" )
Supported hook names. The set is closed in v1; unknown hook names in the config file are load-time errors.
const CurrentStateVersion = 1
CurrentStateVersion is the schema version of StateFile. Bumped if the shape changes.
const DefaultCallTimeout = 5 * time.Second
Default per-call timeout when the plugin config omits timeout:.
const EnvDisablePlugins = "CVT_DISABLE_PLUGINS"
EnvDisablePlugins, when set to "1", forces Load to return an empty config regardless of file contents. Safe-mode escape hatch.
const SupportedConfigVersion = 1
SupportedConfigVersion is the config_version core accepts. Unknown values fail at load time.
Variables ¶
This section is empty.
Functions ¶
func DefaultConfigPath ¶
DefaultConfigPath returns ~/.cvt/config.yaml, expanded.
func DefaultPluginRoot ¶
DefaultPluginRoot returns ~/.cvt/plugins, expanded.
func DefaultStatePath ¶
DefaultStatePath returns ~/.cvt/plugins/state.json.
func Remove ¶
Remove deletes the plugin binary and its state entry under a single flock-guarded transaction. Ordering:
- Acquire lock.
- Verify entry exists and binary path is under pluginRoot.
- Write new state (without the entry).
- Release lock.
- Delete the binary (outside lock).
If step 3 fails, nothing changes. If step 5 fails, state is correct but the orphaned binary remains on disk — operator can delete by hand and reinstall works. The reverse ordering (binary-first) would leave state pointing at a missing file, blocking reinstall.
func VerifyInstalled ¶
func VerifyInstalled(entry InstalledPlugin) error
VerifyInstalled re-hashes the binary on disk and returns an error if it no longer matches the stored sha256, or the file is missing.
func WriteState ¶
WriteState writes state to the given path. Acquires the flock before writing. If you're doing a read-modify-write cycle, prefer withStateLock so the lock is held across both operations (otherwise concurrent mutators race to clobber each other's entries).
Types ¶
type AuditRecord ¶
type AuditRecord struct {
Kind AuditKind `json:"kind"`
Plugin string `json:"plugin"`
ReportedVersion string `json:"reported_version"`
SHA256 string `json:"sha256"`
PID int `json:"pid"`
RequestID string `json:"request_id"`
Service string `json:"service"`
Method string `json:"method"`
DurationMS int64 `json:"duration_ms"`
Outcome string `json:"outcome"`
ErrorCode string `json:"error_code,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
AuditRecord is a single plugin-call audit entry. Fields mirror the design doc's audit schema: plugin identity (name + reported version + install-time sha256 + pid), correlation (request_id), the gRPC call (service/method), and outcome metrics (duration, status, error code).
type AuditSink ¶
type AuditSink interface {
Record(AuditRecord)
}
AuditSink is the interface core plumbs audit records through. Server mode wires the existing server/cvtservice/audit_logger.go; CLI mode wires a Zap-backed sink. Tests wire an in-memory slice.
type Config ¶
type Config struct {
ConfigVersion int `yaml:"config_version"`
Plugins map[string]PluginConfig `yaml:"plugins"`
Hooks HookBindings `yaml:"hooks"`
}
Config is the top-level plugin configuration loaded from ~/.cvt/config.yaml. Empty Plugins + empty Hooks means "no plugins run"; plugin system stays dormant.
func Load ¶
Load reads and validates the plugin config from the given path. Missing file is not an error — it returns an empty Config (plugin system stays dormant). Returns a wrapped error on invalid YAML, unknown config_version, invalid plugin name, binary path escape, unset secret, or unknown hook name.
If CVT_DISABLE_PLUGINS=1 is set, Load returns an empty Config immediately without reading the file.
pluginRoot is the allowed plugin-binary directory, typically ~/.cvt/plugins; binaries declared outside this path are rejected.
type HandleInfo ¶
type HandleInfo struct {
Name string
ReportedVersion string
SHA256 string
PID int
Services []string
}
HandleInfo is the public read-only identity view of a running plugin.
type HookBindings ¶
type HookBindings struct {
FetchSchema string `yaml:"fetch_schema,omitempty"`
RegisterConsumerUsage string `yaml:"register_consumer_usage,omitempty"`
OnBreakingChangeDetected string `yaml:"on_breaking_change_detected,omitempty"`
OnValidationFailed string `yaml:"on_validation_failed,omitempty"`
}
HookBindings maps hook names to the single plugin that handles each. v1 = one plugin per hook; multi-plugin fanout deferred to v1.1.
type InstalledPlugin ¶
type InstalledPlugin struct {
BinaryPath string `json:"binary_path"`
SHA256 string `json:"sha256"`
InstalledAt time.Time `json:"installed_at"`
}
InstalledPlugin is per-plugin install-time metadata.
func Install ¶
func Install(srcPath, name, pluginRoot, statePath string) (InstalledPlugin, error)
Install copies (or verifies) the binary at srcPath into pluginRoot and records the entry in state.json under a single flock-guarded transaction. Returns the persisted entry.
Behavior:
- If srcPath is already inside pluginRoot, the binary stays in place and only its SHA256 is computed.
- Otherwise the binary is copied to pluginRoot/<basename>(src). The copy is staged to a temp file and atomically renamed, so a crashed or concurrent install never leaves a half-written binary.
- If the destination path is already registered under a DIFFERENT plugin name, Install refuses: the user must pick a distinct binary file or remove the existing plugin first.
- name must match the plugin-name regex.
type Manager ¶
type Manager struct {
// contains filtered or unexported fields
}
Manager owns the lifecycle of every configured plugin subprocess. It is a singleton per cvt process: Start forks all configured plugins in parallel, each plugin stays alive for the life of the manager, and Stop tears them all down.
Manager is NOT responsible for invoking plugin RPCs — that's what the typed clients from Registry(name) / Events(name) are for. The manager just owns their lifecycle and supplies ready-to-use client handles.
func New ¶
New constructs a Manager. The returned manager has no plugins running until Start is called.
func NewForTest ¶
NewForTest constructs a Manager without forking any plugins. Tests inject fake clients via InjectClientsForTest and exercise the manager-facing API (Registry, Events, Cfg, Metrics, Audit, Handle) without standing up real subprocesses.
Production code must use New + Start; this helper is for unit tests that target pluginclient / hooks policy paths.
func (*Manager) Cfg ¶
Cfg exposes the loaded config (used by pluginclient to look up per-hook plugin name and per-plugin on_error policy).
func (*Manager) Events ¶
func (m *Manager) Events(name string) eventspb.EventHandlerClient
Events returns the typed EventHandler client for the named plugin, or nil if the plugin is not running or doesn't implement events.v1.
func (*Manager) Handle ¶
func (m *Manager) Handle(name string) (HandleInfo, bool)
Handle returns the runtime identity bundle for the named plugin (used by audit wiring). Returns false if the plugin isn't running.
func (*Manager) InjectClientsForTest ¶
func (m *Manager) InjectClientsForTest( name string, reg registrypb.RegistryProviderClient, ev eventspb.EventHandlerClient, info HandleInfo, )
InjectClientsForTest installs the given typed clients under plugin name `name` so Registry(name) / Events(name) / Handle(name) return them. Pass nil for a client the test doesn't exercise. Safe to call multiple times across different names.
func (*Manager) Metrics ¶
Metrics exposes the metrics struct so callers (pluginclient) can observe per-call.
func (*Manager) Registry ¶
func (m *Manager) Registry(name string) registrypb.RegistryProviderClient
Registry returns the typed RegistryProvider client for the named plugin, or nil if the plugin is not running or doesn't implement registry.v1.
func (*Manager) Start ¶
Start forks every configured plugin in parallel, waits for each to complete its handshake (5s deadline per plugin), delivers configured secrets via SetConfig, and dispenses the typed sub-clients. Returns the first plugin-start error; on error, every already-started plugin is torn down before returning.
An empty plugin set is a valid no-op (returns nil), supporting the "safe mode" and "no plugins configured" paths.
func (*Manager) Stop ¶
func (m *Manager) Stop()
Stop gracefully shuts down every running plugin. Safe to call multiple times; subsequent calls are no-ops.
go-plugin's Client.Kill() already implements graceful shutdown: it closes the gRPC connection (plugin receives EOF, runs its deferred shutdown path), then waits up to ClientConfig.SyncStdout/Stderr flush + the go-plugin-internal kill timeout before escalating to SIGKILL. We inherit that behavior; no separate SIGTERM-first dance needed on top of it.
type Metrics ¶
type Metrics struct {
CallDuration *prometheus.HistogramVec
CallErrors *prometheus.CounterVec
Up *prometheus.GaugeVec
Info *prometheus.GaugeVec
Restarts *prometheus.CounterVec
}
Metrics holds the Prometheus series the plugin manager exports. Use NewMetrics(nil) to register against the default registry, or pass a custom registerer for tests.
The `Up` gauge is intentionally labelled by plugin name only (not version), so plugin version bumps don't leave a stale series at 1 forever and so malicious plugins that report a randomized version on every restart can't explode Prometheus cardinality. The version/name reported by the plugin is surfaced via the `Info` gauge, which always holds a single value (1) per plugin and gets fresh labels whenever a plugin's reported identity changes (old series is cleared at restart).
func NewMetrics ¶
func NewMetrics(reg prometheus.Registerer) *Metrics
NewMetrics builds + registers the four plugin metrics. Passing nil uses prometheus.DefaultRegisterer and memoizes the result so a subsequent call returns the same Metrics struct (avoiding "duplicate metrics collector" panics across test binaries).
type NullAuditSink ¶
type NullAuditSink struct{}
NullAuditSink drops records. Use when plugins are disabled.
func (NullAuditSink) Record ¶
func (NullAuditSink) Record(AuditRecord)
type Options ¶
type Options struct {
// Logger is the core Zap logger. If nil, a no-op logger is used; tests
// that care about log output should pass zaptest.NewLogger(t).
Logger *zap.Logger
// Metrics is the plugin-metrics struct. If nil, NewMetrics(nil) is
// called — registering against prometheus.DefaultRegisterer.
Metrics *Metrics
// Audit is the plugin-call audit sink. If nil, a ZapAuditSink using
// Logger is used; if Logger is also nil, audit is dropped.
Audit AuditSink
}
Options bundle optional Manager dependencies. All fields are optional; defaults are safe for production.
type PluginConfig ¶
type PluginConfig struct {
Binary string `yaml:"binary"`
Timeout time.Duration `yaml:"timeout"`
OnError string `yaml:"on_error"`
Secrets []string `yaml:"secrets"`
Config map[string]string `yaml:"config"`
}
PluginConfig is per-plugin configuration.
Note: there is no Restart field in v1. go-plugin's default behavior restarts crashed plugins automatically; a configurable restart policy is tracked as part of the v1.1 supervisor work (see issue #107).
type RecordingAuditSink ¶
type RecordingAuditSink struct {
Records []AuditRecord
}
RecordingAuditSink collects records in memory. Useful for tests.
func (*RecordingAuditSink) Record ¶
func (s *RecordingAuditSink) Record(r AuditRecord)
Record appends the entry.
type StateFile ¶
type StateFile struct {
Version int `json:"version"`
Plugins map[string]InstalledPlugin `json:"plugins"`
}
StateFile is the on-disk install-time metadata store at ~/.cvt/plugins/state.json. It tracks which plugins have been installed, their binary paths, and install-time SHA256 hashes.
Runtime state (pid, up/down, restart count, circuit state) is NEVER stored here. That data lives in-process and is exposed via the Prometheus /metrics endpoint. Keeping state.json install-only avoids disk contention between the running daemon and the CLI, and eliminates stale-read risk.
func ReadState ¶
ReadState loads the state file. Missing file returns an empty StateFile (not an error) since fresh installs have no state yet.
Note: ReadState does NOT hold a lock. Pure reads are acceptable (stale-read tolerance for list/inspect). Mutating callers must use withStateLock to serialize read-modify-write cycles.
type ZapAuditSink ¶
ZapAuditSink adapts a Zap logger into an AuditSink. Writes are synchronous: the caller blocks until Zap enqueues the entry (Zap's own buffering is transparent). This preserves the compliance invariant that no plugin call returns to the caller until the audit entry is durably handed off.
func (ZapAuditSink) Record ¶
func (s ZapAuditSink) Record(r AuditRecord)
Record emits the audit entry as a structured log at info level with a dedicated message so downstream filters can pick it out.