Documentation
¶
Index ¶
- Variables
- func DefaultDecodeFunc[T ITopic](ctx context.Context, bs []byte) (T, error)
- func DefaultEncodeFunc[T ITopic](ctx context.Context, dst T) ([]byte, error)
- func DefaultSubscriberCheckFunc[T ITopic](ctx context.Context, dst T) bool
- func DefaultSubscriberHandleFunc[T ITopic](ctx context.Context, dst T) error
- func Extract(ctx context.Context, p propagation.TextMapPropagator, metadata metadata.MD) (baggage.Baggage, sdktrace.SpanContext)
- func Inject(ctx context.Context, p propagation.TextMapPropagator, metadata metadata.MD)
- type DecodeFunc
- type EncodeFunc
- type Event
- type IMsgBus
- type IPublisher
- type ISharedStorage
- type ISubscriber
- type ITopic
- type ITopicManager
- type MetadataSupplier
- type OTELOption
- type OTELOptions
- type Publisher
- type PublisherOption
- type PublisherOptions
- type Subscriber
- type SubscriberCheckFunc
- type SubscriberHandleFunc
- type SubscriberOption
- type SubscriberOptions
- type TopicManager
Constants ¶
This section is empty.
Variables ¶
Functions ¶
func DefaultDecodeFunc ¶
func DefaultEncodeFunc ¶
func Extract ¶
func Extract(ctx context.Context, p propagation.TextMapPropagator, metadata metadata.MD) (baggage.Baggage, sdktrace.SpanContext)
Extract extracts the baggage and span context carried in metadata, using the propagator p.
func Inject ¶
func Inject(ctx context.Context, p propagation.TextMapPropagator, metadata metadata.MD)
Inject injects cross-cutting concerns from the ctx into the metadata.
Types ¶
type IMsgBus ¶
type IMsgBus interface {
// Push 推入数据
Push(ctx context.Context, topic string, bs []byte) error
// Pop 以阻塞的方式获取数据
// blockTimeout 为 0 则永久阻塞 直到 context 退出 或 数据到达
Pop(ctx context.Context, topic, channel string, blockTimeout time.Duration) (data []byte, ackFn func(), err error)
// AddChannel 为 topic 添加 channel
AddChannel(ctx context.Context, topic string, channel string) error
// RemoveChannel 删除 Channel, channel 下的数据也应该被删除
RemoveChannel(ctx context.Context, topic string, channel string) error
// ListChannel 列出 Topic 下所有 Channel
ListChannel(ctx context.Context, topic string) ([]string, error)
}
type IPublisher ¶
func NewPublisher ¶
func NewPublisher[T ITopic](client IMsgBus, topicManager ITopicManager, otelOptions *OTELOptions, opts ...PublisherOption[T]) IPublisher[T]
type ISharedStorage ¶
type ISubscriber ¶
type ISubscriber[T ITopic] interface {
Handle(ctx context.Context) error
// Close 取消订阅,这个操作必须成功
Close(ctx context.Context)
}
func NewSubscriber ¶
func NewSubscriber[T ITopic](topic, channel string, client IMsgBus, otelOptions *OTELOptions, topicManager ITopicManager, opts ...SubscriberOption[T]) ISubscriber[T]
type ITopicManager ¶
type ITopicManager interface {
// Register 注册
Register(ctx context.Context, topic string, channel string, uuid string, ttl time.Duration) error
// Unregister 取消注册
Unregister(ctx context.Context, topic string, channel string, uuid string)
}
ITopicManager 用于监听 topic 下的 channel。
func NewTopicManager ¶
func NewTopicManager(ctx context.Context, client IMsgBus, cas try_lock.CASCommand, storage ISharedStorage) ITopicManager
type MetadataSupplier ¶
type MetadataSupplier struct {
// contains filtered or unexported fields
}
func NewMetadataSupplier ¶
func NewMetadataSupplier(metadata metadata.MD) *MetadataSupplier
func (*MetadataSupplier) Get ¶
func (s *MetadataSupplier) Get(key string) string
func (*MetadataSupplier) Keys ¶
func (s *MetadataSupplier) Keys() []string
func (*MetadataSupplier) Set ¶
func (s *MetadataSupplier) Set(key, value string)
type OTELOption ¶
type OTELOption func(o *OTELOptions)
func WithOTELOptionPropagator ¶
func WithOTELOptionPropagator(propagator propagation.TextMapPropagator) OTELOption
type OTELOptions ¶
type OTELOptions struct {
// contains filtered or unexported fields
}
func NewOTELOptions ¶
func NewOTELOptions(opts ...OTELOption) *OTELOptions
func (*OTELOptions) ConsumerStartSpan ¶
type PublisherOption ¶
type PublisherOption[T ITopic] func(o *PublisherOptions[T])
func WithEncodeFunc ¶
func WithEncodeFunc[T ITopic](f EncodeFunc[T]) PublisherOption[T]
type PublisherOptions ¶
type PublisherOptions[T ITopic] struct { Encode EncodeFunc[T] }
type Subscriber ¶
type Subscriber[T ITopic] struct {
// contains filtered or unexported fields
}
func (*Subscriber[T]) Close ¶
func (x *Subscriber[T]) Close(ctx context.Context)
type SubscriberHandleFunc ¶
type SubscriberOption ¶
type SubscriberOption[T ITopic] func(o *SubscriberOptions[T])
func WithCheckFunc ¶
func WithCheckFunc[T ITopic](f SubscriberCheckFunc[T]) SubscriberOption[T]
func WithDecodeFunc ¶
func WithDecodeFunc[T ITopic](f DecodeFunc[T]) SubscriberOption[T]
func WithHandleFunc ¶
func WithHandleFunc[T ITopic](f SubscriberHandleFunc[T]) SubscriberOption[T]
type SubscriberOptions ¶
type SubscriberOptions[T ITopic] struct {
HandleEvent SubscriberHandleFunc[T]
CheckEvent SubscriberCheckFunc[T]
Decode DecodeFunc[T]
}
type TopicManager ¶
type TopicManager struct {
// contains filtered or unexported fields
}
func (*TopicManager) Unregister ¶
Source Files
¶
Click to show internal directories.
Click to hide internal directories.