libatbus_types

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 12, 2026 License: MIT Imports: 7 Imported by: 0

Documentation

Overview

Package libatbus_types defines shared types and interfaces for libatbus.

Index

Constants

View Source
// MessageBodyType constants re-export the generated protocol enum values
// (protocol.MessageBody_EnMessageTypeID_*) under shorter names.
const (
	MessageBodyTypeUnknown          = protocol.MessageBody_EnMessageTypeID_NONE
	MessageBodyTypeCustomCommandReq = protocol.MessageBody_EnMessageTypeID_CustomCommandReq
	MessageBodyTypeCustomCommandRsp = protocol.MessageBody_EnMessageTypeID_CustomCommandRsp
	MessageBodyTypeDataTransformReq = protocol.MessageBody_EnMessageTypeID_DataTransformReq
	MessageBodyTypeDataTransformRsp = protocol.MessageBody_EnMessageTypeID_DataTransformRsp
	MessageBodyTypeNodeRegisterReq  = protocol.MessageBody_EnMessageTypeID_NodeRegisterReq
	MessageBodyTypeNodeRegisterRsp  = protocol.MessageBody_EnMessageTypeID_NodeRegisterRsp
	MessageBodyTypeNodePingReq      = protocol.MessageBody_EnMessageTypeID_NodePingReq
	MessageBodyTypeNodePongRsp      = protocol.MessageBody_EnMessageTypeID_NodePongRsp
	MessageBodyTypeNodeMax          = MessageBodyTypeNodePongRsp // same value as NodePongRsp; presumably the upper bound of the IDs - confirm
)

MessageBodyType constants using generated enum values.

View Source
// ATBUS_MACRO_MAX_FRAME_HEADER is the maximum length of a data packet header.
const ATBUS_MACRO_MAX_FRAME_HEADER uint64 = 4 * 1024

Variables

This section is empty.

Functions

func InternalSetDelegateIsCompressionAlgorithmSupported

func InternalSetDelegateIsCompressionAlgorithmSupported(delegate func(algorithm protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) bool)

func IsCompressionAlgorithmSupported

func IsCompressionAlgorithmSupported(algorithm protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) bool

func SetDefaultIoStreamConfigure

func SetDefaultIoStreamConfigure(conf *IoStreamConfigure)

SetDefaultIoStreamConfigure sets the default values for IoStreamConfigure.

func SetDefaultNodeConfigure

func SetDefaultNodeConfigure(conf *NodeConfigure)

func SetDefaultStartConfigure

func SetDefaultStartConfigure(conf *StartConfigure)

Types

type BusIdType

// BusIdType is the 64-bit unsigned identifier type used for bus IDs
// (for example as the key of EndpointCollectionType).
type BusIdType uint64

type ChannelAddress

// ChannelAddress represents a parsed channel address with scheme, host, port
// and the full address string.
type ChannelAddress interface {
	GetAddress() string // Full address string (e.g., "tcp://127.0.0.1:8080")
	GetScheme() string  // Protocol scheme (e.g., "tcp", "unix", "shm")
	GetHost() string    // Host part of the address
	GetPort() int       // Port number (0 if not specified)
}

ChannelAddress represents a parsed channel address with scheme, host, port and full address.

type Connection

// Connection describes a single bus connection.
//
// NOTE(review): only the method signatures are visible here; the notes below
// are derived from names and types and should be confirmed against the
// implementation.
type Connection interface {
	// Reset presumably returns the connection to its initial state.
	Reset()

	// Proc presumably performs periodic processing; returns an error code.
	Proc() ErrorType

	// Listen presumably starts listening on the connection's address.
	Listen() ErrorType

	// Connect presumably starts connecting to the connection's address.
	Connect() ErrorType

	// Disconnect presumably closes the connection.
	Disconnect() ErrorType

	// Push presumably sends buffer through this connection.
	Push(buffer []byte) ErrorType

	// AddStatFault returns a uint64; presumably the fault count after
	// incrementing it.
	AddStatFault() uint64

	// ClearStatFault presumably resets the fault counter.
	ClearStatFault()

	// GetAddress returns the ChannelAddress of this connection.
	GetAddress() ChannelAddress

	IsConnected() bool

	IsRunning() bool

	// GetBinding returns the Endpoint this connection is bound to
	// (presumably nil when unbound).
	GetBinding() Endpoint

	// GetStatus returns the current ConnectionState.
	GetStatus() ConnectionState

	// CheckFlag reports whether the given ConnectionFlag is set.
	CheckFlag(flag ConnectionFlag) bool

	// SetTemporary presumably sets ConnectionFlag_Temporary.
	SetTemporary()

	// GetStatistic returns the connection's statistic counters by value.
	GetStatistic() ConnectionStatistic

	// GetConnectionContext returns the ConnectionContext of this connection.
	GetConnectionContext() ConnectionContext

	RemoveOwnerChecker()
}

type ConnectionContext

// ConnectionContext abstracts the per-connection state and the pack/unpack flow.
//
// It is the Go counterpart of the C++ atbus::connection_context and is
// implemented by libatbus_impl.ConnectionContext. Keep it minimal and stable:
// only add methods that are required by callers.
type ConnectionContext interface {
	// IsClosing reports whether the connection is in a closing state.
	IsClosing() bool

	// SetClosing sets the closing state of the connection.
	SetClosing(closing bool)

	// IsHandshakeDone reports whether the handshake has been completed.
	IsHandshakeDone() bool

	// GetHandshakeStartTime returns the time when the handshake was started.
	GetHandshakeStartTime() time.Time

	// GetCryptoKeyExchangeAlgorithm returns the key exchange algorithm used for crypto handshake.
	GetCryptoKeyExchangeAlgorithm() protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE

	// GetCryptoSelectKdfType returns the KDF type selected during handshake.
	GetCryptoSelectKdfType() protocol.ATBUS_CRYPTO_KDF_TYPE

	// GetCryptoSelectAlgorithm returns the crypto algorithm selected during handshake.
	GetCryptoSelectAlgorithm() protocol.ATBUS_CRYPTO_ALGORITHM_TYPE

	// GetCompressSelectAlgorithm returns the compression algorithm selected during handshake.
	GetCompressSelectAlgorithm() protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE

	// GetNextSequence returns the next sequence number for message ordering.
	// Each call increments the internal counter and returns the new value.
	GetNextSequence() uint64

	// PackMessage packs a Message into a StaticBufferBlock for transmission.
	//
	// Parameters:
	//   - m: the message to pack (head will be modified with version, body_size, etc.)
	//   - protocolVersion: the protocol version to set in the message head
	//   - maxBodySize: maximum allowed body size (0 means no limit)
	//
	// Returns:
	//   - StaticBufferBlock containing the packed message data
	//   - error code if packing fails, EN_ATBUS_ERR_SUCCESS on success
	PackMessage(m *Message, protocolVersion int32, maxBodySize int) (*buffer.StaticBufferBlock, ErrorType)

	// UnpackMessage unpacks binary data into a Message.
	//
	// Parameters:
	//   - m: the message to populate (will be modified)
	//   - input: the binary data to unpack
	//   - maxBodySize: maximum allowed body size (0 means no limit)
	//
	// Returns:
	//   - error code if unpacking fails, EN_ATBUS_ERR_SUCCESS on success
	UnpackMessage(m *Message, input []byte, maxBodySize int) ErrorType

	// HandshakeGenerateSelfKey generates the local ECDH key pair for handshake.
	//
	// In client mode, peerSequenceId should be 0 to generate a new sequence.
	// In server mode, peerSequenceId should be the peer's handshake sequence.
	//
	// Returns error code, EN_ATBUS_ERR_SUCCESS on success.
	HandshakeGenerateSelfKey(peerSequenceId uint64) ErrorType

	// HandshakeReadPeerKey reads the peer's public key and computes the shared secret.
	//
	// Parameters:
	//   - peerPubKey: the peer's handshake data containing public key
	//   - supportedCryptoAlgorithms: list of locally supported crypto algorithms
	//
	// Returns error code, EN_ATBUS_ERR_SUCCESS on success.
	HandshakeReadPeerKey(peerPubKey *protocol.CryptoHandshakeData,
		supportedCryptoAlgorithms []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE) ErrorType

	// HandshakeWriteSelfPublicKey writes the local public key to the handshake data structure.
	//
	// Parameters:
	//   - selfPubKey: output handshake data to populate with local public key
	//   - supportedCryptoAlgorithms: list of locally supported crypto algorithms
	//
	// Returns error code, EN_ATBUS_ERR_SUCCESS on success.
	HandshakeWriteSelfPublicKey(
		selfPubKey *protocol.CryptoHandshakeData,
		supportedCryptoAlgorithms []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE) ErrorType

	// UpdateCompressionAlgorithm updates the list of supported compression algorithms.
	//
	// Parameters:
	//   - algorithm: the compression algorithms to use as the supported list
	//
	// Returns error code, EN_ATBUS_ERR_SUCCESS on success.
	UpdateCompressionAlgorithm(algorithm []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE) ErrorType

	// SetupCryptoWithKey directly sets the encryption key and IV, skipping key exchange.
	// This is primarily used for testing purposes.
	//
	// Parameters:
	//   - algorithm: the crypto algorithm type
	//   - key: the encryption key data
	//   - iv: the initialization vector data
	//
	// Returns error code, EN_ATBUS_ERR_SUCCESS on success.
	SetupCryptoWithKey(algorithm protocol.ATBUS_CRYPTO_ALGORITHM_TYPE, key []byte, iv []byte) ErrorType
}

ConnectionContext abstracts the per-connection state and the pack/unpack flow.

This interface is the Go equivalent of C++ atbus::connection_context. The pack/unpack methods correspond to the following C++ signatures (the Go PackMessage takes no random_engine parameter):

  • pack_message(message&, protocol_version, random_engine, max_body_size) -> buffer_result_t
  • unpack_message(message&, input, max_body_size) -> int

This interface is implemented by [libatbus_impl.ConnectionContext]. Keep it minimal and stable: only add methods that are required by callers.

type ConnectionFlag

// ConnectionFlag is a bit flag describing the state or role of a connection.
type ConnectionFlag uint32

// Each flag occupies one bit; the shift count is the bit position.
const (
	ConnectionFlag_RegProc         ConnectionFlag = 1 << 0  // a proc record was registered on the node; must be removed on cleanup
	ConnectionFlag_RegFd           ConnectionFlag = 1 << 1  // an fd was associated with the node or endpoint; must be removed on cleanup
	ConnectionFlag_AccessShareAddr ConnectionFlag = 1 << 2  // shares an internal address (address sharing for memory channels)
	ConnectionFlag_AccessShareHost ConnectionFlag = 1 << 3  // shares a physical host (host sharing for shared-memory channels)
	ConnectionFlag_Resetting       ConnectionFlag = 1 << 4  // reset in progress (guards against infinite recursion)
	ConnectionFlag_Destructing     ConnectionFlag = 1 << 5  // destruction in progress (some interfaces are blocked)
	ConnectionFlag_ListenFd        ConnectionFlag = 1 << 6  // whether this connection is used for listening
	ConnectionFlag_Temporary       ConnectionFlag = 1 << 7  // whether this is a temporary connection
	ConnectionFlag_PeerClosed      ConnectionFlag = 1 << 8  // the peer has closed
	ConnectionFlag_ServerMode      ConnectionFlag = 1 << 9  // the connection is in server mode
	ConnectionFlag_ClientMode      ConnectionFlag = 1 << 10 // the connection is in client mode
)

type ConnectionState

// ConnectionState enumerates the states reported by Connection.GetStatus.
type ConnectionState int32

// Values are sequential starting at 0; do not reorder.
const (
	ConnectionState_Disconnected  ConnectionState = iota // 0
	ConnectionState_Connecting                           // 1
	ConnectionState_Handshaking                          // 2
	ConnectionState_Connected                            // 3
	ConnectionState_Disconnecting                        // 4
)

type ConnectionStatistic

// ConnectionStatistic holds the counters returned by Connection.GetStatistic.
// NOTE(review): the "Times" fields presumably count operations and the "Size"
// fields presumably accumulate payload sizes - confirm units with the
// implementation.
type ConnectionStatistic struct {
	// Push (send) counters: started, succeeded and failed.
	PushStartTimes   uint64
	PushStartSize    uint64
	PushSuccessTimes uint64
	PushSuccessSize  uint64
	PushFailedTimes  uint64
	PushFailedSize   uint64

	// Pull (receive) counters.
	PullStartTimes uint64
	PullStartSize  uint64

	// FaultCount presumably is the counter behind AddStatFault/ClearStatFault.
	FaultCount uint64
}

type Endpoint

// Endpoint is the interface of a bus endpoint. It exposes identity accessors
// (id, pid, hostname, hash code), the connections attached to the endpoint,
// its listen addresses and supported schemes, ping-timer control and
// statistics.
//
// NOTE(review): only signatures are visible here; detailed semantics should
// be confirmed against the implementation.
type Endpoint interface {
	// GetOwner returns the Node associated with this endpoint.
	GetOwner() Node

	Reset()

	GetId() BusIdType

	GetPid() int32

	GetHostname() string

	GetHashCode() string

	UpdateHashCode(code string)

	// AddConnection attaches conn to this endpoint; forceData presumably
	// forces it to be treated as a data connection. Returns a bool
	// (presumably success).
	AddConnection(conn Connection, forceData bool) bool

	RemoveConnection(conn Connection) bool

	IsAvailable() bool

	// GetFlags returns the raw flag bits; see EndpointFlag for single flags.
	GetFlags() uint32

	GetFlag(f EndpointFlag) bool

	SetFlag(f EndpointFlag, v bool)

	GetListenAddress() []ChannelAddress

	ClearListenAddress()

	AddListenAddress(addr string)

	UpdateSupportSchemes(schemes []string)

	IsSchemeSupported(scheme string) bool

	AddPingTimer()

	ClearPingTimer()

	// ============== connection functions ==============
	GetCtrlConnection(peer Endpoint) Connection

	// GetDataConnection returns a data connection to peer; enableFallbackCtrl
	// presumably allows falling back to the control connection.
	GetDataConnection(peer Endpoint, enableFallbackCtrl bool) Connection

	GetDataConnectionCount(enableFallbackCtrl bool) int

	// ============== statistic functions ==============
	AddStatisticFault() uint64

	ClearStatisticFault()

	SetStatisticUnfinishedPing(p uint64)

	GetStatisticUnfinishedPing() uint64

	SetStatisticPingDelay(pd time.Duration, pongTimepoint time.Time)

	GetStatisticPingDelay() time.Duration

	GetStatisticLastPong() time.Time

	GetStatisticCreatedTime() time.Time

	GetStatisticPushStartTimes() uint64
	GetStatisticPushStartSize() uint64
	GetStatisticPushSuccessTimes() uint64
	GetStatisticPushSuccessSize() uint64
	GetStatisticPushFailedTimes() uint64
	GetStatisticPushFailedSize() uint64
	GetStatisticPullTimes() uint64
	GetStatisticPullSize() uint64
}

type EndpointCollectionType

// EndpointCollectionType is an alias for a map from bus ID to Endpoint.
type EndpointCollectionType = map[BusIdType]Endpoint

type EndpointFlag

// EndpointFlag is a bit flag describing the state of an endpoint.
type EndpointFlag uint32

// Each flag occupies one bit; the shift count is the bit position.
const (
	EndpointFlag_Resetting        EndpointFlag = 1 << 0 // reset in progress (guards against infinite recursion)
	EndpointFlag_ConnectionSorted EndpointFlag = 1 << 1
	EndpointFlag_Destructing      EndpointFlag = 1 << 2 // destruction in progress
	EndpointFlag_HasListenPorc    EndpointFlag = 1 << 3 // whether there is a proc-type listen address ("Porc" spelling kept for compatibility)
	EndpointFlag_HasListenFd      EndpointFlag = 1 << 4 // whether there is an fd-type listen address

	EndpointFlag_MutableFlags EndpointFlag = 1 << 5 // start boundary of the dynamically changeable attributes
	EndpointFlag_HasPingTimer EndpointFlag = 1 << 6 // whether a ping timer has been set
)

type ErrorType

// ErrorType is an alias for error_code.ErrorType, the error code type
// returned by most methods in this package.
type ErrorType = error_code.ErrorType

type IoStreamCallbackEventHandleSet

type IoStreamCallbackEventHandleSet struct {
	// contains filtered or unexported fields
}

func (*IoStreamCallbackEventHandleSet) GetCallback

GetCallback returns the callback for the specified event type.

func (*IoStreamCallbackEventHandleSet) SetCallback

SetCallback sets the callback for the specified event type.

type IoStreamCallbackEventType

// IoStreamCallbackEventType identifies an I/O stream event for which a
// callback can be registered.
type IoStreamCallbackEventType int32

// Values are sequential starting at 0; do not reorder.
const (
	IoStreamCallbackEventType_Accepted     IoStreamCallbackEventType = iota // 0
	IoStreamCallbackEventType_Connected                                     // 1
	IoStreamCallbackEventType_Disconnected                                  // 2
	IoStreamCallbackEventType_Received                                      // 3
	IoStreamCallbackEventType_Written                                       // 4
	IoStreamCallbackEventType_Max                                           // 5, one past the last event type
)

type IoStreamCallbackFunc

// IoStreamCallbackFunc is the callback signature for I/O stream events.
// It receives the channel, the connection, a status code and caller-supplied
// private data. privData is declared as any, which is identical to the
// previous interface{} spelling.
type IoStreamCallbackFunc func(channel IoStreamChannel, conn IoStreamConnection, status int32, privData any)

type IoStreamChannel

// IoStreamChannel is the interface of an I/O stream channel: it owns the
// event callbacks, exposes read statistics and private data, and provides
// the core listen/connect/send/disconnect/close operations.
type IoStreamChannel interface {
	GetContext() context.Context

	SetFlag(f IoStreamConnectionFlag, v bool)

	GetFlag(f IoStreamConnectionFlag) bool

	// Set of event handler callbacks.
	GetEventHandleSet() *IoStreamCallbackEventHandleSet

	GetStatisticActiveRequestCount() uint64
	GetStatisticReadNetEgainCount() uint64
	GetStatisticCheckBlockSizeFailedCount() uint64
	GetStatisticCheckHashFailedCount() uint64

	// User-defined private data area.
	SetPrivateData(data any)

	GetPrivateData() any

	// Core operations.
	Listen(addr string) ErrorType
	Connect(addr string) (IoStreamConnection, ErrorType)
	Send(conn IoStreamConnection, data []byte) ErrorType
	Disconnect(conn IoStreamConnection) ErrorType
	Close() ErrorType
}

type IoStreamChannelFlag

// IoStreamChannelFlag is a bit flag describing the state of an I/O stream channel.
type IoStreamChannelFlag uint16

// Each flag occupies one bit; the shift count is the bit position.
const (
	IoStreamChannelFlag_IsLoopOwner IoStreamChannelFlag = 1 << 0
	IoStreamChannelFlag_Closing     IoStreamChannelFlag = 1 << 1
	IoStreamChannelFlag_InCallback  IoStreamChannelFlag = 1 << 2
)

type IoStreamConfigure

// IoStreamConfigure holds the settings of an I/O stream channel. Default
// values are filled in by SetDefaultIoStreamConfigure.
// NOTE(review): field meanings and units are not visible here; the group
// notes below are inferred from field names - confirm with the implementation.
type IoStreamConfigure struct {
	Keepalive time.Duration

	// Socket options.
	NoBlock bool
	NoDelay bool

	// Send/receive buffer sizing.
	SendBufferStatic       uint64
	ReceiveBufferStatic    uint64
	SendBufferMaxSize      uint64
	SendBufferLimitSize    uint64
	ReceiveBufferMaxSize   uint64
	ReceiveBufferLimitSize uint64

	Backlog        int32
	ConfirmTimeout time.Duration

	// Upper bounds for read-side failure counters (compare the
	// GetStatistic*Count methods on IoStreamChannel).
	MaxReadNetEgainCount             uint64
	MaxReadCheckBlockSizeFailedCount uint64
	MaxReadCheckHashFailedCount      uint64
}

