storage

package
v0.0.0-...-706b979
This package is not in the latest version of its module.
Published: May 4, 2026 License: AGPL-3.0 Imports: 25 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrResumeNotSupported = errors.New("storage: resume not supported by this backend")

ErrResumeNotSupported is returned by AppendingBackend.AppendReader on backends that do not support append writes (S3, Azure Blob Storage). Callers should delete any partial file and retry from byte zero.

Functions

func GetLocalBasePath

func GetLocalBasePath(backend Backend, logger *zerolog.Logger, feature string, fallback string) string

GetLocalBasePath returns the base filesystem path for local storage backends. For cloud backends (S3, Azure), it logs a warning and returns empty string. For unknown backends, it returns the provided fallback path.

Parameters:

  • backend: The storage backend to check
  • logger: Logger for warnings about unsupported backends (can be nil)
  • feature: Feature name for warning messages (e.g., "Continuous queries", "Retention")
  • fallback: Default path to return for unknown backend types (use "" to disable)
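A rough sketch of the dispatch this implies, with backend type strings standing in for the real Backend interface (the switch arms are inferred from the description, not copied from the implementation):

```go
package main

import "fmt"

// localBasePath sketches the three documented outcomes of
// GetLocalBasePath: the base path for local backends, "" for cloud
// backends, and the caller-supplied fallback for anything unknown.
func localBasePath(backendType, basePath, fallback string) string {
	switch backendType {
	case "local":
		return basePath
	case "s3", "azure":
		// Cloud backends have no local filesystem path; the real
		// function logs a warning here and returns "".
		return ""
	default:
		return fallback
	}
}

func main() {
	fmt.Println(localBasePath("local", "/var/data", "/tmp"))
}
```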

func GetStoragePath

func GetStoragePath(backend Backend, database, measurement string) string

GetStoragePath returns the full storage path for a database/measurement with a glob pattern. Supports all storage backends: local, S3, and Azure.

func SanitizeS3Prefix

func SanitizeS3Prefix(prefix string) string

SanitizeS3Prefix cleans and validates an S3 path prefix. Returns empty string if prefix is empty (no-op), otherwise ensures trailing /. Only allows alphanumeric characters, hyphens, underscores, dots, and slashes.
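One plausible reading of the documented behavior, as a sketch: empty in, empty out; otherwise keep only the allowed runes and guarantee a trailing slash. Whether disallowed runes are dropped (as here) or rejected outright is not specified above, so this is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// sanitizeS3Prefix keeps alphanumerics, hyphens, underscores, dots,
// and slashes, then ensures a trailing "/". Disallowed runes are
// dropped in this sketch; the real function may reject them instead.
func sanitizeS3Prefix(prefix string) string {
	if prefix == "" {
		return ""
	}
	var b strings.Builder
	for _, r := range prefix {
		switch {
		case r >= 'a' && r <= 'z', r >= 'A' && r <= 'Z',
			r >= '0' && r <= '9', r == '-', r == '_', r == '.', r == '/':
			b.WriteRune(r)
		}
	}
	out := b.String()
	if out != "" && !strings.HasSuffix(out, "/") {
		out += "/"
	}
	return out
}

func main() {
	fmt.Println(sanitizeS3Prefix("instances/abc123"))
}
```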

Types

type AppendingBackend

type AppendingBackend interface {
	Backend
	// AppendReader appends bytes from reader to the existing file at path.
	// appendSize is the number of bytes expected from reader (informational;
	// implementations may ignore it). Returns ErrResumeNotSupported if the
	// backend cannot append.
	AppendReader(ctx context.Context, path string, reader io.Reader, appendSize int64) error
}

AppendingBackend is an optional extension of Backend for backends that support appending bytes to an existing file. Callers type-assert Backend to AppendingBackend before calling AppendReader; if the assertion fails, the backend does not support resumable writes and the caller should fall back to a full re-fetch.

Only local-SSD backends implement this. S3 and Azure Blob Storage do not support append on block objects and return ErrResumeNotSupported instead.

type AzureBlobBackend

type AzureBlobBackend struct {
	// contains filtered or unexported fields
}

AzureBlobBackend implements the Backend interface for Azure Blob Storage

func NewAzureBlobBackend

func NewAzureBlobBackend(cfg *AzureBlobConfig, logger zerolog.Logger) (*AzureBlobBackend, error)

NewAzureBlobBackend creates a new Azure Blob Storage backend

func (*AzureBlobBackend) Close

func (b *AzureBlobBackend) Close() error

Close closes the Azure Blob backend (no-op for Azure)

func (*AzureBlobBackend) ConfigJSON

func (b *AzureBlobBackend) ConfigJSON() string

ConfigJSON returns the configuration as JSON for subprocess recreation

func (*AzureBlobBackend) Delete

func (b *AzureBlobBackend) Delete(ctx context.Context, path string) error

Delete deletes a blob from Azure Blob Storage

func (*AzureBlobBackend) DeleteBatch

func (b *AzureBlobBackend) DeleteBatch(ctx context.Context, paths []string) error

DeleteBatch deletes multiple blobs from Azure Blob Storage. Azure Blob Storage supports batch delete via the Batch API.

func (*AzureBlobBackend) Exists

func (b *AzureBlobBackend) Exists(ctx context.Context, path string) (bool, error)

Exists checks if a blob exists in Azure Blob Storage

func (*AzureBlobBackend) GetAccountKey

func (b *AzureBlobBackend) GetAccountKey() string

GetAccountKey returns the account key (for subprocess credential passing)

func (*AzureBlobBackend) GetAccountName

func (b *AzureBlobBackend) GetAccountName() string

GetAccountName returns the account name

func (*AzureBlobBackend) GetContainer

func (b *AzureBlobBackend) GetContainer() string

GetContainer returns the container name

func (*AzureBlobBackend) List

func (b *AzureBlobBackend) List(ctx context.Context, prefix string) ([]string, error)

List lists blobs with the given prefix

func (*AzureBlobBackend) ListDirectories

func (b *AzureBlobBackend) ListDirectories(ctx context.Context, prefix string) ([]string, error)

ListDirectories lists immediate subdirectories at a prefix. Implements the DirectoryLister interface. Uses Azure's hierarchy delimiter feature to efficiently list only "directories" (common prefixes).

func (*AzureBlobBackend) ListObjects

func (b *AzureBlobBackend) ListObjects(ctx context.Context, prefix string) ([]ObjectInfo, error)

ListObjects lists blobs with their metadata at a prefix. Implements the ObjectLister interface.

func (*AzureBlobBackend) Read

func (b *AzureBlobBackend) Read(ctx context.Context, path string) ([]byte, error)

Read reads data from Azure Blob Storage

func (*AzureBlobBackend) ReadTo

func (b *AzureBlobBackend) ReadTo(ctx context.Context, path string, writer io.Writer) error

ReadTo reads data from Azure Blob Storage and writes to a writer

func (*AzureBlobBackend) ReadToAt

func (b *AzureBlobBackend) ReadToAt(ctx context.Context, path string, writer io.Writer, offset int64) error

ReadToAt reads data from Azure Blob Storage starting at the given byte offset and writes to writer. Uses blob.HTTPRange to skip already-transferred bytes. offset=0 fetches the full blob without a Range header.

func (*AzureBlobBackend) StatFile

func (b *AzureBlobBackend) StatFile(ctx context.Context, path string) (int64, error)

StatFile returns the byte size of the Azure blob at path, or -1 if not found.

func (*AzureBlobBackend) Type

func (b *AzureBlobBackend) Type() string

Type returns the storage type identifier

func (*AzureBlobBackend) Write

func (b *AzureBlobBackend) Write(ctx context.Context, path string, data []byte) error

Write writes data to Azure Blob Storage

func (*AzureBlobBackend) WriteReader

func (b *AzureBlobBackend) WriteReader(ctx context.Context, path string, reader io.Reader, size int64) error

WriteReader writes data from a reader to Azure Blob Storage

type AzureBlobConfig

type AzureBlobConfig struct {
	// Connection string authentication (simplest)
	ConnectionString string

	// Account-based authentication
	AccountName string
	AccountKey  string

	// SAS token authentication
	SASToken string

	// Managed Identity authentication (for Azure-hosted deployments)
	UseManagedIdentity bool

	// Container name (required)
	ContainerName string

	// Custom endpoint (for Azurite testing)
	Endpoint string
}

AzureBlobConfig holds Azure Blob Storage backend configuration
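The struct offers four authentication paths (connection string, account key, SAS token, managed identity); a caller picks one. A sketch using account-key auth, with the struct mirrored locally so the example is self-contained and every value a placeholder:

```go
package main

import "fmt"

// AzureBlobConfig mirrors the package's struct for this sketch.
type AzureBlobConfig struct {
	ConnectionString   string
	AccountName        string
	AccountKey         string
	SASToken           string
	UseManagedIdentity bool
	ContainerName      string
	Endpoint           string
}

func main() {
	// Account-key auth; Azure-hosted deployments would instead set
	// UseManagedIdentity and leave the credential fields empty.
	cfg := AzureBlobConfig{
		AccountName:   "myaccount",  // placeholder
		AccountKey:    "base64-key", // placeholder
		ContainerName: "metrics",    // required in all auth modes
	}
	fmt.Println(cfg.ContainerName)
}
```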

type Backend

type Backend interface {
	// Write writes data to the specified path
	Write(ctx context.Context, path string, data []byte) error

	// WriteReader writes data from a reader to the specified path (for large files)
	WriteReader(ctx context.Context, path string, reader io.Reader, size int64) error

	// Read reads data from the specified path
	Read(ctx context.Context, path string) ([]byte, error)

	// ReadTo reads data from the specified path and writes it to the writer
	ReadTo(ctx context.Context, path string, writer io.Writer) error

	// ReadToAt reads data from path starting at the given byte offset and writes
	// to writer. offset=0 starts at the beginning (equivalent to ReadTo).
	// Returns an error if offset is negative or >= file size.
	ReadToAt(ctx context.Context, path string, writer io.Writer, offset int64) error

	// StatFile returns the byte size of the file at path.
	// Returns -1 (and nil error) if the file does not exist.
	// Returns a non-nil error only for unexpected backend failures.
	StatFile(ctx context.Context, path string) (int64, error)

	// List lists all objects with the given prefix
	List(ctx context.Context, prefix string) ([]string, error)

	// Delete deletes the object at the specified path
	Delete(ctx context.Context, path string) error

	// Exists checks if an object exists at the specified path
	Exists(ctx context.Context, path string) (bool, error)

	// Close closes any resources held by the backend
	Close() error

	// Type returns the storage type identifier ("local", "s3", etc.)
	// Used for subprocess serialization
	Type() string

	// ConfigJSON returns the configuration as JSON for subprocess recreation
	// Used for subprocess serialization
	ConfigJSON() string
}

Backend defines the interface for storage backends (local, S3, MinIO)

type BatchDeleter

type BatchDeleter interface {
	DeleteBatch(ctx context.Context, paths []string) error
}

BatchDeleter supports efficient batch deletion of multiple objects. Implementations should handle batching internally (e.g., S3 supports up to 1000 objects per batch).

type DirectoryLister

type DirectoryLister interface {
	ListDirectories(ctx context.Context, prefix string) ([]string, error)
}

DirectoryLister lists immediate subdirectories at a prefix. This is useful for SHOW DATABASES/TABLES commands.

type DirectoryRemover

type DirectoryRemover interface {
	RemoveDirectory(ctx context.Context, path string) error
}

DirectoryRemover removes an empty directory. This is used to clean up database directories after all files are deleted. For object storage (S3, Azure), this is typically a no-op since directories don't exist as objects.

type LocalBackend

type LocalBackend struct {
	// contains filtered or unexported fields
}

LocalBackend implements the Backend interface for local filesystem storage

func NewLocalBackend

func NewLocalBackend(basePath string, logger zerolog.Logger) (*LocalBackend, error)

NewLocalBackend creates a new local filesystem storage backend

func (*LocalBackend) AppendReader

func (b *LocalBackend) AppendReader(ctx context.Context, path string, reader io.Reader, appendSize int64) error

AppendReader appends bytes from reader to the ".part" staging file at path. When all bytes have been appended (the transfer is complete), the caller must call WriteReader (or the coordinator renames the file externally).

This satisfies AppendingBackend, which the puller type-asserts before calling.

func (*LocalBackend) Close

func (b *LocalBackend) Close() error

Close closes any resources held by the backend (no-op for local storage)

func (*LocalBackend) ConfigJSON

func (b *LocalBackend) ConfigJSON() string

ConfigJSON returns the configuration as JSON for subprocess recreation

func (*LocalBackend) Delete

func (b *LocalBackend) Delete(ctx context.Context, path string) error

Delete deletes the object at the specified path

func (*LocalBackend) DeleteBatch

func (b *LocalBackend) DeleteBatch(ctx context.Context, paths []string) error

DeleteBatch deletes multiple objects at the specified paths. Implements the BatchDeleter interface.

func (*LocalBackend) Exists

func (b *LocalBackend) Exists(ctx context.Context, path string) (bool, error)

Exists checks if an object exists at the specified path

func (*LocalBackend) GetBasePath

func (b *LocalBackend) GetBasePath() string

GetBasePath returns the base path for the local storage

func (*LocalBackend) GetFullPath

func (b *LocalBackend) GetFullPath(path string) string

GetFullPath returns the full filesystem path for a given storage path. Useful for debugging and direct file access.

func (*LocalBackend) List

func (b *LocalBackend) List(ctx context.Context, prefix string) ([]string, error)

List lists all objects with the given prefix

func (*LocalBackend) ListDirectories

func (b *LocalBackend) ListDirectories(ctx context.Context, prefix string) ([]string, error)

ListDirectories lists immediate subdirectories at a prefix. Implements the DirectoryLister interface.

func (*LocalBackend) ListObjects

func (b *LocalBackend) ListObjects(ctx context.Context, prefix string) ([]ObjectInfo, error)

ListObjects lists objects with their metadata at a prefix. Implements the ObjectLister interface.

func (*LocalBackend) Read

func (b *LocalBackend) Read(ctx context.Context, path string) ([]byte, error)

Read reads data from the specified path

func (*LocalBackend) ReadTo

func (b *LocalBackend) ReadTo(ctx context.Context, path string, writer io.Writer) error

ReadTo reads data from the specified path and writes it to the writer

func (*LocalBackend) ReadToAt

func (b *LocalBackend) ReadToAt(ctx context.Context, path string, writer io.Writer, offset int64) error

ReadToAt reads data from path starting at the given byte offset and writes to writer. offset=0 starts at the beginning. Falls back to the ".part" staging file if the final file does not exist (allows the puller to hash a partial prefix before resuming a transfer).

func (*LocalBackend) RemoveDirectory

func (b *LocalBackend) RemoveDirectory(ctx context.Context, path string) error

RemoveDirectory removes an empty directory. Implements the DirectoryRemover interface.

func (*LocalBackend) StatFile

func (b *LocalBackend) StatFile(ctx context.Context, path string) (int64, error)

StatFile returns the byte size of the file at path, or -1 if neither the final file nor its ".part" staging file exist. Returns a non-nil error only for unexpected failures.

Checking the staging file allows the puller's pre-pull check to distinguish a fully-received file (size == entry.SizeBytes → skip) from a partial one (size < entry.SizeBytes → resume).
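The pre-pull decision described here can be sketched as a pure function of StatFile's result and the expected size. The function name and the "restart" arm for an over-long file are illustrative additions, not taken from the package.

```go
package main

import "fmt"

// pullAction maps StatFile's size (-1 = not found) and the expected
// entry size to the puller's next step.
func pullAction(size, want int64) string {
	switch {
	case size < 0:
		return "fetch" // nothing on disk yet
	case size == want:
		return "skip" // fully received
	case size < want:
		return "resume" // partial .part file: append the remainder
	default:
		return "restart" // larger than expected: discard and re-fetch
	}
}

func main() {
	fmt.Println(pullAction(4096, 10240))
}
```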

func (*LocalBackend) Type

func (b *LocalBackend) Type() string

Type returns the storage type identifier

func (*LocalBackend) Write

func (b *LocalBackend) Write(ctx context.Context, path string, data []byte) error

Write writes data to the specified path with atomic write (write to temp, then rename)

func (*LocalBackend) WriteReader

func (b *LocalBackend) WriteReader(ctx context.Context, path string, reader io.Reader, size int64) error

WriteReader writes data from a reader to the specified path (for large files).

Writes proceed to a deterministic "<path>.part" staging file so that a partial transfer interrupted by a transport error leaves recoverable bytes on disk. On success the staging file is atomically renamed to the final path. On error the staging file is left in place so the puller can resume from it.

type ObjectInfo

type ObjectInfo struct {
	Path         string
	Size         int64
	LastModified time.Time
}

ObjectInfo provides metadata about a storage object.

type ObjectLister

type ObjectLister interface {
	ListObjects(ctx context.Context, prefix string) ([]ObjectInfo, error)
}

ObjectLister lists objects with their metadata. This is useful for retention policies that need to check file ages.

type ResilientBackend

type ResilientBackend struct {
	// contains filtered or unexported fields
}

ResilientBackend wraps a storage backend with circuit breaker and retry logic

func NewResilientBackend

func NewResilientBackend(backend Backend, cfg *ResilientConfig, logger zerolog.Logger) *ResilientBackend

NewResilientBackend creates a new resilient storage backend

func (*ResilientBackend) CircuitBreakerStats

func (r *ResilientBackend) CircuitBreakerStats() map[string]interface{}

CircuitBreakerStats returns circuit breaker statistics

func (*ResilientBackend) Close

func (r *ResilientBackend) Close() error

Close closes the underlying storage backend

func (*ResilientBackend) Delete

func (r *ResilientBackend) Delete(ctx context.Context, path string) error

Delete deletes a file from the storage backend

func (*ResilientBackend) Exists

func (r *ResilientBackend) Exists(ctx context.Context, path string) (bool, error)

Exists checks if a file exists in the storage backend

func (*ResilientBackend) IsCircuitOpen

func (r *ResilientBackend) IsCircuitOpen() bool

IsCircuitOpen returns true if the circuit breaker is open

func (*ResilientBackend) List

func (r *ResilientBackend) List(ctx context.Context, prefix string) ([]string, error)

List lists files in the storage backend

func (*ResilientBackend) Read

func (r *ResilientBackend) Read(ctx context.Context, path string) ([]byte, error)

Read reads data from the storage backend with resilience

func (*ResilientBackend) ReadTo

func (r *ResilientBackend) ReadTo(ctx context.Context, path string, writer io.Writer) error

ReadTo reads data to a writer from the storage backend

func (*ResilientBackend) ResetCircuitBreaker

func (r *ResilientBackend) ResetCircuitBreaker()

ResetCircuitBreaker resets the circuit breaker

func (*ResilientBackend) Write

func (r *ResilientBackend) Write(ctx context.Context, path string, data []byte) error

Write writes data to the storage backend with resilience

func (*ResilientBackend) WriteReader

func (r *ResilientBackend) WriteReader(ctx context.Context, path string, reader io.Reader, size int64) error

WriteReader writes data from a reader to the storage backend

type ResilientConfig

type ResilientConfig struct {
	// Circuit breaker settings
	MaxFailures         int
	Timeout             time.Duration
	HalfOpenMaxRequests int

	// Retry settings
	MaxRetries    int
	RetryDelay    time.Duration
	RetryMaxDelay time.Duration
}

ResilientConfig holds configuration for the resilient backend

func DefaultResilientConfig

func DefaultResilientConfig() *ResilientConfig

DefaultResilientConfig returns default resilient backend configuration

type S3Backend

type S3Backend struct {
	// contains filtered or unexported fields
}

S3Backend implements the Backend interface for S3 and MinIO storage

func NewS3Backend

func NewS3Backend(cfg *S3Config, logger zerolog.Logger) (*S3Backend, error)

NewS3Backend creates a new S3/MinIO backend

func (*S3Backend) Close

func (b *S3Backend) Close() error

Close closes the S3 backend (no-op for S3)

func (*S3Backend) ConfigJSON

func (b *S3Backend) ConfigJSON() string

ConfigJSON returns the configuration as JSON for subprocess recreation

func (*S3Backend) Delete

func (b *S3Backend) Delete(ctx context.Context, path string) error

Delete deletes an object from S3

func (*S3Backend) DeleteBatch

func (b *S3Backend) DeleteBatch(ctx context.Context, paths []string) error

DeleteBatch deletes multiple objects from S3 efficiently

func (*S3Backend) Exists

func (b *S3Backend) Exists(ctx context.Context, path string) (bool, error)

Exists checks if an object exists in S3

func (*S3Backend) GetAccessKey

func (b *S3Backend) GetAccessKey() string

GetAccessKey returns the access key (for subprocess credential passing)

func (*S3Backend) GetBucket

func (b *S3Backend) GetBucket() string

GetBucket returns the bucket name

func (*S3Backend) GetPrefix

func (b *S3Backend) GetPrefix() string

GetPrefix returns the path prefix (empty string if none configured)

func (*S3Backend) GetQueryPath

func (b *S3Backend) GetQueryPath(database, measurement string, year, month, day, hour int) string

GetQueryPath generates S3 path patterns for time-based query pruning. This enables DuckDB to efficiently scan only relevant partitions.

Examples:

  • GetQueryPath("mydb", "cpu", 2025, 11, 0, 0) → "s3://bucket/mydb/cpu/2025/11/*/*/*.parquet" (all November)
  • GetQueryPath("mydb", "cpu", 2025, 11, 25, 0) → "s3://bucket/mydb/cpu/2025/11/25/*/*.parquet" (specific day)
  • GetQueryPath("mydb", "cpu", 2025, 11, 25, 16) → "s3://bucket/mydb/cpu/2025/11/25/16/*.parquet" (specific hour)
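The three examples above can be reproduced with a small path builder. This is a reconstruction from the documented outputs only: the examples show no single-digit months or days, so no zero-padding is assumed, and day=0/hour=0 are treated as the wildcard sentinels they act as above.

```go
package main

import (
	"fmt"
	"strconv"
)

// queryPath rebuilds the documented pattern:
// s3://bucket/db/meas/year/month/day-or-*/hour-or-*/*.parquet
func queryPath(bucket, db, meas string, year, month, day, hour int) string {
	d, h := "*", "*"
	if day > 0 {
		d = strconv.Itoa(day)
		if hour > 0 {
			h = strconv.Itoa(hour)
		}
	}
	return fmt.Sprintf("s3://%s/%s/%s/%d/%d/%s/%s/*.parquet",
		bucket, db, meas, year, month, d, h)
}

func main() {
	fmt.Println(queryPath("bucket", "mydb", "cpu", 2025, 11, 25, 16))
}
```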

func (*S3Backend) GetQueryPathRange

func (b *S3Backend) GetQueryPathRange(database, measurement string, startTime, endTime time.Time) []string

GetQueryPathRange generates S3 path patterns for a time range. This is useful for queries like "WHERE time BETWEEN start AND end".

func (*S3Backend) GetRegion

func (b *S3Backend) GetRegion() string

GetRegion returns the region

func (*S3Backend) GetS3Path

func (b *S3Backend) GetS3Path(path string) string

GetS3Path returns the S3 URI for a path

func (*S3Backend) GetSecretKey

func (b *S3Backend) GetSecretKey() string

GetSecretKey returns the secret key (for subprocess credential passing)

func (*S3Backend) List

func (b *S3Backend) List(ctx context.Context, prefix string) ([]string, error)

List lists objects with the given prefix

func (*S3Backend) ListDirectories

func (b *S3Backend) ListDirectories(ctx context.Context, prefix string) ([]string, error)

ListDirectories lists immediate subdirectories at a prefix. Implements the DirectoryLister interface. Uses S3's delimiter feature to efficiently list only "directories" (common prefixes).

func (*S3Backend) ListObjects

func (b *S3Backend) ListObjects(ctx context.Context, prefix string) ([]ObjectInfo, error)

ListObjects lists objects with their metadata at a prefix. Implements the ObjectLister interface.

func (*S3Backend) Read

func (b *S3Backend) Read(ctx context.Context, path string) ([]byte, error)

Read reads data from S3

func (*S3Backend) ReadTo

func (b *S3Backend) ReadTo(ctx context.Context, path string, writer io.Writer) error

ReadTo reads data from S3 and writes to a writer

func (*S3Backend) ReadToAt

func (b *S3Backend) ReadToAt(ctx context.Context, path string, writer io.Writer, offset int64) error

ReadToAt reads data from S3 starting at the given byte offset and writes to writer. Uses an HTTP Range header to skip already-transferred bytes. offset=0 fetches the full object (no Range header sent).

func (*S3Backend) StatFile

func (b *S3Backend) StatFile(ctx context.Context, path string) (int64, error)

StatFile returns the byte size of the S3 object at path, or -1 if not found.

func (*S3Backend) Type

func (b *S3Backend) Type() string

Type returns the storage type identifier

func (*S3Backend) Write

func (b *S3Backend) Write(ctx context.Context, path string, data []byte) error

Write writes data to S3

func (*S3Backend) WriteReader

func (b *S3Backend) WriteReader(ctx context.Context, path string, reader io.Reader, size int64) error

WriteReader writes data from a reader to S3. For files larger than 100MB, uses multipart upload to avoid OOM.

type S3Config

type S3Config struct {
	Bucket    string
	Region    string
	Endpoint  string // Custom endpoint for MinIO (e.g., "http://localhost:9000")
	AccessKey string
	SecretKey string
	UseSSL    bool
	PathStyle bool   // Use path-style addressing (required for MinIO)
	Prefix    string // Path prefix within the bucket (e.g., "instances/abc123/")
}

S3Config holds S3 backend configuration
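A sketch of a MinIO test configuration, with the struct mirrored locally so the example is self-contained. The bucket name and prefix are placeholders; the endpoint and credentials are MinIO's well-known local defaults, shown for illustration only.

```go
package main

import "fmt"

// S3Config mirrors the package's struct for this sketch.
type S3Config struct {
	Bucket    string
	Region    string
	Endpoint  string
	AccessKey string
	SecretKey string
	UseSSL    bool
	PathStyle bool
	Prefix    string
}

func main() {
	cfg := S3Config{
		Bucket:    "metrics",               // placeholder
		Region:    "us-east-1",
		Endpoint:  "http://localhost:9000", // MinIO's default port
		AccessKey: "minioadmin",            // MinIO default credentials
		SecretKey: "minioadmin",
		UseSSL:    false,
		PathStyle: true, // required for MinIO
		Prefix:    "instances/abc123/",
	}
	fmt.Println(cfg.Bucket, cfg.PathStyle)
}
```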
