Documentation
¶
Overview ¶
Arrow record batch flattener transforms complex types unsupported by Perspective (Struct, List, Map, Union) into simple columns. Structs are recursively flattened with dot-separated names; List, Map, and Union values are serialized to JSON strings.
Index ¶
- Variables
- func AsUser(ctx context.Context, userId, userName, role string) context.Context
- func ColumnValue(a arrow.Array, i int) any
- func ContextWithValidateOnly(ctx context.Context) context.Context
- func DataClose(data any)
- func ExtractResponseData(path string, data map[string]any) any
- func FlattenRecord(rec arrow.RecordBatch, mem memory.Allocator) arrow.RecordBatch
- func FlattenSchema(schema *arrow.Schema) *arrow.Schema
- func IsValidateOnlyContext(ctx context.Context) bool
- func NeedsFlatten(schema *arrow.Schema) bool
- func ParseJsonValue(v any) (map[string]interface{}, error)
- func RecordToJSON(rec arrow.RecordBatch, asArray bool, w io.Writer) error
- func RecordsColNums(rr []arrow.RecordBatch) int64
- func RecordsRowNums(rr []arrow.RecordBatch) int64
- func ReleaseRecords(rr []arrow.RecordBatch)
- func RetainRecords(rr []arrow.RecordBatch)
- func WarpGraphQLError(err error) gqlerror.List
- type ArrowTable
- type ArrowTableChunked
- func (t *ArrowTableChunked) Append(rec arrow.RecordBatch)
- func (t *ArrowTableChunked) Chunk(i int) arrow.RecordBatch
- func (t *ArrowTableChunked) DecodeMsgpack(dec *msgpack.Decoder) error
- func (t *ArrowTableChunked) EncodeMsgpack(enc *msgpack.Encoder) error
- func (t *ArrowTableChunked) Info() string
- func (t *ArrowTableChunked) MarshalJSON() ([]byte, error)
- func (t *ArrowTableChunked) NumChunks() int
- func (t *ArrowTableChunked) NumCols() int
- func (t *ArrowTableChunked) NumRows() int
- func (t *ArrowTableChunked) Reader(retain bool) (array.RecordReader, error)
- func (t *ArrowTableChunked) Records() ([]arrow.RecordBatch, error)
- func (t *ArrowTableChunked) Release()
- func (t *ArrowTableChunked) Retain()
- func (t *ArrowTableChunked) RowData(i int) (map[string]any, bool)
- func (t *ArrowTableChunked) SetInfo(info string)
- type ArrowTableStream
- func (t *ArrowTableStream) DecodeMsgpack(dec *msgpack.Decoder) error
- func (t *ArrowTableStream) EncodeMsgpack(enc *msgpack.Encoder) error
- func (t *ArrowTableStream) Info() string
- func (t *ArrowTableStream) MarshalJSON() ([]byte, error)
- func (t *ArrowTableStream) Reader(retain bool) (array.RecordReader, error)
- func (t *ArrowTableStream) Records() ([]arrow.RecordBatch, error)
- func (t *ArrowTableStream) Release()
- func (t *ArrowTableStream) Retain()
- func (t *ArrowTableStream) SetInfo(info string)
- type CatalogSource
- type CatalogSourceType
- type DataSource
- type DataSourceType
- type DateTime
- type EmbeddingResult
- type EmbeddingsResult
- type JQRequest
- type JsonValue
- type LLMMessage
- type LLMOptions
- type LLMResult
- type LLMStreamEvent
- type LLMTool
- type LLMToolCall
- type ModelInfo
- type OperationResult
- type Querier
- type Request
- type Response
- type Subscription
- type SubscriptionEvent
- type UnloadOpt
- type UnloadOpts
- type UserIdentity
- type Vector
Constants ¶
This section is empty.
Variables ¶
var ( ErrNoData = errors.New("no data") ErrWrongDataPath = errors.New("wrong data path") )
Functions ¶
func AsUser ¶ added in v0.3.21
AsUser returns a context that causes Query/Subscribe to execute as the specified user. Requires admin (secret key) authentication.
func FlattenRecord ¶ added in v0.3.6
func FlattenRecord(rec arrow.RecordBatch, mem memory.Allocator) arrow.RecordBatch
FlattenRecord transforms a record batch by recursively flattening Struct fields and converting List/Map/Union fields to JSON strings. If the record has no complex types, it is returned as-is (with Retain). The caller must Release the returned record.
func FlattenSchema ¶ added in v0.3.6
FlattenSchema returns the flattened schema without processing data.
func IsValidateOnlyContext ¶
func NeedsFlatten ¶ added in v0.3.6
NeedsFlatten returns true if the schema contains any complex types (Struct, List, Map, Union) that need transformation.
func ParseJsonValue ¶
func RecordToJSON ¶
func RecordsColNums ¶
func RecordsColNums(rr []arrow.RecordBatch) int64
func RecordsRowNums ¶
func RecordsRowNums(rr []arrow.RecordBatch) int64
func ReleaseRecords ¶
func ReleaseRecords(rr []arrow.RecordBatch)
func RetainRecords ¶
func RetainRecords(rr []arrow.RecordBatch)
func WarpGraphQLError ¶
Types ¶
type ArrowTable ¶
type ArrowTableChunked ¶
type ArrowTableChunked struct {
// contains filtered or unexported fields
}
func NewArrowTable ¶
func NewArrowTable() *ArrowTableChunked
func NewArrowTableFromReader ¶
func NewArrowTableFromReader(reader array.RecordReader) (*ArrowTableChunked, error)
func (*ArrowTableChunked) Append ¶
func (t *ArrowTableChunked) Append(rec arrow.RecordBatch)
func (*ArrowTableChunked) Chunk ¶
func (t *ArrowTableChunked) Chunk(i int) arrow.RecordBatch
func (*ArrowTableChunked) DecodeMsgpack ¶
func (t *ArrowTableChunked) DecodeMsgpack(dec *msgpack.Decoder) error
func (*ArrowTableChunked) EncodeMsgpack ¶
func (t *ArrowTableChunked) EncodeMsgpack(enc *msgpack.Encoder) error
func (*ArrowTableChunked) Info ¶
func (t *ArrowTableChunked) Info() string
func (*ArrowTableChunked) MarshalJSON ¶
func (t *ArrowTableChunked) MarshalJSON() ([]byte, error)
func (*ArrowTableChunked) NumChunks ¶
func (t *ArrowTableChunked) NumChunks() int
func (*ArrowTableChunked) NumCols ¶
func (t *ArrowTableChunked) NumCols() int
func (*ArrowTableChunked) NumRows ¶
func (t *ArrowTableChunked) NumRows() int
func (*ArrowTableChunked) Reader ¶
func (t *ArrowTableChunked) Reader(retain bool) (array.RecordReader, error)
func (*ArrowTableChunked) Records ¶
func (t *ArrowTableChunked) Records() ([]arrow.RecordBatch, error)
func (*ArrowTableChunked) Release ¶
func (t *ArrowTableChunked) Release()
func (*ArrowTableChunked) Retain ¶
func (t *ArrowTableChunked) Retain()
func (*ArrowTableChunked) RowData ¶
func (t *ArrowTableChunked) RowData(i int) (map[string]any, bool)
func (*ArrowTableChunked) SetInfo ¶
func (t *ArrowTableChunked) SetInfo(info string)
type ArrowTableStream ¶
type ArrowTableStream struct {
// contains filtered or unexported fields
}
func NewArrowTableStream ¶
func NewArrowTableStream(reader array.RecordReader) *ArrowTableStream
func (*ArrowTableStream) DecodeMsgpack ¶
func (t *ArrowTableStream) DecodeMsgpack(dec *msgpack.Decoder) error
func (*ArrowTableStream) EncodeMsgpack ¶
func (t *ArrowTableStream) EncodeMsgpack(enc *msgpack.Encoder) error
func (*ArrowTableStream) Info ¶
func (t *ArrowTableStream) Info() string
func (*ArrowTableStream) MarshalJSON ¶
func (t *ArrowTableStream) MarshalJSON() ([]byte, error)
func (*ArrowTableStream) Reader ¶
func (t *ArrowTableStream) Reader(retain bool) (array.RecordReader, error)
func (*ArrowTableStream) Records ¶
func (t *ArrowTableStream) Records() ([]arrow.RecordBatch, error)
func (*ArrowTableStream) Release ¶
func (t *ArrowTableStream) Release()
func (*ArrowTableStream) Retain ¶
func (t *ArrowTableStream) Retain()
func (*ArrowTableStream) SetInfo ¶
func (t *ArrowTableStream) SetInfo(info string)
type CatalogSource ¶
type CatalogSource struct {
Name string `json:"name"`
Type CatalogSourceType `json:"type"`
Path string `json:"path"`
Description string `json:"description"`
}
type CatalogSourceType ¶
type CatalogSourceType string
type DataSource ¶
type DataSource struct {
Name string `json:"name"`
Description string `json:"description"`
Type DataSourceType `json:"type"`
Prefix string `json:"prefix"`
Path string `json:"path"`
AsModule bool `json:"as_module"`
SelfDefined bool `json:"self_defined"`
ReadOnly bool `json:"read_only"`
Disabled bool `json:"disabled"`
Sources []CatalogSource `json:"catalogs"`
}
type DataSourceType ¶
type DataSourceType string
type DateTime ¶ added in v0.3.11
DateTime represents a naive date-time value WITHOUT timezone. Used by the DateTime scalar to distinguish from Timestamp (which uses time.Time directly). Engine SQLValue dispatches on this type to cast as TIMESTAMP instead of TIMESTAMPTZ.
type EmbeddingResult ¶ added in v0.3.16
type EmbeddingResult struct {
Vector Vector `json:"vector"`
PromptTokens int `json:"prompt_tokens"`
TokenCount int `json:"token_count"`
}
EmbeddingResult is an enriched embedding response with token count.
type EmbeddingsResult ¶ added in v0.3.16
type EmbeddingsResult struct {
Vectors []Vector `json:"vectors"`
TokenCount int `json:"token_count"`
PromptTokens int `json:"prompt_tokens"`
}
EmbeddingsResult is a batch embedding response with total token count.
type LLMMessage ¶ added in v0.3.16
type LLMMessage struct {
Role string `json:"role"`
Content string `json:"content"`
ToolCalls []LLMToolCall `json:"tool_calls,omitempty"`
ToolCallID string `json:"tool_call_id,omitempty"`
ThoughtSignature string `json:"thought_signature,omitempty"` // Gemini 2.5+ / Anthropic: encrypted signature for multi-turn continuity
Thinking string `json:"thinking,omitempty"` // Anthropic thinking text / OpenAI reasoning summary
}
LLMMessage is a single message in a chat conversation.
type LLMOptions ¶ added in v0.3.16
type LLMOptions struct {
MaxTokens int `json:"max_tokens,omitempty"`
Temperature float64 `json:"temperature,omitempty"`
TopP float64 `json:"top_p,omitempty"`
Stop []string `json:"stop,omitempty"`
Tools []LLMTool `json:"tools,omitempty"`
ToolChoice string `json:"tool_choice,omitempty"`
ThinkingBudget int `json:"thinking_budget,omitempty"` // Token budget for reasoning/thinking (Anthropic, Gemini)
}
LLMOptions configures an LLM request.
type LLMResult ¶ added in v0.3.16
type LLMResult struct {
Content string `json:"content"`
Model string `json:"model"`
FinishReason string `json:"finish_reason"`
PromptTokens int `json:"prompt_tokens"`
CompletionTokens int `json:"completion_tokens"`
TotalTokens int `json:"total_tokens"`
Provider string `json:"provider"`
LatencyMs int `json:"latency_ms"`
ToolCalls []LLMToolCall `json:"tool_calls"`
ThoughtSignature string `json:"thought_signature,omitempty"` // Gemini 2.5+ / Anthropic: encrypted signature
Thinking string `json:"thinking,omitempty"` // Anthropic thinking text / OpenAI reasoning summary
}
LLMResult is the normalized response from any LLM provider.
type LLMStreamEvent ¶ added in v0.3.20
type LLMStreamEvent struct {
Type string `json:"type"` // content_delta, reasoning, tool_use, finish, error
Content string `json:"content"` // Token content (content_delta, reasoning)
Model string `json:"model"` // Model identifier
FinishReason string `json:"finish_reason"` // stop, length, tool_use (finish only)
ToolCalls string `json:"tool_calls"` // JSON-encoded tool calls (tool_use only)
PromptTokens int `json:"prompt_tokens"` // Input token count (finish only)
CompletionTokens int `json:"completion_tokens"` // Output token count (finish only)
ThoughtSignature string `json:"thought_signature"` // Gemini 2.5+ / Anthropic: encrypted signature (finish only)
Thinking string `json:"thinking"` // Anthropic thinking text / OpenAI reasoning summary (finish only)
}
LLMStreamEvent is a single event from a streaming LLM completion.
type LLMTool ¶ added in v0.3.16
type LLMTool struct {
Name string `json:"name"`
Description string `json:"description"`
Parameters any `json:"parameters"`
}
LLMTool is a tool definition provided to the model.
type LLMToolCall ¶ added in v0.3.16
type LLMToolCall struct {
ID string `json:"id"`
Name string `json:"name"`
Arguments any `json:"arguments"`
}
LLMToolCall is a tool invocation from the model.
type ModelInfo ¶ added in v0.3.16
type ModelInfo struct {
Name string `json:"name"`
Type string `json:"type"` // "llm" or "embedding"
Provider string `json:"provider"` // "openai", "anthropic", "gemini"
Model string `json:"model"`
}
ModelInfo describes a registered AI model data source.
type OperationResult ¶
type OperationResult struct {
Succeed bool `json:"success"`
Msg string `json:"message"`
Rows int `json:"affected_rows"`
LastId int `json:"last_id"`
}
func ErrResult ¶
func ErrResult(err error) *OperationResult
func Result ¶
func Result(msg string, rows, lastId int) *OperationResult
func SQLError ¶
func SQLError(msg string, err error) *OperationResult
func (*OperationResult) CollectSQL ¶
func (r *OperationResult) CollectSQL(res sql.Result)
func (*OperationResult) ToDuckdb ¶
func (r *OperationResult) ToDuckdb() map[string]any
type Querier ¶
type Querier interface {
Query(ctx context.Context, query string, vars map[string]any) (*Response, error)
Subscribe(ctx context.Context, query string, vars map[string]any) (*Subscription, error)
RegisterDataSource(ctx context.Context, ds DataSource) error
LoadDataSource(ctx context.Context, name string) error
UnloadDataSource(ctx context.Context, name string, opts ...UnloadOpt) error
DataSourceStatus(ctx context.Context, name string) (string, error)
DescribeDataSource(ctx context.Context, name string, self bool) (string, error)
}
type Response ¶
type Response struct {
Data map[string]any `json:"data,omitempty"`
Extensions map[string]any `json:"extensions,omitempty"`
Errors gqlerror.List `json:"errors,omitempty"`
}
func ErrResponse ¶
type Subscription ¶ added in v0.3.20
type Subscription struct {
Events <-chan SubscriptionEvent
Cancel func()
// contains filtered or unexported fields
}
Subscription is the result of Querier.Subscribe. Events channel produces SubscriptionEvents until closed. Errors are reported via Reader.Err() after Reader.Next() returns false, or via Err() after Events is closed (for subscription-level errors).
func (*Subscription) Err ¶ added in v0.3.25
func (s *Subscription) Err() error
Err returns the subscription-level error (e.g. from subscription_error frame). Call after Events channel is closed.
func (*Subscription) SetErr ¶ added in v0.3.25
func (s *Subscription) SetErr(err error)
SetErr sets the subscription-level error. Called by the transport layer.
type SubscriptionEvent ¶ added in v0.3.20
type SubscriptionEvent struct {
Path string // Data object path (e.g. "core.data_sources"). Empty for native subscriptions.
Reader array.RecordReader // Data reader. Transport decides how to consume: graphql-ws reads all → JSON next; IPC streams batches.
}
SubscriptionEvent is a data event in the subscription stream. One event per data path (query) or per native source event batch. Metadata (geometry, table info) is in Reader's Arrow schema metadata.
type UnloadOpt ¶ added in v0.3.18
type UnloadOpt func(*UnloadOpts)
func WithHardUnload ¶ added in v0.3.18
func WithHardUnload() UnloadOpt
type UnloadOpts ¶ added in v0.3.18
type UnloadOpts struct {
Hard bool
}
type UserIdentity ¶ added in v0.3.21
UserIdentity represents an impersonated user identity. When set in context via AsUser, queries and subscriptions execute as this user with this role's permissions.
func AsUserFromContext ¶ added in v0.3.21
func AsUserFromContext(ctx context.Context) *UserIdentity
AsUserFromContext extracts impersonation identity from context.
type Vector ¶ added in v0.3.16
type Vector []float64