Documentation
¶
Overview ¶
Package libatbus_types defines shared types and interfaces for libatbus.
Index ¶
- Constants
- func InternalSetDelegateIsCompressionAlgorithmSupported(delegate func(algorithm protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) bool)
- func IsCompressionAlgorithmSupported(algorithm protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) bool
- func SetDefaultIoStreamConfigure(conf *IoStreamConfigure)
- func SetDefaultNodeConfigure(conf *NodeConfigure)
- func SetDefaultStartConfigure(conf *StartConfigure)
- type BusIdType
- type ChannelAddress
- type Connection
- type ConnectionContext
- type ConnectionFlag
- type ConnectionState
- type ConnectionStatistic
- type Endpoint
- type EndpointCollectionType
- type EndpointFlag
- type ErrorType
- type IoStreamCallbackEventHandleSet
- type IoStreamCallbackEventType
- type IoStreamCallbackFunc
- type IoStreamChannel
- type IoStreamChannelFlag
- type IoStreamConfigure
- type IoStreamConnection
- type IoStreamConnectionFlag
- type IoStreamConnectionStatus
- type Message
- func (m *Message) Body() *protocol.MessageBody
- func (m *Message) GetBody() *protocol.MessageBody
- func (m *Message) GetBodyType() MessageBodyType
- func (m *Message) GetHead() *protocol.MessageHead
- func (m *Message) GetUnpackErrorMessage() string
- func (m *Message) Head() *protocol.MessageHead
- func (m *Message) MutableBody() *protocol.MessageBody
- func (m *Message) MutableHead() *protocol.MessageHead
- func (m *Message) SetUnpackError(err string)
- type MessageBodyType
- type Node
- type NodeConfigure
- type NodeEventHandleSet
- type NodeFlag
- type NodeGetPeerOptionFlag
- type NodeGetPeerOptions
- type NodeOnCloseConnectionFunc
- type NodeOnCustomCommandRequestFunc
- type NodeOnCustomCommandResponseFunc
- type NodeOnEndpointEventFunc
- type NodeOnForwardRequestFunc
- type NodeOnForwardResponseFunc
- type NodeOnInvalidConnectionFunc
- type NodeOnNewConnectionFunc
- type NodeOnNodeDownFunc
- type NodeOnNodeUpFunc
- type NodeOnPingPongEndpointFunc
- type NodeOnRegisterFunc
- type NodeOnTopologyUpdateUpstreamFunc
- type NodeSendDataOptionFlag
- type NodeSendDataOptions
- type NodeState
- type StartConfigure
- type TimerDescPair
- type TimerDescType
- type TopologyData
- type TopologyPeer
- type TopologyPolicyRule
- type TopologyRegistry
- type TopologyRelationType
Constants ¶
const ( MessageBodyTypeUnknown = protocol.MessageBody_EnMessageTypeID_NONE MessageBodyTypeCustomCommandReq = protocol.MessageBody_EnMessageTypeID_CustomCommandReq MessageBodyTypeCustomCommandRsp = protocol.MessageBody_EnMessageTypeID_CustomCommandRsp MessageBodyTypeDataTransformReq = protocol.MessageBody_EnMessageTypeID_DataTransformReq MessageBodyTypeDataTransformRsp = protocol.MessageBody_EnMessageTypeID_DataTransformRsp MessageBodyTypeNodeRegisterReq = protocol.MessageBody_EnMessageTypeID_NodeRegisterReq MessageBodyTypeNodeRegisterRsp = protocol.MessageBody_EnMessageTypeID_NodeRegisterRsp MessageBodyTypeNodePingReq = protocol.MessageBody_EnMessageTypeID_NodePingReq MessageBodyTypeNodePongRsp = protocol.MessageBody_EnMessageTypeID_NodePongRsp MessageBodyTypeNodeMax = MessageBodyTypeNodePongRsp )
MessageBodyType constants using generated enum values.
const (
ATBUS_MACRO_MAX_FRAME_HEADER uint64 = 4096 // 最大数据包头长度
)
Variables ¶
This section is empty.
Functions ¶
func InternalSetDelegateIsCompressionAlgorithmSupported ¶
func InternalSetDelegateIsCompressionAlgorithmSupported(delegate func(algorithm protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) bool)
func IsCompressionAlgorithmSupported ¶
func IsCompressionAlgorithmSupported(algorithm protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) bool
func SetDefaultIoStreamConfigure ¶
func SetDefaultIoStreamConfigure(conf *IoStreamConfigure)
SetDefaultIoStreamConfigure sets the default values for IoStreamConfigure.
func SetDefaultNodeConfigure ¶
func SetDefaultNodeConfigure(conf *NodeConfigure)
func SetDefaultStartConfigure ¶
func SetDefaultStartConfigure(conf *StartConfigure)
Types ¶
type ChannelAddress ¶
type ChannelAddress interface {
GetAddress() string // Full address string (e.g., "tcp://127.0.0.1:8080")
GetScheme() string // Protocol scheme (e.g., "tcp", "unix", "shm")
GetHost() string // Host part of the address
GetPort() int // Port number (0 if not specified)
}
ChannelAddress represents a parsed channel address with scheme, host, port and full address.
type Connection ¶
type Connection interface {
Reset()
Proc() ErrorType
Listen() ErrorType
Connect() ErrorType
Disconnect() ErrorType
Push(buffer []byte) ErrorType
AddStatFault() uint64
ClearStatFault()
GetAddress() ChannelAddress
IsConnected() bool
IsRunning() bool
GetBinding() Endpoint
GetStatus() ConnectionState
CheckFlag(flag ConnectionFlag) bool
SetTemporary()
GetStatistic() ConnectionStatistic
GetConnectionContext() ConnectionContext
RemoveOwnerChecker()
}
type ConnectionContext ¶
type ConnectionContext interface {
// IsClosing reports whether the connection is in a closing state.
IsClosing() bool
// SetClosing sets the closing state of the connection.
SetClosing(closing bool)
// IsHandshakeDone reports whether the handshake has been completed.
IsHandshakeDone() bool
// GetHandshakeStartTime returns the time when the handshake was started.
GetHandshakeStartTime() time.Time
// GetCryptoKeyExchangeAlgorithm returns the key exchange algorithm used for crypto handshake.
GetCryptoKeyExchangeAlgorithm() protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE
// GetCryptoSelectKdfType returns the KDF type selected during handshake.
GetCryptoSelectKdfType() protocol.ATBUS_CRYPTO_KDF_TYPE
// GetCryptoSelectAlgorithm returns the crypto algorithm selected during handshake.
GetCryptoSelectAlgorithm() protocol.ATBUS_CRYPTO_ALGORITHM_TYPE
// GetCompressSelectAlgorithm returns the compression algorithm selected during handshake.
GetCompressSelectAlgorithm() protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE
// GetNextSequence returns the next sequence number for message ordering.
// Each call increments the internal counter and returns the new value.
GetNextSequence() uint64
// PackMessage packs a Message into a StaticBufferBlock for transmission.
//
// Parameters:
// - m: the message to pack (head will be modified with version, body_size, etc.)
// - protocolVersion: the protocol version to set in the message head
// - maxBodySize: maximum allowed body size (0 means no limit)
//
// Returns:
// - StaticBufferBlock containing the packed message data
// - error code if packing fails, EN_ATBUS_ERR_SUCCESS on success
PackMessage(m *Message, protocolVersion int32, maxBodySize int) (*buffer.StaticBufferBlock, ErrorType)
// UnpackMessage unpacks binary data into a Message.
//
// Parameters:
// - m: the message to populate (will be modified)
// - input: the binary data to unpack
// - maxBodySize: maximum allowed body size (0 means no limit)
//
// Returns:
// - error code if unpacking fails, EN_ATBUS_ERR_SUCCESS on success
UnpackMessage(m *Message, input []byte, maxBodySize int) ErrorType
// HandshakeGenerateSelfKey generates the local ECDH key pair for handshake.
//
// In client mode, peerSequenceId should be 0 to generate a new sequence.
// In server mode, peerSequenceId should be the peer's handshake sequence.
//
// Returns error code, EN_ATBUS_ERR_SUCCESS on success.
HandshakeGenerateSelfKey(peerSequenceId uint64) ErrorType
// HandshakeReadPeerKey reads the peer's public key and computes the shared secret.
//
// Parameters:
// - peerPubKey: the peer's handshake data containing public key
// - supportedCryptoAlgorithms: list of locally supported crypto algorithms
//
// Returns error code, EN_ATBUS_ERR_SUCCESS on success.
HandshakeReadPeerKey(peerPubKey *protocol.CryptoHandshakeData,
supportedCryptoAlgorithms []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE) ErrorType
// HandshakeWriteSelfPublicKey writes the local public key to the handshake data structure.
//
// Parameters:
// - selfPubKey: output handshake data to populate with local public key
// - supportedCryptoAlgorithms: list of locally supported crypto algorithms
//
// Returns error code, EN_ATBUS_ERR_SUCCESS on success.
HandshakeWriteSelfPublicKey(
selfPubKey *protocol.CryptoHandshakeData,
supportedCryptoAlgorithms []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE) ErrorType
// UpdateCompressionAlgorithm updates the list of supported compression algorithms.
//
// Returns error code, EN_ATBUS_ERR_SUCCESS on success.
UpdateCompressionAlgorithm(algorithm []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) ErrorType
// SetupCryptoWithKey directly sets the encryption key and IV, skipping key exchange.
// This is primarily used for testing purposes.
//
// Parameters:
// - algorithm: the crypto algorithm type
// - key: the encryption key data
// - iv: the initialization vector data
//
// Returns error code, EN_ATBUS_ERR_SUCCESS on success.
SetupCryptoWithKey(algorithm protocol.ATBUS_CRYPTO_ALGORITHM_TYPE, key []byte, iv []byte) ErrorType
}
ConnectionContext abstracts the per-connection state and the pack/unpack flow.
This interface is the Go equivalent of C++ atbus::connection_context. The pack/unpack methods match the C++ signatures:
- pack_message(message&, protocol_version, random_engine, max_body_size) -> buffer_result_t
- unpack_message(message&, input, max_body_size) -> int
This interface is implemented by [libatbus_impl.ConnectionContext]. Keep it minimal and stable: only add methods that are required by callers.
type ConnectionFlag ¶
type ConnectionFlag uint32
const ( ConnectionFlag_RegProc ConnectionFlag = 0x0001 /** 注册了proc记录到node,清理的时候需要移除 **/ ConnectionFlag_RegFd ConnectionFlag = 0x0002 /** 关联了fd到node或endpoint,清理的时候需要移除 **/ ConnectionFlag_Resetting ConnectionFlag = 0x0010 /** 正在执行重置(防止递归死循环) **/ ConnectionFlag_Destructing ConnectionFlag = 0x0020 /** 正在执行析构(屏蔽某些接口) **/ ConnectionFlag_ListenFd ConnectionFlag = 0x0040 /** 是否是用于listen的连接 **/ ConnectionFlag_Temporary ConnectionFlag = 0x0080 /** 是否是临时连接 **/ ConnectionFlag_PeerClosed ConnectionFlag = 0x0100 /** 对端已关闭 **/ ConnectionFlag_ServerMode ConnectionFlag = 0x0200 /** 连接处于服务端模式 **/ ConnectionFlag_ClientMode ConnectionFlag = 0x0400 /** 连接处于客户端模式 **/ )
type ConnectionState ¶
type ConnectionState int32
const ( ConnectionState_Disconnected ConnectionState = 0 ConnectionState_Connecting ConnectionState = 1 ConnectionState_Handshaking ConnectionState = 2 ConnectionState_Connected ConnectionState = 3 ConnectionState_Disconnecting ConnectionState = 4 )
type ConnectionStatistic ¶
type Endpoint ¶
type Endpoint interface {
GetOwner() Node
Reset()
GetId() BusIdType
GetPid() int32
GetHostname() string
GetHashCode() string
UpdateHashCode(code string)
AddConnection(conn Connection, forceData bool) bool
RemoveConnection(conn Connection) bool
IsAvailable() bool
GetFlags() uint32
GetFlag(f EndpointFlag) bool
SetFlag(f EndpointFlag, v bool)
GetListenAddress() []ChannelAddress
ClearListenAddress()
AddListenAddress(addr string)
UpdateSupportSchemes(schemes []string)
IsSchemeSupported(scheme string) bool
AddPingTimer()
ClearPingTimer()
// ============== connection functions ==============
GetCtrlConnection(peer Endpoint) Connection
GetDataConnection(peer Endpoint, enableFallbackCtrl bool) Connection
GetDataConnectionCount(enableFallbackCtrl bool) int
// ============== statistic functions ==============
AddStatisticFault() uint64
ClearStatisticFault()
SetStatisticUnfinishedPing(p uint64)
GetStatisticUnfinishedPing() uint64
SetStatisticPingDelay(pd time.Duration, pongTimepoint time.Time)
GetStatisticPingDelay() time.Duration
GetStatisticLastPong() time.Time
GetStatisticCreatedTime() time.Time
GetStatisticPushStartTimes() uint64
GetStatisticPushStartSize() uint64
GetStatisticPushSuccessTimes() uint64
GetStatisticPushSuccessSize() uint64
GetStatisticPushFailedTimes() uint64
GetStatisticPushFailedSize() uint64
GetStatisticPullTimes() uint64
GetStatisticPullSize() uint64
}
type EndpointCollectionType ¶
type EndpointFlag ¶
type EndpointFlag uint32
const ( EndpointFlag_Resetting EndpointFlag = 0x0001 /** 正在执行重置(防止递归死循环) **/ EndpointFlag_ConnectionSorted EndpointFlag = 0x0002 EndpointFlag_Destructing EndpointFlag = 0x0004 /** 正在执行析构 **/ EndpointFlag_HasListenPorc EndpointFlag = 0x0008 /** 是否有proc类的listen地址 **/ EndpointFlag_HasListenFd EndpointFlag = 0x0010 /** 是否有fd类的listen地址 **/ EndpointFlag_MutableFlags EndpointFlag = 0x0020 /** 可动态变化的属性起始边界 **/ EndpointFlag_HasPingTimer EndpointFlag = 0x0040 /** 是否设置了ping定时器 **/ )
type ErrorType ¶
type ErrorType = error_code.ErrorType
type IoStreamCallbackEventHandleSet ¶
type IoStreamCallbackEventHandleSet struct {
// contains filtered or unexported fields
}
func (*IoStreamCallbackEventHandleSet) GetCallback ¶
func (h *IoStreamCallbackEventHandleSet) GetCallback(eventType IoStreamCallbackEventType) IoStreamCallbackFunc
GetCallback returns the callback for the specified event type.
func (*IoStreamCallbackEventHandleSet) SetCallback ¶
func (h *IoStreamCallbackEventHandleSet) SetCallback(eventType IoStreamCallbackEventType, callback IoStreamCallbackFunc)
SetCallback sets the callback for the specified event type.
type IoStreamCallbackEventType ¶
type IoStreamCallbackEventType int32
const ( IoStreamCallbackEventType_Accepted IoStreamCallbackEventType = 0 IoStreamCallbackEventType_Connected IoStreamCallbackEventType = 1 IoStreamCallbackEventType_Disconnected IoStreamCallbackEventType = 2 IoStreamCallbackEventType_Received IoStreamCallbackEventType = 3 IoStreamCallbackEventType_Written IoStreamCallbackEventType = 4 IoStreamCallbackEventType_Max IoStreamCallbackEventType = 5 )
type IoStreamCallbackFunc ¶
type IoStreamCallbackFunc func(channel IoStreamChannel, conn IoStreamConnection, status int32, privData interface{})
type IoStreamChannel ¶
type IoStreamChannel interface {
GetContext() context.Context
SetFlag(f IoStreamConnectionFlag, v bool)
GetFlag(f IoStreamConnectionFlag) bool
// 事件响应函数集合
GetEventHandleSet() *IoStreamCallbackEventHandleSet
GetStatisticActiveRequestCount() uint64
GetStatisticReadNetEgainCount() uint64
GetStatisticCheckBlockSizeFailedCount() uint64
GetStatisticCheckHashFailedCount() uint64
// 自定义数据区域
SetPrivateData(data interface{})
GetPrivateData() interface{}
// 核心操作方法
Listen(addr string) ErrorType
Connect(addr string) (IoStreamConnection, ErrorType)
Send(conn IoStreamConnection, data []byte) ErrorType
Disconnect(conn IoStreamConnection) ErrorType
Close() ErrorType
}
type IoStreamChannelFlag ¶
type IoStreamChannelFlag uint16
const ( IoStreamChannelFlag_IsLoopOwner IoStreamChannelFlag = 0x01 IoStreamChannelFlag_Closing IoStreamChannelFlag = 0x02 IoStreamChannelFlag_InCallback IoStreamChannelFlag = 0x04 )
type IoStreamConfigure ¶
type IoStreamConfigure struct {
Keepalive time.Duration
NoBlock bool
NoDelay bool
SendBufferStatic uint64
ReceiveBufferStatic uint64
SendBufferMaxSize uint64
SendBufferLimitSize uint64
ReceiveBufferMaxSize uint64
ReceiveBufferLimitSize uint64
Backlog int32
ConfirmTimeout time.Duration
MaxReadNetEgainCount uint64
MaxReadCheckBlockSizeFailedCount uint64
MaxReadCheckHashFailedCount uint64
}
type IoStreamConnection ¶
type IoStreamConnection interface {
GetAddress() ChannelAddress
GetStatus() IoStreamConnectionStatus
SetFlag(f IoStreamConnectionFlag, v bool)
GetFlag(f IoStreamConnectionFlag) bool
GetChannel() IoStreamChannel
// 事件响应函数集合
GetEventHandleSet() *IoStreamCallbackEventHandleSet
// 主动关闭连接的回调(为了减少额外分配而采用的缓存策略)
GetProactivelyDisconnectCallback() IoStreamCallbackFunc
GetReadBufferManager() *buffer.BufferManager
// 自定义数据区域
SetPrivateData(data interface{})
GetPrivateData() interface{}
}
type IoStreamConnectionFlag ¶
type IoStreamConnectionFlag uint16
const ( IoStreamConnectionFlag_Listen IoStreamConnectionFlag = 0x01 IoStreamConnectionFlag_Connect IoStreamConnectionFlag = 0x02 IoStreamConnectionFlag_Accept IoStreamConnectionFlag = 0x04 IoStreamConnectionFlag_Writing IoStreamConnectionFlag = 0x08 IoStreamConnectionFlag_Closing IoStreamConnectionFlag = 0x10 )
type IoStreamConnectionStatus ¶
type IoStreamConnectionStatus uint16
const ( IoStreamConnectionStatus_Created IoStreamConnectionStatus = 0 IoStreamConnectionStatus_Connected IoStreamConnectionStatus = 1 IoStreamConnectionStatus_Disconnecting IoStreamConnectionStatus = 2 IoStreamConnectionStatus_Disconnected IoStreamConnectionStatus = 3 )
type Message ¶
type Message struct {
// contains filtered or unexported fields
}
Message is the Go equivalent of C++ atbus::message.
It wraps the protobuf-generated protocol.MessageHead and protocol.MessageBody, and provides accessor methods similar to the C++ class. The Go version does not use protobuf Arena or inplace caching; it holds the structs directly.
func (*Message) Body ¶
func (m *Message) Body() *protocol.MessageBody
Body returns the message body, creating an empty one if nil.
func (*Message) GetBody ¶
func (m *Message) GetBody() *protocol.MessageBody
GetBody returns the message body (may be nil).
func (*Message) GetBodyType ¶
func (m *Message) GetBodyType() MessageBodyType
GetBodyType returns the body oneof case based on the set message type. It uses the generated GetMessageTypeOneofCase() method.
func (*Message) GetHead ¶
func (m *Message) GetHead() *protocol.MessageHead
GetHead returns the message head (may be nil).
func (*Message) GetUnpackErrorMessage ¶
GetUnpackErrorMessage returns any unpack error message stored in the message.
func (*Message) Head ¶
func (m *Message) Head() *protocol.MessageHead
Head returns the message head, creating an empty one if nil.
func (*Message) MutableBody ¶
func (m *Message) MutableBody() *protocol.MessageBody
MutableBody returns a mutable reference to the message body.
func (*Message) MutableHead ¶
func (m *Message) MutableHead() *protocol.MessageHead
MutableHead returns a mutable reference to the message head.
func (*Message) SetUnpackError ¶
SetUnpackError sets the unpack error message.
type MessageBodyType ¶
type MessageBodyType = protocol.MessageBody_EnMessageTypeID
MessageBodyType is the Go equivalent of C++ message_body_type (the oneof case). It aliases protocol.MessageBody_EnMessageTypeID for convenience.
type Node ¶
type Node interface {
Init(id BusIdType, conf *NodeConfigure) ErrorType
ReloadCrypto(cryptoKeyExchangeType protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE,
cryptoKeyRefreshInterval time.Duration,
cryptoAllowAlgorithms []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE) ErrorType
ReloadCompression(compressionAllowAlgorithms []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE,
compressionLevel protocol.ATBUS_COMPRESSION_LEVEL) ErrorType
Start() ErrorType
StartWithConfigure(conf *StartConfigure) ErrorType
Reset() ErrorType
Proc(now time.Time) ErrorType
Poll() ErrorType
Listen(address string) ErrorType
Connect(address string) ErrorType
ConnectWithEndpoint(address string, ep Endpoint) ErrorType
Disconnect(id BusIdType) ErrorType
GetCryptoKeyExchangeType() protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE
// ================= send/receive functions =================
// SendData 发送数据。
//
// Parameters:
// - tid: 发送目标 ID
// - t: 自定义类型,将作为 message.head.type 字段传递,可用于业务区分服务类型
// - data: 要发送的数据块
//
// Return: 0 或错误码。
//
// Note: 接收端收到的数据很可能不是地址对齐的,所以不建议发送内存数据对象(struct/class)。
// 如果必须发送内存数据对象,接收端一定要 memcpy,不能直接类型转换,除非手动设置了地址对齐规则。
SendData(tid BusIdType, t int32, data []byte) ErrorType
// SendDataWithOptions 发送数据(带选项)。
//
// Parameters:
// - tid: 发送目标 ID
// - t: 自定义类型,将作为 message.head.type 字段传递,可用于业务区分服务类型
// - data: 要发送的数据块
// - options: 发送选项;如果未设置 sequence,会自动分配并写回
//
// Return: 0 或错误码。
//
// Note: 接收端收到的数据很可能不是地址对齐的,所以不建议发送内存数据对象(struct/class)。
// 如果必须发送内存数据对象,接收端一定要 memcpy,不能直接类型转换,除非手动设置了地址对齐规则。
SendDataWithOptions(tid BusIdType, t int32, data []byte, options *NodeSendDataOptions) ErrorType
// SendCustomCommand 发送自定义命令。
//
// Parameters:
// - tid: 发送目标 ID
// - args: 自定义消息内容数组
//
// Return: 0 或错误码。
SendCustomCommand(tid BusIdType, args [][]byte) ErrorType
// SendCustomCommandWithOptions 发送自定义命令(带选项)。
//
// Parameters:
// - tid: 发送目标 ID
// - args: 自定义消息内容数组
// - options: 发送选项;如果未设置 sequence,会自动分配并写回
//
// Return: 0 或错误码。
SendCustomCommandWithOptions(tid BusIdType, args [][]byte, options *NodeSendDataOptions) ErrorType
// ================= endpoint management functions =================
// GetPeerChannel 获取远程发送目标信息。
//
// Parameters:
// - tid: 发送目标 ID,不能是自己的 BUS ID
// - fn: 获取有效连接的接口,用以区分数据通道和控制通道
// - options: 获取选项
//
// Return: (0 或错误码, 发送目标 endpoint, 发送连接 connection)。
GetPeerChannel(tid BusIdType, fn func(from Endpoint, to Endpoint) Connection, options *NodeGetPeerOptions) (ErrorType, Endpoint, Connection, TopologyPeer)
// SetTopologyUpstream 设置节点的上游节点信息。
//
// Parameters:
// - tid: 上游节点 ID
SetTopologyUpstream(tid BusIdType)
// CreateEndpoint 创建 endpoint 对象,并设置生命周期检查。
// 并不会自动添加到本地,请在初始化完成后调用 AddEndpoint 添加。
//
// Parameters:
// - tid: 目标 endpoint 的 bus id
// - hostName: 目标 endpoint 的主机名
// - pid: 目标 endpoint 的进程 ID
// Return: endpoint 对象,可能为 nil。
CreateEndpoint(tid BusIdType, hostName string, pid int) Endpoint
// GetEndpoint 获取本地 endpoint 对象。
// Return: endpoint 对象,可能为 nil。
GetEndpoint(tid BusIdType) Endpoint
// AddEndpoint 添加目标端点。
// Return: 0 或错误码。
AddEndpoint(ep Endpoint) ErrorType
// RemoveEndpoint 移除目标端点。
// Return: 0 或错误码。
RemoveEndpoint(ep Endpoint) ErrorType
// IsEndpointAvailable 是否有到对端的数据通道(可以向对端发送数据)。
// Note: 如果只有控制通道没有数据通道,返回 false。
IsEndpointAvailable(tid BusIdType) bool
// CheckAccessHash 检查 access token 集合的有效性。
// Return: 没有检查通过的 access token 则返回 false。
CheckAccessHash(accessKey *protocol.AccessData, plainText string, conn Connection) bool
// GetAccessCode 获取节点的 hash code。
GetAccessCode() string
GetIoStreamChannel() IoStreamChannel
GetSelfEndpoint() Endpoint
GetUpstreamEndpoint() Endpoint
GetTopologyRelation(tid BusIdType) (TopologyRelationType, TopologyPeer)
GetImmediateEndpointSet() EndpointCollectionType
// ================= internal message functions =================
// SendDataMessage 发送数据消息。
// Return: (0 或错误码, 发送目标 endpoint, 发送连接 connection)。
SendDataMessage(tid BusIdType, message *Message, options *NodeSendDataOptions) (ErrorType, Endpoint, Connection)
// SendCtrlMessage 发送控制消息。
// Return: (0 或错误码, 发送目标 endpoint, 发送连接 connection)。
SendCtrlMessage(tid BusIdType, message *Message, options *NodeSendDataOptions) (ErrorType, Endpoint, Connection)
OnReceiveData(ep Endpoint, conn Connection, message *Message, data []byte)
OnReceiveForwardResponse(ep Endpoint, conn Connection, message *Message)
OnDisconnect(ep Endpoint, conn Connection) ErrorType
OnNewConnection(conn Connection) ErrorType
OnRegister(ep Endpoint, conn Connection, code ErrorType)
OnActived()
OnShutdown(code ErrorType) ErrorType
OnUpstreamRegisterDone()
OnCustomCommandRequest(ep Endpoint, conn Connection, from BusIdType, argv [][]byte) (ErrorType, [][]byte)
OnCustomCommandResponse(ep Endpoint, conn Connection, from BusIdType, argv [][]byte, sequence uint64) ErrorType
OnPing(ep Endpoint, message *Message, body *protocol.PingData) ErrorType
OnPong(ep Endpoint, message *Message, body *protocol.PingData) ErrorType
DispatchAllSelfMessages() int32
GetContext() context.Context
// Shutdown 关闭 node。
//
// Note: 如果需要在关闭前执行资源回收,可以在 on_node_down_fn_t 回调中返回非 0 值来阻止 node 的 reset 操作,
// 并在资源释放完成后再调用 Shutdown,在第二次 on_node_down_fn_t 回调中返回 0 值。
//
// Note: 或者也可以通过 ref_object 和 unref_object 来标记和解除数据引用,reset 函数会执行事件 loop 直到所有引用的资源被移除。
Shutdown(reason ErrorType) ErrorType
FatalShutdown(ep Endpoint, conn Connection, code ErrorType, err error) ErrorType
SetLogger(logger *utils_log.Logger)
GetLogger() *utils_log.Logger
IsDebugMessageVerboseEnabled() bool
EnableDebugMessageVerbose()
DisableDebugMessageVerbose()
AddEndpointGcList(ep Endpoint)
AddConnectionGcList(conn Connection)
// ================= event handle functions =================
SetEventHandleOnForwardRequest(handle NodeOnForwardRequestFunc)
GetEventHandleOnForwardRequest() NodeOnForwardRequestFunc
SetEventHandleOnForwardResponse(handle NodeOnForwardResponseFunc)
GetEventHandleOnForwardResponse() NodeOnForwardResponseFunc
SetEventHandleOnRegister(handle NodeOnRegisterFunc)
GetEventHandleOnRegister() NodeOnRegisterFunc
SetEventHandleOnShutdown(handle NodeOnNodeDownFunc)
GetEventHandleOnShutdown() NodeOnNodeDownFunc
SetEventHandleOnAvailable(handle NodeOnNodeUpFunc)
GetEventHandleOnAvailable() NodeOnNodeUpFunc
SetEventHandleOnInvalidConnection(handle NodeOnInvalidConnectionFunc)
GetEventHandleOnInvalidConnection() NodeOnInvalidConnectionFunc
SetEventHandleOnNewConnection(handle NodeOnNewConnectionFunc)
GetEventHandleOnNewConnection() NodeOnNewConnectionFunc
SetEventHandleOnCloseConnection(handle NodeOnCloseConnectionFunc)
GetEventHandleOnCloseConnection() NodeOnCloseConnectionFunc
SetEventHandleOnCustomCommandRequest(handle NodeOnCustomCommandRequestFunc)
GetEventHandleOnCustomCommandRequest() NodeOnCustomCommandRequestFunc
SetEventHandleOnCustomCommandResponse(handle NodeOnCustomCommandResponseFunc)
GetEventHandleOnCustomCommandResponse() NodeOnCustomCommandResponseFunc
SetEventHandleOnAddEndpoint(handle NodeOnEndpointEventFunc)
GetEventHandleOnAddEndpoint() NodeOnEndpointEventFunc
SetEventHandleOnRemoveEndpoint(handle NodeOnEndpointEventFunc)
GetEventHandleOnRemoveEndpoint() NodeOnEndpointEventFunc
SetEventHandleOnPingEndpoint(handle NodeOnPingPongEndpointFunc)
GetEventHandleOnPingEndpoint() NodeOnPingPongEndpointFunc
SetEventHandleOnPongEndpoint(handle NodeOnPingPongEndpointFunc)
GetEventHandleOnPongEndpoint() NodeOnPingPongEndpointFunc
SetEventHandleOnTopologyUpdateUpstream(handle NodeOnTopologyUpdateUpstreamFunc)
GetEventHandleOnTopologyUpdateUpstream() NodeOnTopologyUpdateUpstreamFunc
// ================= date field setter and getter =================
GetIoStreamConfigure() *IoStreamConfigure
GetId() BusIdType
GetConfigure() *NodeConfigure
CheckFlag(f NodeFlag) bool
GetState() NodeState
GetTimerTick() time.Time
GetTopologyRegistry() TopologyRegistry
GetPid() int
GetHostname() string
SetHostname(hostname string, force bool) bool
GetProtocolVersion() int32
GetProtocolMinimalVersion() int32
GetListenList() []ChannelAddress
// ================= statistic functions =================
AddStatisticDispatchTimes()
// ================= utility functions =================
AllocateMessageSequence() uint64
LogDebug(ep Endpoint, conn Connection, m *Message, msg string, args ...any)
LogInfo(ep Endpoint, conn Connection, msg string, args ...any)
LogError(ep Endpoint, conn Connection, status int, errcode ErrorType, msg string, args ...any)
}
type NodeConfigure ¶
type NodeConfigure struct {
EventLoopContext context.Context
UpstreamAddress string // 上游节点地址
TopologyLabels map[string]string // 拓扑标签
LoopTimes int32 // 消息循环次数限制,防止某些通道繁忙把其他通道堵死
TTL int32 // 消息转发跳转限制
ProtocolVersion int32
ProtocolMinimalVersion int32
// ===== 连接配置 =====
BackLog int32
FirstIdleTimeout time.Duration // 第一个包允许的空闲时间
PingInterval time.Duration // ping包间隔
RetryInterval time.Duration // 重试包间隔
FaultTolerant uint32 // 容错次数(次)
AccessTokenMaxNumber uint32 // 最大 access token 数量,请不要设置的太大,验证次数最大可能是 N^2
AccessTokens [][]byte // access token 列表
OverwriteListenPath bool // 是否覆盖已存在的 listen path(unix/pipe socket)
// ===== 加密算法配置 =====
CryptoKeyExchangeType protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE
CryptoKeyRefreshInterval time.Duration
CryptoAllowAlgorithms []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE
// ===== 压缩算法配置 =====
CompressionAllowAlgorithms []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE
CompressionLevel protocol.ATBUS_COMPRESSION_LEVEL
// ===== 缓冲区配置 =====
MessageSize uint64 // max message size
RecvBufferSize uint64 // 接收缓冲区,和数据包大小有关
SendBufferSize uint64 // 发送缓冲区限制
SendBufferNumber uint64 // 发送缓冲区静态 Buffer 数量限制,0 则为动态缓冲区
}
type NodeEventHandleSet ¶
type NodeEventHandleSet struct {
OnForwardRequest NodeOnForwardRequestFunc
OnForwardResponse NodeOnForwardResponseFunc
OnRegister NodeOnRegisterFunc
OnNodeDown NodeOnNodeDownFunc
OnNodeUp NodeOnNodeUpFunc
OnInvalidConnection NodeOnInvalidConnectionFunc
OnNewConnection NodeOnNewConnectionFunc
OnCloseConnection NodeOnCloseConnectionFunc
OnCustomCommandRequest NodeOnCustomCommandRequestFunc
OnCustomCommandResponse NodeOnCustomCommandResponseFunc
OnEndpointAdded NodeOnEndpointEventFunc
OnEndpointRemoved NodeOnEndpointEventFunc
OnEndpointPing NodeOnPingPongEndpointFunc
OnEndpointPong NodeOnPingPongEndpointFunc
OnTopologyUpdateUpstream NodeOnTopologyUpdateUpstreamFunc
}
NodeEventHandleSet collects all callbacks.
type NodeFlag ¶
type NodeFlag uint32
const ( NodeFlag_None NodeFlag = 0x0000 NodeFlag_Resetting NodeFlag = 0x0001 // 正在重置 NodeFlag_ResettingGc NodeFlag = 0x0002 // 正在重置且正准备 GC 或 GC 流程已完成 NodeFlag_Actived NodeFlag = 0x0004 // 已激活 NodeFlag_UpstreamRegDone NodeFlag = 0x0008 // 已通过父节点注册 NodeFlag_Shutdown NodeFlag = 0x0010 // 已完成关闭前的资源回收 NodeFlag_RecvSelfMsg NodeFlag = 0x0020 // 正在接收发给自己的信息 NodeFlag_InCallback NodeFlag = 0x0040 // 在回调函数中 NodeFlag_InProc NodeFlag = 0x0080 // 在 Proc 函数中 NodeFlag_InPoll NodeFlag = 0x0100 // 在 Poll 函数中 NodeFlag_InGcEndpoints NodeFlag = 0x0200 // 在清理 endpoint 过程中 NodeFlag_InGcConnections NodeFlag = 0x0400 // 在清理 connection 过程中 )
type NodeGetPeerOptionFlag ¶
type NodeGetPeerOptionFlag uint16
const (
NodeGetPeerOptionFlag_NoUpstream NodeGetPeerOptionFlag = 0x0001 // 不允许上游转发
)
type NodeGetPeerOptions ¶
type NodeGetPeerOptions struct {
// contains filtered or unexported fields
}
func CreateNodeGetPeerOptions ¶
func CreateNodeGetPeerOptions() *NodeGetPeerOptions
func (*NodeGetPeerOptions) GetBlacklist ¶
func (o *NodeGetPeerOptions) GetBlacklist() []BusIdType
func (*NodeGetPeerOptions) GetFlag ¶
func (o *NodeGetPeerOptions) GetFlag(f NodeGetPeerOptionFlag) bool
func (*NodeGetPeerOptions) SetBlacklist ¶
func (o *NodeGetPeerOptions) SetBlacklist(blacklist []BusIdType)
func (*NodeGetPeerOptions) SetFlag ¶
func (o *NodeGetPeerOptions) SetFlag(f NodeGetPeerOptionFlag, v bool)
type NodeOnCloseConnectionFunc ¶
type NodeOnCloseConnectionFunc func(n Node, ep Endpoint, conn Connection) ErrorType
NodeOnCloseConnectionFunc is called when a connection is closed. Parameters: node, endpoint, connection.
type NodeOnCustomCommandRequestFunc ¶
type NodeOnCustomCommandRequestFunc func(n Node, ep Endpoint, conn Connection, from BusIdType, argv [][]byte) (result ErrorType, response [][]byte)
NodeOnCustomCommandRequestFunc is called when a custom command request is received.
Parameters:
- node, endpoint, connection
- source bus id
- argv: list of raw arg byte slices
- rsp: output response lines (some transports may ignore cross-node responses)
type NodeOnCustomCommandResponseFunc ¶
type NodeOnCustomCommandResponseFunc func(n Node, ep Endpoint, conn Connection, from BusIdType, rspData [][]byte, sequence uint64) ErrorType
NodeOnCustomCommandResponseFunc is called when a custom command response is received. Parameters: node, endpoint, connection, source bus id, response data list, sequence of the request.
type NodeOnEndpointEventFunc ¶
NodeOnEndpointEventFunc is called when an endpoint is added/removed. Parameters: node, endpoint, status/error code.
type NodeOnForwardRequestFunc ¶
type NodeOnForwardRequestFunc func(n Node, ep Endpoint, conn Connection, msg *Message, data []byte) ErrorType
NodeOnForwardRequestFunc is called when a message is received. Parameters: node, source endpoint, source connection, decoded message, raw payload bytes.
type NodeOnForwardResponseFunc ¶
type NodeOnForwardResponseFunc func(n Node, ep Endpoint, conn Connection, msg *Message) ErrorType
NodeOnForwardResponseFunc is called when a forwarded message send fails or (optionally) succeeds.
Note: unless send is marked as requiring a response/notification, success usually does not trigger a callback. Parameters: node, source endpoint, source connection, message (may be nil depending on context).
type NodeOnInvalidConnectionFunc ¶
type NodeOnInvalidConnectionFunc func(n Node, conn Connection, errCode ErrorType) ErrorType
NodeOnInvalidConnectionFunc is called when a connection becomes invalid. Parameters: node, connection, error code (typically EN_ATBUS_ERR_NODE_TIMEOUT).
type NodeOnNewConnectionFunc ¶
type NodeOnNewConnectionFunc func(n Node, conn Connection) ErrorType
NodeOnNewConnectionFunc is called when a new connection is created. Parameters: node, connection.
type NodeOnNodeDownFunc ¶
NodeOnNodeDownFunc is called when the node is shutting down. Parameters: node, reason.
type NodeOnNodeUpFunc ¶
NodeOnNodeUpFunc is called when the node starts serving. Parameters: node, status (typically EN_ATBUS_ERR_SUCCESS).
type NodeOnPingPongEndpointFunc ¶
type NodeOnPingPongEndpointFunc func(n Node, ep Endpoint, msg *Message, ping *protocol.PingData) ErrorType
NodeOnPingPongEndpointFunc is called when a ping/pong message is received from an endpoint. Parameters: node, endpoint, decoded message, ping data.
type NodeOnRegisterFunc ¶
type NodeOnRegisterFunc func(n Node, ep Endpoint, conn Connection, errCode ErrorType) ErrorType
NodeOnRegisterFunc is called when a new peer endpoint is registered. Parameters: node, endpoint, connection, status/error code.
type NodeOnTopologyUpdateUpstreamFunc ¶
type NodeOnTopologyUpdateUpstreamFunc func(n Node, self TopologyPeer, upstream TopologyPeer, data *TopologyData) ErrorType
NodeOnTopologyUpdateUpstreamFunc is called when the topology upstream is updated. Parameters: node, self, new upstream, topology data of self.
type NodeSendDataOptionFlag ¶
type NodeSendDataOptionFlag uint32
const ( NodeSendDataOptionFlag_RequiredResponse NodeSendDataOptionFlag = 0x0001 // 是否强制需要回包(默认情况下如果发送成功是没有回包通知的) NodeSendDataOptionFlag_NoUpstream NodeSendDataOptionFlag = 0x0002 // 是否禁止上游转发 )
type NodeSendDataOptions ¶
type NodeSendDataOptions struct {
// contains filtered or unexported fields
}
func CreateNodeSendDataOptions ¶
func CreateNodeSendDataOptions() *NodeSendDataOptions
func (*NodeSendDataOptions) GetFlag ¶
func (o *NodeSendDataOptions) GetFlag(f NodeSendDataOptionFlag) bool
func (*NodeSendDataOptions) GetSequence ¶
func (o *NodeSendDataOptions) GetSequence() uint64
func (*NodeSendDataOptions) SetFlag ¶
func (o *NodeSendDataOptions) SetFlag(f NodeSendDataOptionFlag, v bool)
func (*NodeSendDataOptions) SetSequence ¶
func (o *NodeSendDataOptions) SetSequence(seq uint64)
type StartConfigure ¶
type TimerDescPair ¶
type TimerDescType ¶
type TimerDescType[K comparable, V any] = utils_memory.LRUMap[K, TimerDescPair[V]]
type TopologyData ¶
type TopologyData struct {
// Pid is the process id of the peer.
Pid int32
// Hostname is the hostname of the peer.
Hostname string
// Labels contains arbitrary labels for policy matching (e.g. region/zone/group/version).
Labels map[string]string
}
TopologyData is the runtime topology data associated with a peer.
type TopologyPeer ¶
type TopologyPeer interface {
// GetBusId returns the bus id of this peer.
GetBusId() BusIdType
// GetUpstream returns the upstream (parent) peer, or nil if this peer is a root.
GetUpstream() TopologyPeer
// GetTopologyData returns current topology data of this peer.
GetTopologyData() *TopologyData
// ContainsDownstream checks whether a downstream(peer) with given bus id exists.
ContainsDownstream(busId BusIdType) bool
// ForeachDownstream iterates all downstream peers.
ForeachDownstream(fn func(peer TopologyPeer) bool) bool
}
TopologyPeer is a topology node (peer). A peer has:
- A stable bus id.
- An optional upstream peer.
- A set of downstream peers.
- Associated TopologyData.
type TopologyPolicyRule ¶
type TopologyPolicyRule struct {
// RequireSameProcess requires same pid and same hostname if true.
RequireSameProcess bool
// RequireSameHostName requires same hostname if true.
RequireSameHostName bool
// RequireLabelValues contains label constraints.
// Key: label name.
// Value: allowed label values (set).
RequireLabelValues map[string]map[string]struct{}
}
TopologyPolicyRule is a policy rule used by TopologyRegistry.CheckPolicy. It describes constraints that the "to" peer must satisfy.
type TopologyRegistry ¶
type TopologyRegistry interface {
// GetPeer returns peer by bus id, or nil if not found.
GetPeer(busId BusIdType) TopologyPeer
// RemovePeer removes a peer and fixes relationships.
// If the peer has an upstream, it will be removed from the upstream's downstream set.
// For each downstream peer whose upstream is this peer, its upstream will be cleared.
RemovePeer(targetBusId BusIdType)
// UpdatePeer creates or updates a peer.
// - If targetBusId is 0, this call is ignored.
// - If upstreamBusId is 0, the peer becomes a root (no upstream).
// - If peer exists and upstream changed, downstream links will be updated accordingly.
// true on success, false on failure (e.g. invalid target_bus_id or there will be a circle).
UpdatePeer(targetBusId BusIdType, upstreamBusId BusIdType, data *TopologyData) bool
// GetRelation returns the topology relation between two peers.
// The first return value is the relation type.
// The second return value is the next-hop peer (may be nil):
// - If the relation is TransitiveUpstream or TransitiveDownstream, it's to.
// - If the relation is SameUpstreamPeer, it's the upstream of both from and to.
// - If the relation is OtherUpstreamPeer, it prefers the upstream of from;
// if from has no upstream then returns to.
GetRelation(from BusIdType, to BusIdType) (TopologyRelationType, TopologyPeer)
// ForeachPeer iterates all peers in registry.
// The callback function returns false to stop iteration early.
// Returns true if iterated all peers, false if stopped by callback.
ForeachPeer(fn func(peer TopologyPeer) bool) bool
// CheckPolicy checks whether toData satisfies the fromPolicy.
// The checks include:
// - same hostname (optional)
// - same process (optional, implies same hostname)
// - required labels (optional)
CheckPolicy(rule *TopologyPolicyRule, fromData *TopologyData, toData *TopologyData) bool
}
TopologyRegistry provides CRUD for peers and relation querying. This is an in-process data structure. It is NOT thread-safe.
type TopologyRelationType ¶
type TopologyRelationType uint8
TopologyRelationType represents the relation type between two peers in the topology registry. The relation is evaluated based on the upstream chain (parent links).
const ( // TopologyRelationType_Invalid indicates invalid input or one/both peers not found. TopologyRelationType_Invalid TopologyRelationType = 0 // TopologyRelationType_Self indicates from == to. TopologyRelationType_Self TopologyRelationType = 1 // TopologyRelationType_ImmediateUpstream indicates to is the direct upstream(parent) of from. TopologyRelationType_ImmediateUpstream TopologyRelationType = 2 // TopologyRelationType_TransitiveUpstream indicates to is an ancestor of from, but not the direct upstream. TopologyRelationType_TransitiveUpstream TopologyRelationType = 3 // TopologyRelationType_ImmediateDownstream indicates to is the direct downstream(child) of from. TopologyRelationType_ImmediateDownstream TopologyRelationType = 4 // TopologyRelationType_TransitiveDownstream indicates to is a descendant of from, but not the direct downstream. TopologyRelationType_TransitiveDownstream TopologyRelationType = 5 // TopologyRelationType_SameUpstreamPeer indicates from and to share the same direct upstream. TopologyRelationType_SameUpstreamPeer TopologyRelationType = 6 // TopologyRelationType_OtherUpstreamPeer indicates from and to do not fall into any of the above categories. TopologyRelationType_OtherUpstreamPeer TopologyRelationType = 7 )