type IoStreamConnection

// IoStreamConnection is the interface of a single connection that belongs to
// an IoStreamChannel.
type IoStreamConnection interface {
	GetAddress() ChannelAddress
	GetStatus() IoStreamConnectionStatus
	SetFlag(f IoStreamConnectionFlag, v bool)
	GetFlag(f IoStreamConnectionFlag) bool

	// GetChannel returns the IoStreamChannel associated with this connection.
	GetChannel() IoStreamChannel

	// Set of event handler callbacks.
	GetEventHandleSet() *IoStreamCallbackEventHandleSet

	// Callback for proactively closing the connection (cached to reduce
	// extra allocations).
	GetProactivelyDisconnectCallback() IoStreamCallbackFunc

	GetReadBufferManager() *buffer.BufferManager

	// User-defined private data area.
	SetPrivateData(data any)

	GetPrivateData() any
}

type IoStreamConnectionFlag

// IoStreamConnectionFlag is a bit flag describing the role or state of an
// I/O stream connection.
type IoStreamConnectionFlag uint16

// Each flag occupies one bit; the shift count is the bit position.
const (
	IoStreamConnectionFlag_Listen  IoStreamConnectionFlag = 1 << 0
	IoStreamConnectionFlag_Connect IoStreamConnectionFlag = 1 << 1
	IoStreamConnectionFlag_Accept  IoStreamConnectionFlag = 1 << 2
	IoStreamConnectionFlag_Writing IoStreamConnectionFlag = 1 << 3
	IoStreamConnectionFlag_Closing IoStreamConnectionFlag = 1 << 4
)

type IoStreamConnectionStatus

// IoStreamConnectionStatus enumerates the states reported by
// IoStreamConnection.GetStatus.
type IoStreamConnectionStatus uint16

// Values are sequential starting at 0; do not reorder.
const (
	IoStreamConnectionStatus_Created       IoStreamConnectionStatus = iota // 0
	IoStreamConnectionStatus_Connected                                     // 1
	IoStreamConnectionStatus_Disconnecting                                 // 2
	IoStreamConnectionStatus_Disconnected                                  // 3
)

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message is the Go equivalent of C++ atbus::message.

It wraps the protobuf-generated protocol.MessageHead and protocol.MessageBody, and provides accessor methods similar to the C++ class. The Go version does not use protobuf Arena or inplace caching; it holds the structs directly.

func NewMessage

func NewMessage() *Message

NewMessage creates a new empty Message.

func (*Message) Body

func (m *Message) Body() *protocol.MessageBody

Body returns the message body, creating an empty one if nil.

func (*Message) GetBody

func (m *Message) GetBody() *protocol.MessageBody

GetBody returns the message body (may be nil).

func (*Message) GetBodyType

func (m *Message) GetBodyType() MessageBodyType

GetBodyType returns the body oneof case based on the set message type. It uses the generated GetMessageTypeOneofCase() method.

func (*Message) GetHead

func (m *Message) GetHead() *protocol.MessageHead

GetHead returns the message head (may be nil).

func (*Message) GetUnpackErrorMessage

func (m *Message) GetUnpackErrorMessage() string

GetUnpackErrorMessage returns any unpack error message stored in the message.

func (*Message) Head

func (m *Message) Head() *protocol.MessageHead

Head returns the message head, creating an empty one if nil.

func (*Message) MutableBody

func (m *Message) MutableBody() *protocol.MessageBody

MutableBody returns a mutable reference to the message body.

func (*Message) MutableHead

func (m *Message) MutableHead() *protocol.MessageHead

MutableHead returns a mutable reference to the message head.

func (*Message) SetUnpackError

func (m *Message) SetUnpackError(err string)

SetUnpackError sets the unpack error message.

type MessageBodyType

// MessageBodyType is the Go equivalent of the C++ message_body_type (the
// oneof case). It aliases protocol.MessageBody_EnMessageTypeID for convenience.
type MessageBodyType = protocol.MessageBody_EnMessageTypeID

MessageBodyType is the Go equivalent of C++ message_body_type (the oneof case). It aliases protocol.MessageBody_EnMessageTypeID for convenience.

type Node

