Documentation
Index ¶
- Constants
- Variables
- func IsArtifactSource(sourceType string) bool
- func RegisterRowSource[T RowSource]()
- func TestIdentifier(t *testing.T, c RowSource)
- func Validate(t *testing.T, ctor func() RowSource)
- type BaseSource
- type ResolvedFromTime
- type RowSource
- type RowSourceDecorator
- func (r *RowSourceDecorator) AddObserver(observer observable.Observer) error
- func (r *RowSourceDecorator) Close() error
- func (r *RowSourceDecorator) Collect(ctx context.Context) error
- func (r *RowSourceDecorator) Description() (string, error)
- func (r *RowSourceDecorator) GetFromTime() *ResolvedFromTime
- func (r *RowSourceDecorator) Identifier() string
- func (r *RowSourceDecorator) Init(ctx context.Context, params *RowSourceParams, option ...RowSourceOption) error
- func (r *RowSourceDecorator) OnCollectionComplete() error
- func (r *RowSourceDecorator) Pause() error
- func (r *RowSourceDecorator) PauseProcessingOnly() error
- func (r *RowSourceDecorator) Properties() map[string]*types.PropertyMetadata
- func (r *RowSourceDecorator) Resume() error
- func (r *RowSourceDecorator) SaveCollectionState() error
- type RowSourceFactory
- type RowSourceImpl
- func (r *RowSourceImpl[S, T]) Close() error
- func (*RowSourceImpl[S, T]) Description() (string, error)
- func (r *RowSourceImpl[S, T]) GetConfigSchema() parse.Config
- func (r *RowSourceImpl[S, T]) GetFromTime() *ResolvedFromTime
- func (r *RowSourceImpl[S, T]) Init(_ context.Context, params *RowSourceParams, opts ...RowSourceOption) error
- func (r *RowSourceImpl[S, T]) OnCollectionComplete() error
- func (r *RowSourceImpl[S, T]) OnRow(ctx context.Context, row *types.RowData) error
- func (r *RowSourceImpl[S, T]) Properties() map[string]*types.PropertyMetadata
- func (r *RowSourceImpl[S, T]) PropertiesForType(config any) map[string]*types.PropertyMetadata
- func (r *RowSourceImpl[S, T]) RegisterSource(source RowSource)
- func (r *RowSourceImpl[S, T]) SaveCollectionState() error
- type RowSourceOption
- type RowSourceParams
Constants ¶
const DefaultAPIGranularity = 1 * time.Nanosecond
DefaultAPIGranularity is the default granularity for API sources
const PluginSourceWrapperIdentifier = "plugin_source_wrapper"
PluginSourceWrapperIdentifier is the source name for the plugin source wrapper
Variables ¶
var Factory = newRowSourceFactoryFactory()
Factory is the global row source factory instance
Functions ¶
func IsArtifactSource ¶
func IsArtifactSource(sourceType string) bool
func RegisterRowSource ¶
func RegisterRowSource[T RowSource]()
RegisterRowSource registers a row source type. It is called from the package init function of the table implementation.
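As an illustration of the registration pattern, here is a minimal, self-contained sketch. It does not use the SDK: `source`, `registry`, `register` and `fileSource` are hypothetical stand-ins, and the reflection-based construction is an assumption about how a no-argument generic function can build instances of `T`, not a statement of how RegisterRowSource works internally.

```go
package main

import (
	"fmt"
	"reflect"
)

// source is a hypothetical stand-in for RowSource; only Identifier is modelled.
type source interface {
	Identifier() string
}

// registry maps a source identifier to a constructor (hypothetical).
var registry = map[string]func() source{}

// register mirrors the shape of RegisterRowSource[T RowSource]().
// Assumption: T is a pointer-to-struct type such as *fileSource.
func register[T source]() {
	var zero T
	elem := reflect.TypeOf(zero).Elem()
	ctor := func() source {
		// allocate a fresh struct and return it as the interface type
		return reflect.New(elem).Interface().(source)
	}
	// key the constructor by the identifier the source reports
	registry[ctor().Identifier()] = ctor
}

// fileSource is a hypothetical source implementation.
type fileSource struct{}

func (*fileSource) Identifier() string { return "file" }

// init is where a table package would register the sources it provides.
func init() {
	register[*fileSource]()
}

func main() {
	fmt.Println(registry["file"]().Identifier())
}
```

Registering from `init` means the source is available as soon as the package is imported, with no explicit wiring in the caller.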
func TestIdentifier ¶
func TestIdentifier(t *testing.T, c RowSource)
Types ¶
type BaseSource ¶
type BaseSource interface {
RegisterSource(rowSource RowSource)
}
BaseSource registers the RowSource implementation with the base struct (before calling Init). This function is deliberately not exposed in the RowSource interface.
type ResolvedFromTime ¶
ResolvedFromTime is a struct that holds the 'resolved' from time and the source that provided it. The from time is determined from one of:
- the default (T-7d)
- the from time provided in the request
- the end time of the collection state
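The three candidates above can be sketched as a small resolution function. This is only a sketch: the `resolved` struct, the `resolveFromTime` function and `exampleNow` are hypothetical, and the precedence shown (request first, then collection state, then the default) is an assumption that the documentation does not confirm. The source labels follow the "config, collection state or default" wording used for GetFromTime.

```go
package main

import (
	"fmt"
	"time"
)

// resolved is a hypothetical stand-in for ResolvedFromTime.
type resolved struct {
	Time   time.Time
	Source string
}

// defaultLookback is the documented default of T-7d.
const defaultLookback = 7 * 24 * time.Hour

// resolveFromTime picks a from time.
// Assumed precedence (not confirmed by the documentation):
// request, then collection state, then default.
func resolveFromTime(requested, stateEnd *time.Time, now time.Time) resolved {
	switch {
	case requested != nil:
		return resolved{Time: *requested, Source: "config"}
	case stateEnd != nil:
		return resolved{Time: *stateEnd, Source: "collection state"}
	default:
		return resolved{Time: now.Add(-defaultLookback), Source: "default"}
	}
}

// exampleNow is a fixed clock value so the example is deterministic.
var exampleNow = time.Date(2024, 1, 10, 0, 0, 0, 0, time.UTC)

func main() {
	stateEnd := exampleNow.Add(-48 * time.Hour)
	fmt.Println(resolveFromTime(nil, &stateEnd, exampleNow).Source)
	fmt.Println(resolveFromTime(nil, nil, exampleNow).Time.Format("2006-01-02"))
}
```

Returning the source label alongside the time lets callers report why a collection started where it did.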
func ResolvedFromTimeFromProto ¶
func ResolvedFromTimeFromProto(proto *proto.ResolvedFromTime) *ResolvedFromTime
func (ResolvedFromTime) ToProto ¶
func (t ResolvedFromTime) ToProto() *proto.ResolvedFromTime
type RowSource ¶
type RowSource interface {
observable.PausableObservable
// Init is called when the row source is created
// it is responsible for parsing the source config and configuring the source
Init(context.Context, *RowSourceParams, ...RowSourceOption) error
// Identifier must return the source name
Identifier() string
// Description returns a human readable description of the source
Description() (string, error)
// Properties returns a map of property descriptions
// this is used for introspection
Properties() map[string]*types.PropertyMetadata
Close() error
SaveCollectionState() error
// Collect is called to start collecting data
Collect(context.Context) error
// GetFromTime returns the start time for the data collection, including the source of the from time
// (config, collection state or default)
GetFromTime() *ResolvedFromTime
// OnCollectionComplete is called when the source collection is SUCCESSFULLY completed
// this sets the collection state end time to the collection 'to' time to ensure that the next collection
// continues from the end of the last collection
OnCollectionComplete() error
}
RowSource is the interface that represents a data source. A number of data sources are provided by the SDK, and plugins may provide their own. Built-in data sources:
- AWS S3 Bucket
- API Source (this must be implemented by the plugin)
- File Source
- Webhook source
Sources may be configured with data transfo
func NewRowSourceDecorator ¶ added in v0.7.0
type RowSourceDecorator ¶ added in v0.7.0
type RowSourceDecorator struct {
// contains filtered or unexported fields
}
RowSourceDecorator is a struct which decorates a RowSource, allowing us to wrap the Collect call, providing a hook to call OnCollectionComplete
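The decorator idea can be sketched without the SDK as follows. The `collector` interface, `decorator`, `fakeSource` and `run` are hypothetical names; the choice to skip the hook when Collect fails is inferred from the RowSource comment that OnCollectionComplete is called when collection is successfully completed, and is not taken from the decorator's actual code.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// collector is a hypothetical, reduced version of the RowSource interface.
type collector interface {
	Collect(ctx context.Context) error
	OnCollectionComplete() error
}

// decorator wraps a collector and adds a completion hook around Collect.
type decorator struct {
	inner collector
}

// Collect delegates to the wrapped source and, only if that succeeds,
// calls OnCollectionComplete (assumed behaviour).
func (d *decorator) Collect(ctx context.Context) error {
	if err := d.inner.Collect(ctx); err != nil {
		return err
	}
	return d.inner.OnCollectionComplete()
}

// fakeSource is a hypothetical source that records whether the hook ran.
type fakeSource struct {
	fail      bool
	completed bool
}

func (f *fakeSource) Collect(context.Context) error {
	if f.fail {
		return errors.New("collect failed")
	}
	return nil
}

func (f *fakeSource) OnCollectionComplete() error {
	f.completed = true
	return nil
}

// run collects from s through the decorator.
func run(s *fakeSource) error {
	return (&decorator{inner: s}).Collect(context.Background())
}

func main() {
	ok := &fakeSource{}
	fmt.Println(run(ok), ok.completed)
}
```

Putting the hook in a wrapper means individual sources do not each have to remember to call it at the end of their own Collect.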
func (*RowSourceDecorator) AddObserver ¶ added in v0.7.0
func (r *RowSourceDecorator) AddObserver(observer observable.Observer) error
func (*RowSourceDecorator) Close ¶ added in v0.7.0
func (r *RowSourceDecorator) Close() error
func (*RowSourceDecorator) Collect ¶ added in v0.7.0
func (r *RowSourceDecorator) Collect(ctx context.Context) error
func (*RowSourceDecorator) Description ¶ added in v0.7.0
func (r *RowSourceDecorator) Description() (string, error)
func (*RowSourceDecorator) GetFromTime ¶ added in v0.7.0
func (r *RowSourceDecorator) GetFromTime() *ResolvedFromTime
func (*RowSourceDecorator) Identifier ¶ added in v0.7.0
func (r *RowSourceDecorator) Identifier() string
func (*RowSourceDecorator) Init ¶ added in v0.7.0
func (r *RowSourceDecorator) Init(ctx context.Context, params *RowSourceParams, option ...RowSourceOption) error
func (*RowSourceDecorator) OnCollectionComplete ¶ added in v0.7.0
func (r *RowSourceDecorator) OnCollectionComplete() error
func (*RowSourceDecorator) Pause ¶ added in v0.7.0
func (r *RowSourceDecorator) Pause() error
func (*RowSourceDecorator) PauseProcessingOnly ¶ added in v0.7.0
func (r *RowSourceDecorator) PauseProcessingOnly() error
func (*RowSourceDecorator) Properties ¶ added in v0.7.0
func (r *RowSourceDecorator) Properties() map[string]*types.PropertyMetadata
func (*RowSourceDecorator) Resume ¶ added in v0.7.0
func (r *RowSourceDecorator) Resume() error
func (*RowSourceDecorator) SaveCollectionState ¶ added in v0.7.0
func (r *RowSourceDecorator) SaveCollectionState() error
type RowSourceFactory ¶
type RowSourceFactory struct {
// contains filtered or unexported fields
}
func (*RowSourceFactory) DescribeSources ¶
func (b *RowSourceFactory) DescribeSources() (types.SourceMetadataMap, error)
func (*RowSourceFactory) GetRowSource ¶
func (b *RowSourceFactory) GetRowSource(ctx context.Context, params *RowSourceParams, sourceOpts ...RowSourceOption) (RowSource, error)
GetRowSource attempts to instantiate a row source using the provided row source data. It fails if the requested source type is not registered. Implements plugin.SourceFactory.
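The "fails if not registered" behaviour amounts to a lookup in a map of constructors, similar in shape to what GetSources returns. A minimal sketch, in which `factory`, `get`, `newFactory` and `webhookSource` are hypothetical and the real method additionally takes a context, params and options:

```go
package main

import "fmt"

// source is a hypothetical stand-in for RowSource.
type source interface {
	Identifier() string
}

// factory is a hypothetical stand-in for RowSourceFactory.
type factory struct {
	sources map[string]func() source
}

// get looks up a constructor by source type and returns an error
// when nothing has been registered under that name.
func (f *factory) get(sourceType string) (source, error) {
	ctor, ok := f.sources[sourceType]
	if !ok {
		return nil, fmt.Errorf("source type %q is not registered", sourceType)
	}
	return ctor(), nil
}

// webhookSource is a hypothetical source implementation.
type webhookSource struct{}

func (*webhookSource) Identifier() string { return "webhook" }

// newFactory builds a factory with a single registered source.
func newFactory() *factory {
	return &factory{sources: map[string]func() source{
		"webhook": func() source { return &webhookSource{} },
	}}
}

func main() {
	f := newFactory()
	_, err := f.get("s3")
	fmt.Println(err)
}
```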
func (*RowSourceFactory) GetSources ¶
func (b *RowSourceFactory) GetSources() map[string]func() RowSource
type RowSourceImpl ¶
type RowSourceImpl[S, T parse.Config] struct {
observable.PausableObservableImpl
Config S
Connection T
// store a reference to the derived RowSource type so we can call its methods
// this will be set by the source factory
Source RowSource
// the collection state data for this source
CollectionState *collection_state.SaveableCollectionState
// a function to create empty collection state data
NewCollectionStateFunc func() collection_state.CollectionState
// store errors - we only use this to determine whether the source collection was successful,
// and therefore whether we should set the CollectionState EndTime to the collection To time from OnCollectionComplete
ErrorCount int32
// the collection direction (this defaults to forwards - reverse order sources should set this in their Init method)
CollectionOrder collection_state.CollectionOrder
// the start time for the data collection
//FromTime time.Time
// how was from time set (config, collection state, default)
FromTimeSource string
CollectionTimeRange collection_state.DirectionalTimeRange
// a func to call to retrieve the granularity for the source
// this is provided to avoid a tricky timing problem - we want to get the granularity within RowSourceImpl.Init
// but ArtifactSourceImpl.Init needs to use our config to determine the granularity and this is not available
// until after the RowSourceImpl.Init is called
// so ArtifactSourceImpl.Init will set this func to return the granularity
GetGranularityFunc func() time.Duration
}
RowSourceImpl is a base implementation of the plugin.RowSource interface. It implements the observable.Observable interface, provides a default implementation of Close(), and contains the logic to raise a Row event. It should be embedded in all plugin.RowSource implementations.
S is the type of the source config struct. T is the type of the connection struct.
func (*RowSourceImpl[S, T]) Close ¶
func (r *RowSourceImpl[S, T]) Close() error
Close is a default implementation of the plugin.RowSource Close interface function
func (*RowSourceImpl[S, T]) Description ¶
func (*RowSourceImpl[S, T]) Description() (string, error)
Description returns a human-readable description of the source. It is used for introspection and should be overridden by the source implementation.
func (*RowSourceImpl[S, T]) GetConfigSchema ¶
func (r *RowSourceImpl[S, T]) GetConfigSchema() parse.Config
GetConfigSchema returns an empty instance of the config struct used by the source
func (*RowSourceImpl[S, T]) GetFromTime ¶
func (r *RowSourceImpl[S, T]) GetFromTime() *ResolvedFromTime
GetFromTime returns the start time for the data collection, including the source of the from time (config, collection state or default)
func (*RowSourceImpl[S, T]) Init ¶
func (r *RowSourceImpl[S, T]) Init(_ context.Context, params *RowSourceParams, opts ...RowSourceOption) error
Init is called when the row source is created. It is responsible for parsing the source config and configuring the source. opts are populated based on the table source config.
func (*RowSourceImpl[S, T]) OnCollectionComplete ¶ added in v0.7.0
func (r *RowSourceImpl[S, T]) OnCollectionComplete() error
OnCollectionComplete must be called by the source Collect function when the collection is complete. It updates the end time of the collection state to the collection `to` time and saves the collection state.
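The interaction between the error count and the collection state end time can be sketched as below. Everything here is hypothetical (`state`, `src`, `recordError`, `onCollectionComplete`, `exampleTo`), and silently leaving the end time untouched when errors were recorded is an assumption based on the ErrorCount field comment, not a description of the SDK's code.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// state is a hypothetical stand-in for the saved collection state.
type state struct {
	EndTime time.Time
	saved   bool
}

// src is a hypothetical source holding the fields relevant to completion.
type src struct {
	errorCount int32
	to         time.Time
	state      state
}

// recordError would be called whenever collection hits an error.
// It uses an atomic add so it is safe to call from several goroutines.
func (s *src) recordError() {
	atomic.AddInt32(&s.errorCount, 1)
}

// onCollectionComplete advances the state end time to the collection
// 'to' time and marks the state saved, but only when no errors occurred
// (assumed behaviour).
func (s *src) onCollectionComplete() error {
	if atomic.LoadInt32(&s.errorCount) > 0 {
		return nil
	}
	s.state.EndTime = s.to
	s.state.saved = true
	return nil
}

// exampleTo is a fixed 'to' time so the example is deterministic.
var exampleTo = time.Date(2024, 1, 10, 0, 0, 0, 0, time.UTC)

func main() {
	s := &src{to: exampleTo}
	_ = s.onCollectionComplete()
	fmt.Println(s.state.EndTime.Equal(exampleTo))
}
```

Advancing the end time only after a clean run means the next collection re-covers any range that may have been collected incompletely.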
func (*RowSourceImpl[S, T]) OnRow ¶
func (r *RowSourceImpl[S, T]) OnRow(ctx context.Context, row *types.RowData) error
OnRow raises an events.Row event, which is handled by the table. It is called by the row source when it has a row to send.
func (*RowSourceImpl[S, T]) Properties ¶ added in v0.5.0
func (r *RowSourceImpl[S, T]) Properties() map[string]*types.PropertyMetadata
Properties returns a map of property descriptions. It is used for introspection and should be overridden by the source implementation.
func (*RowSourceImpl[S, T]) PropertiesForType ¶ added in v0.5.0
func (r *RowSourceImpl[S, T]) PropertiesForType(config any) map[string]*types.PropertyMetadata
func (*RowSourceImpl[S, T]) RegisterSource ¶
func (r *RowSourceImpl[S, T]) RegisterSource(source RowSource)
RegisterSource is called by the source implementation to register itself with the base. This is required so that the RowSourceImpl can call the RowSource's methods.
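The reason a registration step is needed is that Go embedding has no virtual dispatch: a method defined on an embedded base struct cannot see methods of the outer type unless it holds a reference to it. A minimal sketch of that pattern, using hypothetical names (`base`, `Describe`, `webhookSource`, `newWebhookSource`) rather than the SDK types:

```go
package main

import "fmt"

// source is a hypothetical, reduced RowSource.
type source interface {
	Identifier() string
}

// base is a hypothetical stand-in for RowSourceImpl.
type base struct {
	// derived holds the outer type so base methods can call its methods
	derived source
}

// RegisterSource stores the outer type on the base.
func (b *base) RegisterSource(s source) { b.derived = s }

// Describe is a base method that needs the outer type's Identifier.
func (b *base) Describe() string {
	if b.derived == nil {
		return "unregistered source"
	}
	return "source: " + b.derived.Identifier()
}

// webhookSource embeds base and supplies Identifier.
type webhookSource struct {
	base
}

func (*webhookSource) Identifier() string { return "webhook" }

// newWebhookSource registers the new value with its own embedded base.
func newWebhookSource() *webhookSource {
	s := &webhookSource{}
	s.RegisterSource(s)
	return s
}

func main() {
	fmt.Println(newWebhookSource().Describe())
}
```

Without the `RegisterSource` call, `Describe` has no way to reach `webhookSource.Identifier`, which is why registration has to happen before any base method relies on the outer type.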
func (*RowSourceImpl[S, T]) SaveCollectionState ¶
func (r *RowSourceImpl[S, T]) SaveCollectionState() error
type RowSourceOption ¶
RowSourceOption is a function that can be used to configure a RowSource. NOTE: individual options are specific to particular row source types. RowSourceOption accepts the base Observable interface, and each option must implement a safe type assertion to the specific row source type.
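A "safe type assertion" here means using the two-value form `v, ok := s.(*T)` so that an option applied to the wrong source type does nothing instead of panicking. The sketch below shows the idea with hypothetical names throughout (`option`, `artifactSource`, `apiSource`, `withExtensions`, `apply`); the actual RowSourceOption signature is not shown in this documentation, so the function type used here is an assumption.

```go
package main

import "fmt"

// source is a hypothetical stand-in for the interface an option receives.
type source interface {
	Identifier() string
}

// option is a hypothetical stand-in for RowSourceOption.
type option func(s source) error

// artifactSource is a hypothetical source that understands extensions.
type artifactSource struct {
	extensions []string
}

func (*artifactSource) Identifier() string { return "artifact" }

// apiSource is a hypothetical source with nothing to configure.
type apiSource struct{}

func (*apiSource) Identifier() string { return "api" }

// withExtensions applies only to *artifactSource. The two-value type
// assertion makes it a no-op for every other source type.
func withExtensions(exts ...string) option {
	return func(s source) error {
		if a, ok := s.(*artifactSource); ok {
			a.extensions = exts
		}
		return nil
	}
}

// apply runs each option against the source, stopping at the first error.
func apply(s source, opts ...option) error {
	for _, o := range opts {
		if err := o(s); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	a := &artifactSource{}
	_ = apply(a, withExtensions(".log", ".gz"))
	fmt.Println(len(a.extensions))
}
```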
type RowSourceParams ¶
type RowSourceParams struct {
SourceConfigData *types.SourceConfigData
ConnectionData *types.ConnectionConfigData
CollectionStatePath string
From time.Time
To time.Time
CollectionTempDir string
Overwrite bool
}
func RowSourceParamsFromProto ¶
func RowSourceParamsFromProto(pr *proto.RowSourceParams) (*RowSourceParams, error)
func (*RowSourceParams) ToProto ¶ added in v0.2.0
func (r *RowSourceParams) ToProto() *proto.RowSourceParams