Documentation
¶
Overview ¶
Package flightsql is an ADBC driver implementation for Flight SQL, written natively in Go.
It can be used to register a driver for database/sql by importing github.com/apache/arrow-adbc/go/adbc/sqldriver and running:
sql.Register("flightsql", sqldriver.Driver{flightsql.NewDriver(nil)})
You can then open a flightsql connection with the database/sql standard package by using:
db, err := sql.Open("flightsql", "uri=<flight sql db url>")
The URI passed *must* contain a scheme, most likely "grpc+tcp://"
Example ¶
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// RECIPE STARTS HERE
// Tests that use the SQLite server example.
package main
import (
"context"
"database/sql"
"errors"
"fmt"
"log"
"github.com/apache/arrow-adbc/go/adbc"
drv "github.com/apache/arrow-adbc/go/adbc/driver/flightsql"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/flight"
"github.com/apache/arrow-go/v18/arrow/flight/flightsql"
sqlite "github.com/apache/arrow-go/v18/arrow/flight/flightsql/example"
"github.com/apache/arrow-go/v18/arrow/memory"
_ "modernc.org/sqlite"
)
// headers holds the extra entries that FlightSQLExample adds to the driver
// options, each stored under the key drv.OptionRPCCallHeaderPrefix + name.
var headers = map[string]string{"foo": "bar"}
// FlightSQLExample opens a database and connection for uri through the ADBC
// Flight SQL driver, executes "SELECT 1 AS theresult", and prints every value
// of the first result column to standard output.
//
// Each entry of the package-level headers map is passed to the driver as an
// option keyed by drv.OptionRPCCallHeaderPrefix plus the header name.
//
// The returned error wraps the first failure encountered (so callers can
// inspect it with errors.Is / errors.As), joined with any errors reported
// while closing the statement, the connection and the database.
func FlightSQLExample(uri string) (err error) {
	ctx := context.Background()
	options := map[string]string{
		adbc.OptionKeyURI: uri,
	}
	for k, v := range headers {
		options[drv.OptionRPCCallHeaderPrefix+k] = v
	}

	// NOTE(review): alloc is deliberately left nil, as in the original
	// example; presumably the driver falls back to a default allocator —
	// confirm against the driver documentation.
	var alloc memory.Allocator
	// Named driver rather than drv so the package alias is not shadowed.
	driver := drv.NewDriver(alloc)
	db, err := driver.NewDatabase(options)
	if err != nil {
		return fmt.Errorf("failed to open database: %w", err)
	}
	// The deferred closers write to the named result so that close errors
	// are reported alongside whatever error the function returns.
	defer func() {
		err = errors.Join(err, db.Close())
	}()

	cnxn, err := db.Open(ctx)
	if err != nil {
		return fmt.Errorf("failed to open connection: %w", err)
	}
	defer func() {
		err = errors.Join(err, cnxn.Close())
	}()

	stmt, err := cnxn.NewStatement()
	if err != nil {
		return fmt.Errorf("failed to create statement: %w", err)
	}
	defer func() {
		err = errors.Join(err, stmt.Close())
	}()

	if err = stmt.SetSqlQuery("SELECT 1 AS theresult"); err != nil {
		return fmt.Errorf("failed to set query: %w", err)
	}

	reader, _, err := stmt.ExecuteQuery(ctx)
	if err != nil {
		return fmt.Errorf("failed to execute query: %w", err)
	}
	defer reader.Release()

	for reader.Next() {
		arr, ok := reader.RecordBatch().Column(0).(*array.Int64)
		if !ok {
			return errors.New("result data was not int64")
		}

		for i := 0; i < arr.Len(); i++ {
			if arr.IsNull(i) {
				fmt.Println("theresult: NULL")
			} else {
				fmt.Printf("theresult: %d\n", arr.Value(i))
			}
		}
	}
	// Next returning false can mean either end of stream or a failure;
	// surface the latter instead of silently reporting success.
	if err = reader.Err(); err != nil {
		return fmt.Errorf("failed to read results: %w", err)
	}

	return nil
}
// main starts an in-process Flight SQL server backed by an in-memory SQLite
// database, runs FlightSQLExample against it, and then shuts the server down.
func main() {
	// The Flight SQL server is spawned here rather than assumed to exist.
	// A dedicated in-memory database keeps it isolated from any other
	// databases that may be open in this process.
	sqliteDB, err := sql.Open("sqlite", "file:example_in_memory?mode=memory")
	if err != nil {
		log.Fatal(err)
	}
	defer func() {
		if closeErr := sqliteDB.Close(); closeErr != nil {
			log.Fatal(closeErr)
		}
	}()

	handler, err := sqlite.NewSQLiteFlightSQLServer(sqliteDB)
	if err != nil {
		log.Fatal(err)
	}

	server := flight.NewServerWithMiddleware(nil)
	server.RegisterFlightService(flightsql.NewFlightServer(handler))
	if err = server.Init("localhost:8080"); err != nil {
		log.Fatal(err)
	}

	// Serve in the background so the client code below can run.
	go func() {
		serveErr := server.Serve()
		if serveErr != nil {
			log.Fatal(serveErr)
		}
	}()

	// Build the client URI from the address the server reports.
	endpoint := fmt.Sprintf("grpc://%s", server.Addr().String())
	if exampleErr := FlightSQLExample(endpoint); exampleErr != nil {
		log.Printf("Error: %s\n", exampleErr.Error())
	}

	server.Shutdown()
}
Output: theresult: 1
Index ¶
Examples ¶
Constants ¶
View Source
// Option keys understood by the Flight SQL driver. The groups below follow
// the namespaces encoded in the key strings themselves.
const (
	// Keys in the "adbc.flight.sql.client_option." namespace.
	OptionAuthority           = "adbc.flight.sql.client_option.authority"
	OptionMTLSCertChain       = "adbc.flight.sql.client_option.mtls_cert_chain"
	OptionMTLSPrivateKey      = "adbc.flight.sql.client_option.mtls_private_key"
	OptionSSLOverrideHostname = "adbc.flight.sql.client_option.tls_override_hostname"
	OptionSSLSkipVerify       = "adbc.flight.sql.client_option.tls_skip_verify"
	OptionSSLRootCerts        = "adbc.flight.sql.client_option.tls_root_certs"
	OptionWithBlock           = "adbc.flight.sql.client_option.with_block"
	OptionWithMaxMsgSize      = "adbc.flight.sql.client_option.with_max_msg_size"

	OptionAuthorizationHeader = "adbc.flight.sql.authorization_header"

	// Keys in the "adbc.flight.sql.rpc." namespace. The timeout keys are
	// named "timeout_seconds"; OptionRPCCallHeaderPrefix ends in "." and is
	// used as a prefix to which a header name is appended.
	OptionTimeoutConnect      = "adbc.flight.sql.rpc.timeout_seconds.connect"
	OptionTimeoutFetch        = "adbc.flight.sql.rpc.timeout_seconds.fetch"
	OptionTimeoutQuery        = "adbc.flight.sql.rpc.timeout_seconds.query"
	OptionTimeoutUpdate       = "adbc.flight.sql.rpc.timeout_seconds.update"
	OptionRPCCallHeaderPrefix = "adbc.flight.sql.rpc.call_header."
	OptionCookieMiddleware    = "adbc.flight.sql.rpc.with_cookie_middleware"

	// Keys in the "adbc.flight.sql.session." namespace. The *Prefix keys end
	// in "." — presumably an option name is appended to them; confirm
	// against the driver documentation.
	OptionSessionOptions                = "adbc.flight.sql.session.options"
	OptionSessionOptionPrefix           = "adbc.flight.sql.session.option."
	OptionEraseSessionOptionPrefix      = "adbc.flight.sql.session.optionerase."
	OptionBoolSessionOptionPrefix       = "adbc.flight.sql.session.optionbool."
	OptionStringListSessionOptionPrefix = "adbc.flight.sql.session.optionstringlist."

	OptionLastFlightInfo = "adbc.flight.sql.statement.exec.last_flight_info"

	// Oauth2 options
	OptionKeyOauthFlow        = "adbc.flight.sql.oauth.flow"
	OptionKeyAuthURI          = "adbc.flight.sql.oauth.auth_uri"
	OptionKeyTokenURI         = "adbc.flight.sql.oauth.token_uri"
	OptionKeyRedirectURI      = "adbc.flight.sql.oauth.redirect_uri"
	OptionKeyScope            = "adbc.flight.sql.oauth.scope"
	OptionKeyClientId         = "adbc.flight.sql.oauth.client_id"
	OptionKeyClientSecret     = "adbc.flight.sql.oauth.client_secret"
	OptionKeySubjectToken     = "adbc.flight.sql.oauth.exchange.subject_token"
	OptionKeySubjectTokenType = "adbc.flight.sql.oauth.exchange.subject_token_type"
	OptionKeyActorToken       = "adbc.flight.sql.oauth.exchange.actor_token"
	OptionKeyActorTokenType   = "adbc.flight.sql.oauth.exchange.actor_token_type"
	OptionKeyReqTokenType     = "adbc.flight.sql.oauth.exchange.requested_token_type"
	OptionKeyExchangeScope    = "adbc.flight.sql.oauth.exchange.scope"
	OptionKeyExchangeAud      = "adbc.flight.sql.oauth.exchange.aud"
	OptionKeyExchangeResource = "adbc.flight.sql.oauth.exchange.resource"
)
View Source
// Names of OAuth 2.0 flows.
//
// NOTE(review): presumably these are the accepted values of the
// "adbc.flight.sql.oauth.flow" option — confirm against the driver docs.
const (
	ClientCredentials = "client_credentials"
	TokenExchange     = "token_exchange"
)
View Source
// Statement-level option keys.
const (
	OptionStatementQueueSize = "adbc.rpc.result_queue_size"
	// Explicitly set substrait version for Flight SQL
	// substrait *does* include the version in the serialized plan
	// so this is not entirely necessary depending on the version
	// of substrait and the capabilities of the server.
	OptionStatementSubstraitVersion = "adbc.flight.sql.substrait.version"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Driver ¶
// Driver embeds adbc.Driver and adds database constructors that accept
// grpc.DialOption values in addition to the string options map.
type Driver interface {
adbc.Driver
// NewDatabaseWithOptions takes an options map plus any number of
// grpc.DialOption values and returns an adbc.Database or an error.
NewDatabaseWithOptions(map[string]string, ...grpc.DialOption) (adbc.Database, error)
// NewDatabaseWithOptionsContext takes the same arguments as
// NewDatabaseWithOptions, preceded by a context.Context.
NewDatabaseWithOptionsContext(context.Context, map[string]string, ...grpc.DialOption) (adbc.Database, error)
}
Driver is the extended adbc.Driver interface for Flight SQL.
It adds additional methods to create a database with gRPC-specific options that cannot be passed through the options map.
Source Files
¶
Directories
¶
| Path | Synopsis |
|---|---|
|
cmd
|
|
|
oauthserver
command
A simple OAuth 2.0 test server supporting Client Credentials (RFC 6749) and Token Exchange (RFC 8693) flows for testing ADBC FlightSQL authentication.
|
A simple OAuth 2.0 test server supporting Client Credentials (RFC 6749) and Token Exchange (RFC 8693) flows for testing ADBC FlightSQL authentication. |
|
testserver
command
|
Click to show internal directories.
Click to hide internal directories.