Documentation
¶
Overview ¶
Package storage provides object storage backends for BubuStack workloads.
This package implements a unified storage interface for S3 and file-based storage, used by engram workloads to store and retrieve large data artifacts that exceed the 1MB Kubernetes annotation limit.
Storage Manager ¶
The [Manager] provides a unified interface for storage operations:
manager, err := storage.NewManager(config)
defer manager.Close()
err = manager.Put(ctx, key, data)
data, err := manager.Get(ctx, key)
err = manager.Delete(ctx, key)
Storage Backends ¶
S3 Storage:
store, err := storage.NewS3Store(storage.S3Config{
Bucket: "my-bucket",
Region: "us-west-2",
Endpoint: "s3.amazonaws.com",
})
File Storage:
store, err := storage.NewFileStore(storage.FileConfig{
Path: "/data/storage",
})
Caching ¶
The storage manager includes an optional in-memory cache:
manager, err := storage.NewManager(storage.ManagerConfig{
Store: store,
CacheSize: 100 * 1024 * 1024, // 100MB cache
CacheTTL: 5 * time.Minute,
})
Metrics ¶
Storage operations emit Prometheus metrics:
- storage_operations_total: Total operations by type and status
- storage_operation_duration_seconds: Operation latency histogram
- storage_bytes_transferred: Bytes read/written
Environment Variables ¶
The [ApplyEnv] function configures storage via environment variables in pods. This is used by controllers when building workload pod specs.
Index ¶
- Constants
- Variables
- func IsNotFound(err error) bool
- func NamespacedKey(namespace, id string) string
- func ResetSharedManagerCacheForTests()
- func WithMaxRecursionDepth(ctx context.Context, depth int) context.Context
- func WithStepRunID(ctx context.Context, stepRunID string) context.Context
- func WithStorageSchema(ctx context.Context, schema, schemaVersion string) context.Context
- type FileStore
- func (s *FileStore) Delete(ctx context.Context, path string) error
- func (s *FileStore) List(ctx context.Context, prefix string) ([]string, error)
- func (s *FileStore) Read(ctx context.Context, path string, writer io.Writer) error
- func (s *FileStore) Write(ctx context.Context, path string, reader io.Reader) error
- type MockManager
- type RetentionPolicy
- type S3Store
- func (s *S3Store) Delete(ctx context.Context, path string) error
- func (s *S3Store) List(ctx context.Context, prefix string) ([]string, error)
- func (s *S3Store) Read(ctx context.Context, path string, writer io.Writer) error
- func (s *S3Store) Write(ctx context.Context, path string, reader io.Reader) error
- type StorageManager
- func (sm *StorageManager) Dehydrate(ctx context.Context, data any, stepRunID string) (result any, err error)
- func (sm *StorageManager) DehydrateInputs(ctx context.Context, data any, storyRunID string) (result any, err error)
- func (sm *StorageManager) DehydrateInputsWithMaxInlineSize(ctx context.Context, data any, storyRunID string, maxInlineSize int) (any, error)
- func (sm *StorageManager) Delete(ctx context.Context, path string) error
- func (sm *StorageManager) GetStore() Store
- func (sm *StorageManager) Hydrate(ctx context.Context, data any) (result any, err error)
- func (sm *StorageManager) HydrateStorageRefsOnly(ctx context.Context, data any) (result any, err error)
- func (sm *StorageManager) List(ctx context.Context, prefix string) ([]string, error)
- func (sm *StorageManager) ReadBlob(ctx context.Context, path string) ([]byte, error)
- func (sm *StorageManager) WriteBlob(ctx context.Context, path string, contentType string, data []byte) error
- type Store
- type StoreDeleter
- type StoreLister
- type StoredObject
Constants ¶
const (
// DefaultMaxInlineSize is the threshold in bytes above which data is offloaded to storage.
//
// Aligns with operator's DefaultMaxInlineSize (bobrapet/internal/config/controller_config.go:246).
// Override via BUBU_MAX_INLINE_SIZE. Keeping this low (1 KiB) prevents apiserver/etcd overload
// while allowing small outputs to remain inline for fast access.
DefaultMaxInlineSize = 1 * 1024 // 1 KiB

// DefaultMaxRecursionDepth is the maximum nesting depth for hydration/dehydration.
//
// Prevents stack overflow on deeply nested data structures. Override via BUBU_MAX_RECURSION_DEPTH.
DefaultMaxRecursionDepth = 10

// DefaultStorageTimeout provides ample time for large file uploads (e.g., 100MB at 1MB/s = 100s + overhead).
//
// Operators should tune this based on expected output sizes and S3 latency:
//   timeout >= (max_output_mb / upload_bandwidth_mbps) * 1.5 + baseline_latency_sec
// Override via BUBU_STORAGE_TIMEOUT.
DefaultStorageTimeout = 300 * time.Second // 5min for storage ops
)
const (
StorageRefKey           = storageRefKey
StoragePathKey          = storagePathKey
StorageContentTypeKey   = storageTypeKey
StorageSchemaKey        = storageSchemaKey
StorageSchemaVersionKey = storageSchemaVer
)
const (
// DefaultRetentionGCInterval throttles storage retention scans.
DefaultRetentionGCInterval = 2 * time.Minute
// DefaultRetentionMaxScan caps objects scanned per GC pass.
DefaultRetentionMaxScan = 2000
// DefaultRetentionMaxDelete caps deletions per GC pass.
DefaultRetentionMaxDelete = 200
)
Variables ¶
var ErrUnsupportedOperation = errors.New("storage: operation not supported")
ErrUnsupportedOperation indicates the storage backend does not support the requested operation.
Functions ¶
func IsNotFound ¶
func IsNotFound(err error) bool
IsNotFound reports whether a storage error indicates a missing object.
func NamespacedKey ¶
func NamespacedKey(namespace, id string) string
NamespacedKey composes a storage-safe key by prefixing the identifier with the namespace.
func ResetSharedManagerCacheForTests ¶
func ResetSharedManagerCacheForTests()
ResetSharedManagerCacheForTests clears the shared manager cache. Exposed for tests in dependent modules that need to ensure isolated configuration between cases.
func WithMaxRecursionDepth ¶
func WithMaxRecursionDepth(ctx context.Context, depth int) context.Context
WithMaxRecursionDepth overrides the recursion depth budget for storage hydration and dehydration operations performed with the provided context.
func WithStepRunID ¶
func WithStepRunID(ctx context.Context, stepRunID string) context.Context
WithStepRunID attaches a StepRunID to the context for hydration metrics attribution.
Types ¶
type FileStore ¶
type FileStore struct {
// contains filtered or unexported fields
}
FileStore implements the Store interface for a local filesystem. It is intended for local development and testing. For production, use S3Store.
func NewFileStore ¶
NewFileStore creates a new FileStore rooted at the given basePath.
type MockManager ¶
MockManager is a mock implementation of the StorageManager API. (StorageManager itself is a concrete struct type, not an interface.)
type RetentionPolicy ¶
RetentionPolicy configures storage retention enforcement behavior.
func RetentionPolicyFromEnv ¶
func RetentionPolicyFromEnv() RetentionPolicy
RetentionPolicyFromEnv resolves retention policy settings from environment variables.
type S3Store ¶
type S3Store struct {
// contains filtered or unexported fields
}
S3Store implements the Store interface for an S3-compatible object store. It supports standard AWS S3 and S3-compatible services like MinIO. It uses manager.Uploader for automatic multipart uploads of large files.
func NewS3Store ¶
NewS3Store creates a new S3Store. It automatically configures the S3 client from environment variables.
type StorageManager ¶
type StorageManager struct {
// contains filtered or unexported fields
}
StorageManager handles the transparent offloading of large inputs and outputs to a configured storage backend.
func NewManager ¶
func NewManager(ctx context.Context) (*StorageManager, error)
NewManager creates a new StorageManager, automatically configuring the storage backend based on environment variables.
func SharedManager ¶
func SharedManager(ctx context.Context) (*StorageManager, error)
SharedManager returns a process-wide cached StorageManager keyed by the current storage configuration. Subsequent callers that resolve to the same configuration receive the cached instance, avoiding repeated backend initialization.
func (*StorageManager) Dehydrate ¶
func (sm *StorageManager) Dehydrate(ctx context.Context, data any, stepRunID string) (result any, err error)
Dehydrate recursively checks the size of a data structure. If it exceeds the inline size limit, it saves the data to the storage backend and replaces it with a storage reference.
func (*StorageManager) DehydrateInputs ¶
func (sm *StorageManager) DehydrateInputs(ctx context.Context, data any, storyRunID string) (result any, err error)
DehydrateInputs dehydrates StoryRun inputs.
func (*StorageManager) DehydrateInputsWithMaxInlineSize ¶
func (sm *StorageManager) DehydrateInputsWithMaxInlineSize(ctx context.Context, data any, storyRunID string, maxInlineSize int) (any, error)
DehydrateInputsWithMaxInlineSize dehydrates StoryRun inputs using an explicit inline-size threshold instead of the manager default.
func (*StorageManager) Delete ¶
func (sm *StorageManager) Delete(ctx context.Context, path string) error
Delete removes the object at the provided path when supported.
func (*StorageManager) GetStore ¶
func (sm *StorageManager) GetStore() Store
GetStore returns the underlying Store implementation used by this manager. This is primarily useful for testing and debugging.
func (*StorageManager) Hydrate ¶
func (sm *StorageManager) Hydrate(ctx context.Context, data any) (result any, err error)
Hydrate recursively scans a data structure for storage references and replaces them with the actual content from the storage backend.
func (*StorageManager) HydrateStorageRefsOnly ¶
func (sm *StorageManager) HydrateStorageRefsOnly(ctx context.Context, data any) (result any, err error)
HydrateStorageRefsOnly recursively resolves storage references while leaving Kubernetes Secret/ConfigMap refs untouched.
func (*StorageManager) List ¶
func (sm *StorageManager) List(ctx context.Context, prefix string) ([]string, error)
List enumerates storage object paths under the provided prefix when supported.
type Store ¶
type Store interface {
// Write saves the data from the reader to the storage backend at the specified path.
Write(ctx context.Context, path string, reader io.Reader) error
// Read retrieves the data from the storage backend at the specified path and writes it to the writer.
Read(ctx context.Context, path string, writer io.Writer) error
}
Store is the interface for a generic storage backend. It provides a streaming Read/Write interface.
type StoreDeleter ¶
type StoreDeleter interface {
// Delete removes the object at the provided path.
Delete(ctx context.Context, path string) error
}
StoreDeleter provides optional deletion support for storage backends.
type StoreLister ¶
type StoreLister interface {
// List returns object paths under the provided prefix.
List(ctx context.Context, prefix string) ([]string, error)
}
StoreLister provides optional listing support for storage backends.
type StoredObject ¶
type StoredObject struct {
// ContentType indicates whether the stored data is "json" or "raw" text.
ContentType string `json:"contentType"`
// Data holds the actual content. For JSON, it's a RawMessage; for raw, it's a string.
Data json.RawMessage `json:"data"`
}
StoredObject is a wrapper for data offloaded to storage, providing metadata about the content type to ensure correct hydration.