Documentation
Index
- func ClassifyS3Error(task, bucket, key, region string, err error) *engine.TaskError
- type Downloader
  - func DefaultDownloaderFactory(ctx context.Context, region string) (Downloader, error)
- type DownloaderFactory
- type EmitResult
  - func StreamGzipFunc(ctx context.Context, uploader Uploader, bucket, key string, ...) (EmitResult, error)
  - func StreamGzipJSON(ctx context.Context, uploader Uploader, bucket, key string, obj any) (EmitResult, error)
  - func StreamGzipNDJSON[T any](ctx context.Context, uploader Uploader, bucket, key string, records []T) (EmitResult, error)
- type ObjectLister
  - func DefaultObjectListerFactory(ctx context.Context, region string) (ObjectLister, error)
- type ObjectListerFactory
- type TransferClient
  - func DefaultTransferClientFactory(ctx context.Context, region string) (TransferClient, error)
- type TransferClientFactory
- type Uploader
- type UploaderFactory
Constants
This section is empty.
Variables
This section is empty.
Functions
func ClassifyS3Error added in v0.0.31
func ClassifyS3Error(task, bucket, key, region string, err error) *engine.TaskError
ClassifyS3Error wraps an S3 error with operator-actionable context. The returned TaskError includes the task name, S3 coordinates, a human-readable message, and a hint for resolution.
Types
type Downloader added in v0.0.30
type Downloader interface {
	GetObject(ctx context.Context, input *s3.GetObjectInput, opts ...func(*s3.Options)) (*s3.GetObjectOutput, error)
}
Downloader abstracts S3 GetObject for streaming reads. Unlike TransferClient (which writes to an io.WriterAt), Downloader returns a GetObjectOutput whose Body is a streaming io.ReadCloser that can be fed straight into gzip decompression.
func DefaultDownloaderFactory added in v0.0.30
func DefaultDownloaderFactory(ctx context.Context, region string) (Downloader, error)
DefaultDownloaderFactory creates a real S3 client for streaming downloads.
type DownloaderFactory added in v0.0.30
type DownloaderFactory func(ctx context.Context, region string) (Downloader, error)
DownloaderFactory builds a Downloader for a given region.
type EmitResult added in v0.0.60
type EmitResult struct {
	// UncompressedSHA256 is the hex digest of the bytes fed into gzip.
	UncompressedSHA256 string
}
EmitResult reports what was published: the hex SHA-256 over the uncompressed payload, surfaced in the TaskResult/log so an operator can verify the decompressed object after gunzip. It is the canonical reader-verifiable seal; it is not embedded in the object itself (a payload cannot carry the hash of its own bytes).
func StreamGzipFunc added in v0.0.60
func StreamGzipFunc(ctx context.Context, uploader Uploader, bucket, key string, write func(io.Writer) error) (EmitResult, error)
StreamGzipFunc gzips and streams whatever write emits, under the same seal as StreamGzipNDJSON. It exists for producers that generate the payload lazily (e.g. querying one block at a time) and so must not materialize the whole record set in memory first. write receives the destination writer and must emit the complete uncompressed payload.
func StreamGzipJSON added in v0.0.60
func StreamGzipJSON(ctx context.Context, uploader Uploader, bucket, key string, obj any) (EmitResult, error)
StreamGzipJSON gzips a single indented JSON object and streams it to S3 with the same uncompressed-payload seal as StreamGzipNDJSON.
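A single indented object fits the same callback shape as StreamGzipFunc's `write`. This sketch uses json.Encoder.SetIndent. The two-space indent and trailing newline are assumptions, since the exact formatting StreamGzipJSON produces is not documented. `indentedJSONWriter` and `render` are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"io"
	"strings"
)

// indentedJSONWriter returns a write callback that emits obj as one
// indented JSON document followed by a newline.
func indentedJSONWriter(obj any) func(io.Writer) error {
	return func(w io.Writer) error {
		enc := json.NewEncoder(w)
		enc.SetIndent("", "  ") // assumption: two-space indent
		return enc.Encode(obj)
	}
}

// render runs the callback into memory, e.g. for a log line or a test.
func render(obj any) (string, error) {
	var sb strings.Builder
	err := indentedJSONWriter(obj)(&sb)
	return sb.String(), err
}
```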
func StreamGzipNDJSON added in v0.0.60
func StreamGzipNDJSON[T any](ctx context.Context, uploader Uploader, bucket, key string, records []T) (EmitResult, error)
StreamGzipNDJSON gzips each record as one NDJSON line and streams it to S3 without buffering the whole payload, computing a SHA-256 over the uncompressed bytes as they pass. The integrity seal is twofold:
- ChecksumAlgorithm=SHA256 on the put: the SDK sends a trailing aws-chunked checksum, so S3 validates a SHA-256 of the compressed bytes it received without the client having to precompute it. This is the wire seal. For a multipart upload (a payload over the uploader's multipart threshold), S3 stores the composite-of-parts form (`<hash>-N`). That value is still a valid per-part seal, but it is not a flat SHA-256 of the body that a reader could recompute.
- The returned UncompressedSHA256: the logical seal over the pre-gzip payload, surfaced via EmitResult so that a reader can verify the decompressed bytes independently. This, not the S3-side checksum, is the canonical reader-verifiable seal.
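The following sketch shows why the multipart value is not a flat digest of the body. It assumes S3's composite scheme for multipart SHA-256 checksums: hash each part, hash the concatenation of the binary part digests, base64-encode the result, and append `-N`. Reproducing the value requires the exact part boundaries the uploader used. `partSize` is an illustrative parameter.

```go
package main

import (
	"crypto/sha256"
	"encoding/base64"
	"fmt"
)

// compositeSHA256 hashes each part, hashes the concatenated binary part
// digests, base64-encodes that, and appends "-N" (N = number of parts).
// Assumption: this matches S3's composite checksum; partSize must be > 0.
func compositeSHA256(body []byte, partSize int) string {
	var concat []byte
	parts := 0
	for off := 0; off < len(body); off += partSize {
		end := off + partSize
		if end > len(body) {
			end = len(body)
		}
		sum := sha256.Sum256(body[off:end])
		concat = append(concat, sum[:]...)
		parts++
	}
	outer := sha256.Sum256(concat)
	return fmt.Sprintf("%s-%d", base64.StdEncoding.EncodeToString(outer[:]), parts)
}

// flatSHA256 is the single-part form: base64 SHA-256 of the whole body.
func flatSHA256(body []byte) string {
	sum := sha256.Sum256(body)
	return base64.StdEncoding.EncodeToString(sum[:])
}
```

The uncompressed digest avoids both problems: it depends neither on part boundaries nor on gzip output.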
io.Pipe backpressure is preserved: the marshaling goroutine blocks until the uploader reads from the pipe. Memory therefore stays bounded by the uploader's part-buffer pool (roughly part_size × (concurrency+1)), regardless of total payload size.
type ObjectLister added in v0.0.27
type ObjectLister interface {
	ListObjectsV2(ctx context.Context, input *s3.ListObjectsV2Input, opts ...func(*s3.Options)) (*s3.ListObjectsV2Output, error)
}
ObjectLister abstracts S3 ListObjectsV2 for snapshot discovery.
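Discovery has to follow continuation tokens, because each ListObjectsV2 call returns at most 1,000 keys. The stdlib sketch below shows the loop. It uses a hypothetical `page` type that mirrors the output's key list, IsTruncated, and NextContinuationToken. It also assumes snapshot keys sort lexically by time, for example timestamped names.

```go
package main

// page mirrors the parts of a ListObjectsV2 response this sketch needs.
type page struct {
	Keys                  []string
	IsTruncated           bool
	NextContinuationToken string
}

// latestSnapshot walks every page under prefix, following continuation
// tokens, and returns the lexically greatest key. list is a hypothetical
// stand-in for one ListObjectsV2 call; token is "" for the first page.
func latestSnapshot(list func(prefix, token string) (page, error), prefix string) (string, bool, error) {
	best, found := "", false
	token := ""
	for {
		p, err := list(prefix, token)
		if err != nil {
			return "", false, err
		}
		for _, k := range p.Keys {
			if !found || k > best {
				best, found = k, true
			}
		}
		if !p.IsTruncated {
			return best, found, nil
		}
		token = p.NextContinuationToken
	}
}
```

With the real SDK, `s3.NewListObjectsV2Paginator` runs the same token loop, and an ObjectLister has the method shape that paginator expects.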
func DefaultObjectListerFactory added in v0.0.27
func DefaultObjectListerFactory(ctx context.Context, region string) (ObjectLister, error)
DefaultObjectListerFactory creates a real S3 client for listing objects.
type ObjectListerFactory added in v0.0.27
type ObjectListerFactory func(ctx context.Context, region string) (ObjectLister, error)
ObjectListerFactory builds an ObjectLister for a given region.
type TransferClient
type TransferClient interface {
	DownloadObject(ctx context.Context, input *transfermanager.DownloadObjectInput, opts ...func(*transfermanager.Options)) (*transfermanager.DownloadObjectOutput, error)
}
TransferClient abstracts S3 downloads. DownloadObject uses the transfer manager's io.WriterAt path for parallel byte-range downloads.
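The io.WriterAt requirement follows from parallel byte-range downloads. Parts can finish in any order, so each one is written at its own offset instead of being appended. The stdlib sketch below uses two assumptions. `fetchRange` is a hypothetical stand-in for a ranged GET, with an exclusive end. `bufferAt` is a small in-memory writer. An *os.File also satisfies io.WriterAt.

```go
package main

import (
	"errors"
	"io"
	"sync"
)

var errOutOfRange = errors.New("bufferAt: write out of range")

// bufferAt is a fixed-size in-memory io.WriterAt.
type bufferAt struct{ b []byte }

func (w *bufferAt) WriteAt(p []byte, off int64) (int, error) {
	if off < 0 || off+int64(len(p)) > int64(len(w.b)) {
		return 0, errOutOfRange
	}
	return copy(w.b[off:], p), nil
}

// downloadRanges fetches [0, size) in partSize chunks concurrently and writes
// each chunk at its own offset, so completion order does not matter.
// fetchRange's end is exclusive; an HTTP Range header would be bytes=start-(end-1).
// partSize must be > 0.
func downloadRanges(w io.WriterAt, size, partSize int64, fetchRange func(start, end int64) ([]byte, error)) error {
	var wg sync.WaitGroup
	errs := make(chan error, (size+partSize-1)/partSize+1)
	for start := int64(0); start < size; start += partSize {
		end := start + partSize
		if end > size {
			end = size
		}
		wg.Add(1)
		go func(start, end int64) {
			defer wg.Done()
			data, err := fetchRange(start, end)
			if err == nil {
				_, err = w.WriteAt(data, start)
			}
			if err != nil {
				errs <- err
			}
		}(start, end)
	}
	wg.Wait()
	close(errs)
	return <-errs // first recorded error, or nil if every part succeeded
}
```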
func DefaultTransferClientFactory
func DefaultTransferClientFactory(ctx context.Context, region string) (TransferClient, error)
DefaultTransferClientFactory creates a transfer manager backed by a real S3 service client.
type TransferClientFactory
type TransferClientFactory func(ctx context.Context, region string) (TransferClient, error)
TransferClientFactory builds a TransferClient for a given region.
type Uploader
type Uploader interface {
	UploadObject(ctx context.Context, input *transfermanager.UploadObjectInput, opts ...func(*transfermanager.Options)) (*transfermanager.UploadObjectOutput, error)
}
Uploader abstracts the transfer manager's UploadObject call so that tests can substitute a fake.