lplex

package module
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 10, 2026 License: MIT Imports: 41 Imported by: 0

README

lplex

CAN bus HTTP bridge for NMEA 2000. Reads raw CAN frames from a SocketCAN interface, reassembles fast-packets, tracks device discovery, and streams frames to clients over SSE with session management, filtering, and replay. Supports cloud replication for remote access to boat data over intermittent connections.

  • Real-time SSE streaming with ephemeral and buffered session modes, per-client filtering by PGN, manufacturer, instance, or device name
  • Fast-packet reassembly for multi-frame NMEA 2000 PGNs, with automatic device discovery via ISO requests
  • PGN decoding of known NMEA 2000 message types into human-readable field values, with a DSL-based code generator supporting variant dispatch for proprietary PGNs and per-PGN metadata (fast-packet, transmission interval, on-demand)
  • Journal recording to block-based .lpj files with zstd compression, CRC32C checksums, and O(log N) time seeking
  • Retention and archival with max-age/min-keep/max-size knobs, soft/hard thresholds, configurable overflow policy, and pluggable archive scripts
  • Cloud replication over gRPC with mTLS, live + backfill streams, hole tracking, and lazy per-instance Broker on the cloud side
  • Pull-based Consumer with tiered replay (journal files → ring buffer → live), so clients can catch up from any point in history
  • Embeddable core as a Go package, mount the HTTP handler on any ServeMux
  • Go client library (lplexc) with mDNS discovery, subscriptions, device queries, and transmit
  • TypeScript client library (@sixfathoms/lplex) for browsers and Node.js, with CloudClient for lplex-cloud
  • CAN transmit via POST /send with automatic fast-packet fragmentation

Installation

Client (lplexdump)
# Homebrew (macOS / Linux)
brew install sixfathoms/tap/lplexdump

# From source
go install github.com/sixfathoms/lplex/cmd/lplexdump@latest
Server (Linux only, requires SocketCAN)
# Debian/Ubuntu (.deb includes both lplex server and lplexdump)
sudo dpkg -i lplex_*.deb
sudo systemctl start lplex

# Docker
docker run --network host --device /dev/can0 ghcr.io/sixfathoms/lplex:latest

# From source
go install github.com/sixfathoms/lplex/cmd/lplex@latest
Cloud Server
# From source
go install github.com/sixfathoms/lplex/cmd/lplex-cloud@latest

Download .deb packages from GitHub Releases.

Go Client Library
go get github.com/sixfathoms/lplex/lplexc@latest
TypeScript Client Library
npm install @sixfathoms/lplex

Zero runtime dependencies. Works in browsers and Node 18+. Ships ESM, CJS, and TypeScript declarations. See @sixfathoms/lplex on npm.

Embedding lplex

The core package is importable, so you can embed lplex into your own service:

go get github.com/sixfathoms/lplex@latest
import (
    "log/slog"
    "net/http"
    "time"

    "github.com/sixfathoms/lplex"
)

func main() {
    logger := slog.Default()

    // Create the broker (owns ring buffer, device registry, fan-out).
    broker := lplex.NewBroker(lplex.BrokerConfig{
        RingSize:          65536,
        MaxBufferDuration: 5 * time.Minute,
        Logger:            logger,
    })
    go broker.Run()

    // Mount the HTTP handler on a sub-path.
    srv := lplex.NewServer(broker, logger)
    mux := http.NewServeMux()
    mux.Handle("/nmea/", http.StripPrefix("/nmea", srv))

    // Feed frames from your own CAN source.
    go func() {
        for frame := range myFrameSource() {
            broker.RxFrames() <- lplex.RxFrame{
                Timestamp: frame.Time,
                Header:    lplex.CANHeader{Priority: 2, PGN: frame.PGN, Source: frame.Src, Destination: 0xFF},
                Data:      frame.Data,
            }
        }
    }()

    // Optional: enable journal recording.
    journalCh := make(chan lplex.RxFrame, 16384)
    broker.SetJournal(journalCh)
    // ... create JournalWriter and call Run in a goroutine.

    http.ListenAndServe(":8080", mux)
}

Lifecycle: the broker goroutine exits when you call broker.CloseRx(). Close the journal channel after that, then wait for the journal writer to finish.

Quick Start

Server
# Start the server (requires SocketCAN interface)
lplex -interface can0 -port 8089

# With a config file
lplex -config /etc/lplex/lplex.conf

# With journal recording enabled
lplex -interface can0 -port 8089 -journal-dir /var/log/lplex

# With cloud replication
lplex -interface can0 -replication-target cloud.example.com:9443 \
  -replication-instance-id boat-001 \
  -replication-tls-cert /etc/lplex/boat.crt \
  -replication-tls-key /etc/lplex/boat.key \
  -replication-tls-ca /etc/lplex/ca.crt

# Or with systemd
sudo systemctl enable --now lplex
Cloud Server
# Start the cloud server with mTLS
lplex-cloud -data-dir /data/lplex \
  -tls-cert /etc/lplex-cloud/server.crt \
  -tls-key /etc/lplex-cloud/server.key \
  -tls-client-ca /etc/lplex-cloud/ca.crt

# With a config file
lplex-cloud -config /etc/lplex-cloud/lplex-cloud.conf
Client (lplexdump)
# Auto-discover via mDNS and stream all frames
lplexdump

# Connect to a specific server with filtering
lplexdump -server http://inuc1.local:8089 -pgn 129025 -manufacturer Garmin

# Decode known PGNs into human-readable fields
lplexdump -decode

# Filter on decoded field values (auto-enables -decode)
lplexdump -where "pgn == 130310 && water_temperature < 280"
lplexdump -where 'register.name == "State of Charge"'

# Only show frames with significant changes (suppress sensor noise)
lplexdump -changes -decode

# Buffered mode with automatic reconnect replay
lplexdump -server http://inuc1.local:8089 -buffer-timeout PT5M
Go Client Library (lplexc)
import "github.com/sixfathoms/lplex/lplexc"

// Auto-discover the server
addr, _ := lplexc.Discover(ctx)
client := lplexc.NewClient(addr)

// Get devices on the bus
devices, _ := client.Devices(ctx)

// Subscribe to position updates from Garmin devices
sub, _ := client.Subscribe(ctx, &lplexc.Filter{
    PGNs:          []uint32{129025},
    Manufacturers: []string{"Garmin"},
})
defer sub.Close()

for {
    ev, err := sub.Next()
    if err != nil {
        break
    }
    fmt.Printf("Position: src=%d data=%s\n", ev.Frame.Src, ev.Frame.Data)
}
TypeScript Client Library (@sixfathoms/lplex)
import { Client } from "@sixfathoms/lplex";

const client = new Client("http://inuc1.local:8089");

// Get devices on the bus
const devices = await client.devices();

// Get current bus state snapshot
const snapshot = await client.values();

// Subscribe to position updates from Garmin devices
const stream = await client.subscribe({
  pgn: [129025],
  manufacturer: ["Garmin"],
});

for await (const event of stream) {
  if (event.type === "frame") {
    console.log(`Position: src=${event.frame.src} data=${event.frame.data}`);
  }
}

A CloudClient is also available for the lplex-cloud management API:

import { CloudClient } from "@sixfathoms/lplex";

const cloud = new CloudClient("https://cloud.example.com");
const instances = await cloud.instances();

// Get a regular Client scoped to a specific instance
const client = cloud.client("boat-001");
const devices = await client.devices();

Configuration

lplex can be configured with CLI flags, a HOCON config file, or both. CLI flags always take precedence over config file values.

Config file discovery

Use -config path/to/lplex.conf to specify a config file explicitly. If -config is not set, lplex searches for:

  1. ./lplex.conf
  2. /etc/lplex/lplex.conf

If no config file is found, lplex continues with defaults (fully backward compatible).

Example config (boat)
interface = can0
port = 8089
max-buffer-duration = PT5M

journal {
  dir = /var/log/lplex
  prefix = nmea2k
  block-size = 262144
  compression = zstd

  rotate {
    duration = PT1H
    size = 0
  }

  retention {
    max-age = P30D
    min-keep = PT24H
  }

  archive {
    command = "/usr/local/bin/archive-to-s3"
    trigger = "on-rotate"
  }
}

replication {
  target = "cloud.example.com:9443"
  instance-id = "boat-001"
  tls {
    cert = "/etc/lplex/boat.crt"
    key = "/etc/lplex/boat.key"
    ca = "/etc/lplex/ca.crt"
  }
}
Example config (cloud)
grpc {
  listen = ":9443"
  tls {
    cert = "/etc/lplex-cloud/server.crt"
    key = "/etc/lplex-cloud/server.key"
    client-ca = "/etc/lplex-cloud/ca.crt"
  }
}
http {
  listen = ":8080"
}
data-dir = "/data/lplex"

journal {
  rotate-duration = PT1H
  retention {
    max-age = P90D
    max-size = 53687091200
  }
  archive {
    command = "/usr/local/bin/archive-to-gcs"
    trigger = "before-expire"
  }
}

See lplex.conf.example and lplex-cloud.conf.example for the full annotated versions.

Architecture

SocketCAN (can0)
    |
CANReader goroutine
    |  reads extended CAN frames
    |  reassembles fast-packets (multi-frame PGNs)
    |
    v
rxFrames chan
    |
Broker goroutine (single writer, owns all state)
    |  assigns monotonic sequence numbers
    |  appends pre-serialized JSON to ring buffer (64k entries)
    |  updates device registry (PGN 60928, PGN 126996)
    |  fans out to sessions and ephemeral subscribers
    |  sends ISO requests to discover new devices
    |  feeds journal writer (if enabled)
    |
    +---> ring buffer (pre-serialized JSON, power-of-2)
    +---> DeviceRegistry (keyed by source address)
    +---> ValueStore (last frame per source+PGN)
    +---> sessions map (buffered clients with cursors)
    +---> subscribers map (ephemeral clients, no state)
    +---> journal chan (optional, 16k buffer)
    |
    v
