objectstore

package
v1.3.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 13, 2026 License: Apache-2.0 Imports: 22 Imported by: 0

Documentation

Overview

Package objectstore provides an S3-compatible object storage abstraction. It can be used by any part of the system that needs to store or retrieve objects from S3, GCS (via S3 interop), MinIO, R2, or other S3-compatible stores.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Config

type Config struct {
	Type   StoreType      `json:"type"` // "s3" or "gcs"
	Bucket schemas.EnvVar `json:"bucket"`

	// Common fields (apply to all store types)
	Prefix   string `json:"prefix,omitempty"`   // Key prefix for all stored objects. Default: "bifrost".
	Compress bool   `json:"compress,omitempty"` // Enables gzip compression for stored objects. Default: false.

	// S3 fields (used when Type == "s3")
	Region          *schemas.EnvVar `json:"region,omitempty"`
	Endpoint        *schemas.EnvVar `json:"endpoint,omitempty"`
	AccessKeyID     *schemas.EnvVar `json:"access_key_id,omitempty"`
	SecretAccessKey *schemas.EnvVar `json:"secret_access_key,omitempty"`
	SessionToken    *schemas.EnvVar `json:"session_token,omitempty"`
	RoleARN         *schemas.EnvVar `json:"role_arn,omitempty"`
	ForcePathStyle  bool            `json:"force_path_style,omitempty"`

	// GCS fields (used when Type == "gcs")
	Credentials     *schemas.EnvVar `json:"credentials,omitempty"`      // Deprecated: use credentials_json
	CredentialsJSON *schemas.EnvVar `json:"credentials_json,omitempty"` // Service account JSON or path
	ProjectID       *schemas.EnvVar `json:"project_id,omitempty"`       // GCP project ID override
}

Config holds the configuration for an object store.

func (*Config) GetPrefix

func (c *Config) GetPrefix() string

GetPrefix returns the configured prefix or "bifrost" as default.

type GCSObjectStore

type GCSObjectStore struct {
	// contains filtered or unexported fields
}

GCSObjectStore implements ObjectStore using Google Cloud Storage.

func NewGCSObjectStore

func NewGCSObjectStore(ctx context.Context, cfg *Config, logger schemas.Logger) (*GCSObjectStore, error)

NewGCSObjectStore creates a new GCS object store from the given config.

func (*GCSObjectStore) Close

func (g *GCSObjectStore) Close() error

Close releases the GCS client resources.

func (*GCSObjectStore) Delete

func (g *GCSObjectStore) Delete(ctx context.Context, key string) error

Delete removes a single object by key.

func (*GCSObjectStore) DeleteBatch

func (g *GCSObjectStore) DeleteBatch(ctx context.Context, keys []string) error

DeleteBatch removes multiple objects.

func (*GCSObjectStore) Get

func (g *GCSObjectStore) Get(ctx context.Context, key string) ([]byte, error)

Get retrieves and decompresses an object by key.

func (*GCSObjectStore) Ping

func (g *GCSObjectStore) Ping(ctx context.Context) error

Ping checks connectivity by writing and deleting a small object, proving that the credentials have upload access (not just read). This is important because HybridLogStore strips DB payloads before async upload — a read-only principal would pass a read-based ping but silently fail all Put calls.

func (*GCSObjectStore) Put

func (g *GCSObjectStore) Put(ctx context.Context, key string, data []byte, tags map[string]string) error

Put uploads data with optional custom metadata. When compression is enabled, data is gzip-compressed before upload.

type InMemoryObjectStore

type InMemoryObjectStore struct {

	// PutErr, if set, is returned by Put for simulating failures.
	PutErr error
	// GetErr, if set, is returned by Get for simulating failures.
	GetErr error
	// contains filtered or unexported fields
}

InMemoryObjectStore is an in-memory ObjectStore implementation for testing.

func NewInMemoryObjectStore

func NewInMemoryObjectStore() *InMemoryObjectStore

NewInMemoryObjectStore creates a new in-memory object store.

func (*InMemoryObjectStore) Close

func (m *InMemoryObjectStore) Close() error

func (*InMemoryObjectStore) Delete

func (m *InMemoryObjectStore) Delete(_ context.Context, key string) error

func (*InMemoryObjectStore) DeleteBatch

func (m *InMemoryObjectStore) DeleteBatch(_ context.Context, keys []string) error

func (*InMemoryObjectStore) Get

func (m *InMemoryObjectStore) Get(_ context.Context, key string) ([]byte, error)

func (*InMemoryObjectStore) GetTags

func (m *InMemoryObjectStore) GetTags(key string) map[string]string

GetTags returns the tags stored for a given key. For testing assertions.

func (*InMemoryObjectStore) Keys

func (m *InMemoryObjectStore) Keys() []string

Keys returns all stored keys. For testing assertions.

func (*InMemoryObjectStore) Len

func (m *InMemoryObjectStore) Len() int

Len returns the number of stored objects. For testing assertions.

func (*InMemoryObjectStore) Ping

func (*InMemoryObjectStore) Put

func (m *InMemoryObjectStore) Put(_ context.Context, key string, data []byte, tags map[string]string) error

type ObjectStore

type ObjectStore interface {
	// Put uploads data to the given key with optional tags.
	// The implementation handles compression (e.g., gzip) internally.
	Put(ctx context.Context, key string, data []byte, tags map[string]string) error

	// Get retrieves and decompresses data for the given key.
	Get(ctx context.Context, key string) ([]byte, error)

	// Delete removes an object by key.
	Delete(ctx context.Context, key string) error

	// DeleteBatch removes multiple objects by key.
	DeleteBatch(ctx context.Context, keys []string) error

	// Ping checks connectivity to the storage backend.
	Ping(ctx context.Context) error

	// Close releases resources held by the store.
	Close() error
}

ObjectStore abstracts S3-compatible blob storage operations.

func NewObjectStore

func NewObjectStore(ctx context.Context, cfg *Config, logger schemas.Logger) (ObjectStore, error)

NewObjectStore creates the appropriate ObjectStore implementation based on config type.

type S3ObjectStore

type S3ObjectStore struct {
	// contains filtered or unexported fields
}

S3ObjectStore implements ObjectStore using an S3-compatible backend.

func NewS3ObjectStore

func NewS3ObjectStore(ctx context.Context, cfg *Config, logger schemas.Logger) (*S3ObjectStore, error)

NewS3ObjectStore creates a new S3-compatible object store from the given config.

func (*S3ObjectStore) Close

func (s *S3ObjectStore) Close() error

Close is a no-op for S3 (no persistent connections to release).

func (*S3ObjectStore) Delete

func (s *S3ObjectStore) Delete(ctx context.Context, key string) error

Delete removes a single object by key.

func (*S3ObjectStore) DeleteBatch

func (s *S3ObjectStore) DeleteBatch(ctx context.Context, keys []string) error

DeleteBatch removes multiple objects. It uses the S3 DeleteObjects API which supports up to 1000 keys per call.

func (*S3ObjectStore) Get

func (s *S3ObjectStore) Get(ctx context.Context, key string) ([]byte, error)

Get retrieves and decompresses an object by key.

func (*S3ObjectStore) Ping

func (s *S3ObjectStore) Ping(ctx context.Context) error

Ping checks connectivity by performing a HeadBucket call. Note: HeadBucket requires the s3:ListBucket IAM permission on the bucket resource.

func (*S3ObjectStore) Put

func (s *S3ObjectStore) Put(ctx context.Context, key string, data []byte, tags map[string]string) error

Put uploads data with optional S3 object tags. When compression is enabled, data is gzip-compressed before upload.

type StoreType

type StoreType string

StoreType identifies the object storage backend.

const (
	StoreTypeS3  StoreType = "s3"
	StoreTypeGCS StoreType = "gcs"
)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL