server

package
v0.1.2
Published: Mar 1, 2026 | License: MIT | Imports: 14 | Imported by: 0

Documentation

Overview

Package server implements lplex, a CAN bus HTTP bridge for NMEA 2000.

Constants

This section is empty.

Variables

This section is empty.

Functions

func BuildCANID

func BuildCANID(h CANHeader) uint32

BuildCANID constructs a 29-bit CAN identifier from a CANHeader. Inverse of ParseCANID: BuildCANID(ParseCANID(x)) == x for valid IDs.

func CANReader

func CANReader(ctx context.Context, iface string, rxFrames chan<- RxFrame, logger *slog.Logger) error

CANReader reads frames from SocketCAN, reassembles fast-packets, and sends completed frames to the broker's rxFrames channel.

func CANWriter

func CANWriter(ctx context.Context, iface string, txFrames <-chan TxRequest, logger *slog.Logger) error

CANWriter reads from the broker's txFrames channel and writes to SocketCAN. Handles fast-packet fragmentation for payloads > 8 bytes.

func FragmentFastPacket

func FragmentFastPacket(data []byte, seqCounter uint8) [][]byte

FragmentFastPacket splits a payload into CAN frames for fast-packet TX. seqCounter should be incremented once per transfer; it is a 3-bit field, so it wraps from 7 back to 0. Returns a slice of 8-byte CAN frame payloads.
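To illustrate the splitting described above, here is a minimal standalone sketch. It is not the package's implementation: fragmentFastPacket is a hypothetical local function, the sequence counter is assumed to occupy the upper 3 bits of byte 0, and unused trailing bytes are assumed to be padded with 0xFF. The frame layout follows the fast-packet description given under FastPacketAssembler.

```go
package main

import "fmt"

// fragmentFastPacket is a hypothetical stand-in for illustration only.
// Assumptions: seqCounter sits in the upper 3 bits of byte 0, the frame
// number in the lower 5 bits, and unused bytes are padded with 0xFF.
func fragmentFastPacket(data []byte, seqCounter uint8) [][]byte {
	const maxPayload = 6 + 31*7 // frame 0 carries 6 bytes, frames 1..31 carry 7 each
	if len(data) > maxPayload {
		return nil
	}
	newFrame := func() []byte {
		f := make([]byte, 8)
		for i := range f {
			f[i] = 0xFF
		}
		return f
	}
	seqBits := (seqCounter & 0x07) << 5

	// Frame 0: header byte, total length, then the first 6 data bytes.
	first := newFrame()
	first[0] = seqBits
	first[1] = uint8(len(data))
	sent := copy(first[2:], data)
	frames := [][]byte{first}

	// Frames 1..N: header byte, then up to 7 data bytes each.
	for frameNo := uint8(1); sent < len(data); frameNo++ {
		f := newFrame()
		f[0] = seqBits | frameNo
		sent += copy(f[1:], data[sent:])
		frames = append(frames, f)
	}
	return frames
}

func main() {
	payload := make([]byte, 10)
	for i := range payload {
		payload[i] = byte(i)
	}
	for _, f := range fragmentFastPacket(payload, 3) {
		fmt.Printf("% X\n", f)
	}
}
```

The sketch returns nil for payloads over 223 bytes, the most that 32 frames can carry; how the real function treats oversized input is not stated in this documentation.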

func IsFastPacket

func IsFastPacket(pgn uint32) bool

IsFastPacket returns true if the PGN uses fast-packet transfer.

func ParseISO8601Duration

func ParseISO8601Duration(s string) (time.Duration, error)

ParseISO8601Duration parses a subset of ISO 8601 durations (PT format). Supports hours (H), minutes (M), and seconds (S). Examples: "PT5M", "PT1H30M", "PT30S", "PT1H"
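A parser for this subset could look roughly like the sketch below. It is a standalone illustration, not the package's code: parsePTDuration is a hypothetical name, and the sketch assumes integer values only and that components must appear in H, M, S order.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// parsePTDuration is a hypothetical sketch of a PT-format parser.
// Assumptions: whole numbers only, components in H, M, S order.
func parsePTDuration(s string) (time.Duration, error) {
	if !strings.HasPrefix(s, "PT") || len(s) == 2 {
		return 0, fmt.Errorf("invalid duration %q", s)
	}
	rest := s[2:]
	units := []struct {
		suffix byte
		unit   time.Duration
	}{{'H', time.Hour}, {'M', time.Minute}, {'S', time.Second}}

	var total time.Duration
	next := 0 // index of the first unit still allowed
	for rest != "" {
		// Count the leading digits of the next component.
		digits := 0
		for digits < len(rest) && rest[digits] >= '0' && rest[digits] <= '9' {
			digits++
		}
		if digits == 0 || digits == len(rest) {
			return 0, fmt.Errorf("invalid duration %q", s)
		}
		n, err := strconv.ParseInt(rest[:digits], 10, 64)
		if err != nil {
			return 0, fmt.Errorf("invalid duration %q: %w", s, err)
		}
		// Match the unit designator, rejecting repeats and wrong order.
		matched := false
		for i := next; i < len(units); i++ {
			if rest[digits] == units[i].suffix {
				total += time.Duration(n) * units[i].unit
				next = i + 1
				matched = true
				break
			}
		}
		if !matched {
			return 0, fmt.Errorf("invalid duration %q", s)
		}
		rest = rest[digits+1:]
	}
	return total, nil
}

func main() {
	for _, s := range []string{"PT5M", "PT1H30M", "PT30S", "PT1H", "5M"} {
		d, err := parsePTDuration(s)
		fmt.Println(s, d, err)
	}
}
```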

Types

type Broker

type Broker struct {
	// contains filtered or unexported fields
}

Broker is the central coordinator. A single goroutine reads from rxFrames, assigns sequence numbers, appends to the ring buffer, updates the device registry, and fans out to client sessions and ephemeral subscribers.
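The single-goroutine pattern described above can be sketched in a few lines. This is a simplified illustration, not the Broker's code: miniBroker and event are hypothetical types, the ring buffer and device registry are left out, and dropping events for a slow subscriber instead of blocking is an assumption of the sketch.

```go
package main

import "fmt"

// event is a hypothetical sequenced frame delivered to subscribers.
type event struct {
	Seq  uint64
	Data []byte
}

// miniBroker is a hypothetical, stripped-down coordinator.
type miniBroker struct {
	rx   chan []byte
	subs []chan event
	seq  uint64
}

// run is the only goroutine that touches seq, so no lock is needed for it.
func (b *miniBroker) run() {
	for data := range b.rx {
		b.seq++
		ev := event{Seq: b.seq, Data: data}
		for _, ch := range b.subs {
			select {
			case ch <- ev:
			default: // assumed policy: drop for a slow subscriber rather than stall the loop
			}
		}
	}
	// rx was closed: signal subscribers that no more events will arrive.
	for _, ch := range b.subs {
		close(ch)
	}
}

func main() {
	b := &miniBroker{rx: make(chan []byte), subs: []chan event{make(chan event, 8)}}
	done := make(chan struct{})
	go func() { b.run(); close(done) }()

	b.rx <- []byte{0x01}
	b.rx <- []byte{0x02}
	close(b.rx) // comparable in spirit to CloseRx
	<-done

	for ev := range b.subs[0] {
		fmt.Println(ev.Seq, ev.Data)
	}
}
```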

func NewBroker

func NewBroker(cfg BrokerConfig) *Broker

NewBroker creates a new broker with the given config.

func (*Broker) AckSession

func (b *Broker) AckSession(id string, seq uint64) error

AckSession updates the cursor for a session.

func (*Broker) CloseRx

func (b *Broker) CloseRx()

CloseRx closes the rxFrames channel, signaling the broker to stop processing.

func (*Broker) ConnectSession

func (b *Broker) ConnectSession(id string) (*ClientSession, bool)

ConnectSession marks a session as connected and returns its channel.

func (*Broker) CreateSession

func (b *Broker) CreateSession(id string, bufferTimeout time.Duration, filter *EventFilter) (*ClientSession, uint64)

CreateSession creates or retrieves a client session. Returns the session and the current head sequence number.

When bufferTimeout is 0, the session cursor is reset so no frames are replayed on the next connect (fresh start).

func (*Broker) CurrentSeq

func (b *Broker) CurrentSeq() uint64

CurrentSeq returns the most recently assigned sequence number.

func (*Broker) DisconnectSession

func (b *Broker) DisconnectSession(id string)

DisconnectSession marks a session as disconnected.

func (*Broker) Replay

func (b *Broker) Replay(afterSeq uint64, filter *EventFilter) [][]byte

Replay returns buffered entries from afterSeq+1 up to the current head, filtered by the given EventFilter. Device-based filter criteria are resolved to source addresses before taking the ring lock to avoid deadlocks.
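The replay window, together with the power-of-two RingSize requirement in BrokerConfig, can be pictured with a small ring buffer sketch. It is illustrative only: ring, push, and replay are hypothetical names, filtering and locking are omitted, and using a bit mask for indexing is an assumed reason for the power-of-two constraint.

```go
package main

import "fmt"

// ring is a hypothetical fixed-size buffer indexed by sequence number.
type ring struct {
	entries [][]byte
	mask    uint64
	head    uint64 // last assigned sequence number; 0 means empty
}

func newRing(size int) (*ring, error) {
	// A power of two has exactly one bit set, so size&(size-1) is zero.
	if size <= 0 || size&(size-1) != 0 {
		return nil, fmt.Errorf("ring size %d is not a power of 2", size)
	}
	return &ring{entries: make([][]byte, size), mask: uint64(size - 1)}, nil
}

// push stores an entry and returns its sequence number.
func (r *ring) push(b []byte) uint64 {
	r.head++
	r.entries[r.head&r.mask] = b // mask replaces a modulo operation
	return r.head
}

// replay returns entries from afterSeq+1 up to head, skipping any that
// have already been overwritten.
func (r *ring) replay(afterSeq uint64) [][]byte {
	start := afterSeq + 1
	size := uint64(len(r.entries))
	if r.head >= size && start < r.head-size+1 {
		start = r.head - size + 1
	}
	var out [][]byte
	for seq := start; seq <= r.head; seq++ {
		out = append(out, r.entries[seq&r.mask])
	}
	return out
}

func main() {
	r, err := newRing(4)
	if err != nil {
		panic(err)
	}
	for _, s := range []string{"a", "b", "c", "d", "e", "f"} {
		r.push([]byte(s))
	}
	for _, e := range r.replay(0) {
		fmt.Println(string(e))
	}
}
```

With a ring of 4 and six entries pushed, a replay from sequence 0 can only return the four newest entries; a client whose cursor has fallen out of the window loses the older ones.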

func (*Broker) Run

func (b *Broker) Run()

Run is the broker's main loop. Call in its own goroutine. Exits when rxFrames is closed.

func (*Broker) RxFrames

func (b *Broker) RxFrames() chan<- RxFrame

RxFrames returns the channel for submitting received CAN frames to the broker.

func (*Broker) Subscribe

func (b *Broker) Subscribe(filter *EventFilter) (*subscriber, func())

Subscribe creates an ephemeral fan-out channel with the given filter. Returns the subscriber and a cleanup function that must be called on disconnect.

func (*Broker) TxFrames

func (b *Broker) TxFrames() <-chan TxRequest

TxFrames returns the channel for reading CAN frames to transmit.

type BrokerConfig

type BrokerConfig struct {
	RingSize          int           // must be power of 2
	MaxBufferDuration time.Duration // cap on client buffer_timeout
	Logger            *slog.Logger
}

BrokerConfig holds broker configuration.

type CANHeader

type CANHeader struct {
	Priority    uint8
	PGN         uint32
	Source      uint8
	Destination uint8 // 0xFF for broadcast (PDU2)
}

CANHeader holds the parsed fields from a 29-bit CAN ID.

func ParseCANID

func ParseCANID(id uint32) CANHeader

ParseCANID extracts priority, PGN, source address, and destination from a 29-bit CAN identifier per NMEA 2000 / ISO 11783.

CAN ID bit layout (29 bits):

bits 28-26: priority (3 bits)
bit  25:    reserved (always 0 on NMEA 2000)
bit  24:    data page (DP)
bits 23-16: PDU format (PF)
bits 15-8:  PDU specific (PS)
bits 7-0:   source address

PF >= 240 (PDU2, broadcast): PGN = DP<<16 | PF<<8 | PS
PF < 240 (PDU1, addressed): PGN = DP<<16 | PF<<8, PS = destination
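The bit layout above translates into shifts and masks as in the following sketch. It is a standalone illustration rather than the package source: canHeader, parseCANID, and buildCANID are local stand-ins for the exported names, and the two sample identifiers in main are chosen only for demonstration.

```go
package main

import "fmt"

// canHeader is a local stand-in mirroring the documented CANHeader fields.
type canHeader struct {
	Priority    uint8
	PGN         uint32
	Source      uint8
	Destination uint8
}

// parseCANID decodes a 29-bit identifier following the documented layout.
func parseCANID(id uint32) canHeader {
	h := canHeader{
		Priority: uint8((id >> 26) & 0x7),
		Source:   uint8(id & 0xFF),
	}
	dp := (id >> 24) & 0x1
	pf := (id >> 16) & 0xFF
	ps := (id >> 8) & 0xFF
	if pf >= 240 {
		// PDU2: PS is part of the PGN and the frame is a broadcast.
		h.PGN = dp<<16 | pf<<8 | ps
		h.Destination = 0xFF
	} else {
		// PDU1: PS carries the destination address.
		h.PGN = dp<<16 | pf<<8
		h.Destination = uint8(ps)
	}
	return h
}

// buildCANID is the inverse of parseCANID for valid identifiers.
func buildCANID(h canHeader) uint32 {
	dp := (h.PGN >> 16) & 0x1
	pf := (h.PGN >> 8) & 0xFF
	id := uint32(h.Priority&0x7)<<26 | dp<<24 | pf<<16 | uint32(h.Source)
	if pf >= 240 {
		id |= (h.PGN & 0xFF) << 8
	} else {
		id |= uint32(h.Destination) << 8
	}
	return id
}

func main() {
	for _, id := range []uint32{0x09F80123, 0x18EA1005} {
		h := parseCANID(id)
		fmt.Printf("0x%08X -> %+v -> 0x%08X\n", id, h, buildCANID(h))
	}
}
```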

type ClientSession

type ClientSession struct {
	ID            string
	Cursor        uint64        // last ACK'd sequence number (0 = never ACK'd)
	BufferTimeout time.Duration // how long to keep buffering after disconnect
	LastActivity  time.Time
	Ch            chan []byte // buffered channel for SSE fan-out
	Connected     bool
	Filter        *EventFilter // nil = receive all frames
}

ClientSession tracks a connected or recently-disconnected client.

type Device

type Device struct {
	Source           uint8  `json:"src"`
	NAME             uint64 `json:"-"`
	NAMEHex          string `json:"name"`
	Manufacturer     string `json:"manufacturer"`
	ManufacturerCode uint16 `json:"manufacturer_code"`
	DeviceClass      uint8  `json:"device_class"`
	DeviceFunction   uint8  `json:"device_function"`
	DeviceInstance   uint8  `json:"device_instance"`
	UniqueNumber     uint32 `json:"unique_number,omitempty"`

	// PGN 126996 Product Information fields.
	ModelID         string `json:"model_id,omitempty"`
	SoftwareVersion string `json:"software_version,omitempty"`
	ModelVersion    string `json:"model_version,omitempty"`
	ModelSerial     string `json:"model_serial,omitempty"`
	ProductCode     uint16 `json:"product_code,omitempty"`

	// Per-source packet statistics.
	FirstSeen   time.Time `json:"first_seen"`
	LastSeen    time.Time `json:"last_seen"`
	PacketCount uint64    `json:"packet_count"`
	ByteCount   uint64    `json:"byte_count"`
}

Device represents an NMEA 2000 device discovered via ISO Address Claim (PGN 60928) and optionally enriched with Product Information (PGN 126996).

type DeviceRegistry

type DeviceRegistry struct {
	// contains filtered or unexported fields
}

DeviceRegistry tracks NMEA 2000 devices discovered via PGN 60928. Thread-safe for concurrent reads (SSE streams) and writes (broker goroutine).

func NewDeviceRegistry

func NewDeviceRegistry() *DeviceRegistry

NewDeviceRegistry creates an empty device registry.

func (*DeviceRegistry) Get

func (r *DeviceRegistry) Get(source uint8) *Device

Get returns a snapshot of the device at the given source address, or nil.

func (*DeviceRegistry) HandleAddressClaim

func (r *DeviceRegistry) HandleAddressClaim(source uint8, data []byte) *Device

HandleAddressClaim processes a PGN 60928 ISO Address Claim. Returns the device if this is a new or changed device, nil otherwise.

func (*DeviceRegistry) HandleProductInfo

func (r *DeviceRegistry) HandleProductInfo(source uint8, data []byte) *Device

HandleProductInfo processes a PGN 126996 Product Information response. Returns the device if fields changed, nil if source is unknown or unchanged.

func (*DeviceRegistry) RecordPacket

func (r *DeviceRegistry) RecordPacket(source uint8, ts time.Time, dataLen int) bool

RecordPacket updates per-source packet statistics. Returns true if this is a previously unseen source address.

func (*DeviceRegistry) Snapshot

func (r *DeviceRegistry) Snapshot() []Device

Snapshot returns a copy of all known devices.

func (*DeviceRegistry) SnapshotJSON

func (r *DeviceRegistry) SnapshotJSON() json.RawMessage

SnapshotJSON returns the device list as pre-serialized JSON.

type EventFilter

type EventFilter struct {
	PGNs          []uint32
	Manufacturers []string
	Instances     []uint8
	Names         []uint64 // 64-bit CAN NAMEs
}

EventFilter specifies which CAN frames a session receives. Categories are AND'd (all set categories must match), values within a category are OR'd (any value in the list matches).
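The AND-across-categories, OR-within-category rule can be sketched as follows. This is an illustration under assumptions: eventFilter mirrors the documented fields, while frameInfo, matches, and contains are hypothetical helpers, and the sketch compares device attributes directly instead of resolving them to source addresses as the Replay documentation describes.

```go
package main

import "fmt"

// eventFilter mirrors the documented EventFilter fields.
type eventFilter struct {
	PGNs          []uint32
	Manufacturers []string
	Instances     []uint8
	Names         []uint64
}

// frameInfo is a hypothetical view of a frame plus its sender's attributes.
type frameInfo struct {
	PGN          uint32
	Manufacturer string
	Instance     uint8
	Name         uint64
}

// contains reports whether v is in list (OR within a category).
func contains[T comparable](list []T, v T) bool {
	for _, item := range list {
		if item == v {
			return true
		}
	}
	return false
}

// matches applies AND across the categories that are set. A nil filter
// matches every frame, as documented for ClientSession.Filter.
func (f *eventFilter) matches(fr frameInfo) bool {
	if f == nil {
		return true
	}
	if len(f.PGNs) > 0 && !contains(f.PGNs, fr.PGN) {
		return false
	}
	if len(f.Manufacturers) > 0 && !contains(f.Manufacturers, fr.Manufacturer) {
		return false
	}
	if len(f.Instances) > 0 && !contains(f.Instances, fr.Instance) {
		return false
	}
	if len(f.Names) > 0 && !contains(f.Names, fr.Name) {
		return false
	}
	return true
}

func main() {
	f := &eventFilter{PGNs: []uint32{129025, 129026}, Manufacturers: []string{"Acme"}}
	fmt.Println(f.matches(frameInfo{PGN: 129026, Manufacturer: "Acme"}))
	fmt.Println(f.matches(frameInfo{PGN: 129026, Manufacturer: "Other"}))
}
```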

func (*EventFilter) IsEmpty

func (f *EventFilter) IsEmpty() bool

IsEmpty returns true if no filter criteria are set.

type FastPacketAssembler

type FastPacketAssembler struct {
	// contains filtered or unexported fields
}

FastPacketAssembler reassembles multi-frame fast-packet PGNs.

Fast-packet protocol:

  • Frame 0: byte[0] = seq_counter(3 bits) | frame_number(5 bits), byte[1] = total_bytes, bytes[2:8] = first 6 data bytes
  • Frame N: byte[0] = seq_counter(3 bits) | frame_number(5 bits), bytes[1:8] = next 7 data bytes

func NewFastPacketAssembler

func NewFastPacketAssembler(timeout time.Duration) *FastPacketAssembler

NewFastPacketAssembler creates a new assembler with the given reassembly timeout.

func (*FastPacketAssembler) Process

func (a *FastPacketAssembler) Process(pgn uint32, source uint8, data []byte, now time.Time) []byte

Process handles a CAN frame that is part of a fast-packet transfer. Returns the complete reassembled payload when all frames are received, nil otherwise.
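Reassembly along these lines might be sketched as below. This is not the FastPacketAssembler implementation: assembler, key, and partial are hypothetical types, the sequence counter is assumed to sit in the upper 3 bits of byte 0, frames are assumed to arrive in order, and the timeout handling that PurgeStale provides is left out.

```go
package main

import "fmt"

// key identifies one in-progress transfer (assumed: one per PGN and source).
type key struct {
	pgn    uint32
	source uint8
}

// partial holds the state of a transfer that is still missing frames.
type partial struct {
	seq       uint8
	total     int
	nextFrame uint8
	buf       []byte
}

type assembler struct {
	inflight map[key]*partial
}

func newAssembler() *assembler {
	return &assembler{inflight: make(map[key]*partial)}
}

// process returns the full payload once the last frame arrives, nil otherwise.
func (a *assembler) process(pgn uint32, source uint8, data []byte) []byte {
	if len(data) < 2 {
		return nil
	}
	k := key{pgn, source}
	seq, frameNo := data[0]>>5, data[0]&0x1F
	if frameNo == 0 {
		// Frame 0 starts a new transfer: byte 1 is the total length.
		p := &partial{seq: seq, total: int(data[1]), nextFrame: 1}
		p.buf = append(p.buf, data[2:]...)
		a.inflight[k] = p
	} else {
		p, ok := a.inflight[k]
		if !ok || p.seq != seq || p.nextFrame != frameNo {
			delete(a.inflight, k) // unexpected frame: abandon the transfer
			return nil
		}
		p.buf = append(p.buf, data[1:]...)
		p.nextFrame++
	}
	p := a.inflight[k]
	if len(p.buf) < p.total {
		return nil
	}
	delete(a.inflight, k)
	return p.buf[:p.total] // trim padding from the final frame
}

func main() {
	a := newAssembler()
	frames := [][]byte{
		{0x60, 10, 0, 1, 2, 3, 4, 5},
		{0x61, 6, 7, 8, 9, 0xFF, 0xFF, 0xFF},
	}
	for _, f := range frames {
		fmt.Println(a.process(129029, 0x23, f))
	}
}
```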

func (*FastPacketAssembler) PurgeStale

func (a *FastPacketAssembler) PurgeStale(now time.Time)

PurgeStale removes any in-progress assemblies older than the timeout.

type RxFrame

type RxFrame struct {
	Timestamp time.Time
	Header    CANHeader
	Data      []byte
}

RxFrame is a reassembled CAN frame ready for the broker.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server handles HTTP API requests for lplex.

func NewServer

func NewServer(broker *Broker, logger *slog.Logger) *Server

NewServer creates a new HTTP server wired to the given broker.

func (*Server) ServeHTTP

func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request)

ServeHTTP implements http.Handler.

type TxRequest

type TxRequest struct {
	Header CANHeader
	Data   []byte
}

TxRequest is a frame to write to the CAN bus.
