Documentation
¶
Index ¶
- Constants
- Variables
- func DefaultErrHandler(err k.Error)
- type Error
- type ICheckpointer
- type IError
- type IKinesis
- type IKinesumer
- type IProvisioner
- type IRecord
- type Kinesumer
- func (kin *Kinesumer) Begin() ([]*ShardWorker, error)
- func (kin *Kinesumer) End()
- func (kin *Kinesumer) GetShards() (shards []*kinesis.Shard, err error)
- func (kin *Kinesumer) GetStreams() (streams []string, err error)
- func (kin *Kinesumer) LaunchShardWorker(shards []*kinesis.Shard) (int, *ShardWorker, error)
- func (kin *Kinesumer) Records() <-chan k.Record
- func (kin *Kinesumer) StreamExists() (found bool, err error)
- type Options
- type Record
- type ShardWorker
- func (s *ShardWorker) GetRecords(it string) ([]*kinesis.Record, string, int64, error)
- func (s *ShardWorker) GetRecordsAndProcess(it, sequence string) (cont bool, nextIt string, nextSeq string)
- func (s *ShardWorker) GetShardIterator(iteratorType string, sequence string) (string, error)
- func (s *ShardWorker) RunWorker()
- func (s *ShardWorker) TryGetShardIterator(iteratorType string, sequence string) string
- type Unit
Constants ¶
View Source
const ( ECrit = "crit" EError = "error" EWarn = "warn" EInfo = "info" EDebug = "debug" )
Variables ¶
View Source
var DefaultOptions = Options{ ListStreamsLimit: 1000, DescribeStreamLimit: 10000, GetRecordsLimit: 10000, PollTime: 2000, MaxShardWorkers: 50, ErrHandler: DefaultErrHandler, DefaultIteratorType: "LATEST", }
Functions ¶
func DefaultErrHandler ¶
Types ¶
type ICheckpointer ¶
type ICheckpointer kinesumeriface.Checkpointer
type IKinesumer ¶
type IKinesumer kinesumeriface.Kinesumer
type IProvisioner ¶
type IProvisioner kinesumeriface.Provisioner
type Kinesumer ¶
type Kinesumer struct {
Kinesis k.Kinesis
Checkpointer k.Checkpointer
Provisioner k.Provisioner
Stream string
Options *Options
// contains filtered or unexported fields
}
func NewDefault ¶
func (*Kinesumer) Begin ¶
func (kin *Kinesumer) Begin() ([]*ShardWorker, error)
func (*Kinesumer) GetStreams ¶
func (*Kinesumer) LaunchShardWorker ¶
func (*Kinesumer) StreamExists ¶
type Record ¶
type Record struct {
// contains filtered or unexported fields
}
func (*Record) MillisBehindLatest ¶
func (*Record) PartitionKey ¶
func (*Record) SequenceNumber ¶
type ShardWorker ¶
type ShardWorker struct {
GetRecordsLimit int64
// contains filtered or unexported fields
}
func (*ShardWorker) GetRecords ¶
func (*ShardWorker) GetRecordsAndProcess ¶
func (s *ShardWorker) GetRecordsAndProcess(it, sequence string) (cont bool, nextIt string, nextSeq string)
func (*ShardWorker) GetShardIterator ¶
func (s *ShardWorker) GetShardIterator(iteratorType string, sequence string) (string, error)
func (*ShardWorker) RunWorker ¶
func (s *ShardWorker) RunWorker()
func (*ShardWorker) TryGetShardIterator ¶
func (s *ShardWorker) TryGetShardIterator(iteratorType string, sequence string) string
Source Files
¶
Click to show internal directories.
Click to hide internal directories.