Documentation
¶
Index ¶
- Constants
- Variables
- func ContextWithQueueService(ctx context.Context) context.Context
- func RegisterBackendImplementor(backend Backend)
- func RegisterIdempotentImplementor(idempotent Idempotent)
- func RegisterImplementor(s Queue)
- func StreamServerInterceptor() grpc.StreamServerInterceptor
- func UnaryServerInterceptor() grpc.UnaryServerInterceptor
- type Backend
- type Consumer
- type ConsumerFunc
- type Idempotent
- type Message
- type MessageOperation
- type MessageWrapper
- type PingMessage
- func (m *PingMessage) Begin()
- func (m *PingMessage) Cancel()
- func (m *PingMessage) Content() []byte
- func (m *PingMessage) End()
- func (m *PingMessage) Fail()
- func (m *PingMessage) IsPing() bool
- func (m *PingMessage) NotBefore() time.Time
- func (m *PingMessage) Queue() string
- func (m *PingMessage) Requeue()
- func (m *PingMessage) Retry() int
- func (m *PingMessage) Timestamp() time.Time
- func (m *PingMessage) Topic() string
- func (m *PingMessage) UniqueID() string
- type ProcessStatus
- type PublishOption
- type PublishOptions
- type Queue
- type SubscribeOption
- func WithConsumeComponent(component string) SubscribeOption
- func WithConsumeConcurrency(concurrency int) SubscribeOption
- func WithConsumeContext(ctx context.Context) SubscribeOption
- func WithConsumeIdempotent(impl Idempotent) SubscribeOption
- func WithConsumeProduct(product string) SubscribeOption
- func WithConsumeRetry(retry int) SubscribeOption
- func WithConsumeTopic(topic string) SubscribeOption
- type SubscribeOptions
Constants ¶
View Source
// Default values that EmptySubscribeOptions assigns to a new SubscribeOptions.
const (
	// DefaultTopic is the initial Topic of a SubscribeOptions.
	DefaultTopic = "default"
	// DefaultConcurrency is the initial Concurrency of a SubscribeOptions.
	DefaultConcurrency = 1
	// DefaultMaxRetry is the initial MaxRetry of a SubscribeOptions.
	DefaultMaxRetry = 3
)
Variables ¶
View Source
// EmptyPublishOptions returns a newly allocated PublishOptions whose Context
// is context.Background(); every other field keeps its zero value.
var EmptyPublishOptions = func() *PublishOptions {
	opts := new(PublishOptions)
	opts.Context = context.Background()
	return opts
}
View Source
// EmptySubscribeOptions returns a newly allocated SubscribeOptions populated
// with the package defaults: a background context, DefaultTopic,
// DefaultConcurrency, DefaultMaxRetry, and the value returned by
// IdempotentImplementor().
var EmptySubscribeOptions = func() *SubscribeOptions {
	opts := new(SubscribeOptions)
	opts.Context = context.Background()
	opts.Topic = DefaultTopic
	opts.Concurrency = DefaultConcurrency
	opts.MaxRetry = DefaultMaxRetry
	opts.Idempotent = IdempotentImplementor()
	return opts
}
Functions ¶
func RegisterBackendImplementor ¶
func RegisterBackendImplementor(backend Backend)
RegisterBackendImplementor registers the queue backend implementor.
func RegisterIdempotentImplementor ¶
func RegisterIdempotentImplementor(idempotent Idempotent)
RegisterIdempotentImplementor registers the Idempotent implementor.
func StreamServerInterceptor ¶
func StreamServerInterceptor() grpc.StreamServerInterceptor
StreamServerInterceptor returns a new streaming server interceptor for message queue service.
func UnaryServerInterceptor ¶
func UnaryServerInterceptor() grpc.UnaryServerInterceptor
UnaryServerInterceptor returns a new unary server interceptor for message queue service.
Types ¶
type Backend ¶
// Backend is the storage/transport contract a queue backend must satisfy:
// introspection of queues and topics plus reading and writing messages.
type Backend interface {
// Type returns the backend type.
Type() string
// Ping connects to the backend server if it is not already connected.
// It will be called before every Read/Write operation.
Ping() error
// MaxDelay returns the maximum delay duration supported by the backend.
// A negative value means there is no limit.
// A zero value means delayed delivery is not supported.
MaxDelay() time.Duration
// GetQueues returns all queue names in backend storage.
GetQueues() ([]string, error)
// GetTopics returns all queue/topics in backend storage.
// NOTE(review): presumably topic names keyed by queue name — confirm.
GetTopics() (map[string][]string, error)
// GetQueueLength returns the length of every topic of the specified queue
// in backend storage.
// NOTE(review): presumably keyed by topic name — confirm.
GetQueueLength(queue string) (map[string]int64, error)
// GetTopicLength returns the length of the specified queue/topic in
// backend storage.
GetTopicLength(queue, topic string) (int64, error)
// Read subscribes to the messages of the specified queue and topic;
// it takes a send-only MessageWrapper channel as its last argument.
Read(ctx context.Context, queue, topic string, ch chan<- MessageWrapper) error
// Write publishes content data to the specified queue; delay is the
// requested delivery delay (see MaxDelay for what the backend supports).
Write(ctx context.Context, queue string, delay time.Duration, content []byte) error
}
Queue backend interface.
type ConsumerFunc ¶
type Idempotent ¶
// Idempotent provides hooks that run around the processing of a message.
type Idempotent interface {
// BeforeProcess is invoked before a message is processed.
// Returning true continues the message processing.
// Returning false causes Cancel to be invoked for the message.
BeforeProcess(Message) bool
// AfterProcess is invoked after processing, with the message and a
// ProcessStatus value.
AfterProcess(Message, ProcessStatus)
}
Idempotent interface.
func IdempotentImplementor ¶
func IdempotentImplementor() Idempotent
IdempotentImplementor returns the Idempotent service implementor.
type Message ¶
// Message exposes the metadata and content of a queue message.
type Message interface {
// Queue returns the queue name of this message.
Queue() string
// Topic returns the topic name of this message.
Topic() string
// UniqueID returns the unique ID of this message.
UniqueID() string
// Content returns the message body content.
Content() []byte
// Timestamp returns the creation time of the message.
Timestamp() time.Time
// NotBefore returns the time before which the message should not be
// processed.
NotBefore() time.Time
// Retry returns the message retry times.
Retry() int
// IsPing returns true for a ping message.
IsPing() bool
}
Queue message interface.
type MessageOperation ¶
// MessageOperation lists the operations invoked on a message as it is
// processed.
type MessageOperation interface {
// Begin marks the start of processing the message.
Begin()
// Cancel indicates the message should be ignored.
Cancel()
// End indicates the message was processed successfully.
End()
// Requeue indicates the message should be retried.
Requeue()
// Fail indicates processing of the message failed.
Fail()
}
Queue message operation interface.
type PingMessage ¶
// PingMessage is a field-less struct. Per the package index, *PingMessage
// declares every method listed in the Message and MessageOperation interfaces.
type PingMessage struct{}
func (*PingMessage) NotBefore ¶
func (m *PingMessage) NotBefore() time.Time
The message should not be processed before this timestamp.
func (*PingMessage) Requeue ¶
func (m *PingMessage) Requeue()
Requeue indicates the message should be retried.
func (*PingMessage) Timestamp ¶
func (m *PingMessage) Timestamp() time.Time
The creation time of the message.
type ProcessStatus ¶
// ProcessStatus is the status of the message.
type ProcessStatus int

// ProcessStatus values, numbered consecutively from zero in declaration order.
const (
	Created ProcessStatus = iota
	Processing
	Canceled
	Succeeded
	Failed
	Requeued
)
type PublishOption ¶
// PublishOption is a function that receives a *PublishOptions; it is the
// variadic option type accepted by Queue.Publish and the return type of
// WithPublishContext and WithPublishDelay.
type PublishOption func(*PublishOptions)
func WithPublishContext ¶
func WithPublishContext(ctx context.Context) PublishOption
func WithPublishDelay ¶
func WithPublishDelay(delay time.Duration) PublishOption
type Queue ¶
// Queue is the message queue service interface: publishing message bodies
// and subscribing a Consumer to a queue.
type Queue interface {
// Publish writes a message body to the specified queue; opts are optional
// PublishOption values.
Publish(queue string, content []byte, opts ...PublishOption) error
// Subscribe consumes the messages of the specified queue with handler;
// opts are optional SubscribeOption values.
Subscribe(queue string, handler Consumer, opts ...SubscribeOption) error
}
func ContextQueueService ¶
type SubscribeOption ¶
// SubscribeOption is a function that receives a *SubscribeOptions; it is the
// variadic option type accepted by Queue.Subscribe and the return type of the
// WithConsume* functions.
type SubscribeOption func(*SubscribeOptions)
func WithConsumeComponent ¶
func WithConsumeComponent(component string) SubscribeOption
func WithConsumeConcurrency ¶
func WithConsumeConcurrency(concurrency int) SubscribeOption
func WithConsumeContext ¶
func WithConsumeContext(ctx context.Context) SubscribeOption
func WithConsumeIdempotent ¶
func WithConsumeIdempotent(impl Idempotent) SubscribeOption
func WithConsumeProduct ¶
func WithConsumeProduct(product string) SubscribeOption
func WithConsumeRetry ¶
func WithConsumeRetry(retry int) SubscribeOption
func WithConsumeTopic ¶
func WithConsumeTopic(topic string) SubscribeOption
type SubscribeOptions ¶
Click to show internal directories.
Click to hide internal directories.