artifact_source

package
v0.9.4
Warning

This package is not in the latest version of its module.

Published: Sep 26, 2025 License: Apache-2.0 Imports: 32 Imported by: 27

Documentation

Constants

const ArtifactSourceMaxConcurrency = 16
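
This page does not say where ArtifactSourceMaxConcurrency is applied. Going by the name alone, it probably caps how many artifacts are handled at once. The following is a minimal sketch of that general technique (a channel used as a counting semaphore), not the SDK's implementation. The helper processAll and its parameters are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// Copied from the constant documented above.
const ArtifactSourceMaxConcurrency = 16

// processAll is a hypothetical helper (not part of the SDK). It runs work for
// every item while keeping at most limit goroutines active, and returns the
// peak number of workers it observed running at the same time.
func processAll(items []string, limit int, work func(string)) int {
	sem := make(chan struct{}, limit) // counting semaphore
	var wg sync.WaitGroup
	var mu sync.Mutex
	active, peak := 0, 0

	for _, item := range items {
		sem <- struct{}{} // blocks while limit workers are already running
		wg.Add(1)
		go func(it string) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot when the worker ends

			mu.Lock()
			active++
			if active > peak {
				peak = active
			}
			mu.Unlock()

			work(it)

			mu.Lock()
			active--
			mu.Unlock()
		}(item)
	}
	wg.Wait()
	return peak
}

func main() {
	items := make([]string, 100)
	for i := range items {
		items[i] = fmt.Sprintf("artifact-%d", i)
	}
	peak := processAll(items, ArtifactSourceMaxConcurrency, func(string) {})
	fmt.Println("peak within limit:", peak <= ArtifactSourceMaxConcurrency)
}
```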

Variables

This section is empty.

Functions

func ExpandPatternIntoOptionalAlternatives

func ExpandPatternIntoOptionalAlternatives(pattern string) []string

func WithArtifactExtractor

func WithArtifactExtractor(extractor Extractor) row_source.RowSourceOption

WithArtifactExtractor is used to specify an artifact extractor. This is needed if the artifact contains a collection of rows which needs explicit extraction (note: this is in addition to the default extraction performed by the loader).

func WithArtifactLoader

func WithArtifactLoader(loader artifact_loader.Loader) row_source.RowSourceOption

WithArtifactLoader is used to specify an artifact loader

func WithDefaultArtifactSourceConfig

func WithDefaultArtifactSourceConfig(config *artifact_source_config.ArtifactSourceConfigImpl) row_source.RowSourceOption

WithDefaultArtifactSourceConfig sets the default config, e.g. file layout, if it has not been set from config. NOTE: in contrast to the artifact config passed to the source from the CLI, which is raw HCL that must be parsed, the default artifact source config is an ArtifactSourceConfigImpl struct which is populated by the table to set defaults.

func WithHeaderRowNotification added in v0.4.0

func WithHeaderRowNotification(delimiter string) row_source.RowSourceOption

WithHeaderRowNotification is used when creating an ArtifactSourceImpl. It specifies that the first row of the artifact is a header. The specified delimiter is used to split the header into a list of fields, and the collector is notified of the header. The collector will pass the header columns to all MapRow calls using the `WithHeader` option.

func WithRowPerLine

func WithRowPerLine() row_source.RowSourceOption

WithRowPerLine is used when creating an ArtifactSourceImpl. It specifies that the row source should treat each line as a separate row.

func WithSkipHeaderRow

func WithSkipHeaderRow() row_source.RowSourceOption

WithSkipHeaderRow is used when creating an ArtifactSourceImpl. It specifies that the row source should skip the first row (header row).
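
The With... functions above all return a row_source.RowSourceOption. That type's definition is not shown on this page, so the sketch below uses local stand-ins to illustrate the functional-options pattern these names suggest. The source struct, the option type, and the lower-case constructors are all hypothetical. Having the header-notification option also set the skip flag follows the SetHeaderDelimiter description, and it is an assumption that the option works the same way.

```go
package main

import "fmt"

// source is a hypothetical stand-in for ArtifactSourceImpl. Only the three
// line/header fields from the documented struct are modelled here.
type source struct {
	RowPerLine         bool
	SkipHeaderRow      bool
	HeaderRowDelimiter string
}

// option is a stand-in for row_source.RowSourceOption (assumed shape).
type option func(*source)

// withRowPerLine marks each line of the artifact as a separate row.
func withRowPerLine() option {
	return func(s *source) { s.RowPerLine = true }
}

// withSkipHeaderRow skips the first row without raising a header event.
func withSkipHeaderRow() option {
	return func(s *source) { s.SkipHeaderRow = true }
}

// withHeaderRowNotification skips the first row and records the delimiter
// that would be used to split it into header fields.
func withHeaderRowNotification(delimiter string) option {
	return func(s *source) {
		s.SkipHeaderRow = true
		s.HeaderRowDelimiter = delimiter
	}
}

// newSource applies the options in order to a zero-valued source.
func newSource(opts ...option) *source {
	s := &source{}
	for _, o := range opts {
		o(s)
	}
	return s
}

func main() {
	csvSource := newSource(withRowPerLine(), withHeaderRowNotification(","))
	fmt.Printf("%+v\n", *csvSource)
}
```

In this sketch a table that reads CSV-like artifacts would combine the row-per-line and header-notification options, while a table that only needs to discard the first line would use the skip-header option on its own.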

Types

type ArtifactSource

type ArtifactSource interface {
	row_source.RowSource

	DiscoverArtifacts(ctx context.Context) error
	DownloadArtifact(context.Context, *types.ArtifactInfo) error

	SetExtractor(extractor Extractor)
	SetLoader(loader artifact_loader.Loader)
	SetRowPerLine(b bool)
	SetSkipHeaderRow()
	SetHeaderDelimiter(b string)
	SetDefaultConfig(config *artifact_source_config.ArtifactSourceConfigImpl)
}

ArtifactSource is an interface providing methods for discovering and downloading artifacts to the local file system. A row_source.RowSourceImpl must be configured to have an ArtifactSource implementation. Sources provided by the SDK: [FileSystemSource], [AwsS3BucketSource], [AwsCloudWatchSource]

type ArtifactSourceImpl

type ArtifactSourceImpl[S artifact_source_config.ArtifactSourceConfig, T parse.Config] struct {
	row_source.RowSourceImpl[S, T]

	// do we expect a row to be a line of data
	RowPerLine bool
	// is there a header row, i.e. do we want to skip the first row (e.g. for a csv file)
	SkipHeaderRow bool
	// what is the delimiter for the header row
	// (if this is set, a header event will be raised for the header of each file)
	HeaderRowDelimiter string

	Loader artifact_loader.Loader

	// temporary directory for storing downloaded artifacts - this is initialised in the Init function
	// to be a subdirectory of the collection directory
	TempDir string

	// shadow the row_source.RowSourceImpl Source property, but using ArtifactSource interface
	Source ArtifactSource
	// contains filtered or unexported fields
}

ArtifactSourceImpl is a row_source.RowSource that extracts rows from an 'artifact'.

Artifacts are defined as some entity which contains a collection of rows, which must be extracted/processed in some way to produce 'raw' rows which can be streamed to a collection. Examples of artifacts include:

  • a gzip file in an S3 bucket
  • a CloudWatch log group
  • a JSON file on the local file system

The ArtifactSourceImpl is composable, as the same storage location may be used to store different log files in varying formats, and the source may need to be configured to know how to extract the log rows from the artifact.

An ArtifactSourceImpl is composed of:

  • an [artifact.ArtifactSource] which discovers and downloads artifacts to a temp local file, and handles incremental/restartable downloads
  • an [artifact.Loader] which loads the artifact data from the local file, performing any necessary decompression/decryption etc.
  • optionally, one or more [artifact.Mapper]s which perform processing/conversion/extraction logic required to extract individual data rows from the artifact

The lifetime of the ArtifactSourceImpl is expected to be the duration of a single collection operation.
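
To make the loader role concrete, here is a minimal sketch of loading a gzip artifact and producing one raw row per line. It is not the SDK's loader. The functions gzipBytes and loadGzipLines are hypothetical, and an in-memory byte slice stands in for the temp local file that a real source would download.

```go
package main

import (
	"bufio"
	"bytes"
	"compress/gzip"
	"fmt"
)

// gzipBytes compresses text. It stands in for an artifact that has already
// been discovered and downloaded (hypothetical helper).
func gzipBytes(text string) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write([]byte(text)); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// loadGzipLines plays the loader role: it decompresses the artifact and
// returns each line as a separate raw row (hypothetical helper).
func loadGzipLines(artifact []byte) ([]string, error) {
	zr, err := gzip.NewReader(bytes.NewReader(artifact))
	if err != nil {
		return nil, err
	}
	defer zr.Close()

	var rows []string
	sc := bufio.NewScanner(zr)
	for sc.Scan() {
		rows = append(rows, sc.Text())
	}
	return rows, sc.Err()
}

func main() {
	artifact, err := gzipBytes("GET /a 200\nGET /b 404\n")
	if err != nil {
		panic(err)
	}
	rows, err := loadGzipLines(artifact)
	if err != nil {
		panic(err)
	}
	for _, r := range rows {
		fmt.Println("row:", r)
	}
}
```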

func (*ArtifactSourceImpl[S, T]) Collect

func (a *ArtifactSourceImpl[S, T]) Collect(ctx context.Context) (err error)

Collect tells our ArtifactSourceImpl to start discovering artifacts. Implements plugin.RowSource.

func (*ArtifactSourceImpl[S, T]) DiscoverArtifacts

func (a *ArtifactSourceImpl[S, T]) DiscoverArtifacts(ctx context.Context) error

func (*ArtifactSourceImpl[S, T]) DownloadArtifact

func (a *ArtifactSourceImpl[S, T]) DownloadArtifact(ctx context.Context, info *types.ArtifactInfo) error

func (*ArtifactSourceImpl[S, T]) Identifier

func (a *ArtifactSourceImpl[S, T]) Identifier() string

func (*ArtifactSourceImpl[S, T]) Init

func (*ArtifactSourceImpl[S, T]) OnArtifactDiscovered

func (a *ArtifactSourceImpl[S, T]) OnArtifactDiscovered(ctx context.Context, info *types.ArtifactInfo) error

func (*ArtifactSourceImpl[S, T]) OnArtifactDownloaded

func (a *ArtifactSourceImpl[S, T]) OnArtifactDownloaded(ctx context.Context, info *types.DownloadedArtifactInfo) (err error)

func (*ArtifactSourceImpl[S, T]) Properties added in v0.5.0

func (a *ArtifactSourceImpl[S, T]) Properties() map[string]*types.PropertyMetadata

func (*ArtifactSourceImpl[S, T]) SetDefaultConfig

func (a *ArtifactSourceImpl[S, T]) SetDefaultConfig(config *artifact_source_config.ArtifactSourceConfigImpl)

SetDefaultConfig sets the default config for the source

func (*ArtifactSourceImpl[S, T]) SetExtractor

func (a *ArtifactSourceImpl[S, T]) SetExtractor(extractor Extractor)

SetExtractor sets the extractor function for the source

func (*ArtifactSourceImpl[S, T]) SetHeaderDelimiter added in v0.4.0

func (a *ArtifactSourceImpl[S, T]) SetHeaderDelimiter(delimiter string)

SetHeaderDelimiter sets the skip header row flag for the source, and sets the delimiter used to split the header. The header row will be skipped and a Header event will be raised with the split header. The collector will then pass this header to all MapRow calls for that artifact.

func (*ArtifactSourceImpl[S, T]) SetLoader

func (a *ArtifactSourceImpl[S, T]) SetLoader(loader artifact_loader.Loader)

func (*ArtifactSourceImpl[S, T]) SetRowPerLine

func (a *ArtifactSourceImpl[S, T]) SetRowPerLine(rowPerLine bool)

func (*ArtifactSourceImpl[S, T]) SetSkipHeaderRow

func (a *ArtifactSourceImpl[S, T]) SetSkipHeaderRow()

SetSkipHeaderRow sets the skip header row flag for the source, but does not set the delimiter. The header row will be skipped but no Header event will be raised.
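
The difference between SetSkipHeaderRow and SetHeaderDelimiter can be sketched on plain string slices. This is an illustration of the documented behaviour only, not the SDK's code. The helper splitHeader is hypothetical, and a returned header slice stands in for the Header event.

```go
package main

import (
	"fmt"
	"strings"
)

// splitHeader is a hypothetical helper. It returns the header fields (nil if
// no header event would be raised) and the remaining data rows.
func splitHeader(lines []string, skipHeader bool, delimiter string) (header, rows []string) {
	// A non-empty delimiter implies skipping, as described for SetHeaderDelimiter.
	if delimiter != "" {
		skipHeader = true
	}
	if !skipHeader || len(lines) == 0 {
		return nil, lines
	}
	// Only a delimiter produces header fields. Skipping alone discards the row.
	if delimiter != "" {
		header = strings.Split(lines[0], delimiter)
	}
	return header, lines[1:]
}

func main() {
	lines := []string{"time,method,status", "12:00,GET,200", "12:01,POST,500"}

	// Equivalent of SetSkipHeaderRow: first row dropped, no header fields.
	h, rows := splitHeader(lines, true, "")
	fmt.Println("header fields:", len(h), "rows:", len(rows))

	// Equivalent of SetHeaderDelimiter(","): first row dropped and split.
	h, rows = splitHeader(lines, false, ",")
	fmt.Println("header fields:", len(h), "rows:", len(rows))
}
```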

func (*ArtifactSourceImpl[S, T]) WalkNode

func (a *ArtifactSourceImpl[S, T]) WalkNode(ctx context.Context, targetPath string, basePath string, layouts []string, isDir bool, g *grok.Grok, filterMap map[string]*filter.SqlFilter) error

WalkNode is called for each file or directory discovered by the file source - it is called as part of the folder walking discovery algorithm

type EmptyConnection

type EmptyConnection struct {
}

For now, sources must be parametrized by a connection, so we need a dummy connection for those that don't need one.

func (EmptyConnection) Identifier

func (EmptyConnection) Identifier() string

func (*EmptyConnection) Validate

func (c *EmptyConnection) Validate() error

type Extractor

type Extractor interface {
	Identifier() string
	// Extract retrieves one or more rows from the artifact data
	Extract(context.Context, any) ([]any, error)
}

Extractor is an interface which provides a method for extracting rows from an artifact.
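
A minimal sketch of an Extractor implementation follows. The interface is copied from the definition above so the example compiles on its own. The recordsExtractor type, the splitRecords helper, and the {"Records": [...]} envelope are hypothetical. It is also an assumption that the artifact data reaches Extract as a []byte, since this page does not document what the loader passes.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// Extractor is copied from the interface documented above.
type Extractor interface {
	Identifier() string
	Extract(context.Context, any) ([]any, error)
}

// recordsExtractor is a hypothetical extractor for artifacts shaped like
// {"Records": [ {...}, {...} ]}, where each array element is one row.
type recordsExtractor struct{}

func (recordsExtractor) Identifier() string { return "records_extractor" }

// Extract assumes the artifact data arrives as raw JSON bytes.
func (recordsExtractor) Extract(_ context.Context, a any) ([]any, error) {
	data, ok := a.([]byte)
	if !ok {
		return nil, fmt.Errorf("expected []byte, got %T", a)
	}
	return splitRecords(data)
}

// splitRecords decodes the envelope and returns each record as its own row,
// leaving the record body undecoded for later mapping.
func splitRecords(data []byte) ([]any, error) {
	var envelope struct {
		Records []json.RawMessage `json:"Records"`
	}
	if err := json.Unmarshal(data, &envelope); err != nil {
		return nil, fmt.Errorf("decode envelope: %w", err)
	}
	rows := make([]any, 0, len(envelope.Records))
	for _, r := range envelope.Records {
		rows = append(rows, r)
	}
	return rows, nil
}

func main() {
	var e Extractor = recordsExtractor{}
	rows, err := e.Extract(context.Background(), []byte(`{"Records":[{"id":1},{"id":2}]}`))
	if err != nil {
		panic(err)
	}
	fmt.Println(e.Identifier(), "extracted", len(rows), "rows")
}
```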

type InitSourceRequest

type InitSourceRequest struct {
	// the source format to use (with raw config)
	SourceFormat  *types.FormatConfigData
	SourceParams  *row_source.RowSourceParams
	DefaultConfig *artifact_source_config.ArtifactSourceConfigImpl
}

InitSourceRequest is an SDK type which is mapped from the proto.InitSourceRequest.

func InitSourceRequestFromProto

func InitSourceRequestFromProto(pr *proto.InitSourceRequest) (*InitSourceRequest, error)

type NilArtifactCollectionState

type NilArtifactCollectionState struct {
}

NilArtifactCollectionState is a collection state that does nothing. It is used by PluginSourceWrapper, as the actual collection state is implemented by the source plugin.

func (*NilArtifactCollectionState) Clear

func (*NilArtifactCollectionState) GetFromTime added in v0.9.0

func (s *NilArtifactCollectionState) GetFromTime() time.Time

func (*NilArtifactCollectionState) GetToTime added in v0.9.0

func (s *NilArtifactCollectionState) GetToTime() time.Time

func (*NilArtifactCollectionState) Init

func (*NilArtifactCollectionState) IsEmpty

func (*NilArtifactCollectionState) IsEmpty() bool

func (*NilArtifactCollectionState) MigrateFromLegacyState added in v0.9.0

func (*NilArtifactCollectionState) MigrateFromLegacyState(_ []byte) error

func (*NilArtifactCollectionState) OnCollected

func (*NilArtifactCollectionState) OnCollected(_ string, _ time.Time) error

func (*NilArtifactCollectionState) OnCollectionComplete added in v0.9.0

func (*NilArtifactCollectionState) OnCollectionComplete() error

func (*NilArtifactCollectionState) RegisterPath

func (s *NilArtifactCollectionState) RegisterPath(_ string, _ map[string]string)

func (*NilArtifactCollectionState) Save

func (*NilArtifactCollectionState) SetEndTime

func (s *NilArtifactCollectionState) SetEndTime(_ time.Time)

func (*NilArtifactCollectionState) ShouldCollect

func (*NilArtifactCollectionState) ShouldCollect(_ string, _ time.Time) bool

func (*NilArtifactCollectionState) Validate added in v0.9.0

func (*NilArtifactCollectionState) Validate() error

type NilArtifactSourceConfig

type NilArtifactSourceConfig struct{}

func (NilArtifactSourceConfig) DefaultTo

func (NilArtifactSourceConfig) GetFileLayout

func (n NilArtifactSourceConfig) GetFileLayout() *string

func (NilArtifactSourceConfig) Identifier

func (n NilArtifactSourceConfig) Identifier() string

func (NilArtifactSourceConfig) Validate

func (n NilArtifactSourceConfig) Validate() error

type NilConfig

type NilConfig struct{}

func (NilConfig) Identifier

func (n NilConfig) Identifier() string

func (NilConfig) Validate

func (n NilConfig) Validate() error

type PluginSourceWrapper

type PluginSourceWrapper struct {
	// NOTE: we are using the plugin source for ArtifactSource operations (i.e. downloading the artifacts),
	// the ArtifactSourceImpl handles the remaining operations (loading/extraction)
	// We still need to parameterise the ArtifactSourceImpl, however we just pass empty config and connection -
	// the implementation of the RowSource in the plugin will handle the config and connection
	// (we pass the raw config and connection to the plugin)
	ArtifactSourceImpl[*NilArtifactSourceConfig, *NilConfig]
	// contains filtered or unexported fields
}

PluginSourceWrapper is an implementation of ArtifactSource which wraps a gRPC plugin which implements the source. All RowSource implementations delegate to the plugin, while the remainder of the ArtifactSource operations (loading and extraction) are handled by the base ArtifactSourceImpl.

func (*PluginSourceWrapper) AddObserver

func (w *PluginSourceWrapper) AddObserver(o observable.Observer) error

AddObserver adds an observer to the source (overriding the base implementation)

func (*PluginSourceWrapper) Close

func (w *PluginSourceWrapper) Close() error

func (*PluginSourceWrapper) Collect

func (w *PluginSourceWrapper) Collect(ctx context.Context) error

Collect is called to start collecting data.

func (*PluginSourceWrapper) Description

func (w *PluginSourceWrapper) Description() (string, error)

Description returns a human-readable description of the source. This is used for introspection and will not be called for the PluginSourceWrapper.

func (*PluginSourceWrapper) Identifier

func (w *PluginSourceWrapper) Identifier() string

Identifier must return the source name

func (*PluginSourceWrapper) Init

Init is called when the row source is created. It is responsible for parsing the source config and configuring the source.

func (*PluginSourceWrapper) OnCollectionComplete added in v0.7.0

func (w *PluginSourceWrapper) OnCollectionComplete() error

func (*PluginSourceWrapper) Pause added in v0.5.0

func (w *PluginSourceWrapper) Pause() error

Pause is called to pause collection of source data

func (*PluginSourceWrapper) Properties added in v0.5.0

func (w *PluginSourceWrapper) Properties() map[string]*types.PropertyMetadata

Properties returns a map of property descriptions. This is used for introspection and will not be called for the PluginSourceWrapper.

func (*PluginSourceWrapper) Resume added in v0.5.0

func (w *PluginSourceWrapper) Resume() error

Resume is called to resume collection of source data

func (*PluginSourceWrapper) SaveCollectionState

func (w *PluginSourceWrapper) SaveCollectionState() error

func (*PluginSourceWrapper) SetPlugin

func (w *PluginSourceWrapper) SetPlugin(sourcePlugin *types.SourcePluginReattach) error

SetPlugin sets the plugin client for the source. This is called from the WithPluginReattach option.
