Documentation ¶
Index ¶
- Constants
- func NewConnector(options ...ConnOption) (driver.Connector, error)
- type APIResponse
- type Client
- func (c *Client) CancelOperation(ctx context.Context, sessionHandle, operationHandle string) (string, error)
- func (c *Client) CloseOperation(ctx context.Context, sessionHandle, operationHandle string) (string, error)
- func (c *Client) CloseSession(ctx context.Context, sessionHandle string) error
- func (c *Client) CompleteStatement(ctx context.Context, sessionHandle string, reqBody *CompleteStatementRequest) ([]string, error)
- func (c *Client) ConfigureSession(ctx context.Context, sessionHandle string, reqBody *ConfigureSessionRequest) error
- func (c *Client) ExecuteStatement(ctx context.Context, sessionHandle string, reqBody *ExecuteStatementRequest) (string, error)
- func (c *Client) FetchResults(ctx context.Context, sessionHandle, operationHandle, token string, ...) (*FetchResultsResponseBody, error)
- func (c *Client) GetAPIVersions(ctx context.Context) ([]string, error)
- func (c *Client) GetInfo(ctx context.Context) (*InfoResponse, error)
- func (c *Client) GetOperationStatus(ctx context.Context, sessionHandle, operationHandle string) (string, error)
- func (c *Client) GetSessionConfig(ctx context.Context, sessionHandle string) (map[string]string, error)
- func (c *Client) Heartbeat(ctx context.Context, sessionHandle string) error
- func (c *Client) OpenSession(ctx context.Context, reqBody *OpenSessionRequest) (string, error)
- func (c *Client) RefreshMaterializedTable(ctx context.Context, sessionHandle, identifier string, ...) (string, error)
- type ColumnInfo
- type CompleteStatementRequest
- type CompleteStatementResponse
- type ConfigureSessionRequest
- type ConnConfig
- type ConnOption
- type ExecuteStatementRequest
- type ExecuteStatementResponse
- type FetchResultsResponseBody
- type GatewayClient
- type InfoResponse
- type LogicalType
- type OpenSessionRequest
- type OpenSessionResponse
- type OperationStatusResponse
- type RefreshMaterializedTableRequest
- type RefreshMaterializedTableResponse
- type ResultKind
- type ResultType
- type RowData
- type Rows
- func (r *Rows) Close() error
- func (r *Rows) ColumnTypeDatabaseTypeName(index int) string
- func (r *Rows) ColumnTypeLength(index int) (length int64, ok bool)
- func (r *Rows) ColumnTypePrecisionScale(index int) (precision, scale int64, ok bool)
- func (r *Rows) ColumnTypeScanType(index int) reflect.Type
- func (r *Rows) Columns() []string
- func (r *Rows) Next(dest []driver.Value) error
- func (r *Rows) RowsColumnTypeNullable(index int) (nullable, ok bool)
Constants ¶
const (
	ResultTypeNotReady ResultType = "NOT_READY"
	ResultTypeEOS      ResultType = "EOS"
	ResultTypePayload  ResultType = "PAYLOAD"
	ResultKindSuccess            ResultKind = "SUCCESS"
	ResultKindSuccessWithContent ResultKind = "SUCCESS_WITH_CONTENT"
)
Variables ¶
This section is empty.
Functions ¶
func NewConnector ¶
func NewConnector(options ...ConnOption) (driver.Connector, error)
NewConnector creates a connector that can be used with `sql.OpenDB()`. This is simpler than constructing a DSN string by hand.
Types ¶
type APIResponse ¶
type APIResponse struct {
Versions []string `json:"versions"`
}
APIResponse holds the list of API versions supported by the server.
type Client ¶
type Client struct {
	// BaseURL should be the root of the SQL Gateway REST API, e.g.
	// "http://localhost:8083". Do not include a trailing slash; the
	// Client automatically appends endpoint paths.
	BaseURL *url.URL
	// HTTPClient is the http.Client used for sending requests. If
	// nil, http.DefaultClient is used.
	HTTPClient *http.Client
	// APIVersion controls which API version is used for requests.
	// Valid values are "v1", "v2" or "v3". If empty, "v3" is used.
	APIVersion string
}
Client represents a SQL Gateway REST API client. It holds the base URL for the Gateway and an underlying http.Client that is used to execute all requests. The Client is intentionally exported as a low-level escape hatch for scenarios that require direct REST interaction with the Flink SQL Gateway; the database/sql driver should remain the primary integration point for most applications.
func NewClient ¶
NewClient constructs a new Client for the given baseURL. The baseURL must be a valid URL string pointing at the SQL Gateway server (e.g. "http://localhost:8083"). Optionally, a custom http.Client can be supplied via the httpClient parameter; if nil, http.DefaultClient is used. The APIVersion argument controls the version prefix ("v1", "v2", "v3"). If left empty, "v3" is used because v3 includes all features of prior versions.
func (*Client) CancelOperation ¶
func (c *Client) CancelOperation(ctx context.Context, sessionHandle, operationHandle string) (string, error)
CancelOperation cancels a running operation. The server will try to terminate the job associated with the operation handle.
func (*Client) CloseOperation ¶
func (c *Client) CloseOperation(ctx context.Context, sessionHandle, operationHandle string) (string, error)
CloseOperation closes a finished operation. This should be called after fetching all results to free server-side resources.
func (*Client) CloseSession ¶
CloseSession closes the given session handle. It returns nil on success or an error if the server rejects the request.
func (*Client) CompleteStatement ¶
func (c *Client) CompleteStatement(ctx context.Context, sessionHandle string, reqBody *CompleteStatementRequest) ([]string, error)
CompleteStatement returns SQL completion hints for the given statement at the specified cursor position. It returns a slice of candidate completions or an error.
func (*Client) ConfigureSession ¶
func (c *Client) ConfigureSession(ctx context.Context, sessionHandle string, reqBody *ConfigureSessionRequest) error
ConfigureSession executes the given DDL or session configuration statement within the context of the provided session. It returns nil on success. Note that many configuration statements do not produce an OperationHandle; they take effect immediately.
func (*Client) ExecuteStatement ¶
func (c *Client) ExecuteStatement(ctx context.Context, sessionHandle string, reqBody *ExecuteStatementRequest) (string, error)
ExecuteStatement submits a SQL statement for execution in the specified session. It returns the operation handle on success. Statement execution is asynchronous; use GetOperationStatus and FetchResults to monitor and retrieve results.
func (*Client) FetchResults ¶
func (c *Client) FetchResults(ctx context.Context, sessionHandle, operationHandle, token string, rowFormat string) (*FetchResultsResponseBody, error)
FetchResults retrieves a batch of results for the given operation handle and token. The token identifies which batch to fetch: "0" requests the first batch, and the `nextResultUri` returned by each call carries the token for the next batch. The rowFormat parameter controls the serialization format of the response. Valid values include "JSON" (default) and "PLAIN_TEXT".
func (*Client) GetAPIVersions ¶
GetAPIVersions returns the list of supported REST API versions available on the server.
func (*Client) GetInfo ¶
func (c *Client) GetInfo(ctx context.Context) (*InfoResponse, error)
GetInfo fetches cluster metadata from the gateway using GET /info. It returns an InfoResponse or an error.
func (*Client) GetOperationStatus ¶
func (c *Client) GetOperationStatus(ctx context.Context, sessionHandle, operationHandle string) (string, error)
GetOperationStatus retrieves the current status of an operation.
func (*Client) GetSessionConfig ¶
func (c *Client) GetSessionConfig(ctx context.Context, sessionHandle string) (map[string]string, error)
GetSessionConfig fetches the current configuration for a session. The returned map contains session-specific property overrides. If the session does not exist, an error is returned.
func (*Client) Heartbeat ¶
Heartbeat triggers a heartbeat for the specified session. This method simply sends an empty POST body to keep the session alive.
func (*Client) OpenSession ¶
OpenSession opens a new SQL Gateway session using the properties and session name in reqBody. It returns the session handle on success. If reqBody.Properties is nil, the default gateway configuration is used.
func (*Client) RefreshMaterializedTable ¶
func (c *Client) RefreshMaterializedTable(ctx context.Context, sessionHandle, identifier string, reqBody *RefreshMaterializedTableRequest) (string, error)
RefreshMaterializedTable triggers a refresh of the named materialized table and returns the resulting operation handle. The identifier must be the fully-qualified table identifier as a string (e.g. "my_catalog.my_db.my_table"). Optional request fields may influence execution. See Flink documentation for supported configuration keys.
type ColumnInfo ¶
type ColumnInfo struct {
	Name        string      `json:"name"`
	LogicalType LogicalType `json:"logicalType"`
	Comment     string      `json:"comment"`
}
type CompleteStatementRequest ¶
type CompleteStatementRequest struct {
	Position  int    `json:"position"`
	Statement string `json:"statement"`
}
CompleteStatementRequest defines the payload for the complete-statement endpoint, which returns auto-completion candidates.
type CompleteStatementResponse ¶
type CompleteStatementResponse struct {
Candidates []string `json:"candidates"`
}
CompleteStatementResponse holds the list of completion candidates.
type ConfigureSessionRequest ¶
type ConfigureSessionRequest struct {
	// ExecutionTimeout specifies a timeout in seconds for configuring
	// the session. Use int64 to match the OpenAPI specification.
	ExecutionTimeout int64  `json:"executionTimeout,omitempty"`
	Statement        string `json:"statement"`
}
ConfigureSessionRequest defines a request to adjust session settings or perform catalog/database modifications. It accepts an SQL statement (CREATE, DROP, ALTER, USE, ADD JAR, etc.) and optional timeout. ExecutionTimeout is expressed in seconds; zero means server default.
type ConnConfig ¶
type ConnConfig struct {
	GatewayURL string
	Client     *http.Client
	APIVersion string
	Properties map[string]string
}
ConnConfig captures the settings used to create a SQL Gateway session.
func WithDefaults ¶
func WithDefaults() *ConnConfig
WithDefaults returns a ConnConfig populated with sensible defaults for connecting to a local SQL Gateway instance.
type ConnOption ¶
type ConnOption func(c *ConnConfig)
ConnOption mutates a ConnConfig before it is used to construct a connector.
func WithAPIVersion ¶
func WithAPIVersion(apiVersion string) ConnOption
WithAPIVersion selects the SQL Gateway REST API version (e.g. v1, v2, v3).
func WithClient ¶
func WithClient(client *http.Client) ConnOption
WithClient sets the HTTP client used by the underlying Gateway client.
func WithGatewayURL ¶
func WithGatewayURL(gatewayUrl string) ConnOption
WithGatewayURL sets the SQL Gateway endpoint for connection.
func WithProperties ¶
func WithProperties(properties map[string]string) ConnOption
WithProperties sets SQL Gateway session properties.
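ConnOption follows the functional-options pattern: each option is a function that mutates a ConnConfig, and options are applied in order. The sketch below mirrors the documented types with lowercase stand-ins so that it compiles on its own. The option bodies, the `buildConfig` helper, and the default values are assumptions; the real NewConnector and WithDefaults may behave differently.

```go
package main

import "fmt"

// connConfig and connOption mirror ConnConfig and ConnOption from the docs.
// The *http.Client field is left out to keep the sketch short.
type connConfig struct {
	GatewayURL string
	APIVersion string
	Properties map[string]string
}

type connOption func(c *connConfig)

// Assumed option implementations: each one sets a single field.
func withGatewayURL(u string) connOption {
	return func(c *connConfig) { c.GatewayURL = u }
}

func withAPIVersion(v string) connOption {
	return func(c *connConfig) { c.APIVersion = v }
}

func withProperties(p map[string]string) connOption {
	return func(c *connConfig) { c.Properties = p }
}

// buildConfig is a hypothetical helper. It starts from assumed defaults and
// applies the options in order, so later options win.
func buildConfig(opts ...connOption) *connConfig {
	cfg := &connConfig{GatewayURL: "http://localhost:8083", APIVersion: "v3"}
	for _, opt := range opts {
		opt(cfg)
	}
	return cfg
}

func main() {
	cfg := buildConfig(
		withAPIVersion("v2"),
		withProperties(map[string]string{"execution.runtime-mode": "batch"}),
	)
	fmt.Println(cfg.GatewayURL, cfg.APIVersion, cfg.Properties["execution.runtime-mode"])
	// prints "http://localhost:8083 v2 batch"
}
```

With the real package, the equivalent call would pass WithGatewayURL, WithAPIVersion, and similar options to NewConnector and hand the resulting connector to `sql.OpenDB`.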
type ExecuteStatementRequest ¶
type ExecuteStatementRequest struct {
	ExecutionConfig map[string]string `json:"executionConfig,omitempty"`
	// ExecutionTimeout specifies a timeout in seconds for the statement
	// execution. Use int64 to match the OpenAPI specification.
	ExecutionTimeout int64  `json:"executionTimeout,omitempty"`
	Statement        string `json:"statement"`
}
ExecuteStatementRequest represents the payload for executing a SQL statement. ExecutionConfig allows passing dynamic configuration options; ExecutionTimeout expresses a timeout in seconds for statement execution. Set ExecutionTimeout to zero to use the gateway’s default timeout. Statement must contain the SQL text to execute.
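The effect of the `omitempty` tags can be seen by encoding the request with encoding/json. The struct below is a local copy of the documented fields so the example runs without the package; the `encode` helper and the sample configuration key are illustrative only.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// executeStatementRequest copies the documented struct so the sketch
// compiles on its own.
type executeStatementRequest struct {
	ExecutionConfig  map[string]string `json:"executionConfig,omitempty"`
	ExecutionTimeout int64             `json:"executionTimeout,omitempty"`
	Statement        string            `json:"statement"`
}

// encode marshals a request and returns the JSON text.
func encode(req executeStatementRequest) string {
	b, err := json.Marshal(req)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	// A zero timeout and a nil config are omitted from the payload.
	fmt.Println(encode(executeStatementRequest{Statement: "SELECT 1"}))
	// prints {"statement":"SELECT 1"}

	fmt.Println(encode(executeStatementRequest{
		ExecutionConfig:  map[string]string{"pipeline.name": "demo"},
		ExecutionTimeout: 30,
		Statement:        "SELECT 1",
	}))
	// prints {"executionConfig":{"pipeline.name":"demo"},"executionTimeout":30,"statement":"SELECT 1"}
}
```

Because a zero ExecutionTimeout is dropped from the JSON body, the gateway never sees an explicit zero and applies its own default.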
type ExecuteStatementResponse ¶
type ExecuteStatementResponse struct {
OperationHandle string `json:"operationHandle"`
}
ExecuteStatementResponse contains the operation handle returned after submitting a statement. Clients should use the handle to poll for results and status.
type FetchResultsResponseBody ¶
type FetchResultsResponseBody struct {
	JobID         string     `json:"jobID"`
	NextResultUri string     `json:"nextResultUri"`
	IsQueryResult bool       `json:"isQueryResult"`
	ResultKind    ResultKind `json:"resultKind"`
	ResultType    ResultType `json:"resultType"`
	Results       struct {
		Columns   []ColumnInfo `json:"columns"`
		RowFormat string       `json:"rowFormat"`
		Data      []RowData    `json:"data"`
	} `json:"results"`
}
FetchResultsResponseBody mirrors the OpenAPI schema for the response returned by fetching results from an operation. It contains the job ID, the URI for fetching the next batch, a flag indicating whether the query returns results, the result kind and type, and a nested results structure with column metadata and payload. See the Flink SQL Gateway REST API specification for details.
func (*FetchResultsResponseBody) NextToken ¶
func (r *FetchResultsResponseBody) NextToken() string
NextToken extracts the token from the NextResultUri, or returns "" if not present.
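One plausible way to perform this extraction is to treat the token as the last path segment of the URI. The sketch below rests on that assumption and is not the package's actual implementation; `tokenFromURI` is a hypothetical name, and the sample URI layout is a guess at what the gateway returns.

```go
package main

import (
	"fmt"
	"net/url"
	"path"
)

// tokenFromURI is a hypothetical stand-in for NextToken. It assumes the token
// is the last path segment of the URI and ignores any query string.
func tokenFromURI(nextResultURI string) string {
	if nextResultURI == "" {
		return ""
	}
	u, err := url.Parse(nextResultURI)
	if err != nil {
		return ""
	}
	token := path.Base(u.Path)
	if token == "." || token == "/" {
		return "" // the URI has no usable path segment
	}
	return token
}

func main() {
	fmt.Println(tokenFromURI("/v3/sessions/s1/operations/o1/result/1?rowFormat=JSON"))
	// prints "1"
	fmt.Printf("%q\n", tokenFromURI(""))
	// prints ""
}
```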
type GatewayClient ¶
type GatewayClient interface {
	GetInfo(ctx context.Context) (*InfoResponse, error)
	GetAPIVersions(ctx context.Context) ([]string, error)
	OpenSession(ctx context.Context, reqBody *OpenSessionRequest) (string, error)
	CloseSession(ctx context.Context, sessionHandle string) error
	GetSessionConfig(ctx context.Context, sessionHandle string) (map[string]string, error)
	CompleteStatement(ctx context.Context, sessionHandle string, reqBody *CompleteStatementRequest) ([]string, error)
	ConfigureSession(ctx context.Context, sessionHandle string, reqBody *ConfigureSessionRequest) error
	Heartbeat(ctx context.Context, sessionHandle string) error
	RefreshMaterializedTable(ctx context.Context, sessionHandle, identifier string, reqBody *RefreshMaterializedTableRequest) (string, error)
	CancelOperation(ctx context.Context, sessionHandle, operationHandle string) (string, error)
	CloseOperation(ctx context.Context, sessionHandle, operationHandle string) (string, error)
	GetOperationStatus(ctx context.Context, sessionHandle, operationHandle string) (string, error)
	ExecuteStatement(ctx context.Context, sessionHandle string, reqBody *ExecuteStatementRequest) (string, error)
	FetchResults(ctx context.Context, sessionHandle, operationHandle, token string, rowFormat string) (*FetchResultsResponseBody, error)
}
GatewayClient is the minimal interface implemented by *Client. It exposes the raw Flink SQL Gateway HTTP surface area for advanced users who need functionality beyond the database/sql driver. Most callers should prefer using higher-level types and treat this interface as a low-level escape hatch when direct REST access is unavoidable.
type InfoResponse ¶
type InfoResponse struct {
	ProductName string `json:"productName"`
	Version     string `json:"version"`
}
InfoResponse represents the response from GET /info. It contains the product name and version of the connected cluster.
type LogicalType ¶
type LogicalType struct {
	Type       string  `json:"type"`
	Nullable   bool    `json:"nullable"`
	Length     *int64  `json:"length"`
	Precision  *int    `json:"precision"`
	Scale      *int    `json:"scale"`
	Resolution *string `json:"resolution"`
}
func (*LogicalType) UnmarshalJSON ¶
func (lt *LogicalType) UnmarshalJSON(data []byte) error
type OpenSessionRequest ¶
type OpenSessionRequest struct {
	Properties  map[string]string `json:"properties,omitempty"`
	SessionName string            `json:"sessionName,omitempty"`
}
OpenSessionRequest defines the payload for creating a new session.
type OpenSessionResponse ¶
type OpenSessionResponse struct {
SessionHandle string `json:"sessionHandle"`
}
OpenSessionResponse contains the session handle returned from creating a session.
type OperationStatusResponse ¶
type OperationStatusResponse struct {
Status string `json:"status"`
}
OperationStatusResponse captures the status of an operation, which may be "RUNNING", "FINISHED", "CANCELED", or another status defined by the server.
type RefreshMaterializedTableRequest ¶
type RefreshMaterializedTableRequest struct {
	DynamicOptions   map[string]string `json:"dynamicOptions,omitempty"`
	ExecutionConfig  map[string]string `json:"executionConfig,omitempty"`
	IsPeriodic       bool              `json:"isPeriodic,omitempty"`
	Periodic         bool              `json:"periodic,omitempty"`
	ScheduleTime     string            `json:"scheduleTime,omitempty"`
	StaticPartitions map[string]string `json:"staticPartitions,omitempty"`
}
RefreshMaterializedTableRequest defines the parameters for refreshing a materialized table. DynamicOptions and ExecutionConfig allow adjusting job parameters; StaticPartitions defines optional partitioning. ScheduleTime can be used when scheduling periodic refreshes. IsPeriodic and Periodic are synonyms; set either one to true to indicate that the refresh should be scheduled repeatedly.
type RefreshMaterializedTableResponse ¶
type RefreshMaterializedTableResponse struct {
OperationHandle string `json:"operationHandle"`
}
RefreshMaterializedTableResponse contains the operation handle associated with the refresh job. Clients should poll the operation’s status and results as with any other SQL operation.
type ResultKind ¶
type ResultKind string
type ResultType ¶
type ResultType string
type RowData ¶
type RowData struct {
	Kind   string            `json:"kind"`
	Fields []json.RawMessage `json:"fields"`
}
type Rows ¶
type Rows struct {
// contains filtered or unexported fields
}
Rows exposes Flink SQL query results through database/sql by implementing the driver.Rows interface on top of paged gateway responses.
func (*Rows) Close ¶
Close releases the iterator and asks the gateway to close the underlying operation.
func (*Rows) ColumnTypeDatabaseTypeName ¶
ColumnTypeDatabaseTypeName returns the Flink logical type name for a column.
func (*Rows) ColumnTypeLength ¶
ColumnTypeLength reports the declared length for variable-size columns.
func (*Rows) ColumnTypePrecisionScale ¶
ColumnTypePrecisionScale returns precision and scale for a column when available.
func (*Rows) ColumnTypeScanType ¶
ColumnTypeScanType returns the Go type expected by Scan for the column.
func (*Rows) RowsColumnTypeNullable ¶
RowsColumnTypeNullable reports whether the column permits NULL values.