client

package module
v0.3.31 Latest
Published: Apr 19, 2026 License: MIT Imports: 29 Imported by: 0

README

Hugr Go Client

Go client for the Hugr Data Mesh platform. Execute GraphQL queries and subscriptions with results delivered as Apache Arrow record readers over the IPC protocol.

Installation

go get github.com/hugr-lab/query-engine/client

Quick Start

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/hugr-lab/query-engine/client"
)

func main() {
    c := client.NewClient("http://localhost:15000/ipc",
        client.WithApiKey("your-api-key"),
    )

    resp, err := c.Query(context.Background(),
        `{ core { data_sources { name type } } }`, nil,
    )
    if err != nil {
        log.Fatal(err)
    }
    defer resp.Close()

    fmt.Println(resp.Data)
}

Client Options

Authentication
Option | Description
WithApiKey(key) | API key via x-hugr-api-key header
WithApiKeyCustomHeader(key, header) | API key with custom header name
WithSecretKeyAuth(key) | Admin secret key via x-hugr-secret-key header. Enables impersonation via AsUser
WithToken(token) | Bearer token via Authorization header (for JWT/OIDC)
Identity
Option | Description
WithUserRole(role) | Set x-hugr-role header
WithUserRoleCustomHeader(role, header) | Custom role header name
WithUserInfo(id, name) | Set x-hugr-user-id and x-hugr-name headers
WithUserInfoCustomHeader(id, name, idH, nameH) | Custom user info header names
Connection
Option | Description
WithTimeout(d) | HTTP request timeout (default: 5 minutes)
WithTransport(rt) | Custom http.RoundTripper
WithHttpUrl(url) | Override HTTP query URL
WithJQQueryUrl(url) | Override JQ query URL
WithSubscriptionPool(max, idle) | WebSocket pool for subscriptions (default: 1/1)
Data Format
Option | Description
WithTimezone(tz) | Set X-Hugr-Timezone header (auto-detected by default)
WithoutTimezone() | Disable auto timezone detection
WithArrowStructFlatten() | Flatten Arrow struct fields in responses
Example
c := client.NewClient("http://localhost:15000/ipc",
    client.WithSecretKeyAuth("my-secret-key"),
    client.WithTimeout(30 * time.Second),
    client.WithSubscriptionPool(10, 5),
)
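
WithTransport accepts any http.RoundTripper, which makes it a natural hook for cross-cutting concerns such as tagging or tracing requests. The following is a minimal standard-library sketch of such a transport. The names headerTransport and echoHeader and the X-Request-Source header are hypothetical, and the code runs against a local httptest server rather than a Hugr instance.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// headerTransport is a hypothetical http.RoundTripper that adds one fixed
// header to every outgoing request and then delegates to a base transport.
type headerTransport struct {
	base  http.RoundTripper // nil means http.DefaultTransport
	name  string
	value string
}

// RoundTrip clones the request before changing it, because the
// http.RoundTripper contract forbids modifying the caller's request.
func (t headerTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	clone := req.Clone(req.Context())
	clone.Header.Set(t.name, t.value)
	base := t.base
	if base == nil {
		base = http.DefaultTransport
	}
	return base.RoundTrip(clone)
}

// echoHeader starts a throwaway local server that echoes the named request
// header in its response body, sends one GET through rt, and returns the body.
func echoHeader(rt http.RoundTripper, name string) (string, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, r.Header.Get(name))
	}))
	defer srv.Close()

	resp, err := (&http.Client{Transport: rt}).Get(srv.URL)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	body, err := io.ReadAll(resp.Body)
	return string(body), err
}

func main() {
	rt := headerTransport{name: "X-Request-Source", value: "docs-example"}
	got, err := echoHeader(rt, "X-Request-Source")
	if err != nil {
		panic(err)
	}
	fmt.Println("server saw:", got)
}
```

A value like rt could then be handed to the client as client.WithTransport(rt).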

Queries

resp, err := c.Query(ctx, `
    query($limit: Int!) {
        devices(limit: $limit) { id name status }
    }
`, map[string]any{"limit": 100})
if err != nil {
    log.Fatal(err)
}
defer resp.Close()

// Scan into struct
var devices []Device
err = resp.ScanData("devices", &devices)

// Or access raw data
fmt.Println(resp.Data)
Scanning Results

The response tree has two kinds of leaves:

  • ArrowTables: list and aggregation selections. They come back column-oriented; decode them via ScanTable / Rows(). No JSON round-trip is involved, so timestamps keep their timezone and geometry decodes to orb.Geometry.
  • JSON objects: by_pk lookups, function calls, and scalar fields. They come back as *types.JsonValue; decode them via ScanObject or the existing ScanData (both go through encoding/json).

Response.Tables() and Response.Objects() return the dotted paths in each bucket, so callers can pick the right scan method automatically.
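
The dispatch described above can be sketched as follows. This is only an illustration: it assumes that Tables() and Objects() each return a []string of dotted paths (the exact signatures are not shown here), and pathLister, leafKind, and fakeResponse are hypothetical names used so the sketch runs without a server.

```go
package main

import "fmt"

// pathLister models the two listing methods described above. That both
// return []string is an assumption; check the types package for the real
// signatures.
type pathLister interface {
	Tables() []string
	Objects() []string
}

// leafKind reports "table" if path is listed among the Arrow tables,
// "object" if it is listed among the JSON objects, and "" otherwise.
func leafKind(r pathLister, path string) string {
	for _, p := range r.Tables() {
		if p == path {
			return "table"
		}
	}
	for _, p := range r.Objects() {
		if p == path {
			return "object"
		}
	}
	return ""
}

// fakeResponse is a stand-in used only to run the sketch locally.
type fakeResponse struct {
	tables, objects []string
}

func (f fakeResponse) Tables() []string  { return f.tables }
func (f fakeResponse) Objects() []string { return f.objects }

func main() {
	resp := fakeResponse{
		tables:  []string{"osm.buildings"},
		objects: []string{"tf.digital_twin.roads_by_pk"},
	}
	for _, path := range []string{"osm.buildings", "tf.digital_twin.roads_by_pk", "missing"} {
		switch leafKind(resp, path) {
		case "table":
			fmt.Println(path, "-> ScanTable")
		case "object":
			fmt.Println(path, "-> ScanObject")
		default:
			fmt.Println(path, "-> not in response")
		}
	}
}
```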

ArrowTable path (ScanTable / Rows())

Struct fields are resolved by json tag. Geometry columns decode to orb.Geometry (or a concrete subtype) for every encoding the engine emits.

import "github.com/paulmach/orb"

type Building struct {
    ID   int64        `json:"id"`
    Name string       `json:"name"`
    Geom orb.Geometry `json:"geom"`    // transparent decode
    Area float64      `json:"area_sqm"`
}

resp, _ := c.Query(ctx, `{ osm { buildings { id name geom area_sqm } } }`, nil)
defer resp.Close()

var rows []Building
if err := resp.ScanTable("osm.buildings", &rows); err != nil {
    log.Fatal(err)
}

For streaming, use the database/sql.Rows-style cursor:

tbl, _ := resp.Table("osm.buildings")
cur, _ := tbl.Rows()
defer cur.Close()
for cur.Next() {
    var b Building
    if err := cur.Scan(&b); err != nil { return err }
    // ...
}

Arrow column → Go destination matrix:

Arrow column | Go destination | Behaviour
Timestamp(unit, tz) | time.Time | Honours unit + timezone; UTC default when tz empty.
Timestamp(unit, "") | types.DateTime | Naive, no timezone applied.
Geometry (geoarrow.wkb, geoarrow.wkt, native coords, hugr.geojson) | orb.Geometry / orb.Point / orb.LineString / … | Transparent decode.
Untagged string with geometry content | orb.Geometry | Heuristic: { → GeoJSON, else WKT.
Geometry | []byte / string / any / map[string]any | Raw storage passthrough.
String with JSON content | map[string]any / []any / any | Auto json.Unmarshal.
String | string | Unchanged.

Register custom extensions with types.RegisterGeometryDecoder(name, fn).
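
To make the first row of the matrix concrete, here is a minimal standard-library sketch of turning a raw Arrow timestamp (an integer count of units since the Unix epoch, plus a unit and an optional timezone) into a time.Time. It is not the client's implementation: arrowTimestampToTime is a hypothetical helper, the unit is passed as a plain string, and only IANA zone names are handled.

```go
package main

import (
	"fmt"
	"time"
	_ "time/tzdata" // embed the zone database so LoadLocation works everywhere
)

// arrowTimestampToTime converts a raw timestamp value to time.Time.
// unit is one of "s", "ms", "us", "ns"; tz is an IANA zone name, or ""
// for the UTC default described in the matrix above.
func arrowTimestampToTime(v int64, unit, tz string) (time.Time, error) {
	var t time.Time
	switch unit {
	case "s":
		t = time.Unix(v, 0)
	case "ms":
		t = time.UnixMilli(v)
	case "us":
		t = time.UnixMicro(v)
	case "ns":
		t = time.Unix(0, v)
	default:
		return time.Time{}, fmt.Errorf("unknown timestamp unit %q", unit)
	}
	if tz == "" {
		return t.UTC(), nil
	}
	loc, err := time.LoadLocation(tz)
	if err != nil {
		return time.Time{}, err
	}
	return t.In(loc), nil
}

func main() {
	utc, err := arrowTimestampToTime(1700000000, "s", "")
	if err != nil {
		panic(err)
	}
	tokyo, err := arrowTimestampToTime(1700000000000, "ms", "Asia/Tokyo")
	if err != nil {
		panic(err)
	}
	// Both values denote the same instant; only the attached zone differs.
	fmt.Println(utc.Format(time.RFC3339), tokyo.Format(time.RFC3339), utc.Equal(tokyo))
}
```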

JSON-object path (ScanObject)

by_pk lookups and function-call results come back as JSON. ScanObject runs json.Marshal/Unmarshal internally, so anything the standard library's JSON decoder supports works. Caveat: orb.Geometry is an interface — stdlib JSON can't unmarshal into it directly. Use *geojson.Geometry (from paulmach/orb/geojson) instead and call .Geometry() to obtain the concrete orb.Geometry:

import "github.com/paulmach/orb/geojson"

type Road struct {
    ID   int64             `json:"id"`
    Name string            `json:"name"`
    Geom *geojson.Geometry `json:"geom"`
}

var road Road
if err := resp.ScanObject("tf.digital_twin.roads_by_pk", &road); err != nil {
    log.Fatal(err)
}
concrete := road.Geom.Geometry()   // -> orb.LineString, orb.Point, ...
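
The caveat about interfaces can be reproduced with the standard library alone. In the sketch below, Shape stands in for an interface such as orb.Geometry, and shapeBox is a hypothetical wrapper playing the role that *geojson.Geometry plays above; it supports only points, to keep the example short.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Shape stands in for an interface type such as orb.Geometry.
type Shape interface{ Kind() string }

// Point is one concrete implementation of Shape.
type Point struct{ X, Y float64 }

func (Point) Kind() string { return "Point" }

// direct holds the interface itself. encoding/json cannot tell which
// concrete type to construct, so decoding into it fails.
type direct struct {
	Geom Shape `json:"geom"`
}

// shapeBox is a hypothetical wrapper that implements json.Unmarshaler and
// picks the concrete type from the "type" field.
type shapeBox struct{ Shape Shape }

func (b *shapeBox) UnmarshalJSON(data []byte) error {
	var raw struct {
		Type        string    `json:"type"`
		Coordinates []float64 `json:"coordinates"`
	}
	if err := json.Unmarshal(data, &raw); err != nil {
		return err
	}
	if raw.Type != "Point" || len(raw.Coordinates) != 2 {
		return fmt.Errorf("unsupported geometry %q", raw.Type)
	}
	b.Shape = Point{X: raw.Coordinates[0], Y: raw.Coordinates[1]}
	return nil
}

// wrapped uses the wrapper instead of the bare interface.
type wrapped struct {
	Geom *shapeBox `json:"geom"`
}

const doc = `{"geom":{"type":"Point","coordinates":[1.5,2.5]}}`

// decodeDirect returns the error produced by decoding into the interface field.
func decodeDirect() error {
	var d direct
	return json.Unmarshal([]byte(doc), &d)
}

// decodeWrapped decodes through the wrapper and returns the concrete Shape.
func decodeWrapped() (Shape, error) {
	var w wrapped
	if err := json.Unmarshal([]byte(doc), &w); err != nil {
		return nil, err
	}
	return w.Geom.Shape, nil
}

func main() {
	fmt.Println("interface field:", decodeDirect())
	s, err := decodeWrapped()
	fmt.Println("wrapper field:", s, err)
}
```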
One struct for both paths

*geojson.Geometry works on both scanning paths — the Arrow scanner wraps the decoded geometry via geojson.NewGeometry, and ScanObject lets stdlib JSON populate it natively. Write one struct and use it for list and by_pk queries alike:

type Part struct {
    ID   int64             `json:"id"`
    Name string            `json:"name"`
    Geom *geojson.Geometry `json:"geom"`   // works via ScanTable and ScanObject
}

// Arrow table (list selection):
resp, _ := c.Query(ctx, `{ ... { parts { id name geom } } }`, nil)
var parts []Part
_ = resp.ScanTable("...parts", &parts)

// JSON object (by_pk or function call); resp is reassigned here, not redeclared:
resp, _ = c.Query(ctx, `{ ... { parts_by_pk(id: 42) { id name geom } } }`, nil)
var part Part
_ = resp.ScanObject("...parts_by_pk", &part)

ScanData continues to work unchanged. Migrate call sites opportunistically when you hit timestamp or geometry fields.

Validate Without Executing
err := c.ValidateQuery(ctx, query, vars)
JQ Transform
result, err := c.QueryJSON(ctx, types.JQRequest{
    Query: types.Request{Query: graphqlQuery},
    JQ:    ".devices[] | {id, name}",
})
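
For readers less familiar with jq, the filter ".devices[] | {id, name}" emits one object per device, keeping only the id and name keys. The sketch below reproduces that projection in plain Go over a hypothetical sample document; it only illustrates what the filter computes and does not call QueryJSON.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// project mirrors the jq filter `.devices[] | {id, name}`: it walks the
// "devices" array and keeps only the id and name keys of each element.
// As in jq, a key missing from the input becomes null (nil in Go).
func project(doc []byte) ([]map[string]any, error) {
	var in struct {
		Devices []map[string]any `json:"devices"`
	}
	if err := json.Unmarshal(doc, &in); err != nil {
		return nil, err
	}
	out := make([]map[string]any, 0, len(in.Devices))
	for _, d := range in.Devices {
		out = append(out, map[string]any{"id": d["id"], "name": d["name"]})
	}
	return out, nil
}

// sample is made-up input data used only for this illustration.
const sample = `{"devices":[
	{"id":1,"name":"pump","status":"ok"},
	{"id":2,"name":"valve","status":"down"}
]}`

func main() {
	rows, err := project([]byte(sample))
	if err != nil {
		panic(err)
	}
	for _, r := range rows {
		b, _ := json.Marshal(r)
		fmt.Println(string(b))
	}
}
```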

Subscriptions

Subscriptions use WebSocket connections via the hugr-ipc-ws protocol with Apache Arrow IPC binary frames.

Pooled Connections
c := client.NewClient(url,
    client.WithApiKey("key"),
    client.WithSubscriptionPool(10, 5),
)

sub, err := c.Subscribe(ctx, `
    subscription {
        query(interval: "5s") {
            devices { id status }
        }
    }
`, nil)
if err != nil {
    log.Fatal(err)
}

for event := range sub.Events {
    fmt.Printf("Path: %s\n", event.Path)
    for event.Reader.Next() {
        batch := event.Reader.RecordBatch()
        fmt.Printf("  %d rows\n", batch.NumRows())
    }
    event.Reader.Release()
}
Dedicated Connections

For long-running subscriptions or full WebSocket lifecycle control:

conn, err := c.NewSubscriptionConn(ctx)
if err != nil {
    log.Fatal(err)
}
defer conn.Close()

sub, err := conn.Subscribe(ctx, query, nil)
Cancel Subscription
sub.Cancel() // cancels one subscription
c.CloseSubscriptions() // closes all pooled connections

Impersonation (AsUser)

Admin clients authenticated via WithSecretKeyAuth can execute queries and subscriptions on behalf of any user with any role. The impersonated user's role permissions, field access rules, and row-level security filters are enforced.

Setup
c := client.NewClient("http://localhost:15000/ipc",
    client.WithSecretKeyAuth("admin-secret-key"),
)

// Optional: verify admin status at startup
if err := c.VerifyAdmin(ctx); err != nil {
    log.Fatal(err)
}
Query as User
// types.AsUser or client.AsUser: both work.
// A new variable avoids redeclaring ctx when it is already defined in this scope.
userCtx := types.AsUser(ctx, "user-123", "John Doe", "viewer")
resp, err := c.Query(userCtx, `{ devices { id name } }`, nil)
// Response contains only data the "viewer" role can see
Subscribe as User
editorCtx := types.AsUser(ctx, "user-456", "Jane Smith", "editor")
sub, err := c.Subscribe(editorCtx, `
    subscription {
        query(interval: "5s") {
            devices { id status }
        }
    }
`, nil)
// Subscription events are filtered by "editor" role permissions

Multiple subscriptions for different users can coexist on the same pooled connection — each subscription independently enforces its user's permissions.

Introspect Impersonated Identity
viewerCtx := types.AsUser(ctx, "user-123", "John", "viewer")
resp, _ := c.Query(viewerCtx, `{
    function { core { auth { me {
        user_id
        role
        auth_type
        impersonated_by_user_id
        impersonated_by_user_name
    } } } }
}`, nil)
// Returns:
//   user_id: "user-123"
//   role: "viewer"
//   auth_type: "impersonation"
//   impersonated_by_user_id: "api"
//   impersonated_by_user_name: "api"
Security
  • Only WithSecretKeyAuth clients can impersonate. With any other auth method (JWT, OIDC, anonymous, regular API keys), the override headers are silently ignored over HTTP and rejected with an error on IPC subscriptions.
  • Row-level security filters use the impersonated user's identity: [$auth.user_id] resolves to the impersonated user's ID.
  • The original admin identity is tracked via impersonated_by_* fields for audit logging.

Data Source Management

// Register
err := c.RegisterDataSource(ctx, types.DataSource{
    Name: "my_source",
    Type: "postgres",
    URI:  "postgresql://...",
})

// Load / Unload
err = c.LoadDataSource(ctx, "my_source")
err = c.UnloadDataSource(ctx, "my_source")
err = c.UnloadDataSource(ctx, "my_source", types.WithHardUnload())

// Status
status, err := c.DataSourceStatus(ctx, "my_source")

// Describe schema
sdl, err := c.DescribeDataSource(ctx, "my_source", true)

Hugr Applications

The client supports running pluggable applications that register tables and functions:

c := client.NewClient("http://localhost:15000/ipc",
    client.WithApiKey("app-key"),
)

err := c.RunApplication(ctx, myApp,
    client.WithSecretKey("admin-secret"),
    client.WithStartupTimeout(30 * time.Second),
)

See the client/app package for the application framework API.

Documentation

Overview

Package client provides an IPC client for the Hugr query engine. It communicates via the Hugr IPC multipart/mixed protocol.

Functions

func AsUser added in v0.3.21

func AsUser(ctx context.Context, userId, userName, role string) context.Context

AsUser returns a context that causes the client to execute queries and subscriptions as the specified user with the specified role. Only works when the client is authenticated via WithSecretKeyAuth.
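
One common way to build a helper of this shape is to attach the identity to the context under an unexported key, so that a transport layer can read it back when it builds the request. The sketch below shows that general pattern with the standard library only. It is an assumption about how such a helper could work, not the package's actual implementation, and identity, ctxKey, asUser, identityFrom, describe, and run are all hypothetical names.

```go
package main

import (
	"context"
	"fmt"
)

// identity is a hypothetical container for the impersonated user.
type identity struct {
	UserID, UserName, Role string
}

// ctxKey is an unexported type, so no other package can collide with or
// overwrite the value stored under it.
type ctxKey struct{}

// asUser returns a derived context carrying the identity. The parent
// context is left untouched.
func asUser(ctx context.Context, userID, userName, role string) context.Context {
	return context.WithValue(ctx, ctxKey{}, identity{UserID: userID, UserName: userName, Role: role})
}

// identityFrom extracts the identity, reporting false when none was set.
func identityFrom(ctx context.Context) (identity, bool) {
	id, ok := ctx.Value(ctxKey{}).(identity)
	return id, ok
}

// describe renders what a request built from ctx would run as.
func describe(ctx context.Context) string {
	id, ok := identityFrom(ctx)
	if !ok {
		return "no impersonation"
	}
	return fmt.Sprintf("%s (%s) as %s", id.UserID, id.UserName, id.Role)
}

// run builds a base and a derived context and describes both.
func run() (base, derived string) {
	ctx := context.Background()
	userCtx := asUser(ctx, "user-123", "John Doe", "viewer")
	return describe(ctx), describe(userCtx)
}

func main() {
	base, derived := run()
	fmt.Println("base:   ", base)
	fmt.Println("derived:", derived)
}
```

Because the identity travels with the context rather than with the client, one admin client can serve requests for many users concurrently.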

Types

type ApplicationOptions added in v0.3.11

type ApplicationOptions func(*serveConfig)

func WithAllocator added in v0.3.11

func WithAllocator(alloc memory.Allocator) ApplicationOptions

func WithLogLevel added in v0.3.11

func WithLogLevel(level *slog.Level) ApplicationOptions

func WithLogger added in v0.3.11

func WithLogger(logger *slog.Logger) ApplicationOptions

func WithMaxMessageSize added in v0.3.11

func WithMaxMessageSize(size int) ApplicationOptions

func WithSecretKey added in v0.3.11

func WithSecretKey(key string) ApplicationOptions

func WithStartupTimeout added in v0.3.11

func WithStartupTimeout(timeout time.Duration) ApplicationOptions

type Client

type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(url string, opts ...Option) *Client

func (*Client) CloseSubscriptions added in v0.3.20

func (c *Client) CloseSubscriptions()

CloseSubscriptions closes all pool connections.

func (*Client) DataSourceStatus

func (c *Client) DataSourceStatus(ctx context.Context, name string) (string, error)

func (*Client) DescribeDataSource

func (c *Client) DescribeDataSource(ctx context.Context, name string, self bool) (string, error)

DescribeDataSource returns the description of the data source.

func (*Client) LoadDataSource

func (c *Client) LoadDataSource(ctx context.Context, name string) error

func (*Client) NewSubscriptionConn added in v0.3.20

func (c *Client) NewSubscriptionConn(ctx context.Context) (*SubscriptionConn, error)

NewSubscriptionConn creates a dedicated WebSocket connection.

func (*Client) Ping

func (c *Client) Ping(ctx context.Context) (string, error)

func (*Client) Query

func (c *Client) Query(ctx context.Context, query string, vars map[string]any) (*types.Response, error)

func (*Client) QueryJSON

func (c *Client) QueryJSON(ctx context.Context, req types.JQRequest) (*types.JsonValue, error)

func (*Client) RegisterDataSource

func (c *Client) RegisterDataSource(ctx context.Context, ds types.DataSource) error

func (*Client) RunApplication added in v0.3.11

func (c *Client) RunApplication(ctx context.Context, application app.Application, opts ...ApplicationOptions) error

func (*Client) Subscribe added in v0.3.20

func (c *Client) Subscribe(ctx context.Context, query string, vars map[string]any) (*types.Subscription, error)

func (*Client) UnloadDataSource

func (c *Client) UnloadDataSource(ctx context.Context, name string, opts ...types.UnloadOpt) error

func (*Client) ValidateQuery

func (c *Client) ValidateQuery(ctx context.Context, query string, vars map[string]any) error

func (*Client) ValidateQueryJSON

func (c *Client) ValidateQueryJSON(ctx context.Context, req types.JQRequest) error

func (*Client) VerifyAdmin added in v0.3.21

func (c *Client) VerifyAdmin(ctx context.Context) error

VerifyAdmin queries the server to verify the client has admin privileges. This is a client-side convenience check; the server enforces impersonation authorization via the can_impersonate role property in the permissions system.

type ClientConfig

type ClientConfig struct {
	Timeout            time.Duration
	HttpUrl            string
	JQQueryUrl         string
	Transport          http.RoundTripper
	ArrowStructFlatten bool
	SubPool            SubscriptionPoolConfig
}

type Option

type Option func(*ClientConfig)
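
Option follows the functional options pattern: each With... function returns a closure that mutates the configuration. The sketch below imitates that shape with local stand-ins (config, option, withTimeout, withSubscriptionPool, and newConfig are hypothetical names). The defaults are taken from the option tables in the README; that options are applied in order, so that later ones win, is an assumption of this sketch rather than a documented guarantee of NewClient.

```go
package main

import (
	"fmt"
	"time"
)

// config is a local stand-in for ClientConfig with a subset of its fields.
type config struct {
	Timeout  time.Duration
	MaxConns int
	IdleConn int
}

// option mirrors the shape of `type Option func(*ClientConfig)`.
type option func(*config)

// withTimeout overrides the request timeout.
func withTimeout(d time.Duration) option {
	return func(c *config) { c.Timeout = d }
}

// withSubscriptionPool overrides the subscription pool sizes.
func withSubscriptionPool(maxConns, idle int) option {
	return func(c *config) {
		c.MaxConns = maxConns
		c.IdleConn = idle
	}
}

// newConfig starts from the documented defaults (5 minute timeout, 1/1
// pool) and applies the options in the order given.
func newConfig(opts ...option) config {
	c := config{Timeout: 5 * time.Minute, MaxConns: 1, IdleConn: 1}
	for _, apply := range opts {
		apply(&c)
	}
	return c
}

func main() {
	fmt.Printf("%+v\n", newConfig())
	fmt.Printf("%+v\n", newConfig(withTimeout(30*time.Second), withSubscriptionPool(10, 5)))
}
```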

func WithApiKey

func WithApiKey(apiKey string) Option

func WithApiKeyCustomHeader

func WithApiKeyCustomHeader(apiKey, header string) Option

func WithArrowStructFlatten added in v0.3.6

func WithArrowStructFlatten() Option

func WithHttpUrl

func WithHttpUrl(httpUrl string) Option

func WithJQQueryUrl

func WithJQQueryUrl(jq string) Option

func WithSecretKeyAuth added in v0.3.21

func WithSecretKeyAuth(key string) Option

WithSecretKeyAuth sets the x-hugr-secret-key header for admin authentication. This enables impersonation via types.AsUser context.

func WithSubscriptionPool added in v0.3.20

func WithSubscriptionPool(max, idle int) Option

WithSubscriptionPool sets the subscription connection pool size.

func WithTimeout

func WithTimeout(timeout time.Duration) Option

func WithTimezone added in v0.3.11

func WithTimezone(timezone string) Option

func WithToken

func WithToken(token string) Option

func WithTransport

func WithTransport(transport http.RoundTripper) Option

func WithUserInfo

func WithUserInfo(id, name string) Option

func WithUserInfoCustomHeader

func WithUserInfoCustomHeader(id, name, idHeader, nameHeader string) Option

func WithUserRole

func WithUserRole(role string) Option

func WithUserRoleCustomHeader

func WithUserRoleCustomHeader(role, header string) Option

func WithoutTimezone added in v0.3.11

func WithoutTimezone() Option

type SubscriptionConn added in v0.3.20

type SubscriptionConn struct {
	// contains filtered or unexported fields
}

SubscriptionConn is a single WebSocket connection multiplexing subscriptions.

func (*SubscriptionConn) Close added in v0.3.20

func (sc *SubscriptionConn) Close()

Close gracefully shuts down the WebSocket connection. conn.Close sends the close frame; readLoop's conn.Read sees it and returns, which triggers the deferred cancel() and closeAllSubs() calls.

func (*SubscriptionConn) Count added in v0.3.20

func (sc *SubscriptionConn) Count() int

Count returns the number of active subscriptions.

func (*SubscriptionConn) Subscribe added in v0.3.20

func (sc *SubscriptionConn) Subscribe(ctx context.Context, query string, vars map[string]any) (*types.Subscription, error)

Subscribe creates a subscription on this connection.

type SubscriptionPoolConfig added in v0.3.20

type SubscriptionPoolConfig struct {
	MaxConns int // max connections in pool (default 1)
	IdleConn int // idle connections to keep open (default 1)
}

SubscriptionPoolConfig controls how the client manages WebSocket connections for subscriptions.
