Documentation
¶
Overview ¶
Package adaptivemsg is a wire protocol runtime for framed, multiplexed, codec-negotiated connections. It is wire-compatible with the Rust counterpart, enabling seamless Go ↔ Rust interoperability.
Transports ¶
Two transports are supported:
- TCP: addressed as "tcp://host:port" (or bare "host:port", which defaults to TCP)
- Unix domain sockets: addressed as "uds:///path/to/sock" or "unix:///path/to/sock"
Codecs ¶
Codecs are pluggable. The package ships with two MessagePack codecs:
- CodecMsgpackCompact — positional (array) encoding, smaller on the wire
- CodecMsgpackMap — named-field (map) encoding, easier to evolve schemas
During the handshake the client sends an ordered list of preferred codecs; the server selects the first codec that both sides support. Custom codecs can be installed globally with RegisterCodec.
Core Types ¶
The main abstractions are:
- Client — configures and dials outbound connections.
- Server — listens for and dispatches inbound connections.
- Connection — a live, negotiated session. Also acts as the default stream (stream 0). Obtained from Client.Connect on the client side or passed to Server callbacks on the server side.
- Stream — a multiplexed logical channel within a Connection. Created by Connection.NewStream.
- Message — the marker interface for all payload types. Register concrete types with [Register] so the codec can round-trip them by wire name.
Quick Start ¶
A minimal echo server and client:
// ── server ──
type Ping struct{ Text string `am:"text"` }
type Pong struct{ Text string `am:"text"` }
adaptivemsg.Register[Ping]()
adaptivemsg.Register[Pong]()
adaptivemsg.NewServer().
OnConnect(func(nc adaptivemsg.Netconn) error {
fmt.Println("connected:", nc.PeerAddr())
return nil
}).
Serve("tcp://0.0.0.0:9000")
// Meanwhile, register a handler that echoes Ping → Pong:
adaptivemsg.Handle(func(ctx *adaptivemsg.StreamContext, p *Ping) (*Pong, error) {
return &Pong{Text: p.Text}, nil
})
// ── client ──
conn, err := adaptivemsg.NewClient().
WithTimeout(5 * time.Second).
Connect("tcp://127.0.0.1:9000")
if err != nil { log.Fatal(err) }
defer conn.Close()
reply, err := adaptivemsg.SendRecvAs[*Pong](conn, &Ping{Text: "hello"})
For one-shot request/reply without keeping a connection open, see Once.
Index ¶
- func ContextAs[T any](sc *StreamContext) (T, bool)
- func MustRegisterCodec(codec CodecImpl) struct{}
- func MustRegisterGlobalType[T any]() struct{}
- func RegisterCodec(codec CodecImpl) error
- func RegisterGlobalType[T any]() error
- func SendRecvAs[T any](v Link, msg Message) (T, error)
- func WireNameOf(msg Message) (string, error)
- type Client
- type ClientRecoveryOptions
- type CodecID
- type CodecImpl
- type Connection
- func (c *Connection) Close()
- func (c *Connection) DebugState() ConnectionDebugState
- func (c *Connection) NewStream() *Stream[Message]
- func (c *Connection) PeekWire() (string, error)
- func (c *Connection) Recv() (Message, error)
- func (c *Connection) Send(msg Message) error
- func (c *Connection) SendRecv(msg Message) (Message, error)
- func (c *Connection) SetRecvTimeout(timeout time.Duration)
- func (c *Connection) WaitClosed()
- type ConnectionCounters
- type ConnectionDebugState
- type DebugFailureCode
- type Envelope
- type ErrBadHandshakeMagic
- type ErrClosed
- type ErrCodec
- type ErrCompactFieldCount
- type ErrConcurrentRecv
- type ErrConnectTimeout
- type ErrFrameTooLarge
- type ErrHandlerTaskBusy
- type ErrHandshakeRejected
- type ErrInvalidMessage
- type ErrNoCommonCodec
- type ErrNoCommonVersion
- type ErrRecvTimeout
- type ErrRemote
- type ErrReplayBufferFull
- type ErrResumeRejected
- type ErrTooManyCodecs
- type ErrTypeMismatch
- type ErrUnknownMessage
- type ErrUnsupportedCodec
- type ErrUnsupportedFrameVersion
- type ErrUnsupportedTransport
- type ErrorReply
- type Link
- type Message
- type NamedMessage
- type Netconn
- type OkReply
- type OnceConn
- type RecoveryDebugState
- type Server
- func (s *Server) OnCloseStream(f func(*StreamContext)) *Server
- func (s *Server) OnConnect(f func(Netconn) error) *Server
- func (s *Server) OnDisconnect(f func(Netconn) error) *Server
- func (s *Server) OnNewStream(f func(*StreamContext)) *Server
- func (s *Server) Serve(addr string) error
- func (s *Server) WithCodecs(codecs ...CodecID) *Server
- func (s *Server) WithRecovery(opts ServerRecoveryOptions) *Server
- type ServerRecoveryOptions
- type Stream
- func (s *Stream[T]) Close()
- func (s *Stream[T]) DebugState() StreamDebugState
- func (s *Stream[T]) ID() uint32
- func (s *Stream[T]) PeekWire() (string, error)
- func (s *Stream[T]) Recv() (T, error)
- func (s *Stream[T]) Send(msg Message) error
- func (s *Stream[T]) SendRecv(msg Message) (T, error)
- func (s *Stream[T]) SetRecvTimeout(timeout time.Duration)
- type StreamContext
- type StreamCounters
- type StreamDebugState
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func ContextAs ¶
func ContextAs[T any](sc *StreamContext) (T, bool)
ContextAs is a generic helper that retrieves the value stored in sc and type-asserts it to T. It returns (zero, false) if no value has been set or if the stored value is not of type T.
func MustRegisterCodec ¶
func MustRegisterCodec(codec CodecImpl) struct{}
MustRegisterCodec is like RegisterCodec but panics on error. It returns an empty struct so it can be used as a package-level variable for init-time registration.
func MustRegisterGlobalType ¶
func MustRegisterGlobalType[T any]() struct{}
MustRegisterGlobalType is like RegisterGlobalType but panics on error. It returns an empty struct so it can be used as a package-level variable for init-time registration:
var _ = am.MustRegisterGlobalType[*MyMsg]()
func RegisterCodec ¶
RegisterCodec installs a codec implementation in the global codec registry. The codec's ID must be non-zero and not already registered. RegisterCodec is safe for concurrent use. Built-in codecs (compact, map) are registered automatically via init().
func RegisterGlobalType ¶
RegisterGlobalType registers a message type T (must be a struct or *struct) in the global message registry. If T implements the handler interface (Handle(*StreamContext) (Message, error)), its handler is also registered.
Registration must happen before Client.Connect or Server.Serve since connections snapshot the registry at creation time.
Returns ErrInvalidMessage if T is not a valid message type.
func SendRecvAs ¶
SendRecvAs sends msg via the given Link and receives a typed reply of type T. When v is an OnceConn, SendRecvAs dials the address, exchanges one request-reply message, and closes the connection. When v is a Connection or Stream, it delegates to Stream.SendRecv. Returns ErrInvalidMessage if v is nil.
func WireNameOf ¶
WireNameOf returns the wire name for a message value. If msg implements NamedMessage, its WireName method is used. Otherwise the name is derived from the Go type information (namespace.package.TypeName). WireNameOf returns ErrInvalidMessage if msg is nil or not a struct (or pointer to struct).
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client configures and dials outbound connections. It uses a builder pattern so options can be chained before calling Client.Connect:
conn, err := adaptivemsg.NewClient().
WithTimeout(5 * time.Second).
WithCodecs(adaptivemsg.CodecMsgpackCompact).
Connect("tcp://127.0.0.1:9000")
Defaults: codecs [CodecMsgpackCompact, CodecMsgpackMap], max frame unlimited (peer's limit applies), recovery disabled.
func NewClient ¶
func NewClient() *Client
NewClient returns a Client with sensible defaults: both MessagePack codecs offered (CodecMsgpackCompact first, then CodecMsgpackMap), no dial timeout, unlimited max frame size, and recovery disabled.
func (*Client) Connect ¶
func (c *Client) Connect(addr string) (*Connection, error)
Connect dials addr, performs the protocol handshake (including codec negotiation), and returns a live Connection ready for messaging.
Supported address formats:
- "tcp://host:port" — TCP connection
- "uds:///path/to/sock" or "unix:///path/to/sock" — Unix domain socket
- "host:port" — bare host:port defaults to TCP
On failure Connect returns one of the typed errors: ErrConnectTimeout if the dial or handshake exceeds the configured timeout, ErrNoCommonCodec if no codec overlap exists, or a transport-level error from the operating system.
func (*Client) WithCodecs ¶
WithCodecs sets the client's ordered list of preferred codecs. During the handshake the server walks this list and selects the first codec it also supports, so place the most desirable codec first. If no common codec exists the handshake fails with ErrNoCommonCodec.
func (*Client) WithMaxFrame ¶
WithMaxFrame sets the maximum payload size per frame that this client advertises to the peer. The effective limit for the connection is the minimum of both sides' values. The default is math.MaxUint32 (no client-side limit), so the server's value takes precedence.
func (*Client) WithRecovery ¶
func (c *Client) WithRecovery(opts ClientRecoveryOptions) *Client
WithRecovery enables the v3 protocol extension that supports transparent reconnection and message replay after transient network failures. See ClientRecoveryOptions for tunables such as backoff intervals and replay buffer limits. When recovery is enabled the client will attempt the v3 handshake first and fall back to v2 if the server does not support it.
type ClientRecoveryOptions ¶
type ClientRecoveryOptions struct {
// Enable activates recovery mode. Default: false.
Enable bool
// ReconnectMinBackoff is the minimum backoff duration between reconnect
// attempts. Default: 100ms.
ReconnectMinBackoff time.Duration
// ReconnectMaxBackoff is the maximum backoff duration between reconnect
// attempts. Default: 2s.
ReconnectMaxBackoff time.Duration
// MaxReplayBytes is the maximum number of bytes buffered for replay of
// unacknowledged messages. Default: 8 MiB.
MaxReplayBytes int64
}
ClientRecoveryOptions controls client-side recovery behavior for automatic reconnect and replay. When Enable is true, the client negotiates the v3 protocol with the server, enabling transparent reconnection and message replay after transient network failures.
type CodecID ¶
type CodecID byte
CodecID is a numeric identifier for a payload encoding format. Built-in values (compact, map) are defined in codec_msgpack.go. Custom codecs can be registered via RegisterCodec.
const ( // CodecMsgpackCompact (ID=1) encodes messages as a compact MessagePack array // envelope: [wireName, field1, field2, ...]. This format is more // space-efficient than map encoding but requires field order to match // between sender and receiver. CodecMsgpackCompact CodecID = 1 // CodecMsgpackMap (ID=2) encodes messages as a MessagePack map envelope: // {"type": wireName, "data": {field: value, ...}}. This format is more // verbose than compact but tolerant of field order changes between sender // and receiver. CodecMsgpackMap CodecID = 2 )
type CodecImpl ¶
type CodecImpl interface {
ID() CodecID
Name() string
// Encode returns a payload owned by the caller. Implementations must not
// mutate or reuse the returned backing storage after Encode returns.
Encode(Message) ([]byte, error)
DecodeEnvelope([]byte) (Envelope, error)
DecodeInto(body any, dst any) error
}
CodecImpl is the interface for pluggable payload codecs. Implementations must be safe for concurrent use by multiple goroutines. Encode serializes a message and returns a caller-owned byte slice. DecodeEnvelope extracts the wire name and raw body without full deserialization. DecodeInto decodes a raw body (from an Envelope) into a destination struct.
type Connection ¶
type Connection struct {
// contains filtered or unexported fields
}
Connection is a live, negotiated session between two peers. It is obtained from Client.Connect on the client side or passed to Server callbacks on the server side.
A Connection doubles as the default stream (stream 0): the Connection.Send, Connection.SendRecv, Connection.Recv, and Connection.PeekWire methods all operate on stream 0. For multiplexed messaging, create additional streams with Connection.NewStream.
All exported methods are safe for concurrent use. When the Connection is closed (by either side), all outstanding streams are torn down and any blocking Recv calls return ErrClosed.
func (*Connection) Close ¶
func (c *Connection) Close()
Close shuts down the connection and all of its streams. It closes the underlying transport, drains internal channels, and signals any goroutines blocked in Connection.Recv or Connection.WaitClosed. Close is non-blocking and idempotent; calling it more than once is safe.
func (*Connection) DebugState ¶
func (c *Connection) DebugState() ConnectionDebugState
DebugState returns a point-in-time snapshot of the connection's debug counters, stream states, and recovery state.
func (*Connection) NewStream ¶
func (c *Connection) NewStream() *Stream[Message]
NewStream allocates a new stream ID and returns a Stream view scoped to that ID. Streams provide independent, multiplexed message channels over the same underlying connection. All streams are automatically closed when the parent Connection is closed.
func (*Connection) PeekWire ¶
func (c *Connection) PeekWire() (string, error)
PeekWire returns the wire name of the next message on the default stream without consuming or fully decoding it. This is useful for branching on the message type before calling a typed receive.
func (*Connection) Recv ¶
func (c *Connection) Recv() (Message, error)
Recv blocks until the next message arrives on the default stream and returns it as an untyped Message. It returns ErrClosed if the connection closes and ErrRecvTimeout if a receive timeout (set via Connection.SetRecvTimeout) expires before a message arrives.
func (*Connection) Send ¶
func (c *Connection) Send(msg Message) error
Send encodes msg and writes it on the default stream (stream 0). The call is fire-and-forget: it returns once the frame is queued for the writer goroutine. It returns ErrClosed if the connection has been shut down.
func (*Connection) SendRecv ¶
func (c *Connection) SendRecv(msg Message) (Message, error)
SendRecv sends msg on the default stream and blocks until a reply arrives, returning the reply as an untyped Message. Use the generic helper SendRecvAs for typed replies. Returns ErrClosed if the connection closes before a reply is received.
func (*Connection) SetRecvTimeout ¶
func (c *Connection) SetRecvTimeout(timeout time.Duration)
SetRecvTimeout sets the receive timeout for the default stream (stream 0). A zero or negative value disables the timeout, allowing Connection.Recv to block indefinitely.
func (*Connection) WaitClosed ¶
func (c *Connection) WaitClosed()
WaitClosed blocks until the connection has fully closed. It is commonly used in server handlers to keep the goroutine alive for the lifetime of the connection.
type ConnectionCounters ¶
type ConnectionCounters struct {
// StreamsOpened is the total number of streams opened.
StreamsOpened uint64
// StreamsClosed is the total number of streams closed.
StreamsClosed uint64
// DataMessagesSent is the total number of application-level messages sent.
DataMessagesSent uint64
// DataMessagesReceived is the total number of application-level messages received.
DataMessagesReceived uint64
// FramesWritten is the total number of wire frames written.
FramesWritten uint64
// FramesRead is the total number of wire frames read.
FramesRead uint64
// BytesWritten is the total number of bytes written to the transport.
BytesWritten uint64
// BytesRead is the total number of bytes read from the transport.
BytesRead uint64
// ControlFramesWritten is the total number of control frames written.
ControlFramesWritten uint64
// ControlFramesRead is the total number of control frames read.
ControlFramesRead uint64
// ProtocolErrors is the total number of protocol errors detected.
ProtocolErrors uint64
// ProtocolErrorReplySendFailure is how many times sending a protocol error reply failed.
ProtocolErrorReplySendFailure uint64
// RemoteErrors is the total number of remote ErrorReply messages received.
RemoteErrors uint64
// DecodeErrors is the total number of message decode failures.
DecodeErrors uint64
// HandlerCalls is the total number of handler invocations.
HandlerCalls uint64
// HandlerErrors is the total number of handler invocations that returned an error.
HandlerErrors uint64
// ReconnectAttempts is the total number of recovery reconnect attempts.
ReconnectAttempts uint64
// ReconnectSuccesses is the total number of successful recovery reconnects.
ReconnectSuccesses uint64
// ReconnectFailures is the total number of failed recovery reconnect attempts.
ReconnectFailures uint64
// TransportAttaches is the total number of times a transport was attached.
TransportAttaches uint64
// TransportDetaches is the total number of times a transport was detached.
TransportDetaches uint64
}
ConnectionCounters holds point-in-time counters for a connection. All values are cumulative since the connection was created.
type ConnectionDebugState ¶
type ConnectionDebugState struct {
// Closed is true if the connection has been closed.
Closed bool
// LastFailureCode is the code of the most recent failure on this connection.
LastFailureCode DebugFailureCode
// LastFailure is the human-readable description of the most recent failure.
LastFailure string
// LastFailureAt is the timestamp of the most recent failure.
LastFailureAt time.Time
// Protocol is the negotiated protocol version byte.
Protocol byte
// CodecID is the negotiated codec identifier.
CodecID CodecID
// CodecName is the human-readable name of the negotiated codec.
CodecName string
// MaxFrame is the negotiated maximum frame size in bytes.
MaxFrame uint32
// StreamCount is the current number of streams (open and closing).
StreamCount int
// NextSendSeq is the next sequence number to be assigned to an outbound frame.
NextSendSeq uint64
// Counters holds the cumulative connection counters.
Counters ConnectionCounters
// Streams contains a snapshot of each stream's debug state.
Streams []StreamDebugState
// Recovery contains the recovery subsystem state, or nil if recovery is not enabled.
Recovery *RecoveryDebugState
}
ConnectionDebugState is a debug snapshot of a connection at a point in time, including counters, stream states, and recovery state.
type DebugFailureCode ¶
type DebugFailureCode string
DebugFailureCode is a string code identifying the subsystem and failure mode for diagnostic purposes. Values follow the pattern "subsystem.failure_kind".
const ( // DebugFailureNone indicates no failure has been recorded. DebugFailureNone DebugFailureCode = "" // DebugFailureStreamRecvTimeout indicates a stream receive timed out. DebugFailureStreamRecvTimeout DebugFailureCode = "stream.recv_timeout" // DebugFailureStreamEncode indicates message encoding failed on a stream. DebugFailureStreamEncode DebugFailureCode = "stream.encode" // DebugFailureStreamEnqueue indicates a failure to enqueue a frame for sending. DebugFailureStreamEnqueue DebugFailureCode = "stream.enqueue" // DebugFailureStreamProtocol indicates a protocol error on a stream (e.g. unknown message). DebugFailureStreamProtocol DebugFailureCode = "stream.protocol" // DebugFailureStreamProtocolReplySend indicates a failure to send a protocol error reply. DebugFailureStreamProtocolReplySend DebugFailureCode = "stream.protocol_reply_send" // DebugFailureStreamDecode indicates message decoding failed on a stream. DebugFailureStreamDecode DebugFailureCode = "stream.decode" // DebugFailureHandler indicates a handler returned an error. DebugFailureHandler DebugFailureCode = "handler.error" // DebugFailureWriterLoop indicates the connection writer goroutine failed. DebugFailureWriterLoop DebugFailureCode = "connection.writer" // DebugFailureReaderLoop indicates the connection reader goroutine failed. DebugFailureReaderLoop DebugFailureCode = "connection.reader" // DebugFailureReaderEnqueue indicates the reader failed to enqueue a payload to a stream. DebugFailureReaderEnqueue DebugFailureCode = "connection.reader_enqueue" // DebugFailureReconnectResume indicates a recovery resume attempt failed. DebugFailureReconnectResume DebugFailureCode = "recovery.resume" // DebugFailureReconnectTerminal indicates recovery gave up after exhausting retries. DebugFailureReconnectTerminal DebugFailureCode = "recovery.reconnect_terminal" // DebugFailureRecoveryAckWrite indicates a failure to write an ACK control frame. DebugFailureRecoveryAckWrite DebugFailureCode = "recovery.ack_write" // DebugFailureRecoveryResumeWrite indicates a failure to write a resume frame. DebugFailureRecoveryResumeWrite DebugFailureCode = "recovery.resume_write" // DebugFailureRecoveryLiveWrite indicates a failure to write a live data frame during recovery. DebugFailureRecoveryLiveWrite DebugFailureCode = "recovery.live_write" // DebugFailureRecoveryPingWrite indicates a failure to write a heartbeat ping. DebugFailureRecoveryPingWrite DebugFailureCode = "recovery.ping_write" // DebugFailureRecoveryRead indicates an error reading during recovery. DebugFailureRecoveryRead DebugFailureCode = "recovery.read" // DebugFailureRecoveryControl indicates an error processing a recovery control frame. DebugFailureRecoveryControl DebugFailureCode = "recovery.control" // DebugFailureRecoveryData indicates an error processing a recovery data frame. DebugFailureRecoveryData DebugFailureCode = "recovery.data" )
type Envelope ¶
type Envelope struct {
// Wire is the message type name extracted from the payload.
Wire string
// Body holds the codec-specific raw data for deferred decoding.
Body any
}
Envelope is an intermediate decode result that preserves the wire name and raw body without fully deserializing the message payload. Wire is the message type name. Body holds codec-specific raw data: for map codec it is a msgpack.RawMessage, for compact codec it is a []msgpack.RawMessage.
type ErrBadHandshakeMagic ¶
type ErrBadHandshakeMagic struct{}
ErrBadHandshakeMagic indicates that the connection does not speak the adaptivemsg protocol. The initial bytes did not match the expected handshake magic value.
func (ErrBadHandshakeMagic) Error ¶
func (e ErrBadHandshakeMagic) Error() string
type ErrClosed ¶
type ErrClosed struct{}
ErrClosed indicates that an operation was attempted on a closed connection or stream.
type ErrCodec ¶
type ErrCodec struct {
// Message contains the human-readable error detail.
Message string
}
ErrCodec reports a codec encode or decode failure. It is returned when a message cannot be serialized or deserialized by the negotiated codec.
type ErrCompactFieldCount ¶
type ErrCompactFieldCount struct {
// Expected is the number of fields the local struct definition has.
Expected int
// Got is the number of fields found in the incoming payload.
Got int
}
ErrCompactFieldCount indicates that a compact-mode struct has the wrong number of fields. This happens when the sender and receiver disagree on the struct definition.
func (ErrCompactFieldCount) Error ¶
func (e ErrCompactFieldCount) Error() string
type ErrConcurrentRecv ¶
type ErrConcurrentRecv struct{}
ErrConcurrentRecv indicates that concurrent Recv or SendRecv calls were detected on the same stream. Only one receive operation is allowed at a time on a given stream.
func (ErrConcurrentRecv) Error ¶
func (e ErrConcurrentRecv) Error() string
type ErrConnectTimeout ¶
type ErrConnectTimeout struct{}
ErrConnectTimeout indicates that the TCP connect or handshake exceeded the timeout duration configured via Client.WithTimeout().
func (ErrConnectTimeout) Error ¶
func (e ErrConnectTimeout) Error() string
type ErrFrameTooLarge ¶
type ErrFrameTooLarge struct {
// Size is the offending payload size in bytes.
Size int
}
ErrFrameTooLarge indicates that a payload exceeded the negotiated maximum frame size for the connection. The receiver will reject the frame.
func (ErrFrameTooLarge) Error ¶
func (e ErrFrameTooLarge) Error() string
type ErrHandlerTaskBusy ¶
type ErrHandlerTaskBusy struct{}
ErrHandlerTaskBusy indicates that NewTask() was called on a stream that already has a running task. Only one handler task is allowed per stream at a time.
func (ErrHandlerTaskBusy) Error ¶
func (e ErrHandlerTaskBusy) Error() string
type ErrHandshakeRejected ¶
type ErrHandshakeRejected struct{}
ErrHandshakeRejected indicates that the server explicitly rejected the handshake. This is distinct from a version or codec mismatch.
func (ErrHandshakeRejected) Error ¶
func (e ErrHandshakeRejected) Error() string
type ErrInvalidMessage ¶
type ErrInvalidMessage struct {
// Reason contains the human-readable detail of what is invalid.
Reason string
}
ErrInvalidMessage indicates invalid message usage, such as passing a nil value, a non-struct type, or a type that does not satisfy the Message interface.
func (ErrInvalidMessage) Error ¶
func (e ErrInvalidMessage) Error() string
type ErrNoCommonCodec ¶
type ErrNoCommonCodec struct{}
ErrNoCommonCodec indicates that the handshake failed because the client and server have no codec in common. Register the same codec on both sides before connecting.
func (ErrNoCommonCodec) Error ¶
func (e ErrNoCommonCodec) Error() string
type ErrNoCommonVersion ¶
type ErrNoCommonVersion struct {
// ClientMin is the minimum protocol version supported by the client.
ClientMin byte
// ClientMax is the maximum protocol version supported by the client.
ClientMax byte
// ServerMin is the minimum protocol version supported by the server.
ServerMin byte
// ServerMax is the maximum protocol version supported by the server.
ServerMax byte
}
ErrNoCommonVersion indicates that no protocol version overlap exists between the client and server. The fields show each side's supported version range.
func (ErrNoCommonVersion) Error ¶
func (e ErrNoCommonVersion) Error() string
type ErrRecvTimeout ¶
type ErrRecvTimeout struct{}
ErrRecvTimeout indicates that no message was received within the stream's recv timeout, as configured via SetRecvTimeout.
func (ErrRecvTimeout) Error ¶
func (e ErrRecvTimeout) Error() string
type ErrRemote ¶
type ErrRemote struct {
// Code is the error code from the remote ErrorReply.
Code string
// Message is the error message from the remote ErrorReply.
Message string
}
ErrRemote wraps an ErrorReply received from the peer. The Code and Message fields correspond to the remote ErrorReply's fields.
type ErrReplayBufferFull ¶
type ErrReplayBufferFull struct {
// Limit is the configured maximum replay buffer size in bytes.
Limit int64
// Size is the current replay buffer size in bytes.
Size int64
}
ErrReplayBufferFull indicates that the recovery replay buffer has exceeded the configured MaxReplayBytes limit. The connection will be terminated because unacknowledged frames can no longer be retained for replay.
func (ErrReplayBufferFull) Error ¶
func (e ErrReplayBufferFull) Error() string
type ErrResumeRejected ¶
type ErrResumeRejected struct {
// Reason contains the human-readable rejection detail.
Reason string
}
ErrResumeRejected indicates that the server rejected a resume attempt. Common causes include a wrong connection secret, an expired detached session, or a codec mismatch.
func (ErrResumeRejected) Error ¶
func (e ErrResumeRejected) Error() string
type ErrTooManyCodecs ¶
type ErrTooManyCodecs struct {
// Count is the number of codecs offered by the client.
Count int
}
ErrTooManyCodecs indicates that the client offered more codecs than the protocol allows during the handshake.
func (ErrTooManyCodecs) Error ¶
func (e ErrTooManyCodecs) Error() string
type ErrTypeMismatch ¶
type ErrTypeMismatch struct {
// Expected is the wire name of the expected message type.
Expected string
// Got is the wire name of the actually received message.
Got string
}
ErrTypeMismatch indicates that the decoded message type does not match the expected type parameter. This occurs when a typed Recv or SendRecv receives a message with a different wire name than expected.
func (ErrTypeMismatch) Error ¶
func (e ErrTypeMismatch) Error() string
type ErrUnknownMessage ¶
type ErrUnknownMessage struct {
// Name is the unrecognized wire name.
Name string
}
ErrUnknownMessage indicates that a received message has a wire name that is not registered in the connection's message registry.
func (ErrUnknownMessage) Error ¶
func (e ErrUnknownMessage) Error() string
type ErrUnsupportedCodec ¶
type ErrUnsupportedCodec struct {
// Value is the unrecognized codec ID byte.
Value byte
}
ErrUnsupportedCodec indicates that a peer selected a codec that is not registered in the local codec registry.
func (ErrUnsupportedCodec) Error ¶
func (e ErrUnsupportedCodec) Error() string
type ErrUnsupportedFrameVersion ¶
type ErrUnsupportedFrameVersion struct {
// Version is the unrecognized protocol version byte.
Version byte
}
ErrUnsupportedFrameVersion indicates that a peer sent a frame with an unknown protocol version byte. This typically means the remote end is running an incompatible version of the adaptivemsg protocol.
func (ErrUnsupportedFrameVersion) Error ¶
func (e ErrUnsupportedFrameVersion) Error() string
type ErrUnsupportedTransport ¶
type ErrUnsupportedTransport struct {
// Reason contains the human-readable detail of the unsupported feature.
Reason string
}
ErrUnsupportedTransport indicates that a required transport feature is unavailable in the current configuration.
func (ErrUnsupportedTransport) Error ¶
func (e ErrUnsupportedTransport) Error() string
type ErrorReply ¶
ErrorReply is the standard error payload sent over the wire when a handler returns an error or when an explicit error must be communicated to the peer. Its wire name is "am.message.ErrorReply". On the receiving side an ErrorReply is surfaced as ErrRemote by Stream.SendRecv and SendRecvAs.
func NewErrorReply ¶
func NewErrorReply(code, message string) *ErrorReply
NewErrorReply is a convenience constructor that returns an ErrorReply with the given code and human-readable message.
func (*ErrorReply) WireName ¶
func (*ErrorReply) WireName() string
type Link ¶
type Link interface {
// contains filtered or unexported methods
}
Link is a sealed interface that unifies Connection, Stream, and OnceConn as valid targets for SendRecvAs and StreamAs. The unexported isLink method prevents external packages from implementing Link.
type Message ¶
type Message interface{}
Message is the marker interface that all wire payload types must satisfy. Any struct or pointer-to-struct value implicitly implements Message. The wire name used during encoding is derived automatically from the Go type name (namespace.package.TypeName) unless the type also implements NamedMessage to supply a custom name.
type NamedMessage ¶
type NamedMessage interface {
WireName() string
}
NamedMessage is an optional interface that lets a Message type override the default wire name. Implement WireName to return a fixed string that identifies this message type on the wire. This is required for cross-language compatibility — the name returned must match the corresponding Rust wire name exactly.
type Netconn ¶
type Netconn struct {
// contains filtered or unexported fields
}
Netconn is a lightweight descriptor for a peer connection, passed to Server lifecycle callbacks (Server.OnConnect, Server.OnDisconnect). It carries metadata about the remote endpoint but does not expose the underlying transport directly.
type OkReply ¶
type OkReply struct{}
OkReply is the default acknowledgement sent when a message handler returns nil. Its wire name is "am.message.OkReply".
type OnceConn ¶
type OnceConn struct {
// contains filtered or unexported fields
}
OnceConn is a builder for one-shot request-reply over a short-lived connection. Create one with Once, optionally configure it with OnceConn.WithTimeout or OnceConn.WithCodecs, and pass it to SendRecvAs exactly once. An OnceConn is not reusable. The default timeout is 5 seconds, covering both connection establishment and reply wait.
func Once ¶
Once creates a new OnceConn builder targeting the given address. The addr parameter supports "tcp://host:port", "uds://path", "unix://path", or a bare "host:port" (which defaults to TCP). The returned OnceConn can be passed to SendRecvAs to dial, exchange one request-reply, and close the connection.
func (*OnceConn) WithCodecs ¶
WithCodecs overrides the default codec preference list (compact, map) for the short-lived connection's handshake negotiation.
type RecoveryDebugState ¶
type RecoveryDebugState struct {
// Role is "client" or "server", indicating which recovery role this connection has.
Role string
// ConnectionID is the hex-encoded recovery connection identifier.
ConnectionID string
// TransportAttached is true if a live transport is currently connected.
TransportAttached bool
// TransportGen is the generation counter for transport attach/detach cycles.
TransportGen uint64
// ReconnectActive is true if a reconnect attempt is currently in progress.
ReconnectActive bool
// LastRecvSeq is the sequence number of the last received data frame.
LastRecvSeq uint64
// LastAckedSeq is the sequence number of the last acknowledged data frame.
LastAckedSeq uint64
// AckPending is the number of received frames not yet acknowledged.
AckPending uint32
// AckDue is true if an ACK is pending and should be sent soon.
AckDue bool
// AckEvery is the negotiated ACK frequency (every N data frames).
AckEvery uint32
// AckDelay is the negotiated delay before flushing a pending ACK.
AckDelay time.Duration
// HeartbeatInterval is the negotiated interval between heartbeat pings.
HeartbeatInterval time.Duration
// HeartbeatTimeout is the negotiated inactivity timeout for the connection.
HeartbeatTimeout time.Duration
// ReplayQueued is the number of frames currently buffered for replay.
ReplayQueued int
// ReplayBytes is the total size in bytes of frames buffered for replay.
ReplayBytes int64
// LiveQueueDepth is the number of frames in the live send queue.
LiveQueueDepth int
// ResumeQueueDepth is the number of frames queued for resume replay.
ResumeQueueDepth int
}
RecoveryDebugState is a debug snapshot of the recovery subsystem's state at a point in time.
type Server ¶
type Server struct {
// contains filtered or unexported fields
}
Server accepts inbound connections and dispatches them to registered handlers and callbacks. It uses a builder pattern for configuration:
adaptivemsg.NewServer().
OnConnect(func(nc adaptivemsg.Netconn) error { return nil }).
WithCodecs(adaptivemsg.CodecMsgpackCompact).
Serve("tcp://0.0.0.0:9000")
Each accepted connection is handled in its own goroutine. Server.Serve blocks for the lifetime of the server; it only returns if the underlying listener encounters an unrecoverable error.
func NewServer ¶
func NewServer() *Server
NewServer returns a Server with default settings: both MessagePack codecs supported (CodecMsgpackCompact and CodecMsgpackMap), recovery disabled, and no lifecycle callbacks registered.
func (*Server) OnCloseStream ¶
func (s *Server) OnCloseStream(f func(*StreamContext)) *Server
OnCloseStream registers a callback that is invoked when a stream is destroyed, either because the peer closed it or because the parent connection closed. This is useful for per-stream cleanup.
func (*Server) OnConnect ¶
OnConnect registers a callback that is invoked each time a new connection is accepted and the handshake completes. The callback receives a Netconn descriptor for the peer. Returning a non-nil error rejects the connection and closes the underlying transport.
func (*Server) OnDisconnect ¶
OnDisconnect registers a callback that is invoked after a connection has fully closed. The callback receives the same Netconn descriptor that was passed to OnConnect. The return value is currently ignored but reserved for future use; callers should return nil.
func (*Server) OnNewStream ¶
func (s *Server) OnNewStream(f func(*StreamContext)) *Server
OnNewStream registers a callback that is invoked when a new stream is created on any connection managed by this server. This is useful for per-stream initialization such as attaching context or state to the StreamContext.
func (*Server) Serve ¶
Serve binds to addr and enters an accept loop, blocking for the lifetime of the server. Each accepted connection is handled in a dedicated goroutine that performs the protocol handshake and then runs the reader/writer loops.
Supported address formats:
- "tcp://host:port" — TCP listener
- "uds:///path/to/sock" or "unix:///path/to/sock" — Unix domain socket
- "host:port" — bare host:port defaults to TCP
Serve returns a non-nil error only if the listener itself fails (e.g. the address is already in use).
func (*Server) WithCodecs ¶
WithCodecs sets the server's list of supported codecs. During the handshake the server walks the client's ordered preference list and selects the first codec that appears in both lists. If no common codec is found the handshake fails with ErrNoCommonCodec.
func (*Server) WithRecovery ¶
func (s *Server) WithRecovery(opts ServerRecoveryOptions) *Server
WithRecovery enables the v3 protocol extension on the server side, allowing clients to reconnect and resume sessions after transient network failures. See ServerRecoveryOptions for tunables such as detached TTL, replay buffer limits, ACK intervals, and heartbeat configuration.
type ServerRecoveryOptions ¶
type ServerRecoveryOptions struct {
// Enable activates recovery mode. Default: false.
Enable bool
// DetachedTTL is how long the server retains a detached connection's state
// before discarding it. Default: 30s.
DetachedTTL time.Duration
// MaxReplayBytes is the maximum number of bytes buffered for replay of
// unacknowledged messages. Default: 8 MiB.
MaxReplayBytes int64
// AckEvery sends a cumulative ACK every N data frames. Default: 64.
AckEvery uint32
// AckDelay is the delay before flushing a pending ACK. Default: 20ms.
AckDelay time.Duration
// HeartbeatInterval is the interval between heartbeat pings when the
// connection is idle. Default: 30s.
HeartbeatInterval time.Duration
// HeartbeatTimeout closes the connection if no inbound frame is received
// within this duration. Must be ≥ 2×HeartbeatInterval. Default: 90s.
HeartbeatTimeout time.Duration
}
ServerRecoveryOptions controls server-side recovery behavior for connection retention and ACK policy. When Enable is true, the server supports the v3 protocol with attach/resume semantics. ACK and heartbeat fields are authoritative for the connection and are sent to the client during attach/resume.
type Stream ¶
type Stream[T any] struct { // contains filtered or unexported fields }
Stream is a typed view over a single multiplexed stream on a Connection.
The type parameter T determines how Stream.SendRecv and Stream.Recv decode incoming messages. When T is an interface (e.g. Message), the connection's message registry is consulted to resolve the concrete type from the wire name. When T is a concrete type (e.g. *EchoReply), the payload is decoded directly into that type without a registry lookup.
Create a Stream with Connection.NewStream (which returns Stream[Message]) or convert an existing stream to a different reply type with StreamAs.
Send is safe for concurrent use by multiple goroutines. Recv and SendRecv must not be called concurrently on the same Stream; doing so returns ErrConcurrentRecv.
func StreamAs ¶
StreamAs creates a typed Stream[T] view from any Link. It does not allocate a new stream — it wraps the existing stream core with the requested type parameter. Returns nil if v is nil or has no underlying stream core.
func (*Stream[T]) Close ¶
func (s *Stream[T]) Close()
Close removes this stream from its connection and releases associated resources. After Close returns, subsequent Send and Recv calls on this stream will return ErrClosed. Close is idempotent.
func (*Stream[T]) DebugState ¶
func (s *Stream[T]) DebugState() StreamDebugState
DebugState returns a point-in-time snapshot of this stream's debug counters and state.
func (*Stream[T]) ID ¶
ID returns the numeric stream identifier assigned by the connection. Stream IDs are unique within a single connection.
func (*Stream[T]) PeekWire ¶
PeekWire returns the wire name of the next pending message without consuming it. The message remains available for a subsequent Stream.Recv or Stream.SendRecv call. PeekWire is useful for dispatching based on message type before committing to a decode.
func (*Stream[T]) Recv ¶
Recv blocks until the next message arrives on this stream and decodes it as type T. It returns ErrRecvTimeout if the receive timeout expires, ErrClosed if the connection is shut down, or ErrTypeMismatch if the decoded message cannot be converted to T.
Only one goroutine may call Recv (or SendRecv) at a time; a concurrent call returns ErrConcurrentRecv.
func (*Stream[T]) Send ¶
Send encodes msg with the negotiated codec and enqueues the resulting frame for transmission on this stream. Send returns as soon as the frame is queued; it does not wait for the peer to receive it.
Send is safe for concurrent use. It returns ErrClosed if the connection has been shut down.
func (*Stream[T]) SendRecv ¶
SendRecv sends msg on this stream and blocks until a reply of type T is received. If the peer handler returns an error, the reply is decoded as an ErrorReply and returned as ErrRemote. If the incoming message cannot be converted to T, ErrTypeMismatch is returned. The call also respects the receive timeout set by Stream.SetRecvTimeout.
func (*Stream[T]) SetRecvTimeout ¶
SetRecvTimeout sets the maximum duration that Stream.Recv and Stream.SendRecv will block waiting for a message on this stream. A zero or negative value disables the timeout, causing receives to block indefinitely until a message arrives or the connection closes. The default is no timeout.
type StreamContext ¶
type StreamContext struct {
// contains filtered or unexported fields
}
StreamContext carries per-stream state accessible to message handlers. Each stream has its own StreamContext instance. Handlers receive it as their first argument and can use it to store per-stream session state (e.g., authentication info, user ID) and to spawn a background task via StreamContext.NewTask.
func (*StreamContext) GetContext ¶
func (sc *StreamContext) GetContext() any
GetContext retrieves the value previously stored with StreamContext.SetContext. It returns nil if no value has been set. GetContext is safe for concurrent use.
func (*StreamContext) NewTask ¶
func (sc *StreamContext) NewTask(fn func(*Stream[Message])) error
NewTask spawns a background goroutine tied to this stream. The function fn receives the stream for bidirectional communication and runs until it returns. Only one task may be active per stream at a time; calling NewTask while a previous task is still running returns ErrHandlerTaskBusy. NewTask is useful for long-running handlers such as streaming responses.
func (*StreamContext) SetContext ¶
func (sc *StreamContext) SetContext(value any)
SetContext stores an arbitrary value in this stream's context. The value can later be retrieved with StreamContext.GetContext or ContextAs. SetContext is safe for concurrent use.
type StreamCounters ¶
type StreamCounters struct {
// DataMessagesSent is the total number of application-level messages sent on this stream.
DataMessagesSent uint64
// DataMessagesReceived is the total number of application-level messages received on this stream.
DataMessagesReceived uint64
// ProtocolErrors is the total number of protocol errors detected on this stream.
ProtocolErrors uint64
// ProtocolErrorReplySendFailure is how many times sending a protocol error reply failed on this stream.
ProtocolErrorReplySendFailure uint64
// RemoteErrors is the total number of remote ErrorReply messages received on this stream.
RemoteErrors uint64
// DecodeErrors is the total number of message decode failures on this stream.
DecodeErrors uint64
// HandlerCalls is the total number of handler invocations on this stream.
HandlerCalls uint64
// HandlerErrors is the total number of handler invocations that returned an error on this stream.
HandlerErrors uint64
}
StreamCounters holds point-in-time counters for a single stream. All values are cumulative since the stream was opened.
type StreamDebugState ¶
type StreamDebugState struct {
// ID is the stream's numeric identifier within the connection.
ID uint32
// Closed is true if the stream has been closed.
Closed bool
// LastFailureCode is the code of the most recent failure on this stream.
LastFailureCode DebugFailureCode
// LastFailure is the human-readable description of the most recent failure.
LastFailure string
// LastFailureAt is the timestamp of the most recent failure.
LastFailureAt time.Time
// RecvTimeout is the stream's current receive timeout duration.
RecvTimeout time.Duration
// InboxDepth is the number of decoded messages waiting in the stream's inbox.
InboxDepth int
// IncomingDepth is the number of raw frames waiting to be decoded.
IncomingDepth int
// HandlerQDepth is the number of messages queued for handler dispatch.
HandlerQDepth int
// Counters holds the cumulative stream counters.
Counters StreamCounters
}
StreamDebugState is a debug snapshot of a stream's state at a point in time.
Source Files
¶
- client.go
- codec.go
- codec_msgpack.go
- codec_registry.go
- connection.go
- context.go
- debug.go
- doc.go
- error.go
- frame.go
- frame_queue.go
- generic.go
- message.go
- once.go
- protocol.go
- raw_message.go
- recovery.go
- recovery_protocol.go
- recovery_runtime.go
- registry.go
- replay.go
- server.go
- stream.go
- transport_tcp.go
- transport_uds.go
- type_info.go
Directories
¶
| Path | Synopsis |
|---|---|
|
cmd
|
|
|
amgen-go
command
|
|
|
examples
|
|
|
echo/cmd/client
command
|
|
|
echo/cmd/server
command
|
|
|
hello/cmd/client
command
|
|
|
hello/cmd/server
command
|