adaptivemsg

v0.1.3
Published: Apr 8, 2026 License: MIT Imports: 18 Imported by: 0

README

adaptivemsg-go

Go runtime for the adaptivemsg wire protocol.

This repository is the Go sibling of adaptivemsg-rust and is intended to stay in lockstep with the protocol defined in adaptivemsg-doc.

Code generation (amgen-go)

Use //go:generate go run <module>/cmd/amgen-go in your message.go. amgen-go reads GOFILE from go generate and writes a sibling .rs file with the same base name. Exported fields must include explicit am:"..." tags.

Client/server example

package main

import (
	"fmt"
	"log"

	am "adaptivemsg"
)

type HelloRequest struct {
	Who string `am:"who"`
}

type HelloInternal struct {
	TraceID string `am:"trace_id"`
}

type HelloReply struct {
	Answer   string        `am:"answer"`
	Internal HelloInternal `am:"internal"`
}

func (msg *HelloRequest) Handle(_ *am.StreamContext) (am.Message, error) {
	return &HelloReply{
		Answer: fmt.Sprintf("hi, %s", msg.Who),
		Internal: HelloInternal{
			TraceID: "req-1",
		},
	}, nil
}

var _ = am.MustRegisterGlobalType[HelloRequest]()

func main() {
	// server
	go func() {
		server := am.NewServer().WithRecovery(am.ServerRecoveryOptions{Enable: true})
		if err := server.Serve("tcp://0.0.0.0:5555"); err != nil {
			log.Fatal(err)
		}
	}()

	// client
	client := am.NewClient().WithRecovery(am.ClientRecoveryOptions{Enable: true})
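	// The server goroutine may not be listening yet; real code should wait or retry.
	conn, err := client.Connect("tcp://127.0.0.1:5555")
	if err != nil {
		log.Fatal(err)
	}
	reply, err := am.SendRecvAs[*HelloReply](conn, &HelloRequest{Who: "alice"})
	if err != nil {
		log.Fatal(err)
	}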
	log.Printf("reply: %s (trace %s)", reply.Answer, reply.Internal.TraceID)
}

Dynamic receive

package main

import (
	"fmt"
	"log"

	am "adaptivemsg"
	"adaptivemsg/examples/echo"
)

var _ = am.MustRegisterGlobalType[echo.MessageReply]()
var _ = am.MustRegisterGlobalType[echo.WhoElseEvent]()

func main() {
	conn, err := am.NewClient().WithRecovery(am.ClientRecoveryOptions{Enable: true}).Connect("tcp://127.0.0.1:5560")
	if err != nil {
		log.Fatal(err)
	}
	stream := conn.NewStream()

	for {
		msg, err := stream.Recv()
		if err != nil {
			log.Fatal(err)
		}
		switch m := msg.(type) {
		case *echo.MessageReply:
			log.Printf("reply: %s", m.Msg)
		case *echo.WhoElseEvent:
			log.Printf("event: %s", m.Addr)
		default:
			log.Fatal(fmt.Errorf("unexpected %T", msg))
		}
	}
}

API reference (surface)

Functions:

  • SendRecvAs, StreamAs, WireNameOf, ContextAs
  • RegisterGlobalType, MustRegisterGlobalType

Client:

  • NewClient
  • Client.WithTimeout, Client.WithCodecs, Client.WithMaxFrame, Client.WithRecovery, Client.Connect

Server:

  • NewServer
  • Server.WithRecovery, Server.Serve, Server.OnConnect, Server.OnDisconnect, Server.OnNewStream, Server.OnCloseStream
  • Netconn.PeerAddr

Connection (default stream view):

  • Connection.NewStream, Connection.Close, Connection.WaitClosed
  • Connection.Send, Connection.SendRecv, Connection.Recv, Connection.PeekWire, Connection.SetRecvTimeout

Stream:

  • Stream[T].Send, Stream[T].SendRecv, Stream[T].Recv, Stream[T].PeekWire, Stream[T].SetRecvTimeout, Stream[T].ID, Stream[T].Close

Context:

  • StreamContext.SetContext, StreamContext.GetContext, StreamContext.NewTask

Codec & messages:

  • CodecID, CodecMsgpackMap, CodecMsgpackCompact, CodecID.String, CodecImpl
  • RegisterCodec, MustRegisterCodec
  • Message, NamedMessage, OkReply, ErrorReply, NewErrorReply

Recovery:

  • ClientRecoveryOptions, ServerRecoveryOptions

Error reasoning

Local input/usage errors:

  • ErrInvalidMessage: nil or non-struct messages, invalid wire names, compact field issues.
  • ErrUnknownMessage: wire name not registered in the registry.

Protocol/compat errors:

  • ErrUnsupportedCodec, ErrUnsupportedFrameVersion, ErrNoCommonCodec, ErrTooManyCodecs, ErrBadHandshakeMagic, ErrFrameTooLarge, ErrUnsupportedTransport, ErrResumeRejected.

Runtime errors:

  • ErrClosed, ErrRecvTimeout, ErrConcurrentRecv, ErrHandlerTaskBusy, ErrConnectTimeout, ErrReplayBufferFull.

Remote errors:

  • ErrorReply is sent by the peer; SendRecv surfaces it as ErrRemote{Code, Message}.
  • protocol_error = wire mismatch or invalid ordering; codec_error = decode/envelope failure; handler_error = handler returned an error.

Notes:

  • Addresses accept tcp://HOST:PORT, uds://@name (linux abstract), or uds:///tmp/name.sock.
  • Abstract UDS demo: go run -tags server ./examples/hello/cmd/server -addr uds://@adaptivemsg-hello and go run ./examples/hello/cmd/client -addr uds://@adaptivemsg-hello (echo uses @adaptivemsg-echo).
  • Codecs are negotiated from the client's WithCodecs preference list; the server selects the first common codec. Defaults are compact-first.
  • Custom codecs implement CodecImpl and register with RegisterCodec; msgpack struct tags only apply to the msgpack built-ins. CodecImpl.Encode transfers ownership of the returned payload to the caller, so codecs must not mutate or reuse that backing storage after return.
  • Compact codec uses positional arrays; nested structs are encoded as arrays when eligible, but types with custom msgpack/binary/text encoders or unexported fields fall back to msgpack's normal encoding (typically maps), so struct tags may still apply there.
  • Connections act as the default stream; use am.SendRecvAs[Reply](conn, msg) for one-off calls or am.StreamAs[Reply](stream) for a typed view (needed for a typed Recv).
  • Register handler/message types with MustRegisterGlobalType before NewClient()/NewServer() so the snapshot sees them.
  • Use PeekWire() on a stream (or conn.PeekWire()) to inspect the next message type before decoding; it honors the same recv timeout and concurrency rules as Recv.
  • Message names default to am.<package-leaf>.<TypeName>; implement WireName() string on a type if you need an override.
  • Example servers rely on build-tagged handlers; run them with -tags server (for example: go run -tags server ./examples/hello/cmd/server).
  • Stream.Close() is local-only; there is no on-wire stream close frame.

Recovery

  • Recovery is opt-in via Client.WithRecovery(...) and Server.WithRecovery(...).
  • When both sides enable recovery, the connection negotiates protocol v3; otherwise the client falls back to legacy v2.
  • The implemented recovery scope is transport-only failure while both client and server processes remain alive.
  • In recovery mode, Connection is the logical connection and the underlying net.Conn may be replaced transparently after reconnect.
  • Recovery is client-driven: the dialing side reconnects and the server reattaches the new transport to the existing logical connection.
  • The server is authoritative for shared recovery wire behavior. ACK batching and heartbeat settings are chosen by the server and sent to the client during attach/resume.
  • Recv() can continue waiting across reconnect, and queued/unacknowledged outbound frames are replayed after resume.
  • WaitClosed() and ErrClosed refer to permanent logical closure, not a transient reconnectable transport loss.
  • Server OnConnect runs for the initial logical connection, not for every resumed transport; OnDisconnect runs on permanent logical close.
  • Recovery does not cover client/server process restart, node reboot, or durable replay after process death.
  • ClientRecoveryOptions controls:
    • Enable: turn recovery on.
    • ReconnectMinBackoff / ReconnectMaxBackoff: client reconnect backoff range.
    • MaxReplayBytes: client-side byte cap for retained unacknowledged outbound frames.
  • ServerRecoveryOptions controls:
    • Enable: turn recovery on.
    • DetachedTTL: how long the server keeps a detached logical connection alive.
    • MaxReplayBytes: server-side byte cap for retained unacknowledged outbound frames.
    • AckEvery / AckDelay: server-selected cumulative ACK batching policy.
    • HeartbeatInterval / HeartbeatTimeout: server-selected idle failure detection policy for quiet connections.

For detailed recovery protocol behavior, heartbeat/liveness semantics, and cross-runtime interoperability notes, see DEVELOP.md.
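
To make the MaxReplayBytes and cumulative ACK bullets concrete, here is a minimal sketch of a byte-capped replay buffer. It is not the runtime's implementation: the replayBuffer type, its methods, and the sequence numbering starting at 1 are all assumptions for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errReplayBufferFull stands in for the package's ErrReplayBufferFull.
var errReplayBufferFull = errors.New("replay buffer full")

type frame struct {
	seq     uint64
	payload []byte
}

// replayBuffer retains unacknowledged outbound frames up to maxBytes.
type replayBuffer struct {
	maxBytes int64
	used     int64
	nextSeq  uint64
	frames   []frame
}

// push retains a frame for possible replay, or fails when the byte cap is hit.
func (b *replayBuffer) push(payload []byte) (uint64, error) {
	if b.used+int64(len(payload)) > b.maxBytes {
		return 0, errReplayBufferFull
	}
	b.nextSeq++
	b.frames = append(b.frames, frame{seq: b.nextSeq, payload: payload})
	b.used += int64(len(payload))
	return b.nextSeq, nil
}

// ack drops every frame with seq <= upTo (a cumulative acknowledgement).
func (b *replayBuffer) ack(upTo uint64) {
	i := 0
	for i < len(b.frames) && b.frames[i].seq <= upTo {
		b.used -= int64(len(b.frames[i].payload))
		i++
	}
	b.frames = b.frames[i:]
}

// pending lists the sequence numbers that would be replayed after a resume.
func (b *replayBuffer) pending() []uint64 {
	seqs := make([]uint64, 0, len(b.frames))
	for _, f := range b.frames {
		seqs = append(seqs, f.seq)
	}
	return seqs
}

func main() {
	buf := &replayBuffer{maxBytes: 8}
	buf.push([]byte("abc"))
	buf.push([]byte("defg"))
	_, err := buf.push([]byte("hi"))
	fmt.Println(err) // replay buffer full
	buf.ack(1)       // frees the 3 bytes held by seq 1
	buf.push([]byte("hi"))
	fmt.Println(buf.pending()) // [2 3]
}
```

Batching ACKs (as AckEvery / AckDelay do) trades fewer control frames for more retained bytes, so the cap and the batching policy are best tuned together.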

Debugging and Observability

The runtime exposes scoped debugging snapshots (per connection and per stream). This gives you counters and failure context without relying on global process metrics.

What is available

  • Connection.DebugState() returns:
    • negotiated protocol/codec/max frame
    • active stream count and per-stream snapshots
    • per-connection counters (frames, bytes, messages, handler activity, recovery activity)
    • last failure code, reason, and timestamp
  • Stream[T].DebugState() returns:
    • stream queue depths and recv timeout
    • per-stream counters
    • stream-level last failure code, reason, and timestamp
  • Recovery-enabled connections also include RecoveryDebugState in the connection snapshot.

Typical usage pattern

Use the snapshot where you handle transport/runtime errors so logs include both the immediate error and current runtime state:

conn, err := client.Connect("tcp://127.0.0.1:5555")
if err != nil {
	log.Printf("connect failed: %v", err)
	return
}

reply, err := am.SendRecvAs[*HelloReply](conn, &HelloRequest{Who: "alice"})
if err != nil {
	dbg := conn.DebugState()
	// log.Printf("sendrecv failed: %+v", dbg)
	log.Printf("sendrecv failed: err=%v code=%s reason=%s streams=%d sent=%d recv=%d",
		err,
		dbg.LastFailureCode,
		dbg.LastFailure,
		dbg.StreamCount,
		dbg.Counters.DataMessagesSent,
		dbg.Counters.DataMessagesReceived,
	)
	return
}
_ = reply

For a single stream:

stream := conn.NewStream()
_, err := am.StreamAs[*HelloReply](stream).Recv()
if err != nil {
	sdbg := stream.DebugState()
	log.Printf("stream recv failed: err=%v stream=%d code=%s reason=%s inbox=%d incoming=%d",
		err,
		sdbg.ID,
		sdbg.LastFailureCode,
		sdbg.LastFailure,
		sdbg.InboxDepth,
		sdbg.IncomingDepth,
	)
}

Failure codes

Failure codes are stable strings intended for machine filtering and alerting, while LastFailure remains human-readable context.

Common codes include:

  • Stream path: stream.recv_timeout, stream.encode, stream.enqueue, stream.decode, stream.protocol, stream.protocol_reply_send
  • Connection path: connection.reader, connection.writer, connection.reader_enqueue, handler.error
  • Recovery path: recovery.resume, recovery.reconnect_terminal, recovery.read, recovery.control, recovery.data, recovery.ack_write, recovery.resume_write, recovery.live_write, recovery.ping_write

Troubleshooting quick map

Failure code | Likely cause | First checks
--- | --- | ---
stream.recv_timeout | No message arrived before stream recv timeout | Check SetRecvTimeout value; verify peer is producing responses/events; inspect InboxDepth and IncomingDepth
stream.encode | Local message cannot be encoded by negotiated codec | Validate message shape/tags; confirm codec supports payload type
stream.enqueue | Connection is closing/closed or replay enqueue rejected | Check ConnectionDebugState.Closed; inspect replay limits and recent close reason
stream.decode | Received payload cannot be decoded into expected type | Compare wire type versus expected type; verify registry/type registration order
stream.protocol | Stream-level protocol violation detected | Inspect LastFailure detail and peer message ordering/type behavior
stream.protocol_reply_send | Failed to send protocol ErrorReply after violation | Check transport health and whether connection was already closing
connection.reader | Base frame read failed | Check network/transport health, frame compatibility, and max-frame settings
connection.writer | Base frame write failed | Check peer reachability and connection lifecycle (Closed, detach/reconnect state)
connection.reader_enqueue | Read frame could not be queued into stream pipeline | Check stream close timing and backpressure symptoms
handler.error | Handler returned an application error | Inspect handler logs, input validation, and downstream dependencies
recovery.resume | Resume attempt failed but may retry | Check server reachability, attach credentials, and reconnect backoff progression
recovery.reconnect_terminal | Resume failed with terminal condition | Check reject reason (LastFailure), codec/version mismatch, connection existence
recovery.read | Recovery transport read failed | Check heartbeat timeout behavior and transport blackhole symptoms
recovery.control | Invalid recovery control frame payload/type | Verify non-Go peer control frame format and control type handling
recovery.data | Recovery data frame sequencing/validation failed | Verify monotonic seq handling and replay logic
recovery.ack_write / recovery.ping_write | Control frame write failed during recovery | Check transport stability during idle/control periods
recovery.resume_write / recovery.live_write | Replay/live data write failed during recovery writer loop | Check transport churn and reconnect cadence

Recommended logging fields:

  • last_failure_code
  • last_failure_reason
  • last_failure_at
  • relevant scoped counters (for example frames_read, frames_written, data_messages_sent, data_messages_received)

Documentation

Overview

Package adaptivemsg is a wire protocol runtime for framed, multiplexed, codec-negotiated connections. It is wire-compatible with the Rust counterpart, enabling seamless Go ↔ Rust interoperability.

Transports

Two transports are supported:

  • TCP: addressed as "tcp://host:port" (or bare "host:port", which defaults to TCP)
  • Unix domain sockets: addressed as "uds:///path/to/sock" or "unix:///path/to/sock"

Codecs

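Codecs are pluggable. The package ships with two MessagePack codecs:

  • CodecMsgpackCompact (ID 1): a compact array envelope, [wireName, field1, field2, ...]. More space-efficient, but field order must match between sender and receiver.
  • CodecMsgpackMap (ID 2): a map envelope, {"type": wireName, "data": {field: value, ...}}. More verbose, but tolerant of field order changes.
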
During the handshake the client sends an ordered list of preferred codecs; the server selects the first codec that both sides support. Custom codecs can be installed globally with RegisterCodec.

Core Types

The main abstractions are:

  • Client — configures and dials outbound connections.
  • Server — listens for and dispatches inbound connections.
  • Connection — a live, negotiated session. Also acts as the default stream (stream 0). Obtained from Client.Connect on the client side or passed to Server callbacks on the server side.
  • Stream — a multiplexed logical channel within a Connection. Created by Connection.NewStream.
  • Message — the marker interface for all payload types. Register concrete types with RegisterGlobalType so the codec can round-trip them by wire name.

Quick Start

A minimal echo server and client:

// ── server ──
type Ping struct{ Text string `am:"text"` }
type Pong struct{ Text string `am:"text"` }

// Handle echoes Ping → Pong. Registering Ping also registers this handler.
func (p *Ping) Handle(_ *adaptivemsg.StreamContext) (adaptivemsg.Message, error) {
	return &Pong{Text: p.Text}, nil
}

// Register message types before Connect/Serve so the registry snapshot sees them.
var _ = adaptivemsg.MustRegisterGlobalType[Ping]()
var _ = adaptivemsg.MustRegisterGlobalType[Pong]()

err := adaptivemsg.NewServer().
	OnConnect(func(nc adaptivemsg.Netconn) error {
		fmt.Println("connected:", nc.PeerAddr())
		return nil
	}).
	Serve("tcp://0.0.0.0:9000")
if err != nil { log.Fatal(err) }

// ── client ──
conn, err := adaptivemsg.NewClient().
	WithTimeout(5 * time.Second).
	Connect("tcp://127.0.0.1:9000")
if err != nil { log.Fatal(err) }
defer conn.Close()

reply, err := adaptivemsg.SendRecvAs[*Pong](conn, &Ping{Text: "hello"})
if err != nil { log.Fatal(err) }
fmt.Println(reply.Text)

For one-shot request/reply without keeping a connection open, see Once.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func ContextAs

func ContextAs[T any](sc *StreamContext) (T, bool)

ContextAs is a generic helper that retrieves the value stored in sc and type-asserts it to T. It returns (zero, false) if no value has been set or if the stored value is not of type T.
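
The comma-ok behavior described above can be sketched with a generic type assertion. The streamContext stand-in and its value field are assumptions; only the (zero, false) contract comes from the documentation.

```go
package main

import "fmt"

// streamContext is a local stand-in that holds one arbitrary value,
// playing the role of the value set via StreamContext.SetContext.
type streamContext struct{ value any }

// contextAs type-asserts the stored value to T.
// It yields (zero, false) when nothing is stored or the type differs.
func contextAs[T any](sc *streamContext) (T, bool) {
	v, ok := sc.value.(T)
	return v, ok
}

type session struct{ User string }

func main() {
	sc := &streamContext{value: &session{User: "alice"}}

	if s, ok := contextAs[*session](sc); ok {
		fmt.Println("user:", s.User) // user: alice
	}

	_, ok := contextAs[string](sc)
	fmt.Println(ok) // false: the stored value is a *session, not a string
}
```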

func MustRegisterCodec

func MustRegisterCodec(codec CodecImpl) struct{}

MustRegisterCodec is like RegisterCodec but panics on error. It returns an empty struct so it can be used as a package-level variable for init-time registration.

func MustRegisterGlobalType

func MustRegisterGlobalType[T any]() struct{}

MustRegisterGlobalType is like RegisterGlobalType but panics on error. It returns an empty struct so it can be used as a package-level variable for init-time registration:

var _ = am.MustRegisterGlobalType[*MyMsg]()
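
Returning struct{} is what lets the call sit in a package-level var declaration, which Go evaluates before main runs. The sketch below reproduces that pattern with a hypothetical in-file registry and mustRegister function; neither is the package's real registry.

```go
package main

import (
	"fmt"
	"reflect"
)

// registry is a hypothetical stand-in for the global message registry.
var registry = map[string]bool{}

// mustRegister records T (a struct or *struct) by type name and panics otherwise.
// The struct{} result exists only so the call can initialize a package-level var.
func mustRegister[T any]() struct{} {
	t := reflect.TypeOf((*T)(nil)).Elem()
	if t.Kind() == reflect.Pointer {
		t = t.Elem()
	}
	if t.Kind() != reflect.Struct {
		panic(fmt.Sprintf("invalid message type %v", t))
	}
	registry[t.Name()] = true
	return struct{}{}
}

type MyMsg struct{ Text string }

// Runs during package initialization, before main.
var _ = mustRegister[*MyMsg]()

func main() {
	fmt.Println(registry["MyMsg"]) // true
}
```

Go's initialization order analysis sees that mustRegister refers to registry, so the map is initialized before the registration call executes.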

func RegisterCodec

func RegisterCodec(codec CodecImpl) error

RegisterCodec installs a codec implementation in the global codec registry. The codec's ID must be non-zero and not already registered. RegisterCodec is safe for concurrent use. Built-in codecs (compact, map) are registered automatically via init().

func RegisterGlobalType

func RegisterGlobalType[T any]() error

RegisterGlobalType registers a message type T (must be a struct or *struct) in the global message registry. If T implements the handler interface (Handle(*StreamContext) (Message, error)), its handler is also registered.

Registration must happen before Client.Connect or Server.Serve since connections snapshot the registry at creation time.

Returns ErrInvalidMessage if T is not a valid message type.

func SendRecvAs

func SendRecvAs[T any](v Link, msg Message) (T, error)

SendRecvAs sends msg via the given Link and receives a typed reply of type T. When v is an OnceConn, SendRecvAs dials the address, exchanges one request-reply message, and closes the connection. When v is a Connection or Stream, it delegates to Stream.SendRecv. Returns ErrInvalidMessage if v is nil.

func WireNameOf

func WireNameOf(msg Message) (string, error)

WireNameOf returns the wire name for a message value. If msg implements NamedMessage, its WireName method is used. Otherwise the name is derived from the Go type information (namespace.package.TypeName). WireNameOf returns ErrInvalidMessage if msg is nil or not a struct (or pointer to struct).
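
A minimal sketch of that derivation follows, assuming the default form is am.<package-leaf>.<TypeName> as stated in the README notes. The wireNameOf function and errInvalid value are stand-ins, not the package's code.

```go
package main

import (
	"errors"
	"fmt"
	"path"
	"reflect"
)

// errInvalid stands in for ErrInvalidMessage.
var errInvalid = errors.New("invalid message")

type Ping struct{ Text string }

// Renamed overrides its wire name, as a NamedMessage would.
type Renamed struct{}

func (Renamed) WireName() string { return "am.custom.Renamed" }

// wireNameOf prefers an explicit WireName method and otherwise derives
// "am.<package-leaf>.<TypeName>" from reflection data.
func wireNameOf(msg any) (string, error) {
	if msg == nil {
		return "", errInvalid
	}
	if named, ok := msg.(interface{ WireName() string }); ok {
		return named.WireName(), nil
	}
	t := reflect.TypeOf(msg)
	if t.Kind() == reflect.Pointer {
		t = t.Elem()
	}
	if t.Kind() != reflect.Struct || t.Name() == "" {
		return "", errInvalid
	}
	return "am." + path.Base(t.PkgPath()) + "." + t.Name(), nil
}

func main() {
	fmt.Println(wireNameOf(&Ping{}))
	fmt.Println(wireNameOf(Renamed{}))
	fmt.Println(wireNameOf(42))
}
```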

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client configures and dials outbound connections. It uses a builder pattern so options can be chained before calling Client.Connect:

conn, err := adaptivemsg.NewClient().
	WithTimeout(5 * time.Second).
	WithCodecs(adaptivemsg.CodecMsgpackCompact).
	Connect("tcp://127.0.0.1:9000")

Defaults: codecs [CodecMsgpackCompact, CodecMsgpackMap], max frame unlimited (peer's limit applies), recovery disabled.

func NewClient

func NewClient() *Client

NewClient returns a Client with sensible defaults: both MessagePack codecs offered (CodecMsgpackCompact first, then CodecMsgpackMap), no dial timeout, unlimited max frame size, and recovery disabled.

func (*Client) Connect

func (c *Client) Connect(addr string) (*Connection, error)

Connect dials addr, performs the protocol handshake (including codec negotiation), and returns a live Connection ready for messaging.

Supported address formats:

  • "tcp://host:port" — TCP connection
  • "uds:///path/to/sock" or "unix:///path/to/sock" — Unix domain socket
  • "host:port" — bare host:port defaults to TCP

On failure Connect returns one of the typed errors: ErrConnectTimeout if the dial or handshake exceeds the configured timeout, ErrNoCommonCodec if no codec overlap exists, or a transport-level error from the operating system.
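
One way to picture the address formats is as a mapping onto the (network, address) pair that net.Dial takes. The splitAddr helper below is a hypothetical sketch, not the package's parser; it also accepts the uds://@name abstract form mentioned in the README notes, relying on Go's net package treating a leading @ as a Linux abstract socket.

```go
package main

import (
	"fmt"
	"strings"
)

// splitAddr maps an address string to a network and target for net.Dial.
func splitAddr(addr string) (network, target string, err error) {
	scheme, rest, found := strings.Cut(addr, "://")
	if !found {
		// Bare host:port defaults to TCP.
		return "tcp", addr, nil
	}
	switch scheme {
	case "tcp":
		return "tcp", rest, nil
	case "uds", "unix":
		if rest == "" {
			return "", "", fmt.Errorf("empty socket path in %q", addr)
		}
		return "unix", rest, nil
	default:
		return "", "", fmt.Errorf("unsupported transport %q", scheme)
	}
}

func main() {
	for _, a := range []string{
		"tcp://127.0.0.1:5555",
		"127.0.0.1:5555",
		"uds:///tmp/name.sock",
		"uds://@adaptivemsg-hello",
		"http://example.invalid",
	} {
		network, target, err := splitAddr(a)
		fmt.Println(network, target, err)
	}
}
```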

func (*Client) WithCodecs

func (c *Client) WithCodecs(codecs ...CodecID) *Client

WithCodecs sets the client's ordered list of preferred codecs. During the handshake the server walks this list and selects the first codec it also supports, so place the most desirable codec first. If no common codec exists the handshake fails with ErrNoCommonCodec.
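
The selection rule can be sketched in a few lines. The negotiate function is hypothetical; the CodecID type and the two constants mirror the values documented for this package, and the false result corresponds to the ErrNoCommonCodec case.

```go
package main

import "fmt"

type CodecID byte

const (
	CodecMsgpackCompact CodecID = 1
	CodecMsgpackMap     CodecID = 2
)

// negotiate walks the client's ordered preferences and returns the first
// codec the server also supports. ok is false when there is no overlap.
func negotiate(clientPrefs []CodecID, serverSupported map[CodecID]bool) (id CodecID, ok bool) {
	for _, candidate := range clientPrefs {
		if serverSupported[candidate] {
			return candidate, true
		}
	}
	return 0, false
}

func main() {
	mapOnly := map[CodecID]bool{CodecMsgpackMap: true}

	id, ok := negotiate([]CodecID{CodecMsgpackCompact, CodecMsgpackMap}, mapOnly)
	fmt.Println(id, ok) // 2 true

	id, ok = negotiate([]CodecID{CodecMsgpackCompact}, mapOnly)
	fmt.Println(id, ok) // 0 false
}
```

Because the client's order decides ties, a client that lists CodecMsgpackMap first gets the map codec even when the server supports both.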

func (*Client) WithMaxFrame

func (c *Client) WithMaxFrame(maxFrame uint32) *Client

WithMaxFrame sets the maximum payload size per frame that this client advertises to the peer. The effective limit for the connection is the minimum of both sides' values. The default is math.MaxUint32 (no client-side limit), so the server's value takes precedence.
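
As a small illustration of the "minimum of both sides" rule, the sketch below computes an effective limit and checks a payload against it. Both helper names are hypothetical.

```go
package main

import (
	"fmt"
	"math"
)

// effectiveMaxFrame returns the smaller of the two advertised limits.
func effectiveMaxFrame(client, server uint32) uint32 {
	if client < server {
		return client
	}
	return server
}

// fits reports whether a payload of n bytes is within the negotiated limit.
func fits(n int, limit uint32) bool {
	return n >= 0 && uint64(n) <= uint64(limit)
}

func main() {
	// The client keeps the default (math.MaxUint32); the server advertises 1 MiB.
	limit := effectiveMaxFrame(math.MaxUint32, 1<<20)
	fmt.Println(limit)              // 1048576
	fmt.Println(fits(2<<20, limit)) // false
}
```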

func (*Client) WithRecovery

func (c *Client) WithRecovery(opts ClientRecoveryOptions) *Client

WithRecovery enables the v3 protocol extension that supports transparent reconnection and message replay after transient network failures. See ClientRecoveryOptions for tunables such as backoff intervals and replay buffer limits. When recovery is enabled the client will attempt the v3 handshake first and fall back to v2 if the server does not support it.

func (*Client) WithTimeout

func (c *Client) WithTimeout(timeout time.Duration) *Client

WithTimeout sets the timeout applied to the TCP dial (or Unix connect) and the subsequent protocol handshake. A zero or negative value disables the timeout, allowing the connection attempt to block indefinitely.

type ClientRecoveryOptions

type ClientRecoveryOptions struct {
	// Enable activates recovery mode. Default: false.
	Enable bool
	// ReconnectMinBackoff is the minimum backoff duration between reconnect
	// attempts. Default: 100ms.
	ReconnectMinBackoff time.Duration
	// ReconnectMaxBackoff is the maximum backoff duration between reconnect
	// attempts. Default: 2s.
	ReconnectMaxBackoff time.Duration
	// MaxReplayBytes is the maximum number of bytes buffered for replay of
	// unacknowledged messages. Default: 8 MiB.
	MaxReplayBytes int64
}

ClientRecoveryOptions controls client-side recovery behavior for automatic reconnect and replay. When Enable is true, the client negotiates the v3 protocol with the server, enabling transparent reconnection and message replay after transient network failures.
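
The documentation gives only the backoff range (defaults 100ms to 2s), not the growth curve. The sketch below assumes plain doubling clamped at the maximum, which is a common choice but not confirmed for this runtime; the backoff function is hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// backoff returns the delay before reconnect attempt n (0-based),
// doubling from min and clamping at max. Doubling is an assumption.
func backoff(attempt int, min, max time.Duration) time.Duration {
	d := min
	for i := 0; i < attempt && d < max; i++ {
		d *= 2
	}
	if d > max {
		d = max
	}
	return d
}

func main() {
	for n := 0; n < 7; n++ {
		fmt.Println(n, backoff(n, 100*time.Millisecond, 2*time.Second))
	}
}
```

With the documented defaults this yields 100ms, 200ms, 400ms, 800ms, 1.6s, and then 2s for every later attempt.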

type CodecID

type CodecID byte

CodecID is a numeric identifier for a payload encoding format. Built-in values (compact, map) are defined in codec_msgpack.go. Custom codecs can be registered via RegisterCodec.

const (
	// CodecMsgpackCompact (ID=1) encodes messages as a compact MessagePack array
	// envelope: [wireName, field1, field2, ...]. This format is more
	// space-efficient than map encoding but requires field order to match
	// between sender and receiver.
	CodecMsgpackCompact CodecID = 1
	// CodecMsgpackMap (ID=2) encodes messages as a MessagePack map envelope:
	// {"type": wireName, "data": {field: value, ...}}. This format is more
	// verbose than compact but tolerant of field order changes between sender
	// and receiver.
	CodecMsgpackMap CodecID = 2
)
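
To show why compact is order-sensitive while map is not, the sketch below builds the two envelope shapes as plain Go values. It does not produce MessagePack bytes, and compactShape and mapShape are hypothetical helpers that only handle flat structs with exported, am-tagged fields.

```go
package main

import (
	"fmt"
	"reflect"
)

type Hello struct {
	Who string `am:"who"`
	Age int    `am:"age"`
}

// compactShape lays fields out positionally: [wireName, field1, field2, ...].
func compactShape(wire string, msg any) []any {
	v := reflect.ValueOf(msg)
	out := []any{wire}
	for i := 0; i < v.NumField(); i++ {
		out = append(out, v.Field(i).Interface())
	}
	return out
}

// mapShape keys fields by their am tag: {"type": wireName, "data": {...}}.
func mapShape(wire string, msg any) map[string]any {
	v := reflect.ValueOf(msg)
	t := v.Type()
	data := make(map[string]any, v.NumField())
	for i := 0; i < v.NumField(); i++ {
		data[t.Field(i).Tag.Get("am")] = v.Field(i).Interface()
	}
	return map[string]any{"type": wire, "data": data}
}

func main() {
	msg := Hello{Who: "alice", Age: 30}
	fmt.Println(compactShape("am.demo.Hello", msg)) // [am.demo.Hello alice 30]
	fmt.Println(mapShape("am.demo.Hello", msg))     // map[data:map[age:30 who:alice] type:am.demo.Hello]
}
```

In the compact shape a receiver can only tell "alice" from 30 by position, so reordering struct fields on one side changes the meaning; in the map shape the keys carry that information.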

func (CodecID) String

func (c CodecID) String() string

String returns the human-readable codec name (e.g. "compact", "map") or "unknown" if the codec ID is not registered.

type CodecImpl

type CodecImpl interface {
	ID() CodecID
	Name() string
	// Encode returns a payload owned by the caller. Implementations must not
	// mutate or reuse the returned backing storage after Encode returns.
	Encode(Message) ([]byte, error)
	DecodeEnvelope([]byte) (Envelope, error)
	DecodeInto(body any, dst any) error
}

CodecImpl is the interface for pluggable payload codecs. Implementations must be safe for concurrent use by multiple goroutines. Encode serializes a message and returns a caller-owned byte slice. DecodeEnvelope extracts the wire name and raw body without full deserialization. DecodeInto decodes a raw body (from an Envelope) into a destination struct.
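
The ownership rule for Encode is easiest to see by breaking it. In the sketch below, both encoder types are hypothetical and encode a plain string rather than a Message: one returns its reusable scratch buffer directly, the other returns a fresh copy.

```go
package main

import "fmt"

// reusingEncoder violates the rule: the returned slice aliases scratch,
// so the next Encode call overwrites a payload the caller still holds.
type reusingEncoder struct{ scratch []byte }

func (e *reusingEncoder) Encode(s string) []byte {
	e.scratch = append(e.scratch[:0], s...)
	return e.scratch
}

// copyingEncoder follows the rule: the caller receives storage that the
// encoder never touches again.
type copyingEncoder struct{ scratch []byte }

func (e *copyingEncoder) Encode(s string) []byte {
	e.scratch = append(e.scratch[:0], s...)
	out := make([]byte, len(e.scratch))
	copy(out, e.scratch)
	return out
}

func main() {
	bad := &reusingEncoder{}
	first := bad.Encode("first")
	bad.Encode("other")
	fmt.Println(string(first)) // other (the queued payload was clobbered)

	good := &copyingEncoder{}
	first = good.Encode("first")
	good.Encode("other")
	fmt.Println(string(first)) // first
}
```

A shared scratch buffer like this is also not safe for concurrent use, so a real codec would need per-call or pooled buffers to meet the concurrency requirement stated above.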

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

Connection is a live, negotiated session between two peers. It is obtained from Client.Connect on the client side or passed to Server callbacks on the server side.

A Connection doubles as the default stream (stream 0): the Connection.Send, Connection.SendRecv, Connection.Recv, and Connection.PeekWire methods all operate on stream 0. For multiplexed messaging, create additional streams with Connection.NewStream.

All exported methods are safe for concurrent use. When the Connection is closed (by either side), all outstanding streams are torn down and any blocking Recv calls return ErrClosed.

func (*Connection) Close

func (c *Connection) Close()

Close shuts down the connection and all of its streams. It closes the underlying transport, drains internal channels, and signals any goroutines blocked in Connection.Recv or Connection.WaitClosed. Close is non-blocking and idempotent; calling it more than once is safe.
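
A common way to get a non-blocking, idempotent Close that also wakes waiters is sync.Once around closing a done channel. The sketch below shows that pattern with a hypothetical conn type; it is not taken from the package source.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a stand-in holding only the pieces needed for close signalling.
type conn struct {
	once sync.Once
	done chan struct{}
}

func newConn() *conn { return &conn{done: make(chan struct{})} }

// Close may be called any number of times from any goroutine;
// only the first call closes the channel.
func (c *conn) Close() {
	c.once.Do(func() { close(c.done) })
}

// WaitClosed blocks until Close has been called.
func (c *conn) WaitClosed() { <-c.done }

func main() {
	c := newConn()

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		c.WaitClosed()
		fmt.Println("waiter released")
	}()

	c.Close()
	c.Close() // safe: the second call is a no-op
	wg.Wait()
}
```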

func (*Connection) DebugState

func (c *Connection) DebugState() ConnectionDebugState

DebugState returns a point-in-time snapshot of the connection's debug counters, stream states, and recovery state.

func (*Connection) NewStream

func (c *Connection) NewStream() *Stream[Message]

NewStream allocates a new stream ID and returns a Stream view scoped to that ID. Streams provide independent, multiplexed message channels over the same underlying connection. All streams are automatically closed when the parent Connection is closed.

func (*Connection) PeekWire

func (c *Connection) PeekWire() (string, error)

PeekWire returns the wire name of the next message on the default stream without consuming or fully decoding it. This is useful for branching on the message type before calling a typed receive.

func (*Connection) Recv

func (c *Connection) Recv() (Message, error)

Recv blocks until the next message arrives on the default stream and returns it as an untyped Message. It returns ErrClosed if the connection closes and ErrRecvTimeout if a receive timeout (set via Connection.SetRecvTimeout) expires before a message arrives.

func (*Connection) Send

func (c *Connection) Send(msg Message) error

Send encodes msg and writes it on the default stream (stream 0). The call is fire-and-forget: it returns once the frame is queued for the writer goroutine. It returns ErrClosed if the connection has been shut down.

func (*Connection) SendRecv

func (c *Connection) SendRecv(msg Message) (Message, error)

SendRecv sends msg on the default stream and blocks until a reply arrives, returning the reply as an untyped Message. Use the generic helper SendRecvAs for typed replies. Returns ErrClosed if the connection closes before a reply is received.

func (*Connection) SetRecvTimeout

func (c *Connection) SetRecvTimeout(timeout time.Duration)

SetRecvTimeout sets the receive timeout for the default stream (stream 0). A zero or negative value disables the timeout, allowing Connection.Recv to block indefinitely.
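
The "zero or negative disables the timeout" behavior maps naturally onto a select with a nil timer channel, since receiving from a nil channel blocks forever. The recv function and the two error values below are stand-ins for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Stand-ins for ErrRecvTimeout and ErrClosed.
var (
	errRecvTimeout = errors.New("recv timeout")
	errClosed      = errors.New("closed")
)

// recv waits for one message. A timeout <= 0 leaves the timer channel nil,
// so only the inbox case can fire and the call blocks until a message
// arrives or the inbox is closed.
func recv(inbox <-chan string, timeout time.Duration) (string, error) {
	var expired <-chan time.Time
	if timeout > 0 {
		t := time.NewTimer(timeout)
		defer t.Stop()
		expired = t.C
	}
	select {
	case msg, ok := <-inbox:
		if !ok {
			return "", errClosed
		}
		return msg, nil
	case <-expired:
		return "", errRecvTimeout
	}
}

func main() {
	inbox := make(chan string, 1)
	inbox <- "hello"

	msg, err := recv(inbox, 0)
	fmt.Println(msg, err) // hello <nil>

	_, err = recv(inbox, 10*time.Millisecond)
	fmt.Println(err) // recv timeout
}
```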

func (*Connection) WaitClosed

func (c *Connection) WaitClosed()

WaitClosed blocks until the connection has fully closed. It is commonly used in server handlers to keep the goroutine alive for the lifetime of the connection.

type ConnectionCounters

type ConnectionCounters struct {
	// StreamsOpened is the total number of streams opened.
	StreamsOpened uint64
	// StreamsClosed is the total number of streams closed.
	StreamsClosed uint64
	// DataMessagesSent is the total number of application-level messages sent.
	DataMessagesSent uint64
	// DataMessagesReceived is the total number of application-level messages received.
	DataMessagesReceived uint64
	// FramesWritten is the total number of wire frames written.
	FramesWritten uint64
	// FramesRead is the total number of wire frames read.
	FramesRead uint64
	// BytesWritten is the total number of bytes written to the transport.
	BytesWritten uint64
	// BytesRead is the total number of bytes read from the transport.
	BytesRead uint64
	// ControlFramesWritten is the total number of control frames written.
	ControlFramesWritten uint64
	// ControlFramesRead is the total number of control frames read.
	ControlFramesRead uint64
	// ProtocolErrors is the total number of protocol errors detected.
	ProtocolErrors uint64
	// ProtocolErrorReplySendFailure is how many times sending a protocol error reply failed.
	ProtocolErrorReplySendFailure uint64
	// RemoteErrors is the total number of remote ErrorReply messages received.
	RemoteErrors uint64
	// DecodeErrors is the total number of message decode failures.
	DecodeErrors uint64
	// HandlerCalls is the total number of handler invocations.
	HandlerCalls uint64
	// HandlerErrors is the total number of handler invocations that returned an error.
	HandlerErrors uint64
	// ReconnectAttempts is the total number of recovery reconnect attempts.
	ReconnectAttempts uint64
	// ReconnectSuccesses is the total number of successful recovery reconnects.
	ReconnectSuccesses uint64
	// ReconnectFailures is the total number of failed recovery reconnect attempts.
	ReconnectFailures uint64
	// TransportAttaches is the total number of times a transport was attached.
	TransportAttaches uint64
	// TransportDetaches is the total number of times a transport was detached.
	TransportDetaches uint64
}

ConnectionCounters holds point-in-time counters for a connection. All values are cumulative since the connection was created.
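
Because the values are cumulative, throughput has to be derived from the difference between two snapshots. The sketch below uses a local counters struct with two of the fields listed above; the rate helper and the sample numbers are made up for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// counters is a local stand-in with two of the ConnectionCounters fields.
type counters struct {
	FramesWritten uint64
	BytesWritten  uint64
}

// rate converts two cumulative readings into a per-second rate.
// It returns 0 for a non-positive interval or a reading that went backwards.
func rate(prev, cur uint64, interval time.Duration) float64 {
	if interval <= 0 || cur < prev {
		return 0
	}
	return float64(cur-prev) / interval.Seconds()
}

func main() {
	before := counters{FramesWritten: 100, BytesWritten: 4096}
	after := counters{FramesWritten: 160, BytesWritten: 10240}
	interval := 2 * time.Second

	fmt.Println(rate(before.FramesWritten, after.FramesWritten, interval)) // 30
	fmt.Println(rate(before.BytesWritten, after.BytesWritten, interval))   // 3072
}
```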

type ConnectionDebugState

type ConnectionDebugState struct {
	// Closed is true if the connection has been closed.
	Closed bool
	// LastFailureCode is the code of the most recent failure on this connection.
	LastFailureCode DebugFailureCode
	// LastFailure is the human-readable description of the most recent failure.
	LastFailure string
	// LastFailureAt is the timestamp of the most recent failure.
	LastFailureAt time.Time
	// Protocol is the negotiated protocol version byte.
	Protocol byte
	// CodecID is the negotiated codec identifier.
	CodecID CodecID
	// CodecName is the human-readable name of the negotiated codec.
	CodecName string
	// MaxFrame is the negotiated maximum frame size in bytes.
	MaxFrame uint32
	// StreamCount is the current number of streams (open and closing).
	StreamCount int
	// NextSendSeq is the next sequence number to be assigned to an outbound frame.
	NextSendSeq uint64
	// Counters holds the cumulative connection counters.
	Counters ConnectionCounters
	// Streams contains a snapshot of each stream's debug state.
	Streams []StreamDebugState
	// Recovery contains the recovery subsystem state, or nil if recovery is not enabled.
	Recovery *RecoveryDebugState
}

ConnectionDebugState is a debug snapshot of a connection at a point in time, including counters, stream states, and recovery state.

type DebugFailureCode

type DebugFailureCode string

DebugFailureCode is a string code identifying the subsystem and failure mode for diagnostic purposes. Values follow the pattern "subsystem.failure_kind".

const (
	// DebugFailureNone indicates no failure has been recorded.
	DebugFailureNone DebugFailureCode = ""
	// DebugFailureStreamRecvTimeout indicates a stream receive timed out.
	DebugFailureStreamRecvTimeout DebugFailureCode = "stream.recv_timeout"
	// DebugFailureStreamEncode indicates message encoding failed on a stream.
	DebugFailureStreamEncode DebugFailureCode = "stream.encode"
	// DebugFailureStreamEnqueue indicates a failure to enqueue a frame for sending.
	DebugFailureStreamEnqueue DebugFailureCode = "stream.enqueue"
	// DebugFailureStreamProtocol indicates a protocol error on a stream (e.g. unknown message).
	DebugFailureStreamProtocol DebugFailureCode = "stream.protocol"
	// DebugFailureStreamProtocolReplySend indicates a failure to send a protocol error reply.
	DebugFailureStreamProtocolReplySend DebugFailureCode = "stream.protocol_reply_send"
	// DebugFailureStreamDecode indicates message decoding failed on a stream.
	DebugFailureStreamDecode DebugFailureCode = "stream.decode"
	// DebugFailureHandler indicates a handler returned an error.
	DebugFailureHandler DebugFailureCode = "handler.error"
	// DebugFailureWriterLoop indicates the connection writer goroutine failed.
	DebugFailureWriterLoop DebugFailureCode = "connection.writer"
	// DebugFailureReaderLoop indicates the connection reader goroutine failed.
	DebugFailureReaderLoop DebugFailureCode = "connection.reader"
	// DebugFailureReaderEnqueue indicates the reader failed to enqueue a payload to a stream.
	DebugFailureReaderEnqueue DebugFailureCode = "connection.reader_enqueue"
	// DebugFailureReconnectResume indicates a recovery resume attempt failed.
	DebugFailureReconnectResume DebugFailureCode = "recovery.resume"
	// DebugFailureReconnectTerminal indicates recovery gave up after exhausting retries.
	DebugFailureReconnectTerminal DebugFailureCode = "recovery.reconnect_terminal"
	// DebugFailureRecoveryAckWrite indicates a failure to write an ACK control frame.
	DebugFailureRecoveryAckWrite DebugFailureCode = "recovery.ack_write"
	// DebugFailureRecoveryResumeWrite indicates a failure to write a resume frame.
	DebugFailureRecoveryResumeWrite DebugFailureCode = "recovery.resume_write"
	// DebugFailureRecoveryLiveWrite indicates a failure to write a live data frame during recovery.
	DebugFailureRecoveryLiveWrite DebugFailureCode = "recovery.live_write"
	// DebugFailureRecoveryPingWrite indicates a failure to write a heartbeat ping.
	DebugFailureRecoveryPingWrite DebugFailureCode = "recovery.ping_write"
	// DebugFailureRecoveryRead indicates an error reading during recovery.
	DebugFailureRecoveryRead DebugFailureCode = "recovery.read"
	// DebugFailureRecoveryControl indicates an error processing a recovery control frame.
	DebugFailureRecoveryControl DebugFailureCode = "recovery.control"
	// DebugFailureRecoveryData indicates an error processing a recovery data frame.
	DebugFailureRecoveryData DebugFailureCode = "recovery.data"
)

type Envelope

type Envelope struct {
	// Wire is the message type name extracted from the payload.
	Wire string
	// Body holds the codec-specific raw data for deferred decoding.
	Body any
}

Envelope is an intermediate decode result that preserves the wire name and raw body without fully deserializing the message payload. Wire is the message type name. Body holds codec-specific raw data: for map codec it is a msgpack.RawMessage, for compact codec it is a []msgpack.RawMessage.

type ErrBadHandshakeMagic

type ErrBadHandshakeMagic struct{}

ErrBadHandshakeMagic indicates that the connection does not speak the adaptivemsg protocol. The initial bytes did not match the expected handshake magic value.

func (ErrBadHandshakeMagic) Error

func (e ErrBadHandshakeMagic) Error() string

type ErrClosed

type ErrClosed struct{}

ErrClosed indicates that an operation was attempted on a closed connection or stream.

func (ErrClosed) Error

func (e ErrClosed) Error() string

type ErrCodec

type ErrCodec struct {
	// Message contains the human-readable error detail.
	Message string
}

ErrCodec reports a codec encode or decode failure. It is returned when a message cannot be serialized or deserialized by the negotiated codec.

func (ErrCodec) Error

func (e ErrCodec) Error() string

type ErrCompactFieldCount

type ErrCompactFieldCount struct {
	// Expected is the number of fields the local struct definition has.
	Expected int
	// Got is the number of fields found in the incoming payload.
	Got int
}

ErrCompactFieldCount indicates that a compact-mode struct has the wrong number of fields. This happens when the sender and receiver disagree on the struct definition.
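The check behind this error can be pictured as comparing the number of positional values in a compact payload with the number of fields in the local struct. The following is only a sketch under that assumption: `errCompactFieldCount`, `helloReply`, and `checkFieldCount` are local stand-ins, the raw body is modelled as `[][]byte`, and counting fields with `reflect` is an illustrative choice rather than the package's actual implementation.

```go
package main

import (
	"fmt"
	"reflect"
)

// errCompactFieldCount is a local stand-in mirroring the documented fields.
type errCompactFieldCount struct {
	Expected int
	Got      int
}

// The message format is an assumption made for this sketch.
func (e errCompactFieldCount) Error() string {
	return fmt.Sprintf("compact field count mismatch: expected %d, got %d", e.Expected, e.Got)
}

// helloReply is a hypothetical two-field message.
type helloReply struct {
	Answer  string
	TraceID string
}

// checkFieldCount compares the number of raw positional values in body
// with the number of fields declared on target (a struct or pointer to struct).
func checkFieldCount(target any, body [][]byte) error {
	t := reflect.TypeOf(target)
	if t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	if want := t.NumField(); want != len(body) {
		return errCompactFieldCount{Expected: want, Got: len(body)}
	}
	return nil
}

func main() {
	fmt.Println(checkFieldCount(helloReply{}, make([][]byte, 2)))
	fmt.Println(checkFieldCount(&helloReply{}, make([][]byte, 1)))
}
```

A mismatch like the second call is what a receiver would see if the sender added or removed a field without regenerating the peer's definition.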

func (ErrCompactFieldCount) Error

func (e ErrCompactFieldCount) Error() string

type ErrConcurrentRecv

type ErrConcurrentRecv struct{}

ErrConcurrentRecv indicates that concurrent Recv or SendRecv calls were detected on the same stream. Only one receive operation is allowed at a time on a given stream.

func (ErrConcurrentRecv) Error

func (e ErrConcurrentRecv) Error() string

type ErrConnectTimeout

type ErrConnectTimeout struct{}

ErrConnectTimeout indicates that the TCP connect or handshake exceeded the timeout duration configured via Client.WithTimeout().

func (ErrConnectTimeout) Error

func (e ErrConnectTimeout) Error() string

type ErrFrameTooLarge

type ErrFrameTooLarge struct {
	// Size is the offending payload size in bytes.
	Size int
}

ErrFrameTooLarge indicates that a payload exceeded the negotiated maximum frame size for the connection. The receiver will reject the frame.

func (ErrFrameTooLarge) Error

func (e ErrFrameTooLarge) Error() string

type ErrHandlerTaskBusy

type ErrHandlerTaskBusy struct{}

ErrHandlerTaskBusy indicates that NewTask() was called on a stream that already has a running task. Only one handler task is allowed per stream at a time.

func (ErrHandlerTaskBusy) Error

func (e ErrHandlerTaskBusy) Error() string

type ErrHandshakeRejected

type ErrHandshakeRejected struct{}

ErrHandshakeRejected indicates that the server explicitly rejected the handshake. This is distinct from a version or codec mismatch.

func (ErrHandshakeRejected) Error

func (e ErrHandshakeRejected) Error() string

type ErrInvalidMessage

type ErrInvalidMessage struct {
	// Reason contains the human-readable detail of what is invalid.
	Reason string
}

ErrInvalidMessage indicates invalid message usage, such as passing a nil value, a non-struct type, or a type that does not satisfy the Message interface.

func (ErrInvalidMessage) Error

func (e ErrInvalidMessage) Error() string

type ErrNoCommonCodec

type ErrNoCommonCodec struct{}

ErrNoCommonCodec indicates that the handshake failed because the client and server have no codec in common. Register the same codec on both sides before connecting.

func (ErrNoCommonCodec) Error

func (e ErrNoCommonCodec) Error() string

type ErrNoCommonVersion

type ErrNoCommonVersion struct {
	// ClientMin is the minimum protocol version supported by the client.
	ClientMin byte
	// ClientMax is the maximum protocol version supported by the client.
	ClientMax byte
	// ServerMin is the minimum protocol version supported by the server.
	ServerMin byte
	// ServerMax is the maximum protocol version supported by the server.
	ServerMax byte
}

ErrNoCommonVersion indicates that no protocol version overlap exists between the client and server. The fields show each side's supported version range.
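To illustrate what "no overlap" means for the four fields, here is a minimal sketch of a range intersection. `versionRange` and `negotiateVersion` are hypothetical names, and picking the highest shared version is an assumption; the page does not say which version the handshake selects.

```go
package main

import "fmt"

// versionRange is a hypothetical stand-in for one side's supported
// protocol versions (compare ClientMin/ClientMax and ServerMin/ServerMax).
type versionRange struct {
	Min, Max byte
}

// negotiateVersion returns the highest version inside both ranges.
// ok is false when the ranges do not overlap, which is the situation
// ErrNoCommonVersion describes. Preferring the highest version is an assumption.
func negotiateVersion(client, server versionRange) (version byte, ok bool) {
	lo := client.Min
	if server.Min > lo {
		lo = server.Min
	}
	hi := client.Max
	if server.Max < hi {
		hi = server.Max
	}
	if lo > hi {
		return 0, false
	}
	return hi, true
}

func main() {
	v, ok := negotiateVersion(versionRange{Min: 2, Max: 3}, versionRange{Min: 1, Max: 2})
	fmt.Println(v, ok) // prints "2 true"

	_, ok = negotiateVersion(versionRange{Min: 3, Max: 3}, versionRange{Min: 1, Max: 2})
	fmt.Println(ok) // prints "false"
}
```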

func (ErrNoCommonVersion) Error

func (e ErrNoCommonVersion) Error() string

type ErrRecvTimeout

type ErrRecvTimeout struct{}

ErrRecvTimeout indicates that no message was received within the stream's recv timeout, as configured via SetRecvTimeout.

func (ErrRecvTimeout) Error

func (e ErrRecvTimeout) Error() string

type ErrRemote

type ErrRemote struct {
	// Code is the error code from the remote ErrorReply.
	Code string
	// Message is the error message from the remote ErrorReply.
	Message string
}

ErrRemote wraps an ErrorReply received from the peer. The Code and Message fields correspond to the remote ErrorReply's fields.
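Because the error types on this page are plain structs with value receivers, callers can usually inspect them with the standard `errors.As`. The sketch below uses a local `ErrRemote` stand-in so that it runs without the package; it assumes the error is returned by value (possibly wrapped), and both the `Error` text and the `remoteCode` helper are made up for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrRemote is a local stand-in that mirrors the documented struct.
type ErrRemote struct {
	Code    string
	Message string
}

// The text format below is an assumption, not the package's actual output.
func (e ErrRemote) Error() string {
	return "remote error " + e.Code + ": " + e.Message
}

// remoteCode reports the remote error code if err is, or wraps, an ErrRemote.
func remoteCode(err error) (string, bool) {
	var remote ErrRemote
	if errors.As(err, &remote) {
		return remote.Code, true
	}
	return "", false
}

func main() {
	// Simulate a call site that wraps the error before returning it.
	err := fmt.Errorf("send hello: %w", ErrRemote{Code: "not_found", Message: "no such user"})

	code, ok := remoteCode(err)
	fmt.Println(code, ok) // prints "not_found true"
}
```

The same pattern should apply to the other struct errors listed here, for example extracting `Expected` and `Got` from an ErrTypeMismatch.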

func (ErrRemote) Error

func (e ErrRemote) Error() string

type ErrReplayBufferFull

type ErrReplayBufferFull struct {
	// Limit is the configured maximum replay buffer size in bytes.
	Limit int64
	// Size is the current replay buffer size in bytes.
	Size int64
}

ErrReplayBufferFull indicates that the recovery replay buffer has exceeded the configured MaxReplayBytes limit. The connection will be terminated because unacknowledged frames can no longer be retained for replay.
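The interaction between the byte limit and cumulative ACKs can be sketched with a small in-memory buffer. Everything here (`frame`, `replayBuffer`, `push`, `ack`) is hypothetical, and rejecting the frame that would cross the limit is an assumption; the documentation only states that the connection is terminated once the limit is exceeded.

```go
package main

import "fmt"

// frame is a hypothetical unacknowledged data frame.
type frame struct {
	seq     uint64
	payload []byte
}

// replayBuffer retains sent frames until the peer acknowledges them.
type replayBuffer struct {
	limit  int64 // plays the role of MaxReplayBytes
	size   int64 // bytes currently retained
	frames []frame
}

// push retains f for replay, or fails if that would exceed the byte limit.
func (b *replayBuffer) push(f frame) error {
	n := int64(len(f.payload))
	if b.size+n > b.limit {
		return fmt.Errorf("replay buffer full: limit %d, size %d", b.limit, b.size+n)
	}
	b.frames = append(b.frames, f)
	b.size += n
	return nil
}

// ack applies a cumulative ACK: every frame with seq <= upTo is released.
func (b *replayBuffer) ack(upTo uint64) {
	i := 0
	for i < len(b.frames) && b.frames[i].seq <= upTo {
		b.size -= int64(len(b.frames[i].payload))
		i++
	}
	b.frames = b.frames[i:]
}

func main() {
	buf := &replayBuffer{limit: 8}
	fmt.Println(buf.push(frame{seq: 1, payload: []byte("aaaa")}))
	fmt.Println(buf.push(frame{seq: 2, payload: []byte("bbbb")}))
	fmt.Println(buf.push(frame{seq: 3, payload: []byte("c")}) != nil) // prints "true": limit reached

	buf.ack(1) // the peer confirms frame 1, freeing 4 bytes
	fmt.Println(buf.push(frame{seq: 3, payload: []byte("c")}), buf.size)
}
```

In this model a slow or disconnected peer stops sending ACKs, the buffer only grows, and the limit is eventually hit.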

func (ErrReplayBufferFull) Error

func (e ErrReplayBufferFull) Error() string

type ErrResumeRejected

type ErrResumeRejected struct {
	// Reason contains the human-readable rejection detail.
	Reason string
}

ErrResumeRejected indicates that the server rejected a resume attempt. Common causes include a wrong connection secret, an expired detached session, or a codec mismatch.

func (ErrResumeRejected) Error

func (e ErrResumeRejected) Error() string

type ErrTooManyCodecs

type ErrTooManyCodecs struct {
	// Count is the number of codecs offered by the client.
	Count int
}

ErrTooManyCodecs indicates that the client offered more codecs than the protocol allows during the handshake.

func (ErrTooManyCodecs) Error

func (e ErrTooManyCodecs) Error() string

type ErrTypeMismatch

type ErrTypeMismatch struct {
	// Expected is the wire name of the expected message type.
	Expected string
	// Got is the wire name of the actually received message.
	Got string
}

ErrTypeMismatch indicates that the decoded message type does not match the expected type parameter. This occurs when a typed Recv or SendRecv receives a message with a different wire name than expected.

func (ErrTypeMismatch) Error

func (e ErrTypeMismatch) Error() string

type ErrUnknownMessage

type ErrUnknownMessage struct {
	// Name is the unrecognized wire name.
	Name string
}

ErrUnknownMessage indicates that a received message has a wire name that is not registered in the connection's message registry.

func (ErrUnknownMessage) Error

func (e ErrUnknownMessage) Error() string

type ErrUnsupportedCodec

type ErrUnsupportedCodec struct {
	// Value is the unrecognized codec ID byte.
	Value byte
}

ErrUnsupportedCodec indicates that a peer selected a codec that is not registered in the local codec registry.

func (ErrUnsupportedCodec) Error

func (e ErrUnsupportedCodec) Error() string

type ErrUnsupportedFrameVersion

type ErrUnsupportedFrameVersion struct {
	// Version is the unrecognized protocol version byte.
	Version byte
}

ErrUnsupportedFrameVersion indicates that a peer sent a frame with an unknown protocol version byte. This typically means the remote end is running an incompatible version of the adaptivemsg protocol.

func (ErrUnsupportedFrameVersion) Error

func (e ErrUnsupportedFrameVersion) Error() string

type ErrUnsupportedTransport

type ErrUnsupportedTransport struct {
	// Reason contains the human-readable detail of the unsupported feature.
	Reason string
}

ErrUnsupportedTransport indicates that a required transport feature is unavailable in the current configuration.

func (ErrUnsupportedTransport) Error

func (e ErrUnsupportedTransport) Error() string

type ErrorReply

type ErrorReply struct {
	Code    string `am:"code"`
	Message string `am:"message"`
}

ErrorReply is the standard error payload sent over the wire when a handler returns an error or when an explicit error must be communicated to the peer. Its wire name is "am.message.ErrorReply". On the receiving side an ErrorReply is surfaced as ErrRemote by Stream.SendRecv and SendRecvAs.

func NewErrorReply

func NewErrorReply(code, message string) *ErrorReply

NewErrorReply is a convenience constructor that returns an ErrorReply with the given code and human-readable message.

func (*ErrorReply) WireName

func (*ErrorReply) WireName() string

type Link

type Link interface {
	// contains filtered or unexported methods
}

Link is a sealed interface that unifies Connection, Stream, and OnceConn as valid targets for SendRecvAs and StreamAs. The unexported isLink method prevents external packages from implementing Link.

type Message

type Message interface{}

Message is the marker interface that all wire payload types must satisfy. Any struct or pointer-to-struct value implicitly implements Message. The wire name used during encoding is derived automatically from the Go type name (namespace.package.TypeName) unless the type also implements NamedMessage to supply a custom name.

type NamedMessage

type NamedMessage interface {
	WireName() string
}

NamedMessage is an optional interface that lets a Message type override the default wire name. Implement WireName to return a fixed string that identifies this message type on the wire. This is required for cross-language compatibility — the name returned must match the corresponding Rust wire name exactly.
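The override can be pictured as a simple interface check. In this sketch `namedMessage`, `helloRequest`, `plain`, and `wireNameOf` are all hypothetical, and the fallback to the reflected Go type name is a simplification: the real default is described as namespace.package.TypeName, and how the namespace part is derived is not covered here.

```go
package main

import (
	"fmt"
	"reflect"
)

// namedMessage is a local copy of the documented interface shape.
type namedMessage interface {
	WireName() string
}

// helloRequest pins its wire name so that other runtimes can match it.
type helloRequest struct{ Who string }

func (*helloRequest) WireName() string { return "am.example.HelloRequest" }

// plain has no WireName method and falls back to a derived name.
type plain struct{}

// wireNameOf prefers an explicit WireName and otherwise falls back to the
// reflected type name (a simplification of the package's default rule).
func wireNameOf(msg any) string {
	if msg == nil {
		return ""
	}
	if nm, ok := msg.(namedMessage); ok {
		return nm.WireName()
	}
	t := reflect.TypeOf(msg)
	for t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	return t.String()
}

func main() {
	fmt.Println(wireNameOf(&helloRequest{Who: "alice"})) // prints "am.example.HelloRequest"
	fmt.Println(wireNameOf(&plain{}))
}
```

Note that `WireName` is declared on the pointer receiver in this sketch, so only `*helloRequest` satisfies the interface.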

type Netconn

type Netconn struct {
	// contains filtered or unexported fields
}

Netconn is a lightweight descriptor for a peer connection, passed to Server lifecycle callbacks (Server.OnConnect, Server.OnDisconnect). It carries metadata about the remote endpoint but does not expose the underlying transport directly.

func (Netconn) PeerAddr

func (n Netconn) PeerAddr() string

PeerAddr returns the remote address of the peer as a string, for example "192.168.1.5:54321" for a TCP connection or a filesystem path for a Unix domain socket. The value is empty if the address could not be determined.

type OkReply

type OkReply struct{}

OkReply is the default acknowledgement sent when a message handler returns nil. Its wire name is "am.message.OkReply".

func (*OkReply) WireName

func (*OkReply) WireName() string

type OnceConn

type OnceConn struct {
	// contains filtered or unexported fields
}

OnceConn is a builder for one-shot request-reply over a short-lived connection. Create one with Once, optionally configure it with OnceConn.WithTimeout or OnceConn.WithCodecs, and pass it to SendRecvAs exactly once. A OnceConn is not reusable. The default timeout is 5 seconds, covering both connection establishment and reply wait.

func Once

func Once(addr string) *OnceConn

Once creates a new OnceConn builder targeting the given address. The addr parameter supports "tcp://host:port", "uds://path", "unix://path", or a bare "host:port" (which defaults to TCP). The returned OnceConn can be passed to SendRecvAs to dial, exchange one request-reply, and close the connection.

func (*OnceConn) WithCodecs

func (o *OnceConn) WithCodecs(codecs ...CodecID) *OnceConn

WithCodecs overrides the default codec preference list (compact, map) for the short-lived connection's handshake negotiation.

func (*OnceConn) WithTimeout

func (o *OnceConn) WithTimeout(d time.Duration) *OnceConn

WithTimeout overrides the default 5-second timeout for both connection establishment and reply wait.

type RecoveryDebugState

type RecoveryDebugState struct {
	// Role is "client" or "server", indicating which recovery role this connection has.
	Role string
	// ConnectionID is the hex-encoded recovery connection identifier.
	ConnectionID string
	// TransportAttached is true if a live transport is currently connected.
	TransportAttached bool
	// TransportGen is the generation counter for transport attach/detach cycles.
	TransportGen uint64
	// ReconnectActive is true if a reconnect attempt is currently in progress.
	ReconnectActive bool
	// LastRecvSeq is the sequence number of the last received data frame.
	LastRecvSeq uint64
	// LastAckedSeq is the sequence number of the last acknowledged data frame.
	LastAckedSeq uint64
	// AckPending is the number of received frames not yet acknowledged.
	AckPending uint32
	// AckDue is true if an ACK is pending and should be sent soon.
	AckDue bool
	// AckEvery is the negotiated ACK frequency (every N data frames).
	AckEvery uint32
	// AckDelay is the negotiated delay before flushing a pending ACK.
	AckDelay time.Duration
	// HeartbeatInterval is the negotiated interval between heartbeat pings.
	HeartbeatInterval time.Duration
	// HeartbeatTimeout is the negotiated inactivity timeout for the connection.
	HeartbeatTimeout time.Duration
	// ReplayQueued is the number of frames currently buffered for replay.
	ReplayQueued int
	// ReplayBytes is the total size in bytes of frames buffered for replay.
	ReplayBytes int64
	// LiveQueueDepth is the number of frames in the live send queue.
	LiveQueueDepth int
	// ResumeQueueDepth is the number of frames queued for resume replay.
	ResumeQueueDepth int
}

RecoveryDebugState is a debug snapshot of the recovery subsystem's state at a point in time.

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server accepts inbound connections and dispatches them to registered handlers and callbacks. It uses a builder pattern for configuration:

adaptivemsg.NewServer().
	OnConnect(func(nc adaptivemsg.Netconn) error { return nil }).
	WithCodecs(adaptivemsg.CodecMsgpackCompact).
	Serve("tcp://0.0.0.0:9000")

Each accepted connection is handled in its own goroutine. Server.Serve blocks for the lifetime of the server; it only returns if the underlying listener encounters an unrecoverable error.

func NewServer

func NewServer() *Server

NewServer returns a Server with default settings: both MessagePack codecs supported (CodecMsgpackCompact and CodecMsgpackMap), recovery disabled, and no lifecycle callbacks registered.

func (*Server) OnCloseStream

func (s *Server) OnCloseStream(f func(*StreamContext)) *Server

OnCloseStream registers a callback that is invoked when a stream is destroyed, either because the peer closed it or because the parent connection closed. This is useful for per-stream cleanup.

func (*Server) OnConnect

func (s *Server) OnConnect(f func(Netconn) error) *Server

OnConnect registers a callback that is invoked each time a new connection is accepted and the handshake completes. The callback receives a Netconn descriptor for the peer. Returning a non-nil error rejects the connection and closes the underlying transport.

func (*Server) OnDisconnect

func (s *Server) OnDisconnect(f func(Netconn) error) *Server

OnDisconnect registers a callback that is invoked after a connection has fully closed. The callback receives the same Netconn descriptor that was passed to OnConnect. The return value is currently ignored but reserved for future use; callers should return nil.

func (*Server) OnNewStream

func (s *Server) OnNewStream(f func(*StreamContext)) *Server

OnNewStream registers a callback that is invoked when a new stream is created on any connection managed by this server. This is useful for per-stream initialization such as attaching context or state to the StreamContext.

func (*Server) Serve

func (s *Server) Serve(addr string) error

Serve binds to addr and enters an accept loop, blocking for the lifetime of the server. Each accepted connection is handled in a dedicated goroutine that performs the protocol handshake and then runs the reader/writer loops.

Supported address formats:

  • "tcp://host:port" — TCP listener
  • "uds:///path/to/sock" or "unix:///path/to/sock" — Unix domain socket
  • "host:port" — bare host:port defaults to TCP

Serve returns a non-nil error only if the listener itself fails (e.g. the address is already in use).
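The address formats listed above map naturally onto the network and address pair that Go's net package expects. A minimal sketch of such a mapping follows; `splitAddr` is a hypothetical helper and not part of the package's API.

```go
package main

import (
	"fmt"
	"strings"
)

// splitAddr turns an adaptivemsg-style address into a (network, address)
// pair in the form accepted by net.Listen and net.Dial.
func splitAddr(addr string) (network, target string) {
	switch {
	case strings.HasPrefix(addr, "tcp://"):
		return "tcp", strings.TrimPrefix(addr, "tcp://")
	case strings.HasPrefix(addr, "uds://"):
		return "unix", strings.TrimPrefix(addr, "uds://")
	case strings.HasPrefix(addr, "unix://"):
		return "unix", strings.TrimPrefix(addr, "unix://")
	default:
		// A bare host:port defaults to TCP.
		return "tcp", addr
	}
}

func main() {
	for _, addr := range []string{
		"tcp://0.0.0.0:9000",
		"uds:///tmp/am.sock",
		"unix:///tmp/am.sock",
		"127.0.0.1:5555",
	} {
		network, target := splitAddr(addr)
		fmt.Printf("%-22s -> %s %s\n", addr, network, target)
	}
}
```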

func (*Server) WithCodecs

func (s *Server) WithCodecs(codecs ...CodecID) *Server

WithCodecs sets the server's list of supported codecs. During the handshake the server walks the client's ordered preference list and selects the first codec that appears in both lists. If no common codec is found the handshake fails with ErrNoCommonCodec.
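The selection rule described above (client order wins, first match is chosen) can be sketched in a few lines. `codecID`, its two constants, and `selectCodec` are local stand-ins with made-up numeric values; only the walk order comes from the documentation.

```go
package main

import "fmt"

// codecID is a local stand-in for CodecID; the numeric values are made up.
type codecID byte

const (
	codecCompact codecID = 1
	codecMap     codecID = 2
)

// selectCodec walks the client's ordered preference list and returns the
// first codec the server also supports. ok is false when nothing matches,
// which corresponds to the ErrNoCommonCodec case.
func selectCodec(clientPrefs, serverSupported []codecID) (codec codecID, ok bool) {
	supported := make(map[codecID]bool, len(serverSupported))
	for _, c := range serverSupported {
		supported[c] = true
	}
	for _, c := range clientPrefs {
		if supported[c] {
			return c, true
		}
	}
	return 0, false
}

func main() {
	// The client prefers map; the server lists compact first but supports both.
	c, ok := selectCodec([]codecID{codecMap, codecCompact}, []codecID{codecCompact, codecMap})
	fmt.Println(c == codecMap, ok) // prints "true true"

	_, ok = selectCodec([]codecID{codecCompact}, []codecID{codecMap})
	fmt.Println(ok) // prints "false"
}
```

The first call shows that the order of the server's list does not influence the outcome; only membership does.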

func (*Server) WithRecovery

func (s *Server) WithRecovery(opts ServerRecoveryOptions) *Server

WithRecovery enables the v3 protocol extension on the server side, allowing clients to reconnect and resume sessions after transient network failures. See ServerRecoveryOptions for tunables such as detached TTL, replay buffer limits, ACK intervals, and heartbeat configuration.

type ServerRecoveryOptions

type ServerRecoveryOptions struct {
	// Enable activates recovery mode. Default: false.
	Enable bool
	// DetachedTTL is how long the server retains a detached connection's state
	// before discarding it. Default: 30s.
	DetachedTTL time.Duration
	// MaxReplayBytes is the maximum number of bytes buffered for replay of
	// unacknowledged messages. Default: 8 MiB.
	MaxReplayBytes int64
	// AckEvery sends a cumulative ACK every N data frames. Default: 64.
	AckEvery uint32
	// AckDelay is the delay before flushing a pending ACK. Default: 20ms.
	AckDelay time.Duration
	// HeartbeatInterval is the interval between heartbeat pings when the
	// connection is idle. Default: 30s.
	HeartbeatInterval time.Duration
	// HeartbeatTimeout closes the connection if no inbound frame is received
	// within this duration. Must be ≥ 2×HeartbeatInterval. Default: 90s.
	HeartbeatTimeout time.Duration
}

ServerRecoveryOptions controls server-side recovery behavior for connection retention and ACK policy. When Enable is true, the server supports the v3 protocol with attach/resume semantics. ACK and heartbeat fields are authoritative for the connection and are sent to the client during attach/resume.
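The heartbeat constraint and the documented defaults can be checked with a small helper before starting a server. This is a sketch only: `recoveryOptions` mirrors two of the documented fields, and treating a zero value as "use the default" is an assumption about how the package fills in unset fields.

```go
package main

import (
	"fmt"
	"time"
)

// recoveryOptions mirrors the two heartbeat fields of ServerRecoveryOptions.
type recoveryOptions struct {
	HeartbeatInterval time.Duration
	HeartbeatTimeout  time.Duration
}

// withDefaults fills zero fields with the documented defaults (30s and 90s).
// That zero means "use the default" is an assumption of this sketch.
func (o recoveryOptions) withDefaults() recoveryOptions {
	if o.HeartbeatInterval == 0 {
		o.HeartbeatInterval = 30 * time.Second
	}
	if o.HeartbeatTimeout == 0 {
		o.HeartbeatTimeout = 90 * time.Second
	}
	return o
}

// validate enforces HeartbeatTimeout >= 2 x HeartbeatInterval.
func (o recoveryOptions) validate() error {
	if o.HeartbeatTimeout < 2*o.HeartbeatInterval {
		return fmt.Errorf("heartbeat timeout %v must be at least twice the interval %v",
			o.HeartbeatTimeout, o.HeartbeatInterval)
	}
	return nil
}

func main() {
	defaults := recoveryOptions{}.withDefaults()
	fmt.Println(defaults.validate()) // prints "<nil>": 90s >= 2 x 30s

	// Raising only the interval to 1m leaves the 90s timeout too short.
	tooShort := recoveryOptions{HeartbeatInterval: time.Minute}.withDefaults()
	fmt.Println(tooShort.validate() != nil) // prints "true"
}
```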

type Stream

type Stream[T any] struct {
	// contains filtered or unexported fields
}

Stream is a typed view over a single multiplexed stream on a Connection.

The type parameter T determines how Stream.SendRecv and Stream.Recv decode incoming messages. When T is an interface (e.g. Message), the connection's message registry is consulted to resolve the concrete type from the wire name. When T is a concrete type (e.g. *EchoReply), the payload is decoded directly into that type without a registry lookup.

Create a Stream with Connection.NewStream (which returns Stream[Message]) or convert an existing stream to a different reply type with StreamAs.

Send is safe for concurrent use by multiple goroutines. Recv and SendRecv must not be called concurrently on the same Stream; doing so returns ErrConcurrentRecv.

func StreamAs

func StreamAs[T any](v Link) *Stream[T]

StreamAs creates a typed Stream[T] view from any Link. It does not allocate a new stream — it wraps the existing stream core with the requested type parameter. Returns nil if v is nil or has no underlying stream core.

func (*Stream[T]) Close

func (s *Stream[T]) Close()

Close removes this stream from its connection and releases associated resources. After Close returns, subsequent Send and Recv calls on this stream will return ErrClosed. Close is idempotent.

func (*Stream[T]) DebugState

func (s *Stream[T]) DebugState() StreamDebugState

DebugState returns a point-in-time snapshot of this stream's debug counters and state.

func (*Stream[T]) ID

func (s *Stream[T]) ID() uint32

ID returns the numeric stream identifier assigned by the connection. Stream IDs are unique within a single connection.

func (*Stream[T]) PeekWire

func (s *Stream[T]) PeekWire() (string, error)

PeekWire returns the wire name of the next pending message without consuming it. The message remains available for a subsequent Stream.Recv or Stream.SendRecv call. PeekWire is useful for dispatching based on message type before committing to a decode.

func (*Stream[T]) Recv

func (s *Stream[T]) Recv() (T, error)

Recv blocks until the next message arrives on this stream and decodes it as type T. It returns ErrRecvTimeout if the receive timeout expires, ErrClosed if the connection is shut down, or ErrTypeMismatch if the decoded message cannot be converted to T.

Only one goroutine may call Recv (or SendRecv) at a time; a concurrent call returns ErrConcurrentRecv.
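One common way to enforce a single-receiver rule like this is an atomic flag that is claimed on entry and released on exit. The sketch below shows that technique in isolation; `inbox`, `recv`, `demo`, and `errConcurrentRecv` are hypothetical, and nothing here is taken from the package's actual implementation.

```go
package main

import (
	"errors"
	"fmt"
	"runtime"
	"sync/atomic"
)

// errConcurrentRecv stands in for the documented ErrConcurrentRecv.
var errConcurrentRecv = errors.New("concurrent recv on the same stream")

// inbox is a hypothetical single-receiver queue.
type inbox struct {
	receiving atomic.Bool
	ch        chan string
}

// recv claims the receiver slot, waits for one message, then releases the slot.
// A second caller arriving while the slot is held is rejected immediately.
func (in *inbox) recv() (string, error) {
	if !in.receiving.CompareAndSwap(false, true) {
		return "", errConcurrentRecv
	}
	defer in.receiving.Store(false)
	return <-in.ch, nil
}

// demo starts one blocked receiver, attempts a second receive, and then
// delivers a message to the first receiver.
func demo() (string, error) {
	in := &inbox{ch: make(chan string)}
	done := make(chan string)
	go func() {
		msg, _ := in.recv()
		done <- msg
	}()
	// Wait until the goroutine has claimed the receiver slot.
	for !in.receiving.Load() {
		runtime.Gosched()
	}
	_, err := in.recv() // rejected: a receive is already in progress
	in.ch <- "hello"
	return <-done, err
}

func main() {
	msg, err := demo()
	fmt.Println(msg, err)
}
```

Callers that need several consumers on one stream would typically funnel messages through a single receiving goroutine and fan them out over channels.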

func (*Stream[T]) Send

func (s *Stream[T]) Send(msg Message) error

Send encodes msg with the negotiated codec and enqueues the resulting frame for transmission on this stream. Send returns as soon as the frame is queued; it does not wait for the peer to receive it.

Send is safe for concurrent use. It returns ErrClosed if the connection has been shut down.

func (*Stream[T]) SendRecv

func (s *Stream[T]) SendRecv(msg Message) (T, error)

SendRecv sends msg on this stream and blocks until a reply of type T is received. If the peer handler returns an error, the reply is decoded as an ErrorReply and returned as ErrRemote. If the incoming message cannot be converted to T, ErrTypeMismatch is returned. The call also respects the receive timeout set by Stream.SetRecvTimeout.

func (*Stream[T]) SetRecvTimeout

func (s *Stream[T]) SetRecvTimeout(timeout time.Duration)

SetRecvTimeout sets the maximum duration that Stream.Recv and Stream.SendRecv will block waiting for a message on this stream. A zero or negative value disables the timeout, causing receives to block indefinitely until a message arrives or the connection closes. The default is no timeout.
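The timeout semantics (a positive value bounds the wait, zero or negative blocks indefinitely) can be sketched with a channel and a timer. `recvWithTimeout` and `errRecvTimeout` are hypothetical stand-ins used only to show the behaviour described above.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errRecvTimeout stands in for the documented ErrRecvTimeout.
var errRecvTimeout = errors.New("recv timeout")

// recvWithTimeout waits for one message on ch. A zero or negative timeout
// disables the deadline and blocks until a message arrives.
func recvWithTimeout(ch <-chan string, timeout time.Duration) (string, error) {
	if timeout <= 0 {
		return <-ch, nil
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case msg := <-ch:
		return msg, nil
	case <-timer.C:
		return "", errRecvTimeout
	}
}

func main() {
	ready := make(chan string, 1)
	ready <- "pong"
	msg, err := recvWithTimeout(ready, 0) // no deadline, message already queued
	fmt.Println(msg, err)

	_, err = recvWithTimeout(make(chan string), 10*time.Millisecond)
	fmt.Println(err == errRecvTimeout) // prints "true"
}
```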

type StreamContext

type StreamContext struct {
	// contains filtered or unexported fields
}

StreamContext carries per-stream state accessible to message handlers. Each stream has its own StreamContext instance. Handlers receive it as their first argument and can use it to store per-stream session state (e.g., authentication info, user ID) and to spawn a background task via StreamContext.NewTask.

func (*StreamContext) GetContext

func (sc *StreamContext) GetContext() any

GetContext retrieves the value previously stored with StreamContext.SetContext. It returns nil if no value has been set. GetContext is safe for concurrent use.

func (*StreamContext) NewTask

func (sc *StreamContext) NewTask(fn func(*Stream[Message])) error

NewTask spawns a background goroutine tied to this stream. The function fn receives the stream for bidirectional communication and runs until it returns. Only one task may be active per stream at a time; calling NewTask while a previous task is still running returns ErrHandlerTaskBusy. NewTask is useful for long-running handlers such as streaming responses.

func (*StreamContext) SetContext

func (sc *StreamContext) SetContext(value any)

SetContext stores an arbitrary value in this stream's context. The value can later be retrieved with StreamContext.GetContext or ContextAs. SetContext is safe for concurrent use.
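A per-stream context slot of this kind is often a mutex-guarded `any` plus a generic accessor for typed reads. The sketch below shows that pattern with local stand-ins; `streamCtx`, `session`, and `contextAs` are hypothetical, and the signature of the real ContextAs is not shown in this section, so the one used here is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// streamCtx is a local stand-in for the per-stream context slot.
type streamCtx struct {
	mu    sync.RWMutex
	value any
}

// SetContext stores an arbitrary value; safe for concurrent use.
func (sc *streamCtx) SetContext(v any) {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	sc.value = v
}

// GetContext returns the stored value, or nil if none has been set.
func (sc *streamCtx) GetContext() any {
	sc.mu.RLock()
	defer sc.mu.RUnlock()
	return sc.value
}

// contextAs is a hypothetical typed accessor: it reports false when no
// value is stored or the stored value has a different type.
func contextAs[T any](sc *streamCtx) (T, bool) {
	v, ok := sc.GetContext().(T)
	return v, ok
}

// session is an example of per-stream state, such as authentication info.
type session struct {
	UserID string
}

func main() {
	sc := &streamCtx{}
	_, ok := contextAs[session](sc)
	fmt.Println(ok) // prints "false": nothing stored yet

	sc.SetContext(session{UserID: "alice"})
	s, ok := contextAs[session](sc)
	fmt.Println(s.UserID, ok) // prints "alice true"
}
```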

type StreamCounters

type StreamCounters struct {
	// DataMessagesSent is the total number of application-level messages sent on this stream.
	DataMessagesSent uint64
	// DataMessagesReceived is the total number of application-level messages received on this stream.
	DataMessagesReceived uint64
	// ProtocolErrors is the total number of protocol errors detected on this stream.
	ProtocolErrors uint64
	// ProtocolErrorReplySendFailure is how many times sending a protocol error reply failed on this stream.
	ProtocolErrorReplySendFailure uint64
	// RemoteErrors is the total number of remote ErrorReply messages received on this stream.
	RemoteErrors uint64
	// DecodeErrors is the total number of message decode failures on this stream.
	DecodeErrors uint64
	// HandlerCalls is the total number of handler invocations on this stream.
	HandlerCalls uint64
	// HandlerErrors is the total number of handler invocations that returned an error on this stream.
	HandlerErrors uint64
}

StreamCounters holds point-in-time counters for a single stream. All values are cumulative since the stream was opened.

type StreamDebugState

type StreamDebugState struct {
	// ID is the stream's numeric identifier within the connection.
	ID uint32
	// Closed is true if the stream has been closed.
	Closed bool
	// LastFailureCode is the code of the most recent failure on this stream.
	LastFailureCode DebugFailureCode
	// LastFailure is the human-readable description of the most recent failure.
	LastFailure string
	// LastFailureAt is the timestamp of the most recent failure.
	LastFailureAt time.Time
	// RecvTimeout is the stream's current receive timeout duration.
	RecvTimeout time.Duration
	// InboxDepth is the number of decoded messages waiting in the stream's inbox.
	InboxDepth int
	// IncomingDepth is the number of raw frames waiting to be decoded.
	IncomingDepth int
	// HandlerQDepth is the number of messages queued for handler dispatch.
	HandlerQDepth int
	// Counters holds the cumulative stream counters.
	Counters StreamCounters
}

StreamDebugState is a debug snapshot of a stream's state at a point in time.

Directories

Path                                Synopsis
cmd/amgen-go (command)
examples/echo/cmd/client (command)
examples/echo/cmd/server (command)