shipper

package
v1.2.9
Published: Nov 25, 2025 License: Apache-2.0 Imports: 31 Imported by: 0

Documentation

Overview

Package shipper provides metric upload and transmission services for the CloudZero Agent. This package implements the critical Secondary Adapter responsible for delivering collected and processed metrics to the CloudZero cost optimization platform for billing analysis.

The shipper service operates as the final stage in the CloudZero Agent metric processing pipeline:

  1. Periodic file discovery and processing from local storage
  2. Presigned URL allocation from CloudZero platform for secure S3 upload
  3. Parallel file upload with retry logic and error handling
  4. File lifecycle management (marking uploaded, cleanup, replay handling)
  5. Disk space management and purging of old metric files

Key architectural responsibilities:

  • Reliable metric delivery: Ensure all collected metrics reach CloudZero platform
  • Batched processing: Handle file uploads in chunks for optimal performance
  • Replay mechanisms: Reprocess files that failed upload due to transient errors
  • Resource management: Monitor disk usage and enforce retention policies
  • Operational monitoring: Provide comprehensive metrics for shipper health

The service implements robust error handling and recovery mechanisms essential for production environments where network connectivity and platform availability may vary. All operations are instrumented with detailed metrics and logging for operational visibility.

Integration points:

  • Storage layer: Reads processed metric files from disk storage
  • CloudZero API: Requests presigned URLs and uploads data
  • Prometheus metrics: Provides operational monitoring and alerting data
  • Configuration service: Receives upload intervals and retention policies

Package shipper provides domain logic for the shipper.

Constants

const (
	TmpfsMagic = 0x01021994
	Ext4Magic  = 0xEF53
	XfsMagic   = 0x58465342
)

Linux filesystem type constants

const (
	HTTPMaxRetries   = 10
	HTTPRetryWaitMax = time.Second * 30
)

HTTPMaxRetries and HTTPRetryWaitMax configure the retryablehttp.Client.
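
As a rough illustration of how a retry cap and a maximum wait interact, the sketch below computes a capped exponential backoff using only the standard library. It is not the retryablehttp implementation: `retryWaitMin` and `backoff` are hypothetical names introduced here, and the one-second minimum wait is an assumption.

```go
package main

import (
	"fmt"
	"time"
)

const (
	HTTPMaxRetries   = 10
	HTTPRetryWaitMax = time.Second * 30
)

// retryWaitMin is a hypothetical minimum wait; the package does not document one.
const retryWaitMin = time.Second

// backoff doubles the wait for each prior attempt and caps it at HTTPRetryWaitMax.
func backoff(attempt int) time.Duration {
	wait := retryWaitMin
	for i := 0; i < attempt; i++ {
		wait *= 2
		if wait >= HTTPRetryWaitMax {
			return HTTPRetryWaitMax
		}
	}
	return wait
}

func main() {
	// Print the wait that would precede each of the allowed retries.
	for attempt := 0; attempt < HTTPMaxRetries; attempt++ {
		fmt.Printf("retry %d: wait %s\n", attempt+1, backoff(attempt))
	}
}
```

Under these assumptions the wait grows from 1s and reaches the 30s ceiling by the sixth retry, so later retries are spaced evenly.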

const (
	ReplaySubDirectory      = "replay"
	UploadedSubDirectory    = "uploaded"
	CriticalPurgePercent    = 20
	ReplayRequestHeader     = "X-CloudZero-Replay"
	ShipperIDRequestHeader  = "X-CloudZero-Shipper-ID"
	AppVersionRequestHeader = "X-CloudZero-Version"
)

Exported constants for storage subdirectories, the critical purge percentage, and request header names.

const ShipperErrorDefault = "unknown error"

ShipperErrorDefault is the default code given when the specific error type is not found

Variables

var (
	// HTTP errors
	ErrUnauthorized      = NewShipperError("err-unauthorized", "unauthorized request - possible invalid API key")
	ErrNoURLs            = NewShipperError("err-no-urls", "no presigned URLs returned")
	ErrInvalidShipperID  = NewShipperError("err-invalid-shipper-id", "failed to get the shipper id")
	ErrEncodeBody        = NewShipperError("err-encode-body", "failed to encode the body into a foreign format")
	ErrGetRemoteBase     = NewShipperError("err-get-remote-base", "failed to get the remote endpoint api base from the config file")
	ErrHTTPRequestFailed = NewShipperError("err-http-request-failed", "the http request failed")
	ErrHTTPUnknown       = NewShipperError("err-http-unknown", "there was an unknown issue with the http request")
	ErrInvalidBody       = NewShipperError("err-invalid-body", "decoding a response/object failed")
	ErrExpiredURL        = NewShipperError("err-expired-url", "the requested url has expired")

	ErrCreateDirectory = NewShipperError("err-dir-create", "failed to create the requested directory")
	ErrCreateLock      = NewShipperError("err-lock-create", "failed to create or acquire the lock")
	ErrReleaseLock     = NewShipperError("err-lock-release", "failed to release the lock")

	ErrFilesList  = NewShipperError("err-files-walk", "failed to list/walk the files")
	ErrFileRemove = NewShipperError("err-file-remove", "failed to remove a file")
	ErrFileCreate = NewShipperError("err-file-create", "failed to create a file")
	ErrFileRead   = NewShipperError("err-file-read", "failed to read a file")

	ErrStorageCleanup = NewShipperError("err-storage-cleanup", "failed to clean up the disk")
	ErrGetDiskUsage   = NewShipperError("err-get-disk-usage", "failed to get the disk usage")
)

Define sentinel errors for each expected condition. These errors can be used with errors.Is and errors.As to detect specific issues.
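
A minimal, self-contained sketch of how such sentinels can be matched after wrapping. The concrete `shipperError` type, `allocate`, `isNoURLs`, and `codeOf` are hypothetical stand-ins written for this example (the package's concrete error type is unexported); only the `ShipperError` interface, the `NewShipperError` signature, and the `ErrNoURLs` values come from the documentation above.

```go
package main

import (
	"errors"
	"fmt"
)

// ShipperError mirrors the interface documented by the package.
type ShipperError interface {
	error
	Code() string
}

// shipperError is a hypothetical concrete implementation for this sketch.
type shipperError struct{ code, msg string }

func (e *shipperError) Error() string { return e.msg }
func (e *shipperError) Code() string  { return e.code }

func NewShipperError(code, msg string) ShipperError {
	return &shipperError{code: code, msg: msg}
}

var ErrNoURLs = NewShipperError("err-no-urls", "no presigned URLs returned")

// allocate is a hypothetical caller that adds context while wrapping the sentinel.
func allocate() error {
	return fmt.Errorf("allocating URLs for batch: %w", ErrNoURLs)
}

// isNoURLs reports whether err wraps the ErrNoURLs sentinel.
func isNoURLs(err error) bool { return errors.Is(err, ErrNoURLs) }

// codeOf extracts the code of the first ShipperError in the chain,
// falling back to "-1" when none is present.
func codeOf(err error) string {
	var se ShipperError
	if errors.As(err, &se) {
		return se.Code()
	}
	return "-1"
}

func main() {
	err := allocate()
	fmt.Println(isNoURLs(err), codeOf(err))
	fmt.Println(codeOf(errors.New("plain error")))
}
```

Because the sentinel is wrapped with `%w`, both `errors.Is` and `errors.As` can see through the added context, which keeps error codes usable as metric labels.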

Functions

func Chunk

func Chunk[T any](list []T, n int) [][]T

Chunk splits a list into a slice of sub-slices (chunks), each of size `n`.
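
One plausible way to write such a helper is sketched below. The lowercase `chunk` is a hypothetical re-implementation, not the package's source; in particular, returning a shorter final chunk and returning nil for a non-positive `n` are assumptions.

```go
package main

import "fmt"

// chunk splits list into consecutive sub-slices of at most n elements.
// The sub-slices share the backing array of list.
func chunk[T any](list []T, n int) [][]T {
	if n <= 0 {
		return nil // assumption: a non-positive size yields no chunks
	}
	out := make([][]T, 0, (len(list)+n-1)/n)
	for start := 0; start < len(list); start += n {
		end := start + n
		if end > len(list) {
			end = len(list)
		}
		out = append(out, list[start:end])
	}
	return out
}

func main() {
	fmt.Println(chunk([]int{1, 2, 3, 4, 5}, 2)) // prints [[1 2] [3 4] [5]]
}
```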

func GetErrStatusCode

func GetErrStatusCode(err error) string

GetErrStatusCode extracts the error code from any wrapped ShipperError. If no matching ShipperError is found in the chain, "-1" is returned to indicate an unknown error.

func GetRemoteFileID

func GetRemoteFileID(file types.File) string

GetRemoteFileID creates the remote file id for the transposed file

func GetRootFileID added in v1.2.0

func GetRootFileID(file string) string

GetRootFileID returns the file id with no file extensions or path information
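
The behaviour described can be approximated as follows. `rootFileID` is a hypothetical helper, the file name in `main` is made up, and stripping everything after the first dot (rather than only the last extension) is an assumption about what "no file extensions" means.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// rootFileID drops directory components and every extension from file.
func rootFileID(file string) string {
	base := filepath.Base(file)
	// i > 0 keeps names that start with a dot intact.
	if i := strings.Index(base, "."); i > 0 {
		return base[:i]
	}
	return base
}

func main() {
	fmt.Println(rootFileID("/data/uploaded/metrics_1700000000.json.br"))
}
```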

func InitMetrics

func InitMetrics() (*instr.PrometheusMetrics, error)

func InspectHTTPResponse added in v1.2.0

func InspectHTTPResponse(ctx context.Context, resp *http.Response) error

func NewHTTPClient added in v1.2.0

func NewHTTPClient(ctx context.Context, s *config.Settings) *retryablehttp.Client

Types

type AbandonAPIPayloadFile

type AbandonAPIPayloadFile struct {
	ReferenceID string `json:"reference_id"` //nolint:tagliatelle // downstream expects snake case
	Reason      string `json:"reason"`
}

type AllocatePresignedURLsResponse added in v1.2.0

type AllocatePresignedURLsResponse struct {
	Allocation PresignedURLPayload
	Replay     PresignedURLPayload
}

type CleanupResult added in v1.2.0

type CleanupResult struct {
	FilesRemoved   int
	BytesFreed     uint64
	PressureBefore PressureLevel
	PressureAfter  PressureLevel
}

CleanupResult tracks what was cleaned up

type DiskManager added in v1.2.0

type DiskManager struct {
	Store              types.ReadableStore
	Metrics            *instr.PrometheusMetrics
	StoragePath        string
	AvailableSizeBytes uint64
}

DiskManager handles disk usage monitoring and cleanup

func (*DiskManager) CalculatePressureLevel added in v1.2.0

func (dm *DiskManager) CalculatePressureLevel(usage *types.StoreUsage) PressureLevel

CalculatePressureLevel determines cleanup aggression needed

func (*DiskManager) GetCleanupPercentage added in v1.2.0

func (dm *DiskManager) GetCleanupPercentage(pressure PressureLevel) int

GetCleanupPercentage returns how much to clean based on pressure

func (*DiskManager) IsMemoryBacked added in v1.2.0

func (dm *DiskManager) IsMemoryBacked() bool

IsMemoryBacked detects if the storage path is backed by memory (tmpfs)
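
The check presumably compares a filesystem type value against the magic constants listed earlier. The sketch below shows only that comparison step; `fsName` and `isMemoryBacked` are hypothetical helpers, and obtaining the type value (for example from a statfs call on Linux) is left out so the example stays portable.

```go
package main

import "fmt"

const (
	TmpfsMagic = 0x01021994
	Ext4Magic  = 0xEF53
	XfsMagic   = 0x58465342
)

// fsName maps a filesystem type value to a readable name.
func fsName(fsType int64) string {
	switch fsType {
	case TmpfsMagic:
		return "tmpfs"
	case Ext4Magic:
		return "ext4"
	case XfsMagic:
		return "xfs"
	default:
		return "unknown"
	}
}

// isMemoryBacked reports whether the type value identifies tmpfs.
func isMemoryBacked(fsType int64) bool {
	return fsType == TmpfsMagic
}

func main() {
	for _, t := range []int64{TmpfsMagic, Ext4Magic, XfsMagic, 0} {
		fmt.Printf("%#x %s memory-backed=%v\n", t, fsName(t), isMemoryBacked(t))
	}
}
```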

func (*DiskManager) ManageDiskUsage added in v1.2.0

func (dm *DiskManager) ManageDiskUsage(ctx context.Context, metricCutoff time.Time) error

ManageDiskUsage handles the complete disk management cycle

func (*DiskManager) PurgeFilesBefore added in v1.2.0

func (dm *DiskManager) PurgeFilesBefore(ctx context.Context, before time.Time) (int, error)

PurgeFilesBefore removes files older than the cutoff time

func (*DiskManager) PurgeOldestPercentage added in v1.2.0

func (dm *DiskManager) PurgeOldestPercentage(ctx context.Context, percent int) (int, error)

PurgeOldestPercentage removes the oldest percentage of files
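
To make "oldest percentage" concrete, here is a small sketch that selects removal candidates by modification time. `storedFile`, `oldestPercentage`, and `sampleFiles` are hypothetical names, and rounding the count up is an assumption; the real method also deletes the files and reports how many were removed.

```go
package main

import (
	"fmt"
	"sort"
	"time"
)

// storedFile is a hypothetical stand-in for a file in the store.
type storedFile struct {
	Name    string
	ModTime time.Time
}

// oldestPercentage returns the oldest percent of files, oldest first.
func oldestPercentage(files []storedFile, percent int) []storedFile {
	if percent <= 0 || len(files) == 0 {
		return nil
	}
	if percent > 100 {
		percent = 100
	}
	sorted := make([]storedFile, len(files))
	copy(sorted, files) // leave the caller's slice order untouched
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i].ModTime.Before(sorted[j].ModTime)
	})
	// Round up so a small store still yields at least one candidate.
	n := (len(sorted)*percent + 99) / 100
	return sorted[:n]
}

// sampleFiles builds n files where a higher index means an older file.
func sampleFiles(n int) []storedFile {
	base := time.Unix(1_700_000_000, 0)
	files := make([]storedFile, 0, n)
	for i := 0; i < n; i++ {
		files = append(files, storedFile{
			Name:    fmt.Sprintf("file-%d", i),
			ModTime: base.Add(-time.Duration(i) * time.Hour),
		})
	}
	return files
}

func main() {
	for _, f := range oldestPercentage(sampleFiles(10), 20) {
		fmt.Println("would remove:", f.Name)
	}
}
```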

type HTTPClient added in v1.2.0

type HTTPClient interface {
	// Do processes the request
	Do(req *http.Request) (*http.Response, error)
}

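HTTPClient is the minimal interface for executing an HTTP request. Any type with a matching Do method, such as *http.Client, satisfies it.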
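
A narrow interface like this makes it easy to substitute a fake in tests. The sketch below is illustrative only: `fakeClient` and `statusOf` are hypothetical, and the URL is a placeholder that is never contacted.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

// HTTPClient mirrors the interface documented by the package.
type HTTPClient interface {
	Do(req *http.Request) (*http.Response, error)
}

// Compile-time check that the standard client satisfies the interface.
var _ HTTPClient = (*http.Client)(nil)

// fakeClient is a hypothetical test double that returns a canned status.
type fakeClient struct {
	status int
	calls  int
}

func (f *fakeClient) Do(req *http.Request) (*http.Response, error) {
	f.calls++
	return &http.Response{
		StatusCode: f.status,
		Body:       io.NopCloser(strings.NewReader("")),
		Request:    req,
	}, nil
}

// statusOf issues a GET through any HTTPClient and returns the status code.
func statusOf(c HTTPClient, url string) (int, error) {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return 0, err
	}
	resp, err := c.Do(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	return resp.StatusCode, nil
}

func main() {
	fake := &fakeClient{status: http.StatusNoContent}
	code, err := statusOf(fake, "https://example.invalid/upload")
	fmt.Println(code, err, fake.calls)
}
```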

type MetricShipper

type MetricShipper struct {

	// HTTPClient provides the configured HTTP client for CloudZero API communication.
	// Includes retry logic, timeout configuration, authentication, and connection pooling
	// optimized for reliable metric upload operations in production environments.
	HTTPClient *retryablehttp.Client
	// contains filtered or unexported fields
}

MetricShipper orchestrates the complete metric upload pipeline for CloudZero Agent data transmission. This struct manages the periodic discovery, processing, and upload of metric files from local storage to the CloudZero platform using secure presigned URLs and parallel processing for optimal performance.

The shipper implements a robust upload workflow designed for production reliability:

  • Periodic scheduling: Configurable intervals for batch processing
  • File locking: Prevents concurrent access during processing
  • Chunked uploads: Processes files in manageable batches
  • Parallel workers: Concurrent upload for throughput optimization
  • Retry logic: Handles transient network failures gracefully
  • Replay support: Reprocesses failed uploads automatically
  • Disk management: Enforces retention policies and cleanup

Operational characteristics:

  • Signal handling: Graceful shutdown on SIGINT/SIGTERM
  • Context cancellation: Proper resource cleanup and timeout handling
  • Comprehensive metrics: Detailed observability for monitoring and alerting
  • Error recovery: Panic handling and operation resilience
  • Configuration reload: Dynamic settings updates without restart

func NewMetricShipper

func NewMetricShipper(ctx context.Context, s *config.Settings, store types.ReadableStore) (*MetricShipper, error)

NewMetricShipper creates a fully configured MetricShipper instance for CloudZero metric upload operations. This constructor initializes all necessary components for reliable metric transmission including HTTP clients, metrics instrumentation, and operational configuration validation.

Initialization sequence:

  1. Creates cancellable context for graceful shutdown support
  2. Configures HTTP client with retry logic, timeouts, and authentication
  3. Initializes Prometheus metrics for operational monitoring
  4. Validates configuration settings and logs debug information
  5. Sets up all internal state for immediate operation

The constructor performs comprehensive validation and setup to ensure the shipper is ready for production operation, including proper error handling and resource cleanup if initialization fails at any stage.

Dependencies:

  • ctx: Parent context for operation lifecycle and cancellation
  • s: Configuration settings for API endpoints, credentials, and operational parameters
  • store: Storage interface for reading processed metric files

Configuration validation includes:

  • CloudZero API endpoint accessibility and authentication
  • Upload interval and batch size validation
  • Disk space and retention policy verification
  • HTTP client timeout and retry configuration

Returns a ready-to-use MetricShipper instance or an error if initialization fails. The returned shipper must have its Run() method called to begin processing operations.

func (*MetricShipper) AbandonFiles

func (m *MetricShipper) AbandonFiles(ctx context.Context, payload []*AbandonAPIPayloadFile) error

AbandonFiles sends an abandon request for a list of files with a given reason.

func (*MetricShipper) AllocatePresignedURLs

func (m *MetricShipper) AllocatePresignedURLs(ctx context.Context, files []types.File) (*AllocatePresignedURLsResponse, error)

AllocatePresignedURLs allocates a set of presigned URLs for the passed file objects.

func (*MetricShipper) Flush added in v1.2.0

func (m *MetricShipper) Flush(ctx context.Context)

Flush will attempt to process all files and push them to the remote

func (*MetricShipper) GetBaseDir

func (m *MetricShipper) GetBaseDir() string

func (*MetricShipper) GetMetricHandler

func (m *MetricShipper) GetMetricHandler() http.Handler

func (*MetricShipper) GetShipperID

func (m *MetricShipper) GetShipperID() (string, error)

GetShipperID returns a unique id for this shipper. The id is stored on the filesystem and relates an uploaded file to the shipper it came from. It does not identify this instance of the shipper, but rather the filesystem the file came from.

func (*MetricShipper) GetUploadedDir

func (m *MetricShipper) GetUploadedDir() string

func (*MetricShipper) HandleDisk

func (m *MetricShipper) HandleDisk(ctx context.Context, metricCutoff time.Time) error

HandleDisk is the main entry point for disk management

func (*MetricShipper) HandleRequest

func (m *MetricShipper) HandleRequest(ctx context.Context, files []types.File) error

HandleRequest takes in a list of files and runs them through the following:

  • Generate presigned URL
  • Handle replay requests
  • Upload to the remote API
  • Rename the file to indicate upload

func (*MetricShipper) MarkFileUploaded

func (m *MetricShipper) MarkFileUploaded(ctx context.Context, file types.File) error

func (*MetricShipper) ProcessFiles added in v1.2.0

func (m *MetricShipper) ProcessFiles(ctx context.Context) error

func (*MetricShipper) Run

func (m *MetricShipper) Run() error

Run executes the main MetricShipper service loop with periodic metric upload processing. This method implements the primary operational lifecycle of the shipper, handling scheduled uploads, signal-based shutdown, and comprehensive error recovery.

Service lifecycle:

  1. Directory initialization: Creates required upload directories with proper permissions
  2. Signal handling setup: Configures SIGINT/SIGTERM for graceful shutdown
  3. Initial upload run: Processes any pending files immediately on startup
  4. Periodic processing: Executes upload cycles based on configured intervals
  5. Graceful shutdown: Completes in-flight operations and cleanup on termination

The service runs continuously until:

  • Context cancellation from parent service
  • OS signal reception (SIGINT, SIGTERM)
  • Unrecoverable error during processing

Error handling strategy:

  • Individual upload failures are logged but don't stop service operation
  • Panic recovery prevents service crashes during upload processing
  • Metrics tracking enables monitoring and alerting for operational issues
  • Directory creation failures are fatal as they prevent basic operation

Operational characteristics:

  • Non-blocking: Individual upload failures don't block subsequent processing
  • Resource cleanup: Proper directory and signal handler cleanup on exit
  • Timeout handling: 30-second shutdown timeout for graceful termination
  • Comprehensive logging: Detailed operation tracking for troubleshooting

The method blocks until shutdown is requested, making it suitable for use as the main execution path for shipper-focused services or containers.
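
The lifecycle described above (initial run, periodic cycles, signal-driven shutdown, panic recovery) can be sketched with the standard library alone. This is not the package's implementation: `runLoop` and `demoRun` are hypothetical, and directory setup, metrics, and the shutdown timeout are omitted.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// runLoop calls process once immediately and then on every tick until ctx is
// cancelled or SIGINT/SIGTERM arrives. It returns the number of cycles run.
func runLoop(ctx context.Context, interval time.Duration, process func(context.Context) error) int {
	ctx, stop := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM)
	defer stop()

	runs := 0
	runOnce := func() {
		// A panic in one cycle must not take the whole loop down.
		defer func() {
			if r := recover(); r != nil {
				fmt.Println("recovered from panic:", r)
			}
		}()
		runs++
		if err := process(ctx); err != nil {
			fmt.Println("upload cycle failed:", err) // logged, loop continues
		}
	}

	runOnce() // process pending files on startup

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return runs
		case <-ticker.C:
			if ctx.Err() != nil {
				return runs
			}
			runOnce()
		}
	}
}

// demoRun drives the loop and cancels it from inside the third cycle.
func demoRun() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	calls := 0
	return runLoop(ctx, 5*time.Millisecond, func(context.Context) error {
		calls++
		if calls == 3 {
			cancel()
		}
		return nil
	})
}

func main() {
	fmt.Println("cycles run:", demoRun())
}
```

Checking `ctx.Err()` in the ticker branch avoids starting a new cycle when a tick and the cancellation become ready at the same time.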

func (*MetricShipper) Shutdown

func (m *MetricShipper) Shutdown() error

Shutdown gracefully stops the MetricShipper service.

func (*MetricShipper) UploadFile

func (m *MetricShipper) UploadFile(ctx context.Context, req *UploadFileRequest) error

UploadFile uploads the specified file to S3 using the provided presigned URL.
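
Uploading to a presigned URL usually comes down to an HTTP PUT of the file bytes. The sketch below exercises that idea against a local test server; `putToPresignedURL` and `demoUpload` are hypothetical, the query string is a placeholder rather than a real signature, and headers, compression, and the package's retry behaviour are not modelled.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// putToPresignedURL sends body with a PUT and treats any non-2xx status as an error.
func putToPresignedURL(ctx context.Context, client *http.Client, url string, body []byte) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodPut, url, bytes.NewReader(body))
	if err != nil {
		return fmt.Errorf("building request: %w", err)
	}
	resp, err := client.Do(req)
	if err != nil {
		return fmt.Errorf("sending request: %w", err)
	}
	defer resp.Body.Close()
	_, _ = io.Copy(io.Discard, resp.Body) // drain so the connection can be reused
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("unexpected status %d", resp.StatusCode)
	}
	return nil
}

// demoUpload starts a local server standing in for the storage endpoint and
// returns the bytes it received.
func demoUpload(payload string) (string, error) {
	received := make(chan string, 1)
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPut {
			w.WriteHeader(http.StatusMethodNotAllowed)
			return
		}
		body, _ := io.ReadAll(r.Body)
		received <- string(body)
		w.WriteHeader(http.StatusOK)
	}))
	defer srv.Close()

	url := srv.URL + "/bucket/object?signature=placeholder"
	if err := putToPresignedURL(context.Background(), srv.Client(), url, []byte(payload)); err != nil {
		return "", err
	}
	return <-received, nil
}

func main() {
	got, err := demoUpload("metric bytes")
	fmt.Println(got, err)
}
```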

type PresignedURLAPIPayload

type PresignedURLAPIPayload struct {
	ShipperID string                        `json:"shipperId"`
	Files     []*PresignedURLAPIPayloadFile `json:"files"`
}

type PresignedURLAPIPayloadFile

type PresignedURLAPIPayloadFile struct {
	ReferenceID string `json:"reference_id"`      //nolint:tagliatelle // downstream expects snake case
	SHA256      string `json:"sha_256,omitempty"` //nolint:tagliatelle // downstream expects snake case
	Size        int64  `json:"size,omitempty"`
}
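
To show how the struct tags shape the request body, the following sketch copies the two payload types and marshals a small example. `encodePayload` and the ids passed to it are hypothetical; only the struct definitions come from the documentation above.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type PresignedURLAPIPayloadFile struct {
	ReferenceID string `json:"reference_id"`
	SHA256      string `json:"sha_256,omitempty"`
	Size        int64  `json:"size,omitempty"`
}

type PresignedURLAPIPayload struct {
	ShipperID string                        `json:"shipperId"`
	Files     []*PresignedURLAPIPayloadFile `json:"files"`
}

// encodePayload builds a payload with one entry per reference id and
// returns its JSON encoding.
func encodePayload(shipperID string, refIDs ...string) (string, error) {
	p := PresignedURLAPIPayload{ShipperID: shipperID}
	for _, id := range refIDs {
		p.Files = append(p.Files, &PresignedURLAPIPayloadFile{ReferenceID: id})
	}
	b, err := json.Marshal(p)
	return string(b), err
}

func main() {
	out, err := encodePayload("shipper-1", "file-a")
	fmt.Println(out, err)
	// prints {"shipperId":"shipper-1","files":[{"reference_id":"file-a"}]} <nil>
}
```

The `omitempty` tags mean an unset hash or size is left out of the body entirely rather than sent as an empty string or zero.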

type PresignedURLPayload added in v1.2.0

type PresignedURLPayload = map[string]string

PresignedURLPayload maps a reference id to a presigned url

type PressureLevel added in v1.2.0

type PressureLevel int

PressureLevel defines how aggressive cleanup should be

const (
	PressureNone PressureLevel = iota
	PressureLow
	PressureMedium
	PressureHigh
	PressureCritical
)

type PressureThresholds added in v1.2.0

type PressureThresholds struct {
	Critical float64
	High     float64
	Medium   float64
	Low      float64
}

PressureThresholds defines cleanup trigger points
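
A sketch of how thresholds like these might map disk usage to a pressure level. `pressureFor` is hypothetical, the threshold values in `exampleThresholds` are invented for illustration, and treating the fields as percent-used values is an assumption; the package's actual defaults are not shown in this documentation.

```go
package main

import "fmt"

type PressureLevel int

const (
	PressureNone PressureLevel = iota
	PressureLow
	PressureMedium
	PressureHigh
	PressureCritical
)

type PressureThresholds struct {
	Critical float64
	High     float64
	Medium   float64
	Low      float64
}

// exampleThresholds holds made-up values, expressed as percent of disk used.
var exampleThresholds = PressureThresholds{Critical: 95, High: 85, Medium: 75, Low: 60}

// pressureFor returns the highest level whose threshold usedPercent reaches.
func pressureFor(usedPercent float64, t PressureThresholds) PressureLevel {
	switch {
	case usedPercent >= t.Critical:
		return PressureCritical
	case usedPercent >= t.High:
		return PressureHigh
	case usedPercent >= t.Medium:
		return PressureMedium
	case usedPercent >= t.Low:
		return PressureLow
	default:
		return PressureNone
	}
}

func main() {
	for _, used := range []float64{10, 60, 80, 90, 99} {
		fmt.Printf("%.0f%% used -> level %d\n", used, pressureFor(used, exampleThresholds))
	}
}
```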

type ShipperError

type ShipperError interface {
	error
	// Code returns the associated error code as a string.
	Code() string
}

ShipperError is a wrapper around an error that includes a status code `Code()` function to use in prometheus metrics.

func NewShipperError

func NewShipperError(code, msg string) ShipperError

NewShipperError creates a new ShipperError with the given code and message.

type UploadFileRequest added in v1.2.0

type UploadFileRequest struct {
	File         types.File
	PresignedURL string
}

UploadFileRequest wraps a file and its allocated presigned URL

type ZerologRetryableHTTPAdapter added in v1.2.0

type ZerologRetryableHTTPAdapter struct {
	// contains filtered or unexported fields
}

ZerologRetryableHTTPAdapter adapts zerolog.Logger to retryablehttp.Logger

func NewZerologRetryableHTTPAdapter added in v1.2.0

func NewZerologRetryableHTTPAdapter(logger *zerolog.Logger, level zerolog.Level) *ZerologRetryableHTTPAdapter

NewZerologRetryableHTTPAdapter creates a new adapter. Default level is Debug.

func (*ZerologRetryableHTTPAdapter) Debug added in v1.2.0

func (a *ZerologRetryableHTTPAdapter) Debug(msg string, keysAndValues ...interface{})

func (*ZerologRetryableHTTPAdapter) Error added in v1.2.0

func (a *ZerologRetryableHTTPAdapter) Error(msg string, keysAndValues ...interface{})

func (*ZerologRetryableHTTPAdapter) Info added in v1.2.0

func (a *ZerologRetryableHTTPAdapter) Info(msg string, keysAndValues ...interface{})

func (*ZerologRetryableHTTPAdapter) Warn added in v1.2.0

func (a *ZerologRetryableHTTPAdapter) Warn(msg string, keysAndValues ...interface{})