// Node is the interface of a bus node. It groups lifecycle control
// (Init/Start/Reset/Proc/Poll/Shutdown), listening and connecting, sending
// data and custom commands, endpoint management, internal message hooks
// (On*), event-handler accessors and state getters.
type Node interface {
	Init(id BusIdType, conf *NodeConfigure) ErrorType

	ReloadCrypto(cryptoKeyExchangeType protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE,
		cryptoKeyRefreshInterval time.Duration,
		cryptoAllowAlgorithms []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE) ErrorType

	ReloadCompression(compressionAllowAlgorithms []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE,
		compressionLevel protocol.ATBUS_COMPRESSION_LEVEL) ErrorType

	Start() ErrorType

	StartWithConfigure(conf *StartConfigure) ErrorType

	Reset() ErrorType

	Proc(now time.Time) ErrorType

	Poll() ErrorType

	Listen(address string) ErrorType

	Connect(address string) ErrorType

	ConnectWithEndpoint(address string, ep Endpoint) ErrorType

	Disconnect(id BusIdType) ErrorType

	GetCryptoKeyExchangeType() protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE

	// ================= send/receive functions =================
	// SendData sends data.
	//
	// Parameters:
	//   - tid: ID of the send target
	//   - t: custom type, passed as the message.head.type field; can be used by business logic to distinguish service types
	//   - data: the data block to send
	//
	// Return: 0 or an error code.
	//
	// Note: data received by the receiver is very likely not address-aligned, so sending in-memory data objects (struct/class) is not recommended.
	// If in-memory data objects must be sent, the receiver must memcpy them and must not cast them directly, unless alignment rules were set manually.
	SendData(tid BusIdType, t int32, data []byte) ErrorType

	// SendDataWithOptions sends data (with options).
	//
	// Parameters:
	//   - tid: ID of the send target
	//   - t: custom type, passed as the message.head.type field; can be used by business logic to distinguish service types
	//   - data: the data block to send
	//   - options: send options; if sequence is not set, one is allocated automatically and written back
	//
	// Return: 0 or an error code.
	//
	// Note: data received by the receiver is very likely not address-aligned, so sending in-memory data objects (struct/class) is not recommended.
	// If in-memory data objects must be sent, the receiver must memcpy them and must not cast them directly, unless alignment rules were set manually.
	SendDataWithOptions(tid BusIdType, t int32, data []byte, options *NodeSendDataOptions) ErrorType

	// SendCustomCommand sends a custom command.
	//
	// Parameters:
	//   - tid: ID of the send target
	//   - args: array of custom message contents
	//
	// Return: 0 or an error code.
	SendCustomCommand(tid BusIdType, args [][]byte) ErrorType

	// SendCustomCommandWithOptions sends a custom command (with options).
	//
	// Parameters:
	//   - tid: ID of the send target
	//   - args: array of custom message contents
	//   - options: send options; if sequence is not set, one is allocated automatically and written back
	//
	// Return: 0 or an error code.
	SendCustomCommandWithOptions(tid BusIdType, args [][]byte, options *NodeSendDataOptions) ErrorType

	// ================= endpoint management functions =================
	// GetPeerChannel gets information about the remote send target.
	//
	// Parameters:
	//   - tid: ID of the send target; must not be this node's own BUS ID
	//   - fn: function that selects a valid connection, used to distinguish data channels from control channels
	//   - options: lookup options
	//
	// Return: (0 or an error code, target endpoint, sending connection, topology peer).
	GetPeerChannel(tid BusIdType, fn func(from Endpoint, to Endpoint) Connection, options *NodeGetPeerOptions) (ErrorType, Endpoint, Connection, TopologyPeer)

	// SetTopologyUpstream sets the upstream node information of this node.
	//
	// Parameters:
	//   - tid: ID of the upstream node
	SetTopologyUpstream(tid BusIdType)

	// CreateEndpoint creates an endpoint object and sets up its lifetime check.
	// It is not added to the local node automatically; call AddEndpoint after initialization completes.
	//
	// Parameters:
	//   - tid: bus id of the target endpoint
	//   - hostName: host name of the target endpoint
	//   - pid: process ID of the target endpoint
	// Return: the endpoint object, may be nil.
	CreateEndpoint(tid BusIdType, hostName string, pid int) Endpoint

	// GetEndpoint gets a local endpoint object.
	// Return: the endpoint object, may be nil.
	GetEndpoint(tid BusIdType) Endpoint

	// AddEndpoint adds a target endpoint.
	// Return: 0 or an error code.
	AddEndpoint(ep Endpoint) ErrorType

	// RemoveEndpoint removes a target endpoint.
	// Return: 0 or an error code.
	RemoveEndpoint(ep Endpoint) ErrorType

	// IsEndpointAvailable reports whether there is a data channel to the peer (data can be sent to the peer).
	// Note: returns false if there is only a control channel and no data channel.
	IsEndpointAvailable(tid BusIdType) bool

	// CheckAccessHash checks the validity of the access token set.
	// Return: false if no access token passes the check.
	CheckAccessHash(accessKey *protocol.AccessData, plainText string, conn Connection) bool

	// GetAccessCode gets the hash code of the node.
	GetAccessCode() string

	GetIoStreamChannel() IoStreamChannel

	GetSelfEndpoint() Endpoint

	GetUpstreamEndpoint() Endpoint

	GetTopologyRelation(tid BusIdType) (TopologyRelationType, TopologyPeer)

	GetImmediateEndpointSet() EndpointCollectionType

	// ================= internal message functions =================
	// SendDataMessage sends a data message.
	// Return: (0 or an error code, target endpoint, sending connection).
	SendDataMessage(tid BusIdType, message *Message, options *NodeSendDataOptions) (ErrorType, Endpoint, Connection)

	// SendCtrlMessage sends a control message.
	// Return: (0 or an error code, target endpoint, sending connection).
	SendCtrlMessage(tid BusIdType, message *Message, options *NodeSendDataOptions) (ErrorType, Endpoint, Connection)

	OnReceiveData(ep Endpoint, conn Connection, message *Message, data []byte)

	OnReceiveForwardResponse(ep Endpoint, conn Connection, message *Message)

	OnDisconnect(ep Endpoint, conn Connection) ErrorType

	OnNewConnection(conn Connection) ErrorType

	OnRegister(ep Endpoint, conn Connection, code ErrorType)

	OnActived()

	OnShutdown(code ErrorType) ErrorType

	OnUpstreamRegisterDone()

	OnCustomCommandRequest(ep Endpoint, conn Connection, from BusIdType, argv [][]byte) (ErrorType, [][]byte)

	OnCustomCommandResponse(ep Endpoint, conn Connection, from BusIdType, argv [][]byte, sequence uint64) ErrorType

	OnPing(ep Endpoint, message *Message, body *protocol.PingData) ErrorType

	OnPong(ep Endpoint, message *Message, body *protocol.PingData) ErrorType

	DispatchAllSelfMessages() int32

	GetContext() context.Context

	// Shutdown shuts the node down.
	//
	// Note: if resources must be reclaimed before shutting down, return a non-zero value from the on_node_down_fn_t callback to block the node's reset,
	// then call Shutdown again once the resources have been released and return 0 from the second on_node_down_fn_t callback.
	//
	// Note: alternatively, use ref_object and unref_object to mark and release data references; the reset function runs the event loop until all referenced resources have been removed.
	Shutdown(reason ErrorType) ErrorType

	FatalShutdown(ep Endpoint, conn Connection, code ErrorType, err error) ErrorType

	SetLogger(logger *utils_log.Logger)

	GetLogger() *utils_log.Logger

	IsDebugMessageVerboseEnabled() bool

	EnableDebugMessageVerbose()

	DisableDebugMessageVerbose()

	AddEndpointGcList(ep Endpoint)

	AddConnectionGcList(conn Connection)

	// ================= event handle functions =================
	SetEventHandleOnForwardRequest(handle NodeOnForwardRequestFunc)
	GetEventHandleOnForwardRequest() NodeOnForwardRequestFunc

	SetEventHandleOnForwardResponse(handle NodeOnForwardResponseFunc)
	GetEventHandleOnForwardResponse() NodeOnForwardResponseFunc

	SetEventHandleOnRegister(handle NodeOnRegisterFunc)
	GetEventHandleOnRegister() NodeOnRegisterFunc

	SetEventHandleOnShutdown(handle NodeOnNodeDownFunc)
	GetEventHandleOnShutdown() NodeOnNodeDownFunc

	SetEventHandleOnAvailable(handle NodeOnNodeUpFunc)
	GetEventHandleOnAvailable() NodeOnNodeUpFunc

	SetEventHandleOnInvalidConnection(handle NodeOnInvalidConnectionFunc)
	GetEventHandleOnInvalidConnection() NodeOnInvalidConnectionFunc

	SetEventHandleOnNewConnection(handle NodeOnNewConnectionFunc)
	GetEventHandleOnNewConnection() NodeOnNewConnectionFunc

	SetEventHandleOnCloseConnection(handle NodeOnCloseConnectionFunc)
	GetEventHandleOnCloseConnection() NodeOnCloseConnectionFunc

	SetEventHandleOnCustomCommandRequest(handle NodeOnCustomCommandRequestFunc)
	GetEventHandleOnCustomCommandRequest() NodeOnCustomCommandRequestFunc

	SetEventHandleOnCustomCommandResponse(handle NodeOnCustomCommandResponseFunc)
	GetEventHandleOnCustomCommandResponse() NodeOnCustomCommandResponseFunc

	SetEventHandleOnAddEndpoint(handle NodeOnEndpointEventFunc)
	GetEventHandleOnAddEndpoint() NodeOnEndpointEventFunc

	SetEventHandleOnRemoveEndpoint(handle NodeOnEndpointEventFunc)
	GetEventHandleOnRemoveEndpoint() NodeOnEndpointEventFunc

	SetEventHandleOnPingEndpoint(handle NodeOnPingPongEndpointFunc)
	GetEventHandleOnPingEndpoint() NodeOnPingPongEndpointFunc

	SetEventHandleOnPongEndpoint(handle NodeOnPingPongEndpointFunc)
	GetEventHandleOnPongEndpoint() NodeOnPingPongEndpointFunc

	SetEventHandleOnTopologyUpdateUpstream(handle NodeOnTopologyUpdateUpstreamFunc)
	GetEventHandleOnTopologyUpdateUpstream() NodeOnTopologyUpdateUpstreamFunc

	// ================= data field setters and getters =================
	GetIoStreamConfigure() *IoStreamConfigure

	GetId() BusIdType

	GetConfigure() *NodeConfigure

	CheckFlag(f NodeFlag) bool

	GetState() NodeState

	GetTimerTick() time.Time

	GetTopologyRegistry() TopologyRegistry

	GetPid() int

	GetHostname() string

	SetHostname(hostname string, force bool) bool

	GetProtocolVersion() int32

	GetProtocolMinimalVersion() int32

	GetListenList() []ChannelAddress

	// ================= statistic functions =================
	AddStatisticDispatchTimes()

	// ================= utility functions =================
	AllocateMessageSequence() uint64

	LogDebug(ep Endpoint, conn Connection, m *Message, msg string, args ...any)
	LogInfo(ep Endpoint, conn Connection, msg string, args ...any)
	LogError(ep Endpoint, conn Connection, status int, errcode ErrorType, msg string, args ...any)
}

type NodeConfigure

// NodeConfigure holds the configuration passed to Node.Init. Default values
// are filled in by SetDefaultNodeConfigure.
type NodeConfigure struct {
	EventLoopContext context.Context

	UpstreamAddress        string            // address of the upstream node
	TopologyLabels         map[string]string // topology labels
	LoopTimes              int32             // limit on message loop iterations, so a busy channel cannot starve the others
	TTL                    int32             // hop limit for message forwarding
	ProtocolVersion        int32
	ProtocolMinimalVersion int32

	// ===== connection settings =====
	BackLog              int32
	FirstIdleTimeout     time.Duration // idle time allowed before the first packet
	PingInterval         time.Duration // interval between ping packets
	RetryInterval        time.Duration // interval between retry packets
	FaultTolerant        uint32        // number of tolerated faults (count)
	AccessTokenMaxNumber uint32        // maximum number of access tokens; keep it small, verification may take up to N^2 checks
	AccessTokens         [][]byte      // list of access tokens
	OverwriteListenPath  bool          // whether to overwrite an existing listen path (unix/pipe socket)

	// ===== crypto algorithm settings =====
	CryptoKeyExchangeType    protocol.ATBUS_CRYPTO_KEY_EXCHANGE_TYPE
	CryptoKeyRefreshInterval time.Duration
	CryptoAllowAlgorithms    []protocol.ATBUS_CRYPTO_ALGORITHM_TYPE

	// ===== compression algorithm settings =====
	CompressionAllowAlgorithms []protocol.ATBUS_COMPRESSION_ALGORITHM_TYPE
	CompressionLevel           protocol.ATBUS_COMPRESSION_LEVEL

	// ===== buffer settings =====
	MessageSize      uint64 // max message size
	RecvBufferSize   uint64 // receive buffer; related to the packet size
	SendBufferSize   uint64 // send buffer limit
	SendBufferNumber uint64 // limit on the number of static send buffers; 0 means a dynamic buffer
}

type NodeEventHandleSet

// NodeEventHandleSet collects all node event callbacks. Each field holds the
// handler for one event; see the corresponding NodeOn*Func type for the
// callback contract.
type NodeEventHandleSet struct {
	OnForwardRequest         NodeOnForwardRequestFunc
	OnForwardResponse        NodeOnForwardResponseFunc
	OnRegister               NodeOnRegisterFunc
	OnNodeDown               NodeOnNodeDownFunc
	OnNodeUp                 NodeOnNodeUpFunc
	OnInvalidConnection      NodeOnInvalidConnectionFunc
	OnNewConnection          NodeOnNewConnectionFunc
	OnCloseConnection        NodeOnCloseConnectionFunc
	OnCustomCommandRequest   NodeOnCustomCommandRequestFunc
	OnCustomCommandResponse  NodeOnCustomCommandResponseFunc
	OnEndpointAdded          NodeOnEndpointEventFunc
	OnEndpointRemoved        NodeOnEndpointEventFunc
	OnEndpointPing           NodeOnPingPongEndpointFunc
	OnEndpointPong           NodeOnPingPongEndpointFunc
	OnTopologyUpdateUpstream NodeOnTopologyUpdateUpstreamFunc
}

NodeEventHandleSet collects all callbacks.

type NodeFlag

// NodeFlag is a bit flag describing the state of a node; see Node.CheckFlag.
type NodeFlag uint32

// Each non-zero flag occupies one bit; the shift count is the bit position.
const (
	NodeFlag_None            NodeFlag = 0
	NodeFlag_Resetting       NodeFlag = 1 << 0  // resetting
	NodeFlag_ResettingGc     NodeFlag = 1 << 1  // resetting, and GC is about to run or has already finished
	NodeFlag_Actived         NodeFlag = 1 << 2  // activated
	NodeFlag_UpstreamRegDone NodeFlag = 1 << 3  // registered through the parent (upstream) node
	NodeFlag_Shutdown        NodeFlag = 1 << 4  // resource reclamation before shutdown has completed
	NodeFlag_RecvSelfMsg     NodeFlag = 1 << 5  // receiving messages addressed to itself
	NodeFlag_InCallback      NodeFlag = 1 << 6  // inside a callback function
	NodeFlag_InProc          NodeFlag = 1 << 7  // inside the Proc function
	NodeFlag_InPoll          NodeFlag = 1 << 8  // inside the Poll function
	NodeFlag_InGcEndpoints   NodeFlag = 1 << 9  // while cleaning up endpoints
	NodeFlag_InGcConnections NodeFlag = 1 << 10 // while cleaning up connections
)

type NodeGetPeerOptionFlag

// NodeGetPeerOptionFlag is a bit flag for NodeGetPeerOptions.
type NodeGetPeerOptionFlag uint16

