Documentation
¶
Index ¶
- Variables
- func GetLocalBasePath(backend Backend, logger *zerolog.Logger, feature string, fallback string) string
- func GetStoragePath(backend Backend, database, measurement string) string
- func SanitizeS3Prefix(prefix string) string
- type AppendingBackend
- type AzureBlobBackend
- func (b *AzureBlobBackend) Close() error
- func (b *AzureBlobBackend) ConfigJSON() string
- func (b *AzureBlobBackend) Delete(ctx context.Context, path string) error
- func (b *AzureBlobBackend) DeleteBatch(ctx context.Context, paths []string) error
- func (b *AzureBlobBackend) Exists(ctx context.Context, path string) (bool, error)
- func (b *AzureBlobBackend) GetAccountKey() string
- func (b *AzureBlobBackend) GetAccountName() string
- func (b *AzureBlobBackend) GetContainer() string
- func (b *AzureBlobBackend) List(ctx context.Context, prefix string) ([]string, error)
- func (b *AzureBlobBackend) ListDirectories(ctx context.Context, prefix string) ([]string, error)
- func (b *AzureBlobBackend) ListObjects(ctx context.Context, prefix string) ([]ObjectInfo, error)
- func (b *AzureBlobBackend) Read(ctx context.Context, path string) ([]byte, error)
- func (b *AzureBlobBackend) ReadTo(ctx context.Context, path string, writer io.Writer) error
- func (b *AzureBlobBackend) ReadToAt(ctx context.Context, path string, writer io.Writer, offset int64) error
- func (b *AzureBlobBackend) StatFile(ctx context.Context, path string) (int64, error)
- func (b *AzureBlobBackend) Type() string
- func (b *AzureBlobBackend) Write(ctx context.Context, path string, data []byte) error
- func (b *AzureBlobBackend) WriteReader(ctx context.Context, path string, reader io.Reader, size int64) error
- type AzureBlobConfig
- type Backend
- type BatchDeleter
- type DirectoryLister
- type DirectoryRemover
- type LocalBackend
- func (b *LocalBackend) AppendReader(ctx context.Context, path string, reader io.Reader, appendSize int64) error
- func (b *LocalBackend) Close() error
- func (b *LocalBackend) ConfigJSON() string
- func (b *LocalBackend) Delete(ctx context.Context, path string) error
- func (b *LocalBackend) DeleteBatch(ctx context.Context, paths []string) error
- func (b *LocalBackend) Exists(ctx context.Context, path string) (bool, error)
- func (b *LocalBackend) GetBasePath() string
- func (b *LocalBackend) GetFullPath(path string) string
- func (b *LocalBackend) List(ctx context.Context, prefix string) ([]string, error)
- func (b *LocalBackend) ListDirectories(ctx context.Context, prefix string) ([]string, error)
- func (b *LocalBackend) ListObjects(ctx context.Context, prefix string) ([]ObjectInfo, error)
- func (b *LocalBackend) Read(ctx context.Context, path string) ([]byte, error)
- func (b *LocalBackend) ReadTo(ctx context.Context, path string, writer io.Writer) error
- func (b *LocalBackend) ReadToAt(ctx context.Context, path string, writer io.Writer, offset int64) error
- func (b *LocalBackend) RemoveDirectory(ctx context.Context, path string) error
- func (b *LocalBackend) StatFile(ctx context.Context, path string) (int64, error)
- func (b *LocalBackend) Type() string
- func (b *LocalBackend) Write(ctx context.Context, path string, data []byte) error
- func (b *LocalBackend) WriteReader(ctx context.Context, path string, reader io.Reader, size int64) error
- type ObjectInfo
- type ObjectLister
- type ResilientBackend
- func (r *ResilientBackend) CircuitBreakerStats() map[string]interface{}
- func (r *ResilientBackend) Close() error
- func (r *ResilientBackend) Delete(ctx context.Context, path string) error
- func (r *ResilientBackend) Exists(ctx context.Context, path string) (bool, error)
- func (r *ResilientBackend) IsCircuitOpen() bool
- func (r *ResilientBackend) List(ctx context.Context, prefix string) ([]string, error)
- func (r *ResilientBackend) Read(ctx context.Context, path string) ([]byte, error)
- func (r *ResilientBackend) ReadTo(ctx context.Context, path string, writer io.Writer) error
- func (r *ResilientBackend) ResetCircuitBreaker()
- func (r *ResilientBackend) Write(ctx context.Context, path string, data []byte) error
- func (r *ResilientBackend) WriteReader(ctx context.Context, path string, reader io.Reader, size int64) error
- type ResilientConfig
- type S3Backend
- func (b *S3Backend) Close() error
- func (b *S3Backend) ConfigJSON() string
- func (b *S3Backend) Delete(ctx context.Context, path string) error
- func (b *S3Backend) DeleteBatch(ctx context.Context, paths []string) error
- func (b *S3Backend) Exists(ctx context.Context, path string) (bool, error)
- func (b *S3Backend) GetAccessKey() string
- func (b *S3Backend) GetBucket() string
- func (b *S3Backend) GetPrefix() string
- func (b *S3Backend) GetQueryPath(database, measurement string, year, month, day, hour int) string
- func (b *S3Backend) GetQueryPathRange(database, measurement string, startTime, endTime time.Time) []string
- func (b *S3Backend) GetRegion() string
- func (b *S3Backend) GetS3Path(path string) string
- func (b *S3Backend) GetSecretKey() string
- func (b *S3Backend) List(ctx context.Context, prefix string) ([]string, error)
- func (b *S3Backend) ListDirectories(ctx context.Context, prefix string) ([]string, error)
- func (b *S3Backend) ListObjects(ctx context.Context, prefix string) ([]ObjectInfo, error)
- func (b *S3Backend) Read(ctx context.Context, path string) ([]byte, error)
- func (b *S3Backend) ReadTo(ctx context.Context, path string, writer io.Writer) error
- func (b *S3Backend) ReadToAt(ctx context.Context, path string, writer io.Writer, offset int64) error
- func (b *S3Backend) StatFile(ctx context.Context, path string) (int64, error)
- func (b *S3Backend) Type() string
- func (b *S3Backend) Write(ctx context.Context, path string, data []byte) error
- func (b *S3Backend) WriteReader(ctx context.Context, path string, reader io.Reader, size int64) error
- type S3Config
Constants ¶
This section is empty.
Variables ¶
var ErrResumeNotSupported = errors.New("storage: resume not supported by this backend")
ErrResumeNotSupported is returned by AppendingBackend.AppendReader on backends that do not support append writes (S3, Azure Blob Storage). Callers should delete any partial file and retry from byte zero.
Functions ¶
func GetLocalBasePath ¶
func GetLocalBasePath(backend Backend, logger *zerolog.Logger, feature string, fallback string) string
GetLocalBasePath returns the base filesystem path for local storage backends. For cloud backends (S3, Azure), it logs a warning and returns an empty string. For unknown backends, it returns the provided fallback path.
Parameters:
- backend: The storage backend to check
- logger: Logger for warnings about unsupported backends (can be nil)
- feature: Feature name for warning messages (e.g., "Continuous queries", "Retention")
- fallback: Default path to return for unknown backend types (use "" to disable)
func GetStoragePath ¶
GetStoragePath returns the full storage path for a database/measurement with glob pattern. Supports all storage backends: local, S3, and Azure.
func SanitizeS3Prefix ¶
SanitizeS3Prefix cleans and validates an S3 path prefix. Returns an empty string if prefix is empty (no-op); otherwise ensures a trailing /. Only allows alphanumeric characters, hyphens, underscores, dots, and slashes.
Types ¶
type AppendingBackend ¶
type AppendingBackend interface {
Backend
// AppendReader appends bytes from reader to the existing file at path.
// appendSize is the number of bytes expected from reader (informational;
// implementations may ignore it). Returns ErrResumeNotSupported if the
// backend cannot append.
AppendReader(ctx context.Context, path string, reader io.Reader, appendSize int64) error
}
AppendingBackend is an optional extension of Backend for backends that support appending bytes to an existing file. Callers type-assert Backend to AppendingBackend before calling AppendReader; if the assertion fails, the backend does not support resumable writes and the caller should fall back to a full re-fetch.
Only local-SSD backends implement this. S3 and Azure Blob Storage do not support append on block objects and return ErrResumeNotSupported instead.
type AzureBlobBackend ¶
type AzureBlobBackend struct {
// contains filtered or unexported fields
}
AzureBlobBackend implements the Backend interface for Azure Blob Storage
func NewAzureBlobBackend ¶
func NewAzureBlobBackend(cfg *AzureBlobConfig, logger zerolog.Logger) (*AzureBlobBackend, error)
NewAzureBlobBackend creates a new Azure Blob Storage backend
func (*AzureBlobBackend) Close ¶
func (b *AzureBlobBackend) Close() error
Close closes the Azure Blob backend (no-op for Azure)
func (*AzureBlobBackend) ConfigJSON ¶
func (b *AzureBlobBackend) ConfigJSON() string
ConfigJSON returns the configuration as JSON for subprocess recreation
func (*AzureBlobBackend) Delete ¶
func (b *AzureBlobBackend) Delete(ctx context.Context, path string) error
Delete deletes a blob from Azure Blob Storage
func (*AzureBlobBackend) DeleteBatch ¶
func (b *AzureBlobBackend) DeleteBatch(ctx context.Context, paths []string) error
DeleteBatch deletes multiple blobs from Azure Blob Storage. Azure Blob Storage supports batch delete via the Batch API.
func (*AzureBlobBackend) GetAccountKey ¶
func (b *AzureBlobBackend) GetAccountKey() string
GetAccountKey returns the account key (for subprocess credential passing)
func (*AzureBlobBackend) GetAccountName ¶
func (b *AzureBlobBackend) GetAccountName() string
GetAccountName returns the account name
func (*AzureBlobBackend) GetContainer ¶
func (b *AzureBlobBackend) GetContainer() string
GetContainer returns the container name
func (*AzureBlobBackend) ListDirectories ¶
ListDirectories lists immediate subdirectories at a prefix. Implements the DirectoryLister interface. Uses Azure's hierarchy delimiter feature to efficiently list only "directories" (common prefixes).
func (*AzureBlobBackend) ListObjects ¶
func (b *AzureBlobBackend) ListObjects(ctx context.Context, prefix string) ([]ObjectInfo, error)
ListObjects lists blobs with their metadata at a prefix. Implements the ObjectLister interface.
func (*AzureBlobBackend) ReadToAt ¶
func (b *AzureBlobBackend) ReadToAt(ctx context.Context, path string, writer io.Writer, offset int64) error
ReadToAt reads data from Azure Blob Storage starting at the given byte offset and writes to writer. Uses blob.HTTPRange to skip already-transferred bytes. offset=0 fetches the full blob without a Range header.
func (*AzureBlobBackend) StatFile ¶
StatFile returns the byte size of the Azure blob at path, or -1 if not found.
func (*AzureBlobBackend) Type ¶
func (b *AzureBlobBackend) Type() string
Type returns the storage type identifier
func (*AzureBlobBackend) WriteReader ¶
func (b *AzureBlobBackend) WriteReader(ctx context.Context, path string, reader io.Reader, size int64) error
WriteReader writes data from a reader to Azure Blob Storage
type AzureBlobConfig ¶
type AzureBlobConfig struct {
// Connection string authentication (simplest)
ConnectionString string
// Account-based authentication
AccountName string
AccountKey string
// SAS token authentication
SASToken string
// Managed Identity authentication (for Azure-hosted deployments)
UseManagedIdentity bool
// Container name (required)
ContainerName string
// Custom endpoint (for Azurite testing)
Endpoint string
}
AzureBlobConfig holds Azure Blob Storage backend configuration
type Backend ¶
type Backend interface {
// Write writes data to the specified path
Write(ctx context.Context, path string, data []byte) error
// WriteReader writes data from a reader to the specified path (for large files)
WriteReader(ctx context.Context, path string, reader io.Reader, size int64) error
// Read reads data from the specified path
Read(ctx context.Context, path string) ([]byte, error)
// ReadTo reads data from the specified path and writes it to the writer
ReadTo(ctx context.Context, path string, writer io.Writer) error
// ReadToAt reads data from path starting at the given byte offset and writes
// to writer. offset=0 starts at the beginning (equivalent to ReadTo).
// Returns an error if offset is negative or >= file size.
ReadToAt(ctx context.Context, path string, writer io.Writer, offset int64) error
// StatFile returns the byte size of the file at path.
// Returns -1 (and nil error) if the file does not exist.
// Returns a non-nil error only for unexpected backend failures.
StatFile(ctx context.Context, path string) (int64, error)
// List lists all objects with the given prefix
List(ctx context.Context, prefix string) ([]string, error)
// Delete deletes the object at the specified path
Delete(ctx context.Context, path string) error
// Exists checks if an object exists at the specified path
Exists(ctx context.Context, path string) (bool, error)
// Close closes any resources held by the backend
Close() error
// Type returns the storage type identifier ("local", "s3", etc.)
// Used for subprocess serialization
Type() string
// ConfigJSON returns the configuration as JSON for subprocess recreation
// Used for subprocess serialization
ConfigJSON() string
}
Backend defines the interface for storage backends (local, S3/MinIO, Azure Blob Storage)
type BatchDeleter ¶
BatchDeleter supports efficient batch deletion of multiple objects. Implementations should handle batching internally (e.g., S3 supports up to 1000 objects per batch).
type DirectoryLister ¶
type DirectoryLister interface {
ListDirectories(ctx context.Context, prefix string) ([]string, error)
}
DirectoryLister lists immediate subdirectories at a prefix. This is useful for SHOW DATABASES/TABLES commands.
type DirectoryRemover ¶
DirectoryRemover removes an empty directory. This is used to clean up database directories after all files are deleted. For object storage (S3, Azure), this is typically a no-op since directories don't exist as objects.
type LocalBackend ¶
type LocalBackend struct {
// contains filtered or unexported fields
}
LocalBackend implements the Backend interface for local filesystem storage
func NewLocalBackend ¶
func NewLocalBackend(basePath string, logger zerolog.Logger) (*LocalBackend, error)
NewLocalBackend creates a new local filesystem storage backend
func (*LocalBackend) AppendReader ¶
func (b *LocalBackend) AppendReader(ctx context.Context, path string, reader io.Reader, appendSize int64) error
AppendReader appends bytes from reader to the ".part" staging file at path. When all bytes have been appended (the transfer is complete), the caller must call WriteReader (or the coordinator renames the file externally).
This satisfies AppendingBackend, which the puller type-asserts before calling.
func (*LocalBackend) Close ¶
func (b *LocalBackend) Close() error
Close closes any resources held by the backend (no-op for local storage)
func (*LocalBackend) ConfigJSON ¶
func (b *LocalBackend) ConfigJSON() string
ConfigJSON returns the configuration as JSON for subprocess recreation
func (*LocalBackend) Delete ¶
func (b *LocalBackend) Delete(ctx context.Context, path string) error
Delete deletes the object at the specified path
func (*LocalBackend) DeleteBatch ¶
func (b *LocalBackend) DeleteBatch(ctx context.Context, paths []string) error
DeleteBatch deletes multiple objects at the specified paths. Implements the BatchDeleter interface.
func (*LocalBackend) GetBasePath ¶
func (b *LocalBackend) GetBasePath() string
GetBasePath returns the base path for the local storage
func (*LocalBackend) GetFullPath ¶
func (b *LocalBackend) GetFullPath(path string) string
GetFullPath returns the full filesystem path for a given storage path. Useful for debugging and direct file access.
func (*LocalBackend) ListDirectories ¶
ListDirectories lists immediate subdirectories at a prefix. Implements the DirectoryLister interface.
func (*LocalBackend) ListObjects ¶
func (b *LocalBackend) ListObjects(ctx context.Context, prefix string) ([]ObjectInfo, error)
ListObjects lists objects with their metadata at a prefix. Implements the ObjectLister interface.
func (*LocalBackend) ReadToAt ¶
func (b *LocalBackend) ReadToAt(ctx context.Context, path string, writer io.Writer, offset int64) error
ReadToAt reads data from path starting at the given byte offset and writes to writer. offset=0 starts at the beginning. Falls back to the ".part" staging file if the final file does not exist (allows the puller to hash a partial prefix before resuming a transfer).
func (*LocalBackend) RemoveDirectory ¶
func (b *LocalBackend) RemoveDirectory(ctx context.Context, path string) error
RemoveDirectory removes an empty directory. Implements the DirectoryRemover interface.
func (*LocalBackend) StatFile ¶
StatFile returns the byte size of the file at path, or -1 if neither the final file nor its ".part" staging file exists. Returns a non-nil error only for unexpected failures.
Checking the staging file allows the puller's pre-pull check to distinguish a fully-received file (size == entry.SizeBytes → skip) from a partial one (size < entry.SizeBytes → resume).
func (*LocalBackend) Type ¶
func (b *LocalBackend) Type() string
Type returns the storage type identifier
func (*LocalBackend) Write ¶
Write writes data to the specified path with atomic write (write to temp, then rename)
func (*LocalBackend) WriteReader ¶
func (b *LocalBackend) WriteReader(ctx context.Context, path string, reader io.Reader, size int64) error
WriteReader writes data from a reader to the specified path (for large files).
Writes proceed to a deterministic "<path>.part" staging file so that a partial transfer interrupted by a transport error leaves recoverable bytes on disk. On success the staging file is atomically renamed to the final path. On error the staging file is left in place so the puller can resume from it.
type ObjectInfo ¶
ObjectInfo provides metadata about a storage object.
type ObjectLister ¶
type ObjectLister interface {
ListObjects(ctx context.Context, prefix string) ([]ObjectInfo, error)
}
ObjectLister lists objects with their metadata. This is useful for retention policies that need to check file ages.
type ResilientBackend ¶
type ResilientBackend struct {
// contains filtered or unexported fields
}
ResilientBackend wraps a storage backend with circuit breaker and retry logic
func NewResilientBackend ¶
func NewResilientBackend(backend Backend, cfg *ResilientConfig, logger zerolog.Logger) *ResilientBackend
NewResilientBackend creates a new resilient storage backend
func (*ResilientBackend) CircuitBreakerStats ¶
func (r *ResilientBackend) CircuitBreakerStats() map[string]interface{}
CircuitBreakerStats returns circuit breaker statistics
func (*ResilientBackend) Close ¶
func (r *ResilientBackend) Close() error
Close closes the underlying storage backend
func (*ResilientBackend) Delete ¶
func (r *ResilientBackend) Delete(ctx context.Context, path string) error
Delete deletes a file from the storage backend
func (*ResilientBackend) IsCircuitOpen ¶
func (r *ResilientBackend) IsCircuitOpen() bool
IsCircuitOpen returns true if the circuit breaker is open
func (*ResilientBackend) ResetCircuitBreaker ¶
func (r *ResilientBackend) ResetCircuitBreaker()
ResetCircuitBreaker resets the circuit breaker
func (*ResilientBackend) WriteReader ¶
func (r *ResilientBackend) WriteReader(ctx context.Context, path string, reader io.Reader, size int64) error
WriteReader writes data from a reader to the storage backend
type ResilientConfig ¶
type ResilientConfig struct {
// Circuit breaker settings
MaxFailures int
Timeout time.Duration
HalfOpenMaxRequests int
// Retry settings
MaxRetries int
RetryDelay time.Duration
RetryMaxDelay time.Duration
}
ResilientConfig holds configuration for the resilient backend
func DefaultResilientConfig ¶
func DefaultResilientConfig() *ResilientConfig
DefaultResilientConfig returns default resilient backend configuration
type S3Backend ¶
type S3Backend struct {
// contains filtered or unexported fields
}
S3Backend implements the Backend interface for S3 and MinIO storage
func NewS3Backend ¶
NewS3Backend creates a new S3/MinIO backend
func (*S3Backend) ConfigJSON ¶
ConfigJSON returns the configuration as JSON for subprocess recreation
func (*S3Backend) DeleteBatch ¶
DeleteBatch deletes multiple objects from S3 efficiently
func (*S3Backend) GetAccessKey ¶
GetAccessKey returns the access key (for subprocess credential passing)
func (*S3Backend) GetQueryPath ¶
GetQueryPath generates S3 path patterns for time-based query pruning. This enables DuckDB to efficiently scan only relevant partitions.
Examples:
- GetQueryPath("mydb", "cpu", 2025, 11, 0, 0) → "s3://bucket/mydb/cpu/2025/11/*/*/*.parquet" (all November)
- GetQueryPath("mydb", "cpu", 2025, 11, 25, 0) → "s3://bucket/mydb/cpu/2025/11/25/*/*.parquet" (specific day)
- GetQueryPath("mydb", "cpu", 2025, 11, 25, 16) → "s3://bucket/mydb/cpu/2025/11/25/16/*.parquet" (specific hour)
func (*S3Backend) GetQueryPathRange ¶
func (b *S3Backend) GetQueryPathRange(database, measurement string, startTime, endTime time.Time) []string
GetQueryPathRange generates S3 path patterns for a time range. This is useful for queries like "WHERE time BETWEEN start AND end".
func (*S3Backend) GetSecretKey ¶
GetSecretKey returns the secret key (for subprocess credential passing)
func (*S3Backend) ListDirectories ¶
ListDirectories lists immediate subdirectories at a prefix. Implements the DirectoryLister interface. Uses S3's delimiter feature to efficiently list only "directories" (common prefixes).
func (*S3Backend) ListObjects ¶
ListObjects lists objects with their metadata at a prefix. Implements the ObjectLister interface.
func (*S3Backend) ReadToAt ¶
func (b *S3Backend) ReadToAt(ctx context.Context, path string, writer io.Writer, offset int64) error
ReadToAt reads data from S3 starting at the given byte offset and writes to writer. Uses an HTTP Range header to skip already-transferred bytes. offset=0 fetches the full object (no Range header sent).
func (*S3Backend) StatFile ¶
StatFile returns the byte size of the S3 object at path, or -1 if not found.
type S3Config ¶
type S3Config struct {
Bucket string
Region string
Endpoint string // Custom endpoint for MinIO (e.g., "http://localhost:9000")
AccessKey string
SecretKey string
UseSSL bool
PathStyle bool // Use path-style addressing (required for MinIO)
Prefix string // Path prefix within the bucket (e.g., "instances/abc123/")
}
S3Config holds S3 backend configuration