pluginmgr

package
v0.7.0 Latest
Warning

This package is not in the latest version of its module.

Published: Apr 21, 2026 License: MIT Imports: 25 Imported by: 0

Documentation

Overview

Package pluginmgr is CVT's plugin lifecycle and pipeline manager. It is core-only; plugin authors never import this package.

Constants

const (
	OutcomeOK      = "ok"
	OutcomeError   = "error"
	OutcomeTimeout = "timeout"
)

Outcome values for AuditRecord.Outcome.

const (
	OnErrorFailClosed = "fail_closed"
	OnErrorFailOpen   = "fail_open"
)

Supported values for PluginConfig.OnError.

const (
	HookFetchSchema              = "fetch_schema"
	HookRegisterConsumerUsage    = "register_consumer_usage"
	HookOnBreakingChangeDetected = "on_breaking_change_detected"
	HookOnValidationFailed       = "on_validation_failed"
)

Supported hook names. The set is closed in v1; unknown hook names in the config file are load-time errors.

const CurrentStateVersion = 1

CurrentStateVersion is the schema version of StateFile. Bumped if the shape changes.

const DefaultCallTimeout = 5 * time.Second

DefaultCallTimeout is the per-call timeout used when the plugin config omits the timeout key.
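The interplay between the default timeout, the outcome values, and the on_error policy can be sketched as follows. This is a minimal illustration, not the package's implementation: effectiveTimeout, callWithPolicy, slowCall, and demo are hypothetical names, and the sketch assumes that fail_open swallows the error while fail_closed propagates it.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Values copied from the constants documented in this package.
const (
	OutcomeOK      = "ok"
	OutcomeError   = "error"
	OutcomeTimeout = "timeout"

	OnErrorFailClosed = "fail_closed"
	OnErrorFailOpen   = "fail_open"

	DefaultCallTimeout = 5 * time.Second
)

// effectiveTimeout (hypothetical) falls back to DefaultCallTimeout when the
// configured value is unset, i.e. zero or negative.
func effectiveTimeout(configured time.Duration) time.Duration {
	if configured <= 0 {
		return DefaultCallTimeout
	}
	return configured
}

// callWithPolicy (hypothetical) runs call under a deadline, classifies the
// outcome, and applies the on_error policy.
// Assumption: fail_open hides the error from the caller, fail_closed returns it.
func callWithPolicy(ctx context.Context, timeout time.Duration, onError string,
	call func(context.Context) error) (string, error) {
	ctx, cancel := context.WithTimeout(ctx, effectiveTimeout(timeout))
	defer cancel()

	err := call(ctx)
	if err == nil {
		return OutcomeOK, nil
	}
	outcome := OutcomeError
	if errors.Is(err, context.DeadlineExceeded) {
		outcome = OutcomeTimeout
	}
	if onError == OnErrorFailOpen {
		return outcome, nil
	}
	return outcome, err
}

// slowCall simulates a plugin RPC that only returns once its context ends.
func slowCall(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// demo runs slowCall with a 10ms timeout under the given on_error policy.
func demo(onError string) (string, error) {
	return callWithPolicy(context.Background(), 10*time.Millisecond, onError, slowCall)
}

func main() {
	outcome, err := demo(OnErrorFailOpen)
	fmt.Println(outcome, err)
	outcome, err = demo(OnErrorFailClosed)
	fmt.Println(outcome, err)
}
```

In both runs the outcome is classified as a timeout; only the fail_closed run hands the error back to the caller.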

const EnvDisablePlugins = "CVT_DISABLE_PLUGINS"

EnvDisablePlugins, when set to "1", forces Load to return an empty config regardless of file contents. Safe-mode escape hatch.

const SupportedConfigVersion = 1

SupportedConfigVersion is the config_version core accepts. Unknown values fail at load time.

Variables

This section is empty.

Functions

func DefaultConfigPath

func DefaultConfigPath() (string, error)

DefaultConfigPath returns ~/.cvt/config.yaml, expanded.

func DefaultPluginRoot

func DefaultPluginRoot() (string, error)

DefaultPluginRoot returns ~/.cvt/plugins, expanded.

func DefaultStatePath

func DefaultStatePath() (string, error)

DefaultStatePath returns ~/.cvt/plugins/state.json.

func Remove

func Remove(name, pluginRoot, statePath string) error

Remove deletes the plugin's state entry and its binary. The state update is a single flock-guarded transaction; the binary is deleted after the lock is released. Ordering:

  1. Acquire lock.
  2. Verify entry exists and binary path is under pluginRoot.
  3. Write new state (without the entry).
  4. Release lock.
  5. Delete the binary (outside lock).

If step 3 fails, nothing changes. If step 5 fails, state is correct but an orphaned binary remains on disk; the operator can delete it by hand, and reinstalling works. The reverse ordering (binary-first) would leave state pointing at a missing file, blocking reinstall.
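The ordering above can be sketched with simplified stand-ins. This is an illustration under stated assumptions, not the package's code: removePlugin and demoRemove are hypothetical, the state is an in-memory map of plugin name to binary path rather than the on-disk StateFile, and a sync.Mutex stands in for the flock.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"sync"
)

// stateMu stands in for the flock on state.json (assumption for this sketch).
var stateMu sync.Mutex

// removePlugin (hypothetical) drops the state entry first and deletes the
// binary only after the lock is released.
func removePlugin(state map[string]string, name, pluginRoot string) error {
	stateMu.Lock() // step 1: acquire lock
	bin, ok := state[name]
	if !ok { // step 2: entry must exist
		stateMu.Unlock()
		return fmt.Errorf("plugin %q is not installed", name)
	}
	rel, err := filepath.Rel(pluginRoot, bin)
	if err != nil || rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
		stateMu.Unlock() // step 2: binary must live under pluginRoot
		return fmt.Errorf("binary %q is outside plugin root %q", bin, pluginRoot)
	}
	delete(state, name) // step 3: write new state without the entry
	stateMu.Unlock()    // step 4: release lock

	// step 5: delete the binary outside the lock. A failure here leaves an
	// orphaned file, but the state is already consistent.
	if err := os.Remove(bin); err != nil {
		return fmt.Errorf("state updated, but binary %q remains: %w", bin, err)
	}
	return nil
}

// demoRemove installs a fake binary in a temp dir, removes it, and reports
// how many state entries are left and whether the binary is gone.
func demoRemove() (int, bool, error) {
	root, err := os.MkdirTemp("", "plugins")
	if err != nil {
		return 0, false, err
	}
	defer os.RemoveAll(root)

	bin := filepath.Join(root, "demo-plugin")
	if err := os.WriteFile(bin, []byte("fake binary"), 0o755); err != nil {
		return 0, false, err
	}
	state := map[string]string{"demo": bin}

	if err := removePlugin(state, "demo", root); err != nil {
		return len(state), false, err
	}
	_, statErr := os.Stat(bin)
	return len(state), os.IsNotExist(statErr), nil
}

func main() {
	left, gone, err := demoRemove()
	fmt.Println(left, gone, err)
}
```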

func VerifyInstalled

func VerifyInstalled(entry InstalledPlugin) error

VerifyInstalled re-hashes the binary on disk and returns an error if it no longer matches the stored sha256, or the file is missing.
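A re-hash check of this kind might look like the sketch below. The helper names (fileSHA256, verifyBinary, demoVerify) are hypothetical, and the sketch assumes the stored hash is a lowercase hex string; the real encoding used in state.json is not stated here.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"path/filepath"
)

// fileSHA256 streams the file through SHA-256 and returns the hex digest.
func fileSHA256(path string) (string, error) {
	f, err := os.Open(path)
	if err != nil {
		return "", err
	}
	defer f.Close()

	h := sha256.New()
	if _, err := io.Copy(h, f); err != nil {
		return "", err
	}
	return hex.EncodeToString(h.Sum(nil)), nil
}

// verifyBinary (hypothetical) fails if the file is missing or its current
// hash differs from the recorded one.
func verifyBinary(path, recorded string) error {
	got, err := fileSHA256(path)
	if err != nil {
		return fmt.Errorf("hash %s: %w", path, err)
	}
	if got != recorded {
		return fmt.Errorf("sha256 mismatch for %s: recorded %s, on disk %s", path, recorded, got)
	}
	return nil
}

// demoVerify checks an intact, a tampered, and a deleted binary in turn.
func demoVerify() (intactOK, tamperedRejected, missingRejected bool, err error) {
	dir, err := os.MkdirTemp("", "verify")
	if err != nil {
		return false, false, false, err
	}
	defer os.RemoveAll(dir)

	bin := filepath.Join(dir, "demo-plugin")
	if err := os.WriteFile(bin, []byte("v1"), 0o755); err != nil {
		return false, false, false, err
	}
	recorded, err := fileSHA256(bin)
	if err != nil {
		return false, false, false, err
	}
	intactOK = verifyBinary(bin, recorded) == nil

	if err := os.WriteFile(bin, []byte("v2"), 0o755); err != nil {
		return false, false, false, err
	}
	tamperedRejected = verifyBinary(bin, recorded) != nil

	if err := os.Remove(bin); err != nil {
		return false, false, false, err
	}
	missingRejected = verifyBinary(bin, recorded) != nil
	return intactOK, tamperedRejected, missingRejected, nil
}

func main() {
	fmt.Println(demoVerify())
}
```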

func WriteState

func WriteState(path string, state *StateFile) error

WriteState writes state to the given path, acquiring the flock before writing. For a read-modify-write cycle, prefer the unexported withStateLock helper so the lock is held across both operations; otherwise concurrent mutators race to clobber each other's entries.

Types

type AuditKind

type AuditKind string

AuditKind labels plugin-call audit entries by I/O direction.

const (
	AuditKindRead  AuditKind = "read"
	AuditKindWrite AuditKind = "write"
)

type AuditRecord

type AuditRecord struct {
	Kind            AuditKind `json:"kind"`
	Plugin          string    `json:"plugin"`
	ReportedVersion string    `json:"reported_version"`
	SHA256          string    `json:"sha256"`
	PID             int       `json:"pid"`
	RequestID       string    `json:"request_id"`
	Service         string    `json:"service"`
	Method          string    `json:"method"`
	DurationMS      int64     `json:"duration_ms"`
	Outcome         string    `json:"outcome"`
	ErrorCode       string    `json:"error_code,omitempty"`
	Timestamp       time.Time `json:"timestamp"`
}

AuditRecord is a single plugin-call audit entry. Fields mirror the design doc's audit schema: plugin identity (name + reported version + install-time sha256 + pid), correlation (request_id), the gRPC call (service/method), and outcome metrics (duration, outcome, error code).

type AuditSink

type AuditSink interface {
	Record(AuditRecord)
}

AuditSink is the interface core plumbs audit records through. Server mode wires the existing server/cvtservice/audit_logger.go; CLI mode wires a Zap-backed sink. Tests wire an in-memory slice.
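As a rough illustration of how a caller might feed a sink, the sketch below wraps a call and records its duration and outcome. The types are trimmed local copies so the example compiles on its own; audited and sliceSink are hypothetical, and the mutex in sliceSink is this sketch's own choice rather than something the package documents.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// auditRecord is a trimmed local copy of AuditRecord for this sketch.
type auditRecord struct {
	Plugin     string
	Method     string
	DurationMS int64
	Outcome    string
	Timestamp  time.Time
}

// auditSink mirrors the single-method AuditSink interface.
type auditSink interface {
	Record(auditRecord)
}

// sliceSink (hypothetical) keeps records in memory, as a test sink might.
type sliceSink struct {
	mu      sync.Mutex
	records []auditRecord
}

func (s *sliceSink) Record(r auditRecord) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.records = append(s.records, r)
}

// errBoom is a sample failure used by the demo.
var errBoom = errors.New("boom")

// audited (hypothetical) runs call, then records one entry before returning,
// so the record is handed to the sink before the caller sees the result.
func audited(sink auditSink, plugin, method string, call func() error) error {
	start := time.Now()
	err := call()

	outcome := "ok"
	if err != nil {
		outcome = "error"
	}
	sink.Record(auditRecord{
		Plugin:     plugin,
		Method:     method,
		DurationMS: time.Since(start).Milliseconds(),
		Outcome:    outcome,
		Timestamp:  start,
	})
	return err
}

func main() {
	sink := &sliceSink{}
	_ = audited(sink, "demo", "FetchSchema", func() error { return nil })
	_ = audited(sink, "demo", "FetchSchema", func() error { return errBoom })
	for _, r := range sink.records {
		fmt.Println(r.Plugin, r.Method, r.Outcome)
	}
}
```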

type Config

type Config struct {
	ConfigVersion int                     `yaml:"config_version"`
	Plugins       map[string]PluginConfig `yaml:"plugins"`
	Hooks         HookBindings            `yaml:"hooks"`
}

Config is the top-level plugin configuration loaded from ~/.cvt/config.yaml. Empty Plugins and empty Hooks mean "no plugins run"; the plugin system stays dormant.

func Load

func Load(path, pluginRoot string) (*Config, error)

Load reads and validates the plugin config from the given path. Missing file is not an error — it returns an empty Config (plugin system stays dormant). Returns a wrapped error on invalid YAML, unknown config_version, invalid plugin name, binary path escape, unset secret, or unknown hook name.

If CVT_DISABLE_PLUGINS=1 is set, Load returns an empty Config immediately without reading the file.

pluginRoot is the allowed plugin-binary directory, typically ~/.cvt/plugins; binaries declared outside this path are rejected.
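Two of the checks described for Load, the safe-mode switch and the binary path containment test, can be sketched as below. Both helpers (pluginsDisabled, underRoot) are hypothetical; in particular, the sketch compares cleaned absolute paths and does not resolve symlinks, which the real validation may or may not do.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"strings"
)

// pluginsDisabled reports whether the safe-mode escape hatch is set.
func pluginsDisabled() bool {
	return os.Getenv("CVT_DISABLE_PLUGINS") == "1"
}

// underRoot (hypothetical) reports whether candidate resolves to a path
// inside root. It compares cleaned absolute paths; symlinks are NOT resolved.
func underRoot(root, candidate string) (bool, error) {
	absRoot, err := filepath.Abs(root)
	if err != nil {
		return false, err
	}
	absCand, err := filepath.Abs(candidate)
	if err != nil {
		return false, err
	}
	rel, err := filepath.Rel(absRoot, absCand)
	if err != nil {
		return false, err
	}
	// A relative path that climbs out of root starts with "..".
	if rel == ".." || strings.HasPrefix(rel, ".."+string(filepath.Separator)) {
		return false, nil
	}
	return true, nil
}

func main() {
	root := "/home/u/.cvt/plugins"
	for _, bin := range []string{
		"/home/u/.cvt/plugins/vault",
		"/home/u/.cvt/plugins/../evil",
		"/home/u/.cvt/plugins-evil/vault",
	} {
		ok, err := underRoot(root, bin)
		fmt.Println(bin, ok, err)
	}
	fmt.Println("disabled:", pluginsDisabled())
}
```

Using filepath.Rel rather than a plain string-prefix test avoids accepting sibling directories such as plugins-evil whose names merely start with the root's name.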

type HandleInfo

type HandleInfo struct {
	Name            string
	ReportedVersion string
	SHA256          string
	PID             int
	Services        []string
}

HandleInfo is the public read-only identity view of a running plugin.

type HookBindings

type HookBindings struct {
	FetchSchema              string `yaml:"fetch_schema,omitempty"`
	RegisterConsumerUsage    string `yaml:"register_consumer_usage,omitempty"`
	OnBreakingChangeDetected string `yaml:"on_breaking_change_detected,omitempty"`
	OnValidationFailed       string `yaml:"on_validation_failed,omitempty"`
}

HookBindings maps hook names to the single plugin that handles each. In v1, each hook binds to exactly one plugin; multi-plugin fanout is deferred to v1.1.

type InstalledPlugin

type InstalledPlugin struct {
	BinaryPath  string    `json:"binary_path"`
	SHA256      string    `json:"sha256"`
	InstalledAt time.Time `json:"installed_at"`
}

InstalledPlugin is per-plugin install-time metadata.

func Install

func Install(srcPath, name, pluginRoot, statePath string) (InstalledPlugin, error)

Install copies (or verifies) the binary at srcPath into pluginRoot and records the entry in state.json under a single flock-guarded transaction. Returns the persisted entry.

Behavior:

  • If srcPath is already inside pluginRoot, the binary stays in place and only its SHA256 is computed.
  • Otherwise the binary is copied to pluginRoot/<basename(srcPath)>. The copy is staged to a temp file and atomically renamed, so a crashed or concurrent install never leaves a half-written binary.
  • If the destination path is already registered under a DIFFERENT plugin name, Install refuses: the user must pick a distinct binary file or remove the existing plugin first.
  • name must match the plugin-name regex.
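The stage-then-rename copy described in the behavior list can be sketched as follows. atomicCopy and demoInstall are hypothetical names; the sketch assumes the temp file is created inside the destination directory so that the rename stays on one filesystem, and it sets mode 0755 as a guess at what an executable needs.

```go
package main

import (
	"fmt"
	"io"
	"os"
	"path/filepath"
)

// atomicCopy (hypothetical) copies src into dstDir under src's base name.
// Data is written to a temp file in dstDir first, then renamed into place,
// so readers never observe a partially written destination.
func atomicCopy(src, dstDir string) (string, error) {
	in, err := os.Open(src)
	if err != nil {
		return "", err
	}
	defer in.Close()

	tmp, err := os.CreateTemp(dstDir, ".install-*")
	if err != nil {
		return "", err
	}
	tmpName := tmp.Name()
	// Cleans up the staging file on failure; after a successful rename the
	// name no longer exists and this call fails harmlessly.
	defer os.Remove(tmpName)

	if _, err := io.Copy(tmp, in); err != nil {
		tmp.Close()
		return "", err
	}
	if err := tmp.Chmod(0o755); err != nil { // assumption: executable mode
		tmp.Close()
		return "", err
	}
	if err := tmp.Sync(); err != nil { // flush data before publishing
		tmp.Close()
		return "", err
	}
	if err := tmp.Close(); err != nil {
		return "", err
	}

	dst := filepath.Join(dstDir, filepath.Base(src))
	if err := os.Rename(tmpName, dst); err != nil {
		return "", err
	}
	return dst, nil
}

// demoInstall copies a fake binary and returns the installed file's content.
func demoInstall() (string, error) {
	srcDir, err := os.MkdirTemp("", "src")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(srcDir)
	root, err := os.MkdirTemp("", "plugins")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(root)

	src := filepath.Join(srcDir, "demo-plugin")
	if err := os.WriteFile(src, []byte("fake-binary"), 0o755); err != nil {
		return "", err
	}
	dst, err := atomicCopy(src, root)
	if err != nil {
		return "", err
	}
	data, err := os.ReadFile(dst)
	return string(data), err
}

func main() {
	fmt.Println(demoInstall())
}
```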

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager owns the lifecycle of every configured plugin subprocess. It is a singleton per cvt process: Start forks all configured plugins in parallel, each plugin stays alive for the life of the manager, and Stop tears them all down.

Manager is NOT responsible for invoking plugin RPCs — that's what the typed clients from Registry(name) / Events(name) are for. The manager just owns their lifecycle and supplies ready-to-use client handles.

func New

func New(cfg *Config, state *StateFile, opts Options) *Manager

New constructs a Manager. The returned manager has no plugins running until Start is called.

func NewForTest

func NewForTest(cfg *Config, opts Options) *Manager

NewForTest constructs a Manager without forking any plugins. Tests inject fake clients via InjectClientsForTest and exercise the manager-facing API (Registry, Events, Cfg, Metrics, Audit, Handle) without standing up real subprocesses.

Production code must use New + Start; this helper is for unit tests that target pluginclient / hooks policy paths.

func (*Manager) Audit

func (m *Manager) Audit() AuditSink

Audit exposes the audit sink so callers can emit records.

func (*Manager) Cfg

func (m *Manager) Cfg() *Config

Cfg exposes the loaded config (used by pluginclient to look up per-hook plugin name and per-plugin on_error policy).

func (*Manager) Events

func (m *Manager) Events(name string) eventspb.EventHandlerClient

Events returns the typed EventHandler client for the named plugin, or nil if the plugin is not running or doesn't implement events.v1.

func (*Manager) Handle

func (m *Manager) Handle(name string) (HandleInfo, bool)

Handle returns the runtime identity bundle for the named plugin (used by audit wiring). Returns false if the plugin isn't running.

func (*Manager) InjectClientsForTest

func (m *Manager) InjectClientsForTest(
	name string,
	reg registrypb.RegistryProviderClient,
	ev eventspb.EventHandlerClient,
	info HandleInfo,
)

InjectClientsForTest installs the given typed clients under plugin name `name` so Registry(name) / Events(name) / Handle(name) return them. Pass nil for a client the test doesn't exercise. Safe to call multiple times across different names.

func (*Manager) Metrics

func (m *Manager) Metrics() *Metrics

Metrics exposes the metrics struct so callers (pluginclient) can record per-call observations.

func (*Manager) Registry

func (m *Manager) Registry(name string) registrypb.RegistryProviderClient

Registry returns the typed RegistryProvider client for the named plugin, or nil if the plugin is not running or doesn't implement registry.v1.

func (*Manager) Start

func (m *Manager) Start(ctx context.Context) error

Start forks every configured plugin in parallel, waits for each to complete its handshake (5s deadline per plugin), delivers configured secrets via SetConfig, and dispenses the typed sub-clients. Returns the first plugin-start error; on error, every already-started plugin is torn down before returning.

An empty plugin set is a valid no-op (returns nil), supporting the "safe mode" and "no plugins configured" paths.
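The start-in-parallel, tear-down-on-failure shape can be sketched with fake plugins in place of real subprocesses. Everything here (fakePlugin, startAll, demoStart, demoEmpty) is hypothetical, and the sketch returns the first error in configuration order, which is an assumption about what "first" means.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// fakePlugin stands in for a plugin subprocess.
type fakePlugin struct {
	name    string
	failing bool
	running bool
}

func (p *fakePlugin) start(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	if p.failing {
		return fmt.Errorf("plugin %s: handshake failed", p.name)
	}
	p.running = true
	return nil
}

func (p *fakePlugin) stop() { p.running = false }

// startAll (hypothetical) starts every plugin concurrently, each under its
// own deadline. If any start fails, all started plugins are stopped and the
// first error in slice order is returned.
func startAll(ctx context.Context, plugins []*fakePlugin, perPlugin time.Duration) error {
	errs := make([]error, len(plugins))
	var wg sync.WaitGroup
	for i, p := range plugins {
		wg.Add(1)
		go func(i int, p *fakePlugin) {
			defer wg.Done()
			pctx, cancel := context.WithTimeout(ctx, perPlugin)
			defer cancel()
			errs[i] = p.start(pctx) // each goroutine writes only its own slot
		}(i, p)
	}
	wg.Wait()

	for _, err := range errs {
		if err != nil {
			for _, p := range plugins {
				if p.running {
					p.stop()
				}
			}
			return err
		}
	}
	return nil
}

// demoStart starts three plugins, one of which fails, and reports how many
// are still running afterwards.
func demoStart() (int, error) {
	plugins := []*fakePlugin{{name: "a"}, {name: "b", failing: true}, {name: "c"}}
	err := startAll(context.Background(), plugins, 5*time.Second)
	running := 0
	for _, p := range plugins {
		if p.running {
			running++
		}
	}
	return running, err
}

// demoEmpty shows that an empty plugin set is a no-op.
func demoEmpty() error {
	return startAll(context.Background(), nil, 5*time.Second)
}

func main() {
	fmt.Println(demoStart())
	fmt.Println(demoEmpty())
}
```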

func (*Manager) Stop

func (m *Manager) Stop()

Stop gracefully shuts down every running plugin. Safe to call multiple times; subsequent calls are no-ops.
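One common way to get this "safe to call multiple times" behavior in Go is sync.Once. The sketch below shows the pattern only; whether Manager actually uses sync.Once is not stated here, and the manager type and kills counter are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// manager is a hypothetical stand-in that counts teardown passes.
type manager struct {
	stopOnce sync.Once
	kills    int
}

// Stop runs the teardown at most once; later calls return immediately.
func (m *manager) Stop() {
	m.stopOnce.Do(func() {
		m.kills++ // a real manager would kill each plugin client here
	})
}

func main() {
	m := &manager{}
	m.Stop()
	m.Stop()
	fmt.Println(m.kills)
}
```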

go-plugin's Client.Kill() already implements graceful shutdown: it closes the gRPC connection (plugin receives EOF, runs its deferred shutdown path), then waits up to ClientConfig.SyncStdout/Stderr flush + the go-plugin-internal kill timeout before escalating to SIGKILL. We inherit that behavior; no separate SIGTERM-first dance needed on top of it.

type Metrics

type Metrics struct {
	CallDuration *prometheus.HistogramVec
	CallErrors   *prometheus.CounterVec
	Up           *prometheus.GaugeVec
	Info         *prometheus.GaugeVec
	Restarts     *prometheus.CounterVec
}

Metrics holds the Prometheus series the plugin manager exports. Use NewMetrics(nil) to register against the default registry, or pass a custom registerer for tests.

The `Up` gauge is intentionally labelled by plugin name only (not version), so plugin version bumps don't leave a stale series at 1 forever and so malicious plugins that report a randomized version on every restart can't explode Prometheus cardinality. The version/name reported by the plugin is surfaced via the `Info` gauge, which always holds a single value (1) per plugin and gets fresh labels whenever a plugin's reported identity changes (old series is cleared at restart).

func NewMetrics

func NewMetrics(reg prometheus.Registerer) *Metrics

NewMetrics builds and registers the five plugin metrics. Passing nil uses prometheus.DefaultRegisterer and memoizes the result so a subsequent call returns the same Metrics struct (avoiding "duplicate metrics collector" panics across test binaries).

type NullAuditSink

type NullAuditSink struct{}

NullAuditSink drops records. Use when plugins are disabled.

func (NullAuditSink) Record

func (NullAuditSink) Record(AuditRecord)

type Options

type Options struct {
	// Logger is the core Zap logger. If nil, a no-op logger is used; tests
	// that care about log output should pass zaptest.NewLogger(t).
	Logger *zap.Logger

	// Metrics is the plugin-metrics struct. If nil, NewMetrics(nil) is
	// called — registering against prometheus.DefaultRegisterer.
	Metrics *Metrics

	// Audit is the plugin-call audit sink. If nil, a ZapAuditSink using
	// Logger is used; if Logger is also nil, audit is dropped.
	Audit AuditSink
}

Options bundles optional Manager dependencies. All fields are optional; defaults are safe for production.

type PluginConfig

type PluginConfig struct {
	Binary  string            `yaml:"binary"`
	Timeout time.Duration     `yaml:"timeout"`
	OnError string            `yaml:"on_error"`
	Secrets []string          `yaml:"secrets"`
	Config  map[string]string `yaml:"config"`
}

PluginConfig is per-plugin configuration.

Note: there is no Restart field in v1. go-plugin does not itself restart crashed plugins; a configurable restart policy is tracked as part of the v1.1 supervisor work (see issue #107).

type RecordingAuditSink

type RecordingAuditSink struct {
	Records []AuditRecord
}

RecordingAuditSink collects records in memory. Useful for tests.

func (*RecordingAuditSink) Record

func (s *RecordingAuditSink) Record(r AuditRecord)

Record appends the entry.

type StateFile

type StateFile struct {
	Version int                        `json:"version"`
	Plugins map[string]InstalledPlugin `json:"plugins"`
}

StateFile is the on-disk install-time metadata store at ~/.cvt/plugins/state.json. It tracks which plugins have been installed, their binary paths, and install-time SHA256 hashes.

Runtime state (pid, up/down, restart count, circuit state) is NEVER stored here. That data lives in-process and is exposed via the Prometheus /metrics endpoint. Keeping state.json install-only avoids disk contention between the running daemon and the CLI, and eliminates stale-read risk.

func ReadState

func ReadState(path string) (*StateFile, error)

ReadState loads the state file. Missing file returns an empty StateFile (not an error) since fresh installs have no state yet.

Note: ReadState does NOT hold a lock. Pure reads are acceptable (stale-read tolerance for list/inspect). Mutating callers must use withStateLock to serialize read-modify-write cycles.
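The missing-file behavior can be sketched as below, using local copies of the documented types. readState and demoReadMissing are hypothetical, and the sketch assumes the empty state carries version 1 (the documented CurrentStateVersion) and a non-nil Plugins map.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"time"
)

// Local copies of the documented types, so the sketch compiles on its own.
type installedPlugin struct {
	BinaryPath  string    `json:"binary_path"`
	SHA256      string    `json:"sha256"`
	InstalledAt time.Time `json:"installed_at"`
}

type stateFile struct {
	Version int                        `json:"version"`
	Plugins map[string]installedPlugin `json:"plugins"`
}

// readState (hypothetical) treats a missing file as an empty state and takes
// no lock, matching the stale-read tolerance described above.
func readState(path string) (*stateFile, error) {
	data, err := os.ReadFile(path)
	if errors.Is(err, fs.ErrNotExist) {
		// Assumption: a fresh state uses version 1 and an empty, non-nil map.
		return &stateFile{Version: 1, Plugins: map[string]installedPlugin{}}, nil
	}
	if err != nil {
		return nil, err
	}
	var s stateFile
	if err := json.Unmarshal(data, &s); err != nil {
		return nil, fmt.Errorf("parse %s: %w", path, err)
	}
	if s.Plugins == nil {
		s.Plugins = map[string]installedPlugin{}
	}
	return &s, nil
}

// demoReadMissing reads a state file that does not exist and returns the
// number of plugins in the resulting state.
func demoReadMissing() (int, error) {
	dir, err := os.MkdirTemp("", "state")
	if err != nil {
		return 0, err
	}
	defer os.RemoveAll(dir)

	s, err := readState(filepath.Join(dir, "state.json"))
	if err != nil {
		return 0, err
	}
	return len(s.Plugins), nil
}

func main() {
	fmt.Println(demoReadMissing())
}
```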

type ZapAuditSink

type ZapAuditSink struct{ L *zap.Logger }

ZapAuditSink adapts a Zap logger into an AuditSink. Writes are synchronous: the caller blocks until Zap enqueues the entry (Zap's own buffering is transparent). This preserves the compliance invariant that no plugin call returns to the caller until the audit entry is durably handed off.

func (ZapAuditSink) Record

func (s ZapAuditSink) Record(r AuditRecord)

Record emits the audit entry as a structured log at info level with a dedicated message so downstream filters can pick it out.
