Documentation
¶
Index ¶
- Constants
- Variables
- func InitLogger()
- func NewBaseConsumeService(clientId string, messageListener MessageListener, ...) *baseConsumeService
- func NewDefaultClientSession(target string, cli *defaultClient) (*defaultClientSession, error)
- func NewFiFoConsumeService(clientId string, messageListener MessageListener, ...) *fifoConsumeService
- func NewSimpleThreadPool(poolName string, taskSize int, threadNum int) *simpleThreadPool
- func NewStandardConsumeService(clientId string, messageListener MessageListener, ...) *standardConsumeService
- func ResetLogger()
- type Client
- type ClientConn
- type ClientConnFunc
- type ClientManager
- type ClientMeterProvider
- type ClientOption
- type ClientSettings
- type Config
- type ConnOption
- func WithContext(ctx context.Context) ConnOption
- func WithDialOptions(dialOptions ...grpc.DialOption) ConnOption
- func WithDialTimeout(dur time.Duration) ConnOption
- func WithMaxCallRecvMsgSize(size int) ConnOption
- func WithMaxCallSendMsgSize(size int) ConnOption
- func WithTLSConfig(tc *tls.Config) ConnOption
- func WithZapLogger(logger *zap.Logger) ConnOption
- type ConsumeService
- type Consumer
- type ConsumerResult
- type ErrRpcStatus
- type FilterExpression
- type FilterExpressionType
- type FuncMessageListener
- type InvocationStatus
- type Message
- func (msg *Message) AddProperty(key, value string)
- func (msg *Message) GetDeliveryTimestamp() *time.Time
- func (msg *Message) GetKeys() []string
- func (msg *Message) GetMessageCommon() *MessageCommon
- func (msg *Message) GetMessageGroup() *string
- func (msg *Message) GetProperties() map[string]string
- func (msg *Message) GetTag() *string
- func (msg *Message) SetDelayTimestamp(deliveryTimestamp time.Time)
- func (msg *Message) SetKeys(keys ...string)
- func (msg *Message) SetMessageGroup(messageGroup string)
- func (msg *Message) SetTag(tag string)
- type MessageCommon
- type MessageHookPoints
- type MessageHookPointsStatus
- type MessageId
- type MessageIdCodec
- type MessageInterceptor
- type MessageListener
- type MessageMeterInterceptor
- type MessageView
- func (msg *MessageView) GetBody() []byte
- func (msg *MessageView) GetBornHost() *string
- func (msg *MessageView) GetBornTimestamp() *time.Time
- func (msg *MessageView) GetDeliveryAttempt() int32
- func (msg *MessageView) GetDeliveryTimestamp() *time.Time
- func (msg *MessageView) GetKeys() []string
- func (msg *MessageView) GetMessageCommon() *MessageCommon
- func (msg *MessageView) GetMessageGroup() *string
- func (msg *MessageView) GetMessageId() string
- func (msg *MessageView) GetOffset() int64
- func (msg *MessageView) GetProperties() map[string]string
- func (msg *MessageView) GetReceiptHandle() string
- func (msg *MessageView) GetTag() *string
- func (msg *MessageView) GetTopic() string
- func (msg *MessageView) GetTraceContext() *string
- func (msg *MessageView) SetDelayTimeLevel(deliveryTimestamp time.Time)
- func (msg *MessageView) SetKeys(keys ...string)
- func (msg *MessageView) SetMessageGroup(messageGroup string)
- func (msg *MessageView) SetTag(tag string)
- type MockClient
- type MockClientManager
- func (m *MockClientManager) AckMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.AckMessageRequest, ...) (*v2.AckMessageResponse, error)
- func (m *MockClientManager) ChangeInvisibleDuration(ctx context.Context, endpoints *v2.Endpoints, ...) (*v2.ChangeInvisibleDurationResponse, error)
- func (m *MockClientManager) EXPECT() *MockClientManagerMockRecorder
- func (m *MockClientManager) EndTransaction(ctx context.Context, endpoints *v2.Endpoints, ...) (*v2.EndTransactionResponse, error)
- func (m *MockClientManager) ForwardMessageToDeadLetterQueue(ctx context.Context, endpoints *v2.Endpoints, ...) (*v2.ForwardMessageToDeadLetterQueueResponse, error)
- func (m *MockClientManager) HeartBeat(ctx context.Context, endpoints *v2.Endpoints, request *v2.HeartbeatRequest, ...) (*v2.HeartbeatResponse, error)
- func (m *MockClientManager) NotifyClientTermination(ctx context.Context, endpoints *v2.Endpoints, ...) (*v2.NotifyClientTerminationResponse, error)
- func (m *MockClientManager) QueryAssignments(ctx context.Context, endpoints *v2.Endpoints, ...) (*v2.QueryAssignmentResponse, error)
- func (m *MockClientManager) QueryRoute(ctx context.Context, endpoints *v2.Endpoints, request *v2.QueryRouteRequest, ...) (*v2.QueryRouteResponse, error)
- func (m *MockClientManager) ReceiveMessage(ctx context.Context, endpoints *v2.Endpoints, ...) (v2.MessagingService_ReceiveMessageClient, error)
- func (m *MockClientManager) RegisterClient(client Client)
- func (m *MockClientManager) SendMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.SendMessageRequest, ...) (*v2.SendMessageResponse, error)
- func (m *MockClientManager) Telemetry(ctx context.Context, endpoints *v2.Endpoints, duration time.Duration) (v2.MessagingService_TelemetryClient, error)
- func (m *MockClientManager) UnRegisterClient(client Client)
- type MockClientManagerMockRecorder
- func (mr *MockClientManagerMockRecorder) AckMessage(ctx, endpoints, request, duration interface{}) *gomock.Call
- func (mr *MockClientManagerMockRecorder) ChangeInvisibleDuration(ctx, endpoints, request, duration interface{}) *gomock.Call
- func (mr *MockClientManagerMockRecorder) EndTransaction(ctx, endpoints, request, duration interface{}) *gomock.Call
- func (mr *MockClientManagerMockRecorder) ForwardMessageToDeadLetterQueue(ctx, endpoints, request, duration interface{}) *gomock.Call
- func (mr *MockClientManagerMockRecorder) HeartBeat(ctx, endpoints, request, duration interface{}) *gomock.Call
- func (mr *MockClientManagerMockRecorder) NotifyClientTermination(ctx, endpoints, request, duration interface{}) *gomock.Call
- func (mr *MockClientManagerMockRecorder) QueryAssignments(ctx, endpoints, request, duration interface{}) *gomock.Call
- func (mr *MockClientManagerMockRecorder) QueryRoute(ctx, endpoints, request, duration interface{}) *gomock.Call
- func (mr *MockClientManagerMockRecorder) ReceiveMessage(ctx, endpoints, request interface{}) *gomock.Call
- func (mr *MockClientManagerMockRecorder) RegisterClient(client interface{}) *gomock.Call
- func (mr *MockClientManagerMockRecorder) SendMessage(ctx, endpoints, request, duration interface{}) *gomock.Call
- func (mr *MockClientManagerMockRecorder) Telemetry(ctx, endpoints, duration interface{}) *gomock.Call
- func (mr *MockClientManagerMockRecorder) UnRegisterClient(client interface{}) *gomock.Call
- type MockClientMockRecorder
- type MockRpcClient
- func (m *MockRpcClient) AckMessage(ctx context.Context, request *v2.AckMessageRequest) (*v2.AckMessageResponse, error)
- func (m *MockRpcClient) ChangeInvisibleDuration(ctx context.Context, request *v2.ChangeInvisibleDurationRequest) (*v2.ChangeInvisibleDurationResponse, error)
- func (m *MockRpcClient) EXPECT() *MockRpcClientMockRecorder
- func (m *MockRpcClient) EndTransaction(ctx context.Context, request *v2.EndTransactionRequest) (*v2.EndTransactionResponse, error)
- func (m *MockRpcClient) ForwardMessageToDeadLetterQueue(ctx context.Context, request *v2.ForwardMessageToDeadLetterQueueRequest) (*v2.ForwardMessageToDeadLetterQueueResponse, error)
- func (m *MockRpcClient) GetTarget() string
- func (m *MockRpcClient) GracefulStop() error
- func (m *MockRpcClient) HeartBeat(ctx context.Context, request *v2.HeartbeatRequest) (*v2.HeartbeatResponse, error)
- func (m *MockRpcClient) NotifyClientTermination(ctx context.Context, request *v2.NotifyClientTerminationRequest) (*v2.NotifyClientTerminationResponse, error)
- func (m *MockRpcClient) QueryAssignments(ctx context.Context, request *v2.QueryAssignmentRequest) (*v2.QueryAssignmentResponse, error)
- func (m *MockRpcClient) QueryRoute(ctx context.Context, request *v2.QueryRouteRequest) (*v2.QueryRouteResponse, error)
- func (m *MockRpcClient) ReceiveMessage(ctx context.Context, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error)
- func (m *MockRpcClient) SendMessage(ctx context.Context, request *v2.SendMessageRequest) (*v2.SendMessageResponse, error)
- func (m *MockRpcClient) Telemetry(ctx context.Context) (v2.MessagingService_TelemetryClient, error)
- type MockRpcClientMockRecorder
- func (mr *MockRpcClientMockRecorder) AckMessage(ctx, request interface{}) *gomock.Call
- func (mr *MockRpcClientMockRecorder) ChangeInvisibleDuration(ctx, request interface{}) *gomock.Call
- func (mr *MockRpcClientMockRecorder) EndTransaction(ctx, request interface{}) *gomock.Call
- func (mr *MockRpcClientMockRecorder) ForwardMessageToDeadLetterQueue(ctx, request interface{}) *gomock.Call
- func (mr *MockRpcClientMockRecorder) GetTarget() *gomock.Call
- func (mr *MockRpcClientMockRecorder) GracefulStop() *gomock.Call
- func (mr *MockRpcClientMockRecorder) HeartBeat(ctx, request interface{}) *gomock.Call
- func (mr *MockRpcClientMockRecorder) NotifyClientTermination(ctx, request interface{}) *gomock.Call
- func (mr *MockRpcClientMockRecorder) QueryAssignments(ctx, request interface{}) *gomock.Call
- func (mr *MockRpcClientMockRecorder) QueryRoute(ctx, request interface{}) *gomock.Call
- func (mr *MockRpcClientMockRecorder) ReceiveMessage(ctx, request interface{}) *gomock.Call
- func (mr *MockRpcClientMockRecorder) SendMessage(ctx, request interface{}) *gomock.Call
- func (mr *MockRpcClientMockRecorder) Telemetry(ctx interface{}) *gomock.Call
- type MockisClient
- type MockisClientMockRecorder
- type NewClientFunc
- type ProcessQueue
- type Producer
- type ProducerOption
- type PublishingLoadBalancer
- type PublishingMessage
- type PushConsumer
- type PushConsumerOption
- func WithPushAwaitDuration(awaitDuration time.Duration) PushConsumerOption
- func WithPushConsumptionThreadCount(consumptionThreadCount int32) PushConsumerOption
- func WithPushMaxCacheMessageCount(maxCacheMessageCount int32) PushConsumerOption
- func WithPushMaxCacheMessageSizeInBytes(maxCacheMessageSizeInBytes int64) PushConsumerOption
- func WithPushMessageListener(messageListener MessageListener) PushConsumerOption
- func WithPushSubscriptionExpressions(subscriptionExpressions map[string]*FilterExpression) PushConsumerOption
- type RocketmqResolver
- type RpcClient
- type RpcClientOption
- func WithHealthCheckDuration(d time.Duration) RpcClientOption
- func WithHeartbeatDuration(d time.Duration) RpcClientOption
- func WithRpcClientClientConnFunc(f ClientConnFunc) RpcClientOption
- func WithRpcClientConnOption(opts ...ConnOption) RpcClientOption
- func WithRpcClientTimeout(d time.Duration) RpcClientOption
- type SendReceipt
- type SimpleConsumer
- type SimpleConsumerOption
- type SubscriptionLoadBalancer
- type Transaction
- type TransactionChecker
- type TransactionResolution
- type UnifiedMessage
Constants ¶
// String keys that name the client logging options. Judging by the key
// strings they cover the log root, the maximum log file index, the maximum log
// file size, the log level, the log file name and a console-appender switch.
// NOTE(review): presumably looked up by InitLogger (environment variables or
// similar) — confirm against the logger setup code, which is not visible here.
const (
	CLIENT_LOG_ROOT         = "rocketmq.client.logRoot"
	CLIENT_LOG_MAXINDEX     = "rocketmq.client.logFileMaxIndex"
	CLIENT_LOG_FILESIZE     = "rocketmq.client.logFileMaxSize"
	CLIENT_LOG_LEVEL        = "rocketmq.client.logLevel"
	// CLIENT_LOG_ADDITIVE = "rocketmq.client.log.additive"
	CLIENT_LOG_FILENAME     = "rocketmq.client.logFileName"
	// CLIENT_LOG_ASYNC_QUEUESIZE = "rocketmq.client.logAsyncQueueSize"
	ENABLE_CONSOLE_APPENDER = "mq.consoleAppender.enabled"
)
// Message ID format constants: the length of a message ID of version 1 or
// later, and the two-character version markers "00" and "01".
// NOTE(review): the unit of the length (characters vs. bytes) is not visible
// here — confirm against the message ID codec.
const (
	MESSAGE_ID_LENGTH_FOR_V1_OR_LATER        = 34
	MESSAGE_ID_VERSION_V0             string = "00"
	MESSAGE_ID_VERSION_V1             string = "01"
)
// Fixed back-off delays, one per operation named in the constant. All are one
// second except the receive flow-control delay, which is 20 milliseconds.
// NOTE(review): presumably the wait before the consumer retries the named
// operation — confirm at the call sites, which are not visible here.
const (
	FORWARD_FIFO_MESSAGE_TO_DLQ_FAILURE_BACKOFF_DELAY time.Duration = time.Second
	ACK_MESSAGE_FAILURE_BACKOFF_DELAY                 time.Duration = time.Second
	CHANGE_INVISIBLE_DURATION_FAILURE_BACKOFF_DELAY   time.Duration = time.Second
	RECEIVING_FLOW_CONTROL_BACKOFF_DELAY              time.Duration = time.Millisecond * 20
	RECEIVING_FAILURE_BACKOFF_DELAY                   time.Duration = time.Second
	RECEIVING_BACKOFF_DELAY_WHEN_CACHE_IS_FULL        time.Duration = time.Second
)
// Names of span attributes, the values some of those attributes take, and span
// annotation names. Constants containing _KEY_ are attribute names; constants
// containing _VALUE_ are attribute values.
const (
	// RocketMQ-specific span attribute names.
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_OPERATION           = "messaging.rocketmq.operation"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_NAMESPACE           = "messaging.rocketmq.namespace"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_TAG                 = "messaging.rocketmq.message_tag"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_KEYS                = "messaging.rocketmq.message_keys"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_ID           = "messaging.rocketmq.client_id"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_MESSAGE_TYPE        = "messaging.rocketmq.message_type"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_CLIENT_GROUP        = "messaging.rocketmq.client_group"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_ATTEMPT             = "messaging.rocketmq.attempt"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_BATCH_SIZE          = "messaging.rocketmq.batch_size"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_DELIVERY_TIMESTAMP  = "messaging.rocketmq.delivery_timestamp"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_AVAILABLE_TIMESTAMP = "messaging.rocketmq.available_timestamp"
	SPAN_ATTRIBUTE_KEY_ROCKETMQ_ACCESS_KEY          = "messaging.rocketmq.access_key"

	// Span attribute values: messaging system, destination kind, protocol,
	// message types and operation names.
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_MESSAGING_SYSTEM    = "rocketmq"
	SPAN_ATTRIBUTE_VALUE_DESTINATION_KIND             = "topic"
	SPAN_ATTRIBUTE_VALUE_MESSAGING_PROTOCOL           = "RMQ-gRPC"
	SPAN_ATTRIBUTE_VALUE_MESSAGING_PROTOCOL_VERSION   = "v1"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_NORMAL_MESSAGE      = "normal"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_FIFO_MESSAGE        = "fifo"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_DELAY_MESSAGE       = "delay"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_TRANSACTION_MESSAGE = "transaction"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_SEND_OPERATION      = "send"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_RECEIVE_OPERATION   = "receive"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_PULL_OPERATION      = "pull"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_AWAIT_OPERATION     = "await"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_PROCESS_OPERATION   = "process"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_ACK_OPERATION       = "ack"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_NACK_OPERATION      = "nack"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_COMMIT_OPERATION    = "commit"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_ROLLBACK_OPERATION  = "rollback"
	SPAN_ATTRIBUTE_VALUE_ROCKETMQ_DLQ_OPERATION       = "dlq"

	// Messaging span attribute name list
	SPAN_ATTRIBUTE_KEY_MESSAGING_SYSTEM             = "messaging.system"
	SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION        = "messaging.destination"
	SPAN_ATTRIBUTE_KEY_MESSAGING_DESTINATION_KIND   = "messaging.destination_kind"
	SPAN_ATTRIBUTE_KEY_MESSAGING_PROTOCOL           = "messaging.protocol"
	SPAN_ATTRIBUTE_KEY_MESSAGING_PROTOCOL_VERSION   = "messaging.protocol_version"
	SPAN_ATTRIBUTE_KEY_MESSAGING_URL                = "messaging.url"
	SPAN_ATTRIBUTE_KEY_MESSAGING_ID                 = "messaging.message_id"
	SPAN_ATTRIBUTE_KEY_MESSAGING_PAYLOAD_SIZE_BYTES = "messaging.message_payload_size_bytes"
	SPAN_ATTRIBUTE_KEY_MESSAGING_OPERATION          = "messaging.operation"

	SPAN_ATTRIBUTE_VALUE_MESSAGING_SEND_OPERATION    = "send"
	SPAN_ATTRIBUTE_VALUE_MESSAGING_RECEIVE_OPERATION = "receive"
	SPAN_ATTRIBUTE_VALUE_MESSAGING_PROCESS_OPERATION = "process"

	SPAN_ATTRIBUTE_KEY_TRANSACTION_RESOLUTION = "commitAction"

	// Span annotation
	SPAN_ANNOTATION_AWAIT_CONSUMPTION = "__await_consumption"
	SPAN_ANNOTATION_MESSAGE_KEYS      = "__message_keys"
	SPAN_ANNOTATION_ATTR_START_TIME   = "__start_time"
)
RocketMQ span attribute name list
const (
	// CLIENT_ENABLE_SSL is the key string "rocketmq.client.enableSsl".
	// NOTE(review): presumably the setting that overrides the EnableSsl
	// variable — confirm where it is read.
	CLIENT_ENABLE_SSL = "rocketmq.client.enableSsl"
)
const (
	// DefaultScheme is the scheme name "ip".
	// NOTE(review): presumably the target/resolver scheme used when an
	// endpoint does not name one — confirm against the resolver code.
	DefaultScheme = "ip"
)
const (
	// MAX_MESSAGE_NUM is fixed at 1.
	// NOTE(review): presumably the maximum number of messages handled per
	// call — confirm at the call sites, which are not visible here.
	MAX_MESSAGE_NUM = 1
)
Variables ¶
var (
	// ErrNoAvailableEndpoints is returned by NewClientConn when it is given
	// an empty endpoint string.
	ErrNoAvailableEndpoints = errors.New("rocketmq: no available endpoints")
	// EnableSsl is initialised to true.
	// NOTE(review): presumably switches TLS on for client connections —
	// confirm where it is read.
	EnableSsl               = true
)
// Client metrics: four int64 measures recorded in milliseconds, and one
// distribution view per measure. Each view lists its histogram bucket bounds
// and the tag keys it is broken down by.
var (
	// Measures. The arguments to stats.Int64 are name, description and unit.
	PublishMLatencyMs         = stats.Int64("publish_latency", "Publish latency in milliseconds", "ms")
	ConsumeDeliveryMLatencyMs = stats.Int64("delivery_latency", "Time spent delivering messages from servers to clients", "ms")
	ConsumeAwaitMLatencyMs    = stats.Int64("await_time", "Client side queuing time of messages before getting processed", "ms")
	ConsumeProcessMLatencyMs  = stats.Int64("process_time", "Process message time", "ms")

	// PublishLatencyView aggregates PublishMLatencyMs by topic, client ID and
	// invocation status.
	PublishLatencyView = view.View{
		Name:        "rocketmq_send_cost_time",
		Description: "Publish latency",
		Measure:     PublishMLatencyMs,
		Aggregation: view.Distribution(1, 5, 10, 20, 50, 200, 500),
		TagKeys:     []tag.Key{topicTag, clientIdTag, invocationStatusTag},
	}

	// ConsumeDeliveryLatencyView aggregates ConsumeDeliveryMLatencyMs by
	// topic, client ID and consumer group.
	ConsumeDeliveryLatencyView = view.View{
		Name:        "rocketmq_delivery_latency",
		Description: "Message delivery latency",
		Measure:     ConsumeDeliveryMLatencyMs,
		Aggregation: view.Distribution(1, 5, 10, 20, 50, 200, 500),
		TagKeys:     []tag.Key{topicTag, clientIdTag, consumerGroupTag},
	}

	// ConsumeAwaitTimeView aggregates ConsumeAwaitMLatencyMs by topic, client
	// ID and consumer group.
	ConsumeAwaitTimeView = view.View{
		Name:        "rocketmq_await_time",
		Description: "Message await time",
		Measure:     ConsumeAwaitMLatencyMs,
		Aggregation: view.Distribution(1, 5, 20, 100, 1000, 5000, 10000),
		TagKeys:     []tag.Key{topicTag, clientIdTag, consumerGroupTag},
	}

	// ConsumeProcessTimeView aggregates ConsumeProcessMLatencyMs by topic,
	// client ID, consumer group and invocation status.
	ConsumeProcessTimeView = view.View{
		Name:        "rocketmq_process_time",
		Description: "Message process time",
		Measure:     ConsumeProcessMLatencyMs,
		Aggregation: view.Distribution(1, 5, 10, 100, 1000, 10000, 60000),
		TagKeys:     []tag.Key{topicTag, clientIdTag, consumerGroupTag, invocationStatusTag},
	}
)
var (
	// ErrNoAvailableBrokers is the sentinel error "rocketmq: no available
	// brokers". Compare against it with errors.Is.
	// NOTE(review): the code that returns it is not visible here.
	ErrNoAvailableBrokers = errors.New("rocketmq: no available brokers")
)
var NewClient = func(config *Config, opts ...ClientOption) (Client, error) { endpoints, err := utils.ParseTarget(config.Endpoint) if err != nil { return nil, err } cli := &defaultClient{ config: config, opts: defaultNSOptions, clientID: utils.GenClientID(), accessPoint: endpoints, messageInterceptors: make([]MessageInterceptor, 0), endpointsTelemetryClientTable: make(map[string]*defaultClientSession), on: *atomic.NewBool(true), inited: *atomic.NewBool(false), } cli.log = sugarBaseLogger.With("client_id", cli.clientID) for _, opt := range opts { opt.apply(&cli.opts) } cli.done = make(chan struct{}, 1) cli.clientMeterProvider = NewDefaultClientMeterProvider(cli) return cli, nil }
var NewClientConcrete = func(config *Config, opts ...ClientOption) (*defaultClient, error) { endpoints, err := utils.ParseTarget(config.Endpoint) if err != nil { return nil, err } cli := &defaultClient{ config: config, opts: defaultNSOptions, clientID: utils.GenClientID(), accessPoint: endpoints, messageInterceptors: make([]MessageInterceptor, 0), endpointsTelemetryClientTable: make(map[string]*defaultClientSession), on: *atomic.NewBool(true), clientManager: &MockClientManager{}, } cli.log = sugarBaseLogger.With("client_id", cli.clientID) for _, opt := range opts { opt.apply(&cli.opts) } cli.done = make(chan struct{}, 1) cli.clientMeterProvider = NewDefaultClientMeterProvider(cli) return cli, nil }
var NewClientConn = func(endpoint string, opts ...ConnOption) (ClientConn, error) { client := &clientConn{ opts: defaultConnOptions, validate: validator.New(), } if len(endpoint) == 0 { return nil, ErrNoAvailableEndpoints } for _, opt := range opts { opt.apply(&client.opts) } baseCtx := context.TODO() if client.opts.Context != nil { baseCtx = client.opts.Context } ctx, cancel := context.WithCancel(baseCtx) client.ctx = ctx client.cancel = cancel client.creds = credentials.NewTLS(client.opts.TLS) if client.opts.MaxCallSendMsgSize > 0 || client.opts.MaxCallRecvMsgSize > 0 { if client.opts.MaxCallRecvMsgSize > 0 && client.opts.MaxCallSendMsgSize > client.opts.MaxCallRecvMsgSize { return nil, fmt.Errorf("gRPC message recv limit (%d bytes) must be greater than send limit (%d bytes)", client.opts.MaxCallRecvMsgSize, client.opts.MaxCallSendMsgSize) } if client.opts.MaxCallSendMsgSize > 0 { client.callOpts = append(client.callOpts, grpc.MaxCallSendMsgSize(client.opts.MaxCallSendMsgSize)) } if client.opts.MaxCallRecvMsgSize > 0 { client.callOpts = append(client.callOpts, grpc.MaxCallRecvMsgSize(client.opts.MaxCallRecvMsgSize)) } } conn, err := client.dial(endpoint) if err != nil { client.cancel() return nil, err } client.conn = conn return client, nil }
// NewDefaultClientManager returns a defaultClientManager with an empty RPC
// client table, an unbuffered done channel and a copy of
// defaultClientManagerOptions.
var NewDefaultClientManager = func() *defaultClientManager {
	manager := new(defaultClientManager)
	manager.opts = defaultClientManagerOptions
	manager.rpcClientTable = make(map[string]RpcClient)
	manager.done = make(chan struct{})
	return manager
}
// NewDefaultClientMeter returns a defaultClientMeter that stores the given
// exporter and endpoints and whose enabled flag starts as on.
//
// clientID is accepted but not used by this constructor.
var NewDefaultClientMeter = func(exporter view.Exporter, on bool, endpoints *v2.Endpoints, clientID string) *defaultClientMeter {
	meter := new(defaultClientMeter)
	meter.enabled = *atomic.NewBool(on)
	meter.endpoints = endpoints
	meter.ocaExporter = exporter
	return meter
}
var NewDefaultClientMeterProvider = func(client *defaultClient) ClientMeterProvider { cmp := &defaultClientMeterProvider{ client: client, clientMeter: NewDefaultClientMeter(nil, false, nil, "nil"), } client.registerMessageInterceptor(NewDefaultMessageMeterInterceptor(cmp)) return cmp }
// NewDefaultMessageMeterInterceptor returns a message meter interceptor that
// holds clientMeterProvider.
var NewDefaultMessageMeterInterceptor = func(clientMeterProvider ClientMeterProvider) *defaultMessageMeterInterceptor {
	interceptor := new(defaultMessageMeterInterceptor)
	interceptor.clientMeterProvider = clientMeterProvider
	return interceptor
}
// NewDefultInflightRequestCountInterceptor returns an interceptor whose
// in-flight receive request counter starts at zero.
var NewDefultInflightRequestCountInterceptor = func() *defultInflightRequestCountInterceptor {
	interceptor := new(defultInflightRequestCountInterceptor)
	interceptor.inflightReceiveRequestCount = *atomic.NewInt64(0)
	return interceptor
}
// NewFilterExpression returns a FilterExpression for expression with the
// expression type set to TAG.
var NewFilterExpression = func(expression string) *FilterExpression {
	filter := new(FilterExpression)
	filter.expression = expression
	filter.expressionType = TAG
	return filter
}
// NewFilterExpressionWithType returns a FilterExpression for expression with
// the caller-chosen expressionType.
var NewFilterExpressionWithType = func(expression string, expressionType FilterExpressionType) *FilterExpression {
	filter := new(FilterExpression)
	filter.expression = expression
	filter.expressionType = expressionType
	return filter
}
var NewProducer = func(config *Config, opts ...ProducerOption) (Producer, error) { copyOpt := defaultProducerOptions po := ©Opt for _, opt := range opts { opt.apply(po) } cli, err := po.clientFunc(config) if err != nil { return nil, err } p := &defaultProducer{ po: *po, cli: cli.(*defaultClient), checker: po.checker, } p.cli.initTopics = po.topics endpoints, err := utils.ParseTarget(config.Endpoint) if err != nil { return nil, err } p.pSetting = &producerSettings{ clientId: p.cli.GetClientID(), endpoints: endpoints, clientType: v2.ClientType_PRODUCER, retryPolicy: &v2.RetryPolicy{ MaxAttempts: po.maxAttempts, Strategy: &v2.RetryPolicy_ExponentialBackoff{ ExponentialBackoff: &v2.ExponentialBackoff{ Max: durationpb.New(time.Duration(0)), Initial: durationpb.New(time.Duration(0)), Multiplier: 1, }, }, }, requestTimeout: p.cli.opts.timeout, validateMessageType: *atomic.NewBool(true), maxBodySizeBytes: *atomic.NewInt32(4 * 1024 * 1024), } for _, topic := range po.topics { topicResource := &v2.Resource{ Name: topic, ResourceNamespace: config.NameSpace, } p.pSetting.topics.Store(topic, topicResource) } p.cli.settings = p.pSetting p.cli.clientImpl = p return p, nil }
var NewPublishingLoadBalancer = func(messageQueues []*v2.MessageQueue) (PublishingLoadBalancer, error) { plb := &publishingLoadBalancer{ messageQueues: messageQueues, } return plb, nil }
var NewPublishingMessage = func(msg *Message, namespace string, settings *producerSettings, txEnabled bool) (*PublishingMessage, error) { if msg == nil { return nil, fmt.Errorf("message is nil") } pMsg := &PublishingMessage{ msg: msg, } maxBodySizeBytes := int(settings.maxBodySizeBytes.Load()) length := len(msg.Body) if length > maxBodySizeBytes { return nil, fmt.Errorf("message body size exceeds the threshold, max size=%d bytes", maxBodySizeBytes) } pMsg.encoding = v2.Encoding_IDENTITY pMsg.namespace = namespace pMsg.messageId = GetMessageIdCodecInstance().NextMessageId().String() if msg.GetMessageGroup() == nil && msg.GetDeliveryTimestamp() == nil && !txEnabled { pMsg.messageType = v2.MessageType_NORMAL return pMsg, nil } if msg.GetMessageGroup() != nil && !txEnabled { pMsg.messageType = v2.MessageType_FIFO return pMsg, nil } if msg.GetDeliveryTimestamp() != nil && !txEnabled { pMsg.messageType = v2.MessageType_DELAY return pMsg, nil } if msg.GetMessageGroup() == nil && msg.GetDeliveryTimestamp() == nil && txEnabled { pMsg.messageType = v2.MessageType_TRANSACTION return pMsg, nil } return nil, fmt.Errorf("transactional message should not set messageGroup or deliveryTimestamp") }
var NewPushConsumer = func(config *Config, opts ...PushConsumerOption) (PushConsumer, error) { copyOpt := defaultPushConsumerOptions pcOpts := ©Opt for _, opt := range opts { opt.apply(pcOpts) } if len(config.ConsumerGroup) == 0 { return nil, fmt.Errorf("consumerGroup could not be nil") } if pcOpts.messageListener == nil { return nil, fmt.Errorf("messageListener could not be nil") } if utils.CountSyncMapSize(pcOpts.subscriptionExpressions) == 0 { return nil, fmt.Errorf("subscriptionExpressions have not been set yet") } cli, err := pcOpts.clientFunc(config) if err != nil { return nil, err } if pcOpts.subscriptionExpressions == nil { pcOpts.subscriptionExpressions = &sync.Map{} } pc := &defaultPushConsumer{ pcOpts: *pcOpts, cli: cli.(*defaultClient), groupName: config.ConsumerGroup, awaitDuration: pcOpts.awaitDuration, subscriptionExpressions: pcOpts.subscriptionExpressions, subTopicRouteDataResultCache: &sync.Map{}, cacheAssignments: &sync.Map{}, processQueueTable: &sync.Map{}, stopping: *atomic.NewBool(false), inflightRequestCountInterceptor: NewDefultInflightRequestCountInterceptor(), } pc.cli.initTopics = make([]string, 0) pcOpts.subscriptionExpressions.Range(func(key, value interface{}) bool { pc.cli.initTopics = append(pc.cli.initTopics, key.(string)) return true }) endpoints, err := utils.ParseTarget(config.Endpoint) if err != nil { return nil, err } pc.pcSettings = &pushConsumerSettings{ clientId: pc.cli.GetClientID(), endpoints: endpoints, clientType: v2.ClientType_PUSH_CONSUMER, requestTimeout: pc.cli.opts.timeout, retryPolicy: &v2.RetryPolicy{ MaxAttempts: 16, }, isFifo: false, receiveBatchSize: 32, groupName: &v2.Resource{ Name: pc.groupName, ResourceNamespace: config.NameSpace, }, longPollingTimeout: time.Second * 30, subscriptionExpressions: pcOpts.subscriptionExpressions, } pc.cli.settings = pc.pcSettings pc.cli.clientImpl = pc pc.cli.registerMessageInterceptor(pc.inflightRequestCountInterceptor) return pc, nil }
var NewRpcClient = func(target string, opts ...RpcClientOption) (RpcClient, error) { rc := &rpcClient{ target: target, opts: defaultRpcClientOptions, } for _, opt := range opts { opt.apply(&rc.opts) } conn, err := rc.opts.clientConnFunc(target, rc.opts.connOptions...) if err != nil { return nil, fmt.Errorf("create grpc conn failed, err=%w", err) } rc.conn = conn rc.msc = v2.NewMessagingServiceClient(conn.Conn()) rc.activityNanoTime = time.Now() sugarBaseLogger.Infof("create rpc client success, target=%v", target) return rc, nil }
var NewSimpleConsumer = func(config *Config, opts ...SimpleConsumerOption) (SimpleConsumer, error) { copyOpt := defaultSimpleConsumerOptions scOpts := ©Opt for _, opt := range opts { opt.apply(scOpts) } if len(config.ConsumerGroup) == 0 { return nil, fmt.Errorf("consumerGroup could not be nil") } cli, err := scOpts.clientFunc(config) if err != nil { return nil, err } if scOpts.subscriptionExpressions == nil { scOpts.subscriptionExpressions = make(map[string]*FilterExpression) } sc := &defaultSimpleConsumer{ scOpts: *scOpts, cli: cli.(*defaultClient), groupName: config.ConsumerGroup, awaitDuration: scOpts.awaitDuration, subscriptionExpressions: &scOpts.subscriptionExpressions, } sc.cli.initTopics = make([]string, 0) for topic := range scOpts.subscriptionExpressions { sc.cli.initTopics = append(sc.cli.initTopics, topic) } endpoints, err := utils.ParseTarget(config.Endpoint) if err != nil { return nil, err } sc.scSettings = &simpleConsumerSettings{ clientId: sc.cli.GetClientID(), endpoints: endpoints, clientType: v2.ClientType_SIMPLE_CONSUMER, requestTimeout: sc.cli.opts.timeout, groupName: &v2.Resource{ Name: sc.groupName, ResourceNamespace: config.NameSpace, }, longPollingTimeout: scOpts.awaitDuration, subscriptionExpressions: scOpts.subscriptionExpressions, } sc.cli.settings = sc.scSettings sc.cli.clientImpl = sc return sc, nil }
var NewSubscriptionLoadBalancer = func(messageQueues []*v2.MessageQueue) (SubscriptionLoadBalancer, error) { slb := &subscriptionLoadBalancer{ messageQueues: messageQueues, } return slb, nil }
// NewTransactionImpl returns a transaction bound to producerImpl with an
// empty, string-keyed table of publishing messages.
var NewTransactionImpl = func(producerImpl Producer) *transactionImpl {
	tx := new(transactionImpl)
	tx.producerImpl = producerImpl
	tx.messages = make(map[string]*PublishingMessage)
	return tx
}
// SUB_ALL is the filter expression "*" built with NewFilterExpression, so its
// expression type is TAG.
// NOTE(review): presumably matches every tag — confirm against the broker's
// filter semantics.
var SUB_ALL = NewFilterExpression("*")
Functions ¶
func InitLogger ¶
func InitLogger()
func NewBaseConsumeService ¶ added in v5.1.3
func NewBaseConsumeService(clientId string, messageListener MessageListener, consumptionExecutor *simpleThreadPool, messageInterceptor MessageInterceptor) *baseConsumeService
func NewDefaultClientSession ¶
func NewDefaultClientSession(target string, cli *defaultClient) (*defaultClientSession, error)
func NewFiFoConsumeService ¶ added in v5.1.3
func NewFiFoConsumeService(clientId string, messageListener MessageListener, consumptionExecutor *simpleThreadPool, messageInterceptor MessageInterceptor) *fifoConsumeService
func NewSimpleThreadPool ¶ added in v5.1.3
func NewStandardConsumeService ¶ added in v5.1.3
func NewStandardConsumeService(clientId string, messageListener MessageListener, consumptionExecutor *simpleThreadPool, messageInterceptor MessageInterceptor) *standardConsumeService
func ResetLogger ¶
func ResetLogger()
Types ¶
type ClientConn ¶
// ClientConn is the connection abstraction produced by a ClientConnFunc such
// as NewClientConn.
type ClientConn interface {
	// Conn returns the underlying gRPC client connection.
	Conn() *grpc.ClientConn
	// Close reports an error on failure.
	// NOTE(review): presumably closes the underlying gRPC connection —
	// confirm in the implementation.
	Close() error
}
type ClientConnFunc ¶
// ClientConnFunc creates a ClientConn for the given endpoint string with
// optional ConnOptions. NewClientConn has this signature.
type ClientConnFunc func(string, ...ConnOption) (ClientConn, error)
type ClientManager ¶
// ClientManager registers clients and issues the messaging RPCs on their
// behalf. Every RPC method takes the target endpoints; all except
// ReceiveMessage and Telemetry also take a request message, and all except
// ReceiveMessage take a duration.
// NOTE(review): the duration is presumably the per-call timeout — confirm in
// the implementation.
type ClientManager interface {
	// RegisterClient and UnRegisterClient add and remove a client.
	RegisterClient(client Client)
	UnRegisterClient(client Client)
	// Unary RPCs: each returns the matching response message or an error.
	QueryRoute(ctx context.Context, endpoints *v2.Endpoints, request *v2.QueryRouteRequest, duration time.Duration) (*v2.QueryRouteResponse, error)
	QueryAssignments(ctx context.Context, endpoints *v2.Endpoints, request *v2.QueryAssignmentRequest, duration time.Duration) (*v2.QueryAssignmentResponse, error)
	HeartBeat(ctx context.Context, endpoints *v2.Endpoints, request *v2.HeartbeatRequest, duration time.Duration) (*v2.HeartbeatResponse, error)
	SendMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.SendMessageRequest, duration time.Duration) (*v2.SendMessageResponse, error)
	// Telemetry returns a telemetry stream client rather than a single response.
	Telemetry(ctx context.Context, endpoints *v2.Endpoints, duration time.Duration) (v2.MessagingService_TelemetryClient, error)
	EndTransaction(ctx context.Context, endpoints *v2.Endpoints, request *v2.EndTransactionRequest, duration time.Duration) (*v2.EndTransactionResponse, error)
	NotifyClientTermination(ctx context.Context, endpoints *v2.Endpoints, request *v2.NotifyClientTerminationRequest, duration time.Duration) (*v2.NotifyClientTerminationResponse, error)
	// ReceiveMessage returns a receive stream client and takes no duration.
	ReceiveMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error)
	AckMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.AckMessageRequest, duration time.Duration) (*v2.AckMessageResponse, error)
	ChangeInvisibleDuration(ctx context.Context, endpoints *v2.Endpoints, request *v2.ChangeInvisibleDurationRequest, duration time.Duration) (*v2.ChangeInvisibleDurationResponse, error)
	ForwardMessageToDeadLetterQueue(ctx context.Context, endpoints *v2.Endpoints, request *v2.ForwardMessageToDeadLetterQueueRequest, duration time.Duration) (*v2.ForwardMessageToDeadLetterQueueResponse, error)
}
type ClientMeterProvider ¶
type ClientOption ¶
// A ClientOption sets options such as timeout, etc. Values are obtained from
// the With... constructors (for example WithQueryRouteTimeout) and passed to
// NewClient.
type ClientOption interface {
	// contains filtered or unexported methods
}
A ClientOption sets options such as timeout, etc.
func WithClientConnFunc ¶
func WithClientConnFunc(f ClientConnFunc) ClientOption
WithClientConnFunc returns a ClientOption that sets the ClientConnFunc for the nameserver. The default is NewClientConn.
func WithConnOptions ¶
func WithConnOptions(opts ...ConnOption) ClientOption
WithConnOptions returns a ClientOption that sets the ConnOptions for the gRPC ClientConn.
func WithQueryRouteTimeout ¶
func WithQueryRouteTimeout(d time.Duration) ClientOption
WithQueryRouteTimeout returns a ClientOption that sets the timeout duration for the nameserver. The default is 3s.
func WithRpcClientOptions ¶
func WithRpcClientOptions(opts ...RpcClientOption) ClientOption
WithRpcClientOptions returns a ClientOption that sets the RpcClientOptions for the gRPC ClientConn.
type ClientSettings ¶
// ClientSettings exposes the per-client settings: client ID, client type,
// access point, retry policy and request timeout. The producer, push consumer
// and simple consumer constructors each install their own settings value on
// the client.
type ClientSettings interface {
	// GetClientID returns the client's identifier.
	GetClientID() string
	// GetClientType returns the protocol-level client type.
	GetClientType() v2.ClientType
	// GetAccessPoint returns the endpoints used as the access point.
	GetAccessPoint() *v2.Endpoints
	// GetRetryPolicy returns the retry policy.
	GetRetryPolicy() *v2.RetryPolicy
	// GetRequestTimeout returns the request timeout.
	GetRequestTimeout() time.Duration
	// contains filtered or unexported methods
}
type Config ¶
// Config carries the settings handed to the client, producer and consumer
// constructors.
type Config struct {
	// Endpoint is the target string parsed with utils.ParseTarget; marked
	// required for validation.
	Endpoint      string `validate:"required"`
	// NameSpace is used as the ResourceNamespace of topic and consumer group
	// resources.
	NameSpace     string
	// ConsumerGroup names the consumer group; NewPushConsumer and
	// NewSimpleConsumer reject an empty value.
	ConsumerGroup string
	// Credentials holds the session credentials; marked required for
	// validation.
	Credentials   *credentials.SessionCredentials `validate:"required"`
}
type ConnOption ¶
// A ConnOption sets options such as tls.Config, etc. Values are obtained from
// the With... constructors (for example WithTLSConfig) and passed to
// NewClientConn.
type ConnOption interface {
	// contains filtered or unexported methods
}
A ConnOption sets options such as tls.Config, etc.
func WithContext ¶
func WithContext(ctx context.Context) ConnOption
WithContext returns a ConnOption that sets the default client context; it can be used to cancel the gRPC dial and other operations that do not have an explicit context.
func WithDialOptions ¶
func WithDialOptions(dialOptions ...grpc.DialOption) ConnOption
WithDialOptions returns a ConnOption that sets grpc.DialOption for grpc.DialContext.
func WithDialTimeout ¶
func WithDialTimeout(dur time.Duration) ConnOption
WithDialTimeout returns a ConnOption that sets DialTimeout for grpc.DialContext. The default is 5 seconds.
func WithMaxCallRecvMsgSize ¶
func WithMaxCallRecvMsgSize(size int) ConnOption
WithMaxCallRecvMsgSize returns a ConnOption that sets the client-side response receive limit in bytes for grpc.DialContext.
func WithMaxCallSendMsgSize ¶
func WithMaxCallSendMsgSize(size int) ConnOption
WithMaxCallSendMsgSize returns a ConnOption that sets the client-side request send limit in bytes for grpc.DialContext. Note that the client-side response receive limit ("MaxCallRecvMsgSize"), if 0, defaults to "math.MaxInt32", because range response can easily exceed request send limits. Make sure that "MaxCallRecvMsgSize" >= server-side default send/recv limit.
func WithTLSConfig ¶
func WithTLSConfig(tc *tls.Config) ConnOption
WithTLSConfig returns a ConnOption that sets tls.Config for grpc.DialContext. The default is an x509 insecure tls.Config.
func WithZapLogger ¶
func WithZapLogger(logger *zap.Logger) ConnOption
type ConsumeService ¶ added in v5.1.3
type ConsumeService interface {
Shutdown() error
// contains filtered or unexported methods
}
type Consumer ¶
type Consumer interface {
GetGroupName() string
// contains filtered or unexported methods
}
type ConsumerResult ¶ added in v5.1.3
type ConsumerResult int8
const ( /** * Consume message successfully. */ SUCCESS ConsumerResult = 0 /** * Failed to consume message. */ FAILURE ConsumerResult = 1 )
type ErrRpcStatus ¶
func AsErrRpcStatus ¶
func AsErrRpcStatus(err error) (*ErrRpcStatus, bool)
func (*ErrRpcStatus) Error ¶
func (err *ErrRpcStatus) Error() string
func (*ErrRpcStatus) GetCode ¶
func (err *ErrRpcStatus) GetCode() int32
func (*ErrRpcStatus) GetMessage ¶
func (err *ErrRpcStatus) GetMessage() string
type FilterExpression ¶
type FilterExpression struct {
// contains filtered or unexported fields
}
type FilterExpressionType ¶
type FilterExpressionType int32
const ( SQL92 FilterExpressionType = iota TAG UNSPECIFIED )
type FuncMessageListener ¶ added in v5.1.3
type FuncMessageListener struct {
Consume func(*MessageView) ConsumerResult
}
type InvocationStatus ¶
type InvocationStatus string
const ( InvocationStatus_SUCCESS InvocationStatus = "success" InvocationStatus_FAILURE InvocationStatus = "failure" )
type Message ¶
type Message struct {
Topic string
Body []byte
Tag *string
// contains filtered or unexported fields
}
func (*Message) AddProperty ¶
func (*Message) GetDeliveryTimestamp ¶
func (*Message) GetMessageCommon ¶
func (msg *Message) GetMessageCommon() *MessageCommon
func (*Message) GetMessageGroup ¶
func (*Message) GetProperties ¶
func (*Message) SetDelayTimestamp ¶
func (*Message) SetMessageGroup ¶
type MessageCommon ¶
type MessageCommon struct {
// contains filtered or unexported fields
}
type MessageHookPoints ¶
type MessageHookPoints int32
const ( MessageHookPoints_SEND MessageHookPoints = iota MessageHookPoints_RECEIVE MessageHookPoints_CONSUME MessageHookPoints_ACK MessageHookPoints_CHANGE_INVISIBLE_DURATION MessageHookPoints_COMMIT_TRANSACTION MessageHookPoints_ROLLBACK_TRANSACTION MessageHookPoints_FORWARD_TO_DLQ )
type MessageHookPointsStatus ¶
type MessageHookPointsStatus int32
const ( MessageHookPointsStatus_UNSET MessageHookPointsStatus = iota MessageHookPointsStatus_OK MessageHookPointsStatus_ERROR )
type MessageId ¶
type MessageId interface {
// GetVersion Get the version of the messageId
GetVersion() string
// String string-formed string id
String() string
}
MessageId is an abstract message id.
func NewMessageId ¶
type MessageIdCodec ¶
The codec for the message-id.
Codec here provides the following two functions:
1. Provide decoding function of message-id of all versions above v0.
2. Provide a generator of message-id of v1 version.
The message-id of versions above V1 consists of 17 bytes in total. The first byte represents the version number. For V1, this byte is 0x01.
V1 message id example
┌──┬────────────┬────┬────────┬────────┐
│01│56F7E71C361B│21BC│024CCDBE│00000000│
└──┴────────────┴────┴────────┴────────┘
V1 version message id generation rules
                 process id(lower 2bytes)
                             ▲
mac address(lower 6bytes)    │   sequence number(big endian)
                    ▲        │          ▲ (4bytes)
                    │        │          │
              ┌─────┴─────┐ ┌┴┐ ┌───┐ ┌─┴─┐
       0x01+  │     6     │ │2│ │ 4 │ │ 4 │
              └───────────┘ └─┘ └─┬─┘ └───┘
                                  │
                                  ▼
              seconds since 2021-01-01 00:00:00(UTC+0)
                           (lower 4bytes)
func GetMessageIdCodecInstance ¶
func GetMessageIdCodecInstance() MessageIdCodec
type MessageInterceptor ¶
type MessageInterceptor interface {
// contains filtered or unexported methods
}
type MessageListener ¶ added in v5.1.3
type MessageListener interface {
// contains filtered or unexported methods
}
type MessageMeterInterceptor ¶
type MessageMeterInterceptor interface {
MessageInterceptor
}
type MessageView ¶
type MessageView struct {
ReceiptHandle string
// contains filtered or unexported fields
}
func (*MessageView) GetBody ¶
func (msg *MessageView) GetBody() []byte
func (*MessageView) GetBornHost ¶
func (msg *MessageView) GetBornHost() *string
func (*MessageView) GetBornTimestamp ¶
func (msg *MessageView) GetBornTimestamp() *time.Time
func (*MessageView) GetDeliveryAttempt ¶
func (msg *MessageView) GetDeliveryAttempt() int32
func (*MessageView) GetDeliveryTimestamp ¶
func (msg *MessageView) GetDeliveryTimestamp() *time.Time
func (*MessageView) GetKeys ¶
func (msg *MessageView) GetKeys() []string
func (*MessageView) GetMessageCommon ¶
func (msg *MessageView) GetMessageCommon() *MessageCommon
func (*MessageView) GetMessageGroup ¶
func (msg *MessageView) GetMessageGroup() *string
func (*MessageView) GetMessageId ¶
func (msg *MessageView) GetMessageId() string
func (*MessageView) GetOffset ¶
func (msg *MessageView) GetOffset() int64
func (*MessageView) GetProperties ¶
func (msg *MessageView) GetProperties() map[string]string
func (*MessageView) GetReceiptHandle ¶
func (msg *MessageView) GetReceiptHandle() string
func (*MessageView) GetTag ¶
func (msg *MessageView) GetTag() *string
func (*MessageView) GetTopic ¶
func (msg *MessageView) GetTopic() string
func (*MessageView) GetTraceContext ¶
func (msg *MessageView) GetTraceContext() *string
func (*MessageView) SetDelayTimeLevel ¶
func (msg *MessageView) SetDelayTimeLevel(deliveryTimestamp time.Time)
func (*MessageView) SetKeys ¶
func (msg *MessageView) SetKeys(keys ...string)
func (*MessageView) SetMessageGroup ¶
func (msg *MessageView) SetMessageGroup(messageGroup string)
func (*MessageView) SetTag ¶
func (msg *MessageView) SetTag(tag string)
type MockClient ¶
type MockClient struct {
// contains filtered or unexported fields
}
MockClient is a mock of Client interface.
func NewMockClient ¶
func NewMockClient(ctrl *gomock.Controller) *MockClient
NewMockClient creates a new mock instance.
func (*MockClient) EXPECT ¶
func (m *MockClient) EXPECT() *MockClientMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockClient) GetClientID ¶
func (m *MockClient) GetClientID() string
GetClientID mocks base method.
func (*MockClient) GracefulStop ¶
func (m *MockClient) GracefulStop() error
GracefulStop mocks base method.
type MockClientManager ¶
type MockClientManager struct {
// contains filtered or unexported fields
}
MockClientManager is a mock of ClientManager interface.
func NewMockClientManager ¶
func NewMockClientManager(ctrl *gomock.Controller) *MockClientManager
NewMockClientManager creates a new mock instance.
func (*MockClientManager) AckMessage ¶
func (m *MockClientManager) AckMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.AckMessageRequest, duration time.Duration) (*v2.AckMessageResponse, error)
AckMessage mocks base method.
func (*MockClientManager) ChangeInvisibleDuration ¶
func (m *MockClientManager) ChangeInvisibleDuration(ctx context.Context, endpoints *v2.Endpoints, request *v2.ChangeInvisibleDurationRequest, duration time.Duration) (*v2.ChangeInvisibleDurationResponse, error)
ChangeInvisibleDuration mocks base method.
func (*MockClientManager) EXPECT ¶
func (m *MockClientManager) EXPECT() *MockClientManagerMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockClientManager) EndTransaction ¶
func (m *MockClientManager) EndTransaction(ctx context.Context, endpoints *v2.Endpoints, request *v2.EndTransactionRequest, duration time.Duration) (*v2.EndTransactionResponse, error)
EndTransaction mocks base method.
func (*MockClientManager) ForwardMessageToDeadLetterQueue ¶ added in v5.1.3
func (m *MockClientManager) ForwardMessageToDeadLetterQueue(ctx context.Context, endpoints *v2.Endpoints, request *v2.ForwardMessageToDeadLetterQueueRequest, duration time.Duration) (*v2.ForwardMessageToDeadLetterQueueResponse, error)
ForwardMessageToDeadLetterQueue mocks base method.
func (*MockClientManager) HeartBeat ¶
func (m *MockClientManager) HeartBeat(ctx context.Context, endpoints *v2.Endpoints, request *v2.HeartbeatRequest, duration time.Duration) (*v2.HeartbeatResponse, error)
HeartBeat mocks base method.
func (*MockClientManager) NotifyClientTermination ¶
func (m *MockClientManager) NotifyClientTermination(ctx context.Context, endpoints *v2.Endpoints, request *v2.NotifyClientTerminationRequest, duration time.Duration) (*v2.NotifyClientTerminationResponse, error)
NotifyClientTermination mocks base method.
func (*MockClientManager) QueryAssignments ¶ added in v5.1.3
func (m *MockClientManager) QueryAssignments(ctx context.Context, endpoints *v2.Endpoints, request *v2.QueryAssignmentRequest, duration time.Duration) (*v2.QueryAssignmentResponse, error)
QueryAssignments mocks base method.
func (*MockClientManager) QueryRoute ¶
func (m *MockClientManager) QueryRoute(ctx context.Context, endpoints *v2.Endpoints, request *v2.QueryRouteRequest, duration time.Duration) (*v2.QueryRouteResponse, error)
QueryRoute mocks base method.
func (*MockClientManager) ReceiveMessage ¶
func (m *MockClientManager) ReceiveMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error)
ReceiveMessage mocks base method.
func (*MockClientManager) RegisterClient ¶
func (m *MockClientManager) RegisterClient(client Client)
RegisterClient mocks base method.
func (*MockClientManager) SendMessage ¶
func (m *MockClientManager) SendMessage(ctx context.Context, endpoints *v2.Endpoints, request *v2.SendMessageRequest, duration time.Duration) (*v2.SendMessageResponse, error)
SendMessage mocks base method.
func (*MockClientManager) Telemetry ¶
func (m *MockClientManager) Telemetry(ctx context.Context, endpoints *v2.Endpoints, duration time.Duration) (v2.MessagingService_TelemetryClient, error)
Telemetry mocks base method.
func (*MockClientManager) UnRegisterClient ¶
func (m *MockClientManager) UnRegisterClient(client Client)
UnRegisterClient mocks base method.
type MockClientManagerMockRecorder ¶
type MockClientManagerMockRecorder struct {
// contains filtered or unexported fields
}
MockClientManagerMockRecorder is the mock recorder for MockClientManager.
func (*MockClientManagerMockRecorder) AckMessage ¶
func (mr *MockClientManagerMockRecorder) AckMessage(ctx, endpoints, request, duration interface{}) *gomock.Call
AckMessage indicates an expected call of AckMessage.
func (*MockClientManagerMockRecorder) ChangeInvisibleDuration ¶
func (mr *MockClientManagerMockRecorder) ChangeInvisibleDuration(ctx, endpoints, request, duration interface{}) *gomock.Call
ChangeInvisibleDuration indicates an expected call of ChangeInvisibleDuration.
func (*MockClientManagerMockRecorder) EndTransaction ¶
func (mr *MockClientManagerMockRecorder) EndTransaction(ctx, endpoints, request, duration interface{}) *gomock.Call
EndTransaction indicates an expected call of EndTransaction.
func (*MockClientManagerMockRecorder) ForwardMessageToDeadLetterQueue ¶ added in v5.1.3
func (mr *MockClientManagerMockRecorder) ForwardMessageToDeadLetterQueue(ctx, endpoints, request, duration interface{}) *gomock.Call
ForwardMessageToDeadLetterQueue indicates an expected call of ForwardMessageToDeadLetterQueue.
func (*MockClientManagerMockRecorder) HeartBeat ¶
func (mr *MockClientManagerMockRecorder) HeartBeat(ctx, endpoints, request, duration interface{}) *gomock.Call
HeartBeat indicates an expected call of HeartBeat.
func (*MockClientManagerMockRecorder) NotifyClientTermination ¶
func (mr *MockClientManagerMockRecorder) NotifyClientTermination(ctx, endpoints, request, duration interface{}) *gomock.Call
NotifyClientTermination indicates an expected call of NotifyClientTermination.
func (*MockClientManagerMockRecorder) QueryAssignments ¶ added in v5.1.3
func (mr *MockClientManagerMockRecorder) QueryAssignments(ctx, endpoints, request, duration interface{}) *gomock.Call
QueryAssignments indicates an expected call of QueryAssignments.
func (*MockClientManagerMockRecorder) QueryRoute ¶
func (mr *MockClientManagerMockRecorder) QueryRoute(ctx, endpoints, request, duration interface{}) *gomock.Call
QueryRoute indicates an expected call of QueryRoute.
func (*MockClientManagerMockRecorder) ReceiveMessage ¶
func (mr *MockClientManagerMockRecorder) ReceiveMessage(ctx, endpoints, request interface{}) *gomock.Call
ReceiveMessage indicates an expected call of ReceiveMessage.
func (*MockClientManagerMockRecorder) RegisterClient ¶
func (mr *MockClientManagerMockRecorder) RegisterClient(client interface{}) *gomock.Call
RegisterClient indicates an expected call of RegisterClient.
func (*MockClientManagerMockRecorder) SendMessage ¶
func (mr *MockClientManagerMockRecorder) SendMessage(ctx, endpoints, request, duration interface{}) *gomock.Call
SendMessage indicates an expected call of SendMessage.
func (*MockClientManagerMockRecorder) Telemetry ¶
func (mr *MockClientManagerMockRecorder) Telemetry(ctx, endpoints, duration interface{}) *gomock.Call
Telemetry indicates an expected call of Telemetry.
func (*MockClientManagerMockRecorder) UnRegisterClient ¶
func (mr *MockClientManagerMockRecorder) UnRegisterClient(client interface{}) *gomock.Call
UnRegisterClient indicates an expected call of UnRegisterClient.
type MockClientMockRecorder ¶
type MockClientMockRecorder struct {
// contains filtered or unexported fields
}
MockClientMockRecorder is the mock recorder for MockClient.
func (*MockClientMockRecorder) GetClientID ¶
func (mr *MockClientMockRecorder) GetClientID() *gomock.Call
GetClientID indicates an expected call of GetClientID.
func (*MockClientMockRecorder) GracefulStop ¶
func (mr *MockClientMockRecorder) GracefulStop() *gomock.Call
GracefulStop indicates an expected call of GracefulStop.
func (*MockClientMockRecorder) Sign ¶
func (mr *MockClientMockRecorder) Sign(ctx interface{}) *gomock.Call
Sign indicates an expected call of Sign.
type MockRpcClient ¶
type MockRpcClient struct {
// contains filtered or unexported fields
}
MockRpcClient is a mock of RpcClient interface.
func NewMockRpcClient ¶
func NewMockRpcClient(ctrl *gomock.Controller) *MockRpcClient
NewMockRpcClient creates a new mock instance.
func (*MockRpcClient) AckMessage ¶
func (m *MockRpcClient) AckMessage(ctx context.Context, request *v2.AckMessageRequest) (*v2.AckMessageResponse, error)
AckMessage mocks base method.
func (*MockRpcClient) ChangeInvisibleDuration ¶
func (m *MockRpcClient) ChangeInvisibleDuration(ctx context.Context, request *v2.ChangeInvisibleDurationRequest) (*v2.ChangeInvisibleDurationResponse, error)
ChangeInvisibleDuration mocks base method.
func (*MockRpcClient) EXPECT ¶
func (m *MockRpcClient) EXPECT() *MockRpcClientMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
func (*MockRpcClient) EndTransaction ¶
func (m *MockRpcClient) EndTransaction(ctx context.Context, request *v2.EndTransactionRequest) (*v2.EndTransactionResponse, error)
EndTransaction mocks base method.
func (*MockRpcClient) ForwardMessageToDeadLetterQueue ¶ added in v5.1.3
func (m *MockRpcClient) ForwardMessageToDeadLetterQueue(ctx context.Context, request *v2.ForwardMessageToDeadLetterQueueRequest) (*v2.ForwardMessageToDeadLetterQueueResponse, error)
ForwardMessageToDeadLetterQueue mocks base method.
func (*MockRpcClient) GetTarget ¶
func (m *MockRpcClient) GetTarget() string
GetTarget mocks base method.
func (*MockRpcClient) GracefulStop ¶
func (m *MockRpcClient) GracefulStop() error
GracefulStop mocks base method.
func (*MockRpcClient) HeartBeat ¶
func (m *MockRpcClient) HeartBeat(ctx context.Context, request *v2.HeartbeatRequest) (*v2.HeartbeatResponse, error)
HeartBeat mocks base method.
func (*MockRpcClient) NotifyClientTermination ¶
func (m *MockRpcClient) NotifyClientTermination(ctx context.Context, request *v2.NotifyClientTerminationRequest) (*v2.NotifyClientTerminationResponse, error)
NotifyClientTermination mocks base method.
func (*MockRpcClient) QueryAssignments ¶ added in v5.1.3
func (m *MockRpcClient) QueryAssignments(ctx context.Context, request *v2.QueryAssignmentRequest) (*v2.QueryAssignmentResponse, error)
QueryAssignments mocks base method.
func (*MockRpcClient) QueryRoute ¶
func (m *MockRpcClient) QueryRoute(ctx context.Context, request *v2.QueryRouteRequest) (*v2.QueryRouteResponse, error)
QueryRoute mocks base method.
func (*MockRpcClient) ReceiveMessage ¶
func (m *MockRpcClient) ReceiveMessage(ctx context.Context, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error)
ReceiveMessage mocks base method.
func (*MockRpcClient) SendMessage ¶
func (m *MockRpcClient) SendMessage(ctx context.Context, request *v2.SendMessageRequest) (*v2.SendMessageResponse, error)
SendMessage mocks base method.
func (*MockRpcClient) Telemetry ¶
func (m *MockRpcClient) Telemetry(ctx context.Context) (v2.MessagingService_TelemetryClient, error)
Telemetry mocks base method.
type MockRpcClientMockRecorder ¶
type MockRpcClientMockRecorder struct {
// contains filtered or unexported fields
}
MockRpcClientMockRecorder is the mock recorder for MockRpcClient.
func (*MockRpcClientMockRecorder) AckMessage ¶
func (mr *MockRpcClientMockRecorder) AckMessage(ctx, request interface{}) *gomock.Call
AckMessage indicates an expected call of AckMessage.
func (*MockRpcClientMockRecorder) ChangeInvisibleDuration ¶
func (mr *MockRpcClientMockRecorder) ChangeInvisibleDuration(ctx, request interface{}) *gomock.Call
ChangeInvisibleDuration indicates an expected call of ChangeInvisibleDuration.
func (*MockRpcClientMockRecorder) EndTransaction ¶
func (mr *MockRpcClientMockRecorder) EndTransaction(ctx, request interface{}) *gomock.Call
EndTransaction indicates an expected call of EndTransaction.
func (*MockRpcClientMockRecorder) ForwardMessageToDeadLetterQueue ¶ added in v5.1.3
func (mr *MockRpcClientMockRecorder) ForwardMessageToDeadLetterQueue(ctx, request interface{}) *gomock.Call
ForwardMessageToDeadLetterQueue indicates an expected call of ForwardMessageToDeadLetterQueue.
func (*MockRpcClientMockRecorder) GetTarget ¶
func (mr *MockRpcClientMockRecorder) GetTarget() *gomock.Call
GetTarget indicates an expected call of GetTarget.
func (*MockRpcClientMockRecorder) GracefulStop ¶
func (mr *MockRpcClientMockRecorder) GracefulStop() *gomock.Call
GracefulStop indicates an expected call of GracefulStop.
func (*MockRpcClientMockRecorder) HeartBeat ¶
func (mr *MockRpcClientMockRecorder) HeartBeat(ctx, request interface{}) *gomock.Call
HeartBeat indicates an expected call of HeartBeat.
func (*MockRpcClientMockRecorder) NotifyClientTermination ¶
func (mr *MockRpcClientMockRecorder) NotifyClientTermination(ctx, request interface{}) *gomock.Call
NotifyClientTermination indicates an expected call of NotifyClientTermination.
func (*MockRpcClientMockRecorder) QueryAssignments ¶ added in v5.1.3
func (mr *MockRpcClientMockRecorder) QueryAssignments(ctx, request interface{}) *gomock.Call
QueryAssignments indicates an expected call of QueryAssignments.
func (*MockRpcClientMockRecorder) QueryRoute ¶
func (mr *MockRpcClientMockRecorder) QueryRoute(ctx, request interface{}) *gomock.Call
QueryRoute indicates an expected call of QueryRoute.
func (*MockRpcClientMockRecorder) ReceiveMessage ¶
func (mr *MockRpcClientMockRecorder) ReceiveMessage(ctx, request interface{}) *gomock.Call
ReceiveMessage indicates an expected call of ReceiveMessage.
func (*MockRpcClientMockRecorder) SendMessage ¶
func (mr *MockRpcClientMockRecorder) SendMessage(ctx, request interface{}) *gomock.Call
SendMessage indicates an expected call of SendMessage.
func (*MockRpcClientMockRecorder) Telemetry ¶
func (mr *MockRpcClientMockRecorder) Telemetry(ctx interface{}) *gomock.Call
Telemetry indicates an expected call of Telemetry.
type MockisClient ¶
type MockisClient struct {
// contains filtered or unexported fields
}
MockisClient is a mock of isClient interface.
func NewMockisClient ¶
func NewMockisClient(ctrl *gomock.Controller) *MockisClient
NewMockisClient creates a new mock instance.
func (*MockisClient) EXPECT ¶
func (m *MockisClient) EXPECT() *MockisClientMockRecorder
EXPECT returns an object that allows the caller to indicate expected use.
type MockisClientMockRecorder ¶
type MockisClientMockRecorder struct {
// contains filtered or unexported fields
}
MockisClientMockRecorder is the mock recorder for MockisClient.
type NewClientFunc ¶
type NewClientFunc func(*Config, ...ClientOption) (Client, error)
type ProcessQueue ¶ added in v5.1.3
type ProcessQueue interface {
// contains filtered or unexported methods
}
type Producer ¶
type Producer interface {
Send(context.Context, *Message) ([]*SendReceipt, error)
SendWithTransaction(context.Context, *Message, Transaction) ([]*SendReceipt, error)
SendAsync(context.Context, *Message, func(context.Context, []*SendReceipt, error))
BeginTransaction() Transaction
Start() error
GracefulStop() error
// contains filtered or unexported methods
}
type ProducerOption ¶
type ProducerOption interface {
// contains filtered or unexported methods
}
A ProducerOption sets options such as max attempts, topics, etc.
func WithClientFunc ¶
func WithClientFunc(f NewClientFunc) ProducerOption
WithClientFunc returns a ProducerOption that sets ClientFunc for producer. Default is nameserver.New.
func WithMaxAttempts ¶
func WithMaxAttempts(m int32) ProducerOption
func WithTopics ¶
func WithTopics(t ...string) ProducerOption
func WithTransactionChecker ¶
func WithTransactionChecker(checker *TransactionChecker) ProducerOption
type PublishingLoadBalancer ¶
type PublishingLoadBalancer interface {
TakeMessageQueueByMessageGroup(messageGroup *string) ([]*v2.MessageQueue, error)
TakeMessageQueues(excluded *sync.Map, count int) ([]*v2.MessageQueue, error)
CopyAndUpdate([]*v2.MessageQueue) PublishingLoadBalancer
}
type PublishingMessage ¶
type PublishingMessage struct {
// contains filtered or unexported fields
}
type PushConsumer ¶ added in v5.1.3
type PushConsumer interface {
Consumer
Start() error
GracefulStop() error
Subscribe(topic string, filterExpression *FilterExpression) error
Unsubscribe(topic string) error
Ack(ctx context.Context, messageView *MessageView) error
ChangeInvisibleDuration(messageView *MessageView, invisibleDuration time.Duration) error
ChangeInvisibleDurationAsync(messageView *MessageView, invisibleDuration time.Duration)
}
type PushConsumerOption ¶ added in v5.1.3
type PushConsumerOption interface {
// contains filtered or unexported methods
}
A PushConsumerOption sets options such as await duration, message listener, subscription expressions, etc.
func WithPushAwaitDuration ¶ added in v5.1.3
func WithPushAwaitDuration(awaitDuration time.Duration) PushConsumerOption
func WithPushConsumptionThreadCount ¶ added in v5.1.3
func WithPushConsumptionThreadCount(consumptionThreadCount int32) PushConsumerOption
func WithPushMaxCacheMessageCount ¶ added in v5.1.3
func WithPushMaxCacheMessageCount(maxCacheMessageCount int32) PushConsumerOption
func WithPushMaxCacheMessageSizeInBytes ¶ added in v5.1.3
func WithPushMaxCacheMessageSizeInBytes(maxCacheMessageSizeInBytes int64) PushConsumerOption
func WithPushMessageListener ¶ added in v5.1.3
func WithPushMessageListener(messageListener MessageListener) PushConsumerOption
func WithPushSubscriptionExpressions ¶ added in v5.1.3
func WithPushSubscriptionExpressions(subscriptionExpressions map[string]*FilterExpression) PushConsumerOption
WithPushSubscriptionExpressions returns a PushConsumerOption that sets the subscription expressions for the consumer. Note: by default it uses *.
type RocketmqResolver ¶ added in v5.1.3
type RocketmqResolver struct {
// contains filtered or unexported fields
}
func (*RocketmqResolver) Close ¶ added in v5.1.3
func (r *RocketmqResolver) Close()
func (*RocketmqResolver) ResolveNow ¶ added in v5.1.3
func (r *RocketmqResolver) ResolveNow(resolver.ResolveNowOptions)
type RpcClient ¶
type RpcClient interface {
GracefulStop() error
HeartBeat(ctx context.Context, request *v2.HeartbeatRequest) (*v2.HeartbeatResponse, error)
QueryRoute(ctx context.Context, request *v2.QueryRouteRequest) (*v2.QueryRouteResponse, error)
QueryAssignments(ctx context.Context, request *v2.QueryAssignmentRequest) (*v2.QueryAssignmentResponse, error)
SendMessage(ctx context.Context, request *v2.SendMessageRequest) (*v2.SendMessageResponse, error)
Telemetry(ctx context.Context) (v2.MessagingService_TelemetryClient, error)
EndTransaction(ctx context.Context, request *v2.EndTransactionRequest) (*v2.EndTransactionResponse, error)
NotifyClientTermination(ctx context.Context, request *v2.NotifyClientTerminationRequest) (*v2.NotifyClientTerminationResponse, error)
ReceiveMessage(ctx context.Context, request *v2.ReceiveMessageRequest) (v2.MessagingService_ReceiveMessageClient, error)
AckMessage(ctx context.Context, request *v2.AckMessageRequest) (*v2.AckMessageResponse, error)
ChangeInvisibleDuration(ctx context.Context, request *v2.ChangeInvisibleDurationRequest) (*v2.ChangeInvisibleDurationResponse, error)
ForwardMessageToDeadLetterQueue(ctx context.Context, request *v2.ForwardMessageToDeadLetterQueueRequest) (*v2.ForwardMessageToDeadLetterQueueResponse, error)
GetTarget() string
// contains filtered or unexported methods
}
type RpcClientOption ¶
type RpcClientOption interface {
// contains filtered or unexported methods
}
An RpcClientOption sets options such as heartbeat duration, health check duration, timeout, etc.
func WithHealthCheckDuration ¶
func WithHealthCheckDuration(d time.Duration) RpcClientOption
WithHealthCheckDuration returns a RpcClientOption that sets healthCheckDuration for RpcClient. Default is 15s.
func WithHeartbeatDuration ¶
func WithHeartbeatDuration(d time.Duration) RpcClientOption
WithHeartbeatDuration returns a RpcClientOption that sets heartbeatDuration for RpcClient. Default is 10s.
func WithRpcClientClientConnFunc ¶
func WithRpcClientClientConnFunc(f ClientConnFunc) RpcClientOption
WithRpcClientClientConnFunc returns a RpcClientOption that sets ClientConnFunc for RpcClient. Default is NewClientConn.
func WithRpcClientConnOption ¶
func WithRpcClientConnOption(opts ...ConnOption) RpcClientOption
WithRpcClientConnOption returns a RpcClientOption that sets ConnOption for RpcClient.
func WithRpcClientTimeout ¶
func WithRpcClientTimeout(d time.Duration) RpcClientOption
WithRpcClientTimeout returns a RpcClientOption that sets the timeout RpcClient uses for heartbeat and health check. Default is 5s.
type SendReceipt ¶
type SimpleConsumer ¶
type SimpleConsumer interface {
Consumer
Start() error
GracefulStop() error
Subscribe(topic string, filterExpression *FilterExpression) error
Unsubscribe(topic string) error
Ack(ctx context.Context, messageView *MessageView) error
Receive(ctx context.Context, maxMessageNum int32, invisibleDuration time.Duration) ([]*MessageView, error)
ChangeInvisibleDuration(messageView *MessageView, invisibleDuration time.Duration) error
ChangeInvisibleDurationAsync(messageView *MessageView, invisibleDuration time.Duration)
}
type SimpleConsumerOption ¶
type SimpleConsumerOption interface {
// contains filtered or unexported methods
}
A SimpleConsumerOption sets options such as await duration, subscription expressions, etc.
func WithClientFuncForSimpleConsumer ¶
func WithClientFuncForSimpleConsumer(f NewClientFunc) SimpleConsumerOption
WithClientFuncForSimpleConsumer returns a SimpleConsumerOption that sets ClientFunc for simple consumer. Default is nameserver.New.
func WithSimpleAwaitDuration ¶ added in v5.1.3
func WithSimpleAwaitDuration(awaitDuration time.Duration) SimpleConsumerOption
func WithSimpleSubscriptionExpressions ¶ added in v5.1.3
func WithSimpleSubscriptionExpressions(subscriptionExpressions map[string]*FilterExpression) SimpleConsumerOption
WithSimpleSubscriptionExpressions returns a SimpleConsumerOption that sets the subscription expressions for the consumer. Note: by default it uses *.
type SubscriptionLoadBalancer ¶
type SubscriptionLoadBalancer interface {
TakeMessageQueue() (*v2.MessageQueue, error)
CopyAndUpdate([]*v2.MessageQueue) SubscriptionLoadBalancer
}
type Transaction ¶
type TransactionChecker ¶
type TransactionChecker struct {
Check func(msg *MessageView) TransactionResolution
}
type TransactionResolution ¶
type TransactionResolution int32
const (
UNKNOWN TransactionResolution = iota // start of the enumeration; the first value defaults to 0
COMMIT
ROLLBACK
)
type UnifiedMessage ¶
type UnifiedMessage struct {
// contains filtered or unexported fields
}
func (*UnifiedMessage) GetMessage ¶
func (uMsg *UnifiedMessage) GetMessage() *Message
Source Files
¶
- client.go
- client_manager.go
- client_manager_mock.go
- client_mock.go
- client_options.go
- config.go
- conn.go
- conn_options.go
- consumer.go
- consumer_options.go
- consumer_service.go
- error.go
- loadBalancer.go
- log.go
- message.go
- message_id.go
- message_id_codec.go
- metric.go
- name_resolver.go
- process_queue.go
- producer.go
- producer_options.go
- publishing_message.go
- push_consumer.go
- push_consumer_options.go
- rpc_client.go
- rpc_client_mock.go
- rpc_client_options.go
- simple_consumer.go
- simple_consumer_options.go
- simple_thread_pool.go
- trace.go
- transaction.go
- user_agent.go
Directories
¶
| Path | Synopsis |
|---|---|
| example | |
| consumer/push_consumer command | |
| consumer/simple_consumer command | |
| producer/async command | |
| producer/delay command | |
| producer/fifo command | |
| producer/normal command | |
| producer/transaction command | |
| pkg | |
| protocol | |