Documentation
¶
Overview ¶
Package txbackend contains the per-channel TX dispatcher that replaces the single modem Sender function graywolf used before Phase 3 of the KISS TCP-client / channel-backing plan.
Architectural summary:
Governor.Submit → Dispatcher.Send(tf) → Registry.Load() → ByChannel[tf.Channel] → fanout to every Backend (Modem or KissTnc) → errors.Join(...) → nil if any backend accepted.
Registry is an immutable snapshot published via atomic.Pointer[T]. A single watcher goroutine (owned by the Dispatcher) rebuilds the snapshot off the hot path in response to config-change events (notifyBridgeReload + KISS manager add/remove/mode-flip events) and publishes with one atomic.Store. Dispatcher.Send does one atomic.Load per frame — no locking on the hot path.
Connection health is deliberately NOT part of the registry. A KissTnc backend whose supervisor is in backoff stays registered; the Submit call to kiss.Manager returns ErrBackendDown and the Dispatcher records that outcome per-instance. This separates config (which backends exist) from runtime (which backends are up) and keeps hot-path snapshot swaps rare.
Fanout semantics: a channel may have multiple KissTnc instances (e.g. two remote TNCs peering the same logical channel). All instances receive every frame; per-instance outcomes are recorded independently via metrics, and Dispatcher.Send returns nil iff at least one instance accepted. Governor.Sent++ is incremented once per frame (it is a submission counter, not an airtime counter); per-backend emissions live in the dispatcher's own counter.
Shutdown order (see pkg/app/wiring.go Stop path):
- Governor.Drain(ctx) — stop accepting new Submits.
- Dispatcher.StopAccepting()— refuse new Send calls.
- Backends Close(ctx) — parallel (errgroup).
- kiss.Manager.Stop() — cancels per-instance supervisors and queues.
- modembridge.Stop() — kills the Rust subprocess.
Every long-running goroutine in this package (just the watcher) exits on ctx cancellation; tests use goleak.VerifyNone(t) at teardown.
Index ¶
- Constants
- Variables
- type Backend
- type Config
- type Dispatcher
- func (d *Dispatcher) Registry() *Registry
- func (d *Dispatcher) Send(tf *pb.TransmitFrame) error
- func (d *Dispatcher) SkipCSMA(channel uint32) bool
- func (d *Dispatcher) StartWatcher(ctx context.Context, signals <-chan struct{}, build func() *Snapshot)
- func (d *Dispatcher) StopAccepting()
- func (d *Dispatcher) WaitWatcher()
- type KissInstanceSender
- type KissTncBackend
- type Metrics
- type ModemBackend
- type ModemSender
- type Registry
- type Snapshot
Constants ¶
const ( BackendNameModem = "modem" BackendNameKiss = "kiss" )
Backend is one TX sink attached to a channel. Implementations:
- ModemBackend: wraps modembridge.Bridge.SendTransmitFrame. One per graywolf instance; attached to every channel with a bound input audio device.
- KissTncBackend: wraps kiss.Manager.TransmitOnChannel for a single KissInterface row with Mode=tnc AND AllowTxFromGovernor=true.
Submit is expected to be fast and non-blocking on the hot path: ModemBackend forwards over an in-process IPC channel; KissTncBackend does a non-blocking enqueue into a per-instance bounded queue. Slow work (socket write, subprocess IPC ack) happens on a separate goroutine owned by the backend's implementation.
Close is called during shutdown (step 3 in the package godoc). It must be idempotent and respect ctx cancellation. No-op is valid for backends whose underlying transport owns its own lifecycle (the ModemBackend is a thin wrapper and returns nil immediately). Backend.Name() label values. Exported so callers (e.g. the messages wiring adapter) can identify backend kinds without coupling to a magic string. New backend kinds must define and export their label here.
const ( OutcomeOK = "ok" OutcomeErr = "err" OutcomeBackendBusy = "backend_busy" OutcomeBackendDown = "backend_down" )
Outcome enumerates per-instance Submit results recorded by the dispatcher. Values are stable — they appear as the `outcome` label on graywolf_tx_backend_submits_total.
Variables ¶
var ErrBackendBusy = errors.New("txbackend: backend busy")
ErrBackendBusy is returned by a Backend.Submit when the backend's internal bounded queue cannot accept the frame without blocking. The Dispatcher records this as outcome=backend_busy for per-instance visibility. Non-terminal — the backend remains healthy.
var ErrBackendDown = errors.New("txbackend: backend down")
ErrBackendDown is returned by a Backend.Submit when the backend is known-disconnected (e.g. KISS tcp-client supervisor in backoff). The Dispatcher records this as outcome=backend_down. Non-terminal — a later frame may succeed after reconnect.
var ErrNoBackend = errors.New("txbackend: no backend registered for channel")
ErrNoBackend is returned by Dispatcher.Send when the channel has no registered backend. The submission is dropped and the per-channel graywolf_tx_no_backend_total counter increments.
var ErrStopped = errors.New("txbackend: dispatcher stopped")
ErrStopped is returned by Dispatcher.Send after StopAccepting has been called. Terminal — callers (Governor) must drop the frame and cease submitting.
Functions ¶
This section is empty.
Types ¶
type Backend ¶
type Backend interface {
// Submit hands tf to the backend's transport. Returns ErrBackendBusy
// if the backend's internal queue is momentarily full, ErrBackendDown
// if the transport is known-disconnected, or an opaque transport
// error otherwise. Must not block on slow I/O (writes run on a
// separate goroutine).
Submit(ctx context.Context, tf *pb.TransmitFrame) error
// Name identifies the backend kind for metrics: "modem" or "kiss".
Name() string
// InstanceID is a per-backend string that uniquely identifies this
// instance across the process. For ModemBackend this is "modem";
// for KissTncBackend it is "kiss-<interfaceID>". Used as the
// `instance` label on dispatcher metrics.
InstanceID() string
// AttachedChannels returns the channel IDs this backend serves.
// Read once during snapshot rebuild; the backend must not mutate
// its attached set after being handed to the dispatcher.
AttachedChannels() []uint32
// Close releases any resources owned by this backend (bounded
// queue drain, goroutine shutdown). Idempotent. Must return when
// ctx is cancelled even if resources remain. Most backends delegate
// to upstream lifecycle objects (kiss.Manager, modembridge) whose
// own shutdown is sequenced by the wiring layer.
Close(ctx context.Context) error
}
type Config ¶
type Config struct {
// Registry is the backing snapshot store. Required. Use NewRegistry.
Registry *Registry
// Metrics records per-submit telemetry. nil → no-op recorder.
Metrics Metrics
// Logger is optional; slog.Default is used when nil.
Logger *slog.Logger
}
Config is the Dispatcher's constructor argument.
type Dispatcher ¶
type Dispatcher struct {
// contains filtered or unexported fields
}
Dispatcher routes every TX frame through the Registry snapshot to zero-or-more Backend instances, fanning out and joining per-instance errors. Single hot-path instance per process.
func New ¶
func New(cfg Config) *Dispatcher
New returns a Dispatcher. The watcher goroutine is not started automatically — callers wire it separately via StartWatcher so tests that don't need live config updates can skip it.
func (*Dispatcher) Registry ¶
func (d *Dispatcher) Registry() *Registry
Registry returns the dispatcher's backing registry. Exposed for tests and the watcher wiring in pkg/app; Send callers never need it.
func (*Dispatcher) Send ¶
func (d *Dispatcher) Send(tf *pb.TransmitFrame) error
Send is the hot-path entry point the Governor calls once per frame. It resolves the frame's channel to a set of backends, fans out, and returns the joined error. Returns nil when at least one backend accepted the frame, ErrNoBackend when the channel has no backend, ErrStopped after StopAccepting, or errors.Join(perInstance...) when every backend failed.
Send does NOT block on slow I/O: Backend implementations either hand off to an in-process IPC channel (modem) or a bounded queue (kiss). A momentarily full kiss queue returns ErrBackendBusy immediately; the dispatcher records the outcome and moves on. See the package godoc for the full fanout contract.
func (*Dispatcher) SkipCSMA ¶
func (d *Dispatcher) SkipCSMA(channel uint32) bool
SkipCSMA reports whether the given channel should bypass the governor's p-persistence / slot-time / DCD wait. True for KISS-only channels (no modem backend) since TCP links have no carrier to sense. The governor calls this once per processOne iteration immediately before the CSMA branch.
func (*Dispatcher) StartWatcher ¶
func (d *Dispatcher) StartWatcher(ctx context.Context, signals <-chan struct{}, build func() *Snapshot)
StartWatcher launches the single rebuild goroutine that consumes config-change signals and republishes snapshots. Build must return a fresh snapshot on each call; the dispatcher does not cache. The goroutine exits when ctx is cancelled.
signals is the fan-in channel the caller uses to trigger rebuilds. A send with no buffer capacity is dropped — every caller of NotifyReload does a non-blocking select, so the watcher just needs to coalesce bursts.
An initial rebuild is performed synchronously before StartWatcher returns so the first Send after startup observes a populated snapshot rather than the empty constructor default.
func (*Dispatcher) StopAccepting ¶
func (d *Dispatcher) StopAccepting()
StopAccepting marks the dispatcher closed so subsequent Send calls return ErrStopped. Idempotent; safe to call from the shutdown orchestrator. Does NOT close the watcher goroutine — that exits on its own ctx cancellation in StartWatcher.
func (*Dispatcher) WaitWatcher ¶
func (d *Dispatcher) WaitWatcher()
WaitWatcher blocks until the watcher goroutine has exited. Called by the shutdown orchestrator after ctx cancel so stop returns only once the rebuild goroutine is truly gone.
type KissInstanceSender ¶
type KissInstanceSender interface {
// Enqueue does a non-blocking send of (frame, frameID) onto the
// per-instance bounded queue. Returns ErrBackendBusy when the
// queue is full, ErrBackendDown when the writer goroutine has
// stopped, or an opaque transport error otherwise.
Enqueue(frame []byte, frameID uint64) error
}
KissInstanceSender is the minimal surface a KissTncBackend needs from the per-interface queue owned by kiss.Manager. The concrete implementation lives in pkg/kiss (instanceTxQueue); the indirection keeps txbackend free of circular dependencies on kiss internals and makes test stubs trivial.
type KissTncBackend ¶
type KissTncBackend struct {
// contains filtered or unexported fields
}
KissTncBackend is the per-interface TX backend for a KissInterface row with Mode=tnc and AllowTxFromGovernor=true. One instance per eligible interface; a channel that has two such interfaces attached gets two independent KissTncBackend instances in the snapshot so the dispatcher fans out to both.
func NewKissTncBackend ¶
func NewKissTncBackend(sender KissInstanceSender, interfaceID, channel uint32) *KissTncBackend
NewKissTncBackend constructs a backend for one KissInterface. interfaceID is the DB row ID used to build the instance label and to correlate with kiss.Manager.Status(). channel is the single channel this interface services.
func (*KissTncBackend) AttachedChannels ¶
func (k *KissTncBackend) AttachedChannels() []uint32
AttachedChannels returns the single channel this interface serves.
func (*KissTncBackend) Close ¶
func (k *KissTncBackend) Close(context.Context) error
Close is a no-op at the txbackend layer. The per-instance queue's writer goroutine is owned by kiss.Manager, which cancels it via the ctx passed to Manager.Start when the wiring layer's kissComponent stop runs. Duplicating the teardown here would race Manager.Stop.
func (*KissTncBackend) InstanceID ¶
func (k *KissTncBackend) InstanceID() string
InstanceID returns the per-interface identifier used as the `instance` metric label so operators can attribute drops to a specific KissInterface row.
func (*KissTncBackend) Name ¶
func (k *KissTncBackend) Name() string
Name returns the metric label for this backend kind.
func (*KissTncBackend) Submit ¶
func (k *KissTncBackend) Submit(_ context.Context, tf *pb.TransmitFrame) error
Submit hands off to the per-instance tx queue. The queue's writer goroutine performs the actual socket write; Submit never blocks on slow peers.
type Metrics ¶
type Metrics interface {
// ObserveTxBackendSubmit records one per-instance fan-out outcome.
ObserveTxBackendSubmit(channel uint32, backend, instance, outcome string, d time.Duration)
// ObserveTxNoBackend records a dispatcher drop because no backend
// was registered for the frame's channel.
ObserveTxNoBackend(channel uint32)
// ObserveTxFrame records the single per-frame submission counter
// (mirrors the pre-Phase-3 behaviour — one increment per governor
// Submit regardless of fan-out size).
ObserveTxFrame(channel uint32)
}
Metrics is the minimal interface the dispatcher uses to record per-submit telemetry. The concrete implementation in pkg/metrics satisfies it; tests use a fake that captures calls for assertion.
type ModemBackend ¶
type ModemBackend struct {
// contains filtered or unexported fields
}
ModemBackend is the single TX backend that routes governor-scheduled frames into the Rust modem subprocess via modembridge. One instance per graywolf process; attached to every channel whose configstore row has a non-nil InputDeviceID.
func NewModemBackend ¶
func NewModemBackend(sender ModemSender, channels []uint32) *ModemBackend
NewModemBackend constructs a backend servicing the given channel IDs. channels should be the complete set of channels with a bound input audio device at snapshot-build time; a new backend is constructed on each registry rebuild so the membership is always current.
func (*ModemBackend) AttachedChannels ¶
func (m *ModemBackend) AttachedChannels() []uint32
AttachedChannels returns the channel IDs this backend serves.
func (*ModemBackend) Close ¶
func (m *ModemBackend) Close(context.Context) error
Close is a no-op. modembridge owns its own subprocess lifecycle and is torn down by the wiring layer's bridgeComponent.stop.
func (*ModemBackend) InstanceID ¶
func (m *ModemBackend) InstanceID() string
InstanceID returns a process-wide-unique identifier for this backend. There is only ever one modem backend, so the kind label doubles as the instance label — per-channel labelling already lives in the `channel` metric label.
func (*ModemBackend) Name ¶
func (m *ModemBackend) Name() string
Name returns the metric label for this backend kind.
func (*ModemBackend) Submit ¶
func (m *ModemBackend) Submit(_ context.Context, tf *pb.TransmitFrame) error
Submit forwards tf to the modembridge. Errors from the bridge are returned verbatim — the dispatcher classifies them as OutcomeErr. Busy/Down outcomes don't apply to modembridge: it has a single IPC socket and no bounded queue; a dead subprocess surfaces as an IPC error which is recorded as a generic err.
type ModemSender ¶
type ModemSender interface {
SendTransmitFrame(*pb.TransmitFrame) error
}
ModemSender is the minimal surface the ModemBackend needs from modembridge.Bridge. Injection point keeps the package testable.
type Registry ¶
type Registry struct {
// contains filtered or unexported fields
}
Registry holds the current snapshot behind an atomic.Pointer so the hot-path Send has no lock. Writers are expected to build a complete new snapshot and Publish it — never mutate a live snapshot in place, because a reader may still hold a pointer to the old one.
func NewRegistry ¶
func NewRegistry() *Registry
NewRegistry returns a Registry initialised with an empty snapshot so Load is always safe to call.
func (*Registry) Load ¶
Load returns the current snapshot. Pointer is never nil because NewRegistry seeds the atomic with an empty snapshot.
func (*Registry) Publish ¶
Publish atomically installs s as the current snapshot. Subsequent Load calls observe s. Older snapshots remain valid until every reader that already loaded them has released them (Go GC handles the lifetime — readers should not retain snapshots beyond the span of a single Send call).
type Snapshot ¶
type Snapshot struct {
// ByChannel maps channel ID → every registered Backend serving
// that channel. The slice is typically length 1 (modem-only or a
// single kiss-tnc) but may be >1 when a KISS-TNC channel has
// multiple TNC interfaces attached. Nil / missing entry means no
// backend — Dispatcher.Send returns ErrNoBackend.
ByChannel map[uint32][]Backend
// CsmaSkip[ch] is true when the channel has zero modem backends
// (KISS-only). The governor uses this to bypass its p-persistence /
// slot-time / DCD wait: there is no carrier to sense on a TCP
// link, so CSMA math is meaningless. A channel with both a modem
// and kiss-tnc backends (forbidden by the validator but defended
// here) keeps CSMA enabled because the modem half still needs it.
CsmaSkip map[uint32]bool
}
Snapshot is the immutable per-frame view of the channel → backends map. Published by the dispatcher's watcher goroutine via one atomic store; read by every Dispatcher.Send via one atomic load. Callers MUST NOT mutate the returned maps or slices.
The type is exported so wiring code in other packages can construct a snapshot via BuildSnapshot and pass it back to the dispatcher via the StartWatcher build closure. Callers outside this package should treat the fields as read-only.
func BuildSnapshot ¶
func BuildSnapshot(modem *ModemBackend, modemChannels []uint32, kissBackends []*KissTncBackend) *Snapshot
BuildSnapshot builds a fresh Snapshot from the given raw inputs. Pure function — no I/O, no locking — so the caller (the watcher goroutine) can invoke it off the hot path and Publish the result with a single atomic store.
Inputs:
- modem: the shared ModemBackend (may be nil if modembridge isn't wired in tests; the snapshot will then have zero modem backends).
- modemChannels: the set of channel IDs with a bound input audio device. Used to populate the modem backend's ByChannel entries even though ModemBackend exposes AttachedChannels itself — we want the caller to be explicit so test wiring can exercise partial configurations without stubbing a full modem backend.
- kissBackends: one *KissTncBackend per eligible KissInterface row. Membership is decided by the caller per D4: Mode=tnc AND AllowTxFromGovernor=true.
CSMA skip is computed per channel: true iff the channel has zero modem backends (no carrier to sense).