row_source

package
v0.7.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 4, 2025 License: Apache-2.0 Imports: 22 Imported by: 41

Documentation

Index

Constants

View Source
const PluginSourceWrapperIdentifier = "plugin_source_wrapper"

PluginSourceWrapperIdentifier is the source name for the plugin source wrapper

Variables

View Source
var Factory = newRowSourceFactoryFactory()

Factory is a global newFactory instance

Functions

func IsArtifactSource

func IsArtifactSource(sourceType string) bool

func RegisterRowSource

func RegisterRowSource[T RowSource]()

RegisterRowSource registers a row source type this is called from the package init function of the table implementation

func TestIdentifier

func TestIdentifier(t *testing.T, c RowSource)

func Validate

func Validate(t *testing.T, ctor func() RowSource)

Types

type BaseSource

type BaseSource interface {
	RegisterSource(rowSource RowSource)
}

BaseSource registers the rowSource implementation with the base struct (_before_ calling Init) we do not want to expose this function in the RowSource interface

type ResolvedFromTime

type ResolvedFromTime struct {
	Time   time.Time
	Source string
}

ResolvedFromTime is a struct that holds the 'resolved' from time and the source that provided it From time is determined from either: - the default (T-7d - the from time provided in the request - the end time od the collection state

func ResolvedFromTimeFromProto

func ResolvedFromTimeFromProto(proto *proto.ResolvedFromTime) *ResolvedFromTime

func (ResolvedFromTime) ToProto

type RowSource

type RowSource interface {
	observable.PausableObservable

	// Init is called when the row source is created
	// it is responsible for parsing the source config and configuring the source
	Init(context.Context, *RowSourceParams, ...RowSourceOption) error

	// Identifier must return the source name
	Identifier() string

	// Description returns a human readable description of the source
	Description() (string, error)

	// Properties returns a map of property descriptions
	// this is used for introspection
	Properties() map[string]*types.PropertyMetadata

	Close() error

	SaveCollectionState() error

	// Collect is called to start collecting data,
	Collect(context.Context) error

	// GetFromTime returns the start time for the data collection, including the source of the from time
	// (config, collection state or default)
	GetFromTime() *ResolvedFromTime

	// 	OnCollectionComplete is called when the source collection is SUCCESSFULLY completed
	// this sets the collection state end time to the collection 'to' time to ensure that the next collection
	// continues from the end of the last collection
	OnCollectionComplete() error
}

RowSource is the interface that represents a data source A number of data sourceFuncs are provided by the SDK, and plugins may provide their own Built in data sourceFuncs: - AWS S3 Bucket - API Source (this must be implemented by the plugin) - File Source - Webhook source Sources may be configured with data transfo

func NewRowSourceDecorator added in v0.7.0

func NewRowSourceDecorator(rowSource RowSource) RowSource

type RowSourceDecorator added in v0.7.0

type RowSourceDecorator struct {
	// contains filtered or unexported fields
}

RowSourceDecorator is a struct which decorates a RowSource, allowing us to wrap the Collect call, providing a hook to call OnCollectionComplete

func (*RowSourceDecorator) AddObserver added in v0.7.0

func (r *RowSourceDecorator) AddObserver(observer observable.Observer) error

func (*RowSourceDecorator) Close added in v0.7.0

func (r *RowSourceDecorator) Close() error

func (*RowSourceDecorator) Collect added in v0.7.0

func (r *RowSourceDecorator) Collect(ctx context.Context) error

func (*RowSourceDecorator) Description added in v0.7.0

func (r *RowSourceDecorator) Description() (string, error)

func (*RowSourceDecorator) GetFromTime added in v0.7.0

func (r *RowSourceDecorator) GetFromTime() *ResolvedFromTime

func (*RowSourceDecorator) Identifier added in v0.7.0

func (r *RowSourceDecorator) Identifier() string

func (*RowSourceDecorator) Init added in v0.7.0

func (r *RowSourceDecorator) Init(ctx context.Context, params *RowSourceParams, option ...RowSourceOption) error

func (*RowSourceDecorator) OnCollectionComplete added in v0.7.0

func (r *RowSourceDecorator) OnCollectionComplete() error

func (*RowSourceDecorator) Pause added in v0.7.0

func (r *RowSourceDecorator) Pause() error

func (*RowSourceDecorator) PauseProcessingOnly added in v0.7.0

func (r *RowSourceDecorator) PauseProcessingOnly() error

func (*RowSourceDecorator) Properties added in v0.7.0

func (r *RowSourceDecorator) Properties() map[string]*types.PropertyMetadata

func (*RowSourceDecorator) Resume added in v0.7.0

func (r *RowSourceDecorator) Resume() error

func (*RowSourceDecorator) SaveCollectionState added in v0.7.0

func (r *RowSourceDecorator) SaveCollectionState() error

type RowSourceFactory

type RowSourceFactory struct {
	// contains filtered or unexported fields
}

func (*RowSourceFactory) DescribeSources

func (b *RowSourceFactory) DescribeSources() (types.SourceMetadataMap, error)

func (*RowSourceFactory) GetRowSource

func (b *RowSourceFactory) GetRowSource(ctx context.Context, params *RowSourceParams, sourceOpts ...RowSourceOption) (RowSource, error)

GetRowSource attempts to instantiate a row source, using the provided row source data It will fail if the requested source type is not registered Implements plugin.SourceFactory

func (*RowSourceFactory) GetSources

func (b *RowSourceFactory) GetSources() map[string]func() RowSource

type RowSourceImpl

type RowSourceImpl[S, T parse.Config] struct {
	observable.PausableObservableImpl
	Config     S
	Connection T
	// store a reference to the derived RowSource type so we can call its methods
	// this will be set by the source factory
	Source RowSource

	// the collection state data for this source
	CollectionState collection_state.CollectionState[S]
	// a function to create empty collection state data
	NewCollectionStateFunc func() collection_state.CollectionState[S]
	// the start time for the data collection
	FromTime time.Time
	// how was from time set (config, collection state, default)
	FromTimeSource string
	// the end time for the data collection
	ToTime time.Time

	// store errors - we only use this to determine whether the source collection was successful,
	// and therefore whether we should set the CollectionState EndTime to the collection To time from OnCollectionComplete
	ErrorCount int32
}

RowSourceImpl is a base implementation of the plugin.RowSource interface It implements the observable.Observable interface, as well as providing a default implementation of Close(), and contains the logic to raise a Row event It should be embedded in all plugin.RowSource implementations

S is the type of the source config struct T is the type of the connection struct

func (*RowSourceImpl[S, T]) Close

func (r *RowSourceImpl[S, T]) Close() error

Close is a default implementation of the plugin.RowSource Close interface function

func (*RowSourceImpl[S, T]) Description

func (*RowSourceImpl[S, T]) Description() (string, error)

Description returns a human readable description of the source this is used for introspection this should be overridden by the source implementation

func (*RowSourceImpl[S, T]) GetConfigSchema

func (r *RowSourceImpl[S, T]) GetConfigSchema() parse.Config

GetConfigSchema returns an empty instance of the config struct used by the source

func (*RowSourceImpl[S, T]) GetFromTime

func (r *RowSourceImpl[S, T]) GetFromTime() *ResolvedFromTime

GetFromTime returns the start time for the data collection, including the source of the from time (config, collection state or default)

func (*RowSourceImpl[S, T]) Init

func (r *RowSourceImpl[S, T]) Init(_ context.Context, params *RowSourceParams, opts ...RowSourceOption) error

Init is called when the row source is created it is responsible for parsing the source config and configuring the source

func (*RowSourceImpl[S, T]) NotifyError added in v0.7.0

func (r *RowSourceImpl[S, T]) NotifyError(ctx context.Context, executionId string, err error)

func (*RowSourceImpl[S, T]) OnCollectionComplete added in v0.7.0

func (r *RowSourceImpl[S, T]) OnCollectionComplete() error

OnCollectionComplete must be called by the source Collect function when the collection is complete this updates the end time of the collection state to the collection `To` and saves the collection state

func (*RowSourceImpl[S, T]) OnRow

func (r *RowSourceImpl[S, T]) OnRow(ctx context.Context, row *types.RowData) error

OnRow raise an events.Row event, which is handled by the table. It is called by the row source when it has a row to send

func (*RowSourceImpl[S, T]) Properties added in v0.5.0

func (r *RowSourceImpl[S, T]) Properties() map[string]*types.PropertyMetadata

Properties returns a map of property descriptions this is used for introspection this should be overridden by the source implementation

func (*RowSourceImpl[S, T]) PropertiesForType added in v0.5.0

func (r *RowSourceImpl[S, T]) PropertiesForType(config any) map[string]*types.PropertyMetadata

func (*RowSourceImpl[S, T]) RegisterSource

func (r *RowSourceImpl[S, T]) RegisterSource(source RowSource)

RegisterSource is called by the source implementation to register itself with the base this is required so that the RowSourceImpl can call the RowSource's methods

func (*RowSourceImpl[S, T]) SaveCollectionState

func (r *RowSourceImpl[S, T]) SaveCollectionState() error

type RowSourceOption

type RowSourceOption func(source RowSource) error

RowSourceOption is a function that can be used to configure a RowSource NOTE: individual options are specific to specific row source types RowSourceOption accepts the base Observable interface, and each option must implement a safe type assertion to the specific row source type

type RowSourceParams

type RowSourceParams struct {
	SourceConfigData    *types.SourceConfigData
	ConnectionData      *types.ConnectionConfigData
	CollectionStatePath string
	From                time.Time
	To                  time.Time
	CollectionTempDir   string
}

func RowSourceParamsFromProto

func RowSourceParamsFromProto(pr *proto.RowSourceParams) (*RowSourceParams, error)

func (*RowSourceParams) ToProto added in v0.2.0

func (r *RowSourceParams) ToProto() *proto.RowSourceParams

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL