Documentation ¶
Overview ¶
Package server implements lplex, a CAN bus HTTP bridge for NMEA 2000.
Index ¶
- func BuildCANID(h CANHeader) uint32
- func CANReader(ctx context.Context, iface string, rxFrames chan<- RxFrame, ...) error
- func CANWriter(ctx context.Context, iface string, txFrames <-chan TxRequest, ...) error
- func FragmentFastPacket(data []byte, seqCounter uint8) [][]byte
- func IsFastPacket(pgn uint32) bool
- func ParseISO8601Duration(s string) (time.Duration, error)
- type Broker
- func (b *Broker) AckSession(id string, seq uint64) error
- func (b *Broker) CloseRx()
- func (b *Broker) ConnectSession(id string) (*ClientSession, bool)
- func (b *Broker) CreateSession(id string, bufferTimeout time.Duration, filter *EventFilter) (*ClientSession, uint64)
- func (b *Broker) CurrentSeq() uint64
- func (b *Broker) DisconnectSession(id string)
- func (b *Broker) Replay(afterSeq uint64, filter *EventFilter) [][]byte
- func (b *Broker) Run()
- func (b *Broker) RxFrames() chan<- RxFrame
- func (b *Broker) Subscribe(filter *EventFilter) (*subscriber, func())
- func (b *Broker) TxFrames() <-chan TxRequest
- type BrokerConfig
- type CANHeader
- type ClientSession
- type Device
- type DeviceRegistry
- func (r *DeviceRegistry) Get(source uint8) *Device
- func (r *DeviceRegistry) HandleAddressClaim(source uint8, data []byte) *Device
- func (r *DeviceRegistry) HandleProductInfo(source uint8, data []byte) *Device
- func (r *DeviceRegistry) RecordPacket(source uint8, ts time.Time, dataLen int) bool
- func (r *DeviceRegistry) Snapshot() []Device
- func (r *DeviceRegistry) SnapshotJSON() json.RawMessage
- type EventFilter
- type FastPacketAssembler
- type RxFrame
- type Server
- type TxRequest
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func BuildCANID ¶
func BuildCANID(h CANHeader) uint32
BuildCANID constructs a 29-bit CAN identifier from a CANHeader. It is the inverse of ParseCANID: BuildCANID(ParseCANID(x)) == x for valid IDs.
func CANReader ¶
func CANReader(ctx context.Context, iface string, rxFrames chan<- RxFrame, logger *slog.Logger) error
CANReader reads frames from SocketCAN, reassembles fast-packets, and sends completed frames to the broker's rxFrames channel.
func CANWriter ¶
func CANWriter(ctx context.Context, iface string, txFrames <-chan TxRequest, logger *slog.Logger) error
CANWriter reads from the broker's txFrames channel and writes to SocketCAN. Handles fast-packet fragmentation for payloads > 8 bytes.
func FragmentFastPacket ¶
func FragmentFastPacket(data []byte, seqCounter uint8) [][]byte
FragmentFastPacket splits a payload into CAN frames for fast-packet TX. seqCounter should be incremented per transfer; it is a 3-bit field, so it wraps from 7 back to 0. Returns a slice of 8-byte CAN frame payloads.
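The fragmentation can be sketched as follows, using the frame layout given under FastPacketAssembler (6 data bytes in frame 0, 7 in each later frame). This is an illustration, not the package's implementation: the name fragment, the 0xFF padding of unused bytes, and the nil return for empty or oversized payloads are assumptions.

```go
package main

import "fmt"

// maxFastPacket is the largest payload the layout can carry:
// 6 bytes in frame 0 plus 7 bytes in each of frames 1..31.
const maxFastPacket = 6 + 31*7

// newFrame returns an 8-byte frame with the given header byte.
// Padding with 0xFF is an assumption of this sketch.
func newFrame(header byte) []byte {
	return []byte{header, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}
}

// fragment is a hypothetical stand-in for FragmentFastPacket.
func fragment(data []byte, seqCounter uint8) [][]byte {
	if len(data) == 0 || len(data) > maxFastPacket {
		return nil // assumed behavior; the real function may differ
	}
	seq := (seqCounter & 0x07) << 5 // 3-bit counter in the top bits of byte 0

	// Frame 0: header, total length, first 6 data bytes.
	first := newFrame(seq)
	first[1] = byte(len(data))
	n := copy(first[2:], data)
	frames := [][]byte{first}

	// Frames 1..N: header, next 7 data bytes.
	rest := data[n:]
	for frameNo := uint8(1); len(rest) > 0; frameNo++ {
		f := newFrame(seq | frameNo)
		n := copy(f[1:], rest)
		rest = rest[n:]
		frames = append(frames, f)
	}
	return frames
}

func main() {
	for _, f := range fragment([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}, 1) {
		fmt.Printf("% X\n", f)
	}
}
```

Masking seqCounter with 0x07 means a caller can simply increment a uint8 per transfer and let the mask handle the wrap.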
func IsFastPacket ¶
func IsFastPacket(pgn uint32) bool
IsFastPacket reports whether the PGN uses fast-packet transfer.
Types ¶
type Broker ¶
type Broker struct {
// contains filtered or unexported fields
}
Broker is the central coordinator. A single goroutine reads from rxFrames, assigns sequence numbers, appends each frame to the ring buffer, updates the device registry, and fans out to client sessions and ephemeral subscribers.
func NewBroker ¶
func NewBroker(cfg BrokerConfig) *Broker
NewBroker creates a new broker with the given config.
func (*Broker) AckSession ¶
func (b *Broker) AckSession(id string, seq uint64) error
AckSession updates the cursor for a session.
func (*Broker) CloseRx ¶
func (b *Broker) CloseRx()
CloseRx closes the rxFrames channel, signaling the broker to stop processing.
func (*Broker) ConnectSession ¶
func (b *Broker) ConnectSession(id string) (*ClientSession, bool)
ConnectSession marks a session as connected and returns its channel.
func (*Broker) CreateSession ¶
func (b *Broker) CreateSession(id string, bufferTimeout time.Duration, filter *EventFilter) (*ClientSession, uint64)
CreateSession creates or retrieves a client session. Returns the session and the current head sequence number.
When bufferTimeout is 0, the session cursor is reset so no frames are replayed on the next connect (fresh start).
func (*Broker) CurrentSeq ¶
func (b *Broker) CurrentSeq() uint64
CurrentSeq returns the most recently assigned sequence number.
func (*Broker) DisconnectSession ¶
func (b *Broker) DisconnectSession(id string)
DisconnectSession marks a session as disconnected.
func (*Broker) Replay ¶
func (b *Broker) Replay(afterSeq uint64, filter *EventFilter) [][]byte
Replay returns buffered entries from afterSeq+1 up to the current head, filtered by the given EventFilter. Device-based filter criteria are resolved to source addresses before taking the ring lock to avoid deadlocks.
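A minimal sketch of replay over a sequence-indexed ring, assuming sequence numbers start at 1 and the ring size is a power of 2. The ring type and its methods are hypothetical; filtering and locking are left out, and the package's actual buffer may be organized differently.

```go
package main

import "fmt"

// ring is a hypothetical fixed-size buffer indexed by sequence number.
type ring struct {
	entries [][]byte
	mask    uint64
	head    uint64 // most recently assigned sequence number; 0 = empty
}

// newRing expects size to be a power of 2 so that seq&mask == seq%size.
func newRing(size int) *ring {
	return &ring{entries: make([][]byte, size), mask: uint64(size - 1)}
}

// add stores an entry and returns the sequence number assigned to it.
func (r *ring) add(b []byte) uint64 {
	r.head++
	r.entries[r.head&r.mask] = b
	return r.head
}

// replay returns entries afterSeq+1..head, skipping anything that has
// already been overwritten.
func (r *ring) replay(afterSeq uint64) [][]byte {
	if afterSeq >= r.head {
		return nil // nothing newer than the caller's cursor
	}
	start := afterSeq + 1
	size := uint64(len(r.entries))
	if r.head >= size && start < r.head-size+1 {
		start = r.head - size + 1 // oldest entry still in the ring
	}
	var out [][]byte
	for seq := start; seq <= r.head; seq++ {
		out = append(out, r.entries[seq&r.mask])
	}
	return out
}

func main() {
	r := newRing(4)
	for i := 1; i <= 6; i++ {
		r.add([]byte{byte(i)})
	}
	fmt.Println(r.replay(0)) // cursor is older than the ring: oldest retained entries only
	fmt.Println(r.replay(4)) // cursor is inside the ring
}
```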
func (*Broker) Run ¶
func (b *Broker) Run()
Run is the broker's main loop. Call in its own goroutine. Exits when rxFrames is closed.
func (*Broker) RxFrames ¶
func (b *Broker) RxFrames() chan<- RxFrame
RxFrames returns the channel for submitting received CAN frames to the broker.
func (*Broker) Subscribe ¶
func (b *Broker) Subscribe(filter *EventFilter) (*subscriber, func())
Subscribe creates an ephemeral fan-out channel with the given filter. Returns the subscriber and a cleanup function that must be called on disconnect.
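The subscribe-with-cleanup pattern can be illustrated with a small fan-out hub. Everything here (hub, subscribe, publish, dropping messages for slow subscribers) is a hypothetical sketch of the general technique, not the Broker's code; filtering is omitted.

```go
package main

import (
	"fmt"
	"sync"
)

// hub is a hypothetical fan-out point for ephemeral subscribers.
type hub struct {
	mu   sync.Mutex
	subs map[chan []byte]struct{}
}

// subscribe registers a buffered channel and returns it together with a
// cleanup function that unregisters and closes it.
func (h *hub) subscribe(buf int) (<-chan []byte, func()) {
	ch := make(chan []byte, buf)
	h.mu.Lock()
	h.subs[ch] = struct{}{}
	h.mu.Unlock()

	var once sync.Once // makes cleanup safe to call more than once
	cleanup := func() {
		once.Do(func() {
			h.mu.Lock()
			delete(h.subs, ch)
			h.mu.Unlock()
			close(ch) // safe: publish can no longer see ch
		})
	}
	return ch, cleanup
}

// publish delivers msg to every subscriber without blocking.
func (h *hub) publish(msg []byte) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for ch := range h.subs {
		select {
		case ch <- msg:
		default: // subscriber is full: drop instead of stalling the sender
		}
	}
}

func main() {
	h := &hub{subs: map[chan []byte]struct{}{}}
	ch, cleanup := h.subscribe(4)
	defer cleanup()

	h.publish([]byte("frame"))
	fmt.Printf("%s\n", <-ch)
}
```

Forgetting to call the cleanup function in a design like this leaves the channel registered, which is why the documentation insists it be called on disconnect.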
type BrokerConfig ¶
type BrokerConfig struct {
	RingSize          int           // must be power of 2
	MaxBufferDuration time.Duration // cap on client buffer_timeout
	Logger            *slog.Logger
}
BrokerConfig holds broker configuration.
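A power-of-2 ring size is typically required so that a slot index can be computed with a bit mask instead of a modulo. Whether NewBroker validates RingSize is not stated here, so a caller may want to check it first. A minimal sketch, with isPowerOfTwo as a hypothetical helper:

```go
package main

import "fmt"

// isPowerOfTwo reports whether n is a positive power of 2.
// A power of 2 has exactly one bit set, so n&(n-1) clears it to zero.
func isPowerOfTwo(n int) bool {
	return n > 0 && n&(n-1) == 0
}

func main() {
	for _, n := range []int{0, 1, 1000, 1024, 65536} {
		fmt.Println(n, isPowerOfTwo(n))
	}

	// With a power-of-2 size, masking and modulo agree.
	const size = 1024
	seq := uint64(123456)
	fmt.Println(seq&(size-1) == seq%size)
}
```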
type CANHeader ¶
type CANHeader struct {
	Priority    uint8
	PGN         uint32
	Source      uint8
	Destination uint8 // 0xFF for broadcast (PDU2)
}
CANHeader holds the parsed fields from a 29-bit CAN ID.
func ParseCANID ¶
ParseCANID extracts priority, PGN, source address, and destination from a 29-bit CAN identifier per NMEA 2000 / ISO 11783.
CAN ID bit layout (29 bits):
- bits 28-26: priority (3 bits)
- bit 25: reserved (always 0 on NMEA 2000)
- bit 24: data page (DP)
- bits 23-16: PDU format (PF)
- bits 15-8: PDU specific (PS)
- bits 7-0: source address
PGN derivation:
- PF >= 240 (PDU2, broadcast): PGN = DP<<16 | PF<<8 | PS
- PF < 240 (PDU1, addressed): PGN = DP<<16 | PF<<8, PS = destination
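The bit layout above translates into shifts and masks roughly as follows. This is a sketch under the stated layout, not the package source: canHeader, parseCANID, and buildCANID are local stand-ins, and setting Destination to 0xFF for PDU2 follows the comment on the CANHeader field.

```go
package main

import "fmt"

// canHeader mirrors the documented CANHeader fields.
type canHeader struct {
	Priority    uint8
	PGN         uint32
	Source      uint8
	Destination uint8
}

// parseCANID splits a 29-bit identifier into its fields.
func parseCANID(id uint32) canHeader {
	h := canHeader{
		Priority: uint8(id >> 26 & 0x07),
		Source:   uint8(id & 0xFF),
	}
	dp := id >> 24 & 0x01
	pf := id >> 16 & 0xFF
	ps := id >> 8 & 0xFF
	if pf >= 240 { // PDU2: PS is part of the PGN, frame is broadcast
		h.PGN = dp<<16 | pf<<8 | ps
		h.Destination = 0xFF
	} else { // PDU1: PS carries the destination address
		h.PGN = dp<<16 | pf<<8
		h.Destination = uint8(ps)
	}
	return h
}

// buildCANID is the inverse of parseCANID for valid headers.
func buildCANID(h canHeader) uint32 {
	id := uint32(h.Priority&0x07)<<26 | (h.PGN&0x1FFFF)<<8 | uint32(h.Source)
	if h.PGN>>8&0xFF < 240 {
		// PDU1: replace the PS byte with the destination address.
		id = id&^0xFF00 | uint32(h.Destination)<<8
	}
	return id
}

func main() {
	for _, id := range []uint32{0x09F80102, 0x18EA2301} {
		h := parseCANID(id)
		fmt.Printf("%08X -> %+v -> %08X\n", id, h, buildCANID(h))
	}
}
```

The first sample ID is a PDU2 frame (PF = 0xF8), the second a PDU1 frame (PF = 0xEA) addressed to 0x23; both round-trip through the two functions.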
type ClientSession ¶
type ClientSession struct {
	ID            string
	Cursor        uint64        // last ACK'd sequence number (0 = never ACK'd)
	BufferTimeout time.Duration // how long to keep buffering after disconnect
	LastActivity  time.Time
	Ch            chan []byte // buffered channel for SSE fan-out
	Connected     bool
	Filter        *EventFilter // nil = receive all frames
}
ClientSession tracks a connected or recently-disconnected client.
type Device ¶
type Device struct {
	Source           uint8  `json:"src"`
	NAME             uint64 `json:"-"`
	NAMEHex          string `json:"name"`
	Manufacturer     string `json:"manufacturer"`
	ManufacturerCode uint16 `json:"manufacturer_code"`
	DeviceClass      uint8  `json:"device_class"`
	DeviceFunction   uint8  `json:"device_function"`
	DeviceInstance   uint8  `json:"device_instance"`
	UniqueNumber     uint32 `json:"unique_number,omitempty"`

	// PGN 126996 Product Information fields.
	ModelID         string `json:"model_id,omitempty"`
	SoftwareVersion string `json:"software_version,omitempty"`
	ModelVersion    string `json:"model_version,omitempty"`
	ModelSerial     string `json:"model_serial,omitempty"`
	ProductCode     uint16 `json:"product_code,omitempty"`

	// Per-source packet statistics.
	FirstSeen   time.Time `json:"first_seen"`
	LastSeen    time.Time `json:"last_seen"`
	PacketCount uint64    `json:"packet_count"`
	ByteCount   uint64    `json:"byte_count"`
}
Device represents an NMEA 2000 device discovered via ISO Address Claim (PGN 60928) and optionally enriched with Product Information (PGN 126996).
type DeviceRegistry ¶
type DeviceRegistry struct {
// contains filtered or unexported fields
}
DeviceRegistry tracks NMEA 2000 devices discovered via PGN 60928. Thread-safe for concurrent reads (SSE streams) and writes (broker goroutine).
func NewDeviceRegistry ¶
func NewDeviceRegistry() *DeviceRegistry
NewDeviceRegistry creates an empty device registry.
func (*DeviceRegistry) Get ¶
func (r *DeviceRegistry) Get(source uint8) *Device
Get returns a snapshot of the device at the given source address, or nil.
func (*DeviceRegistry) HandleAddressClaim ¶
func (r *DeviceRegistry) HandleAddressClaim(source uint8, data []byte) *Device
HandleAddressClaim processes a PGN 60928 ISO Address Claim. Returns the device if this is a new or changed device, nil otherwise.
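For orientation, the 8-byte Address Claim payload is a little-endian 64-bit NAME. The sketch below pulls out the fields that Device exposes, using the ISO 11783-5 NAME layout. The names decodeNAME and nameFields are hypothetical, and how the package itself decodes or formats the NAME (for example NAMEHex) is an assumption.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// nameFields holds the subset of NAME fields that Device exposes.
type nameFields struct {
	UniqueNumber     uint32 // bits 0-20
	ManufacturerCode uint16 // bits 21-31
	DeviceInstance   uint8  // bits 32-39 (lower 3 + upper 5 bits)
	DeviceFunction   uint8  // bits 40-47
	DeviceClass      uint8  // bits 49-55 (bit 48 is reserved)
}

// decodeNAME reads the 64-bit NAME from an Address Claim payload.
// It reports false if the payload is shorter than 8 bytes.
func decodeNAME(data []byte) (uint64, nameFields, bool) {
	if len(data) < 8 {
		return 0, nameFields{}, false
	}
	name := binary.LittleEndian.Uint64(data)
	return name, nameFields{
		UniqueNumber:     uint32(name & 0x1FFFFF),
		ManufacturerCode: uint16(name >> 21 & 0x7FF),
		DeviceInstance:   uint8(name >> 32 & 0xFF),
		DeviceFunction:   uint8(name >> 40 & 0xFF),
		DeviceClass:      uint8(name >> 49 & 0x7F),
	}, true
}

func main() {
	// Build a sample payload from made-up field values.
	var sample uint64 = 12345 | 229<<21 | 3<<32 | 130<<40 | 25<<49
	raw := make([]byte, 8)
	binary.LittleEndian.PutUint64(raw, sample)

	name, f, ok := decodeNAME(raw)
	fmt.Printf("%016x %+v %v\n", name, f, ok)
}
```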
func (*DeviceRegistry) HandleProductInfo ¶
func (r *DeviceRegistry) HandleProductInfo(source uint8, data []byte) *Device
HandleProductInfo processes a PGN 126996 Product Information response. Returns the device if fields changed, nil if source is unknown or unchanged.
func (*DeviceRegistry) RecordPacket ¶
func (r *DeviceRegistry) RecordPacket(source uint8, ts time.Time, dataLen int) bool
RecordPacket updates per-source packet statistics. Returns true if this is a previously unseen source address.
func (*DeviceRegistry) Snapshot ¶
func (r *DeviceRegistry) Snapshot() []Device
Snapshot returns a copy of all known devices.
func (*DeviceRegistry) SnapshotJSON ¶
func (r *DeviceRegistry) SnapshotJSON() json.RawMessage
SnapshotJSON returns the device list as pre-serialized JSON.
type EventFilter ¶
type EventFilter struct {
	PGNs          []uint32
	Manufacturers []string
	Instances     []uint8
	Names         []uint64 // 64-bit CAN NAMEs
}
EventFilter specifies which CAN frames a session receives. Categories are AND'd (all set categories must match), values within a category are OR'd (any value in the list matches).
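The AND-across-categories, OR-within-category rule can be sketched like this. The types eventFilter and frameMeta and the method matches are hypothetical, and exact string comparison for manufacturers is an assumption; the package resolves device-based criteria through the device registry, which this sketch skips.

```go
package main

import "fmt"

// eventFilter mirrors the documented EventFilter fields.
type eventFilter struct {
	PGNs          []uint32
	Manufacturers []string
	Instances     []uint8
	Names         []uint64
}

// frameMeta is a hypothetical summary of the frame being tested.
type frameMeta struct {
	PGN          uint32
	Manufacturer string
	Instance     uint8
	Name         uint64
}

// contains reports whether v is in list (OR within a category).
func contains[T comparable](list []T, v T) bool {
	for _, x := range list {
		if x == v {
			return true
		}
	}
	return false
}

// matches applies every non-empty category (AND across categories).
// A nil filter matches everything.
func (f *eventFilter) matches(m frameMeta) bool {
	if f == nil {
		return true
	}
	if len(f.PGNs) > 0 && !contains(f.PGNs, m.PGN) {
		return false
	}
	if len(f.Manufacturers) > 0 && !contains(f.Manufacturers, m.Manufacturer) {
		return false
	}
	if len(f.Instances) > 0 && !contains(f.Instances, m.Instance) {
		return false
	}
	if len(f.Names) > 0 && !contains(f.Names, m.Name) {
		return false
	}
	return true
}

func main() {
	f := &eventFilter{PGNs: []uint32{129025, 129026}, Instances: []uint8{0}}
	fmt.Println(f.matches(frameMeta{PGN: 129026, Instance: 0})) // PGN and instance both match
	fmt.Println(f.matches(frameMeta{PGN: 129026, Instance: 1})) // instance category fails
}
```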
func (*EventFilter) IsEmpty ¶
func (f *EventFilter) IsEmpty() bool
IsEmpty returns true if no filter criteria are set.
type FastPacketAssembler ¶
type FastPacketAssembler struct {
// contains filtered or unexported fields
}
FastPacketAssembler reassembles multi-frame fast-packet PGNs.
Fast-packet protocol:
- Frame 0: byte[0] = seq_counter(3 bits) | frame_number(5 bits), byte[1] = total_bytes, bytes[2:8] = first 6 data bytes
- Frame N: byte[0] = seq_counter(3 bits) | frame_number(5 bits), bytes[1:8] = next 7 data bytes
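A minimal reassembly sketch based on the frame layout above. It assumes frames arrive in order and keys transfers by PGN and source address; the assembler and transfer types are hypothetical, and the package's own bookkeeping (keys, timestamps, out-of-order handling) may differ.

```go
package main

import "fmt"

// transfer tracks one in-progress fast-packet message.
type transfer struct {
	seq   uint8  // 3-bit sequence counter shared by all frames
	total int    // payload length announced in frame 0
	next  uint8  // frame number expected next
	buf   []byte // data bytes collected so far
}

// assembler is a hypothetical stand-in for FastPacketAssembler.
type assembler struct {
	inProgress map[uint64]*transfer // assumed key: pgn<<8 | source
}

// process consumes one frame and returns the payload once complete.
func (a *assembler) process(pgn uint32, source uint8, frame []byte) []byte {
	if len(frame) < 2 {
		return nil
	}
	key := uint64(pgn)<<8 | uint64(source)
	seq, frameNo := frame[0]>>5, frame[0]&0x1F

	if frameNo == 0 { // frame 0 starts (or restarts) a transfer
		t := &transfer{seq: seq, total: int(frame[1]), next: 1}
		t.buf = append(t.buf, frame[2:]...)
		a.inProgress[key] = t
		return a.finish(key, t)
	}

	t, ok := a.inProgress[key]
	if !ok || t.seq != seq || t.next != frameNo {
		delete(a.inProgress, key) // unexpected frame: abandon the transfer
		return nil
	}
	t.buf = append(t.buf, frame[1:]...)
	t.next++
	return a.finish(key, t)
}

// finish returns the payload, trimmed of padding, once enough bytes arrived.
func (a *assembler) finish(key uint64, t *transfer) []byte {
	if len(t.buf) < t.total {
		return nil
	}
	delete(a.inProgress, key)
	return t.buf[:t.total]
}

func main() {
	a := &assembler{inProgress: map[uint64]*transfer{}}
	fmt.Println(a.process(126996, 0x23, []byte{0x20, 10, 1, 2, 3, 4, 5, 6}))
	fmt.Println(a.process(126996, 0x23, []byte{0x21, 7, 8, 9, 10, 0xFF, 0xFF, 0xFF}))
}
```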
func NewFastPacketAssembler ¶
func NewFastPacketAssembler(timeout time.Duration) *FastPacketAssembler
NewFastPacketAssembler creates a new assembler with the given reassembly timeout.
func (*FastPacketAssembler) Process ¶
Process handles a CAN frame that is part of a fast-packet transfer. Returns the complete reassembled payload when all frames are received, nil otherwise.
func (*FastPacketAssembler) PurgeStale ¶
func (a *FastPacketAssembler) PurgeStale(now time.Time)
PurgeStale removes any in-progress assemblies older than the timeout.
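Purging by age can be sketched as a single pass over the in-progress map. The types below are hypothetical and assume each assembly records when its first frame arrived; taking now as a parameter, as PurgeStale does, keeps the logic easy to test with fixed timestamps.

```go
package main

import (
	"fmt"
	"time"
)

// pending is a hypothetical in-progress assembly.
type pending struct {
	started time.Time // arrival time of the first frame
}

// assembler holds in-progress assemblies and the reassembly timeout.
type assembler struct {
	timeout    time.Duration
	inProgress map[string]*pending
}

// purgeStale drops assemblies that started more than timeout before now.
func (a *assembler) purgeStale(now time.Time) {
	for key, p := range a.inProgress {
		if now.Sub(p.started) > a.timeout {
			delete(a.inProgress, key) // deleting during range is safe in Go
		}
	}
}

func main() {
	t0 := time.Unix(1000, 0)
	a := &assembler{
		timeout: 750 * time.Millisecond,
		inProgress: map[string]*pending{
			"old": {started: t0},
			"new": {started: t0.Add(time.Second)},
		},
	}
	a.purgeStale(t0.Add(1500 * time.Millisecond))
	fmt.Println(len(a.inProgress))
}
```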