Documentation ¶
Index ¶
- Variables
- func BearerTokenStreamInterceptor(expectedToken string) grpc.StreamServerInterceptor
- func BearerTokenUnaryInterceptor(expectedToken string) grpc.UnaryServerInterceptor
- func GetQuerySchema(ctx context.Context, db contextQueryer, query string, tx contextQueryer) (*arrow.Schema, error)
- func IsCopyFromStdinDescriptor(desc *flight.FlightDescriptor) bool
- func LogCacheProxyStatus()
- func ParseListenAddr(addr string) (network, listenAddr string, err error)
- func RowsToRecord(alloc memory.Allocator, rows *sql.Rows, schema *arrow.Schema, batchSize int) (arrow.RecordBatch, error)
- func Run(cfg ServiceConfig)
- type ActivationPayload
- type DuckDBPair
- type DuckDBService
- type FlightSQLHandler
- func (h *FlightSQLHandler) BeginTransaction(ctx context.Context, req flightsql.ActionBeginTransactionRequest) ([]byte, error)
- func (h *FlightSQLHandler) DoGetStatement(ctx context.Context, ticket flightsql.StatementQueryTicket) (*arrow.Schema, <-chan flight.StreamChunk, error)
- func (h *FlightSQLHandler) DoPutCommandStatementUpdate(ctx context.Context, cmd flightsql.StatementUpdate) (int64, error)
- func (h *FlightSQLHandler) EndTransaction(ctx context.Context, req flightsql.ActionEndTransactionRequest) error
- func (h *FlightSQLHandler) GetFlightInfoStatement(ctx context.Context, cmd flightsql.StatementQuery, ...) (*flight.FlightInfo, error)
- type QueryHandle
- type ServiceConfig
- type Session
- type SessionPool
- func (p *SessionPool) ActiveDrainWork() int
- func (p *SessionPool) ActiveSessions() int
- func (p *SessionPool) BeginDrain()
- func (p *SessionPool) CloseAll()
- func (p *SessionPool) CreateSession(username, memoryLimit string, threads int, secretStatements []string) (*Session, []string, error)
- func (p *SessionPool) DestroySession(token string) error
- func (p *SessionPool) GetSession(token string) (*Session, bool)
- func (p *SessionPool) IsDraining() bool
- func (p *SessionPool) WaitForDrain(ctx context.Context) bool
- func (p *SessionPool) Warmup() error
Constants ¶
This section is empty.
Variables ¶
var AppendValue = arrowmap.AppendValue
AppendValue re-exports arrowmap.AppendValue for backward compatibility. The duckdb-go-specific value types (duckdb.Interval, Decimal, UUID, OrderedMap, Map) are handled via an arrowmap.Appender registered from duckdbservice/appender_init.go, so callers that import duckdbservice get full type coverage automatically.
var DuckDBTypeToArrow = arrowmap.DuckDBTypeToArrow
DuckDBTypeToArrow re-exports arrowmap.DuckDBTypeToArrow for backward compatibility with existing callers in this package.
var ErrWorkerDraining = errors.New("worker is draining")
var QualifyTableName = arrowmap.QualifyTableName
QualifyTableName re-exports arrowmap.QualifyTableName for backward compatibility with existing callers.
var QuoteIdent = arrowmap.QuoteIdent
QuoteIdent re-exports arrowmap.QuoteIdent for backward compatibility.
Functions ¶
func BearerTokenStreamInterceptor ¶
func BearerTokenStreamInterceptor(expectedToken string) grpc.StreamServerInterceptor
BearerTokenStreamInterceptor returns a gRPC stream interceptor that validates the bearer token from the "authorization" metadata header. If expectedToken is empty, no authentication is enforced.
func BearerTokenUnaryInterceptor ¶
func BearerTokenUnaryInterceptor(expectedToken string) grpc.UnaryServerInterceptor
BearerTokenUnaryInterceptor returns a gRPC unary interceptor that validates the bearer token from the "authorization" metadata header. If expectedToken is empty, no authentication is enforced.
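The check that both interceptors describe can be sketched with the standard library alone. This is a minimal illustration, not the package's implementation: the helper name checkBearer is hypothetical, a plain map stands in for gRPC metadata (whose keys are lower-cased), and the "Bearer " prefix and the constant-time comparison are assumptions.

```go
package main

import (
	"crypto/subtle"
	"fmt"
	"strings"
)

// checkBearer reports whether md carries a valid bearer token.
// md stands in for gRPC metadata: lower-cased header keys mapped to values.
// An empty expected token disables authentication, as documented above.
func checkBearer(md map[string][]string, expected string) bool {
	if expected == "" {
		return true
	}
	const prefix = "Bearer " // assumed header scheme
	for _, v := range md["authorization"] {
		if !strings.HasPrefix(v, prefix) {
			continue
		}
		got := strings.TrimPrefix(v, prefix)
		// Constant-time comparison is an assumption, chosen to avoid
		// leaking token contents through timing differences.
		if subtle.ConstantTimeCompare([]byte(got), []byte(expected)) == 1 {
			return true
		}
	}
	return false
}

func main() {
	md := map[string][]string{"authorization": {"Bearer s3cret"}}
	fmt.Println(checkBearer(md, "s3cret")) // matching token
	fmt.Println(checkBearer(md, "other"))  // wrong token
	fmt.Println(checkBearer(nil, ""))      // auth disabled
}
```

In a real server, a helper like this would run inside the interceptor before the request reaches the handler, and a failed check would typically be reported as an unauthenticated status.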
func GetQuerySchema ¶
func GetQuerySchema(ctx context.Context, db contextQueryer, query string, tx contextQueryer) (*arrow.Schema, error)
GetQuerySchema executes a query with LIMIT 0 to discover the result schema.
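One common way to run a query "with LIMIT 0" is to wrap it as a subquery, so that column names and types come back without any rows. The sketch below shows only that string rewrite; the helper name schemaProbeQuery is hypothetical, and the exact SQL that GetQuerySchema builds is not documented here, so treat the wrapping form as an assumption. It only makes sense for statements that are valid as a subquery, such as SELECT.

```go
package main

import (
	"fmt"
	"strings"
)

// schemaProbeQuery wraps query so that executing it returns the result
// columns but no rows. A trailing semicolon is removed first because it
// would be invalid inside the parentheses.
func schemaProbeQuery(query string) string {
	q := strings.TrimSpace(query)
	q = strings.TrimRight(q, "; \t\r\n")
	return "SELECT * FROM (" + q + ") LIMIT 0"
}

func main() {
	fmt.Println(schemaProbeQuery("SELECT 1 AS x;"))
}
```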
func IsCopyFromStdinDescriptor ¶
func IsCopyFromStdinDescriptor(desc *flight.FlightDescriptor) bool
IsCopyFromStdinDescriptor reports whether desc is the custom DoPut descriptor used by the control plane to stream a CSV spool file for COPY FROM STDIN. customActionServer.DoPut peeks at the first frame of every DoPut stream and routes here when this returns true.
func LogCacheProxyStatus ¶
func LogCacheProxyStatus()
LogCacheProxyStatus logs whether the cache proxy integration is enabled. Called once from main on every duckgres process (control plane and workers) so the startup logs clearly show the cache state.
func ParseListenAddr ¶
func ParseListenAddr(addr string) (network, listenAddr string, err error)
ParseListenAddr parses ListenAddr into a network and address for net.Listen. Supports "unix:///path/to/sock" and TCP addresses like ":8816" or "host:port".
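The two supported address forms can be illustrated with a small stand-in. This is a sketch under stated assumptions rather than the package's code: parseListenAddr is a hypothetical re-implementation, and its error cases (empty input, empty socket path) are guesses at reasonable behavior.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// parseListenAddr splits addr into the (network, address) pair expected by
// net.Listen. "unix://" URLs map to the "unix" network; anything else is
// treated as a TCP address.
func parseListenAddr(addr string) (network, listenAddr string, err error) {
	if addr == "" {
		return "", "", errors.New("empty listen address")
	}
	const unixPrefix = "unix://"
	if strings.HasPrefix(addr, unixPrefix) {
		path := strings.TrimPrefix(addr, unixPrefix)
		if path == "" {
			return "", "", errors.New("unix listen address has no socket path")
		}
		return "unix", path, nil
	}
	return "tcp", addr, nil
}

func main() {
	for _, a := range []string{"unix:///path/to/sock", ":8816", "host:8816"} {
		network, listenAddr, err := parseListenAddr(a)
		fmt.Println(network, listenAddr, err)
	}
}
```

The returned pair can be passed straight to net.Listen(network, listenAddr).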
Types ¶
type ActivationPayload ¶
type ActivationPayload struct {
server.WorkerControlMetadata
OrgID string `json:"org_id"`
DuckLake server.DuckLakeConfig `json:"ducklake"`
// Iceberg is the per-tenant Iceberg catalog (AWS S3 Tables) config. Empty
// (Enabled=false) when the tenant has not opted in or hasn't been
// provisioned yet — workers handle that as a no-op at attach time.
Iceberg server.IcebergConfig `json:"iceberg"`
}
ActivationPayload carries the tenant-specific runtime that is delivered to a neutral shared warm worker over the control-plane RPC channel.
type DuckDBPair ¶
DuckDBPair holds two *sql.DBs that share the same underlying DuckDB instance:
- Main is the client-query DB (MaxOpenConns=1, single-session isolation for the user's pgwire session — this is the existing behavior).
- Control is reserved for the control plane's non-blocking ops (CREATE OR REPLACE SECRET on credential refresh, ATTACH/DETACH) and is also MaxOpenConns=1 so a control DDL can't fight itself, but it has its own pool so it never queues behind a long-running client query.
Both *sql.DBs share one *duckdb.Connector / one DuckDB Database handle, which means catalogs, the SecretManager, and registered extensions are all visible across both. The main session running a query benefits from a fresh secret the moment Control swaps it — see RefreshS3Secret callers for the rationale.
Lives in duckdbservice (not server) so the control-plane binary, which imports server but not duckdbservice, doesn't transitively pull in the libduckdb-linked duckdb-go-v2 driver. CI's "does-not-link-libduckdb" check enforces this boundary.
The owner must call Close on the pair when shutting down. Close on the individual *sql.DBs goes through a non-closing wrapper, so the underlying DuckDB instance only goes away when DuckDBPair.Close fires.
func CreateWorkerDBPair ¶
func CreateWorkerDBPair(cfg server.Config, duckLakeSem chan struct{}, username string, serverStartTime time.Time, serverVersion string) (*DuckDBPair, error)
CreateWorkerDBPair is the worker-process factory: it opens a shared-connector DuckDB pair and runs ConfigureDBConnection on Main (pg_catalog, information_schema, DuckLake attach) so it matches what the existing CreateDBConnection produces. Control is left in its minimal post-Ping state — it sees the same Database (and therefore the same SecretManager and attached catalogs) thanks to the shared connector, so no per-conn init is needed for credential rotation.
func OpenDuckDBPair ¶
func OpenDuckDBPair(cfg server.Config, username string) (*DuckDBPair, error)
OpenDuckDBPair builds the DSN that openBaseDB would have used, opens one DuckDB Database via *duckdb.Connector, and returns a Main + Control *sql.DB sharing it. The Main DB still goes through the same configuration path as openBaseDB (threads, memory_limit, extensions, profiling, cache settings); the Control DB is intentionally minimal.
func PairFromMain ¶
func PairFromMain(db *sql.DB) *DuckDBPair
PairFromMain builds a *DuckDBPair that points Main and Control at the same *sql.DB and owns no connector. Intended for test fixtures that don't exercise the connector-sharing properties — production code uses CreateWorkerDBPair, which returns a real pair.
func (*DuckDBPair) Close ¶
func (p *DuckDBPair) Close() error
Close closes both *sql.DBs and the underlying DuckDB instance. Safe to call multiple times.
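A Close that is safe to call multiple times is often built on sync.Once. The sketch below shows that pattern with hypothetical stand-in types (pair, countingCloser); whether DuckDBPair uses sync.Once internally is an assumption, and the real type closes *sql.DB handles and a DuckDB connector rather than generic io.Closer values.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// pair is a hypothetical stand-in for a type that owns two handles plus the
// shared instance behind them.
type pair struct {
	once      sync.Once
	closeErr  error
	mainDB    io.Closer
	controlDB io.Closer
	instance  io.Closer
}

// Close releases all three resources exactly once. Later calls return the
// result of the first call without closing anything again.
func (p *pair) Close() error {
	p.once.Do(func() {
		for _, c := range []io.Closer{p.mainDB, p.controlDB, p.instance} {
			if err := c.Close(); err != nil && p.closeErr == nil {
				p.closeErr = err // keep the first error, still close the rest
			}
		}
	})
	return p.closeErr
}

// countingCloser records how many times Close was called.
type countingCloser struct{ n int }

func (c *countingCloser) Close() error { c.n++; return nil }

func main() {
	a, b, c := &countingCloser{}, &countingCloser{}, &countingCloser{}
	p := &pair{mainDB: a, controlDB: b, instance: c}
	_ = p.Close()
	_ = p.Close() // second call does nothing
	fmt.Println(a.n, b.n, c.n)
}
```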
type DuckDBService ¶
type DuckDBService struct {
// contains filtered or unexported fields
}
DuckDBService is a standalone Arrow Flight SQL service backed by DuckDB.
func NewDuckDBService ¶
func NewDuckDBService(cfg ServiceConfig) *DuckDBService
NewDuckDBService creates a new DuckDB service with the given config.
func (*DuckDBService) BeginDrain ¶
func (svc *DuckDBService) BeginDrain()
func (*DuckDBService) CloseAll ¶
func (svc *DuckDBService) CloseAll()
func (*DuckDBService) Serve ¶
func (svc *DuckDBService) Serve(listener net.Listener) error
Serve starts serving on the given listener.
func (*DuckDBService) Shutdown ¶
func (svc *DuckDBService) Shutdown()
Shutdown gracefully stops the service.
func (*DuckDBService) WaitForDrain ¶
func (svc *DuckDBService) WaitForDrain(ctx context.Context) bool
type FlightSQLHandler ¶
type FlightSQLHandler struct {
flightsql.BaseServer
// contains filtered or unexported fields
}
FlightSQLHandler implements Arrow Flight SQL for multiple sessions. Sessions are identified by the "x-duckgres-session" gRPC metadata header.
func NewFlightSQLHandler ¶
func NewFlightSQLHandler(pool *SessionPool) *FlightSQLHandler
NewFlightSQLHandler creates a new multi-session Flight SQL handler.
func (*FlightSQLHandler) BeginTransaction ¶
func (h *FlightSQLHandler) BeginTransaction(ctx context.Context, req flightsql.ActionBeginTransactionRequest) ([]byte, error)
func (*FlightSQLHandler) DoGetStatement ¶
func (h *FlightSQLHandler) DoGetStatement(ctx context.Context, ticket flightsql.StatementQueryTicket) (*arrow.Schema, <-chan flight.StreamChunk, error)
func (*FlightSQLHandler) DoPutCommandStatementUpdate ¶
func (h *FlightSQLHandler) DoPutCommandStatementUpdate(ctx context.Context, cmd flightsql.StatementUpdate) (int64, error)
func (*FlightSQLHandler) EndTransaction ¶
func (h *FlightSQLHandler) EndTransaction(ctx context.Context, req flightsql.ActionEndTransactionRequest) error
func (*FlightSQLHandler) GetFlightInfoStatement ¶
func (h *FlightSQLHandler) GetFlightInfoStatement(ctx context.Context, cmd flightsql.StatementQuery, desc *flight.FlightDescriptor) (*flight.FlightInfo, error)
type QueryHandle ¶
type QueryHandle struct {
Query string
Schema *arrow.Schema
TxnID string
// contains filtered or unexported fields
}
QueryHandle stores an ad-hoc query awaiting its DoGet.
type ServiceConfig ¶
type ServiceConfig struct {
// ListenAddr is the address to listen on.
// Formats: "unix:///var/run/duckgres/duckdb.sock" or ":8816" or "0.0.0.0:8816"
ListenAddr string
// ListenFD is an inherited file descriptor for a pre-bound listener.
// When > 0, the service uses this FD instead of creating a new socket.
// This is used by the control plane to pass a pre-bound Unix socket to
// worker processes, avoiding EROFS errors under ProtectSystem=strict.
ListenFD int
// ServerConfig is reused for CreateDBConnection (data_dir, extensions, DuckLake).
ServerConfig server.Config
// BearerToken is the authentication token for gRPC requests.
// If empty, no authentication is required.
BearerToken string
// MaxSessions limits the number of concurrent sessions. 0 means unlimited.
MaxSessions int
// RequireActivation forces shared warm workers to receive tenant runtime
// over the control-plane activation RPC before they may serve sessions.
RequireActivation bool
}
ServiceConfig configures the standalone DuckDB Arrow Flight SQL service.
type Session ¶
type Session struct {
ID string
DB *sql.DB
Conn *sql.Conn // Dedicated connection for this session
Username string
CreatedAt time.Time
// contains filtered or unexported fields
}
Session represents a single DuckDB session.
type SessionPool ¶
type SessionPool struct {
// contains filtered or unexported fields
}
SessionPool manages multiple DuckDB sessions keyed by session token.
func (*SessionPool) ActiveDrainWork ¶
func (p *SessionPool) ActiveDrainWork() int
func (*SessionPool) ActiveSessions ¶
func (p *SessionPool) ActiveSessions() int
ActiveSessions returns the number of active sessions.
func (*SessionPool) BeginDrain ¶
func (p *SessionPool) BeginDrain()
func (*SessionPool) CreateSession ¶
func (p *SessionPool) CreateSession(username, memoryLimit string, threads int, secretStatements []string) (*Session, []string, error)
CreateSession creates a new DuckDB session for the given username. secretStatements are the user's persistent secrets to replay (shared-warm mode only); replay failures come back as warnings — logged on the worker and again by the control plane's session manager, never failing the session — rather than errors.
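The "warnings, not errors" behavior for secret replay can be sketched as a loop that records failures and keeps going. The names replaySecrets and failOnEmpty are hypothetical, and the warning format is an assumption; the sketch also leaves the statement text out of the warning, since secret statements may contain credentials.

```go
package main

import (
	"errors"
	"fmt"
)

// replaySecrets runs each statement through exec and collects failures as
// warning strings instead of aborting, so that one bad secret does not
// fail session creation.
func replaySecrets(exec func(stmt string) error, stmts []string) []string {
	var warnings []string
	for i, stmt := range stmts {
		if err := exec(stmt); err != nil {
			// Report the index only, never the statement body.
			warnings = append(warnings, fmt.Sprintf("secret statement %d failed: %v", i, err))
		}
	}
	return warnings
}

// failOnEmpty is a toy executor used for illustration: it rejects empty
// statements and accepts everything else.
func failOnEmpty(stmt string) error {
	if stmt == "" {
		return errors.New("empty statement")
	}
	return nil
}

func main() {
	warnings := replaySecrets(failOnEmpty, []string{"CREATE SECRET a (TYPE s3)", ""})
	fmt.Println(len(warnings), warnings)
}
```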
func (*SessionPool) DestroySession ¶
func (p *SessionPool) DestroySession(token string) error
DestroySession closes and removes a session.
func (*SessionPool) GetSession ¶
func (p *SessionPool) GetSession(token string) (*Session, bool)
GetSession returns a session by token.
func (*SessionPool) IsDraining ¶
func (p *SessionPool) IsDraining() bool
func (*SessionPool) WaitForDrain ¶
func (p *SessionPool) WaitForDrain(ctx context.Context) bool
func (*SessionPool) Warmup ¶
func (p *SessionPool) Warmup() error
Warmup performs one-time initialization of the shared DuckDB instance. This loads extensions and attaches catalogs so that subsequent session creations are nearly instantaneous.
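The overall shape of a token-keyed pool with a session limit and a drain flag can be sketched as follows. Everything here is illustrative: the pool type, its method names, and errTooMany are hypothetical, tokens are sequential rather than random, and the idea that session creation is refused while draining is an assumption suggested by ErrWorkerDraining, not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errDraining = errors.New("worker is draining")   // same message as ErrWorkerDraining
	errTooMany  = errors.New("max sessions reached") // hypothetical
)

type session struct{ token, username string }

// pool keeps sessions keyed by token. max == 0 means unlimited.
type pool struct {
	mu       sync.Mutex
	sessions map[string]*session
	max      int
	draining bool
	next     int
}

func newPool(max int) *pool {
	return &pool{sessions: make(map[string]*session), max: max}
}

func (p *pool) create(username string) (*session, error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.draining {
		return nil, errDraining // assumed: no new sessions while draining
	}
	if p.max > 0 && len(p.sessions) >= p.max {
		return nil, errTooMany
	}
	p.next++
	// A real pool would use an unguessable random token.
	s := &session{token: fmt.Sprintf("tok-%d", p.next), username: username}
	p.sessions[s.token] = s
	return s, nil
}

func (p *pool) get(token string) (*session, bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	s, ok := p.sessions[token]
	return s, ok
}

func (p *pool) destroy(token string) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, ok := p.sessions[token]; !ok {
		return errors.New("session not found")
	}
	delete(p.sessions, token)
	return nil
}

func (p *pool) active() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	return len(p.sessions)
}

func (p *pool) beginDrain() {
	p.mu.Lock()
	defer p.mu.Unlock()
	p.draining = true
}

func main() {
	p := newPool(1)
	s, _ := p.create("alice")
	_, err := p.create("bob")
	fmt.Println(s.token, err, p.active())
	_ = p.destroy(s.token)
	p.beginDrain()
	_, err = p.create("carol")
	fmt.Println(err, p.active())
}
```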
Directories ¶
| Path | Synopsis |
|---|---|
| arrowmap | Package arrowmap provides DuckDB-free helpers for translating DuckDB type strings into Arrow types, quoting/qualifying SQL identifiers, and appending scanned values into Arrow array builders. |