Documentation
¶
Overview ¶
Package shipper provides domain logic for the shipper.
Index ¶
- Constants
- Variables
- func Chunk[T any](list []T, n int) [][]T
- func GetErrStatusCode(err error) string
- func GetRemoteFileID(file types.File) string
- func InitMetrics() (*instr.PrometheusMetrics, error)
- type AbandonAPIPayloadFile
- type MetricShipper
- func (m *MetricShipper) AbandonFiles(ctx context.Context, referenceIDs []string, reason string) error
- func (m *MetricShipper) AllocatePresignedURLs(files []types.File) (PresignedURLAPIResponse, error)
- func (m *MetricShipper) GetActiveReplayRequests(ctx context.Context) ([]*ReplayRequest, error)
- func (m *MetricShipper) GetBaseDir() string
- func (m *MetricShipper) GetDiskUsage(ctx context.Context, limit uint64) (*types.StoreUsage, error)
- func (m *MetricShipper) GetMetricHandler() http.Handler
- func (m *MetricShipper) GetReplayRequestDir() string
- func (m *MetricShipper) GetShipperID() (string, error)
- func (m *MetricShipper) GetUploadedDir() string
- func (m *MetricShipper) HandleDisk(ctx context.Context, metricCutoff time.Time) error
- func (m *MetricShipper) HandleReplayRequest(ctx context.Context, rr *ReplayRequest) error
- func (m *MetricShipper) HandleRequest(ctx context.Context, files []types.File) error
- func (m *MetricShipper) MarkFileUploaded(ctx context.Context, file types.File) error
- func (m *MetricShipper) ProcessNewFiles(ctx context.Context) error
- func (m *MetricShipper) ProcessReplayRequests(ctx context.Context) error
- func (m *MetricShipper) PurgeMetricsBefore(ctx context.Context, before time.Time) error
- func (m *MetricShipper) PurgeOldestNPercentage(ctx context.Context, percent int) error
- func (m *MetricShipper) Run() error
- func (m *MetricShipper) SaveReplayRequest(ctx context.Context, rr *ReplayRequest) error
- func (m *MetricShipper) SendHTTPRequest(ctx context.Context, name string, req *http.Request) (*http.Response, error)
- func (m *MetricShipper) Shutdown() error
- func (m *MetricShipper) UploadFile(ctx context.Context, file types.File, presignedURL string) error
- type PresignedURLAPIPayload
- type PresignedURLAPIPayloadFile
- type PresignedURLAPIResponse
- type ReplayRequest
- type ShipperError
Constants ¶
const ( ReplaySubDirectory = "replay" UploadedSubDirectory = "uploaded" CriticalPurgePercent = 20 ReplayRequestHeader = "X-CloudZero-Replay" ShipperIDRequestHeader = "X-CloudZero-Shipper-ID" AppVersionRequestHeader = "X-CloudZero-Version" )
public
const ShipperErrorDefault = "unknown error"
ShipperErrorDefault is the default code given when the specific error type is not found
Variables ¶
var ( // HTTP errors ErrNoURLs = NewShipperError("err-no-urls", "no presigned URLs returned") ErrInvalidShipperID = NewShipperError("err-invalid-shipper-id", "failed to get the shipper id") ErrEncodeBody = NewShipperError("err-encode-body", "failed to encode the body into a foreign format") ErrGetRemoteBase = NewShipperError("err-get-remote-base", "failed to get the remote endpoint api base from the config file") ErrHTTPRequestFailed = NewShipperError("err-http-request-failed", "the http request failed") ErrHTTPUnknown = NewShipperError("err-http-unknown", "there was an unknown issue with the http request") ErrInvalidBody = NewShipperError("err-invalid-body", "decoding a response/object failed") ErrCreateDirectory = NewShipperError("err-dir-create", "failed to create the requested directory") ErrCreateLock = NewShipperError("err-lock-create", "failed to create or acquire the lock") ErrReleaseLock = NewShipperError("err-lock-release", "failed to release the lock") ErrFilesList = NewShipperError("err-files-walk", "failed to list/walk the files") ErrFileRemove = NewShipperError("err-file-remove", "failed to remove a file") ErrFileCreate = NewShipperError("err-file-create", "failed to create a file") ErrFileRead = NewShipperError("err-file-read", "failed to read a file") ErrStorageCleanup = NewShipperError("err-storage-cleanup", "failed to clean up the disk") ErrGetDiskUsage = NewShipperError("err-get-disk-usage", "failed to get the disk usage") )
Define sentinel errors for each expected condition. These errors can be used with errors.Is and errors.As to detect specific issues.
Functions ¶
func GetErrStatusCode ¶
GetErrStatusCode extracts the error code from any wrapped ShipperError. If no matching ShipperError is found in the chain, "-1" is returned to indicate an unknown error.
func GetRemoteFileID ¶
func InitMetrics ¶
func InitMetrics() (*instr.PrometheusMetrics, error)
Types ¶
type AbandonAPIPayloadFile ¶
type MetricShipper ¶
MetricShipper handles the periodic shipping of metrics to CloudZero.
func NewMetricShipper ¶
func NewMetricShipper(ctx context.Context, s *config.Settings, store types.ReadableStore) (*MetricShipper, error)
NewMetricShipper initializes a new MetricShipper.
func (*MetricShipper) AbandonFiles ¶
func (m *MetricShipper) AbandonFiles(ctx context.Context, referenceIDs []string, reason string) error
AbandonFiles sends an abandon request for a list of files with a given reason.
func (*MetricShipper) AllocatePresignedURLs ¶
func (m *MetricShipper) AllocatePresignedURLs(files []types.File) (PresignedURLAPIResponse, error)
AllocatePresignedURLs allocates a set of pre-signed urls for the passed file objects.
func (*MetricShipper) GetActiveReplayRequests ¶
func (m *MetricShipper) GetActiveReplayRequests(ctx context.Context) ([]*ReplayRequest, error)
GetActiveReplayRequests gets all active replay request files
func (*MetricShipper) GetBaseDir ¶
func (m *MetricShipper) GetBaseDir() string
func (*MetricShipper) GetDiskUsage ¶
func (m *MetricShipper) GetDiskUsage(ctx context.Context, limit uint64) (*types.StoreUsage, error)
GetDiskUsage gets the storage usage of the attached volume, and also reports the usage to prometheus.
func (*MetricShipper) GetMetricHandler ¶
func (m *MetricShipper) GetMetricHandler() http.Handler
func (*MetricShipper) GetReplayRequestDir ¶
func (m *MetricShipper) GetReplayRequestDir() string
func (*MetricShipper) GetShipperID ¶
func (m *MetricShipper) GetShipperID() (string, error)
GetShipperID will return a unique id for this shipper. This id is stored on the filesystem, and is meant to represent a relation between an uploaded file and which shipper this file came from. The id does not identify this instance of the shipper; rather, it identifies the filesystem from which the file came.
func (*MetricShipper) GetUploadedDir ¶
func (m *MetricShipper) GetUploadedDir() string
func (*MetricShipper) HandleDisk ¶
func (*MetricShipper) HandleReplayRequest ¶
func (m *MetricShipper) HandleReplayRequest(ctx context.Context, rr *ReplayRequest) error
func (*MetricShipper) HandleRequest ¶
HandleRequest takes in a list of files and runs them through the following:
- Generate presigned URL
- Upload to the remote API
- Rename the file to indicate upload
func (*MetricShipper) MarkFileUploaded ¶
func (*MetricShipper) ProcessNewFiles ¶
func (m *MetricShipper) ProcessNewFiles(ctx context.Context) error
func (*MetricShipper) ProcessReplayRequests ¶
func (m *MetricShipper) ProcessReplayRequests(ctx context.Context) error
func (*MetricShipper) PurgeMetricsBefore ¶
PurgeMetricsBefore deletes all uploaded metric files older than `before`
func (*MetricShipper) PurgeOldestNPercentage ¶
func (m *MetricShipper) PurgeOldestNPercentage(ctx context.Context, percent int) error
PurgeOldestNPercentage removes the oldest `percent` of files
func (*MetricShipper) Run ¶
func (m *MetricShipper) Run() error
Run starts the MetricShipper service and blocks until a shutdown signal is received.
func (*MetricShipper) SaveReplayRequest ¶
func (m *MetricShipper) SaveReplayRequest(ctx context.Context, rr *ReplayRequest) error
SaveReplayRequest saves a replay request from the remote to disk to be picked up on the next iteration.
func (*MetricShipper) SendHTTPRequest ¶
func (*MetricShipper) Shutdown ¶
func (m *MetricShipper) Shutdown() error
Shutdown gracefully stops the MetricShipper service.
func (*MetricShipper) UploadFile ¶
UploadFile uploads the specified file to S3 using the provided presigned URL.
type PresignedURLAPIPayload ¶
type PresignedURLAPIPayload struct {
ShipperID string `json:"shipperId"`
Files []*PresignedURLAPIPayloadFile `json:"files"`
}
type PresignedURLAPIResponse ¶
PresignedURLAPIResponse is the format of the response from the remote API. The format of the response is: `{reference_id: presigned_url}`.
type ReplayRequest ¶
type ReplayRequest struct {
Filepath string `json:"filepath"`
ReferenceIDs *types.Set[string] `json:"referenceIds"` //nolint:tagliatelle // I dont want to use IDs
}
func NewReplayRequestFromHeader ¶
func NewReplayRequestFromHeader(value string) (*ReplayRequest, error)
type ShipperError ¶
type ShipperError interface {
error
// Code returns the associated error code as a string.
Code() string
}
ShipperError is an error that also exposes a `Code()` method, which returns a status code string for use in Prometheus metrics.
func NewShipperError ¶
func NewShipperError(code, msg string) ShipperError
NewShipperError creates a new ShipperError with the given code and message.