Documentation ¶
Overview ¶
Plugin is the top-level package of the Steampipe plugin SDK. It provides data structures and functions that enable a plugin to read data from an API and stream it into Postgres tables by way of Steampipe's foreign data wrapper (FDW).
Flow of execution ¶
- A user runs a steampipe query against the database.
- Postgres parses the query and sends the parsed request to the FDW.
- The FDW determines which tables and columns are required.
- The FDW calls the appropriate HydrateFunc in the plugin; these functions fetch data from the APIs.
- Each table defines two special hydrate functions, `List` and `Get`, defined by plugin.ListConfig and plugin.GetConfig. The `List` or `Get` will always be called before any other hydrate function in the table, as the other functions typically depend on the result of the Get or List call.
- The transform functions are called for each column. These extract and/or reformat data returned by the hydrate functions.
- The plugin returns the transformed data to the FDW.
- Steampipe FDW returns the results to the database.
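The order of calls described above can be modelled with plain Go functions. This is only an illustrative sketch: `row`, `listItems`, `hydrateTags`, `upperName` and `runQuery` are hypothetical names, not SDK types or functions, and the sketch runs everything sequentially in one process, whereas the SDK runs hydrate functions in parallel where it can.

```go
package main

import (
	"fmt"
	"strings"
)

// row is a hypothetical stand-in for the data gathered for one table row.
type row map[string]interface{}

// listItems plays the role of a table's List hydrate function:
// it returns the base item for every row.
func listItems() []row {
	return []row{{"id": 1, "name": "alpha"}, {"id": 2, "name": "beta"}}
}

// hydrateTags plays the role of an additional hydrate function.
// It runs after List because it needs the listed item.
func hydrateTags(item row) row {
	return row{"tags": fmt.Sprintf("tag-for-%v", item["id"])}
}

// upperName plays the role of a column transform: it reformats hydrated data.
func upperName(item row) interface{} {
	return strings.ToUpper(item["name"].(string))
}

// runQuery models the order of calls: List, then other hydrates, then transforms.
func runQuery() []row {
	var out []row
	for _, item := range listItems() {
		extra := hydrateTags(item)
		out = append(out, row{
			"id":   item["id"],
			"name": upperName(item),
			"tags": extra["tags"],
		})
	}
	return out
}

func main() {
	for _, r := range runQuery() {
		fmt.Println(r["id"], r["name"], r["tags"])
	}
}
```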
main.go ¶
The `main` function in `main.go` is the entry point for your plugin. This function must call plugin.Serve to instantiate your plugin's gRPC server.
package main
import (
"github.com/turbot/steampipe-plugin-aws/aws"
"github.com/turbot/steampipe-plugin-sdk/v4/plugin"
)
func main() {
plugin.Serve(&plugin.ServeOpts{
PluginFunc: aws.Plugin})
}
plugin.go ¶
`plugin.go` implements a function that returns a pointer to the plugin.Plugin loaded by Steampipe's plugin manager.
By convention, the package name for your plugin should be the same name as your plugin, and the Go files for your plugin (except `main.go`) should reside in a folder with the same name.
Plugin definition ¶
Use plugin.Plugin to define a plugin.
package zendesk
import (
"context"
"github.com/turbot/steampipe-plugin-sdk/v4/plugin"
"github.com/turbot/steampipe-plugin-sdk/v4/plugin/transform"
)
func Plugin(ctx context.Context) *plugin.Plugin {
p := &plugin.Plugin{
Name: "steampipe-plugin-zendesk",
DefaultTransform: transform.FromGo().NullIfZero(),
TableMap: map[string]*plugin.Table{
"zendesk_brand": tableZendeskBrand(),
"zendesk_group": tableZendeskGroup(),
"zendesk_organization": tableZendeskOrganization(),
"zendesk_search": tableZendeskSearch(),
"zendesk_ticket": tableZendeskTicket(),
"zendesk_ticket_audit": tableZendeskTicketAudit(),
"zendesk_trigger": tableZendeskTrigger(),
"zendesk_user": tableZendeskUser(),
},
}
return p
}
Table definition ¶
By convention, each table should be implemented in a separate file named `table_{table name}.go`. Each table will have a single table definition function that returns a pointer to a plugin.Table.
The table definition includes the name and description of the table, a list of column definitions, and the functions to call in order to list the data for all the rows, or to get data for a single row.
package zendesk
import (
"context"
"github.com/nukosuke/go-zendesk/zendesk"
"github.com/turbot/steampipe-plugin-sdk/v4/grpc/proto"
"github.com/turbot/steampipe-plugin-sdk/v4/plugin"
"github.com/turbot/steampipe-plugin-sdk/v4/plugin/transform"
)
func tableZendeskTicket() *plugin.Table {
return &plugin.Table{
Name: "zendesk_ticket",
Description: "Tickets are the means through which your end users (customers) communicate with agents in Zendesk Support.",
List: &plugin.ListConfig{
Hydrate: listTicket,
},
Get: &plugin.GetConfig{
KeyColumns: plugin.SingleColumn("id"),
Hydrate: getTicket,
},
Columns: []*plugin.Column{
{
Name: "allow_attachments",
Type: proto.ColumnType_BOOL,
Description: "Permission for agents to add attachments to a comment. Defaults to true",
},
...
{
Name: "via_source_ref",
Type: proto.ColumnType_STRING,
Transform: transform.FromField("Via.Source.Ref"),
Description: "Medium used to raise the ticket",
},
},
}
}
Column definition ¶
Hydrate functions ¶
A HydrateFunc calls an API and returns data.
List Config ¶
Get Config ¶
Hydrate dependencies ¶
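Steampipe parallelizes hydrate functions as much as possible. Sometimes, however, one hydrate function requires the output from another. You can define plugin.HydrateDependencies for this case (note that this type is deprecated in favor of plugin.HydrateConfig, whose Depends field serves the same purpose):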
return &plugin.Table{
Name: "hydrate_columns_dependency",
List: &plugin.ListConfig{
Hydrate: hydrateList,
},
HydrateDependencies: []plugin.HydrateDependencies{
{
Func: hydrate2,
Depends: []plugin.HydrateFunc{hydrate1},
},
},
Columns: []*plugin.Column{
{Name: "id", Type: proto.ColumnType_INT},
{Name: "hydrate_column_1", Type: proto.ColumnType_STRING, Hydrate: hydrate1},
{Name: "hydrate_column_2", Type: proto.ColumnType_STRING, Hydrate: hydrate2},
},
}
Here, hydrate function `hydrate2` is dependent on `hydrate1`. This means `hydrate2` will not execute until `hydrate1` has completed and the results are available. `hydrate2` can refer to the results from `hydrate1` as follows:
func hydrate2(ctx context.Context, d *plugin.QueryData, h *plugin.HydrateData) (interface{}, error) {
// NOTE: in this case we know the output of hydrate1 is map[string]interface{}, so we cast it accordingly.
// The data should be cast to the appropriate type.
hydrate1Results := h.HydrateResults["hydrate1"].(map[string]interface{})
.....
}
Note that:
- Multiple dependencies are supported.
- Circular dependencies will be detected and cause a validation failure.
- The `Get` and `List` hydrate functions ***CANNOT*** have dependencies.
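One way to picture the ordering and the circular-dependency check is a depth-first traversal of the dependency graph. The sketch below is an assumption about how such a check could work, not the SDK's implementation; `resolveOrder` is a hypothetical helper that works on function names rather than on HydrateFunc values.

```go
package main

import (
	"errors"
	"fmt"
)

// resolveOrder returns an execution order in which every function appears
// after all of its dependencies. deps maps a hydrate function name to the
// names it depends on. It returns an error if a circular dependency exists.
func resolveOrder(deps map[string][]string) ([]string, error) {
	const (
		unvisited = iota
		visiting
		done
	)
	state := map[string]int{}
	var order []string

	var visit func(name string) error
	visit = func(name string) error {
		switch state[name] {
		case done:
			return nil
		case visiting:
			// we came back to a function that is still being resolved
			return errors.New("circular dependency involving " + name)
		}
		state[name] = visiting
		for _, dep := range deps[name] {
			if err := visit(dep); err != nil {
				return err
			}
		}
		state[name] = done
		order = append(order, name)
		return nil
	}

	for name := range deps {
		if err := visit(name); err != nil {
			return nil, err
		}
	}
	return order, nil
}

func main() {
	order, err := resolveOrder(map[string][]string{
		"hydrate1": nil,
		"hydrate2": {"hydrate1"},
	})
	fmt.Println(order, err)
}
```

With the table definition above, `hydrate1` always comes before `hydrate2` in the returned order, whichever name the traversal starts from.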
Transform functions ¶
These extract and/or reformat data returned by the hydrate functions. See transform.
Dynamic tables ¶
If plugin.SchemaMode is set to `dynamic`, then every time Steampipe starts, it checks the plugin's schema for changes since the last time it was loaded, and re-imports the schema if it detects any.
Dynamic tables are useful when you are building a plugin whose schema is not known at compile time; instead, its schema will be generated at runtime. For instance, a plugin with dynamic tables is useful if you want to load CSV files as tables from one or more directories. Each of these CSV files may have different column structures, resulting in a different structure for each table.
In order to create a dynamic table, plugin.TableMapFunc should call a function that returns `map[string]*plugin.Table`.
func Plugin(ctx context.Context) *plugin.Plugin {
p := &plugin.Plugin{
Name: "steampipe-plugin-csv",
ConnectionConfigSchema: &plugin.ConnectionConfigSchema{
NewInstance: ConfigInstance,
Schema: ConfigSchema,
},
DefaultTransform: transform.FromGo().NullIfZero(),
SchemaMode: plugin.SchemaModeDynamic,
TableMapFunc: PluginTables,
}
return p
}
func PluginTables(ctx context.Context, p *plugin.Plugin) (map[string]*plugin.Table, error) {
// Initialize tables
tables := map[string]*plugin.Table{}
// Search for CSV files to create as tables
paths, err := csvList(ctx, p)
if err != nil {
return nil, err
}
for _, i := range paths {
tableCtx := context.WithValue(ctx, "path", i)
base := filepath.Base(i)
// tableCSV returns a *plugin.Table type
tables[base[0:len(base)-len(filepath.Ext(base))]] = tableCSV(tableCtx, p)
}
return tables, nil
}
The `tableCSV` function mentioned in the example above is called once for each CSV file found in the configured paths, and builds a `*plugin.Table` for that file:
func tableCSV(ctx context.Context, p *plugin.Plugin) *plugin.Table {
path := ctx.Value("path").(string)
csvFile, err := os.Open(path)
if err != nil {
plugin.Logger(ctx).Error("Could not open CSV file", "path", path)
panic(err)
}
r := csv.NewReader(csvFile)
csvConfig := GetConfig(p.Connection)
if csvConfig.Separator != nil && *csvConfig.Separator != "" {
r.Comma = rune((*csvConfig.Separator)[0])
}
if csvConfig.Comment != nil {
if *csvConfig.Comment == "" {
// Disable comments
r.Comment = 0
} else {
// Set the comment character
r.Comment = rune((*csvConfig.Comment)[0])
}
}
// Read the header to peek at the column names
header, err := r.Read()
if err != nil {
plugin.Logger(ctx).Error("Error parsing CSV header:", "path", path, "header", header, "err", err)
panic(err)
}
cols := []*plugin.Column{}
for idx, i := range header {
cols = append(cols, &plugin.Column{
Name: i,
Type: proto.ColumnType_STRING,
Transform: transform.FromField(i),
Description: fmt.Sprintf("Field %d.", idx),
})
}
return &plugin.Table{
Name: path,
Description: fmt.Sprintf("CSV file at %s", path),
List: &plugin.ListConfig{
Hydrate: listCSVWithPath(path),
},
Columns: cols,
}
}
The end result is that, when using the CSV plugin, whenever Steampipe starts it will check for any new, deleted, and modified CSV files in the configured `paths` and create any discovered CSVs as tables. The CSV filenames are turned directly into table names.
For more information on how the CSV plugin can be queried as a result of being a dynamic table, please see https://hub.steampipe.io/plugins/turbot/csv/tables/%7Bcsv_filename%7D
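The table name in `PluginTables` is derived by stripping the directory and the extension from each path. A minimal standalone sketch of that step follows; `tableNameFromPath` is a hypothetical helper name, and it uses `strings.TrimSuffix` in place of the slice expression from the example, which yields the same result.

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// tableNameFromPath returns the file name without its directory or extension.
// Only the final extension is removed, so "orders.2022.csv" keeps "orders.2022".
func tableNameFromPath(path string) string {
	base := filepath.Base(path)
	return strings.TrimSuffix(base, filepath.Ext(base))
}

func main() {
	fmt.Println(tableNameFromPath("/data/users.csv"))
	fmt.Println(tableNameFromPath("orders.2022.csv"))
}
```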
Logging ¶
A logger is passed to the plugin via the context. You can use the logger to write messages to the log at standard log levels:
logger := plugin.Logger(ctx)
logger.Info("Log message and a variable", "my_variable", myVariable)
The plugin logs do not currently get written to the console, but are written to the plugin logs at `~/.steampipe/logs/plugin-YYYY-MM-DD.log`, e.g., `~/.steampipe/logs/plugin-2022-01-01.log`.
Steampipe uses hclog (https://github.com/hashicorp/go-hclog), which uses standard log levels (`TRACE`, `DEBUG`, `INFO`, `WARN`, `ERROR`). By default, the log level is `WARN`. You can set it using the `STEAMPIPE_LOG_LEVEL` environment variable:
export STEAMPIPE_LOG_LEVEL=TRACE
Index ¶
- Constants
- func DiagsToError(prefix string, diags hcl.Diagnostics) error
- func GetFreeMemInterval() int64
- func GetMatrixItem(ctx context.Context) map[string]interface{} (deprecated)
- func GetMaxMemoryBytes() int64
- func IsCancelled(ctx context.Context) bool
- func Logger(ctx context.Context) hclog.Logger
- func RetryHydrate(ctx context.Context, d *QueryData, hydrateData *HydrateData, ...) (hydrateResult interface{}, err error)
- func Serve(opts *ServeOpts)
- type Column
- type Connection
- type ConnectionConfigInstanceFunc
- type ConnectionConfigSchema
- type ConnectionData
- type CreatePlugin
- type DefaultConcurrencyConfig
- type ErrorPredicate
- type ErrorPredicateWithContext
- type GetConfig
- type HydrateConfig
- type HydrateData
- type HydrateDependencies (deprecated)
- type HydrateFunc
- type IgnoreConfig
- type KeyColumn
- type KeyColumnEqualsQualMap
- type KeyColumnQualMap
- func (m KeyColumnQualMap) GetListQualValues() quals.QualSlice
- func (m KeyColumnQualMap) GetUnsatisfiedKeyColumns(columns KeyColumnSlice) KeyColumnSlice
- func (m KeyColumnQualMap) String() string
- func (m KeyColumnQualMap) ToEqualsQualValueMap() map[string]*proto.QualValue
- func (m KeyColumnQualMap) ToProtoQualMap() map[string]*proto.Quals
- func (m KeyColumnQualMap) ToQualMap() map[string]quals.QualSlice
- type KeyColumnQuals
- type KeyColumnSlice
- func (k KeyColumnSlice) AllEquals() bool
- func (k *KeyColumnSlice) Find(name string) *KeyColumn
- func (k KeyColumnSlice) IsAnyOf() bool
- func (k KeyColumnSlice) String() string
- func (k KeyColumnSlice) StringSlice() []string
- func (k KeyColumnSlice) ToProtobuf() []*proto.KeyColumn
- func (k KeyColumnSlice) Validate() []string
- type ListConfig
- type MatrixItemFunc
- type MatrixItemMapFunc
- type NewPluginOptions
- type Plugin
- func (p *Plugin) ClearConnectionCache(ctx context.Context, connectionName string)
- func (p *Plugin) ClearQueryCache(ctx context.Context, connectionName string)
- func (p *Plugin) Execute(req *proto.ExecuteRequest, stream proto.WrapperPlugin_ExecuteServer) (err error)
- func (p *Plugin) GetSchema(connectionName string) (*grpc.PluginSchema, error)
- func (p *Plugin) SetAllConnectionConfigs(configs []*proto.ConnectionConfig, maxCacheSizeMb int) (err error)
- func (p *Plugin) SetConnectionConfig(connectionName, connectionConfigString string) (err error)
- func (p *Plugin) UpdateConnectionConfigs(added []*proto.ConnectionConfig, deleted []*proto.ConnectionConfig, ...) error
- func (p *Plugin) Validate() string
- type PluginFunc
- type QueryColumn
- type QueryContext
- type QueryData
- type QueryStatus
- type RetryConfig
- type RowData
- type ServeOpts
- type Table
- type TableCacheOptions
- type TableMapFunc
Constants ¶
const (
// Require values
Required = "required"
Optional = "optional"
AnyOf = "any_of"
)
const (
SchemaModeStatic = "static"
SchemaModeDynamic = "dynamic"
)
const ContextColumnName = "_ctx"
Variables ¶
This section is empty.
Functions ¶
func DiagsToError ¶
func DiagsToError(prefix string, diags hcl.Diagnostics) error
DiagsToError converts hcl diags into an error
func GetFreeMemInterval ¶
func GetFreeMemInterval() int64
func GetMatrixItem
deprecated
Deprecated: Please use plugin.Table.GetMatrixItemFunc instead.
func GetMaxMemoryBytes ¶
func GetMaxMemoryBytes() int64
func IsCancelled ¶ added in v4.1.7
A helper function that returns whether the context has been cancelled.
To check if the context has been cancelled:
for _, i := range items {
d.StreamListItem(ctx, i)
if plugin.IsCancelled(ctx) {
return nil, nil
}
}
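For intuition, a cancellation check of this kind can be written with the standard `context` package alone. The sketch below is an assumption about how such a helper could look, not the SDK's source; `isCancelled` and `streamUntilCancelled` are hypothetical names, and the loop only counts items where a real list function would stream them.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// isCancelled is a hypothetical equivalent of a cancellation helper:
// it reports whether ctx has been cancelled.
func isCancelled(ctx context.Context) bool {
	return errors.Is(ctx.Err(), context.Canceled)
}

// streamUntilCancelled walks four items and cancels the context while
// handling the second one. It returns how many items were handled before
// the loop noticed the cancellation.
func streamUntilCancelled() int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sent := 0
	for _, i := range []int{1, 2, 3, 4} {
		sent++ // stands in for streaming the item to the caller
		if i == 2 {
			cancel() // simulate the query being cancelled mid-stream
		}
		if isCancelled(ctx) {
			break
		}
	}
	return sent
}

func main() {
	fmt.Println("items streamed:", streamUntilCancelled())
}
```

Checking after each streamed item, as in the loop above, keeps a cancelled query from making further API calls for rows nobody will read.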
func Logger ¶
Logger retrieves the hclog.Logger from the context.
Usage:
plugin.Logger(ctx).Trace("Code execution starts here")
plugin.Logger(ctx).Error("hackernews_item.itemList", "query_error", err)
plugin.Logger(ctx).Warn("getDomain", "invalid_name", err, "query_response", resp)
plugin.Logger(ctx).Info("listGreeting", "number", i)
func RetryHydrate ¶
func RetryHydrate(ctx context.Context, d *QueryData, hydrateData *HydrateData, hydrateFunc HydrateFunc, retryConfig *RetryConfig) (hydrateResult interface{}, err error)
RetryHydrate invokes the hydrate function and, if it fails with a retryable error, retries it until it succeeds or the maximum number of attempts is reached, at which point the error is returned.
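The general shape of a retry loop like this can be sketched with the standard library only. The names below (`retry`, `errThrottled`, `flakyCall`) are hypothetical and the sketch retries immediately; it is not the SDK's implementation, and a real retry policy would usually wait between attempts.

```go
package main

import (
	"errors"
	"fmt"
)

// errThrottled is a hypothetical retryable error.
var errThrottled = errors.New("throttled")

// retry calls fn until it succeeds, returns a non-retryable error,
// or maxAttempts (assumed to be at least 1) is reached.
func retry(maxAttempts int, shouldRetry func(error) bool, fn func() (interface{}, error)) (interface{}, error) {
	var result interface{}
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		result, err = fn()
		if err == nil || !shouldRetry(err) {
			return result, err
		}
	}
	return nil, fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}

// flakyCall runs a function that fails with errThrottled for the first
// `failures` calls and then succeeds, allowing up to 5 attempts.
// It returns the result, the number of calls made, and the final error.
func flakyCall(failures int) (result interface{}, calls int, err error) {
	isRetryable := func(e error) bool { return errors.Is(e, errThrottled) }
	result, err = retry(5, isRetryable, func() (interface{}, error) {
		calls++
		if calls <= failures {
			return nil, errThrottled
		}
		return "ok", nil
	})
	return result, calls, err
}

func main() {
	res, calls, err := flakyCall(2)
	fmt.Println(res, calls, err)
}
```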
Types ¶
type Column ¶
type Column struct {
// column name
Name string
// column type
Type proto.ColumnType
// column description
Description string
// explicitly specify the function which populates this data
// - this is only needed if any of the default hydrate functions will NOT return this column
Hydrate HydrateFunc
// the default column value
Default interface{}
// a list of transforms to generate the column value
Transform *transform.ColumnTransforms
}
Column defines a column of a table.
A column may be populated by a List or Get call. It may alternatively define its own HydrateFunc that makes an additional API call for each row.
A column may transform the data it receives using one or more transform functions.
To define a column populated by a List or Get call:
func itemCols() []*plugin.Column {
return []*plugin.Column{
{Name: "id", Type: proto.ColumnType_INT, Description: "The item's unique id."},
}
}
To define a column populated by a HydrateFunc:
Columns: awsColumns([]*plugin.Column{
{
Name: "permissions_boundary_arn",
Description: "The ARN of the policy used to set the permissions boundary for the user.",
Type: proto.ColumnType_STRING,
Hydrate: getAwsIamUserData,
},
}),
To define columns that transform the data:
Columns: awsColumns([]*plugin.Column{
{
Name: "mfa_enabled",
Description: "The MFA status of the user.",
Type: proto.ColumnType_BOOL,
Hydrate: getAwsIamUserMfaDevices,
Transform: transform.From(handleEmptyUserMfaStatus),
},
{
Name: "login_profile",
Description: "Contains the user name and password create date for a user.",
Type: proto.ColumnType_JSON,
Hydrate: getAwsIamUserLoginProfile,
Transform: transform.FromValue(),
},
...
}),
type Connection ¶
type Connection struct {
Name string
// the connection config
// NOTE: we always pass and store connection config BY VALUE
Config interface{}
}
Connection is a struct which is used to store connection config.
The connection config is parsed and stored as plugin.Plugin.Connection. A plugin may retrieve the connection by calling plugin.QueryData.Connection. For an example, see the hackernews plugin: https://github.com/turbot/steampipe-plugin-hackernews/blob/d14efdd3f2630f0146e575fe07666eda4e126721/hackernews/connection_config.go#L23
type ConnectionConfigInstanceFunc ¶
type ConnectionConfigInstanceFunc func() interface{}
ConnectionConfigInstanceFunc is a function type which returns 'any'.
It is used to implement plugin.ConnectionConfigSchema.NewInstance.
type ConnectionConfigSchema ¶
type ConnectionConfigSchema struct {
Schema map[string]*schema.Attribute
// function which returns an instance of a connection config struct
NewInstance ConnectionConfigInstanceFunc
}
ConnectionConfigSchema is a struct that defines the custom arguments in the plugin's connection config (`.spc`) file that are passed to the plugin as plugin.Connection.Config.
A plugin that uses custom connection config must set plugin.Plugin.ConnectionConfigSchema.
Usage:
p := &plugin.Plugin{
Name: "steampipe-plugin-hackernews",
ConnectionConfigSchema: &plugin.ConnectionConfigSchema{
NewInstance: ConfigInstance,
Schema: ConfigSchema,
},
...
}
var ConfigSchema = map[string]*schema.Attribute{
"max_items": {
Type: schema.TypeInt,
},
}
func ConfigInstance() interface{} {
return &hackernewsConfig{}
}
func (*ConnectionConfigSchema) Validate ¶
func (c *ConnectionConfigSchema) Validate() []string
Validate validates the connection config
type ConnectionData ¶
type ConnectionData struct {
// TableMap is a map of all the tables in the plugin, keyed by the table name
TableMap map[string]*Table
// connection this plugin is instantiated for
Connection *Connection
// schema - this may be connection specific for dynamic schemas
Schema map[string]*proto.TableSchema
}
ConnectionData is the data stored by the plugin which is connection dependent
type DefaultConcurrencyConfig ¶
type DefaultConcurrencyConfig struct {
// sets how many HydrateFunc calls can run concurrently in total
TotalMaxConcurrency int
// sets the default for how many calls to each HydrateFunc can run concurrently
DefaultMaxConcurrency int
}
DefaultConcurrencyConfig sets the default maximum number of concurrent HydrateFunc calls.
Limit total concurrent hydrate calls:
DefaultConcurrency: &plugin.DefaultConcurrencyConfig{
TotalMaxConcurrency: 500,
}
Limit concurrent hydrate calls to any single HydrateFunc which does not have a HydrateConfig:
DefaultConcurrency: &plugin.DefaultConcurrencyConfig{
DefaultMaxConcurrency: 100,
}
Do both:
DefaultConcurrency: &plugin.DefaultConcurrencyConfig{
TotalMaxConcurrency: 500,
DefaultMaxConcurrency: 200,
}
type ErrorPredicate ¶
type GetConfig ¶
type GetConfig struct {
// key or keys which are used to uniquely identify rows - used to determine whether a query is a 'get' call
KeyColumns KeyColumnSlice
// the hydrate function which is called first when performing a 'get' call.
// if this returns 'not found', no further hydrate functions are called
Hydrate HydrateFunc
// a function which will return whether to ignore a given error
// deprecated - use IgnoreConfig
ShouldIgnoreError ErrorPredicate
IgnoreConfig *IgnoreConfig
RetryConfig *RetryConfig
// max concurrency - this applies when the get function is ALSO used as a column hydrate function
MaxConcurrency int
}
A GetConfig defines how to get a single row of a table:
- columns that uniquely identify a row: plugin.GetConfig.KeyColumns
- which errors to ignore: plugin.GetConfig.IgnoreConfig
- which errors to retry: plugin.GetConfig.RetryConfig
- how many concurrent HydrateFunc calls to allow: plugin.GetConfig.MaxConcurrency
Usage ¶
A GetConfig with KeyColumns:
Get: &plugin.GetConfig{
KeyColumns: plugin.SingleColumn("id"),
Hydrate: getItem,
}
A GetConfig with IgnoreConfig:
Get: &plugin.GetConfig{
KeyColumns: plugin.SingleColumn("id"),
Hydrate: getItem,
IgnoreConfig: &plugin.IgnoreConfig{ShouldIgnoreErrorFunc: shouldIgnoreError},
}
A GetConfig with RetryConfig:
Get: &plugin.GetConfig{
KeyColumns: plugin.SingleColumn("id"),
Hydrate: getItem,
RetryConfig: &plugin.RetryConfig{
ShouldRetryErrorFunc: shouldRetryError,
},
}
A GetConfig with all fields specified:
Get: &plugin.GetConfig{
KeyColumns: plugin.SingleColumn("id"),
Hydrate: getItem,
RetryConfig: &plugin.RetryConfig{
ShouldRetryErrorFunc: shouldRetryError,
},
IgnoreConfig: &plugin.IgnoreConfig{ShouldIgnoreErrorFunc: shouldIgnoreError},
MaxConcurrency: 50,
}
type HydrateConfig ¶
type HydrateConfig struct {
Func HydrateFunc
MaxConcurrency int
RetryConfig *RetryConfig
IgnoreConfig *IgnoreConfig
// deprecated - use IgnoreConfig
ShouldIgnoreError ErrorPredicate
Depends []HydrateFunc
}
HydrateConfig defines how to run a HydrateFunc:
- which errors to ignore: plugin.HydrateConfig.IgnoreConfig
- which errors to retry: plugin.HydrateConfig.RetryConfig
- how many concurrent calls to allow: plugin.HydrateConfig.MaxConcurrency
- which hydrate calls must complete before this HydrateFunc can start: plugin.HydrateConfig.Depends
It's not valid to have a HydrateConfig for a HydrateFunc that is specified in a GetConfig.
Usage ¶
A HydrateConfig with IgnoreConfig:
HydrateConfig: []plugin.HydrateConfig{
{
Func: getRetentionPeriod,
IgnoreConfig: &plugin.IgnoreConfig{ShouldIgnoreErrorFunc: shouldIgnoreError},
},
},
A HydrateConfig with MaxConcurrency:
HydrateConfig: []plugin.HydrateConfig{
{
Func: getRetentionPeriod,
MaxConcurrency: 50,
IgnoreConfig: &plugin.IgnoreConfig{ShouldIgnoreErrorFunc: shouldIgnoreError},
},
},
A HydrateConfig with all fields specified:
HydrateConfig: []plugin.HydrateConfig{
{
Func: getRetentionPeriod,
MaxConcurrency: 50,
IgnoreConfig: &plugin.IgnoreConfig{ShouldIgnoreErrorFunc: shouldIgnoreError},
RetryConfig: &plugin.RetryConfig{
ShouldRetryErrorFunc: shouldRetryError,
},
},
},
func (*HydrateConfig) String ¶
func (c *HydrateConfig) String() interface{}
func (*HydrateConfig) Validate ¶
func (c *HydrateConfig) Validate(table *Table) []string
type HydrateData ¶
type HydrateData struct {
// if there was a parent-child list call, store the parent list item
ParentItem interface{}
Item interface{}
HydrateResults map[string]interface{}
}
HydrateData contains the input data passed to every hydrate function
type HydrateDependencies
deprecated
type HydrateDependencies struct {
Func HydrateFunc
Depends []HydrateFunc
}
Deprecated: Use HydrateConfig instead.
type HydrateFunc ¶
type HydrateFunc func(context.Context, *QueryData, *HydrateData) (interface{}, error)
HydrateFunc is a function which retrieves some or all row data for a single row item.
func WrapHydrate ¶
func WrapHydrate(hydrateFunc HydrateFunc, ignoreConfig *IgnoreConfig) HydrateFunc
WrapHydrate is a higher order function which returns a HydrateFunc which handles Ignorable errors
func (HydrateFunc) WithCache ¶
func (hydrate HydrateFunc) WithCache(args ...HydrateFunc) HydrateFunc
WithCache ensures the HydrateFunc results are saved in the [connection.ConnectionCache].
Use it to reduce the number of API calls if the HydrateFunc is used by multiple tables.
Usage ¶
{
Name: "account",
Type: proto.ColumnType_STRING,
Hydrate: plugin.HydrateFunc(getCommonColumns).WithCache(),
Description: "The Snowflake account ID.",
Transform: transform.FromCamel(),
}
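The idea behind caching a hydrate result can be illustrated with a small memoizing wrapper. This is a generic sketch under simplifying assumptions, not the SDK's WithCache: `fetchFunc`, `withCache` and `countCalls` are hypothetical names, and a plain string key is used here purely for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// fetchFunc is a hypothetical stand-in for a function that makes an API call.
type fetchFunc func(key string) (string, error)

// withCache wraps fn so that each key is fetched at most once;
// later calls with the same key return the stored result.
func withCache(fn fetchFunc) fetchFunc {
	var mu sync.Mutex
	cache := map[string]string{}
	return func(key string) (string, error) {
		mu.Lock()
		defer mu.Unlock()
		if v, ok := cache[key]; ok {
			return v, nil
		}
		v, err := fn(key)
		if err != nil {
			return "", err // errors are not cached
		}
		cache[key] = v
		return v, nil
	}
}

// countCalls shows that the wrapped function reaches the "API"
// only once per distinct key.
func countCalls() int {
	calls := 0
	cached := withCache(func(key string) (string, error) {
		calls++
		return "account-for-" + key, nil
	})
	cached("conn1")
	cached("conn1") // served from the cache
	cached("conn2")
	return calls
}

func main() {
	fmt.Println("API calls made:", countCalls())
}
```

Holding the lock while calling `fn` keeps the sketch short but serializes all lookups; that trade-off is a choice of this example only.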
type IgnoreConfig ¶
type IgnoreConfig struct {
ShouldIgnoreErrorFunc ErrorPredicateWithContext
// deprecated, use ShouldIgnoreErrorFunc
ShouldIgnoreError ErrorPredicate
}
IgnoreConfig defines errors to ignore. When that happens, an empty row is returned.
If a HydrateFunc has specific errors that should not block query execution, set plugin.GetConfig.IgnoreConfig, plugin.ListConfig.IgnoreConfig or plugin.HydrateConfig.IgnoreConfig.
For errors common to many HydrateFuncs, you can define a default IgnoreConfig by setting plugin.Plugin.DefaultIgnoreConfig.
Ignore errors from a HydrateFunc that has a GetConfig:
Get: &plugin.GetConfig{
IgnoreConfig: &plugin.IgnoreConfig{
ShouldIgnoreErrorFunc: isIgnorableErrorPredicate([]string{"Request_ResourceNotFound", "Invalid object identifier"}),
},
...
},
Ignore errors from a HydrateFunc that has a ListConfig:
List: &plugin.ListConfig{
IgnoreConfig: &plugin.IgnoreConfig{
ShouldIgnoreErrorFunc: isIgnorableErrorPredicate([]string{"Request_UnsupportedQuery"}),
},
...
},
Ignore errors from a HydrateFunc that has a HydrateConfig:
HydrateConfig: []plugin.HydrateConfig{
{
IgnoreConfig: &plugin.IgnoreConfig{
ShouldIgnoreErrorFunc: isIgnorableErrorPredicate([]string{"Request_UnsupportedQuery"}),
},
...
},
},
Ignore errors that may occur in many HydrateFuncs:
DefaultIgnoreConfig: &plugin.IgnoreConfig{
ShouldIgnoreErrorFunc: isIgnorableErrorPredicate([]string{"Request_ResourceNotFound"}),
...
},
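The examples above call an `isIgnorableErrorPredicate` helper that the plugin author must supply. A minimal sketch of such a helper follows, assuming errors are matched by substring. The names `isIgnorableError` and `ignores` are hypothetical, and the predicate here takes only the error, whereas the SDK's ErrorPredicateWithContext type takes additional arguments that are omitted from this sketch.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// isIgnorableError returns a predicate that reports whether an error's
// message contains any of the given codes. Simplified, hypothetical helper:
// a real ShouldIgnoreErrorFunc has a different signature.
func isIgnorableError(codes []string) func(error) bool {
	return func(err error) bool {
		if err == nil {
			return false
		}
		for _, code := range codes {
			if strings.Contains(err.Error(), code) {
				return true
			}
		}
		return false
	}
}

// ignores reports whether an error with the given message would be ignored.
func ignores(codes []string, msg string) bool {
	return isIgnorableError(codes)(errors.New(msg))
}

func main() {
	codes := []string{"Request_ResourceNotFound", "Invalid object identifier"}
	fmt.Println(ignores(codes, "Request_ResourceNotFound: no such user"))
	fmt.Println(ignores(codes, "AccessDenied: missing permission"))
}
```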
func (*IgnoreConfig) DefaultTo ¶
func (c *IgnoreConfig) DefaultTo(other *IgnoreConfig)
func (*IgnoreConfig) String ¶
func (c *IgnoreConfig) String() interface{}
type KeyColumn ¶
KeyColumn is a struct representing the definition of a column used to filter Get and List calls.
At least one key column must be defined for a Get call. They are optional for List calls.
Operators ¶
This property specifies the accepted operators (from a possible set: "=", "<>", "<", "<=", ">", ">=")
Require ¶
This property determines whether the column is required or optional. Possible values:
- "required": The key column must be provided as a query qualifier (i.e. in a where clause in the query).
- "optional": The key column is optional, but if provided it will be used to filter the results.
- "any_of": Any one of the given columns must be provided.
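The three Require values can be read as a validation rule over the columns that a query supplies qualifiers for. The sketch below illustrates that rule under simplifying assumptions; `keyColumn` and `unsatisfied` are hypothetical stand-ins and do not reproduce the SDK's own validation code.

```go
package main

import "fmt"

// keyColumn is a hypothetical, simplified stand-in for a key column definition.
type keyColumn struct {
	Name    string
	Require string // "required", "optional" or "any_of"
}

// unsatisfied returns the names of key columns whose requirement is not met,
// given the set of columns that have a qualifier in the query.
func unsatisfied(cols []keyColumn, provided map[string]bool) []string {
	var missing, anyOf []string
	anyOfMet := false
	for _, c := range cols {
		switch c.Require {
		case "required":
			if !provided[c.Name] {
				missing = append(missing, c.Name)
			}
		case "any_of":
			anyOf = append(anyOf, c.Name)
			if provided[c.Name] {
				anyOfMet = true
			}
		}
		// "optional" columns never make a query invalid
	}
	if !anyOfMet {
		// none of the any_of columns was supplied, so all of them are reported
		missing = append(missing, anyOf...)
	}
	return missing
}

func main() {
	cols := []keyColumn{
		{"id", "required"},
		{"name", "optional"},
		{"email", "any_of"},
		{"login", "any_of"},
	}
	fmt.Println(unsatisfied(cols, map[string]bool{"id": true, "login": true}))
	fmt.Println(unsatisfied(cols, map[string]bool{"name": true}))
}
```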
CacheMatch ¶
This property determines the logic used by the query results cache to determine whether a cached value matches a given query. Possible values:
- "subset" (default): A cached item is considered a match (i.e. a cache hit) if the qual for the query is a subset of the quals for the cached item. For example, if the cached qual is "val < 100" and the query qual is "val < 50", the query qual is a subset of the cached qual, so this is a cache match.
- "exact": A cached item is considered a match ONLY if the qual for the cached item is the same as the qual for the query.
"exact" is used for columns which are only populated if the qual for that column is passed. A common pattern is to provide a "filter" column, which is populated using the qual value provided. This filter value is used when making the API call to fetch the data. If no filter qual is provided, then the filter column returned by the plugin is empty.
This breaks the subset logic: cached data with no qual for the filter column contains null values for that column. Under the subset rule, that data would be treated as a superset of the data returned by a query which provides a filter qual, which is incorrect, because the data returned when a filter qual is passed has a non-null filter column.
The solution is to set CacheMatch="exact".
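The difference between the two modes can be shown for the "val < N" example. The sketch below covers only that one qual shape and is not the SDK's cache logic; `lessThan`, `isSubset` and `cacheHit` are hypothetical names.

```go
package main

import "fmt"

// lessThan is a hypothetical representation of a qual of the form "val < Limit".
type lessThan struct{ Limit int }

// isSubset reports whether every row matching query also matches cached,
// i.e. whether the cached rows can be filtered down to answer the query.
func isSubset(query, cached lessThan) bool {
	return query.Limit <= cached.Limit
}

// cacheHit applies the "subset" or "exact" CacheMatch rule.
func cacheHit(mode string, query, cached lessThan) bool {
	if mode == "exact" {
		return query == cached
	}
	return isSubset(query, cached)
}

func main() {
	cached := lessThan{Limit: 100}
	fmt.Println(cacheHit("subset", lessThan{Limit: 50}, cached))  // query is narrower than the cache
	fmt.Println(cacheHit("subset", lessThan{Limit: 150}, cached)) // query is wider than the cache
	fmt.Println(cacheHit("exact", lessThan{Limit: 50}, cached))   // quals differ
}
```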
func (*KeyColumn) InitialiseOperators ¶
func (k *KeyColumn) InitialiseOperators()
InitialiseOperators adds a default '=' operator if no operators are set, and converts "!=" to "<>".
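The normalisation described here amounts to a small slice transformation. The following is a standalone sketch of that behavior on a plain string slice; `normaliseOperators` is a hypothetical function, not the SDK method, which operates on a KeyColumn instead.

```go
package main

import "fmt"

// normaliseOperators returns "=" when no operators are given
// and rewrites "!=" as "<>"; other operators are kept unchanged.
func normaliseOperators(ops []string) []string {
	if len(ops) == 0 {
		return []string{"="}
	}
	out := make([]string, len(ops))
	for i, op := range ops {
		if op == "!=" {
			op = "<>"
		}
		out[i] = op
	}
	return out
}

func main() {
	fmt.Println(normaliseOperators(nil))
	fmt.Println(normaliseOperators([]string{"!=", "<"}))
}
```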
func (*KeyColumn) SingleEqualsQual ¶
SingleEqualsQual returns whether this key column has a single = operator.
func (*KeyColumn) ToProtobuf ¶
ToProtobuf converts the KeyColumn to a protobuf object.
type KeyColumnEqualsQualMap ¶
KeyColumnEqualsQualMap is a map of column name to qual value, used to represent a map of any equals quals
func (KeyColumnEqualsQualMap) GetListQualValues ¶
func (m KeyColumnEqualsQualMap) GetListQualValues() map[string]*proto.QualValueList
GetListQualValues returns a map of all qual values with a List value
func (KeyColumnEqualsQualMap) String ¶
func (m KeyColumnEqualsQualMap) String() string
type KeyColumnQualMap ¶
type KeyColumnQualMap map[string]*KeyColumnQuals
KeyColumnQualMap is a map of KeyColumnQuals keyed by column name
func NewKeyColumnQualValueMap ¶
func NewKeyColumnQualValueMap(qualMap map[string]*proto.Quals, keyColumns KeyColumnSlice) KeyColumnQualMap
NewKeyColumnQualValueMap creates a KeyColumnQualMap from a qual map and a KeyColumnSlice
func (KeyColumnQualMap) GetListQualValues ¶
func (m KeyColumnQualMap) GetListQualValues() quals.QualSlice
GetListQualValues returns a slice of any quals we have which have a list value
func (KeyColumnQualMap) GetUnsatisfiedKeyColumns ¶
func (m KeyColumnQualMap) GetUnsatisfiedKeyColumns(columns KeyColumnSlice) KeyColumnSlice
func (KeyColumnQualMap) String ¶
func (m KeyColumnQualMap) String() string
func (KeyColumnQualMap) ToEqualsQualValueMap ¶
func (m KeyColumnQualMap) ToEqualsQualValueMap() map[string]*proto.QualValue
ToEqualsQualValueMap converts a KeyColumnQualMap to a column-qual value map, including only the
func (KeyColumnQualMap) ToProtoQualMap ¶
func (m KeyColumnQualMap) ToProtoQualMap() map[string]*proto.Quals
ToProtoQualMap converts the map into a map of column to *proto.Quals used for cache indexes
type KeyColumnQuals ¶
KeyColumnQuals defines all qualifiers for a column.
Use it in a table definition, by way of the plugin.QueryData object.
The query writer must specify the qualifiers, in a WHERE or JOIN..ON clause, in order to limit the number of API calls that Steampipe makes to satisfy the query.
func listUser(ctx context.Context, d *plugin.QueryData, h *plugin.HydrateData) (interface{}, error) {
var item User
var id string
if h.Item != nil {
user := h.Item.(*User)
id = user.ID
} else {
quals := d.KeyColumnQuals
id = quals["id"].GetStringValue()
}
...
}
func (KeyColumnQuals) SatisfiesKeyColumn ¶
func (k KeyColumnQuals) SatisfiesKeyColumn(keyColumn *KeyColumn) bool
func (KeyColumnQuals) SingleEqualsQual ¶
func (k KeyColumnQuals) SingleEqualsQual() bool
type KeyColumnSlice ¶
type KeyColumnSlice []*KeyColumn
func AllColumns ¶
func AllColumns(columns []string) KeyColumnSlice
AllColumns creates a KeyColumnSlice based on a slice of column names, each with a single equals operator and Require=Required
func AnyColumn ¶
func AnyColumn(columns []string) KeyColumnSlice
AnyColumn creates a KeyColumnSlice based on a slice of column names, each with a single equals operator and Require=AnyOf
func NewEqualsKeyColumnSlice ¶
func NewEqualsKeyColumnSlice(columns []string, require string) KeyColumnSlice
NewEqualsKeyColumnSlice creates a KeyColumnSlice from a list of column names, each with a single equals operator
func OptionalColumns ¶
func OptionalColumns(columns []string) KeyColumnSlice
OptionalColumns creates a KeyColumnSlice based on a slice of column names, each with a single equals operator and Require=Optional
func SingleColumn ¶
func SingleColumn(column string) KeyColumnSlice
SingleColumn creates a KeyColumnSlice based on a column name. The created slice has a single KeyColumn using a single equals operator and Require=Required
func (KeyColumnSlice) AllEquals ¶
func (k KeyColumnSlice) AllEquals() bool
AllEquals returns whether all KeyColumns only use equals operators
func (*KeyColumnSlice) Find ¶
func (k *KeyColumnSlice) Find(name string) *KeyColumn
Find looks for a key column with the given name and returns it if found
func (KeyColumnSlice) IsAnyOf ¶
func (k KeyColumnSlice) IsAnyOf() bool
IsAnyOf returns whether all key columns have Require == AnyOf
func (KeyColumnSlice) String ¶
func (k KeyColumnSlice) String() string
func (KeyColumnSlice) StringSlice ¶
func (k KeyColumnSlice) StringSlice() []string
StringSlice converts a KeyColumnSlice to a slice of strings
func (KeyColumnSlice) ToProtobuf ¶
func (k KeyColumnSlice) ToProtobuf() []*proto.KeyColumn
ToProtobuf converts the KeyColumnSlice to a slice of protobuf KeyColumns
func (KeyColumnSlice) Validate ¶
func (k KeyColumnSlice) Validate() []string
Validate validates all child columns
type ListConfig ¶
type ListConfig struct {
KeyColumns KeyColumnSlice
// the list function, this should stream the list results back using the QueryData object and return nil
Hydrate HydrateFunc
// the parent list function - if we list items with a parent-child relationship, this will list the parent items
ParentHydrate HydrateFunc
// deprecated - use IgnoreConfig
ShouldIgnoreError ErrorPredicate
IgnoreConfig *IgnoreConfig
RetryConfig *RetryConfig
}
ListConfig defines the function that lists all rows of a table, the KeyColumns that may be used to optimize the fetch, and the error-handling behavior.
To define a table's List function:
func tableHackernewsItem(ctx context.Context) *plugin.Table {
return &plugin.Table{
Name: "hackernews_item",
Description: "This table includes the most recent items posted to Hacker News.",
List: &plugin.ListConfig{
Hydrate: itemList,
},
...
}
}
func (*ListConfig) Validate ¶
func (c *ListConfig) Validate(table *Table) []string
type MatrixItemFunc ¶
type MatrixItemFunc func(context.Context, *Connection) []map[string]interface{}
deprecated
type MatrixItemMapFunc ¶
type NewPluginOptions ¶
type Plugin ¶
type Plugin struct {
Name string
Logger hclog.Logger
// TableMap is a map of all the tables in the plugin, keyed by the table name
// NOTE: it must be nil for plugins with dynamic schema
TableMap map[string]*Table
TableMapFunc TableMapFunc
DefaultTransform *transform.ColumnTransforms
DefaultConcurrency *DefaultConcurrencyConfig
DefaultRetryConfig *RetryConfig
DefaultIgnoreConfig *IgnoreConfig
// deprecated - use DefaultRetryConfig and DefaultIgnoreConfig
DefaultGetConfig *GetConfig
// deprecated - use DefaultIgnoreConfig
DefaultShouldIgnoreError ErrorPredicate
// every table must implement these columns
RequiredColumns []*Column
ConnectionConfigSchema *ConnectionConfigSchema
// ConnectionConfigChangedFunc is a callback function which is called from UpdateConnectionConfigs
// when any connection configs have changed
ConnectionConfigChangedFunc func(ctx context.Context, p *Plugin, old, new *Connection) error
// map of connection data (schema, config, connection cache)
// keyed by connection name
ConnectionMap map[string]*ConnectionData
// is this a static or dynamic schema
SchemaMode string
// contains filtered or unexported fields
}
Plugin is the primary struct that defines a plugin.
The plugin name is defined using plugin.Plugin.Name.
The tables provided by the plugin are defined by either setting plugin.Plugin.TableMap or plugin.Plugin.TableMapFunc:
For most plugins, with a static set of tables, use the TableMap property
For dynamic plugins, use plugin.Plugin.TableMapFunc. Also, plugin.Plugin.SchemaMode must be set to "dynamic".
If the plugin uses custom connection config, it must define a plugin.ConnectionConfigSchema.
Various default behaviours can be defined:
a default transform which will be applied to all columns which do not specify a transform. (plugin.Plugin.DefaultTransform).
the default HydrateFunc concurrency limits (plugin.Plugin.DefaultConcurrency).
the default error retry and ignore behaviour (plugin.Plugin.DefaultRetryConfig and plugin.Plugin.DefaultIgnoreConfig).
Required columns can be specified by setting plugin.Plugin.RequiredColumns.
func (*Plugin) ClearConnectionCache ¶ added in v4.1.0
ClearConnectionCache clears the connection cache for the given connection
func (*Plugin) ClearQueryCache ¶ added in v4.1.0
ClearQueryCache clears the query cache for the given connection
func (*Plugin) Execute ¶
func (p *Plugin) Execute(req *proto.ExecuteRequest, stream proto.WrapperPlugin_ExecuteServer) (err error)
Execute is the handler function for the Execute gRPC function. It executes a query and streams the results using the given gRPC stream.
func (*Plugin) GetSchema ¶
func (p *Plugin) GetSchema(connectionName string) (*grpc.PluginSchema, error)
GetSchema is the handler function for the GetSchema gRPC function. It returns the plugin schema. Note: the connection config must be set before calling this function.
func (*Plugin) SetAllConnectionConfigs ¶
func (p *Plugin) SetAllConnectionConfigs(configs []*proto.ConnectionConfig, maxCacheSizeMb int) (err error)
func (*Plugin) SetConnectionConfig ¶
func (*Plugin) UpdateConnectionConfigs ¶
func (p *Plugin) UpdateConnectionConfigs(added []*proto.ConnectionConfig, deleted []*proto.ConnectionConfig, changed []*proto.ConnectionConfig) error
type PluginFunc ¶
type QueryColumn ¶
type QueryColumn struct {
*Column
// contains filtered or unexported fields
}
QueryColumn is a struct storing the column name and the resolved hydrate name. It is used in the query data once the hydrate function has been resolved.
func NewQueryColumn ¶
func NewQueryColumn(column *Column, hydrateName string) *QueryColumn
type QueryContext ¶
type QueryContext struct {
Columns []string
UnsafeQuals map[string]*proto.Quals
Limit *int64
CacheEnabled bool
CacheTTL int64
}
func NewQueryContext ¶
func NewQueryContext(p *proto.QueryContext, limit *proto.NullableInt, cacheEnabled bool, cacheTTL int64) *QueryContext
NewQueryContext maps from a proto.QueryContext to a plugin.QueryContext.
func (QueryContext) GetLimit ¶
func (q QueryContext) GetLimit() int64
GetLimit converts limit from *int64 to an int64 (where -1 means no limit)
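As an illustration of the conversion described above, the following sketch shows one way a nullable limit can be flattened to a plain `int64`. The `queryContext` type and `getLimit` method are hypothetical stand-ins that keep only the Limit field; this is not the SDK's source.

```go
package main

import "fmt"

// queryContext is a hypothetical stand-in for plugin.QueryContext
// holding only the Limit field.
type queryContext struct {
	Limit *int64
}

// getLimit returns the limit as a plain int64, using -1 when no limit is set.
func (q queryContext) getLimit() int64 {
	if q.Limit == nil {
		return -1
	}
	return *q.Limit
}

func main() {
	ten := int64(10)
	fmt.Println(queryContext{}.getLimit())            // no limit set
	fmt.Println(queryContext{Limit: &ten}.getLimit()) // explicit limit
}
```

Using a sentinel value means callers can compare against the limit without a nil check on every access.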
type QueryData ¶
type QueryData struct {
// The table this query is associated with
Table *Table
// if this is a get call this will be populated with the quals as a map of column name to quals
// (this will also be populated for a list call if list key columns are specified -
// however this usage is deprecated and provided for legacy reasons only)
KeyColumnQuals KeyColumnEqualsQualMap
// a map of all key column quals which were specified in the query
Quals KeyColumnQualMap
// columns which have a single equals qual
// is this a 'get' or a 'list' call
FetchType fetchType
// query context data passed from postgres - this includes the requested columns and the quals
QueryContext *QueryContext
// connection details - the connection name and any config declared in the connection config file
Connection *Connection
// Matrix is an array of parameter maps (MatrixItems)
// the list/get calls will be executed for each element of this array
Matrix []map[string]interface{}
// object to handle caching of connection specific data
// deprecated
// use ConnectionCache
ConnectionManager *connection_manager.Manager
ConnectionCache *connection_manager.ConnectionCache
// streaming funcs
StreamListItem func(context.Context, ...interface{})
// deprecated - plugins should no longer call StreamLeafListItem directly and should just call StreamListItem
// event for the child list of a parent child list call
StreamLeafListItem func(context.Context, ...interface{})
// contains filtered or unexported fields
}
func (*QueryData) KeyColumnQualString ¶
KeyColumnQualString looks for the specified key column qual and, if it exists, returns the value as a string.
func (*QueryData) ShallowCopy ¶
ShallowCopy creates a shallow copy of the QueryData. This is used to pass different quals to multiple list/get calls when an in() clause is specified.
type QueryStatus ¶
type QueryStatus struct {
// flag which is true when we have streamed enough rows (or the context is cancelled)
StreamingComplete bool
// contains filtered or unexported fields
}
func (*QueryStatus) RowsRemaining ¶
func (s *QueryStatus) RowsRemaining(ctx context.Context) int64
RowsRemaining returns how many rows are required to complete the query
- if no limit has been parsed from the query, this will return math.MaxInt32 (meaning an unknown number of rows remain)
- if there is a limit, it will return the number of rows required to reach this limit
- if the context has been cancelled, it will return zero
type RetryConfig ¶
type RetryConfig struct {
ShouldRetryErrorFunc ErrorPredicateWithContext
// deprecated - use ShouldRetryErrorFunc
ShouldRetryError ErrorPredicate
}
func (*RetryConfig) DefaultTo ¶
func (c *RetryConfig) DefaultTo(other *RetryConfig)
func (*RetryConfig) GetListRetryConfig ¶
func (c *RetryConfig) GetListRetryConfig() *RetryConfig
GetListRetryConfig wraps the ShouldRetry function with an additional check of the rows streamed (as we cannot retry errors in the list hydrate function after streaming has started)
func (*RetryConfig) String ¶
func (c *RetryConfig) String() interface{}
type RowData ¶
type RowData struct {
// the output of the get/list call which is passed to all other hydrate calls
Item interface{}
// if there was a parent-child list call, store the parent list item
ParentItem interface{}
// contains filtered or unexported fields
}
RowData contains the row data
func (*RowData) GetColumnData ¶
func (r *RowData) GetColumnData(column *QueryColumn) (interface{}, error)
GetColumnData returns the root item, and, if this column has a hydrate function registered, the associated hydrate data
type ServeOpts ¶
type ServeOpts struct {
PluginName string
PluginFunc PluginFunc
}
ServeOpts are the configurations to serve a plugin.
type Table ¶
type Table struct {
Name string
// table description
Description string
// column definitions
Columns []*Column
// the function used to list table rows
List *ListConfig
// the function used to efficiently retrieve a row by id
Get *GetConfig
// deprecated
// the function used when retrieving data for multiple 'matrix items', e.g. regions
GetMatrixItem MatrixItemFunc
GetMatrixItemFunc MatrixItemMapFunc
// default transform applied to all columns
DefaultTransform *transform.ColumnTransforms
// function controlling default error handling behaviour
DefaultIgnoreConfig *IgnoreConfig
DefaultRetryConfig *RetryConfig
// deprecated - use DefaultIgnoreConfig
DefaultShouldIgnoreError ErrorPredicate
// the parent plugin object
Plugin *Plugin
// Deprecated: use HydrateConfig
HydrateDependencies []HydrateDependencies
// Config for any required hydrate functions, including dependencies between hydrate functions,
// error handling and concurrency behaviour
HydrateConfig []HydrateConfig
// cache options - allows disabling of cache for this table
Cache *TableCacheOptions
// contains filtered or unexported fields
}
Table is a struct representing a plugin table. It defines the table columns, the function used to list table results (List) and, optionally, the function used to retrieve a single result by key (Get) and any additional functions required to fetch specific columns (HydrateConfig).
func (Table) GetSchema ¶
func (t Table) GetSchema() (*proto.TableSchema, error)
GetSchema returns the table schema. Note: an additional '_ctx' column is added to all table schemas. This contains Steampipe-specific data (currently it is populated with the connection name).
func (*Table) ValidateColumnsExist ¶
func (t *Table) ValidateColumnsExist(keyColumns KeyColumnSlice) []string
type TableCacheOptions ¶
type TableCacheOptions struct {
Enabled bool
}
type TableMapFunc ¶
TableMapFunc is a callback function which can be used to populate plugin.Plugin.TableMap and allows the connection config to be used in the table creation (connection config is not available at plugin creation time).
This callback function should be implemented by the plugin writer for dynamic plugins.
Source Files
¶
- column.go
- concurrency.go
- connection_config.go
- connection_config_validate.go
- connection_data.go
- context.go
- diags_to_error.go
- doc.go
- env.go
- funcs.go
- get_config.go
- hydrate_cache.go
- hydrate_call.go
- hydrate_config.go
- hydrate_data.go
- hydrate_dependencies.go
- hydrate_error.go
- ignore_error_config.go
- key_column.go
- key_column_qual.go
- key_column_qual_map.go
- key_column_qual_value_map.go
- key_column_slice.go
- key_column_slice_create.go
- list_config.go
- plugin.go
- plugin_connection_config.go
- plugin_validate.go
- query_context.go
- query_data.go
- query_data_cache.go
- query_status.go
- required_hydrate_calls.go
- retry_config.go
- row.go
- serve.go
- table.go
- table_column.go
- table_fetch.go
- table_schema.go
- table_validate.go
Directories
¶
| Path | Synopsis |
|---|---|
| context_key | Package context_key provides keys used to retrieve items from the context |
| os_specific | Package os_specific provides OS specific functions to set the file limit |
| quals | Package quals is the SDK representation of a SQL query qualifier, i.e. |
| schema | Package schema provides types used to define the plugin.ConnectionConfigSchema |
| transform | Package transform defines functions that modify plugin.Column values. |