drivers

package
v0.78.0
Warning

This package is not in the latest version of its module.

Published: Dec 22, 2025 License: Apache-2.0 Imports: 23 Imported by: 0

README

Each driver in the package can implement one or more of the following interfaces:

The following interfaces are system interfaces and are present for all instances. Depending on whether Rill runs in the cloud or locally, they can be shared across all instances or be private to a single instance. Global connectors are passed as runtime configs; instance-specific connectors are set on the instance when it is created.

  • OLAPStore for storing data and running analytical queries
  • CatalogStore for storing sources, models and metrics views, including metadata like last refresh time
  • RepoStore for storing code artifacts (this is essentially a file system shim)
  • RegistryStore for tracking instances (DSNs for OLAPs and repos, instance variables etc)

The following interfaces are only available as source connectors. These are always instance-specific connectors.

  • ObjectStore for downloading files from remote object stores like S3, GCS, etc.
  • SQLStore for running arbitrary SQL queries against data warehouses like BigQuery. Caution: not to be confused with Postgres, DuckDB, etc.
  • FileStore for resolving the paths of local files.

Special interfaces, which are also instance-specific:

  • Transporter for transferring data from one infrastructure to another.

Documentation

Constants

const (
	// DefaultPageSize is the default page size used when pageSize is not defined
	DefaultPageSize = 100
)
const RepoListLimit = 2000

RepoListLimit is the maximum number of files that can be listed in a call to RepoStore.ListGlob. This limit is effectively a cap on the number of files in a project because `rill start` lists the project directory using a "**" glob.

Variables

var (
	// ErrUnsupportedConnector is returned from Ingest for unsupported connectors.
	ErrUnsupportedConnector = errors.New("drivers: connector not supported")
	// ErrOptimizationFailure is returned when an optimization fails.
	ErrOptimizationFailure = errors.New("drivers: optimization failure")

	DefaultQuerySchemaTimeout = 30 * time.Second
)
var Connectors = make(map[string]Driver)

Connectors tracks all registered connector drivers.

var Drivers = make(map[string]Driver)

Drivers is a registry of drivers.

var ErrInconsistentControllerVersion = errors.New("controller: inconsistent version")

ErrInconsistentControllerVersion is returned from Controller when an unexpected controller version is observed in the DB. An unexpected controller version will only be observed if multiple controllers are running simultaneously (split brain).

var ErrNoRows = errors.New("no rows found for the query")
var ErrNotFound = errors.New("driver: not found")

ErrNotFound indicates the resource wasn't found.

var ErrNotImplemented = errors.New("driver: not implemented")

ErrNotImplemented indicates the driver doesn't support the requested operation.

var ErrNotNotifier = errors.New("driver: not a notifier")

ErrNotNotifier indicates the driver cannot be used as a Notifier.

var ErrRepoListLimitExceeded = fmt.Errorf("glob exceeded limit of %d matched files", RepoListLimit)

ErrRepoListLimitExceeded should be returned when RepoListLimit is exceeded.

var ErrResourceAlreadyExists = errors.New("controller: resource already exists")

ErrResourceAlreadyExists is returned from catalog functions when attempting to create a resource that already exists.

var ErrResourceNotFound = errors.New("controller: resource not found")

ErrResourceNotFound is returned from catalog functions when a referenced resource does not exist.

var ErrStorageLimitExceeded = fmt.Errorf("connectors: exceeds storage limit")

ErrStorageLimitExceeded indicates the driver's storage limit was exceeded.

Functions

func IsIgnored added in v0.44.0

func IsIgnored(path string, additionalIgnoredPaths []string) bool

IsIgnored returns true if the path (and any files in nested directories) should be ignored. It checks ignoredPaths as well as any additional paths specified.
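A minimal sketch of the matching rule, assuming ignored entries are slash-prefixed repo paths and that "nested" means the path lies under an ignored directory. The `defaultIgnored` list is hypothetical; the package keeps its own unexported `ignoredPaths`.

```go
package main

import (
	"fmt"
	"strings"
)

// defaultIgnored is a hypothetical stand-in for the package's unexported ignoredPaths list.
var defaultIgnored = []string{"/tmp", "/.git", "/node_modules"}

// isIgnored reports whether p is an ignored path or nested beneath one.
func isIgnored(p string, additional []string) bool {
	for _, list := range [][]string{defaultIgnored, additional} {
		for _, ign := range list {
			// Compare against "<ign>/" so that "/tmpfile" does not match "/tmp".
			if p == ign || strings.HasPrefix(p, strings.TrimSuffix(ign, "/")+"/") {
				return true
			}
		}
	}
	return false
}

func main() {
	fmt.Println(isIgnored("/.git/HEAD", nil))                  // true
	fmt.Println(isIgnored("/models/orders.sql", nil))          // false
	fmt.Println(isIgnored("/data/raw.csv", []string{"/data"})) // true
}
```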

func ListBucketsFromPathPrefixes added in v0.78.0

func ListBucketsFromPathPrefixes(pathPrefixes []string, pageSize uint32, pageToken string) ([]string, string, error)

ListBucketsFromPathPrefixes returns the list of buckets allowed by PathPrefixes, with deterministic sorting and pagination.
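The "deterministic sorting and pagination" can be sketched as below. This is an illustration under stated assumptions, not the package's code: it assumes prefixes are URLs such as `s3://bucket/path/`, that the bucket is the URL host, and that the page token is simply the last bucket name of the previous page (the real token format may differ). `listBuckets` is a hypothetical name.

```go
package main

import (
	"fmt"
	"net/url"
	"sort"
)

// defaultPageSize mirrors drivers.DefaultPageSize.
const defaultPageSize = 100

// listBuckets extracts unique bucket names from URL-style prefixes, sorts them,
// and resumes strictly after pageToken (assumed to be the last name returned).
func listBuckets(prefixes []string, pageSize uint32, pageToken string) ([]string, string, error) {
	if pageSize == 0 {
		pageSize = defaultPageSize
	}
	seen := make(map[string]bool)
	var buckets []string
	for _, p := range prefixes {
		u, err := url.Parse(p)
		if err != nil {
			return nil, "", err
		}
		if u.Host != "" && !seen[u.Host] {
			seen[u.Host] = true
			buckets = append(buckets, u.Host)
		}
	}
	sort.Strings(buckets)

	// Skip everything up to and including the token.
	start := sort.SearchStrings(buckets, pageToken)
	if start < len(buckets) && buckets[start] == pageToken {
		start++
	}
	end := start + int(pageSize)
	if end >= len(buckets) {
		return buckets[start:], "", nil // last page: empty next token
	}
	return buckets[start:end], buckets[end-1], nil
}

func main() {
	prefixes := []string{"s3://zeta/logs/", "s3://alpha/raw/", "s3://mid/a/", "s3://alpha/b/"}
	page, token, _ := listBuckets(prefixes, 2, "")
	fmt.Println(page, token) // [alpha mid] mid
	page, token, _ = listBuckets(prefixes, 2, token)
	fmt.Printf("%v %q\n", page, token) // [zeta] ""
}
```

Sorting before paging is what makes the pages stable across calls, since Go map and input order would otherwise leak into the result.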

func Register

func Register(name string, driver Driver)

Register registers a new driver.

func RegisterAsConnector added in v0.30.0

func RegisterAsConnector(name string, driver Driver)

RegisterAsConnector tracks a connector driver.
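The two registries (`Drivers` and `Connectors`) follow the familiar name-to-driver map pattern. The sketch below uses hypothetical stand-ins (`driver`, `fakeDriver`, `register`, `registerAsConnector`, `lookup`) to show a driver registered in both maps and an `Open`-style lookup by name; it does not reproduce the real `Driver` interface.

```go
package main

import "fmt"

// driver is a trimmed, hypothetical stand-in for drivers.Driver.
type driver interface {
	Name() string
}

type fakeDriver struct{ name string }

func (f fakeDriver) Name() string { return f.name }

var (
	registry   = make(map[string]driver) // plays the role of drivers.Drivers
	connectors = make(map[string]driver) // plays the role of drivers.Connectors
)

func register(name string, d driver)            { registry[name] = d }
func registerAsConnector(name string, d driver) { connectors[name] = d }

// lookup resolves a driver by name, as an Open-style function would before opening a handle.
func lookup(name string) (driver, error) {
	d, ok := registry[name]
	if !ok {
		return nil, fmt.Errorf("unknown driver: %s", name)
	}
	return d, nil
}

func init() {
	// A driver package would typically register itself from its own init function.
	register("duckdb", fakeDriver{"duckdb"})
	register("s3", fakeDriver{"s3"})
	registerAsConnector("s3", fakeDriver{"s3"})
}

func main() {
	d, err := lookup("s3")
	fmt.Println(d.Name(), err, len(registry), len(connectors)) // s3 <nil> 2 1
	_, err = lookup("oracle")
	fmt.Println(err) // unknown driver: oracle
}
```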

Types

type AIMessage added in v0.78.0

type AIMessage struct {
	ID          string    `db:"id"`
	ParentID    string    `db:"parent_id"`
	SessionID   string    `db:"session_id"`
	CreatedOn   time.Time `db:"created_on"`
	UpdatedOn   time.Time `db:"updated_on"`
	Index       int       `db:"index"`
	Role        string    `db:"role"`
	Type        string    `db:"type"`
	Tool        string    `db:"tool"`
	ContentType string    `db:"content_type"`
	Content     string    `db:"content"`
}

AIMessage represents a message in an AISession.

type AIService added in v0.41.0

type AIService interface {
	Complete(ctx context.Context, opts *CompleteOptions) (*CompleteResult, error)
}

type AISession added in v0.78.0

type AISession struct {
	ID         string    `db:"id"`
	InstanceID string    `db:"instance_id"`
	OwnerID    string    `db:"owner_id"`
	Title      string    `db:"title"`
	UserAgent  string    `db:"user_agent"`
	CreatedOn  time.Time `db:"created_on"`
	UpdatedOn  time.Time `db:"updated_on"`
}

AISession represents a session of AI interaction, such as a chat or MCP connection.

type AdminService added in v0.37.0

type AdminService interface {
	GetReportMetadata(ctx context.Context, reportName, ownerID, webOpenMode string, emailRecipients []string, anonRecipients bool, executionTime time.Time) (*ReportMetadata, error)
	GetAlertMetadata(ctx context.Context, alertName, ownerID string, emailRecipients []string, anonRecipients bool, annotations map[string]string, queryForUserID, queryForUserEmail string) (*AlertMetadata, error)
	ProvisionConnector(ctx context.Context, name, driver string, args map[string]any) (map[string]any, error)
}

type AlertMetadata added in v0.41.0

type AlertMetadata struct {
	RecipientURLs      map[string]AlertURLs
	QueryForAttributes map[string]any
}

type AlertStatus added in v0.43.0

type AlertStatus struct {
	// TODO: Remove ToEmail, ToName once email notifier is created
	ToEmail         string
	ToName          string
	DisplayName     string
	ExecutionTime   time.Time
	Status          runtimev1.AssertionStatus
	IsRecover       bool
	FailRow         map[string]any
	ExecutionError  string
	OpenLink        string
	EditLink        string
	UnsubscribeLink string
}

type AlertURLs added in v0.78.0

type AlertURLs struct {
	OpenURL        string
	EditURL        string
	UnsubscribeURL string
}

type BlobListfn added in v0.78.0

type BlobListfn func(ctx context.Context, path, delimiter string, pageSize uint32, pageToken string) ([]ObjectStoreEntry, string, error)

type CatalogStore

type CatalogStore interface {
	NextControllerVersion(ctx context.Context) (int64, error)
	CheckControllerVersion(ctx context.Context, v int64) error

	FindResources(ctx context.Context) ([]Resource, error)
	CreateResource(ctx context.Context, v int64, r Resource) error
	UpdateResource(ctx context.Context, v int64, r Resource) error
	DeleteResource(ctx context.Context, v int64, k, n string) error
	DeleteResources(ctx context.Context) error

	FindModelPartitions(ctx context.Context, opts *FindModelPartitionsOptions) ([]ModelPartition, error)
	FindModelPartitionsByKeys(ctx context.Context, modelID string, keys []string) ([]ModelPartition, error)
	CheckModelPartitionsHaveErrors(ctx context.Context, modelID string) (bool, error)
	InsertModelPartition(ctx context.Context, modelID string, partition ModelPartition) error
	UpdateModelPartition(ctx context.Context, modelID string, partition ModelPartition) error
	UpdateModelPartitionsTriggered(ctx context.Context, modelID string, wherePartitionKeyIn []string, whereErrored bool) error
	UpdateModelPartitionsExecuted(ctx context.Context, modelID string, keys []string) error
	DeleteModelPartitions(ctx context.Context, modelID string) error

	FindInstanceHealth(ctx context.Context, instanceID string) (*InstanceHealth, error)
	UpsertInstanceHealth(ctx context.Context, h *InstanceHealth) error

	FindAISessions(ctx context.Context, ownerID string, userAgentPattern string) ([]*AISession, error)
	FindAISession(ctx context.Context, sessionID string) (*AISession, error)
	InsertAISession(ctx context.Context, s *AISession) error
	UpdateAISession(ctx context.Context, s *AISession) error
	FindAIMessages(ctx context.Context, sessionID string) ([]*AIMessage, error)
	InsertAIMessage(ctx context.Context, m *AIMessage) error
}

CatalogStore is implemented by drivers capable of storing catalog info for a specific instance. Implementations should treat resource kinds as case sensitive, but resource names as case insensitive.

type CompleteOptions added in v0.78.0

type CompleteOptions struct {
	Messages     []*aiv1.CompletionMessage
	Tools        []*aiv1.Tool
	OutputSchema *jsonschema.Schema
}

type CompleteResult added in v0.78.0

type CompleteResult struct {
	Message      *aiv1.CompletionMessage
	InputTokens  int
	OutputTokens int
}

type DatabaseSchemaInfo added in v0.78.0

type DatabaseSchemaInfo struct {
	Database       string
	DatabaseSchema string
}

DatabaseSchemaInfo represents a database schema in an information schema.

type Dialect added in v0.16.0

type Dialect int

Dialect enumerates OLAP query languages.

const (
	DialectUnspecified Dialect = iota
	DialectDuckDB
	DialectDruid
	DialectClickHouse
	DialectPinot

	// The dialects below are not fully supported.
	DialectBigQuery
	DialectSnowflake
	DialectAthena
	DialectRedshift
	DialectMySQL
	DialectPostgres
)

func (Dialect) AnyValueExpression added in v0.78.0

func (d Dialect) AnyValueExpression(expr string) string

AnyValueExpression applies the ANY_VALUE aggregation function (or equivalent) to the given expression.

func (Dialect) CanPivot added in v0.47.0

func (d Dialect) CanPivot() bool

func (Dialect) ConvertToDateTruncSpecifier added in v0.43.0

func (d Dialect) ConvertToDateTruncSpecifier(grain runtimev1.TimeGrain) string

func (Dialect) DateDiff added in v0.49.0

func (d Dialect) DateDiff(grain runtimev1.TimeGrain, t1, t2 time.Time) (string, error)

func (Dialect) DateTruncExpr added in v0.47.0

func (d Dialect) DateTruncExpr(dim *runtimev1.MetricsViewSpec_Dimension, grain runtimev1.TimeGrain, tz string, firstDayOfWeek, firstMonthOfYear int) (string, error)

func (Dialect) DimensionSelect added in v0.44.0

func (d Dialect) DimensionSelect(db, dbSchema, table string, dim *runtimev1.MetricsViewSpec_Dimension) (dimSelect, unnestClause string, err error)

func (Dialect) DimensionSelectPair added in v0.46.0

func (d Dialect) DimensionSelectPair(db, dbSchema, table string, dim *runtimev1.MetricsViewSpec_Dimension) (expr, alias, unnestClause string, err error)

func (Dialect) EscapeIdentifier added in v0.42.0

func (d Dialect) EscapeIdentifier(ident string) string

EscapeIdentifier returns an escaped SQL identifier in the dialect.

func (Dialect) EscapeMember added in v0.78.0

func (d Dialect) EscapeMember(tbl, name string) string

EscapeMember returns an escaped member name with table alias and column name.

func (Dialect) EscapeStringValue added in v0.47.0

func (d Dialect) EscapeStringValue(s string) string

func (Dialect) EscapeTable added in v0.43.0

func (d Dialect) EscapeTable(db, schema, table string) string

EscapeTable returns an escaped table name with database, schema and table.
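The three escaping helpers (EscapeIdentifier, EscapeMember, EscapeTable) compose naturally. The sketch below is hypothetical: `dialect` is a three-value subset of `Dialect`, and it assumes ANSI double quotes for DuckDB/Postgres and backticks for MySQL, doubling any embedded quote character (which is how those databases escape quotes inside quoted identifiers). It also assumes empty database or schema parts are simply omitted.

```go
package main

import (
	"fmt"
	"strings"
)

// dialect is a hypothetical subset of drivers.Dialect.
type dialect int

const (
	dialectDuckDB dialect = iota
	dialectPostgres
	dialectMySQL
)

// escapeIdentifier quotes an identifier, doubling any embedded quote character.
func (d dialect) escapeIdentifier(ident string) string {
	if d == dialectMySQL {
		return "`" + strings.ReplaceAll(ident, "`", "``") + "`"
	}
	return `"` + strings.ReplaceAll(ident, `"`, `""`) + `"`
}

// escapeMember qualifies a column name with a table alias.
func (d dialect) escapeMember(tbl, name string) string {
	if tbl == "" {
		return d.escapeIdentifier(name)
	}
	return d.escapeIdentifier(tbl) + "." + d.escapeIdentifier(name)
}

// escapeTable escapes and dot-joins the non-empty parts of db.schema.table.
func (d dialect) escapeTable(db, schema, table string) string {
	var parts []string
	for _, p := range []string{db, schema, table} {
		if p != "" {
			parts = append(parts, d.escapeIdentifier(p))
		}
	}
	return strings.Join(parts, ".")
}

func main() {
	fmt.Println(dialectDuckDB.escapeTable("", "main", "events")) // "main"."events"
	fmt.Println(dialectPostgres.escapeIdentifier(`my "col"`))     // "my ""col"""
	fmt.Println(dialectMySQL.escapeMember("t", "id"))             // `t`.`id`
}
```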

func (Dialect) GetNullExpr added in v0.52.8

func (d Dialect) GetNullExpr(typ runtimev1.Type_Code) (bool, string)

func (Dialect) GetRegexMatchFunction added in v0.78.0

func (d Dialect) GetRegexMatchFunction() string

func (Dialect) GetTimeDimensionParameter added in v0.78.0

func (d Dialect) GetTimeDimensionParameter() string

func (Dialect) GetTimeExpr added in v0.52.8

func (d Dialect) GetTimeExpr(t time.Time) (bool, string)

func (Dialect) GetValExpr added in v0.52.8

func (d Dialect) GetValExpr(val any, typ runtimev1.Type_Code) (bool, string, error)

func (Dialect) IntervalSubtract added in v0.57.2

func (d Dialect) IntervalSubtract(tsExpr, unitExpr string, grain runtimev1.TimeGrain) (string, error)

func (Dialect) JoinOnExpression added in v0.47.0

func (d Dialect) JoinOnExpression(lhs, rhs string) string

func (Dialect) LateralUnnest added in v0.46.0

func (d Dialect) LateralUnnest(expr, tableAlias, colName string) (tbl string, tupleStyle, auto bool, err error)

func (Dialect) LookupExpr added in v0.61.0

func (d Dialect) LookupExpr(lookupTable, lookupValueColumn, lookupKeyExpr, lookupDefaultExpression string) (string, error)

func (Dialect) LookupSelectExpr added in v0.61.0

func (d Dialect) LookupSelectExpr(lookupTable, lookupKeyColumn string) (string, error)

func (Dialect) MetricsViewDimensionExpression added in v0.44.0

func (d Dialect) MetricsViewDimensionExpression(dimension *runtimev1.MetricsViewSpec_Dimension) (string, error)

func (Dialect) OrderByExpression added in v0.47.0

func (d Dialect) OrderByExpression(name string, desc bool) string

func (Dialect) RequiresCastForLike added in v0.49.0

func (d Dialect) RequiresCastForLike() bool

RequiresCastForLike returns true if the dialect requires an expression used in a LIKE or ILIKE condition to explicitly be cast to type TEXT.
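A sketch of how a query builder might combine `RequiresCastForLike` with `SupportsILike` (listed further below). `likeCondition` is a hypothetical helper; the two booleans stand in for the dialect methods, and the `LOWER(...) LIKE LOWER(?)` fallback is an assumed way to get case-insensitive matching without ILIKE.

```go
package main

import "fmt"

// likeCondition builds a case-insensitive pattern match on expr with a "?" placeholder.
// requiresCast and supportsILike stand in for Dialect.RequiresCastForLike and Dialect.SupportsILike.
func likeCondition(expr string, requiresCast, supportsILike bool) string {
	if requiresCast {
		expr = fmt.Sprintf("CAST(%s AS TEXT)", expr)
	}
	if supportsILike {
		return expr + " ILIKE ?"
	}
	// Without ILIKE, emulate case-insensitivity by lower-casing both sides.
	return "LOWER(" + expr + ") LIKE LOWER(?)"
}

func main() {
	fmt.Println(likeCondition("city", true, true))   // CAST(city AS TEXT) ILIKE ?
	fmt.Println(likeCondition("city", false, false)) // LOWER(city) LIKE LOWER(?)
}
```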

func (Dialect) SafeDivideExpression added in v0.47.0

func (d Dialect) SafeDivideExpression(numExpr, denExpr string) string

func (Dialect) SanitizeQueryForLogging added in v0.61.0

func (d Dialect) SanitizeQueryForLogging(sql string) string

func (Dialect) SelectInlineResults added in v0.52.8

func (d Dialect) SelectInlineResults(result *Result) (string, []any, []any, error)

SelectInlineResults returns a SQL query that inlines the rows of the supplied result set, along with the positional arguments and dimension values.

func (Dialect) SelectTimeRangeBins added in v0.58.0

func (d Dialect) SelectTimeRangeBins(start, end time.Time, grain runtimev1.TimeGrain, alias string, tz *time.Location) (string, []any, error)

func (Dialect) String added in v0.16.0

func (d Dialect) String() string

func (Dialect) SupportsILike added in v0.47.0

func (d Dialect) SupportsILike() bool

func (Dialect) SupportsRegexMatch added in v0.78.0

func (d Dialect) SupportsRegexMatch() bool

func (Dialect) UnnestSQLSuffix added in v0.78.0

func (d Dialect) UnnestSQLSuffix(tbl string) string

type DirEntry added in v0.43.0

type DirEntry struct {
	Path  string
	IsDir bool
}

DirEntry represents an entry in a directory listing.

type Driver

type Driver interface {
	// Spec returns metadata about the driver, such as which configuration properties it supports.
	Spec() Spec

	// Open opens a new handle.
	// If instanceID is empty, the connection is considered shared and its As...() functions may be invoked with different instance IDs.
	Open(instanceID string, config map[string]any, st *storage.Client, ac *activity.Client, logger *zap.Logger) (Handle, error)

	// HasAnonymousSourceAccess returns true if the driver can access the data identified by srcProps without any additional configuration.
	HasAnonymousSourceAccess(ctx context.Context, srcProps map[string]any, logger *zap.Logger) (bool, error)

	// TertiarySourceConnectors returns a list of drivers required to access the data identified by srcProps, excluding the driver itself.
	TertiarySourceConnectors(ctx context.Context, srcProps map[string]any, logger *zap.Logger) ([]string, error)
}

Driver represents an external service that Rill can connect to.

type FileFormat added in v0.47.0

type FileFormat string

FileFormat is a file format for importing or exporting data.

const (
	FileFormatUnspecified FileFormat = ""
	FileFormatParquet     FileFormat = "parquet"
	FileFormatCSV         FileFormat = "csv"
	FileFormatJSON        FileFormat = "json"
	FileFormatXLSX        FileFormat = "xlsx"
)

func (FileFormat) Filename added in v0.47.0

func (f FileFormat) Filename(stem string) string

func (FileFormat) Valid added in v0.47.0

func (f FileFormat) Valid() bool
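The behavior of `Filename` and `Valid` is not documented here, so the following is only a plausible sketch: it assumes `Valid` accepts the four non-empty constants and that `Filename` appends the format as a file extension. The lower-case names are local, hypothetical copies.

```go
package main

import "fmt"

// fileFormat mirrors drivers.FileFormat and its constants.
type fileFormat string

const (
	fileFormatUnspecified fileFormat = ""
	fileFormatParquet     fileFormat = "parquet"
	fileFormatCSV         fileFormat = "csv"
	fileFormatJSON        fileFormat = "json"
	fileFormatXLSX        fileFormat = "xlsx"
)

// valid reports whether f is one of the known, non-empty formats.
func (f fileFormat) valid() bool {
	switch f {
	case fileFormatParquet, fileFormatCSV, fileFormatJSON, fileFormatXLSX:
		return true
	}
	return false
}

// filename appends the format as an extension (assumed behavior).
func (f fileFormat) filename(stem string) string {
	if f == fileFormatUnspecified {
		return stem
	}
	return stem + "." + string(f)
}

func main() {
	fmt.Println(fileFormatCSV.filename("export"), fileFormatCSV.valid()) // export.csv true
	fmt.Println(fileFormat("xml").valid())                              // false
}
```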

type FileInfo added in v0.78.0

type FileInfo struct {
	LastUpdated time.Time
	IsDir       bool
}

FileInfo contains metadata about a file.

type FileIterator added in v0.30.0

type FileIterator interface {
	// Close cleans up and releases resources
	Close() error
	// Format returns general file format (json, csv, parquet, etc)
	// Returns an empty string if there is no general format
	Format() string
	// SetKeepFilesUntilClose configures the iterator to keep all files until Close() is called.
	SetKeepFilesUntilClose()
	// Next returns a list of files downloaded from external sources
	// and cleans up files created in the previous batch
	Next(ctx context.Context) ([]string, error)
}

FileIterator provides a way to iteratively download files from external sources. Clients should call Close once they are done with the iterator to release any resources.

type FileStore added in v0.30.0

type FileStore interface {
	// FilePaths returns local absolute paths where files are stored
	FilePaths(ctx context.Context, src map[string]any) ([]string, error)
}

type FindModelPartitionsOptions added in v0.51.0

type FindModelPartitionsOptions struct {
	ModelID          string
	Limit            int
	WherePending     bool
	WhereErrored     bool
	BeforeExecutedOn time.Time
	AfterKey         string
}

FindModelPartitionsOptions is used to filter model partitions.

type Handle added in v0.32.0

type Handle interface {
	// Ping verifies a connection to an external service is healthy
	Ping(ctx context.Context) error

	// Driver name used to open the handle.
	Driver() string

	// Config used to open the handle.
	Config() map[string]any

	// Migrate prepares the handle for use. It will always be called before any of the As...() functions.
	Migrate(ctx context.Context) error

	// MigrationStatus returns the handle's current and desired migration version (if applicable).
	MigrationStatus(ctx context.Context) (current int, desired int, err error)

	// Close closes the handle.
	Close() error

	// AsRegistry returns a RegistryStore if the handle can serve as such, otherwise returns false.
	// A registry is responsible for tracking runtime metadata, namely instances and their configuration.
	AsRegistry() (RegistryStore, bool)

	// AsCatalogStore returns a CatalogStore if the handle can serve as such, otherwise returns false.
	// A catalog stores the state of an instance's resources (such as sources, models, metrics views, alerts, etc.)
	AsCatalogStore(instanceID string) (CatalogStore, bool)

	// AsRepoStore returns a RepoStore if the handle can serve as such, otherwise returns false.
	// A repo stores an instance's file artifacts (mostly YAML and SQL files).
	AsRepoStore(instanceID string) (RepoStore, bool)

	// AsAdmin returns an AdminService if the handle can serve as such, otherwise returns false.
	// An admin service enables the runtime to interact with the control plane that deployed it.
	AsAdmin(instanceID string) (AdminService, bool)

	// AsAI returns an AIService if the driver can serve as such, otherwise returns false.
	// An AI service enables an instance to request prompt-based text inference.
	AsAI(instanceID string) (AIService, bool)

	// AsOLAP returns an OLAPStore if the driver can serve as such, otherwise returns false.
	// An OLAP store is used to serve interactive, low-latency, analytical queries.
	// NOTE: We should consider merging the OLAPStore and SQLStore interfaces.
	AsOLAP(instanceID string) (OLAPStore, bool)

	// AsInformationSchema returns an InformationSchema if the handle can serve as such, otherwise returns false.
	// InformationSchema provides metadata about existing tables in a driver.
	AsInformationSchema() (InformationSchema, bool)

	// AsObjectStore returns an ObjectStore if the driver can serve as such, otherwise returns false.
	// An object store can store, list and retrieve files on a remote server.
	AsObjectStore() (ObjectStore, bool)

	// AsFileStore returns a FileStore if the driver can serve as such, otherwise returns false.
	// A file store can store, list and retrieve local files.
	// NOTE: The file store can probably be merged with the ObjectStore interface.
	AsFileStore() (FileStore, bool)

	// AsWarehouse returns a Warehouse if the driver can serve as such, otherwise returns false.
	// A Warehouse represents a service that can execute SQL statements on cloud warehouses and return the result rows typically as files.
	AsWarehouse() (Warehouse, bool)

	// AsModelExecutor returns a ModelExecutor capable of building a model.
	// Since models may move data between connectors, the model executor can be seen as a "meta driver" that uses handles on other connectors.
	// The provided options specify both an input connector and an output connector. One or both of these will be the receiver itself.
	// It should return an error if the handle is not capable of executing a model between the provided input and output connectors.
	AsModelExecutor(instanceID string, opts *ModelExecutorOptions) (ModelExecutor, error)

	// AsModelManager returns a ModelManager for managing model results produced by a ModelExecutor.
	// This is different from the ModelExecutor since sometimes, the model's input connector executes and writes the model result to the output connector.
	// But managing the result lifecycle is easier to do directly using the output connector.
	AsModelManager(instanceID string) (ModelManager, bool)

	// AsNotifier returns a Notifier (if the driver can serve as such) to send notifications: alerts, reports, etc.
	// Examples: email notifier, slack notifier.
	AsNotifier(properties map[string]any) (Notifier, error)
}

Handle represents a connection to an external service, such as a database, object store, etc. It should implement one or more of the As...() functions.
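The `As...()` functions act as capability probes: a caller tries each one and uses whichever succeeds. This sketch uses heavily trimmed, hypothetical interfaces (`olapStore`, `objectStore`, `handle`) and a `duckdbHandle` that implements only the OLAP capability; the real interfaces have many more methods.

```go
package main

import "fmt"

// Hypothetical, heavily trimmed stand-ins for drivers.OLAPStore, drivers.ObjectStore and drivers.Handle.
type olapStore interface{ Dialect() string }
type objectStore interface{ ListBuckets() []string }

type handle interface {
	Driver() string
	AsOLAP(instanceID string) (olapStore, bool)
	AsObjectStore() (objectStore, bool)
}

// duckdbHandle implements only the OLAP capability.
type duckdbHandle struct{}

func (duckdbHandle) Driver() string  { return "duckdb" }
func (duckdbHandle) Dialect() string { return "duckdb" }

func (h duckdbHandle) AsOLAP(string) (olapStore, bool)  { return h, true }
func (duckdbHandle) AsObjectStore() (objectStore, bool) { return nil, false }

// capabilities probes each As...() function and records the ones that succeed.
func capabilities(h handle, instanceID string) []string {
	var caps []string
	if _, ok := h.AsOLAP(instanceID); ok {
		caps = append(caps, "olap")
	}
	if _, ok := h.AsObjectStore(); ok {
		caps = append(caps, "object_store")
	}
	return caps
}

func main() {
	h := duckdbHandle{}
	fmt.Println(h.Driver(), capabilities(h, "default")) // duckdb [olap]
}
```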

func Open

func Open(driver, instanceID string, config map[string]any, st *storage.Client, ac *activity.Client, logger *zap.Logger) (Handle, error)

Open opens a new connection. If instanceID is empty, the connection is considered shared and its As...() functions may be invoked with different instance IDs. If instanceID is not empty, the connection is considered instance-specific and its As...() functions will only be invoked with the same instance ID.

type IncrementalStrategy added in v0.45.0

type IncrementalStrategy string

IncrementalStrategy is a strategy to use for incrementally inserting data into a SQL table.

const (
	IncrementalStrategyUnspecified        IncrementalStrategy = ""
	IncrementalStrategyAppend             IncrementalStrategy = "append"
	IncrementalStrategyMerge              IncrementalStrategy = "merge"
	IncrementalStrategyPartitionOverwrite IncrementalStrategy = "partition_overwrite"
)

type InformationSchema

type InformationSchema interface {
	// ListDatabaseSchemas returns all schemas across databases
	ListDatabaseSchemas(ctx context.Context, pageSize uint32, pageToken string) ([]*DatabaseSchemaInfo, string, error)
	// ListTables returns all tables in a schema.
	ListTables(ctx context.Context, database, databaseSchema string, pageSize uint32, pageToken string) ([]*TableInfo, string, error)
	// GetTable returns metadata about a specific table.
	GetTable(ctx context.Context, database, databaseSchema, table string) (*TableMetadata, error)
}

type Instance

type Instance struct {
	// Identifier
	ID string
	// Environment is the environment that the instance represents
	Environment string
	// ProjectDisplayName is the display name from rill.yaml
	ProjectDisplayName string `db:"project_display_name"`
	// Driver name to connect to for OLAP
	OLAPConnector string
	// ProjectOLAPConnector is an override of OLAPConnector that may be set in rill.yaml.
	// NOTE: Hopefully we can merge this with OLAPConnector if/when we remove the ability to set OLAPConnector using flags.
	ProjectOLAPConnector string
	// Driver name for reading/editing code artifacts
	RepoConnector string
	// Driver name for the admin service managing the deployment (optional)
	AdminConnector string
	// Driver name for the AI service (optional)
	AIConnector string
	// ProjectAIConnector is an override of AIConnector that may be set in rill.yaml.
	ProjectAIConnector string
	// Driver name for catalog
	CatalogConnector string
	// CreatedOn is when the instance was created
	CreatedOn time.Time `db:"created_on"`
	// UpdatedOn is when the instance was last updated in the registry
	UpdatedOn time.Time `db:"updated_on"`
	// Instance specific connectors
	Connectors []*runtimev1.Connector `db:"connectors"`
	// ProjectConnectors contains default connectors from rill.yaml
	ProjectConnectors []*runtimev1.Connector `db:"project_connectors"`
	// Variables contains user-provided variables
	Variables map[string]string `db:"variables"`
	// ProjectVariables contains default variables from rill.yaml
	// (NOTE: This can always be reproduced from rill.yaml, so it's really just a handy cache of the values.)
	ProjectVariables map[string]string `db:"project_variables"`
	// FeatureFlags contains feature flags configured in rill.yaml
	FeatureFlags map[string]string `db:"feature_flags"`
	// Annotations to enrich activity events (like usage tracking)
	Annotations map[string]string
	// Paths to expose over HTTP (defaults to ./public)
	PublicPaths []string `db:"public_paths"`
	// IgnoreInitialInvalidProjectError indicates whether to ignore an invalid project error when the instance is initially created.
	IgnoreInitialInvalidProjectError bool `db:"-"`
	// AIInstructions is extra context for LLM/AI features. Used to guide natural language question answering and routing.
	AIInstructions string `db:"ai_instructions"`
	// FrontendURL is the URL of the web interface.
	FrontendURL string `db:"frontend_url"`
}

Instance represents a single data project, meaning one OLAP connection, one repo connection, and one catalog connection.

func (*Instance) Config added in v0.43.0

func (i *Instance) Config() (InstanceConfig, error)

Config resolves the current dynamic config properties for the instance. See InstanceConfig for details.

func (*Instance) ResolveAIConnector added in v0.78.0

func (i *Instance) ResolveAIConnector() string

func (*Instance) ResolveConnectors added in v0.78.0

func (i *Instance) ResolveConnectors() []*runtimev1.Connector

func (*Instance) ResolveOLAPConnector added in v0.42.0

func (i *Instance) ResolveOLAPConnector() string

ResolveOLAPConnector resolves the OLAP connector to default to for the instance.

func (*Instance) ResolveVariables added in v0.23.0

func (i *Instance) ResolveVariables(withLowerKeys bool) map[string]string

ResolveVariables returns the final resolved variables.
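Given the `Variables` and `ProjectVariables` fields on `Instance`, resolution plausibly layers user-provided values over the `rill.yaml` defaults. The precedence order and the `withLowerKeys` handling in this sketch are assumptions, and `resolveVariables` is a hypothetical free function rather than the actual method.

```go
package main

import (
	"fmt"
	"strings"
)

// resolveVariables layers user-provided variables over project defaults,
// optionally lower-casing keys. The precedence order is an assumption.
func resolveVariables(project, user map[string]string, withLowerKeys bool) map[string]string {
	out := make(map[string]string, len(project)+len(user))
	put := func(k, v string) {
		if withLowerKeys {
			k = strings.ToLower(k)
		}
		out[k] = v
	}
	for k, v := range project {
		put(k, v)
	}
	for k, v := range user {
		put(k, v) // user values win over rill.yaml defaults
	}
	return out
}

func main() {
	project := map[string]string{"Region": "us", "limit": "10"}
	user := map[string]string{"limit": "20"}
	fmt.Println(resolveVariables(project, user, true)) // map[limit:20 region:us]
}
```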

type InstanceConfig added in v0.43.0

type InstanceConfig struct {
	// DownloadLimitBytes is the limit on size of exported file. If set to 0, there is no limit.
	DownloadLimitBytes int64 `mapstructure:"rill.download_limit_bytes"`
	// InteractiveSQLRowLimit is the row limit for interactive SQL queries. It does not apply to exports of SQL queries. If set to 0, there is no limit.
	InteractiveSQLRowLimit int64 `mapstructure:"rill.interactive_sql_row_limit"`
	// StageChanges indicates whether to keep previously ingested tables for sources/models, and only override them if ingestion of a new table is successful.
	StageChanges bool `mapstructure:"rill.stage_changes"`
	// WatchRepo configures the project parser to set up a file watcher to instantly detect and parse changes to the project files.
	WatchRepo bool `mapstructure:"rill.watch_repo"`
	// ModelDefaultMaterialize indicates whether to materialize models by default.
	ModelDefaultMaterialize bool `mapstructure:"rill.models.default_materialize"`
	// ModelMaterializeDelaySeconds adds a delay before materializing models.
	ModelMaterializeDelaySeconds uint32 `mapstructure:"rill.models.materialize_delay_seconds"`
	// ModelConcurrentExecutionLimit sets the maximum number of concurrent model executions.
	ModelConcurrentExecutionLimit uint32 `mapstructure:"rill.models.concurrent_execution_limit"`
	// MetricsApproximateComparisons indicates whether to rewrite metrics comparison queries to approximately correct queries.
	// Approximated comparison queries are faster but may not return comparison data points for all values.
	MetricsApproximateComparisons bool `mapstructure:"rill.metrics.approximate_comparisons"`
	// MetricsApproximateComparisonsCTE indicates whether to rewrite metrics comparison queries to use a CTE for base query.
	MetricsApproximateComparisonsCTE bool `mapstructure:"rill.metrics.approximate_comparisons_cte"`
	// MetricsApproxComparisonTwoPhaseLimit is the query limit below which metrics comparison queries are rewritten to use a two-phase approach: the first query fetches the base values and the second fetches the comparison values.
	MetricsApproxComparisonTwoPhaseLimit int64 `mapstructure:"rill.metrics.approximate_comparisons_two_phase_limit"`
	// MetricsExactifyDruidTopN indicates whether to split Druid TopN queries into two queries to increase the accuracy of the returned measures.
	// Enabling it reduces the performance of Druid toplist queries.
	// See runtime/metricsview/executor_rewrite_druid_exactify.go for more details.
	MetricsExactifyDruidTopN bool `mapstructure:"rill.metrics.exactify_druid_topn"`
	// MetricsNullFillingImplementation switches between null-filling implementations for timeseries queries.
	// Can be "", "none", "new", "pushdown".
	MetricsNullFillingImplementation string `mapstructure:"rill.metrics.timeseries_null_filling_implementation"`
	// AlertsDefaultStreamingRefreshCron sets a default cron expression for refreshing alerts with streaming refs.
	// Namely, this is used to check alerts against external tables (e.g. in Druid) where new data may be added at any time (i.e. is considered "streaming").
	AlertsDefaultStreamingRefreshCron string `mapstructure:"rill.alerts.default_streaming_refresh_cron"`
	// AlertsFastStreamingRefreshCron is similar to AlertsDefaultStreamingRefreshCron but is used for alerts that are based on always-on OLAP connectors (i.e. that have MayScaleToZero == false).
	AlertsFastStreamingRefreshCron string `mapstructure:"rill.alerts.fast_streaming_refresh_cron"`
}

InstanceConfig contains dynamic configuration for an instance. It is configured by parsing instance variables prefixed with "rill.". For example, a variable "rill.stage_changes=true" would set the StageChanges field to true. InstanceConfig should only be used for config that the user is allowed to change dynamically at runtime.
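The package decodes these variables using the `mapstructure` tags shown above. To keep the example dependency-free, this hand-written sketch parses three of the keys with `strconv` instead; `instanceConfig` and `parseInstanceConfig` are hypothetical, and default values are not modeled.

```go
package main

import (
	"fmt"
	"strconv"
)

// instanceConfig is a hypothetical three-field subset of drivers.InstanceConfig.
type instanceConfig struct {
	DownloadLimitBytes            int64
	StageChanges                  bool
	ModelConcurrentExecutionLimit uint32
}

// parseInstanceConfig maps "rill."-prefixed variables onto typed fields and ignores all other keys.
func parseInstanceConfig(vars map[string]string) (instanceConfig, error) {
	var cfg instanceConfig
	for k, v := range vars {
		var err error
		switch k {
		case "rill.download_limit_bytes":
			cfg.DownloadLimitBytes, err = strconv.ParseInt(v, 10, 64)
		case "rill.stage_changes":
			cfg.StageChanges, err = strconv.ParseBool(v)
		case "rill.models.concurrent_execution_limit":
			var n uint64
			n, err = strconv.ParseUint(v, 10, 32)
			cfg.ModelConcurrentExecutionLimit = uint32(n)
		default:
			continue // not a config key handled by this sketch
		}
		if err != nil {
			return cfg, fmt.Errorf("invalid value for %q: %w", k, err)
		}
	}
	return cfg, nil
}

func main() {
	cfg, err := parseInstanceConfig(map[string]string{
		"rill.stage_changes":        "true",
		"rill.download_limit_bytes": "1024",
		"region":                    "us", // ordinary variable, ignored
	})
	fmt.Printf("%+v %v\n", cfg, err)
	// {DownloadLimitBytes:1024 StageChanges:true ModelConcurrentExecutionLimit:0} <nil>
}
```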

type InstanceHealth added in v0.51.0

type InstanceHealth struct {
	InstanceID string    `db:"instance_id"`
	HealthJSON []byte    `db:"health_json"`
	UpdatedOn  time.Time `db:"updated_on"`
}

InstanceHealth represents the health of an instance.

type ModelEnv added in v0.45.0

type ModelEnv struct {
	AllowHostAccess    bool
	RepoRoot           string
	StageChanges       bool
	DefaultMaterialize bool
	Connectors         []*runtimev1.Connector
	AcquireConnector   func(ctx context.Context, name string) (Handle, func(), error)
}

ModelEnv contains contextual info about the model's instance.

type ModelExecuteOptions added in v0.48.0

type ModelExecuteOptions struct {
	*ModelExecutorOptions
	// InputProperties are the resolved properties of the model's input connector.
	InputProperties map[string]any
	// OutputProperties are the resolved properties of the model's output connector.
	OutputProperties map[string]any
	// Priority is the priority of the model execution.
	Priority int
	// Incremental is true if the model is an incremental model.
	Incremental bool
	// IncrementalRun is true if the execution is an incremental run.
	IncrementalRun bool
	// PartitionRun is true if the execution is a partition run.
	PartitionRun bool
	// PartitionKey is the unique key for the partition currently being run.
	// It is empty when PartitionRun is false.
	PartitionKey string
	// TempDir is a temporary directory for storing intermediate data.
	TempDir string
}

ModelExecuteOptions are options passed to a model executor's Execute function. They embed the ModelExecutorOptions that were used to initialize the ModelExecutor, plus additional options for the current execution step.

type ModelExecutor added in v0.45.0

type ModelExecutor interface {
	// Execute runs the model. The execution may be a full, incremental, or partition run.
	// For partition runs, Execute may be called concurrently by multiple workers.
	Execute(ctx context.Context, opts *ModelExecuteOptions) (*ModelResult, error)

	// Concurrency returns the number of concurrent calls that may be made to Execute given a user-provided desired concurrency.
	// If the desired concurrency is 0, it should return the recommended default concurrency.
	// If the desired concurrency is too high, it should return false.
	Concurrency(desired int) (int, bool)
}

ModelExecutor executes models. A ModelExecutor may be either the model's input connector or its output connector.
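The Concurrency contract has three cases: 0 asks for a default, a value that is too high is rejected, and anything else is accepted. The sketch below shows one way an executor might satisfy it. `exampleExecutor` is hypothetical. The default of 1 and the ceiling of 8 are illustrative values, not limits taken from any real driver.

```go
package main

import "fmt"

// Hypothetical limits for an imaginary executor; real drivers pick their own.
const (
	defaultConcurrency = 1
	maxConcurrency     = 8
)

// exampleExecutor is a hypothetical type implementing only the Concurrency method of ModelExecutor.
type exampleExecutor struct{}

// Concurrency returns the recommended default when desired is 0, reports false
// when desired is negative or above maxConcurrency, and otherwise accepts desired.
func (e *exampleExecutor) Concurrency(desired int) (int, bool) {
	if desired == 0 {
		return defaultConcurrency, true
	}
	if desired < 0 || desired > maxConcurrency {
		return 0, false
	}
	return desired, true
}

func main() {
	e := &exampleExecutor{}
	fmt.Println(e.Concurrency(0))  // 1 true
	fmt.Println(e.Concurrency(16)) // 0 false
}
```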

type ModelExecutorOptions added in v0.45.0

type ModelExecutorOptions struct {
	// Env contains contextual info about the model's instance.
	Env *ModelEnv
	// ModelName is the name of the model.
	ModelName string
	// InputHandle is the handle of the model's input connector.
	InputHandle Handle
	// InputConnector is the name of the model's input connector.
	InputConnector string
	// PreliminaryInputProperties are the preliminary properties of the model's input connector.
	// It may not always be available and may contain templating variables that have not yet been resolved.
	PreliminaryInputProperties map[string]any
	// OutputHandle is the handle of the model's output connector.
	OutputHandle Handle
	// OutputConnector is the name of the model's output connector.
	OutputConnector string
	// PreliminaryOutputProperties are the preliminary properties of the model's output connector.
	// It may not always be available and may contain templating variables that have not yet been resolved.
	PreliminaryOutputProperties map[string]any
}

ModelExecutorOptions are options passed when acquiring a ModelExecutor.

type ModelManager added in v0.45.0

type ModelManager interface {
	// Rename is called when a model is renamed, giving the ModelManager a chance to update state derived from the model's name (such as a table name).
	Rename(ctx context.Context, res *ModelResult, newName string, env *ModelEnv) (*ModelResult, error)

	// Exists returns whether the result still exists in the connector (for integrity checks).
	Exists(ctx context.Context, res *ModelResult) (bool, error)

	// Delete removes the result from the connector.
	Delete(ctx context.Context, res *ModelResult) error

	// MergePartitionResults merges two results produced by concurrent incremental partition runs.
	MergePartitionResults(a, b *ModelResult) (*ModelResult, error)
}

ModelManager manages model results returned by ModelExecutor. Unlike ModelExecutor, the result connector will always be used as the ModelManager.

type ModelPartition added in v0.51.0

type ModelPartition struct {
	// Key is a unique identifier for the partition. It should be a hash of DataJSON.
	Key string
	// DataJSON is the serialized parameters of the partition.
	DataJSON []byte
	// Index is used to order the execution of partitions.
	// Since it's just a guide and execution order usually is not critical,
	// it's okay if it's not unique or not always correct (e.g. for incrementally computed partitions).
	Index int
	// Watermark represents the time when the underlying data that the partition references was last updated.
	// If a partition's watermark advances, we automatically schedule it for re-execution.
	Watermark *time.Time
	// ExecutedOn is the time when the partition was last executed. If it is nil, the partition is considered pending.
	ExecutedOn *time.Time
	// Error is the last error that occurred when executing the partition.
	Error string
	// Elapsed is the duration of the last execution of the partition.
	Elapsed time.Duration
}

ModelPartition represents a single executable unit of a model. Partitions are an advanced feature that enables splitting and parallelizing execution of a model.

type ModelResult added in v0.45.0

type ModelResult struct {
	Connector    string
	Properties   map[string]any
	Table        string
	ExecDuration time.Duration
}

ModelResult contains metadata about the result of a model execution.

type Notifier added in v0.43.0

type Notifier interface {
	SendAlertStatus(s *AlertStatus) error
	SendScheduledReport(s *ScheduledReport) error
}

Notifier sends notifications.

type OLAPInformationSchema added in v0.78.0

type OLAPInformationSchema interface {
	// All returns metadata about all tables and views.
	// The like argument can optionally be passed to filter the tables by name.
	All(ctx context.Context, like string, pageSize uint32, pageToken string) ([]*OlapTable, string, error)
	// Lookup returns metadata about a specific table or view.
	Lookup(ctx context.Context, db, schema, name string) (*OlapTable, error)
	// LoadPhysicalSize populates the PhysicalSizeBytes field of table metadata.
	// It should be called after All or Lookup and not on manually created tables.
	LoadPhysicalSize(ctx context.Context, tables []*OlapTable) error
}

OLAPInformationSchema contains information about existing tables in an OLAP driver. Table lookups should be case insensitive.

type OLAPStore

type OLAPStore interface {
	// Dialect is the SQL dialect that the driver uses.
	Dialect() Dialect
	// MayBeScaledToZero returns true if the driver might currently be scaled to zero.
	MayBeScaledToZero(ctx context.Context) bool
	// WithConnection acquires a connection from the pool and keeps it open until the callback returns.
	WithConnection(ctx context.Context, priority int, fn WithConnectionFunc) error
	// Exec executes a query against the OLAP driver.
	Exec(ctx context.Context, stmt *Statement) error
	// Query executes a query against the OLAP driver and returns an iterator for the resulting rows and schema.
	// The result MUST be closed after use.
	Query(ctx context.Context, stmt *Statement) (*Result, error)
	// QuerySchema returns the schema of the given SQL query, ideally without running the actual query.
	QuerySchema(ctx context.Context, query string, args []any) (*runtimev1.StructType, error)
	// InformationSchema enables introspecting the tables and views available in the OLAP driver.
	InformationSchema() OLAPInformationSchema
}

OLAPStore is implemented by drivers that are capable of storing, transforming and serving analytical queries.

type ObjectStore added in v0.30.0

type ObjectStore interface {
	// ListBuckets lists the available buckets. pageSize limits the maximum results
	// returned in one call, and pageToken is non-empty when more results are available.
	// It returns the bucket names, the next page token, and any error.
	ListBuckets(ctx context.Context, pageSize uint32, pageToken string) ([]string, string, error)
	// ListObjects lists objects and directory-like prefixes under the given bucket and path,
	// using the provided delimiter (defaults to "/"). This is a non-recursive listing.
	// pageSize limits results, and pageToken is non-empty when more results are available.
	// It returns the entries, the next page token, and any error.
	ListObjects(ctx context.Context, bucket, path, delimiter string, pageSize uint32, pageToken string) ([]ObjectStoreEntry, string, error)
	// ListObjectsForGlob returns all objects in the given bucket whose paths match
	// the specified glob pattern. The pattern supports doublestar syntax, including
	// recursive patterns like "**". It returns the matching entries and any error.
	ListObjectsForGlob(ctx context.Context, bucket, glob string) ([]ObjectStoreEntry, error)
	// DownloadFiles provides an iterator for downloading and consuming files.
	// It resolves globs similarly to ListObjects.
	DownloadFiles(ctx context.Context, path string) (FileIterator, error)
}

ObjectStore is an interface for object storage systems.

type ObjectStoreEntry added in v0.48.0

type ObjectStoreEntry struct {
	Path      string
	IsDir     bool
	Size      int64
	UpdatedOn time.Time
}

ObjectStoreEntry represents a file listing in an object store.

func ListObjects added in v0.78.0

func ListObjects(ctx context.Context, pathPrefixes []string, blobListfn BlobListfn, bucket, path, delimiter string, pageSize uint32, pageToken string) ([]ObjectStoreEntry, string, error)

ListObjects restricts listing to allowed path prefixes. If the requested path is within an allowed prefix, a normal blob listing is performed. If the path is a parent of allowed prefixes, a synthetic directory listing is returned. Otherwise, access is denied.

type ObjectStoreModelInputProperties added in v0.64.1

type ObjectStoreModelInputProperties struct {
	Path   string         `mapstructure:"path"`
	URI    string         `mapstructure:"uri"` // Deprecated: use `path` instead
	Format FileFormat     `mapstructure:"format"`
	DuckDB map[string]any `mapstructure:"duckdb"` // Deprecated: use DuckDB directly
}

ObjectStoreModelInputProperties contains common input properties for object store models.

func (*ObjectStoreModelInputProperties) Decode added in v0.64.1

func (p *ObjectStoreModelInputProperties) Decode(props map[string]any) error

type ObjectStoreModelOutputProperties added in v0.48.0

type ObjectStoreModelOutputProperties struct {
	Path   string     `mapstructure:"path"`
	Format FileFormat `mapstructure:"format"`
}

ObjectStoreModelOutputProperties contains common output properties for object store models.

type ObjectStoreModelResultProperties added in v0.48.0

type ObjectStoreModelResultProperties struct {
	Path   string `mapstructure:"path"`
	Format string `mapstructure:"format"`
}

ObjectStoreModelResultProperties contains common result properties for object store models.

type ObjectType added in v0.16.0

type ObjectType int

Constants representing the kinds of catalog objects.

const (
	ObjectTypeUnspecified ObjectType = 0
	ObjectTypeTable       ObjectType = 1
	ObjectTypeSource      ObjectType = 2
	ObjectTypeModel       ObjectType = 3
	ObjectTypeMetricsView ObjectType = 4
)

type OlapTable added in v0.78.0

type OlapTable struct {
	Database                string
	DatabaseSchema          string
	IsDefaultDatabase       bool
	IsDefaultDatabaseSchema bool
	Name                    string
	View                    bool
	// Schema is the table schema. It is only set when a single table is looked up, not when listing all tables.
	Schema            *runtimev1.StructType
	UnsupportedCols   map[string]string
	PhysicalSizeBytes int64
}

OlapTable represents a table in an information schema.

func AllFromInformationSchema added in v0.78.0

func AllFromInformationSchema(ctx context.Context, like string, pageSize uint32, pageToken string, i InformationSchema) ([]*OlapTable, string, error)

AllFromInformationSchema is a helper function that drivers implementing InformationSchema can use to implement Olap.All(). This is a short-term solution. Longer term, we should merge the OLAPInformationSchema and InformationSchema interfaces.
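An All-style helper has to combine the `like` filter with page size and page token handling. The sketch below works over an in-memory list of table names. A zero pageSize falls back to 100, mirroring DefaultPageSize. Two details are assumptions of this sketch. The page token encodes a decimal offset, since the real token format is not documented here. The `like` pattern supports only `%` and `_`, with no escape sequences. `likeToRegexp` and `pageTables` are hypothetical names.

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
	"strings"
)

// defaultPageSize mirrors the package's DefaultPageSize constant.
const defaultPageSize = 100

// likeToRegexp translates a SQL LIKE pattern ('%' = any run, '_' = one character)
// into an anchored regular expression. Escape sequences are not supported.
func likeToRegexp(like string) (*regexp.Regexp, error) {
	var b strings.Builder
	b.WriteString("^")
	for _, r := range like {
		switch r {
		case '%':
			b.WriteString(".*")
		case '_':
			b.WriteString(".")
		default:
			b.WriteString(regexp.QuoteMeta(string(r)))
		}
	}
	b.WriteString("$")
	return regexp.Compile(b.String())
}

// pageTables filters names by like and returns one page plus the next page token.
// The token is a decimal offset here, which is an assumption of this sketch.
func pageTables(names []string, like string, pageSize uint32, pageToken string) ([]string, string, error) {
	if like != "" {
		re, err := likeToRegexp(like)
		if err != nil {
			return nil, "", err
		}
		var filtered []string
		for _, n := range names {
			if re.MatchString(n) {
				filtered = append(filtered, n)
			}
		}
		names = filtered
	}
	if pageSize == 0 {
		pageSize = defaultPageSize
	}
	offset := 0
	if pageToken != "" {
		n, err := strconv.Atoi(pageToken)
		if err != nil || n < 0 || n > len(names) {
			return nil, "", fmt.Errorf("invalid page token %q", pageToken)
		}
		offset = n
	}
	end := offset + int(pageSize)
	if end >= len(names) {
		return names[offset:], "", nil
	}
	return names[offset:end], strconv.Itoa(end), nil
}

func main() {
	names := []string{"orders", "orders_daily", "users"}
	page, next, _ := pageTables(names, "orders%", 1, "")
	fmt.Println(page, next) // [orders] 1
}
```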

type PropertySpec added in v0.43.0

type PropertySpec struct {
	Key         string
	Type        PropertyType
	Required    bool
	DisplayName string
	Description string
	DocsURL     string
	Hint        string
	Default     string
	Placeholder string
	Secret      bool
	NoPrompt    bool
}

PropertySpec provides metadata about a single connector property.

type PropertyType added in v0.43.0

type PropertyType int

PropertyType is an enum of types supported for connector properties.

const (
	UnspecifiedPropertyType PropertyType = iota
	NumberPropertyType
	BooleanPropertyType
	StringPropertyType
	FilePropertyType
	InformationalPropertyType
)
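A consumer of PropertySpec and PropertyType might check user-supplied config values against Required, Default and Type. The sketch below shows one way to do that. Which types get validated, and how, is an assumption: numbers and booleans are parsed, strings and files are accepted as-is, and informational properties are treated as display-only and skipped. `propertySpec` and `validateProps` are hypothetical names. The constants copy the declaration above.

```go
package main

import (
	"fmt"
	"strconv"
)

type PropertyType int

const (
	UnspecifiedPropertyType PropertyType = iota
	NumberPropertyType
	BooleanPropertyType
	StringPropertyType
	FilePropertyType
	InformationalPropertyType
)

// propertySpec is a trimmed copy of PropertySpec with only the fields used here.
type propertySpec struct {
	Key      string
	Type     PropertyType
	Required bool
	Default  string
}

// validateProps checks raw string values against specs, filling in defaults for
// missing keys. It returns the resolved values or the first validation error.
func validateProps(specs []*propertySpec, raw map[string]string) (map[string]string, error) {
	out := make(map[string]string, len(specs))
	for _, s := range specs {
		if s.Type == InformationalPropertyType {
			continue // treated as display-only in this sketch (an assumption)
		}
		v, ok := raw[s.Key]
		if !ok || v == "" {
			if s.Required && s.Default == "" {
				return nil, fmt.Errorf("missing required property %q", s.Key)
			}
			v = s.Default
		}
		if v != "" {
			switch s.Type {
			case NumberPropertyType:
				if _, err := strconv.ParseFloat(v, 64); err != nil {
					return nil, fmt.Errorf("property %q must be a number: %w", s.Key, err)
				}
			case BooleanPropertyType:
				if _, err := strconv.ParseBool(v); err != nil {
					return nil, fmt.Errorf("property %q must be a boolean: %w", s.Key, err)
				}
			}
		}
		out[s.Key] = v
	}
	return out, nil
}

func main() {
	specs := []*propertySpec{{Key: "port", Type: NumberPropertyType, Default: "5432"}}
	got, err := validateProps(specs, map[string]string{})
	fmt.Println(got["port"], err) // 5432 <nil>
}
```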

type RegistryStore

type RegistryStore interface {
	FindInstances(ctx context.Context) ([]*Instance, error)
	FindInstance(ctx context.Context, id string) (*Instance, error)
	CreateInstance(ctx context.Context, instance *Instance) error
	DeleteInstance(ctx context.Context, id string) error
	EditInstance(ctx context.Context, instance *Instance) error
}

RegistryStore is implemented by drivers capable of storing and looking up instances and repos.

type RepoStore

type RepoStore interface {
	// Root returns an absolute path to a physical directory where files are stored.
	// It's provided as an escape hatch, but should not generally be used as it does not have consistency guarantees.
	Root(ctx context.Context) (string, error)
	// ListGlob lists all files in the repo matching the glob pattern.
	ListGlob(ctx context.Context, glob string, skipDirs bool) ([]DirEntry, error)
	// Get retrieves the content of a file at the specified path.
	Get(ctx context.Context, path string) (string, error)
	// Hash returns a unique hash of the contents of the files at the specified paths.
	// If a file does not exist, it is skipped (does not return an error).
	Hash(ctx context.Context, paths []string) (string, error)
	// Stat returns metadata about a file.
	Stat(ctx context.Context, path string) (*FileInfo, error)
	// Put creates or overwrites a file.
	Put(ctx context.Context, path string, reader io.Reader) error
	// MkdirAll creates a directory and any required parent directories.
	MkdirAll(ctx context.Context, path string) error
	// Rename moves a file from one path to another.
	Rename(ctx context.Context, fromPath string, toPath string) error
	// Delete removes a file or directory.
	// If force is true, it will delete non-empty directories.
	Delete(ctx context.Context, path string, force bool) error
	// Watch sets up a watcher for changes in the repo.
	// The callback will be called with events for changes in the repo.
	// The function does not return until the context is cancelled or an error occurs.
	Watch(ctx context.Context, cb WatchCallback) error

	// Pull synchronizes local and remote state.
	// If discardChanges is true, it will discard any local changes made using Put/Rename/etc. and force synchronize to the remote state.
	// If forceHandshake is true, it will re-verify any cached config. Specifically, this should be used when external config changes, such as the Git branch or file archive ID.
	Pull(ctx context.Context, discardChanges, forceHandshake bool) error
	// CommitAndPush commits local changes to the remote repository and pushes them.
	CommitAndPush(ctx context.Context, message string, force bool) error
	// CommitHash returns a unique ID for the state of the remote files currently served (does not change on uncommitted local changes).
	CommitHash(ctx context.Context) (string, error)
	// CommitTimestamp returns the update timestamp for the current remote files (does not change on uncommitted local changes).
	CommitTimestamp(ctx context.Context) (time.Time, error)
}

RepoStore is implemented by drivers capable of storing project code files. All paths start with '/' and are relative to the repo root.
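Two RepoStore conventions can be shown together: paths start with '/' and are relative to the repo root, and Hash skips missing files instead of failing. The sketch below is an in-memory illustration. `memRepo` is hypothetical. Several choices belong to the sketch, not the interface: SHA-256, sorting the paths first so order does not matter, and length-prefixing each path and content so different file sets cannot collide by concatenation.

```go
package main

import (
	"crypto/sha256"
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"path"
	"sort"
)

// memRepo is a hypothetical in-memory repo keyed by '/'-prefixed paths.
type memRepo map[string]string

// normalize makes p relative to the repo root with a leading '/', e.g. "a/b.sql" -> "/a/b.sql".
func normalize(p string) string {
	return path.Clean("/" + p)
}

// Hash returns a hex SHA-256 over the (path, content) pairs of the given files.
// Paths that do not exist are skipped rather than reported as errors.
func (r memRepo) Hash(paths []string) string {
	sorted := make([]string, len(paths))
	for i, p := range paths {
		sorted[i] = normalize(p)
	}
	sort.Strings(sorted)
	h := sha256.New()
	var lenBuf [8]byte
	for _, p := range sorted {
		content, ok := r[p]
		if !ok {
			continue
		}
		for _, s := range []string{p, content} {
			binary.BigEndian.PutUint64(lenBuf[:], uint64(len(s)))
			h.Write(lenBuf[:])
			h.Write([]byte(s))
		}
	}
	return hex.EncodeToString(h.Sum(nil))
}

func main() {
	repo := memRepo{"/rill.yaml": "olap_connector: duckdb"}
	fmt.Println(repo.Hash([]string{"rill.yaml", "/missing.sql"}) == repo.Hash([]string{"/rill.yaml"})) // true
}
```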

type ReportMetadata added in v0.37.0

type ReportMetadata struct {
	RecipientURLs map[string]ReportURLs
}

type ReportURLs added in v0.51.0

type ReportURLs struct {
	OpenURL        string
	ExportURL      string
	EditURL        string
	UnsubscribeURL string
}

type Resource added in v0.33.0

type Resource struct {
	Kind string
	Name string
	Data []byte
}

Resource is an entry in a catalog store.

type Result added in v0.15.0

type Result struct {
	Rows
	Schema *runtimev1.StructType
	// contains filtered or unexported fields
}

Result is the result of a query. It wraps a Rows iterator with additional functionality.

func (*Result) Close added in v0.18.0

func (r *Result) Close() error

Close wraps rows.Close and calls the Result's cleanup function (if it is set). Close should be idempotent.

func (*Result) Err added in v0.47.0

func (r *Result) Err() error

Err returns the error of the underlying rows.

func (*Result) Next added in v0.47.0

func (r *Result) Next() bool

Next wraps rows.Next and enforces the cap set by SetCap.

func (*Result) SetCap added in v0.47.0

func (r *Result) SetCap(n int64)

SetCap caps the number of rows to return. If the number is exceeded, an error is returned.

func (*Result) SetCleanupFunc added in v0.18.0

func (r *Result) SetCleanupFunc(fn func() error)

SetCleanupFunc sets a function, which will be called when the Result is closed.

type Rows added in v0.78.0

type Rows interface {
	Next() bool
	Err() error
	Close() error
	Scan(dest ...any) error
	MapScan(dest map[string]any) error
}

Rows is an iterator for rows returned by a query. It mimics the behavior of sqlx.Rows.

type ScheduledReport added in v0.43.0

type ScheduledReport struct {
	DisplayName     string
	ReportTime      time.Time
	DownloadFormat  string
	OpenLink        string
	DownloadLink    string
	UnsubscribeLink string
}

type Spec added in v0.30.0

type Spec struct {
	DisplayName           string
	Description           string
	DocsURL               string
	ConfigProperties      []*PropertySpec
	SourceProperties      []*PropertySpec
	ImplementsRegistry    bool
	ImplementsCatalog     bool
	ImplementsRepo        bool
	ImplementsAdmin       bool
	ImplementsAI          bool
	ImplementsSQLStore    bool
	ImplementsOLAP        bool
	ImplementsObjectStore bool
	ImplementsFileStore   bool
	ImplementsNotifier    bool
	ImplementsWarehouse   bool
}

Spec provides metadata about a connector and the properties it supports.

type Statement

type Statement struct {
	// Query is the SQL query to execute.
	Query string
	// Args are positional arguments to bind to the query.
	Args []any
	// DryRun indicates if the query should be parsed and validated, but not actually executed.
	DryRun bool
	// Priority provides a query priority if the driver supports it (a higher value indicates a higher priority).
	Priority int
	// UseCache explicitly enables/disables reading from database-level caches (if supported by the driver).
	// If not set, the driver will use its default behavior.
	UseCache *bool
	// PopulateCache explicitly enables/disables writing to database-level caches (if supported by the driver).
	// If not set, the driver will use its default behavior.
	PopulateCache *bool
	// ExecutionTimeout provides a timeout for query execution.
	// Unlike a timeout on ctx, it will be enforced only for query execution, not for time spent waiting in queues.
	// It may not be supported by all drivers.
	ExecutionTimeout time.Duration
	// QueryAttributes provides additional attributes for the query (if supported by the driver).
	// These can be used to customize the behavior of the query (e.g. "{{ .user.partnerId }}").
	QueryAttributes map[string]string
}

Statement wraps a query to execute against an OLAP driver.

type TableInfo added in v0.78.0

type TableInfo struct {
	Name string
	View bool
}

TableInfo represents a table in an information schema.

type TableMetadata added in v0.78.0

type TableMetadata struct {
	View   bool // TODO: populate for other drivers
	Schema map[string]string
}

type Warehouse added in v0.48.0

type Warehouse interface {
	// QueryAsFiles downloads results into files and returns an iterator to iterate over them
	QueryAsFiles(ctx context.Context, props map[string]any) (FileIterator, error)
}

type WatchCallback added in v0.30.0

type WatchCallback func(event []WatchEvent)

WatchCallback is a function that will be called with file events.

type WatchEvent added in v0.30.0

type WatchEvent struct {
	Type runtimev1.FileEvent
	Path string
	Dir  bool
}

WatchEvent represents a file event.

type WithConnectionFunc added in v0.18.0

type WithConnectionFunc func(wrappedCtx context.Context, ensuredCtx context.Context) error

WithConnectionFunc is a callback function that provides a context to be used in further OLAP store calls to enforce affinity to a single connection. It's called with two contexts: wrappedCtx wraps the input context (including cancellation), and ensuredCtx wraps a background context (ensuring it can never be cancelled).
