lplex

v0.2.0
Published: Mar 5, 2026 License: MIT Imports: 37 Imported by: 0

README

lplex

CAN bus HTTP bridge for NMEA 2000. Reads raw CAN frames from a SocketCAN interface, reassembles fast-packets, tracks device discovery, and streams frames to clients over SSE with session management, filtering, and replay. Supports cloud replication for remote access to boat data over intermittent connections.

  • Real-time SSE streaming with ephemeral and buffered session modes, per-client filtering by PGN, manufacturer, instance, or device name
  • Fast-packet reassembly for multi-frame NMEA 2000 PGNs, with automatic device discovery via ISO requests
  • Journal recording to block-based .lpj files with zstd compression, CRC32C checksums, and O(log N) time seeking
  • Retention and archival with max-age/min-keep/max-size knobs, soft/hard thresholds, configurable overflow policy, and pluggable archive scripts
  • Cloud replication over gRPC with mTLS, live + backfill streams, hole tracking, and lazy per-instance Broker on the cloud side
  • Pull-based Consumer with tiered replay (journal files → ring buffer → live), so clients can catch up from any point in history
  • Embeddable core as a Go package; mount the HTTP handler on any ServeMux
  • Go client library (lplexc) with mDNS discovery, subscriptions, device queries, and transmit
  • TypeScript client library (@sixfathoms/lplex) for browsers and Node.js, with CloudClient for lplex-cloud
  • CAN transmit via POST /send with automatic fast-packet fragmentation

Installation

Client (lplexdump)
# Homebrew (macOS / Linux)
brew install sixfathoms/tap/lplexdump

# From source
go install github.com/sixfathoms/lplex/cmd/lplexdump@latest

Server (Linux only, requires SocketCAN)
# Debian/Ubuntu (.deb includes both lplex server and lplexdump)
sudo dpkg -i lplex_*.deb
sudo systemctl start lplex

# Docker
docker run --network host --device /dev/can0 ghcr.io/sixfathoms/lplex:latest

# From source
go install github.com/sixfathoms/lplex/cmd/lplex@latest

Cloud Server
# From source
go install github.com/sixfathoms/lplex/cmd/lplex-cloud@latest

Download .deb packages from GitHub Releases.

Go Client Library
go get github.com/sixfathoms/lplex/lplexc@latest

TypeScript Client Library
npm install @sixfathoms/lplex

Zero runtime dependencies. Works in browsers and Node 18+. Ships ESM, CJS, and TypeScript declarations. See @sixfathoms/lplex on npm.

Embedding lplex

The core package is importable, so you can embed lplex into your own service:

go get github.com/sixfathoms/lplex@latest

import (
    "log/slog"
    "net/http"
    "time"

    "github.com/sixfathoms/lplex"
)

func main() {
    logger := slog.Default()

    // Create the broker (owns ring buffer, device registry, fan-out).
    broker := lplex.NewBroker(lplex.BrokerConfig{
        RingSize:          65536,
        MaxBufferDuration: 5 * time.Minute,
        Logger:            logger,
    })
    go broker.Run()

    // Mount the HTTP handler on a sub-path.
    srv := lplex.NewServer(broker, logger)
    mux := http.NewServeMux()
    mux.Handle("/nmea/", http.StripPrefix("/nmea", srv))

    // Feed frames from your own CAN source.
    go func() {
        for frame := range myFrameSource() {
            broker.RxFrames() <- lplex.RxFrame{
                Timestamp: frame.Time,
                Header:    lplex.CANHeader{Priority: 2, PGN: frame.PGN, Source: frame.Src, Destination: 0xFF},
                Data:      frame.Data,
            }
        }
    }()

    // Optional: enable journal recording.
    journalCh := make(chan lplex.RxFrame, 16384)
    broker.SetJournal(journalCh)
    // ... create JournalWriter and call Run in a goroutine.

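    // ListenAndServe blocks until the server fails; report the error.
    if err := http.ListenAndServe(":8080", mux); err != nil {
        logger.Error("http server stopped", "err", err)
    }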
}

Lifecycle: calling broker.CloseRx() signals the broker goroutine to exit; wait on broker.Done() to know it has actually stopped. Then close the journal channel and wait for the journal writer to finish.

Quick Start

Server
# Start the server (requires SocketCAN interface)
lplex -interface can0 -port 8089

# With a config file
lplex -config /etc/lplex/lplex.conf

# With journal recording enabled
lplex -interface can0 -port 8089 -journal-dir /var/log/lplex

# With cloud replication
lplex -interface can0 -replication-target cloud.example.com:9443 \
  -replication-instance-id boat-001 \
  -replication-tls-cert /etc/lplex/boat.crt \
  -replication-tls-key /etc/lplex/boat.key \
  -replication-tls-ca /etc/lplex/ca.crt

# Or with systemd
sudo systemctl enable --now lplex

Cloud Server
# Start the cloud server with mTLS
lplex-cloud -data-dir /data/lplex \
  -tls-cert /etc/lplex-cloud/server.crt \
  -tls-key /etc/lplex-cloud/server.key \
  -tls-client-ca /etc/lplex-cloud/ca.crt

# With a config file
lplex-cloud -config /etc/lplex-cloud/lplex-cloud.conf

Client (lplexdump)
# Auto-discover via mDNS and stream all frames
lplexdump

# Connect to a specific server with filtering
lplexdump -server http://inuc1.local:8089 -pgn 129025 -manufacturer Garmin

# Buffered mode with automatic reconnect replay
lplexdump -server http://inuc1.local:8089 -buffer-timeout PT5M

Go Client Library (lplexc)
import "github.com/sixfathoms/lplex/lplexc"

// Auto-discover the server
addr, _ := lplexc.Discover(ctx)
client := lplexc.NewClient(addr)

// Get devices on the bus
devices, _ := client.Devices(ctx)

// Subscribe to position updates from Garmin devices
sub, _ := client.Subscribe(ctx, &lplexc.Filter{
    PGNs:          []uint32{129025},
    Manufacturers: []string{"Garmin"},
})
defer sub.Close()

for {
    ev, err := sub.Next()
    if err != nil {
        break
    }
    fmt.Printf("Position: src=%d data=%s\n", ev.Frame.Src, ev.Frame.Data)
}

TypeScript Client Library (@sixfathoms/lplex)
import { Client } from "@sixfathoms/lplex";

const client = new Client("http://inuc1.local:8089");

// Get devices on the bus
const devices = await client.devices();

// Get current bus state snapshot
const snapshot = await client.values();

// Subscribe to position updates from Garmin devices
const stream = await client.subscribe({
  pgn: [129025],
  manufacturer: ["Garmin"],
});

for await (const event of stream) {
  if (event.type === "frame") {
    console.log(`Position: src=${event.frame.src} data=${event.frame.data}`);
  }
}

A CloudClient is also available for the lplex-cloud management API:

import { CloudClient } from "@sixfathoms/lplex";

const cloud = new CloudClient("https://cloud.example.com");
const instances = await cloud.instances();

// Get a regular Client scoped to a specific instance
const client = cloud.client("boat-001");
const devices = await client.devices();

Configuration

lplex can be configured with CLI flags, a HOCON config file, or both. CLI flags always take precedence over config file values.

Config file discovery

Use -config path/to/lplex.conf to specify a config file explicitly. If -config is not set, lplex searches for:

  1. ./lplex.conf
  2. /etc/lplex/lplex.conf

If no config file is found, lplex continues with defaults (fully backward compatible).
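The lookup order above can be sketched as a small helper. This is an illustration only, not lplex's actual code: the function names and the injected exists callback are hypothetical, and it assumes an explicit -config path is used as given without falling back to the search list.

```go
package main

import (
	"fmt"
	"os"
)

// defaultConfigPaths mirrors the search order listed above.
var defaultConfigPaths = []string{"./lplex.conf", "/etc/lplex/lplex.conf"}

// resolveConfigPath returns the config file to load, or "" when none applies.
// explicit is the value of -config; exists reports whether a path is present.
func resolveConfigPath(explicit string, exists func(string) bool) string {
	if explicit != "" {
		return explicit // assumption: an explicit -config is never second-guessed
	}
	for _, p := range defaultConfigPaths {
		if exists(p) {
			return p
		}
	}
	return "" // nothing found: continue with built-in defaults
}

// fileExists is the production implementation of the exists callback.
func fileExists(path string) bool {
	_, err := os.Stat(path)
	return err == nil
}

func main() {
	fmt.Printf("config file: %q\n", resolveConfigPath("", fileExists))
}
```

Passing the existence check as a function keeps the search order testable without touching the filesystem.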

Example config (boat)
interface = can0
port = 8089
max-buffer-duration = PT5M

journal {
  dir = /var/log/lplex
  prefix = nmea2k
  block-size = 262144
  compression = zstd

  rotate {
    duration = PT1H
    size = 0
  }

  retention {
    max-age = P30D
    min-keep = PT24H
  }

  archive {
    command = "/usr/local/bin/archive-to-s3"
    trigger = "on-rotate"
  }
}

replication {
  target = "cloud.example.com:9443"
  instance-id = "boat-001"
  tls {
    cert = "/etc/lplex/boat.crt"
    key = "/etc/lplex/boat.key"
    ca = "/etc/lplex/ca.crt"
  }
}

Example config (cloud)
grpc {
  listen = ":9443"
  tls {
    cert = "/etc/lplex-cloud/server.crt"
    key = "/etc/lplex-cloud/server.key"
    client-ca = "/etc/lplex-cloud/ca.crt"
  }
}
http {
  listen = ":8080"
}
data-dir = "/data/lplex"

journal {
  retention {
    max-age = P90D
    max-size = 53687091200
  }
  archive {
    command = "/usr/local/bin/archive-to-gcs"
    trigger = "before-expire"
  }
}

See lplex.conf.example and lplex-cloud.conf.example for the full annotated versions.

Architecture

SocketCAN (can0)
    |
CANReader goroutine
    |  reads extended CAN frames
    |  reassembles fast-packets (multi-frame PGNs)
    |
    v
rxFrames chan
    |
Broker goroutine (single writer, owns all state)
    |  assigns monotonic sequence numbers
    |  appends pre-serialized JSON to ring buffer (64k entries)
    |  updates device registry (PGN 60928, PGN 126996)
    |  fans out to sessions and ephemeral subscribers
    |  sends ISO requests to discover new devices
    |  feeds journal writer (if enabled)
    |
    +---> ring buffer (pre-serialized JSON, power-of-2)
    +---> DeviceRegistry (keyed by source address)
    +---> ValueStore (last frame per source+PGN)
    +---> sessions map (buffered clients with cursors)
    +---> subscribers map (ephemeral clients, no state)
    +---> journal chan (optional, 16k buffer)
    |
    v
HTTP Server (:8089)                JournalWriter goroutine
    |                                   |  block-based .lpj files
    +-- GET  /events                    |  zstd block compression
    +-- PUT  /clients/{id}              |  CRC32C checksums
    +-- GET  /clients/{id}/events       |  device table per block
    +-- PUT  /clients/{id}/ack          |  O(log N) time seeking
    +-- POST /send                      |  ~2-3 MB/hour at 200 fps
    +-- GET  /devices                   v
    +-- GET  /values                .lpj journal files
    +-- GET  /replication/status

CANWriter goroutine            ReplicationClient (optional)
    |  fragments for TX            |  gRPC to cloud server
    |  writes to SocketCAN         +-- Live: Consumer -> LiveFrame stream
                                   +-- Backfill: raw blocks -> Block stream
                                   +-- Reconnect: exponential backoff

API

Ephemeral streaming

GET /events with optional query params: pgn, manufacturer, instance, name (hex).

No session, no replay, no ACK. Zero server-side state after disconnect.

Buffered sessions
  1. PUT /clients/{id} with {"buffer_timeout": "PT5M"} to create/reconnect
  2. GET /clients/{id}/events for SSE (replays from cursor, then live)
  3. PUT /clients/{id}/ack with {"seq": N} to advance cursor

Disconnected sessions keep their cursor for the buffer duration.
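The three steps above map onto plain HTTP requests. The sketch below only builds the PUT requests for steps 1 and 3 using the paths and JSON bodies shown in the list; the helper names, the session ID nav-display, and the Content-Type header are assumptions made for illustration.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/url"
)

// putJSON builds a PUT request carrying body encoded as JSON.
func putJSON(rawURL string, body any) (*http.Request, error) {
	buf, err := json.Marshal(body)
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequest(http.MethodPut, rawURL, bytes.NewReader(buf))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json") // assumed, not documented above
	return req, nil
}

// newSessionRequest is step 1: create or reconnect a buffered session.
func newSessionRequest(base, id, bufferTimeout string) (*http.Request, error) {
	return putJSON(base+"/clients/"+url.PathEscape(id),
		map[string]string{"buffer_timeout": bufferTimeout})
}

// newAckRequest is step 3: advance the session cursor to seq.
func newAckRequest(base, id string, seq uint64) (*http.Request, error) {
	return putJSON(base+"/clients/"+url.PathEscape(id)+"/ack",
		map[string]uint64{"seq": seq})
}

func main() {
	base := "http://inuc1.local:8089"
	create, err := newSessionRequest(base, "nav-display", "PT5M")
	if err != nil {
		fmt.Println("build request:", err)
		return
	}
	ack, err := newAckRequest(base, "nav-display", 42)
	if err != nil {
		fmt.Println("build request:", err)
		return
	}
	fmt.Println(create.Method, create.URL)
	fmt.Println(ack.Method, ack.URL)
	// Step 2 is a plain GET on base + "/clients/nav-display/events", read as SSE.
}
```

Send the requests with an http.Client of your choice; acknowledging periodically rather than per frame keeps request volume low, at the cost of replaying a few already-seen frames after a reconnect.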

Transmit

POST /send with {"pgn": 59904, "src": 254, "dst": 255, "prio": 6, "data": "00ee00"}

Devices

GET /devices returns JSON array of all discovered NMEA 2000 devices.

Last values

GET /values returns the most recently received frame for each (device, PGN) pair. Results are grouped by device and sorted by source address. Useful for getting a snapshot of bus state without subscribing to SSE.

Supports the same filter query params as /events: pgn, manufacturer, instance, name (hex). Example: GET /values?pgn=129025&manufacturer=Garmin.

Replication status (boat)

GET /replication/status returns current replication state (available when replication is configured).

Cloud Replication

lplex can replicate CAN bus data from a boat to a cloud instance over gRPC with mTLS. The boat initiates all connections (no public IP required). Data flows over two independent gRPC streams:

  • Live stream: real-time frames from the broker's head, delivered to the cloud within seconds
  • Backfill stream: raw journal blocks for filling historical gaps, newest-first

On reconnect after a connectivity gap, live data resumes immediately while backfill works through the gap in the background. The cloud runs a replica Broker per instance, so web clients connect to the cloud and get the same SSE API as if they were on the boat.

See docs/cloud-replication.md for the full protocol specification.

Cloud HTTP API
Endpoint                                        Description
GET /instances                                  List all instances
GET /instances/{id}/status                      Instance status (cursor, holes, lag)
GET /instances/{id}/events                      SSE stream from instance's broker
GET /instances/{id}/devices                     Device table
GET /instances/{id}/values                      Last-seen values per (device, PGN). Query params: pgn, manufacturer, instance, name.
GET /instances/{id}/replication/events?limit=N  Replication event log (newest first, default 100, max 1024)

Journal Recording

lplex can record all CAN frames to disk as block-based binary journal files (.lpj) for future replay and analysis.

# Enable recording (zstd compression by default)
lplex -interface can0 -journal-dir /var/log/lplex

# With rotation (new file every hour)
lplex -interface can0 -journal-dir /var/log/lplex -journal-rotate-duration PT1H

# Disable compression
lplex -interface can0 -journal-dir /var/log/lplex -journal-compression none

Flags:

Flag                                Default            Description
-journal-dir                        (disabled)         Directory for journal files
-journal-prefix                     nmea2k             Journal file name prefix
-journal-block-size                 262144             Block size (power of 2, min 4096)
-journal-compression                zstd               Block compression: none, zstd, zstd-dict
-journal-rotate-duration            PT1H               Rotate after duration (ISO 8601)
-journal-rotate-size                0                  Rotate after bytes (0 = disabled)
-journal-retention-max-age          (disabled)         Delete files older than this (ISO 8601, e.g. P30D)
-journal-retention-min-keep         (disabled)         Never delete files younger than this, unless max-size exceeded
-journal-retention-max-size         0                  Hard size cap in bytes; delete oldest files when exceeded
-journal-retention-soft-pct         80                 Proactive archive threshold as % of max-size (1-99)
-journal-retention-overflow-policy  delete-unarchived  What to do when hard cap hit with failed archives
-journal-archive-command            (disabled)         Path to archive script
-journal-archive-trigger            (disabled)         When to archive: on-rotate or before-expire

Blocks are compressed individually with zstd (~4x ratio at 256KB blocks on typical CAN data, ~158 MB/day at 200 fps). Each block carries a device table so consumers can resolve source addresses without external state. A block index at end-of-file enables fast seeking; crash-truncated files are recovered via forward-scan. See docs/format.md for the binary format specification.
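CRC32C is the Castagnoli variant of CRC-32, which Go's standard library supports directly. The sketch below shows how a reader could verify a block's bytes against a stored checksum; where the checksum lives inside a block is defined in docs/format.md and is deliberately not modeled here, so the function names and the sample input are illustrative.

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// castagnoli is the CRC32C polynomial table from the standard library.
var castagnoli = crc32.MakeTable(crc32.Castagnoli)

// checksum computes the CRC32C of a byte slice.
func checksum(data []byte) uint32 {
	return crc32.Checksum(data, castagnoli)
}

// verify reports whether data matches a previously stored checksum.
func verify(data []byte, stored uint32) bool {
	return checksum(data) == stored
}

func main() {
	data := []byte("123456789") // standard CRC check input, not journal data
	sum := checksum(data)
	fmt.Printf("crc32c=%#08x ok=%v\n", sum, verify(data, sum))
}
```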

Retention and Archival

Journal files accumulate indefinitely unless you configure a retention policy. Retention and archival are available on both boat and cloud binaries.

# Keep at most 30 days of journals, but never delete files less than 24 hours old
lplex -interface can0 -journal-dir /var/log/lplex \
  -journal-retention-max-age P30D -journal-retention-min-keep PT24H

# Hard size cap: keep at most 10 GB, oldest files deleted first
lplex -interface can0 -journal-dir /var/log/lplex \
  -journal-retention-max-size 10737418240

# Archive to S3 on rotation, then delete after 30 days
lplex -interface can0 -journal-dir /var/log/lplex \
  -journal-retention-max-age P30D \
  -journal-archive-command /usr/local/bin/archive-to-s3 \
  -journal-archive-trigger on-rotate

Retention algorithm: files are sorted oldest-first. Three zones govern behavior when max-size is set with archival:

  1. Normal (total <= soft threshold): standard age-based expiration, archive-then-delete
  2. Soft zone (soft < total <= hard): proactively queue oldest non-archived files for archive
  3. Hard zone (total > hard): expire files; if archives have failed, apply the overflow policy

Precedence: max-size overrides min-keep, which in turn overrides max-age. The soft threshold defaults to 80% of max-size and only applies when both max-size and an archive command are configured.
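The three zones reduce to two comparisons against the total journal size. The following sketch classifies a size using the boundaries from the list above; the type and function names are hypothetical, and the integer rounding of the soft threshold is an assumption.

```go
package main

import "fmt"

type zone int

const (
	zoneNormal zone = iota // total <= soft threshold
	zoneSoft               // soft < total <= hard
	zoneHard               // total > hard
)

// classify returns the retention zone for the current total journal size.
// maxSize is the hard cap in bytes; softPct is the soft threshold in percent.
func classify(total, maxSize int64, softPct int) zone {
	soft := maxSize * int64(softPct) / 100 // assumed rounding: truncate
	switch {
	case total > maxSize:
		return zoneHard
	case total > soft:
		return zoneSoft
	default:
		return zoneNormal
	}
}

func main() {
	const tenGiB = 10737418240
	names := map[zone]string{zoneNormal: "normal", zoneSoft: "soft", zoneHard: "hard"}
	for _, total := range []int64{4 << 30, 9 << 30, 11 << 30} {
		fmt.Printf("%d bytes -> %s zone\n", total, names[classify(total, tenGiB, 80)])
	}
}
```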

Overflow policies (when hard cap is hit and archives have failed):

  • delete-unarchived (default): delete files even if not archived, prioritizing continued recording
  • pause-recording: stop journal writes until archives free space, prioritizing archive completeness

Archive script protocol: the script receives file paths as arguments and JSONL metadata on stdin (one line per file with path, instance_id, size, created). It must write JSONL to stdout with per-file status ("ok" or "error"). Failed files are retried with exponential backoff.

Archive triggers:

  • on-rotate: archive immediately after a journal file is closed (eager, minimizes data loss window)
  • before-expire: archive only when a file is about to be deleted by retention (lazy, minimizes archive traffic)

Deployment

The .deb package installs a systemd service that binds to can0. Configure with a config file or environment variable:

# Option 1: config file (recommended)
sudo cp lplex.conf.example /etc/lplex/lplex.conf
sudo vi /etc/lplex/lplex.conf

# Option 2: environment variable
# Edit /etc/default/lplex:
LPLEX_ARGS="-interface can0 -port 8089 -journal-dir /var/log/lplex -journal-compression zstd"

License

MIT

Documentation

Overview

Package lplex is a CAN bus HTTP bridge for NMEA 2000.

It reads raw CAN frames from a SocketCAN interface, reassembles fast-packets, tracks device discovery, and streams frames to clients over SSE with session management, filtering, and replay.

The package can be embedded into other Go services. Create a Broker to manage frame routing, a Server to expose the HTTP API, and optionally a JournalWriter to record frames to disk.

broker := lplex.NewBroker(lplex.BrokerConfig{
    RingSize:          65536,
    MaxBufferDuration: 5 * time.Minute,
    Logger:            logger,
})
go broker.Run()

srv := lplex.NewServer(broker, logger)
mux.Handle("/nmea/", http.StripPrefix("/nmea", srv))

Feed frames into the broker via Broker.RxFrames:

broker.RxFrames() <- lplex.RxFrame{
    Timestamp: time.Now(),
    Header:    lplex.CANHeader{Priority: 2, PGN: 129025, Source: 10, Destination: 0xFF},
    Data:      payload,
}

When done, close the rx channel and the broker goroutine exits:

broker.CloseRx()


Variables

var (
	ParseCANID = canbus.ParseCANID
	BuildCANID = canbus.BuildCANID
)
var ErrFallenBehind = errors.New("consumer fallen behind: data no longer available")

ErrFallenBehind is returned by Consumer.Next when the consumer's cursor has fallen behind both the ring buffer and available journal files.

Functions

func CANReader

func CANReader(ctx context.Context, iface string, rxFrames chan<- RxFrame, logger *slog.Logger) error

CANReader reads frames from SocketCAN, reassembles fast-packets, and sends completed frames to the broker's rxFrames channel.

func CANWriter

func CANWriter(ctx context.Context, iface string, txFrames <-chan TxRequest, logger *slog.Logger) error

CANWriter reads from the broker's txFrames channel and writes to SocketCAN. Handles fast-packet fragmentation for payloads > 8 bytes.

func FragmentFastPacket

func FragmentFastPacket(data []byte, seqCounter uint8) [][]byte

FragmentFastPacket splits a payload into CAN frames for fast-packet TX. seqCounter should be incremented per transfer (wraps at 7, 3-bit field). Returns a slice of 8-byte CAN frame payloads.
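To make the frame layout concrete, here is an independent sketch of fast-packet fragmentation. It is not the package's implementation: the function name is hypothetical, it assumes the sequence counter occupies the upper 3 bits of byte 0, and it pads the last frame with 0xFF. A 5-bit frame number limits a transfer to 6 + 31*7 = 223 bytes, which the sketch does not enforce.

```go
package main

import "fmt"

// fragment splits payload into 8-byte fast-packet frames.
// Frame 0 carries the total length plus 6 data bytes; later frames carry 7.
func fragment(payload []byte, seq uint8) [][]byte {
	newFrame := func(n int) []byte {
		hdr := (seq&0x07)<<5 | byte(n)&0x1F // seq counter | frame number
		return []byte{hdr, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
	}

	first := newFrame(0)
	first[1] = byte(len(payload))
	n := copy(first[2:], payload)
	frames := [][]byte{first}

	for rest := payload[n:]; len(rest) > 0; {
		f := newFrame(len(frames))
		k := copy(f[1:], rest)
		rest = rest[k:]
		frames = append(frames, f)
	}
	return frames
}

func main() {
	payload := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
	for _, f := range fragment(payload, 3) {
		fmt.Printf("% x\n", f)
	}
}
```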

func IsFastPacket

func IsFastPacket(pgn uint32) bool

IsFastPacket returns true if the PGN uses fast-packet transfer.

func ParseISO8601Duration

func ParseISO8601Duration(s string) (time.Duration, error)

ParseISO8601Duration parses a subset of ISO 8601 durations (PT format). Supports hours (H), minutes (M), and seconds (S). Examples: "PT5M", "PT1H30M", "PT30S", "PT1H"

Types

type ArchiveTrigger

type ArchiveTrigger int

ArchiveTrigger controls when journal files are archived.

const (
	ArchiveDisabled     ArchiveTrigger = iota
	ArchiveOnRotate                    // archive immediately after rotation
	ArchiveBeforeExpire                // archive only when about to be deleted by retention
)

func ParseArchiveTrigger

func ParseArchiveTrigger(s string) (ArchiveTrigger, error)

ParseArchiveTrigger parses a string into an ArchiveTrigger.

func (ArchiveTrigger) String

func (t ArchiveTrigger) String() string

String returns the string representation of an ArchiveTrigger.

type BlockWriter

type BlockWriter struct {
	// contains filtered or unexported fields
}

BlockWriter appends pre-encoded journal blocks to .lpj files. Unlike JournalWriter, it receives blocks that are already serialized (with CRC, device table, frame data). Used by the cloud replication server to write backfill blocks byte-for-byte without decompression or re-encoding.

func NewBlockWriter

func NewBlockWriter(cfg BlockWriterConfig) (*BlockWriter, error)

NewBlockWriter creates a new BlockWriter. Call AppendBlock to write blocks. Call Close when done to finalize the current file.

func (*BlockWriter) AppendBlock

func (w *BlockWriter) AppendBlock(baseSeq uint64, baseTimeUs int64, data []byte, compressed bool) error

AppendBlock writes a pre-encoded block to the current journal file. For compressed blocks, the data is the compressed payload (written with a 20-byte v2 header). For uncompressed blocks, data is the full block bytes (must be exactly BlockSize with valid CRC).

func (*BlockWriter) Close

func (w *BlockWriter) Close() error

Close finalizes the current file (writes block index for compressed files, syncs, and closes). Safe to call multiple times or on a writer with no open file.

type BlockWriterConfig

type BlockWriterConfig struct {
	Dir            string
	Prefix         string // default: "nmea2k"
	BlockSize      int    // uncompressed block size (from source journal)
	Compression    journal.CompressionType
	RotateDuration time.Duration     // 0 = no limit
	RotateSize     int64             // 0 = no limit
	OnRotate       func(RotatedFile) // called after a journal file is closed by rotation
	Logger         *slog.Logger
}

BlockWriterConfig configures a BlockWriter.

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker is the central coordinator. Single goroutine reads from rxFrames, assigns sequence numbers, appends to ring buffer, updates device registry, and fans out to client sessions and ephemeral subscribers.

func NewBroker

func NewBroker(cfg BrokerConfig) *Broker

NewBroker creates a new broker with the given config.

func (*Broker) AckSession

func (b *Broker) AckSession(id string, seq uint64) error

AckSession updates the cursor for a session.

func (*Broker) CloseRx

func (b *Broker) CloseRx()

CloseRx closes the rxFrames channel, signaling the broker to stop processing. Wait on Done() to know when the broker goroutine has actually exited.

func (*Broker) CreateSession

func (b *Broker) CreateSession(id string, bufferTimeout time.Duration, filter *EventFilter) (*ClientSession, uint64)

CreateSession creates or retrieves a client session. Returns the session and the current head sequence number.

When bufferTimeout is 0, the session cursor is reset so no frames are replayed on the next connect (fresh start).

func (*Broker) CurrentSeq

func (b *Broker) CurrentSeq() uint64

CurrentSeq returns the most recently assigned sequence number.

func (*Broker) Devices

func (b *Broker) Devices() *DeviceRegistry

Devices returns the broker's device registry.

func (*Broker) Done

func (b *Broker) Done() <-chan struct{}

Done returns a channel that is closed when the broker's Run() method returns.

func (*Broker) GetSession

func (b *Broker) GetSession(id string) *ClientSession

GetSession returns a session by ID, or nil if not found.

func (*Broker) NewConsumer

func (b *Broker) NewConsumer(cfg ConsumerConfig) *Consumer

NewConsumer creates a pull-based consumer starting at the given cursor. The consumer is registered with the broker for live notifications.

func (*Broker) Run

func (b *Broker) Run()

Run is the broker's main loop. Call in its own goroutine. Exits when rxFrames is closed.

func (*Broker) RxFrames

func (b *Broker) RxFrames() chan<- RxFrame

RxFrames returns the channel for submitting received CAN frames to the broker.

func (*Broker) SetJournal

func (b *Broker) SetJournal(ch chan<- RxFrame)

SetJournal sets the journal channel. Must be called before Run.

func (*Broker) Subscribe

func (b *Broker) Subscribe(filter *EventFilter) (*subscriber, func())

Subscribe creates an ephemeral fan-out channel with the given filter. Returns the subscriber and a cleanup function that must be called on disconnect.

func (*Broker) TouchSession

func (b *Broker) TouchSession(id string)

TouchSession updates the LastActivity timestamp for a session.

func (*Broker) TxFrames

func (b *Broker) TxFrames() <-chan TxRequest

TxFrames returns the channel for reading CAN frames to transmit.

func (*Broker) Values

func (b *Broker) Values() *ValueStore

Values returns the broker's last-values store.

type BrokerConfig

type BrokerConfig struct {
	RingSize          int           // must be power of 2
	MaxBufferDuration time.Duration // cap on client buffer_timeout
	JournalDir        string        // directory containing .lpj files (for consumer journal fallback)
	Logger            *slog.Logger

	// ReplicaMode makes the broker honor frame.Seq instead of auto-incrementing.
	// Used by the cloud replication server where sequence numbers originate
	// from the boat's broker.
	ReplicaMode bool

	// InitialHead sets the starting head value. Use this when resuming a
	// replica broker from persisted state so the ring starts at the right
	// position. Zero means start at 1 (the default).
	InitialHead uint64
}

BrokerConfig holds broker configuration.
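A likely reason for the power-of-2 requirement on RingSize is that a sequence number can then be mapped to a slot with a bit mask instead of a modulo. The sketch below shows that indexing technique with a hypothetical ring type; it is not the broker's internal structure.

```go
package main

import "fmt"

// isPowerOfTwo reports whether n is a positive power of 2.
func isPowerOfTwo(n int) bool {
	return n > 0 && n&(n-1) == 0
}

// ring is a fixed-size buffer indexed by sequence number.
type ring struct {
	entries []string
	mask    uint64
}

func newRing(size int) (*ring, error) {
	if !isPowerOfTwo(size) {
		return nil, fmt.Errorf("ring size %d is not a power of 2", size)
	}
	return &ring{entries: make([]string, size), mask: uint64(size - 1)}, nil
}

// put stores v in the slot for seq, overwriting the entry size sequences back.
func (r *ring) put(seq uint64, v string) { r.entries[seq&r.mask] = v }

// get returns whatever currently occupies the slot for seq.
func (r *ring) get(seq uint64) string { return r.entries[seq&r.mask] }

func main() {
	r, err := newRing(65536)
	if err != nil {
		fmt.Println(err)
		return
	}
	r.put(70000, `{"seq":70000}`)
	fmt.Println(r.get(70000), "stored in slot", 70000&r.mask)
}
```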

type CANHeader

type CANHeader = canbus.CANHeader

Type aliases so existing server code compiles unchanged.

type ClientSession

type ClientSession struct {
	ID            string
	Cursor        uint64        // last ACK'd sequence number (0 = never ACK'd)
	BufferTimeout time.Duration // how long to keep buffering after disconnect
	LastActivity  time.Time
	Filter        *EventFilter // nil = receive all frames
}

ClientSession tracks a buffered client's metadata for persistence across HTTP reconnects. The actual frame reading is done by Consumer.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a pull-based reader that iterates frames at its own pace. It reads from a tiered log: journal files on disk (oldest), ring buffer in memory (recent), and live notification (current head, blocking wait).

func (*Consumer) Close

func (c *Consumer) Close() error

Close stops the consumer and removes it from the broker. Safe to call multiple times.

func (*Consumer) Cursor

func (c *Consumer) Cursor() uint64

Cursor returns the consumer's current position (next seq to read).

func (*Consumer) Next

func (c *Consumer) Next(ctx context.Context) (*Frame, error)

Next returns the next matching frame, blocking until one is available. Returns ErrFallenBehind if the consumer's cursor has fallen behind all available data. Returns ctx.Err() if the context is cancelled.

type ConsumerConfig

type ConsumerConfig struct {
	Cursor uint64       // starting position (next seq to read)
	Filter *EventFilter // nil = all frames
}

ConsumerConfig configures a new Consumer.

type Device

type Device struct {
	Source           uint8  `json:"src"`
	NAME             uint64 `json:"-"`
	NAMEHex          string `json:"name"`
	Manufacturer     string `json:"manufacturer"`
	ManufacturerCode uint16 `json:"manufacturer_code"`
	DeviceClass      uint8  `json:"device_class"`
	DeviceFunction   uint8  `json:"device_function"`
	DeviceInstance   uint8  `json:"device_instance"`
	UniqueNumber     uint32 `json:"unique_number,omitempty"`

	// PGN 126996 Product Information fields.
	ModelID         string `json:"model_id,omitempty"`
	SoftwareVersion string `json:"software_version,omitempty"`
	ModelVersion    string `json:"model_version,omitempty"`
	ModelSerial     string `json:"model_serial,omitempty"`
	ProductCode     uint16 `json:"product_code,omitempty"`

	// Per-source packet statistics.
	FirstSeen   time.Time `json:"first_seen"`
	LastSeen    time.Time `json:"last_seen"`
	PacketCount uint64    `json:"packet_count"`
	ByteCount   uint64    `json:"byte_count"`
}

Device represents an NMEA 2000 device discovered via ISO Address Claim (PGN 60928) and optionally enriched with Product Information (PGN 126996).

type DeviceRegistry

type DeviceRegistry struct {
	// contains filtered or unexported fields
}

DeviceRegistry tracks NMEA 2000 devices discovered via PGN 60928. Thread-safe for concurrent reads (SSE streams) and writes (broker goroutine).

func NewDeviceRegistry

func NewDeviceRegistry() *DeviceRegistry

NewDeviceRegistry creates an empty device registry.

func (*DeviceRegistry) Get

func (r *DeviceRegistry) Get(source uint8) *Device

Get returns a snapshot of the device at the given source address, or nil.

func (*DeviceRegistry) HandleAddressClaim

func (r *DeviceRegistry) HandleAddressClaim(source uint8, data []byte) *Device

HandleAddressClaim processes a PGN 60928 ISO Address Claim. Returns the device if this is a new or changed device, nil otherwise.

func (*DeviceRegistry) HandleProductInfo

func (r *DeviceRegistry) HandleProductInfo(source uint8, data []byte) *Device

HandleProductInfo processes a PGN 126996 Product Information response. Returns the device if fields changed, nil if source is unknown or unchanged.

func (*DeviceRegistry) RecordPacket

func (r *DeviceRegistry) RecordPacket(source uint8, ts time.Time, dataLen int) bool

RecordPacket updates per-source packet statistics. Returns true if this is a previously unseen source address.

func (*DeviceRegistry) Snapshot

func (r *DeviceRegistry) Snapshot() []Device

Snapshot returns a copy of all known devices.

func (*DeviceRegistry) SnapshotJSON

func (r *DeviceRegistry) SnapshotJSON() json.RawMessage

SnapshotJSON returns the device list as pre-serialized JSON.

func (*DeviceRegistry) SynthesizeFrames

func (r *DeviceRegistry) SynthesizeFrames(ts time.Time) []RxFrame

SynthesizeFrames generates RxFrame slices for PGN 60928 (Address Claim) and PGN 126996 (Product Info) from all known devices. Used to seed a remote broker's device registry on live stream connect.

type DeviceValues

type DeviceValues struct {
	Name         string     `json:"name"`
	Source       uint8      `json:"src"`
	Manufacturer string     `json:"manufacturer,omitempty"`
	ModelID      string     `json:"model_id,omitempty"`
	Values       []PGNValue `json:"values"`
}

DeviceValues groups PGN values by device in the JSON response.

type EventFilter

type EventFilter struct {
	PGNs          []uint32
	Manufacturers []string
	Instances     []uint8
	Names         []uint64 // 64-bit CAN NAMEs
}

EventFilter specifies which CAN frames a session receives. Categories are AND'd (all set categories must match), values within a category are OR'd (any value in the list matches).
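The AND-across-categories, OR-within-category rule can be sketched as follows. The types here are simplified stand-ins rather than the package's EventFilter (NAME matching is omitted), and treating a nil filter as match-all follows the "nil = receive all frames" comment on ClientSession.

```go
package main

import "fmt"

// eventFilter is a simplified stand-in for the package's EventFilter.
type eventFilter struct {
	PGNs          []uint32
	Manufacturers []string
	Instances     []uint8
}

// frameMeta is the hypothetical per-frame data a filter is checked against.
type frameMeta struct {
	PGN          uint32
	Manufacturer string
	Instance     uint8
}

func contains[T comparable](list []T, v T) bool {
	for _, x := range list {
		if x == v {
			return true
		}
	}
	return false
}

// matches applies OR within each category and AND across set categories.
// An unset (empty) category places no constraint on the frame.
func (f *eventFilter) matches(m frameMeta) bool {
	if f == nil {
		return true
	}
	if len(f.PGNs) > 0 && !contains(f.PGNs, m.PGN) {
		return false
	}
	if len(f.Manufacturers) > 0 && !contains(f.Manufacturers, m.Manufacturer) {
		return false
	}
	if len(f.Instances) > 0 && !contains(f.Instances, m.Instance) {
		return false
	}
	return true
}

func main() {
	f := &eventFilter{PGNs: []uint32{129025, 129026}, Manufacturers: []string{"Garmin"}}
	fmt.Println(f.matches(frameMeta{PGN: 129026, Manufacturer: "Garmin"}))
	fmt.Println(f.matches(frameMeta{PGN: 129026, Manufacturer: "Other"}))
}
```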

func ParseFilterParams

func ParseFilterParams(r *http.Request) (*EventFilter, error)

ParseFilterParams reads optional filter query params from a request. Supported params: pgn (decimal), manufacturer (name or code), instance (decimal), name (hex CAN NAME). Returns nil filter if no params are set.

func (*EventFilter) IsEmpty

func (f *EventFilter) IsEmpty() bool

IsEmpty returns true if no filter criteria are set.

type EventLog

type EventLog struct {
	// contains filtered or unexported fields
}

EventLog is a fixed-size ring buffer of replication events.

func NewEventLog

func NewEventLog() *EventLog

NewEventLog creates an empty event log.

func (*EventLog) Recent

func (l *EventLog) Recent(n int) []ReplicationEvent

Recent returns up to n events, newest first.

func (*EventLog) Record

func (l *EventLog) Record(typ ReplicationEventType, detail map[string]any)

Record appends a new event to the log.

type FastPacketAssembler

type FastPacketAssembler struct {
	// contains filtered or unexported fields
}

FastPacketAssembler reassembles multi-frame fast-packet PGNs.

Fast-packet protocol:

  • Frame 0: byte[0] = seq_counter(3 bits) | frame_number(5 bits), byte[1] = total_bytes, bytes[2:8] = first 6 data bytes
  • Frame N: byte[0] = seq_counter(3 bits) | frame_number(5 bits), bytes[1:8] = next 7 data bytes

func NewFastPacketAssembler

func NewFastPacketAssembler(timeout time.Duration) *FastPacketAssembler

NewFastPacketAssembler creates a new assembler with the given reassembly timeout.

func (*FastPacketAssembler) Process

func (a *FastPacketAssembler) Process(pgn uint32, source uint8, data []byte, now time.Time) []byte

Process handles a CAN frame that is part of a fast-packet transfer. Returns the complete reassembled payload when all frames are received, nil otherwise.

func (*FastPacketAssembler) PurgeStale

func (a *FastPacketAssembler) PurgeStale(now time.Time)

PurgeStale removes any in-progress assemblies older than the timeout.

type Frame

type Frame struct {
	Seq       uint64
	Timestamp time.Time
	Header    CANHeader
	Data      []byte // raw CAN payload
	// contains filtered or unexported fields
}

Frame is a single CAN frame returned by Consumer.Next.

func (*Frame) JSON

func (f *Frame) JSON() ([]byte, error)

JSON returns the pre-serialized JSON for this frame (SSE format). If not cached (e.g. from journal replay), it serializes on demand.

type HoleTracker

type HoleTracker struct {
	// contains filtered or unexported fields
}

HoleTracker tracks gaps (holes) in a sequence number space. Holes are non-overlapping, sorted by Start, and represent ranges of missing data.

Typical case: 0-3 holes. All operations are linear on the slice, which is perfectly fine for the expected cardinality.

func NewHoleTracker

func NewHoleTracker() *HoleTracker

NewHoleTracker creates an empty hole tracker.

func (*HoleTracker) Add

func (h *HoleTracker) Add(start, end uint64)

Add inserts a new hole [start, end). Merges with any overlapping or adjacent holes. No-op if start >= end.

func (*HoleTracker) ContinuousThrough

func (h *HoleTracker) ContinuousThrough(cursor uint64) uint64

ContinuousThrough returns the highest sequence number with no holes below it. Given a base cursor and the hole set, this is the seq just before the first hole (or the cursor itself if no holes exist before it).

Example: cursor=100, holes=[(200,300)] -> returns 199 (continuous through 199)
Example: cursor=100, holes=[(100,200)] -> returns 99 (hole starts at cursor)
Example: cursor=100, no holes -> returns max uint64 (no bound from holes)

func (*HoleTracker) Fill

func (h *HoleTracker) Fill(start, end uint64) bool

Fill marks [start, end) as received, removing that range from any holes. Returns true if any holes were actually affected.
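The interplay of Add and Fill on half-open ranges can be sketched with plain slice functions. This is an illustration under assumptions, not the package's code: `add`, `fill`, `totalMissing`, and `seqRange` are hypothetical names, and like the real tracker the sketch is linear in the number of holes.

```go
package main

import "fmt"

// seqRange is a half-open interval [Start, End), like SeqRange.
type seqRange struct{ Start, End uint64 }

// add inserts [start, end) and merges overlapping or adjacent holes.
func add(holes []seqRange, start, end uint64) []seqRange {
	if start >= end {
		return holes // no-op for empty ranges
	}
	out := make([]seqRange, 0, len(holes)+1)
	i := 0
	for ; i < len(holes) && holes[i].End < start; i++ {
		out = append(out, holes[i]) // strictly before, not even adjacent
	}
	for ; i < len(holes) && holes[i].Start <= end; i++ {
		if holes[i].Start < start {
			start = holes[i].Start
		}
		if holes[i].End > end {
			end = holes[i].End
		}
	}
	out = append(out, seqRange{start, end})
	return append(out, holes[i:]...)
}

// fill removes [start, end) from the holes and reports whether any changed.
func fill(holes []seqRange, start, end uint64) ([]seqRange, bool) {
	if start >= end {
		return holes, false
	}
	out := make([]seqRange, 0, len(holes)+1)
	changed := false
	for _, h := range holes {
		if h.End <= start || h.Start >= end {
			out = append(out, h) // untouched by the filled range
			continue
		}
		changed = true
		if h.Start < start {
			out = append(out, seqRange{h.Start, start}) // left remainder
		}
		if h.End > end {
			out = append(out, seqRange{end, h.End}) // right remainder
		}
	}
	return out, changed
}

// totalMissing sums the lengths of all holes.
func totalMissing(holes []seqRange) uint64 {
	var n uint64
	for _, h := range holes {
		n += h.End - h.Start
	}
	return n
}

func main() {
	holes := add(nil, 10, 20)
	holes = add(holes, 20, 30) // adjacent: merges into [10,30)
	holes = add(holes, 40, 50)
	holes, changed := fill(holes, 15, 25) // splits [10,30)
	fmt.Println(holes, changed, totalMissing(holes))
}
```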

func (*HoleTracker) Holes

func (h *HoleTracker) Holes() []SeqRange

Holes returns a copy of current holes, sorted by Start.

func (*HoleTracker) Len

func (h *HoleTracker) Len() int

Len returns the number of holes.

func (*HoleTracker) TotalMissing

func (h *HoleTracker) TotalMissing() uint64

TotalMissing returns the total number of missing sequence numbers across all holes.

type InstanceManager

type InstanceManager struct {
	// contains filtered or unexported fields
}

InstanceManager manages per-instance state on the cloud side.

func NewInstanceManager

func NewInstanceManager(dataDir string, logger *slog.Logger) (*InstanceManager, error)

NewInstanceManager creates a new instance manager, loading any persisted state.

func (*InstanceManager) Get

func (im *InstanceManager) Get(id string) *InstanceState

Get returns the instance state, or nil if not found.

func (*InstanceManager) GetOrCreate

func (im *InstanceManager) GetOrCreate(id string) *InstanceState

GetOrCreate returns the instance state, creating it if necessary.

func (*InstanceManager) List

func (im *InstanceManager) List() []InstanceSummary

List returns a snapshot of all instance IDs and their basic state.

func (*InstanceManager) SetInstancePaused

func (im *InstanceManager) SetInstancePaused(instanceID string, paused bool)

SetInstancePaused pauses or unpauses journal writing for a specific instance. Used by the JournalKeeper overflow policy to stop/resume writes.

func (*InstanceManager) SetOnRotate

func (im *InstanceManager) SetOnRotate(fn func(instanceID string, rf RotatedFile))

SetOnRotate sets a callback invoked when any instance's journal or backfill file is rotated. Used by the cloud binary to feed the JournalKeeper. Must be called before any connections are accepted. Retroactively updates all existing instances loaded at startup.

func (*InstanceManager) Shutdown

func (im *InstanceManager) Shutdown()

Shutdown stops all instance brokers and persists state.

type InstanceState

type InstanceState struct {
	ID               string       `json:"id"`
	Cursor           uint64       `json:"cursor"`        // continuous data through this seq
	BoatHeadSeq      uint64       `json:"boat_head_seq"` // last reported by boat
	BoatJournalBytes uint64       `json:"boat_journal_bytes"`
	LastSeen         time.Time    `json:"last_seen"`
	Connected        bool         `json:"-"`
	HoleTracker      *HoleTracker `json:"-"`

	// Persisted hole state
	PersistedHoles []SeqRange `json:"holes,omitempty"`
	// contains filtered or unexported fields
}

InstanceState tracks the replication state for a single boat instance on the cloud side. Thread-safe.

func (*InstanceState) RecentEvents

func (s *InstanceState) RecentEvents(n int) []ReplicationEvent

RecentEvents returns up to n recent replication events, newest first.

func (*InstanceState) RecordEvent

func (s *InstanceState) RecordEvent(typ ReplicationEventType, detail map[string]any)

RecordEvent appends a diagnostic event to this instance's event log.

func (*InstanceState) Status

func (s *InstanceState) Status() InstanceStatus

Status returns a thread-safe snapshot of this instance's replication state.

type InstanceStatus

type InstanceStatus struct {
	ID               string     `json:"id"`
	Connected        bool       `json:"connected"`
	Cursor           uint64     `json:"cursor"`
	BoatHeadSeq      uint64     `json:"boat_head_seq"`
	BoatJournalBytes uint64     `json:"boat_journal_bytes"`
	Holes            []SeqRange `json:"holes,omitzero"`
	LagSeqs          uint64     `json:"lag_seqs"`
	LastSeen         time.Time  `json:"last_seen"`
}

InstanceStatus is a detailed snapshot of an instance for status reporting.

type InstanceSummary

type InstanceSummary struct {
	ID          string    `json:"id"`
	Connected   bool      `json:"connected"`
	Cursor      uint64    `json:"cursor"`
	BoatHeadSeq uint64    `json:"boat_head_seq"`
	Holes       int       `json:"holes"`
	LagSeqs     uint64    `json:"lag_seqs"`
	LastSeen    time.Time `json:"last_seen"`
}

InstanceSummary is a snapshot of an instance for listing.

type JournalConfig

type JournalConfig struct {
	Dir            string
	Prefix         string                  // default: "nmea2k"
	BlockSize      int                     // default: 262144, power of 2, min 4096
	Compression    journal.CompressionType // default: CompressionNone
	RotateDuration time.Duration           // 0 = no limit
	RotateSize     int64                   // 0 = no limit
	RotateCount    int64                   // 0 = no limit
	OnRotate       func(RotatedFile)       // called after a journal file is closed by rotation
	Logger         *slog.Logger
}

JournalConfig configures the journal writer.
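The BlockSize constraint (a power of 2, at least 4096) can be checked with a standard bit trick. `validBlockSize` is a hypothetical helper shown only to make the constraint concrete; whether the package rejects or rounds invalid values is not stated here.

```go
package main

import "fmt"

// validBlockSize reports whether n is a power of two and at least 4096.
// A power of two has exactly one bit set, so n&(n-1) clears it to zero.
func validBlockSize(n int) bool {
	return n >= 4096 && n&(n-1) == 0
}

func main() {
	for _, n := range []int{262144, 4096, 2048, 100000} {
		fmt.Println(n, validBlockSize(n))
	}
}
```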

type JournalKeeper

type JournalKeeper struct {
	// contains filtered or unexported fields
}

JournalKeeper manages journal file archival and retention. One goroutine per binary, driven by rotation notifications and periodic directory scans.

func NewJournalKeeper

func NewJournalKeeper(cfg KeeperConfig) *JournalKeeper

NewJournalKeeper creates a keeper. Call Run to start.

func (*JournalKeeper) Run

func (k *JournalKeeper) Run(ctx context.Context)

Run is the main loop. Blocks until ctx is cancelled.

func (*JournalKeeper) Send

func (k *JournalKeeper) Send(rf RotatedFile)

Send notifies the keeper that a file was rotated. Non-blocking; drops if the channel is full (the periodic scan will pick it up).
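The non-blocking, drop-when-full behavior is commonly written in Go as a `select` with a `default` case. The sketch below shows that idiom with a hypothetical `notifier` type; the keeper's actual channel type and buffer size are not documented here.

```go
package main

import "fmt"

// notifier is a hypothetical stand-in for the keeper's rotation channel.
type notifier struct {
	ch chan string
}

// send enqueues path without blocking. It returns false when the buffer
// is full and the notification is dropped.
func (n *notifier) send(path string) bool {
	select {
	case n.ch <- path:
		return true
	default:
		return false
	}
}

func main() {
	n := &notifier{ch: make(chan string, 1)}
	fmt.Println(n.send("a.lpj")) // true
	fmt.Println(n.send("b.lpj")) // false: buffer full, dropped
}
```

Dropping is safe only because a second mechanism (here, the periodic directory scan) eventually finds the file anyway.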

func (*JournalKeeper) SetOnPauseChange

func (k *JournalKeeper) SetOnPauseChange(fn func(dir KeeperDir, paused bool))

SetOnPauseChange sets the callback invoked when a directory's pause state transitions. Must be called before Run.

type JournalWriter

type JournalWriter struct {
	// contains filtered or unexported fields
}

JournalWriter writes CAN frames to block-based journal files.

func NewJournalWriter

func NewJournalWriter(cfg JournalConfig, devices *DeviceRegistry, ch <-chan RxFrame) (*JournalWriter, error)

NewJournalWriter creates a writer. Call Run to start.

func (*JournalWriter) Run

func (w *JournalWriter) Run(ctx context.Context) error

Run is the main loop. Blocks until ctx is cancelled or the channel is closed.

func (*JournalWriter) SetPaused

func (w *JournalWriter) SetPaused(p bool)

SetPaused sets whether the writer should discard incoming frames. Used by the overflow policy to stop writes when disk is full.

type KeeperConfig

type KeeperConfig struct {
	// Dirs to manage. Ignored if DirFunc is set.
	Dirs []KeeperDir

	// DirFunc returns current directories on each scan cycle.
	// Used by cloud to dynamically discover instance dirs.
	DirFunc func() []KeeperDir

	// Retention
	MaxAge  time.Duration // 0 = no age limit
	MinKeep time.Duration // 0 = no min-keep floor
	MaxSize int64         // 0 = no size limit

	// Soft/hard thresholds
	SoftPct        int            // % of MaxSize for proactive archiving (default 80, range 1-99)
	OverflowPolicy OverflowPolicy // what to do when hard cap hit with non-archived files

	// Archive
	ArchiveCommand string         // path to script; empty = no archiving
	ArchiveTrigger ArchiveTrigger // on-rotate or before-expire

	// Pause callback (called when overflow-policy=pause-recording toggles state)
	OnPauseChange func(dir KeeperDir, paused bool)

	Logger *slog.Logger
}

KeeperConfig configures the JournalKeeper.

type KeeperDir

type KeeperDir struct {
	Dir        string
	InstanceID string
}

KeeperDir is a directory managed by the keeper.

type OverflowPolicy

type OverflowPolicy int

OverflowPolicy controls what happens when the hard size cap is hit and files haven't been archived yet.

const (
	OverflowDeleteUnarchived OverflowPolicy = iota // delete files even if not archived
	OverflowPauseRecording                         // stop journal writes until archives free space
)

func ParseOverflowPolicy

func ParseOverflowPolicy(s string) (OverflowPolicy, error)

ParseOverflowPolicy parses a string into an OverflowPolicy.

func (OverflowPolicy) String

func (p OverflowPolicy) String() string

String returns the string representation of an OverflowPolicy.

type PGNValue

type PGNValue struct {
	PGN  uint32 `json:"pgn"`
	Ts   string `json:"ts"`
	Data string `json:"data"`
	Seq  uint64 `json:"seq"`
}

PGNValue is a single PGN's last-known value in the JSON response.

type ReplicationClient

type ReplicationClient struct {
	// contains filtered or unexported fields
}

ReplicationClient streams frames from the local broker to a cloud replication server over gRPC. It runs two independent streams: one for live frames (from the broker's head forward) and one for backfilling historical gaps (from journal files). On disconnect, it reconnects with exponential backoff and resumes both streams.

func NewReplicationClient

func NewReplicationClient(cfg ReplicationClientConfig, broker *Broker) *ReplicationClient

NewReplicationClient creates a new replication client. Call Run to start.

func (*ReplicationClient) Run

func (c *ReplicationClient) Run(ctx context.Context) error

Run is the main loop. Connects to the cloud, performs handshake, and starts live + backfill streams. Reconnects on failure with exponential backoff. Blocks until ctx is cancelled.

func (*ReplicationClient) Status

Status returns the current replication state for status reporting.

type ReplicationClientConfig

type ReplicationClientConfig struct {
	Target     string // cloud gRPC address (host:port)
	InstanceID string
	CertFile   string // client certificate
	KeyFile    string // client private key
	CAFile     string // CA certificate for verifying server
	Logger     *slog.Logger
}

ReplicationClientConfig configures the boat-side replication client.

type ReplicationEvent

type ReplicationEvent struct {
	Time   time.Time            `json:"time"`
	Type   ReplicationEventType `json:"type"`
	Detail map[string]any       `json:"detail,omitempty"`
}

ReplicationEvent is a single diagnostic event from the replication pipeline.

type ReplicationEventType

type ReplicationEventType string

ReplicationEventType identifies the kind of replication event.

const (
	EventLiveStart     ReplicationEventType = "live_start"
	EventLiveStop      ReplicationEventType = "live_stop"
	EventBackfillStart ReplicationEventType = "backfill_start"
	EventBackfillStop  ReplicationEventType = "backfill_stop"
	EventBlockReceived ReplicationEventType = "block_received"
	EventCheckpoint    ReplicationEventType = "checkpoint"
)

type ReplicationServer

type ReplicationServer struct {
	pb.UnimplementedReplicationServer
	// contains filtered or unexported fields
}

ReplicationServer implements the gRPC Replication service.

func NewReplicationServer

func NewReplicationServer(im *InstanceManager, logger *slog.Logger) *ReplicationServer

NewReplicationServer creates a new replication gRPC server.

func (*ReplicationServer) Backfill

Backfill handles bulk block transfer for filling gaps.

func (*ReplicationServer) GetInstanceBroker

func (s *ReplicationServer) GetInstanceBroker(instanceID string) *Broker

GetInstanceBroker returns the broker for an instance (starting it if needed). Used by HTTP handlers to serve SSE from a cloud instance.

func (*ReplicationServer) GetInstanceState

func (s *ReplicationServer) GetInstanceState(instanceID string) *InstanceState

GetInstanceState returns the instance state for status reporting.

func (*ReplicationServer) Handshake

Handshake exchanges sync state between boat and cloud.

func (*ReplicationServer) Live

Live handles the realtime frame stream from a boat.

type ReplicationStatus

type ReplicationStatus struct {
	Connected             bool       `json:"connected"`
	InstanceID            string     `json:"instance_id"`
	LocalHeadSeq          uint64     `json:"local_head_seq"`
	CloudCursor           uint64     `json:"cloud_cursor"`
	Holes                 []SeqRange `json:"holes,omitzero"`
	LiveLag               uint64     `json:"live_lag"`
	BackfillRemainingSeqs uint64     `json:"backfill_remaining_seqs"`
	LastAck               time.Time  `json:"last_ack,omitempty"`
}

ReplicationStatus is the boat-side view of replication state.

type RotatedFile

type RotatedFile struct {
	Path       string
	InstanceID string
}

RotatedFile describes a completed journal file.

type RxFrame

type RxFrame struct {
	Timestamp time.Time
	Header    CANHeader
	Data      []byte
	Seq       uint64 // assigned by broker in handleFrame; zero when fed by external code
}

RxFrame is a reassembled CAN frame ready for the broker.

type SeqRange

type SeqRange struct {
	Start uint64 // inclusive
	End   uint64 // exclusive
}

SeqRange represents a half-open interval [Start, End) of sequence numbers.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server handles HTTP API requests for lplex.

func NewServer

func NewServer(broker *Broker, logger *slog.Logger) *Server

NewServer creates a new HTTP server wired to the given broker.

func (*Server) HandleEphemeralSSE

func (s *Server) HandleEphemeralSSE(w http.ResponseWriter, r *http.Request)

HandleEphemeralSSE handles GET /events. Ephemeral SSE stream, no session, no ACK, no replay. Optional query params for filtering: pgn, manufacturer, instance, name (hex).

func (*Server) HandleFunc

func (s *Server) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))

HandleFunc registers an additional HTTP handler on the server's mux.

func (*Server) ServeHTTP

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements http.Handler.

type SyncState

type SyncState struct {
	Cursor           uint64     // continuous data through this seq
	Holes            []SeqRange // sorted gaps
	BoatHeadSeq      uint64     // last reported by boat
	BoatJournalBytes uint64
}

SyncState captures the replication state for an instance, used for persistence and handshake responses.

type TxRequest

type TxRequest struct {
	Header CANHeader
	Data   []byte
}

TxRequest is a frame to write to the CAN bus.

type ValueStore

type ValueStore struct {
	// contains filtered or unexported fields
}

ValueStore tracks the last-seen frame data for each (source, PGN) pair. The broker goroutine writes via Record; HTTP handlers read via Snapshot.

func NewValueStore

func NewValueStore() *ValueStore

NewValueStore creates an empty value store.

func (*ValueStore) Record

func (vs *ValueStore) Record(source uint8, pgn uint32, ts time.Time, data []byte, seq uint64)

Record updates the stored value for the given source and PGN. Called by the broker goroutine on every frame.

func (*ValueStore) Snapshot

func (vs *ValueStore) Snapshot(devices *DeviceRegistry, filter *EventFilter) []DeviceValues

Snapshot returns the current values grouped by device, resolved against the device registry for NAME and manufacturer info. An optional filter restricts results by PGN and/or device criteria (manufacturer, name, instance).

func (*ValueStore) SnapshotJSON

func (vs *ValueStore) SnapshotJSON(devices *DeviceRegistry, filter *EventFilter) json.RawMessage

SnapshotJSON returns the snapshot as pre-serialized JSON.

Directories

Path Synopsis
Package canbus provides shared NMEA 2000 / CAN bus types and functions importable by both the server internals and client tools.
cmd
journalbench command
journalbench reads frames from a .lpj journal file and re-encodes them with every combination of block size and compression method to find the optimal parameters for CAN bus journal data.
lplex command
lplex-cloud command
lplexdump command
Package journal provides the block-based binary journal format (.lpj files) for recording and replaying NMEA 2000 CAN frames.
Package lplexc provides a Go client for lplex, a CAN bus HTTP bridge for NMEA 2000.
proto
