Documentation ¶
Overview ¶
Package libatbus_channel_iostream provides an IO stream channel implementation for libatbus. It handles TCP, Unix socket, and named pipe connections with proper frame encoding/decoding.
Index ¶
- Constants
- Variables
- func CalculateHash(payload []byte) uint32
- func PackFrame(payload []byte, frame []byte) int
- func PackFrameSize(payloadSize uint64) int
- func TryUnpackFrameHeader(data []byte) (payloadLen uint64, headerSize int, needMoreData bool)
- type FrameReader
- type IoStreamCallbackEventHandleSet
- type IoStreamCallbackEventType
- type IoStreamCallbackFunc
- type IoStreamChannel
- func (c *IoStreamChannel) Close() error_code.ErrorType
- func (c *IoStreamChannel) Connect(addr string) (types.IoStreamConnection, error_code.ErrorType)
- func (c *IoStreamChannel) Disconnect(conn types.IoStreamConnection) error_code.ErrorType
- func (c *IoStreamChannel) GetConfigure() *IoStreamConfigure
- func (c *IoStreamChannel) GetConnections() []*IoStreamConnection
- func (c *IoStreamChannel) GetContext() context.Context
- func (c *IoStreamChannel) GetEventHandleSet() *IoStreamCallbackEventHandleSet
- func (c *IoStreamChannel) GetFlag(f IoStreamConnectionFlag) bool
- func (c *IoStreamChannel) GetPrivateData() interface{}
- func (c *IoStreamChannel) GetStatisticActiveRequestCount() uint64
- func (c *IoStreamChannel) GetStatisticCheckBlockSizeFailedCount() uint64
- func (c *IoStreamChannel) GetStatisticCheckHashFailedCount() uint64
- func (c *IoStreamChannel) GetStatisticReadNetEgainCount() uint64
- func (c *IoStreamChannel) Listen(addr string) error_code.ErrorType
- func (c *IoStreamChannel) Send(conn types.IoStreamConnection, data []byte) error_code.ErrorType
- func (c *IoStreamChannel) SetFlag(f IoStreamConnectionFlag, v bool)
- func (c *IoStreamChannel) SetPrivateData(data interface{})
- type IoStreamChannelFlag
- type IoStreamConfigure
- type IoStreamConnection
- func (c *IoStreamConnection) GetAddress() types.ChannelAddress
- func (c *IoStreamConnection) GetChannel() types.IoStreamChannel
- func (c *IoStreamConnection) GetEventHandleSet() *IoStreamCallbackEventHandleSet
- func (c *IoStreamConnection) GetFlag(f IoStreamConnectionFlag) bool
- func (c *IoStreamConnection) GetNetConn() net.Conn
- func (c *IoStreamConnection) GetPrivateData() interface{}
- func (c *IoStreamConnection) GetProactivelyDisconnectCallback() IoStreamCallbackFunc
- func (c *IoStreamConnection) GetReadBufferManager() *buffer.BufferManager
- func (c *IoStreamConnection) GetStatus() IoStreamConnectionStatus
- func (c *IoStreamConnection) SetFlag(f IoStreamConnectionFlag, v bool)
- func (c *IoStreamConnection) SetPrivateData(data interface{})
- func (c *IoStreamConnection) SetStatus(status IoStreamConnectionStatus)
- type IoStreamConnectionFlag
- type IoStreamConnectionStatus
- type UnpackFrameResult
Constants ¶
const (
	// DefaultWriteQueueSize is the default size of the write queue
	DefaultWriteQueueSize = 1024
	// DefaultReadBufferSize is the default read buffer size
	DefaultReadBufferSize = 64 * 1024
)
const (
	// HashSize is the size of the hash field in bytes (murmur3_x86_32)
	HashSize = 4
	// MaxVintSize is the maximum size of a varint encoded value (for uint64)
	MaxVintSize = 10
	// MaxFrameHeaderSize is the maximum possible frame header size
	MaxFrameHeaderSize = HashSize + MaxVintSize
)
const (
	// IoStreamCallbackEventTypeAccepted is triggered when a new connection is accepted.
	IoStreamCallbackEventTypeAccepted = types.IoStreamCallbackEventType_Accepted
	// IoStreamCallbackEventTypeConnected is triggered when an outbound connection is established.
	IoStreamCallbackEventTypeConnected = types.IoStreamCallbackEventType_Connected
	// IoStreamCallbackEventTypeDisconnected is triggered when a connection is closed.
	IoStreamCallbackEventTypeDisconnected = types.IoStreamCallbackEventType_Disconnected
	// IoStreamCallbackEventTypeReceived is triggered when data is received.
	IoStreamCallbackEventTypeReceived = types.IoStreamCallbackEventType_Received
	// IoStreamCallbackEventTypeWritten is triggered when data has been written.
	IoStreamCallbackEventTypeWritten = types.IoStreamCallbackEventType_Written
	// IoStreamCallbackEventTypeMax is the maximum event type value.
	IoStreamCallbackEventTypeMax = types.IoStreamCallbackEventType_Max
)
const (
	// IoStreamConnectionFlagListen indicates this is a listening connection.
	IoStreamConnectionFlagListen = types.IoStreamConnectionFlag_Listen
	// IoStreamConnectionFlagConnect indicates this is an outbound connection.
	IoStreamConnectionFlagConnect = types.IoStreamConnectionFlag_Connect
	// IoStreamConnectionFlagAccept indicates this connection was accepted from a listener.
	IoStreamConnectionFlagAccept = types.IoStreamConnectionFlag_Accept
	// IoStreamConnectionFlagWriting indicates the connection is currently writing.
	IoStreamConnectionFlagWriting = types.IoStreamConnectionFlag_Writing
	// IoStreamConnectionFlagClosing indicates the connection is being closed.
	IoStreamConnectionFlagClosing = types.IoStreamConnectionFlag_Closing
)
const (
	// IoStreamConnectionStatusCreated indicates the connection has been created but not connected.
	IoStreamConnectionStatusCreated = types.IoStreamConnectionStatus_Created
	// IoStreamConnectionStatusConnected indicates the connection is established.
	IoStreamConnectionStatusConnected = types.IoStreamConnectionStatus_Connected
	// IoStreamConnectionStatusDisconnecting indicates the connection is being disconnected.
	IoStreamConnectionStatusDisconnecting = types.IoStreamConnectionStatus_Disconnecting
	// IoStreamConnectionStatusDisconnected indicates the connection has been closed.
	IoStreamConnectionStatusDisconnected = types.IoStreamConnectionStatus_Disconnected
)
const (
	// IoStreamChannelFlagIsLoopOwner indicates the channel owns the event loop.
	IoStreamChannelFlagIsLoopOwner = types.IoStreamChannelFlag_IsLoopOwner
	// IoStreamChannelFlagClosing indicates the channel is being closed.
	IoStreamChannelFlagClosing = types.IoStreamChannelFlag_Closing
	// IoStreamChannelFlagInCallback indicates we're currently in a callback.
	IoStreamChannelFlagInCallback = types.IoStreamChannelFlag_InCallback
)
Variables ¶
var (
	// ErrBufferTooSmall indicates the buffer is too small to hold the frame
	ErrBufferTooSmall = errors.New("buffer too small for frame")
	// ErrInvalidFrameHash indicates the frame hash verification failed
	ErrInvalidFrameHash = errors.New("invalid frame hash")
	// ErrInvalidFrameLength indicates the frame length is invalid
	ErrInvalidFrameLength = errors.New("invalid frame length")
	// ErrIncompleteFrame indicates the frame is incomplete (need more data)
	ErrIncompleteFrame = errors.New("incomplete frame data")
)
var SetDefaultIoStreamConfigure = types.SetDefaultIoStreamConfigure
SetDefaultIoStreamConfigure is an alias for types.SetDefaultIoStreamConfigure.
Functions ¶
func CalculateHash ¶
CalculateHash calculates the murmur3_x86_32 hash of the payload with seed 0. This matches the C++ implementation: atfw::util::hash::murmur_hash3_x86_32(buf, len, 0)
func PackFrame ¶
PackFrame packs payload into a frame with hash and length prefix. Returns the number of bytes written, or 0 if the buffer is too small.
The frame format is:
- 4 bytes: murmur3_x86_32 hash of payload (little-endian)
- varint: payload length (libatbus custom vint format)
- payload: the actual data
func PackFrameSize ¶
PackFrameSize returns the total size needed to pack a frame with the given payload size.
func TryUnpackFrameHeader ¶
TryUnpackFrameHeader tries to parse just the frame header (hash + length) without validating the hash. This is useful for pre-allocating buffer space for large messages. Returns:
- payloadLen: the payload length if the header is complete
- headerSize: the size of the header (hash + varint) if complete, 0 otherwise
- needMoreData: true if more data is needed to parse the header
Types ¶
type FrameReader ¶
type FrameReader struct {
// contains filtered or unexported fields
}
FrameReader provides a streaming frame reader that accumulates data and extracts complete frames.
func NewFrameReader ¶
func NewFrameReader(initialCapacity int) *FrameReader
NewFrameReader creates a new FrameReader with the specified initial buffer capacity.
func (*FrameReader) Available ¶
func (r *FrameReader) Available() int
Available returns the number of bytes available for reading.
func (*FrameReader) ReadFrame ¶
func (r *FrameReader) ReadFrame() UnpackFrameResult
ReadFrame attempts to read a complete frame from the buffer. Returns the unpacked result. If no complete frame is available, Error will be ErrIncompleteFrame.
func (*FrameReader) Write ¶
func (r *FrameReader) Write(data []byte) int
Write appends data to the reader's buffer, growing it if necessary. Returns the number of bytes written.
type IoStreamCallbackEventHandleSet ¶
type IoStreamCallbackEventHandleSet = types.IoStreamCallbackEventHandleSet
IoStreamCallbackEventHandleSet holds callback functions for IO stream events.
type IoStreamCallbackEventType ¶
type IoStreamCallbackEventType = types.IoStreamCallbackEventType
IoStreamCallbackEventType defines the type of IO stream callback event.
type IoStreamCallbackFunc ¶
type IoStreamCallbackFunc = types.IoStreamCallbackFunc
IoStreamCallbackFunc is the callback function type for IO stream events.
type IoStreamChannel ¶
type IoStreamChannel struct {
// contains filtered or unexported fields
}
IoStreamChannel manages IO stream connections.
func NewIoStreamChannel ¶
func NewIoStreamChannel(ctx context.Context, conf *IoStreamConfigure) *IoStreamChannel
NewIoStreamChannel creates a new IO stream channel.
func (*IoStreamChannel) Close ¶
func (c *IoStreamChannel) Close() error_code.ErrorType
Close closes the channel and all connections.
func (*IoStreamChannel) Connect ¶
func (c *IoStreamChannel) Connect(addr string) (types.IoStreamConnection, error_code.ErrorType)
Connect connects to the specified address. Returns the connection and error code.
func (*IoStreamChannel) Disconnect ¶
func (c *IoStreamChannel) Disconnect(conn types.IoStreamConnection) error_code.ErrorType
Disconnect disconnects the specified connection.
func (*IoStreamChannel) GetConfigure ¶
func (c *IoStreamChannel) GetConfigure() *IoStreamConfigure
GetConfigure returns the channel configuration.
func (*IoStreamChannel) GetConnections ¶
func (c *IoStreamChannel) GetConnections() []*IoStreamConnection
GetConnections returns all current connections.
func (*IoStreamChannel) GetContext ¶
func (c *IoStreamChannel) GetContext() context.Context
GetContext returns the channel context.
func (*IoStreamChannel) GetEventHandleSet ¶
func (c *IoStreamChannel) GetEventHandleSet() *IoStreamCallbackEventHandleSet
GetEventHandleSet returns the event handle set.
func (*IoStreamChannel) GetFlag ¶
func (c *IoStreamChannel) GetFlag(f IoStreamConnectionFlag) bool
GetFlag returns whether a flag is set.
func (*IoStreamChannel) GetPrivateData ¶
func (c *IoStreamChannel) GetPrivateData() interface{}
GetPrivateData returns user-defined private data.
func (*IoStreamChannel) GetStatisticActiveRequestCount ¶
func (c *IoStreamChannel) GetStatisticActiveRequestCount() uint64
GetStatisticActiveRequestCount returns the active request count.
func (*IoStreamChannel) GetStatisticCheckBlockSizeFailedCount ¶
func (c *IoStreamChannel) GetStatisticCheckBlockSizeFailedCount() uint64
GetStatisticCheckBlockSizeFailedCount returns the block size check failure count.
func (*IoStreamChannel) GetStatisticCheckHashFailedCount ¶
func (c *IoStreamChannel) GetStatisticCheckHashFailedCount() uint64
GetStatisticCheckHashFailedCount returns the hash check failure count.
func (*IoStreamChannel) GetStatisticReadNetEgainCount ¶
func (c *IoStreamChannel) GetStatisticReadNetEgainCount() uint64
GetStatisticReadNetEgainCount returns the EAGAIN count.
func (*IoStreamChannel) Listen ¶
func (c *IoStreamChannel) Listen(addr string) error_code.ErrorType
Listen starts listening on the specified address. Supported address formats:
- ipv4://host:port - IPv4/IPv6 TCP
- ipv6://host:port - IPv4/IPv6 TCP
- atcp://host:port - IPv4/IPv6 TCP
- dns://host:port - DNS resolved TCP
- unix://path - Unix domain socket
- pipe://path - Named pipe (same as unix)
func (*IoStreamChannel) Send ¶
func (c *IoStreamChannel) Send(conn types.IoStreamConnection, data []byte) error_code.ErrorType
Send sends data to the specified connection. The data will be automatically framed with hash and length prefix.
func (*IoStreamChannel) SetFlag ¶
func (c *IoStreamChannel) SetFlag(f IoStreamConnectionFlag, v bool)
SetFlag sets or clears a channel flag.
func (*IoStreamChannel) SetPrivateData ¶
func (c *IoStreamChannel) SetPrivateData(data interface{})
SetPrivateData sets user-defined private data.
type IoStreamChannelFlag ¶
type IoStreamChannelFlag = types.IoStreamChannelFlag
IoStreamChannelFlag defines channel flags.
type IoStreamConfigure ¶
type IoStreamConfigure = types.IoStreamConfigure
IoStreamConfigure is an alias for types.IoStreamConfigure.
type IoStreamConnection ¶
type IoStreamConnection struct {
// contains filtered or unexported fields
}
IoStreamConnection represents a single IO stream connection.
func (*IoStreamConnection) GetAddress ¶
func (c *IoStreamConnection) GetAddress() types.ChannelAddress
GetAddress returns the connection address.
func (*IoStreamConnection) GetChannel ¶
func (c *IoStreamConnection) GetChannel() types.IoStreamChannel
GetChannel returns the parent channel.
func (*IoStreamConnection) GetEventHandleSet ¶
func (c *IoStreamConnection) GetEventHandleSet() *IoStreamCallbackEventHandleSet
GetEventHandleSet returns the event handle set.
func (*IoStreamConnection) GetFlag ¶
func (c *IoStreamConnection) GetFlag(f IoStreamConnectionFlag) bool
GetFlag returns whether a flag is set.
func (*IoStreamConnection) GetNetConn ¶
func (c *IoStreamConnection) GetNetConn() net.Conn
GetNetConn returns the underlying net.Conn.
func (*IoStreamConnection) GetPrivateData ¶
func (c *IoStreamConnection) GetPrivateData() interface{}
GetPrivateData returns user-defined private data.
func (*IoStreamConnection) GetProactivelyDisconnectCallback ¶
func (c *IoStreamConnection) GetProactivelyDisconnectCallback() IoStreamCallbackFunc
GetProactivelyDisconnectCallback returns the proactive disconnect callback.
func (*IoStreamConnection) GetReadBufferManager ¶
func (c *IoStreamConnection) GetReadBufferManager() *buffer.BufferManager
GetReadBufferManager returns the read buffer manager.
func (*IoStreamConnection) GetStatus ¶
func (c *IoStreamConnection) GetStatus() IoStreamConnectionStatus
GetStatus returns the current connection status.
func (*IoStreamConnection) SetFlag ¶
func (c *IoStreamConnection) SetFlag(f IoStreamConnectionFlag, v bool)
SetFlag sets or clears a connection flag.
func (*IoStreamConnection) SetPrivateData ¶
func (c *IoStreamConnection) SetPrivateData(data interface{})
SetPrivateData sets user-defined private data.
func (*IoStreamConnection) SetStatus ¶
func (c *IoStreamConnection) SetStatus(status IoStreamConnectionStatus)
SetStatus sets the connection status.
type IoStreamConnectionFlag ¶
type IoStreamConnectionFlag = types.IoStreamConnectionFlag
IoStreamConnectionFlag defines connection flags.
type IoStreamConnectionStatus ¶
type IoStreamConnectionStatus = types.IoStreamConnectionStatus
IoStreamConnectionStatus defines connection status.
type UnpackFrameResult ¶
type UnpackFrameResult struct {
// Payload is the extracted payload data
Payload []byte
// Consumed is the total number of bytes consumed from the input buffer
Consumed int
// Error indicates any error that occurred during unpacking
Error error
// ErrorCode is the libatbus error code
ErrorCode error_code.ErrorType
}
UnpackFrameResult contains the result of unpacking a frame.
func UnpackFrame ¶
func UnpackFrame(data []byte) UnpackFrameResult
UnpackFrame attempts to unpack a frame from the input buffer. Returns the unpacked result including payload, consumed bytes, and any error.
If the buffer doesn't contain a complete frame, Error will be ErrIncompleteFrame. If the hash verification fails, Error will be ErrInvalidFrameHash.