flink

package module
v0.1.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 18, 2025 License: MIT Imports: 16 Imported by: 0

README

todo

A lightweight Go driver for the Apache Flink SQL Gateway.

Highlights

  • Idiomatic database/sql integration (sql.OpenDB(connector)).
  • Simple programmatic configuration via NewConnector(WithGatewayURL(...), WithProperties(...)).
  • Type mapping for Flink → Go types, including nullable handling.
  • Supports batch and streaming queries.

Quick start

package main

import (
    "context"
    "database/sql"
	"github.com/exness/go-flink-sql-driver"
    "fmt"
    "log"
    "time"
)

func main() {
    endpoint := "http://localhost:8083" // Flink SQL Gateway HTTP endpoint

    // Create connector and open *sql.DB
    connector, _ := NewConnector(
        WithGatewayURL(endpoint),
        WithProperties(map[string]string{
            // example: run in streaming mode
            "execution.runtime-mode": "STREAMING",
        }),
    )
    db := sql.OpenDB(connector)
    defer db.Close()

    // A minimal sanity check: SELECT 1 must be int64
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    rows, err := db.QueryContext(ctx, "SELECT 1")
    if err != nil { log.Fatal(err) }
    defer rows.Close()

    var got1 [][]any
    for rows.Next() {
        var v any
        if err := rows.Scan(&v); err != nil { log.Fatal(err) }
        got1 = append(got1, []any{v})
        break
    }
    if len(got1) == 0 { log.Fatal("no rows") }
    if _, ok := got1[0][0].(int64); !ok {
        log.Fatalf("expected int64, got %T", got1[0][0])
    }
    fmt.Println("SELECT 1 OK (int64)")
}

Tip: For binary/complex types (e.g., BINARY, VARBINARY, ROW, MAP), values arrive as []byte. Decode as needed for your app.

Type mapping (summary)

Non‑nullable columns map to concrete Go types; nullable columns map to sql.Null* wrappers:

Flink Type Go (NOT NULL) Go (NULLABLE)
TINYINT / SMALLINT / INT int64 sql.NullInt64
BIGINT / INTERVAL int64 sql.NullInt64
FLOAT float64 sql.NullFloat64
DOUBLE float64 sql.NullFloat64
BOOLEAN bool sql.NullBool
CHAR / VARCHAR / STRING string sql.NullString
DATE / TIME / TIMESTAMP time.Time sql.NullTime
BINARY / VARBINARY []byte []byte
ROW / MAP / ARRAY []byte []byte

Development & tests

  • Requires a reachable Flink SQL Gateway; tests spin up a local cluster with Testcontainers.

License

MIT (see LICENSE)

Documentation

Index

Constants

View Source
const (
	ResultTypeNotReady ResultType = "NOT_READY"
	ResultTypeEOS      ResultType = "EOS"
	ResultTypePayload  ResultType = "PAYLOAD"

	ResultKindSuccess            ResultKind = "SUCCESS"
	ResultKindSuccessWithContent ResultKind = "SUCCESS_WITH_CONTENT"
)

Variables

This section is empty.

Functions

func NewConnector

func NewConnector(options ...ConnOption) (driver.Connector, error)

NewConnector creates a connection that can be used with `sql.OpenDB()`. This is an easier way to set up the DB instead of having to construct a DSN string.

Types

type APIResponse

type APIResponse struct {
	Versions []string `json:"versions"`
}

APIResponse holds the list of API versions supported by the server.

type Client

type Client struct {
	// BaseURL should be the root of the SQL Gateway REST API, e.g.
	// "http://localhost:8083".  Do not include a trailing slash; the
	// Client automatically appends endpoint paths.
	BaseURL *url.URL
	// HTTPClient is the http.Client used for sending requests.  If
	// nil, http.DefaultClient is used.  You can set custom timeouts
	// or transports on this client for better control.
	HTTPClient *http.Client
	// APIVersion controls which API version is used for requests.
	// Valid values are "v1", "v2" or "v3".  If empty, "v3" is used.
	APIVersion string
}

Client represents a SQL Gateway REST API client. It holds the base URL for the Gateway and an underlying http.Client that is used to execute all requests. Users may set custom timeouts or HTTP transport settings on the underlying client if necessary.

func NewClient

func NewClient(baseURL string, httpClient *http.Client, apiVersion string) (*Client, error)

NewClient constructs a new Client for the given baseURL. The baseURL must be a valid URL string pointing at the SQL Gateway server (e.g. "http://localhost:8083"). Optionally, a custom http.Client can be supplied via the httpClient parameter; if nil, http.DefaultClient is used. The APIVersion argument controls the version prefix ("v1", "v2", "v3"). If left empty, "v3" is used because v3 includes all features of prior versions.

func (*Client) CancelOperation

func (c *Client) CancelOperation(ctx context.Context, sessionHandle, operationHandle string) (string, error)

CancelOperation cancels a running operation. The server will try to terminate the job associated with the operation handle.

func (*Client) CloseOperation

func (c *Client) CloseOperation(ctx context.Context, sessionHandle, operationHandle string) (string, error)

CloseOperation closes a finished operation. This should be called after fetching all results to free server-side resources.

func (*Client) CloseSession

func (c *Client) CloseSession(ctx context.Context, sessionHandle string) error

CloseSession closes the given session handle. It returns nil on success or an error if the server rejects the request.

func (*Client) CompleteStatement

func (c *Client) CompleteStatement(ctx context.Context, sessionHandle string, reqBody *CompleteStatementRequest) ([]string, error)

CompleteStatement returns SQL completion hints for the given statement at the specified cursor position. It returns a slice of candidate completions or an error.

func (*Client) ConfigureSession

func (c *Client) ConfigureSession(ctx context.Context, sessionHandle string, reqBody *ConfigureSessionRequest) error

ConfigureSession executes the given DDL or session configuration statement within the context of the provided session. It returns nil on success. Note that many configuration statements do not produce an OperationHandle; they take effect immediately.

func (*Client) ExecuteStatement

func (c *Client) ExecuteStatement(ctx context.Context, sessionHandle string, reqBody *ExecuteStatementRequest) (string, error)

ExecuteStatement submits a SQL statement for execution in the specified session. It returns the operation handle on success. Statement execution is asynchronous; use GetOperationStatus and FetchResults to monitor and retrieve results.

func (*Client) FetchResults

func (c *Client) FetchResults(ctx context.Context, sessionHandle, operationHandle, token string, rowFormat string) (*FetchResultsResponseBody, error)

FetchResults retrieves a batch of results for the given operation handle and token. The token identifies which batch to fetch: 0 indicates the first batch, while the `nextResultUri` returned from prior calls contains the token for subsequent batches. The rowFormat parameter controls the serialization format of the response. Valid values include "JSON" (default) and "PLAIN_TEXT". This function returns an arbitrary JSON structure (decoded into interface{}) representing the result payload. The caller can cast or map the data into concrete Go types as needed.

func (*Client) GetAPIVersions

func (c *Client) GetAPIVersions(ctx context.Context) ([]string, error)

GetAPIVersions returns the list of supported REST API versions available on the server. Clients can inspect this to decide which version to use. The default version is usually the highest one.

func (*Client) GetInfo

func (c *Client) GetInfo(ctx context.Context) (*InfoResponse, error)

GetInfo fetches cluster metadata from the gateway using GET /info. It returns an InfoResponse or an error. Context may be used to cancel the request.

func (*Client) GetOperationStatus

func (c *Client) GetOperationStatus(ctx context.Context, sessionHandle, operationHandle string) (string, error)

GetOperationStatus retrieves the current status of an operation. Clients should poll this endpoint until they see a terminal state (COMPLETED, CANCELLED, etc.).

func (*Client) GetSessionConfig

func (c *Client) GetSessionConfig(ctx context.Context, sessionHandle string) (map[string]string, error)

GetSessionConfig fetches the current configuration for a session. The returned map contains session-specific property overrides. If the session does not exist, an error is returned.

func (*Client) Heartbeat

func (c *Client) Heartbeat(ctx context.Context, sessionHandle string) error

Heartbeat triggers a heartbeat for the specified session. This method simply sends an empty POST body to keep the session alive.

func (*Client) OpenSession

func (c *Client) OpenSession(ctx context.Context, reqBody *OpenSessionRequest) (string, error)

OpenSession opens a new SQL Gateway session with the provided properties and name. It returns the session handle on success. If properties is nil, the default gateway configuration is used.

func (*Client) RefreshMaterializedTable

func (c *Client) RefreshMaterializedTable(ctx context.Context, sessionHandle, identifier string, reqBody *RefreshMaterializedTableRequest) (string, error)

RefreshMaterializedTable triggers a refresh of the named materialized table and returns the resulting operation handle. The identifier must be the fully-qualified table identifier as a string (e.g. "my_catalog.my_db.my_table"). Optional request fields may influence execution. See Flink documentation for supported configuration keys.

type ColumnInfo

type ColumnInfo struct {
	Name        string      `json:"name"`
	LogicalType LogicalType `json:"logicalType"`
	Comment     string      `json:"comment"`
}

type CompleteStatementRequest

type CompleteStatementRequest struct {
	Position  int    `json:"position"`
	Statement string `json:"statement"`
}

CompleteStatementRequest defines the payload for the complete-statement endpoint, which returns auto-completion candidates.

type CompleteStatementResponse

type CompleteStatementResponse struct {
	Candidates []string `json:"candidates"`
}

CompleteStatementResponse holds the list of completion candidates.

type ConfigureSessionRequest

type ConfigureSessionRequest struct {
	// ExecutionTimeout specifies a timeout in seconds for configuring
	// the session.  Use int64 to match the OpenAPI specification.
	ExecutionTimeout int64  `json:"executionTimeout,omitempty"`
	Statement        string `json:"statement"`
}

ConfigureSessionRequest defines a request to adjust session settings or perform catalog/database modifications. It accepts an SQL statement (CREATE, DROP, ALTER, USE, ADD JAR, etc.) and optional timeout. ExecutionTimeout is expressed in seconds; zero means server default.

type ConnConfig

type ConnConfig struct {
	GatewayURL string
	Client     *http.Client
	APIVersion string
	Properties map[string]string
}

func WithDefaults

func WithDefaults() *ConnConfig

type ConnOption

type ConnOption func(c *ConnConfig)

func WithAPIVersion

func WithAPIVersion(apiVersion string) ConnOption

func WithClient

func WithClient(client *http.Client) ConnOption

func WithGatewayURL

func WithGatewayURL(gatewayUrl string) ConnOption

func WithProperties

func WithProperties(properties map[string]string) ConnOption

type ExecuteStatementRequest

type ExecuteStatementRequest struct {
	ExecutionConfig map[string]string `json:"executionConfig,omitempty"`
	// ExecutionTimeout specifies a timeout in seconds for the statement
	// execution.  Use int64 to match the OpenAPI specification.
	ExecutionTimeout int64  `json:"executionTimeout,omitempty"`
	Statement        string `json:"statement"`
}

ExecuteStatementRequest represents the payload for executing a SQL statement. ExecutionConfig allows passing dynamic configuration options; ExecutionTimeout expresses a timeout in seconds for statement execution. Set ExecutionTimeout to zero to use the gateway’s default timeout. Statement must contain the SQL text to execute.

type ExecuteStatementResponse

type ExecuteStatementResponse struct {
	OperationHandle string `json:"operationHandle"`
}

ExecuteStatementResponse contains the operation handle returned after submitting a statement. Clients should use the handle to poll for results and status.

type FetchResultsResponseBody

type FetchResultsResponseBody struct {
	JobID         string     `json:"jobID"`
	NextResultUri string     `json:"nextResultUri"`
	IsQueryResult bool       `json:"isQueryResult"`
	ResultKind    ResultKind `json:"resultKind"`
	ResultType    ResultType `json:"resultType"`
	Results       struct {
		Columns   []ColumnInfo `json:"columns"`
		RowFormat string       `json:"rowFormat"`
		Data      []RowData    `json:"data"`
	} `json:"results"`
}

FetchResultsResponseBody mirrors the OpenAPI schema for the response returned by fetching results from an operation. It contains the job ID, the URI for fetching the next batch, a flag indicating whether the query returns results, the result kind and type, and a nested results structure with column metadata and payload. See the Flink SQL Gateway REST API specification for details.

func (*FetchResultsResponseBody) NextToken

func (r *FetchResultsResponseBody) NextToken() string

NextToken extracts the token from the NextResultUri, or returns "" if not present.

type GatewayClient

type GatewayClient interface {
	GetInfo(ctx context.Context) (*InfoResponse, error)
	GetAPIVersions(ctx context.Context) ([]string, error)
	OpenSession(ctx context.Context, reqBody *OpenSessionRequest) (string, error)
	CloseSession(ctx context.Context, sessionHandle string) error
	GetSessionConfig(ctx context.Context, sessionHandle string) (map[string]string, error)
	CompleteStatement(ctx context.Context, sessionHandle string, reqBody *CompleteStatementRequest) ([]string, error)
	ConfigureSession(ctx context.Context, sessionHandle string, reqBody *ConfigureSessionRequest) error
	Heartbeat(ctx context.Context, sessionHandle string) error
	RefreshMaterializedTable(ctx context.Context, sessionHandle, identifier string, reqBody *RefreshMaterializedTableRequest) (string, error)
	CancelOperation(ctx context.Context, sessionHandle, operationHandle string) (string, error)
	CloseOperation(ctx context.Context, sessionHandle, operationHandle string) (string, error)
	GetOperationStatus(ctx context.Context, sessionHandle, operationHandle string) (string, error)
	ExecuteStatement(ctx context.Context, sessionHandle string, reqBody *ExecuteStatementRequest) (string, error)
	FetchResults(ctx context.Context, sessionHandle, operationHandle, token string, rowFormat string) (*FetchResultsResponseBody, error)
}

GatewayClient is the minimal interface implemented by *Client. It allows consumers to depend on an interface for easier testing/mocking.

type InfoResponse

type InfoResponse struct {
	ProductName string `json:"productName"`
	Version     string `json:"version"`
}

InfoResponse represents the response from GET /info. It contains the product name and version of the connected cluster.

type LogicalType

type LogicalType struct {
	Type       string  `json:"type"`
	Nullable   bool    `json:"nullable"`
	Length     *int64  `json:"length"`
	Precision  *int    `json:"precision"`
	Scale      *int    `json:"scale"`
	Resolution *string `json:"resolution"`
}

func (*LogicalType) UnmarshalJSON

func (lt *LogicalType) UnmarshalJSON(data []byte) error

type OpenSessionRequest

type OpenSessionRequest struct {
	Properties  map[string]string `json:"properties,omitempty"`
	SessionName string            `json:"sessionName,omitempty"`
}

OpenSessionRequest defines the payload for creating a new session. Properties may include arbitrary configuration entries; sessionName can be used to label the session on the server.

type OpenSessionResponse

type OpenSessionResponse struct {
	SessionHandle string `json:"sessionHandle"`
}

OpenSessionResponse contains the session handle returned from creating a session.

type OperationStatusResponse

type OperationStatusResponse struct {
	Status string `json:"status"`
}

OperationStatusResponse captures the status of an operation, which may be "RUNNING", "COMPLETED", "CANCELLED", or another status defined by the server.

type RefreshMaterializedTableRequest

type RefreshMaterializedTableRequest struct {
	DynamicOptions   map[string]string `json:"dynamicOptions,omitempty"`
	ExecutionConfig  map[string]string `json:"executionConfig,omitempty"`
	IsPeriodic       bool              `json:"isPeriodic,omitempty"`
	Periodic         bool              `json:"periodic,omitempty"`
	ScheduleTime     string            `json:"scheduleTime,omitempty"`
	StaticPartitions map[string]string `json:"staticPartitions,omitempty"`
}

RefreshMaterializedTableRequest defines parameters to refresh a materialized table. DynamicOptions and ExecutionConfig allow adjusting job parameters; StaticPartitions define optional partitioning. ScheduleTime can be used when scheduling periodic refreshes. IsPeriodic and Periodic are synonyms; set one of them true to indicate that refresh should be scheduled repeatedly.

type RefreshMaterializedTableResponse

type RefreshMaterializedTableResponse struct {
	OperationHandle string `json:"operationHandle"`
}

RefreshMaterializedTableResponse contains the operation handle associated with the refresh job. Clients should poll the operation’s status and results as with any other SQL operation.

type ResultKind

type ResultKind string

type ResultType

type ResultType string

type RowData

type RowData struct {
	Kind   string            `json:"kind"`
	Fields []json.RawMessage `json:"fields"`
}

type Rows

type Rows struct {
	// contains filtered or unexported fields
}

func (*Rows) Close

func (r *Rows) Close() error

func (*Rows) ColumnTypeDatabaseTypeName

func (r *Rows) ColumnTypeDatabaseTypeName(index int) string

func (*Rows) ColumnTypeLength

func (r *Rows) ColumnTypeLength(index int) (length int64, ok bool)

func (*Rows) ColumnTypePrecisionScale

func (r *Rows) ColumnTypePrecisionScale(index int) (precision, scale int64, ok bool)

func (*Rows) ColumnTypeScanType

func (r *Rows) ColumnTypeScanType(index int) reflect.Type

func (*Rows) Columns

func (r *Rows) Columns() []string

func (*Rows) Next

func (r *Rows) Next(dest []driver.Value) error

func (*Rows) RowsColumnTypeNullable

func (r *Rows) RowsColumnTypeNullable(index int) (nullable, ok bool)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL