Documentation ¶
Types ¶
type BatchHandler ¶
type BatchHandler interface {
	Handler
	// Batch processes the results returned by Handler.Handle.
	Batch(ctx context.Context, data []interface{}) error
}
BatchHandler extends Handler with a Batch method.
type HandleFunc ¶
HandleFunc is the function type of Handler.Handle.
type Handler ¶
type Handler interface {
	// Info sets the topic name and other configuration.
	Info() *Info
	// Handle processes a *kafka.Message.
	Handle(ctx context.Context, msg *kafka.Message) (interface{}, error)
}
Handler includes only the Info and Handle methods.
type Info ¶
type Info struct {
	// used to get reader from otkafka.ReaderMaker.
	// default: "default"
	Name string
	// reader workers count.
	// default: 1
	ReadWorker int
	// batch workers count.
	// default: 1
	BatchWorker int
	// data size for batch processing.
	// default: 1
	BatchSize int
	// handler workers count.
	HandleWorker int
	// the size of the data channel.
	// default: 100
	ChanSize int
	// run the batchFunc automatically at this interval, so that pending data
	// is still processed when BatchSize is never reached.
	// default: 30s
	AutoBatchInterval time.Duration
}
Info holds the configuration of a BatchHandler.
Note:
If ordering matters, set every worker count to one. With multiple goroutines, the order in which data is processed cannot be guaranteed.
type Out ¶
Out provides Handlers to in.Handlers.
func NewOut ¶
NewOut creates an Out that provides the given Handlers to in.Handlers.
Usage:
func newHandlerA(logger log.Logger) processor.Out {
	return processor.NewOut(
		&HandlerA{logger: logger},
	)
}
Or
func newHandlers(logger log.Logger) processor.Out {
	return processor.NewOut(
		&HandlerA{logger: logger},
		&HandlerB{logger: logger},
	)
}
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor dispatches messages to Handlers.
func (*Processor) ProvideCloser ¶ added in v0.6.1
func (e *Processor) ProvideCloser()
ProvideCloser implements container.CloserProvider for the Module.
func (*Processor) ProvideRunGroup ¶
ProvideRunGroup runs the workers, which:
- Fetch messages from the *kafka.Reader.
- Handle each message with Handler.Handle.
- Batch the handled data with BatchHandler.Batch. If the batch succeeds, the messages are committed.