storage

package
v0.1.5 Latest
Published: Apr 8, 2026 License: Apache-2.0 Imports: 37 Imported by: 0

Documentation

Overview

Package storage provides object storage backends for BubuStack workloads.

This package implements a unified storage interface for S3 and file-based storage. Engram workloads use it to store and retrieve large data artifacts that are too big to keep inline in Kubernetes resources; by default, data larger than DefaultMaxInlineSize (1 KiB) is offloaded.

Storage Manager

The [StorageManager] provides a unified interface for storage operations:

manager, err := storage.NewManager(ctx)
if err != nil {
    return err
}

err = manager.WriteBlob(ctx, path, contentType, data)
data, err = manager.ReadBlob(ctx, path)
err = manager.Delete(ctx, path)

Storage Backends

S3 Storage (the client is configured from environment variables):

store, err := storage.NewS3Store(ctx)

File Storage:

store, err := storage.NewFileStore("/data/storage")

Caching

The storage manager includes an optional in-memory cache:

manager, err := storage.NewManager(storage.ManagerConfig{
    Store:     store,
    CacheSize: 100 * 1024 * 1024, // 100MB cache
    CacheTTL:  5 * time.Minute,
})

Metrics

Storage operations emit Prometheus metrics:

  • storage_operations_total: Total operations by type and status
  • storage_operation_duration_seconds: Operation latency histogram
  • storage_bytes_transferred: Bytes read/written

Environment Variables

The [ApplyEnv] function configures storage via environment variables in pods. This is used by controllers when building workload pod specs.

Constants

const (
	// DefaultMaxInlineSize is the threshold in bytes above which data is offloaded to storage.
	//
	// Aligns with operator's DefaultMaxInlineSize (bobrapet/internal/config/controller_config.go:246).
	// Override via BUBU_MAX_INLINE_SIZE. Keeping this low (1 KiB) prevents apiserver/etcd overload
	// while allowing small outputs to remain inline for fast access.
	DefaultMaxInlineSize = 1 * 1024 // 1 KiB

	// DefaultMaxRecursionDepth is the maximum nesting depth for hydration/dehydration.
	//
	// Prevents stack overflow on deeply nested data structures. Override via BUBU_MAX_RECURSION_DEPTH.
	DefaultMaxRecursionDepth = 10

	// DefaultStorageTimeout provides ample time for large file uploads (e.g., 100MB at 1MB/s = 100s + overhead).
	//
	// Operators should tune this based on expected output sizes and S3 latency:
	//   timeout >= (max_output_mb / upload_bandwidth_mbps) * 1.5 + baseline_latency_sec
	// Override via BUBU_STORAGE_TIMEOUT.
	DefaultStorageTimeout = 300 * time.Second // 5min for storage ops

)
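
The sizing rule in the DefaultStorageTimeout comment can be worked through in code. The following is a minimal sketch, not part of the package: the function name RecommendedTimeout and its parameters are illustrative, and it assumes bandwidth is expressed in megabytes per second, as in the "100MB at 1MB/s" example.

```go
package main

import (
	"fmt"
	"time"
)

// RecommendedTimeout applies the rule from the DefaultStorageTimeout comment:
//   timeout >= (maxOutputMB / bandwidthMBps) * 1.5 + baseline
// The name and signature are hypothetical; bandwidth is assumed to be in MB/s.
func RecommendedTimeout(maxOutputMB, bandwidthMBps float64, baseline time.Duration) time.Duration {
	transferSeconds := maxOutputMB / bandwidthMBps
	padded := time.Duration(transferSeconds * 1.5 * float64(time.Second))
	return padded + baseline
}

func main() {
	// 100 MB at 1 MB/s with 5s of baseline latency: 100s * 1.5 + 5s.
	fmt.Println(RecommendedTimeout(100, 1, 5*time.Second)) // 2m35s
}
```

With these inputs the result (155s) sits comfortably under the 300s default, which suggests the default leaves headroom for slower links or larger outputs.
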
const (
	StorageRefKey           = storageRefKey
	StoragePathKey          = storagePathKey
	StorageContentTypeKey   = storageTypeKey
	StorageSchemaKey        = storageSchemaKey
	StorageSchemaVersionKey = storageSchemaVer
)
const (
	// DefaultRetentionGCInterval throttles storage retention scans.
	DefaultRetentionGCInterval = 2 * time.Minute
	// DefaultRetentionMaxScan caps objects scanned per GC pass.
	DefaultRetentionMaxScan = 2000
	// DefaultRetentionMaxDelete caps deletions per GC pass.
	DefaultRetentionMaxDelete = 200
)

Variables

var ErrUnsupportedOperation = errors.New("storage: operation not supported")

ErrUnsupportedOperation indicates the storage backend does not support the requested operation.

Functions

func IsNotFound

func IsNotFound(err error) bool

IsNotFound reports whether a storage error indicates a missing object.

func NamespacedKey

func NamespacedKey(namespace, id string) string

NamespacedKey composes a storage-safe key by prefixing the identifier with the namespace.

func ResetSharedManagerCacheForTests

func ResetSharedManagerCacheForTests()

ResetSharedManagerCacheForTests clears the shared manager cache. Exposed for tests in dependent modules that need to ensure isolated configuration between cases.

func WithMaxRecursionDepth

func WithMaxRecursionDepth(ctx context.Context, depth int) context.Context

WithMaxRecursionDepth overrides the recursion depth budget for storage hydration and dehydration operations performed with the provided context.

func WithStepRunID

func WithStepRunID(ctx context.Context, stepRunID string) context.Context

WithStepRunID attaches a StepRunID to the context for hydration metrics attribution.

func WithStorageSchema

func WithStorageSchema(ctx context.Context, schema, schemaVersion string) context.Context

WithStorageSchema attaches optional schema metadata to the context for storage refs. Empty values are ignored by the offload path.
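
The three With... helpers above follow the usual Go pattern of carrying per-call options in a context. The sketch below shows how such an override might be stored and read back. It is an assumption about the mechanism, not the package's code: withMaxDepth, maxDepthFrom, and the "positive values only" rule are hypothetical stand-ins.

```go
package main

import (
	"context"
	"fmt"
)

// ctxKey is an unexported type so the key cannot collide with other packages.
type ctxKey int

const maxDepthKey ctxKey = iota

// defaultMaxDepth mirrors the documented DefaultMaxRecursionDepth value.
const defaultMaxDepth = 10

// withMaxDepth is a hypothetical stand-in for WithMaxRecursionDepth.
func withMaxDepth(ctx context.Context, depth int) context.Context {
	return context.WithValue(ctx, maxDepthKey, depth)
}

// maxDepthFrom returns the override when one is present and positive
// (an assumed rule), otherwise the default.
func maxDepthFrom(ctx context.Context) int {
	if d, ok := ctx.Value(maxDepthKey).(int); ok && d > 0 {
		return d
	}
	return defaultMaxDepth
}

// demo returns the depth seen without and with an override.
func demo() (int, int) {
	base := context.Background()
	return maxDepthFrom(base), maxDepthFrom(withMaxDepth(base, 3))
}

func main() {
	fmt.Println(demo()) // 10 3
}
```
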

Types

type FileStore

type FileStore struct {
	// contains filtered or unexported fields
}

FileStore implements the Store interface for a local filesystem. It is intended for local development and testing. For production, use S3Store.

func NewFileStore

func NewFileStore(basePath string) (*FileStore, error)

NewFileStore creates a new FileStore rooted at the given basePath.

func (*FileStore) Delete

func (s *FileStore) Delete(ctx context.Context, path string) error

Delete removes the file at the provided path relative to the basePath.

func (*FileStore) List

func (s *FileStore) List(ctx context.Context, prefix string) ([]string, error)

List enumerates files under the provided prefix relative to the basePath.

func (*FileStore) Read

func (s *FileStore) Read(ctx context.Context, path string, writer io.Writer) error

Read reads data from a file at the given path and writes it to a writer.

func (*FileStore) Write

func (s *FileStore) Write(ctx context.Context, path string, reader io.Reader) error

Write writes data from a reader to a file at the given path, relative to the basePath.

type MockManager

type MockManager struct {
	mock.Mock
}

MockManager is a mock of StorageManager's Hydrate and Dehydrate methods for use in tests.

func (*MockManager) Dehydrate

func (m *MockManager) Dehydrate(ctx context.Context, data any, stepRunID string) (any, error)

Dehydrate mocks the Dehydrate method for testing.

func (*MockManager) Hydrate

func (m *MockManager) Hydrate(ctx context.Context, data any) (any, error)

Hydrate mocks the Hydrate method for testing.

type RetentionPolicy

type RetentionPolicy struct {
	GCInterval time.Duration
	MaxScan    int
	MaxDelete  int
}

RetentionPolicy configures storage retention enforcement behavior.
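
To illustrate how MaxScan and MaxDelete could bound a single GC pass, here is a minimal sketch. The gcPass function and the expired predicate are hypothetical; this documentation does not show how the package decides that an object has expired.

```go
package main

import (
	"fmt"
	"time"
)

// RetentionPolicy mirrors the documented struct.
type RetentionPolicy struct {
	GCInterval time.Duration
	MaxScan    int
	MaxDelete  int
}

// gcPass walks keys in order and stops once MaxScan keys have been examined
// or MaxDelete keys have been selected. It returns the number of keys scanned
// and the keys chosen for deletion. expired is a hypothetical predicate.
func gcPass(p RetentionPolicy, keys []string, expired func(string) bool) (scanned int, toDelete []string) {
	for _, k := range keys {
		if scanned >= p.MaxScan || len(toDelete) >= p.MaxDelete {
			break
		}
		scanned++
		if expired(k) {
			toDelete = append(toDelete, k)
		}
	}
	return scanned, toDelete
}

// demo runs one pass in which every key counts as expired.
func demo() (int, int) {
	p := RetentionPolicy{GCInterval: 2 * time.Minute, MaxScan: 5, MaxDelete: 2}
	keys := []string{"a", "b", "c", "d", "e", "f", "g"}
	scanned, del := gcPass(p, keys, func(string) bool { return true })
	return scanned, len(del)
}

func main() {
	fmt.Println(demo()) // 2 2
}
```

Capping both counters keeps each pass cheap; anything left over would be picked up by a later pass, GCInterval apart.
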

func RetentionPolicyFromEnv

func RetentionPolicyFromEnv() RetentionPolicy

RetentionPolicyFromEnv resolves retention policy settings from environment variables.

type S3Store

type S3Store struct {
	// contains filtered or unexported fields
}

S3Store implements the Store interface for an S3-compatible object store. It supports standard AWS S3 and S3-compatible services like MinIO. Uses manager.Uploader for automatic multipart uploads on large files.

func NewS3Store

func NewS3Store(ctx context.Context) (*S3Store, error)

NewS3Store creates a new S3Store. It automatically configures the S3 client from environment variables.

func (*S3Store) Delete

func (s *S3Store) Delete(ctx context.Context, path string) error

Delete removes the object at the provided path.

func (*S3Store) List

func (s *S3Store) List(ctx context.Context, prefix string) ([]string, error)

List enumerates object keys under the provided prefix.

func (*S3Store) Read

func (s *S3Store) Read(ctx context.Context, path string, writer io.Writer) error

Read downloads data from S3 and writes it to a writer.

func (*S3Store) Write

func (s *S3Store) Write(ctx context.Context, path string, reader io.Reader) error

Write uploads data from a reader to S3. It uses manager.Uploader, which splits files larger than 5MB into parts and uploads them in parallel.

type StorageManager

type StorageManager struct {
	// contains filtered or unexported fields
}

StorageManager handles the transparent offloading of large inputs and outputs to a configured storage backend.

func NewManager

func NewManager(ctx context.Context) (*StorageManager, error)

NewManager creates a new StorageManager, automatically configuring the storage backend based on environment variables.

func SharedManager

func SharedManager(ctx context.Context) (*StorageManager, error)

SharedManager returns a process-wide cached StorageManager keyed by the current storage configuration. Subsequent callers that resolve to the same configuration receive the cached instance, avoiding repeated backend initialization.
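
A process-wide cache keyed by configuration can be sketched with a mutex-guarded map. This is an illustration of the idea only: the manager type, the string config key, and the builds counter are all hypothetical, and the package's actual keying scheme is not documented here.

```go
package main

import (
	"fmt"
	"sync"
)

// manager is a hypothetical stand-in for *StorageManager.
type manager struct{ configKey string }

var (
	mu     sync.Mutex
	shared = map[string]*manager{}
	builds int // counts backend initializations, for demonstration only
)

// sharedManager returns the cached manager for configKey, building it once.
func sharedManager(configKey string) *manager {
	mu.Lock()
	defer mu.Unlock()
	if m, ok := shared[configKey]; ok {
		return m
	}
	builds++
	m := &manager{configKey: configKey}
	shared[configKey] = m
	return m
}

// resetShared clears the cache, in the spirit of ResetSharedManagerCacheForTests.
func resetShared() {
	mu.Lock()
	defer mu.Unlock()
	shared = map[string]*manager{}
	builds = 0
}

// demo requests the same configuration twice and a different one once.
func demo() (sameInstance bool, buildCount int) {
	resetShared()
	a := sharedManager("s3|bucket-a")
	b := sharedManager("s3|bucket-a")
	_ = sharedManager("file|/data")
	return a == b, builds
}

func main() {
	fmt.Println(demo()) // true 2
}
```
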

func (*StorageManager) Dehydrate

func (sm *StorageManager) Dehydrate(ctx context.Context, data any, stepRunID string) (result any, err error)

Dehydrate recursively checks the size of a data structure. If it exceeds the inline size limit, it saves the data to the storage backend and replaces it with a storage reference.
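
The recursive size check can be sketched as follows. This is a simplified illustration under stated assumptions, not the package's implementation: the marker key "$exampleRef", the 16-byte threshold, the path layout, and the in-memory blobs map are all hypothetical.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

const (
	maxInline = 16            // tiny threshold for the demo; the package default is 1 KiB
	refKey    = "$exampleRef" // hypothetical marker key, not the package's real key
)

var errTooDeep = errors.New("max recursion depth exceeded")

// dehydrate returns v unchanged when its JSON encoding fits within maxInline.
// Otherwise it recurses into maps so that only oversized leaves are offloaded,
// and replaces an oversized non-map value with a reference map.
func dehydrate(v any, path string, depth int, blobs map[string][]byte) (any, error) {
	if depth <= 0 {
		return nil, errTooDeep
	}
	raw, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	if len(raw) <= maxInline {
		return v, nil
	}
	if m, ok := v.(map[string]any); ok {
		out := make(map[string]any, len(m))
		for k, child := range m {
			d, err := dehydrate(child, path+"/"+k, depth-1, blobs)
			if err != nil {
				return nil, err
			}
			out[k] = d
		}
		return out, nil
	}
	blobs[path] = raw
	return map[string]any{refKey: path}, nil
}

// demo dehydrates a map with one small and one oversized value.
func demo() (map[string]any, map[string][]byte, error) {
	blobs := map[string][]byte{}
	in := map[string]any{
		"small": "ok",
		"large": "this string is longer than sixteen bytes",
	}
	out, err := dehydrate(in, "runs/step-1", 10, blobs)
	if err != nil {
		return nil, nil, err
	}
	return out.(map[string]any), blobs, nil
}

func main() {
	out, blobs, err := demo()
	fmt.Println(out["small"], len(blobs), err) // ok 1 <nil>
}
```

The depth budget plays the role that DefaultMaxRecursionDepth plays in the package: recursion stops with an error instead of descending without bound.
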

func (*StorageManager) DehydrateInputs

func (sm *StorageManager) DehydrateInputs(ctx context.Context, data any, storyRunID string) (result any, err error)

DehydrateInputs dehydrates StoryRun inputs using the manager's default inline-size threshold.

func (*StorageManager) DehydrateInputsWithMaxInlineSize

func (sm *StorageManager) DehydrateInputsWithMaxInlineSize(
	ctx context.Context,
	data any,
	storyRunID string,
	maxInlineSize int,
) (any, error)

DehydrateInputsWithMaxInlineSize dehydrates StoryRun inputs using an explicit inline-size threshold instead of the manager default.

func (*StorageManager) Delete

func (sm *StorageManager) Delete(ctx context.Context, path string) error

Delete removes the object at the provided path when supported.

func (*StorageManager) GetStore

func (sm *StorageManager) GetStore() Store

GetStore returns the underlying Store implementation used by this manager. This is primarily useful for testing and debugging.

func (*StorageManager) Hydrate

func (sm *StorageManager) Hydrate(ctx context.Context, data any) (result any, err error)

Hydrate recursively scans a data structure for storage references and replaces them with the actual content from the storage backend.
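
The scan-and-replace step can be sketched like this. It is an illustration under assumptions rather than the package's code: the marker key "$exampleRef" and the in-memory blobs map are hypothetical, and a reference is assumed to be a map whose only entry is that key.

```go
package main

import (
	"encoding/json"
	"fmt"
)

const refKey = "$exampleRef" // hypothetical marker key, not the package's real key

// hydrate walks maps and slices. A map whose only entry is refKey is replaced
// with the decoded blob it points to; everything else is copied through.
func hydrate(v any, blobs map[string][]byte) (any, error) {
	switch t := v.(type) {
	case map[string]any:
		if path, ok := t[refKey].(string); ok && len(t) == 1 {
			raw, found := blobs[path]
			if !found {
				return nil, fmt.Errorf("blob %q not found", path)
			}
			var decoded any
			if err := json.Unmarshal(raw, &decoded); err != nil {
				return nil, err
			}
			return decoded, nil
		}
		out := make(map[string]any, len(t))
		for k, child := range t {
			h, err := hydrate(child, blobs)
			if err != nil {
				return nil, err
			}
			out[k] = h
		}
		return out, nil
	case []any:
		out := make([]any, len(t))
		for i, child := range t {
			h, err := hydrate(child, blobs)
			if err != nil {
				return nil, err
			}
			out[i] = h
		}
		return out, nil
	default:
		return v, nil
	}
}

// demo hydrates a structure holding one reference nested inside a slice.
func demo() (any, error) {
	blobs := map[string][]byte{"runs/step-1/large": []byte(`"restored payload"`)}
	in := map[string]any{
		"small": "ok",
		"items": []any{map[string]any{refKey: "runs/step-1/large"}},
	}
	return hydrate(in, blobs)
}

func main() {
	out, err := demo()
	fmt.Println(out, err)
}
```
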

func (*StorageManager) HydrateStorageRefsOnly

func (sm *StorageManager) HydrateStorageRefsOnly(ctx context.Context, data any) (result any, err error)

HydrateStorageRefsOnly recursively resolves storage references while leaving Kubernetes Secret/ConfigMap refs untouched.

func (*StorageManager) List

func (sm *StorageManager) List(ctx context.Context, prefix string) ([]string, error)

List enumerates storage object paths under the provided prefix when supported.

func (*StorageManager) ReadBlob

func (sm *StorageManager) ReadBlob(ctx context.Context, path string) ([]byte, error)

ReadBlob retrieves raw bytes from storage for the provided path.

func (*StorageManager) WriteBlob

func (sm *StorageManager) WriteBlob(ctx context.Context, path string, contentType string, data []byte) error

WriteBlob stores arbitrary binary data at the provided path with integrity metadata.

type Store

type Store interface {
	// Write saves the data from the reader to the storage backend at the specified path.
	Write(ctx context.Context, path string, reader io.Reader) error

	// Read retrieves the data from the storage backend at the specified path and writes it to the writer.
	Read(ctx context.Context, path string, writer io.Writer) error
}

Store is the interface for a generic storage backend. It provides a streaming Read/Write interface.
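
Because Store only requires streaming Write and Read, a custom backend is small. The sketch below implements a local copy of the interface with an in-memory map; memStore and roundTrip are hypothetical names for illustration, for example as a test double.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"strings"
	"sync"
)

// Store mirrors the documented interface.
type Store interface {
	Write(ctx context.Context, path string, reader io.Reader) error
	Read(ctx context.Context, path string, writer io.Writer) error
}

// memStore is a hypothetical in-memory backend.
type memStore struct {
	mu    sync.RWMutex
	blobs map[string][]byte
}

func newMemStore() *memStore { return &memStore{blobs: map[string][]byte{}} }

// Write drains the reader and stores the bytes under path.
func (s *memStore) Write(ctx context.Context, path string, reader io.Reader) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	data, err := io.ReadAll(reader)
	if err != nil {
		return err
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	s.blobs[path] = data
	return nil
}

// Read copies the bytes stored under path into writer.
func (s *memStore) Read(ctx context.Context, path string, writer io.Writer) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	s.mu.RLock()
	data, ok := s.blobs[path]
	s.mu.RUnlock()
	if !ok {
		return fmt.Errorf("memstore: %q not found", path)
	}
	_, err := writer.Write(data)
	return err
}

// roundTrip writes payload through the Store interface and reads it back.
func roundTrip(payload string) (string, error) {
	var store Store = newMemStore()
	ctx := context.Background()
	if err := store.Write(ctx, "ns/key", strings.NewReader(payload)); err != nil {
		return "", err
	}
	var buf bytes.Buffer
	if err := store.Read(ctx, "ns/key", &buf); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	fmt.Println(roundTrip("hello")) // hello <nil>
}
```
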

type StoreDeleter

type StoreDeleter interface {
	// Delete removes the object at the provided path.
	Delete(ctx context.Context, path string) error
}

StoreDeleter provides optional deletion support for storage backends.

type StoreLister

type StoreLister interface {
	// List returns object paths under the provided prefix.
	List(ctx context.Context, prefix string) ([]string, error)
}

StoreLister provides optional listing support for storage backends.
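
Optional interfaces such as StoreDeleter and StoreLister are typically discovered with a type assertion. The sketch below shows one plausible way a caller could fall back to ErrUnsupportedOperation when a backend lacks Delete. The interfaces and the sentinel are local copies, and basicStore, deletingStore, and deleteVia are hypothetical; whether StorageManager.Delete works exactly this way is an assumption.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
)

// Local mirrors of the documented interfaces and sentinel error.
type Store interface {
	Write(ctx context.Context, path string, reader io.Reader) error
	Read(ctx context.Context, path string, writer io.Writer) error
}

type StoreDeleter interface {
	Delete(ctx context.Context, path string) error
}

var ErrUnsupportedOperation = errors.New("storage: operation not supported")

// basicStore implements Store but not StoreDeleter.
type basicStore struct{}

func (basicStore) Write(context.Context, string, io.Reader) error { return nil }
func (basicStore) Read(context.Context, string, io.Writer) error  { return nil }

// deletingStore adds the optional Delete capability and records deleted paths.
type deletingStore struct {
	basicStore
	deleted []string
}

func (d *deletingStore) Delete(_ context.Context, path string) error {
	d.deleted = append(d.deleted, path)
	return nil
}

// deleteVia deletes through the backend when it supports deletion and
// otherwise returns an error wrapping ErrUnsupportedOperation.
func deleteVia(ctx context.Context, s Store, path string) error {
	d, ok := s.(StoreDeleter)
	if !ok {
		return fmt.Errorf("delete %q: %w", path, ErrUnsupportedOperation)
	}
	return d.Delete(ctx, path)
}

// demo tries a delete against both kinds of backend.
func demo() (unsupported bool, deletedCount int) {
	ctx := context.Background()
	err := deleteVia(ctx, basicStore{}, "ns/a")
	ds := &deletingStore{}
	_ = deleteVia(ctx, ds, "ns/a")
	return errors.Is(err, ErrUnsupportedOperation), len(ds.deleted)
}

func main() {
	fmt.Println(demo()) // true 1
}
```

Wrapping the sentinel with %w keeps the path in the message while still letting callers match it with errors.Is.
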

type StoredObject

type StoredObject struct {
	// ContentType indicates whether the stored data is "json" or "raw" text.
	ContentType string `json:"contentType"`
	// Data holds the actual content. For JSON, it's a RawMessage; for raw, it's a string.
	Data json.RawMessage `json:"data"`
}

StoredObject is a wrapper for data offloaded to storage, providing metadata about the content type to ensure correct hydration.
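
The wrapper can be exercised with encoding/json alone. In this sketch the struct is copied from the definition above, while wrap and unwrap are hypothetical helpers; the rule that Go strings are tagged "raw" and everything else "json" is an assumption based on the field comments.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// StoredObject mirrors the documented struct.
type StoredObject struct {
	ContentType string          `json:"contentType"`
	Data        json.RawMessage `json:"data"`
}

// wrap tags strings as "raw" and all other values as "json" (assumed rule),
// storing the JSON encoding of the value in Data.
func wrap(v any) (StoredObject, error) {
	contentType := "json"
	if _, ok := v.(string); ok {
		contentType = "raw"
	}
	data, err := json.Marshal(v)
	if err != nil {
		return StoredObject{}, err
	}
	return StoredObject{ContentType: contentType, Data: data}, nil
}

// unwrap decodes Data as a string for "raw" objects and as generic JSON otherwise.
func unwrap(obj StoredObject) (any, error) {
	if obj.ContentType == "raw" {
		var s string
		err := json.Unmarshal(obj.Data, &s)
		return s, err
	}
	var v any
	err := json.Unmarshal(obj.Data, &v)
	return v, err
}

// demo wraps a string, serializes the wrapper, and restores the value.
func demo() (string, any, error) {
	obj, err := wrap("plain text")
	if err != nil {
		return "", nil, err
	}
	encoded, err := json.Marshal(obj)
	if err != nil {
		return "", nil, err
	}
	var decoded StoredObject
	if err := json.Unmarshal(encoded, &decoded); err != nil {
		return "", nil, err
	}
	v, err := unwrap(decoded)
	return string(encoded), v, err
}

func main() {
	fmt.Println(demo())
}
```
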
