consumer

package
Version: v1.0.0 (Latest)
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 26, 2021 License: MIT Imports: 11 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
// ErrCommitNotFound is a sentinel error whose message is "commit not found".
// NOTE(review): presumably returned by CommitReader implementations when a
// lookup finds no commit for the requested CID — confirm against the sources.
var ErrCommitNotFound = errors.New("commit not found")

Functions

This section is empty.

Types

type CommitReader

// CommitReader is a single-method interface for obtaining a commit
// identified by a cid.Cid.
type CommitReader interface {
	// GetCommit takes a context and a CID and returns a commit or an error.
	// NOTE(review): presumably ErrCommitNotFound signals a missing commit —
	// confirm against the implementations.
	GetCommit(ctx context.Context, cid cid.Cid) (commit, error)
}

type Consumer

// Consumer is implemented by types that listen for incoming events and pass
// each of them to a MessageHandler.
type Consumer interface {
	// Consume takes a context and the handler to invoke, and returns an error.
	// The package documentation describes this as a blocking action.
	Consume(globalCtx context.Context, handler MessageHandler) error
}

Consumer defines an interface for a blocking operation that listens for incoming events and invokes the handler on each of them.

type FakeCommitReader

// FakeCommitReader holds a map of commits keyed by CID. Its GetCommit method
// has the same signature as CommitReader.GetCommit.
// NOTE(review): presumably an in-memory test double that serves commits from
// the Commits map — the method body is not visible here; confirm.
type FakeCommitReader struct {
	// Commits maps a CID to a commit value.
	Commits map[cid.Cid]commit
}

func (FakeCommitReader) GetCommit

func (f FakeCommitReader) GetCommit(ctx context.Context, cid cid.Cid) (commit, error)

type FirstToLastConsumer

// FirstToLastConsumer is constructed with NewFirstToLastConsumer and exposes a
// Consume method with the same signature as Consumer.Consume.
// NOTE(review): the name suggests messages are delivered from the first to
// the last — the implementation is not visible here; confirm.
type FirstToLastConsumer struct {
	// contains filtered or unexported fields
}

func NewFirstToLastConsumer

func NewFirstToLastConsumer(headReader thead.Reader, consumerOffsetManager thead.Manager, messageReader storage.MessageReader, config FirstToLastConsumerConfig) *FirstToLastConsumer

func (*FirstToLastConsumer) Consume

func (f *FirstToLastConsumer) Consume(globalCtx context.Context, handler MessageHandler) error

type FirstToLastConsumerConfig

// FirstToLastConsumerConfig is the configuration value passed to
// NewFirstToLastConsumer.
type FirstToLastConsumerConfig struct {
	// NOTE(review): presumably the delay between successive polls — confirm.
	PollInterval time.Duration
	// NOTE(review): presumably the time limit for a single poll — confirm.
	PollTimeout  time.Duration
}

type MessageFandlerFunc

// MessageFandlerFunc is a function type with the same signature as
// MessageHandler.Handle; the type also declares a Handle method.
// NOTE(review): presumably an adapter that lets a plain function act as a
// MessageHandler — the method body is not visible here; confirm.
// NOTE(review): "Fandler" looks like a misspelling of "Handler"; the name is
// exported API, so renaming it would need a deprecation path.
type MessageFandlerFunc func(ctx context.Context, message storage.ProtoUnmarshallable) error

func (MessageFandlerFunc) Handle

type MessageHandler

// MessageHandler is the handler type accepted by Consumer.Consume.
type MessageHandler interface {
	// Handle takes a context and a storage.ProtoUnmarshallable message and
	// returns an error.
	// NOTE(review): how a non-nil error affects consumption is not visible
	// here — confirm against the Consumer implementation.
	Handle(ctx context.Context, message storage.ProtoUnmarshallable) error
}

type StorageCommitReader

// StorageCommitReader is constructed with NewStorageCommitReader from a
// storage.MessageReader and a timeout. Its GetCommit method has the same
// signature as CommitReader.GetCommit.
// NOTE(review): presumably the timeout bounds each read from storage —
// confirm against the implementation.
type StorageCommitReader struct {
	// contains filtered or unexported fields
}

func NewStorageCommitReader

func NewStorageCommitReader(reader storage.MessageReader, timeout time.Duration) *StorageCommitReader

func (*StorageCommitReader) GetCommit

func (cr *StorageCommitReader) GetCommit(ctx context.Context, cid cid.Cid) (commit, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL