dms

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 27, 2021 License: Apache-2.0 Imports: 16 Imported by: 0

README

devcloud-go/dms

Feature
  1. support asynchronous consume kafka message and ensure message not lost.
  2. support consumption speed-limiting.
QuickStart
  1. First you need implement the OffsetPersist interface which is defined in offset_persist.go, the create_table sql see example/create_table.sql.
type OffsetPersist interface {
	Find(groupId, topic string, partition int) (int64, error)
	Save(groupId, topic string, partition int, offset int64) error
}
  1. Then you need implement the message Handler which is defined in method_info.go#L30
type BizHandler func(msg *sarama.ConsumerMessage) error
  1. Create a props for dms consumer, there are several modes of props, async and sync, you also can specify how to commit offset, interval or quantitative by set CommitInterval or CommitSize.
  • async: consume messages asynchronous
  • sync: consume messages synchronous
  1. Create a dms consumer to consume kafka messages.

See details in package example.

Note
  1. when using async mode, the pool size should be larger than topic*partition numbers.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type BizHandler

type BizHandler func(msg *sarama.ConsumerMessage) error

type Consumer

type Consumer struct {
	// contains filtered or unexported fields
}

func NewConsumer

func NewConsumer(ctx context.Context, methods []MethodInfo, propertiesMap map[string]*Properties, offsetPersist OffsetPersist) (*Consumer, error)

func (*Consumer) Close

func (c *Consumer) Close()

func (*Consumer) Consume

func (c *Consumer) Consume()

type DmsHandler

type DmsHandler struct {
	// contains filtered or unexported fields
}

func NewDmsHandler

func NewDmsHandler(
	ctx context.Context,
	methodInfo MethodInfo,
	pool *ants.Pool,
	limiter *rate.Limiter,
	properties *Properties,
	offsetPersist OffsetPersist) (*DmsHandler, error)

func (*DmsHandler) AddTopicToMethod

func (h *DmsHandler) AddTopicToMethod(method MethodInfo)

func (*DmsHandler) Cleanup

func (h *DmsHandler) Cleanup(sess sarama.ConsumerGroupSession) error

func (*DmsHandler) Close

func (h *DmsHandler) Close() error

func (*DmsHandler) ConsumeClaim

func (*DmsHandler) OnConsume

func (h *DmsHandler) OnConsume(msg *sarama.ConsumerMessage)

func (*DmsHandler) Setup

Setup implements ConsumerGroupHandler interface, when set disable auto commit, Setup will obtain a valid offset of groupId-topic-partition from kafka broker, db and the broker's beginning offset.

func (*DmsHandler) Start

func (h *DmsHandler) Start(wg *sync.WaitGroup)

type MethodInfo

type MethodInfo struct {
	GroupId  string
	Topics   []string
	BizGroup string
	Method   BizHandler
}

func (*MethodInfo) GetUniqueKey

func (m *MethodInfo) GetUniqueKey() string

type OffsetManager

type OffsetManager struct {
	// contains filtered or unexported fields
}

func NewOffsetManager

func NewOffsetManager(startOffset int64, blockCapacity, partition int, groupId, topic string, version byte) *OffsetManager

type OffsetNode

type OffsetNode struct {
	// contains filtered or unexported fields
}

func NewOffsetNode

func NewOffsetNode(capacity int) *OffsetNode

type OffsetPersist

type OffsetPersist interface {
	Find(groupId, topic string, partition int) (int64, error)
	Save(groupId, topic string, partition int, offset int64) error
}

type Properties

type Properties struct {
	Addrs           []string
	Async           bool
	OffsetBlockSize int
	BizRetryTimes   int
	LimitPerSecond  int

	// goroutine pool size
	PoolSize     int
	PoolTaskSize int

	SaramaConfig  *sarama.Config
	InitialOffset int64

	CommitSize     int // default partitionCount*OffsetBlockSize
	AutoCommit     bool
	CommitInterval time.Duration
}

func NewProperties

func NewProperties() *Properties

NewProperties return a default dms properties

func (*Properties) Clone

func (p *Properties) Clone() *Properties

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL