Versions in this module
Expand all Collapse all

v2

v2.0.0 May 31, 2022

Changes in this version

+ var ErrStreamConsumerAlreadyListened = errors.New("流已经被监听了")
+ var ErrStreamConsumerNotListeningYet = errors.New("流未被监听")
+ var ErrStreamNeedToPointOutGroups = errors.New("stream操作需要指定消费者组")
+ type Consumer struct
+ TopicInfos map[string]string
+ func NewConsumer(kb *clientkeybatch.ClientKeyBatch, opts ...broker.Option) *Consumer
+ func (s *Consumer) Get(ctx context.Context, timeout time.Duration) ([]redis.XStream, error)
+ func (s *Consumer) Listen(asyncHanddler bool, p ...event.Parser) error
+ func (s *Consumer) StopListening() error
+ type Producer struct
+ func NewProducer(k *Stream, opts ...broker.Option) *Producer
+ func (p *Producer) PubEvent(ctx context.Context, payload interface{}) (*event.Event, error)
+ func (p *Producer) Publish(ctx context.Context, payload interface{}) error
+ type Stream struct
+ MaxLen int64
+ Strict bool
+ func New(k *clientkey.ClientKey, maxlen int64, strict bool) *Stream
+ func (s *Stream) Ack(ctx context.Context, groupname string, ids ...string) error
+ func (s *Stream) AsProducer(opts ...broker.Option) *Producer
+ func (s *Stream) CreateGroup(ctx context.Context, groupname, start string, autocreate bool) (string, error)
+ func (s *Stream) Delete(ctx context.Context, ids ...string) error
+ func (s *Stream) DeleteConsumerFromGroup(ctx context.Context, groupname, consumername string) (int64, error)
+ func (s *Stream) DeleteGroup(ctx context.Context, groupname string) (int64, error)
+ func (s *Stream) GroupInfos(ctx context.Context) ([]redis.XInfoGroup, error)
+ func (s *Stream) HasGroup(ctx context.Context, groupname string) (bool, error)
+ func (s *Stream) HasGroups(ctx context.Context, groupnames ...string) (bool, error)
+ func (s *Stream) Len(ctx context.Context) (int64, error)
+ func (s *Stream) Move(ctx context.Context, groupname, toconsumer string, minIdle time.Duration, ...) ([]redis.XMessage, error)
+ func (s *Stream) MoveJustID(ctx context.Context, groupname, toconsumer string, minIdle time.Duration, ...) ([]string, error)
+ func (s *Stream) Pending(ctx context.Context, groupname string) (*redis.XPending, error)
+ func (s *Stream) Range(ctx context.Context, start, stop string) ([]redis.XMessage, error)
+ func (s *Stream) SetGroupStartAt(ctx context.Context, groupname, start string) (string, error)
+ func (s *Stream) Trim(ctx context.Context, count int64, strict bool) (int64, error)

Other modules containing this package

github.com/Golang-Tools/redishelper