kinesumer

package
v0.9.2 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 27, 2015 License: MIT, BSD-2-Clause Imports: 8 Imported by: 0

README

Kinesumer

Circle CI

Kinesumer is a simple Go client library for Amazon Kinesis. It aims to be a native Go alternative to Amazon's Kinesis Client Library (KCL). Kinesumer includes a command-line tool (also called kinesumer) that lets you tail Kinesis streams and check the status of Kinesumer workers.

Features

  • Automatically manages one consumer goroutine per shard.
  • Handles shard splitting and merging properly.
  • Provides a simple channel interface for incoming Kinesis records.
  • Provides a tool for managing Kinesis streams:
    • Tailing a stream

Using the package

Install

go get github.com/remind101/kinesumer

Example Program

package main

import (
	"fmt"
	"os"

	"github.com/remind101/kinesumer"
)

func main() {
	k, err := kinesumer.NewDefaultKinesumer(
		"Stream",
	)
	if err != nil {
		panic(err)
	}
	k.Begin()
	defer k.End()
	for i := 0; i < 100; i++ {
		rec := <-k.Records()
		fmt.Println(string(rec.Data()))
	}
}

Using the tool

Install

go get -u github.com/remind101/kinesumer/cmd/kinesumer

To tail a stream, make sure you have AWS credentials available (either in ~/.aws or in environment variables), then run:

kinesumer tail -s STREAM_NAME

Documentation

Index

Constants

View Source
// Untyped string constants. Their names and values read as severity labels,
// listed from most to least severe.
// NOTE(review): presumably these are the values expected by NewError's
// `severity` parameter and returned by (*Error).Severity — confirm against
// the package source.
const (
	ECrit  = "crit"
	EError = "error"
	EWarn  = "warn"
	EInfo  = "info"
	EDebug = "debug"
)

Variables

View Source
// DefaultOptions is a package-level Options value with every field populated.
// NOTE(review): presumably this is what NewDefault uses — confirm against the
// package source.
var DefaultOptions = Options{

	// Limits; the field names match the Kinesis ListStreams, DescribeStream
	// and GetRecords calls — presumably passed as their `Limit` parameter
	// (TODO confirm).
	ListStreamsLimit:    1000,
	DescribeStreamLimit: 10000,
	GetRecordsLimit:     10000,

	// NOTE(review): PollTime's unit is not shown here; 2000 looks like
	// milliseconds — confirm.
	PollTime:            2000,
	MaxShardWorkers:     50,
	ErrHandler:          DefaultErrHandler,
	// "LATEST" is a Kinesis shard iterator type name.
	DefaultIteratorType: "LATEST",
}

Functions

func DefaultErrHandler

func DefaultErrHandler(err k.Error)

Types

type Error

type Error struct {
	// contains filtered or unexported fields
}

func NewError

func NewError(severity, message string, origin error) *Error

func (*Error) Error

func (e *Error) Error() string

func (*Error) Origin

func (e *Error) Origin() error

func (*Error) Severity

func (e *Error) Severity() string

type ICheckpointer

type ICheckpointer kinesumeriface.Checkpointer

type IError

type IError kinesumeriface.Error

type IKinesis

type IKinesis kinesumeriface.Kinesis

type IKinesumer

type IKinesumer kinesumeriface.Kinesumer

type IProvisioner

type IProvisioner kinesumeriface.Provisioner

type IRecord

type IRecord kinesumeriface.Record

type Kinesumer

type Kinesumer struct {
	Kinesis      k.Kinesis
	Checkpointer k.Checkpointer
	Provisioner  k.Provisioner
	Stream       string
	Options      *Options
	// contains filtered or unexported fields
}

func New

func New(kinesis k.Kinesis, checkpointer k.Checkpointer, provisioner k.Provisioner,
	randSource rand.Source, stream string, opt *Options) (*Kinesumer, error)

func NewDefault

func NewDefault(stream string) (*Kinesumer, error)

func (*Kinesumer) Begin

func (kin *Kinesumer) Begin() ([]*ShardWorker, error)

func (*Kinesumer) End

func (kin *Kinesumer) End()

func (*Kinesumer) GetShards

func (kin *Kinesumer) GetShards() (shards []*kinesis.Shard, err error)

func (*Kinesumer) GetStreams

func (kin *Kinesumer) GetStreams() (streams []string, err error)

func (*Kinesumer) LaunchShardWorker

func (kin *Kinesumer) LaunchShardWorker(shards []*kinesis.Shard) (int, *ShardWorker, error)

func (*Kinesumer) Records

func (kin *Kinesumer) Records() <-chan k.Record

func (*Kinesumer) StreamExists

func (kin *Kinesumer) StreamExists() (found bool, err error)

type Options

// Options holds the tunable settings carried by a Kinesumer (see the
// Kinesumer.Options field) and accepted by New. DefaultOptions provides a
// fully populated value.
type Options struct {
	// NOTE(review): the three limits are presumably forwarded as the `Limit`
	// parameter of the like-named Kinesis API calls — confirm.
	ListStreamsLimit    int64
	DescribeStreamLimit int64
	GetRecordsLimit     int64
	// NOTE(review): unit not visible here; DefaultOptions uses 2000, which
	// suggests milliseconds — confirm.
	PollTime            int
	// NOTE(review): presumably an upper bound on concurrently running shard
	// workers — confirm.
	MaxShardWorkers     int
	// ErrHandler is a callback taking a k.Error; DefaultErrHandler has a
	// matching signature.
	ErrHandler          func(k.Error)
	// DefaultIteratorType is a string; DefaultOptions sets it to "LATEST".
	DefaultIteratorType string
}

type Record

type Record struct {
	// contains filtered or unexported fields
}

func (*Record) Data

func (r *Record) Data() []byte

func (*Record) Done

func (r *Record) Done()

func (*Record) MillisBehindLatest

func (r *Record) MillisBehindLatest() int64

func (*Record) PartitionKey

func (r *Record) PartitionKey() string

func (*Record) SequenceNumber

func (r *Record) SequenceNumber() string

func (*Record) ShardId

func (r *Record) ShardId() string

type ShardWorker

type ShardWorker struct {
	GetRecordsLimit int64
	// contains filtered or unexported fields
}

func (*ShardWorker) GetRecords

func (s *ShardWorker) GetRecords(it string) ([]*kinesis.Record, string, int64, error)

func (*ShardWorker) GetRecordsAndProcess

func (s *ShardWorker) GetRecordsAndProcess(it, sequence string) (cont bool, nextIt string, nextSeq string)

func (*ShardWorker) GetShardIterator

func (s *ShardWorker) GetShardIterator(iteratorType string, sequence string) (string, error)

func (*ShardWorker) RunWorker

func (s *ShardWorker) RunWorker()

func (*ShardWorker) TryGetShardIterator

func (s *ShardWorker) TryGetShardIterator(iteratorType string, sequence string) string

type Unit

type Unit struct{}

Unit has only one possible value, Unit{}. It is used as the element type of signal channels that tell the workers when to stop.

Directories

Path Synopsis
checkpointers
cmd
kinesumer command
provisioners

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL