OmniStorage

OmniStorage is a unified storage abstraction layer for Go, inspired by rclone. It provides a single interface for reading and writing to various storage backends with composable layers for compression and record framing.
Full Documentation | API Reference
Features
- 🔌 Single interface for multiple storage backends (local files, S3, cloud drives, etc.)
- 🧩 Composable layers for compression (gzip, zstd) and formatting (NDJSON)
- 🔄 Sync engine for file synchronization between backends (like
rclone sync)
- 🔍 Extended interface for metadata, server-side copy/move, and capability discovery
- 📦 Backend registration allowing external packages to implement backends
Installation
go get github.com/plexusone/omnistorage-core
Quick Start
Basic Read/Write
package main
import (
"context"
"io"
"log"
"github.com/plexusone/omnistorage-core/object/backend/file"
)
func main() {
ctx := context.Background()
// Create a file backend
backend := file.New(file.Config{Root: "/data"})
defer backend.Close()
// Write a file
w, err := backend.NewWriter(ctx, "hello.txt")
if err != nil {
log.Fatal(err)
}
w.Write([]byte("Hello, World!"))
w.Close()
// Read it back
r, err := backend.NewReader(ctx, "hello.txt")
if err != nil {
log.Fatal(err)
}
data, _ := io.ReadAll(r)
r.Close()
log.Println(string(data)) // "Hello, World!"
}
With Compression
import (
"github.com/plexusone/omnistorage-core/object/backend/file"
"github.com/plexusone/omnistorage-core/object/compress/gzip"
)
// Write compressed data
fileWriter, _ := backend.NewWriter(ctx, "data.txt.gz")
gzipWriter, _ := gzip.NewWriter(fileWriter)
gzipWriter.Write([]byte("compressed content"))
gzipWriter.Close()
// Read compressed data
fileReader, _ := backend.NewReader(ctx, "data.txt.gz")
gzipReader, _ := gzip.NewReader(fileReader)
data, _ := io.ReadAll(gzipReader)
gzipReader.Close()
import (
"github.com/plexusone/omnistorage-core/object/backend/file"
"github.com/plexusone/omnistorage-core/object/format/ndjson"
)
// Write NDJSON records
w, _ := backend.NewWriter(ctx, "records.ndjson")
ndjsonWriter := ndjson.NewWriter(w)
ndjsonWriter.Write([]byte(`{"id":1,"name":"alice"}`))
ndjsonWriter.Write([]byte(`{"id":2,"name":"bob"}`))
ndjsonWriter.Close()
// Read NDJSON records
r, _ := backend.NewReader(ctx, "records.ndjson")
ndjsonReader := ndjson.NewReader(r)
for {
record, err := ndjsonReader.Read()
if err == io.EOF {
break
}
log.Println(string(record))
}
ndjsonReader.Close()
Using the Registry
import "github.com/plexusone/omnistorage-core/object"
// Open backend by name
backend, _ := object.Open("file", map[string]string{
"root": "/data",
})
defer backend.Close()
// List registered backends
backends := object.Backends() // ["file", "memory", "channel", "sftp"]
Backends
File Backend
Local filesystem storage.
import "github.com/plexusone/omnistorage-core/object/backend/file"
backend := file.New(file.Config{
Root: "/data", // Base directory for all operations
})
Memory Backend
In-memory storage for testing.
import "github.com/plexusone/omnistorage-core/object/backend/memory"
backend := memory.New()
Cloud Backends (Separate Packages)
Cloud backends with vendor SDKs are in separate packages to keep the core lightweight:
go get github.com/plexusone/omni-aws/omnistorage
import "github.com/plexusone/omni-aws/omnistorage/s3"
backend, _ := s3.New(s3.Config{
Bucket: "my-bucket",
Region: "us-east-1",
})
Sync Operations
The sync package provides rclone-like file synchronization.
Sync (Mirror)
Make destination match source, including deletes.
import "github.com/plexusone/omnistorage-core/object/sync"
result, err := sync.Sync(ctx, srcBackend, dstBackend, "data/", "backup/", sync.Options{
DeleteExtra: true, // Delete files in dst not in src
DryRun: false,
})
fmt.Printf("Copied: %d, Updated: %d, Deleted: %d\n",
result.Copied, result.Updated, result.Deleted)
Copy
Copy files without deleting extras.
// Copy a directory
result, _ := sync.Copy(ctx, src, dst, "data/", "backup/", sync.Options{})
// Copy a single file
err := sync.CopyFile(ctx, src, dst, "file.txt", "file_copy.txt")
// Copy with progress
result, _ := sync.CopyWithProgress(ctx, src, dst, "data/", "backup/",
func(file string, bytes int64) {
fmt.Printf("Copying %s: %d bytes\n", file, bytes)
})
Bisync (Bidirectional Sync)
Two-way synchronization with conflict resolution.
import "github.com/plexusone/omnistorage-core/object/sync"
result, err := sync.Bisync(ctx, backend1, backend2, "folder1/", "folder2/", sync.BisyncOptions{
ConflictStrategy: sync.ConflictNewerWins, // Newer file wins conflicts
DryRun: false,
})
fmt.Printf("Copied to path1: %d, Copied to path2: %d, Conflicts: %d\n",
result.CopiedToPath1, result.CopiedToPath2, len(result.Conflicts))
Conflict resolution strategies:
ConflictNewerWins - Newer file overwrites older (default)
ConflictLargerWins - Larger file overwrites smaller
ConflictSourceWins - First backend (backend1) always wins
ConflictDestWins - Second backend (backend2) always wins
ConflictKeepBoth - Keep both files with conflict suffix
ConflictSkip - Skip conflicting files
ConflictError - Record as error, don't resolve
Check (Verify)
Verify files match between backends.
// Simple check
inSync, _ := sync.Verify(ctx, src, dst, "data/", "backup/", sync.Options{})
// Detailed check
result, _ := sync.Check(ctx, src, dst, "data/", "backup/", sync.Options{})
fmt.Printf("Match: %d, Differ: %d, SrcOnly: %d, DstOnly: %d\n",
len(result.Match), len(result.Differ), len(result.SrcOnly), len(result.DstOnly))
// Human-readable report
report, _ := sync.VerifyAndReport(ctx, src, dst, "data/", "backup/", sync.Options{})
fmt.Println(report)
Options
sync.Options{
DeleteExtra: true, // Delete extra files in destination
DryRun: true, // Report changes without making them
Checksum: true, // Compare by checksum (slower but accurate)
SizeOnly: true, // Compare by size only (fast)
IgnoreExisting: true, // Skip files that exist in destination
MaxErrors: 10, // Stop after N errors (0 = stop on first)
Concurrency: 4, // Concurrent transfers
Progress: func(p sync.Progress) {
fmt.Printf("%s: %d/%d files\n", p.Phase, p.FilesTransferred, p.TotalFiles)
},
}
Logging
Sync operations support structured logging via *slog.Logger.
import (
"log/slog"
"os"
"github.com/plexusone/omnistorage-core/object/sync"
)
// With custom logger
result, _ := sync.Sync(ctx, src, dst, "data/", "backup/", sync.Options{
Logger: slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
Level: slog.LevelDebug,
})),
})
// Output includes:
// - Sync start/complete with summary
// - File scan progress
// - Copy/delete operations (at debug level)
// - Errors with context
When no logger is provided, a null logger is used (no output).
Extended Interface
Backends may implement ExtendedBackend for additional capabilities.
// Check if backend supports extended operations
if ext, ok := object.AsExtended(backend); ok {
// Get file metadata
info, _ := ext.Stat(ctx, "file.txt")
fmt.Printf("Size: %d, Modified: %s\n", info.Size(), info.ModTime())
// Server-side copy (no download/upload)
if ext.Features().Copy {
ext.Copy(ctx, "source.txt", "dest.txt")
}
// Server-side move
if ext.Features().Move {
ext.Move(ctx, "old.txt", "new.txt")
}
// Directory operations
ext.Mkdir(ctx, "new-folder")
ext.Rmdir(ctx, "empty-folder")
}
Feature Discovery
features := ext.Features()
if features.Copy {
// Backend supports server-side copy
}
if features.Move {
// Backend supports server-side move
}
Compression
Gzip
import "github.com/plexusone/omnistorage-core/object/compress/gzip"
// Write
gzWriter, _ := gzip.NewWriter(writer)
gzWriter.Write(data)
gzWriter.Close()
// Read
gzReader, _ := gzip.NewReader(reader)
data, _ := io.ReadAll(gzReader)
gzReader.Close()
Zstandard
import "github.com/plexusone/omnistorage-core/object/compress/zstd"
// Write
zstdWriter, _ := zstd.NewWriter(writer)
zstdWriter.Write(data)
zstdWriter.Close()
// Read
zstdReader, _ := zstd.NewReader(reader)
data, _ := io.ReadAll(zstdReader)
zstdReader.Close()
Interfaces
Backend
The core interface for all storage backends.
type Backend interface {
NewWriter(ctx context.Context, path string, opts ...WriterOption) (io.WriteCloser, error)
NewReader(ctx context.Context, path string, opts ...ReaderOption) (io.ReadCloser, error)
Exists(ctx context.Context, path string) (bool, error)
Delete(ctx context.Context, path string) error
List(ctx context.Context, prefix string) ([]string, error)
Close() error
}
ExtendedBackend
Extended interface for metadata and server-side operations.
type ExtendedBackend interface {
Backend
Stat(ctx context.Context, path string) (ObjectInfo, error)
Mkdir(ctx context.Context, path string) error
Rmdir(ctx context.Context, path string) error
Copy(ctx context.Context, src, dst string) error
Move(ctx context.Context, src, dst string) error
Features() Features
}
RecordWriter / RecordReader
For streaming record-oriented data (logs, events, NDJSON).
type RecordWriter interface {
Write(data []byte) error
Flush() error
Close() error
}
type RecordReader interface {
Read() ([]byte, error)
Close() error
}
Implementing a Backend
External packages can implement and register backends.
package mybackend
import "github.com/plexusone/omnistorage-core"
func init() {
object.Register("mybackend", func(config map[string]string) (object.Backend, error) {
return New(ConfigFromMap(config))
})
}
type Backend struct { /* ... */ }
func (b *Backend) NewWriter(ctx context.Context, path string, opts ...object.WriterOption) (io.WriteCloser, error) {
// Implementation
}
// ... implement other Backend methods
- omnistorage-google - Google Drive and GCS backends
- rclone - Inspiration for backend coverage and sync capabilities
- go-cloud - Google's portable cloud APIs
- afero - Filesystem abstraction
Roadmap
See ROADMAP.md for planned features including:
- Additional cloud backends (GCS, Azure, Dropbox, OneDrive)
Filtering system (glob patterns, size/age filters) (implemented)
Transfer controls (bandwidth limiting, parallel transfers) (implemented)
Bidirectional sync with conflict resolution (implemented)
Structured logging via slog (implemented)
- Security features (credential management, signed URLs)
- CLI tool
Contributing
Contributions are welcome! Priority areas:
- New backends - Follow
backend/file as a template
- Tests - Especially integration tests with real services
- Documentation - Examples, guides, GoDoc improvements
- Bug fixes - Issues labeled
good first issue
License
MIT License - see LICENSE for details.