protocol

package
v0.1.9 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 19, 2026 License: Apache-2.0 Imports: 5 Imported by: 0

Documentation

Overview

Package protocol implements the Croupier wire protocol over NNG.

Message Format:

Header (8 bytes):
  ┌─────────┬──────────┬─────────────────┐
  │ Version │ MsgID    │ RequestID       │
  │ (1B)    │ (3B)     │ (4B)            │
  └─────────┴──────────┴─────────────────┘

Body: protobuf serialized message

Request messages have odd MsgID, Response messages have even MsgID. The RequestID is used to match responses to their requests.

Package protocol provides message type registry for Croupier protocol.

Index

Constants

View Source
const (
	// Version1 is the current protocol version.
	Version1 = 0x01

	// HeaderSize is the fixed size of the message header in bytes.
	HeaderSize = 8 // Version(1) + MsgID(3) + RequestID(4)
)
View Source
const (
	// ControlService (0x01xx)
	MsgRegisterRequest          = 0x010101
	MsgRegisterResponse         = 0x010102
	MsgHeartbeatRequest         = 0x010103
	MsgHeartbeatResponse        = 0x010104
	MsgRegisterCapabilitiesReq  = 0x010105
	MsgRegisterCapabilitiesResp = 0x010106

	// ClientService (0x02xx)
	MsgRegisterClientRequest   = 0x020101
	MsgRegisterClientResponse  = 0x020102
	MsgClientHeartbeatRequest  = 0x020103
	MsgClientHeartbeatResponse = 0x020104
	MsgListClientsRequest      = 0x020105
	MsgListClientsResponse     = 0x020106
	MsgGetJobResultRequest     = 0x020107
	MsgGetJobResultResponse    = 0x020108

	// InvokerService (0x03xx)
	MsgInvokeRequest     = 0x030101
	MsgInvokeResponse    = 0x030102
	MsgStartJobRequest   = 0x030103
	MsgStartJobResponse  = 0x030104
	MsgStreamJobRequest  = 0x030105
	MsgJobEvent          = 0x030106 // Stream event (not request/response)
	MsgCancelJobRequest  = 0x030107
	MsgCancelJobResponse = 0x030108

	// OpsService (0x04xx)
	MsgGetSystemInfoRequest   = 0x040101
	MsgGetSystemInfoResponse  = 0x040102
	MsgListProcessesRequest   = 0x040103
	MsgListProcessesResponse  = 0x040104
	MsgReportMetricsRequest   = 0x040105
	MsgReportMetricsResponse  = 0x040106
	MsgStreamMetricsRequest   = 0x040107
	MsgMetricEvent            = 0x040108 // Stream event
	MsgRestartProcessRequest  = 0x040109
	MsgRestartProcessResponse = 0x04010A
	MsgStopProcessRequest     = 0x04010B
	MsgStopProcessResponse    = 0x04010C
	MsgStartProcessRequest    = 0x04010D
	MsgStartProcessResponse   = 0x04010E
	MsgExecuteCommandRequest  = 0x04010F
	MsgExecuteCommandResponse = 0x040110
	// System services (platform-specific)
	MsgListServicesRequest      = 0x040111
	MsgListServicesResponse     = 0x040112
	MsgGetServiceStatusRequest  = 0x040113
	MsgGetServiceStatusResponse = 0x040114

	// LocalControlService (0x05xx) - Agent local function registration
	MsgRegisterLocalRequest   = 0x050101
	MsgRegisterLocalResponse  = 0x050102
	MsgHeartbeatLocalRequest  = 0x050103
	MsgHeartbeatLocalResponse = 0x050104
	MsgListLocalRequest       = 0x050105
	MsgListLocalResponse      = 0x050106
)

Message type constants (24 bits).

Variables

This section is empty.

Functions

func GetMsgID

func GetMsgID(buf []byte) uint32

GetMsgID decodes a 24-bit MsgID from buf in big-endian order.

func GetResponseMsgID

func GetResponseMsgID(reqMsgID uint32) uint32

GetResponseMsgID returns the response MsgID for a given request MsgID.

func IsRequest

func IsRequest(msgID uint32) bool

IsRequest returns true if the MsgID indicates a request message.

func IsResponse

func IsResponse(msgID uint32) bool

IsResponse returns true if the MsgID indicates a response message.

func MsgIDString

func MsgIDString(msgID uint32) string

MsgIDString returns a human-readable string representation of the MsgID.

func NewMessageBody

func NewMessageBody(msgID uint32, reqID uint32, data []byte) []byte

NewMessageBody creates a body with protocol header prefix and data.

func NewRequestMessage

func NewRequestMessage(msgID uint32, reqID uint32, body proto.Message) (*mangos.Message, error)

NewRequestMessage creates a new request message. The caller must not use the returned message after calling SendMsg.

func NewResponseMessage

func NewResponseMessage(msgID uint32, reqID uint32, body proto.Message) (*mangos.Message, error)

NewResponseMessage creates a new response message with the given RequestID. The caller must not use the returned message after calling SendMsg.

func NewStreamMessage

func NewStreamMessage(msgID uint32, reqID uint32, body proto.Message) (*mangos.Message, error)

NewStreamMessage creates a new stream message (e.g., JobEvent). The caller must not use the returned message after calling SendMsg.

func ParseMessage

func ParseMessage(msg *mangos.Message) (version uint8, msgID uint32, reqID uint32, body []byte, err error)

ParseMessage parses a received message. It returns the version, message ID, request ID, and body.

func ParseMessageFromBody

func ParseMessageFromBody(body []byte) (version uint8, msgID uint32, reqID uint32, data []byte, err error)

ParseMessageFromBody parses a message from a body that contains the protocol header as a prefix. Body format: [8-byte header][actual data]

func PutMsgID

func PutMsgID(buf []byte, msgID uint32)

PutMsgID encodes a 24-bit MsgID into buf in big-endian order.

Types

type Registry

type Registry struct {
	// contains filtered or unexported fields
}

Registry maps message IDs to protobuf message factories.

func NewRegistry

func NewRegistry() *Registry

NewRegistry creates a new empty message registry.

func (*Registry) Create

func (r *Registry) Create(msgID uint32) (proto.Message, error)

Create creates a new instance of the message with the given MsgID.

func (*Registry) MustRegister

func (r *Registry) MustRegister(msgID uint32, factory func() proto.Message)

MustRegister registers a message type and panics on error. Useful for package-level initialization.

func (*Registry) Register

func (r *Registry) Register(msgID uint32, factory func() proto.Message)

Register registers a message type with its factory function. The factory should return a new instance of the protobuf message.

func (*Registry) RegisterBatch

func (r *Registry) RegisterBatch(types map[uint32]func() proto.Message)

RegisterBatch registers multiple message types at once.

func (*Registry) Unmarshal

func (r *Registry) Unmarshal(msgID uint32, body []byte) (proto.Message, error)

Unmarshal unmarshals the body bytes into the appropriate message type.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL