queue

package
v0.1.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 19, 2021 License: MIT Imports: 3 Imported by: 2

Documentation

Index

Constants

View Source
// Default values applied by EmptySubscribeOptions.
const (
	// DefaultTopic is the topic name EmptySubscribeOptions assigns to Topic.
	DefaultTopic       = "default"
	// DefaultConcurrency is the value EmptySubscribeOptions assigns to Concurrency.
	DefaultConcurrency = 1
	// DefaultMaxRetry is the value EmptySubscribeOptions assigns to MaxRetry.
	DefaultMaxRetry    = 3
)

Variables

View Source
// EmptyPublishOptions is a factory that allocates a new *PublishOptions
// whose embedded Context is context.Background(). Delay is left at its
// zero value. Every call returns a distinct instance.
var EmptyPublishOptions = func() *PublishOptions {
	opts := new(PublishOptions)
	opts.Context = context.Background()
	return opts
}
View Source
// EmptySubscribeOptions is a factory that allocates a new *SubscribeOptions
// pre-filled with the package defaults: a background context, DefaultTopic,
// DefaultConcurrency, DefaultMaxRetry and whatever IdempotentImplementor()
// returns at call time. Component and Product are left empty.
var EmptySubscribeOptions = func() *SubscribeOptions {
	opts := new(SubscribeOptions)
	opts.Context = context.Background()
	opts.Topic = DefaultTopic
	opts.Concurrency = DefaultConcurrency
	opts.MaxRetry = DefaultMaxRetry
	opts.Idempotent = IdempotentImplementor()
	return opts
}

Functions

func ContextWithQueueService

func ContextWithQueueService(ctx context.Context) context.Context

func RegisterBackendImplementor

func RegisterBackendImplementor(backend Backend)

Registers the queue Backend implementor.

func RegisterIdempotentImplementor

func RegisterIdempotentImplementor(idempotent Idempotent)

Registers the Idempotent implementor.

func RegisterImplementor

func RegisterImplementor(s Queue)

Registers the Queue service implementor.

func StreamServerInterceptor

func StreamServerInterceptor() grpc.StreamServerInterceptor

StreamServerInterceptor returns a new streaming server interceptor for message queue service.

func UnaryServerInterceptor

func UnaryServerInterceptor() grpc.UnaryServerInterceptor

UnaryServerInterceptor returns a new unary server interceptor for message queue service.

Types

type Backend

// Backend is the queue backend interface. It exposes inspection of the
// queues and topics held in backend storage together with the Read and
// Write primitives.
type Backend interface {
	// Type returns the backend type as a string.
	Type() string
	// Ping connects to the backend server if it is not already connected.
	// It is called before every Read/Write operation.
	Ping() error
	// MaxDelay returns the maximum delay duration supported by the backend.
	// A negative value means there is no limitation.
	// A zero value means the delay operation is not supported.
	MaxDelay() time.Duration
	// GetQueues returns all queue names in backend storage.
	GetQueues() ([]string, error)
	// GetTopics returns all queues and their topics in backend storage.
	// NOTE(review): the map is presumably keyed by queue name with the
	// topic names as values — confirm against an implementation.
	GetTopics() (map[string][]string, error)
	// GetQueueLength returns the length of every topic of the specified
	// queue in backend storage.
	// NOTE(review): the map is presumably keyed by topic name — confirm.
	GetQueueLength(queue string) (map[string]int64, error)
	// GetTopicLength returns the length of the specified queue/topic in
	// backend storage.
	GetTopicLength(queue, topic string) (int64, error)

	// Read subscribes to the messages of the specified queue and topic.
	// ch is a send-only channel of MessageWrapper; presumably received
	// messages are delivered on it — confirm against an implementation.
	Read(ctx context.Context, queue, topic string, ch chan<- MessageWrapper) error
	// Write publishes content data to the specified queue.
	// NOTE(review): delay presumably postpones delivery and is bounded by
	// MaxDelay — confirm.
	Write(ctx context.Context, queue string, delay time.Duration, content []byte) error
}

Queue backend interface.

func BackendImplementor

func BackendImplementor() Backend

Returns the Backend implementor.

type Consumer

// Consumer is the queue consume handler; it is the handler type accepted
// by Queue.Subscribe.
type Consumer interface {
	// Handle receives a context and a Message and reports an error.
	// NOTE(review): how a non-nil error influences retry/requeue is not
	// visible here — confirm against the Queue implementation.
	Handle(context.Context, Message) error
}

Queue consume handler.

type ConsumerFunc

// ConsumerFunc is a function type with the same signature as
// Consumer.Handle. The Handle method declared on it lets a ConsumerFunc
// value satisfy the Consumer interface.
type ConsumerFunc func(context.Context, Message) error

func (ConsumerFunc) Handle

func (fn ConsumerFunc) Handle(ctx context.Context, m Message) error

type Idempotent

// Idempotent is the hook interface invoked around message processing.
type Idempotent interface {
	// BeforeProcess is invoked before a message is processed.
	// Returning true continues the message processing.
	// Returning false causes Cancel to be invoked for the message.
	BeforeProcess(Message) bool
	// AfterProcess is invoked after processing, with the message and its
	// resulting ProcessStatus.
	AfterProcess(Message, ProcessStatus)
}

Idempotent interface.

func IdempotentImplementor

func IdempotentImplementor() Idempotent

Returns the Idempotent implementor.

type Message

// Message is the read-only view of a queue message.
type Message interface {
	// Queue returns the queue name of this message.
	Queue() string
	// Topic returns the topic name of this message.
	Topic() string

	// UniqueID returns the unique ID of this message.
	UniqueID() string
	// Content returns the message body content.
	Content() []byte
	// Timestamp returns the creation time of the message.
	Timestamp() time.Time
	// NotBefore returns the timestamp before which the message should not
	// be processed.
	NotBefore() time.Time
	// Retry returns the retry count of the message.
	Retry() int
	// IsPing returns true for a ping message.
	IsPing() bool
}

Queue message interface.

type MessageOperation

// MessageOperation is the set of state transitions that can be applied to
// a queue message.
type MessageOperation interface {
	// Begin marks the start of processing the message.
	Begin()
	// Cancel indicates the message should be ignored.
	Cancel()
	// End indicates the message was processed successfully.
	End()
	// Requeue indicates the message should be retried.
	Requeue()
	// Fail indicates processing of the message failed.
	Fail()
}

Queue message operation interface.

type MessageWrapper

// MessageWrapper combines the read-only Message accessors with the
// MessageOperation state transitions. It is the element type of the
// channel passed to Backend.Read.
type MessageWrapper interface {
	// Message provides the message accessors (queue, topic, content, ...).
	Message
	// MessageOperation provides Begin/Cancel/End/Requeue/Fail.
	MessageOperation
}

Message wrapper.

type PingMessage

// PingMessage is a field-less message type. The methods declared on
// *PingMessage cover every method of Message and MessageOperation, so a
// *PingMessage satisfies MessageWrapper.
// NOTE(review): presumably IsPing reports true and the other methods are
// no-ops or return zero values — the method bodies are not visible here.
type PingMessage struct{}

func (*PingMessage) Begin

func (m *PingMessage) Begin()

Begin to process the message.

func (*PingMessage) Cancel

func (m *PingMessage) Cancel()

Indicate the message should be ignored.

func (*PingMessage) Content

func (m *PingMessage) Content() []byte

Message body content.

func (*PingMessage) End

func (m *PingMessage) End()

End indicates a successful process.

func (*PingMessage) Fail

func (m *PingMessage) Fail()

Fail indicates a failed process.

func (*PingMessage) IsPing

func (m *PingMessage) IsPing() bool

Return true for a ping message.

func (*PingMessage) NotBefore

func (m *PingMessage) NotBefore() time.Time

The message should not be processed before this timestamp.

func (*PingMessage) Queue

func (m *PingMessage) Queue() string

Queue name of this message.

func (*PingMessage) Requeue

func (m *PingMessage) Requeue()

Requeue indicates the message should be retried.

func (*PingMessage) Retry

func (m *PingMessage) Retry() int

Message retry times.

func (*PingMessage) Timestamp

func (m *PingMessage) Timestamp() time.Time

The creation time of the message.

func (*PingMessage) Topic

func (m *PingMessage) Topic() string

Topic name of this message.

func (*PingMessage) UniqueID

func (m *PingMessage) UniqueID() string

Unique ID of this message.

type ProcessStatus

// ProcessStatus is the integer-coded status of a message; it is the value
// passed to Idempotent.AfterProcess.
type ProcessStatus int

Status of the message.

// ProcessStatus values, numbered from 0 via iota in declaration order.
// NOTE(review): the names suggest Processing/Canceled/Succeeded/Failed/
// Requeued correspond to MessageOperation's Begin/Cancel/End/Fail/Requeue —
// confirm against the implementation.
const (
	// Created is the zero value (0).
	Created ProcessStatus = iota
	// Processing has value 1.
	Processing
	// Canceled has value 2.
	Canceled
	// Succeeded has value 3.
	Succeeded
	// Failed has value 4.
	Failed
	// Requeued has value 5.
	Requeued
)

type PublishOption

// PublishOption is a functional option that receives a *PublishOptions.
// Values of this type are accepted by Queue.Publish and produced by
// WithPublishContext and WithPublishDelay.
type PublishOption func(*PublishOptions)

func WithPublishContext

func WithPublishContext(ctx context.Context) PublishOption

func WithPublishDelay

func WithPublishDelay(delay time.Duration) PublishOption

type PublishOptions

// PublishOptions holds the settings that PublishOption functions operate on.
type PublishOptions struct {
	// Context is embedded; EmptyPublishOptions sets it to context.Background().
	context.Context
	// Delay is a duration associated with the publish.
	// NOTE(review): presumably forwarded as the delay argument of
	// Backend.Write — confirm against the Queue implementation.
	Delay time.Duration
}

type Queue

// Queue is the message queue service interface: it publishes message
// bodies to a named queue and subscribes handlers to a named queue.
type Queue interface {
	// Publish writes a message body to the specified queue. Optional
	// PublishOption values adjust the publish settings.
	Publish(queue string, content []byte, opts ...PublishOption) error
	// Subscribe consumes the messages of the specified queue with the
	// given handler. Optional SubscribeOption values adjust the
	// subscription settings.
	Subscribe(queue string, handler Consumer, opts ...SubscribeOption) error
}

func ContextQueueService

func ContextQueueService(ctx context.Context) Queue

func Implementor

func Implementor() Queue

Returns the Queue service implementor.

type SubscribeOption

// SubscribeOption is a functional option that receives a
// *SubscribeOptions. Values of this type are accepted by Queue.Subscribe
// and produced by the WithConsume* functions.
type SubscribeOption func(*SubscribeOptions)

func WithConsumeComponent

func WithConsumeComponent(component string) SubscribeOption

func WithConsumeConcurrency

func WithConsumeConcurrency(concurrency int) SubscribeOption

func WithConsumeContext

func WithConsumeContext(ctx context.Context) SubscribeOption

func WithConsumeIdempotent

func WithConsumeIdempotent(impl Idempotent) SubscribeOption

func WithConsumeProduct

func WithConsumeProduct(product string) SubscribeOption

func WithConsumeRetry

func WithConsumeRetry(retry int) SubscribeOption

func WithConsumeTopic

func WithConsumeTopic(topic string) SubscribeOption

type SubscribeOptions

// SubscribeOptions holds the settings that SubscribeOption functions
// operate on. EmptySubscribeOptions supplies the defaults noted below.
type SubscribeOptions struct {
	// Context is embedded; defaults to context.Background().
	context.Context
	// Component is a string left empty by default.
	// NOTE(review): meaning not visible here — see WithConsumeComponent.
	Component   string
	// Product is a string left empty by default.
	// NOTE(review): meaning not visible here — see WithConsumeProduct.
	Product     string
	// Topic is the topic to consume; defaults to DefaultTopic.
	Topic       string
	// Concurrency defaults to DefaultConcurrency.
	// NOTE(review): presumably the number of concurrent consumers — confirm.
	Concurrency int
	// MaxRetry defaults to DefaultMaxRetry.
	// NOTE(review): presumably the retry limit per message — confirm.
	MaxRetry    int
	// Idempotent defaults to the value returned by IdempotentImplementor().
	Idempotent  Idempotent
}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL