tailpipe-plugin-sdk
1 Implementing a plugin
1.1 Tailpipe Overview
Tailpipe consists of a CLI and an ecosystem of GRPC plugins, similar to Steampipe.
Overview of Tailpipe operation:
- configure a collection in an HCL config file
- execute the CLI `collect` command
- CLI starts the required GRPC plugin and calls `AddObserver` to receive plugin status messages
- CLI issues a `Collect` command to the plugin
- Plugin writes chunks of data to JSONL files in the specified output directory, and sends a `Chunk` event for each file
- The CLI handles the `Chunk` event by loading the JSONL file and converting it to partitioned parquet files in the configured parquet location
- (In future) the CLI will automatically generate DuckDB views on this data
Plugin Collection Process:
- Plugin receives a `Collect` command
- Plugin instantiates the configured `Collection`, which in turn instantiates the configured `Source`
- The `Source` retrieves rows of data from the source location and raises `Row` events, which are handled by the `Collection`
- The `Collection` enriches the rows and raises `Row` events, which are handled by the plugin SDK, which buffers the rows
- When the row buffer is full, the plugin SDK writes the rows to a JSONL file in the specified output directory and raises a `Chunk` event back across the GRPC event stream
Plugins are composed of the following components:
- Plugin struct (a minimal shell)
- Collections
- Sources
The plugin must implement at least one collection. It can optionally define sources to retrieve data, or it can use the built-in sources provided by the SDK.
1.2 Implementation Steps
1.2.1 Implement plugin struct
- Implement a plugin struct which embeds `plugin.Base` and implements the `Identifier` function
- Add a constructor function for this and call `plugin.Serve` from the main function, passing this constructor in the `ServeOpts`
1.2.2 Define one or more Collections
- Implement a collection struct which embeds `collection.Base`
- Implement the required `Collection` interface functions: `Identifier`, `GetRowSchema`, `GetConfigSchema`, `EnrichRow`
- Define a row struct which the collection will return - this defines the schema of the collection and should embed `enrichment.CommonFields`
- Define a config struct with HCL tags for the table config
- [TODO] Define/register which sources the collection supports
- Register the collection with the plugin by calling `RegisterCollections` on the plugin struct within the plugin constructor
1.2.3 [optional] Define custom Sources
- Implement a source struct which embeds `row_source.Base`
- Implement the required `RowSource` interface functions: `Identifier`, `Collect`
- [TODO] register the source with the collection/plugin

The `Collect` function should retrieve row and (optionally) enrichment data and, for each row retrieved, create an `ArtifactData` struct and raise a row event by calling the `OnRow` method, implemented by the `row_source.Base` struct.
2 Details
2.1 Plugin
2.1.1 TailpipePlugin Interface
The plugin should define a Plugin struct which implements the TailpipePlugin interface:
```go
type TailpipePlugin interface {
	// Identifier returns the plugin name
	// this must be implemented by the plugin implementation
	Identifier() string

	// GetSchema returns the DuckDB schema for all collections
	// this must be implemented by the plugin implementation
	GetSchema() schema.SchemaMap

	// AddObserver adds an observer to the plugin to receive status events
	// this is implemented by plugin.Base and should not be overridden
	AddObserver(observable.Observer) error

	// Collect is called to start a collection run
	// this is implemented by plugin.Base and should not be overridden
	Collect(context.Context, *proto.CollectRequest) error

	// Other interface functions

	// Init is implemented by plugin.Base.
	// If overridden by the plugin it MUST call the base version
	Init(context.Context) error

	// Shutdown is implemented by plugin.Base (empty implementation)
	// it may be overridden by the plugin
	Shutdown(context.Context) error
}
```
The Plugin struct should embed the plugin.Base struct, which provides a default implementation of the Observable interface,
and a lot of the underlying functionality of the plugin.
- `Identifier` - return the plugin name
- `GetSchema` - return the schema for all collections
Optionally, the plugin may implement the following functions:
- `Init` - any initialisation required by the plugin. Note: if this is implemented it must call `Base.Init()`.
- `Shutdown` - any cleanup required by the plugin. Note: if this is implemented it must call `Base.Shutdown()`.
Example:
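A minimal sketch of such a plugin struct, assuming a hypothetical plugin named `myplugin` (the names here are illustrative, not from a real plugin):

```go
// Plugin is a minimal shell which embeds plugin.Base.
type Plugin struct {
	plugin.Base
}

// Identifier returns the plugin name.
func (p *Plugin) Identifier() string {
	return "myplugin"
}

// GetSchema returns the DuckDB schema for all collections.
// Population of the schema map is plugin-specific and omitted from this sketch.
func (p *Plugin) GetSchema() schema.SchemaMap {
	return nil
}
```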
2.1.2 Plugin constructor
The plugin must implement a constructor function. This should be in the file `<plugin_name>/plugin.go`. The plugin constructor must:
- instantiate a `Plugin` struct
- register the collections which the plugin provides by calling `RegisterCollections` on the plugin struct (this is a method provided by the `plugin.Base` struct)
- return the `Plugin` object
Example:
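A hedged sketch of such a constructor (the `MyLogCollection` name is illustrative, and the exact return types and `RegisterCollections` signature are assumptions):

```go
// NewPlugin instantiates the Plugin struct and registers its collections.
func NewPlugin() (*Plugin, error) {
	p := &Plugin{}

	// register the collections provided by this plugin
	// (RegisterCollections is provided by plugin.Base;
	// the error return shown here is an assumption)
	if err := p.RegisterCollections(&MyLogCollection{}); err != nil {
		return nil, err
	}

	return p, nil
}
```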
In the main function of the plugin, call plugin.Serve with the plugin constructor function as an argument.
For example, from the AWS plugin:
```go
func main() {
	err := plugin.Serve(&plugin.ServeOpts{
		PluginFunc: aws.NewPlugin,
	})
	if err != nil {
		slog.Error("Error starting plugin", "error", err)
	}
}
```
2.2 Folder conventions
The plugin folder structure should be:
```
main.go
<plugin_name>/
  plugin.go
<plugin_name>_collection/
  <collection_name>_collection.go
  <collection_name>_collection_config.go
<plugin_name>_source/
  <source_name>_source.go*
  <source_name>_collection_config.go*
  <source_name>_mapper.go*
<plugin_name>_types/
  <plugin_name>_<rowdata_type>.go
```
Notes:
- Files marked with an asterisk (*) are optional.
- <plugin_name> is a placeholder for the name of the plugin.
- <collection_name> is a placeholder for the name of the collection. There will be one or more collections.
- <source_name> is a placeholder for the name of the source. There will be zero or more sources defined.
For example AWS:
```
main.go
aws/
  plugin.go
aws_collection/
  cloudtrail_log_collection.go
  cloudtrail_log_collection_config.go
  vpc_flow_log_collection.go
  vpc_flow_log_collection_config.go
aws_source/
  cloudtrail_mapper.go
aws_types/
  aws_cloudtrail.go
  vpc_flow_log.go
  vpc_flow_log_test.go
```
2.3 Collections
A Collection is broadly analogous to a table in Steampipe. It returns a set of data which follows a specific schema.
This schema will have a number of standard fields (see GetRowSchema below) as well as fields specific to the collection.
A plugin must define at least one collection, and may define more.
2.3.1 Collection Interface
The collection must implement the collection.Collection interface:
```go
type Collection interface {
	// Observable must be implemented by collections (it is implemented by collection.Base)
	observable.Observable

	// Init is called when the collection is created
	// it is responsible for parsing the config and creating the configured Source
	Init(ctx context.Context, config []byte) error

	// Identifier must return the collection name
	Identifier() string

	// GetRowSchema returns an empty instance of the row struct returned by the collection
	GetRowSchema() any

	// GetConfigSchema returns an empty instance of the config struct used by the collection
	GetConfigSchema() any

	// GetCollectionStateStruct returns an empty instance of the collection state data struct
	GetCollectionStateStruct() any

	// Collect is called to start collecting data,
	// Collect will send enriched rows which satisfy the tailpipe row requirements (todo link/document)
	Collect(context.Context, *proto.CollectRequest) error

	// EnrichRow is called for each raw row of data, it must enrich the row and return it
	EnrichRow(row any, sourceEnrichmentFields enrichment.SourceEnrichment) (any, error)
}
```
All collection implementations should embed the collection.Base struct, which provides a default implementation of the Observable interface.
It also implements the Collect function and provides a default implementation of GetCollectionStateStruct.
- `Init` (MAYBE???)
  - Parse the config (using the `base.ParseConfig` function)
  - Create the configured `Source`
  - Any other collection specific initialisation
- `Identifier` - Return the collection name
- `GetRowSchema` - This specifies the row schema that the collection will return. This should return an empty instance of the struct that the collection will return.
- `GetConfigSchema` - This specifies the schema of the config that the collection expects. This should return an empty instance of the struct that the collection expects. The struct should have HCL tags for the config fields.
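As an illustration, a skeletal collection covering these functions might look like the following sketch (the `MyLogCollection`/`MyLogRow` names and the config fields are invented for this example):

```go
// MyLogCollectionConfig is the HCL config for the collection.
type MyLogCollectionConfig struct {
	// source file paths (an illustrative config field)
	Paths []string `hcl:"paths,optional"`
}

// MyLogCollection embeds collection.Base, which provides the Observable
// implementation, the Collect function and the default collection state handling.
type MyLogCollection struct {
	collection.Base

	Config *MyLogCollectionConfig
}

// Identifier returns the collection name.
func (c *MyLogCollection) Identifier() string {
	return "my_log"
}

// GetRowSchema returns an empty instance of the row struct returned by the collection.
func (c *MyLogCollection) GetRowSchema() any {
	return &MyLogRow{}
}

// GetConfigSchema returns an empty instance of the config struct.
func (c *MyLogCollection) GetConfigSchema() any {
	return &MyLogCollectionConfig{}
}
```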
Defining the Row Struct
The 'row struct' is the type returned by the collection, and it defines the collection schema.
The struct definitions should be in the folder `<plugin_name>_types/` in files named `<plugin_name>_<rowdata_type>.go`. All fields should have JSON tags.
The row struct should embed `enrichment.CommonFields` to include a set of standard Tailpipe fields.
By default, the collection schema is inferred automatically by reflecting the row struct: all field names are converted to snake case and the field types are converted to the matching DuckDB types.
If the schema of particular fields needs to be customised, a parquet tag can be added to the field. This tag may contain the DuckDB type of the field and/or the desired field name (it is possible to provide just one of these).
For example:
```go
type MyRow struct {
	*enrichment.CommonFields

	// override type
	Id string `json:"id" parquet:"type=UUID"`

	// override name
	MyField string `json:"my_field" parquet:"name=description"`

	// exclude from schema
	Exclude string `json:"exclude" parquet:"-"`
}
```
Enriching the row
The primary function of the collection is to enrich/normalise the raw data returned by the source, returning a standardised row struct.
This is achieved by the EnrichRow function. This function is called for each raw row of data returned by the source.
It is expected that the collection will know (or be able to deduce) the format of the source data.
(This may be achieved by the plugin implementing a custom Mapper to perform the final stage of data conversion for the source.
For example, the AWS plugin uses a CloudTrailMapper to convert JSON data from CloudTrail into an AWSCloudTrailBatch and then extracts the rows from this.)
The EnrichRow function should create an instance of the row struct, populate it with the data from the raw row, and populate whichever of the standard Tailpipe fields are available/relevant.
The standard Tailpipe fields are contained in the enrichment.CommonFields struct which must be embedded into the row struct. The following standard fields MUST be populated in the row struct:
`TpID`, `TpConnection`, `TpTimestamp`, `TpYear`, `TpMonth`, `TpDay`
The following optional enrichment fields may also be added.
`TpSourceType`, `TpSourceName`, `TpSourceLocation`, `TpIngestTimestamp`, `TpSourceIP`, `TpDestinationIP`, `TpPartition`, `TpAkas`, `TpIps`, `TpTags`, `TpDomains`, `TpEmails`, `TpUsernames`
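For illustration, an `EnrichRow` implementation might look something like this sketch (the `MyLogRow` type, its `Timestamp` field, and the field types on `CommonFields` are assumptions made for the example; it uses github.com/google/uuid for id generation):

```go
// EnrichRow populates the standard Tailpipe fields on each raw row.
func (c *MyLogCollection) EnrichRow(row any, sourceEnrichmentFields enrichment.SourceEnrichment) (any, error) {
	// the collection knows the format the source returns
	r, ok := row.(*MyLogRow)
	if !ok {
		return nil, fmt.Errorf("invalid row type: expected *MyLogRow, got %T", row)
	}

	// populate the required common fields
	r.TpID = uuid.New().String() // assumption: any unique id is acceptable here
	r.TpConnection = "default"
	r.TpTimestamp = r.Timestamp // assumption: the raw row carries a timestamp
	r.TpYear = r.Timestamp.Year()
	r.TpMonth = int(r.Timestamp.Month())
	r.TpDay = r.Timestamp.Day()

	// optional enrichment fields (e.g. TpSourceType) may also be copied
	// from sourceEnrichmentFields onto the row here

	return r, nil
}
```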
2.4 Sources
A Source is responsible for retrieving raw rows from their source location, and streaming them to the collection for enrichment.
The source is responsible for a combination of the following tasks:
- Locating artifacts containing log data (e.g. gz files in an S3 bucket)
- Downloading the artifact from storage (local/remote/cloud)
- Extracting the raw log rows from the artifact (this may involve extraction/mapping of the log format/location)
- Retrieving log rows from an API
- Keeping track of which log rows have been downloaded and only downloading new ones
2.4.1 RowSource Interface
The source must implement the plugin.RowSource interface:
```go
// RowSource is the interface that represents a data source
// A number of data sources are provided by the SDK, and plugins may provide their own
// Built in data sources:
// - AWS S3 Bucket
// - API Source (this must be implemented by the plugin)
// - File Source
// - Webhook source
// Sources may be configured with data transformers
type RowSource interface {
	// Observable must be implemented by row sources (it is implemented by row_source.Base)
	observable.Observable

	Close() error

	Collect(context.Context) error
}
```
All RowSource implementations should embed the row_source.Base struct, which provides a default implementation of the Observable interface.
It also implements the Close function and provides the OnRow function for raising Row events.
- `Collect` - retrieve row and (optionally) enrichment data and, for each row retrieved, create an `ArtifactData` struct and raise a row event by calling the `OnRow` method, implemented by the `row_source.Base` struct.
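A minimal `Collect` might be shaped like this sketch (the `fetchRows` helper, the `ArtifactData` construction and the exact `OnRow` signature are assumptions based on the description above, not the SDK's actual API):

```go
// Collect retrieves rows from the source and raises a Row event for each.
func (s *MySource) Collect(ctx context.Context) error {
	// fetchRows is a hypothetical helper which retrieves the raw rows
	rows, err := s.fetchRows(ctx)
	if err != nil {
		return err
	}

	for _, row := range rows {
		// wrap the raw row and raise a Row event
		// (OnRow is implemented by the embedded row_source.Base)
		if err := s.OnRow(ctx, &artifact.ArtifactData{Data: row}); err != nil {
			return err
		}
	}
	return nil
}
```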
2.4.2 ArtifactRowSource
ArtifactRowSource is a RowSource implementation provided by the SDK. It is used to retrieve log data from some kind of artifact, such as a file in a local or remote file system, or an object in an object store.
The ArtifactRowSource is composable, as the same storage location may be used to store different log files in varying formats, and the source may need to be configured to know how to extract the log rows from the artifact.
The ArtifactRowSource is split into 3 parts: a source, a loader and (optionally) one or more mappers.

Source

Responsible for locating and downloading the artifact from storage. The artifact is downloaded to a temp local location.
Artifact sources provided by the SDK:

Loader

Responsible for loading the locally downloaded artifact and potentially performing some initial processing on it. Artifact loaders provided by the SDK:
- [artifact.GzipLoader](https://github.com/turbot/tailpipe-plugin-sdk/blob/development/artifact/gzip_loader.go)
- [artifact.GzipRowLoader](https://github.com/turbot/tailpipe-plugin-sdk/blob/development/artifact/gzip_row_loader.go)
- [artifact.FileSystemLoader](https://github.com/turbot/tailpipe-plugin-sdk/blob/development/artifact/file_system_loader.go)
- [artifact.FileSystemRowLoader](https://github.com/turbot/tailpipe-plugin-sdk/blob/development/artifact/file_system_row_loader.go)

Mapper

Responsible for performing additional processing on the loaded artifact to extract the log rows. (Note: several mappers may be chained together.) Mappers provided by the SDK:
- The source discovers artifacts and raises an `ArtifactDiscovered` event, which is handled by the parent ArtifactRowSource.
- The ArtifactRowSource initiates the download of the artifact by calling the source's `Download` method. The ArtifactRowSource is responsible for managing rate limiting/parallelization.
- The source downloads the artifact and raises an `ArtifactDownloaded` event, which is handled by the parent ArtifactRowSource.
- The ArtifactRowSource tells the loader to load the artifact, passing an `ArtifactInfo` containing the local file path.
- The loader loads the artifact, performs any processing it needs to, and returns the result.
- If any mappers are configured, they are called in turn, passing the result along.
- The final result is published in a `Row` event.
Note: a mapper is not always necessary - sometimes the output of the loader will be raw rows. An example of this is when the FlowLog collection uses the GzipExtractorSource, which simply unzips the artifact, splits it into text lines and passes the raw rows to the collection.
Examples:
CloudTrail local file gzipped logs
- source: FileSystemSource
- loader: GzipExtractorSource
- mapper: CloudTrailMapper
CloudTrail s3 bucket gzipped logs
- source: S3BucketArtifactSource
- loader: GzipLoader
- mapper: CloudTrailMapper
VPC FlowLog local file gzipped logs
- source: FileSystemSource
- loader: GzipRowLoader
2.4.3 Custom Row Sources
For log sources that are accessed via an API, the plugin may define a custom source which has specific knowledge of the API and credentials and directly fetches log items from the API.
The source would be responsible for:
- managing the credentials (using the configured connection)
- maintaining state of the most recent log item fetched so only new items are fetched
- applying source filtering of fetched items as specified by the collection/source config
- streaming the log items to the collection for enrichment
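A sketch of how such a custom source might be shaped (the API client, the state handling and the `OnRow` call are all illustrative assumptions; none of these names come from the SDK):

```go
// MyAPISource fetches log items directly from a hypothetical API.
type MyAPISource struct {
	row_source.Base

	// hypothetical API client holding the configured connection credentials
	client *myapi.Client

	// timestamp of the most recent item fetched, so only new items are collected
	lastItemTime time.Time
}

// Identifier returns the source name.
func (s *MyAPISource) Identifier() string {
	return "my_api_source"
}

// Collect fetches new log items and streams them to the collection for enrichment.
func (s *MyAPISource) Collect(ctx context.Context) error {
	// fetch only items newer than the most recent one already collected
	// (ListLogItems is a hypothetical API call)
	items, err := s.client.ListLogItems(ctx, s.lastItemTime)
	if err != nil {
		return err
	}

	for _, item := range items {
		// apply any source filtering specified by the collection/source config
		// (filtering logic omitted from this sketch)

		// stream the item to the collection
		// (OnRow is provided by row_source.Base; the exact signature is an assumption)
		if err := s.OnRow(ctx, &artifact.ArtifactData{Data: item}); err != nil {
			return err
		}

		// track state so only new items are fetched on the next run
		if item.Timestamp.After(s.lastItemTime) {
			s.lastItemTime = item.Timestamp
		}
	}
	return nil
}
```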
3 Technical Details
Plugin GRPC Interface
AddObserver
Returns a stream used by the plugin to send status/progress events
Collect
- Tell the plugin to start collection
- NOTE: The plugin will execute collection asynchronously, i.e. this call will return immediately and the collection status will be updated via the event stream returned from `AddObserver`
- The plugin sends an event on start and completion (progress events tbd)
- The plugin will write rows in chunks to JSONL files in the specified directory. The filename will start with the execution id and end with a sequence number
- The completion event will contain the number of files written; when collection is complete, the plugin manager will ensure it has processed all files for that execution id