Documentation
¶
Index ¶
- Constants
- Variables
- func ConvertFromStringMap(m map[string]string) []*pb.KeyValue
- func ConvertToStringMap(pbb []*pb.KeyValue) map[string]string
- func Crc32cCheckSum(data []byte) uint32
- func GetAndAdd(n *uint64, diff uint64) uint64
- func JavaStringHash(s string) uint32
- func Murmur3_32Hash(s string) uint32
- func ParseRelativeTimeInSeconds(relativeTime string) (time.Duration, error)
- func TimestampMillis(t time.Time) uint64
- func TopicNameWithoutPartitionPart(tn *TopicName) string
- type Backoff
- type BatchBuilder
- type BlockingQueue
- type Buffer
- type BuffersPool
- type CheckSum
- type ClientHandlers
- type Closable
- type Connection
- type ConnectionListener
- type ConnectionPool
- type ConsumerHandler
- type LookupResult
- type LookupService
- type MessageReader
- type RPCClient
- type RPCResult
- type Semaphore
- type TLSOptions
- type TopicName
Constants ¶
const ( // MaxMessageSize limit message size for transfer MaxMessageSize = 5 * 1024 * 1024 // MessageFramePadding is for metadata and other frame headers MessageFramePadding = 10 * 1024 // MaxFrameSize limit the maximum size that pulsar allows for messages to be sent. MaxFrameSize = MaxMessageSize + MessageFramePadding )
const ( // TODO: Find a better way to embed the version in the library code PulsarVersion = "0.1" ClientVersionString = "Pulsar Go " + PulsarVersion PulsarProtocolVersion = int32(pb.ProtocolVersion_v13) )
Variables ¶
var ErrConnectionClosed = errors.New("connection closed")
var ErrCorruptedMessage = errors.New("corrupted message")
ErrCorruptedMessage is the error returned by ReadMessageData when it has detected corrupted data. The data is considered corrupted if it is missing a header, if there is a checksum mismatch, or if there was an error when unmarshalling the message metadata.
var ErrEOM = errors.New("EOF")
ErrEOM is the error returned by ReadMessage when no more input is available.
Functions ¶
func ConvertFromStringMap ¶
ConvertFromStringMap converts a string map to a slice of KeyValue.
func ConvertToStringMap ¶
ConvertToStringMap converts a slice of KeyValue to a string map.
func Crc32cCheckSum ¶
Crc32cCheckSum handles computing the checksum.
func JavaStringHash ¶
JavaStringHash is the equivalent of Java's String.hashCode().
func Murmur3_32Hash ¶
Murmur3_32Hash uses the Murmur3 hashing function.
func TimestampMillis ¶
TimestampMillis returns the given time as a Unix timestamp in milliseconds.
Types ¶
type BatchBuilder ¶
type BatchBuilder struct {
// contains filtered or unexported fields
}
BatchBuilder wraps the objects needed to build a batch.
func NewBatchBuilder ¶
func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, bufferPool BuffersPool, logger log.Logger) (*BatchBuilder, error)
NewBatchBuilder initializes a batch builder and returns a pointer to it. It builds a new batch message container.
func (*BatchBuilder) Add ¶
func (bb *BatchBuilder) Add(metadata *pb.SingleMessageMetadata, sequenceID uint64, payload []byte, callback interface{}, replicateTo []string, deliverAt time.Time) bool
Add adds a single message to the batch.
func (*BatchBuilder) Close ¶ added in v0.2.0
func (bb *BatchBuilder) Close() error
func (*BatchBuilder) Flush ¶
func (bb *BatchBuilder) Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{})
Flush returns the batch data buffered so far, along with the sequence ID and the callbacks for the batch.
func (*BatchBuilder) IsFull ¶
func (bb *BatchBuilder) IsFull() bool
IsFull checks whether the size of the current batch exceeds the maximum size allowed for the batch.
type BlockingQueue ¶
type BlockingQueue interface {
// Put enqueue one item, block if the queue is full
Put(item interface{})
// Take dequeue one item, block until it's available
Take() interface{}
// Poll dequeue one item, return nil if queue is empty
Poll() interface{}
// Peek return the first item without dequeuing, return nil if queue is empty
Peek() interface{}
// PeekLast return last item in queue without dequeuing, return nil if queue is empty
PeekLast() interface{}
// Size return the current size of the queue
Size() int
// ReadableSlice returns a new view of the readable items in the queue
ReadableSlice() []interface{}
}
BlockingQueue is an interface for a blocking queue.
func NewBlockingQueue ¶
func NewBlockingQueue(maxSize int) BlockingQueue
NewBlockingQueue initializes a blocking queue and returns it as a BlockingQueue.
type Buffer ¶
type Buffer interface {
ReadableBytes() uint32
WritableBytes() uint32
// Capacity returns the capacity of the buffer's underlying byte slice,
// that is, the total space allocated for the buffer's data.
Capacity() uint32
IsWritable() bool
Read(size uint32) []byte
Get(readerIndex uint32, size uint32) []byte
ReadableSlice() []byte
WritableSlice() []byte
// WrittenBytes advance the writer index when data was written in a slice
WrittenBytes(size uint32)
// MoveToFront copy the available portion of data at the beginning of the buffer
MoveToFront()
ReadUint16() uint16
ReadUint32() uint32
WriteUint16(n uint16)
WriteUint32(n uint32)
WriterIndex() uint32
ReaderIndex() uint32
Write(s []byte)
Put(writerIdx uint32, s []byte)
PutUint32(n uint32, writerIdx uint32)
Resize(newSize uint32)
ResizeIfNeeded(spaceNeeded uint32)
// Clear will clear the current buffer data.
Clear()
}
Buffer is a variable-sized buffer of bytes with Read and Write methods. The zero value for Buffer is an empty buffer ready to use.
func NewBufferWrapper ¶
type BuffersPool ¶ added in v0.2.0
type BuffersPool interface {
GetBuffer() Buffer
}
type ClientHandlers ¶
type ClientHandlers struct {
// contains filtered or unexported fields
}
ClientHandlers is a simple concurrent-safe map for the client type.
func NewClientHandlers ¶
func NewClientHandlers() ClientHandlers
func (*ClientHandlers) Add ¶
func (h *ClientHandlers) Add(c Closable)
func (*ClientHandlers) Close ¶
func (h *ClientHandlers) Close()
func (*ClientHandlers) Del ¶
func (h *ClientHandlers) Del(c Closable)
func (*ClientHandlers) Val ¶
func (h *ClientHandlers) Val(c Closable) bool
type Connection ¶
type Connection interface {
SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
SendRequestNoWait(req *pb.BaseCommand) error
WriteData(data Buffer)
RegisterListener(id uint64, listener ConnectionListener)
UnregisterListener(id uint64)
AddConsumeHandler(id uint64, handler ConsumerHandler)
DeleteConsumeHandler(id uint64)
ID() string
GetMaxMessageSize() int32
Close()
}
Connection is an interface for a client connection (cnx).
type ConnectionListener ¶
type ConnectionListener interface {
// ReceivedSendReceipt receive and process the return value of the send command.
ReceivedSendReceipt(response *pb.CommandSendReceipt)
// ConnectionClosed close the TCP connection.
ConnectionClosed()
}
ConnectionListener is a user of a connection (eg. a producer or a consumer) that can register itself to get notified when the connection is closed.
type ConnectionPool ¶
type ConnectionPool interface {
// GetConnection get a connection from ConnectionPool.
GetConnection(logicalAddr *url.URL, physicalAddr *url.URL) (Connection, error)
// Close all the connections in the pool
Close()
}
ConnectionPool is an interface for a connection pool.
func NewConnectionPool ¶
func NewConnectionPool( tlsOptions *TLSOptions, auth auth.Provider, connectionTimeout time.Duration, maxConnectionsPerHost int, logger log.Logger) ConnectionPool
NewConnectionPool initializes a connection pool.
type ConsumerHandler ¶
type ConsumerHandler interface {
MessageReceived(response *pb.CommandMessage, headersAndPayload Buffer) error
// ConnectionClosed close the TCP connection.
ConnectionClosed()
}
type LookupResult ¶
LookupResult encapsulates the result of a lookup request and contains two parts: LogicalAddr and PhysicalAddr.
type LookupService ¶
type LookupService interface {
// Lookup perform a lookup for the given topic, confirm the location of the broker
// where the topic is located, and return the LookupResult.
Lookup(topic string) (*LookupResult, error)
}
LookupService is an interface for a lookup service.
func NewLookupService ¶
func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, tlsEnabled bool, logger log.Logger) LookupService
NewLookupService initializes a lookup service and returns it as a LookupService.
type MessageReader ¶
type MessageReader struct {
// contains filtered or unexported fields
}
MessageReader provides helper methods to parse the metadata and messages from the binary format. Wire format for a message:
Old format (single message) [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
Batch format [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [METADATA_SIZE][METADATA][PAYLOAD] [METADATA_SIZE][METADATA][PAYLOAD]
func NewMessageReader ¶
func NewMessageReader(headersAndPayload Buffer) *MessageReader
func NewMessageReaderFromArray ¶
func NewMessageReaderFromArray(headersAndPayload []byte) *MessageReader
func (*MessageReader) ReadMessage ¶
func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error)
func (*MessageReader) ReadMessageMetadata ¶
func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error)
func (*MessageReader) ResetBuffer ¶
func (r *MessageReader) ResetBuffer(buffer Buffer)
type RPCClient ¶
type RPCClient interface {
// Create a new unique request id
NewRequestID() uint64
NewProducerID() uint64
NewConsumerID() uint64
// Send a request and block until the result is available
RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error
RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
}
func NewRPCClient ¶
type RPCResult ¶
type RPCResult struct {
Response *pb.BaseCommand
Cnx Connection
}
type Semaphore ¶
type Semaphore interface {
// Acquire a permit, if one is available, and return immediately,
// reducing the number of available permits by one.
Acquire()
// Try to acquire a permit. The method will return immediately
// with a `true` if it was possible to acquire a permit and
// `false` otherwise.
TryAcquire() bool
// Release a permit, returning it to the semaphore.
// Release a permit, increasing the number of available permits by
// one. If any threads are trying to acquire a permit, then one is
// selected and given the permit that was just released. That thread
// is (re)enabled for thread scheduling purposes.
// There is no requirement that a thread that releases a permit must
// have acquired that permit by calling Acquire().
// Correct usage of a semaphore is established by programming convention
// in the application.
Release()
}