Documentation
¶
Index ¶
- Constants
- Variables
- func ConvertFromStringMap(m map[string]string) []*pb.KeyValue
- func ConvertToStringMap(pbb []*pb.KeyValue) map[string]string
- func Crc32cCheckSum(data []byte) uint32
- func GetAndAdd(n *uint64, diff uint64) uint64
- func GetCompressionProvider(compressionType pb.CompressionType, level compression.Level) compression.Provider
- func GetConnectionsCount(p *ConnectionPool) int
- func GetTopicRestPath(tn *TopicName) string
- func IsV2Namespace(namespace string) bool
- func IsV2TopicName(tn *TopicName) bool
- func JavaStringHash(s string) uint32
- func MarshalToSizedBuffer(m proto.Message, out []byte) error
- func Murmur3_32Hash(s string) uint32
- func ParseRelativeTimeInSeconds(relativeTime string) (time.Duration, error)
- func Retry[T any](ctx context.Context, op OpFn[T], nextDuration func(error) time.Duration) (T, error)
- func SingleSend(wb Buffer, producerID, sequenceID uint64, msgMetadata *pb.MessageMetadata, ...) error
- func StartCleanConnectionsTask(p *ConnectionPool, connectionMaxIdleTime time.Duration)
- func TimestampMillis(t time.Time) uint64
- func TopicNameWithoutPartitionPart(tn *TopicName) string
- type BatchBuilder
- type BatcherBuilderProvider
- type BlockingQueue
- type Buffer
- type BuffersPool
- type CheckSum
- type ClientHandlers
- type Closable
- type Connection
- type ConnectionListener
- type ConnectionPool
- type ConsumerHandler
- type FlushBatch
- type GetTopicsOfNamespaceMode
- type HTTPClient
- type LeveledMetrics
- type LookupResult
- type LookupSchema
- type LookupService
- type MemoryLimitController
- type MessageReader
- type Metrics
- type OpFn
- type PartitionedTopicMetadata
- type PulsarServiceURI
- type RPCClient
- type RPCResult
- type SchemaType
- type Semaphore
- type ServiceNameResolver
- type TLSOptions
- type TopicName
Constants ¶
const ( // MaxMessageSize limit message size for transfer MaxMessageSize = 5 * 1024 * 1024 // MessageFramePadding is for metadata and other frame headers MessageFramePadding = 10 * 1024 // MaxFrameSize limit the maximum size that pulsar allows for messages to be sent. MaxFrameSize = MaxMessageSize + MessageFramePadding )
const ( Persistent GetTopicsOfNamespaceMode = "PERSISTENT" NonPersistent = "NON_PERSISTENT" All = "ALL" )
const ( BinaryService = "pulsar" HTTPService = "http" HTTPSService = "https" SSLService = "ssl" BinaryPort = 6650 BinaryTLSPort = 6651 HTTPPort = 80 HTTPSPort = 443 )
const HTTPAdminServiceV1Format string = "/admin/%s/partitions"
const HTTPAdminServiceV2Format string = "/admin/v2/%s/partitions"
const HTTPLookupServiceBasePathV1 string = "/lookup/v2/destination/"
const HTTPLookupServiceBasePathV2 string = "/lookup/v2/topic/"
const HTTPSchemaV2 string = "/admin/v2/schemas/%s/schema"
const HTTPSchemaWithVersionV2 string = "/admin/v2/schemas/%s/schema/%d"
const HTTPTopicUnderNamespaceV1 string = "/admin/namespaces/%s/destinations?mode=%s"
const HTTPTopicUnderNamespaceV2 string = "/admin/v2/namespaces/%s/topics?mode=%s"
const (
PulsarProtocolVersion = int32(pb.ProtocolVersion_v20)
)
Variables ¶
var ( Version string ClientVersionString string )
var ErrConnectionClosed = errors.New("connection closed")
var ErrCorruptedMessage = errors.New("corrupted message")
ErrCorruptedMessage is the error returned by ReadMessageData when it has detected corrupted data. The data is considered corrupted if it's missing a header, a checksum mismatch or there was an error when unmarshalling the message metadata.
var ErrEOM = errors.New("EOF")
ErrEOM is the error returned by ReadMessage when no more input is available.
var ErrExceedMaxMessageSize = errors.New("encryptedPayload exceeds MaxMessageSize")
var ( // ErrRequestTimeOut happens when request not finished in given requestTimeout. ErrRequestTimeOut = errors.New("request timed out") )
var HTTPSchemaTypeMap = map[string]SchemaType{ "NONE": BYTES, "STRING": STRING, "JSON": JSON, "PROTOBUF": PROTOBUF, "AVRO": AVRO, "BOOLEAN": BOOLEAN, "INT8": INT8, "INT16": INT16, "INT32": INT32, "INT64": INT64, "FLOAT": FLOAT, "DOUBLE": DOUBLE, "KEYVALUE": KeyValue, "BYTES": BYTES, "AUTO": AUTO, "AUTOCONSUME": AutoConsume, "AUTOPUBLISH": AutoPublish, "PROTOBUF_NATIVE": ProtoNative, }
Functions ¶
func ConvertFromStringMap ¶
ConvertFromStringMap convert a string map to a KeyValue []byte
func ConvertToStringMap ¶
ConvertToStringMap convert a KeyValue []byte to string map
func Crc32cCheckSum ¶
Crc32cCheckSum handles computing the checksum.
func GetCompressionProvider ¶ added in v0.10.0
func GetCompressionProvider( compressionType pb.CompressionType, level compression.Level, ) compression.Provider
func GetConnectionsCount ¶ added in v0.10.0
func GetConnectionsCount(p *ConnectionPool) int
func GetTopicRestPath ¶ added in v0.5.0
func IsV2Namespace ¶ added in v0.6.0
func IsV2TopicName ¶ added in v0.5.0
func JavaStringHash ¶
JavaStringHash and Java String.hashCode() equivalent
func MarshalToSizedBuffer ¶ added in v0.10.0
func Murmur3_32Hash ¶
Murmur3_32Hash use Murmur3 hashing function
func Retry ¶ added in v0.15.0
func Retry[T any](ctx context.Context, op OpFn[T], nextDuration func(error) time.Duration) (T, error)
Retry the given operation until the returned error is nil or the context is done.
func SingleSend ¶ added in v0.10.0
func StartCleanConnectionsTask ¶ added in v0.10.0
func StartCleanConnectionsTask(p *ConnectionPool, connectionMaxIdleTime time.Duration)
func TimestampMillis ¶
TimestampMillis return a time unix nano.
Types ¶
type BatchBuilder ¶
type BatchBuilder interface {
// IsFull check if the size in the current batch exceeds the maximum size allowed by the batch
IsFull() bool
// Add will add single message to batch.
Add(
metadata *pb.SingleMessageMetadata, sequenceIDGenerator *uint64,
payload []byte,
callback interface{}, replicateTo []string, deliverAt time.Time,
schemaVersion []byte, multiSchemaEnabled bool,
useTxn bool,
mostSigBits uint64,
leastSigBits uint64,
) bool
// Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
Flush() *FlushBatch
// FlushBatches all the messages buffered in multiple batches and wait until all
// messages have been successfully persisted.
FlushBatches() []*FlushBatch
// Return the batch container batch message in multiple batches.
IsMultiBatches() bool
Close() error
// contains filtered or unexported methods
}
BatchBuilder is a interface of batch builders
func NewBatchBuilder ¶
func NewBatchBuilder( maxMessages uint, maxBatchSize uint, maxMessageSize uint32, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor crypto.Encryptor, ) (BatchBuilder, error)
NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.
func NewKeyBasedBatchBuilder ¶ added in v0.4.0
func NewKeyBasedBatchBuilder( maxMessages uint, maxBatchSize uint, maxMessageSize uint32, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor crypto.Encryptor, ) (BatchBuilder, error)
NewKeyBasedBatchBuilder init batch builder and return BatchBuilder pointer. Build a new key based batch message container.
type BatcherBuilderProvider ¶ added in v0.4.0
type BatcherBuilderProvider func( maxMessages uint, maxBatchSize uint, maxMessageSize uint32, producerName string, producerID uint64, compressionType pb.CompressionType, level compression.Level, bufferPool BuffersPool, metrics *Metrics, logger log.Logger, encryptor crypto.Encryptor, ) (BatchBuilder, error)
BatcherBuilderProvider defines func which returns the BatchBuilder.
type BlockingQueue ¶
type BlockingQueue interface {
// Put enqueue one item, block if the queue is full
// This is currently used for the internal testing
Put(item interface{})
// PutUnsafe enqueue one item without locking the queue, block if the queue is full
PutUnsafe(item interface{})
// Take dequeue one item, block until it's available
Take() interface{}
// Poll dequeue one item, return nil if queue is empty
Poll() interface{}
// CompareAndPoll compare the first item and poll it if meet the conditions
CompareAndPoll(compare func(item interface{}) bool) interface{}
// Peek return the first item without dequeing, return nil if queue is empty
Peek() interface{}
// PeekLast return last item in queue without dequeing, return nil if queue is empty
PeekLast() interface{}
// Size return the current size of the queue
Size() int
// ReadableSlice returns a new view of the readable items in the queue
ReadableSlice() []interface{}
// IterateUnsafe iterates the items in the queue without blocking the queue
IterateUnsafe(func(item interface{}))
// Lock locks the queue for manual control
// Users must call Unlock() after finishing their operations
Lock()
// Unlock unlocks the queue
// Must be called after Lock() to release the lock
Unlock()
}
BlockingQueue is a interface of block queue
func NewBlockingQueue ¶
func NewBlockingQueue(maxSize int) BlockingQueue
NewBlockingQueue init block queue and returns a BlockingQueue
type Buffer ¶
type Buffer interface {
ReadableBytes() uint32
WritableBytes() uint32
// Capacity returns the capacity of the buffer's underlying byte slice,
// that is, the total space allocated for the buffer's data.
Capacity() uint32
IsWritable() bool
Read(size uint32) []byte
Skip(size uint32)
Get(readerIndex uint32, size uint32) []byte
ReadableSlice() []byte
WritableSlice() []byte
// WrittenBytes advance the writer index when data was written in a slice
WrittenBytes(size uint32)
// MoveToFront copy the available portion of data at the beginning of the buffer
MoveToFront()
ReadUint16() uint16
ReadUint32() uint32
WriteUint16(n uint16)
WriteUint32(n uint32)
WriterIndex() uint32
ReaderIndex() uint32
Write(s []byte)
Put(writerIdx uint32, s []byte)
PutUint32(n uint32, writerIdx uint32)
Resize(newSize uint32)
ResizeIfNeeded(spaceNeeded uint32)
// Retain increases the reference count
Retain()
// Release decreases the reference count and returns the buffer to the pool
// if it's associated with a buffer pool and the count reaches zero.
Release()
// RefCnt returns the current reference count of the buffer.
RefCnt() int64
// SetReleaseCallback sets a callback function that will be called when the buffer is returned to a pool.
SetReleaseCallback(cb func())
// Clear will clear the current buffer data.
Clear()
}
Buffer is a variable-sized buffer of bytes with Read and Write methods. The zero value for Buffer is an empty buffer ready to use.
func NewBufferWrapper ¶
type BuffersPool ¶ added in v0.2.0
func NewBufferPool ¶ added in v0.16.0
func NewBufferPool() BuffersPool
type ClientHandlers ¶
type ClientHandlers struct {
// contains filtered or unexported fields
}
ClientHandlerMap is a simple concurrent-safe map for the client type
func NewClientHandlers ¶
func NewClientHandlers() ClientHandlers
func (*ClientHandlers) Add ¶
func (h *ClientHandlers) Add(c Closable)
func (*ClientHandlers) Close ¶
func (h *ClientHandlers) Close()
func (*ClientHandlers) Del ¶
func (h *ClientHandlers) Del(c Closable)
func (*ClientHandlers) Val ¶
func (h *ClientHandlers) Val(c Closable) bool
type Connection ¶
type Connection interface {
SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
SendRequestNoWait(req *pb.BaseCommand) error
WriteData(ctx context.Context, data Buffer)
RegisterListener(id uint64, listener ConnectionListener) error
UnregisterListener(id uint64)
AddConsumeHandler(id uint64, handler ConsumerHandler) error
DeleteConsumeHandler(id uint64)
ID() string
GetMaxMessageSize() int32
Close()
WaitForClose() <-chan struct{}
IsProxied() bool
}
Connection is a interface of client cnx.
type ConnectionListener ¶
type ConnectionListener interface {
// ReceivedSendReceipt receive and process the return value of the send command.
ReceivedSendReceipt(response *pb.CommandSendReceipt)
// ConnectionClosed close the TCP connection.
ConnectionClosed(closeProducer *pb.CommandCloseProducer)
// SetRedirectedClusterURI set the redirected cluster URI for lookups
SetRedirectedClusterURI(redirectedClusterURI string)
}
ConnectionListener is a user of a connection (eg. a producer or a consumer) that can register itself to get notified when the connection is closed.
type ConnectionPool ¶
type ConnectionPool interface {
// GetConnection get a connection from ConnectionPool.
GetConnection(logicalAddr *url.URL, physicalAddr *url.URL, keySuffix int32) (Connection, error)
// GetConnections get all connections in the pool.
GetConnections() map[string]Connection
// GenerateRoundRobinIndex generates a round-robin index.
GenerateRoundRobinIndex() int32
// Close all the connections in the pool
Close()
}
ConnectionPool is a interface of connection pool.
func NewConnectionPool ¶
func NewConnectionPool( tlsOptions *TLSOptions, auth auth.Provider, connectionTimeout time.Duration, keepAliveInterval time.Duration, maxConnectionsPerHost int, logger log.Logger, metrics *Metrics, description string, connectionMaxIdleTime time.Duration) ConnectionPool
NewConnectionPool init connection pool.
type ConsumerHandler ¶
type ConsumerHandler interface {
MessageReceived(response *pb.CommandMessage, headersAndPayload Buffer) error
ActiveConsumerChanged(isActive bool)
// ConnectionClosed close the TCP connection.
ConnectionClosed(closeConsumer *pb.CommandCloseConsumer)
// SetRedirectedClusterURI set the redirected cluster URI for lookups
SetRedirectedClusterURI(redirectedClusterURI string)
}
type FlushBatch ¶ added in v0.13.1
type GetTopicsOfNamespaceMode ¶ added in v0.6.0
type GetTopicsOfNamespaceMode string
GetTopicsOfNamespaceMode for CommandGetTopicsOfNamespace_Mode
type HTTPClient ¶ added in v0.5.0
type HTTPClient interface {
Get(endpoint string, obj interface{}, params map[string]string) error
Closable
}
func NewHTTPClient ¶ added in v0.5.0
func NewHTTPClient(serviceURL *url.URL, serviceNameResolver ServiceNameResolver, tlsConfig *TLSOptions, requestTimeout time.Duration, logger log.Logger, metrics *Metrics, authProvider auth.Provider) (HTTPClient, error)
type LeveledMetrics ¶ added in v0.7.0
type LeveledMetrics struct {
MessagesPublished prometheus.Counter
BytesPublished prometheus.Counter
MessagesPending prometheus.Gauge
BytesPending prometheus.Gauge
PublishErrorsTimeout prometheus.Counter
PublishErrorsMsgTooLarge prometheus.Counter
PublishLatency prometheus.Observer
PublishRPCLatency prometheus.Observer
MessagesReceived prometheus.Counter
BytesReceived prometheus.Counter
PrefetchedMessages prometheus.Gauge
PrefetchedBytes prometheus.Gauge
AcksCounter prometheus.Counter
NacksCounter prometheus.Counter
DlqCounter prometheus.Counter
ProcessingTime prometheus.Observer
ProducersOpened prometheus.Counter
ProducersClosed prometheus.Counter
ProducersReconnectFailure prometheus.Counter
ProducersReconnectMaxRetry prometheus.Counter
ProducersPartitions prometheus.Gauge
ConsumersOpened prometheus.Counter
ConsumersClosed prometheus.Counter
ConsumersReconnectFailure prometheus.Counter
ConsumersReconnectMaxRetry prometheus.Counter
ConsumersPartitions prometheus.Gauge
ReadersOpened prometheus.Counter
ReadersClosed prometheus.Counter
}
type LookupResult ¶
LookupResult encapsulates a struct for lookup a request, containing two parts: LogicalAddr, PhysicalAddr.
type LookupSchema ¶ added in v0.16.0
type LookupSchema struct {
SchemaType SchemaType
Data []byte
Properties map[string]string
}
LookupSchema return lookup schema result
type LookupService ¶
type LookupService interface {
// Lookup perform a lookup for the given topic, confirm the location of the broker
// where the topic is located, and return the LookupResult.
Lookup(topic string) (*LookupResult, error)
// GetPartitionedTopicMetadata perform a CommandPartitionedTopicMetadata request for
// the given topic, returns the CommandPartitionedTopicMetadataResponse as the result.
GetPartitionedTopicMetadata(topic string) (*PartitionedTopicMetadata, error)
// GetTopicsOfNamespace returns all the topics name for a given namespace.
GetTopicsOfNamespace(namespace string, mode GetTopicsOfNamespaceMode) ([]string, error)
// GetSchema returns schema for a given version.
GetSchema(topic string, schemaVersion []byte) (*LookupSchema, error)
GetBrokerAddress(brokerServiceURL string, proxyThroughServiceURL bool) (*LookupResult, error)
ServiceNameResolver() *ServiceNameResolver
// Closable Allow Lookup Service's internal client to be able to closed
Closable
}
LookupService is a interface of lookup service.
func NewHTTPLookupService ¶ added in v0.5.0
func NewHTTPLookupService(httpClient HTTPClient, serviceURL *url.URL, serviceNameResolver ServiceNameResolver, tlsEnabled bool, logger log.Logger, metrics *Metrics) LookupService
NewHTTPLookupService init a http based lookup service struct and return an object of LookupService.
func NewLookupService ¶
func NewLookupService(rpcClient RPCClient, serviceURL *url.URL, serviceNameResolver ServiceNameResolver, tlsEnabled bool, listenerName string, lookupProperties []*pb.KeyValue, logger log.Logger, metrics *Metrics) LookupService
NewLookupService init a lookup service struct and return an object of LookupService.
type MemoryLimitController ¶ added in v0.10.0
type MemoryLimitController interface {
ReserveMemory(ctx context.Context, size int64) bool
TryReserveMemory(size int64) bool
ForceReserveMemory(size int64)
ReleaseMemory(size int64)
CurrentUsage() int64
CurrentUsagePercent() float64
IsMemoryLimited() bool
RegisterTrigger(trigger func())
}
func NewMemoryLimitController ¶ added in v0.10.0
func NewMemoryLimitController(limit int64, threshold float64) MemoryLimitController
NewMemoryLimitController threshold valid range is (0, 1.0)
type MessageReader ¶
type MessageReader struct {
// contains filtered or unexported fields
}
MessageReader provides helper methods to parse the metadata and messages from the binary format Wire format for a messages
Old format (single message) [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
Batch format [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [METADATA_SIZE][METADATA][PAYLOAD] [METADATA_SIZE][METADATA][PAYLOAD]
func NewMessageReader ¶
func NewMessageReader(headersAndPayload Buffer) *MessageReader
func NewMessageReaderFromArray ¶
func NewMessageReaderFromArray(headersAndPayload []byte) *MessageReader
func (*MessageReader) ReadBrokerMetadata ¶ added in v0.9.0
func (r *MessageReader) ReadBrokerMetadata() (*pb.BrokerEntryMetadata, error)
func (*MessageReader) ReadMessage ¶
func (r *MessageReader) ReadMessage() (*pb.SingleMessageMetadata, []byte, error)
func (*MessageReader) ReadMessageMetadata ¶
func (r *MessageReader) ReadMessageMetadata() (*pb.MessageMetadata, error)
func (*MessageReader) ResetBuffer ¶
func (r *MessageReader) ResetBuffer(buffer Buffer)
type Metrics ¶ added in v0.4.0
type Metrics struct {
SendingBuffersCount prometheus.Gauge
// Metrics that are not labeled with specificity are immediately available
ConnectionsOpened prometheus.Counter
ConnectionsClosed prometheus.Counter
ConnectionsEstablishmentErrors prometheus.Counter
ConnectionsHandshakeErrors prometheus.Counter
LookupRequestsCount prometheus.Counter
PartitionedTopicMetadataRequestsCount prometheus.Counter
RPCRequestCount prometheus.Counter
// contains filtered or unexported fields
}
func NewMetricsProvider ¶ added in v0.4.0
func NewMetricsProvider(metricsCardinality int, userDefinedLabels map[string]string, registerer prometheus.Registerer) *Metrics
NewMetricsProvider returns metrics registered to registerer.
func (*Metrics) GetLeveledMetrics ¶ added in v0.7.0
func (mp *Metrics) GetLeveledMetrics(t string) *LeveledMetrics
type PartitionedTopicMetadata ¶ added in v0.5.0
type PartitionedTopicMetadata struct {
Partitions int `json:"partitions"` // Number of partitions for the topic
}
PartitionedTopicMetadata encapsulates a struct for metadata of a partitioned topic
type PulsarServiceURI ¶ added in v0.5.0
type PulsarServiceURI struct {
ServiceName string
ServiceInfos []string
ServiceHosts []string
URL *url.URL
// contains filtered or unexported fields
}
func NewPulsarServiceURIFromURIString ¶ added in v0.5.0
func NewPulsarServiceURIFromURIString(uri string) (*PulsarServiceURI, error)
func NewPulsarServiceURIFromURL ¶ added in v0.5.0
func NewPulsarServiceURIFromURL(url *url.URL) (*PulsarServiceURI, error)
type RPCClient ¶
type RPCClient interface {
// Create a new unique request id
NewRequestID() uint64
NewProducerID() uint64
NewConsumerID() uint64
// Send a request and block until the result is available
RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
RequestToHost(serviceNameResolver *ServiceNameResolver, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
RequestWithCnxKeySuffix(logicalAddr *url.URL, physicalAddr *url.URL, cnxKeySuffix int32, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
Request(logicalAddr *url.URL, physicalAddr *url.URL, requestID uint64,
cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
RequestOnCnxNoWait(cnx Connection, cmdType pb.BaseCommand_Type, message proto.Message) error
RequestOnCnx(cnx Connection, requestID uint64, cmdType pb.BaseCommand_Type, message proto.Message) (*RPCResult, error)
LookupService(URL string) (LookupService, error)
}
type RPCResult ¶
type RPCResult struct {
Response *pb.BaseCommand
Cnx Connection
}
type SchemaType ¶ added in v0.16.0
type SchemaType int
SchemaType We need to define a SchemaType in this internal package, to avoid directly importing pulsar.SchemaType. In case we might encounter importing cycle problem.
const ( NONE SchemaType = iota //No schema defined STRING //Simple String encoding with UTF-8 JSON //JSON object encoding and validation PROTOBUF //Protobuf message encoding and decoding AVRO //Serialize and deserialize via Avro BOOLEAN // INT8 //A 8-byte integer. INT16 //A 16-byte integer. INT32 //A 32-byte integer. INT64 //A 64-byte integer. FLOAT //A float number. DOUBLE //A double number KeyValue //A Schema that contains Key Schema and Value Schema. BYTES = 0 //A bytes array. AUTO = -2 // AutoConsume = -3 //Auto Consume Type. AutoPublish = -4 //Auto Publish Type. ProtoNative = 20 //Protobuf native message encoding and decoding )
type Semaphore ¶
type Semaphore interface {
// Acquire a permit, if one is available and returns immediately,
// reducing the number of available permits by one.
Acquire(ctx context.Context) bool
// Try to acquire a permit. The method will return immediately
// with a `true` if it was possible to acquire a permit and
// `false` otherwise.
TryAcquire() bool
// Release a permit, returning it to the semaphore.
// Release a permit, increasing the number of available permits by
// one. If any threads are trying to acquire a permit, then one is
// selected and given the permit that was just released. That thread
// is (re)enabled for thread scheduling purposes.
// There is no requirement that a thread that releases a permit must
// have acquired that permit by calling Acquire().
// Correct usage of a semaphore is established by programming convention
// in the application.
Release()
}
func NewSemaphore ¶ added in v0.2.0
type ServiceNameResolver ¶ added in v0.5.0
type ServiceNameResolver interface {
ResolveHost() (*url.URL, error)
ResolveHostURI() (*PulsarServiceURI, error)
UpdateServiceURL(url *url.URL) error
GetServiceURI() *PulsarServiceURI
GetServiceURL() *url.URL
GetAddressList() []*url.URL
}
func NewPulsarServiceNameResolver ¶ added in v0.5.0
func NewPulsarServiceNameResolver(url *url.URL) ServiceNameResolver
type TLSOptions ¶
Source Files
¶
- batch_builder.go
- blocking_queue.go
- buffer.go
- channel_cond.go
- checksum.go
- client_handlers.go
- closable.go
- commands.go
- connection.go
- connection_pool.go
- connection_reader.go
- hash.go
- helper.go
- http_client.go
- key_based_batch_builder.go
- lookup_service.go
- memory_limit_controller.go
- metrics.go
- namespace_name.go
- retry.go
- rpc_client.go
- schema.go
- semaphore.go
- service_name_resolver.go
- service_uri.go
- topic_name.go
- utils.go
- version.go