duckdbservice

package
v0.0.0-...-d79ae9c
Warning

This package is not in the latest version of its module.

Published: Jun 12, 2026 License: MIT Imports: 49 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var AppendValue = arrowmap.AppendValue

AppendValue re-exports arrowmap.AppendValue for backward compatibility. The duckdb-go-specific value types (duckdb.Interval, Decimal, UUID, OrderedMap, Map) are handled via an arrowmap.Appender registered from duckdbservice/appender_init.go, so callers that import duckdbservice get full type coverage automatically.

var DuckDBTypeToArrow = arrowmap.DuckDBTypeToArrow

DuckDBTypeToArrow re-exports arrowmap.DuckDBTypeToArrow for backward compatibility with existing callers in this package.

var ErrWorkerDraining = errors.New("worker is draining")

var QualifyTableName = arrowmap.QualifyTableName

QualifyTableName re-exports arrowmap.QualifyTableName for backward compatibility with existing callers.

var QuoteIdent = arrowmap.QuoteIdent

QuoteIdent re-exports arrowmap.QuoteIdent for backward compatibility.
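The exact signatures of arrowmap.QuoteIdent and arrowmap.QualifyTableName are not shown on this page. As a hedged sketch, the standard SQL rule for delimited identifiers (wrap in double quotes, double any embedded quote) looks like this; quoteIdent and qualifyTableName are hypothetical names, and the three-part catalog/schema/table shape is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// quoteIdent wraps name in double quotes and doubles any embedded double
// quote, the standard SQL escaping rule for delimited identifiers.
func quoteIdent(name string) string {
	return `"` + strings.ReplaceAll(name, `"`, `""`) + `"`
}

// qualifyTableName quotes each non-empty part separately and joins them
// with dots, so a dot inside a name can never split it into two parts.
func qualifyTableName(catalog, schema, table string) string {
	parts := make([]string, 0, 3)
	for _, p := range []string{catalog, schema, table} {
		if p != "" {
			parts = append(parts, quoteIdent(p))
		}
	}
	return strings.Join(parts, ".")
}

func main() {
	fmt.Println(quoteIdent(`weird"name`))                       // "weird""name"
	fmt.Println(qualifyTableName("ducklake", "main", "events")) // "ducklake"."main"."events"
}
```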

Functions

func BearerTokenStreamInterceptor

func BearerTokenStreamInterceptor(expectedToken string) grpc.StreamServerInterceptor

BearerTokenStreamInterceptor returns a gRPC stream interceptor that validates the bearer token from the "authorization" metadata header. If expectedToken is empty, no authentication is enforced.

func BearerTokenUnaryInterceptor

func BearerTokenUnaryInterceptor(expectedToken string) grpc.UnaryServerInterceptor

BearerTokenUnaryInterceptor returns a gRPC unary interceptor that validates the bearer token from the "authorization" metadata header. If expectedToken is empty, no authentication is enforced.
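Both interceptors share the same token check. A hedged, stdlib-only sketch of that check follows. checkBearer is a hypothetical helper, and the case-sensitive "Bearer " prefix and constant-time comparison are assumptions about the real implementation. In an actual interceptor the header values would come from metadata.FromIncomingContext(ctx) followed by md.Get("authorization"), and a failure would typically be returned as status.Error(codes.Unauthenticated, ...).

```go
package main

import (
	"crypto/subtle"
	"errors"
	"fmt"
	"strings"
)

var errUnauthenticated = errors.New("missing or invalid bearer token")

// checkBearer validates the values of an "authorization" metadata header
// against expectedToken. An empty expectedToken disables the check.
func checkBearer(authValues []string, expectedToken string) error {
	if expectedToken == "" {
		return nil // authentication not enforced
	}
	if len(authValues) == 0 {
		return errUnauthenticated
	}
	token, ok := strings.CutPrefix(authValues[0], "Bearer ")
	if !ok {
		return errUnauthenticated
	}
	// Constant-time comparison avoids leaking how many leading bytes matched.
	if subtle.ConstantTimeCompare([]byte(token), []byte(expectedToken)) != 1 {
		return errUnauthenticated
	}
	return nil
}

func main() {
	fmt.Println(checkBearer([]string{"Bearer s3cret"}, "s3cret")) // <nil>
	fmt.Println(checkBearer(nil, "s3cret"))                        // missing or invalid bearer token
	fmt.Println(checkBearer(nil, ""))                              // <nil>
}
```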

func GetQuerySchema

func GetQuerySchema(ctx context.Context, db contextQueryer, query string, tx contextQueryer) (*arrow.Schema, error)

GetQuerySchema executes a query with LIMIT 0 to discover the result schema.
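One way to apply LIMIT 0 safely is to wrap the query in a subquery rather than appending the clause, which keeps it valid when the query already ends in its own LIMIT, ORDER BY, or a trailing line comment. This is a hedged sketch: limitZero and the alias schema_probe are hypothetical, the real function may rewrite the query differently, and the approach only works for row-returning statements. The column types of the resulting *sql.Rows could then be read with (*sql.Rows).ColumnTypes and mapped to Arrow types.

```go
package main

import (
	"fmt"
	"strings"
)

// limitZero rewrites query so that executing it yields column metadata but
// no rows. Trailing semicolons and whitespace are stripped because they are
// not allowed inside a subquery; the newlines keep a trailing "-- comment"
// from swallowing the closing parenthesis.
func limitZero(query string) string {
	q := strings.TrimRight(strings.TrimSpace(query), "; \t\r\n")
	return "SELECT * FROM (\n" + q + "\n) AS schema_probe LIMIT 0"
}

func main() {
	fmt.Println(limitZero("SELECT id FROM t ORDER BY id LIMIT 10;"))
}
```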

func IsCopyFromStdinDescriptor

func IsCopyFromStdinDescriptor(desc *flight.FlightDescriptor) bool

IsCopyFromStdinDescriptor reports whether desc is the custom DoPut descriptor that the control plane uses to stream a CSV spool file for COPY FROM STDIN. customActionServer.DoPut peeks at the first frame of every DoPut stream and, when this function returns true, routes the stream to the COPY FROM STDIN handler.

func LogCacheProxyStatus

func LogCacheProxyStatus()

LogCacheProxyStatus logs whether the cache proxy integration is enabled. It is called once from main in every duckgres process (control plane and workers), so the startup logs clearly show the cache state.

func ParseListenAddr

func ParseListenAddr(addr string) (network, listenAddr string, err error)

ParseListenAddr parses addr (a ServiceConfig.ListenAddr value) into the network and address arguments for net.Listen. It supports "unix:///path/to/sock" and TCP addresses such as ":8816" or "host:port".
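A minimal sketch of the parsing rule described above follows. parseListenAddr is a hypothetical reimplementation; in particular, validating TCP addresses with net.SplitHostPort and rejecting an empty Unix path are assumptions, since the page does not say which inputs the real function rejects.

```go
package main

import (
	"errors"
	"fmt"
	"net"
	"strings"
)

// parseListenAddr splits addr into the (network, address) pair expected by
// net.Listen. "unix:///path" selects a Unix socket at /path; anything else
// is treated as a TCP host:port.
func parseListenAddr(addr string) (network, listenAddr string, err error) {
	if path, ok := strings.CutPrefix(addr, "unix://"); ok {
		if path == "" {
			return "", "", errors.New("unix listen address has an empty path")
		}
		return "unix", path, nil
	}
	// SplitHostPort accepts ":8816" (empty host = all interfaces) and "host:port".
	if _, _, splitErr := net.SplitHostPort(addr); splitErr != nil {
		return "", "", fmt.Errorf("invalid TCP listen address %q: %w", addr, splitErr)
	}
	return "tcp", addr, nil
}

func main() {
	for _, a := range []string{"unix:///var/run/duckgres/duckdb.sock", ":8816", "0.0.0.0:8816", "8816"} {
		network, listenAddr, err := parseListenAddr(a)
		fmt.Println(network, listenAddr, err)
	}
}
```

The result plugs directly into net.Listen(network, listenAddr).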

func RowsToRecord

func RowsToRecord(alloc memory.Allocator, rows *sql.Rows, schema *arrow.Schema, batchSize int) (arrow.RecordBatch, error)

RowsToRecord converts sql.Rows into an Arrow RecordBatch of up to batchSize rows. Returns nil when there are no more rows.
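The "up to batchSize rows, nil when exhausted" contract implies a caller loop that keeps requesting batches until it sees nil. The sketch below shows that contract with the standard library only. rowIter, nextBatch, and sliceRows are hypothetical; a []int64 stands in for the Arrow RecordBatch, and with the real function each non-nil record would also need to be released after use, since Arrow records are reference counted.

```go
package main

import "fmt"

// rowIter is the subset of *sql.Rows used here, so the sketch runs without
// a database.
type rowIter interface {
	Next() bool
	Scan(dest ...any) error
	Err() error
}

// nextBatch reads up to batchSize single-column rows and returns a nil
// slice once the iterator is exhausted.
func nextBatch(rows rowIter, batchSize int) ([]int64, error) {
	var batch []int64
	// Check the size first so a full batch never consumes an extra row.
	for len(batch) < batchSize && rows.Next() {
		var v int64
		if err := rows.Scan(&v); err != nil {
			return nil, err
		}
		batch = append(batch, v)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return batch, nil // nil when no row was read
}

// sliceRows is an in-memory rowIter for the demo.
type sliceRows struct {
	vals []int64
	pos  int
}

func (r *sliceRows) Next() bool {
	r.pos++
	return r.pos <= len(r.vals)
}

func (r *sliceRows) Scan(dest ...any) error {
	*dest[0].(*int64) = r.vals[r.pos-1]
	return nil
}

func (r *sliceRows) Err() error { return nil }

func main() {
	rows := &sliceRows{vals: []int64{1, 2, 3, 4, 5}}
	for {
		batch, err := nextBatch(rows, 2)
		if err != nil {
			panic(err)
		}
		if batch == nil {
			break // no more rows
		}
		fmt.Println(batch) // [1 2], then [3 4], then [5]
	}
}
```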

func Run

func Run(cfg ServiceConfig)

Run starts the DuckDB service, blocking until shutdown.

Types

type ActivationPayload

type ActivationPayload struct {
	server.WorkerControlMetadata
	OrgID    string                `json:"org_id"`
	DuckLake server.DuckLakeConfig `json:"ducklake"`
	// Iceberg is the per-tenant Iceberg catalog (AWS S3 Tables) config. Empty
	// (Enabled=false) when the tenant has not opted in or hasn't been
	// provisioned yet — workers handle that as a no-op at attach time.
	Iceberg server.IcebergConfig `json:"iceberg"`
}

ActivationPayload carries the tenant-specific runtime configuration that is delivered to a neutral shared warm worker over the control-plane RPC channel.

type DuckDBPair

type DuckDBPair struct {
	Main    *sql.DB
	Control *sql.DB
	// contains filtered or unexported fields
}

DuckDBPair holds two *sql.DBs that share the same underlying DuckDB instance:

  • Main is the client-query DB (MaxOpenConns=1, giving single-session isolation for the user's pgwire session; this is the existing behavior).
  • Control is reserved for the control plane's non-blocking operations (CREATE OR REPLACE SECRET on credential refresh, ATTACH/DETACH). It also uses MaxOpenConns=1 so that two control DDL statements cannot contend with each other, but it has its own pool, so it never queues behind a long-running client query.

Both *sql.DBs share one *duckdb.Connector and therefore one DuckDB Database handle, so catalogs, the SecretManager, and registered extensions are visible through both. A query already running on Main can use a fresh secret as soon as Control swaps it in; see the RefreshS3Secret callers for the rationale.

DuckDBPair lives in duckdbservice (not server) so that the control-plane binary, which imports server but not duckdbservice, does not transitively pull in the libduckdb-linked duckdb-go-v2 driver. CI's "does-not-link-libduckdb" check enforces this boundary.

The owner must call Close on the pair when shutting down. Calling Close on either individual *sql.DB goes through a non-closing wrapper, so the underlying DuckDB instance is released only when DuckDBPair.Close runs.
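The non-closing wrapper and the idempotent pair Close can be sketched with database/sql alone. This is a hypothetical illustration, not the package's code: nonClosingConnector, pair, and fakeConnector are invented names, and the real type wraps a *duckdb.Connector rather than a fake. It relies on the documented database/sql/driver behavior that if a Connector implements io.Closer, (*sql.DB).Close calls it.

```go
package main

import (
	"context"
	"database/sql"
	"database/sql/driver"
	"errors"
	"fmt"
	"sync"
)

// nonClosingConnector embeds the driver.Connector interface, which promotes
// only Connect and Driver. The wrapped value's Close is hidden, so closing a
// *sql.DB built on this wrapper leaves the shared connector open.
type nonClosingConnector struct{ driver.Connector }

// pair is a hypothetical, simplified DuckDBPair.
type pair struct {
	Main, Control *sql.DB
	shared        driver.Connector
	once          sync.Once
	err           error
}

func newPair(shared driver.Connector) *pair {
	return &pair{
		Main:    sql.OpenDB(nonClosingConnector{shared}),
		Control: sql.OpenDB(nonClosingConnector{shared}),
		shared:  shared,
	}
}

// Close closes both pools and then the shared connector, exactly once.
func (p *pair) Close() error {
	p.once.Do(func() {
		errs := []error{p.Main.Close(), p.Control.Close()}
		if c, ok := p.shared.(interface{ Close() error }); ok {
			errs = append(errs, c.Close())
		}
		p.err = errors.Join(errs...)
	})
	return p.err
}

// fakeConnector counts Close calls; it never opens real connections.
type fakeConnector struct{ closes int }

func (f *fakeConnector) Connect(context.Context) (driver.Conn, error) {
	return nil, errors.New("not implemented")
}
func (f *fakeConnector) Driver() driver.Driver { return nil }
func (f *fakeConnector) Close() error          { f.closes++; return nil }

func main() {
	fc := &fakeConnector{}
	p := newPair(fc)
	p.Main.Close()         // does not touch the shared connector
	fmt.Println(fc.closes) // 0
	p.Close()
	p.Close()              // second call is a no-op
	fmt.Println(fc.closes) // 1
}
```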

func CreateWorkerDBPair

func CreateWorkerDBPair(cfg server.Config, duckLakeSem chan struct{}, username string, serverStartTime time.Time, serverVersion string) (*DuckDBPair, error)

CreateWorkerDBPair is the worker-process factory: it opens a shared-connector DuckDB pair and runs ConfigureDBConnection on Main (pg_catalog, information_schema, DuckLake attach) so that Main matches what the existing CreateDBConnection produces. Control is left in its minimal post-Ping state. Because of the shared connector it sees the same Database (and therefore the same SecretManager and attached catalogs), so credential rotation needs no per-connection initialization.

func OpenDuckDBPair

func OpenDuckDBPair(cfg server.Config, username string) (*DuckDBPair, error)

OpenDuckDBPair builds the DSN that openBaseDB would have used, opens one DuckDB Database via *duckdb.Connector, and returns a Main + Control *sql.DB sharing it. The Main DB still goes through the same configuration path as openBaseDB (threads, memory_limit, extensions, profiling, cache settings); the Control DB is intentionally minimal.

func PairFromMain

func PairFromMain(db *sql.DB) *DuckDBPair

PairFromMain builds a *DuckDBPair that points Main and Control at the same *sql.DB and owns no connector. Intended for test fixtures that don't exercise the connector-sharing properties — production code uses CreateWorkerDBPair, which returns a real pair.

func (*DuckDBPair) Close

func (p *DuckDBPair) Close() error

Close closes both *sql.DBs and the underlying DuckDB instance. Safe to call multiple times.

type DuckDBService

type DuckDBService struct {
	// contains filtered or unexported fields
}

DuckDBService is a standalone Arrow Flight SQL service backed by DuckDB.

func NewDuckDBService

func NewDuckDBService(cfg ServiceConfig) *DuckDBService

NewDuckDBService creates a new DuckDB service with the given config.

func (*DuckDBService) BeginDrain

func (svc *DuckDBService) BeginDrain()

func (*DuckDBService) CloseAll

func (svc *DuckDBService) CloseAll()

func (*DuckDBService) Serve

func (svc *DuckDBService) Serve(listener net.Listener) error

Serve starts serving on the given listener.

func (*DuckDBService) Shutdown

func (svc *DuckDBService) Shutdown()

Shutdown gracefully stops the service.

func (*DuckDBService) WaitForDrain

func (svc *DuckDBService) WaitForDrain(ctx context.Context) bool

type FlightSQLHandler

type FlightSQLHandler struct {
	flightsql.BaseServer
	// contains filtered or unexported fields
}

FlightSQLHandler implements Arrow Flight SQL for multiple sessions. Sessions are identified by the "x-duckgres-session" gRPC metadata header.
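Per-request session lookup starts by reading that header. The hedged sketch below models gRPC metadata as map[string][]string, the underlying type of grpc's metadata.MD, whose keys are lowercase. sessionToken and errNoSession are hypothetical; a real handler would obtain the map with metadata.FromIncomingContext and then call SessionPool.GetSession with the token.

```go
package main

import (
	"errors"
	"fmt"
)

const sessionHeader = "x-duckgres-session"

var errNoSession = errors.New("missing x-duckgres-session metadata")

// sessionToken extracts the session token from gRPC-style metadata.
func sessionToken(md map[string][]string) (string, error) {
	vals := md[sessionHeader]
	if len(vals) == 0 || vals[0] == "" {
		return "", errNoSession
	}
	return vals[0], nil
}

func main() {
	md := map[string][]string{sessionHeader: {"abc123"}}
	fmt.Println(sessionToken(md))                    // abc123 <nil>
	fmt.Println(sessionToken(map[string][]string{})) //  missing x-duckgres-session metadata
}
```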

func NewFlightSQLHandler

func NewFlightSQLHandler(pool *SessionPool) *FlightSQLHandler

NewFlightSQLHandler creates a new multi-session Flight SQL handler.

func (*FlightSQLHandler) BeginTransaction

func (*FlightSQLHandler) DoGetStatement

func (*FlightSQLHandler) DoPutCommandStatementUpdate

func (h *FlightSQLHandler) DoPutCommandStatementUpdate(ctx context.Context,
	cmd flightsql.StatementUpdate) (int64, error)

func (*FlightSQLHandler) EndTransaction

func (*FlightSQLHandler) GetFlightInfoStatement

func (h *FlightSQLHandler) GetFlightInfoStatement(ctx context.Context, cmd flightsql.StatementQuery,
	desc *flight.FlightDescriptor) (*flight.FlightInfo, error)

type QueryHandle

type QueryHandle struct {
	Query  string
	Schema *arrow.Schema
	TxnID  string
	// contains filtered or unexported fields
}

QueryHandle stores an ad-hoc query awaiting its DoGet.

type ServiceConfig

type ServiceConfig struct {
	// ListenAddr is the address to listen on.
	// Formats: "unix:///var/run/duckgres/duckdb.sock" or ":8816" or "0.0.0.0:8816"
	ListenAddr string

	// ListenFD is an inherited file descriptor for a pre-bound listener.
	// When > 0, the service uses this FD instead of creating a new socket.
	// This is used by the control plane to pass a pre-bound Unix socket to
	// worker processes, avoiding EROFS errors under ProtectSystem=strict.
	ListenFD int

	// ServerConfig is reused for CreateDBConnection (data_dir, extensions, DuckLake).
	ServerConfig server.Config

	// BearerToken is the authentication token for gRPC requests.
	// If empty, no authentication is required.
	BearerToken string

	// MaxSessions limits the number of concurrent sessions. 0 means unlimited.
	MaxSessions int

	// RequireActivation forces shared warm workers to receive tenant runtime
	// over the control-plane activation RPC before they may serve sessions.
	RequireActivation bool
}

ServiceConfig configures the standalone DuckDB Arrow Flight SQL service.
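The ListenFD/ListenAddr choice can be sketched with the standard library: adopt an inherited, already-bound descriptor when one is given, otherwise bind a fresh socket. openListener is a hypothetical helper, not the package's API. net.FileListener duplicates the descriptor, so the temporary *os.File can be closed immediately. When a parent starts the worker with os/exec, entry i of exec.Cmd.ExtraFiles becomes descriptor 3+i in the child.

```go
package main

import (
	"fmt"
	"net"
	"os"
)

// openListener returns a listener for the service. When listenFD > 0 it
// adopts an inherited, already-bound socket instead of binding a new one,
// so the worker never has to create a socket file itself.
func openListener(network, addr string, listenFD int) (net.Listener, error) {
	if listenFD > 0 {
		f := os.NewFile(uintptr(listenFD), "inherited-listener")
		if f == nil {
			return nil, fmt.Errorf("invalid listen fd %d", listenFD)
		}
		defer f.Close() // FileListener dups the descriptor
		return net.FileListener(f)
	}
	return net.Listen(network, addr)
}

func main() {
	ln, err := openListener("tcp", "127.0.0.1:0", 0)
	if err != nil {
		panic(err)
	}
	defer ln.Close()
	fmt.Println("listening on", ln.Addr())
}
```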

type Session

type Session struct {
	ID        string
	DB        *sql.DB
	Conn      *sql.Conn // Dedicated connection for this session
	Username  string
	CreatedAt time.Time
	// contains filtered or unexported fields
}

Session represents a single DuckDB session.

type SessionPool

type SessionPool struct {
	// contains filtered or unexported fields
}

SessionPool manages multiple DuckDB sessions keyed by session token.

func (*SessionPool) ActiveDrainWork

func (p *SessionPool) ActiveDrainWork() int

func (*SessionPool) ActiveSessions

func (p *SessionPool) ActiveSessions() int

ActiveSessions returns the number of active sessions.

func (*SessionPool) BeginDrain

func (p *SessionPool) BeginDrain()

func (*SessionPool) CloseAll

func (p *SessionPool) CloseAll()

CloseAll closes all sessions.

func (*SessionPool) CreateSession

func (p *SessionPool) CreateSession(username, memoryLimit string, threads int, secretStatements []string) (*Session, []string, error)

CreateSession creates a new DuckDB session for the given username. secretStatements are the user's persistent secrets to replay (shared-warm mode only). Replay failures are returned as warnings rather than errors: they are logged on the worker and again by the control plane's session manager, and they never fail the session.

func (*SessionPool) DestroySession

func (p *SessionPool) DestroySession(token string) error

DestroySession closes and removes a session.

func (*SessionPool) GetSession

func (p *SessionPool) GetSession(token string) (*Session, bool)

GetSession returns a session by token.

func (*SessionPool) IsDraining

func (p *SessionPool) IsDraining() bool

func (*SessionPool) WaitForDrain

func (p *SessionPool) WaitForDrain(ctx context.Context) bool

func (*SessionPool) Warmup

func (p *SessionPool) Warmup() error

Warmup performs one-time initialization of the shared DuckDB instance. This loads extensions and attaches catalogs so that subsequent session creations are nearly instantaneous.

Directories

Path        Synopsis
arrowmap    Package arrowmap provides DuckDB-free helpers for translating DuckDB type strings into Arrow types, quoting/qualifying SQL identifiers, and appending scanned values into Arrow array builders.
