Documentation ¶
Overview ¶
Package lakeorm is the Go ORM for data lakes. Typed structs are the schema contract; validation happens at the application boundary; writes go directly to the target Iceberg or Delta table via Spark Connect — no bronze landing zone by default.
Entry points:
lakeorm.Open(driver, dialect, backend, opts...)
lakeorm.Query[T](ctx, db, source) // typed read
structs.Validate(records)         // boundary-validate before I/O
Composition is always three pieces: a Driver (transport — how we connect and execute), a Dialect (the data-dialect — DDL, DML, capabilities, semantics; Iceberg vs Delta), and a Backend (where the bytes live). Each is a stable public interface; concrete implementations live under drivers/, dialects/, and backends/.
Index ¶
- func Query[T any](ctx context.Context, db Client, source drivers.Source) ([]T, error)
- func QueryFirst[T any](ctx context.Context, db Client, source drivers.Source) (*T, error)
- func QueryStream[T any](ctx context.Context, db Client, source drivers.Source) iter.Seq2[T, error]
- type CleanupReport
- type Client
- type ClientOption
- func WithCompression(c Compression) ClientOption
- func WithDefaultCatalog(name string) ClientOption
- func WithDefaultDatabase(name string) ClientOption
- func WithDefaultIdempotencyTTL(d time.Duration) ClientOption
- func WithFastPathThreshold(bytes int) ClientOption
- func WithLogger(l zerolog.Logger) ClientOption
- func WithMaxInflightIngests(n int) ClientOption
- func WithSessionPoolSize(n int) ClientOption
- type Compression
- type ConflictStrategy
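- type InsertOption
- func OnConflict(strategy ConflictStrategy) InsertOption
- func ViaGRPC() InsertOption
- func ViaObjectStorage() InsertOption
- func WithIdempotencyKey(key string) InsertOption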
- type StagingLister
- type StagingPrefix
- func NewStagingPrefix(uri, ingestID string) StagingPrefix
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Query ¶
Query runs source against db's driver and materialises every decoded row into []T. The driver must implement drivers.Convertible (all v0 drivers do).
Source is driver-native — build one with the driver's own conversion helper:
drv := db.Driver().(*spark.Driver)
users, err := lakeorm.Query[User](ctx, db,
	drv.FromSQL("SELECT * FROM users WHERE country = ?", "UK"))
DuckDB / Databricks use the same shape with their own FromSQL / FromRows / FromTable helpers; anything the helpers don't cover can be expressed as a bare drivers.Source closure.
The `spark:"..."` / `lake:"..."` tags on T bind result columns to fields. A field in T the source doesn't project surfaces as a schema-mismatch error.
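The tag-to-column binding described above can be illustrated with a small reflection sketch. This is not lakeorm's decoder: `bindRow`, `columnName`, the map-shaped row, and the assumption that the column name is the first comma-separated segment of the tag are all hypothetical, chosen only to show how a field without a projected column can surface as a schema-mismatch error.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// User is an illustrative model; the exact tag grammar is an assumption.
type User struct {
	ID      string `lake:"id"`
	Country string `lake:"country"`
}

// columnName reads the column from a `lake` tag, falling back to `spark`.
func columnName(f reflect.StructField) string {
	tag := f.Tag.Get("lake")
	if tag == "" {
		tag = f.Tag.Get("spark")
	}
	name, _, _ := strings.Cut(tag, ",")
	return name
}

// bindRow copies row values into a new T. It fails when a tagged field
// has no matching column or the value's type is not assignable.
func bindRow[T any](row map[string]any) (T, error) {
	var out T
	v := reflect.ValueOf(&out).Elem()
	if v.Kind() != reflect.Struct {
		return out, fmt.Errorf("bindRow: %s is not a struct", v.Type())
	}
	for i := 0; i < v.NumField(); i++ {
		f := v.Type().Field(i)
		col := columnName(f)
		if col == "" || !f.IsExported() {
			continue // untagged or unexported fields are skipped
		}
		val, ok := row[col]
		if !ok {
			return out, fmt.Errorf("schema mismatch: column %q for field %s is not projected", col, f.Name)
		}
		rv := reflect.ValueOf(val)
		if !rv.IsValid() || !rv.Type().AssignableTo(f.Type) {
			return out, fmt.Errorf("schema mismatch: column %q has type %T, field %s wants %s", col, val, f.Name, f.Type)
		}
		v.Field(i).Set(rv)
	}
	return out, nil
}

func main() {
	u, err := bindRow[User](map[string]any{"id": "u1", "country": "UK"})
	fmt.Println(u, err)

	// The source does not project "country", so binding fails.
	_, err = bindRow[User](map[string]any{"id": "u1"})
	fmt.Println(err)
}
```

The sketch fails on the first missing column rather than zero-filling the field, which matches the behaviour the text describes for Query.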
func QueryFirst ¶
QueryFirst runs source and returns the first decoded row, or errors.ErrNoRows if source yielded zero rows.
func QueryStream ¶
QueryStream runs source and yields T values one at a time with constant memory. Rangeable via Go 1.23's iter.Seq2:
drv := db.Driver().(*spark.Driver)
for row, err := range lakeorm.QueryStream[User](ctx, db,
	drv.FromSQL("SELECT * FROM users")) {
	if err != nil {
		break
	}
	// use row
}
Schema mismatch surfaces through the iterator's error channel on the first row.
Types ¶
type CleanupReport ¶
CleanupReport summarises CleanupStaging's pass. Deleted carries prefix URIs that were removed; Failed carries prefixes that matched the TTL cutoff but whose delete call errored — the operator can retry selectively.
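The Deleted/Failed split can be pictured with a short sweep loop. Everything here is a sketch under assumptions: the `report` and `prefix` types, the `Created` timestamp, and the `del` callback are hypothetical, and the real CleanupStaging may derive a prefix's age differently.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// report mirrors the Deleted/Failed split; the field types are assumed.
type report struct{ Deleted, Failed []string }

// prefix is a hypothetical staging entry with a creation time.
type prefix struct {
	URI     string
	Created time.Time
}

var (
	now       = time.Date(2024, 1, 3, 0, 0, 0, 0, time.UTC) // fixed clock for the demo
	errDelete = errors.New("delete failed")
)

func hours(n int) time.Duration { return time.Duration(n) * time.Hour }

// sweep deletes every prefix created before now-olderThan. A failed delete
// is recorded and the pass continues, so the caller can retry selectively.
func sweep(now time.Time, olderThan time.Duration, prefixes []prefix, del func(string) error) report {
	cutoff := now.Add(-olderThan)
	var r report
	for _, p := range prefixes {
		if !p.Created.Before(cutoff) {
			continue // younger than the cutoff: leave in place
		}
		if err := del(p.URI); err != nil {
			r.Failed = append(r.Failed, p.URI)
			continue
		}
		r.Deleted = append(r.Deleted, p.URI)
	}
	return r
}

func main() {
	prefixes := []prefix{
		{"s3://b/_staging/old", now.Add(-hours(48))},
		{"s3://b/_staging/bad", now.Add(-hours(48))},
		{"s3://b/_staging/new", now.Add(-hours(1))},
	}
	del := func(uri string) error {
		if uri == "s3://b/_staging/bad" {
			return errDelete
		}
		return nil
	}
	fmt.Printf("%+v\n", sweep(now, hours(24), prefixes, del))
}
```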
type Client ¶
type Client interface {
	// Insert writes records (pointer, slice, or slice-of-pointer) to
	// the target table. Validation runs first; then the Dialect
	// plans KindDirectIngest (small batch), KindParquetIngest (large
	// append), or KindParquetMerge (struct carries mergeKey) and the
	// Driver executes.
	Insert(ctx context.Context, records any, opts ...InsertOption) error

	// Driver returns the underlying driver. Callers type-assert to
	// the concrete driver type to reach per-driver conversion
	// helpers (spark.Driver.FromSQL, duckdb.Driver.FromRows, etc.)
	// or the raw native handle (spark.Driver.Session,
	// duckdb.Driver.DB).
	//
	// The reason Client exposes this instead of wrapping every
	// driver-specific helper behind a Client method is that read
	// grammar is driver-specific: a Spark DataFrame and a *sql.Rows
	// have different acquisition shapes, and hiding that costs the
	// caller power without buying portability.
	Driver() drivers.Driver

	// Exec runs a raw SQL statement that returns no rows (DDL, DML
	// that the typed surface doesn't cover).
	Exec(ctx context.Context, sql string, args ...any) (drivers.ExecResult, error)

	// Migrate is the bootstrap path: idempotent CREATE TABLE IF NOT
	// EXISTS derived from struct tags. Sufficient for dev and fresh
	// tables; ALTER TABLE-shaped schema evolution goes through
	// MigrateGenerate + lake-goose.
	Migrate(ctx context.Context, models ...any) error

	// MigrateGenerate writes iceberg/delta-dialect .sql files for any
	// pending struct diffs into dir, in goose's migration-file
	// format. Destructive operations (DROP COLUMN, RENAME COLUMN,
	// type narrowings, NOT-NULL tightenings) land with a
	// `-- DESTRUCTIVE: <reason>` informational comment so the
	// reviewer notices them in the PR diff.
	//
	// Execution is not this library's job; it belongs to lake-goose
	// running against the Spark Connect database/sql driver. A
	// lakeorm.sum manifest is emitted alongside so downstream tooling
	// can detect post-generation edits.
	//
	// Returns the list of generated file paths.
	MigrateGenerate(ctx context.Context, dir string, models ...any) ([]string, error)

	// CleanupStaging walks the Backend's _staging/ namespace and
	// deletes prefixes older than olderThan. Intended as a periodic
	// janitor (e.g. goroutine on a ticker) to sweep orphaned parquet
	// from aborted Insert calls.
	CleanupStaging(ctx context.Context, olderThan time.Duration) (*CleanupReport, error)

	// MetricsRegistry returns the Prometheus registry lakeorm writes
	// runtime metrics into. Consumed by the composed runtime
	// (lakehouse) to expose /metrics. Returns nil at v0.
	MetricsRegistry() *prometheus.Registry

	// Close releases the underlying driver, session pool, and any
	// backend resources. Safe to call once at process shutdown.
	Close() error
}
Client is the main entry point. All methods are safe to call concurrently; the internal session pool serializes per-session state as needed.
Writes bind to persisted types via Insert (auto-routes to MERGE when the struct carries a mergeKey). Reads run through the drivers.Convertible capability — build a driver-native Source with the concrete driver's conversion helpers, reach the driver via Driver(), then feed the Source to lakeorm.Query[T] / QueryStream[T] / QueryFirst[T] for typed decode:
drv := db.Driver().(*spark.Driver)
users, _ := lakeorm.Query[User](ctx, db,
	drv.FromSQL("SELECT * FROM users"))
Migrate bootstraps tables from struct tags; MigrateGenerate writes .sql files for the lake-goose migration runner; CleanupStaging sweeps orphan staging prefixes; Exec is the raw-SQL escape hatch for DDL / one-off DML the typed surface doesn't cover.
type ClientOption ¶
type ClientOption func(*clientConfig)
ClientOption configures the Client at construction time. Functional options so the struct stays extensible without API breakage.
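For readers unfamiliar with the functional-options pattern, the following sketch shows its general shape. The `config` struct, its field names, and the lowercase `with*` helpers are hypothetical stand-ins for the unexported clientConfig; only the default values are taken from the option descriptions on this page.

```go
package main

import (
	"fmt"
	"time"
)

// config is a hypothetical stand-in for the unexported client configuration.
type config struct {
	sessionPoolSize    int
	maxInflightIngests int
	fastPathThreshold  int
	idempotencyTTL     time.Duration
}

// option mutates a config; new options can be added without changing
// the constructor's signature.
type option func(*config)

func withSessionPoolSize(n int) option    { return func(c *config) { c.sessionPoolSize = n } }
func withMaxInflightIngests(n int) option { return func(c *config) { c.maxInflightIngests = n } }
func withIdempotencyTTL(d time.Duration) option {
	return func(c *config) { c.idempotencyTTL = d }
}

// newConfig starts from the documented defaults and applies opts in order,
// so a later option overrides an earlier one.
func newConfig(opts ...option) config {
	c := config{
		sessionPoolSize:    8,
		maxInflightIngests: 4,
		fastPathThreshold:  128 << 20, // 128 MiB
		idempotencyTTL:     24 * time.Hour,
	}
	for _, o := range opts {
		o(&c)
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", newConfig())
	fmt.Printf("%+v\n", newConfig(withSessionPoolSize(16), withMaxInflightIngests(2)))
}
```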
func WithCompression ¶
func WithCompression(c Compression) ClientOption
WithCompression selects the compression codec for fast-path parquet parts. Default ZSTD — 2-3x smaller than Snappy on the shapes we care about (KSUIDs, repeated S3 paths), natively readable by all engines.
func WithDefaultCatalog ¶
func WithDefaultCatalog(name string) ClientOption
WithDefaultCatalog / WithDefaultDatabase set fallbacks for unqualified table names.
func WithDefaultDatabase ¶
func WithDefaultDatabase(name string) ClientOption
func WithDefaultIdempotencyTTL ¶
func WithDefaultIdempotencyTTL(d time.Duration) ClientOption
WithDefaultIdempotencyTTL sets how long idempotency tokens remain valid in the dedup table. Default 24h.
func WithFastPathThreshold ¶
func WithFastPathThreshold(bytes int) ClientOption
WithFastPathThreshold is the advisory byte crossover the Dialect uses to choose between gRPC and object-storage ingest. Default 128 MiB.
func WithLogger ¶
func WithLogger(l zerolog.Logger) ClientOption
WithLogger sets the zerolog logger for the Client. lakeorm.Open passes it on to every driver, dialect, and backend component.
func WithMaxInflightIngests ¶
func WithMaxInflightIngests(n int) ClientOption
WithMaxInflightIngests caps the number of fast-path ingests that can be running concurrently. The bounded semaphore propagates backpressure into Write() — without it a bursty Go producer OOMs the process even when the partition writer "flushes correctly." Default 4.
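The backpressure mechanism can be sketched with a buffered channel used as a counting semaphore. This is an illustration of the general technique, not lakeorm's implementation; `limiter` and `runIngests` are hypothetical names, and the "ingest" body is a placeholder that only tracks concurrency.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// limiter is a counting semaphore backed by a buffered channel.
type limiter chan struct{}

// acquire blocks until a slot is free or ctx is cancelled. Blocking here
// is what pushes backpressure onto the producer.
func (l limiter) acquire(ctx context.Context) error {
	select {
	case l <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func (l limiter) release() { <-l }

// runIngests starts jobs placeholder ingests with at most limit in flight
// and returns the peak concurrency it observed.
func runIngests(limit, jobs int) int64 {
	ctx := context.Background()
	lim := make(limiter, limit)
	var wg sync.WaitGroup
	var inflight, peak atomic.Int64
	for i := 0; i < jobs; i++ {
		if err := lim.acquire(ctx); err != nil {
			break
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			defer lim.release()
			cur := inflight.Add(1)
			for { // record a new peak if this goroutine raised it
				old := peak.Load()
				if cur <= old || peak.CompareAndSwap(old, cur) {
					break
				}
			}
			inflight.Add(-1)
		}()
	}
	wg.Wait()
	return peak.Load()
}

func main() {
	fmt.Println("peak in-flight ingests:", runIngests(4, 64))
}
```

Because the producer loop blocks in `acquire`, it cannot queue more work than the limit allows, which is the property that keeps memory bounded under bursty input.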
func WithSessionPoolSize ¶
func WithSessionPoolSize(n int) ClientOption
WithSessionPoolSize controls how many Spark Connect sessions the Client borrows in flight. Default 8 — large enough for moderate concurrency, small enough to bound cluster-side overhead. Increase for high-fanout services; decrease on tight clusters.
type Compression ¶
type Compression int
Compression selects the codec used by the fast-path partition writer.
const (
	CompressionZSTD Compression = iota
	CompressionSnappy
	CompressionGzip
	CompressionUncompressed
)
type ConflictStrategy ¶
type ConflictStrategy int
ConflictStrategy for Insert.
const (
	ErrorOnConflict ConflictStrategy = iota
	IgnoreOnConflict
	UpdateOnConflict
)
type InsertOption ¶
type InsertOption func(*insertConfig)
InsertOption configures a single Insert call.
func OnConflict ¶
func OnConflict(strategy ConflictStrategy) InsertOption
OnConflict sets the behaviour for primary-key collisions.
func ViaGRPC ¶
func ViaGRPC() InsertOption
ViaGRPC forces the direct gRPC ingest path regardless of batch size. Useful when latency matters more than throughput.
func ViaObjectStorage ¶
func ViaObjectStorage() InsertOption
ViaObjectStorage forces the S3-Parquet fast path regardless of batch size. Useful for benchmarks or when you know the batch is large.
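Taken together with the fast-path threshold, ViaGRPC and ViaObjectStorage amount to a small routing decision. The sketch below is one plausible reading, not the Dialect's actual planner: `choosePath` and its types are hypothetical, the merge route is left out, and treating a batch exactly at the threshold as object-storage-bound is an assumption.

```go
package main

import "fmt"

type path int

const (
	pathGRPC path = iota
	pathObjectStorage
)

type force int

const (
	forceNone force = iota
	forceGRPC
	forceObjectStorage
)

func (p path) String() string {
	if p == pathObjectStorage {
		return "object-storage"
	}
	return "grpc"
}

// choosePath honours an explicit override first and otherwise compares the
// batch size with the advisory threshold.
func choosePath(batchBytes, threshold int, f force) path {
	switch f {
	case forceGRPC:
		return pathGRPC
	case forceObjectStorage:
		return pathObjectStorage
	}
	if batchBytes >= threshold {
		return pathObjectStorage
	}
	return pathGRPC
}

func main() {
	const threshold = 128 << 20 // documented default: 128 MiB
	fmt.Println(choosePath(1<<20, threshold, forceNone))
	fmt.Println(choosePath(256<<20, threshold, forceNone))
	fmt.Println(choosePath(256<<20, threshold, forceGRPC))
}
```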
func WithIdempotencyKey ¶
func WithIdempotencyKey(key string) InsertOption
WithIdempotencyKey sets the idempotency token. If omitted, a SortableID is generated automatically.
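How an idempotency token interacts with its TTL can be sketched with an in-memory dedup table. This is an illustration only: lakeorm keeps tokens in a dedup table on the lake side, and the `dedup` type, `admit`, `epoch`, and `hours` are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// dedup remembers when each idempotency key was first accepted.
type dedup struct {
	ttl  time.Duration
	seen map[string]time.Time
}

func newDedup(ttl time.Duration) *dedup {
	return &dedup{ttl: ttl, seen: make(map[string]time.Time)}
}

// admit reports whether a write carrying key should proceed at time now.
// A key seen within the TTL is rejected; an expired key is accepted again.
func (d *dedup) admit(key string, now time.Time) bool {
	if at, ok := d.seen[key]; ok && now.Sub(at) < d.ttl {
		return false
	}
	d.seen[key] = now
	return true
}

var epoch = time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC) // fixed clock for the demo

func hours(n int) time.Duration { return time.Duration(n) * time.Hour }

func main() {
	d := newDedup(hours(24)) // documented default TTL
	fmt.Println(d.admit("order-42", epoch))
	fmt.Println(d.admit("order-42", epoch.Add(hours(1))))
	fmt.Println(d.admit("order-42", epoch.Add(hours(25))))
}
```

A retried Insert that reuses the same key inside the TTL window is the case this protects; once the token expires, the same key is treated as a new write.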
type StagingLister ¶
type StagingLister interface {
	ListStagingPrefixes(ctx context.Context) ([]StagingPrefix, error)
}
StagingLister is the optional backend extension Client.CleanupStaging relies on. Backends that can enumerate their _staging/ namespace implement this interface; backends that cannot enumerate it leave the interface unimplemented, in which case CleanupStaging returns lkerrors.ErrNotImplemented.
Kept as a separate interface (not folded into Backend) so the Backend contract stays narrow: the fast-path write path only needs StagingPrefix / StagingLocation / CleanupStaging by-URI, and that's the minimum any backend must implement.
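Optional capabilities of this kind are usually discovered with a type assertion. The sketch below shows the pattern with hypothetical stand-ins: `backend`, `stagingLister`, `memoryBackend`, `opaqueBackend`, and `errNotImplemented` are illustrative and simplified (prefixes are plain strings), not lakeorm's types.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

var errNotImplemented = errors.New("not implemented")

// backend is a deliberately narrow core contract.
type backend interface{ Name() string }

// stagingLister is the optional extension some backends also satisfy.
type stagingLister interface {
	ListStagingPrefixes(ctx context.Context) ([]string, error)
}

type memoryBackend struct{ prefixes []string }

func (memoryBackend) Name() string { return "memory" }
func (m memoryBackend) ListStagingPrefixes(context.Context) ([]string, error) {
	return m.prefixes, nil
}

// opaqueBackend cannot enumerate its staging area.
type opaqueBackend struct{}

func (opaqueBackend) Name() string { return "opaque" }

// listStaging uses the extension when the backend has it and otherwise
// reports a sentinel error the caller can test with errors.Is.
func listStaging(ctx context.Context, b backend) ([]string, error) {
	l, ok := b.(stagingLister)
	if !ok {
		return nil, fmt.Errorf("%s: %w", b.Name(), errNotImplemented)
	}
	return l.ListStagingPrefixes(ctx)
}

// probe is a convenience wrapper for the demo.
func probe(b backend) ([]string, error) {
	return listStaging(context.Background(), b)
}

func main() {
	fmt.Println(probe(memoryBackend{prefixes: []string{"_staging/a"}}))
	_, err := probe(opaqueBackend{})
	fmt.Println(err, errors.Is(err, errNotImplemented))
}
```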
type StagingPrefix ¶
StagingPrefix is one entry a StagingLister backend yields. URI is the absolute backend URI; IngestID is the final path segment, which CleanupStaging parses as a UUIDv7.
func NewStagingPrefix ¶
func NewStagingPrefix(uri, ingestID string) StagingPrefix
NewStagingPrefix builds a StagingPrefix from its absolute backend URI and the trailing ingest_id path segment.
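Since the ingest ID is described as the final path segment of the URI, a caller holding only the URI could recover it roughly as follows. `ingestIDFromURI` is a hypothetical helper and the example URIs are made up; the sketch does not validate that the segment is a well-formed ID.

```go
package main

import (
	"fmt"
	"net/url"
	"path"
)

// ingestIDFromURI returns the last path segment of uri, ignoring a
// trailing slash. It fails when the URI has no path segment at all.
func ingestIDFromURI(uri string) (string, error) {
	u, err := url.Parse(uri)
	if err != nil {
		return "", fmt.Errorf("parse %q: %w", uri, err)
	}
	id := path.Base(u.Path)
	if id == "." || id == "/" {
		return "", fmt.Errorf("uri %q has no path segment", uri)
	}
	return id, nil
}

func main() {
	fmt.Println(ingestIDFromURI("s3://bucket/_staging/ingest-123/"))
	fmt.Println(ingestIDFromURI("s3://bucket"))
}
```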
Source Files ¶
Directories ¶
| Path | Synopsis |
|---|---|
| backends | Package backends defines the Backend interface (the abstraction lake-orm uses to put bytes somewhere) and exposes the public constructors for every concrete backend in a single, ergonomic surface. |
| backends/file | Package file provides a local-disk Backend. |
| backends/gcs | Package gcs provides a GCS Backend. |
| backends/memory | Package memory provides an in-memory Backend, useful for tests and for the fast-path regression harness. |
| backends/s3 | Package s3 provides an S3-compatible Backend built directly on aws-sdk-go-v2/service/s3. |
| cmd | |
| cmd/lakeorm (command) | Command lakeorm is the lakeorm CLI. |
| dialects | Package dialects defines the Dialect interface (the data-dialect opinion lake-orm delegates DDL, DML, and write planning to) plus the trio of concrete dialects lake-orm ships. |
| dialects/delta | Package delta is the Delta Lake Dialect. |
| dialects/duckdb | Package duckdb is the lake-orm Dialect for embedded DuckDB. |
| dialects/iceberg | Package iceberg implements lakeorm's default Dialect: Apache Iceberg tables written via Spark Connect. |
| drivers | Package drivers is the contract shared by every lake-orm driver. |
| drivers/databricks | Package databricks is lake-orm's native Databricks driver. |
| drivers/databricksconnect | Package databricksconnect is lake-orm's Databricks Connect driver. |
| drivers/duckdb | Package duckdb is lake-orm's embedded DuckDB driver. |
| drivers/spark | Package spark provides lakeorm's generic Spark Connect driver. |
| errors | Package errors is the public error surface of lake-orm: sentinel values callers check with errors.Is, typed wrappers callers check with errors.As, and constructors that build them. |
| examples | |
| examples/arbitrary_spark_query (command) | Example: arbitrary native Spark query → typed Go struct. |
| examples/basic (command) | Example: the hero path for lakeorm. |
| examples/bulk (command) | Example: the fast-path write, parquet through object storage with a Spark `INSERT ... |
| examples/databricks (command) | Example: lake-orm against a Databricks SQL warehouse using the native `databricks-sql-go` driver. |
| examples/databricksconnect (command) | Example: lake-orm against a Databricks cluster via Databricks Connect (Spark Connect over a Databricks workspace). |
| examples/delta (command) | Example: writing against a Delta Lake table (instead of Iceberg). |
| examples/duckdb (command) | Example: the pure-Go embedded path. |
| examples/ingest-id (command) | Example: `_ingest_id` as a system-managed column for batch reconciliation. |
| examples/joins (command) | Example: reading joins and aggregates with CQRS-style output types. |
| examples/lake-tag (command) | Example: the canonical `lake:"..."` tag. |
| examples/migrations (command) | Example: Django-style offline migration authoring. |
| examples/stream (command) | Example: streaming a large result back with constant memory. |
| examples/typed-helpers (command) | Example: the typed read API (Query[T] / QueryStream[T] / QueryFirst[T]) built on the drivers.Convertible capability and per-driver From* conversion helpers. |
| examples/validation (command) | Example: validation at the application boundary via go-playground/validator. |
| internal | |
| internal/migrations | Package migrations is lakeorm's authoring side of schema evolution. |
| internal/parquet | Package parquet is lakeorm's parquet-writing layer. |
| local | Package local is the lake-k8s docker-compose on-ramp. |
| structs | Package structs owns the "Go struct is the schema contract" invariant. |
| teste2e | Package teste2e is the black-box end-to-end suite for lakeorm. |
| testutils | Package testutils is the shared test harness for lakeorm. |
| types | Package types defines the small value types lakeorm exposes on its public API: SortableID (KSUID-backed), ObjectURI/Location (typed storage addresses), and SparkTableName. |