Documentation
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Buffer ¶
type Buffer struct {
MaxRecordCount int
// contains filtered or unexported fields
}
Buffer holds records and reports when it should be flushed.
func (*Buffer) FirstSeq ¶
FirstSeq returns the sequence number of the first record in the buffer.
func (*Buffer) Flush ¶
func (b *Buffer) Flush()
Flush empties the buffer and resets the sequence counter.
func (*Buffer) GetRecords ¶
GetRecords returns the records in the buffer.
func (*Buffer) RecordCount ¶
RecordCount returns the number of records in the buffer.
func (*Buffer) ShouldFlush ¶
ShouldFlush determines if the buffer has reached its target size.
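The buffer behaviour described above can be sketched as follows. This is a minimal illustration, not the package's implementation: `sketchBuffer`, `record`, and the `add` method are hypothetical names, and the sketch assumes that "target size" means the record count has reached `MaxRecordCount`.

```go
package main

import "fmt"

// record is a hypothetical stand-in for a stream record.
type record struct {
	SequenceNumber string
	Data           []byte
}

// sketchBuffer mirrors the documented behaviour of Buffer.
// It is an illustration only, not the package's own type.
type sketchBuffer struct {
	MaxRecordCount int
	records        []record
	firstSeq       string
}

// add appends a record and remembers the sequence number of the first one.
func (b *sketchBuffer) add(r record) {
	if len(b.records) == 0 {
		b.firstSeq = r.SequenceNumber
	}
	b.records = append(b.records, r)
}

// FirstSeq returns the sequence number of the first buffered record.
func (b *sketchBuffer) FirstSeq() string { return b.firstSeq }

// RecordCount returns the number of buffered records.
func (b *sketchBuffer) RecordCount() int { return len(b.records) }

// GetRecords returns the buffered records.
func (b *sketchBuffer) GetRecords() []record { return b.records }

// ShouldFlush assumes the target size is MaxRecordCount.
func (b *sketchBuffer) ShouldFlush() bool { return len(b.records) >= b.MaxRecordCount }

// Flush empties the buffer and resets the first sequence number.
func (b *sketchBuffer) Flush() {
	b.records = b.records[:0]
	b.firstSeq = ""
}

func main() {
	b := &sketchBuffer{MaxRecordCount: 2}
	for _, seq := range []string{"1", "2", "3"} {
		b.add(record{SequenceNumber: seq})
		if b.ShouldFlush() {
			fmt.Printf("flushing %d records starting at %s\n", b.RecordCount(), b.FirstSeq())
			b.Flush()
		}
	}
	fmt.Println("left in buffer:", b.RecordCount())
}
```

A caller would typically check ShouldFlush after each added record, hand the records to a handler, and then call Flush.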
type Checkpoint ¶
type Checkpoint interface {
CheckpointExists(shardID string) bool
SequenceNumber() string
SetCheckpoint(shardID string, sequenceNumber string)
}
Checkpoint is the interface that checkpoint implementations must satisfy in order to track consumer progress.
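As an illustration of what satisfying this interface involves, here is a minimal in-memory sketch. `memoryCheckpoint` and `newMemoryCheckpoint` are hypothetical names that are not part of the package. Because `SequenceNumber` takes no shard ID, the sketch assumes it returns the value found by the most recent `CheckpointExists` or `SetCheckpoint` call; the documentation does not confirm this.

```go
package main

import (
	"fmt"
	"sync"
)

// Checkpoint is copied from the interface documented above.
type Checkpoint interface {
	CheckpointExists(shardID string) bool
	SequenceNumber() string
	SetCheckpoint(shardID string, sequenceNumber string)
}

// memoryCheckpoint is a hypothetical in-memory implementation for tests.
type memoryCheckpoint struct {
	mu      sync.Mutex
	stored  map[string]string // shard ID -> last checkpointed sequence number
	current string            // assumption: value seen by the latest lookup or write
}

func newMemoryCheckpoint() *memoryCheckpoint {
	return &memoryCheckpoint{stored: make(map[string]string)}
}

// CheckpointExists reports whether a checkpoint is stored for the shard and,
// if so, remembers its sequence number for SequenceNumber.
func (c *memoryCheckpoint) CheckpointExists(shardID string) bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	seq, ok := c.stored[shardID]
	if ok {
		c.current = seq
	}
	return ok
}

// SequenceNumber returns the most recently looked-up or stored sequence number.
func (c *memoryCheckpoint) SequenceNumber() string {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.current
}

// SetCheckpoint stores the sequence number for the shard.
func (c *memoryCheckpoint) SetCheckpoint(shardID string, sequenceNumber string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.stored[shardID] = sequenceNumber
	c.current = sequenceNumber
}

func main() {
	var cp Checkpoint = newMemoryCheckpoint()
	fmt.Println("exists before:", cp.CheckpointExists("shard-0"))
	cp.SetCheckpoint("shard-0", "4957")
	if cp.CheckpointExists("shard-0") {
		fmt.Println("resume after:", cp.SequenceNumber())
	}
}
```

An implementation like this loses its state on restart, so it is only suitable for tests or local experiments.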
type Config ¶
type Config struct {
// AppName is the application name and checkpoint namespace.
AppName string
// StreamName is the Kinesis stream.
StreamName string
// FlushInterval is a regular interval for flushing the buffer. Defaults to 1s.
FlushInterval time.Duration
// BufferSize determines the batch request size. Must not exceed 500. Defaults to 500.
BufferSize int
// Logger is the logger used. Defaults to log.Log.
Logger log.Interface
// Checkpoint for tracking progress of consumer.
Checkpoint Checkpoint
}
Config holds the configuration values for the application.
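The documented defaults and the 500-record limit can be sketched as below. `sketchConfig` copies only the fields with documented defaults, and `withDefaults` is a hypothetical helper; how the package itself applies defaults or reacts to an oversized BufferSize is not stated here, so returning an error is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// sketchConfig copies the Config fields relevant to the documented defaults.
// Logger and Checkpoint are omitted to keep the sketch dependency-free.
type sketchConfig struct {
	AppName       string
	StreamName    string
	FlushInterval time.Duration
	BufferSize    int
}

// maxBufferSize is the documented upper bound for BufferSize.
const maxBufferSize = 500

// withDefaults is a hypothetical helper that fills unset fields with the
// documented defaults and rejects a BufferSize above the documented limit.
func withDefaults(c sketchConfig) (sketchConfig, error) {
	if c.FlushInterval == 0 {
		c.FlushInterval = time.Second // documented default: 1s
	}
	if c.BufferSize == 0 {
		c.BufferSize = maxBufferSize // documented default: 500
	}
	if c.BufferSize > maxBufferSize {
		return c, errors.New("BufferSize must not exceed 500")
	}
	return c, nil
}

func main() {
	cfg, err := withDefaults(sketchConfig{
		AppName:    "example-app",    // placeholder value
		StreamName: "example-stream", // placeholder value
	})
	if err != nil {
		fmt.Println("invalid config:", err)
		return
	}
	fmt.Printf("%+v\n", cfg)
}
```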
type Consumer ¶
type Consumer struct {
Config
// contains filtered or unexported fields
}
Consumer wraps the interaction with the Kinesis stream.
func NewConsumer ¶
NewConsumer creates a new consumer with an initialized Kinesis connection.
type HandlerFunc ¶
type HandlerFunc func(b Buffer)
HandlerFunc is a convenience type that lets a plain function implement the Handler interface without declaring a struct. It can be used like this:
consumer.AddHandler(connector.HandlerFunc(func(b connector.Buffer) {
// ...
}))
func (HandlerFunc) HandleRecords ¶
func (h HandlerFunc) HandleRecords(b Buffer)
HandleRecords implements the Handler interface.
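The adapter pattern behind HandlerFunc is the same one used by `http.HandlerFunc` in the standard library. The sketch below is self-contained and makes two assumptions: the Handler interface, which is not shown on this page, has the single method `HandleRecords(b Buffer)`, and `Buffer` here is a stripped-down stand-in. The `dispatch` function is hypothetical.

```go
package main

import "fmt"

// Buffer is a stripped-down stand-in for the package's Buffer type.
type Buffer struct {
	records []string
}

// RecordCount returns the number of records in the buffer.
func (b *Buffer) RecordCount() int { return len(b.records) }

// Handler is assumed to be a single-method interface; its definition is
// not shown in the documentation above.
type Handler interface {
	HandleRecords(b Buffer)
}

// HandlerFunc adapts an ordinary function to the Handler interface.
type HandlerFunc func(b Buffer)

// HandleRecords calls the underlying function.
func (h HandlerFunc) HandleRecords(b Buffer) { h(b) }

// dispatch is a hypothetical caller that only knows about Handler.
func dispatch(h Handler, b Buffer) { h.HandleRecords(b) }

func main() {
	var handled int
	h := HandlerFunc(func(b Buffer) {
		handled = b.RecordCount()
	})
	dispatch(h, Buffer{records: []string{"a", "b", "c"}})
	fmt.Println("records handled:", handled)
}
```

The benefit of this design is that a closure can capture local state, as `handled` does here, without a dedicated struct type.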
type RedisCheckpoint ¶
type RedisCheckpoint struct {
AppName string
StreamName string
// contains filtered or unexported fields
}
RedisCheckpoint implements the Checkpoint interface. It enables Pipeline.ProcessShard to checkpoint its progress while reading records from a Kinesis stream.
func (*RedisCheckpoint) CheckpointExists ¶
func (c *RedisCheckpoint) CheckpointExists(shardID string) bool
CheckpointExists determines if a checkpoint for a particular Shard exists. Typically used to determine whether we should start processing the shard with TRIM_HORIZON or AFTER_SEQUENCE_NUMBER (if checkpoint exists).
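The choice of starting position described above might look like the following sketch. `startPosition` and `stubCheckpoint` are hypothetical names, and the sketch assumes that `SequenceNumber` returns the value found by the preceding `CheckpointExists` call. The two iterator type strings are the Kinesis shard iterator types named in the text.

```go
package main

import "fmt"

// Checkpoint is copied from the interface documented on this page.
type Checkpoint interface {
	CheckpointExists(shardID string) bool
	SequenceNumber() string
	SetCheckpoint(shardID string, sequenceNumber string)
}

// stubCheckpoint is a hypothetical map-backed stand-in for RedisCheckpoint.
type stubCheckpoint struct {
	stored map[string]string
	last   string // assumption: set by the latest successful lookup
}

func (s *stubCheckpoint) CheckpointExists(shardID string) bool {
	seq, ok := s.stored[shardID]
	if ok {
		s.last = seq
	}
	return ok
}

func (s *stubCheckpoint) SequenceNumber() string { return s.last }

func (s *stubCheckpoint) SetCheckpoint(shardID string, sequenceNumber string) {
	s.stored[shardID] = sequenceNumber
}

// startPosition returns the shard iterator type to request and, when
// resuming, the sequence number to start after.
func startPosition(cp Checkpoint, shardID string) (iteratorType, startingSeq string) {
	if cp.CheckpointExists(shardID) {
		return "AFTER_SEQUENCE_NUMBER", cp.SequenceNumber()
	}
	// No checkpoint yet: read the shard from its oldest available record.
	return "TRIM_HORIZON", ""
}

func main() {
	cp := &stubCheckpoint{stored: map[string]string{"shard-1": "42"}}
	for _, shard := range []string{"shard-1", "shard-2"} {
		it, seq := startPosition(cp, shard)
		fmt.Printf("%s: type=%s seq=%q\n", shard, it, seq)
	}
}
```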
func (*RedisCheckpoint) SequenceNumber ¶
func (c *RedisCheckpoint) SequenceNumber() string
SequenceNumber returns the currently stored checkpoint. The method itself takes no shardID argument.
func (*RedisCheckpoint) SetCheckpoint ¶
func (c *RedisCheckpoint) SetCheckpoint(shardID string, sequenceNumber string)
SetCheckpoint stores a checkpoint for a shard (e.g. sequence number of last record processed by application). Upon failover, record processing is resumed from this point.
