storage

package
v0.1.0
Published: Oct 18, 2025 | License: Apache-2.0 | Imports: 20 | Imported by: 0

Documentation

Constants

const (
	// DefaultMaxInlineSize is the threshold in bytes above which data is offloaded to storage.
	//
	// Aligns with operator's DefaultMaxInlineSize (bobrapet/internal/config/controller_config.go:246).
	// Override via BUBU_MAX_INLINE_SIZE. Keeping this low (1 KiB) prevents apiserver/etcd overload
	// while allowing small outputs to remain inline for fast access.
	DefaultMaxInlineSize = 1 * 1024 // 1 KiB

	// DefaultMaxRecursionDepth is the maximum nesting depth for hydration/dehydration.
	//
	// Prevents stack overflow on deeply nested data structures. Override via BUBU_MAX_RECURSION_DEPTH.
	DefaultMaxRecursionDepth = 10

	// DefaultStorageTimeout provides ample time for large file uploads (e.g., 100MB at 1MB/s = 100s + overhead).
	//
	// Operators should tune this based on expected output sizes and S3 latency:
	//   timeout >= (max_output_mb / upload_bandwidth_mb_per_sec) * 1.5 + baseline_latency_sec
	// Override via BUBU_STORAGE_TIMEOUT.
	DefaultStorageTimeout = 300 * time.Second // 5min for storage ops

)
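
The following is a minimal sketch of how a caller might resolve the documented overrides and apply the timeout rule from the DefaultStorageTimeout comment. The helper names intFromEnv and recommendedTimeout are hypothetical and not part of this package. It assumes the override variables hold plain base-10 integers and that bandwidth is measured in megabytes per second, as in the "100MB at 1MB/s" example; the package's actual parsing may differ.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

// intFromEnv returns the integer stored under name, or def when the variable
// is unset, unparsable, or not positive. The lookup function is injected so
// the helper can be exercised without touching the real environment.
func intFromEnv(lookup func(string) (string, bool), name string, def int) int {
	raw, ok := lookup(name)
	if !ok {
		return def
	}
	n, err := strconv.Atoi(raw)
	if err != nil || n <= 0 {
		return def
	}
	return n
}

// recommendedTimeout applies the documented tuning rule:
//   timeout >= (max output in MB / bandwidth in MB/s) * 1.5 + baseline latency in s
func recommendedTimeout(maxOutputMB, bandwidthMBPerSec, baselineSec float64) time.Duration {
	seconds := (maxOutputMB/bandwidthMBPerSec)*1.5 + baselineSec
	return time.Duration(seconds * float64(time.Second))
}

func main() {
	// Defaults mirror DefaultMaxInlineSize and DefaultMaxRecursionDepth.
	inline := intFromEnv(os.LookupEnv, "BUBU_MAX_INLINE_SIZE", 1024)
	depth := intFromEnv(os.LookupEnv, "BUBU_MAX_RECURSION_DEPTH", 10)
	fmt.Println("inline limit:", inline, "max depth:", depth)

	// 100 MB at 1 MB/s with a hypothetical 5 s baseline latency.
	fmt.Println(recommendedTimeout(100, 1, 5)) // 2m35s
}
```

With these inputs the rule yields 155 seconds, comfortably below the 300-second default.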

Functions

func WithStepRunID

func WithStepRunID(ctx context.Context, stepRunID string) context.Context

WithStepRunID attaches a StepRunID to the context for hydration metrics attribution.

Types

type FileStore

type FileStore struct {
	// contains filtered or unexported fields
}

FileStore implements the Store interface for a local filesystem.

func NewFileStore

func NewFileStore(basePath string) (*FileStore, error)

NewFileStore creates a new FileStore. It requires a base path where all files will be stored.

func (*FileStore) Read

func (fs *FileStore) Read(ctx context.Context, path string, writer io.Writer) error

Read retrieves data from a file and writes it to a writer.

func (*FileStore) Write

func (fs *FileStore) Write(ctx context.Context, path string, reader io.Reader) error

Write saves data from a reader to a file at the given path relative to the basePath.
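
FileStore's internals are not exported, so the following is only a sketch of how a filesystem-backed store with the same Read and Write signatures could work. The localStore type and the roundTrip helper are hypothetical. The sketch joins each path onto the base directory and does not guard against paths that escape it, which a production store would need to do.

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strings"
)

// localStore is a hypothetical stand-in for FileStore.
type localStore struct{ base string }

// Write streams the reader into base/path, creating parent directories as needed.
func (s *localStore) Write(ctx context.Context, path string, r io.Reader) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	full := filepath.Join(s.base, path)
	if err := os.MkdirAll(filepath.Dir(full), 0o755); err != nil {
		return err
	}
	f, err := os.Create(full)
	if err != nil {
		return err
	}
	if _, err := io.Copy(f, r); err != nil {
		f.Close()
		return err
	}
	return f.Close()
}

// Read streams the file at base/path into the writer.
func (s *localStore) Read(ctx context.Context, path string, w io.Writer) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	f, err := os.Open(filepath.Join(s.base, path))
	if err != nil {
		return err
	}
	defer f.Close()
	_, err = io.Copy(w, f)
	return err
}

// roundTrip writes payload under a fresh temporary base directory and reads it back.
func roundTrip(path, payload string) (string, error) {
	dir, err := os.MkdirTemp("", "store-sketch")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)

	s := &localStore{base: dir}
	ctx := context.Background()
	if err := s.Write(ctx, path, strings.NewReader(payload)); err != nil {
		return "", err
	}
	var buf bytes.Buffer
	if err := s.Read(ctx, path, &buf); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	got, err := roundTrip("runs/step-1/output.json", `{"ok":true}`)
	fmt.Println(got, err)
}
```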

type MockManager

type MockManager struct {
	mock.Mock
}

MockManager is a mock that mirrors the Hydrate and Dehydrate methods of StorageManager for use in tests.

func (*MockManager) Dehydrate

func (m *MockManager) Dehydrate(ctx context.Context, data any, stepRunID string) (any, error)

Dehydrate mocks the Dehydrate method for testing.

func (*MockManager) Hydrate

func (m *MockManager) Hydrate(ctx context.Context, data any) (any, error)

Hydrate mocks the Hydrate method for testing.

type S3Store

type S3Store struct {
	// contains filtered or unexported fields
}

S3Store implements the Store interface for an S3-compatible object store. It supports AWS S3 and S3-compatible services such as MinIO, and it uses manager.Uploader so that large files are uploaded in multiple parts automatically.

func NewS3Store

func NewS3Store(ctx context.Context) (*S3Store, error)

NewS3Store creates a new S3Store. It automatically configures the S3 client from environment variables.

func (*S3Store) Read

func (s *S3Store) Read(ctx context.Context, path string, writer io.Writer) error

Read downloads data from S3 and writes it to a writer.

func (*S3Store) Write

func (s *S3Store) Write(ctx context.Context, path string, reader io.Reader) error

Write uploads data from a reader to S3. It uses manager.Uploader, which splits files larger than 5MB into parts and uploads the parts in parallel.
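
To make the 5MB threshold concrete, the sketch below estimates how many parts an upload of a given size would use. It assumes a part size of 5 MiB, which is the default in the AWS SDK for Go v2 upload manager; S3Store may configure a different value, so treat the numbers as illustrative. The partCount helper is hypothetical.

```go
package main

import "fmt"

// partSize is the assumed part size (5 MiB); the store's real setting may differ.
const partSize = 5 * 1024 * 1024

// partCount estimates the number of parts for an object of the given size:
// a single request when the object fits in one part, otherwise the size
// divided by the part size, rounded up.
func partCount(size int64) int64 {
	if size <= partSize {
		return 1
	}
	return (size + partSize - 1) / partSize
}

func main() {
	for _, mib := range []int64{1, 5, 6, 100} {
		fmt.Printf("%d MiB -> %d part(s)\n", mib, partCount(mib*1024*1024))
	}
}
```

Under this assumption a 100 MiB output is sent as 20 parts, while anything up to 5 MiB goes out in a single request.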

type StorageManager

type StorageManager struct {
	// contains filtered or unexported fields
}

StorageManager handles the transparent offloading of large inputs and outputs to a configured storage backend.

func NewManager

func NewManager(ctx context.Context) (*StorageManager, error)

NewManager creates a new StorageManager, automatically configuring the storage backend based on environment variables.

func (*StorageManager) Dehydrate

func (sm *StorageManager) Dehydrate(ctx context.Context, data any, stepRunID string) (any, error)

Dehydrate recursively checks the size of a data structure. If it exceeds the inline size limit, it saves the data to the storage backend and replaces it with a storage reference.
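
The exact traversal and reference format are not documented here, so the following is only a sketch of the general idea under stated assumptions: it walks maps and slices, measures every other value by its JSON encoding, and replaces oversized values with a reference map. The refKey marker, the memStore type, and the path scheme are all hypothetical.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

const (
	maxInlineSize = 1024         // mirrors DefaultMaxInlineSize
	maxDepth      = 10           // mirrors DefaultMaxRecursionDepth
	refKey        = "storageRef" // hypothetical marker, not the real reference format
)

// memStore is a hypothetical in-memory backend keyed by path.
type memStore map[string][]byte

// dehydrate walks maps and slices. Any other value whose JSON encoding is
// larger than maxInlineSize is saved to the store and replaced by a
// {refKey: path} map; smaller values are returned unchanged.
func dehydrate(v any, store memStore, path string, depth int) (any, error) {
	if depth > maxDepth {
		return nil, errors.New("max recursion depth exceeded")
	}
	switch t := v.(type) {
	case map[string]any:
		out := make(map[string]any, len(t))
		for k, child := range t {
			d, err := dehydrate(child, store, path+"/"+k, depth+1)
			if err != nil {
				return nil, err
			}
			out[k] = d
		}
		return out, nil
	case []any:
		out := make([]any, len(t))
		for i, child := range t {
			d, err := dehydrate(child, store, fmt.Sprintf("%s/%d", path, i), depth+1)
			if err != nil {
				return nil, err
			}
			out[i] = d
		}
		return out, nil
	}
	raw, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	if len(raw) <= maxInlineSize {
		return v, nil
	}
	store[path] = raw
	return map[string]any{refKey: path}, nil
}

func main() {
	store := memStore{}
	out, err := dehydrate(map[string]any{
		"status": "ok",
		"report": strings.Repeat("x", 4096),
	}, store, "runs/step-1", 0)
	fmt.Println(out, err, len(store))
}
```

In this simplified version a map of many small values always stays inline, even if its total size exceeds the limit; the real implementation may treat that case differently.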

func (*StorageManager) DehydrateInputs

func (sm *StorageManager) DehydrateInputs(ctx context.Context, data any, storyRunID string) (any, error)

DehydrateInputs dehydrates StoryRun inputs.

func (*StorageManager) GetStore

func (sm *StorageManager) GetStore() Store

GetStore returns the underlying Store implementation used by this manager. This is primarily useful for testing and debugging.

func (*StorageManager) Hydrate

func (sm *StorageManager) Hydrate(ctx context.Context, data any) (any, error)

Hydrate recursively scans a data structure for storage references and replaces them with the actual content from the storage backend.
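
As a rough illustration of the reverse direction, the sketch below scans maps and slices for reference maps and swaps each one for the decoded stored content. The reference shape (a single-key map under the hypothetical refKey) and the in-memory memStore are assumptions made for this example, not the package's actual format.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

const (
	maxDepth = 10           // mirrors DefaultMaxRecursionDepth
	refKey   = "storageRef" // hypothetical marker, not the real reference format
)

// memStore is a hypothetical in-memory backend keyed by path.
type memStore map[string][]byte

// hydrate returns a copy of v in which every {refKey: path} map has been
// replaced by the JSON-decoded content stored at that path.
func hydrate(v any, store memStore, depth int) (any, error) {
	if depth > maxDepth {
		return nil, errors.New("max recursion depth exceeded")
	}
	switch t := v.(type) {
	case map[string]any:
		if p, ok := t[refKey].(string); ok && len(t) == 1 {
			raw, found := store[p]
			if !found {
				return nil, fmt.Errorf("no stored object at %q", p)
			}
			var out any
			if err := json.Unmarshal(raw, &out); err != nil {
				return nil, err
			}
			return out, nil
		}
		out := make(map[string]any, len(t))
		for k, child := range t {
			h, err := hydrate(child, store, depth+1)
			if err != nil {
				return nil, err
			}
			out[k] = h
		}
		return out, nil
	case []any:
		out := make([]any, len(t))
		for i, child := range t {
			h, err := hydrate(child, store, depth+1)
			if err != nil {
				return nil, err
			}
			out[i] = h
		}
		return out, nil
	}
	return v, nil // scalars pass through untouched
}

func main() {
	store := memStore{"runs/step-1/report": []byte(`"full report text"`)}
	in := map[string]any{
		"status": "ok",
		"report": map[string]any{refKey: "runs/step-1/report"},
	}
	out, err := hydrate(in, store, 0)
	fmt.Println(out, err)
}
```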

type Store

type Store interface {
	// Write saves the data from the reader to the storage backend at the specified path.
	Write(ctx context.Context, path string, reader io.Reader) error

	// Read retrieves the data from the storage backend at the specified path and writes it to the writer.
	Read(ctx context.Context, path string, writer io.Writer) error
}

Store is the interface for a generic storage backend. It provides a streaming Read/Write interface.

type StoredObject

type StoredObject struct {
	// ContentType indicates whether the stored data is "json" or "raw" text.
	ContentType string `json:"contentType"`
	// Data holds the actual content. For JSON, it's a RawMessage; for raw, it's a string.
	Data json.RawMessage `json:"data"`
}

StoredObject is a wrapper for data offloaded to storage, providing metadata about the content type to ensure correct hydration.
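
The sketch below shows one way the wrapper could be produced and consumed, based only on the field comments above: "json" content is embedded as-is, and "raw" text is encoded as a JSON string. The struct is redeclared locally so that the example compiles on its own. The helpers wrapJSON, wrapRaw, and unwrap are hypothetical and may not match the package's internal logic.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// StoredObject repeats the documented type so the sketch is self-contained.
type StoredObject struct {
	ContentType string          `json:"contentType"`
	Data        json.RawMessage `json:"data"`
}

// wrapJSON encodes v and embeds the result unchanged under "data".
func wrapJSON(v any) ([]byte, error) {
	data, err := json.Marshal(v)
	if err != nil {
		return nil, err
	}
	return json.Marshal(StoredObject{ContentType: "json", Data: data})
}

// wrapRaw stores plain text as a JSON string under "data".
func wrapRaw(text string) ([]byte, error) {
	data, err := json.Marshal(text)
	if err != nil {
		return nil, err
	}
	return json.Marshal(StoredObject{ContentType: "raw", Data: data})
}

// unwrap decodes a stored object back into a Go value, using ContentType to
// decide whether Data is arbitrary JSON or a plain string.
func unwrap(b []byte) (any, error) {
	var obj StoredObject
	if err := json.Unmarshal(b, &obj); err != nil {
		return nil, err
	}
	switch obj.ContentType {
	case "json":
		var v any
		err := json.Unmarshal(obj.Data, &v)
		return v, err
	case "raw":
		var s string
		err := json.Unmarshal(obj.Data, &s)
		return s, err
	default:
		return nil, fmt.Errorf("unknown content type %q", obj.ContentType)
	}
}

func main() {
	b, err := wrapRaw("hello")
	fmt.Println(string(b), err) // {"contentType":"raw","data":"hello"} <nil>
	v, err := unwrap(b)
	fmt.Println(v, err) // hello <nil>
}
```

Recording the content type lets hydration return a string for raw text rather than guessing whether the payload should be parsed as JSON.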
