Documentation
¶
Overview ¶
Package lplex is a CAN bus HTTP bridge for NMEA 2000.
It reads raw CAN frames from a SocketCAN interface, reassembles fast-packets, tracks device discovery, and streams frames to clients over SSE with session management, filtering, and replay.
The package can be embedded into other Go services. Create a Broker to manage frame routing, a Server to expose the HTTP API, and optionally a JournalWriter to record frames to disk.
broker := lplex.NewBroker(lplex.BrokerConfig{
RingSize: 65536,
MaxBufferDuration: 5 * time.Minute,
Logger: logger,
})
go broker.Run()
srv := lplex.NewServer(broker, logger, lplex.SendPolicy{})
mux.Handle("/nmea/", http.StripPrefix("/nmea", srv))
Feed frames into the broker via Broker.RxFrames:
broker.RxFrames() <- lplex.RxFrame{
Timestamp: time.Now(),
Header: lplex.CANHeader{Priority: 2, PGN: 129025, Source: 10, Destination: 0xFF},
Data: payload,
}
When done, close the rx channel; the broker goroutine then exits:
broker.CloseRx()
Index ¶
- Constants
- Variables
- func CANReader(ctx context.Context, iface string, rxFrames chan<- RxFrame, ...) error
- func CANWriter(ctx context.Context, iface string, txFrames <-chan TxRequest, ...) error
- func FragmentFastPacket(data []byte, seqCounter uint8) [][]byte
- func HealthHandler(cfg HealthConfig) http.HandlerFunc
- func IsFastPacket(pgnNum uint32) bool
- func MetricsHandler(broker *Broker, replStatus func() *ReplicationStatus) http.HandlerFunc
- func ParseISO8601Duration(s string) (time.Duration, error)
- type ArchiveTrigger
- type BlockWriter
- type BlockWriterConfig
- type Broker
- func (b *Broker) AckSession(id string, seq uint64) error
- func (b *Broker) CloseRx()
- func (b *Broker) CreateSession(id string, bufferTimeout time.Duration, filter *EventFilter) (*ClientSession, uint64)
- func (b *Broker) CurrentSeq() uint64
- func (b *Broker) Devices() *DeviceRegistry
- func (b *Broker) Done() <-chan struct{}
- func (b *Broker) GetSession(id string) *ClientSession
- func (b *Broker) LastFrameTime() time.Time
- func (b *Broker) NewConsumer(cfg ConsumerConfig) *Consumer
- func (b *Broker) Run()
- func (b *Broker) RxFrames() chan<- RxFrame
- func (b *Broker) SendISORequest(dst uint8, pgn uint32) error
- func (b *Broker) SetJournal(ch chan<- RxFrame)
- func (b *Broker) Stats() BrokerStats
- func (b *Broker) Subscribe(filter *EventFilter) (*subscriber, func())
- func (b *Broker) TouchSession(id string)
- func (b *Broker) TxFrames() <-chan TxRequest
- func (b *Broker) Values() *ValueStore
- type BrokerConfig
- type BrokerHealth
- type BrokerStats
- type BusSilenceMonitor
- type ByteMaskDiff
- type CANHeader
- type ChangeEvent
- type ChangeEventType
- type ChangeReplayer
- type ChangeTracker
- func (ct *ChangeTracker) Process(ts time.Time, source uint8, pgnID uint32, data []byte, seq uint64) *ChangeEvent
- func (ct *ChangeTracker) Remove(source uint8, pgnID uint32, subKey uint64)
- func (ct *ChangeTracker) Reset()
- func (ct *ChangeTracker) Tick(now time.Time) []ChangeEvent
- func (ct *ChangeTracker) TrackedPairs() int
- type ChangeTrackerConfig
- type ClientSession
- type ComponentHealth
- type Consumer
- type ConsumerConfig
- type DecodedDeviceValues
- type DecodedPGNValue
- type Device
- type DeviceRegistry
- func (r *DeviceRegistry) Get(source uint8) *Device
- func (r *DeviceRegistry) HandleAddressClaim(source uint8, data []byte) *Device
- func (r *DeviceRegistry) HandleProductInfo(source uint8, data []byte) *Device
- func (r *DeviceRegistry) RecordPacket(source uint8, ts time.Time, dataLen int) bool
- func (r *DeviceRegistry) Snapshot() []Device
- func (r *DeviceRegistry) SnapshotJSON() json.RawMessage
- func (r *DeviceRegistry) SynthesizeFrames(ts time.Time) []RxFrame
- type DeviceValues
- type DiffMethod
- type EventFilter
- type EventLog
- type FastPacketAssembler
- type FieldTolerance
- type FieldToleranceDiff
- type Frame
- type HealthConfig
- type HealthStatus
- type HoleTracker
- type InstanceManager
- func (im *InstanceManager) Get(id string) *InstanceState
- func (im *InstanceManager) GetOrCreate(id string) *InstanceState
- func (im *InstanceManager) List() []InstanceSummary
- func (im *InstanceManager) SetInstancePaused(instanceID string, paused bool)
- func (im *InstanceManager) SetJournalRotation(duration time.Duration, size int64)
- func (im *InstanceManager) SetOnRotate(fn func(instanceID string, rf RotatedFile))
- func (im *InstanceManager) Shutdown()
- type InstanceState
- type InstanceStatus
- type InstanceSummary
- type JournalConfig
- type JournalKeeper
- type JournalWriter
- type KeeperConfig
- type KeeperDir
- type OverflowPolicy
- type PGNMatcher
- type PGNValue
- type ReplicationClient
- type ReplicationClientConfig
- type ReplicationEvent
- type ReplicationEventType
- type ReplicationHealth
- type ReplicationServer
- func (s *ReplicationServer) Backfill(stream pb.Replication_BackfillServer) error
- func (s *ReplicationServer) GetInstanceBroker(instanceID string) *Broker
- func (s *ReplicationServer) GetInstanceState(instanceID string) *InstanceState
- func (s *ReplicationServer) Handshake(ctx context.Context, req *pb.HandshakeRequest) (*pb.HandshakeResponse, error)
- func (s *ReplicationServer) Live(stream pb.Replication_LiveServer) error
- type ReplicationStatus
- type RotatedFile
- type RxFrame
- type SendPolicy
- type SendRule
- type SeqRange
- type Server
- type SubKeyFunc
- type SyncState
- type TxRequest
- type ValueStore
- func (vs *ValueStore) DecodedSnapshot(devices *DeviceRegistry, filter *EventFilter) []DecodedDeviceValues
- func (vs *ValueStore) DecodedSnapshotJSON(devices *DeviceRegistry, filter *EventFilter) json.RawMessage
- func (vs *ValueStore) Record(source uint8, pgn uint32, ts time.Time, data []byte, seq uint64)
- func (vs *ValueStore) Snapshot(devices *DeviceRegistry, filter *EventFilter) []DeviceValues
- func (vs *ValueStore) SnapshotJSON(devices *DeviceRegistry, filter *EventFilter) json.RawMessage
Constants ¶
const DefaultLagCheckInterval = 1000
DefaultLagCheckInterval controls how often the boat checks for lag in the live send loop. The check runs every N frames sent rather than on a wall-clock interval because, when lagging, consumer.Next() returns instantly and the loop spins at CPU speed. At max bus rate (2000 fps) this checks roughly every 0.5s.
const DefaultMaxFrameRate = 2000
DefaultMaxFrameRate is the maximum frame rate we allow, in frames per second. NMEA 2000 runs on CAN 2.0B at 250 kbit/s. An extended frame (29-bit ID, 8-byte payload) is roughly 131-157 bits depending on bit stuffing, so the theoretical max is ~1800 frames/sec. We allow 2000 to give ~10% headroom for measurement jitter and bit-stuffing variance.
const DefaultMaxLiveLag uint64 = 10_000
DefaultMaxLiveLag is the frame count threshold for live lag detection. If the live stream falls this far behind the broker head (boat-side) or the boat's reported head (cloud-side), the stream is killed and the gap switches to backfill mode. ~5 seconds at max bus rate.
const DefaultMinLagReconnectInterval = 30 * time.Second
DefaultMinLagReconnectInterval prevents thrashing when the system is persistently overloaded. If lag keeps recurring, we wait at least this long between lag-triggered reconnects.
const DefaultRateBurst = 500
DefaultRateBurst is the burst allowance for transient spikes. Power-on storms (every device announces simultaneously) can briefly exceed the sustained rate. 500 frames absorbs a ~250ms burst at max bus load.
Variables ¶
var ( ParseCANID = canbus.ParseCANID BuildCANID = canbus.BuildCANID )
var ErrFallenBehind = errors.New("consumer fallen behind: data no longer available")
ErrFallenBehind is returned by Consumer.Next when the consumer's cursor has fallen behind both the ring buffer and available journal files.
Functions ¶
func CANReader ¶
func CANReader(ctx context.Context, iface string, rxFrames chan<- RxFrame, logger *slog.Logger) error
CANReader reads frames from SocketCAN, reassembles fast-packets, and sends completed frames to the broker's rxFrames channel.
func CANWriter ¶
func CANWriter(ctx context.Context, iface string, txFrames <-chan TxRequest, logger *slog.Logger) error
CANWriter reads from the broker's txFrames channel and writes to SocketCAN. Handles fast-packet fragmentation for payloads > 8 bytes.
func FragmentFastPacket ¶
FragmentFastPacket splits a payload into CAN frames for fast-packet TX. seqCounter should be incremented per transfer (it is a 3-bit field, so it wraps after 7). Returns a slice of 8-byte CAN frame payloads.
func HealthHandler ¶ added in v0.3.1
func HealthHandler(cfg HealthConfig) http.HandlerFunc
HealthHandler returns an http.HandlerFunc that serves the /healthz endpoint.
func IsFastPacket ¶
IsFastPacket returns true if the PGN uses fast-packet transfer.
func MetricsHandler ¶ added in v0.3.1
func MetricsHandler(broker *Broker, replStatus func() *ReplicationStatus) http.HandlerFunc
MetricsHandler returns an http.HandlerFunc that serves Prometheus-format metrics from the broker's Stats(). Optional ReplicationStatus can be provided via a callback for replication-aware deployments.
Types ¶
type ArchiveTrigger ¶
type ArchiveTrigger int
ArchiveTrigger controls when journal files are archived.
const (
ArchiveDisabled ArchiveTrigger = iota
ArchiveOnRotate // archive immediately after rotation
ArchiveBeforeExpire // archive only when about to be deleted by retention
)
func ParseArchiveTrigger ¶
func ParseArchiveTrigger(s string) (ArchiveTrigger, error)
ParseArchiveTrigger parses a string into an ArchiveTrigger.
func (ArchiveTrigger) String ¶
func (t ArchiveTrigger) String() string
String returns the string representation of an ArchiveTrigger.
type BlockWriter ¶
type BlockWriter struct {
// contains filtered or unexported fields
}
BlockWriter appends pre-encoded journal blocks to .lpj files. Unlike JournalWriter, it receives blocks that are already serialized (with CRC, device table, frame data). Used by the cloud replication server to write backfill blocks byte-for-byte without decompression or re-encoding.
func NewBlockWriter ¶
func NewBlockWriter(cfg BlockWriterConfig) (*BlockWriter, error)
NewBlockWriter creates a new BlockWriter. Call AppendBlock to write blocks. Call Close when done to finalize the current file.
func (*BlockWriter) AppendBlock ¶
func (w *BlockWriter) AppendBlock(baseSeq uint64, baseTimeUs int64, data []byte, compressed bool) error
AppendBlock writes a pre-encoded block to the current journal file. For compressed blocks, the data is the compressed payload (written with a 20-byte v2 header). For uncompressed blocks, data is the full block bytes (must be exactly BlockSize with valid CRC).
func (*BlockWriter) Close ¶
func (w *BlockWriter) Close() error
Close finalizes the current file (writes block index for compressed files, syncs, and closes). Safe to call multiple times or on a writer with no open file.
type BlockWriterConfig ¶
type BlockWriterConfig struct {
Dir string
Prefix string // default: "nmea2k"
BlockSize int // uncompressed block size (from source journal)
Compression journal.CompressionType
RotateDuration time.Duration // 0 = no limit
RotateSize int64 // 0 = no limit
OnRotate func(RotatedFile) // called after a journal file is closed by rotation
Logger *slog.Logger
}
BlockWriterConfig configures a BlockWriter.
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker is the central coordinator. A single goroutine reads from rxFrames, assigns sequence numbers, appends to the ring buffer, updates the device registry, and fans out to client sessions and ephemeral subscribers.
func NewBroker ¶
func NewBroker(cfg BrokerConfig) *Broker
NewBroker creates a new broker with the given config.
func (*Broker) AckSession ¶
AckSession updates the cursor for a session.
func (*Broker) CloseRx ¶
func (b *Broker) CloseRx()
CloseRx closes the rxFrames channel, signaling the broker to stop processing. Wait on Done() to know when the broker goroutine has actually exited.
func (*Broker) CreateSession ¶
func (b *Broker) CreateSession(id string, bufferTimeout time.Duration, filter *EventFilter) (*ClientSession, uint64)
CreateSession creates or retrieves a client session. Returns the session and the current head sequence number.
When bufferTimeout is 0, the session cursor is reset so no frames are replayed on the next connect (fresh start).
func (*Broker) CurrentSeq ¶
CurrentSeq returns the most recently assigned sequence number.
func (*Broker) Devices ¶
func (b *Broker) Devices() *DeviceRegistry
Devices returns the broker's device registry.
func (*Broker) Done ¶
func (b *Broker) Done() <-chan struct{}
Done returns a channel that is closed when the broker's Run() method returns.
func (*Broker) GetSession ¶
func (b *Broker) GetSession(id string) *ClientSession
GetSession returns a session by ID, or nil if not found.
func (*Broker) LastFrameTime ¶ added in v0.3.1
LastFrameTime returns the timestamp of the most recently received frame, or zero if no frames have been received yet.
func (*Broker) NewConsumer ¶
func (b *Broker) NewConsumer(cfg ConsumerConfig) *Consumer
NewConsumer creates a pull-based consumer starting at the given cursor. The consumer is registered with the broker for live notifications.
func (*Broker) Run ¶
func (b *Broker) Run()
Run is the broker's main loop. Call in its own goroutine. Exits when rxFrames is closed.
func (*Broker) RxFrames ¶
RxFrames returns the channel for submitting received CAN frames to the broker.
func (*Broker) SendISORequest ¶ added in v0.3.1
SendISORequest sends an ISO Request (PGN 59904) to the given destination, asking it to transmit the specified PGN. Returns an error if the tx queue is full.
func (*Broker) SetJournal ¶
SetJournal sets the journal channel. Must be called before Run.
func (*Broker) Stats ¶ added in v0.3.1
func (b *Broker) Stats() BrokerStats
Stats returns a point-in-time snapshot of broker metrics.
func (*Broker) Subscribe ¶
func (b *Broker) Subscribe(filter *EventFilter) (*subscriber, func())
Subscribe creates an ephemeral fan-out channel with the given filter. Returns the subscriber and a cleanup function that must be called on disconnect.
func (*Broker) TouchSession ¶
TouchSession updates the LastActivity timestamp for a session.
func (*Broker) Values ¶
func (b *Broker) Values() *ValueStore
Values returns the broker's last-values store.
type BrokerConfig ¶
type BrokerConfig struct {
RingSize int // must be power of 2
MaxBufferDuration time.Duration // cap on client buffer_timeout
JournalDir string // directory containing .lpj files (for consumer journal fallback)
Logger *slog.Logger
// ReplicaMode makes the broker honor frame.Seq instead of auto-incrementing.
// Used by the cloud replication server where sequence numbers originate
// from the boat's broker.
ReplicaMode bool
// InitialHead sets the starting head value. Use this when resuming a
// replica broker from persisted state so the ring starts at the right
// position. Zero means start at 1 (the default).
InitialHead uint64
}
BrokerConfig holds broker configuration.
type BrokerHealth ¶ added in v0.3.1
type BrokerHealth struct {
Status string `json:"status"` // "ok" or "unhealthy"
FramesTotal uint64 `json:"frames_total"`
HeadSeq uint64 `json:"head_seq"`
LastFrameTime time.Time `json:"last_frame_time,omitempty"`
DeviceCount int `json:"device_count"`
RingEntries uint64 `json:"ring_entries"`
RingCapacity int `json:"ring_capacity"`
}
BrokerHealth reports the broker's health.
type BrokerStats ¶ added in v0.3.1
type BrokerStats struct {
FramesTotal uint64 // total frames processed
LastFrameTime time.Time // timestamp of most recent frame (zero if none)
RingEntries uint64 // current entries in ring buffer
RingCapacity int // ring buffer size
HeadSeq uint64 // next sequence number
ActiveSessions int // buffered client sessions
ActiveSubscribers int // ephemeral SSE subscribers
ActiveConsumers int // pull-based consumers
DeviceCount int // discovered NMEA 2000 devices
}
BrokerStats is a point-in-time snapshot of broker metrics.
type BusSilenceMonitor ¶ added in v0.3.1
type BusSilenceMonitor struct {
// contains filtered or unexported fields
}
BusSilenceMonitor watches for periods of CAN bus inactivity and logs alerts when no frames have been received for a configurable duration. This helps detect CAN bus disconnection or power issues on the boat.
func NewBusSilenceMonitor ¶ added in v0.3.1
func NewBusSilenceMonitor(timeout time.Duration, broker *Broker, logger *slog.Logger) *BusSilenceMonitor
NewBusSilenceMonitor creates a monitor that alerts when no frames have been received for the given timeout duration. The timeout must be positive.
func (*BusSilenceMonitor) IsSilent ¶ added in v0.3.1
func (m *BusSilenceMonitor) IsSilent() bool
IsSilent reports whether the bus is currently in a silence state.
func (*BusSilenceMonitor) Run ¶ added in v0.3.1
func (m *BusSilenceMonitor) Run(ctx context.Context)
Run checks for bus silence periodically and logs warnings. It exits when ctx is cancelled.
type ByteMaskDiff ¶ added in v0.3.1
type ByteMaskDiff struct{}
ByteMaskDiff is the default diff method. Any byte-level change is significant.
Short format (packets <= 8 bytes):
[mask:1] [changed bytes...]
Extended format (packets > 8 bytes):
[maskLen:2 LE] [mask bytes...] [changed bytes...]
func (ByteMaskDiff) Apply ¶ added in v0.3.1
func (ByteMaskDiff) Apply(prev, diff []byte) []byte
type ChangeEvent ¶ added in v0.3.1
type ChangeEvent struct {
Type ChangeEventType
Timestamp time.Time
Source uint8
PGN uint32
SubKey uint64
Seq uint64
Data []byte // Full data for Snapshot, diff for Delta, nil for Idle.
}
ChangeEvent is emitted by the ChangeTracker when a meaningful state change (or silence) is detected for a (source, PGN, subkey) tracking key.
type ChangeEventType ¶ added in v0.3.1
type ChangeEventType uint8
ChangeEventType identifies the kind of change event.
const (
// Snapshot is the first observation for a key. Contains full packet data.
Snapshot ChangeEventType = 1
// Delta means a significant change was detected. Contains a compact diff.
Delta ChangeEventType = 2
// Idle means a source stopped producing for this key.
Idle ChangeEventType = 3
)
func (ChangeEventType) String ¶ added in v0.3.1
func (t ChangeEventType) String() string
type ChangeReplayer ¶ added in v0.3.1
type ChangeReplayer struct {
// contains filtered or unexported fields
}
ChangeReplayer reconstructs full packet data from a stream of ChangeEvents. Maintains per-key state and applies diffs to recover the original packets.
func NewChangeReplayer ¶ added in v0.3.1
func NewChangeReplayer(methods map[uint32]DiffMethod, subKeys map[uint32]SubKeyFunc) *ChangeReplayer
NewChangeReplayer creates a replayer. Pass the same methods and subkeys config used by the tracker that produced the events.
func (*ChangeReplayer) Apply ¶ added in v0.3.1
func (r *ChangeReplayer) Apply(event ChangeEvent) ([]byte, error)
Apply processes a ChangeEvent and returns the reconstructed full packet data. Returns nil for Idle events (state is preserved but no data emitted). Returns an error if a Delta arrives without a prior Snapshot.
type ChangeTracker ¶ added in v0.3.1
type ChangeTracker struct {
// contains filtered or unexported fields
}
ChangeTracker tracks per-(source, PGN, subkey) state and emits compact change events. Not goroutine-safe; designed for single-goroutine callers (like the broker's handleFrame).
func NewChangeTracker ¶ added in v0.3.1
func NewChangeTracker(cfg ChangeTrackerConfig) *ChangeTracker
NewChangeTracker creates a ChangeTracker with the given configuration. Automatically wires FieldToleranceDiff for any PGN in the registry that declares field-level tolerances, unless an explicit method is already set.
func (*ChangeTracker) Process ¶ added in v0.3.1
func (ct *ChangeTracker) Process(ts time.Time, source uint8, pgnID uint32, data []byte, seq uint64) *ChangeEvent
Process handles an incoming frame and returns a ChangeEvent if the frame represents a meaningful state change. Returns nil for no-ops (unchanged data within tolerance).
func (*ChangeTracker) Remove ¶ added in v0.3.1
func (ct *ChangeTracker) Remove(source uint8, pgnID uint32, subKey uint64)
Remove removes tracking state for a specific key.
func (*ChangeTracker) Reset ¶ added in v0.3.1
func (ct *ChangeTracker) Reset()
Reset clears all tracked state.
func (*ChangeTracker) Tick ¶ added in v0.3.1
func (ct *ChangeTracker) Tick(now time.Time) []ChangeEvent
Tick checks all tracked pairs for idle timeouts and returns Idle events for any that have exceeded their timeout. Call this periodically.
func (*ChangeTracker) TrackedPairs ¶ added in v0.3.1
func (ct *ChangeTracker) TrackedPairs() int
TrackedPairs returns the number of actively tracked (source, PGN, subkey) pairs.
type ChangeTrackerConfig ¶ added in v0.3.1
type ChangeTrackerConfig struct {
// DefaultMethod is the diff method used for PGNs without a specific override.
// Nil defaults to ByteMaskDiff.
DefaultMethod DiffMethod
// Methods maps specific PGNs to custom diff methods.
Methods map[uint32]DiffMethod
// SubKeys maps specific PGNs to sub-key extractor functions for
// multiplexed PGNs (e.g., Victron registers on PGN 61184).
SubKeys map[uint32]SubKeyFunc
// DefaultIdleTimeout is the fallback idle timeout when no PGN-specific
// timeout can be resolved. Defaults to 5s.
DefaultIdleTimeout time.Duration
// IdleMultiplier is applied to the PGN registry interval to compute
// the idle timeout. Defaults to 3.
IdleMultiplier int
// IdleTimeouts maps specific PGNs to explicit idle timeout overrides.
IdleTimeouts map[uint32]time.Duration
}
ChangeTrackerConfig configures the ChangeTracker.
type ClientSession ¶
type ClientSession struct {
ID string
Cursor uint64 // last ACK'd sequence number (0 = never ACK'd)
BufferTimeout time.Duration // how long to keep buffering after disconnect
LastActivity time.Time
Filter *EventFilter // nil = receive all frames
}
ClientSession tracks a buffered client's metadata for persistence across HTTP reconnects. The actual frame reading is done by Consumer.
type ComponentHealth ¶ added in v0.3.1
type ComponentHealth struct {
Status string `json:"status"`
Message string `json:"message,omitempty"`
}
ComponentHealth is a generic component status.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a pull-based reader that iterates frames at its own pace. It reads from a tiered log: journal files on disk (oldest), ring buffer in memory (recent), and live notification (current head, blocking wait).
func (*Consumer) Close ¶
Close stops the consumer and removes it from the broker. Safe to call multiple times.
type ConsumerConfig ¶
type ConsumerConfig struct {
Cursor uint64 // starting position (next seq to read)
Filter *EventFilter // nil = all frames
}
ConsumerConfig configures a new Consumer.
type DecodedDeviceValues ¶ added in v0.3.1
type DecodedDeviceValues struct {
Name string `json:"name"`
Source uint8 `json:"src"`
Manufacturer string `json:"manufacturer,omitempty"`
ModelID string `json:"model_id,omitempty"`
Values []DecodedPGNValue `json:"values"`
}
DecodedDeviceValues groups decoded PGN values by device.
type DecodedPGNValue ¶ added in v0.3.1
type DecodedPGNValue struct {
PGN uint32 `json:"pgn"`
Description string `json:"description"`
Ts string `json:"ts"`
Seq uint64 `json:"seq"`
Fields any `json:"fields"`
}
DecodedPGNValue is a single PGN's last-known value decoded into named fields.
type Device ¶
type Device struct {
Source uint8 `json:"src"`
NAME uint64 `json:"-"`
NAMEHex string `json:"name"`
Manufacturer string `json:"manufacturer"`
ManufacturerCode uint16 `json:"manufacturer_code"`
DeviceClass uint8 `json:"device_class"`
DeviceFunction uint8 `json:"device_function"`
DeviceInstance uint8 `json:"device_instance"`
UniqueNumber uint32 `json:"unique_number,omitempty"`
// PGN 126996 Product Information fields.
ModelID string `json:"model_id,omitempty"`
SoftwareVersion string `json:"software_version,omitempty"`
ModelVersion string `json:"model_version,omitempty"`
ModelSerial string `json:"model_serial,omitempty"`
ProductCode uint16 `json:"product_code,omitempty"`
// Per-source packet statistics.
FirstSeen time.Time `json:"first_seen"`
LastSeen time.Time `json:"last_seen"`
PacketCount uint64 `json:"packet_count"`
ByteCount uint64 `json:"byte_count"`
}
Device represents an NMEA 2000 device discovered via ISO Address Claim (PGN 60928) and optionally enriched with Product Information (PGN 126996).
type DeviceRegistry ¶
type DeviceRegistry struct {
// contains filtered or unexported fields
}
DeviceRegistry tracks NMEA 2000 devices discovered via PGN 60928. Thread-safe for concurrent reads (SSE streams) and writes (broker goroutine).
func NewDeviceRegistry ¶
func NewDeviceRegistry() *DeviceRegistry
NewDeviceRegistry creates an empty device registry.
func (*DeviceRegistry) Get ¶
func (r *DeviceRegistry) Get(source uint8) *Device
Get returns a snapshot of the device at the given source address, or nil.
func (*DeviceRegistry) HandleAddressClaim ¶
func (r *DeviceRegistry) HandleAddressClaim(source uint8, data []byte) *Device
HandleAddressClaim processes a PGN 60928 ISO Address Claim. Returns the device if this is a new or changed device, nil otherwise.
func (*DeviceRegistry) HandleProductInfo ¶
func (r *DeviceRegistry) HandleProductInfo(source uint8, data []byte) *Device
HandleProductInfo processes a PGN 126996 Product Information response. Returns the device if fields changed, nil if source is unknown or unchanged.
func (*DeviceRegistry) RecordPacket ¶
RecordPacket updates per-source packet statistics. Returns true if this is a previously unseen source address.
func (*DeviceRegistry) Snapshot ¶
func (r *DeviceRegistry) Snapshot() []Device
Snapshot returns a copy of all known devices.
func (*DeviceRegistry) SnapshotJSON ¶
func (r *DeviceRegistry) SnapshotJSON() json.RawMessage
SnapshotJSON returns the device list as pre-serialized JSON.
func (*DeviceRegistry) SynthesizeFrames ¶
func (r *DeviceRegistry) SynthesizeFrames(ts time.Time) []RxFrame
SynthesizeFrames generates a slice of RxFrames for PGN 60928 (Address Claim) and PGN 126996 (Product Info) from all known devices. Used to seed a remote broker's device registry on live stream connect.
type DeviceValues ¶
type DeviceValues struct {
Name string `json:"name"`
Source uint8 `json:"src"`
Manufacturer string `json:"manufacturer,omitempty"`
ModelID string `json:"model_id,omitempty"`
Values []PGNValue `json:"values"`
}
DeviceValues groups PGN values by device in the JSON response.
type DiffMethod ¶ added in v0.3.1
type DiffMethod interface {
// Diff compares prev and curr, returning whether the change is significant
// and a compact diff encoding. If not significant, diff is nil.
Diff(prev, curr []byte) (significant bool, diff []byte)
// Apply reconstructs curr from prev and a diff produced by Diff.
Apply(prev, diff []byte) []byte
}
DiffMethod computes and applies compact binary diffs between packet payloads.
type EventFilter ¶
type EventFilter struct {
PGNs []uint32
ExcludePGNs []uint32
Manufacturers []string
Instances []uint8
Names []uint64 // 64-bit CAN NAMEs (include)
ExcludeNames []uint64 // 64-bit CAN NAMEs (exclude)
}
EventFilter specifies which CAN frames a session receives. Categories are AND'd (all set categories must match), values within a category are OR'd (any value in the list matches).
func ParseFilterParams ¶
func ParseFilterParams(r *http.Request) (*EventFilter, error)
ParseFilterParams reads optional filter query params from a request. Supported params: pgn (decimal), manufacturer (name or code), instance (decimal), name (hex CAN NAME). Returns nil filter if no params are set.
func (*EventFilter) IsEmpty ¶
func (f *EventFilter) IsEmpty() bool
IsEmpty returns true if no filter criteria are set.
type EventLog ¶
type EventLog struct {
// contains filtered or unexported fields
}
EventLog is a fixed-size ring buffer of replication events.
func (*EventLog) Recent ¶
func (l *EventLog) Recent(n int) []ReplicationEvent
Recent returns up to n events, newest first.
type FastPacketAssembler ¶
type FastPacketAssembler struct {
// contains filtered or unexported fields
}
FastPacketAssembler reassembles multi-frame fast-packet PGNs.
Fast-packet protocol:
- Frame 0: byte[0] = seq_counter(3 bits) | frame_number(5 bits), byte[1] = total_bytes, bytes[2:8] = first 6 data bytes
- Frame N: byte[0] = seq_counter(3 bits) | frame_number(5 bits), bytes[1:8] = next 7 data bytes
func NewFastPacketAssembler ¶
func NewFastPacketAssembler(timeout time.Duration) *FastPacketAssembler
NewFastPacketAssembler creates a new assembler with the given reassembly timeout.
func (*FastPacketAssembler) Process ¶
Process handles a CAN frame that is part of a fast-packet transfer. Returns the complete reassembled payload when all frames are received, nil otherwise.
func (*FastPacketAssembler) PurgeStale ¶
func (a *FastPacketAssembler) PurgeStale(now time.Time)
PurgeStale removes any in-progress assemblies older than the timeout.
type FieldTolerance ¶ added in v0.3.1
FieldTolerance defines a tolerance threshold for a named field. Changes within the tolerance are suppressed (not emitted as significant).
type FieldToleranceDiff ¶ added in v0.3.1
type FieldToleranceDiff struct {
Inner DiffMethod
PGN uint32
Decode func([]byte) (any, error)
Tolerances []FieldTolerance
}
FieldToleranceDiff wraps an inner DiffMethod and uses PGN decode functions plus reflection to check field-level tolerances. If all changed fields are within their tolerance, the change is suppressed. The encoding is always delegated to the inner method (tolerances only gate significance).
The baseline is NOT updated when a change is suppressed, preventing tolerance drift over time.
func (*FieldToleranceDiff) Apply ¶ added in v0.3.1
func (f *FieldToleranceDiff) Apply(prev, diff []byte) []byte
type Frame ¶
type Frame struct {
Seq uint64
Timestamp time.Time
Header CANHeader
Data []byte // raw CAN payload
// contains filtered or unexported fields
}
Frame is a single CAN frame returned by Consumer.Next.
type HealthConfig ¶ added in v0.3.1
type HealthConfig struct {
Broker *Broker
ReplStatus func() *ReplicationStatus // nil if replication not configured
// BusSilenceThreshold is the duration after which no frames indicates
// a CAN bus problem. Zero disables bus silence detection.
BusSilenceThreshold time.Duration
}
HealthConfig configures the health check endpoint.
type HealthStatus ¶ added in v0.3.1
type HealthStatus struct {
Status string `json:"status"` // "ok", "degraded", or "unhealthy"
Broker BrokerHealth `json:"broker"`
Replication *ReplicationHealth `json:"replication,omitempty"`
Components map[string]ComponentHealth `json:"components,omitempty"`
}
HealthStatus is the structured response from the /healthz endpoint.
type HoleTracker ¶
type HoleTracker struct {
// contains filtered or unexported fields
}
HoleTracker tracks gaps (holes) in a sequence number space. Holes are non-overlapping, sorted by Start, and represent ranges of missing data.
Typical case: 0-3 holes. All operations are linear on the slice, which is perfectly fine for the expected cardinality.
func NewHoleTracker ¶
func NewHoleTracker() *HoleTracker
NewHoleTracker creates an empty hole tracker.
func (*HoleTracker) Add ¶
func (h *HoleTracker) Add(start, end uint64)
Add inserts a new hole [start, end). Merges with any overlapping or adjacent holes. No-op if start >= end.
func (*HoleTracker) ContinuousThrough ¶
func (h *HoleTracker) ContinuousThrough(cursor uint64) uint64
ContinuousThrough returns the highest sequence number with no holes below it. Given a base cursor and the hole set, this is the seq just before the first hole (or the cursor itself if no holes exist before it).
Example: cursor=100, holes=[(200,300)] -> returns 199 (continuous through 199)
Example: cursor=100, holes=[(100,200)] -> returns 99 (hole starts at cursor)
Example: cursor=100, no holes -> returns max uint64 (no bound from holes)
func (*HoleTracker) Fill ¶
func (h *HoleTracker) Fill(start, end uint64) bool
Fill marks [start, end) as received, removing that range from any holes. Returns true if any holes were actually affected.
func (*HoleTracker) Holes ¶
func (h *HoleTracker) Holes() []SeqRange
Holes returns a copy of current holes, sorted by Start.
func (*HoleTracker) TotalMissing ¶
func (h *HoleTracker) TotalMissing() uint64
TotalMissing returns the total number of missing sequence numbers across all holes.
type InstanceManager ¶
type InstanceManager struct {
// contains filtered or unexported fields
}
InstanceManager manages per-instance state on the cloud side.
func NewInstanceManager ¶
func NewInstanceManager(dataDir string, logger *slog.Logger) (*InstanceManager, error)
NewInstanceManager creates a new instance manager, loading any persisted state.
func (*InstanceManager) Get ¶
func (im *InstanceManager) Get(id string) *InstanceState
Get returns the instance state, or nil if not found.
func (*InstanceManager) GetOrCreate ¶
func (im *InstanceManager) GetOrCreate(id string) *InstanceState
GetOrCreate returns the instance state, creating it if necessary.
func (*InstanceManager) List ¶
func (im *InstanceManager) List() []InstanceSummary
List returns a snapshot of all instance IDs and their basic state.
func (*InstanceManager) SetInstancePaused ¶
func (im *InstanceManager) SetInstancePaused(instanceID string, paused bool)
SetInstancePaused pauses or unpauses journal writing for a specific instance. Used by the JournalKeeper overflow policy to stop/resume writes.
func (*InstanceManager) SetJournalRotation ¶ added in v0.3.1
func (im *InstanceManager) SetJournalRotation(duration time.Duration, size int64)
SetJournalRotation configures rotation for live journal writers. Must be called before any connections are accepted. Retroactively updates all existing instances loaded at startup.
func (*InstanceManager) SetOnRotate ¶
func (im *InstanceManager) SetOnRotate(fn func(instanceID string, rf RotatedFile))
SetOnRotate sets a callback invoked when any instance's journal or backfill file is rotated. Used by the cloud binary to feed the JournalKeeper. Must be called before any connections are accepted. Retroactively updates all existing instances loaded at startup.
func (*InstanceManager) Shutdown ¶
func (im *InstanceManager) Shutdown()
Shutdown stops all instance brokers and persists state.
type InstanceState ¶
type InstanceState struct {
ID string `json:"id"`
Cursor uint64 `json:"cursor"` // continuous data through this seq
BoatHeadSeq uint64 `json:"boat_head_seq"` // last reported by boat
BoatJournalBytes uint64 `json:"boat_journal_bytes"`
LastSeen time.Time `json:"last_seen"`
Connected bool `json:"-"`
HoleTracker *HoleTracker `json:"-"`
// Persisted hole state
PersistedHoles []SeqRange `json:"holes,omitempty"`
// contains filtered or unexported fields
}
InstanceState tracks the replication state for a single boat instance on the cloud side. Thread-safe.
func (*InstanceState) RecentEvents ¶
func (s *InstanceState) RecentEvents(n int) []ReplicationEvent
RecentEvents returns up to n recent replication events, newest first.
func (*InstanceState) RecordEvent ¶
func (s *InstanceState) RecordEvent(typ ReplicationEventType, detail map[string]any)
RecordEvent appends a diagnostic event to this instance's event log.
func (*InstanceState) Status ¶
func (s *InstanceState) Status() InstanceStatus
Status returns a thread-safe snapshot of this instance's replication state.
type InstanceStatus ¶
type InstanceStatus struct {
ID string `json:"id"`
Connected bool `json:"connected"`
Cursor uint64 `json:"cursor"`
BoatHeadSeq uint64 `json:"boat_head_seq"`
BoatJournalBytes uint64 `json:"boat_journal_bytes"`
Holes []SeqRange `json:"holes,omitzero"`
LagSeqs uint64 `json:"lag_seqs"`
LastSeen time.Time `json:"last_seen"`
}
InstanceStatus is a detailed snapshot of an instance for status reporting.
type InstanceSummary ¶
type InstanceSummary struct {
ID string `json:"id"`
Connected bool `json:"connected"`
Cursor uint64 `json:"cursor"`
BoatHeadSeq uint64 `json:"boat_head_seq"`
Holes int `json:"holes"`
LagSeqs uint64 `json:"lag_seqs"`
LastSeen time.Time `json:"last_seen"`
}
InstanceSummary is a snapshot of an instance for listing.
type JournalConfig ¶
type JournalConfig struct {
Dir string
Prefix string // default: "nmea2k"
BlockSize int // default: 262144, power of 2, min 4096
Compression journal.CompressionType // default: CompressionNone
RotateDuration time.Duration // 0 = no limit
RotateSize int64 // 0 = no limit
RotateCount int64 // 0 = no limit
OnRotate func(RotatedFile) // called after a journal file is closed by rotation
Logger *slog.Logger
}
JournalConfig configures the journal writer.
type JournalKeeper ¶
type JournalKeeper struct {
// contains filtered or unexported fields
}
JournalKeeper manages journal file archival and retention. One goroutine per binary, driven by rotation notifications and periodic directory scans.
func NewJournalKeeper ¶
func NewJournalKeeper(cfg KeeperConfig) *JournalKeeper
NewJournalKeeper creates a keeper. Call Run to start.
func (*JournalKeeper) Run ¶
func (k *JournalKeeper) Run(ctx context.Context)
Run is the main loop. Blocks until ctx is cancelled.
func (*JournalKeeper) Send ¶
func (k *JournalKeeper) Send(rf RotatedFile)
Send notifies the keeper that a file was rotated. Non-blocking; drops if the channel is full (the periodic scan will pick it up).
func (*JournalKeeper) SetOnPauseChange ¶
func (k *JournalKeeper) SetOnPauseChange(fn func(dir KeeperDir, paused bool))
SetOnPauseChange sets the callback invoked when a directory's pause state transitions. Must be called before Run.
type JournalWriter ¶
type JournalWriter struct {
// contains filtered or unexported fields
}
JournalWriter writes CAN frames to block-based journal files.
func NewJournalWriter ¶
func NewJournalWriter(cfg JournalConfig, devices *DeviceRegistry, ch <-chan RxFrame) (*JournalWriter, error)
NewJournalWriter creates a writer. Call Run to start.
func (*JournalWriter) Run ¶
func (w *JournalWriter) Run(ctx context.Context) error
Run is the main loop. Blocks until ctx is cancelled or the channel is closed.
func (*JournalWriter) SetPaused ¶
func (w *JournalWriter) SetPaused(p bool)
SetPaused sets whether the writer should discard incoming frames. Used by the overflow policy to stop writes when disk is full.
type KeeperConfig ¶
type KeeperConfig struct {
// Dirs to manage. Ignored if DirFunc is set.
Dirs []KeeperDir
// DirFunc returns current directories on each scan cycle.
// Used by cloud to dynamically discover instance dirs.
DirFunc func() []KeeperDir
// Retention
MaxAge time.Duration // 0 = no age limit
MinKeep time.Duration // 0 = no min-keep floor
MaxSize int64 // 0 = no size limit
// Soft/hard thresholds
SoftPct int // % of MaxSize for proactive archiving (default 80, range 1-99)
OverflowPolicy OverflowPolicy // what to do when hard cap hit with non-archived files
// Archive
ArchiveCommand string // path to script; empty = no archiving
ArchiveTrigger ArchiveTrigger // on-rotate or before-expire
// Pause callback (called when overflow-policy=pause-recording toggles state)
OnPauseChange func(dir KeeperDir, paused bool)
Logger *slog.Logger
}
KeeperConfig configures the JournalKeeper.
type OverflowPolicy ¶
type OverflowPolicy int
OverflowPolicy controls what happens when the hard size cap is hit and files haven't been archived yet.
const (
OverflowDeleteUnarchived OverflowPolicy = iota // delete files even if not archived
OverflowPauseRecording // stop journal writes until archives free space
)
func ParseOverflowPolicy ¶
func ParseOverflowPolicy(s string) (OverflowPolicy, error)
ParseOverflowPolicy parses a string into an OverflowPolicy.
func (OverflowPolicy) String ¶
func (p OverflowPolicy) String() string
String returns the string representation of an OverflowPolicy.
type PGNMatcher ¶ added in v0.3.1
PGNMatcher matches PGN numbers against a set of individual values and ranges.
func (*PGNMatcher) Contains ¶ added in v0.3.1
func (m *PGNMatcher) Contains(pgn uint32) bool
Contains returns true if pgn is in the matcher's set.
type PGNValue ¶
type PGNValue struct {
PGN uint32 `json:"pgn"`
Ts string `json:"ts"`
Data string `json:"data"`
Seq uint64 `json:"seq"`
}
PGNValue is a single PGN's last-known value in the JSON response.
type ReplicationClient ¶
type ReplicationClient struct {
// contains filtered or unexported fields
}
ReplicationClient streams frames from the local broker to a cloud replication server over gRPC. It runs two independent streams: one for live frames (from the broker's head forward) and one for backfilling historical gaps (from journal files). On disconnect, it reconnects with exponential backoff and resumes both streams.
func NewReplicationClient ¶
func NewReplicationClient(cfg ReplicationClientConfig, broker *Broker) *ReplicationClient
NewReplicationClient creates a new replication client. Call Run to start.
func (*ReplicationClient) Run ¶
func (c *ReplicationClient) Run(ctx context.Context) error
Run is the main loop. Connects to the cloud, performs handshake, and starts live + backfill streams. Reconnects on failure with exponential backoff. Blocks until ctx is cancelled.
func (*ReplicationClient) Status ¶
func (c *ReplicationClient) Status() ReplicationStatus
Status returns the current replication state for status reporting.
type ReplicationClientConfig ¶
type ReplicationClientConfig struct {
Target string // cloud gRPC address (host:port)
InstanceID string
CertFile string // client certificate
KeyFile string // client private key
CAFile string // CA certificate for verifying server
Logger *slog.Logger
// Resource protection tuning. Zero values use defaults from
// replication_limits.go.
MaxLiveLag uint64 // max frames live can lag before reconnect (default: DefaultMaxLiveLag)
LagCheckInterval int // check lag every N frames sent (default: DefaultLagCheckInterval)
MinLagReconnectInterval time.Duration // min wait between lag-triggered reconnects (default: DefaultMinLagReconnectInterval)
}
ReplicationClientConfig configures the boat-side replication client.
type ReplicationEvent ¶
type ReplicationEvent struct {
Time time.Time `json:"time"`
Type ReplicationEventType `json:"type"`
Detail map[string]any `json:"detail,omitempty"`
}
ReplicationEvent is a single diagnostic event from the replication pipeline.
type ReplicationEventType ¶
type ReplicationEventType string
ReplicationEventType identifies the kind of replication event.
const (
EventLiveStart ReplicationEventType = "live_start"
EventLiveStop ReplicationEventType = "live_stop"
EventBackfillStart ReplicationEventType = "backfill_start"
EventBackfillStop ReplicationEventType = "backfill_stop"
EventBlockReceived ReplicationEventType = "block_received"
EventCheckpoint ReplicationEventType = "checkpoint"
)
type ReplicationHealth ¶ added in v0.3.1
type ReplicationHealth struct {
Status string `json:"status"` // "ok", "degraded", or "disconnected"
Connected bool `json:"connected"`
LiveLag uint64 `json:"live_lag"`
BackfillRemaining uint64 `json:"backfill_remaining_seqs"`
LastAck time.Time `json:"last_ack,omitempty"`
}
ReplicationHealth reports the replication client's health.
type ReplicationServer ¶
type ReplicationServer struct {
pb.UnimplementedReplicationServer
// Resource protection tuning. Zero values use defaults from
// replication_limits.go.
MaxFrameRate float64 // max frames/sec per live stream (default: DefaultMaxFrameRate)
RateBurst int // burst allowance for transient spikes (default: DefaultRateBurst)
MaxLiveLag uint64 // max frames live can lag before closing stream (default: DefaultMaxLiveLag)
// contains filtered or unexported fields
}
ReplicationServer implements the gRPC Replication service.
func NewReplicationServer ¶
func NewReplicationServer(im *InstanceManager, logger *slog.Logger) *ReplicationServer
NewReplicationServer creates a new replication gRPC server.
func (*ReplicationServer) Backfill ¶
func (s *ReplicationServer) Backfill(stream pb.Replication_BackfillServer) error
Backfill handles bulk block transfer for filling gaps.
func (*ReplicationServer) GetInstanceBroker ¶
func (s *ReplicationServer) GetInstanceBroker(instanceID string) *Broker
GetInstanceBroker returns the broker for an instance (starting it if needed). Used by HTTP handlers to serve SSE from a cloud instance.
func (*ReplicationServer) GetInstanceState ¶
func (s *ReplicationServer) GetInstanceState(instanceID string) *InstanceState
GetInstanceState returns the instance state for status reporting.
func (*ReplicationServer) Handshake ¶
func (s *ReplicationServer) Handshake(ctx context.Context, req *pb.HandshakeRequest) (*pb.HandshakeResponse, error)
Handshake exchanges sync state between boat and cloud.
func (*ReplicationServer) Live ¶
func (s *ReplicationServer) Live(stream pb.Replication_LiveServer) error
Live handles the realtime frame stream from a boat.
type ReplicationStatus ¶
type ReplicationStatus struct {
Connected bool `json:"connected"`
InstanceID string `json:"instance_id"`
LocalHeadSeq uint64 `json:"local_head_seq"`
CloudCursor uint64 `json:"cloud_cursor"`
Holes []SeqRange `json:"holes,omitzero"`
LiveLag uint64 `json:"live_lag"`
BackfillRemainingSeqs uint64 `json:"backfill_remaining_seqs"`
LastAck time.Time `json:"last_ack,omitempty"`
}
ReplicationStatus is the boat-side view of replication state.
type RotatedFile ¶
RotatedFile describes a completed journal file.
type RxFrame ¶
type RxFrame struct {
Timestamp time.Time
Header CANHeader
Data []byte
Seq uint64 // assigned by broker in handleFrame; zero when fed by external code
}
RxFrame is a reassembled CAN frame ready for the broker.
type SendPolicy ¶ added in v0.3.1
type SendPolicy struct {
Enabled bool // must be true for /send and /query to accept requests
Rules []SendRule // ordered rules; first match wins
}
SendPolicy controls whether the /send and /query endpoints are enabled and which frames are permitted. Rules are evaluated top-to-bottom; first match wins. If no rule matches, the request is denied. An empty Rules list with Enabled=true allows all frames (backwards compatible).
type SendRule ¶ added in v0.3.1
type SendRule struct {
Allow bool // true = allow, false = deny
PGNs *PGNMatcher // nil = match any PGN
Names []uint64 // nil/empty = match any destination NAME
}
SendRule is a single allow or deny rule that matches frames by PGN and/or destination CAN NAME. Either matcher may be nil (wildcard).
func ParseSendRule ¶ added in v0.3.1
ParseSendRule parses a single rule string. Syntax:
[!] [pgn:<spec>] [name:<hex>,...]
where <spec> is comma-separated PGN values or ranges (e.g. "59904", "59904,126208", "129025-129029", "59904,65280-65535").
A '!' prefix makes the rule a deny rule; otherwise it's an allow rule. Omitting pgn: matches all PGNs. Omitting name: matches all destinations.
Examples:
"pgn:59904" — allow PGN 59904 to any device
"pgn:59904,126208 name:001c6e4000200000" — allow two PGNs to one device
"pgn:129025-129029" — allow a PGN range
"!pgn:65280-65535" — deny proprietary PGN range
"name:001c6e4000200000" — allow any PGN to one device
func ParseSendRules ¶ added in v0.3.1
ParseSendRules parses multiple rule strings.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server handles HTTP API requests for lplex.
func NewServer ¶
func NewServer(broker *Broker, logger *slog.Logger, policy SendPolicy) *Server
NewServer creates a new HTTP server wired to the given broker.
func (*Server) HandleEphemeralSSE ¶
func (s *Server) HandleEphemeralSSE(w http.ResponseWriter, r *http.Request)
HandleEphemeralSSE handles GET /events. Ephemeral SSE stream, no session, no ACK, no replay. Optional query params for filtering: pgn, manufacturer, instance, name (hex).
func (*Server) HandleFunc ¶
HandleFunc registers an additional HTTP handler on the server's mux.
type SubKeyFunc ¶ added in v0.3.1
SubKeyFunc extracts a sub-discriminator from raw packet data for multiplexed PGNs. Returns 0 for non-multiplexed PGNs (i.e., no sub-key extractor configured).
type SyncState ¶
type SyncState struct {
Cursor uint64 // continuous data through this seq
Holes []SeqRange // sorted gaps
BoatHeadSeq uint64 // last reported by boat
BoatJournalBytes uint64
}
SyncState captures the replication state for an instance, used for persistence and handshake responses.
type ValueStore ¶
type ValueStore struct {
// contains filtered or unexported fields
}
ValueStore tracks the last-seen frame data for each (source, PGN) pair. The broker goroutine writes via Record; HTTP handlers read via Snapshot.
func (*ValueStore) DecodedSnapshot ¶ added in v0.3.1
func (vs *ValueStore) DecodedSnapshot(devices *DeviceRegistry, filter *EventFilter) []DecodedDeviceValues
DecodedSnapshot returns the current values grouped by device with PGN data decoded into named fields using the pgn.Registry. PGNs not in the registry or that fail to decode are omitted.
func (*ValueStore) DecodedSnapshotJSON ¶ added in v0.3.1
func (vs *ValueStore) DecodedSnapshotJSON(devices *DeviceRegistry, filter *EventFilter) json.RawMessage
DecodedSnapshotJSON returns the decoded snapshot as pre-serialized JSON.
func (*ValueStore) Record ¶
Record updates the stored value for the given source and PGN. Called by the broker goroutine on every frame.
func (*ValueStore) Snapshot ¶
func (vs *ValueStore) Snapshot(devices *DeviceRegistry, filter *EventFilter) []DeviceValues
Snapshot returns the current values grouped by device, resolved against the device registry for NAME and manufacturer info. An optional filter restricts results by PGN and/or device criteria (manufacturer, name, instance).
func (*ValueStore) SnapshotJSON ¶
func (vs *ValueStore) SnapshotJSON(devices *DeviceRegistry, filter *EventFilter) json.RawMessage
SnapshotJSON returns the snapshot as pre-serialized JSON.
Source Files
¶
- block_writer.go
- broker.go
- bus_silence.go
- can.go
- canid.go
- change_diff.go
- change_tracker.go
- consumer.go
- devices.go
- doc.go
- fastpacket.go
- health.go
- journal_keeper.go
- journal_writer.go
- metrics.go
- replication.go
- replication_client.go
- replication_events.go
- replication_limits.go
- replication_server.go
- send_policy.go
- server.go
- values.go
Directories
¶
| Path | Synopsis |
|---|---|
| canbus | Package canbus provides shared NMEA 2000 / CAN bus types and functions importable by both the server internals and client tools. |
| cmd | |
| cmd/journalbench (command) | journalbench reads frames from a .lpj journal file and re-encodes them with every combination of block size and compression method to find the optimal parameters for CAN bus journal data. |
| cmd/lplex (command) | |
| cmd/lplex-cloud (command) | |
| cmd/lplexdump (command) | |
| cmd/pgngen (command) | pgngen generates Go structs, Protocol Buffer definitions, and JSON Schema from NMEA 2000 PGN definition files written in the lplex PGN DSL. |
| filter | Package filter provides a BPF-inspired expression language for filtering decoded NMEA 2000 frames by field values. |
| journal | Package journal provides the block-based binary journal format (.lpj files) for recording and replaying NMEA 2000 CAN frames. |
| lplexc | Package lplexc provides a Go client for lplex, a CAN bus HTTP bridge for NMEA 2000. |
| pgn | Package pgn provides generated Go types for decoding and encoding NMEA 2000 PGN messages. |
| pgngen | Package pgngen provides a DSL parser and code generators for NMEA 2000 PGN definitions. |
| proto | |