row_source

package
v0.9.4
Published: Sep 26, 2025 License: Apache-2.0 Imports: 22 Imported by: 41

Documentation

Constants

const DefaultAPIGranularity = 1 * time.Nanosecond

DefaultAPIGranularity is the default granularity for API sources

const PluginSourceWrapperIdentifier = "plugin_source_wrapper"

PluginSourceWrapperIdentifier is the source name for the plugin source wrapper

Variables

var Factory = newRowSourceFactoryFactory()

Factory is the global row source factory instance.

Functions

func IsArtifactSource

func IsArtifactSource(sourceType string) bool

func RegisterRowSource

func RegisterRowSource[T RowSource]()

RegisterRowSource registers a row source type. It is called from the package init function of the table implementation.
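The registration pattern described above can be sketched as follows. This is a minimal illustration built on local stand-in types, not the SDK's implementation: the `source` interface, the `registry` map, and the `register` and `newInstance` helpers are hypothetical names, and using reflection to instantiate `T` is an assumption made for the sketch.

```go
package main

import (
	"fmt"
	"reflect"
)

// source is a hypothetical stand-in for the RowSource interface.
type source interface {
	Identifier() string
}

// registry maps a source identifier to a constructor (hypothetical).
var registry = map[string]func() source{}

// newInstance returns a usable value of T. When T is a pointer type it
// allocates the pointed-to struct so that methods can be called safely.
func newInstance[T source]() T {
	t := reflect.TypeOf((*T)(nil)).Elem()
	if t.Kind() == reflect.Ptr {
		return reflect.New(t.Elem()).Interface().(T)
	}
	var zero T
	return zero
}

// register stores a constructor for T, keyed by the identifier T reports.
func register[T source]() {
	registry[newInstance[T]().Identifier()] = func() source {
		return newInstance[T]()
	}
}

// fileSource is a hypothetical source type.
type fileSource struct{}

func (*fileSource) Identifier() string { return "file" }

// Registration happens in the package init function, as the doc comment describes.
func init() {
	register[*fileSource]()
}

func main() {
	ctor, ok := registry["file"]
	fmt.Println(ok)
	fmt.Println(ctor().Identifier())
}
```

Keying the registry by `Identifier()` means a caller only needs the source name to obtain a fresh instance.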

func TestIdentifier

func TestIdentifier(t *testing.T, c RowSource)

func Validate

func Validate(t *testing.T, ctor func() RowSource)

Types

type BaseSource

type BaseSource interface {
	RegisterSource(rowSource RowSource)
}

BaseSource registers the rowSource implementation with the base struct (before calling Init). This function is deliberately not exposed in the RowSource interface.

type ResolvedFromTime

type ResolvedFromTime struct {
	Time   time.Time
	Source string
}

ResolvedFromTime holds the 'resolved' from time and the source that provided it. The from time is determined from one of:
- the default (T-7d)
- the from time provided in the request
- the end time of the collection state

func ResolvedFromTimeFromProto

func ResolvedFromTimeFromProto(proto *proto.ResolvedFromTime) *ResolvedFromTime

func (ResolvedFromTime) ToProto

type RowSource

type RowSource interface {
	observable.PausableObservable

	// Init is called when the row source is created
	// it is responsible for parsing the source config and configuring the source
	Init(context.Context, *RowSourceParams, ...RowSourceOption) error

	// Identifier must return the source name
	Identifier() string

	// Description returns a human readable description of the source
	Description() (string, error)

	// Properties returns a map of property descriptions
	// this is used for introspection
	Properties() map[string]*types.PropertyMetadata

	Close() error

	SaveCollectionState() error

	// Collect is called to start collecting data,
	Collect(context.Context) error

	// GetFromTime returns the start time for the data collection, including the source of the from time
	// (config, collection state or default)
	GetFromTime() *ResolvedFromTime

	// OnCollectionComplete is called when the source collection is SUCCESSFULLY completed
	// this sets the collection state end time to the collection 'to' time to ensure that the next collection
	// continues from the end of the last collection
	OnCollectionComplete() error
}

RowSource is the interface that represents a data source. A number of data sources are provided by the SDK, and plugins may provide their own.

Built-in data sources:
- AWS S3 Bucket
- API Source (this must be implemented by the plugin)
- File Source
- Webhook source

Sources may be configured with data transfo

func NewRowSourceDecorator added in v0.7.0

func NewRowSourceDecorator(rowSource RowSource) RowSource

type RowSourceDecorator added in v0.7.0

type RowSourceDecorator struct {
	// contains filtered or unexported fields
}

RowSourceDecorator decorates a RowSource. It wraps the Collect call, which provides a hook to call OnCollectionComplete.

func (*RowSourceDecorator) AddObserver added in v0.7.0

func (r *RowSourceDecorator) AddObserver(observer observable.Observer) error

func (*RowSourceDecorator) Close added in v0.7.0

func (r *RowSourceDecorator) Close() error

func (*RowSourceDecorator) Collect added in v0.7.0

func (r *RowSourceDecorator) Collect(ctx context.Context) error

func (*RowSourceDecorator) Description added in v0.7.0

func (r *RowSourceDecorator) Description() (string, error)

func (*RowSourceDecorator) GetFromTime added in v0.7.0

func (r *RowSourceDecorator) GetFromTime() *ResolvedFromTime

func (*RowSourceDecorator) Identifier added in v0.7.0

func (r *RowSourceDecorator) Identifier() string

func (*RowSourceDecorator) Init added in v0.7.0

func (r *RowSourceDecorator) Init(ctx context.Context, params *RowSourceParams, option ...RowSourceOption) error

func (*RowSourceDecorator) OnCollectionComplete added in v0.7.0

func (r *RowSourceDecorator) OnCollectionComplete() error

func (*RowSourceDecorator) Pause added in v0.7.0

func (r *RowSourceDecorator) Pause() error

func (*RowSourceDecorator) PauseProcessingOnly added in v0.7.0

func (r *RowSourceDecorator) PauseProcessingOnly() error

func (*RowSourceDecorator) Properties added in v0.7.0

func (r *RowSourceDecorator) Properties() map[string]*types.PropertyMetadata

func (*RowSourceDecorator) Resume added in v0.7.0

func (r *RowSourceDecorator) Resume() error

func (*RowSourceDecorator) SaveCollectionState added in v0.7.0

func (r *RowSourceDecorator) SaveCollectionState() error

type RowSourceFactory

type RowSourceFactory struct {
	// contains filtered or unexported fields
}

func (*RowSourceFactory) DescribeSources

func (b *RowSourceFactory) DescribeSources() (types.SourceMetadataMap, error)

func (*RowSourceFactory) GetRowSource

func (b *RowSourceFactory) GetRowSource(ctx context.Context, params *RowSourceParams, sourceOpts ...RowSourceOption) (RowSource, error)

GetRowSource attempts to instantiate a row source using the provided row source data. It fails if the requested source type is not registered. Implements plugin.SourceFactory.

func (*RowSourceFactory) GetSources

func (b *RowSourceFactory) GetSources() map[string]func() RowSource

type RowSourceImpl

type RowSourceImpl[S, T parse.Config] struct {
	observable.PausableObservableImpl
	Config     S
	Connection T
	// store a reference to the derived RowSource type so we can call its methods
	// this will be set by the source factory
	Source RowSource

	// the collection state data for this source
	CollectionState *collection_state.SaveableCollectionState
	// a function to create empty collection state data
	NewCollectionStateFunc func() collection_state.CollectionState
	// store errors - we only use this to determine whether the source collection was successful,
	// and therefore whether we should set the CollectionState EndTime to the collection To time from OnCollectionComplete
	ErrorCount int32

	// the collection direction (this defaults to forwards - reverse order sources should set this in their Init method)
	CollectionOrder collection_state.CollectionOrder
	// the start time for the data collection
	//FromTime time.Time
	// how was from time set (config, collection state, default)
	FromTimeSource      string
	CollectionTimeRange collection_state.DirectionalTimeRange
	// a func to call to retrieve the granularity for the source
	// this is provided to avoid a tricky timing problem - we want to get the granularity within RowSourceImpl.Init
	// but ArtifactSourceImpl.Init  needs to use our config to determine the granularity and this is not available
	// until after the RowSourceImpl.Init is called
	// so ArtifactSourceImpl.Init will set this func to return the granularity
	GetGranularityFunc func() time.Duration
}

RowSourceImpl is a base implementation of the plugin.RowSource interface. It implements the observable.Observable interface, provides a default implementation of Close(), and contains the logic to raise a Row event. It should be embedded in all plugin.RowSource implementations.

S is the type of the source config struct. T is the type of the connection struct.

func (*RowSourceImpl[S, T]) Close

func (r *RowSourceImpl[S, T]) Close() error

Close is a default implementation of the plugin.RowSource Close interface function

func (*RowSourceImpl[S, T]) Description

func (*RowSourceImpl[S, T]) Description() (string, error)

Description returns a human-readable description of the source. It is used for introspection and should be overridden by the source implementation.

func (*RowSourceImpl[S, T]) GetConfigSchema

func (r *RowSourceImpl[S, T]) GetConfigSchema() parse.Config

GetConfigSchema returns an empty instance of the config struct used by the source

func (*RowSourceImpl[S, T]) GetFromTime

func (r *RowSourceImpl[S, T]) GetFromTime() *ResolvedFromTime

GetFromTime returns the start time for the data collection, including the source of the from time (config, collection state or default)

func (*RowSourceImpl[S, T]) Init

func (r *RowSourceImpl[S, T]) Init(_ context.Context, params *RowSourceParams, opts ...RowSourceOption) error

Init is called when the row source is created. It is responsible for parsing the source config and configuring the source. The opts are populated based on the table source config.

func (*RowSourceImpl[S, T]) OnCollectionComplete added in v0.7.0

func (r *RowSourceImpl[S, T]) OnCollectionComplete() error

OnCollectionComplete must be called by the source Collect function when the collection is complete. It updates the end time of the collection state to the collection `to` time and saves the collection state.

func (*RowSourceImpl[S, T]) OnRow

func (r *RowSourceImpl[S, T]) OnRow(ctx context.Context, row *types.RowData) error

OnRow raises an events.Row event, which is handled by the table. It is called by the row source when it has a row to send.

func (*RowSourceImpl[S, T]) Properties added in v0.5.0

func (r *RowSourceImpl[S, T]) Properties() map[string]*types.PropertyMetadata

Properties returns a map of property descriptions. It is used for introspection and should be overridden by the source implementation.

func (*RowSourceImpl[S, T]) PropertiesForType added in v0.5.0

func (r *RowSourceImpl[S, T]) PropertiesForType(config any) map[string]*types.PropertyMetadata

func (*RowSourceImpl[S, T]) RegisterSource

func (r *RowSourceImpl[S, T]) RegisterSource(source RowSource)

RegisterSource is called by the source implementation to register itself with the base. This is required so that the RowSourceImpl can call the RowSource's methods.
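Go embedding has no virtual dispatch, so an embedded base cannot reach methods defined on the outer type unless it holds a reference to it. The sketch below shows that pattern with hypothetical stand-in types (`base`, `webhook`, `describe`); it illustrates why a registration step is needed and is not the SDK's code.

```go
package main

import "fmt"

// source is a hypothetical stand-in for the RowSource interface.
type source interface {
	Identifier() string
}

// base is a stand-in for RowSourceImpl; it keeps a reference to the outer type.
type base struct {
	src source
}

// RegisterSource stores the outer type so the base can call its methods.
func (b *base) RegisterSource(s source) {
	b.src = s
}

// describe is defined on the base but dispatches to the outer type's Identifier.
func (b *base) describe() string {
	return "source: " + b.src.Identifier()
}

// webhook is a hypothetical source that embeds the base.
type webhook struct {
	base
}

func (*webhook) Identifier() string { return "webhook" }

func newWebhook() *webhook {
	w := &webhook{}
	w.RegisterSource(w) // registration must precede any use of the base
	return w
}

func main() {
	fmt.Println(newWebhook().describe()) // prints "source: webhook"
}
```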

func (*RowSourceImpl[S, T]) SaveCollectionState

func (r *RowSourceImpl[S, T]) SaveCollectionState() error

type RowSourceOption

type RowSourceOption func(source RowSource) error

RowSourceOption is a function that can be used to configure a RowSource. NOTE: individual options are specific to particular row source types. RowSourceOption accepts the base RowSource interface, and each option must perform a safe type assertion to the specific row source type.
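An option that follows this rule might look like the sketch below. The `option` type mirrors the shape of RowSourceOption, while `withBatchSize`, `batchSource`, and `plainSource` are hypothetical. Returning an error for an unsupported source type, rather than ignoring it, is an assumption of this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// source is a hypothetical stand-in for the RowSource interface.
type source interface {
	Identifier() string
}

// option has the same shape as RowSourceOption.
type option func(source) error

// batchSource is a hypothetical source that supports a batch size.
type batchSource struct {
	batchSize int
}

func (*batchSource) Identifier() string { return "batch" }

// plainSource is a hypothetical source with nothing to configure.
type plainSource struct{}

func (*plainSource) Identifier() string { return "plain" }

// withBatchSize applies only to *batchSource. The comma-ok form keeps
// the type assertion safe: a mismatch yields an error, not a panic.
func withBatchSize(n int) option {
	return func(s source) error {
		b, ok := s.(*batchSource)
		if !ok {
			return errors.New("withBatchSize: unsupported source type " + s.Identifier())
		}
		b.batchSize = n
		return nil
	}
}

func main() {
	b := &batchSource{}
	err := withBatchSize(100)(b)
	fmt.Println(err, b.batchSize)

	err = withBatchSize(100)(&plainSource{})
	fmt.Println(err != nil)
}
```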

type RowSourceParams

type RowSourceParams struct {
	SourceConfigData    *types.SourceConfigData
	ConnectionData      *types.ConnectionConfigData
	CollectionStatePath string
	From                time.Time
	To                  time.Time
	CollectionTempDir   string
	Overwrite           bool
}

func RowSourceParamsFromProto

func RowSourceParamsFromProto(pr *proto.RowSourceParams) (*RowSourceParams, error)

func (*RowSourceParams) ToProto added in v0.2.0

func (r *RowSourceParams) ToProto() *proto.RowSourceParams