// NodeGetPeerOptionFlag_NoUpstream disallows forwarding through the upstream node.
const NodeGetPeerOptionFlag_NoUpstream NodeGetPeerOptionFlag = 1 << 0

type NodeGetPeerOptions

type NodeGetPeerOptions struct {
	// contains filtered or unexported fields
}

func CreateNodeGetPeerOptions

func CreateNodeGetPeerOptions() *NodeGetPeerOptions

func (*NodeGetPeerOptions) GetBlacklist

func (o *NodeGetPeerOptions) GetBlacklist() []BusIdType

func (*NodeGetPeerOptions) GetFlag

func (*NodeGetPeerOptions) SetBlacklist

func (o *NodeGetPeerOptions) SetBlacklist(blacklist []BusIdType)

func (*NodeGetPeerOptions) SetFlag

type NodeOnCloseConnectionFunc

// NodeOnCloseConnectionFunc is called when a connection is closed.
// Parameters: node, endpoint, connection.
type NodeOnCloseConnectionFunc func(n Node, ep Endpoint, conn Connection) ErrorType

NodeOnCloseConnectionFunc is called when a connection is closed. Parameters: node, endpoint, connection.

type NodeOnCustomCommandRequestFunc

// NodeOnCustomCommandRequestFunc is called when a custom command request is
// received.
//
// Parameters: node, endpoint, connection, source bus id, and argv (the list
// of raw argument byte slices).
//
// Returns the result code and the response lines (some transports may ignore
// cross-node responses).
type NodeOnCustomCommandRequestFunc func(n Node, ep Endpoint, conn Connection, from BusIdType, argv [][]byte) (result ErrorType, response [][]byte)

NodeOnCustomCommandRequestFunc is called when a custom command request is received.

Parameters:

  • node, endpoint, connection
  • source bus id
  • argv: list of raw arg byte slices
  • response (named result): output response lines (some transports may ignore cross-node responses)

type NodeOnCustomCommandResponseFunc

// NodeOnCustomCommandResponseFunc is called when a custom command response is
// received. Parameters: node, endpoint, connection, source bus id, response
// data list, sequence of the request.
type NodeOnCustomCommandResponseFunc func(n Node, ep Endpoint, conn Connection, from BusIdType, rspData [][]byte, sequence uint64) ErrorType

NodeOnCustomCommandResponseFunc is called when a custom command response is received. Parameters: node, endpoint, connection, source bus id, response data list, sequence of the request.

type NodeOnEndpointEventFunc

// NodeOnEndpointEventFunc is called when an endpoint is added or removed.
// Parameters: node, endpoint, status/error code.
type NodeOnEndpointEventFunc func(n Node, ep Endpoint, status ErrorType) ErrorType

NodeOnEndpointEventFunc is called when an endpoint is added/removed. Parameters: node, endpoint, status/error code.

type NodeOnForwardRequestFunc

// NodeOnForwardRequestFunc is called when a message is received.
// Parameters: node, source endpoint, source connection, decoded message, raw
// payload bytes.
type NodeOnForwardRequestFunc func(n Node, ep Endpoint, conn Connection, msg *Message, data []byte) ErrorType

NodeOnForwardRequestFunc is called when a message is received. Parameters: node, source endpoint, source connection, decoded message, raw payload bytes.

type NodeOnForwardResponseFunc

// NodeOnForwardResponseFunc is called when a forwarded message send fails or
// (optionally) succeeds. Unless the send is marked as requiring a
// response/notification, success usually does not trigger a callback.
// Parameters: node, source endpoint, source connection, message (may be nil
// depending on context).
type NodeOnForwardResponseFunc func(n Node, ep Endpoint, conn Connection, msg *Message) ErrorType

NodeOnForwardResponseFunc is called when a forwarded message send fails or (optionally) succeeds.

Note: unless send is marked as requiring a response/notification, success usually does not trigger a callback. Parameters: node, source endpoint, source connection, message (may be nil depending on context).

type NodeOnInvalidConnectionFunc

// NodeOnInvalidConnectionFunc is called when a connection becomes invalid.
// Parameters: node, connection, error code (typically
// EN_ATBUS_ERR_NODE_TIMEOUT).
type NodeOnInvalidConnectionFunc func(n Node, conn Connection, errCode ErrorType) ErrorType

NodeOnInvalidConnectionFunc is called when a connection becomes invalid. Parameters: node, connection, error code (typically EN_ATBUS_ERR_NODE_TIMEOUT).

type NodeOnNewConnectionFunc

// NodeOnNewConnectionFunc is called when a new connection is created.
// Parameters: node, connection.
type NodeOnNewConnectionFunc func(n Node, conn Connection) ErrorType

NodeOnNewConnectionFunc is called when a new connection is created. Parameters: node, connection.

type NodeOnNodeDownFunc

// NodeOnNodeDownFunc is called when the node is shutting down.
// Parameters: node, reason.
type NodeOnNodeDownFunc func(n Node, errCode ErrorType) ErrorType

NodeOnNodeDownFunc is called when the node is shutting down. Parameters: node, reason.

type NodeOnNodeUpFunc

// NodeOnNodeUpFunc is called when the node starts serving.
// Parameters: node, status (typically EN_ATBUS_ERR_SUCCESS).
type NodeOnNodeUpFunc func(n Node, errCode ErrorType) ErrorType

NodeOnNodeUpFunc is called when the node starts serving. Parameters: node, status (typically EN_ATBUS_ERR_SUCCESS).

type NodeOnPingPongEndpointFunc

type NodeOnPingPongEndpointFunc func(n Node, ep Endpoint, msg *Message, ping *protocol.PingData) ErrorType

NodeOnPingPongEndpointFunc is called when a ping/pong message is received from an endpoint. Parameters: node, endpoint, decoded message, ping data.

type NodeOnRegisterFunc

type NodeOnRegisterFunc func(n Node, ep Endpoint, conn Connection, errCode ErrorType) ErrorType

NodeOnRegisterFunc is called when a new peer endpoint is registered. Parameters: node, endpoint, connection, status/error code.

type NodeOnTopologyUpdateUpstreamFunc

type NodeOnTopologyUpdateUpstreamFunc func(n Node, self TopologyPeer, upstream TopologyPeer, data *TopologyData) ErrorType

NodeOnTopologyUpdateUpstreamFunc is called when the topology upstream is updated. Parameters: node, self, new upstream, topology data of self.

type NodeSendDataOptionFlag

type NodeSendDataOptionFlag uint32
const (
	NodeSendDataOptionFlag_RequiredResponse NodeSendDataOptionFlag = 0x0001 // Force a response notification (by default a successful send produces no response notification).
	NodeSendDataOptionFlag_NoUpstream       NodeSendDataOptionFlag = 0x0002 // Forbid forwarding the message through the upstream.
)

type NodeSendDataOptions

type NodeSendDataOptions struct {
	// contains filtered or unexported fields
}

func CreateNodeSendDataOptions

func CreateNodeSendDataOptions() *NodeSendDataOptions

func (*NodeSendDataOptions) GetFlag

func (*NodeSendDataOptions) GetSequence

func (o *NodeSendDataOptions) GetSequence() uint64

func (*NodeSendDataOptions) SetFlag

func (*NodeSendDataOptions) SetSequence

func (o *NodeSendDataOptions) SetSequence(seq uint64)

type NodeState

type NodeState int32
const (
	NodeState_Created            NodeState = 0
	NodeState_Inited             NodeState = 1
	NodeState_LostUpstream       NodeState = 2
	NodeState_ConnectingUpstream NodeState = 3
	NodeState_Running            NodeState = 4
)

type StartConfigure

type StartConfigure struct {
	TimerTimepoint time.Time
}

type TimerDescPair

type TimerDescPair[V any] struct {
	Timeout time.Time
	Value   V
}

type TimerDescType

type TimerDescType[K comparable, V any] = utils_memory.LRUMap[K, TimerDescPair[V]]

type TopologyData

type TopologyData struct {
	// Pid is the process id of the peer.
	Pid int32
	// Hostname is the hostname of the peer.
	Hostname string
	// Labels contains arbitrary labels for policy matching (e.g. region/zone/group/version).
	Labels map[string]string
}

TopologyData is the runtime topology data associated with a peer.

type TopologyPeer

type TopologyPeer interface {
	// GetBusId returns the bus id of this peer.
	GetBusId() BusIdType

	// GetUpstream returns the upstream (parent) peer, or nil if this peer is a root.
	GetUpstream() TopologyPeer

	// GetTopologyData returns the current topology data of this peer.
	GetTopologyData() *TopologyData

	// ContainsDownstream reports whether a downstream peer with the given bus id exists.
	ContainsDownstream(busId BusIdType) bool

	// ForeachDownstream iterates all downstream peers, calling fn for each one.
	// NOTE(review): the meaning of fn's result and of the return value is not
	// documented here; presumably it matches TopologyRegistry.ForeachPeer
	// (fn returns false to stop early, and the result is true only when every
	// peer was visited) — confirm against the implementation.
	ForeachDownstream(fn func(peer TopologyPeer) bool) bool
}

TopologyPeer is a topology node (peer). A peer has:

  • A stable bus id.
  • An optional upstream peer.
  • A set of downstream peers.
  • Associated TopologyData.

type TopologyPolicyRule

type TopologyPolicyRule struct {
	// RequireSameProcess requires same pid and same hostname if true.
	RequireSameProcess bool
	// RequireSameHostName requires same hostname if true.
	RequireSameHostName bool
	// RequireLabelValues contains label constraints.
	// Key: label name.
	// Value: allowed label values (set).
	RequireLabelValues map[string]map[string]struct{}
}

TopologyPolicyRule is a policy rule used by TopologyRegistry.CheckPolicy. It describes constraints that the "to" peer must satisfy.

type TopologyRegistry

type TopologyRegistry interface {
	// GetPeer returns the peer with the given bus id, or nil if not found.
	GetPeer(busId BusIdType) TopologyPeer

	// RemovePeer removes a peer and fixes relationships.
	// If the peer has an upstream, it will be removed from the upstream's downstream set.
	// For each downstream peer whose upstream is this peer, its upstream will be cleared.
	RemovePeer(targetBusId BusIdType)

	// UpdatePeer creates or updates a peer.
	//   - If targetBusId is 0, this call is ignored.
	//   - If upstreamBusId is 0, the peer becomes a root (no upstream).
	//   - If the peer exists and its upstream changed, downstream links will be updated accordingly.
	// Returns true on success, false on failure (e.g. an invalid targetBusId,
	// or the update would create a cycle in the upstream chain).
	UpdatePeer(targetBusId BusIdType, upstreamBusId BusIdType, data *TopologyData) bool

	// GetRelation returns the topology relation between two peers.
	// The first return value is the relation type.
	// The second return value is the next-hop peer (may be nil):
	//   - If the relation is TransitiveUpstream or TransitiveDownstream, it's to.
	//   - If the relation is SameUpstreamPeer, it's the upstream of both from and to.
	//   - If the relation is OtherUpstreamPeer, it prefers the upstream of from;
	//     if from has no upstream then returns to.
	// NOTE(review): the next hop for Invalid, Self, ImmediateUpstream and
	// ImmediateDownstream is not described here — confirm against the implementation.
	GetRelation(from BusIdType, to BusIdType) (TopologyRelationType, TopologyPeer)

	// ForeachPeer iterates all peers in the registry.
	// The callback function returns false to stop iteration early.
	// Returns true if all peers were iterated, false if stopped by the callback.
	ForeachPeer(fn func(peer TopologyPeer) bool) bool

	// CheckPolicy checks whether toData satisfies the policy rule.
	// The checks include:
	//   - same hostname (optional)
	//   - same process (optional, implies same hostname)
	//   - required labels (optional)
	CheckPolicy(rule *TopologyPolicyRule, fromData *TopologyData, toData *TopologyData) bool
}

TopologyRegistry provides CRUD for peers and relation querying. This is an in-process data structure. It is NOT thread-safe.

type TopologyRelationType

type TopologyRelationType uint8

TopologyRelationType represents the relation type between two peers in the topology registry. The relation is evaluated based on the upstream chain (parent links).

const (
	// TopologyRelationType_Invalid indicates invalid input or one/both peers not found.
	TopologyRelationType_Invalid TopologyRelationType = 0
	// TopologyRelationType_Self indicates from == to.
	TopologyRelationType_Self TopologyRelationType = 1
	// TopologyRelationType_ImmediateUpstream indicates to is the direct upstream(parent) of from.
	TopologyRelationType_ImmediateUpstream TopologyRelationType = 2
	// TopologyRelationType_TransitiveUpstream indicates to is an ancestor of from, but not the direct upstream.
	TopologyRelationType_TransitiveUpstream TopologyRelationType = 3
	// TopologyRelationType_ImmediateDownstream indicates to is the direct downstream(child) of from.
	TopologyRelationType_ImmediateDownstream TopologyRelationType = 4
	// TopologyRelationType_TransitiveDownstream indicates to is a descendant of from, but not the direct downstream.
	TopologyRelationType_TransitiveDownstream TopologyRelationType = 5
	// TopologyRelationType_SameUpstreamPeer indicates from and to share the same direct upstream.
	TopologyRelationType_SameUpstreamPeer TopologyRelationType = 6
	// TopologyRelationType_OtherUpstreamPeer indicates from and to do not fall into any of the above categories.
	TopologyRelationType_OtherUpstreamPeer TopologyRelationType = 7
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL