Documentation
¶
Index ¶
- type BaseExtractor
- func (e *BaseExtractor[Opts, F]) ExtractSQL(query string) ([]Feature, error)
- func (e *BaseExtractor[Opts, F]) Extracted(fs []F) ([]Feature, error)
- func (e *BaseExtractor[Opts, Feature]) Init(db db.DB, conf conf.FeatureExtractorConfig) error
- func (e *BaseExtractor[Opts, F]) MarkAsUpdated()
- func (e *BaseExtractor[Opts, F]) NeedsUpdate() bool
- func (e *BaseExtractor[Opts, F]) NextPossibleExecution() time.Time
- func (e *BaseExtractor[Opts, F]) NotifySkip()
- type Feature
- type FeatureExtractor
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BaseExtractor ¶
type BaseExtractor[Opts any, Feature db.Table] struct { // Options to pass via yaml to this step. conf.JsonOpts[Opts] // Database connection. DB db.DB RecencySeconds int UpdatedAt *time.Time }
Common base for all extractors that provides some functionality that would otherwise be duplicated across all extractors.
func (*BaseExtractor[Opts, F]) ExtractSQL ¶
func (e *BaseExtractor[Opts, F]) ExtractSQL(query string) ([]Feature, error)
Extract the features directly from an sql query.
func (*BaseExtractor[Opts, F]) Extracted ¶
func (e *BaseExtractor[Opts, F]) Extracted(fs []F) ([]Feature, error)
Replace all features of the given model in the database and return them as a slice of generic features for counting.
func (*BaseExtractor[Opts, Feature]) Init ¶
func (e *BaseExtractor[Opts, Feature]) Init(db db.DB, conf conf.FeatureExtractorConfig) error
Init the extractor with the database and options.
func (*BaseExtractor[Opts, F]) MarkAsUpdated ¶
func (e *BaseExtractor[Opts, F]) MarkAsUpdated()
Mark the extractor as updated by setting the UpdatedAt field to the current time.
func (*BaseExtractor[Opts, F]) NeedsUpdate ¶
func (e *BaseExtractor[Opts, F]) NeedsUpdate() bool
Checks if the last update of the extractor is older than the configured recency. If the recency is set to a positive value, it will return true if the last update is older than the configured recency in seconds.
func (*BaseExtractor[Opts, F]) NextPossibleExecution ¶
func (e *BaseExtractor[Opts, F]) NextPossibleExecution() time.Time
func (*BaseExtractor[Opts, F]) NotifySkip ¶
func (e *BaseExtractor[Opts, F]) NotifySkip()
type FeatureExtractor ¶
type FeatureExtractor interface {
// Configure the feature extractor with a database and options.
// This function should also create the needed database structures.
Init(db db.DB, conf conf.FeatureExtractorConfig) error
// Extract features from the given data.
Extract() ([]Feature, error)
// Get the name of this feature extractor.
// This name is used to identify the extractor in metrics, config, logs, etc.
// Should be something like: "my_cool_feature_extractor".
GetName() string
// Get message topics that trigger a re-execution of this extractor.
Triggers() []string
// Check if the extractors last update is older than the configured recency.
NeedsUpdate() bool
// Update the last update timestamp of the extractor.
MarkAsUpdated()
// Earliest time when this extractor can be executed again.
NextPossibleExecution() time.Time
// Skip the extractor if it is not needed.
NotifySkip()
}
Each feature extractor must conform to this interface.