Documentation
¶
Overview ¶
Package shipper provides domain logic for the shipper.
Index ¶
- Constants
- Variables
- func Chunk[T any](list []T, n int) [][]T
- func GetErrStatusCode(err error) string
- func GetRemoteFileID(file types.File) string
- func GetRootFileID(file string) string
- func InitMetrics() (*instr.PrometheusMetrics, error)
- func InspectHTTPResponse(ctx context.Context, resp *http.Response) error
- func NewHTTPClient(ctx context.Context, s *config.Settings) *retryablehttp.Client
- type AbandonAPIPayloadFile
- type AllocatePresignedURLsResponse
- type CleanupResult
- type DiskManager
- func (dm *DiskManager) CalculatePressureLevel(usage *types.StoreUsage) PressureLevel
- func (dm *DiskManager) GetCleanupPercentage(pressure PressureLevel) int
- func (dm *DiskManager) IsMemoryBacked() bool
- func (dm *DiskManager) ManageDiskUsage(ctx context.Context, metricCutoff time.Time) error
- func (dm *DiskManager) PurgeFilesBefore(ctx context.Context, before time.Time) (int, error)
- func (dm *DiskManager) PurgeOldestPercentage(ctx context.Context, percent int) (int, error)
- type HTTPClient
- type MetricShipper
- func (m *MetricShipper) AbandonFiles(ctx context.Context, payload []*AbandonAPIPayloadFile) error
- func (m *MetricShipper) AllocatePresignedURLs(ctx context.Context, files []types.File) (*AllocatePresignedURLsResponse, error)
- func (m *MetricShipper) Flush(ctx context.Context)
- func (m *MetricShipper) GetBaseDir() string
- func (m *MetricShipper) GetMetricHandler() http.Handler
- func (m *MetricShipper) GetShipperID() (string, error)
- func (m *MetricShipper) GetUploadedDir() string
- func (m *MetricShipper) HandleDisk(ctx context.Context, metricCutoff time.Time) error
- func (m *MetricShipper) HandleRequest(ctx context.Context, files []types.File) error
- func (m *MetricShipper) MarkFileUploaded(ctx context.Context, file types.File) error
- func (m *MetricShipper) ProcessFiles(ctx context.Context) error
- func (m *MetricShipper) Run() error
- func (m *MetricShipper) Shutdown() error
- func (m *MetricShipper) UploadFile(ctx context.Context, req *UploadFileRequest) error
- type PresignedURLAPIPayload
- type PresignedURLAPIPayloadFile
- type PresignedURLPayload
- type PressureLevel
- type PressureThresholds
- type ShipperError
- type UploadFileRequest
- type ZerologRetryableHTTPAdapter
- func (a *ZerologRetryableHTTPAdapter) Debug(msg string, keysAndValues ...interface{})
- func (a *ZerologRetryableHTTPAdapter) Error(msg string, keysAndValues ...interface{})
- func (a *ZerologRetryableHTTPAdapter) Info(msg string, keysAndValues ...interface{})
- func (a *ZerologRetryableHTTPAdapter) Warn(msg string, keysAndValues ...interface{})
Constants ¶
const ( TmpfsMagic = 0x01021994 Ext4Magic = 0xEF53 XfsMagic = 0x58465342 )
Linux filesystem type constants
const ( HTTPMaxRetries = 10 HTTPRetryWaitMax = time.Second * 30 )
Keep your constants, they are good for configuring the retryablehttp.Client
const ( ReplaySubDirectory = "replay" UploadedSubDirectory = "uploaded" CriticalPurgePercent = 20 ReplayRequestHeader = "X-CloudZero-Replay" ShipperIDRequestHeader = "X-CloudZero-Shipper-ID" AppVersionRequestHeader = "X-CloudZero-Version" )
public
const ShipperErrorDefault = "unknown error"
ShipperErrorDefault is the default code given when the specific error type is not found
Variables ¶
var ( // HTTP errors ErrNoURLs = NewShipperError("err-no-urls", "no presigned URLs returned") ErrInvalidShipperID = NewShipperError("err-invalid-shipper-id", "failed to get the shipper id") ErrEncodeBody = NewShipperError("err-encode-body", "failed to encode the body into a foreign format") ErrGetRemoteBase = NewShipperError("err-get-remote-base", "failed to get the remote endpoint api base from the config file") ErrHTTPRequestFailed = NewShipperError("err-http-request-failed", "the http request failed") ErrHTTPUnknown = NewShipperError("err-http-unknown", "there was an unknown issue with the http request") ErrInvalidBody = NewShipperError("err-invalid-body", "decoding a response/object failed") ErrExpiredURL = NewShipperError("err-expired-url", "the requested url has expired") ErrCreateDirectory = NewShipperError("err-dir-create", "failed to create the requested directory") ErrCreateLock = NewShipperError("err-lock-create", "failed to create or acquire the lock") ErrReleaseLock = NewShipperError("err-lock-release", "failed to release the lock") ErrFilesList = NewShipperError("err-files-walk", "failed to list/walk the files") ErrFileRemove = NewShipperError("err-file-remove", "failed to remove a file") ErrFileCreate = NewShipperError("err-file-create", "failed to create a file") ErrFileRead = NewShipperError("err-file-read", "failed to read a file") ErrStorageCleanup = NewShipperError("err-storage-cleanup", "failed to clean up the disk") ErrGetDiskUsage = NewShipperError("err-get-disk-usage", "failed to get the disk usage") )
Define sentinel errors for each expected condition. These errors can be used with errors.Is and errors.As to detect specific issues.
Functions ¶
func GetErrStatusCode ¶
GetErrStatusCode extracts the error code from any wrapped ShipperError. If no matching ShipperError is found in the chain, "-1" is returned to indicate an unknown error.
func GetRemoteFileID ¶
GetRemoteFileID creates the remote file id for the transposed file
func GetRootFileID ¶ added in v1.2.0
GetRootFileID returns the file id with no file extensions or path information
func InitMetrics ¶
func InitMetrics() (*instr.PrometheusMetrics, error)
func InspectHTTPResponse ¶ added in v1.2.0
func NewHTTPClient ¶ added in v1.2.0
Types ¶
type AbandonAPIPayloadFile ¶
type AllocatePresignedURLsResponse ¶ added in v1.2.0
type AllocatePresignedURLsResponse struct {
Allocation PresignedURLPayload
Replay PresignedURLPayload
}
type CleanupResult ¶ added in v1.2.0
type CleanupResult struct {
FilesRemoved int
BytesFreed uint64
PressureBefore PressureLevel
PressureAfter PressureLevel
}
CleanupResult tracks what was cleaned up
type DiskManager ¶ added in v1.2.0
type DiskManager struct {
Store types.ReadableStore
Metrics *instr.PrometheusMetrics
StoragePath string
AvailableSizeBytes uint64
}
DiskManager handles disk usage monitoring and cleanup
func (*DiskManager) CalculatePressureLevel ¶ added in v1.2.0
func (dm *DiskManager) CalculatePressureLevel(usage *types.StoreUsage) PressureLevel
CalculatePressureLevel determines cleanup aggression needed
func (*DiskManager) GetCleanupPercentage ¶ added in v1.2.0
func (dm *DiskManager) GetCleanupPercentage(pressure PressureLevel) int
GetCleanupPercentage returns how much to clean based on pressure
func (*DiskManager) IsMemoryBacked ¶ added in v1.2.0
func (dm *DiskManager) IsMemoryBacked() bool
IsMemoryBacked detects if the storage path is backed by memory (tmpfs)
func (*DiskManager) ManageDiskUsage ¶ added in v1.2.0
ManageDiskUsage handles the complete disk management cycle
func (*DiskManager) PurgeFilesBefore ¶ added in v1.2.0
PurgeFilesBefore removes files older than the cutoff time
func (*DiskManager) PurgeOldestPercentage ¶ added in v1.2.0
PurgeOldestPercentage removes the oldest percentage of files
type HTTPClient ¶ added in v1.2.0
type HTTPClient interface {
// Do processes the request
Do(req *http.Request) (*http.Response, error)
}
HTTPClient interface remains the same
type MetricShipper ¶
type MetricShipper struct {
HTTPClient *retryablehttp.Client
// contains filtered or unexported fields
}
MetricShipper handles the periodic shipping of metrics to Cloudzero.
func NewMetricShipper ¶
func NewMetricShipper(ctx context.Context, s *config.Settings, store types.ReadableStore) (*MetricShipper, error)
NewMetricShipper initializes a new MetricShipper.
func (*MetricShipper) AbandonFiles ¶
func (m *MetricShipper) AbandonFiles(ctx context.Context, payload []*AbandonAPIPayloadFile) error
AbandonFiles sends an abandon request for a list of files with a given reason.
func (*MetricShipper) AllocatePresignedURLs ¶
func (m *MetricShipper) AllocatePresignedURLs(ctx context.Context, files []types.File) (*AllocatePresignedURLsResponse, error)
AllocatePresignedURLs allocates a set of pre-signed urls for the passed file objects.
func (*MetricShipper) Flush ¶ added in v1.2.0
func (m *MetricShipper) Flush(ctx context.Context)
Flush will attempt to process all files and push them to the remote
func (*MetricShipper) GetBaseDir ¶
func (m *MetricShipper) GetBaseDir() string
func (*MetricShipper) GetMetricHandler ¶
func (m *MetricShipper) GetMetricHandler() http.Handler
func (*MetricShipper) GetShipperID ¶
func (m *MetricShipper) GetShipperID() (string, error)
GetShipperID will return a unique id for this shipper. This id is stored on the filesystem, and is meant to represent a relation between an uploaded file and which shipper this file came from. The id is not an id this instance of the shipper, but more an id of the filesystem in which the file came from
func (*MetricShipper) GetUploadedDir ¶
func (m *MetricShipper) GetUploadedDir() string
func (*MetricShipper) HandleDisk ¶
HandleDisk is the main entry point for disk management
func (*MetricShipper) HandleRequest ¶
HandleRequest takes in a list of files and runs them through the following:
- Generate presigned URL - handles replay requests - Upload to the remote API - Rename the file to indicate upload
func (*MetricShipper) MarkFileUploaded ¶
func (*MetricShipper) ProcessFiles ¶ added in v1.2.0
func (m *MetricShipper) ProcessFiles(ctx context.Context) error
func (*MetricShipper) Run ¶
func (m *MetricShipper) Run() error
Run starts the MetricShipper service and blocks until a shutdown signal is received.
func (*MetricShipper) Shutdown ¶
func (m *MetricShipper) Shutdown() error
Shutdown gracefully stops the MetricShipper service.
func (*MetricShipper) UploadFile ¶
func (m *MetricShipper) UploadFile(ctx context.Context, req *UploadFileRequest) error
UploadFile uploads the specified file to S3 using the provided presigned URL.
type PresignedURLAPIPayload ¶
type PresignedURLAPIPayload struct {
ShipperID string `json:"shipperId"`
Files []*PresignedURLAPIPayloadFile `json:"files"`
}
type PresignedURLPayload ¶ added in v1.2.0
PresignedURLPayload maps a reference id to a presigned url
type PressureLevel ¶ added in v1.2.0
type PressureLevel int
PressureLevel defines how aggressive cleanup should be
const ( PressureNone PressureLevel = iota PressureLow PressureMedium PressureHigh PressureCritical )
type PressureThresholds ¶ added in v1.2.0
PressureThresholds defines cleanup trigger points
type ShipperError ¶
type ShipperError interface {
error
// Code returns the associated error code as a string.
Code() string
}
ShipperError is a wrapper around an error that includes a status code `Code()` function to use in prometheus metrics.
func NewShipperError ¶
func NewShipperError(code, msg string) ShipperError
NewShipperError creates a new shipperError instance. If err is non-nil, it will be wrapped.
type UploadFileRequest ¶ added in v1.2.0
UploadFileRequest wraps a file and it's allocated presigned URL
type ZerologRetryableHTTPAdapter ¶ added in v1.2.0
type ZerologRetryableHTTPAdapter struct {
// contains filtered or unexported fields
}
ZerologRetryableHTTPAdapter adapts zerolog.Logger to retryablehttp.Logger
func NewZerologRetryableHTTPAdapter ¶ added in v1.2.0
func NewZerologRetryableHTTPAdapter(logger *zerolog.Logger, level zerolog.Level) *ZerologRetryableHTTPAdapter
NewZerologRetryableHTTPAdapter creates a new adapter. Default level is Debug.
func (*ZerologRetryableHTTPAdapter) Debug ¶ added in v1.2.0
func (a *ZerologRetryableHTTPAdapter) Debug(msg string, keysAndValues ...interface{})
func (*ZerologRetryableHTTPAdapter) Error ¶ added in v1.2.0
func (a *ZerologRetryableHTTPAdapter) Error(msg string, keysAndValues ...interface{})
func (*ZerologRetryableHTTPAdapter) Info ¶ added in v1.2.0
func (a *ZerologRetryableHTTPAdapter) Info(msg string, keysAndValues ...interface{})
func (*ZerologRetryableHTTPAdapter) Warn ¶ added in v1.2.0
func (a *ZerologRetryableHTTPAdapter) Warn(msg string, keysAndValues ...interface{})