shipper

package
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 29, 2025 License: Apache-2.0 Imports: 30 Imported by: 0

Documentation

Overview

Package shipper provides domain logic for the shipper.

Index

Constants

View Source
const (
	ReplaySubDirectory      = "replay"
	UploadedSubDirectory    = "uploaded"
	CriticalPurgePercent    = 20
	ReplayRequestHeader     = "X-CloudZero-Replay"
	ShipperIDRequestHeader  = "X-CloudZero-Shipper-ID"
	AppVersionRequestHeader = "X-CloudZero-Version"
)

public

View Source
const ShipperErrorDefault = "unknown error"

ShipperErrorDefault is the default code given when the specific error type is not found

Variables

View Source
var (
	// HTTP errors
	ErrUnauthorized      = NewShipperError("err-unauthorized", "unauthorized request - possible invalid API key")
	ErrNoURLs            = NewShipperError("err-no-urls", "no presigned URLs returned")
	ErrInvalidShipperID  = NewShipperError("err-invalid-shipper-id", "failed to get the shipper id")
	ErrEncodeBody        = NewShipperError("err-encode-body", "failed to encode the body into a foreign format")
	ErrGetRemoteBase     = NewShipperError("err-get-remote-base", "failed to get the remote endpoint api base from the config file")
	ErrHTTPRequestFailed = NewShipperError("err-http-request-failed", "the http request failed")
	ErrHTTPUnknown       = NewShipperError("err-http-unknown", "there was an unknown issue with the http request")
	ErrInvalidBody       = NewShipperError("err-invalid-body", "decoding a response/object failed")

	ErrCreateDirectory = NewShipperError("err-dir-create", "failed to create the requested directory")
	ErrCreateLock      = NewShipperError("err-lock-create", "failed to create or acquire the lock")
	ErrReleaseLock     = NewShipperError("err-lock-release", "failed to release the lock")

	ErrFilesList  = NewShipperError("err-files-walk", "failed to list/walk the files")
	ErrFileRemove = NewShipperError("err-file-remove", "failed to remove a file")
	ErrFileCreate = NewShipperError("err-file-create", "failed to create a file")
	ErrFileRead   = NewShipperError("err-file-read", "failed to read a file")

	ErrStorageCleanup = NewShipperError("err-storage-cleanup", "failed to clean up the disk")
	ErrGetDiskUsage   = NewShipperError("err-get-disk-usage", "failed to get the disk usage")
)

Define sentinel errors for each expected condition. These errors can be used with errors.Is and errors.As to detect specific issues.

Functions

func Chunk

func Chunk[T any](list []T, n int) [][]T

Chunk splits a list into a matrix of elements with a size of `n`

func GetErrStatusCode

func GetErrStatusCode(err error) string

GetErrStatusCode extracts the error code from any wrapped ShipperError. If no matching ShipperError is found in the chain, "-1" is returned to indicate an unknown error.

func GetRemoteFileID

func GetRemoteFileID(file types.File) string

func InitMetrics

func InitMetrics() (*instr.PrometheusMetrics, error)

Types

type AbandonAPIPayloadFile

type AbandonAPIPayloadFile struct {
	ReferenceID string `json:"reference_id"` //nolint:tagliatelle // downstream expects camel case
	Reason      string `json:"reason"`
}

type MetricShipper

type MetricShipper struct {
	HTTPClient *http.Client
	// contains filtered or unexported fields
}

MetricShipper handles the periodic shipping of metrics to CloudZero.

func NewMetricShipper

func NewMetricShipper(ctx context.Context, s *config.Settings, store types.ReadableStore) (*MetricShipper, error)

NewMetricShipper initializes a new MetricShipper.

func (*MetricShipper) AbandonFiles

func (m *MetricShipper) AbandonFiles(ctx context.Context, referenceIDs []string, reason string) error

AbandonFiles sends an abandon request for a list of files with a given reason.

func (*MetricShipper) AllocatePresignedURLs

func (m *MetricShipper) AllocatePresignedURLs(files []types.File) (PresignedURLAPIResponse, error)

AllocatePresignedURLs allocates a set of pre-signed urls for the passed file objects.

func (*MetricShipper) GetActiveReplayRequests

func (m *MetricShipper) GetActiveReplayRequests(ctx context.Context) ([]*ReplayRequest, error)

GetActiveReplayRequests gets all active replay request files

func (*MetricShipper) GetBaseDir

func (m *MetricShipper) GetBaseDir() string

func (*MetricShipper) GetDiskUsage

func (m *MetricShipper) GetDiskUsage(ctx context.Context, limit uint64) (*types.StoreUsage, error)

GetDiskUsage gets the storage usage of the attached volume, and also reports the usage to Prometheus.

func (*MetricShipper) GetMetricHandler

func (m *MetricShipper) GetMetricHandler() http.Handler

func (*MetricShipper) GetReplayRequestDir

func (m *MetricShipper) GetReplayRequestDir() string

func (*MetricShipper) GetShipperID

func (m *MetricShipper) GetShipperID() (string, error)

GetShipperID will return a unique id for this shipper. This id is stored on the filesystem, and is meant to represent a relation between an uploaded file and which shipper this file came from. The id does not identify this particular instance of the shipper; rather, it identifies the filesystem from which the file came.

func (*MetricShipper) GetUploadedDir

func (m *MetricShipper) GetUploadedDir() string

func (*MetricShipper) HandleDisk

func (m *MetricShipper) HandleDisk(ctx context.Context, metricCutoff time.Time) error

func (*MetricShipper) HandleReplayRequest

func (m *MetricShipper) HandleReplayRequest(ctx context.Context, rr *ReplayRequest) error

func (*MetricShipper) HandleRequest

func (m *MetricShipper) HandleRequest(ctx context.Context, files []types.File) error

HandleRequest takes in a list of files and runs them through the following:

- Generate presigned URL
- Upload to the remote API
- Rename the file to indicate upload

func (*MetricShipper) MarkFileUploaded

func (m *MetricShipper) MarkFileUploaded(ctx context.Context, file types.File) error

func (*MetricShipper) ProcessNewFiles

func (m *MetricShipper) ProcessNewFiles(ctx context.Context) error

func (*MetricShipper) ProcessReplayRequests

func (m *MetricShipper) ProcessReplayRequests(ctx context.Context) error

func (*MetricShipper) PurgeMetricsBefore

func (m *MetricShipper) PurgeMetricsBefore(ctx context.Context, before time.Time) error

PurgeMetricsBefore deletes all uploaded metric files older than `before`

func (*MetricShipper) PurgeOldestNPercentage

func (m *MetricShipper) PurgeOldestNPercentage(ctx context.Context, percent int) error

PurgeOldestNPercentage removes the oldest `percent` percent of files.

func (*MetricShipper) Run

func (m *MetricShipper) Run() error

Run starts the MetricShipper service and blocks until a shutdown signal is received.

func (*MetricShipper) SaveReplayRequest

func (m *MetricShipper) SaveReplayRequest(ctx context.Context, rr *ReplayRequest) error

SaveReplayRequest saves a replay request from the remote to disk to be picked up on the next iteration.

func (*MetricShipper) SendHTTPRequest

func (m *MetricShipper) SendHTTPRequest(
	ctx context.Context,
	name string,
	req *http.Request,
) (*http.Response, error)

func (*MetricShipper) Shutdown

func (m *MetricShipper) Shutdown() error

Shutdown gracefully stops the MetricShipper service.

func (*MetricShipper) UploadFile

func (m *MetricShipper) UploadFile(ctx context.Context, file types.File, presignedURL string) error

UploadFile uploads the specified file to S3 using the provided presigned URL.

type PresignedURLAPIPayload

type PresignedURLAPIPayload struct {
	ShipperID string                        `json:"shipperId"`
	Files     []*PresignedURLAPIPayloadFile `json:"files"`
}

type PresignedURLAPIPayloadFile

type PresignedURLAPIPayloadFile struct {
	ReferenceID string `json:"reference_id"`      //nolint:tagliatelle // downstream expects cammel case
	SHA256      string `json:"sha_256,omitempty"` //nolint:tagliatelle // downstream expects cammel case
	Size        int64  `json:"size,omitempty"`
}

type PresignedURLAPIResponse

type PresignedURLAPIResponse = map[string]string

PresignedURLAPIResponse is the format of the response from the remote API. The format of the response is: `{reference_id: presigned_url}`.

type ReplayRequest

type ReplayRequest struct {
	Filepath     string             `json:"filepath"`
	ReferenceIDs *types.Set[string] `json:"referenceIds"` //nolint:tagliatelle // I dont want to use IDs
}

func NewReplayRequestFromHeader

func NewReplayRequestFromHeader(value string) (*ReplayRequest, error)

type ShipperError

type ShipperError interface {
	error
	// Code returns the associated error code as a string.
	Code() string
}

ShipperError is an error that additionally exposes a `Code()` method, which returns a status code to use in Prometheus metrics.

func NewShipperError

func NewShipperError(code, msg string) ShipperError

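NewShipperError creates a new ShipperError instance with the given code and message. Note that the signature takes no err argument, so there is no underlying error to wrap at construction time.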

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL