Documentation ¶
Overview ¶
Package lakeorm is the Go ORM for data lakes. Typed structs are the schema contract; validation happens at the application boundary; writes go directly to the target Iceberg or Delta table via Spark Connect — no bronze landing zone by default.
Entry points:
lakeorm.Open(driver, dialect, backend, opts...)
lakeorm.Query[T](db)       // typed query builder
lakeorm.Validate(records)  // boundary-validate before I/O
Composition is always three pieces: a Driver (transport — how we connect and execute), a Dialect (the data-dialect — DDL, DML, capabilities, semantics; Iceberg vs Delta), and a Backend (where the bytes live). Each is a stable public interface; concrete implementations live under driver/, dialect/, and backend/.
Index ¶
- Variables
- func CollectAs[T any](ctx context.Context, df DataFrame) ([]T, error)
- func FirstAs[T any](ctx context.Context, df DataFrame) (*T, error)
- func IsClusterNotReady(err error) bool
- func Query[T any](ctx context.Context, db Client, sql string, args ...any) ([]T, error)
- func QueryFirst[T any](ctx context.Context, db Client, sql string, args ...any) (*T, error)
- func QueryStream[T any](ctx context.Context, db Client, sql string, args ...any) iter.Seq2[T, error]
- func RegisterValidator(name string, fn ValidatorFunc)
- func SchemaFingerprint(v any) (string, error)
- func StreamAs[T any](ctx context.Context, df DataFrame) iter.Seq2[T, error]
- func Table(model any, name string)
- func Validate(records any) error
- func Verify(_ context.Context, _ Client) error
- type AutoBehavior
- type Backend
- type CleanupReport
- type Client
- type ClientOption
- func WithCompression(c Compression) ClientOption
- func WithDefaultCatalog(name string) ClientOption
- func WithDefaultDatabase(name string) ClientOption
- func WithDefaultIdempotencyTTL(d time.Duration) ClientOption
- func WithFastPathThreshold(bytes int) ClientOption
- func WithLogger(l zerolog.Logger) ClientOption
- func WithMaxInflightIngests(n int) ClientOption
- func WithSessionPoolSize(n int) ClientOption
- type ColumnInfo
- type Compression
- type ConflictStrategy
- type DataFrame
- type DeleteOption
- type DeleteRequest
- type Dialect
- type Driver
- type ErrClientStaging
- type ErrClusterNotReady
- type ErrDriverRead
- type ErrURIMismatch
- type ExecResult
- type ExecutionPlan
- type FieldError
- type Finalizer
- type IndexIntent
- type IndexStrategy
- type InsertOption
- type LakeField
- type LakePartitionSpec
- type LakeSchema
- type LayoutIntent
- type LayoutStrategy
- type Maintenance
- type MaintenanceOptions
- type MergeAction
- type MergeFuture
- type MergeOpts
- type OrderSpec
- type ParquetSchema
- type PartitionStrategy
- type PlanKind
- type PooledSession
- type QueryBuilder
- type QueryRequest
- type RawInsertion
- type Result
- type Row
- type RowStream
- type Scanner
- type StagingLister
- type StagingPrefix
- type StagingRef
- type TableInfo
- type TableStats
- type UpdateOption
- type UpsertOption
- type UpsertRequest
- type VacuumOptions
- type ValidationError
- type ValidatorFunc
- type WritePath
- type WriteRequest
Constants ¶
This section is empty.
Variables ¶
var (
	ErrNotImplemented    = errors.New("lakeorm: not implemented")
	ErrAlreadyCommitted  = errors.New("lakeorm: finalizer already committed")
	ErrNoRows            = errors.New("lakeorm: no rows")
	ErrSessionPoolClosed = errors.New("lakeorm: session pool closed")
	ErrInvalidTag        = errors.New("lakeorm: invalid struct tag")
	ErrUnknownDriver     = errors.New("lakeorm: unknown driver")
	ErrDriverMismatch    = errors.New("lakeorm: driver mismatch")
)
Sentinel errors used across the package.
Functions ¶
func CollectAs ¶
CollectAs materialises every row of df into []T, hiding the type-assertion to the driver's native DataFrame. At v0 the only supported driver is Spark Connect, so df must unwrap to a sparksql.DataFrame — callers get ErrDriverMismatch otherwise.
Equivalent to:
sparkDF, ok := df.DriverType().(sparksql.DataFrame)
if !ok { ... }
rows, err := sparksql.Collect[T](ctx, sparkDF)
but compressed to a single call site. Use this when the result shape is known at compile time — the `spark:"..."` tags on T bind result columns to fields.
func FirstAs ¶
FirstAs returns the first row of df decoded as T, or ErrNoRows if the DataFrame produced no rows. Symmetric with CollectAs / StreamAs — edge-typed materialisation, no chainable builders.
func IsClusterNotReady ¶
IsClusterNotReady is a convenience for errors.As callers.
func Query ¶
Query runs a SQL statement against db and materialises every row into []T. One-shot ergonomic wrapper over db.DataFrame + CollectAs[T] — equivalent to:
df, err := db.DataFrame(ctx, sql, args...)
if err != nil { return nil, err }
return CollectAs[T](ctx, df)
but compressed to a single call. Use when the result shape is known at compile time — the `spark:"..."` tags on T bind result columns to fields.
Query is a top-level convenience, not a typed-query-builder abstraction. Filtering, ordering, projection, and joins belong in the SQL string, not in chainable Where/OrderBy/Select methods. Typed DataFrame transformations are deliberately absent — see TYPING.md for the design contract.
func QueryFirst ¶
QueryFirst runs a SQL statement and returns the first matching row decoded as T. Returns ErrNoRows if the result is empty. Ergonomic wrapper over db.DataFrame + FirstAs[T].
func QueryStream ¶
func QueryStream[T any](ctx context.Context, db Client, sql string, args ...any) iter.Seq2[T, error]
QueryStream runs a SQL statement and yields T values one at a time with constant memory. Rangeable via Go 1.23's iter.Seq2:
for row, err := range lakeorm.QueryStream[User](ctx, db,
"SELECT * FROM users WHERE country = ?", "UK") {
if err != nil { break }
// use row
}
Schema mismatch (a field in T that the projection doesn't contain) surfaces through the iterator's error value on the first row.
func RegisterValidator ¶
func RegisterValidator(name string, fn ValidatorFunc)
RegisterValidator installs a custom validator callable via `spark:"...,validate=name"`. Safe to call from init().
func SchemaFingerprint ¶
SchemaFingerprint returns a stable SHA-256 over the expected LakeSchema for v (or reflect.Type(v) if v is a reflect.Type). The fingerprint is:
- deterministic across process invocations
- insensitive to field declaration order within a struct (so struct refactors that only reorder fields don't invalidate previously-applied migrations)
- sensitive to column names, SQL-level types (reflect.Kind plus the time.Time special case), nullability, pk/mergeKey membership, and the table name
Client.AssertSchema hashes the compiled expectation and compares against the catalog's runtime fingerprint; a mismatch surfaces as a startup error rather than a silent drift.
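A minimal sketch of a fingerprint with the documented properties — deterministic, order-insensitive, sensitive to column names, types, nullability, and table name. The canonical string encoding here is an assumption for illustration, not the package's actual wire format:

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sort"
	"strings"
)

// field is a simplified stand-in for the compiled LakeField metadata.
type field struct {
	Column   string
	SQLType  string
	Nullable bool
	PK       bool
}

// fingerprint sorts the canonical per-field strings so declaration
// order is irrelevant, then hashes the table name plus the sorted
// field descriptors.
func fingerprint(table string, fields []field) string {
	lines := make([]string, 0, len(fields))
	for _, f := range fields {
		lines = append(lines, fmt.Sprintf("%s|%s|null=%t|pk=%t",
			f.Column, f.SQLType, f.Nullable, f.PK))
	}
	sort.Strings(lines) // order-insensitivity
	sum := sha256.Sum256([]byte(table + "\n" + strings.Join(lines, "\n")))
	return hex.EncodeToString(sum[:])
}

func main() {
	a := fingerprint("users", []field{{"id", "STRING", false, true}, {"name", "STRING", true, false}})
	b := fingerprint("users", []field{{"name", "STRING", true, false}, {"id", "STRING", false, true}})
	fmt.Println(a == b) // reordered fields, same fingerprint
}
```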
func StreamAs ¶
StreamAs yields decoded T values one at a time. Constant memory regardless of result size. Iteration uses Go 1.23's iter.Seq2 so callers can range directly:
for row, err := range lakeorm.StreamAs[User](ctx, df) { ... }
Schema binding happens on the first row; a mismatch surfaces through the second return value on the first iteration.
func Table ¶
Table overrides the derived table name for a Go type. Call once at init, before any Insert / Query / Migrate.
func Validate ¶
Validate runs all registered validators on a record or slice of records. The public entry point — callable from HTTP handlers to fail fast before any I/O happens. Insert calls this internally, so callers who want the fast-fail behaviour at the API boundary call it directly first.
func Verify ¶
Verify runs an end-to-end reachability probe against an opened Client. Writes a small probe parquet via the Backend, reads it back via the Driver, compares. Returns nil if all three legs (client → backend, driver → backend, URI agreement) succeed.
v0 stub — implementation lands in a follow-up commit. The signature is stable so callers can wire it into service startup today.
Types ¶
type AutoBehavior ¶
type AutoBehavior int
AutoBehavior describes auto-populated column content. Client.Insert visits every field with AutoBehavior != AutoNone before validation and writes the canonical value for the behaviour.
const (
	AutoNone AutoBehavior = iota
	AutoCreateTime
	AutoUpdateTime
	// AutoIngestID marks a string field to receive the UUIDv7
	// ingest_id of the current Insert operation. Useful for
	// correlating rows in the target table with the staging prefix
	// they were uploaded through. Only string-typed fields are
	// accepted; non-string fields produce ErrInvalidTag at first
	// use. See INGEST_ID.md for the full flow.
	AutoIngestID
)
type Backend ¶
type Backend interface {
Name() string
// RootURI returns the URI that the Driver will interpolate into
// its SQL. Client and Driver must resolve this string to the same
// physical storage; endpoint/credential differences are handled
// per-actor, not in Backend.
RootURI() string
TableLocation(tableName string) types.Location
StagingPrefix(ingestID string) string
// StagingLocation returns the absolute URI that Spark should read
// from (e.g. "s3a://bucket/lake/staging/<id>"). Distinct from
// StagingPrefix — Spark's Hadoop-AWS integration requires s3a://
// even though the Backend's own SDK calls use s3://. The scheme
// translation happens here, not in the Dialect.
StagingLocation(ingestID string) types.Location
Writer(ctx context.Context, key string) (io.WriteCloser, error)
Reader(ctx context.Context, key string) (io.ReadCloser, error)
Delete(ctx context.Context, key string) error
List(ctx context.Context, prefix string) ([]string, error)
// CleanupStaging removes every object under prefix. Called by
// Finalizer.Abort and by the staging-TTL janitor.
CleanupStaging(ctx context.Context, prefix string) error
}
Backend is where the bytes live. Each concrete backend owns its SDK directly (aws-sdk-go-v2 for S3, cloud.google.com/go/storage for GCS, stdlib for File / Memory) — no generic abstraction layer.
type CleanupReport ¶
CleanupReport summarises CleanupStaging's pass. Deleted carries prefix URIs that were removed; Failed carries prefixes that matched the TTL cutoff but whose delete call errored — the operator can retry selectively.
type Client ¶
type Client interface {
// Core CRUD. records is *T, []*T, or []T where T has `spark` tags.
// Validation runs before any I/O.
Insert(ctx context.Context, records any, opts ...InsertOption) error
InsertRaw(ctx context.Context, records any, opts ...InsertOption) RawInsertion
Update(ctx context.Context, records any, opts ...UpdateOption) error
Upsert(ctx context.Context, records any, opts ...UpsertOption) error
Delete(ctx context.Context, records any, opts ...DeleteOption) error
// Query is the dynamic (non-generic) entry point. Prefer the
// top-level lakeorm.Query[T] / QueryStream[T] / QueryFirst[T]
// helpers when T is known at compile time.
Query(ctx context.Context) QueryBuilder
// Escape hatches to raw SQL / DataFrame / Spark.
Exec(ctx context.Context, sql string, args ...any) (ExecResult, error)
DataFrame(ctx context.Context, sql string, args ...any) (DataFrame, error)
Session(ctx context.Context) (*PooledSession, error)
// Migrate is the bootstrap path — idempotent CREATE TABLE IF NOT
// EXISTS derived from struct tags. Sufficient for dev + fresh
// tables; schema evolution on existing tables goes through
// lake-goose (authoring via MigrateGenerate, execution via the
// goose-spark-ish CLI or goose's library API against the
// database/sql driver in datalake-go/spark-connect-go).
Migrate(ctx context.Context, models ...any) error
Maintain() Maintenance
Ping(ctx context.Context) error
Close() error
// MetricsRegistry returns the Prometheus registry lakeorm writes
// runtime metrics into. Returns nil at v0 as a placeholder for
// v1+ integration.
MetricsRegistry() *prometheus.Registry
// MigrateGenerate writes iceberg/delta-dialect .sql files for any
// pending struct diffs into dir, in goose's migration-file format.
// Destructive operations (DROP COLUMN, RENAME COLUMN, type
// narrowings, NOT-NULL tightenings) land with a `-- DESTRUCTIVE:
// <reason>` informational comment so the reviewer notices them in
// the PR diff.
//
// Execution is not this library's job — it belongs to lake-goose
// running against the Spark Connect database/sql driver. An
// atlas.sum manifest is emitted alongside so downstream tooling
// can detect post-generation edits.
//
// Returns the list of generated file paths.
MigrateGenerate(ctx context.Context, dir string, structs ...any) ([]string, error)
// AssertSchema verifies the catalog's current schema matches
// compiled code's expectation (SchemaFingerprint) for each
// struct. Recommended at app startup after lakeorm.Verify.
// v0 stub — the DESCRIBE TABLE catalog read lands in v1.
AssertSchema(ctx context.Context, structs ...any) error
}
Client is the main entry point. All methods are safe to call concurrently; the internal session pool serializes per-session state as needed.
func Local ¶
Local is an intentional no-op in the root package — it exists only so godoc surfaces the "how do I run this locally?" question here. The real implementation lives in the `local` subpackage:
import lakeormlocal "github.com/datalake-go/lake-orm/local"

db, err := lakeormlocal.Open()
The split exists because the real Local() must import the spark, iceberg, and backend driver packages, each of which already imports this package for the Driver/Dialect/Backend interfaces — composing them at the top-level would introduce an import cycle. Making Local() a subpackage avoids the cycle and matches database/sql's pattern of sub-package factory imports.
type ClientOption ¶
type ClientOption func(*clientConfig)
ClientOption configures the Client at construction time. Functional options so the struct stays extensible without API breakage.
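The pattern itself, reduced to a runnable sketch. The clientConfig fields are illustrative, matching the documented session-pool default of 8; the real struct is unexported and richer.

```go
package main

import "fmt"

// clientConfig mirrors the pattern: unexported struct, exported
// ClientOption closures that mutate it at construction time.
type clientConfig struct {
	sessionPoolSize int
	defaultCatalog  string
}

type ClientOption func(*clientConfig)

func WithSessionPoolSize(n int) ClientOption {
	return func(c *clientConfig) { c.sessionPoolSize = n }
}

func WithDefaultCatalog(name string) ClientOption {
	return func(c *clientConfig) { c.defaultCatalog = name }
}

// open applies defaults first, then every option in order. Later
// options win, and new knobs never break existing call sites.
func open(opts ...ClientOption) clientConfig {
	cfg := clientConfig{sessionPoolSize: 8} // documented default
	for _, o := range opts {
		o(&cfg)
	}
	return cfg
}

func main() {
	cfg := open(WithSessionPoolSize(16), WithDefaultCatalog("lake"))
	fmt.Println(cfg.sessionPoolSize, cfg.defaultCatalog) // 16 lake
}
```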
func WithCompression ¶
func WithCompression(c Compression) ClientOption
WithCompression selects the compression codec for fast-path parquet parts. Default ZSTD — 2-3x smaller than Snappy on the shapes we care about (KSUIDs, repeated S3 paths), natively readable by all engines.
func WithDefaultCatalog ¶
func WithDefaultCatalog(name string) ClientOption
WithDefaultCatalog / WithDefaultDatabase set fallbacks for unqualified table names.
func WithDefaultDatabase ¶
func WithDefaultDatabase(name string) ClientOption
func WithDefaultIdempotencyTTL ¶
func WithDefaultIdempotencyTTL(d time.Duration) ClientOption
WithDefaultIdempotencyTTL sets how long idempotency tokens remain valid in the dedup table. Default 24h.
func WithFastPathThreshold ¶
func WithFastPathThreshold(bytes int) ClientOption
WithFastPathThreshold is the advisory byte crossover the Dialect uses to choose between gRPC and object-storage ingest. Default 128 MiB.
func WithLogger ¶
func WithLogger(l zerolog.Logger) ClientOption
WithLogger sets the zerolog logger for the Client. Passed into every driver/format/backend component via the plumbing in lakeorm.Open.
func WithMaxInflightIngests ¶
func WithMaxInflightIngests(n int) ClientOption
WithMaxInflightIngests caps the number of fast-path ingests that can be running concurrently. The bounded semaphore propagates backpressure into Write() — without it a bursty Go producer OOMs the process even when the partition writer "flushes correctly." Default 4.
func WithSessionPoolSize ¶
func WithSessionPoolSize(n int) ClientOption
WithSessionPoolSize controls how many Spark Connect sessions the Client borrows in flight. Default 8 — large enough for moderate concurrency, small enough to bound cluster-side overhead. Increase for high-fanout services; decrease on tight clusters.
type ColumnInfo ¶
type Compression ¶
type Compression int
Compression selects the codec used by the fast-path partition writer.
const (
	CompressionZSTD Compression = iota
	CompressionSnappy
	CompressionGzip
	CompressionUncompressed
)
type ConflictStrategy ¶
type ConflictStrategy int
ConflictStrategy for Insert.
const (
	ErrorOnConflict ConflictStrategy = iota
	IgnoreOnConflict
	UpdateOnConflict
)
type DataFrame ¶
type DataFrame interface {
Schema(ctx context.Context) ([]ColumnInfo, error)
Collect(ctx context.Context) ([][]any, error)
Count(ctx context.Context) (int64, error)
Stream(ctx context.Context) iter.Seq2[Row, error]
// DriverType returns the Driver's native DataFrame handle for
// callers that need to drop to the underlying API. Kept as any
// to avoid leaking driver-specific types into the public
// signature; callers type-assert to the concrete type they
// expect (e.g. sparksql.DataFrame). Prefer the lakeorm.CollectAs
// / lakeorm.StreamAs helpers when materialising into a typed
// result struct.
DriverType() any
}
DataFrame is the Driver-agnostic handle to a remote DataFrame. At v0 it wraps the Spark Connect DataFrame; v1+ drivers may implement their own. The interface is deliberately minimal — anything fancier is available by unwrapping with DriverType().
type DeleteOption ¶
type DeleteOption func(*deleteConfig)
type DeleteRequest ¶
type Dialect ¶
type Dialect interface {
Name() string
CreateTableDDL(schema *LakeSchema, loc types.Location) (string, error)
AlterTableDDL(schema *LakeSchema, existing *TableInfo) ([]string, error)
PlanInsert(req WriteRequest) (ExecutionPlan, error)
PlanUpsert(req UpsertRequest) (ExecutionPlan, error)
PlanDelete(req DeleteRequest) (ExecutionPlan, error)
PlanQuery(req QueryRequest) (ExecutionPlan, error)
IndexStrategy(intent IndexIntent) IndexStrategy
LayoutStrategy(intent LayoutIntent) LayoutStrategy
Maintenance() Maintenance
}
Dialect describes the data-dialect opinion: DDL shape, DML shape, capabilities, and semantics for Iceberg vs Delta vs any future lakehouse dialect. "Data dialect" rather than "format" because it's not just on-disk layout — it covers CREATE TABLE clauses, MERGE semantics, table properties, partition grammar, and schema-evolution rules. Exactly one Dialect per Client at v0.
type Driver ¶
type Driver interface {
Name() string
// Execute runs a plan and returns a Finalizer for two-phase commit.
// Single-phase plans (e.g. small-batch direct ingest) return a
// no-op Finalizer so callers never need to branch.
Execute(ctx context.Context, plan ExecutionPlan) (Result, Finalizer, error)
// ExecuteStreaming runs a read plan and returns a pull-based row
// stream. No finalizer — reads don't stage.
ExecuteStreaming(ctx context.Context, plan ExecutionPlan) (RowStream, error)
// DataFrame is the escape hatch — raw SQL to a DataFrame that the
// caller can then chain Spark operations on.
DataFrame(ctx context.Context, sql string, args ...any) (DataFrame, error)
// Exec runs a SQL statement that returns no rows (DDL, DML that
// doesn't need the Dialect-planned path).
Exec(ctx context.Context, sql string, args ...any) (ExecResult, error)
Close() error
}
Driver is the connection + execution mechanism. At v0 both concrete Drivers are Spark Connect variants (spark.Remote, databricksconnect.Driver) — same protocol, different connection setup.
type ErrClientStaging ¶
type ErrClientStaging struct {
URI string
BackendName string
Op string // "stage-write" | "probe-write" | "cleanup"
Cause error
}
ErrClientStaging is returned when the Go client fails to write a staging object via Backend. Indicates a client-side reachability or credential problem.
func (*ErrClientStaging) Error ¶
func (e *ErrClientStaging) Error() string
func (*ErrClientStaging) Unwrap ¶
func (e *ErrClientStaging) Unwrap() error
type ErrClusterNotReady ¶
type ErrClusterNotReady struct {
State string // e.g. "Pending", "PENDING"
RequestID string
Message string
Cause error
}
ErrClusterNotReady indicates the backing Spark cluster is warming up and the operation should be retried. Databricks in particular returns [FailedPrecondition] errors with state Pending during cluster startup — without this typed error, callers get an opaque gRPC failure and no way to distinguish "cluster warming up, retry" from "cluster dead, give up."
func NewClusterNotReady ¶
func NewClusterNotReady(err error) *ErrClusterNotReady
NewClusterNotReady parses a Databricks gRPC error for the [FailedPrecondition] + "state Pending" pattern and returns a typed error. Returns nil if the error doesn't match. The string-matching looks fragile, but it's the canonical detection pattern and has been stable across multiple Databricks runtime versions.
func (*ErrClusterNotReady) Error ¶
func (e *ErrClusterNotReady) Error() string
func (*ErrClusterNotReady) IsRetryable ¶
func (e *ErrClusterNotReady) IsRetryable() bool
IsRetryable marks this error as safe to retry after backoff.
func (*ErrClusterNotReady) Unwrap ¶
func (e *ErrClusterNotReady) Unwrap() error
type ErrDriverRead ¶
ErrDriverRead is returned when the compute driver fails to read a staging prefix that the client wrote successfully. Indicates the driver cannot see (or cannot authenticate against) storage the client can reach — the split-view failure mode where client and driver credentials diverge silently.
func (*ErrDriverRead) Error ¶
func (e *ErrDriverRead) Error() string
func (*ErrDriverRead) Unwrap ¶
func (e *ErrDriverRead) Unwrap() error
type ErrURIMismatch ¶
ErrURIMismatch is returned when client and driver both succeed at their respective operations but resolve the same URI to different physical storage. Detected by lakeorm.Verify via probe comparison.
func (*ErrURIMismatch) Error ¶
func (e *ErrURIMismatch) Error() string
type ExecResult ¶
type ExecResult struct {
RowsAffected int64
}
ExecResult mirrors database/sql.Result for the raw Exec escape hatch.
type ExecutionPlan ¶
type ExecutionPlan struct {
Kind PlanKind
SQL string // For KindSQL / KindStream / KindDDL
Args []any // SQL-parameter bindings
Target string // table name for writes
Staging StagingRef // populated for KindParquetIngest
Rows any // typed slice for KindDirectIngest
Schema *LakeSchema // referenced by the Driver for type-aware ingest
Options map[string]any // Dialect-specific hints opaque to the Driver
}
ExecutionPlan is the opaque artifact a Dialect hands to a Driver. The Driver executes the plan without knowing the Dialect's name. Internally it is a tagged union of variants (DirectIngest, ParquetIngest, SQL, Stream); Dialect constructs them, Driver reads the Kind and dispatches.
The type is intentionally a struct (not an interface) so the wire shape can evolve without breaking v1+ drivers that consume plans they didn't construct.
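A sketch of the Kind-dispatch contract from the Driver's side, using a cut-down plan struct; the field set and switch bodies are abbreviations of the documented variants, not the real implementation:

```go
package main

import "fmt"

type PlanKind int

const (
	KindSQL PlanKind = iota
	KindDirectIngest
	KindParquetIngest
)

// ExecutionPlan as a tagged union: the Dialect fills the fields that
// matter for its Kind; the Driver branches on Kind alone and never
// needs to know which Dialect built the plan.
type ExecutionPlan struct {
	Kind PlanKind
	SQL  string
	Rows any
}

func execute(plan ExecutionPlan) string {
	switch plan.Kind {
	case KindSQL:
		return "exec sql: " + plan.SQL
	case KindDirectIngest:
		return "grpc ingest"
	case KindParquetIngest:
		return "read staged parquet, INSERT...SELECT"
	default:
		return "unknown plan kind"
	}
}

func main() {
	fmt.Println(execute(ExecutionPlan{Kind: KindSQL, SQL: "SELECT 1"}))
}
```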
type FieldError ¶
FieldError is one per-field failure.
type Finalizer ¶
Finalizer is the commit phase of a two-phase write. Modeled on database/sql.Tx: call Commit on success, Abort otherwise, safely defer Abort for the error path. Commit is idempotent.
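The call pattern in miniature — a toy finalizer, not the package's type, showing why the deferred Abort is safe on both paths:

```go
package main

import "fmt"

// finalizer is a toy two-phase-commit handle: Commit is idempotent,
// and Abort after a successful Commit is a no-op, which is what makes
// `defer f.Abort()` safe on every return path.
type finalizer struct{ committed bool }

func (f *finalizer) Commit() error {
	if f.committed {
		return nil // idempotent: second Commit is a no-op
	}
	f.committed = true
	fmt.Println("MERGE staged parts into target")
	return nil
}

func (f *finalizer) Abort() {
	if f.committed {
		return // success path: nothing to undo
	}
	fmt.Println("cleanup staging prefix")
}

func write() error {
	f := &finalizer{}
	defer f.Abort() // runs on every return; harmless after Commit
	// ... stage parquet parts, any early `return err` triggers cleanup ...
	return f.Commit()
}

func main() {
	if err := write(); err == nil {
		fmt.Println("ok")
	}
}
```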
type IndexIntent ¶
type IndexIntent int
IndexIntent captures the `indexed` / `sortable` / `mergeKey` tag intent. Dialects read this and return a concrete IndexStrategy.
const (
	IntentNone IndexIntent = iota
	IntentIndexed
	IntentSortable
	IntentMergeKey
)
type IndexStrategy ¶
type IndexStrategy string
IndexStrategy / LayoutStrategy are opaque to the core — they are stringified descriptors that Dialect implementations emit for their own DDL generation. Kept as strings rather than typed-per-Dialect enums to avoid a compatibility matrix in the core package.
type InsertOption ¶
type InsertOption func(*insertConfig)
InsertOption configures a single Insert call.
func OnConflict ¶
func OnConflict(strategy ConflictStrategy) InsertOption
OnConflict sets the behaviour for primary-key collisions.
func ViaGRPC ¶
func ViaGRPC() InsertOption
ViaGRPC forces the direct gRPC ingest path regardless of batch size. Useful when latency matters more than throughput.
func ViaObjectStorage ¶
func ViaObjectStorage() InsertOption
ViaObjectStorage forces the S3-Parquet fast path regardless of batch size. Useful for benchmarks or when you know the batch is large.
func WithIdempotencyKey ¶
func WithIdempotencyKey(key string) InsertOption
WithIdempotencyKey sets the idempotency token. If omitted, a SortableID is generated automatically.
type LakeField ¶
type LakeField struct {
Name string // Go field name
Column string // DB column name
Index []int // reflect.FieldByIndex path
Type reflect.Type
IsNullable bool
IsPointer bool
AutoBehavior AutoBehavior
IsJSON bool
IsRequired bool
Intent IndexIntent
Layout LayoutIntent
Validators []string // names resolved against the validator registry
Ignored bool
}
LakeField is the parsed metadata for one struct field.
type LakePartitionSpec ¶
type LakePartitionSpec struct {
FieldIndex int
Strategy PartitionStrategy
Param int // N for Bucket(N) / Truncate(N); zero otherwise
}
LakePartitionSpec names the field index and strategy of a partition.
type LakeSchema ¶
type LakeSchema struct {
GoType reflect.Type
TableName string
Fields []LakeField
PrimaryKeys []int
MergeKeys []int
Partitions []LakePartitionSpec
}
LakeSchema is the parsed form of a struct tagged with `spark:"..."`.
func ParseSchema ¶
func ParseSchema(t reflect.Type) (*LakeSchema, error)
ParseSchema extracts LakeSchema from a Go type via reflection. Cached per-type; safe to call repeatedly.
func (*LakeSchema) ColumnNames ¶
func (s *LakeSchema) ColumnNames() []string
ColumnNames returns the column names in declaration order, skipping ignored fields. Used by the query builder to emit projection-pushdown SELECTs instead of SELECT *.
type LayoutIntent ¶
type LayoutIntent int
LayoutIntent is the coarse layout hint currently exposed — may grow later to cover clustering, data-skipping ranges, etc.
const (
	LayoutNone LayoutIntent = iota
	LayoutSortable
)
type LayoutStrategy ¶
type LayoutStrategy string
IndexStrategy / LayoutStrategy are opaque to the core — they are stringified descriptors that Dialect implementations emit for their own DDL generation. Kept as strings rather than typed-per-Dialect enums to avoid a compatibility matrix in the core package.
type Maintenance ¶
type Maintenance interface {
Optimize(ctx context.Context, table string, opts MaintenanceOptions) error
Vacuum(ctx context.Context, table string, opts VacuumOptions) error
Stats(ctx context.Context, table string) (TableStats, error)
}
Maintenance is the Dialect-level physical-optimization surface. At v0 all implementations return ErrNotImplemented — the interface exists so the Client API doesn't shift when v1 fleshes each Dialect out.
type MaintenanceOptions ¶
MaintenanceOptions / VacuumOptions are the v0 stubs. Specific Dialects may carry richer option structs behind Dialect.Maintenance() as v1 lands.
type MergeAction ¶
type MergeAction int
MergeAction is the Upsert conflict strategy.
const (
	MergeUpdate MergeAction = iota
	MergeIgnore
	MergeError
)
type MergeFuture ¶
MergeFuture is returned by RawInsertion.AsyncThenMerge. Stub at v0.
type ParquetSchema ¶
type ParquetSchema struct {
// contains filtered or unexported fields
}
ParquetSchema is the bridge between a LakeSchema (the user's `spark`-tagged model) and a parquet schema (what the fast-path writer serializes against). The whole point: users tag their structs with `spark:"..."` once; they do NOT need to also write `parquet:"..."` tags. The translation lives here.
Internally it synthesizes an anonymous struct type at runtime whose parquet tags are derived from the `spark` tags, then hands that type to parquet.SchemaOf. At write time, user rows are projected field-by-field into the synthesized struct.
func BuildParquetSchema ¶
func BuildParquetSchema(lake *LakeSchema) (*ParquetSchema, error)
BuildParquetSchema translates a LakeSchema into a parquet schema + row projector. The returned ParquetSchema is cheap to reuse: the reflection work happens once and is shared across Write calls.
The fast-path writer (internal/parquet) wires this up automatically when Insert routes through object storage — users don't construct ParquetSchema directly.
func (*ParquetSchema) Convert ¶
func (p *ParquetSchema) Convert(row any) any
Convert projects a user-struct row into the synthesized-struct row parquet-go writes. Accepts either T or *T; same semantics either way. Intended to be passed as the RowConverter to the internal partition writer.
func (*ParquetSchema) Schema ¶
func (p *ParquetSchema) Schema() *pq.Schema
Schema returns the underlying parquet schema. Pass to parquet.NewWriter (or to the internal partition writer).
type PartitionStrategy ¶
type PartitionStrategy int
PartitionStrategy is the tag-declared partition intent. Dialect implementations translate this into a physical strategy (Iceberg bucket, Delta ZORDER, etc.) via IndexStrategy().
const (
	PartitionNone PartitionStrategy = iota
	PartitionRaw
	PartitionBucket
	PartitionTruncate
)
type PlanKind ¶
type PlanKind int
PlanKind identifies the plan variant. Values are stable — drivers branch on them.
type PooledSession ¶
type PooledSession struct {
// contains filtered or unexported fields
}
PooledSession is the typed-session handle returned by Client.Session. v0: wraps driver.DataFrame/Exec; in v1 it carries the raw scsql.SparkSession through for users who need it directly.
func (*PooledSession) Exec ¶
func (s *PooledSession) Exec(ctx context.Context, query string) (ExecResult, error)
type QueryBuilder ¶
type QueryBuilder interface {
From(table string) QueryBuilder
Where(sql string, args ...any) QueryBuilder
OrderBy(col string, desc bool) QueryBuilder
Limit(n int) QueryBuilder
Offset(n int) QueryBuilder
Select(cols ...string) QueryBuilder
Collect(ctx context.Context) ([][]any, error)
DataFrame(ctx context.Context) (DataFrame, error)
}
QueryBuilder is the dynamic (non-generic) query entry point for cases where the result type isn't known at compile time. Prefer the top-level Query[T] generic when possible.
type QueryRequest ¶
type QueryRequest struct {
Ctx context.Context
Schema *LakeSchema
Table string
Columns []string
Where string
WhereArg []any
OrderBy []OrderSpec
Limit int
Offset int
}
QueryRequest is what the Client hands the Dialect on query execution.
type RawInsertion ¶
type RawInsertion interface {
ThenMerge(ctx context.Context, opts MergeOpts) error
AsyncThenMerge(opts MergeOpts) MergeFuture
Commit(ctx context.Context) error
}
RawInsertion is the opt-in raw-then-merge escape hatch for genuinely untrusted external inputs (research datasets, untyped third-party feeds, CSVs of unknown provenance). It is NOT the default — db.Insert writes straight to the target table. Bronze- style landing zones exist here as a conscious opt-in, never as an accidental default.
type Result ¶
type Result struct {
RowsAffected int64
}
Result is the outcome of a single ExecutionPlan that did not produce rows (writes, DDL). Reads return a RowStream instead.
type RowStream ¶
RowStream is the Driver's streaming primitive — one Row per iteration, constant memory, natural backpressure. It is an alias for iter.Seq2[Row, error], so it is rangeable directly.
type Scanner ¶
type Scanner struct {
// contains filtered or unexported fields
}
Scanner provides reflection-based row scanning into Go structs via the lake-orm tag trio (`lake`, `lakeorm`, `spark`). It is the Driver-agnostic counterpart of what every Driver's underlying protocol produces (Spark Connect Rows, DuckDB vectors, future Arrow Flight batches) — each Driver converts its native rows into lakeorm.Row and the Scanner takes over.
Two design decisions worth calling out:
- sqlx/reflectx for field resolution (embedded structs, dot-notation, pointer-to-struct auto-init). Rolling our own would diverge from every other sqlx-using Go service on edge cases.
- scannerTarget (defined below) for nullable custom types that implement sql.Scanner — SortableID, Location, etc. The wrapper tracks NULL separately from the underlying Scan so pointer fields can be set to nil rather than zero-valued.
sqlx/reflectx binds one tag key per mapper, so we carry three in priority order (lake > lakeorm > spark) and consult them in turn when resolving a column. Same precedence as tag.go's effectiveTag.
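The NULL-tracking wrapper idea in miniature — a sketch, not the package's scannerTarget; sortableID here is a toy sql.Scanner implementation:

```go
package main

import (
	"database/sql"
	"fmt"
)

// nullTracking wraps a sql.Scanner and records NULL separately, so a
// pointer field can later be set to nil instead of a zero value.
type nullTracking struct {
	inner  sql.Scanner
	isNull bool
}

func (n *nullTracking) Scan(src any) error {
	if src == nil {
		n.isNull = true
		return nil // leave the inner value untouched
	}
	return n.inner.Scan(src)
}

// sortableID is a toy custom type implementing sql.Scanner.
type sortableID string

func (s *sortableID) Scan(src any) error {
	*s = sortableID(fmt.Sprint(src))
	return nil
}

func main() {
	var id sortableID
	w := &nullTracking{inner: &id}
	_ = w.Scan(nil)       // NULL column
	fmt.Println(w.isNull) // true: caller sets the pointer field to nil
}
```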
func NewScanner ¶
func NewScanner() *Scanner
NewScanner creates a Scanner backed by sqlx's reflectx field mapper. One mapper per accepted tag key — lookups walk them in priority order.
func (*Scanner) ScanRow ¶
func (s *Scanner) ScanRow(row Row, dest any, _ *LakeSchema) error
ScanRow scans a single Row produced by a Driver into dest (a *T). The schema parameter is accepted for interface stability but currently ignored (reserved for column-intent introspection); the actual mapping is by column name via the reflectx mapper.
type StagingLister ¶
type StagingLister interface {
ListStagingPrefixes(ctx context.Context) ([]StagingPrefix, error)
}
StagingLister is the optional backend extension Client.CleanupStaging relies on. Backends that can enumerate their _staging/ namespace implement it; backends that cannot simply leave it unimplemented, and CleanupStaging returns ErrNotImplemented for them.
Kept as a separate interface (not folded into Backend) so the Backend contract stays narrow: the fast-path write path only needs StagingPrefix / StagingLocation / CleanupStaging by-URI, and that's the minimum any backend must implement.
type StagingPrefix ¶
StagingPrefix is one entry a StagingLister backend yields. URI is the absolute backend URI; IngestID is the final path segment, which CleanupStaging parses as a UUIDv7.
type StagingRef ¶
StagingRef names the URI prefix and parts produced by a fast-path partition writer. The Driver reads this to emit the `INSERT ... SELECT FROM parquet.<prefix>/*.parquet` statement.
type TableStats ¶
type UpdateOption ¶
type UpdateOption func(*updateConfig)
UpdateOption / UpsertOption / DeleteOption are kept as stubs so the Client signature doesn't shift when v1 adds real tuning knobs.
type UpsertOption ¶
type UpsertOption func(*upsertConfig)
type UpsertRequest ¶
type UpsertRequest struct {
Ctx context.Context
Schema *LakeSchema
Records any
OnMatch MergeAction
Backend Backend
}
UpsertRequest / DeleteRequest share the same shape at v0.
type VacuumOptions ¶
type VacuumOptions struct {
RetentionHours int
}
type ValidationError ¶
type ValidationError struct {
Errors []FieldError
}
ValidationError aggregates per-field failures so the caller sees every problem rather than just the first.
func (*ValidationError) Error ¶
func (e *ValidationError) Error() string
func (*ValidationError) Field ¶
func (e *ValidationError) Field(name string) *FieldError
Field returns the FieldError for a specific struct field, or nil.
type ValidatorFunc ¶
ValidatorFunc is the shape of a user-registered custom validator. Validators receive the raw field value (already dereferenced from pointer) and must return nil or a human-readable error message.
type WritePath ¶
type WritePath int
WritePath is the caller's optional override of the Dialect's routing.
type WriteRequest ¶
type WriteRequest struct {
Ctx context.Context
Schema *LakeSchema
// IngestID is a UUIDv7 correlation ID generated by Client.Insert
// for every operation. Threads through staging prefix, logs, and
// the Finalizer's cleanup target. Required for the fast path;
// Dialect implementations use this (not Idempotency) to compute
// staging locations.
IngestID string
Records any
RecordCount int
ApproxRowBytes int
// Idempotency is an optional caller-supplied deduplication token.
// Distinct from IngestID: IngestID is internal operational
// correlation; Idempotency is the contract with the caller for
// retry safety. Empty when the caller didn't supply one.
Idempotency string
Backend Backend
FastPathBytes int // advisory crossover threshold
ForcePath WritePath // None / ViaGRPC / ViaObjectStorage
Options map[string]any // dialect-specific overrides
}
WriteRequest is what the Client hands the Dialect on Insert.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
| backend | Package backend exposes the public constructors for every Backend implementation in a single, ergonomic surface. |
| file | Package file provides a local-disk Backend. |
| gcs | Package gcs provides a GCS Backend. |
| memory | Package memory provides an in-memory Backend, useful for tests and for the fast-path regression harness. |
| s3 | Package s3 provides an S3-compatible Backend built directly on aws-sdk-go-v2/service/s3. |
| cmd | |
| lakeorm (command) | Command lakeorm is the lakeorm CLI. |
| dialect | |
| delta | Package delta is the Delta Lake Dialect. |
| iceberg | Package iceberg implements lakeorm's default Dialect — Apache Iceberg tables written via Spark Connect. |
| driver | |
| databricks | Package databricks is lake-orm's native Databricks driver. |
| databricksconnect | Package databricksconnect is lake-orm's Databricks Connect driver. |
| spark | Package spark provides lakeorm's generic Spark Connect driver. |
| examples | |
| basic (command) | Example: the hero path for lakeorm. |
| bulk (command) | Example: the fast-path write — parquet through object storage with a Spark `INSERT ...` |
| databricks (command) | Example: lake-orm against a Databricks SQL warehouse using the native `databricks-sql-go` driver. |
| databricksconnect (command) | Example: lake-orm against a Databricks cluster via Databricks Connect (Spark Connect over a Databricks workspace). |
| delta (command) | Example: writing against a Delta Lake table (instead of Iceberg). |
| ingest-id (command) | Example: UUIDv7 ingest_id threading via `auto=ingestID`. |
| joins (command) | Example: reading joins and aggregates with CQRS-style output types. |
| lake-tag (command) | Example: the canonical `lake:"..."` tag. |
| migrations (command) | Example: Django-style offline migration authoring. |
| stream (command) | Example: streaming a large result back with constant memory. |
| typed-helpers (command) | Example: the typed helper API — Query[T] / QueryStream[T] / QueryFirst[T] and their DataFrame-shaped siblings CollectAs[T] / StreamAs[T] / FirstAs[T]. |
| validation (command) | Example: applying validation at the application boundary. |
| internal | |
| migrations/migrate | Package migrate is lakeorm's side of the schema-evolution story: it computes struct-diffs against a prior target-state (replayed from the most recent migration file's State-JSON header — the Django MigrationLoader pattern), classifies each change as destructive or not per the target dialect's rule table, and emits goose-format .sql files. |
| parquet | Package parquet is lakeorm's parquet-writing layer. |
| sqlbuild | Package sqlbuild renders the generic SELECT shape shared by every Dialect's read path. |
| local | Package local is the lake-k8s docker-compose on-ramp. |
| teste2e | Package teste2e is the black-box end-to-end suite for lakeorm. |
| testutils | Package testutils is the shared test harness for lakeorm. |
| types | Package types defines the small value types lakeorm exposes on its public API: SortableID (KSUID-backed), ObjectURI/Location (typed storage addresses), and SparkTableName. |