flink

package module
v0.1.3 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 20, 2025 License: MIT Imports: 16 Imported by: 0

README

A lightweight Go driver for the Apache Flink SQL Gateway.

Quick start

package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/exness/go-flink-sql"
)

// NullString is an alias for sql.NullString. It must be an alias (=) rather
// than a defined type: a defined type does not inherit sql.NullString's Scan
// and Value methods, so database/sql would be unable to scan a column into it
// and rows.Scan would fail with an "unsupported Scan" error.
type NullString = sql.NullString

// order is the Go-side view of one row read from the orders table in main.
// Note is left nil when the nullable notes column is NULL.
type order struct {
	ID        int64     // id INT NOT NULL (INT is delivered as int64)
	Item      string    // item STRING NOT NULL
	CreatedAt time.Time // created_at TIMESTAMP(6) NOT NULL
	Customer  customer  // customer ROW<...> NOT NULL, decoded via customer.Scan
	Note      *string   // notes STRING (nullable); nil stands for SQL NULL
}

// customer mirrors the customer ROW<first_name, last_name, age> column. The
// driver delivers ROW values as an encoded payload (documented as []byte), so
// customer implements sql.Scanner and decodes that payload as JSON.
type customer struct {
	FirstName string `json:"first_name"`
	LastName  string `json:"last_name"`
	Age       int    `json:"age"`
}

// Scan implements sql.Scanner. It accepts the JSON payload as either []byte
// or string and decodes it into c. Any other source type, including NULL
// (nil), is rejected because the column is declared NOT NULL.
func (c *customer) Scan(src any) error {
	var data []byte
	switch v := src.(type) {
	case []byte:
		data = v
	case string:
		// Tolerate drivers or wrappers that surface JSON text as a string.
		data = []byte(v)
	default:
		return fmt.Errorf("flink: unexpected customer scan type %T", src)
	}
	if err := json.Unmarshal(data, c); err != nil {
		return fmt.Errorf("flink: decoding customer: %w", err)
	}
	return nil
}

// ptrToString dereferences s, substituting the empty string when s is nil, so
// optional values can be printed without a nil check at every call site.
func ptrToString(s *string) string {
	var out string
	if s != nil {
		out = *s
	}
	return out
}

// main runs the quick-start example and exits with a non-zero status on
// failure. The work lives in run so that its deferred cleanups (closing the
// rows, the DB, and cancelling the context) execute before the process exits:
// log.Fatal calls os.Exit, which does not run pending defers.
func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run creates a datagen-backed orders table through the Flink SQL Gateway,
// reads the first five rows from it, and prints each row as a tab-separated
// line. It returns the first error encountered.
func run() error {
	connector, err := flink.NewConnector(
		flink.WithGatewayURL("http://localhost:8083"),
		flink.WithProperties(map[string]string{
			"execution.runtime-mode": "STREAMING",
		}),
	)
	if err != nil {
		return fmt.Errorf("creating connector: %w", err)
	}

	db := sql.OpenDB(connector)
	defer db.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	ddl := `CREATE TABLE orders (
		id INT NOT NULL,
		item STRING NOT NULL,
		created_at TIMESTAMP(6) NOT NULL,
		customer ROW<
			first_name STRING not null,
			last_name STRING not null,
			age INT not null
		> NOT NULL,
		notes STRING
	) WITH (
		'connector' = 'datagen',
		'rows-per-second' = '5',
		'fields.id.kind' = 'sequence',
		'fields.id.start' = '1',
		'fields.id.end' = '100',
		'fields.item.length' = '12',
		'fields.customer.first_name.length' = '8',
		'fields.customer.last_name.length' = '10',
		'fields.customer.age.min' = '21',
		'fields.customer.age.max' = '65',
		'fields.notes.length' = '12'
	)`
	if _, err := db.ExecContext(ctx, ddl); err != nil {
		return fmt.Errorf("creating orders table: %w", err)
	}

	rows, err := db.QueryContext(ctx, `
		SELECT
			id,
			item,
			created_at,
			customer,
			notes AS note
		FROM orders
	`)
	if err != nil {
		return fmt.Errorf("querying orders: %w", err)
	}
	// Rows.Close asks the gateway to close the underlying operation, so it
	// must run on every exit path, including the early stop below.
	defer rows.Close()

	const maxRows = 5
	for seen := 0; seen < maxRows && rows.Next(); seen++ {
		var (
			o order
			// notes is nullable, so it is scanned through sql.NullString,
			// which implements sql.Scanner and records NULL as Valid=false.
			note sql.NullString
		)
		if err := rows.Scan(&o.ID, &o.Item, &o.CreatedAt, &o.Customer, &note); err != nil {
			return fmt.Errorf("scanning order: %w", err)
		}
		// o.Note is already nil for a NULL note; note is scoped to this
		// iteration, so taking the address of its field is safe.
		if note.Valid {
			o.Note = &note.String
		}

		fmt.Printf(
			"%d\t%s\t%s\t%s %s (%d)\t%s\n",
			o.ID,
			o.Item,
			o.CreatedAt.Format(time.RFC3339Nano),
			o.Customer.FirstName,
			o.Customer.LastName,
			o.Customer.Age,
			ptrToString(o.Note),
		)
	}
	if err := rows.Err(); err != nil {
		return fmt.Errorf("iterating orders: %w", err)
	}
	return nil
}

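Tip: Complex types such as ROW, MAP, and ARRAY, as well as binary data, arrive as []byte. Decode the JSON or binary payload to suit your application — for example by implementing sql.Scanner, as the customer type in the quick start does.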

Low-level REST access

If you need features beyond database/sql, use the exported Client (defined in gateway.go) directly:

client, err := flink.NewClient("http://localhost:8083", nil, "v3")
if err != nil {
    log.Fatal(err)
}

status, err := client.GetOperationStatus(ctx, "session-handle", "operation-handle")
if err != nil {
    log.Fatal(err)
}
fmt.Println("current status:", status)

Type mapping (summary)

Non-nullable columns map to concrete Go types. Nullable scalar columns map to sql.Null* wrappers, while binary and complex types remain []byte in both cases:

| Flink type | Go (NOT NULL) | Go (NULLABLE) |
|---|---|---|
| TINYINT / SMALLINT / INT | int64 | sql.NullInt64 |
| BIGINT / INTERVAL | int64 | sql.NullInt64 |
| FLOAT | float64 | sql.NullFloat64 |
| DOUBLE | float64 | sql.NullFloat64 |
| BOOLEAN | bool | sql.NullBool |
| CHAR / VARCHAR / STRING | string | sql.NullString |
| DATE / TIME / TIMESTAMP | time.Time | sql.NullTime |
| BINARY / VARBINARY | []byte | []byte |
| ROW / MAP / ARRAY | []byte | []byte |

Development & tests

  • The tests need a Flink SQL Gateway: they start a local Flink cluster with Testcontainers, which requires a Docker-compatible container runtime.

License

MIT (see LICENSE)

Documentation

Index

Constants

View Source
const (
	ResultTypeNotReady ResultType = "NOT_READY"
	ResultTypeEOS      ResultType = "EOS"
	ResultTypePayload  ResultType = "PAYLOAD"

	ResultKindSuccess            ResultKind = "SUCCESS"
	ResultKindSuccessWithContent ResultKind = "SUCCESS_WITH_CONTENT"
)

Variables

This section is empty.

Functions

func NewConnector

func NewConnector(options ...ConnOption) (driver.Connector, error)

NewConnector creates a connector for use with `sql.OpenDB()`. It lets you configure the driver with ConnOption values instead of constructing a DSN string.

Types

type APIResponse

type APIResponse struct {
	Versions []string `json:"versions"`
}

APIResponse holds the list of API versions supported by the server.

type Client

type Client struct {
	// BaseURL should be the root of the SQL Gateway REST API, e.g.
	// "http://localhost:8083".  Do not include a trailing slash; the
	// Client automatically appends endpoint paths.
	BaseURL *url.URL
	// HTTPClient is the http.Client used for sending requests.  If
	// nil, http.DefaultClient is used.
	HTTPClient *http.Client
	// APIVersion controls which API version is used for requests.
	// Valid values are "v1", "v2" or "v3".  If empty, "v3" is used.
	APIVersion string
}

Client represents a SQL Gateway REST API client. It holds the base URL for the Gateway and an underlying http.Client that is used to execute all requests. The Client is intentionally exported as a low-level escape hatch for scenarios that require direct REST interaction with the Flink SQL Gateway; the database/sql driver should remain the primary integration point for most applications.

func NewClient

func NewClient(baseURL string, httpClient *http.Client, apiVersion string) (*Client, error)

NewClient constructs a new Client for the given baseURL. The baseURL must be a valid URL string pointing at the SQL Gateway server (e.g. "http://localhost:8083"). Optionally, a custom http.Client can be supplied via the httpClient parameter; if nil, http.DefaultClient is used. The APIVersion argument controls the version prefix ("v1", "v2", "v3"). If left empty, "v3" is used because v3 includes all features of prior versions.

func (*Client) CancelOperation

func (c *Client) CancelOperation(ctx context.Context, sessionHandle, operationHandle string) (string, error)

CancelOperation cancels a running operation. The server will try to terminate the job associated with the operation handle.

func (*Client) CloseOperation

func (c *Client) CloseOperation(ctx context.Context, sessionHandle, operationHandle string) (string, error)

CloseOperation closes a finished operation. This should be called after fetching all results to free server-side resources.

func (*Client) CloseSession

func (c *Client) CloseSession(ctx context.Context, sessionHandle string) error

CloseSession closes the given session handle. It returns nil on success or an error if the server rejects the request.

func (*Client) CompleteStatement

func (c *Client) CompleteStatement(ctx context.Context, sessionHandle string, reqBody *CompleteStatementRequest) ([]string, error)

CompleteStatement returns SQL completion hints for the given statement at the specified cursor position. It returns a slice of candidate completions or an error.

func (*Client) ConfigureSession

func (c *Client) ConfigureSession(ctx context.Context, sessionHandle string, reqBody *ConfigureSessionRequest) error

ConfigureSession executes the given DDL or session configuration statement within the context of the provided session. It returns nil on success. Note that many configuration statements do not produce an OperationHandle; they take effect immediately.

func (*Client) ExecuteStatement

func (c *Client) ExecuteStatement(ctx context.Context, sessionHandle string, reqBody *ExecuteStatementRequest) (string, error)

ExecuteStatement submits a SQL statement for execution in the specified session. It returns the operation handle on success. Statement execution is asynchronous; use GetOperationStatus and FetchResults to monitor and retrieve results.

func (*Client) FetchResults

func (c *Client) FetchResults(ctx context.Context, sessionHandle, operationHandle, token string, rowFormat string) (*FetchResultsResponseBody, error)

FetchResults retrieves a batch of results for the given operation handle and token. The token identifies which batch to fetch: "0" requests the first batch, and each response's `nextResultUri` carries the token for the next batch (FetchResultsResponseBody.NextToken extracts it, returning "" when none is present). The rowFormat parameter controls the serialization format of the response. Valid values include "JSON" (default) and "PLAIN_TEXT".

func (*Client) GetAPIVersions

func (c *Client) GetAPIVersions(ctx context.Context) ([]string, error)

GetAPIVersions returns the list of supported REST API versions available on the server.

func (*Client) GetInfo

func (c *Client) GetInfo(ctx context.Context) (*InfoResponse, error)

GetInfo fetches cluster metadata from the gateway using GET /info. It returns an InfoResponse or an error.

func (*Client) GetOperationStatus

func (c *Client) GetOperationStatus(ctx context.Context, sessionHandle, operationHandle string) (string, error)

GetOperationStatus retrieves the current status of an operation.

func (*Client) GetSessionConfig

func (c *Client) GetSessionConfig(ctx context.Context, sessionHandle string) (map[string]string, error)

GetSessionConfig fetches the current configuration for a session. The returned map contains session-specific property overrides. If the session does not exist, an error is returned.

func (*Client) Heartbeat

func (c *Client) Heartbeat(ctx context.Context, sessionHandle string) error

Heartbeat triggers a heartbeat for the specified session. This method simply sends an empty POST body to keep the session alive.

func (*Client) OpenSession

func (c *Client) OpenSession(ctx context.Context, reqBody *OpenSessionRequest) (string, error)

OpenSession opens a new SQL Gateway session using the properties and session name in reqBody. It returns the session handle on success. If reqBody.Properties is nil, the default gateway configuration is used.

func (*Client) RefreshMaterializedTable

func (c *Client) RefreshMaterializedTable(ctx context.Context, sessionHandle, identifier string, reqBody *RefreshMaterializedTableRequest) (string, error)

RefreshMaterializedTable triggers a refresh of the named materialized table and returns the resulting operation handle. The identifier must be the fully-qualified table identifier as a string (e.g. "my_catalog.my_db.my_table"). Optional request fields may influence execution. See Flink documentation for supported configuration keys.

type ColumnInfo

type ColumnInfo struct {
	Name        string      `json:"name"`
	LogicalType LogicalType `json:"logicalType"`
	Comment     string      `json:"comment"`
}

type CompleteStatementRequest

type CompleteStatementRequest struct {
	Position  int    `json:"position"`
	Statement string `json:"statement"`
}

CompleteStatementRequest defines the payload for the complete-statement endpoint, which returns auto-completion candidates.

type CompleteStatementResponse

type CompleteStatementResponse struct {
	Candidates []string `json:"candidates"`
}

CompleteStatementResponse holds the list of completion candidates.

type ConfigureSessionRequest

type ConfigureSessionRequest struct {
	// ExecutionTimeout specifies a timeout in seconds for configuring
	// the session.  Use int64 to match the OpenAPI specification.
	ExecutionTimeout int64  `json:"executionTimeout,omitempty"`
	Statement        string `json:"statement"`
}

ConfigureSessionRequest defines a request to adjust session settings or perform catalog/database modifications. It accepts an SQL statement (CREATE, DROP, ALTER, USE, ADD JAR, etc.) and optional timeout. ExecutionTimeout is expressed in seconds; zero means server default.

type ConnConfig

type ConnConfig struct {
	GatewayURL string
	Client     *http.Client
	APIVersion string
	Properties map[string]string
}

ConnConfig captures the settings used to create a SQL Gateway session.

func WithDefaults

func WithDefaults() *ConnConfig

WithDefaults returns a ConnConfig populated with sensible defaults for connecting to a local SQL Gateway instance.

type ConnOption

type ConnOption func(c *ConnConfig)

ConnOption mutates a ConnConfig before it is used to construct a connector.

func WithAPIVersion

func WithAPIVersion(apiVersion string) ConnOption

WithAPIVersion selects the SQL Gateway REST API version (e.g. v1, v2, v3).

func WithClient

func WithClient(client *http.Client) ConnOption

WithClient sets the HTTP client used by the underlying Gateway client.

func WithGatewayURL

func WithGatewayURL(gatewayUrl string) ConnOption

WithGatewayURL sets the SQL Gateway endpoint for connection.

func WithProperties

func WithProperties(properties map[string]string) ConnOption

WithProperties sets SQL Gateway session properties.

type ExecuteStatementRequest

type ExecuteStatementRequest struct {
	ExecutionConfig map[string]string `json:"executionConfig,omitempty"`
	// ExecutionTimeout specifies a timeout in seconds for the statement
	// execution.  Use int64 to match the OpenAPI specification.
	ExecutionTimeout int64  `json:"executionTimeout,omitempty"`
	Statement        string `json:"statement"`
}

ExecuteStatementRequest represents the payload for executing a SQL statement. ExecutionConfig allows passing dynamic configuration options; ExecutionTimeout expresses a timeout in seconds for statement execution. Set ExecutionTimeout to zero to use the gateway’s default timeout. Statement must contain the SQL text to execute.

type ExecuteStatementResponse

type ExecuteStatementResponse struct {
	OperationHandle string `json:"operationHandle"`
}

ExecuteStatementResponse contains the operation handle returned after submitting a statement. Clients should use the handle to poll for results and status.

type FetchResultsResponseBody

type FetchResultsResponseBody struct {
	JobID         string     `json:"jobID"`
	NextResultUri string     `json:"nextResultUri"`
	IsQueryResult bool       `json:"isQueryResult"`
	ResultKind    ResultKind `json:"resultKind"`
	ResultType    ResultType `json:"resultType"`
	Results       struct {
		Columns   []ColumnInfo `json:"columns"`
		RowFormat string       `json:"rowFormat"`
		Data      []RowData    `json:"data"`
	} `json:"results"`
}

FetchResultsResponseBody mirrors the OpenAPI schema for the response returned by fetching results from an operation. It contains the job ID, the URI for fetching the next batch, a flag indicating whether the query returns results, the result kind and type, and a nested results structure with column metadata and payload. See the Flink SQL Gateway REST API specification for details.

func (*FetchResultsResponseBody) NextToken

func (r *FetchResultsResponseBody) NextToken() string

NextToken extracts the token from the NextResultUri, or returns "" if not present.

type GatewayClient

type GatewayClient interface {
	GetInfo(ctx context.Context) (*InfoResponse, error)
	GetAPIVersions(ctx context.Context) ([]string, error)
	OpenSession(ctx context.Context, reqBody *OpenSessionRequest) (string, error)
	CloseSession(ctx context.Context, sessionHandle string) error
	GetSessionConfig(ctx context.Context, sessionHandle string) (map[string]string, error)
	CompleteStatement(ctx context.Context, sessionHandle string, reqBody *CompleteStatementRequest) ([]string, error)
	ConfigureSession(ctx context.Context, sessionHandle string, reqBody *ConfigureSessionRequest) error
	Heartbeat(ctx context.Context, sessionHandle string) error
	RefreshMaterializedTable(ctx context.Context, sessionHandle, identifier string, reqBody *RefreshMaterializedTableRequest) (string, error)
	CancelOperation(ctx context.Context, sessionHandle, operationHandle string) (string, error)
	CloseOperation(ctx context.Context, sessionHandle, operationHandle string) (string, error)
	GetOperationStatus(ctx context.Context, sessionHandle, operationHandle string) (string, error)
	ExecuteStatement(ctx context.Context, sessionHandle string, reqBody *ExecuteStatementRequest) (string, error)
	FetchResults(ctx context.Context, sessionHandle, operationHandle, token string, rowFormat string) (*FetchResultsResponseBody, error)
}

GatewayClient is the minimal interface implemented by *Client. It exposes the raw Flink SQL Gateway HTTP surface area for advanced users who need functionality beyond the database/sql driver. Most callers should prefer using higher-level types and treat this interface as a low-level escape hatch when direct REST access is unavoidable.

type InfoResponse

type InfoResponse struct {
	ProductName string `json:"productName"`
	Version     string `json:"version"`
}

InfoResponse represents the response from GET /info. It contains the product name and version of the connected cluster.

type LogicalType

type LogicalType struct {
	Type       string  `json:"type"`
	Nullable   bool    `json:"nullable"`
	Length     *int64  `json:"length"`
	Precision  *int    `json:"precision"`
	Scale      *int    `json:"scale"`
	Resolution *string `json:"resolution"`
}

func (*LogicalType) UnmarshalJSON

func (lt *LogicalType) UnmarshalJSON(data []byte) error

type OpenSessionRequest

type OpenSessionRequest struct {
	Properties  map[string]string `json:"properties,omitempty"`
	SessionName string            `json:"sessionName,omitempty"`
}

OpenSessionRequest defines the payload for creating a new session.

type OpenSessionResponse

type OpenSessionResponse struct {
	SessionHandle string `json:"sessionHandle"`
}

OpenSessionResponse contains the session handle returned from creating a session.

type OperationStatusResponse

type OperationStatusResponse struct {
	Status string `json:"status"`
}

OperationStatusResponse captures the status of an operation, which may be "RUNNING", "COMPLETED", "CANCELLED", or another status defined by the server.

type RefreshMaterializedTableRequest

type RefreshMaterializedTableRequest struct {
	DynamicOptions   map[string]string `json:"dynamicOptions,omitempty"`
	ExecutionConfig  map[string]string `json:"executionConfig,omitempty"`
	IsPeriodic       bool              `json:"isPeriodic,omitempty"`
	Periodic         bool              `json:"periodic,omitempty"`
	ScheduleTime     string            `json:"scheduleTime,omitempty"`
	StaticPartitions map[string]string `json:"staticPartitions,omitempty"`
}

RefreshMaterializedTableRequest defines parameters to refresh a materialized table. DynamicOptions and ExecutionConfig allow adjusting job parameters; StaticPartitions define optional partitioning. ScheduleTime can be used when scheduling periodic refreshes. IsPeriodic and Periodic are synonyms; set one of them true to indicate that refresh should be scheduled repeatedly.

type RefreshMaterializedTableResponse

type RefreshMaterializedTableResponse struct {
	OperationHandle string `json:"operationHandle"`
}

RefreshMaterializedTableResponse contains the operation handle associated with the refresh job. Clients should poll the operation’s status and results as with any other SQL operation.

type ResultKind

type ResultKind string

type ResultType

type ResultType string

type RowData

type RowData struct {
	Kind   string            `json:"kind"`
	Fields []json.RawMessage `json:"fields"`
}

type Rows

type Rows struct {
	// contains filtered or unexported fields
}

Rows exposes Flink SQL query results through database/sql by implementing the driver.Rows interface on top of paged gateway responses.

func (*Rows) Close

func (r *Rows) Close() error

Close releases the iterator and asks the gateway to close the underlying operation.

func (*Rows) ColumnTypeDatabaseTypeName

func (r *Rows) ColumnTypeDatabaseTypeName(index int) string

ColumnTypeDatabaseTypeName returns the Flink logical type name for a column.

func (*Rows) ColumnTypeLength

func (r *Rows) ColumnTypeLength(index int) (length int64, ok bool)

ColumnTypeLength reports the declared length for variable-size columns.

func (*Rows) ColumnTypePrecisionScale

func (r *Rows) ColumnTypePrecisionScale(index int) (precision, scale int64, ok bool)

ColumnTypePrecisionScale returns precision and scale for a column when available.

func (*Rows) ColumnTypeScanType

func (r *Rows) ColumnTypeScanType(index int) reflect.Type

ColumnTypeScanType returns the Go type expected by Scan for the column.

func (*Rows) Columns

func (r *Rows) Columns() []string

Columns reports the column names in the order returned by the gateway.

func (*Rows) Next

func (r *Rows) Next(dest []driver.Value) error

Next advances to the next row, materialising values into dest.

func (*Rows) RowsColumnTypeNullable

func (r *Rows) RowsColumnTypeNullable(index int) (nullable, ok bool)

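RowsColumnTypeNullable reports whether the column permits NULL values. Note: database/sql only consults a method named ColumnTypeNullable (the driver.RowsColumnTypeNullable interface), so under this name the method is not used by sql.ColumnType.Nullable.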

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL