Documentation
¶
Overview ¶
Example ¶
package main
import (
"context"
"fmt"
"os"
"strings"
"time"
redis "github.com/Bofry/lib-redis-stream"
)
// main demonstrates a full round trip against a Redis stream:
//
//  1. create the consumer group (and the stream) with an admin client,
//  2. publish three messages with explicit IDs,
//  3. consume them through a Consumer for five seconds,
//  4. on exit, report how many messages remain and remove the group and stream.
//
// The server list is read from the comma-separated REDIS_SERVERS environment
// variable and falls back to 127.0.0.1:6379 when it is unset or blank.
func main() {
	// strings.Split on an empty string yields [""] (length 1), so a plain
	// length check on its result would never select the default. Collect only
	// the non-blank entries instead.
	var redisServers []string
	for _, addr := range strings.Split(os.Getenv("REDIS_SERVERS"), ",") {
		if addr = strings.TrimSpace(addr); addr != "" {
			redisServers = append(redisServers, addr)
		}
	}
	if len(redisServers) == 0 {
		redisServers = []string{"127.0.0.1:6379"}
	}

	// register consumer group
	{
		admin, err := redis.NewAdminClient(&redis.UniversalOptions{
			Addrs: redisServers,
			DB:    0,
		})
		if err != nil {
			panic(err)
		}
		// Cleanup runs when main returns, i.e. after publishing and consuming.
		defer func() {
			defer admin.Close()

			count, err := admin.Handle().XLen("gotestStream").Result()
			if err != nil {
				panic(err)
			}
			fmt.Printf("Retained messages: %d\n", count)

			/*
				XGROUP DESTROY gotestStream gotestGroup
			*/
			_, err = admin.DeleteConsumerGroup("gotestStream", "gotestGroup")
			if err != nil {
				panic(err)
			}

			/*
				DEL gotestStream
			*/
			_, err = admin.Handle().Del("gotestStream").Result()
			if err != nil {
				panic(err)
			}
		}()

		/*
			XGROUP CREATE gotestStream gotestGroup $ MKSTREAM
		*/
		_, err = admin.CreateConsumerGroupAndStream("gotestStream", "gotestGroup", redis.StreamLastDeliveredID)
		if err != nil {
			panic(err)
		}
	}

	// publish
	{
		conf := redis.ProducerConfig{
			UniversalOptions: &redis.UniversalOptions{
				Addrs: redisServers,
				DB:    0,
			},
		}
		p, err := redis.NewProducer(&conf)
		if err != nil {
			panic(err)
		}
		defer p.Close()

		// produce message
		{
			publishMessages := []struct {
				id     string
				values map[string]interface{}
			}{
				{id: "4567-0", values: map[string]interface{}{"name": "luffy", "age": 19}},
				{id: "4567-1", values: map[string]interface{}{"name": "nami", "age": 21}},
				{id: "4567-2", values: map[string]interface{}{"name": "zoro", "age": 21}},
			}
			for _, message := range publishMessages {
				reply, err := p.Write("gotestStream", message.values, redis.WithMessageID(message.id))
				if err != nil {
					panic(err)
				}
				fmt.Printf("ID: %s\n", reply)
			}
		}
	}

	// subscribe
	{
		// These settings are intended for this test/example only.
		opt := redis.UniversalOptions{
			Addrs: redisServers,
			DB:    0,
		}
		c := &redis.Consumer{
			Group:               "gotestGroup",
			Name:                "gotestConsumer",
			RedisOption:         &opt,
			MaxInFlight:         1,
			MaxPollingTimeout:   10 * time.Millisecond,
			ClaimMinIdleTime:    30 * time.Millisecond,
			IdlingTimeout:       2000 * time.Millisecond,
			ClaimSensitivity:    2,
			ClaimOccurrenceRate: 2,
			MessageHandler: func(message *redis.Message) {
				fmt.Printf("Message on %s: %v\n", message.Stream, message.XMessage)
				// Ack and Del each handled message; the expected output
				// ends with "Retained messages: 0".
				message.Ack()
				message.Del()
			},
			ErrorHandler: func(err error) (disposed bool) {
				fmt.Println(err)
				return true
			},
		}

		err := c.Subscribe(
			redis.Stream("gotestStream"),
		)
		if err != nil {
			panic(err)
		}

		// Let the consumer run for five seconds, then shut it down. Keeping
		// and deferring cancel releases the timer behind the context.
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer cancel()

		<-ctx.Done()
		c.Close()
	}
}
Output:
ID: 4567-0
ID: 4567-1
ID: 4567-2
Message on gotestStream: &{4567-0 map[age:19 name:luffy]}
Message on gotestStream: &{4567-1 map[age:21 name:nami]}
Message on gotestStream: &{4567-2 map[age:21 name:zoro]}
Retained messages: 0
Index ¶
- Constants
- func DefaultLogger() *log.Logger
- type AdminClient
- func (c *AdminClient) Close() error
- func (c *AdminClient) CreateConsumerGroup(stream, group, offset string) (string, error)
- func (c *AdminClient) CreateConsumerGroupAndStream(stream, group, offset string) (string, error)
- func (c *AdminClient) DeleteConsumer(stream, group, consumer string) (int64, error)
- func (c *AdminClient) DeleteConsumerGroup(stream, group string) (int64, error)
- func (c *AdminClient) Handle() redis.UniversalClient
- func (c *AdminClient) SetConsumerGroupOffset(stream, group, offset string) (string, error)
- type Consumer
- type ConsumerError
- type ConsumerOffset
- type CyclicCounter
- type DecodeMessageContentOption
- type DecodeMessageContentOptionFunc
- type DecodeMessageContentSetting
- type ErrorHandleProc
- type Forwarder
- type ForwarderRunner
- type Message
- type MessageContent
- type MessageDelegate
- type MessageHandleProc
- type MessageState
- func (s *MessageState) Del(name string) interface{}
- func (s *MessageState) Has(name string) bool
- func (s *MessageState) Len() int
- func (s *MessageState) Set(name string, value interface{}) (old interface{}, err error)
- func (s *MessageState) SetString(name, value string) (old interface{}, err error)
- func (s *MessageState) Value(name string) interface{}
- func (s *MessageState) Visit(visit func(name string, value interface{}))
- type ProduceMessageContentOption
- type ProduceMessageIDOption
- type ProduceMessageOption
- type Producer
- func (p *Producer) Close()
- func (p *Producer) Handle() redis.UniversalClient
- func (p *Producer) Write(stream string, values map[string]interface{}, opts ...ProduceMessageOption) (string, error)
- func (p *Producer) WriteContent(stream string, msg *MessageContent, opts ...ProduceMessageOption) (string, error)
- type ProducerConfig
- type RedisError
- type Stream
- type StreamOffset
- type StreamOffsetInfo
- type UniversalClient
- type UniversalOptions
- type XMessage
- type XStream
Examples ¶
Constants ¶
View Source
const ( StreamAsteriskID string = "*" StreamLastDeliveredID string = "$" StreamZeroID string = "0" StreamZeroOffset ConsumerOffset = "0" StreamNeverDeliveredOffset ConsumerOffset = ">" StreamUnspecifiedOffset ConsumerOffset = "" Nil = redis.Nil LOGGER_PREFIX string = "[lib-redis-stream] " MAX_PENDING_FETCHING_SIZE int64 = 4096 MIN_PENDING_FETCHING_SIZE int64 = 16 PENDING_FETCHING_SIZE_COEFFICIENT int64 = 3 )
View Source
const ( MESSAGE_STATE_NAME_MAX_LENGTH = 255 MESSAGE_STATE_VALUE_MAX_SIZE = 0x0fff )
Variables ¶
This section is empty.
Functions ¶
func DefaultLogger ¶
func DefaultLogger() *log.Logger
Types ¶
type AdminClient ¶
type AdminClient struct {
// contains filtered or unexported fields
}
func NewAdminClient ¶
func NewAdminClient(opt *UniversalOptions) (*AdminClient, error)
func (*AdminClient) Close ¶
func (c *AdminClient) Close() error
func (*AdminClient) CreateConsumerGroup ¶
func (c *AdminClient) CreateConsumerGroup(stream, group, offset string) (string, error)
func (*AdminClient) CreateConsumerGroupAndStream ¶
func (c *AdminClient) CreateConsumerGroupAndStream(stream, group, offset string) (string, error)
func (*AdminClient) DeleteConsumer ¶
func (c *AdminClient) DeleteConsumer(stream, group, consumer string) (int64, error)
func (*AdminClient) DeleteConsumerGroup ¶
func (c *AdminClient) DeleteConsumerGroup(stream, group string) (int64, error)
func (*AdminClient) Handle ¶
func (c *AdminClient) Handle() redis.UniversalClient
func (*AdminClient) SetConsumerGroupOffset ¶
func (c *AdminClient) SetConsumerGroupOffset(stream, group, offset string) (string, error)
type Consumer ¶
// Consumer is configured through its exported fields; Subscribe is then
// called with the streams to read.
type Consumer struct {
Group string
Name string
RedisOption *redis.UniversalOptions
MaxInFlight int64
MaxPollingTimeout time.Duration
ClaimMinIdleTime time.Duration
IdlingTimeout time.Duration // how long to wait when there are no messages
ClaimSensitivity int // if a Read returns fewer than n messages, run Claim
ClaimOccurrenceRate int32 // run Claim once after every n Reads
MessageHandler MessageHandleProc
ErrorHandler ErrorHandleProc
Logger *log.Logger
// contains filtered or unexported fields
}
func (*Consumer) Subscribe ¶
func (c *Consumer) Subscribe(streams ...StreamOffsetInfo) error
type ConsumerError ¶
type ConsumerError struct {
// contains filtered or unexported fields
}
func (*ConsumerError) Error ¶
func (e *ConsumerError) Error() string
func (*ConsumerError) IsRedisError ¶
func (e *ConsumerError) IsRedisError() bool
func (*ConsumerError) Unwrap ¶
func (e *ConsumerError) Unwrap() error
type ConsumerOffset ¶ added in v0.3.3
type ConsumerOffset string
type CyclicCounter ¶
type CyclicCounter struct {
// contains filtered or unexported fields
}
type DecodeMessageContentOption ¶ added in v0.2.1
type DecodeMessageContentOption interface {
// contains filtered or unexported methods
}
func WithMessageStateKeyPrefix ¶ added in v0.2.1
func WithMessageStateKeyPrefix(prefix string) DecodeMessageContentOption
------------------------------
type DecodeMessageContentOptionFunc ¶ added in v0.2.1
type DecodeMessageContentOptionFunc func(setting *DecodeMessageContentSetting)
type DecodeMessageContentSetting ¶ added in v0.2.1
type DecodeMessageContentSetting struct {
MessageStateKeyPrefix string
}
type Forwarder ¶
type Forwarder struct {
*Producer
}
func NewForwarder ¶
func NewForwarder(config *ProducerConfig) (*Forwarder, error)
func (*Forwarder) Runner ¶
func (f *Forwarder) Runner() *ForwarderRunner
type ForwarderRunner ¶
type ForwarderRunner struct {
// contains filtered or unexported fields
}
func (*ForwarderRunner) Start ¶
func (r *ForwarderRunner) Start()
func (*ForwarderRunner) Stop ¶
func (r *ForwarderRunner) Stop()
type Message ¶
type Message struct {
*XMessage
ConsumerGroup string
Stream string
Delegate MessageDelegate
// contains filtered or unexported fields
}
func (*Message) Content ¶
func (m *Message) Content(opts ...DecodeMessageContentOption) *MessageContent
func (*Message) HasResponded ¶
type MessageContent ¶
type MessageContent struct {
State MessageState
Values map[string]interface{}
}
func DecodeMessageContent ¶
func DecodeMessageContent(container map[string]interface{}, opts ...DecodeMessageContentOption) *MessageContent
func NewMessageContent ¶
func NewMessageContent() *MessageContent
func (*MessageContent) WriteTo ¶
func (c *MessageContent) WriteTo(container map[string]interface{})
type MessageDelegate ¶
type MessageState ¶
type MessageState struct {
// contains filtered or unexported fields
}
func (*MessageState) Del ¶
func (s *MessageState) Del(name string) interface{}
func (*MessageState) Has ¶
func (s *MessageState) Has(name string) bool
func (*MessageState) Len ¶
func (s *MessageState) Len() int
func (*MessageState) Set ¶
func (s *MessageState) Set(name string, value interface{}) (old interface{}, err error)
func (*MessageState) SetString ¶
func (s *MessageState) SetString(name, value string) (old interface{}, err error)
func (*MessageState) Value ¶
func (s *MessageState) Value(name string) interface{}
func (*MessageState) Visit ¶
func (s *MessageState) Visit(visit func(name string, value interface{}))
type ProduceMessageContentOption ¶
type ProduceMessageContentOption func(msg *MessageContent) error
func WithTracePropagation ¶
func WithTracePropagation(ctx context.Context, propagator propagation.TextMapPropagator) ProduceMessageContentOption
type ProduceMessageIDOption ¶
func WithMessageID ¶
func WithMessageID(id string) ProduceMessageIDOption
type ProduceMessageOption ¶
type ProduceMessageOption interface {
// contains filtered or unexported methods
}
type Producer ¶
type Producer struct {
// contains filtered or unexported fields
}
func NewProducer ¶
func NewProducer(config *ProducerConfig) (*Producer, error)
func (*Producer) Handle ¶
func (p *Producer) Handle() redis.UniversalClient
func (*Producer) WriteContent ¶
func (p *Producer) WriteContent(stream string, msg *MessageContent, opts ...ProduceMessageOption) (string, error)
type ProducerConfig ¶
type ProducerConfig struct {
*UniversalOptions
Logger *log.Logger
}
type RedisError ¶
type RedisError interface {
RedisError()
}
type Stream ¶
type Stream string
func (Stream) NeverDeliveredOffset ¶
func (s Stream) NeverDeliveredOffset() StreamOffset
func (Stream) Offset ¶
func (s Stream) Offset(offset string) StreamOffset
func (Stream) Zero ¶
func (s Stream) Zero() StreamOffset
type StreamOffset ¶
type StreamOffset struct {
Stream string
Offset ConsumerOffset
}
type StreamOffsetInfo ¶
type StreamOffsetInfo interface {
// contains filtered or unexported methods
}
type UniversalClient ¶
type UniversalClient = redis.UniversalClient
type UniversalOptions ¶
type UniversalOptions = redis.UniversalOptions
Source Files
¶
- adminClient.go
- clientMessageDelegate.go
- consumer.go
- consumerClient.go
- consumerError.go
- cyclicCounter.go
- decodeMessageContentOption.go
- def.go
- forwarder.go
- forwarderRunner.go
- message.go
- messageContent.go
- messageState.go
- noCopy.go
- produceMessageContentOptions.go
- produceMessageIDOptions.go
- producer.go
- producerConfig.go
- stream.go
- streamOffset.go
- util.go
Click to show internal directories.
Click to hide internal directories.