Documentation
Index ¶
- Variables
- func Drop(driver string, config map[string]any, logger *zap.Logger) error
- func NewPermissionDeniedError(msg string) error
- func RecordDownloadMetrics(ctx context.Context, m *DownloadMetrics)
- func Register(name string, driver Driver)
- func RegisterAsConnector(name string, driver Driver)
- type BucketSink
- type BucketSource
- type CatalogEntry
- type CatalogStore
- type Connection
- type DatabaseSink
- type DatabaseSource
- type Dialect
- type DownloadMetrics
- type Driver
- type FileIterator
- type FileSource
- type FileStore
- type InformationSchema
- type IngestionSummary
- type Instance
- type NoOpProgress
- type OLAPStore
- type ObjectStore
- type ObjectType
- type PermissionDeniedError
- type Progress
- type ProgressUnit
- type PropertySchema
- type PropertySchemaType
- type RegistryStore
- type RepoObjectStat
- type RepoStore
- type Result
- type Sink
- type Source
- type Spec
- type Statement
- type Table
- type TransferOption
- type TransferOpts
- type Transporter
- type WatchCallback
- type WatchEvent
- type WithConnectionFunc
Constants ¶
This section is empty.
Variables ¶
var Connectors = make(map[string]Driver)
Connectors tracks all registered connector drivers.
var Drivers = make(map[string]Driver)
Drivers is a registry of drivers.
var ErrDropNotSupported = errors.New("driver: drop not supported")
ErrDropNotSupported indicates the driver doesn't support dropping its underlying store.
var ErrFileAlreadyExists = errors.New("file already exists")
var ErrIngestionLimitExceeded = fmt.Errorf("connectors: source ingestion exceeds limit")
var ErrNotFound = errors.New("driver: not found")
ErrNotFound indicates the resource wasn't found.
var ErrUnsupportedConnector = errors.New("drivers: connector not supported")
ErrUnsupportedConnector is returned from Ingest for unsupported connectors.
Functions ¶
func Drop ¶ added in v0.27.0
func Drop(driver string, config map[string]any, logger *zap.Logger) error
Drop tears down a store. Drivers that do not support it return ErrDropNotSupported.
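A minimal sketch of how a caller might treat ErrDropNotSupported as a soft failure rather than an error. The `dropper` interface, `readOnlyDriver`, and `dropStore` are hypothetical stand-ins. The real Drop also receives a `*zap.Logger`, which is omitted here so the example has no external dependencies.

```go
package main

import (
	"errors"
	"fmt"
)

// Local copy of the sentinel error documented in this package.
var ErrDropNotSupported = errors.New("driver: drop not supported")

// dropper is a trimmed, hypothetical stand-in for Driver.Drop
// (the real method also takes a *zap.Logger).
type dropper interface {
	Drop(config map[string]any) error
}

// readOnlyDriver is a hypothetical driver that cannot drop its store.
// It wraps the sentinel with %w so errors.Is still matches.
type readOnlyDriver struct{}

func (readOnlyDriver) Drop(config map[string]any) error {
	return fmt.Errorf("read-only: %w", ErrDropNotSupported)
}

// dropStore tears down a store and reports whether anything was dropped.
// ErrDropNotSupported is treated as "nothing to do" instead of a failure.
func dropStore(d dropper, config map[string]any) (bool, error) {
	err := d.Drop(config)
	if errors.Is(err, ErrDropNotSupported) {
		return false, nil
	}
	if err != nil {
		return false, err
	}
	return true, nil
}

func main() {
	dropped, err := dropStore(readOnlyDriver{}, map[string]any{})
	fmt.Println(dropped, err) // false <nil>
}
```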
func NewPermissionDeniedError ¶ added in v0.30.0
func NewPermissionDeniedError(msg string) error
func RecordDownloadMetrics ¶ added in v0.30.0
func RecordDownloadMetrics(ctx context.Context, m *DownloadMetrics)
func RegisterAsConnector ¶ added in v0.30.0
func RegisterAsConnector(name string, driver Driver)
RegisterAsConnector tracks a connector driver.
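The Drivers and Connectors maps act as registries, in the style of `database/sql.Register`. A minimal sketch under stated assumptions: the single-method `Driver` interface and `gcsDriver` are hypothetical stand-ins; panicking on duplicate names is borrowed from `database/sql`; and having RegisterAsConnector also call Register is an assumption, not confirmed behavior.

```go
package main

import "fmt"

// Driver is a trimmed, hypothetical stand-in for the package's Driver interface.
type Driver interface {
	Name() string
}

// Registries mirroring the documented Drivers and Connectors variables.
var (
	Drivers    = make(map[string]Driver)
	Connectors = make(map[string]Driver)
)

// Register adds a driver to Drivers. Panicking on duplicates is an
// assumption borrowed from database/sql.Register.
func Register(name string, driver Driver) {
	if _, dup := Drivers[name]; dup {
		panic("drivers: Register called twice for driver " + name)
	}
	Drivers[name] = driver
}

// RegisterAsConnector is assumed to register the driver and additionally
// record it in Connectors.
func RegisterAsConnector(name string, driver Driver) {
	Register(name, driver)
	Connectors[name] = driver
}

// gcsDriver is a hypothetical connector driver.
type gcsDriver struct{}

func (gcsDriver) Name() string { return "gcs" }

func init() {
	// Registering from init is a common Go pattern for pluggable drivers.
	RegisterAsConnector("gcs", gcsDriver{})
}

func main() {
	_, isDriver := Drivers["gcs"]
	_, isConnector := Connectors["gcs"]
	fmt.Println(isDriver, isConnector) // true true
}
```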
Types ¶
type BucketSink ¶ added in v0.30.0
type BucketSink struct {
Path string
}
func (*BucketSink) BucketSink ¶ added in v0.30.0
func (b *BucketSink) BucketSink() (*BucketSink, bool)
func (*BucketSink) DatabaseSink ¶ added in v0.30.0
func (b *BucketSink) DatabaseSink() (*DatabaseSink, bool)
type BucketSource ¶ added in v0.30.0
type BucketSource struct {
ExtractPolicy *runtimev1.Source_ExtractPolicy
Properties map[string]any
}
func (*BucketSource) BucketSource ¶ added in v0.30.0
func (b *BucketSource) BucketSource() (*BucketSource, bool)
func (*BucketSource) DatabaseSource ¶ added in v0.30.0
func (b *BucketSource) DatabaseSource() (*DatabaseSource, bool)
func (*BucketSource) FileSource ¶ added in v0.30.0
func (b *BucketSource) FileSource() (*FileSource, bool)
type CatalogEntry ¶ added in v0.16.0
type CatalogEntry struct {
Name string
Type ObjectType
Object proto.Message
Path string
Embedded bool
BytesIngested int64
Parents []string
Children []string
CreatedOn time.Time
UpdatedOn time.Time
RefreshedOn time.Time
}
CatalogEntry represents one object in the catalog, such as a source.
func (*CatalogEntry) GetMetricsView ¶ added in v0.16.0
func (e *CatalogEntry) GetMetricsView() *runtimev1.MetricsView
func (*CatalogEntry) GetModel ¶ added in v0.16.0
func (e *CatalogEntry) GetModel() *runtimev1.Model
func (*CatalogEntry) GetSource ¶ added in v0.16.0
func (e *CatalogEntry) GetSource() *runtimev1.Source
func (*CatalogEntry) GetTable ¶ added in v0.16.0
func (e *CatalogEntry) GetTable() *runtimev1.Table
type CatalogStore ¶
type CatalogStore interface {
FindEntries(ctx context.Context, instanceID string, t ObjectType) ([]*CatalogEntry, error)
FindEntry(ctx context.Context, instanceID string, name string) (*CatalogEntry, error)
CreateEntry(ctx context.Context, instanceID string, entry *CatalogEntry) error
UpdateEntry(ctx context.Context, instanceID string, entry *CatalogEntry) error
DeleteEntry(ctx context.Context, instanceID string, name string) error
DeleteEntries(ctx context.Context, instanceID string) error
}
CatalogStore is implemented by drivers capable of storing catalog info for a specific instance.
type Connection ¶
type Connection interface {
// Driver type (like "duckdb")
Driver() string
// Config used to open the Connection
Config() map[string]any
// Migrate prepares the connection for use. It will be called before the connection is first used.
// (Not to be confused with migrating artifacts, which is handled by the runtime and tracked in the catalog.)
Migrate(ctx context.Context) error
// MigrationStatus returns the connection's current and desired migration version (if applicable)
MigrationStatus(ctx context.Context) (current int, desired int, err error)
// Close closes the connection
Close() error
// AsRegistry returns a RegistryStore if the driver can serve as such, otherwise returns false.
// The registry is responsible for tracking instances and repos.
AsRegistry() (RegistryStore, bool)
// AsCatalogStore returns a CatalogStore if the driver can serve as such, otherwise returns false.
// A catalog is used to store state about migrated/deployed objects (such as sources and metrics views).
AsCatalogStore() (CatalogStore, bool)
// AsRepoStore returns a RepoStore if the driver can serve as such, otherwise returns false.
// A repo stores file artifacts (either in a folder or virtualized in a database).
AsRepoStore() (RepoStore, bool)
// AsOLAP returns an OLAPStore if the driver can serve as such, otherwise returns false.
// OLAP stores are where we actually store, transform, and query users' data.
AsOLAP() (OLAPStore, bool)
// AsObjectStore returns an ObjectStore if the driver can serve as such, otherwise returns false.
AsObjectStore() (ObjectStore, bool)
// AsFileStore returns a FileStore if the driver can serve as such, otherwise returns false.
AsFileStore() (FileStore, bool)
// AsTransporter optionally returns an implementation for moving data between two connectors.
// One of the input connections may be the Connection itself.
// Examples:
// a) myDuckDB.AsTransporter(myGCS, myDuckDB)
// b) myBeam.AsTransporter(myGCS, myS3) // In the future
AsTransporter(from Connection, to Connection) (Transporter, bool)
}
Connection represents a connection to an underlying DB. It should implement one or more of RegistryStore, CatalogStore, RepoStore, and OLAPStore.
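Each `As*` method is a capability probe: the caller asks for a role and checks the boolean. A minimal sketch with trimmed, hypothetical stand-ins (`OLAPStore`, `RepoStore`, and `Connection` reduced to a few methods, plus an `olapOnly` connection) showing how code can discover what a connection supports.

```go
package main

import "fmt"

// Trimmed, hypothetical stand-ins for the package's interfaces.
type OLAPStore interface{ Dialect() string }
type RepoStore interface{ Root() string }

type Connection interface {
	Driver() string
	AsOLAP() (OLAPStore, bool)
	AsRepoStore() (RepoStore, bool)
}

// olapOnly is a hypothetical connection that can act as an OLAP store but not a repo.
type olapOnly struct{}

func (olapOnly) Driver() string  { return "duckdb" }
func (olapOnly) Dialect() string { return "duckdb" }

// AsOLAP returns the connection itself, since it implements OLAPStore.
func (c olapOnly) AsOLAP() (OLAPStore, bool) { return c, true }

// AsRepoStore reports that this connection cannot serve as a repo.
func (olapOnly) AsRepoStore() (RepoStore, bool) { return nil, false }

// describe lists the capabilities a connection exposes.
func describe(conn Connection) []string {
	var caps []string
	if _, ok := conn.AsOLAP(); ok {
		caps = append(caps, "olap")
	}
	if _, ok := conn.AsRepoStore(); ok {
		caps = append(caps, "repo")
	}
	return caps
}

func main() {
	fmt.Println(describe(olapOnly{})) // [olap]
}
```

Returning `(T, bool)` instead of using a type assertion lets a single connection type decide at runtime (for example, based on its config) which roles it serves.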
type DatabaseSink ¶ added in v0.30.0
func (*DatabaseSink) BucketSink ¶ added in v0.30.0
func (d *DatabaseSink) BucketSink() (*BucketSink, bool)
func (*DatabaseSink) DatabaseSink ¶ added in v0.30.0
func (d *DatabaseSink) DatabaseSink() (*DatabaseSink, bool)
type DatabaseSource ¶ added in v0.30.0
type DatabaseSource struct {
// Pass only Query OR Table
Query string
Table string
Database string
Limit int
}
func (*DatabaseSource) BucketSource ¶ added in v0.30.0
func (d *DatabaseSource) BucketSource() (*BucketSource, bool)
func (*DatabaseSource) DatabaseSource ¶ added in v0.30.0
func (d *DatabaseSource) DatabaseSource() (*DatabaseSource, bool)
func (*DatabaseSource) FileSource ¶ added in v0.30.0
func (d *DatabaseSource) FileSource() (*FileSource, bool)
type DownloadMetrics ¶ added in v0.30.0
type Driver ¶
type Driver interface {
Spec() Spec
// Open opens a new connection to an underlying store.
Open(config map[string]any, logger *zap.Logger) (Connection, error)
// Drop tears down a store. Drivers that do not support it return ErrDropNotSupported.
Drop(config map[string]any, logger *zap.Logger) error
// HasAnonymousSourceAccess returns true if the external system can be accessed without credentials.
HasAnonymousSourceAccess(ctx context.Context, src Source, logger *zap.Logger) (bool, error)
}
Driver represents an underlying DB.
type FileIterator ¶ added in v0.30.0
type FileIterator interface {
// Close performs cleanup and releases resources.
Close() error
// NextBatch returns a list of files downloaded from external sources
// and cleans up files created in the previous batch.
NextBatch(limit int) ([]string, error)
// HasNext reports whether the iterator has more elements left.
HasNext() bool
// Size returns the size of the data downloaded, in the given unit.
// It returns 0, false if the size cannot be computed in that unit.
Size(unit ProgressUnit) (int64, bool)
}
FileIterator provides a way to iteratively download files from external sources. Clients should call Close once they are done with the iterator to release any resources.
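A minimal sketch of the consumption loop the interface implies: defer Close, loop on HasNext, and process each batch before requesting the next (because NextBatch may clean up the previous batch's files). `sliceIterator` and `consume` are hypothetical, and Size is omitted from the trimmed interface.

```go
package main

import "fmt"

// FileIterator is a trimmed local copy of the documented interface (Size omitted).
type FileIterator interface {
	Close() error
	NextBatch(limit int) ([]string, error)
	HasNext() bool
}

// sliceIterator is a hypothetical iterator over a fixed list of local paths.
type sliceIterator struct {
	files  []string
	closed bool
}

func (s *sliceIterator) Close() error {
	s.closed = true
	return nil
}

func (s *sliceIterator) HasNext() bool { return len(s.files) > 0 }

// NextBatch hands out up to limit paths and advances past them.
func (s *sliceIterator) NextBatch(limit int) ([]string, error) {
	n := limit
	if n > len(s.files) {
		n = len(s.files)
	}
	batch := s.files[:n]
	s.files = s.files[n:]
	return batch, nil
}

// consume drains the iterator in batches and always releases its resources.
func consume(it FileIterator, batchSize int) (int, error) {
	defer it.Close()
	total := 0
	for it.HasNext() {
		files, err := it.NextBatch(batchSize)
		if err != nil {
			return total, err
		}
		// Process this batch now: the next NextBatch call may delete these files.
		total += len(files)
	}
	return total, nil
}

func main() {
	it := &sliceIterator{files: []string{"a.csv", "b.csv", "c.csv"}}
	n, err := consume(it, 2)
	fmt.Println(n, err, it.closed) // 3 <nil> true
}
```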
type FileSource ¶ added in v0.30.0
func (*FileSource) BucketSource ¶ added in v0.30.0
func (f *FileSource) BucketSource() (*BucketSource, bool)
func (*FileSource) DatabaseSource ¶ added in v0.30.0
func (f *FileSource) DatabaseSource() (*DatabaseSource, bool)
func (*FileSource) FileSource ¶ added in v0.30.0
func (f *FileSource) FileSource() (*FileSource, bool)
type FileStore ¶ added in v0.30.0
type FileStore interface {
// FilePaths returns local absolute paths where files are stored
FilePaths(ctx context.Context, src *FileSource) ([]string, error)
}
type InformationSchema ¶
type InformationSchema interface {
All(ctx context.Context) ([]*Table, error)
Lookup(ctx context.Context, name string) (*Table, error)
}
InformationSchema contains information about existing tables in an OLAP driver.
type IngestionSummary ¶ added in v0.24.0
type IngestionSummary struct {
BytesIngested int64
}
IngestionSummary contains details about an ingestion.
type Instance ¶
type Instance struct {
// Identifier
ID string
// Driver to connect to for OLAP (options: duckdb, druid)
OLAPDriver string
// DSN for connection to OLAP
OLAPDSN string
// Driver for reading/editing code artifacts (options: file, metastore)
RepoDriver string
// DSN for connecting to repo
RepoDSN string
// EmbedCatalog tells the runtime to store the instance's catalog in its OLAP store instead
// of in the runtime's metadata store. Currently only supported for the duckdb driver.
EmbedCatalog bool `db:"embed_catalog"`
// CreatedOn is when the instance was created
CreatedOn time.Time `db:"created_on"`
// UpdatedOn is when the instance was last updated in the registry
UpdatedOn time.Time `db:"updated_on"`
// Variables contains user-provided variables
Variables map[string]string `db:"variables"`
// ProjectVariables contains default variables from rill.yaml
// (NOTE: This can always be reproduced from rill.yaml, so it's really just a handy cache of the values.)
ProjectVariables map[string]string `db:"project_variables"`
// IngestionLimitBytes is the total amount of data allowed to be ingested across all sources.
// 0 means there is no limit.
IngestionLimitBytes int64 `db:"ingestion_limit_bytes"`
}
Instance represents a single data project, meaning one OLAP connection, one repo connection, and one catalog connection.
func (*Instance) ResolveVariables ¶ added in v0.23.0
ResolveVariables returns the final resolved variables
type NoOpProgress ¶ added in v0.30.0
type NoOpProgress struct{}
func (NoOpProgress) Observe ¶ added in v0.30.0
func (n NoOpProgress) Observe(val int64, unit ProgressUnit)
func (NoOpProgress) Target ¶ added in v0.30.0
func (n NoOpProgress) Target(val int64, unit ProgressUnit)
type OLAPStore ¶
type OLAPStore interface {
Dialect() Dialect
WithConnection(ctx context.Context, priority int, fn WithConnectionFunc) error
Exec(ctx context.Context, stmt *Statement) error
Execute(ctx context.Context, stmt *Statement) (*Result, error)
InformationSchema() InformationSchema
EstimateSize() (int64, bool)
}
OLAPStore is implemented by drivers that are capable of storing, transforming and serving analytical queries.
type ObjectStore ¶ added in v0.30.0
type ObjectStore interface {
// DownloadFiles provides an iterator for downloading and consuming files
DownloadFiles(ctx context.Context, src *BucketSource) (FileIterator, error)
}
type ObjectType ¶ added in v0.16.0
type ObjectType int
Constants representing the kinds of catalog objects.
const (
ObjectTypeUnspecified ObjectType = 0
ObjectTypeTable ObjectType = 1
ObjectTypeSource ObjectType = 2
ObjectTypeModel ObjectType = 3
ObjectTypeMetricsView ObjectType = 4
)
type PermissionDeniedError ¶ added in v0.30.0
type PermissionDeniedError struct {
// contains filtered or unexported fields
}
func (*PermissionDeniedError) Error ¶ added in v0.30.0
func (e *PermissionDeniedError) Error() string
type Progress ¶ added in v0.30.0
type Progress interface {
Target(val int64, unit ProgressUnit)
// Observe is used by caller to provide incremental updates
Observe(val int64, unit ProgressUnit)
}
Progress is an interface for communicating progress info
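Progress separates a total (Target) from incremental updates (Observe). A minimal sketch of an implementation that turns these calls into a percentage. `byteProgress` and its `Percent` method are hypothetical. Ignoring updates in other units is a design choice of this sketch, not documented behavior.

```go
package main

import "fmt"

// Local copy of the documented ProgressUnit enum.
type ProgressUnit int

const (
	ProgressUnitByte ProgressUnit = iota
	ProgressUnitFile
	ProgressUnitRecord
)

// byteProgress is a hypothetical Progress implementation that tracks
// completion in bytes and ignores updates reported in other units.
type byteProgress struct {
	target, observed int64
}

// Target records the expected total.
func (p *byteProgress) Target(val int64, unit ProgressUnit) {
	if unit == ProgressUnitByte {
		p.target = val
	}
}

// Observe accumulates incremental updates, as described for Progress.Observe.
func (p *byteProgress) Observe(val int64, unit ProgressUnit) {
	if unit == ProgressUnitByte {
		p.observed += val
	}
}

// Percent returns completion in [0, 100], or -1 if no target was set.
func (p *byteProgress) Percent() float64 {
	if p.target <= 0 {
		return -1
	}
	pct := float64(p.observed) / float64(p.target) * 100
	if pct > 100 {
		pct = 100
	}
	return pct
}

func main() {
	p := &byteProgress{}
	p.Target(200, ProgressUnitByte)
	p.Observe(50, ProgressUnitByte)
	p.Observe(3, ProgressUnitFile) // ignored: different unit
	fmt.Println(p.Percent())       // 25
}
```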
type ProgressUnit ¶ added in v0.30.0
type ProgressUnit int
const (
ProgressUnitByte ProgressUnit = iota
ProgressUnitFile
ProgressUnitRecord
)
type PropertySchema ¶ added in v0.30.0
type PropertySchema struct {
Key string
Type PropertySchemaType
Required bool
DisplayName string
Description string
Placeholder string
// Default differs from Placeholder: a placeholder must not be used as the default value.
// If a default is set, it should also be used as the placeholder.
Default string
Hint string
Href string
Secret bool
ValidateFunc func(any interface{}) error
TransformFunc func(any interface{}) interface{}
}
PropertySchema provides the schema for a property supported by a connector.
func (PropertySchema) ValidateType ¶ added in v0.30.0
func (ps PropertySchema) ValidateType(val any) bool
ValidateType checks that val has the correct type.
type PropertySchemaType ¶ added in v0.30.0
type PropertySchemaType int
PropertySchemaType is an enum of types supported for connector properties.
const (
UnspecifiedPropertyType PropertySchemaType = iota
StringPropertyType
NumberPropertyType
BooleanPropertyType
InformationalPropertyType
)
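A minimal sketch of what ValidateType could look like with this enum. The mapping from enum values to Go types (string, bool, any built-in numeric type) is an assumption, not the package's actual logic. Under this sketch, Informational and Unspecified properties reject every value.

```go
package main

import "fmt"

// Local copies of the documented enum, with PropertySchema trimmed.
type PropertySchemaType int

const (
	UnspecifiedPropertyType PropertySchemaType = iota
	StringPropertyType
	NumberPropertyType
	BooleanPropertyType
	InformationalPropertyType
)

type PropertySchema struct {
	Key  string
	Type PropertySchemaType
}

// ValidateType is a hypothetical reimplementation: it reports whether the
// dynamic type of val matches the schema's declared type.
func (ps PropertySchema) ValidateType(val any) bool {
	switch val.(type) {
	case string:
		return ps.Type == StringPropertyType
	case bool:
		return ps.Type == BooleanPropertyType
	case int, int8, int16, int32, int64,
		uint, uint8, uint16, uint32, uint64,
		float32, float64:
		return ps.Type == NumberPropertyType
	default:
		return false
	}
}

func main() {
	limit := PropertySchema{Key: "limit", Type: NumberPropertyType}
	fmt.Println(limit.ValidateType(10), limit.ValidateType("10")) // true false
}
```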
type RegistryStore ¶
type RegistryStore interface {
FindInstances(ctx context.Context) ([]*Instance, error)
FindInstance(ctx context.Context, id string) (*Instance, error)
CreateInstance(ctx context.Context, instance *Instance) error
DeleteInstance(ctx context.Context, id string) error
EditInstance(ctx context.Context, instance *Instance) error
}
RegistryStore is implemented by drivers capable of storing and looking up instances and repos.
type RepoObjectStat ¶ added in v0.15.0
type RepoStore ¶
type RepoStore interface {
Driver() string
// Root returns directory where artifacts are stored.
Root() string
ListRecursive(ctx context.Context, instID string, glob string) ([]string, error)
Get(ctx context.Context, instID string, path string) (string, error)
Stat(ctx context.Context, instID string, path string) (*RepoObjectStat, error)
Put(ctx context.Context, instID string, path string, reader io.Reader) error
Rename(ctx context.Context, instID string, fromPath string, toPath string) error
Delete(ctx context.Context, instID string, path string) error
Sync(ctx context.Context, instID string) error
Watch(ctx context.Context, replay bool, callback WatchCallback) error
}
RepoStore is implemented by drivers capable of storing code artifacts. It mirrors a file system, but may be virtualized by a database for non-local deployments.
type Result ¶ added in v0.15.0
type Result struct {
*sqlx.Rows
Schema *runtimev1.StructType
// contains filtered or unexported fields
}
Result wraps the results of a query.
func (*Result) Close ¶ added in v0.18.0
Close wraps rows.Close and calls the Result's cleanup function (if it is set). Close should be idempotent.
func (*Result) SetCleanupFunc ¶ added in v0.18.0
SetCleanupFunc sets a function, which will be called when the Result is closed.
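Close is described as idempotent and as running an optional cleanup function. A minimal sketch of one way to meet both requirements with `sync.Once`. `Result` here is a hypothetical stand-in whose `rows` field replaces the embedded `*sqlx.Rows`, and the `func() error` signature for the cleanup function is an assumption.

```go
package main

import (
	"fmt"
	"sync"
)

// rowsCloser stands in for the embedded *sqlx.Rows.
type rowsCloser interface{ Close() error }

// Result is a hypothetical stand-in showing one way to make Close idempotent.
type Result struct {
	rows      rowsCloser
	cleanup   func() error
	closeOnce sync.Once
	closeErr  error
}

// SetCleanupFunc stores a function to run when the Result is closed.
func (r *Result) SetCleanupFunc(fn func() error) { r.cleanup = fn }

// Close closes the rows, then runs the cleanup function if one is set.
// sync.Once ensures repeated calls neither double-close nor re-run cleanup;
// later calls return the error recorded by the first call.
func (r *Result) Close() error {
	r.closeOnce.Do(func() {
		r.closeErr = r.rows.Close()
		if r.cleanup != nil {
			if err := r.cleanup(); err != nil && r.closeErr == nil {
				r.closeErr = err
			}
		}
	})
	return r.closeErr
}

// fakeRows counts how many times Close is called.
type fakeRows struct{ closes int }

func (f *fakeRows) Close() error {
	f.closes++
	return nil
}

func main() {
	rows := &fakeRows{}
	cleanups := 0
	res := &Result{rows: rows}
	res.SetCleanupFunc(func() error { cleanups++; return nil })
	_ = res.Close()
	_ = res.Close()
	fmt.Println(rows.closes, cleanups) // 1 1
}
```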
type Sink ¶ added in v0.30.0
type Sink interface {
BucketSink() (*BucketSink, bool)
DatabaseSink() (*DatabaseSink, bool)
}
A Sink is expected to only return ok=true for one of the sink types. The caller will know which type based on the connector type.
type Source ¶ added in v0.30.0
type Source interface {
BucketSource() (*BucketSource, bool)
DatabaseSource() (*DatabaseSource, bool)
FileSource() (*FileSource, bool)
}
A Source is expected to only return ok=true for one of the source types. The caller will know which type based on the connector type.
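A minimal sketch of consuming a Source through its accessor methods, with trimmed local stand-ins for the three source types. `describeSource` is hypothetical. It probes each kind in turn. Per the text above, a real caller would normally already know which accessor to call from the connector type.

```go
package main

import "fmt"

// Trimmed local stand-ins for the documented source types.
type BucketSource struct{ Properties map[string]any }
type DatabaseSource struct{ Query, Table string }
type FileSource struct{}

type Source interface {
	BucketSource() (*BucketSource, bool)
	DatabaseSource() (*DatabaseSource, bool)
	FileSource() (*FileSource, bool)
}

// A DatabaseSource reports ok=true only for its own kind.
func (d *DatabaseSource) BucketSource() (*BucketSource, bool)     { return nil, false }
func (d *DatabaseSource) DatabaseSource() (*DatabaseSource, bool) { return d, true }
func (d *DatabaseSource) FileSource() (*FileSource, bool)         { return nil, false }

// describeSource picks the single source kind that reports ok=true.
func describeSource(src Source) string {
	if b, ok := src.BucketSource(); ok {
		return fmt.Sprintf("bucket with %d properties", len(b.Properties))
	}
	if d, ok := src.DatabaseSource(); ok {
		// Only one of Query or Table is expected to be set.
		if d.Query != "" {
			return "database query: " + d.Query
		}
		return "database table: " + d.Table
	}
	if _, ok := src.FileSource(); ok {
		return "local file"
	}
	return "unknown source"
}

func main() {
	fmt.Println(describeSource(&DatabaseSource{Table: "orders"})) // database table: orders
}
```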
type Spec ¶ added in v0.30.0
type Spec struct {
DisplayName string
Description string
ServiceAccountDocs string
SourceProperties []PropertySchema
ConfigProperties []PropertySchema
Help string
}
Spec provides metadata about a connector and the properties it supports.
type Statement ¶
type Statement struct {
Query string
Args []any
DryRun bool
Priority int
ExecutionTimeout time.Duration
}
Statement wraps a query to execute against an OLAP driver.
type Table ¶
type Table struct {
Database string
DatabaseSchema string
Name string
Schema *runtimev1.StructType
}
Table represents a table in an information schema.
type TransferOption ¶ added in v0.30.0
type TransferOption func(*TransferOpts)
func WithIteratorBatch ¶ added in v0.30.0
func WithIteratorBatch(b int) TransferOption
func WithLimitInBytes ¶ added in v0.30.0
func WithLimitInBytes(limit int64) TransferOption
type TransferOpts ¶ added in v0.30.0
func NewTransferOpts ¶ added in v0.30.0
func NewTransferOpts(opts ...TransferOption) *TransferOpts
type Transporter ¶ added in v0.30.0
type Transporter interface {
Transfer(ctx context.Context, source Source, sink Sink, t *TransferOpts, p Progress) error
}
Transporter implements logic for moving data between two connectors (the actual connector objects are provided in AsTransporter).
type WatchCallback ¶ added in v0.30.0
type WatchCallback func(event WatchEvent) error
type WatchEvent ¶ added in v0.30.0
type WithConnectionFunc ¶ added in v0.18.0
WithConnectionFunc is a callback function that provides a context to be used in further OLAP store calls to enforce affinity to a single connection. It's called with two contexts: wrappedCtx wraps the input context (including cancellation), and ensuredCtx wraps a background context (ensuring it can never be cancelled).