ipc

package
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 9, 2026 License: Apache-2.0 Imports: 13 Imported by: 0

Documentation

Overview

Package ipc implements the cli-wrapper inter-process communication layer.

It provides a length-prefixed framed protocol over a Unix Domain Socket. Each frame carries a 17-byte header, a MessagePack-encoded payload, and a monotonic sequence number. The package also implements a persistent outbox with a write-ahead log (WAL) so that message delivery survives transient connection loss and producer bursts.

This package is consumed by both the host-side controller and the cliwrap-agent subprocess.

Index

Constants

View Source
const (
	// MagicByte0 and MagicByte1 mark the start of every frame.
	// A mismatch triggers immediate connection termination.
	MagicByte0 byte = 0xC1
	MagicByte1 byte = 0xD9

	// ProtocolVersion is the major version of the wire protocol.
	// Incompatible changes must bump this value.
	ProtocolVersion byte = 0x01

	// HeaderSize is the fixed size of every frame header.
	// Layout: Magic(2) + Version(1) + MsgType(1) + Flags(1) + SeqNo(8) + Length(4)
	HeaderSize = 17

	// MaxPayloadSize is the largest payload a single frame may carry.
	// Anything larger must be delivered via FILE_REF.
	MaxPayloadSize uint32 = 16 * 1024 * 1024
)

Protocol constants.

Variables

View Source
var (
	ErrShortHeader        = errors.New("ipc: frame header is too short")
	ErrInvalidMagic       = errors.New("ipc: frame magic bytes mismatch")
	ErrUnsupportedVersion = errors.New("ipc: unsupported protocol version")
	ErrFrameTooLarge      = errors.New("ipc: frame length exceeds maximum")
)

Sentinel decode errors.

View Source
var ErrLengthMismatch = errors.New("ipc: header length does not match payload size")

ErrLengthMismatch indicates that the header's Length field does not match the size of the payload slice passed to WriteFrame.

View Source
var ErrNilDecodeTarget = errors.New("ipc: decode target is nil")

ErrNilDecodeTarget is returned by DecodePayload when v is nil.

View Source
var ErrWALFull = errors.New("ipc: WAL size cap reached")

ErrWALFull indicates the WAL has reached its configured size cap.

Functions

func DecodePayload

func DecodePayload(data []byte, v any) error

DecodePayload deserializes MessagePack bytes into v. v must be a non-nil pointer to the destination struct.

func EncodePayload

func EncodePayload(v any) ([]byte, error)

EncodePayload serializes v to MessagePack bytes.

Types

type AckPayload

type AckPayload struct {
	AckedSeq uint64 `msgpack:"ack"`
}

AckPayload acknowledges a specific sequence number.

type AckTracker

type AckTracker struct {
	// contains filtered or unexported fields
}

AckTracker maintains the set of sequence numbers for which the sender is still awaiting receiver acknowledgment. It is safe for concurrent use.

func NewAckTracker

func NewAckTracker() *AckTracker

NewAckTracker returns an empty tracker.

func (*AckTracker) Ack

func (a *AckTracker) Ack(seq uint64)

Ack removes seq from the pending set if present.

func (*AckTracker) MarkPending

func (a *AckTracker) MarkPending(seq uint64)

MarkPending records that seq has been sent and is awaiting ACK.

func (*AckTracker) MaxPendingSeq

func (a *AckTracker) MaxPendingSeq() uint64

MaxPendingSeq returns the highest seq awaiting ACK, or 0 if none.

func (*AckTracker) Pending

func (a *AckTracker) Pending() []uint64

Pending returns a sorted snapshot of every seq still awaiting ACK.

type AgentFatalPayload

type AgentFatalPayload struct {
	Reason string `msgpack:"reason"`
	Stack  string `msgpack:"stack"`
}

AgentFatalPayload announces imminent agent termination.

type ChildErrorPayload

type ChildErrorPayload struct {
	Phase   string `msgpack:"phase"`
	Message string `msgpack:"msg"`
}

ChildErrorPayload reports an exec/runtime error from inside the agent.

type ChildExitedPayload

type ChildExitedPayload struct {
	PID      int32  `msgpack:"pid"`
	ExitCode int32  `msgpack:"code"`
	Signal   int32  `msgpack:"sig"`
	ExitedAt int64  `msgpack:"at"`
	Reason   string `msgpack:"reason"`
}

ChildExitedPayload signals that the child process has terminated.

type ChildStartedPayload

type ChildStartedPayload struct {
	PID       int32 `msgpack:"pid"`
	StartedAt int64 `msgpack:"at"`
}

ChildStartedPayload signals that exec succeeded.

type Conn

type Conn struct {
	// contains filtered or unexported fields
}

Conn is a full-duplex, framed, ACK-aware peer. It owns:

  • a Spiller (outbox + WAL) for outgoing frames
  • a writer goroutine that drains the outbox to the wire
  • a reader goroutine that pushes inbound frames to the handler

func NewConn

func NewConn(cfg ConnConfig) (*Conn, error)

NewConn constructs a Conn. Call Start() to launch the reader/writer goroutines.

func (*Conn) Close

func (c *Conn) Close(ctx context.Context) error

Close gracefully stops reader/writer goroutines and releases resources. It is idempotent. The ctx is used only to bound the spiller close; the goroutine join (wg.Wait) is always unconditional so that goleak never observes lingering goroutines.

func (*Conn) OnMessage

func (c *Conn) OnMessage(h MessageHandler)

OnMessage sets the handler invoked for each incoming frame. Subsequent calls replace the previous handler atomically.

func (*Conn) Send

func (c *Conn) Send(msg OutboxMessage) bool

Send enqueues msg for delivery. Returns false if the conn is closed.

func (*Conn) Seqs

func (c *Conn) Seqs() *SeqGenerator

Seqs exposes the outbound sequence-number generator so higher layers can attach seqs to frames they build.

func (*Conn) Start

func (c *Conn) Start()

Start launches the reader and writer goroutines. It is safe to call more than once; subsequent calls are no-ops.

type ConnConfig

type ConnConfig struct {
	// RWC is the underlying bidirectional stream (net.Conn, net.Pipe, etc.).
	RWC io.ReadWriteCloser

	// SpillerDir holds the WAL for the outgoing direction.
	SpillerDir string

	// Capacity is the in-memory outbox capacity.
	Capacity int

	// WALBytes is the WAL size cap.
	WALBytes int64

	// MaxRecvPayload caps inbound frame size. If zero, MaxPayloadSize is used.
	MaxRecvPayload uint32
}

ConnConfig parameterises NewConn.

type DedupTracker

type DedupTracker struct {
	// contains filtered or unexported fields
}

DedupTracker remembers the highest observed sequence number (watermark). A frame is considered duplicate iff seq <= lastSeen. This approach uses O(1) memory regardless of connection lifetime, which is safe for long-running connections (spec §2.7).

It is safe for concurrent use.

func NewDedupTracker

func NewDedupTracker() *DedupTracker

NewDedupTracker returns an empty tracker.

func (*DedupTracker) Seen

func (d *DedupTracker) Seen(seq uint64) bool

Seen reports whether seq has already been observed. If seq is new (greater than the current watermark), it advances the watermark and returns false.

type Flags

type Flags uint8

Flags is the bitmask carried in every frame header.

const (
	FlagAckRequired Flags = 0x01
	FlagIsReplay    Flags = 0x02
	FlagFileRef     Flags = 0x04
)

Flag bits.

func (Flags) Has

func (f Flags) Has(mask Flags) bool

Has reports whether all bits in mask are set.

type FrameReader

type FrameReader struct {
	// contains filtered or unexported fields
}

FrameReader reads framed messages from an io.Reader. It is NOT safe for concurrent use; callers must serialize ReadFrame calls.

func NewFrameReader

func NewFrameReader(r io.Reader, maxPayload uint32) *FrameReader

NewFrameReader returns a FrameReader that reads from r and rejects frames with Length > maxPayload. Callers should pass MaxPayloadSize unless they want an even tighter bound.

func (*FrameReader) ReadFrame

func (fr *FrameReader) ReadFrame() (Header, []byte, error)

ReadFrame reads exactly one frame from the underlying reader. It returns the decoded header, the payload bytes (may be empty), and an error. On io.EOF with no bytes consumed the error is io.EOF. On partial consumption the error is io.ErrUnexpectedEOF.

type FrameWriter

type FrameWriter struct {
	// contains filtered or unexported fields
}

FrameWriter serializes frames to an io.Writer. It is NOT safe for concurrent use; callers must serialize WriteFrame calls.

func NewFrameWriter

func NewFrameWriter(w io.Writer) *FrameWriter

NewFrameWriter returns a FrameWriter that writes to w.

func (*FrameWriter) WriteFrame

func (fw *FrameWriter) WriteFrame(h Header, payload []byte) (int, error)

WriteFrame serializes h followed by payload to the underlying writer. It returns the total number of bytes written (header + payload) on success.

type Header struct {
	MsgType MsgType
	Flags   Flags
	SeqNo   uint64
	Length  uint32
}

Header is the fixed-size frame prefix that precedes every payload.

func (*Header) Decode

func (h *Header) Decode(src []byte) error

Decode parses the header from src. src must be at least HeaderSize bytes.

func (Header) Encode

func (h Header) Encode(dst []byte)

Encode serializes the header into dst. dst must be at least HeaderSize bytes.

type HelloAckPayload

type HelloAckPayload struct {
	ProtocolVersion uint8    `msgpack:"v"`
	Capabilities    []string `msgpack:"caps"`
}

HelloAckPayload is the agent's handshake response.

type HelloPayload

type HelloPayload struct {
	ProtocolVersion uint8  `msgpack:"v"`
	AgentID         string `msgpack:"id"`
}

HelloPayload is the handshake initiator.

type LogChunkPayload

type LogChunkPayload struct {
	Stream uint8  `msgpack:"s"`
	SeqNo  uint64 `msgpack:"n"`
	Data   []byte `msgpack:"d"`
}

LogChunkPayload carries a stdout/stderr fragment inline. Stream: 0=stdout, 1=stderr, 2=diag (debug mode only)

type LogFileRefPayload

type LogFileRefPayload struct {
	Stream   uint8  `msgpack:"s"`
	Path     string `msgpack:"p"`
	Size     uint64 `msgpack:"sz"`
	Checksum string `msgpack:"ck"`
}

LogFileRefPayload references an out-of-band log file on disk.

type MessageHandler

type MessageHandler func(OutboxMessage)

MessageHandler is invoked for every frame that Conn receives. It MUST NOT block; handlers should forward work to a buffered channel.

type MsgType

type MsgType uint8

MsgType identifies the payload shape of a frame.

const (
	MsgHello       MsgType = 0x01
	MsgStartChild  MsgType = 0x02
	MsgStopChild   MsgType = 0x03
	MsgSignalChild MsgType = 0x04
	MsgWriteStdin  MsgType = 0x05
	MsgSetLogLevel MsgType = 0x06
	MsgPing        MsgType = 0x07
	MsgShutdown    MsgType = 0x08
	MsgAckControl  MsgType = 0x0A // host ACKs a data-plane message
	MsgResume      MsgType = 0x0B
)

Control-plane message types (host -> agent).

const (
	MsgHelloAck       MsgType = 0x81
	MsgChildStarted   MsgType = 0x82
	MsgChildExited    MsgType = 0x83
	MsgLogChunk       MsgType = 0x84
	MsgLogFileRef     MsgType = 0x85
	MsgChildError     MsgType = 0x86
	MsgPong           MsgType = 0x87
	MsgResourceSample MsgType = 0x88
	MsgAgentFatal     MsgType = 0x89
	MsgAckData        MsgType = 0x8A // agent ACKs a control-plane message
)

Data-plane message types (agent -> host).

type Outbox

type Outbox struct {
	// contains filtered or unexported fields
}

Outbox is a bounded FIFO with an optional disk spill handler. Enqueue never blocks the producer; when the in-memory slots are full it invokes the configured SpillFunc synchronously.

Shutdown is signalled via a dedicated `done` channel — the data channel `ch` is never closed directly, which eliminates the "send on closed channel" race between Enqueue and Close.

func NewOutbox

func NewOutbox(capacity int, spill SpillFunc) *Outbox

NewOutbox returns an Outbox with the given in-memory capacity and spill handler. If spill is nil, overflow enqueues are dropped by returning false.

func (*Outbox) Close

func (o *Outbox) Close()

Close shuts the outbox down. Subsequent Enqueue calls return false and Dequeue callers are unblocked with ok=false once all remaining messages are drained.

func (*Outbox) Dequeue

func (o *Outbox) Dequeue(timeout time.Duration) (OutboxMessage, bool)

Dequeue blocks until a message is available, the timeout elapses, or the outbox is closed. The second return value indicates success.

func (*Outbox) Enqueue

func (o *Outbox) Enqueue(msg OutboxMessage) bool

Enqueue attempts to add msg to the queue. Returns false if the outbox is closed or the SpillFunc (when the channel is full) returned an error.

func (*Outbox) InjectFront

func (o *Outbox) InjectFront(msg OutboxMessage) bool

InjectFront pushes a message to the front of the queue, bypassing the normal capacity check. This is used by the spiller to replay WAL entries into the in-memory queue.

type OutboxMessage

type OutboxMessage struct {
	Header  Header
	Payload []byte
}

OutboxMessage is one unit of work in an Outbox.

type ResourceSamplePayload

type ResourceSamplePayload struct {
	PID        int32   `msgpack:"pid"`
	RSS        uint64  `msgpack:"rss"`
	CPUPercent float64 `msgpack:"cpu"`
	Threads    int32   `msgpack:"thr"`
	FDs        int32   `msgpack:"fds"`
}

ResourceSamplePayload carries a periodic resource sample.

type ResumePayload

type ResumePayload struct {
	LastAckedSeq uint64 `msgpack:"last"`
}

ResumePayload exchanges last-seen sequence numbers on reconnect.

type SeqGenerator

type SeqGenerator struct {
	// contains filtered or unexported fields
}

SeqGenerator produces monotonically increasing sequence numbers. It is safe for concurrent use.

func NewSeqGenerator

func NewSeqGenerator(start uint64) *SeqGenerator

NewSeqGenerator returns a generator whose first Next() returns start. If start is 0, it is clamped to 1 to avoid uint64 underflow.

func (*SeqGenerator) Next

func (g *SeqGenerator) Next() uint64

Next returns the next sequence number.

type SetLogLevelPayload

type SetLogLevelPayload struct {
	Level uint8 `msgpack:"lvl"` // matches DebugMode
}

SetLogLevelPayload toggles the agent's log verbosity.

type SignalChildPayload

type SignalChildPayload struct {
	Signal int32 `msgpack:"sig"`
}

SignalChildPayload asks the agent to deliver a specific signal.

type SpillFunc

type SpillFunc func(OutboxMessage) error

SpillFunc is invoked when the in-memory queue is full. Implementations typically persist the message to a WAL. Returning a non-nil error causes the Outbox.Enqueue call to fail.

type Spiller

type Spiller struct {
	// contains filtered or unexported fields
}

Spiller couples an in-memory Outbox with a disk-backed WAL so that producers never block and messages are never lost.

func NewSpiller

func NewSpiller(dir string, capacity int, walCap int64) (*Spiller, error)

NewSpiller returns a Spiller whose Outbox has the given capacity and whose WAL lives under dir with a byte cap of walCap.

func (*Spiller) Ack

func (s *Spiller) Ack(ackedSeq uint64) error

Ack retires every WAL record with SeqNo <= ackedSeq.

func (*Spiller) Close

func (s *Spiller) Close() error

Close releases both the outbox and the WAL.

func (*Spiller) Outbox

func (s *Spiller) Outbox() *Outbox

Outbox exposes the underlying in-memory queue for producers and consumers.

func (*Spiller) ReplayInto

func (s *Spiller) ReplayInto(target *Outbox) error

ReplayInto reads WAL records and feeds them back into target in order. Any record that cannot fit into the in-memory queue is left in the WAL for a later replay.

func (*Spiller) WALReplayForTest

func (s *Spiller) WALReplayForTest() ([]OutboxMessage, error)

WALReplayForTest exposes the Spiller's WAL replay to external test packages (e.g. test/chaos). It lives in a non-test file because Go's _test.go files are only visible to the same package's tests, but chaos tests live in a separate module subdirectory and import ipc as a regular dependency.

This method is safe for concurrent use with Ack/ReplayInto and exists solely to let chaos tests inspect unacked WAL contents after forced disconnects.

type StartChildPayload

type StartChildPayload struct {
	Command     string            `msgpack:"cmd"`
	Args        []string          `msgpack:"args"`
	Env         map[string]string `msgpack:"env"`
	WorkDir     string            `msgpack:"wd"`
	StdinMode   uint8             `msgpack:"stdin"` // 0=none, 1=pipe, 2=inherit
	StopTimeout int64             `msgpack:"stop_to_ms"`
}

StartChildPayload instructs the agent to fork/exec the target CLI.

type StopChildPayload

type StopChildPayload struct {
	TimeoutMs int64 `msgpack:"to_ms"`
}

StopChildPayload requests graceful termination of the child.

type WAL

type WAL struct {
	// contains filtered or unexported fields
}

WAL is an append-only log of OutboxMessage records. Records are framed on disk using the same 17-byte header format as the wire protocol so that replay is bit-for-bit compatible with the in-flight format.

The file layout is:

[ header(17) | payload(N) ] [ header(17) | payload(N) ] ...

Retire(seq) rewrites the file, keeping only records with Header.SeqNo > seq.

func OpenWAL

func OpenWAL(dir string, sizeCap int64) (*WAL, error)

OpenWAL opens or creates the WAL in dir with the given byte cap.

func (*WAL) Append

func (w *WAL) Append(msg OutboxMessage) error

Append writes msg to the WAL. It returns ErrWALFull if the byte cap would be exceeded.

func (*WAL) Close

func (w *WAL) Close() error

Close flushes and closes the WAL file.

func (*WAL) Replay

func (w *WAL) Replay() ([]OutboxMessage, error)

Replay returns every record currently in the WAL, in append order.

func (*WAL) Retire

func (w *WAL) Retire(ackedSeq uint64) error

Retire discards every record with Header.SeqNo <= ackedSeq. It does so by rewriting the WAL file in place.

The ordering is: rename(tmp, path) FIRST, then close(old fd), then reopen. This ensures the file at w.path is always valid even if the process crashes mid-Retire.

type WriteStdinPayload

type WriteStdinPayload struct {
	Data []byte `msgpack:"d"`
}

WriteStdinPayload forwards bytes to the child's stdin.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL