Documentation
¶
Overview ¶
Example (Direct_exchange) ¶
Direct Exchange
m := mockMessage{
Message: "foo",
}
w1 := NewWorker(
WithQueue("direct_queue"),
WithExchangeName("direct_exchange"),
WithExchangeType("direct"),
WithRoutingKey("direct_exchange"),
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
fmt.Println("worker01 get data:", string(m.Payload()))
time.Sleep(100 * time.Millisecond)
return nil
}),
)
q1, err := queue.NewQueue(
queue.WithWorker(w1),
)
if err != nil {
w1.opts.logger.Fatal(err)
}
q1.Start()
w2 := NewWorker(
WithQueue("direct_queue"),
WithExchangeName("direct_exchange"),
WithExchangeType("direct"),
WithRoutingKey("direct_exchange"),
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
fmt.Println("worker02 get data:", string(m.Payload()))
time.Sleep(100 * time.Millisecond)
return nil
}),
)
q2, err := queue.NewQueue(
queue.WithWorker(w2),
)
if err != nil {
w2.opts.logger.Fatal(err)
}
q2.Start()
w := NewWorker(
WithExchangeName("direct_exchange"),
WithExchangeType("direct"),
WithRoutingKey("direct_exchange"),
)
q, err := queue.NewQueue(
queue.WithWorker(w),
)
if err != nil {
w.opts.logger.Fatal(err)
}
time.Sleep(200 * time.Millisecond)
if err := q.Queue(m); err != nil {
w.opts.logger.Fatal(err)
}
if err := q.Queue(m); err != nil {
w.opts.logger.Fatal(err)
}
if err := q.Queue(m); err != nil {
w.opts.logger.Fatal(err)
}
if err := q.Queue(m); err != nil {
w.opts.logger.Fatal(err)
}
time.Sleep(200 * time.Millisecond)
q.Release()
q1.Release()
q2.Release()
Output: worker01 get data: foo worker02 get data: foo worker01 get data: foo worker02 get data: foo
Example (Fanout_exchange) ¶
Fanout Exchange
m := mockMessage{
Message: "foo",
}
w1 := NewWorker(
WithQueue("fanout_queue_1"),
WithExchangeName("fanout_exchange"),
WithExchangeType("fanout"),
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
fmt.Println("worker01 get data:", string(m.Payload()))
return nil
}),
)
q1, err := queue.NewQueue(
queue.WithWorker(w1),
)
if err != nil {
w1.opts.logger.Fatal(err)
}
q1.Start()
w2 := NewWorker(
WithQueue("fanout_queue_2"),
WithExchangeName("fanout_exchange"),
WithExchangeType("fanout"),
WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
fmt.Println("worker02 get data:", string(m.Payload()))
return nil
}),
)
q2, err := queue.NewQueue(
queue.WithWorker(w2),
)
if err != nil {
w2.opts.logger.Fatal(err)
}
q2.Start()
w := NewWorker(
WithExchangeName("fanout_exchange"),
WithExchangeType("fanout"),
)
q, err := queue.NewQueue(
queue.WithWorker(w),
)
if err != nil {
w.opts.logger.Fatal(err)
}
time.Sleep(200 * time.Millisecond)
if err := q.Queue(m); err != nil {
w.opts.logger.Fatal(err)
}
time.Sleep(200 * time.Millisecond)
q.Release()
q1.Release()
q2.Release()
Output: worker01 get data: foo worker02 get data: foo
Index ¶
- Constants
- type Option
- func WithAddr(addr string) Option
- func WithAutoAck(val bool) Option
- func WithExchangeName(val string) Option
- func WithExchangeType(val string) Option
- func WithLogger(l queue.Logger) Option
- func WithQueue(val string) Option
- func WithRoutingKey(val string) Option
- func WithRunFunc(fn func(context.Context, core.TaskMessage) error) Option
- func WithTag(val string) Option
- type Worker
Examples ¶
Constants ¶
View Source
const ( ExchangeDirect = "direct" ExchangeFanout = "fanout" ExchangeTopic = "topic" ExchangeHeaders = "headers" )
defined in rabbitmq client package.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Option ¶
type Option func(*options)
Option for queue system
func WithExchangeName ¶
WithExchangeName setup the Exchange name Exchanges are AMQP 0-9-1 entities where messages are sent to. Exchanges take a message and route it into zero or more queues.
func WithExchangeType ¶
WithExchangeType setup the Exchange type The routing algorithm used depends on the exchange type and rules called bindings. AMQP 0-9-1 brokers provide four exchange types: Direct exchange (Empty string) and amq.direct Fanout exchange amq.fanout Topic exchange amq.topic Headers exchange amq.match (and amq.headers in RabbitMQ)
func WithRunFunc ¶
WithRunFunc setup the run func of queue
Click to show internal directories.
Click to hide internal directories.
