Documentation
Index ¶
- Constants
- func Byte(ctx context.Context, byters []Byter, data []byte) ([]byte, error)
- func NewSlice(s *[][]byte) [][]byte
- func Slice(ctx context.Context, slicers []Slicer, slice [][]byte) ([][]byte, error)
- type Base64
- type Base64Options
- type Byter
- type Capture
- type CaptureOptions
- type Case
- type CaseOptions
- type Concat
- type ConcatOptions
- type Convert
- type ConvertOptions
- type Copy
- type Count
- type Delete
- type Domain
- type DomainOptions
- type Drop
- type DynamoDB
- type DynamoDBOptions
- type Expand
- type Flatten
- type FlattenOptions
- type ForEach
- type ForEachOptions
- type Group
- type GroupOptions
- type Gzip
- type GzipOptions
- type Hash
- type HashOptions
- type Insert
- type InsertOptions
- type Lambda
- type LambdaOptions
- type Math
- type MathOptions
- type Pipeline
- type PipelineOptions
- type Replace
- type ReplaceOptions
- type Slicer
- type Time
- type TimeOptions
Constants ¶
const Base64JSONDecodedBinary = errors.Error("Base64JSONDecodedBinary")
Base64JSONDecodedBinary is returned when the Base64 processor is configured to decode output to JSON, but the output contains binary data and cannot be written as valid JSON.
const ByteInvalidFactoryConfig = errors.Error("ByteInvalidFactoryConfig")
ByteInvalidFactoryConfig is returned when an unsupported Byte is referenced in ByteFactory.
const DeleteInvalidSettings = errors.Error("DeleteInvalidSettings")
DeleteInvalidSettings is returned when the Delete processor is configured with invalid Input and Output settings.
const DomainNoSubdomain = errors.Error("DomainNoSubdomain")
DomainNoSubdomain is returned when a domain without a subdomain is processed.
const PipelineArrayInput = errors.Error("PipelineArrayInput")
PipelineArrayInput is returned when the Pipeline processor is configured to process JSON, but the input is an array. Array values are not supported by this processor, instead the input should be run through the ForEach processor (which can encapsulate the Pipeline processor).
const ProcessorInvalidSettings = errors.Error("ProcessorInvalidSettings")
ProcessorInvalidSettings is returned when a processor is configured with invalid settings. Common causes include improper input and output settings (e.g., missing keys) and missing required options.
const SliceInvalidFactoryConfig = errors.Error("SliceInvalidFactoryConfig")
SliceInvalidFactoryConfig is returned when an unsupported Slice is referenced in SliceFactory.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Base64 ¶ added in v0.2.0
type Base64 struct {
Options Base64Options `json:"options"`
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Base64 processes data by converting it to and from base64. The processor supports these patterns:
JSON:
{"base64":"Zm9v"} >>> {"base64":"foo"}
data:
Zm9v >>> foo
The processor uses this Jsonnet configuration:
{
type: 'base64',
settings: {
options: {
direction: 'from',
},
input_key: 'base64',
output_key: 'base64',
},
}
type Base64Options ¶ added in v0.2.0
type Base64Options struct {
Direction string `json:"direction"`
}
Base64Options contains custom options for the Base64 processor:
Direction: the direction of the encoding
	must be one of:
		to: encode to base64
		from: decode from base64
type Byter ¶
Byter is an interface for applying processors to bytes.
func ByterFactory ¶
ByterFactory loads a Byter from a Config. This is the recommended function for retrieving ready-to-use Byters.
type Capture ¶
type Capture struct {
Options CaptureOptions `json:"options"`
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Capture processes data by capturing values using regular expressions. The processor supports these patterns:
JSON:
{"capture":"foo@qux.com"} >>> {"capture":"foo"}
{"capture":"foo@qux.com"} >>> {"capture":["f","o","o"]}
data:
foo@qux.com >>> foo
bar quux >>> {"foo":"bar","qux":"quux"}
The processor uses this Jsonnet configuration:
{
type: 'capture',
settings: {
options: {
expression: '^([^@]*)@.*$',
function: 'find',
},
input_key: 'capture',
output_key: 'capture',
},
}
type CaptureOptions ¶
type CaptureOptions struct {
Expression string `json:"expression"`
Function string `json:"function"`
Count int `json:"count"`
}
CaptureOptions contains custom options for the Capture processor:
Expression: the regular expression used to capture values
Function: the type of regular expression applied
	must be one of:
		find: applies the Find(String)?Submatch function
		find_all: applies the FindAll(String)?Submatch function (see count)
		named_group: applies the Find(String)?Submatch function and stores values as JSON using subexpressions
Count (optional): used for repeating capture groups
	defaults to match all capture groups
type Case ¶
type Case struct {
Options CaseOptions `json:"options"`
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Case processes data by changing the case of a string or byte slice. The processor supports these patterns:
JSON:
{"case":"foo"} >>> {"case":"FOO"}
data:
foo >>> FOO
The processor uses this Jsonnet configuration:
{
type: 'case',
settings: {
options: {
case: 'upper',
},
input_key: 'case',
output_key: 'case',
},
}
type CaseOptions ¶
type CaseOptions struct {
Case string `json:"case"`
}
CaseOptions contains custom options for the Case processor:
Case: the case to convert the string or byte to
	must be one of:
		upper
		lower
		snake (strings only)
type Concat ¶
type Concat struct {
Options ConcatOptions `json:"options"`
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Concat processes data by concatenating multiple values together with a separator. The processor supports these patterns:
JSON:
{"concat":["foo","bar"]} >>> {"concat":"foo.bar"}
The processor uses this Jsonnet configuration:
{
type: 'concat',
settings: {
options: {
separator: '.',
},
input_key: 'concat',
output_key: 'concat',
},
}
type ConcatOptions ¶
type ConcatOptions struct {
Separator string `json:"separator"`
}
ConcatOptions contains custom options for the Concat processor:
Separator: the string that separates the concatenated values
type Convert ¶
type Convert struct {
Options ConvertOptions `json:"options"`
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Convert processes data by converting values between types (e.g., string to integer, integer to float). The processor supports these patterns:
JSON:
{"convert":"true"} >>> {"convert":true}
{"convert":"-123"} >>> {"convert":-123}
{"convert":123} >>> {"convert":"123"}
The processor uses this Jsonnet configuration:
{
type: 'convert',
settings: {
options: {
type: 'bool',
},
input_key: 'convert',
output_key: 'convert',
},
}
type ConvertOptions ¶
type ConvertOptions struct {
Type string `json:"type"`
}
ConvertOptions contains custom options for the Convert processor:
Type: the type that the value should be converted to
	must be one of:
		bool (boolean)
		int (integer)
		float
		uint (unsigned integer)
		string
type Copy ¶
type Copy struct {
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Copy processes data by copying it. The processor supports these patterns:
JSON:
{"hello":"world"} >>> {"hello":"world","goodbye":"world"}
from JSON:
{"hello":"world"} >>> world
to JSON:
world >>> {"hello":"world"}
The processor uses this Jsonnet configuration:
{
type: 'copy',
settings: {
input_key: 'hello',
output_key: 'goodbye',
},
}
type Count ¶
type Count struct{}
Count processes data by counting it.
The processor uses this Jsonnet configuration:
{
type: 'count',
}
type Delete ¶
type Delete struct {
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
}
Delete processes data by deleting JSON keys. The processor supports these patterns:
JSON:
{"foo":"bar","baz":"qux"} >>> {"foo":"bar"}
The processor uses this Jsonnet configuration:
{
type: 'delete',
settings: {
input_key: 'delete',
}
}
type Domain ¶
type Domain struct {
Options DomainOptions `json:"options"`
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Domain processes data by parsing fully qualified domain names into labels. The processor supports these patterns:
JSON:
{"domain":"example.com"} >>> {"domain":"example.com","tld":"com"}
data:
example.com >>> com
The processor uses this Jsonnet configuration:
{
type: 'domain',
settings: {
input_key: 'domain',
output_key: 'tld',
options: {
function: 'tld',
}
},
}
type DomainOptions ¶
type DomainOptions struct {
Function string `json:"function"`
}
DomainOptions contains custom options for the Domain processor:
Function: the domain processing function to apply to the data
	must be one of:
		tld
		domain
		subdomain
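A naive sketch of label parsing using only the strings package; note this is for illustration and does not reflect the processor's implementation, since correct TLD parsing requires a public suffix list (e.g., "co.uk" is a single effective TLD):

```go
package main

import (
	"fmt"
	"strings"
)

// labels naively splits a fully qualified domain name into
// subdomain, domain, and tld by dot-separated labels.
func labels(fqdn string) (subdomain, domain, tld string) {
	parts := strings.Split(fqdn, ".")
	tld = parts[len(parts)-1]
	if len(parts) >= 2 {
		domain = strings.Join(parts[len(parts)-2:], ".")
	}
	if len(parts) >= 3 {
		subdomain = strings.Join(parts[:len(parts)-2], ".")
	}
	return
}

func main() {
	sub, dom, tld := labels("www.example.com")
	fmt.Println(sub, dom, tld) // www example.com com
}
```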
type Drop ¶
type Drop struct {
Condition condition.OperatorConfig `json:"condition"`
}
Drop processes data by dropping it from a data channel. The processor uses this Jsonnet configuration:
{
type: 'drop',
}
type DynamoDB ¶
type DynamoDB struct {
Options DynamoDBOptions `json:"options"`
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
DynamoDB processes data by querying a DynamoDB table and returning all matched items as an array of JSON objects. The processor supports these patterns:
JSON:
{"ddb":{"PK":"foo"}} >>> {"ddb":[{"foo":"bar"}]}
The processor uses this Jsonnet configuration:
{
type: 'dynamodb',
settings: {
// the input key is expected to be a map containing a partition key ("PK") and an optional sort key ("SK")
// if the value of the PK is "foo", then this queries DynamoDB by using "foo" as the partition key value for the table attribute "pk" and returns the last indexed item from the table.
options: {
table: 'foo-table',
key_condition_expression: 'pk = :partitionkeyval',
limit: 1,
scan_index_forward: true,
},
input_key: 'ddb',
output_key: 'ddb',
},
}
type DynamoDBOptions ¶
type DynamoDBOptions struct {
Table string `json:"table"`
KeyConditionExpression string `json:"key_condition_expression"`
Limit int64 `json:"limit"`
ScanIndexForward bool `json:"scan_index_forward"`
}
DynamoDBOptions contains custom options settings for the DynamoDB processor (https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Query.html#API_Query_RequestSyntax):
Table: table to query
KeyConditionExpression: key condition expression (see documentation)
Limit (optional): maximum number of items to evaluate
	defaults to evaluating all items
ScanIndexForward (optional): specifies the order of index traversal
	must be one of:
		true (default): traversal is performed in ascending order
		false: traversal is performed in descending order
type Expand ¶
type Expand struct {
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
}
Expand processes data by creating individual events from objects in JSON arrays. The processor supports these patterns:
JSON:
{"expand":[{"foo":"bar"}],"baz":"qux"} >>> {"foo":"bar","baz":"qux"}
The processor uses this Jsonnet configuration:
{
type: 'expand',
settings: {
input_key: 'expand',
},
}
type Flatten ¶
type Flatten struct {
Options FlattenOptions `json:"options"`
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Flatten processes data by flattening JSON arrays. The processor supports these patterns:
JSON:
{"flatten":["foo",["bar"]]} >>> {"flatten":["foo","bar"]}
The processor uses this Jsonnet configuration:
{
type: 'flatten',
settings: {
input_key: 'flatten',
output_key: 'flatten',
},
}
type FlattenOptions ¶
type FlattenOptions struct {
Deep bool `json:"deep"`
}
FlattenOptions contains custom options settings for the Flatten processor:
Deep (optional): deeply flattens nested arrays
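An illustrative sketch of the flattening behavior (not the processor's implementation, which operates on JSON): without Deep, one level of nesting is removed; with Deep, every level is removed.

```go
package main

import "fmt"

// flatten removes one level of array nesting, or all levels
// when deep is true.
func flatten(in []interface{}, deep bool) []interface{} {
	var out []interface{}
	for _, v := range in {
		nested, ok := v.([]interface{})
		if !ok {
			out = append(out, v)
			continue
		}
		if deep {
			out = append(out, flatten(nested, true)...)
		} else {
			out = append(out, nested...)
		}
	}
	return out
}

func main() {
	// mirrors {"flatten":["foo",["bar"]]} >>> {"flatten":["foo","bar"]}
	fmt.Println(flatten([]interface{}{"foo", []interface{}{"bar"}}, false)) // [foo bar]
}
```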
type ForEach ¶ added in v0.3.0
type ForEach struct {
Options ForEachOptions `json:"options"`
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
ForEach processes data by iterating and applying a processor to each element in a JSON array. The processor supports these patterns:
JSON:
{"input":["ABC","DEF"]} >>> {"input":["ABC","DEF"],"output":["abc","def"]}
The processor uses this Jsonnet configuration:
{
type: 'for_each',
settings: {
options: {
processor: {
type: 'case',
settings: {
options: {
case: 'lower',
}
}
},
},
input_key: 'input',
output_key: 'output.-1',
},
}
func (ForEach) Byte ¶ added in v0.3.0
Byte processes bytes with the ForEach processor.
Data is processed by iterating an input JSON array, encapsulating the elements in a temporary JSON object, and running the configured processor. This technique avoids parsing errors when handling arrays that contain JSON objects, such as:
{"for_each":[{"foo":"bar"},{"foo":"baz"}]}
The temporary JSON object uses the configured processor's name as its key (e.g., "case"). If the configured processor has keys set (e.g., "foo"), then the keys are concatenated (e.g., "case.foo"). The example above produces this temporary JSON during processing:
{"case":{"foo":"bar"}}
{"case":{"foo":"baz"}}
type ForEachOptions ¶ added in v0.3.0
ForEachOptions contains custom options for the ForEach processor:
Processor: processor to apply to the data
type Group ¶ added in v0.2.0
type Group struct {
Options GroupOptions `json:"options"`
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Group processes data by grouping JSON arrays into an array of tuples or array of JSON objects. The processor supports these patterns:
JSON array:
{"group":[["foo","bar"],[111,222]]} >>> {"group":[["foo",111],["bar",222]]}
{"group":[["foo","bar"],[111,222]]} >>> {"group":[{"name":"foo","size":111},{"name":"bar","size":222}]}
The processor uses this Jsonnet configuration:
{
type: 'group',
settings: {
input_key: 'group',
output_key: 'group',
},
}
type GroupOptions ¶ added in v0.2.0
type GroupOptions struct {
Keys []string `json:"keys"`
}
GroupOptions contains custom options for the Group processor:
Keys (optional): JSON keys that the grouped values are written to, creating an array of new JSON objects
type Gzip ¶ added in v0.2.0
type Gzip struct {
Options GzipOptions `json:"options"`
Condition condition.OperatorConfig `json:"condition"`
}
Gzip processes data by compressing or decompressing gzip. The processor supports these patterns:
data:
[31 139 8 0 0 0 0 0 0 255 74 203 207 7 4 0 0 255 255 33 101 115 140 3 0 0 0] >>> foo
foo >>> [31 139 8 0 0 0 0 0 0 255 74 203 207 7 4 0 0 255 255 33 101 115 140 3 0 0 0]
The processor uses this Jsonnet configuration:
{
type: 'gzip',
settings: {
options: {
direction: 'from',
},
},
}
type GzipOptions ¶ added in v0.2.0
type GzipOptions struct {
Direction string `json:"direction"`
}
GzipOptions contains custom options settings for the Gzip processor:
Direction: the direction of the compression
	must be one of:
		to: compress data to gzip
		from: decompress data from gzip
type Hash ¶
type Hash struct {
Options HashOptions `json:"options"`
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Hash processes data by calculating hashes. The processor supports these patterns:
JSON:
{"hash":"foo"} >>> {"hash":"acbd18db4cc2f85cedef654fccc4a4d8"}
data:
foo >>> acbd18db4cc2f85cedef654fccc4a4d8
The processor uses this Jsonnet configuration:
{
type: 'hash',
settings: {
options: {
algorithm: 'md5',
},
input_key: 'hash',
output_key: 'hash',
},
}
type HashOptions ¶
type HashOptions struct {
Algorithm string `json:"algorithm"`
}
HashOptions contains custom options for the Hash processor:
Algorithm: the hashing algorithm to apply
	must be one of:
		md5
		sha256
type Insert ¶
type Insert struct {
Options InsertOptions `json:"options"`
Condition condition.OperatorConfig `json:"condition"`
OutputKey string `json:"output_key"`
}
Insert processes data by inserting a value into a JSON object. The processor supports these patterns:
JSON:
{"foo":"bar"} >>> {"foo":"bar","baz":"qux"}
The processor uses this Jsonnet configuration:
{
type: 'insert',
settings: {
options: {
value: 'qux',
},
output_key: 'baz',
},
}
type InsertOptions ¶
type InsertOptions struct {
Value interface{} `json:"value"`
}
InsertOptions contains custom options for the Insert processor:
Value: the value to insert
type Lambda ¶
type Lambda struct {
Options LambdaOptions `json:"options"`
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Lambda processes data by synchronously invoking an AWS Lambda and returning the payload. Synchronous invocation typically adds tens of milliseconds of latency, but depending on the function it can add hundreds to thousands of milliseconds, which can significantly impact total event latency. If Substation is running in AWS Lambda with Kinesis, then this latency can be mitigated by increasing the parallelization factor of the Lambda (https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html).
The input key's value must be a JSON object that contains settings for the Lambda. It is recommended to use the copy and insert processors to create the JSON object before calling this processor and to use the delete processor to remove the JSON object after calling this processor.
The processor supports these patterns:
JSON:
{"foo":"bar","lambda":{"lookup":"baz"}} >>> {"foo":"bar","lambda":{"baz":"qux"}}
The processor uses this Jsonnet configuration:
{
type: 'lambda',
settings: {
options: {
function: 'foo-function',
},
input_key: 'lambda',
output_key: 'lambda',
},
}
type LambdaOptions ¶
type LambdaOptions struct {
Function string `json:"function"`
ErrorOnFailure bool `json:"error_on_failure"`
}
LambdaOptions contains custom options settings for the Lambda processor:
Function: function to invoke
ErrorOnFailure (optional): if set to true, then errors from the invoked Lambda will cause the processor to fail
	defaults to false
type Math ¶
type Math struct {
Options MathOptions `json:"options"`
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Math processes data by applying mathematical operations. The processor supports these patterns:
JSON:
{"math":[1,3]} >>> {"math":4}
The processor uses this Jsonnet configuration:
{
type: 'math',
settings: {
options: {
operation: 'add',
},
input_key: 'math',
output_key: 'math',
},
}
type MathOptions ¶
type MathOptions struct {
Operation string `json:"operation"`
}
MathOptions contains custom options for the Math processor:
Operation: the operator applied to the data
	must be one of:
		add
		subtract
		multiply
		divide
type Pipeline ¶ added in v0.3.0
type Pipeline struct {
Options PipelineOptions `json:"options"`
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Pipeline processes data by applying a series of processors. This processor should be used when data requires complex processing outside of the boundaries of any data structures (see tests for examples). The processor supports these patterns:
JSON:
{"pipeline":"H4sIAMpcy2IA/wXAIQ0AAACAsLbY93csBiFlc4wDAAAA"} >>> {"pipeline":"foo"}
data:
H4sIAMpcy2IA/wXAIQ0AAACAsLbY93csBiFlc4wDAAAA >>> foo
The processor uses this Jsonnet configuration:
{
type: 'pipeline',
settings: {
options: {
processors: [
{
type: 'base64',
settings: {
options: {
direction: 'from',
}
}
},
{
type: 'gzip',
settings: {
options: {
direction: 'from',
}
}
},
]
},
input_key: 'pipeline',
output_key: 'pipeline',
},
}
func (Pipeline) Byte ¶ added in v0.3.0
Byte processes bytes with the Pipeline processor.
Byters only accept bytes, so when processing JSON the input value is converted from a Result to its string representation and then to bytes. The conversion from Result to string is safe for strings and objects, but not arrays (e.g., ["foo","bar"]).
If the input is an array, then an error is raised; the input should be run through the ForEach processor (which can encapsulate the Pipeline processor).
type PipelineOptions ¶ added in v0.3.0
PipelineOptions contains custom options for the Pipeline processor:
Processors: array of processors to apply to the data
type Replace ¶
type Replace struct {
Options ReplaceOptions `json:"options"`
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Replace processes data by replacing characters. The processor supports these patterns:
JSON:
{"replace":"bar"} >>> {"replace":"baz"}
data:
bar >>> baz
The processor uses this Jsonnet configuration:
{
type: 'replace',
settings: {
options: {
old: 'r',
new: 'z',
},
input_key: 'replace',
output_key: 'replace',
},
}
type ReplaceOptions ¶
type ReplaceOptions struct {
Old string `json:"old"`
New string `json:"new"`
Count int `json:"count"`
}
ReplaceOptions contains custom options for the Replace processor:
Old: the character(s) to replace in the data
New: the character(s) that replace Old
Count (optional): the number of replacements to make
	defaults to -1, which replaces all matches
type Slicer ¶ added in v0.2.0
Slicer is an interface for applying processors to slices of bytes.
func MakeAllSlicers ¶ added in v0.2.0
MakeAllSlicers accepts an array of Config and returns populated Slicers. This is a convenience function for loading many Slicers.
type Time ¶
type Time struct {
Options TimeOptions `json:"options"`
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Time processes data by converting time values between formats. The processor supports these patterns:
JSON:
{"time":1639877490.061} >>> {"time":"2021-12-19T01:31:30.061000Z"}
data:
1639877490.061 >>> 2021-12-19T01:31:30.061000Z
The processor uses this Jsonnet configuration:
{
type: 'time',
settings: {
options: {
input_format: 'unix',
output_format: '2006-01-02T15:04:05.000000Z',
},
input_key: 'time',
output_key: 'time',
},
}
type TimeOptions ¶
type TimeOptions struct {
InputFormat string `json:"input_format"`
OutputFormat string `json:"output_format"`
InputLocation string `json:"input_location"`
OutputLocation string `json:"output_location"`
}
TimeOptions contains custom options for the Time processor:
InputFormat: time format of the input
	must be one of:
		pattern-based layouts (https://gobyexample.com/time-formatting-parsing)
		unix: epoch (supports fractions of a second)
		unix_milli: epoch milliseconds
		now: current time
OutputFormat: time format of the output
	must be one of:
		pattern-based layouts (https://gobyexample.com/time-formatting-parsing)
		unix: epoch
		unix_milli: epoch milliseconds
InputLocation (optional): the time zone abbreviation for the input
	defaults to UTC
OutputLocation (optional): the time zone abbreviation for the output
	defaults to UTC