spark

Published: Apr 23, 2026 License: MIT Imports: 15 Imported by: 0

Documentation

Overview

Package spark provides lakeorm's generic Spark Connect driver. Use Remote for plain Spark Connect endpoints (self-hosted, EMR, Glue, lake-k8s). For Databricks clusters see the sibling driver/databricksconnect package.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Driver

type Driver struct {
	// contains filtered or unexported fields
}

Driver is the shared Spark Connect driver backing both Remote and databricksconnect. Exported so callers can reach the per-driver conversion helpers (FromSQL, FromDataFrame, FromTable, FromRow) and the raw session (Session) via a Client.Driver() type assertion.

Session-level confs (WithSessionConfs) are applied inside the pool factory, not held on the driver, so they survive pool refreshes and apply identically to every newly created session.

func FromFactory

func FromFactory(
	name string,
	factory func(context.Context) (scsql.SparkSession, error),
	opts ...RemoteOption,
) *Driver

FromFactory builds a *Driver around an arbitrary Spark Connect session factory. Exported so sibling driver packages — notably driver/databricksconnect — can reuse the full driver machinery (session pool, conf application, Execute dispatch, cluster-not-ready translation) without duplicating it.

`name` is returned verbatim from Driver.Name; "spark-remote" for the plain Remote constructor, "databricks-connect" for the Databricks Connect wrapper, etc. Shows up in logs + metrics as the driver discriminator.

`factory` produces a fresh scsql.SparkSession per pool borrow. FromFactory wraps the supplied factory so session-level confs from the opts are applied after the caller's factory returns, before the session is handed to lakeorm. Conf failures surface as factory errors — the pool will retry on the next borrow.

Regular users should not need this entry point. Use Remote for self-hosted Spark Connect; use the databricksconnect package for Databricks clusters.

func Remote

func Remote(uri string, opts ...RemoteOption) *Driver

Remote returns a Driver that connects to a plain Spark Connect endpoint (no OAuth, no cluster-ID header). Use for self-hosted Spark, EMR, Glue, and the lake-k8s local stack.

The concrete *Driver is returned (not drivers.Driver) so callers can reach the per-driver conversion helpers (FromSQL, FromDataFrame, FromTable, FromRow) and the raw session via Session(). lakeorm.Open still accepts it because *Driver satisfies the drivers.Driver interface.

drv := spark.Remote("sc://spark.internal:15002")
db, _ := lakeorm.Open(drv, iceberg.Dialect(), backends.S3(...))

// Typed read via the driver's conversion helper:
users, _ := lakeorm.Query[User](ctx, db, drv.FromSQL("SELECT * FROM users"))

// Drop to the raw Spark session when you need native DataFrame chaining:
sess, release, _ := drv.Session(ctx)
defer release()
df, _ := sess.Sql(ctx, "...")
agg, _ := lakeorm.Query[CountryAgg](ctx, db, drv.FromDataFrame(df.GroupBy("country").Agg(...)))

func (*Driver) Close

func (d *Driver) Close() error

Close implements drivers.Driver. Stops the session pool.

func (*Driver) Collect

func (d *Driver) Collect(ctx context.Context, source drivers.Source, out any) error

Collect runs source and appends every decoded row into out (must be a *[]T).
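The `out` contract (must be a *[]T, rows appended via reflection) can be illustrated with a standalone sketch. `collectInto` is a hypothetical helper for illustration, not the driver's decode path, and the rows here are pre-built Go values rather than decoded Spark rows.

```go
package main

import (
	"fmt"
	"reflect"
)

// collectInto sketches Collect's out contract: out must be a pointer to
// a slice, and each row is appended to it via reflection.
func collectInto(out any, rows ...any) error {
	v := reflect.ValueOf(out)
	if v.Kind() != reflect.Pointer || v.Elem().Kind() != reflect.Slice {
		return fmt.Errorf("out must be a *[]T, got %T", out)
	}
	sl := v.Elem()
	for _, r := range rows {
		rv := reflect.ValueOf(r)
		if !rv.Type().AssignableTo(sl.Type().Elem()) {
			return fmt.Errorf("row type %s not assignable to %s", rv.Type(), sl.Type().Elem())
		}
		sl = reflect.Append(sl, rv)
	}
	v.Elem().Set(sl)
	return nil
}

func main() {
	var out []int
	if err := collectInto(&out, 1, 2, 3); err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```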

func (*Driver) Exec

func (d *Driver) Exec(ctx context.Context, sql string, args ...any) (drivers.ExecResult, error)

Exec implements drivers.Driver.Exec — fire-and-forget SQL for DDL or one-off DML. Does not go through Dialect.

func (*Driver) Execute

Execute implements drivers.Driver. Dispatches by plan kind.

func (*Driver) First

func (d *Driver) First(ctx context.Context, source drivers.Source, out any) error

First decodes the first row into out (*T) or returns errors.ErrNoRows.

func (*Driver) FromDataFrame

func (d *Driver) FromDataFrame(df scsql.DataFrame) drivers.Source

FromDataFrame builds a Source that hands back an already-created DataFrame. The caller owns the session that produced it, so the cleanup hook is nil — nothing for the driver to release.

Used when the caller has chained Spark operations on a DataFrame (GroupBy / Agg / Join / Window functions) and wants the typed decode path over the result. Paired with Session() for the acquire side.

func (*Driver) FromRow

func (d *Driver) FromRow(sql string, args ...any) drivers.Source

FromRow is the single-row variant of FromSQL. Structurally identical — the "single row" semantic is enforced by the Convertible.First call at the other end, which stops iterating after the first row and returns errors.ErrNoRows when the source yielded zero rows. Kept as a named helper because lakeorm.QueryFirst[T](ctx, db, drv.FromRow("...")) reads more clearly than reusing FromSQL at a First site.

func (*Driver) FromSQL

func (d *Driver) FromSQL(sql string, args ...any) drivers.Source

FromSQL builds a Source that borrows a session, runs sql (with ? placeholders rendered from args), and hands back the resulting DataFrame. The cleanup hook returns the session to the pool.
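The placeholder rendering can be sketched in miniature. `renderPlaceholders` below is illustrative only; the driver's real rendering handles more argument types and escaping, and this toy covers just strings and ints.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// renderPlaceholders substitutes each ? in query with a SQL literal for
// the matching arg, erroring on a count mismatch.
func renderPlaceholders(query string, args ...any) (string, error) {
	var b strings.Builder
	i := 0
	for _, r := range query {
		if r != '?' {
			b.WriteRune(r)
			continue
		}
		if i >= len(args) {
			return "", fmt.Errorf("placeholder %d has no argument", i+1)
		}
		switch v := args[i].(type) {
		case string:
			// Double single quotes so the literal stays well-formed.
			b.WriteString("'" + strings.ReplaceAll(v, "'", "''") + "'")
		case int:
			b.WriteString(strconv.Itoa(v))
		default:
			return "", fmt.Errorf("unsupported arg type %T", v)
		}
		i++
	}
	if i != len(args) {
		return "", fmt.Errorf("got %d args for %d placeholders", len(args), i)
	}
	return b.String(), nil
}

func main() {
	s, _ := renderPlaceholders("SELECT * FROM users WHERE id = ? AND name = ?", 42, "o'neil")
	fmt.Println(s)
	// → SELECT * FROM users WHERE id = 42 AND name = 'o''neil'
}
```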

func (*Driver) FromTable

func (d *Driver) FromTable(name string) drivers.Source

FromTable builds a Source that reads a whole table by name. Equivalent to FromSQL("SELECT * FROM <name>") but goes through SparkSession.Table, which lets Spark apply column pruning and predicate pushdown more aggressively.

func (*Driver) Name

func (d *Driver) Name() string

Name implements drivers.Driver.

func (*Driver) Session

func (d *Driver) Session(ctx context.Context) (scsql.SparkSession, func(), error)

Session borrows a session from the pool and returns it alongside a release hook the caller must invoke (typically via defer) when done with the session. This is the escape hatch for callers that need to chain native DataFrame operations (GroupBy, Agg, Join, Window) before handing the result to FromDataFrame for typed decode.

sess, release, err := drv.Session(ctx)
if err != nil { return err }
defer release()
df, err := sess.Sql(ctx, "SELECT ...")
agg := df.GroupBy("country").Agg(...)
out, err := lakeorm.Query[CountryAgg](ctx, db, drv.FromDataFrame(agg))

func (*Driver) Stream

func (d *Driver) Stream(ctx context.Context, source drivers.Source, sample any) iter.Seq2[any, error]

Stream yields decoded rows one at a time. sample is a *T used to discover the element type via reflect.

type RemoteOption

type RemoteOption func(*remoteConfig)

RemoteOption tunes a Remote driver. Functional so the public surface stays extensible.
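The functional-option shape looks like the sketch below: options mutate a private config struct, so new knobs can be added without breaking existing callers. The fields and the default here are illustrative (the 8-session default matches the WithPoolSize docs), not the driver's actual remoteConfig.

```go
package main

import "fmt"

// Illustrative mirror of the option pattern; not the real remoteConfig.
type remoteConfig struct {
	poolSize int
	confs    map[string]string
}

type RemoteOption func(*remoteConfig)

func WithPoolSize(n int) RemoteOption {
	return func(c *remoteConfig) { c.poolSize = n }
}

// apply builds the config from defaults, then lets each option override.
func apply(opts ...RemoteOption) remoteConfig {
	c := remoteConfig{poolSize: 8} // documented default
	for _, o := range opts {
		o(&c)
	}
	return c
}

func main() {
	fmt.Println(apply().poolSize, apply(WithPoolSize(4)).poolSize)
	// → 8 4
}
```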

func WithLogger

func WithLogger(l zerolog.Logger) RemoteOption

WithLogger sets the driver logger.

func WithPoolSize

func WithPoolSize(n int) RemoteOption

WithPoolSize overrides the session pool size for this driver (defaults to 8 — see SessionPool). Normally tuned at the Client level via lakeorm.WithSessionPoolSize.

func WithSessionConfs

func WithSessionConfs(confs map[string]string) RemoteOption

WithSessionConfs sets Spark SQL session-level configuration; on first use, every borrowed session runs `SET key=value` for each entry. Useful for per-client Hadoop S3A credentials, Arrow batch sizes, and any other spark.* knobs the Spark Connect server can't hard-wire at startup. Prefer server-level configuration when the cluster allows it; fall back to this for dev stacks, on-prem, or tenant-specific knobs that shouldn't leak into the cluster defaults.
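The `SET key=value` statements a borrowed session would run can be rendered as a sketch. `setStatements` is a hypothetical helper (the driver applies confs internally), and the conf keys below are ordinary Spark/Hadoop knobs chosen as examples.

```go
package main

import (
	"fmt"
	"sort"
)

// setStatements renders the SET statements for a conf map, sorted by key
// so the output is deterministic.
func setStatements(confs map[string]string) []string {
	keys := make([]string, 0, len(confs))
	for k := range confs {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	out := make([]string, 0, len(keys))
	for _, k := range keys {
		out = append(out, fmt.Sprintf("SET %s=%s", k, confs[k]))
	}
	return out
}

func main() {
	for _, s := range setStatements(map[string]string{
		"spark.hadoop.fs.s3a.endpoint":                 "http://minio:9000",
		"spark.sql.execution.arrow.maxRecordsPerBatch": "10000",
	}) {
		fmt.Println(s)
	}
}
```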

type SessionPool

type SessionPool struct {
	// contains filtered or unexported fields
}

SessionPool manages a bounded pool of SparkSession instances.

Spark Connect sessions are stateful on the server — each holds its own catalog context, temp views, config overrides. Three options:

  • One session per goroutine. Simple model, state-isolated, but cluster-side overhead (auth, init) is per-session — painful at high concurrency.
  • One shared session. Cheap, but any operation with side effects (SET, CREATE TEMP VIEW, catalog use) races.
  • A bounded pool (this type). Serializes state within a borrow, caps cluster-side overhead at `size`.

Default size is 8; tune via lakeorm.WithSessionPoolSize.

func (*SessionPool) Borrow

func (p *SessionPool) Borrow(ctx context.Context) (scsql.SparkSession, error)

Borrow returns a session from the pool, creating one on demand up to the configured size. Callers must call Return() when done.

func (*SessionPool) Close

func (p *SessionPool) Close() error

Close stops every idle session and refuses further borrows.

func (*SessionPool) Return

func (p *SessionPool) Return(s scsql.SparkSession)

Return hands a session back to the pool. If the pool is full, the session is stopped.
