Documentation
Overview
Package kinesis provides a Kinesis Data Streams source via aws-sdk-go-v2.
PRODUCTION USERS: prefer pkg/exec/lambda/kinesis. AWS Lambda's Kinesis event-source mapping owns shard discovery, lease coordination, automatic scaling with shard count (plus per-shard concurrency via ParallelizationFactor), checkpointing, and partial-batch retry semantics, all of which this package leaves out. The Lambda handler shares the same retry / dedup / metrics core (pkg/exec/processor) as streaming.Run, so the pipeline definition is identical.
This package is the polling ECS path, kept for dev/demo and for single-instance use cases where running a Lambda is unnecessary (integration tests, local one-shot consumers). It is NOT a production path: it is single-instance, has no checkpointing, and does not handle resharding mid-run.
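Implementation: ListShards once at startup, then spawn one goroutine per shard that obtains a shard iterator (GetShardIterator) and loops GetRecords + decode + emit. EventID is "<stream>/<shard>/<sequenceNumber>". Because a sequence number uniquely identifies a record within its shard, the EventID is unique across the stream's history and suitable for at-least-once dedup at the state-store boundary.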
What's missing for production scale (intentionally; Lambda owns it):
- Lease management / shard rebalancing across multiple workers.
- Checkpointing across restarts.
- Resharding mid-run.
We do not plan to bring KCL v3 Go in-tree; the Lambda event-source mapping is the supported production path. If you need KCL semantics inside an ECS worker (e.g. when per-record processing can exceed Lambda's 15-minute timeout), wire awslabs/amazon-kinesis-client-go yourself against the source.Source contract.
Index
Constants
This section is empty.
Variables
This section is empty.
Functions
This section is empty.
Types
type Config
type Config[T any] struct {
	Client     *kinesis.Client
	StreamName string
	Decode     Decoder[T]
	// StartingPosition controls where each shard begins. Defaults to LATEST.
	StartingPosition types.ShardIteratorType
	// StartTimestamp is required when StartingPosition is AT_TIMESTAMP.
	StartTimestamp *time.Time
	// PollInterval is the sleep between empty GetRecords calls per shard. Defaults
	// to 1s. Lower values reduce latency at the cost of higher API call volume.
	PollInterval time.Duration
	// RecordsPerCall caps GetRecords output. Defaults to 1000 (GetRecords itself
	// accepts up to 10,000).
	RecordsPerCall int32
	// OnDecodeError, if non-nil, is called for every record whose Decode returned
	// an error. Default behavior is to drop silently. Wire to a DLQ / metrics
	// recorder to surface poison pills.
	OnDecodeError func(raw []byte, shardID, sequenceNumber string, err error)
	// OnGetRecordsError, if non-nil, is called for every GetRecords error before
	// the per-shard loop backs off and retries. Wire to logging / metrics.
	OnGetRecordsError func(streamName, shardID string, err error)
}
Config configures a Kinesis Source.
type Decoder
Decoder converts a raw Kinesis record's Data to a typed Record value.
func JSONDecoder
JSONDecoder returns a Decoder that unmarshals JSON into T.
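The exact Decoder and JSONDecoder signatures are not shown on this page, so the sketch below assumes `Decoder[T]` is a `func([]byte) (T, error)`. It also shows how a callback shaped like Config.OnDecodeError can route poison pills to a dead-letter sink instead of dropping them silently. `rawRecord`, `decodeAll`, `order`, and the `dlq` slice are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Decoder is an ASSUMED shape for this package's Decoder[T]; the real
// signature is not shown on this page.
type Decoder[T any] func(data []byte) (T, error)

// JSONDecoder returns a Decoder that unmarshals JSON into T.
func JSONDecoder[T any]() Decoder[T] {
	return func(data []byte) (T, error) {
		var v T
		err := json.Unmarshal(data, &v)
		return v, err
	}
}

// rawRecord is a hypothetical, trimmed-down Kinesis record.
type rawRecord struct {
	SequenceNumber string
	Data           []byte
}

// decodeAll decodes each record, passing failures to onErr (shaped like
// Config.OnDecodeError) and leaving them out of the result.
func decodeAll[T any](dec Decoder[T], shardID string, recs []rawRecord,
	onErr func(raw []byte, shardID, sequenceNumber string, err error)) []T {
	var out []T
	for _, r := range recs {
		v, err := dec(r.Data)
		if err != nil {
			if onErr != nil {
				onErr(r.Data, shardID, r.SequenceNumber, err)
			}
			continue
		}
		out = append(out, v)
	}
	return out
}

type order struct {
	ID    string  `json:"id"`
	Total float64 `json:"total"`
}

func main() {
	recs := []rawRecord{
		{"1", []byte(`{"id":"a","total":9.5}`)},
		{"2", []byte(`not json`)},
	}
	var dlq []string // hypothetical dead-letter sink holding poison-pill sequence numbers
	orders := decodeAll(JSONDecoder[order](), "shardId-000000000000", recs,
		func(_ []byte, _, seq string, _ error) { dlq = append(dlq, seq) })
	fmt.Println(len(orders), orders[0].ID, dlq) // 1 a [2]
}
```

Because Go cannot infer T from a call with no arguments, JSONDecoder needs an explicit type argument, as in `JSONDecoder[order]()`.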
type Source
type Source[T any] struct {
	// contains filtered or unexported fields
}
Source implements source.Source for a Kinesis Data Stream.
func NewSource
NewSource constructs a Kinesis Source. The returned Source does not own the client; callers manage its lifecycle.