HTTP Server (:8089)                JournalWriter goroutine
    |                                   |  block-based .lpj files
    +-- GET  /events                    |  zstd block compression
    +-- PUT  /clients/{id}              |  CRC32C checksums
    +-- GET  /clients/{id}/events       |  device table per block
    +-- PUT  /clients/{id}/ack          |  O(log N) time seeking
    +-- POST /send                      |  ~2-3 MB/hour at 200 fps
    +-- POST /query                     v
    +-- GET  /devices                .lpj journal files
    +-- GET  /values
    +-- GET  /replication/status

CANWriter goroutine            ReplicationClient (optional)
    |  fragments for TX            |  gRPC to cloud server
    |  writes to SocketCAN         +-- Live: Consumer -> LiveFrame stream
                                   +-- Backfill: raw blocks -> Block stream
                                   +-- Reconnect: exponential backoff

API

Ephemeral streaming

GET /events with optional query params: pgn, exclude_pgn, manufacturer, instance, name (hex).

No session, no replay, no ACK. Zero server-side state after disconnect.

Buffered sessions
  1. PUT /clients/{id} with {"buffer_timeout": "PT5M"} to create/reconnect
  2. GET /clients/{id}/events for SSE (replays from cursor, then live)
  3. PUT /clients/{id}/ack with {"seq": N} to advance cursor

Disconnected sessions keep their cursor for the buffer duration.

Transmit

Both /send and /query are disabled by default. Enable with -send-enabled or send.enabled = true in the config file. Use send.rules (HOCON string or object array) or -send-rules (semicolon-separated DSL) to define ordered allow/deny rules with PGN ranges and CAN NAME lists. HOCON config supports both string rules ("pgn:59904") and native objects ({ pgn = "59904", name = "..." }). Rules are evaluated top-to-bottom, first match wins. Internal device discovery (ISO requests at startup) is not affected.

POST /send with {"pgn": 59904, "src": 254, "dst": 255, "prio": 6, "data": "00ee00"}

Query on demand

POST /query with {"pgn": 129025, "dst": 255} sends an ISO Request (PGN 59904) and waits for the response. Returns the first matching frame as JSON. Optional "timeout": "PT5S" (default 2s). Returns 504 Gateway Timeout if no response arrives.

Devices

GET /devices returns JSON array of all discovered NMEA 2000 devices.

Last values

GET /values returns the most recently received frame for each (device, PGN) pair. Grouped by device, sorted by source address. Useful for getting a snapshot of bus state without subscribing to SSE.

Supports the same filter query params as /events: pgn, exclude_pgn, manufacturer, instance, name (hex). Example: GET /values?pgn=129025&manufacturer=Garmin.

Replication status (boat)

GET /replication/status returns current replication state (available when replication is configured).

Cloud Replication

lplex can replicate CAN bus data from a boat to a cloud instance over gRPC with mTLS. The boat initiates all connections (no public IP required). Data flows over two independent gRPC streams:

  • Live stream: realtime frames from the broker's head, delivered to the cloud within seconds
  • Backfill stream: raw journal blocks for filling historical gaps, newest-first

On reconnect after a connectivity gap, live data resumes immediately while backfill works through the gap in the background. The cloud runs a replica Broker per instance, so web clients connect to the cloud and get the same SSE API as if they were on the boat.

See docs/cloud-replication.md for the full protocol specification.

Cloud HTTP API
Endpoint Description
GET /instances List all instances
GET /instances/{id}/status Instance status (cursor, holes, lag)
GET /instances/{id}/events SSE stream from instance's broker
GET /instances/{id}/devices Device table
GET /instances/{id}/values Last-seen values per (device, PGN). Query params: pgn, manufacturer, instance, name.
GET /instances/{id}/replication/events?limit=N Replication event log (newest first, default 100, max 1024)

Journal Recording

lplex can record all CAN frames to disk as block-based binary journal files (.lpj) for future replay and analysis.

# Enable recording (zstd compression by default)
lplex -interface can0 -journal-dir /var/log/lplex

# With rotation (new file every hour)
lplex -interface can0 -journal-dir /var/log/lplex -journal-rotate-duration PT1H

# Disable compression
lplex -interface can0 -journal-dir /var/log/lplex -journal-compression none

Flags:

Flag Default Description
-journal-dir (disabled) Directory for journal files
-journal-prefix nmea2k Journal file name prefix
-journal-block-size 262144 Block size (power of 2, min 4096)
-journal-compression zstd Block compression: none, zstd, zstd-dict
-journal-rotate-duration PT1H Rotate after duration (ISO 8601)
-journal-rotate-size 0 Rotate after bytes (0 = disabled)
-journal-retention-max-age (disabled) Delete files older than this (ISO 8601, e.g. P30D)
-journal-retention-min-keep (disabled) Never delete files younger than this, unless max-size exceeded
-journal-retention-max-size 0 Hard size cap in bytes; delete oldest files when exceeded
-journal-retention-soft-pct 80 Proactive archive threshold as % of max-size (1-99)
-journal-retention-overflow-policy delete-unarchived What to do when hard cap hit with failed archives
-journal-archive-command (disabled) Path to archive script
-journal-archive-trigger (disabled) When to archive: on-rotate or before-expire

Blocks are compressed individually with zstd (~4x ratio at 256KB blocks on typical CAN data, ~158 MB/day at 200 fps). Each block carries a device table so consumers can resolve source addresses without external state. A block index at end-of-file enables fast seeking; crash-truncated files are recovered via forward-scan. See docs/format.md for the binary format specification.

Retention and Archival

Journal files accumulate indefinitely unless you configure a retention policy. Retention and archival are available on both boat and cloud binaries.

# Keep at most 30 days of journals, but never delete files less than 24 hours old
lplex -interface can0 -journal-dir /var/log/lplex \
  -journal-retention-max-age P30D -journal-retention-min-keep PT24H

# Hard size cap: keep at most 10 GB, oldest files deleted first
lplex -interface can0 -journal-dir /var/log/lplex \
  -journal-retention-max-size 10737418240

# Archive to S3 on rotation, then delete after 30 days
lplex -interface can0 -journal-dir /var/log/lplex \
  -journal-retention-max-age P30D \
  -journal-archive-command /usr/local/bin/archive-to-s3 \
  -journal-archive-trigger on-rotate

Retention algorithm: files are sorted oldest-first. Three zones govern behavior when max-size is set with archival:

  1. Normal (total <= soft threshold): standard age-based expiration, archive-then-delete
  2. Soft zone (soft < total <= hard): proactively queue oldest non-archived files for archive
  3. Hard zone (total > hard): expire files; if archives have failed, apply the overflow policy

max-size overrides min-keep overrides max-age. The soft threshold defaults to 80% of max-size and only applies when both max-size and an archive command are configured.

Overflow policies (when hard cap is hit and archives have failed):

  • delete-unarchived (default): delete files even if not archived, prioritizing continued recording
  • pause-recording: stop journal writes until archives free space, prioritizing archive completeness

Archive script protocol: the script receives file paths as arguments and JSONL metadata on stdin (one line per file with path, instance_id, size, created). It must write JSONL to stdout with per-file status ("ok" or "error"). Failed files are retried with exponential backoff.

Archive triggers:

  • on-rotate: archive immediately after a journal file is closed (eager, minimizes data loss window)
  • before-expire: archive only when a file is about to be deleted by retention (lazy, minimizes archive traffic)

PGN Decoding

lplexdump can decode known NMEA 2000 PGNs into human-readable field values using the -decode flag:

# Terminal: decoded fields appear below each frame
lplexdump -decode

# JSON output: adds a "decoded" object to each frame
lplexdump -decode -json

# Journal replay with decoding
lplexdump -file recording.lpj -decode

The registry contains ~120 PGNs, of which ~30 have full decoders (position, heading, wind, depth, engine, battery, environment, etc.). The remaining PGNs are name-only: they carry descriptions and metadata (fast-packet, interval) but no field layout. Unknown PGNs pass through with raw hex data as usual.

Packet tests

PGN decoders are verified by table-driven tests in pgn/packets_test.go. Each test vector specifies hex packet data and the expected decoded struct, with automatic round-trip verification. To add a test from real device data, capture a frame with lplexdump -decode -json and copy the data and decoded fields into a new entry.

PGN DSL

PGN definitions live in pgn/defs/*.pgn using a compact DSL that describes bit-level field layouts. The code generator (pgngen) reads these files and produces Go structs with Decode*/Encode methods, a Registry map, Protobuf definitions, and JSON Schema.

go generate ./pgn/...   # regenerate from pgn/defs/*.pgn
Basic syntax
# Line comments start with #

pgn 129025 "Position Rapid Update" interval=100ms {
  latitude   int32  :32  scale=1e-7  unit="deg"
  longitude  int32  :32  scale=1e-7  unit="deg"
}

pgn 129029 "GNSS Position Data" fast_packet interval=1000ms {
  sid              uint8   :8
  days_since_1970  uint16  :16
  # ... more fields
}

pgn 59904 "ISO Request" on_demand {
  requested_pgn  uint32  :24
}
PGN-level attributes

Attributes between the description and opening { apply to the PGN as a whole:

Attribute Description
fast_packet PGN uses multi-frame fast-packet protocol
interval=<duration> Default transmission interval (100ms, 500ms, 1s, 2500ms, 60s). Stored as time.Duration in the registry.
on_demand Event-driven PGN, no periodic transmission
draft Definition is incomplete or reverse-engineered. Propagated to PGNInfo.Draft.

These are code-generated into PGNInfo fields in pgn.Registry and used by IsFastPacket() to identify fast-packet PGNs.

Name-only PGNs

A PGN definition without braces registers the PGN's name and metadata (fast-packet, interval, etc.) without defining a field layout. The generated Registry entry has Decode: nil.

pgn 129038 "AIS Class A Position Report" fast_packet
pgn 126983 "Alert" fast_packet
pgn 127493 "Transmission Parameters Dynamic" draft

This is the canonical form for PGNs whose structure is unknown or not yet implemented. Use this instead of hardcoded name maps.

Field definitions

Each field has: name type :bits [attributes...]

Element Description
name Field name (snake_case). Use _ for reserved/padding bits, ? for unknown/undocumented data.
type uint8, uint16, uint32, uint64, int8, int16, int32, int64, float32, float64, string, or an enum name
:bits Bit width of the field
scale=N Scaling factor: decoded = raw * scale. Output type becomes float64.
offset=N Offset: decoded = raw * scale + offset
unit="..." Human-readable unit (e.g. "deg", "m/s", "rad")
trim="..." Right-trim these characters from decoded string fields (e.g. trim="@ " for AIS names)
tolerance=N Change detection threshold for ChangeTracker. Fields with changes smaller than N are suppressed by lplexdump -changes.
value=N Dispatch constraint for variant PGNs (see below)
Enums

Named enumerations for lookup fields:

enum HeadingReference {
  0 = "true"
  1 = "magnetic"
}

pgn 127250 "Vessel Heading" {
  sid                uint8             :8
  heading            uint16            :16  scale=0.0001  unit="rad"
  heading_reference  HeadingReference  :2
  _                                    :6
}
Lookups

Lookup tables map integer keys to human-readable names. Unlike enums, lookups don't change the field's Go type; the field stays its raw integer type and gets a Name() method for display.

lookup VictronRegister uint16 {
  0x0100 = "Product ID"
  0x0200 = "Device Mode"
  0xED8F = "DC Channel 1 Current"
}

pgn 61184 "Victron Battery Register" {
  manufacturer_code  uint16  :11  value=358
  _                          :2
  industry_code      uint8   :3
  register           uint16  :16  lookup=VictronRegister
  payload            uint32  :32
}

The generator produces:

  • A map[uint16]string variable (victronRegisterNames) with all key-name pairs
  • A RegisterName() string method on the struct that returns the human-readable name (or empty string if unknown)
  • A LookupFields() map[string]string method for display code to wrap the field as {"id": <raw>, "name": "..."}

Keys support hex (0xFF) and decimal (255) literals. Valid key types: uint8, uint16, uint32, uint64.

Variant dispatch (value=)

Some PGN numbers (notably 61184, Proprietary Single Frame) carry different payloads depending on a discriminator field value. The DSL supports this by allowing multiple pgn blocks with the same number, differentiated by value= constraints on a shared discriminator field.

# Victron devices use manufacturer_code=358
pgn 61184 "Victron Battery Register" {
  manufacturer_code  uint16  :11  value=358
  _                          :2
  industry_code      uint8   :3
  register           uint16  :16
  payload            uint32  :32
}

# Garmin devices use manufacturer_code=229
pgn 61184 "Garmin Proprietary" {
  manufacturer_code  uint16  :11  value=229
  _                          :2
  industry_code      uint8   :3
  data               uint32  :32
}

The generator produces:

  • A separate struct and Decode*/Encode for each variant (VictronBatteryRegister, GarminProprietary)
  • A dispatch function Decode61184(data []byte) (any, error) that reads the discriminator from raw bytes and routes to the correct variant decoder
  • A single Registry entry for the PGN number pointing to the dispatch function

Rules and constraints:

Rule Detail
Discriminator field All constrained variants must use the same field name, bit position, and bit width as the discriminator
Unique values Each value=N must be unique across all variants of the same PGN
Default variant A variant with no value= on any field acts as the fallback for unrecognized discriminator values. This is optional, not required.
At most one default Only one default variant (without value=) is allowed per PGN
Minimum one constraint At least one variant must have a value= constraint. Two defaults with no constraints is an error.
Single constrained variant Even a single pgn block with value= gets a dispatch function that rejects non-matching discriminator values
No default means error Without a default variant, unknown discriminator values return an error from the dispatch function
Constrained encode Encode() hardcodes the value=N literal instead of reading the struct field, so encoded frames always have the correct discriminator
Reserved/unknown fields _ (padding) and ? (unknown) fields cannot have value=

Generated dispatch (conceptual):

func Decode61184(data []byte) (any, error) {
    disc := binary.LittleEndian.Uint16(data[0:2]) & 0x07FF
    switch uint64(disc) {
    case 358:
        return DecodeVictronBatteryRegister(data)
    case 229:
        return DecodeGarminProprietary(data)
    default:
        return nil, fmt.Errorf("PGN 61184: unknown manufacturer_code value %d", disc)
    }
}
Repeated fields (repeat=)

When a PGN has N identical consecutive fields (e.g. 28 two-bit switch indicators), use repeat=N to collapse them into a single line. The generator expands them at code-generation time into a slice or map in Go.

# Array mode (default): generates []uint8
pgn 127501 "Binary Switch Bank Status" {
  instance    uint8   :8
  indicator   uint8   :2  repeat=28
}

# Map mode: generates map[int]uint8 with 1-based keys
pgn 127501 "Binary Switch Bank Status" {
  instance    uint8   :8
  indicator   uint8   :2  repeat=28  group="map"
}

# Override the auto-pluralized field name
pgn 127501 "Binary Switch Bank Status" {
  instance    uint8   :8
  indicator   uint8   :2  repeat=28  as="switches"
}
Attribute Description
repeat=N Repeat this field N times (N >= 2). Expands to N consecutive fields of the same type/width.
group="map" Use map[int]T instead of []T in Go. Keys are 1-based (NMEA convention). Default is array.
as="name" Override the auto-pluralized field name. Default: basic English pluralization (indicator -> indicators).

Constraints: repeat= cannot be used on reserved (_) or unknown (?) fields, or combined with value=, lookup=, or enum types. group= and as= require repeat=.

Generated code: Decode produces a slice/map literal with unrolled bit reads. Encode uses bounds-checked (array) or key-checked (map) writes. Fields after a repeated field get correct bit offsets automatically.

Deployment

The .deb package installs a systemd service that binds to can0. Configure with a config file or environment variable:

# Option 1: config file (recommended)
sudo cp lplex.conf.example /etc/lplex/lplex.conf
sudo vi /etc/lplex/lplex.conf

# Option 2: environment variable
# Edit /etc/default/lplex:
LPLEX_ARGS="-interface can0 -port 8089 -journal-dir /var/log/lplex -journal-compression zstd"

License

MIT

Documentation

Overview

Package lplex is a CAN bus HTTP bridge for NMEA 2000.

It reads raw CAN frames from a SocketCAN interface, reassembles fast-packets, tracks device discovery, and streams frames to clients over SSE with session management, filtering, and replay.

The package can be embedded into other Go services. Create a Broker to manage frame routing, a Server to expose the HTTP API, and optionally a JournalWriter to record frames to disk.

broker := lplex.NewBroker(lplex.BrokerConfig{
    RingSize:          65536,
    MaxBufferDuration: 5 * time.Minute,
    Logger:            logger,
})
go broker.Run()

srv := lplex.NewServer(broker, logger, lplex.SendPolicy{})
mux.Handle("/nmea/", http.StripPrefix("/nmea", srv))

Feed frames into the broker via Broker.RxFrames:

broker.RxFrames() <- lplex.RxFrame{
    Timestamp: time.Now(),
    Header:    lplex.CANHeader{Priority: 2, PGN: 129025, Source: 10, Destination: 0xFF},
    Data:      payload,
}

When done, close the rx channel and the broker goroutine exits:

broker.CloseRx()

Index

Constants

View Source
const DefaultLagCheckInterval = 1000

DefaultLagCheckInterval controls how often the boat checks for lag in the live send loop. Checked every N frames sent rather than by wall clock because when lagging, consumer.Next() returns instantly and the loop spins at CPU speed. At max bus rate (2000 fps) this checks roughly every 0.5s.

View Source
const DefaultMaxFrameRate = 2000

NMEA 2000 runs on CAN 2.0B at 250 kbit/s. An extended frame (29-bit ID, 8-byte payload) is roughly 131-157 bits depending on bit stuffing. Theoretical max is ~1800 frames/sec. We allow 2000 to give ~10% headroom for measurement jitter and bit-stuffing variance.

View Source
const DefaultMaxLiveLag uint64 = 10_000

DefaultMaxLiveLag is the frame count threshold for live lag detection. If the live stream falls this far behind the broker head (boat-side) or the boat's reported head (cloud-side), the stream is killed and the gap switches to backfill mode. ~5 seconds at max bus rate.

View Source
const DefaultMinLagReconnectInterval = 30 * time.Second

DefaultMinLagReconnectInterval prevents thrashing when the system is persistently overloaded. If lag keeps recurring, we wait at least this long between lag-triggered reconnects.

View Source
const DefaultRateBurst = 500

DefaultRateBurst is the burst allowance for transient spikes. Power-on storms (every device announces simultaneously) can briefly exceed the sustained rate. 500 frames absorbs a ~250ms burst at max bus load.

Variables

View Source
var (
	ParseCANID = canbus.ParseCANID
	BuildCANID = canbus.BuildCANID
)
View Source
var ErrFallenBehind = errors.New("consumer fallen behind: data no longer available")

ErrFallenBehind is returned by Consumer.Next when the consumer's cursor has fallen behind both the ring buffer and available journal files.

Functions

func CANReader

func CANReader(ctx context.Context, iface string, rxFrames chan<- RxFrame, logger *slog.Logger) error

CANReader reads frames from SocketCAN, reassembles fast-packets, and sends completed frames to the broker's rxFrames channel.

func CANWriter

func CANWriter(ctx context.Context, iface string, txFrames <-chan TxRequest, logger *slog.Logger) error

CANWriter reads from the broker's txFrames channel and writes to SocketCAN. Handles fast-packet fragmentation for payloads > 8 bytes.

func FragmentFastPacket

func FragmentFastPacket(data []byte, seqCounter uint8) [][]byte

FragmentFastPacket splits a payload into CAN frames for fast-packet TX. seqCounter should be incremented per transfer (wraps at 7, 3-bit field). Returns a slice of 8-byte CAN frame payloads.

func HealthHandler added in v0.3.1

func HealthHandler(cfg HealthConfig) http.HandlerFunc

HealthHandler returns an http.HandlerFunc that serves the /healthz endpoint.

func IsFastPacket

func IsFastPacket(pgnNum uint32) bool

IsFastPacket returns true if the PGN uses fast-packet transfer.

func MetricsHandler added in v0.3.1

func MetricsHandler(broker *Broker, replStatus func() *ReplicationStatus) http.HandlerFunc

MetricsHandler returns an http.HandlerFunc that serves Prometheus-format metrics from the broker's Stats(). Optional ReplicationStatus can be provided via a callback for replication-aware deployments.

func ParseISO8601Duration

func ParseISO8601Duration(s string) (time.Duration, error)

ParseISO8601Duration parses a subset of ISO 8601 durations (PT format). Supports hours (H), minutes (M), and seconds (S). Examples: "PT5M", "PT1H30M", "PT30S", "PT1H"

Types

type ArchiveTrigger

type ArchiveTrigger int

ArchiveTrigger controls when journal files are archived.

const (
	ArchiveDisabled     ArchiveTrigger = iota
	ArchiveOnRotate                    // archive immediately after rotation
	ArchiveBeforeExpire                // archive only when about to be deleted by retention
)

func ParseArchiveTrigger

func ParseArchiveTrigger(s string) (ArchiveTrigger, error)

ParseArchiveTrigger parses a string into an ArchiveTrigger.

func (ArchiveTrigger) String

func (t ArchiveTrigger) String() string

String returns the string representation of an ArchiveTrigger.

type BlockWriter

type BlockWriter struct {
	// contains filtered or unexported fields
}

BlockWriter appends pre-encoded journal blocks to .lpj files. Unlike JournalWriter, it receives blocks that are already serialized (with CRC, device table, frame data). Used by the cloud replication server to write backfill blocks byte-for-byte without decompression or re-encoding.

func NewBlockWriter

func NewBlockWriter(cfg BlockWriterConfig) (*BlockWriter, error)

NewBlockWriter creates a new BlockWriter. Call AppendBlock to write blocks. Call Close when done to finalize the current file.

func (*BlockWriter) AppendBlock

func (w *BlockWriter) AppendBlock(baseSeq uint64, baseTimeUs int64, data []byte, compressed bool) error

AppendBlock writes a pre-encoded block to the current journal file. For compressed blocks, the data is the compressed payload (written with a 20-byte v2 header). For uncompressed blocks, data is the full block bytes (must be exactly BlockSize with valid CRC).

func (*BlockWriter) Close

func (w *BlockWriter) Close() error

Close finalizes the current file (writes block index for compressed files, syncs, and closes). Safe to call multiple times or on a writer with no open file.

type BlockWriterConfig

type BlockWriterConfig struct {
	Dir            string
	Prefix         string // default: "nmea2k"
	BlockSize      int    // uncompressed block size (from source journal)
	Compression    journal.CompressionType
	RotateDuration time.Duration     // 0 = no limit
	RotateSize     int64             // 0 = no limit
	OnRotate       func(RotatedFile) // called after a journal file is closed by rotation
	Logger         *slog.Logger
}

BlockWriterConfig configures a BlockWriter.

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker is the central coordinator. Single goroutine reads from rxFrames, assigns sequence numbers, appends to ring buffer, updates device registry, and fans out to client sessions and ephemeral subscribers.

func NewBroker

func NewBroker(cfg BrokerConfig) *Broker

NewBroker creates a new broker with the given config.

func (*Broker) AckSession

func (b *Broker) AckSession(id string, seq uint64) error

AckSession updates the cursor for a session.

func (*Broker) CloseRx

func (b *Broker) CloseRx()

CloseRx closes the rxFrames channel, signaling the broker to stop processing. Wait on Done() to know when the broker goroutine has actually exited.

func (*Broker) CreateSession

func (b *Broker) CreateSession(id string, bufferTimeout time.Duration, filter *EventFilter) (*ClientSession, uint64)

CreateSession creates or retrieves a client session. Returns the session and the current head sequence number.

When bufferTimeout is 0, the session cursor is reset so no frames are replayed on the next connect (fresh start).

func (*Broker) CurrentSeq

func (b *Broker) CurrentSeq() uint64

CurrentSeq returns the most recently assigned sequence number.

func (*Broker) Devices

func (b *Broker) Devices() *DeviceRegistry

Devices returns the broker's device registry.

func (*Broker) Done

func (b *Broker) Done() <-chan struct{}

Done returns a channel that is closed when the broker's Run() method returns.

func (*Broker) GetSession

func (b *Broker) GetSession(id string) *ClientSession

GetSession returns a session by ID, or nil if not found.

func (*Broker) LastFrameTime added in v0.3.1

func (b *Broker) LastFrameTime() time.Time

LastFrameTime returns the timestamp of the most recently received frame, or zero if no frames have been received yet.

func (*Broker) NewConsumer

func (b *Broker) NewConsumer(cfg ConsumerConfig) *Consumer

NewConsumer creates a pull-based consumer starting at the given cursor. The consumer is registered with the broker for live notifications.

func (*Broker) Run

func (b *Broker) Run()

Run is the broker's main loop. Call in its own goroutine. Exits when rxFrames is closed.

func (*Broker) RxFrames

func (b *Broker) RxFrames() chan<- RxFrame

RxFrames returns the channel for submitting received CAN frames to the broker.

func (*Broker) SendISORequest added in v0.3.1

func (b *Broker) SendISORequest(dst uint8, pgn uint32) error

SendISORequest sends an ISO Request (PGN 59904) to the given destination, asking it to transmit the specified PGN. Returns an error if the tx queue is full.

func (*Broker) SetJournal

func (b *Broker) SetJournal(ch chan<- RxFrame)

SetJournal sets the journal channel. Must be called before Run.

func (*Broker) Stats added in v0.3.1

func (b *Broker) Stats() BrokerStats

Stats returns a point-in-time snapshot of broker metrics.

func (*Broker) Subscribe

func (b *Broker) Subscribe(filter *EventFilter) (*subscriber, func())

Subscribe creates an ephemeral fan-out channel with the given filter. Returns the subscriber and a cleanup function that must be called on disconnect.

func (*Broker) TouchSession

func (b *Broker) TouchSession(id string)

TouchSession updates the LastActivity timestamp for a session.

func (*Broker) TxFrames

func (b *Broker) TxFrames() <-chan TxRequest

TxFrames returns the channel for reading CAN frames to transmit.

func (*Broker) Values

func (b *Broker) Values() *ValueStore

Values returns the broker's last-values store.

type BrokerConfig

type BrokerConfig struct {
	RingSize          int           // must be power of 2
	MaxBufferDuration time.Duration // cap on client buffer_timeout
	JournalDir        string        // directory containing .lpj files (for consumer journal fallback)
	Logger            *slog.Logger

	// ReplicaMode makes the broker honor frame.Seq instead of auto-incrementing.
	// Used by the cloud replication server where sequence numbers originate
	// from the boat's broker.
	ReplicaMode bool

	// InitialHead sets the starting head value. Use this when resuming a
	// replica broker from persisted state so the ring starts at the right
	// position. Zero means start at 1 (the default).
	InitialHead uint64
}

BrokerConfig holds broker configuration.

type BrokerHealth added in v0.3.1

type BrokerHealth struct {
	Status        string    `json:"status"` // "ok" or "unhealthy"
	FramesTotal   uint64    `json:"frames_total"`
	HeadSeq       uint64    `json:"head_seq"`
	LastFrameTime time.Time `json:"last_frame_time,omitempty"`
	DeviceCount   int       `json:"device_count"`
	RingEntries   uint64    `json:"ring_entries"`
	RingCapacity  int       `json:"ring_capacity"`
}

BrokerHealth reports the broker's health.

type BrokerStats added in v0.3.1

type BrokerStats struct {
	FramesTotal       uint64    // total frames processed
	LastFrameTime     time.Time // timestamp of most recent frame (zero if none)
	RingEntries       uint64    // current entries in ring buffer
	RingCapacity      int       // ring buffer size
	HeadSeq           uint64    // next sequence number
	ActiveSessions    int       // buffered client sessions
	ActiveSubscribers int       // ephemeral SSE subscribers
	ActiveConsumers   int       // pull-based consumers
	DeviceCount       int       // discovered NMEA 2000 devices
}

BrokerStats is a point-in-time snapshot of broker metrics.

type BusSilenceMonitor added in v0.3.1

type BusSilenceMonitor struct {
	// contains filtered or unexported fields
}

BusSilenceMonitor watches for periods of CAN bus inactivity and logs alerts when no frames have been received for a configurable duration. This helps detect CAN bus disconnection or power issues on the boat.

func NewBusSilenceMonitor added in v0.3.1

func NewBusSilenceMonitor(timeout time.Duration, broker *Broker, logger *slog.Logger) *BusSilenceMonitor

NewBusSilenceMonitor creates a monitor that alerts when no frames have been received for the given timeout duration. The timeout must be positive.

func (*BusSilenceMonitor) IsSilent added in v0.3.1

func (m *BusSilenceMonitor) IsSilent() bool

IsSilent reports whether the bus is currently in a silence state.

func (*BusSilenceMonitor) Run added in v0.3.1

func (m *BusSilenceMonitor) Run(ctx context.Context)

Run checks for bus silence periodically and logs warnings. It exits when ctx is cancelled.

type ByteMaskDiff added in v0.3.1

type ByteMaskDiff struct{}

ByteMaskDiff is the default diff method. Any byte-level change is significant.

Short format (packets <= 8 bytes):

[mask:1] [changed bytes...]

Extended format (packets > 8 bytes):

[maskLen:2 LE] [mask bytes...] [changed bytes...]

func (ByteMaskDiff) Apply added in v0.3.1

func (ByteMaskDiff) Apply(prev, diff []byte) []byte

func (ByteMaskDiff) Diff added in v0.3.1

func (ByteMaskDiff) Diff(prev, curr []byte) (bool, []byte)

type CANHeader

type CANHeader = canbus.CANHeader

Type aliases so existing server code compiles unchanged.

type ChangeEvent added in v0.3.1

type ChangeEvent struct {
	Type      ChangeEventType
	Timestamp time.Time
	Source    uint8
	PGN       uint32
	SubKey    uint64
	Seq       uint64
	Data      []byte // Full data for Snapshot, diff for Delta, nil for Idle.
}

ChangeEvent is emitted by the ChangeTracker when a meaningful state change (or silence) is detected for a (source, PGN, subkey) tracking key.

type ChangeEventType added in v0.3.1

type ChangeEventType uint8

ChangeEventType identifies the kind of change event.

const (
	// Snapshot is the first observation for a key. Contains full packet data.
	Snapshot ChangeEventType = 1
	// Delta means a significant change was detected. Contains a compact diff.
	Delta ChangeEventType = 2
	// Idle means a source stopped producing for this key.
	Idle ChangeEventType = 3
)

func (ChangeEventType) String added in v0.3.1

func (t ChangeEventType) String() string

type ChangeReplayer added in v0.3.1

type ChangeReplayer struct {
	// contains filtered or unexported fields
}

ChangeReplayer reconstructs full packet data from a stream of ChangeEvents. Maintains per-key state and applies diffs to recover the original packets.

func NewChangeReplayer added in v0.3.1

func NewChangeReplayer(methods map[uint32]DiffMethod, subKeys map[uint32]SubKeyFunc) *ChangeReplayer

NewChangeReplayer creates a replayer. Pass the same methods and subkeys config used by the tracker that produced the events.

func (*ChangeReplayer) Apply added in v0.3.1

func (r *ChangeReplayer) Apply(event ChangeEvent) ([]byte, error)

Apply processes a ChangeEvent and returns the reconstructed full packet data. Returns nil for Idle events (state is preserved but no data emitted). Returns an error if a Delta arrives without a prior Snapshot.

func (*ChangeReplayer) State added in v0.3.1

func (r *ChangeReplayer) State(source uint8, pgnID uint32, subKey uint64) []byte

State returns the last known full packet data for a key, or nil if unknown.

type ChangeTracker added in v0.3.1

type ChangeTracker struct {
	// contains filtered or unexported fields
}

ChangeTracker tracks per-(source, PGN, subkey) state and emits compact change events. Not goroutine-safe; designed for single-goroutine callers (like the broker's handleFrame).

func NewChangeTracker added in v0.3.1

func NewChangeTracker(cfg ChangeTrackerConfig) *ChangeTracker

NewChangeTracker creates a ChangeTracker with the given configuration. Automatically wires FieldToleranceDiff for any PGN in the registry that declares field-level tolerances, unless an explicit method is already set.

func (*ChangeTracker) Process added in v0.3.1

func (ct *ChangeTracker) Process(ts time.Time, source uint8, pgnID uint32, data []byte, seq uint64) *ChangeEvent

Process handles an incoming frame and returns a ChangeEvent if the frame represents a meaningful state change. Returns nil for no-ops (unchanged data within tolerance).

func (*ChangeTracker) Remove added in v0.3.1

func (ct *ChangeTracker) Remove(source uint8, pgnID uint32, subKey uint64)

Remove removes tracking state for a specific key.

func (*ChangeTracker) Reset added in v0.3.1

func (ct *ChangeTracker) Reset()

Reset clears all tracked state.

func (*ChangeTracker) Tick added in v0.3.1

func (ct *ChangeTracker) Tick(now time.Time) []ChangeEvent

Tick checks all tracked pairs for idle timeouts and returns Idle events for any that have exceeded their timeout. Call this periodically.

func (*ChangeTracker) TrackedPairs added in v0.3.1

func (ct *ChangeTracker) TrackedPairs() int

TrackedPairs returns the number of actively tracked (source, PGN, subkey) pairs.

type ChangeTrackerConfig added in v0.3.1

type ChangeTrackerConfig struct {
	// DefaultMethod is the diff method used for PGNs without a specific override.
	// Nil defaults to ByteMaskDiff.
	DefaultMethod DiffMethod

	// Methods maps specific PGNs to custom diff methods.
	Methods map[uint32]DiffMethod

	// SubKeys maps specific PGNs to sub-key extractor functions for
	// multiplexed PGNs (e.g., Victron registers on PGN 61184).
	SubKeys map[uint32]SubKeyFunc

	// DefaultIdleTimeout is the fallback idle timeout when no PGN-specific
	// timeout can be resolved. Defaults to 5s.
	DefaultIdleTimeout time.Duration

	// IdleMultiplier is applied to the PGN registry interval to compute
	// the idle timeout. Defaults to 3.
	IdleMultiplier int

	// IdleTimeouts maps specific PGNs to explicit idle timeout overrides.
	IdleTimeouts map[uint32]time.Duration
}

ChangeTrackerConfig configures the ChangeTracker.

type ClientSession

type ClientSession struct {
	ID            string
	Cursor        uint64        // last ACK'd sequence number (0 = never ACK'd)
	BufferTimeout time.Duration // how long to keep buffering after disconnect
	LastActivity  time.Time
	Filter        *EventFilter // nil = receive all frames
}

ClientSession tracks a buffered client's metadata for persistence across HTTP reconnects. The actual frame reading is done by Consumer.

type ComponentHealth added in v0.3.1

type ComponentHealth struct {
	Status  string `json:"status"`
	Message string `json:"message,omitempty"`
}

ComponentHealth is a generic component status.

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

Consumer is a pull-based reader that iterates frames at its own pace. It reads from a tiered log: journal files on disk (oldest), ring buffer in memory (recent), and live notification (current head, blocking wait).

func (*Consumer) Close

func (c *Consumer) Close() error

Close stops the consumer and removes it from the broker. Safe to call multiple times.

func (*Consumer) Cursor

func (c *Consumer) Cursor() uint64

Cursor returns the consumer's current position (next seq to read).

func (*Consumer) Next

func (c *Consumer) Next(ctx context.Context) (*Frame, error)

Next returns the next matching frame, blocking until one is available. Returns ErrFallenBehind if the consumer's cursor has fallen behind all available data. Returns ctx.Err() if the context is cancelled.

type ConsumerConfig

type ConsumerConfig struct {
	Cursor uint64       // starting position (next seq to read)
	Filter *EventFilter // nil = all frames
}

ConsumerConfig configures a new Consumer.

type DecodedDeviceValues added in v0.3.1

type DecodedDeviceValues struct {
	Name         string            `json:"name"`
	Source       uint8             `json:"src"`
	Manufacturer string            `json:"manufacturer,omitempty"`
	ModelID      string            `json:"model_id,omitempty"`
	Values       []DecodedPGNValue `json:"values"`
}

DecodedDeviceValues groups decoded PGN values by device.

type DecodedPGNValue added in v0.3.1

type DecodedPGNValue struct {
	PGN         uint32 `json:"pgn"`
	Description string `json:"description"`
	Ts          string `json:"ts"`
	Seq         uint64 `json:"seq"`
	Fields      any    `json:"fields"`
}

DecodedPGNValue is a single PGN's last-known value decoded into named fields.

type Device

type Device struct {
	Source           uint8  `json:"src"`
	NAME             uint64 `json:"-"`
	NAMEHex          string `json:"name"`
	Manufacturer     string `json:"manufacturer"`
	ManufacturerCode uint16 `json:"manufacturer_code"`
	DeviceClass      uint8  `json:"device_class"`
	DeviceFunction   uint8  `json:"device_function"`
	DeviceInstance   uint8  `json:"device_instance"`
	UniqueNumber     uint32 `json:"unique_number,omitempty"`

	// PGN 126996 Product Information fields.
	ModelID         string `json:"model_id,omitempty"`
	SoftwareVersion string `json:"software_version,omitempty"`
	ModelVersion    string `json:"model_version,omitempty"`
	ModelSerial     string `json:"model_serial,omitempty"`
	ProductCode     uint16 `json:"product_code,omitempty"`

	// Per-source packet statistics.
	FirstSeen   time.Time `json:"first_seen"`
	LastSeen    time.Time `json:"last_seen"`
	PacketCount uint64    `json:"packet_count"`
	ByteCount   uint64    `json:"byte_count"`
}

Device represents an NMEA 2000 device discovered via ISO Address Claim (PGN 60928) and optionally enriched with Product Information (PGN 126996).

type DeviceRegistry

type DeviceRegistry struct {
	// contains filtered or unexported fields
}

DeviceRegistry tracks NMEA 2000 devices discovered via PGN 60928. Thread-safe for concurrent reads (SSE streams) and writes (broker goroutine).

func NewDeviceRegistry

func NewDeviceRegistry() *DeviceRegistry

NewDeviceRegistry creates an empty device registry.

func (*DeviceRegistry) Get

func (r *DeviceRegistry) Get(source uint8) *Device

Get returns a snapshot of the device at the given source address, or nil.

func (*DeviceRegistry) HandleAddressClaim

func (r *DeviceRegistry) HandleAddressClaim(source uint8, data []byte) *Device

HandleAddressClaim processes a PGN 60928 ISO Address Claim. Returns the device if this is a new or changed device, nil otherwise.

func (*DeviceRegistry) HandleProductInfo

func (r *DeviceRegistry) HandleProductInfo(source uint8, data []byte) *Device

HandleProductInfo processes a PGN 126996 Product Information response. Returns the device if fields changed, nil if source is unknown or unchanged.

func (*DeviceRegistry) RecordPacket

func (r *DeviceRegistry) RecordPacket(source uint8, ts time.Time, dataLen int) bool

RecordPacket updates per-source packet statistics. Returns true if this is a previously unseen source address.

func (*DeviceRegistry) Snapshot

func (r *DeviceRegistry) Snapshot() []Device

Snapshot returns a copy of all known devices.

func (*DeviceRegistry) SnapshotJSON

func (r *DeviceRegistry) SnapshotJSON() json.RawMessage

SnapshotJSON returns the device list as pre-serialized JSON.

func (*DeviceRegistry) SynthesizeFrames

func (r *DeviceRegistry) SynthesizeFrames(ts time.Time) []RxFrame

SynthesizeFrames generates RxFrame slices for PGN 60928 (Address Claim) and PGN 126996 (Product Info) from all known devices. Used to seed a remote broker's device registry on live stream connect.

type DeviceValues

type DeviceValues struct {
	Name         string     `json:"name"`
	Source       uint8      `json:"src"`
	Manufacturer string     `json:"manufacturer,omitempty"`
	ModelID      string     `json:"model_id,omitempty"`
	Values       []PGNValue `json:"values"`
}

DeviceValues groups PGN values by device in the JSON response.

type DiffMethod added in v0.3.1

type DiffMethod interface {
	// Diff compares prev and curr, returning whether the change is significant
	// and a compact diff encoding. If not significant, diff is nil.
	Diff(prev, curr []byte) (significant bool, diff []byte)

	// Apply reconstructs curr from prev and a diff produced by Diff.
	Apply(prev, diff []byte) []byte
}

DiffMethod computes and applies compact binary diffs between packet payloads.

type EventFilter

type EventFilter struct {
	PGNs          []uint32
	ExcludePGNs   []uint32
	Manufacturers []string
	Instances     []uint8
	Names         []uint64 // 64-bit CAN NAMEs (include)
	ExcludeNames  []uint64 // 64-bit CAN NAMEs (exclude)
}

EventFilter specifies which CAN frames a session receives. Categories are AND'd (all set categories must match), values within a category are OR'd (any value in the list matches).

func ParseFilterParams

func ParseFilterParams(r *http.Request) (*EventFilter, error)

ParseFilterParams reads optional filter query params from a request. Supported params: pgn (decimal), manufacturer (name or code), instance (decimal), name (hex CAN NAME). Returns nil filter if no params are set.

func (*EventFilter) IsEmpty

func (f *EventFilter) IsEmpty() bool

IsEmpty returns true if no filter criteria are set.

type EventLog

type EventLog struct {
	// contains filtered or unexported fields
}

EventLog is a fixed-size ring buffer of replication events.

func NewEventLog

func NewEventLog() *EventLog

NewEventLog creates an empty event log.

func (*EventLog) Recent

func (l *EventLog) Recent(n int) []ReplicationEvent

Recent returns up to n events, newest first.

func (*EventLog) Record

func (l *EventLog) Record(typ ReplicationEventType, detail map[string]any)

Record appends a new event to the log.

type FastPacketAssembler

type FastPacketAssembler struct {
	// contains filtered or unexported fields
}

FastPacketAssembler reassembles multi-frame fast-packet PGNs.

Fast-packet protocol:

  • Frame 0: byte[0] = seq_counter(3 bits) | frame_number(5 bits), byte[1] = total_bytes, bytes[2:8] = first 6 data bytes
  • Frame N: byte[0] = seq_counter(3 bits) | frame_number(5 bits), bytes[1:8] = next 7 data bytes

func NewFastPacketAssembler

func NewFastPacketAssembler(timeout time.Duration) *FastPacketAssembler

NewFastPacketAssembler creates a new assembler with the given reassembly timeout.

func (*FastPacketAssembler) Process

func (a *FastPacketAssembler) Process(pgn uint32, source uint8, data []byte, now time.Time) []byte

Process handles a CAN frame that is part of a fast-packet transfer. Returns the complete reassembled payload when all frames are received, nil otherwise.

func (*FastPacketAssembler) PurgeStale

func (a *FastPacketAssembler) PurgeStale(now time.Time)

PurgeStale removes any in-progress assemblies older than the timeout.

type FieldTolerance added in v0.3.1

type FieldTolerance struct {
	Field     string
	Tolerance float64
}

FieldTolerance defines a tolerance threshold for a named field. Changes within the tolerance are suppressed (not emitted as significant).

type FieldToleranceDiff added in v0.3.1

type FieldToleranceDiff struct {
	Inner      DiffMethod
	PGN        uint32
	Decode     func([]byte) (any, error)
	Tolerances []FieldTolerance
}

FieldToleranceDiff wraps an inner DiffMethod and uses PGN decode functions plus reflection to check field-level tolerances. If all changed fields are within their tolerance, the change is suppressed. The encoding is always delegated to the inner method (tolerances only gate significance).

The baseline is NOT updated when a change is suppressed, preventing tolerance drift over time.

func (*FieldToleranceDiff) Apply added in v0.3.1

func (f *FieldToleranceDiff) Apply(prev, diff []byte) []byte

func (*FieldToleranceDiff) Diff added in v0.3.1

func (f *FieldToleranceDiff) Diff(prev, curr []byte) (bool, []byte)

type Frame

type Frame struct {
	Seq       uint64
	Timestamp time.Time
	Header    CANHeader
	Data      []byte // raw CAN payload
	// contains filtered or unexported fields
}

Frame is a single CAN frame returned by Consumer.Next.

func (*Frame) JSON

func (f *Frame) JSON() ([]byte, error)

JSON returns the pre-serialized JSON for this frame (SSE format). If not cached (e.g. from journal replay), it serializes on demand.

type HealthConfig added in v0.3.1

type HealthConfig struct {
	Broker     *Broker
	ReplStatus func() *ReplicationStatus // nil if replication not configured

	// BusSilenceThreshold is the duration after which no frames indicates
	// a CAN bus problem. Zero disables bus silence detection.
	BusSilenceThreshold time.Duration
}

HealthConfig configures the health check endpoint.

type HealthStatus added in v0.3.1

type HealthStatus struct {
	Status      string                     `json:"status"` // "ok", "degraded", or "unhealthy"
	Broker      BrokerHealth               `json:"broker"`
	Replication *ReplicationHealth         `json:"replication,omitempty"`
	Components  map[string]ComponentHealth `json:"components,omitempty"`
}

HealthStatus is the structured response from the /healthz endpoint.

type HoleTracker

type HoleTracker struct {
	// contains filtered or unexported fields
}

HoleTracker tracks gaps (holes) in a sequence number space. Holes are non-overlapping, sorted by Start, and represent ranges of missing data.

Typical case: 0-3 holes. All operations are linear on the slice, which is perfectly fine for the expected cardinality.

func NewHoleTracker

func NewHoleTracker() *HoleTracker

NewHoleTracker creates an empty hole tracker.

func (*HoleTracker) Add

func (h *HoleTracker) Add(start, end uint64)

Add inserts a new hole [start, end). Merges with any overlapping or adjacent holes. No-op if start >= end.

func (*HoleTracker) ContinuousThrough

func (h *HoleTracker) ContinuousThrough(cursor uint64) uint64

ContinuousThrough returns the highest sequence number with no holes below it. Given a base cursor and the hole set, this is the seq just before the first hole (or the cursor itself if no holes exist before it).

Example: cursor=100, holes=[(200,300)] -> returns 199 (continuous through 199) Example: cursor=100, holes=[(100,200)] -> returns 99 (hole starts at cursor) Example: cursor=100, no holes -> returns max uint64 (no bound from holes)

func (*HoleTracker) Fill

func (h *HoleTracker) Fill(start, end uint64) bool

Fill marks [start, end) as received, removing that range from any holes. Returns true if any holes were actually affected.

func (*HoleTracker) Holes

func (h *HoleTracker) Holes() []SeqRange

Holes returns a copy of current holes, sorted by Start.

func (*HoleTracker) Len

func (h *HoleTracker) Len() int

Len returns the number of holes.

func (*HoleTracker) TotalMissing

func (h *HoleTracker) TotalMissing() uint64

TotalMissing returns the total number of missing sequence numbers across all holes.

type InstanceManager

type InstanceManager struct {
	// contains filtered or unexported fields
}

InstanceManager manages per-instance state on the cloud side.

func NewInstanceManager

func NewInstanceManager(dataDir string, logger *slog.Logger) (*InstanceManager, error)

NewInstanceManager creates a new instance manager, loading any persisted state.

func (*InstanceManager) Get

func (im *InstanceManager) Get(id string) *InstanceState

Get returns the instance state, or nil if not found.

func (*InstanceManager) GetOrCreate

func (im *InstanceManager) GetOrCreate(id string) *InstanceState

GetOrCreate returns the instance state, creating it if necessary.

func (*InstanceManager) List

func (im *InstanceManager) List() []InstanceSummary

List returns a snapshot of all instance IDs and their basic state.

func (*InstanceManager) SetInstancePaused

func (im *InstanceManager) SetInstancePaused(instanceID string, paused bool)

SetInstancePaused pauses or unpauses journal writing for a specific instance. Used by the JournalKeeper overflow policy to stop/resume writes.

func (*InstanceManager) SetJournalRotation added in v0.3.1

func (im *InstanceManager) SetJournalRotation(duration time.Duration, size int64)

SetJournalRotation configures rotation for live journal writers. Must be called before any connections are accepted. Retroactively updates all existing instances loaded at startup.

func (*InstanceManager) SetOnRotate

func (im *InstanceManager) SetOnRotate(fn func(instanceID string, rf RotatedFile))

SetOnRotate sets a callback invoked when any instance's journal or backfill file is rotated. Used by the cloud binary to feed the JournalKeeper. Must be called before any connections are accepted. Retroactively updates all existing instances loaded at startup.

func (*InstanceManager) Shutdown

func (im *InstanceManager) Shutdown()

Shutdown stops all instance brokers and persists state.

type InstanceState

type InstanceState struct {
	ID               string       `json:"id"`
	Cursor           uint64       `json:"cursor"`        // continuous data through this seq
	BoatHeadSeq      uint64       `json:"boat_head_seq"` // last reported by boat
	BoatJournalBytes uint64       `json:"boat_journal_bytes"`
	LastSeen         time.Time    `json:"last_seen"`
	Connected        bool         `json:"-"`
	HoleTracker      *HoleTracker `json:"-"`

	// Persisted hole state
	PersistedHoles []SeqRange `json:"holes,omitempty"`
	// contains filtered or unexported fields
}

InstanceState tracks the replication state for a single boat instance on the cloud side. Thread-safe.

func (*InstanceState) RecentEvents

func (s *InstanceState) RecentEvents(n int) []ReplicationEvent

RecentEvents returns up to n recent replication events, newest first.

func (*InstanceState) RecordEvent

func (s *InstanceState) RecordEvent(typ ReplicationEventType, detail map[string]any)

RecordEvent appends a diagnostic event to this instance's event log.

func (*InstanceState) Status

func (s *InstanceState) Status() InstanceStatus

Status returns a thread-safe snapshot of this instance's replication state.

type InstanceStatus

type InstanceStatus struct {
	ID               string     `json:"id"`
	Connected        bool       `json:"connected"`
	Cursor           uint64     `json:"cursor"`
	BoatHeadSeq      uint64     `json:"boat_head_seq"`
	BoatJournalBytes uint64     `json:"boat_journal_bytes"`
	Holes            []SeqRange `json:"holes,omitzero"`
	LagSeqs          uint64     `json:"lag_seqs"`
	LastSeen         time.Time  `json:"last_seen"`
}

InstanceStatus is a detailed snapshot of an instance for status reporting.

type InstanceSummary

type InstanceSummary struct {
	ID          string    `json:"id"`
	Connected   bool      `json:"connected"`
	Cursor      uint64    `json:"cursor"`
	BoatHeadSeq uint64    `json:"boat_head_seq"`
	Holes       int       `json:"holes"`
	LagSeqs     uint64    `json:"lag_seqs"`
	LastSeen    time.Time `json:"last_seen"`
}

InstanceSummary is a snapshot of an instance for listing.

type JournalConfig

type JournalConfig struct {
	Dir            string
	Prefix         string                  // default: "nmea2k"
	BlockSize      int                     // default: 262144, power of 2, min 4096
	Compression    journal.CompressionType // default: CompressionNone
	RotateDuration time.Duration           // 0 = no limit
	RotateSize     int64                   // 0 = no limit
	RotateCount    int64                   // 0 = no limit
	OnRotate       func(RotatedFile)       // called after a journal file is closed by rotation
	Logger         *slog.Logger
}

JournalConfig configures the journal writer.

type JournalKeeper

type JournalKeeper struct {
	// contains filtered or unexported fields
}

JournalKeeper manages journal file archival and retention. One goroutine per binary, driven by rotation notifications and periodic directory scans.

func NewJournalKeeper

func NewJournalKeeper(cfg KeeperConfig) *JournalKeeper

NewJournalKeeper creates a keeper. Call Run to start.

func (*JournalKeeper) Run

func (k *JournalKeeper) Run(ctx context.Context)

Run is the main loop. Blocks until ctx is cancelled.

func (*JournalKeeper) Send

func (k *JournalKeeper) Send(rf RotatedFile)

Send notifies the keeper that a file was rotated. Non-blocking; drops if the channel is full (the periodic scan will pick it up).

func (*JournalKeeper) SetOnPauseChange

func (k *JournalKeeper) SetOnPauseChange(fn func(dir KeeperDir, paused bool))

SetOnPauseChange sets the callback invoked when a directory's pause state transitions. Must be called before Run.

type JournalWriter

type JournalWriter struct {
	// contains filtered or unexported fields
}

JournalWriter writes CAN frames to block-based journal files.

func NewJournalWriter

func NewJournalWriter(cfg JournalConfig, devices *DeviceRegistry, ch <-chan RxFrame) (*JournalWriter, error)

NewJournalWriter creates a writer. Call Run to start.

func (*JournalWriter) Run

func (w *JournalWriter) Run(ctx context.Context) error

Run is the main loop. Blocks until ctx is cancelled or the channel is closed.

func (*JournalWriter) SetPaused

func (w *JournalWriter) SetPaused(p bool)

SetPaused sets whether the writer should discard incoming frames. Used by the overflow policy to stop writes when disk is full.

type KeeperConfig

type KeeperConfig struct {
	// Dirs to manage. Ignored if DirFunc is set.
	Dirs []KeeperDir

	// DirFunc returns current directories on each scan cycle.
	// Used by cloud to dynamically discover instance dirs.
	DirFunc func() []KeeperDir

	// Retention
	MaxAge  time.Duration // 0 = no age limit
	MinKeep time.Duration // 0 = no min-keep floor
	MaxSize int64         // 0 = no size limit

	// Soft/hard thresholds
	SoftPct        int            // % of MaxSize for proactive archiving (default 80, range 1-99)
	OverflowPolicy OverflowPolicy // what to do when hard cap hit with non-archived files

	// Archive
	ArchiveCommand string         // path to script; empty = no archiving
	ArchiveTrigger ArchiveTrigger // on-rotate or before-expire

	// Pause callback (called when overflow-policy=pause-recording toggles state)
	OnPauseChange func(dir KeeperDir, paused bool)

	Logger *slog.Logger
}

KeeperConfig configures the JournalKeeper.

type KeeperDir

type KeeperDir struct {
	Dir        string
	InstanceID string
}

KeeperDir is a directory managed by the keeper.

type OverflowPolicy

type OverflowPolicy int

OverflowPolicy controls what happens when the hard size cap is hit and files haven't been archived yet.

const (
	OverflowDeleteUnarchived OverflowPolicy = iota // delete files even if not archived
	OverflowPauseRecording                         // stop journal writes until archives free space
)

func ParseOverflowPolicy

func ParseOverflowPolicy(s string) (OverflowPolicy, error)

ParseOverflowPolicy parses a string into an OverflowPolicy.

func (OverflowPolicy) String

func (p OverflowPolicy) String() string

String returns the string representation of an OverflowPolicy.

type PGNMatcher added in v0.3.1

type PGNMatcher struct {
	Singles []uint32
	Ranges  [][2]uint32 // [lo, hi] inclusive
}

PGNMatcher matches PGN numbers against a set of individual values and ranges.

func (*PGNMatcher) Contains added in v0.3.1

func (m *PGNMatcher) Contains(pgn uint32) bool

Contains returns true if pgn is in the matcher's set.

type PGNValue

type PGNValue struct {
	PGN  uint32 `json:"pgn"`
	Ts   string `json:"ts"`
	Data string `json:"data"`
	Seq  uint64 `json:"seq"`
}

PGNValue is a single PGN's last-known value in the JSON response.

type ReplicationClient

type ReplicationClient struct {
	// contains filtered or unexported fields
}

ReplicationClient streams frames from the local broker to a cloud replication server over gRPC. It runs two independent streams: one for live frames (from the broker's head forward) and one for backfilling historical gaps (from journal files). On disconnect, it reconnects with exponential backoff and resumes both streams.

func NewReplicationClient

func NewReplicationClient(cfg ReplicationClientConfig, broker *Broker) *ReplicationClient

NewReplicationClient creates a new replication client. Call Run to start.

func (*ReplicationClient) Run

func (c *ReplicationClient) Run(ctx context.Context) error

Run is the main loop. Connects to the cloud, performs handshake, and starts live + backfill streams. Reconnects on failure with exponential backoff. Blocks until ctx is cancelled.

func (*ReplicationClient) Status

Status returns the current replication state for status reporting.

type ReplicationClientConfig

type ReplicationClientConfig struct {
	Target     string // cloud gRPC address (host:port)
	InstanceID string
	CertFile   string // client certificate
	KeyFile    string // client private key
	CAFile     string // CA certificate for verifying server
	Logger     *slog.Logger

	// Resource protection tuning. Zero values use defaults from
	// replication_limits.go.
	MaxLiveLag              uint64        // max frames live can lag before reconnect (default: DefaultMaxLiveLag)
	LagCheckInterval        int           // check lag every N frames sent (default: DefaultLagCheckInterval)
	MinLagReconnectInterval time.Duration // min wait between lag-triggered reconnects (default: DefaultMinLagReconnectInterval)
}

ReplicationClientConfig configures the boat-side replication client.

type ReplicationEvent

type ReplicationEvent struct {
	Time   time.Time            `json:"time"`
	Type   ReplicationEventType `json:"type"`
	Detail map[string]any       `json:"detail,omitempty"`
}

ReplicationEvent is a single diagnostic event from the replication pipeline.

type ReplicationEventType

type ReplicationEventType string

ReplicationEventType identifies the kind of replication event.

const (
	EventLiveStart     ReplicationEventType = "live_start"
	EventLiveStop      ReplicationEventType = "live_stop"
	EventBackfillStart ReplicationEventType = "backfill_start"
	EventBackfillStop  ReplicationEventType = "backfill_stop"
	EventBlockReceived ReplicationEventType = "block_received"
	EventCheckpoint    ReplicationEventType = "checkpoint"
)

type ReplicationHealth added in v0.3.1

type ReplicationHealth struct {
	Status            string    `json:"status"` // "ok", "degraded", or "disconnected"
	Connected         bool      `json:"connected"`
	LiveLag           uint64    `json:"live_lag"`
	BackfillRemaining uint64    `json:"backfill_remaining_seqs"`
	LastAck           time.Time `json:"last_ack,omitempty"`
}

ReplicationHealth reports the replication client's health.

type ReplicationServer

type ReplicationServer struct {
	pb.UnimplementedReplicationServer

	// Resource protection tuning. Zero values use defaults from
	// replication_limits.go.
	MaxFrameRate float64 // max frames/sec per live stream (default: DefaultMaxFrameRate)
	RateBurst    int     // burst allowance for transient spikes (default: DefaultRateBurst)
	MaxLiveLag   uint64  // max frames live can lag before closing stream (default: DefaultMaxLiveLag)
	// contains filtered or unexported fields
}

ReplicationServer implements the gRPC Replication service.

func NewReplicationServer

func NewReplicationServer(im *InstanceManager, logger *slog.Logger) *ReplicationServer

NewReplicationServer creates a new replication gRPC server.

func (*ReplicationServer) Backfill

Backfill handles bulk block transfer for filling gaps.

func (*ReplicationServer) GetInstanceBroker

func (s *ReplicationServer) GetInstanceBroker(instanceID string) *Broker

GetInstanceBroker returns the broker for an instance (starting it if needed). Used by HTTP handlers to serve SSE from a cloud instance.

func (*ReplicationServer) GetInstanceState

func (s *ReplicationServer) GetInstanceState(instanceID string) *InstanceState

GetInstanceState returns the instance state for status reporting.

func (*ReplicationServer) Handshake

Handshake exchanges sync state between boat and cloud.

func (*ReplicationServer) Live

Live handles the realtime frame stream from a boat.

type ReplicationStatus

type ReplicationStatus struct {
	Connected             bool       `json:"connected"`
	InstanceID            string     `json:"instance_id"`
	LocalHeadSeq          uint64     `json:"local_head_seq"`
	CloudCursor           uint64     `json:"cloud_cursor"`
	Holes                 []SeqRange `json:"holes,omitzero"`
	LiveLag               uint64     `json:"live_lag"`
	BackfillRemainingSeqs uint64     `json:"backfill_remaining_seqs"`
	LastAck               time.Time  `json:"last_ack,omitempty"`
}

ReplicationStatus is the boat-side view of replication state.

type RotatedFile

type RotatedFile struct {
	Path       string
	InstanceID string
}

RotatedFile describes a completed journal file.

type RxFrame

type RxFrame struct {
	Timestamp time.Time
	Header    CANHeader
	Data      []byte
	Seq       uint64 // assigned by broker in handleFrame; zero when fed by external code
}

RxFrame is a reassembled CAN frame ready for the broker.

type SendPolicy added in v0.3.1

type SendPolicy struct {
	Enabled bool       // must be true for /send and /query to accept requests
	Rules   []SendRule // ordered rules; first match wins
}

SendPolicy controls whether the /send and /query endpoints are enabled and which frames are permitted. Rules are evaluated top-to-bottom; first match wins. If no rule matches, the request is denied. An empty Rules list with Enabled=true allows all frames (backwards compatible).

type SendRule added in v0.3.1

type SendRule struct {
	Allow bool        // true = allow, false = deny
	PGNs  *PGNMatcher // nil = match any PGN
	Names []uint64    // nil/empty = match any destination NAME
}

SendRule is a single allow or deny rule that matches frames by PGN and/or destination CAN NAME. Either matcher may be nil (wildcard).

func ParseSendRule added in v0.3.1

func ParseSendRule(s string) (SendRule, error)

ParseSendRule parses a single rule string. Syntax:

[!] [pgn:<spec>] [name:<hex>,...]

where <spec> is comma-separated PGN values or ranges (e.g. "59904", "59904,126208", "129025-129029", "59904,65280-65535").

A '!' prefix makes the rule a deny rule; otherwise it's an allow rule. Omitting pgn: matches all PGNs. Omitting name: matches all destinations.

Examples:

"pgn:59904"                              — allow PGN 59904 to any device
"pgn:59904,126208 name:001c6e4000200000" — allow two PGNs to one device
"pgn:129025-129029"                      — allow a PGN range
"!pgn:65280-65535"                       — deny proprietary PGN range
"name:001c6e4000200000"                  — allow any PGN to one device

func ParseSendRules added in v0.3.1

func ParseSendRules(rules []string) ([]SendRule, error)

ParseSendRules parses multiple rule strings.

func (*SendRule) Matches added in v0.3.1

func (r *SendRule) Matches(pgn uint32, dstNAME uint64, nameKnown bool) bool

Matches returns true if the rule matches the given PGN and destination NAME. A nil PGNs matcher matches any PGN; an empty Names list matches any NAME.

type SeqRange

type SeqRange struct {
	Start uint64 // inclusive
	End   uint64 // exclusive
}

SeqRange represents a half-open interval [Start, End) of sequence numbers.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server handles HTTP API requests for lplex.

func NewServer

func NewServer(broker *Broker, logger *slog.Logger, policy SendPolicy) *Server

NewServer creates a new HTTP server wired to the given broker.

func (*Server) HandleEphemeralSSE

func (s *Server) HandleEphemeralSSE(w http.ResponseWriter, r *http.Request)

HandleEphemeralSSE handles GET /events. Ephemeral SSE stream, no session, no ACK, no replay. Optional query params for filtering: pgn, manufacturer, instance, name (hex).

func (*Server) HandleFunc

func (s *Server) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request))

HandleFunc registers an additional HTTP handler on the server's mux.

func (*Server) ServeHTTP

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements http.Handler.

type SubKeyFunc added in v0.3.1

type SubKeyFunc func(data []byte) uint64

SubKeyFunc extracts a sub-discriminator from raw packet data for multiplexed PGNs. Returns 0 for non-multiplexed PGNs (i.e., no sub-key extractor configured).

type SyncState

type SyncState struct {
	Cursor           uint64     // continuous data through this seq
	Holes            []SeqRange // sorted gaps
	BoatHeadSeq      uint64     // last reported by boat
	BoatJournalBytes uint64
}

SyncState captures the replication state for an instance, used for persistence and handshake responses.

type TxRequest

type TxRequest struct {
	Header CANHeader
	Data   []byte
}

TxRequest is a frame to write to the CAN bus.

type ValueStore

type ValueStore struct {
	// contains filtered or unexported fields
}

ValueStore tracks the last-seen frame data for each (source, PGN) pair. The broker goroutine writes via Record; HTTP handlers read via Snapshot.

func NewValueStore

func NewValueStore() *ValueStore

NewValueStore creates an empty value store.

func (*ValueStore) DecodedSnapshot added in v0.3.1

func (vs *ValueStore) DecodedSnapshot(devices *DeviceRegistry, filter *EventFilter) []DecodedDeviceValues

DecodedSnapshot returns the current values grouped by device with PGN data decoded into named fields using the pgn.Registry. PGNs not in the registry or that fail to decode are omitted.

func (*ValueStore) DecodedSnapshotJSON added in v0.3.1

func (vs *ValueStore) DecodedSnapshotJSON(devices *DeviceRegistry, filter *EventFilter) json.RawMessage

DecodedSnapshotJSON returns the decoded snapshot as pre-serialized JSON.

func (*ValueStore) Record

func (vs *ValueStore) Record(source uint8, pgn uint32, ts time.Time, data []byte, seq uint64)

Record updates the stored value for the given source and PGN. Called by the broker goroutine on every frame.

func (*ValueStore) Snapshot

func (vs *ValueStore) Snapshot(devices *DeviceRegistry, filter *EventFilter) []DeviceValues

Snapshot returns the current values grouped by device, resolved against the device registry for NAME and manufacturer info. An optional filter restricts results by PGN and/or device criteria (manufacturer, name, instance).

func (*ValueStore) SnapshotJSON

func (vs *ValueStore) SnapshotJSON(devices *DeviceRegistry, filter *EventFilter) json.RawMessage

SnapshotJSON returns the snapshot as pre-serialized JSON.

Directories

Path Synopsis
Package canbus provides shared NMEA 2000 / CAN bus types and functions importable by both the server internals and client tools.
Package canbus provides shared NMEA 2000 / CAN bus types and functions importable by both the server internals and client tools.
cmd
journalbench command
journalbench reads frames from a .lpj journal file and re-encodes them with every combination of block size and compression method to find the optimal parameters for CAN bus journal data.
journalbench reads frames from a .lpj journal file and re-encodes them with every combination of block size and compression method to find the optimal parameters for CAN bus journal data.
lplex command
lplex-cloud command
lplexdump command
pgngen command
pgngen generates Go structs, Protocol Buffer definitions, and JSON Schema from NMEA 2000 PGN definition files written in the lplex PGN DSL.
pgngen generates Go structs, Protocol Buffer definitions, and JSON Schema from NMEA 2000 PGN definition files written in the lplex PGN DSL.
Package filter provides a BPF-inspired expression language for filtering decoded NMEA 2000 frames by field values.
Package filter provides a BPF-inspired expression language for filtering decoded NMEA 2000 frames by field values.
Package journal provides the block-based binary journal format (.lpj files) for recording and replaying NMEA 2000 CAN frames.
Package journal provides the block-based binary journal format (.lpj files) for recording and replaying NMEA 2000 CAN frames.
Package lplexc provides a Go client for lplex, a CAN bus HTTP bridge for NMEA 2000.
Package lplexc provides a Go client for lplex, a CAN bus HTTP bridge for NMEA 2000.
Package pgn provides generated Go types for decoding and encoding NMEA 2000 PGN messages.
Package pgn provides generated Go types for decoding and encoding NMEA 2000 PGN messages.
Package pgngen provides a DSL parser and code generators for NMEA 2000 PGN definitions.
Package pgngen provides a DSL parser and code generators for NMEA 2000 PGN definitions.
proto

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL