Documentation
¶
Overview ¶
Package ipc implements the cli-wrapper inter-process communication layer.
It provides a length-prefixed framed protocol over a Unix Domain Socket. Each frame carries a 17-byte header, a MessagePack-encoded payload, and a monotonic sequence number. The package also implements a persistent outbox with a write-ahead log (WAL) so that message delivery survives transient connection loss and producer bursts.
This package is consumed by both the host-side controller and the cliwrap-agent subprocess.
Index ¶
- Constants
- Variables
- func DecodePayload(data []byte, v any) error
- func EncodePayload(v any) ([]byte, error)
- type AckPayload
- type AckTracker
- type AgentFatalPayload
- type ChildErrorPayload
- type ChildExitedPayload
- type ChildStartedPayload
- type Conn
- type ConnConfig
- type DedupTracker
- type Flags
- type FrameReader
- type FrameWriter
- type Header
- type HelloAckPayload
- type HelloPayload
- type LogChunkPayload
- type LogFileRefPayload
- type MessageHandler
- type MsgType
- type Outbox
- type OutboxMessage
- type ResourceSamplePayload
- type ResumePayload
- type SeqGenerator
- type SetLogLevelPayload
- type SignalChildPayload
- type SpillFunc
- type Spiller
- type StartChildPayload
- type StopChildPayload
- type WAL
- type WriteStdinPayload
Constants ¶
const ( // MagicByte0 and MagicByte1 mark the start of every frame. // A mismatch triggers immediate connection termination. MagicByte0 byte = 0xC1 MagicByte1 byte = 0xD9 // ProtocolVersion is the major version of the wire protocol. // Incompatible changes must bump this value. ProtocolVersion byte = 0x01 // HeaderSize is the fixed size of every frame header. // Layout: Magic(2) + Version(1) + MsgType(1) + Flags(1) + SeqNo(8) + Length(4) HeaderSize = 17 // MaxPayloadSize is the largest payload a single frame may carry. // Anything larger must be delivered via FILE_REF. MaxPayloadSize uint32 = 16 * 1024 * 1024 )
Protocol constants.
Variables ¶
var ( ErrShortHeader = errors.New("ipc: frame header is too short") ErrInvalidMagic = errors.New("ipc: frame magic bytes mismatch") ErrUnsupportedVersion = errors.New("ipc: unsupported protocol version") ErrFrameTooLarge = errors.New("ipc: frame length exceeds maximum") )
Sentinel decode errors.
var ErrLengthMismatch = errors.New("ipc: header length does not match payload size")
ErrLengthMismatch indicates that the header's Length field does not match the size of the payload slice passed to WriteFrame.
var ErrNilDecodeTarget = errors.New("ipc: decode target is nil")
ErrNilDecodeTarget is returned by DecodePayload when v is nil.
var ErrWALFull = errors.New("ipc: WAL size cap reached")
ErrWALFull indicates the WAL has reached its configured size cap.
Functions ¶
func DecodePayload ¶
DecodePayload deserializes MessagePack bytes into v. v must be a non-nil pointer to the destination struct.
func EncodePayload ¶
EncodePayload serializes v to MessagePack bytes.
Types ¶
type AckPayload ¶
type AckPayload struct {
AckedSeq uint64 `msgpack:"ack"`
}
AckPayload acknowledges a specific sequence number.
type AckTracker ¶
type AckTracker struct {
// contains filtered or unexported fields
}
AckTracker maintains the set of sequence numbers for which the sender is still awaiting receiver acknowledgment. It is safe for concurrent use.
func (*AckTracker) Ack ¶
func (a *AckTracker) Ack(seq uint64)
Ack removes seq from the pending set if present.
func (*AckTracker) MarkPending ¶
func (a *AckTracker) MarkPending(seq uint64)
MarkPending records that seq has been sent and is awaiting ACK.
func (*AckTracker) MaxPendingSeq ¶
func (a *AckTracker) MaxPendingSeq() uint64
MaxPendingSeq returns the highest seq awaiting ACK, or 0 if none.
func (*AckTracker) Pending ¶
func (a *AckTracker) Pending() []uint64
Pending returns a sorted snapshot of every seq still awaiting ACK.
type AgentFatalPayload ¶
AgentFatalPayload announces imminent agent termination.
type ChildErrorPayload ¶
ChildErrorPayload reports an exec/runtime error from inside the agent.
type ChildExitedPayload ¶
type ChildExitedPayload struct {
PID int32 `msgpack:"pid"`
ExitCode int32 `msgpack:"code"`
Signal int32 `msgpack:"sig"`
ExitedAt int64 `msgpack:"at"`
Reason string `msgpack:"reason"`
}
ChildExitedPayload signals that the child process has terminated.
type ChildStartedPayload ¶
ChildStartedPayload signals that exec succeeded.
type Conn ¶
type Conn struct {
// contains filtered or unexported fields
}
Conn is a full-duplex, framed, ACK-aware peer. It owns:
- a Spiller (outbox + WAL) for outgoing frames
- a writer goroutine that drains the outbox to the wire
- a reader goroutine that pushes inbound frames to the handler
func NewConn ¶
func NewConn(cfg ConnConfig) (*Conn, error)
NewConn constructs a Conn. Call Start() to launch the reader/writer goroutines.
func (*Conn) Close ¶
Close gracefully stops reader/writer goroutines and releases resources. It is idempotent. The ctx is used only to bound the spiller close; the goroutine join (wg.Wait) is always unconditional so that goleak never observes lingering goroutines.
func (*Conn) OnMessage ¶
func (c *Conn) OnMessage(h MessageHandler)
OnMessage sets the handler invoked for each incoming frame. Subsequent calls replace the previous handler atomically.
func (*Conn) Send ¶
func (c *Conn) Send(msg OutboxMessage) bool
Send enqueues msg for delivery. Returns false if the conn is closed.
func (*Conn) Seqs ¶
func (c *Conn) Seqs() *SeqGenerator
Seqs exposes the outbound sequence-number generator so higher layers can attach seqs to frames they build.
type ConnConfig ¶
type ConnConfig struct {
// RWC is the underlying bidirectional stream (net.Conn, net.Pipe, etc.).
RWC io.ReadWriteCloser
// SpillerDir holds the WAL for the outgoing direction.
SpillerDir string
// Capacity is the in-memory outbox capacity.
Capacity int
// WALBytes is the WAL size cap.
WALBytes int64
// MaxRecvPayload caps inbound frame size. If zero, MaxPayloadSize is used.
MaxRecvPayload uint32
}
ConnConfig parameterises NewConn.
type DedupTracker ¶
type DedupTracker struct {
// contains filtered or unexported fields
}
DedupTracker remembers the highest observed sequence number (watermark). A frame is considered duplicate iff seq <= lastSeen. This approach uses O(1) memory regardless of connection lifetime, which is safe for long-running connections (spec §2.7).
It is safe for concurrent use.
func NewDedupTracker ¶
func NewDedupTracker() *DedupTracker
NewDedupTracker returns an empty tracker.
func (*DedupTracker) Seen ¶
func (d *DedupTracker) Seen(seq uint64) bool
Seen reports whether seq has already been observed. If seq is new (greater than the current watermark), it advances the watermark and returns false.
type FrameReader ¶
type FrameReader struct {
// contains filtered or unexported fields
}
FrameReader reads framed messages from an io.Reader. It is NOT safe for concurrent use; callers must serialize ReadFrame calls.
func NewFrameReader ¶
func NewFrameReader(r io.Reader, maxPayload uint32) *FrameReader
NewFrameReader returns a FrameReader that reads from r and rejects frames with Length > maxPayload. Callers should pass MaxPayloadSize unless they want an even tighter bound.
func (*FrameReader) ReadFrame ¶
func (fr *FrameReader) ReadFrame() (Header, []byte, error)
ReadFrame reads exactly one frame from the underlying reader. It returns the decoded header, the payload bytes (may be empty), and an error. On io.EOF with no bytes consumed the error is io.EOF. On partial consumption the error is io.ErrUnexpectedEOF.
type FrameWriter ¶
type FrameWriter struct {
// contains filtered or unexported fields
}
FrameWriter serializes frames to an io.Writer. It is NOT safe for concurrent use; callers must serialize WriteFrame calls.
func NewFrameWriter ¶
func NewFrameWriter(w io.Writer) *FrameWriter
NewFrameWriter returns a FrameWriter that writes to w.
func (*FrameWriter) WriteFrame ¶
func (fw *FrameWriter) WriteFrame(h Header, payload []byte) (int, error)
WriteFrame serializes h followed by payload to the underlying writer. It returns the total number of bytes written (header + payload) on success.
type Header ¶
Header is the fixed-size frame prefix that precedes every payload.
type HelloAckPayload ¶
type HelloAckPayload struct {
ProtocolVersion uint8 `msgpack:"v"`
Capabilities []string `msgpack:"caps"`
}
HelloAckPayload is the agent's handshake response.
type HelloPayload ¶
HelloPayload is the handshake initiator.
type LogChunkPayload ¶
type LogChunkPayload struct {
Stream uint8 `msgpack:"s"`
SeqNo uint64 `msgpack:"n"`
Data []byte `msgpack:"d"`
}
LogChunkPayload carries a stdout/stderr fragment inline. Stream: 0=stdout, 1=stderr, 2=diag (debug mode only)
type LogFileRefPayload ¶
type LogFileRefPayload struct {
Stream uint8 `msgpack:"s"`
Path string `msgpack:"p"`
Size uint64 `msgpack:"sz"`
Checksum string `msgpack:"ck"`
}
LogFileRefPayload references an out-of-band log file on disk.
type MessageHandler ¶
type MessageHandler func(OutboxMessage)
MessageHandler is invoked for every frame that Conn receives. It MUST NOT block; handlers should forward work to a buffered channel.
type MsgType ¶
type MsgType uint8
MsgType identifies the payload shape of a frame.
const ( MsgHello MsgType = 0x01 MsgStartChild MsgType = 0x02 MsgStopChild MsgType = 0x03 MsgSignalChild MsgType = 0x04 MsgWriteStdin MsgType = 0x05 MsgSetLogLevel MsgType = 0x06 MsgPing MsgType = 0x07 MsgShutdown MsgType = 0x08 MsgAckControl MsgType = 0x0A // host ACKs a data-plane message MsgResume MsgType = 0x0B )
Control-plane message types (host -> agent).
const ( MsgHelloAck MsgType = 0x81 MsgChildStarted MsgType = 0x82 MsgChildExited MsgType = 0x83 MsgLogChunk MsgType = 0x84 MsgLogFileRef MsgType = 0x85 MsgChildError MsgType = 0x86 MsgPong MsgType = 0x87 MsgResourceSample MsgType = 0x88 MsgAgentFatal MsgType = 0x89 MsgAckData MsgType = 0x8A // agent ACKs a control-plane message )
Data-plane message types (agent -> host).
type Outbox ¶
type Outbox struct {
// contains filtered or unexported fields
}
Outbox is a bounded FIFO with an optional disk spill handler. Enqueue never blocks the producer; when the in-memory slots are full it invokes the configured SpillFunc synchronously.
Shutdown is signalled via a dedicated `done` channel — the data channel `ch` is never closed directly, which eliminates the "send on closed channel" race between Enqueue and Close.
func NewOutbox ¶
NewOutbox returns an Outbox with the given in-memory capacity and spill handler. If spill is nil, overflow enqueues are dropped by returning false.
func (*Outbox) Close ¶
func (o *Outbox) Close()
Close shuts the outbox down. Subsequent Enqueue calls return false and Dequeue callers are unblocked with ok=false once all remaining messages are drained.
func (*Outbox) Dequeue ¶
func (o *Outbox) Dequeue(timeout time.Duration) (OutboxMessage, bool)
Dequeue blocks until a message is available, the timeout elapses, or the outbox is closed. The second return value indicates success.
func (*Outbox) Enqueue ¶
func (o *Outbox) Enqueue(msg OutboxMessage) bool
Enqueue attempts to add msg to the queue. Returns false if the outbox is closed or the SpillFunc (when the channel is full) returned an error.
func (*Outbox) InjectFront ¶
func (o *Outbox) InjectFront(msg OutboxMessage) bool
InjectFront pushes a message to the front of the queue, bypassing the normal capacity check. This is used by the spiller to replay WAL entries into the in-memory queue.
type OutboxMessage ¶
OutboxMessage is one unit of work in an Outbox.
type ResourceSamplePayload ¶
type ResourceSamplePayload struct {
PID int32 `msgpack:"pid"`
RSS uint64 `msgpack:"rss"`
CPUPercent float64 `msgpack:"cpu"`
Threads int32 `msgpack:"thr"`
FDs int32 `msgpack:"fds"`
}
ResourceSamplePayload carries a periodic resource sample.
type ResumePayload ¶
type ResumePayload struct {
LastAckedSeq uint64 `msgpack:"last"`
}
ResumePayload exchanges last-seen sequence numbers on reconnect.
type SeqGenerator ¶
type SeqGenerator struct {
// contains filtered or unexported fields
}
SeqGenerator produces monotonically increasing sequence numbers. It is safe for concurrent use.
func NewSeqGenerator ¶
func NewSeqGenerator(start uint64) *SeqGenerator
NewSeqGenerator returns a generator whose first Next() returns start. If start is 0, it is clamped to 1 to avoid uint64 underflow.
func (*SeqGenerator) Next ¶
func (g *SeqGenerator) Next() uint64
Next returns the next sequence number.
type SetLogLevelPayload ¶
type SetLogLevelPayload struct {
Level uint8 `msgpack:"lvl"` // matches DebugMode
}
SetLogLevelPayload toggles the agent's log verbosity.
type SignalChildPayload ¶
type SignalChildPayload struct {
Signal int32 `msgpack:"sig"`
}
SignalChildPayload asks the agent to deliver a specific signal.
type SpillFunc ¶
type SpillFunc func(OutboxMessage) error
SpillFunc is invoked when the in-memory queue is full. Implementations typically persist the message to a WAL. Returning a non-nil error causes the Outbox.Enqueue call to fail.
type Spiller ¶
type Spiller struct {
// contains filtered or unexported fields
}
Spiller couples an in-memory Outbox with a disk-backed WAL so that producers never block and messages are never lost.
func NewSpiller ¶
NewSpiller returns a Spiller whose Outbox has the given capacity and whose WAL lives under dir with a byte cap of walCap.
func (*Spiller) ReplayInto ¶
ReplayInto reads WAL records and feeds them back into target in order. Any record that cannot fit into the in-memory queue is left in the WAL for a later replay.
func (*Spiller) WALReplayForTest ¶
func (s *Spiller) WALReplayForTest() ([]OutboxMessage, error)
WALReplayForTest exposes the Spiller's WAL replay to external test packages (e.g. test/chaos). It lives in a non-test file because Go's _test.go files are only visible to the same package's tests, but chaos tests live in a separate module subdirectory and import ipc as a regular dependency.
This method is safe for concurrent use with Ack/ReplayInto and exists solely to let chaos tests inspect unacked WAL contents after forced disconnects.
type StartChildPayload ¶
type StartChildPayload struct {
Command string `msgpack:"cmd"`
Args []string `msgpack:"args"`
Env map[string]string `msgpack:"env"`
WorkDir string `msgpack:"wd"`
StdinMode uint8 `msgpack:"stdin"` // 0=none, 1=pipe, 2=inherit
StopTimeout int64 `msgpack:"stop_to_ms"`
}
StartChildPayload instructs the agent to fork/exec the target CLI.
type StopChildPayload ¶
type StopChildPayload struct {
TimeoutMs int64 `msgpack:"to_ms"`
}
StopChildPayload requests graceful termination of the child.
type WAL ¶
type WAL struct {
// contains filtered or unexported fields
}
WAL is an append-only log of OutboxMessage records. Records are framed on disk using the same 17-byte header format as the wire protocol so that replay is bit-for-bit compatible with the in-flight format.
The file layout is:
[ header(17) | payload(N) ] [ header(17) | payload(N) ] ...
Retire(seq) rewrites the file, keeping only records with Header.SeqNo > seq.
func (*WAL) Append ¶
func (w *WAL) Append(msg OutboxMessage) error
Append writes msg to the WAL. It returns ErrWALFull if the byte cap would be exceeded.
func (*WAL) Replay ¶
func (w *WAL) Replay() ([]OutboxMessage, error)
Replay returns every record currently in the WAL, in append order.
type WriteStdinPayload ¶
type WriteStdinPayload struct {
Data []byte `msgpack:"d"`
}
WriteStdinPayload forwards bytes to the child's stdin.