Documentation
      ¶
    
    
  
    
  
    Index ¶
- type BaseExtractor
 - func (e *BaseExtractor[Opts, F]) ExtractSQL(query string) ([]Feature, error)
 - func (e *BaseExtractor[Opts, F]) Extracted(fs []F) ([]Feature, error)
 - func (e *BaseExtractor[Opts, Feature]) Init(db db.DB, conf conf.FeatureExtractorConfig) error
 - func (e *BaseExtractor[Opts, F]) MarkAsUpdated()
 - func (e *BaseExtractor[Opts, F]) NeedsUpdate() bool
 - func (e *BaseExtractor[Opts, F]) NextPossibleExecution() time.Time
 - func (e *BaseExtractor[Opts, F]) NotifySkip()
 
- type Feature
 - type FeatureExtractor
 
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type BaseExtractor ¶
type BaseExtractor[Opts any, Feature db.Table] struct { // Options to pass via yaml to this step. conf.JsonOpts[Opts] // Database connection. DB db.DB RecencySeconds int UpdatedAt *time.Time }
BaseExtractor is a common base for all extractors; it provides functionality that would otherwise be duplicated in each of them.
func (*BaseExtractor[Opts, F]) ExtractSQL ¶
func (e *BaseExtractor[Opts, F]) ExtractSQL(query string) ([]Feature, error)
ExtractSQL extracts the features directly from an SQL query.
func (*BaseExtractor[Opts, F]) Extracted ¶
func (e *BaseExtractor[Opts, F]) Extracted(fs []F) ([]Feature, error)
Replace all features of the given model in the database and return them as a slice of generic features for counting.
func (*BaseExtractor[Opts, Feature]) Init ¶
func (e *BaseExtractor[Opts, Feature]) Init(db db.DB, conf conf.FeatureExtractorConfig) error
Init the extractor with the database and options.
func (*BaseExtractor[Opts, F]) MarkAsUpdated ¶
func (e *BaseExtractor[Opts, F]) MarkAsUpdated()
Mark the extractor as updated by setting the UpdatedAt field to the current time.
func (*BaseExtractor[Opts, F]) NeedsUpdate ¶
func (e *BaseExtractor[Opts, F]) NeedsUpdate() bool
NeedsUpdate reports whether the extractor's last update is older than the configured recency. If the recency is set to a positive value, it returns true when the last update is older than the configured recency in seconds.
func (*BaseExtractor[Opts, F]) NextPossibleExecution ¶
func (e *BaseExtractor[Opts, F]) NextPossibleExecution() time.Time
NextPossibleExecution returns the earliest time when this extractor can be executed again.
func (*BaseExtractor[Opts, F]) NotifySkip ¶
func (e *BaseExtractor[Opts, F]) NotifySkip()
type FeatureExtractor ¶
// FeatureExtractor is the interface each feature extractor must conform to.
type FeatureExtractor interface {
	// Init configures the feature extractor with a database and options.
	// This function should also create the needed database structures.
	Init(db db.DB, conf conf.FeatureExtractorConfig) error
	// Extract extracts features from the given data and returns them,
	// or an error if the extraction failed.
	Extract() ([]Feature, error)
	// GetName returns the name of this feature extractor.
	// This name is used to identify the extractor in metrics, config, logs, etc.
	// It should be something like: "my_cool_feature_extractor".
	GetName() string
	// Triggers returns the message topics that trigger a re-execution of
	// this extractor.
	Triggers() []string
	// NeedsUpdate reports whether the extractor's last update is older than
	// the configured recency.
	NeedsUpdate() bool
	// MarkAsUpdated updates the last update timestamp of the extractor.
	MarkAsUpdated()
	// NextPossibleExecution returns the earliest time when this extractor
	// can be executed again.
	NextPossibleExecution() time.Time
	// NotifySkip skips the extractor if it is not needed.
	// NOTE(review): the name suggests this notifies the extractor that an
	// execution was skipped — confirm against the implementations.
	NotifySkip()
}
    Each feature extractor must conform to this interface.