kinesis

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 12, 2026 License: Apache-2.0 Imports: 10 Imported by: 0

Documentation

Overview

Package kinesis provides a Kinesis Data Streams source via aws-sdk-go-v2.

PRODUCTION USERS: prefer pkg/exec/lambda/kinesis. AWS Lambda's Kinesis event-source mapping owns shard discovery, lease coordination, automatic scaling on shard count (via ParallelizationFactor), checkpointing, and partial-batch retry semantics — everything this package punts on. The Lambda handler shares the same retry / dedup / metrics core (pkg/exec/processor) as streaming.Run, so the pipeline definition is identical.

This package is the polling ECS path, kept for dev/demo and for single-instance use cases where running a Lambda is unnecessary (integration tests, local one-shot consumers). It is NOT a production path: it is single-instance, has no checkpointing, and does not handle resharding mid-run.

Implementation: ListShards once at startup, spawn one goroutine per shard that loops GetRecords + decode + emit. EventID is "<stream>/<shard>/<sequenceNumber>", globally unique across the stream's history, suitable for at-least-once dedup at the state-store boundary.

What's missing for production scale (intentionally — Lambda owns it):

  • Lease management / shard rebalancing across multiple workers.
  • Checkpointing across restarts.
  • Resharding mid-run.

We do not plan to bring KCL v3 Go in-tree — the Lambda event-source mapping is the supported production path. If you need KCL semantics inside an ECS worker (e.g. for a long-running per-record cost that exceeds Lambda's 15-minute cap), wire awslabs/amazon-kinesis-client-go yourself against the source.Source contract.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config[T any] struct {
	Client     *kinesis.Client
	StreamName string
	Decode     Decoder[T]

	// StartingPosition controls where each shard begins. Defaults to LATEST.
	StartingPosition types.ShardIteratorType

	// StartTimestamp is required when StartingPosition is AT_TIMESTAMP.
	StartTimestamp *time.Time

	// PollInterval is the sleep between empty GetRecords calls per shard. Defaults
	// to 1s. Lower values reduce latency at the cost of higher API call volume.
	PollInterval time.Duration

	// RecordsPerCall caps GetRecords output. Default 1000 (the Kinesis maximum).
	RecordsPerCall int32

	// OnDecodeError, if non-nil, is called for every record whose Decode returned
	// an error. Default behavior is to drop silently. Wire to a DLQ / metrics
	// recorder to surface poison pills.
	OnDecodeError func(raw []byte, shardID, sequenceNumber string, err error)

	// OnGetRecordsError, if non-nil, is called for every GetRecords error before
	// the per-shard loop backs off and retries. Wire to logging / metrics.
	OnGetRecordsError func(streamName, shardID string, err error)
}

Config configures a Kinesis Source.

type Decoder

type Decoder[T any] func([]byte) (T, error)

Decoder converts a raw Kinesis record's Data to a typed Record value.

func JSONDecoder

func JSONDecoder[T any]() Decoder[T]

JSONDecoder returns a Decoder that unmarshals JSON into T.

type Source

type Source[T any] struct {
	// contains filtered or unexported fields
}

Source implements source.Source for a Kinesis Data Stream.

func NewSource

func NewSource[T any](cfg Config[T]) (*Source[T], error)

NewSource constructs a Kinesis Source. The returned Source does not own the client; callers manage its lifecycle.

func (*Source[T]) Close

func (s *Source[T]) Close() error

Close is a no-op; the underlying client is owned by the caller.

func (*Source[T]) Name

func (s *Source[T]) Name() string

Name returns "kinesis:<stream>".

func (*Source[T]) Read

func (s *Source[T]) Read(ctx context.Context, out chan<- source.Record[T]) error

Read consumes all shards in parallel until ctx is canceled or every shard returns nil NextShardIterator (closed shards). Returns nil on graceful shutdown.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL