Documentation
¶
Overview ¶
Package lplex is a CAN bus HTTP bridge for NMEA 2000.
It reads raw CAN frames from a SocketCAN interface, reassembles fast-packets, tracks device discovery, and streams frames to clients over SSE with session management, filtering, and replay.
The package can be embedded into other Go services. Create a Broker to manage frame routing, a Server to expose the HTTP API, and optionally a JournalWriter to record frames to disk.
broker := lplex.NewBroker(lplex.BrokerConfig{
RingSize: 65536,
MaxBufferDuration: 5 * time.Minute,
Logger: logger,
})
go broker.Run()
srv := lplex.NewServer(broker, logger)
mux.Handle("/nmea/", http.StripPrefix("/nmea", srv))
Feed frames into the broker via Broker.RxFrames:
broker.RxFrames() <- lplex.RxFrame{
Timestamp: time.Now(),
Header: lplex.CANHeader{Priority: 2, PGN: 129025, Source: 10, Destination: 0xFF},
Data: payload,
}
When done, close the rx channel and the broker goroutine exits:
broker.CloseRx()
Index ¶
- Variables
- func CANReader(ctx context.Context, iface string, rxFrames chan<- RxFrame, ...) error
- func CANWriter(ctx context.Context, iface string, txFrames <-chan TxRequest, ...) error
- func FragmentFastPacket(data []byte, seqCounter uint8) [][]byte
- func IsFastPacket(pgn uint32) bool
- func ParseISO8601Duration(s string) (time.Duration, error)
- type ArchiveTrigger
- type BlockWriter
- type BlockWriterConfig
- type Broker
- func (b *Broker) AckSession(id string, seq uint64) error
- func (b *Broker) CloseRx()
- func (b *Broker) CreateSession(id string, bufferTimeout time.Duration, filter *EventFilter) (*ClientSession, uint64)
- func (b *Broker) CurrentSeq() uint64
- func (b *Broker) Devices() *DeviceRegistry
- func (b *Broker) Done() <-chan struct{}
- func (b *Broker) GetSession(id string) *ClientSession
- func (b *Broker) NewConsumer(cfg ConsumerConfig) *Consumer
- func (b *Broker) Run()
- func (b *Broker) RxFrames() chan<- RxFrame
- func (b *Broker) SetJournal(ch chan<- RxFrame)
- func (b *Broker) Subscribe(filter *EventFilter) (*subscriber, func())
- func (b *Broker) TouchSession(id string)
- func (b *Broker) TxFrames() <-chan TxRequest
- func (b *Broker) Values() *ValueStore
- type BrokerConfig
- type CANHeader
- type ClientSession
- type Consumer
- type ConsumerConfig
- type Device
- type DeviceRegistry
- func (r *DeviceRegistry) Get(source uint8) *Device
- func (r *DeviceRegistry) HandleAddressClaim(source uint8, data []byte) *Device
- func (r *DeviceRegistry) HandleProductInfo(source uint8, data []byte) *Device
- func (r *DeviceRegistry) RecordPacket(source uint8, ts time.Time, dataLen int) bool
- func (r *DeviceRegistry) Snapshot() []Device
- func (r *DeviceRegistry) SnapshotJSON() json.RawMessage
- func (r *DeviceRegistry) SynthesizeFrames(ts time.Time) []RxFrame
- type DeviceValues
- type EventFilter
- type EventLog
- type FastPacketAssembler
- type Frame
- type HoleTracker
- type InstanceManager
- func (im *InstanceManager) Get(id string) *InstanceState
- func (im *InstanceManager) GetOrCreate(id string) *InstanceState
- func (im *InstanceManager) List() []InstanceSummary
- func (im *InstanceManager) SetInstancePaused(instanceID string, paused bool)
- func (im *InstanceManager) SetOnRotate(fn func(instanceID string, rf RotatedFile))
- func (im *InstanceManager) Shutdown()
- type InstanceState
- type InstanceStatus
- type InstanceSummary
- type JournalConfig
- type JournalKeeper
- type JournalWriter
- type KeeperConfig
- type KeeperDir
- type OverflowPolicy
- type PGNValue
- type ReplicationClient
- type ReplicationClientConfig
- type ReplicationEvent
- type ReplicationEventType
- type ReplicationServer
- func (s *ReplicationServer) Backfill(stream pb.Replication_BackfillServer) error
- func (s *ReplicationServer) GetInstanceBroker(instanceID string) *Broker
- func (s *ReplicationServer) GetInstanceState(instanceID string) *InstanceState
- func (s *ReplicationServer) Handshake(ctx context.Context, req *pb.HandshakeRequest) (*pb.HandshakeResponse, error)
- func (s *ReplicationServer) Live(stream pb.Replication_LiveServer) error
- type ReplicationStatus
- type RotatedFile
- type RxFrame
- type SeqRange
- type Server
- type SyncState
- type TxRequest
- type ValueStore
Constants ¶
This section is empty.
Variables ¶
var ( ParseCANID = canbus.ParseCANID BuildCANID = canbus.BuildCANID )
var ErrFallenBehind = errors.New("consumer fallen behind: data no longer available")
ErrFallenBehind is returned by Consumer.Next when the consumer's cursor has fallen behind both the ring buffer and available journal files.
Functions ¶
func CANReader ¶
func CANReader(ctx context.Context, iface string, rxFrames chan<- RxFrame, logger *slog.Logger) error
CANReader reads frames from SocketCAN, reassembles fast-packets, and sends completed frames to the broker's rxFrames channel.
func CANWriter ¶
func CANWriter(ctx context.Context, iface string, txFrames <-chan TxRequest, logger *slog.Logger) error
CANWriter reads from the broker's txFrames channel and writes to SocketCAN. Handles fast-packet fragmentation for payloads > 8 bytes.
func FragmentFastPacket ¶
FragmentFastPacket splits a payload into CAN frames for fast-packet TX. seqCounter should be incremented per transfer (wraps at 7, 3-bit field). Returns a slice of 8-byte CAN frame payloads.
func IsFastPacket ¶
IsFastPacket returns true if the PGN uses fast-packet transfer.
Types ¶
type ArchiveTrigger ¶
type ArchiveTrigger int
ArchiveTrigger controls when journal files are archived.
const ( ArchiveDisabled ArchiveTrigger = iota ArchiveOnRotate // archive immediately after rotation ArchiveBeforeExpire // archive only when about to be deleted by retention )
func ParseArchiveTrigger ¶
func ParseArchiveTrigger(s string) (ArchiveTrigger, error)
ParseArchiveTrigger parses a string into an ArchiveTrigger.
func (ArchiveTrigger) String ¶
func (t ArchiveTrigger) String() string
String returns the string representation of an ArchiveTrigger.
type BlockWriter ¶
type BlockWriter struct {
// contains filtered or unexported fields
}
BlockWriter appends pre-encoded journal blocks to .lpj files. Unlike JournalWriter, it receives blocks that are already serialized (with CRC, device table, frame data). Used by the cloud replication server to write backfill blocks byte-for-byte without decompression or re-encoding.
func NewBlockWriter ¶
func NewBlockWriter(cfg BlockWriterConfig) (*BlockWriter, error)
NewBlockWriter creates a new BlockWriter. Call AppendBlock to write blocks. Call Close when done to finalize the current file.
func (*BlockWriter) AppendBlock ¶
func (w *BlockWriter) AppendBlock(baseSeq uint64, baseTimeUs int64, data []byte, compressed bool) error
AppendBlock writes a pre-encoded block to the current journal file. For compressed blocks, the data is the compressed payload (written with a 20-byte v2 header). For uncompressed blocks, data is the full block bytes (must be exactly BlockSize with valid CRC).
func (*BlockWriter) Close ¶
func (w *BlockWriter) Close() error
Close finalizes the current file (writes block index for compressed files, syncs, and closes). Safe to call multiple times or on a writer with no open file.
type BlockWriterConfig ¶
type BlockWriterConfig struct {
Dir string
Prefix string // default: "nmea2k"
BlockSize int // uncompressed block size (from source journal)
Compression journal.CompressionType
RotateDuration time.Duration // 0 = no limit
RotateSize int64 // 0 = no limit
OnRotate func(RotatedFile) // called after a journal file is closed by rotation
Logger *slog.Logger
}
BlockWriterConfig configures a BlockWriter.
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker is the central coordinator. Single goroutine reads from rxFrames, assigns sequence numbers, appends to ring buffer, updates device registry, and fans out to client sessions and ephemeral subscribers.
func NewBroker ¶
func NewBroker(cfg BrokerConfig) *Broker
NewBroker creates a new broker with the given config.
func (*Broker) AckSession ¶
AckSession updates the cursor for a session.
func (*Broker) CloseRx ¶
func (b *Broker) CloseRx()
CloseRx closes the rxFrames channel, signaling the broker to stop processing. Wait on Done() to know when the broker goroutine has actually exited.
func (*Broker) CreateSession ¶
func (b *Broker) CreateSession(id string, bufferTimeout time.Duration, filter *EventFilter) (*ClientSession, uint64)
CreateSession creates or retrieves a client session. Returns the session and the current head sequence number.
When bufferTimeout is 0, the session cursor is reset so no frames are replayed on the next connect (fresh start).
func (*Broker) CurrentSeq ¶
CurrentSeq returns the most recently assigned sequence number.
func (*Broker) Devices ¶
func (b *Broker) Devices() *DeviceRegistry
Devices returns the broker's device registry.
func (*Broker) Done ¶
func (b *Broker) Done() <-chan struct{}
Done returns a channel that is closed when the broker's Run() method returns.
func (*Broker) GetSession ¶
func (b *Broker) GetSession(id string) *ClientSession
GetSession returns a session by ID, or nil if not found.
func (*Broker) NewConsumer ¶
func (b *Broker) NewConsumer(cfg ConsumerConfig) *Consumer
NewConsumer creates a pull-based consumer starting at the given cursor. The consumer is registered with the broker for live notifications.
func (*Broker) Run ¶
func (b *Broker) Run()
Run is the broker's main loop. Call in its own goroutine. Exits when rxFrames is closed.
func (*Broker) RxFrames ¶
RxFrames returns the channel for submitting received CAN frames to the broker.
func (*Broker) SetJournal ¶
SetJournal sets the journal channel. Must be called before Run.
func (*Broker) Subscribe ¶
func (b *Broker) Subscribe(filter *EventFilter) (*subscriber, func())
Subscribe creates an ephemeral fan-out channel with the given filter. Returns the subscriber and a cleanup function that must be called on disconnect.
func (*Broker) TouchSession ¶
TouchSession updates the LastActivity timestamp for a session.
func (*Broker) Values ¶
func (b *Broker) Values() *ValueStore
Values returns the broker's last-values store.
type BrokerConfig ¶
type BrokerConfig struct {
RingSize int // must be power of 2
MaxBufferDuration time.Duration // cap on client buffer_timeout
JournalDir string // directory containing .lpj files (for consumer journal fallback)
Logger *slog.Logger
// ReplicaMode makes the broker honor frame.Seq instead of auto-incrementing.
// Used by the cloud replication server where sequence numbers originate
// from the boat's broker.
ReplicaMode bool
// InitialHead sets the starting head value. Use this when resuming a
// replica broker from persisted state so the ring starts at the right
// position. Zero means start at 1 (the default).
InitialHead uint64
}
BrokerConfig holds broker configuration.
type ClientSession ¶
type ClientSession struct {
ID string
Cursor uint64 // last ACK'd sequence number (0 = never ACK'd)
BufferTimeout time.Duration // how long to keep buffering after disconnect
LastActivity time.Time
Filter *EventFilter // nil = receive all frames
}
ClientSession tracks a buffered client's metadata for persistence across HTTP reconnects. The actual frame reading is done by Consumer.
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer is a pull-based reader that iterates frames at its own pace. It reads from a tiered log: journal files on disk (oldest), ring buffer in memory (recent), and live notification (current head, blocking wait).
func (*Consumer) Close ¶
Close stops the consumer and removes it from the broker. Safe to call multiple times.
type ConsumerConfig ¶
type ConsumerConfig struct {
Cursor uint64 // starting position (next seq to read)
Filter *EventFilter // nil = all frames
}
ConsumerConfig configures a new Consumer.
type Device ¶
type Device struct {
Source uint8 `json:"src"`
NAME uint64 `json:"-"`
NAMEHex string `json:"name"`
Manufacturer string `json:"manufacturer"`
ManufacturerCode uint16 `json:"manufacturer_code"`
DeviceClass uint8 `json:"device_class"`
DeviceFunction uint8 `json:"device_function"`
DeviceInstance uint8 `json:"device_instance"`
UniqueNumber uint32 `json:"unique_number,omitempty"`
// PGN 126996 Product Information fields.
ModelID string `json:"model_id,omitempty"`
SoftwareVersion string `json:"software_version,omitempty"`
ModelVersion string `json:"model_version,omitempty"`
ModelSerial string `json:"model_serial,omitempty"`
ProductCode uint16 `json:"product_code,omitempty"`
// Per-source packet statistics.
FirstSeen time.Time `json:"first_seen"`
LastSeen time.Time `json:"last_seen"`
PacketCount uint64 `json:"packet_count"`
ByteCount uint64 `json:"byte_count"`
}
Device represents an NMEA 2000 device discovered via ISO Address Claim (PGN 60928) and optionally enriched with Product Information (PGN 126996).
type DeviceRegistry ¶
type DeviceRegistry struct {
// contains filtered or unexported fields
}
DeviceRegistry tracks NMEA 2000 devices discovered via PGN 60928. Thread-safe for concurrent reads (SSE streams) and writes (broker goroutine).
func NewDeviceRegistry ¶
func NewDeviceRegistry() *DeviceRegistry
NewDeviceRegistry creates an empty device registry.
func (*DeviceRegistry) Get ¶
func (r *DeviceRegistry) Get(source uint8) *Device
Get returns a snapshot of the device at the given source address, or nil.
func (*DeviceRegistry) HandleAddressClaim ¶
func (r *DeviceRegistry) HandleAddressClaim(source uint8, data []byte) *Device
HandleAddressClaim processes a PGN 60928 ISO Address Claim. Returns the device if this is a new or changed device, nil otherwise.
func (*DeviceRegistry) HandleProductInfo ¶
func (r *DeviceRegistry) HandleProductInfo(source uint8, data []byte) *Device
HandleProductInfo processes a PGN 126996 Product Information response. Returns the device if fields changed, nil if source is unknown or unchanged.
func (*DeviceRegistry) RecordPacket ¶
RecordPacket updates per-source packet statistics. Returns true if this is a previously unseen source address.
func (*DeviceRegistry) Snapshot ¶
func (r *DeviceRegistry) Snapshot() []Device
Snapshot returns a copy of all known devices.
func (*DeviceRegistry) SnapshotJSON ¶
func (r *DeviceRegistry) SnapshotJSON() json.RawMessage
SnapshotJSON returns the device list as pre-serialized JSON.
func (*DeviceRegistry) SynthesizeFrames ¶
func (r *DeviceRegistry) SynthesizeFrames(ts time.Time) []RxFrame
SynthesizeFrames generates RxFrame slices for PGN 60928 (Address Claim) and PGN 126996 (Product Info) from all known devices. Used to seed a remote broker's device registry on live stream connect.
type DeviceValues ¶
type DeviceValues struct {
Name string `json:"name"`
Source uint8 `json:"src"`
Manufacturer string `json:"manufacturer,omitempty"`
ModelID string `json:"model_id,omitempty"`
Values []PGNValue `json:"values"`
}
DeviceValues groups PGN values by device in the JSON response.
type EventFilter ¶
type EventFilter struct {
PGNs []uint32
Manufacturers []string
Instances []uint8
Names []uint64 // 64-bit CAN NAMEs
}
EventFilter specifies which CAN frames a session receives. Categories are AND'd (all set categories must match), values within a category are OR'd (any value in the list matches).
func ParseFilterParams ¶
func ParseFilterParams(r *http.Request) (*EventFilter, error)
ParseFilterParams reads optional filter query params from a request. Supported params: pgn (decimal), manufacturer (name or code), instance (decimal), name (hex CAN NAME). Returns nil filter if no params are set.
func (*EventFilter) IsEmpty ¶
func (f *EventFilter) IsEmpty() bool
IsEmpty returns true if no filter criteria are set.
type EventLog ¶
type EventLog struct {
// contains filtered or unexported fields
}
EventLog is a fixed-size ring buffer of replication events.
func (*EventLog) Recent ¶
func (l *EventLog) Recent(n int) []ReplicationEvent
Recent returns up to n events, newest first.
type FastPacketAssembler ¶
type FastPacketAssembler struct {
// contains filtered or unexported fields
}
FastPacketAssembler reassembles multi-frame fast-packet PGNs.
Fast-packet protocol:
- Frame 0: byte[0] = seq_counter(3 bits) | frame_number(5 bits), byte[1] = total_bytes, bytes[2:8] = first 6 data bytes
- Frame N: byte[0] = seq_counter(3 bits) | frame_number(5 bits), bytes[1:8] = next 7 data bytes
func NewFastPacketAssembler ¶
func NewFastPacketAssembler(timeout time.Duration) *FastPacketAssembler
NewFastPacketAssembler creates a new assembler with the given reassembly timeout.
func (*FastPacketAssembler) Process ¶
Process handles a CAN frame that is part of a fast-packet transfer. Returns the complete reassembled payload when all frames are received, nil otherwise.
func (*FastPacketAssembler) PurgeStale ¶
func (a *FastPacketAssembler) PurgeStale(now time.Time)
PurgeStale removes any in-progress assemblies older than the timeout.
type Frame ¶
type Frame struct {
Seq uint64
Timestamp time.Time
Header CANHeader
Data []byte // raw CAN payload
// contains filtered or unexported fields
}
Frame is a single CAN frame returned by Consumer.Next.
type HoleTracker ¶
type HoleTracker struct {
// contains filtered or unexported fields
}
HoleTracker tracks gaps (holes) in a sequence number space. Holes are non-overlapping, sorted by Start, and represent ranges of missing data.
Typical case: 0-3 holes. All operations are linear on the slice, which is perfectly fine for the expected cardinality.
func NewHoleTracker ¶
func NewHoleTracker() *HoleTracker
NewHoleTracker creates an empty hole tracker.
func (*HoleTracker) Add ¶
func (h *HoleTracker) Add(start, end uint64)
Add inserts a new hole [start, end). Merges with any overlapping or adjacent holes. No-op if start >= end.
func (*HoleTracker) ContinuousThrough ¶
func (h *HoleTracker) ContinuousThrough(cursor uint64) uint64
ContinuousThrough returns the highest sequence number with no holes below it. Given a base cursor and the hole set, this is the seq just before the first hole (or the cursor itself if no holes exist before it).
Example: cursor=100, holes=[(200,300)] -> returns 199 (continuous through 199) Example: cursor=100, holes=[(100,200)] -> returns 99 (hole starts at cursor) Example: cursor=100, no holes -> returns max uint64 (no bound from holes)
func (*HoleTracker) Fill ¶
func (h *HoleTracker) Fill(start, end uint64) bool
Fill marks [start, end) as received, removing that range from any holes. Returns true if any holes were actually affected.
func (*HoleTracker) Holes ¶
func (h *HoleTracker) Holes() []SeqRange
Holes returns a copy of current holes, sorted by Start.
func (*HoleTracker) TotalMissing ¶
func (h *HoleTracker) TotalMissing() uint64
TotalMissing returns the total number of missing sequence numbers across all holes.
type InstanceManager ¶
type InstanceManager struct {
// contains filtered or unexported fields
}
InstanceManager manages per-instance state on the cloud side.
func NewInstanceManager ¶
func NewInstanceManager(dataDir string, logger *slog.Logger) (*InstanceManager, error)
NewInstanceManager creates a new instance manager, loading any persisted state.
func (*InstanceManager) Get ¶
func (im *InstanceManager) Get(id string) *InstanceState
Get returns the instance state, or nil if not found.
func (*InstanceManager) GetOrCreate ¶
func (im *InstanceManager) GetOrCreate(id string) *InstanceState
GetOrCreate returns the instance state, creating it if necessary.
func (*InstanceManager) List ¶
func (im *InstanceManager) List() []InstanceSummary
List returns a snapshot of all instance IDs and their basic state.
func (*InstanceManager) SetInstancePaused ¶
func (im *InstanceManager) SetInstancePaused(instanceID string, paused bool)
SetInstancePaused pauses or unpauses journal writing for a specific instance. Used by the JournalKeeper overflow policy to stop/resume writes.
func (*InstanceManager) SetOnRotate ¶
func (im *InstanceManager) SetOnRotate(fn func(instanceID string, rf RotatedFile))
SetOnRotate sets a callback invoked when any instance's journal or backfill file is rotated. Used by the cloud binary to feed the JournalKeeper. Must be called before any connections are accepted. Retroactively updates all existing instances loaded at startup.
func (*InstanceManager) Shutdown ¶
func (im *InstanceManager) Shutdown()
Shutdown stops all instance brokers and persists state.
type InstanceState ¶
type InstanceState struct {
ID string `json:"id"`
Cursor uint64 `json:"cursor"` // continuous data through this seq
BoatHeadSeq uint64 `json:"boat_head_seq"` // last reported by boat
BoatJournalBytes uint64 `json:"boat_journal_bytes"`
LastSeen time.Time `json:"last_seen"`
Connected bool `json:"-"`
HoleTracker *HoleTracker `json:"-"`
// Persisted hole state
PersistedHoles []SeqRange `json:"holes,omitempty"`
// contains filtered or unexported fields
}
InstanceState tracks the replication state for a single boat instance on the cloud side. Thread-safe.
func (*InstanceState) RecentEvents ¶
func (s *InstanceState) RecentEvents(n int) []ReplicationEvent
RecentEvents returns up to n recent replication events, newest first.
func (*InstanceState) RecordEvent ¶
func (s *InstanceState) RecordEvent(typ ReplicationEventType, detail map[string]any)
RecordEvent appends a diagnostic event to this instance's event log.
func (*InstanceState) Status ¶
func (s *InstanceState) Status() InstanceStatus
Status returns a thread-safe snapshot of this instance's replication state.
type InstanceStatus ¶
type InstanceStatus struct {
ID string `json:"id"`
Connected bool `json:"connected"`
Cursor uint64 `json:"cursor"`
BoatHeadSeq uint64 `json:"boat_head_seq"`
BoatJournalBytes uint64 `json:"boat_journal_bytes"`
Holes []SeqRange `json:"holes,omitzero"`
LagSeqs uint64 `json:"lag_seqs"`
LastSeen time.Time `json:"last_seen"`
}
InstanceStatus is a detailed snapshot of an instance for status reporting.
type InstanceSummary ¶
type InstanceSummary struct {
ID string `json:"id"`
Connected bool `json:"connected"`
Cursor uint64 `json:"cursor"`
BoatHeadSeq uint64 `json:"boat_head_seq"`
Holes int `json:"holes"`
LagSeqs uint64 `json:"lag_seqs"`
LastSeen time.Time `json:"last_seen"`
}
InstanceSummary is a snapshot of an instance for listing.
type JournalConfig ¶
type JournalConfig struct {
Dir string
Prefix string // default: "nmea2k"
BlockSize int // default: 262144, power of 2, min 4096
Compression journal.CompressionType // default: CompressionNone
RotateDuration time.Duration // 0 = no limit
RotateSize int64 // 0 = no limit
RotateCount int64 // 0 = no limit
OnRotate func(RotatedFile) // called after a journal file is closed by rotation
Logger *slog.Logger
}
JournalConfig configures the journal writer.
type JournalKeeper ¶
type JournalKeeper struct {
// contains filtered or unexported fields
}
JournalKeeper manages journal file archival and retention. One goroutine per binary, driven by rotation notifications and periodic directory scans.
func NewJournalKeeper ¶
func NewJournalKeeper(cfg KeeperConfig) *JournalKeeper
NewJournalKeeper creates a keeper. Call Run to start.
func (*JournalKeeper) Run ¶
func (k *JournalKeeper) Run(ctx context.Context)
Run is the main loop. Blocks until ctx is cancelled.
func (*JournalKeeper) Send ¶
func (k *JournalKeeper) Send(rf RotatedFile)
Send notifies the keeper that a file was rotated. Non-blocking; drops if the channel is full (the periodic scan will pick it up).
func (*JournalKeeper) SetOnPauseChange ¶
func (k *JournalKeeper) SetOnPauseChange(fn func(dir KeeperDir, paused bool))
SetOnPauseChange sets the callback invoked when a directory's pause state transitions. Must be called before Run.
type JournalWriter ¶
type JournalWriter struct {
// contains filtered or unexported fields
}
JournalWriter writes CAN frames to block-based journal files.
func NewJournalWriter ¶
func NewJournalWriter(cfg JournalConfig, devices *DeviceRegistry, ch <-chan RxFrame) (*JournalWriter, error)
NewJournalWriter creates a writer. Call Run to start.
func (*JournalWriter) Run ¶
func (w *JournalWriter) Run(ctx context.Context) error
Run is the main loop. Blocks until ctx is cancelled or the channel is closed.
func (*JournalWriter) SetPaused ¶
func (w *JournalWriter) SetPaused(p bool)
SetPaused sets whether the writer should discard incoming frames. Used by the overflow policy to stop writes when disk is full.
type KeeperConfig ¶
type KeeperConfig struct {
// Dirs to manage. Ignored if DirFunc is set.
Dirs []KeeperDir
// DirFunc returns current directories on each scan cycle.
// Used by cloud to dynamically discover instance dirs.
DirFunc func() []KeeperDir
// Retention
MaxAge time.Duration // 0 = no age limit
MinKeep time.Duration // 0 = no min-keep floor
MaxSize int64 // 0 = no size limit
// Soft/hard thresholds
SoftPct int // % of MaxSize for proactive archiving (default 80, range 1-99)
OverflowPolicy OverflowPolicy // what to do when hard cap hit with non-archived files
// Archive
ArchiveCommand string // path to script; empty = no archiving
ArchiveTrigger ArchiveTrigger // on-rotate or before-expire
// Pause callback (called when overflow-policy=pause-recording toggles state)
OnPauseChange func(dir KeeperDir, paused bool)
Logger *slog.Logger
}
KeeperConfig configures the JournalKeeper.
type OverflowPolicy ¶
type OverflowPolicy int
OverflowPolicy controls what happens when the hard size cap is hit and files haven't been archived yet.
const ( OverflowDeleteUnarchived OverflowPolicy = iota // delete files even if not archived OverflowPauseRecording // stop journal writes until archives free space )
func ParseOverflowPolicy ¶
func ParseOverflowPolicy(s string) (OverflowPolicy, error)
ParseOverflowPolicy parses a string into an OverflowPolicy.
func (OverflowPolicy) String ¶
func (p OverflowPolicy) String() string
String returns the string representation of an OverflowPolicy.
type PGNValue ¶
type PGNValue struct {
PGN uint32 `json:"pgn"`
Ts string `json:"ts"`
Data string `json:"data"`
Seq uint64 `json:"seq"`
}
PGNValue is a single PGN's last-known value in the JSON response.
type ReplicationClient ¶
type ReplicationClient struct {
// contains filtered or unexported fields
}
ReplicationClient streams frames from the local broker to a cloud replication server over gRPC. It runs two independent streams: one for live frames (from the broker's head forward) and one for backfilling historical gaps (from journal files). On disconnect, it reconnects with exponential backoff and resumes both streams.
func NewReplicationClient ¶
func NewReplicationClient(cfg ReplicationClientConfig, broker *Broker) *ReplicationClient
NewReplicationClient creates a new replication client. Call Run to start.
func (*ReplicationClient) Run ¶
func (c *ReplicationClient) Run(ctx context.Context) error
Run is the main loop. Connects to the cloud, performs handshake, and starts live + backfill streams. Reconnects on failure with exponential backoff. Blocks until ctx is cancelled.
func (*ReplicationClient) Status ¶
func (c *ReplicationClient) Status() ReplicationStatus
Status returns the current replication state for status reporting.
type ReplicationClientConfig ¶
type ReplicationClientConfig struct {
Target string // cloud gRPC address (host:port)
InstanceID string
CertFile string // client certificate
KeyFile string // client private key
CAFile string // CA certificate for verifying server
Logger *slog.Logger
}
ReplicationClientConfig configures the boat-side replication client.
type ReplicationEvent ¶
type ReplicationEvent struct {
Time time.Time `json:"time"`
Type ReplicationEventType `json:"type"`
Detail map[string]any `json:"detail,omitempty"`
}
ReplicationEvent is a single diagnostic event from the replication pipeline.
type ReplicationEventType ¶
type ReplicationEventType string
ReplicationEventType identifies the kind of replication event.
const ( EventLiveStart ReplicationEventType = "live_start" EventLiveStop ReplicationEventType = "live_stop" EventBackfillStart ReplicationEventType = "backfill_start" EventBackfillStop ReplicationEventType = "backfill_stop" EventBlockReceived ReplicationEventType = "block_received" EventCheckpoint ReplicationEventType = "checkpoint" )
type ReplicationServer ¶
type ReplicationServer struct {
pb.UnimplementedReplicationServer
// contains filtered or unexported fields
}
ReplicationServer implements the gRPC Replication service.
func NewReplicationServer ¶
func NewReplicationServer(im *InstanceManager, logger *slog.Logger) *ReplicationServer
NewReplicationServer creates a new replication gRPC server.
func (*ReplicationServer) Backfill ¶
func (s *ReplicationServer) Backfill(stream pb.Replication_BackfillServer) error
Backfill handles bulk block transfer for filling gaps.
func (*ReplicationServer) GetInstanceBroker ¶
func (s *ReplicationServer) GetInstanceBroker(instanceID string) *Broker
GetInstanceBroker returns the broker for an instance (starting it if needed). Used by HTTP handlers to serve SSE from a cloud instance.
func (*ReplicationServer) GetInstanceState ¶
func (s *ReplicationServer) GetInstanceState(instanceID string) *InstanceState
GetInstanceState returns the instance state for status reporting.
func (*ReplicationServer) Handshake ¶
func (s *ReplicationServer) Handshake(ctx context.Context, req *pb.HandshakeRequest) (*pb.HandshakeResponse, error)
Handshake exchanges sync state between boat and cloud.
func (*ReplicationServer) Live ¶
func (s *ReplicationServer) Live(stream pb.Replication_LiveServer) error
Live handles the realtime frame stream from a boat.
type ReplicationStatus ¶
type ReplicationStatus struct {
Connected bool `json:"connected"`
InstanceID string `json:"instance_id"`
LocalHeadSeq uint64 `json:"local_head_seq"`
CloudCursor uint64 `json:"cloud_cursor"`
Holes []SeqRange `json:"holes,omitzero"`
LiveLag uint64 `json:"live_lag"`
BackfillRemainingSeqs uint64 `json:"backfill_remaining_seqs"`
LastAck time.Time `json:"last_ack,omitempty"`
}
ReplicationStatus is the boat-side view of replication state.
type RotatedFile ¶
RotatedFile describes a completed journal file.
type RxFrame ¶
type RxFrame struct {
Timestamp time.Time
Header CANHeader
Data []byte
Seq uint64 // assigned by broker in handleFrame; zero when fed by external code
}
RxFrame is a reassembled CAN frame ready for the broker.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server handles HTTP API requests for lplex.
func (*Server) HandleEphemeralSSE ¶
func (s *Server) HandleEphemeralSSE(w http.ResponseWriter, r *http.Request)
HandleEphemeralSSE handles GET /events. Ephemeral SSE stream, no session, no ACK, no replay. Optional query params for filtering: pgn, manufacturer, instance, name (hex).
func (*Server) HandleFunc ¶
HandleFunc registers an additional HTTP handler on the server's mux.
type SyncState ¶
type SyncState struct {
Cursor uint64 // continuous data through this seq
Holes []SeqRange // sorted gaps
BoatHeadSeq uint64 // last reported by boat
BoatJournalBytes uint64
}
SyncState captures the replication state for an instance, used for persistence and handshake responses.
type ValueStore ¶
type ValueStore struct {
// contains filtered or unexported fields
}
ValueStore tracks the last-seen frame data for each (source, PGN) pair. The broker goroutine writes via Record; HTTP handlers read via Snapshot.
func (*ValueStore) Record ¶
Record updates the stored value for the given source and PGN. Called by the broker goroutine on every frame.
func (*ValueStore) Snapshot ¶
func (vs *ValueStore) Snapshot(devices *DeviceRegistry, filter *EventFilter) []DeviceValues
Snapshot returns the current values grouped by device, resolved against the device registry for NAME and manufacturer info. An optional filter restricts results by PGN and/or device criteria (manufacturer, name, instance).
func (*ValueStore) SnapshotJSON ¶
func (vs *ValueStore) SnapshotJSON(devices *DeviceRegistry, filter *EventFilter) json.RawMessage
SnapshotJSON returns the snapshot as pre-serialized JSON.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
Package canbus provides shared NMEA 2000 / CAN bus types and functions importable by both the server internals and client tools.
|
Package canbus provides shared NMEA 2000 / CAN bus types and functions importable by both the server internals and client tools. |
|
cmd
|
|
|
journalbench
command
journalbench reads frames from a .lpj journal file and re-encodes them with every combination of block size and compression method to find the optimal parameters for CAN bus journal data.
|
journalbench reads frames from a .lpj journal file and re-encodes them with every combination of block size and compression method to find the optimal parameters for CAN bus journal data. |
|
lplex
command
|
|
|
lplex-cloud
command
|
|
|
lplexdump
command
|
|
|
Package journal provides the block-based binary journal format (.lpj files) for recording and replaying NMEA 2000 CAN frames.
|
Package journal provides the block-based binary journal format (.lpj files) for recording and replaying NMEA 2000 CAN frames. |
|
Package lplexc provides a Go client for lplex, a CAN bus HTTP bridge for NMEA 2000.
|
Package lplexc provides a Go client for lplex, a CAN bus HTTP bridge for NMEA 2000. |
|
proto
|
|