dataexchange

package module
v0.2.1-beta.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 29, 2026 License: AGPL-3.0 Imports: 17 Imported by: 0

README

dataexchange

ci codecov

Data-exchange plugin for the Pilot Protocol daemon. Listens on port 1001 and persists inbound frames under ~/.pilot/: files land in received/ and text/JSON/binary messages land in inbox/.

Install

import "github.com/pilot-protocol/dataexchange"

Usage

f := &dataexchange.Frame{Type: dataexchange.TypeJSON, Payload: body}
if err := dataexchange.WriteFrame(conn, f); err != nil {
    return err
}

// Register as a plugin on the daemon runtime:
rt.Register(dataexchange.NewService(dataexchange.ServiceConfig{}))

Layout

File What it does
dataexchange.go Wire format: Frame, WriteFrame, ReadFrame, TraceFrame, TypeText/Binary/JSON/File/Trace, TypeName.
client.go ClientDial and send helpers.
server.go Server — accept loop and handler dispatch.
service.go *Servicecoreapi.Service adapter. Build tag !no_dataexchange.
service_disabled.go Stub *Service for -tags no_dataexchange builds.

Wire format

[4-byte type][4-byte length][payload]

For TypeFile the payload is prefixed with [2-byte name length][name bytes]. For TypeTrace the payload is [4-byte inner_type][8-byte sent_at_ns][inner payload].

Max frame size: 256 MiB.

Build tags

Tag Effect
no_dataexchange Compiles a no-op stub whose Start does nothing. Useful for integration tests that don't want inbox files written.

License

AGPL-3.0-or-later. See LICENSE.

Documentation

Index

Constants

View Source
const (
	TypeText   uint32 = 1
	TypeBinary uint32 = 2
	TypeJSON   uint32 = 3
	TypeFile   uint32 = 4
	// TypeTrace wraps another frame type with nanosecond-precision timing.
	// Wire layout: [4-byte TypeTrace][4-byte len][4-byte inner_type][8-byte sent_at_ns][inner_payload]
	TypeTrace uint32 = 5
)

Frame types for data exchange on port 1001.

View Source
const MaxFrameSize = 1 << 28

MaxFrameSize caps a single data-exchange frame at 256 MiB. Sized to fit the test fleet's 100 MiB file payloads with margin while still rejecting pathological 500 MiB+ frames that would dominate memory.

Variables

This section is empty.

Functions

func TypeName

func TypeName(t uint32) string

TypeName returns a human-readable name for a frame type.

func WriteFrame

func WriteFrame(w io.Writer, f *Frame) error

WriteFrame writes a frame to a writer.

func WriteTraceFrame

func WriteTraceFrame(w io.Writer, tf *TraceFrame) error

WriteTraceFrame serialises a TraceFrame as a TypeTrace outer frame.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client connects to a remote data exchange service on port 1001.

func Dial

func Dial(d *driver.Driver, addr protocol.Addr) (*Client, error)

Dial connects to a remote agent's data exchange port.

func (*Client) Close

func (c *Client) Close() error

Close closes the connection.

func (*Client) Recv

func (c *Client) Recv() (*Frame, error)

Recv reads the next frame from the connection.

func (*Client) SendBinary

func (c *Client) SendBinary(data []byte) error

SendBinary sends a binary frame.

func (*Client) SendFile

func (c *Client) SendFile(name string, data []byte) error

SendFile sends a file frame with a filename and data.

func (*Client) SendJSON

func (c *Client) SendJSON(data []byte) error

SendJSON sends a JSON frame.

func (*Client) SendText

func (c *Client) SendText(text string) error

SendText sends a text frame.

func (*Client) SendTrace

func (c *Client) SendTrace(innerType uint32, data []byte) (sentAtNs int64, err error)

SendTrace wraps data in a TypeTrace frame with the current nanosecond clock. Returns sentAtNs so the caller can correlate it against the timing ACK.

type Frame

type Frame struct {
	Type     uint32
	Payload  []byte
	Filename string // only for TypeFile
}

Frame is a typed data unit exchanged between agents. Wire format: [4-byte type][4-byte length][payload] For TypeFile, payload is: [2-byte name length][name bytes][file data]

func ReadFrame

func ReadFrame(r io.Reader) (*Frame, error)

ReadFrame reads a frame from a reader.

type Handler

type Handler func(conn net.Conn, frame *Frame)

Handler is called for each incoming frame on a connection.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server listens on port 1001 and dispatches incoming frames to a handler.

func NewServer

func NewServer(d *driver.Driver, handler Handler) *Server

NewServer creates a data exchange server.

func (*Server) ListenAndServe

func (s *Server) ListenAndServe() error

ListenAndServe binds port 1001 and starts accepting connections.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is the L11 plugin adapter. Daemon (L7) holds it only as coreapi.Service; cmd/daemon/main.go (L12) constructs it.

func NewService

func NewService(cfg ServiceConfig) *Service

func (*Service) Name

func (s *Service) Name() string

func (*Service) Order

func (s *Service) Order() int

Order: 110 — after handshake (~70) and the trust subsystem (~50).

func (*Service) Start

func (s *Service) Start(ctx context.Context, deps coreapi.Deps) error

func (*Service) Stop

func (s *Service) Stop(ctx context.Context) error

type ServiceConfig

type ServiceConfig struct {
	ReceivedDir string
	InboxDir    string
	// IncludeBase64 adds a lossless `data_b64` field to inbox JSON
	// alongside `data`. Off by default — only enable when binary
	// payloads (e.g. zlib-compressed envelopes) need to round-trip
	// without UTF-8 mangling.
	IncludeBase64 bool
	// InboxMaxFiles caps the number of inbox files retained on disk.
	// On exceeding the cap, oldest files (by mtime) are evicted FIFO.
	// Zero or negative ⇒ default 10000. Without this cap, a
	// misbehaving peer or sustained inbound load fills the operator's
	// disk indefinitely.
	InboxMaxFiles int
}

ServiceConfig configures the daemon-side dataexchange handler. Both paths default to ~/.pilot/{received,inbox} when empty.

type TraceFrame

type TraceFrame struct {
	SentAtNs  int64
	InnerType uint32
	Payload   []byte
}

TraceFrame carries timing metadata around an inner message frame.

func ReadTracePayload

func ReadTracePayload(f *Frame) (*TraceFrame, error)

ReadTracePayload decodes a TraceFrame from a raw TypeTrace Frame.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL