flight

package
v0.0.0-...-92d0331 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 7, 2025 License: MIT Imports: 20 Imported by: 0

Documentation

Overview

Package flight provides a client for the Gonzo Flight service.

Package flight implements the Flight service for Gonzo.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func GetClientCert

func GetClientCert(ctx context.Context) (*credentials.TLSInfo, error)

authorizeIngest verifies that the client is permitted to ingest data.

func SliceRecord

func SliceRecord(rec arrow.Record, offset, length int64) (arrow.Record, error)

SliceRecord creates a new record containing only the specified row range. It returns an error if the offset or length is invalid.

func StreamChunksFromReader

func StreamChunksFromReader(rec arrow.Record) (flight.FlightService_DoGetServer, error)

StreamChunksFromReader converts a record into a stream of FlightData chunks.

Types

type ColumnProcessor

type ColumnProcessor struct {
	// contains filtered or unexported fields
}

ColumnProcessor applies processing functions to individual columns.

func NewColumnProcessor

func NewColumnProcessor() *ColumnProcessor

NewColumnProcessor returns a new ColumnProcessor.

func (*ColumnProcessor) AddProcessor

func (cp *ColumnProcessor) AddProcessor(column string, fn func(arrow.Array) error)

AddProcessor registers a processor function for a column.

func (*ColumnProcessor) Process

func (cp *ColumnProcessor) Process(rec arrow.Record) error

Process applies all registered processors to their corresponding columns.

type FlightClient

type FlightClient struct {
	// contains filtered or unexported fields
}

FlightClient wraps an Apache Arrow Flight client to query and ingest records.

func NewFlightClient

func NewFlightClient(addr string) (*FlightClient, error)

NewFlightClient creates a Flight client using NewClientWithMiddleware.

func (*FlightClient) Ingest

func (c *FlightClient) Ingest(ctx context.Context, records []arrow.Record) error

Ingest uses DoPut to send records to the Flight service for ingestion. Typically you'd build arrow.Records from your local data source.

func (*FlightClient) Query

func (c *FlightClient) Query(ctx context.Context, query *db.Query) ([]arrow.Record, error)

Query sends a query to the Flight server

type GonzoFlightService

type GonzoFlightService struct {
	flight.BaseFlightServer
	// contains filtered or unexported fields
}

GonzoFlightService implements the Flight service.

func NewGonzoFlightService

func NewGonzoFlightService(gonzoDB *db.DB, roleManager auth.RoleManager, dlq db.DeadLetterQueue) *GonzoFlightService

NewGonzoFlightService constructs a new GonzoFlightService.

func (*GonzoFlightService) DoGet

DoGet handles Flight DoGet requests.

func (*GonzoFlightService) DoPut

DoPut handles Flight DoPut requests for ingesting records. DoPut handles Flight DoPut requests for ingesting records.

func (*GonzoFlightService) ProcessStream

func (s *GonzoFlightService) ProcessStream(reader flight.DataStreamReader, writer flight.DataStreamWriter) error

ProcessStream demonstrates how to read, process, and write records using both a DataStreamReader and DataStreamWriter. This helper is optional and illustrates additional streaming logic with retries.

func (*GonzoFlightService) ValidateRecord

func (s *GonzoFlightService) ValidateRecord(rec arrow.Record) error

validateRecord checks that the record meets basic criteria.

type IngestionAuditLog

type IngestionAuditLog struct {
	Timestamp     time.Time
	RecordCount   int
	Checksum      string
	ClientID      string
	SchemaVersion int
	Status        string // "success" or "failed"
}

IngestionAuditLog holds metadata about record ingestion.

type RetryPolicy

type RetryPolicy struct {
	MaxAttempts     int
	Backoff         time.Duration
	RetryableErrors map[codes.Code]bool
}

RetryPolicy defines retry logic for operations that may fail transiently.

func (*RetryPolicy) Execute

func (p *RetryPolicy) Execute(ctx context.Context, op func() error) error

Execute runs the operation with retry logic.

type SchemaManager

type SchemaManager struct {
	// contains filtered or unexported fields
}

SchemaManager handles schema registration and validation.

func NewSchemaManager

func NewSchemaManager() *SchemaManager

NewSchemaManager returns a new SchemaManager.

func (*SchemaManager) RegisterSchema

func (sm *SchemaManager) RegisterSchema(endpoint string, schema *arrow.Schema)

RegisterSchema stores the expected schema for a given endpoint.

func (*SchemaManager) Validate

func (sm *SchemaManager) Validate(rec arrow.Record, endpoint string) error

Validate checks that the record's schema matches the registered schema.

type StreamConfig

type StreamConfig struct {
	Compression  compress.Compression
	BatchSize    int
	MaxRetries   int
	SchemaPolicy SchemaManager
}

StreamConfig controls Flight stream behavior.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL