Documentation
Overview ¶
Package spark provides lakeorm's generic Spark Connect driver. Use Remote for plain Spark Connect endpoints (self-hosted, EMR, Glue, lake-k8s). For Databricks clusters see the sibling driver/databricksconnect package.
Index ¶
- type Driver
- func FromFactory(name string, factory func(context.Context) (scsql.SparkSession, error), opts ...RemoteOption) *Driver
- func Remote(uri string, opts ...RemoteOption) *Driver
- func (d *Driver) Close() error
- func (d *Driver) Collect(ctx context.Context, source drivers.Source, out any) error
- func (d *Driver) Exec(ctx context.Context, sql string, args ...any) (drivers.ExecResult, error)
- func (d *Driver) Execute(ctx context.Context, plan drivers.ExecutionPlan) (drivers.Result, drivers.Finalizer, error)
- func (d *Driver) First(ctx context.Context, source drivers.Source, out any) error
- func (d *Driver) FromDataFrame(df scsql.DataFrame) drivers.Source
- func (d *Driver) FromRow(sql string, args ...any) drivers.Source
- func (d *Driver) FromSQL(sql string, args ...any) drivers.Source
- func (d *Driver) FromTable(name string) drivers.Source
- func (d *Driver) Name() string
- func (d *Driver) Session(ctx context.Context) (scsql.SparkSession, func(), error)
- func (d *Driver) Stream(ctx context.Context, source drivers.Source, sample any) iter.Seq2[any, error]
- type RemoteOption
- func WithPoolSize(n int) RemoteOption
- func WithSessionConfs(confs map[string]string) RemoteOption
- type SessionPool
- func (p *SessionPool) Borrow(ctx context.Context) (scsql.SparkSession, error)
- func (p *SessionPool) Close() error
- func (p *SessionPool) Return(s scsql.SparkSession)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Driver ¶
type Driver struct {
// contains filtered or unexported fields
}
Driver is the shared Spark Connect driver backing both Remote and databricksconnect. Exported so callers can reach the per-driver conversion helpers (FromSQL, FromDataFrame, FromTable, FromRow) and the raw session (Session) via a Client.Driver() type assertion.
Session-level confs (WithSessionConfs) are applied inside the pool factory, not held on the driver, so they survive pool refreshes and apply identically to every newly created session.
func FromFactory ¶
func FromFactory(
	name string,
	factory func(context.Context) (scsql.SparkSession, error),
	opts ...RemoteOption,
) *Driver
FromFactory builds a *Driver around an arbitrary Spark Connect session factory. Exported so sibling driver packages — notably driver/databricksconnect — can reuse the full driver machinery (session pool, conf application, Execute dispatch, cluster-not-ready translation) without duplicating it.
`name` is returned verbatim from Driver.Name: "spark-remote" for the plain Remote constructor, "databricks-connect" for the Databricks Connect wrapper, and so on. It shows up in logs and metrics as the driver discriminator.
`factory` produces a fresh scsql.SparkSession per pool borrow. FromFactory wraps the supplied factory so session-level confs from the opts are applied after the caller's factory returns, before the session is handed to lakeorm. Conf failures surface as factory errors — the pool will retry on the next borrow.
Regular users should not need this entry point. Use Remote for self-hosted Spark Connect; use the databricksconnect package for Databricks clusters.
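For illustration, a sketch of wiring FromFactory to a custom session factory. The gateway URI, the session-builder call, and the driver name are assumptions, not a prescribed setup:

```go
drv := spark.FromFactory(
	"my-gateway", // shows up in logs and metrics as the driver discriminator
	func(ctx context.Context) (scsql.SparkSession, error) {
		// Any constructor works here; sketched against a generic
		// Spark Connect session builder.
		return scsql.NewSessionBuilder().
			Remote("sc://gateway.internal:15002").
			Build(ctx)
	},
	spark.WithPoolSize(4),
)
defer drv.Close()
```

Session-level confs passed via opts are applied by the wrapped factory after this callback returns, so the callback itself only needs to produce a bare session.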
func Remote ¶
func Remote(uri string, opts ...RemoteOption) *Driver
Remote returns a Driver that connects to a plain Spark Connect endpoint (no OAuth, no cluster-ID header). Use for self-hosted Spark, EMR, Glue, and the lake-k8s local stack.
The concrete *Driver is returned (not drivers.Driver) so callers can reach the per-driver conversion helpers (FromSQL, FromDataFrame, FromTable, FromRow) and the raw session via Session(). lakeorm.Open still accepts it because *Driver satisfies the drivers.Driver interface.
drv := spark.Remote("sc://spark.internal:15002")
db, _ := lakeorm.Open(drv, iceberg.Dialect(), backends.S3(...))
// Typed read via the driver's conversion helper:
users, _ := lakeorm.Query[User](ctx, db, drv.FromSQL("SELECT * FROM users"))
// Drop to the raw Spark session when you need native DataFrame chaining:
sess, release, _ := drv.Session(ctx)
defer release()
df, _ := sess.Sql(ctx, "...")
agg := df.GroupBy("country").Agg(...)
out, _ := lakeorm.Query[CountryAgg](ctx, db, drv.FromDataFrame(agg))
func (*Driver) Collect ¶
func (d *Driver) Collect(ctx context.Context, source drivers.Source, out any) error
Collect runs source and appends every decoded row into out (must be a *[]T).
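A minimal sketch of the *[]T contract; the User struct and table name are illustrative, and drv is a *Driver from Remote:

```go
type User struct {
	ID    int64
	Email string
}

var users []User
if err := drv.Collect(ctx, drv.FromTable("users"), &users); err != nil {
	return err
}
// users now holds every decoded row.
```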
func (*Driver) Exec ¶
func (d *Driver) Exec(ctx context.Context, sql string, args ...any) (drivers.ExecResult, error)
Exec implements drivers.Driver.Exec — fire-and-forget SQL for DDL or one-off DML. Does not go through Dialect.
func (*Driver) Execute ¶
func (d *Driver) Execute(ctx context.Context, plan drivers.ExecutionPlan) (drivers.Result, drivers.Finalizer, error)
Execute implements drivers.Driver. Dispatches by plan kind.
func (*Driver) FromDataFrame ¶
func (d *Driver) FromDataFrame(df scsql.DataFrame) drivers.Source
FromDataFrame builds a Source that hands back an already-created DataFrame. The caller owns the session that produced it, so the cleanup hook is nil — nothing for the driver to release.
Used when the caller has chained Spark operations on a DataFrame (GroupBy / Agg / Join / Window functions) and wants the typed decode path over the result. Paired with Session() for the acquire side.
func (*Driver) FromRow ¶
func (d *Driver) FromRow(sql string, args ...any) drivers.Source
FromRow is the single-row variant of FromSQL. Structurally identical — the "single row" semantic is enforced by the Convertible.First call at the other end, which stops iterating after the first row and returns errors.ErrNoRows when the source yielded zero rows. Kept as a named helper because lakeorm.QueryFirst[T](ctx, db, drv.FromRow("...")) reads more clearly than reusing FromSQL at a First site.
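A sketch of the intended call site; the User type and predicate are illustrative:

```go
user, err := lakeorm.QueryFirst[User](ctx, db,
	drv.FromRow("SELECT * FROM users WHERE id = ?", 42))
if err != nil {
	// err is errors.ErrNoRows when the query matched nothing.
	return err
}
```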
func (*Driver) FromSQL ¶
func (d *Driver) FromSQL(sql string, args ...any) drivers.Source
FromSQL builds a Source that borrows a session, runs sql (with ? placeholders rendered from args), and hands back the resulting DataFrame. The cleanup hook returns the session to the pool.
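A sketch of placeholder rendering; the table, columns, and values are illustrative:

```go
src := drv.FromSQL(
	"SELECT id, email FROM users WHERE country = ? AND created_at > ?",
	"DE", "2024-01-01",
)
users, err := lakeorm.Query[User](ctx, db, src)
```

The borrowed session is held for the lifetime of the Source's iteration and returned to the pool by the cleanup hook.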
func (*Driver) FromTable ¶
func (d *Driver) FromTable(name string) drivers.Source
FromTable builds a Source that reads a whole table by name. Equivalent to FromSQL("SELECT * FROM <name>") but goes through SparkSession.Table, which gives Spark more room for column pruning and predicate pushdown.
func (*Driver) Session ¶
func (d *Driver) Session(ctx context.Context) (scsql.SparkSession, func(), error)
Session borrows a session from the pool and returns it alongside a release hook the caller must invoke (typically via defer) when done with the session. It is the escape hatch for callers that need to chain native DataFrame operations (GroupBy, Agg, Join, Window) before handing the result to FromDataFrame for typed decode.
sess, release, err := drv.Session(ctx)
if err != nil { return err }
defer release()
df, err := sess.Sql(ctx, "SELECT ...")
if err != nil { return err }
agg := df.GroupBy("country").Agg(...)
out, err := lakeorm.Query[CountryAgg](ctx, db, drv.FromDataFrame(agg))
type RemoteOption ¶
type RemoteOption func(*remoteConfig)
RemoteOption tunes a Remote driver. Functional so the public surface stays extensible.
func WithPoolSize ¶
func WithPoolSize(n int) RemoteOption
WithPoolSize overrides the session pool size for this driver (defaults to 8 — see SessionPool). Normally tuned at the Client level via lakeorm.WithSessionPoolSize.
func WithSessionConfs ¶
func WithSessionConfs(confs map[string]string) RemoteOption
WithSessionConfs sets Spark SQL session-level configuration; each newly created session applies it via `SET key=value` before first use. Useful for per-client Hadoop S3A credentials, Arrow batch sizes, and any other spark.* knobs the Spark Connect server can't hard-wire at startup. Prefer server-level configuration when the cluster allows it; fall back to this for dev stacks, on-prem, or tenant-specific knobs that shouldn't leak into the cluster defaults.
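For illustration: the conf keys below are standard Hadoop S3A and Spark SQL settings, while the endpoint and environment-variable names are assumptions:

```go
drv := spark.Remote("sc://spark.internal:15002",
	spark.WithSessionConfs(map[string]string{
		// Tenant-scoped S3A credentials (standard Hadoop S3A keys).
		"spark.hadoop.fs.s3a.access.key": os.Getenv("TENANT_S3_ACCESS_KEY"),
		"spark.hadoop.fs.s3a.secret.key": os.Getenv("TENANT_S3_SECRET_KEY"),
		// Cap Arrow batch size for wide rows.
		"spark.sql.execution.arrow.maxRecordsPerBatch": "2000",
	}),
)
```

Because the confs run inside the pool factory, every session the pool ever creates — including replacements after a refresh — gets the same settings.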
type SessionPool ¶
type SessionPool struct {
// contains filtered or unexported fields
}
SessionPool manages a bounded pool of SparkSession instances.
Spark Connect sessions are stateful on the server — each holds its own catalog context, temp views, config overrides. Three options:
- One session per goroutine. Simple model, state-isolated, but cluster-side overhead (auth, init) is per-session — painful at high concurrency.
- One shared session. Cheap, but any operation with side effects (SET, CREATE TEMP VIEW, catalog use) races.
- A bounded pool (this type). Serializes state within a borrow, caps cluster-side overhead at `size`.
Default size is 8; tune via lakeorm.WithSessionPoolSize.
func (*SessionPool) Borrow ¶
func (p *SessionPool) Borrow(ctx context.Context) (scsql.SparkSession, error)
Borrow returns a session from the pool, creating one on demand up to the configured size. Callers must call Return() when done.
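The typical pairing, sketched below; Driver.Session provides the same discipline via its release hook:

```go
sess, err := pool.Borrow(ctx)
if err != nil {
	return err
}
// Hand the session back once done; state set during the borrow
// (SET, temp views) stays isolated from other goroutines until then.
defer pool.Return(sess)
```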
func (*SessionPool) Close ¶
func (p *SessionPool) Close() error
Close stops every idle session and refuses further borrows.
func (*SessionPool) Return ¶
func (p *SessionPool) Return(s scsql.SparkSession)
Return hands a session back to the pool. If the pool is full, the session is stopped.