mq

package
v1.0.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 10, 2024 License: Apache-2.0 Imports: 6 Imported by: 2

Documentation

Overview

Package mq defines an interface for asynchronous messaging; the default implementation is Kafka.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Codecer

// Codecer is a simple encoding interface: it turns values into bytes and
// bytes back into values.
type Codecer interface {
	// Marshal encodes the given value and returns the resulting bytes.
	Marshal(interface{}) ([]byte, error)
	// Unmarshal decodes the given bytes into the given value.
	Unmarshal([]byte, interface{}) error
	// String returns a string describing the codec — presumably its name;
	// TODO confirm against the implementations.
	String() string
}

Codecer is a simple encoding interface used for the mq/transport.

type Event

// Event is what a subscription Handler receives for processing.
type Event interface {
	// Topic returns the topic of the message.
	Topic() string
	// Message returns the message body.
	Message() *Message
	// Ack is the message reply (acknowledgement) operation.
	Ack() error
	// Error returns the error of message consumption.
	Error() error
	// Extra returns the important information other than the message body.
	Extra() map[string]interface{}
}

Event is given to a subscription handler for processing.

type Handler

// Handler is a function that receives an Event and returns an error.
type Handler func(Event) error

Handler is used to process messages via a subscription to a topic. The handler is passed an Event, which contains the message and an optional Ack method to acknowledge receipt of the message.

type JsonCodec

// JsonCodec is an empty struct whose Marshal, Unmarshal and String methods
// match the Codecer interface — presumably backed by encoding/json; TODO confirm.
type JsonCodec struct{}

func (JsonCodec) Marshal

func (jc JsonCodec) Marshal(v interface{}) ([]byte, error)

func (JsonCodec) String

func (jc JsonCodec) String() string

func (JsonCodec) Unmarshal

func (jc JsonCodec) Unmarshal(data []byte, v interface{}) error

type MQ

// MQ is the interface used for asynchronous messaging.
type MQ interface {
	// Init takes the options to apply and reports any failure as an error.
	Init(...Option) error
	// Options returns the Options value of this MQ.
	Options() Options
	// Address returns an address string — presumably the address in use;
	// TODO confirm against the implementation.
	Address() string
	// Connect presumably opens the connection to the mq — confirm against
	// the implementation.
	Connect() error
	// Disconnect presumably closes the connection to the mq — confirm
	// against the implementation.
	Disconnect() error
	// Publish takes a topic, the message m to publish and optional
	// per-publish options.
	Publish(topic string, m *Message, opts ...PublishOption) error
	// Subscribe takes a topic, the Handler h for its messages and optional
	// per-subscription options, and returns a Subscriber.
	Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
	// String returns a string describing the implementation — presumably
	// its name; TODO confirm.
	String() string
}

type MQConfig

// MQConfig is the JSON-tagged configuration of the mq: the cluster addresses
// plus the embedded TLS file settings.
type MQConfig struct {
	// Addresses of the mq cluster
	Addresses []string `json:"addresses,omitempty"`

	// TLSConfig is embedded, so its fields and methods are promoted
	// onto MQConfig.
	TLSConfig
}

type Message

// Message is the message entity.
type Message struct {
	// Header holds string key/value pairs attached to the message.
	Header map[string]string
	// Body holds the message body as raw bytes.
	Body   []byte
	// contains filtered or unexported fields
}

Message is the message entity.

func (Message) MessageKey

func (msg Message) MessageKey() string

MessageKey gets the flag that represents the message.

func (*Message) SetMessageKey

func (msg *Message) SetMessageKey(key string)

SetMessageKey sets a flag that represents the message.

type Option

// Option is a function that is applied to an *Options to configure it.
type Option func(*Options)

func Addresses

func Addresses(addrs ...string) Option

Addresses sets the host addresses to be used by the mq.

func Codec

func Codec(c Codecer) Option

Codec sets the codec used for encoding and decoding.

func Context

func Context(c context.Context) Option

func ContextWithValue

func ContextWithValue(k, v interface{}) Option

func ErrorHandler

func ErrorHandler(h Handler) Option

ErrorHandler sets the error handler.

func Log

func Log(log *logrus.Entry) Option

func Secure

func Secure(b bool) Option

Secure sets whether communication with the mq is secure.

func SetTLSConfig

func SetTLSConfig(t *tls.Config) Option

SetTLSConfig specifies the TLS config.

type Options

// Options holds the settings of an mq; Option functions receive a pointer
// to it.
type Options struct {
	// Addresses are the host addresses to be used by the mq.
	Addresses []string
	// Secure indicates secure communication with the mq.
	Secure    bool
	// Codec is the codec used for encoding/decoding.
	Codec     Codecer

	// Handler executed when an error happens in mq message processing
	ErrorHandler Handler

	// TLSConfig is the TLS configuration — presumably consulted when Secure
	// is set; TODO confirm against the implementation.
	TLSConfig *tls.Config

	// Other options for implementations of the interface
	// can be stored in a context
	Context context.Context

	// Log is a logrus entry — presumably the logger used by implementations;
	// TODO confirm.
	Log *logrus.Entry
}

type PublishOption

// PublishOption is a function that is applied to a *PublishOptions to
// configure it.
type PublishOption func(*PublishOptions)

func PublishContext

func PublishContext(ctx context.Context) PublishOption

PublishContext sets the context.

type PublishOptions

// PublishOptions holds the per-publish settings; PublishOption functions
// receive a pointer to it.
type PublishOptions struct {
	// Other options for implementations of the interface
	// can be stored in a context
	Context context.Context
}

type SubscribeOption

// SubscribeOption is a function that is applied to a *SubscribeOptions to
// configure it.
type SubscribeOption func(*SubscribeOptions)

func DisableAutoAck

func DisableAutoAck() SubscribeOption

DisableAutoAck will disable auto acking of messages after they have been handled.

func Queue

func Queue(name string) SubscribeOption

Queue sets the name of the queue to share messages on

func SubscribeContext

func SubscribeContext(ctx context.Context) SubscribeOption

SubscribeContext sets the context.

type SubscribeOptions

// SubscribeOptions holds the per-subscription settings; SubscribeOption
// functions receive a pointer to it.
type SubscribeOptions struct {
	// AutoAck defaults to true. When a handler returns
	// with a nil error the message is acknowledged automatically.
	AutoAck bool
	// Subscribers with the same queue name
	// will create a shared subscription where each
	// receives a subset of messages.
	Queue string

	// Other options for implementations of the interface
	// can be stored in a context
	Context context.Context
}

func NewSubscribeOptions

func NewSubscribeOptions(opts ...SubscribeOption) SubscribeOptions

type Subscriber

// Subscriber is a convenience return type for the Subscribe method.
type Subscriber interface {
	// Options returns the SubscribeOptions of this subscription.
	Options() SubscribeOptions
	// Topic returns the topic of this subscription.
	Topic() string
	// Unsubscribe presumably cancels the subscription — confirm against
	// the implementation.
	Unsubscribe() error
}

Subscriber is a convenience return type for the Subscribe method.

type TLSConfig

// TLSConfig holds the JSON-tagged, file-based TLS settings. Its TLSConfig
// method returns a *tls.Config and an error.
type TLSConfig struct {
	// CertFile is the optional certificate file for client authentication.
	CertFile string `json:"cert_file,omitempty"`

	// KeyFile is the optional key file for client authentication.
	KeyFile string `json:"key_file,omitempty"`

	// CAFile is the optional certificate authority file for TLS client authentication.
	CAFile string `json:"ca_file,omitempty"`

	// VerifySSL optionally verifies the SSL certificate chain.
	VerifySSL bool `json:"verify_ssl,omitempty"`
}

func (*TLSConfig) TLSConfig

func (tc *TLSConfig) TLSConfig() (t *tls.Config, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL