Versions in this module Expand all Collapse all v1 v1.0.1 Mar 12, 2026 v1.0.0 Mar 11, 2026 Changes in this version + const ATBUS_MACRO_MAX_FRAME_HEADER + const MessageBodyTypeCustomCommandReq + const MessageBodyTypeCustomCommandRsp + const MessageBodyTypeDataTransformReq + const MessageBodyTypeDataTransformRsp + const MessageBodyTypeNodeMax + const MessageBodyTypeNodePingReq + const MessageBodyTypeNodePongRsp + const MessageBodyTypeNodeRegisterReq + const MessageBodyTypeNodeRegisterRsp + const MessageBodyTypeUnknown + func InternalSetDelegateIsCompressionAlgorithmSupported(delegate func(algorithm protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) bool) + func IsCompressionAlgorithmSupported(algorithm protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) bool + func SetDefaultIoStreamConfigure(conf *IoStreamConfigure) + func SetDefaultNodeConfigure(conf *NodeConfigure) + func SetDefaultStartConfigure(conf *StartConfigure) + type BusIdType uint64 + type ChannelAddress interface + GetAddress func() string + GetHost func() string + GetPort func() int + GetScheme func() string + type Connection interface + AddStatFault func() uint64 + CheckFlag func(flag ConnectionFlag) bool + ClearStatFault func() + Connect func() ErrorType + Disconnect func() ErrorType + GetAddress func() ChannelAddress + GetBinding func() Endpoint + GetConnectionContext func() ConnectionContext + GetStatistic func() ConnectionStatistic + GetStatus func() ConnectionState + IsConnected func() bool + IsRunning func() bool + Listen func() ErrorType + Proc func() ErrorType + Push func(buffer []byte) ErrorType + RemoveOwnerChecker func() + Reset func() + SetTemporary func() + type ConnectionContext interface + GetCompressSelectAlgorithm func() protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE + GetCryptoKeyExchangeAlgorithm func() protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE + GetCryptoSelectAlgorithm func() protocol.ATBUS_CRYPTO_ALGORITHM_TYPE + GetCryptoSelectKdfType func() protocol.ATBUS_CRYPTO_KDF_TYPE + GetHandshakeStartTime func() time.Time + GetNextSequence func() uint64 + HandshakeGenerateSelfKey func(peerSequenceId uint64) ErrorType + HandshakeReadPeerKey func(peerPubKey *protocol.CryptoHandshakeData, ...) ErrorType + HandshakeWriteSelfPublicKey func(selfPubKey *protocol.CryptoHandshakeData, ...) ErrorType + IsClosing func() bool + IsHandshakeDone func() bool + PackMessage func(m *Message, protocolVersion int32, maxBodySize int) (*buffer.StaticBufferBlock, ErrorType) + SetClosing func(closing bool) + SetupCryptoWithKey func(algorithm protocol.ATBUS_CRYPTO_ALGORITHM_TYPE, key []byte, iv []byte) ErrorType + UnpackMessage func(m *Message, input []byte, maxBodySize int) ErrorType + UpdateCompressionAlgorithm func(algorithm []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) ErrorType + type ConnectionFlag uint32 + const ConnectionFlag_AccessShareAddr + const ConnectionFlag_AccessShareHost + const ConnectionFlag_ClientMode + const ConnectionFlag_Destructing + const ConnectionFlag_ListenFd + const ConnectionFlag_PeerClosed + const ConnectionFlag_RegFd + const ConnectionFlag_RegProc + const ConnectionFlag_Resetting + const ConnectionFlag_ServerMode + const ConnectionFlag_Temporary + type ConnectionState int32 + const ConnectionState_Connected + const ConnectionState_Connecting + const ConnectionState_Disconnected + const ConnectionState_Disconnecting + const ConnectionState_Handshaking + type ConnectionStatistic struct + FaultCount uint64 + PullStartSize uint64 + PullStartTimes uint64 + PushFailedSize uint64 + PushFailedTimes uint64 + PushStartSize uint64 + PushStartTimes uint64 + PushSuccessSize uint64 + PushSuccessTimes uint64 + type Endpoint interface + AddConnection func(conn Connection, forceData bool) bool + AddListenAddress func(addr string) + AddPingTimer func() + AddStatisticFault func() uint64 + ClearListenAddress func() + ClearPingTimer func() + ClearStatisticFault func() + GetCtrlConnection func(peer Endpoint) Connection + GetDataConnection func(peer Endpoint, enableFallbackCtrl bool) Connection + GetDataConnectionCount func(enableFallbackCtrl bool) int + GetFlag func(f EndpointFlag) bool + GetFlags func() uint32 + GetHashCode func() string + GetHostname func() string + GetId func() BusIdType + GetListenAddress func() []ChannelAddress + GetOwner func() Node + GetPid func() int32 + GetStatisticCreatedTime func() time.Time + GetStatisticLastPong func() time.Time + GetStatisticPingDelay func() time.Duration + GetStatisticPullSize func() uint64 + GetStatisticPullTimes func() uint64 + GetStatisticPushFailedSize func() uint64 + GetStatisticPushFailedTimes func() uint64 + GetStatisticPushStartSize func() uint64 + GetStatisticPushStartTimes func() uint64 + GetStatisticPushSuccessSize func() uint64 + GetStatisticPushSuccessTimes func() uint64 + GetStatisticUnfinishedPing func() uint64 + IsAvailable func() bool + IsSchemeSupported func(scheme string) bool + RemoveConnection func(conn Connection) bool + Reset func() + SetFlag func(f EndpointFlag, v bool) + SetStatisticPingDelay func(pd time.Duration, pongTimepoint time.Time) + SetStatisticUnfinishedPing func(p uint64) + UpdateHashCode func(code string) + UpdateSupportSchemes func(schemes []string) + type EndpointCollectionType = map[BusIdType]Endpoint + type EndpointFlag uint32 + const EndpointFlag_ConnectionSorted + const EndpointFlag_Destructing + const EndpointFlag_HasListenFd + const EndpointFlag_HasListenPorc + const EndpointFlag_HasPingTimer + const EndpointFlag_MutableFlags + const EndpointFlag_Resetting + type ErrorType = error_code.ErrorType + type IoStreamCallbackEventHandleSet struct + func (h *IoStreamCallbackEventHandleSet) GetCallback(eventType IoStreamCallbackEventType) IoStreamCallbackFunc + func (h *IoStreamCallbackEventHandleSet) SetCallback(eventType IoStreamCallbackEventType, callback IoStreamCallbackFunc) + type IoStreamCallbackEventType int32 + const IoStreamCallbackEventType_Accepted + const IoStreamCallbackEventType_Connected + const IoStreamCallbackEventType_Disconnected + const IoStreamCallbackEventType_Max + const IoStreamCallbackEventType_Received + const IoStreamCallbackEventType_Written + type IoStreamCallbackFunc func(channel IoStreamChannel, conn IoStreamConnection, status int32, ...) + type IoStreamChannel interface + Close func() ErrorType + Connect func(addr string) (IoStreamConnection, ErrorType) + Disconnect func(conn IoStreamConnection) ErrorType + GetContext func() context.Context + GetEventHandleSet func() *IoStreamCallbackEventHandleSet + GetFlag func(f IoStreamConnectionFlag) bool + GetPrivateData func() interface{} + GetStatisticActiveRequestCount func() uint64 + GetStatisticCheckBlockSizeFailedCount func() uint64 + GetStatisticCheckHashFailedCount func() uint64 + GetStatisticReadNetEgainCount func() uint64 + Listen func(addr string) ErrorType + Send func(conn IoStreamConnection, data []byte) ErrorType + SetFlag func(f IoStreamConnectionFlag, v bool) + SetPrivateData func(data interface{}) + type IoStreamChannelFlag uint16 + const IoStreamChannelFlag_Closing + const IoStreamChannelFlag_InCallback + const IoStreamChannelFlag_IsLoopOwner + type IoStreamConfigure struct + Backlog int32 + ConfirmTimeout time.Duration + Keepalive time.Duration + MaxReadCheckBlockSizeFailedCount uint64 + MaxReadCheckHashFailedCount uint64 + MaxReadNetEgainCount uint64 + NoBlock bool + NoDelay bool + ReceiveBufferLimitSize uint64 + ReceiveBufferMaxSize uint64 + ReceiveBufferStatic uint64 + SendBufferLimitSize uint64 + SendBufferMaxSize uint64 + SendBufferStatic uint64 + type IoStreamConnection interface + GetAddress func() ChannelAddress + GetChannel func() IoStreamChannel + GetEventHandleSet func() *IoStreamCallbackEventHandleSet + GetFlag func(f IoStreamConnectionFlag) bool + GetPrivateData func() interface{} + GetProactivelyDisconnectCallback func() IoStreamCallbackFunc + GetReadBufferManager func() *buffer.BufferManager + GetStatus func() IoStreamConnectionStatus + SetFlag func(f IoStreamConnectionFlag, v bool) + SetPrivateData func(data interface{}) + type IoStreamConnectionFlag uint16 + const IoStreamConnectionFlag_Accept + const IoStreamConnectionFlag_Closing + const IoStreamConnectionFlag_Connect + const IoStreamConnectionFlag_Listen + const IoStreamConnectionFlag_Writing + type IoStreamConnectionStatus uint16 + const IoStreamConnectionStatus_Connected + const IoStreamConnectionStatus_Created + const IoStreamConnectionStatus_Disconnected + const IoStreamConnectionStatus_Disconnecting + type Message struct + func NewMessage() *Message + func (m *Message) Body() *protocol.MessageBody + func (m *Message) GetBody() *protocol.MessageBody + func (m *Message) GetBodyType() MessageBodyType + func (m *Message) GetHead() *protocol.MessageHead + func (m *Message) GetUnpackErrorMessage() string + func (m *Message) Head() *protocol.MessageHead + func (m *Message) MutableBody() *protocol.MessageBody + func (m *Message) MutableHead() *protocol.MessageHead + func (m *Message) SetUnpackError(err string) + type MessageBodyType = protocol.MessageBody_EnMessageTypeID + type Node interface + AddConnectionGcList func(conn Connection) + AddEndpoint func(ep Endpoint) ErrorType + AddEndpointGcList func(ep Endpoint) + AddStatisticDispatchTimes func() + AllocateMessageSequence func() uint64 + CheckAccessHash func(accessKey *protocol.AccessData, plainText string, conn Connection) bool + CheckFlag func(f NodeFlag) bool + Connect func(address string) ErrorType + ConnectWithEndpoint func(address string, ep Endpoint) ErrorType + CreateEndpoint func(tid BusIdType, hostName string, pid int) Endpoint + DisableDebugMessageVerbose func() + Disconnect func(id BusIdType) ErrorType + DispatchAllSelfMessages func() int32 + EnableDebugMessageVerbose func() + FatalShutdown func(ep Endpoint, conn Connection, code ErrorType, err error) ErrorType + GetAccessCode func() string + GetConfigure func() *NodeConfigure + GetContext func() context.Context + GetCryptoKeyExchangeType func() protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE + GetEndpoint func(tid BusIdType) Endpoint + GetEventHandleOnAddEndpoint func() NodeOnEndpointEventFunc + GetEventHandleOnAvailable func() NodeOnNodeUpFunc + GetEventHandleOnCloseConnection func() NodeOnCloseConnectionFunc + GetEventHandleOnCustomCommandRequest func() NodeOnCustomCommandRequestFunc + GetEventHandleOnCustomCommandResponse func() NodeOnCustomCommandResponseFunc + GetEventHandleOnForwardRequest func() NodeOnForwardRequestFunc + GetEventHandleOnForwardResponse func() NodeOnForwardResponseFunc + GetEventHandleOnInvalidConnection func() NodeOnInvalidConnectionFunc + GetEventHandleOnNewConnection func() NodeOnNewConnectionFunc + GetEventHandleOnPingEndpoint func() NodeOnPingPongEndpointFunc + GetEventHandleOnPongEndpoint func() NodeOnPingPongEndpointFunc + GetEventHandleOnRegister func() NodeOnRegisterFunc + GetEventHandleOnRemoveEndpoint func() NodeOnEndpointEventFunc + GetEventHandleOnShutdown func() NodeOnNodeDownFunc + GetEventHandleOnTopologyUpdateUpstream func() NodeOnTopologyUpdateUpstreamFunc + GetHostname func() string + GetId func() BusIdType + GetImmediateEndpointSet func() EndpointCollectionType + GetIoStreamChannel func() IoStreamChannel + GetIoStreamConfigure func() *IoStreamConfigure + GetListenList func() []ChannelAddress + GetLogger func() *utils_log.Logger + GetPeerChannel func(tid BusIdType, fn func(from Endpoint, to Endpoint) Connection, ...) (ErrorType, Endpoint, Connection, TopologyPeer) + GetPid func() int + GetProtocolMinimalVersion func() int32 + GetProtocolVersion func() int32 + GetSelfEndpoint func() Endpoint + GetState func() NodeState + GetTimerTick func() time.Time + GetTopologyRegistry func() TopologyRegistry + GetTopologyRelation func(tid BusIdType) (TopologyRelationType, TopologyPeer) + GetUpstreamEndpoint func() Endpoint + Init func(id BusIdType, conf *NodeConfigure) ErrorType + IsDebugMessageVerboseEnabled func() bool + IsEndpointAvailable func(tid BusIdType) bool + Listen func(address string) ErrorType + LogDebug func(ep Endpoint, conn Connection, m *Message, msg string, args ...any) + LogError func(ep Endpoint, conn Connection, status int, errcode ErrorType, msg string, ...) + LogInfo func(ep Endpoint, conn Connection, msg string, args ...any) + OnActived func() + OnCustomCommandRequest func(ep Endpoint, conn Connection, from BusIdType, argv [][]byte) (ErrorType, [][]byte) + OnCustomCommandResponse func(ep Endpoint, conn Connection, from BusIdType, argv [][]byte, sequence uint64) ErrorType + OnDisconnect func(ep Endpoint, conn Connection) ErrorType + OnNewConnection func(conn Connection) ErrorType + OnPing func(ep Endpoint, message *Message, body *protocol.PingData) ErrorType + OnPong func(ep Endpoint, message *Message, body *protocol.PingData) ErrorType + OnReceiveData func(ep Endpoint, conn Connection, message *Message, data []byte) + OnReceiveForwardResponse func(ep Endpoint, conn Connection, message *Message) + OnRegister func(ep Endpoint, conn Connection, code ErrorType) + OnShutdown func(code ErrorType) ErrorType + OnUpstreamRegisterDone func() + Poll func() ErrorType + Proc func(now time.Time) ErrorType + ReloadCompression func(compressionAllowAlgorithms []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE, ...) ErrorType + ReloadCrypto func(cryptoKeyExchangeType protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE, ...) ErrorType + RemoveEndpoint func(ep Endpoint) ErrorType + Reset func() ErrorType + SendCtrlMessage func(tid BusIdType, message *Message, options *NodeSendDataOptions) (ErrorType, Endpoint, Connection) + SendCustomCommand func(tid BusIdType, args [][]byte) ErrorType + SendCustomCommandWithOptions func(tid BusIdType, args [][]byte, options *NodeSendDataOptions) ErrorType + SendData func(tid BusIdType, t int32, data []byte) ErrorType + SendDataMessage func(tid BusIdType, message *Message, options *NodeSendDataOptions) (ErrorType, Endpoint, Connection) + SendDataWithOptions func(tid BusIdType, t int32, data []byte, options *NodeSendDataOptions) ErrorType + SetEventHandleOnAddEndpoint func(handle NodeOnEndpointEventFunc) + SetEventHandleOnAvailable func(handle NodeOnNodeUpFunc) + SetEventHandleOnCloseConnection func(handle NodeOnCloseConnectionFunc) + SetEventHandleOnCustomCommandRequest func(handle NodeOnCustomCommandRequestFunc) + SetEventHandleOnCustomCommandResponse func(handle NodeOnCustomCommandResponseFunc) + SetEventHandleOnForwardRequest func(handle NodeOnForwardRequestFunc) + SetEventHandleOnForwardResponse func(handle NodeOnForwardResponseFunc) + SetEventHandleOnInvalidConnection func(handle NodeOnInvalidConnectionFunc) + SetEventHandleOnNewConnection func(handle NodeOnNewConnectionFunc) + SetEventHandleOnPingEndpoint func(handle NodeOnPingPongEndpointFunc) + SetEventHandleOnPongEndpoint func(handle NodeOnPingPongEndpointFunc) + SetEventHandleOnRegister func(handle NodeOnRegisterFunc) + SetEventHandleOnRemoveEndpoint func(handle NodeOnEndpointEventFunc) + SetEventHandleOnShutdown func(handle NodeOnNodeDownFunc) + SetEventHandleOnTopologyUpdateUpstream func(handle NodeOnTopologyUpdateUpstreamFunc) + SetHostname func(hostname string, force bool) bool + SetLogger func(logger *utils_log.Logger) + SetTopologyUpstream func(tid BusIdType) + Shutdown func(reason ErrorType) ErrorType + Start func() ErrorType + StartWithConfigure func(conf *StartConfigure) ErrorType + type NodeConfigure struct + AccessTokenMaxNumber uint32 + AccessTokens [][]byte + BackLog int32 + CompressionAllowAlgorithms []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE + CompressionLevel protocol.ATBUS_COMPRESSION_LEVEL + CryptoAllowAlgorithms []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE + CryptoKeyExchangeType protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE + CryptoKeyRefreshInterval time.Duration + EventLoopContext context.Context + FaultTolerant uint32 + FirstIdleTimeout time.Duration + LoopTimes int32 + MessageSize uint64 + OverwriteListenPath bool + PingInterval time.Duration + ProtocolMinimalVersion int32 + ProtocolVersion int32 + RecvBufferSize uint64 + RetryInterval time.Duration + SendBufferNumber uint64 + SendBufferSize uint64 + TTL int32 + TopologyLabels map[string]string + UpstreamAddress string + type NodeEventHandleSet struct + OnCloseConnection NodeOnCloseConnectionFunc + OnCustomCommandRequest NodeOnCustomCommandRequestFunc + OnCustomCommandResponse NodeOnCustomCommandResponseFunc + OnEndpointAdded NodeOnEndpointEventFunc + OnEndpointPing NodeOnPingPongEndpointFunc + OnEndpointPong NodeOnPingPongEndpointFunc + OnEndpointRemoved NodeOnEndpointEventFunc + OnForwardRequest NodeOnForwardRequestFunc + OnForwardResponse NodeOnForwardResponseFunc + OnInvalidConnection NodeOnInvalidConnectionFunc + OnNewConnection NodeOnNewConnectionFunc + OnNodeDown NodeOnNodeDownFunc + OnNodeUp NodeOnNodeUpFunc + OnRegister NodeOnRegisterFunc + OnTopologyUpdateUpstream NodeOnTopologyUpdateUpstreamFunc + type NodeFlag uint32 + const NodeFlag_Actived + const NodeFlag_InCallback + const NodeFlag_InGcConnections + const NodeFlag_InGcEndpoints + const NodeFlag_InPoll + const NodeFlag_InProc + const NodeFlag_None + const NodeFlag_RecvSelfMsg + const NodeFlag_Resetting + const NodeFlag_ResettingGc + const NodeFlag_Shutdown + const NodeFlag_UpstreamRegDone + type NodeGetPeerOptionFlag uint16 + const NodeGetPeerOptionFlag_NoUpstream + type NodeGetPeerOptions struct + func CreateNodeGetPeerOptions() *NodeGetPeerOptions + func (o *NodeGetPeerOptions) GetBlacklist() []BusIdType + func (o *NodeGetPeerOptions) GetFlag(f NodeGetPeerOptionFlag) bool + func (o *NodeGetPeerOptions) SetBlacklist(blacklist []BusIdType) + func (o *NodeGetPeerOptions) SetFlag(f NodeGetPeerOptionFlag, v bool) + type NodeOnCloseConnectionFunc func(n Node, ep Endpoint, conn Connection) ErrorType + type NodeOnCustomCommandRequestFunc func(n Node, ep Endpoint, conn Connection, from BusIdType, argv [][]byte) (result ErrorType, response [][]byte) + type NodeOnCustomCommandResponseFunc func(n Node, ep Endpoint, conn Connection, from BusIdType, rspData [][]byte, ...) ErrorType + type NodeOnEndpointEventFunc func(n Node, ep Endpoint, status ErrorType) ErrorType + type NodeOnForwardRequestFunc func(n Node, ep Endpoint, conn Connection, msg *Message, data []byte) ErrorType + type NodeOnForwardResponseFunc func(n Node, ep Endpoint, conn Connection, msg *Message) ErrorType + type NodeOnInvalidConnectionFunc func(n Node, conn Connection, errCode ErrorType) ErrorType + type NodeOnNewConnectionFunc func(n Node, conn Connection) ErrorType + type NodeOnNodeDownFunc func(n Node, errCode ErrorType) ErrorType + type NodeOnNodeUpFunc func(n Node, errCode ErrorType) ErrorType + type NodeOnPingPongEndpointFunc func(n Node, ep Endpoint, msg *Message, ping *protocol.PingData) ErrorType + type NodeOnRegisterFunc func(n Node, ep Endpoint, conn Connection, errCode ErrorType) ErrorType + type NodeOnTopologyUpdateUpstreamFunc func(n Node, self TopologyPeer, upstream TopologyPeer, data *TopologyData) ErrorType + type NodeSendDataOptionFlag uint32 + const NodeSendDataOptionFlag_NoUpstream + const NodeSendDataOptionFlag_RequiredResponse + type NodeSendDataOptions struct + func CreateNodeSendDataOptions() *NodeSendDataOptions + func (o *NodeSendDataOptions) GetFlag(f NodeSendDataOptionFlag) bool + func (o *NodeSendDataOptions) GetSequence() uint64 + func (o *NodeSendDataOptions) SetFlag(f NodeSendDataOptionFlag, v bool) + func (o *NodeSendDataOptions) SetSequence(seq uint64) + type NodeState int32 + const NodeState_ConnectingUpstream + const NodeState_Created + const NodeState_Inited + const NodeState_LostUpstream + const NodeState_Running + type StartConfigure struct + TimerTimepoint time.Time + type TimerDescPair struct + Timeout time.Time + Value V + type TimerDescType = utils_memory.LRUMap[K, TimerDescPair[V]] + type TopologyData struct + Hostname string + Labels map[string]string + Pid int32 + type TopologyPeer interface + ContainsDownstream func(busId BusIdType) bool + ForeachDownstream func(fn func(peer TopologyPeer) bool) bool + GetBusId func() BusIdType + GetTopologyData func() *TopologyData + GetUpstream func() TopologyPeer + type TopologyPolicyRule struct + RequireLabelValues map[string]map[string]struct{} + RequireSameHostName bool + RequireSameProcess bool + type TopologyRegistry interface + CheckPolicy func(rule *TopologyPolicyRule, fromData *TopologyData, toData *TopologyData) bool + ForeachPeer func(fn func(peer TopologyPeer) bool) bool + GetPeer func(busId BusIdType) TopologyPeer + GetRelation func(from BusIdType, to BusIdType) (TopologyRelationType, TopologyPeer) + RemovePeer func(targetBusId BusIdType) + UpdatePeer func(targetBusId BusIdType, upstreamBusId BusIdType, data *TopologyData) bool + type TopologyRelationType uint8 + const TopologyRelationType_ImmediateDownstream + const TopologyRelationType_ImmediateUpstream + const TopologyRelationType_Invalid + const TopologyRelationType_OtherUpstreamPeer + const TopologyRelationType_SameUpstreamPeer + const TopologyRelationType_Self + const TopologyRelationType_TransitiveDownstream + const TopologyRelationType_TransitiveUpstream