Documentation
¶
Index ¶
- Constants
- func GetVersion() (version string)
- func Version() (version string)
- type ClientConfig
- type ConsumeStatus
- type ConsumerModel
- type LogConfig
- type LogLevel
- type Message
- type MessageExt
- type MessageModel
- type MessageQueue
- type MessageQueueSelector
- type Producer
- type ProducerConfig
- type ProducerModel
- type PullConsumer
- type PullConsumerConfig
- type PullResult
- type PullStatus
- type PushConsumer
- type PushConsumerConfig
- type SendResult
- type SendStatus
- type SessionCredentials
- type TransactionLocalListener
- type TransactionProducer
- type TransactionStatus
Constants ¶
const ( CommonProducer = ProducerModel(1) OrderlyProducer = ProducerModel(2) TransProducer = ProducerModel(3) )
ProducerModel values: the available producer models (common, orderly, transaction)
const ( BroadCasting = MessageModel(1) Clustering = MessageModel(2) )
MessageModel
const ( CoCurrently = ConsumerModel(1) Orderly = ConsumerModel(2) )
ConsumerModel
const ( NIL = rmqError(C.OK) ErrNullPoint = rmqError(C.NULL_POINTER) ErrMallocFailed = rmqError(C.MALLOC_FAILED) ErrProducerStartFailed = rmqError(C.PRODUCER_START_FAILED) ErrSendSyncFailed = rmqError(C.PRODUCER_SEND_SYNC_FAILED) ErrSendOnewayFailed = rmqError(C.PRODUCER_SEND_ONEWAY_FAILED) ErrSendOrderlyFailed = rmqError(C.PRODUCER_SEND_ORDERLY_FAILED) ErrPushConsumerStartFailed = rmqError(C.PUSHCONSUMER_ERROR_CODE_START) ErrPullConsumerStartFailed = rmqError(C.PULLCONSUMER_START_FAILED) ErrFetchMQFailed = rmqError(C.PULLCONSUMER_FETCH_MQ_FAILED) ErrFetchMessageFailed = rmqError(C.PULLCONSUMER_FETCH_MESSAGE_FAILED) )
Predefined error values, each wrapping a status code from the C API
const ( LogLevelFatal = LogLevel(C.E_LOG_LEVEL_FATAL) LogLevelError = LogLevel(C.E_LOG_LEVEL_ERROR) LogLevelWarn = LogLevel(C.E_LOG_LEVEL_WARN) LogLevelInfo = LogLevel(C.E_LOG_LEVEL_INFO) LogLevelDebug = LogLevel(C.E_LOG_LEVEL_DEBUG) LogLevelTrace = LogLevel(C.E_LOG_LEVEL_TRACE) LogLevelNum = LogLevel(C.E_LOG_LEVEL_LEVEL_NUM) )
predefined log level
const ( //SendOK OK SendOK = SendStatus(C.E_SEND_OK) //SendFlushDiskTimeout Failed because broker flush error SendFlushDiskTimeout = SendStatus(C.E_SEND_FLUSH_DISK_TIMEOUT) //SendFlushSlaveTimeout Failed because slave broker timeout SendFlushSlaveTimeout = SendStatus(C.E_SEND_FLUSH_SLAVE_TIMEOUT) //SendSlaveNotAvailable Failed because slave broker error SendSlaveNotAvailable = SendStatus(C.E_SEND_SLAVE_NOT_AVAILABLE) )
const ( PullFound = PullStatus(C.E_FOUND) PullNoNewMsg = PullStatus(C.E_NO_NEW_MSG) PullNoMatchedMsg = PullStatus(C.E_NO_MATCHED_MSG) PullOffsetIllegal = PullStatus(C.E_OFFSET_ILLEGAL) PullBrokerTimeout = PullStatus(C.E_BROKER_TIMEOUT) )
predefined pull status
const ( //ConsumeSuccess commit offset to broker ConsumeSuccess = ConsumeStatus(C.E_CONSUME_SUCCESS) //ReConsumeLater it will be send back to broker ReConsumeLater = ConsumeStatus(C.E_RECONSUME_LATER) )
const ( CommitTransaction = TransactionStatus(C.E_COMMIT_TRANSACTION) RollbackTransaction = TransactionStatus(C.E_ROLLBACK_TRANSACTION) UnknownTransaction = TransactionStatus(C.E_UNKNOWN_TRANSACTION) )
const GoClientVersion = "Go Client V1.2.4, Support CPP Core:V1.2.X"
GoClientVersion is the constant version string of the Go client
Variables ¶
This section is empty.
Functions ¶
Types ¶
type ClientConfig ¶
type ClientConfig struct {
GroupID string
NameServer string
NameServerDomain string
InstanceName string
Credentials *SessionCredentials
LogC *LogConfig
}
ClientConfig save client config
func (*ClientConfig) String ¶
func (config *ClientConfig) String() string
type ConsumeStatus ¶
type ConsumeStatus int
ConsumeStatus the return value for consumer
func (ConsumeStatus) String ¶
func (status ConsumeStatus) String() string
type ConsumerModel ¶
type ConsumerModel int
ConsumerModel CoCurrently or Orderly
func (ConsumerModel) String ¶
func (mode ConsumerModel) String() string
type Message ¶
type Message struct {
Topic string
Tags string
Keys string
Body string
DelayTimeLevel int
Property map[string]string
// contains filtered or unexported fields
}
Message used for send
func (*Message) GetProperty ¶ added in v1.2.4
type MessageExt ¶
type MessageExt struct {
Message
MessageID string
QueueId int
ReconsumeTimes int
StoreSize int
BornTimestamp int64
StoreTimestamp int64
QueueOffset int64
CommitLogOffset int64
PreparedTransactionOffset int64
// contains filtered or unexported fields
}
MessageExt used for consume
func (*MessageExt) GetProperty ¶
func (msgExt *MessageExt) GetProperty(key string) string
GetProperty get the message property by key from message ext
func (*MessageExt) String ¶
func (msgExt *MessageExt) String() string
type MessageModel ¶
type MessageModel int
MessageModel Clustering or BroadCasting
func (MessageModel) String ¶
func (mode MessageModel) String() string
type MessageQueue ¶
MessageQueue the queue of the message
func (*MessageQueue) String ¶
func (q *MessageQueue) String() string
type MessageQueueSelector ¶
MessageQueueSelector select one message queue
type Producer ¶
type Producer interface {
// SendMessageSync send a message with sync
SendMessageSync(msg *Message) (*SendResult, error)
// SendMessageOrderly send the message orderly
SendMessageOrderly(
msg *Message,
selector MessageQueueSelector,
arg interface{},
autoRetryTimes int) (*SendResult, error)
// SendMessageOneway send a message with oneway
SendMessageOneway(msg *Message) error
SendMessageOrderlyByShardingKey(msg *Message, shardingkey string) (*SendResult, error)
// contains filtered or unexported methods
}
Producer define interface
func NewProducer ¶
func NewProducer(config *ProducerConfig) (Producer, error)
NewProducer create a new producer with config
type ProducerConfig ¶
type ProducerConfig struct {
ClientConfig
SendMsgTimeout int
CompressLevel int
MaxMessageSize int
ProducerModel ProducerModel
}
ProducerConfig define a producer
func (*ProducerConfig) String ¶
func (config *ProducerConfig) String() string
type ProducerModel ¶
type ProducerModel int
ProducerModel Common, Orderly or Transaction
func (ProducerModel) String ¶
func (mode ProducerModel) String() string
type PullConsumer ¶
type PullConsumer interface {
// Pull returns the messages from the consume queue by specifying the offset and the max number
Pull(mq MessageQueue, subExpression string, offset int64, maxNums int) PullResult
// FetchSubscriptionMessageQueues returns the consume queue of the topic
FetchSubscriptionMessageQueues(topic string) []MessageQueue
// contains filtered or unexported methods
}
PullConsumer consumer pulling the message
func NewPullConsumer ¶
func NewPullConsumer(config *PullConsumerConfig) (PullConsumer, error)
NewPullConsumer creates a pull consumer
type PullConsumerConfig ¶
type PullConsumerConfig struct {
ClientConfig
}
PullConsumerConfig the configuration for the pull consumer
func (*PullConsumerConfig) String ¶
func (config *PullConsumerConfig) String() string
type PullResult ¶
type PullResult struct {
NextBeginOffset int64
MinOffset int64
MaxOffset int64
Status PullStatus
Messages []*MessageExt
}
PullResult the pull result
func (*PullResult) String ¶
func (pr *PullResult) String() string
type PullStatus ¶
type PullStatus int
PullStatus pull status
func (PullStatus) String ¶
func (ps PullStatus) String() string
type PushConsumer ¶
type PushConsumer interface {
// Subscribe subscribes to a new topic with the specified filter expression and consume function.
Subscribe(topic, expression string, consumeFunc func(msg *MessageExt) ConsumeStatus) error
// contains filtered or unexported methods
}
PushConsumer apis for PushConsumer
func NewPushConsumer ¶
func NewPushConsumer(config *PushConsumerConfig) (PushConsumer, error)
NewPushConsumer create a new consumer with config.
type PushConsumerConfig ¶
type PushConsumerConfig struct {
ClientConfig
ThreadCount int
MessageBatchMaxSize int
Model MessageModel
ConsumerModel ConsumerModel
MaxCacheMessageSize int
MaxCacheMessageSizeInMB int
}
PushConsumerConfig define a new consumer.
func (*PushConsumerConfig) String ¶
func (config *PushConsumerConfig) String() string
type SendResult ¶
type SendResult struct {
Status SendStatus
MsgId string
Offset int64
}
SendResult status for send
func (*SendResult) String ¶
func (result *SendResult) String() string
type SendStatus ¶
type SendStatus int
SendStatus The Status for send result from C apis.
func (SendStatus) String ¶
func (status SendStatus) String() string
type SessionCredentials ¶
SessionCredentials access config for client
func (*SessionCredentials) String ¶
func (session *SessionCredentials) String() string
type TransactionLocalListener ¶ added in v1.2.4
type TransactionLocalListener interface {
Execute(m *Message, arg interface{}) TransactionStatus
Check(m *MessageExt, arg interface{}) TransactionStatus
}
TransactionLocalListener local executor for transaction message
type TransactionProducer ¶ added in v1.2.4
type TransactionProducer interface {
// send a transaction message with sync
SendMessageTransaction(msg *Message, arg interface{}) (*SendResult, error)
// contains filtered or unexported methods
}
func NewTransactionProducer ¶ added in v1.2.4
func NewTransactionProducer(config *ProducerConfig, listener TransactionLocalListener, arg interface{}) (TransactionProducer, error)
NewTransactionProducer create a new transaction producer with config
type TransactionStatus ¶ added in v1.2.4
type TransactionStatus int
func (TransactionStatus) String ¶ added in v1.2.4
func (status TransactionStatus) String() string