Documentation
¶
Overview ¶
Package flight implements the Flight service for Gonzo and provides a client for that service.
Index ¶
- func GetClientCert(ctx context.Context) (*credentials.TLSInfo, error)
- func SliceRecord(rec arrow.Record, offset, length int64) (arrow.Record, error)
- func StreamChunksFromReader(rec arrow.Record) (flight.FlightService_DoGetServer, error)
- type ColumnProcessor
- type FlightClient
- type GonzoFlightService
- func (s *GonzoFlightService) DoGet(ticket *flight.Ticket, stream flight.FlightService_DoGetServer) error
- func (s *GonzoFlightService) DoPut(stream flight.FlightService_DoPutServer) error
- func (s *GonzoFlightService) ProcessStream(reader flight.DataStreamReader, writer flight.DataStreamWriter) error
- func (s *GonzoFlightService) ValidateRecord(rec arrow.Record) error
- type IngestionAuditLog
- type RetryPolicy
- type SchemaManager
- type StreamConfig
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func GetClientCert ¶
func GetClientCert(ctx context.Context) (*credentials.TLSInfo, error)
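GetClientCert returns the TLS information of the client associated with ctx. (The previous comment here described authorizeIngest, which verifies that the client is permitted to ingest data.)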
func SliceRecord ¶
func SliceRecord(rec arrow.Record, offset, length int64) (arrow.Record, error)
SliceRecord creates a new record containing only the specified row range. It returns an error if the offset or length is invalid.
func StreamChunksFromReader ¶
func StreamChunksFromReader(rec arrow.Record) (flight.FlightService_DoGetServer, error)
StreamChunksFromReader converts a record into a stream of FlightData chunks.
Types ¶
type ColumnProcessor ¶
type ColumnProcessor struct {
// contains filtered or unexported fields
}
ColumnProcessor applies processing functions to individual columns.
func NewColumnProcessor ¶
func NewColumnProcessor() *ColumnProcessor
NewColumnProcessor returns a new ColumnProcessor.
func (*ColumnProcessor) AddProcessor ¶
func (cp *ColumnProcessor) AddProcessor(column string, fn func(arrow.Array) error)
AddProcessor registers a processor function for a column.
type FlightClient ¶
type FlightClient struct {
// contains filtered or unexported fields
}
FlightClient wraps an Apache Arrow Flight client to query and ingest records.
func NewFlightClient ¶
func NewFlightClient(addr string) (*FlightClient, error)
NewFlightClient creates a Flight client using NewClientWithMiddleware.
type GonzoFlightService ¶
type GonzoFlightService struct {
flight.BaseFlightServer
// contains filtered or unexported fields
}
GonzoFlightService implements the Flight service.
func NewGonzoFlightService ¶
func NewGonzoFlightService(gonzoDB *db.DB, roleManager auth.RoleManager, dlq db.DeadLetterQueue) *GonzoFlightService
NewGonzoFlightService constructs a new GonzoFlightService.
func (*GonzoFlightService) DoGet ¶
func (s *GonzoFlightService) DoGet(ticket *flight.Ticket, stream flight.FlightService_DoGetServer) error
DoGet handles Flight DoGet requests.
func (*GonzoFlightService) DoPut ¶
func (s *GonzoFlightService) DoPut(stream flight.FlightService_DoPutServer) error
DoPut handles Flight DoPut requests for ingesting records.
func (*GonzoFlightService) ProcessStream ¶
func (s *GonzoFlightService) ProcessStream(reader flight.DataStreamReader, writer flight.DataStreamWriter) error
ProcessStream demonstrates how to read, process, and write records using both a DataStreamReader and DataStreamWriter. This helper is optional and illustrates additional streaming logic with retries.
func (*GonzoFlightService) ValidateRecord ¶
func (s *GonzoFlightService) ValidateRecord(rec arrow.Record) error
ValidateRecord checks that the record meets basic criteria.
type IngestionAuditLog ¶
type IngestionAuditLog struct {
Timestamp time.Time
RecordCount int
Checksum string
ClientID string
SchemaVersion int
Status string // "success" or "failed"
}
IngestionAuditLog holds metadata about record ingestion.
type RetryPolicy ¶
type RetryPolicy struct {
MaxAttempts int
Backoff time.Duration
RetryableErrors map[codes.Code]bool
}
RetryPolicy defines retry logic for operations that may fail transiently.
type SchemaManager ¶
type SchemaManager struct {
// contains filtered or unexported fields
}
SchemaManager handles schema registration and validation.
func NewSchemaManager ¶
func NewSchemaManager() *SchemaManager
NewSchemaManager returns a new SchemaManager.
func (*SchemaManager) RegisterSchema ¶
func (sm *SchemaManager) RegisterSchema(endpoint string, schema *arrow.Schema)
RegisterSchema stores the expected schema for a given endpoint.
type StreamConfig ¶
type StreamConfig struct {
Compression compress.Compression
BatchSize int
MaxRetries int
SchemaPolicy SchemaManager
}
StreamConfig controls Flight stream behavior.