Documentation
Index ¶
- Constants
- func Byte(ctx context.Context, byters []Byter, data []byte) ([]byte, error)
- func NewSlice(s *[][]byte) [][]byte
- func Slice(ctx context.Context, slicers []Slicer, slice [][]byte) ([][]byte, error)
- type Base64
- type Base64Options
- type Byter
- type Capture
- type CaptureOptions
- type Case
- type CaseOptions
- type Concat
- type ConcatOptions
- type Convert
- type ConvertOptions
- type Copy
- type Count
- type Delete
- type Domain
- type DomainOptions
- type Drop
- type DynamoDB
- type DynamoDBOptions
- type Expand
- type ExpandOptions
- type Flatten
- type FlattenOptions
- type Group
- type GroupOptions
- type Gzip
- type GzipOptions
- type Hash
- type HashOptions
- type Insert
- type InsertOptions
- type Lambda
- type LambdaOptions
- type Math
- type MathOptions
- type Replace
- type ReplaceOptions
- type Slicer
- type Time
- type TimeOptions
Constants ¶
const Base64InvalidAlphabet = errors.Error("Base64InvalidAlphabet")
Base64InvalidAlphabet is returned when the Base64 processor is configured with an invalid alphabet setting.
const Base64InvalidDirection = errors.Error("Base64InvalidDirection")
Base64InvalidDirection is returned when the Base64 processor is configured with an invalid direction setting.
const Base64InvalidSettings = errors.Error("Base64InvalidSettings")
Base64InvalidSettings is returned when the Base64 processor is configured with invalid Input and Output settings.
const ByteInvalidFactoryConfig = errors.Error("ByteInvalidFactoryConfig")
ByteInvalidFactoryConfig is used when an unsupported Byter is referenced in ByterFactory.
const CaptureInvalidSettings = errors.Error("CaptureInvalidSettings")
CaptureInvalidSettings is returned when the Capture processor is configured with invalid Input and Output settings.
const CaseInvalidSettings = errors.Error("CaseInvalidSettings")
CaseInvalidSettings is returned when the Case processor is configured with invalid Input and Output settings.
const ConcatInvalidSettings = errors.Error("ConcatInvalidSettings")
ConcatInvalidSettings is returned when the Concat processor is configured with invalid Input and Output settings.
const ConvertInvalidSettings = errors.Error("ConvertInvalidSettings")
ConvertInvalidSettings is returned when the Convert processor is configured with invalid Input and Output settings.
const CopyInvalidSettings = errors.Error("CopyInvalidSettings")
CopyInvalidSettings is returned when the Copy processor is configured with invalid Input and Output settings.
const DeleteInvalidSettings = errors.Error("DeleteInvalidSettings")
DeleteInvalidSettings is returned when the Delete processor is configured with invalid Input and Output settings.
const DomainInvalidSettings = errors.Error("DomainInvalidSettings")
DomainInvalidSettings is returned when the Domain processor is configured with invalid Input and Output settings.
const DomainNoSubdomain = errors.Error("DomainNoSubdomain")
DomainNoSubdomain is used when a domain without a subdomain is processed.
const DynamoDBInvalidSettings = errors.Error("DynamoDBInvalidSettings")
DynamoDBInvalidSettings is returned when the DynamoDB processor is configured with invalid Input and Output settings.
const ExpandInvalidSettings = errors.Error("ExpandInvalidSettings")
ExpandInvalidSettings is returned when the Expand processor is configured with invalid Input and Output settings.
const FlattenInvalidSettings = errors.Error("FlattenInvalidSettings")
FlattenInvalidSettings is returned when the Flatten processor is configured with invalid Input and Output settings.
const GroupInvalidSettings = errors.Error("GroupInvalidSettings")
GroupInvalidSettings is returned when the Group processor is configured with invalid Input and Output settings.
const GzipInvalidDirection = errors.Error("GzipInvalidDirection")
GzipInvalidDirection is used when an invalid direction is given to the processor.
const GzipInvalidSettings = errors.Error("GzipInvalidSettings")
GzipInvalidSettings is returned when the Gzip processor is configured with invalid Input and Output settings.
const HashInvalidSettings = errors.Error("HashInvalidSettings")
HashInvalidSettings is returned when the Hash processor is configured with invalid Input and Output settings.
const HashUnsupportedAlgorithm = errors.Error("HashUnsupportedAlgorithm")
HashUnsupportedAlgorithm is returned when the Hash processor is configured with an unsupported algorithm.
const InsertInvalidSettings = errors.Error("InsertInvalidSettings")
InsertInvalidSettings is returned when the Insert processor is configured with invalid Input and Output settings.
const LambdaInvalidSettings = errors.Error("LambdaInvalidSettings")
LambdaInvalidSettings is returned when the Lambda processor is configured with invalid Input and Output settings.
const MathInvalidSettings = errors.Error("MathInvalidSettings")
MathInvalidSettings is returned when the Math processor is configured with invalid Input and Output settings.
const ReplaceInvalidSettings = errors.Error("ReplaceInvalidSettings")
ReplaceInvalidSettings is returned when the Replace processor is configured with invalid Input and Output settings.
const SliceInvalidFactoryConfig = errors.Error("SliceInvalidFactoryConfig")
SliceInvalidFactoryConfig is used when an unsupported Slicer is referenced in SlicerFactory.
const TimeInvalidSettings = errors.Error("TimeInvalidSettings")
TimeInvalidSettings is returned when the Time processor is configured with invalid Input and Output settings.
Variables ¶
This section is empty.
Functions ¶
Types ¶
type Base64 ¶ added in v0.2.0
type Base64 struct {
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
Options Base64Options `json:"options"`
}
Base64 processes data by converting it to and from base64. The processor supports these patterns:
json:
{"base64":"Zm9v"} >>> {"base64":"foo"}
json array:
{"base64":["Zm9v","YmFy"]} >>> {"base64":["foo","bar"]}
data:
Zm9v >>> foo
The processor uses this Jsonnet configuration:
{
type: 'base64',
settings: {
input_key: 'base64',
output_key: 'base64',
options: {
direction: 'from',
}
},
}
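The decode direction shown in the config can be sketched with Go's standard library; this illustrates the underlying transformation, not the processor's actual implementation, and the helper name fromBase64 is illustrative only:

```go
package main

import (
	"encoding/base64"
	"fmt"
)

// fromBase64 mirrors the processor's "from" direction using the
// standard alphabet (RFC 4648 section 4).
func fromBase64(s string) (string, error) {
	b, err := base64.StdEncoding.DecodeString(s)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	out, err := fromBase64("Zm9v")
	if err != nil {
		panic(err)
	}
	fmt.Println(out) // foo
}
```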
type Base64Options ¶ added in v0.2.0
Base64Options contains custom options for the Base64 processor:
Direction: the direction of the encoding; must be one of:
- to: encode to base64
- from: decode from base64
Alphabet: the base64 alphabet to use; must be one of:
- std (https://www.rfc-editor.org/rfc/rfc4648.html#section-4), the default
- url (https://www.rfc-editor.org/rfc/rfc4648.html#section-5)
type Byter ¶
Byter is an interface for applying processors to bytes.
func ByterFactory ¶
ByterFactory loads a Byter from a Config. This is the recommended function for retrieving ready-to-use Byters.
type Capture ¶
type Capture struct {
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
Options CaptureOptions `json:"options"`
}
Capture processes data by capturing values using regular expressions. The processor supports these patterns:
json:
{"capture":"foo@qux.com"} >>> {"capture":"foo"}
{"capture":"foo@qux.com"} >>> {"capture":["f","o","o"]}
json array:
{"capture":["foo@qux.com","bar@qux.com"]} >>> {"capture":["foo","bar"]}
{"capture":["foo@qux.com","bar@qux.com"]} >>> {"capture":[["f","o","o"],["b","a","r"]]}
data:
foo@qux.com >>> foo
bar quux >>> {"foo":"bar","qux":"quux"}
The processor uses this Jsonnet configuration:
{
type: 'capture',
settings: {
input_key: 'capture',
output_key: 'capture',
options: {
expression: '^([^@]*)@.*$',
_function: 'find',
},
},
}
type CaptureOptions ¶
type CaptureOptions struct {
Expression string `json:"expression"`
Function string `json:"function"`
Count int `json:"count"`
}
CaptureOptions contains custom options for the Capture processor:
Expression: the regular expression used to capture values
Function: the type of regular expression applied; must be one of:
- find: applies the Find(String)?Submatch function
- find_all: applies the FindAll(String)?Submatch function (see Count)
- named_group: applies the Find(String)?Submatch function and stores values as JSON using subexpressions
Count (optional): used for repeating capture groups; defaults to matching all capture groups
type Case ¶
type Case struct {
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
Options CaseOptions `json:"options"`
}
Case processes data by changing the case of a string or byte slice. The processor supports these patterns:
json:
{"case":"foo"} >>> {"case":"FOO"}
json array:
{"case":["foo","bar"]} >>> {"case":["FOO","BAR"]}
data:
foo >>> FOO
The processor uses this Jsonnet configuration:
{
type: 'case',
settings: {
// if the value is "foo", then this returns "FOO"
input_key: 'case',
output_key: 'case',
options: {
case: 'upper',
}
},
}
type CaseOptions ¶
type CaseOptions struct {
Case string `json:"case"`
}
CaseOptions contains custom options for the Case processor:
Case: the case to convert the string or bytes to; must be one of:
- upper
- lower
- snake (strings only)
type Concat ¶
type Concat struct {
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
Options ConcatOptions `json:"options"`
}
Concat processes data by concatenating multiple values together with a separator. The processor supports these patterns:
json:
{"concat":["foo","bar"]} >>> {"concat":"foo.bar"}
json array:
{"concat":[["foo","baz"],["bar","qux"]]} >>> {"concat":["foo.bar","baz.qux"]}
The processor uses this Jsonnet configuration:
{
type: 'concat',
settings: {
input_key: 'concat',
output_key: 'concat',
options: {
separator: '.',
}
},
}
type ConcatOptions ¶
type ConcatOptions struct {
Separator string `json:"separator"`
}
ConcatOptions contains custom options for the Concat processor:
Separator: the string that separates the concatenated values
type Convert ¶
type Convert struct {
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
Options ConvertOptions `json:"options"`
}
Convert processes data by converting values between types (e.g., string to integer, integer to float). The processor supports these patterns:
json:
{"convert":"true"} >>> {"convert":true}
{"convert":"-123"} >>> {"convert":-123}
{"convert":123} >>> {"convert":"123"}
json array:
{"convert":["true","false"]} >>> {"convert":[true,false]}
{"convert":["-123","-456"]} >>> {"convert":[-123,-456]}
{"convert":[123,123.456]} >>> {"convert":["123","123.456"]}
The processor uses this Jsonnet configuration:
{
type: 'convert',
settings: {
input_key: 'convert',
output_key: 'convert',
options: {
type: 'bool',
}
},
}
type ConvertOptions ¶
type ConvertOptions struct {
Type string `json:"type"`
}
ConvertOptions contains custom options for the Convert processor:
Type: the type that the value should be converted to; must be one of:
- bool (boolean)
- int (integer)
- float
- uint (unsigned integer)
- string
type Copy ¶
type Copy struct {
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Copy processes data by copying it. The processor supports these patterns:
json:
{"hello":"world"} >>> {"hello":"world","goodbye":"world"}
from json:
{"hello":"world"} >>> world
to json:
world >>> {"hello":"world"}
The processor uses this Jsonnet configuration:
{
type: 'copy',
settings: {
input_key: 'hello',
output_key: 'goodbye',
},
}
type Count ¶
type Count struct{}
Count processes data by counting it.
The processor uses this Jsonnet configuration:
{
type: 'count',
}
type Delete ¶
type Delete struct {
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
}
Delete processes data by deleting JSON keys. The processor supports these patterns:
json:
{"foo":"bar","baz":"qux"} >>> {"foo":"bar"}
The processor uses this Jsonnet configuration:
{
type: 'delete',
settings: {
input_key: 'delete',
}
}
type Domain ¶
type Domain struct {
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
Options DomainOptions `json:"options"`
}
Domain processes data by parsing fully qualified domain names into labels. The processor supports these patterns:
json:
{"domain":"example.com"} >>> {"domain":"example.com","tld":"com"}
json array:
{"domain":["example.com","example.top"]} >>> {"domain":["example.com","example.top"],"tld":["com","top"]}
data:
example.com >>> com
The processor uses this Jsonnet configuration:
{
type: 'domain',
settings: {
input_key: 'domain',
output_key: 'tld',
options: {
_function: 'tld',
}
},
}
type DomainOptions ¶
type DomainOptions struct {
Function string `json:"function"`
}
DomainOptions contains custom options for the Domain processor:
Function: the domain processing function applied to the data; must be one of:
- tld
- domain
- subdomain
type Drop ¶
type Drop struct {
Condition condition.OperatorConfig `json:"condition"`
}
Drop processes data by dropping it from a data channel. The processor uses this Jsonnet configuration:
{
type: 'drop',
}
type DynamoDB ¶
type DynamoDB struct {
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
Options DynamoDBOptions `json:"options"`
}
DynamoDB processes data by querying a DynamoDB table and returning all matched items as an array of JSON objects. The processor supports these patterns:
json:
{"ddb":{"PK":"foo"}} >>> {"ddb":[{"foo":"bar"}]}
The processor uses this Jsonnet configuration:
{
type: 'dynamodb',
settings: {
// the input key is expected to be a map containing a partition key ("PK") and an optional sort key ("SK")
input_key: 'ddb',
output_key: 'ddb',
// if the value of the PK is "foo", then this queries DynamoDB using "foo" as the partition key value for the table attribute "pk" and, because scan_index_forward is true, returns the first indexed item from the table.
options: {
table: 'foo-table',
key_condition_expression: 'pk = :partitionkeyval',
limit: 1,
scan_index_forward: true,
}
},
}
type DynamoDBOptions ¶
type DynamoDBOptions struct {
Table string `json:"table"`
KeyConditionExpression string `json:"key_condition_expression"`
Limit int64 `json:"limit"`
ScanIndexForward bool `json:"scan_index_forward"`
}
DynamoDBOptions contains custom options settings for the DynamoDB processor (https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Query.html#API_Query_RequestSyntax):
Table: the table to query
KeyConditionExpression: the key condition expression (see documentation)
Limit (optional): the maximum number of items to evaluate; defaults to evaluating all items
ScanIndexForward (optional): specifies the order of index traversal; must be one of:
- true (default): traversal is performed in ascending order
- false: traversal is performed in descending order
type Expand ¶
type Expand struct {
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
Options ExpandOptions `json:"options"`
}
Expand processes data by creating individual events from objects in JSON arrays. The processor supports these patterns:
json array:
{"expand":[{"foo":"bar"}],"baz":"qux"} >>> {"foo":"bar","baz":"qux"}
The processor uses this Jsonnet configuration:
{
type: 'expand',
settings: {
input_key: 'expand',
options: {
retain: ['baz'],
}
},
}
type ExpandOptions ¶
type ExpandOptions struct {
Retain []string `json:"retain"` // retain fields found anywhere in input
}
ExpandOptions contains custom options settings for the Expand processor:
Retain (optional): array of JSON keys to retain from the original object
type Flatten ¶
type Flatten struct {
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
Options FlattenOptions `json:"options"`
}
Flatten processes data by flattening JSON arrays. The processor supports these patterns:
json:
{"flatten":["foo",["bar"]]} >>> {"flatten":["foo","bar"]}
The processor uses this Jsonnet configuration:
{
type: 'flatten',
settings: {
input_key: 'flatten',
output_key: 'flatten',
},
}
type FlattenOptions ¶
type FlattenOptions struct {
Deep bool `json:"deep"`
}
FlattenOptions contains custom options settings for the Flatten processor:
Deep (optional): deeply flattens nested arrays
type Group ¶ added in v0.2.0
type Group struct {
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
Options GroupOptions `json:"options"`
}
Group processes data by grouping JSON arrays into an array of tuples or array of JSON objects. The processor supports these patterns:
json array:
{"group":[["foo","bar"],[111,222]]} >>> {"group":[["foo",111],["bar",222]]}
{"group":[["foo","bar"],[111,222]]} >>> {"group":[{"name":"foo","size":111},{"name":"bar","size":222}]}
The processor uses this Jsonnet configuration:
{
type: 'group',
settings: {
input_key: 'group',
output_key: 'group',
},
}
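The first pattern, grouping parallel arrays into tuples, amounts to a zip; this sketch assumes the sub-arrays have equal lengths, and the helper name group is illustrative. With the Keys option set, each tuple would instead become a JSON object:

```go
package main

import "fmt"

// group zips parallel arrays into an array of tuples.
func group(in [][]interface{}) [][]interface{} {
	if len(in) == 0 {
		return nil
	}
	out := make([][]interface{}, len(in[0]))
	for i := range out {
		tuple := make([]interface{}, len(in))
		for j, arr := range in {
			tuple[j] = arr[i]
		}
		out[i] = tuple
	}
	return out
}

func main() {
	in := [][]interface{}{{"foo", "bar"}, {111, 222}}
	fmt.Println(group(in)) // [[foo 111] [bar 222]]
}
```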
type GroupOptions ¶ added in v0.2.0
type GroupOptions struct {
Keys []string `json:"keys"`
}
GroupOptions contains custom options for the Group processor:
Keys (optional): JSON keys that the grouped values are written to, creating an array of JSON objects
type Gzip ¶ added in v0.2.0
type Gzip struct {
Condition condition.OperatorConfig `json:"condition"`
Options GzipOptions `json:"options"`
}
Gzip processes data by compressing or decompressing gzip. The processor supports these patterns:
data:
[31 139 8 0 0 0 0 0 0 255 74 203 207 7 4 0 0 255 255 33 101 115 140 3 0 0 0] >>> foo
foo >>> [31 139 8 0 0 0 0 0 0 255 74 203 207 7 4 0 0 255 255 33 101 115 140 3 0 0 0]
The processor uses this Jsonnet configuration:
{
type: 'gzip',
settings: {
direction: 'from',
},
}
type GzipOptions ¶ added in v0.2.0
type GzipOptions struct {
Direction string `json:"direction"`
}
GzipOptions contains custom options settings for the Gzip processor:
Direction: the direction of the compression; must be one of:
- to: compress data to gzip
- from: decompress data from gzip
type Hash ¶
type Hash struct {
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
Options HashOptions `json:"options"`
}
Hash processes data by calculating hashes. The processor supports these patterns:
json:
{"hash":"foo"} >>> {"hash":"acbd18db4cc2f85cedef654fccc4a4d8"}
json array:
{"hash":["foo","bar"]} >>> {"hash":["acbd18db4cc2f85cedef654fccc4a4d8","37b51d194a7513e45b56f6524f2d51f2"]}
data:
foo >>> acbd18db4cc2f85cedef654fccc4a4d8
The processor uses this Jsonnet configuration:
{
type: 'hash',
settings: {
input_key: 'hash',
output_key: 'hash',
options: {
algorithm: 'md5',
}
},
}
type HashOptions ¶
type HashOptions struct {
Algorithm string `json:"algorithm"`
}
HashOptions contains custom options for the Hash processor:
Algorithm: the hashing algorithm to apply; must be one of:
- md5
- sha256
type Insert ¶
type Insert struct {
Condition condition.OperatorConfig `json:"condition"`
OutputKey string `json:"output_key"`
Options InsertOptions `json:"options"`
}
Insert processes data by inserting a value into a JSON object. The processor supports these patterns:
json:
{"foo":"bar"} >>> {"foo":"bar","baz":"qux"}
The processor uses this Jsonnet configuration:
{
type: 'insert',
settings: {
output_key: 'baz',
options: {
value: 'qux',
}
},
}
type InsertOptions ¶
type InsertOptions struct {
Value interface{} `json:"value"`
}
InsertOptions contains custom options for the Insert processor:
Value: the value to insert
type Lambda ¶
type Lambda struct {
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
Options LambdaOptions `json:"options"`
}
Lambda processes data by synchronously invoking an AWS Lambda function and returning the payload. The average latency of a synchronous invocation is tens of milliseconds, but it can reach hundreds to thousands of milliseconds depending on the function, which can significantly increase total event latency. If Substation is running in AWS Lambda with Kinesis, then this latency can be mitigated by increasing the parallelization factor of the Lambda (https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html).
The input key's value must be a JSON object that contains settings for the Lambda. It is recommended to use the copy and insert processors to create the JSON object before calling this processor and to use the delete processor to remove the JSON object after calling this processor.
The processor supports these patterns:
json:
{"foo":"bar","lambda":{"lookup":"baz"}} >>> {"foo":"bar","lambda":{"baz":"qux"}}
The processor uses this Jsonnet configuration:
{
type: 'lambda',
settings: {
input_key: 'lambda',
output_key: 'lambda',
options: {
function: 'foo-function',
}
},
}
type LambdaOptions ¶
type LambdaOptions struct {
Function string `json:"function"`
ErrorOnFailure bool `json:"error_on_failure"`
}
LambdaOptions contains custom options settings for the Lambda processor:
Function: the function to invoke
ErrorOnFailure (optional): if true, then errors returned by the invoked Lambda cause the processor to fail; defaults to false
type Math ¶
type Math struct {
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
Options MathOptions `json:"options"`
}
Math processes data by applying mathematical operations. The processor supports these patterns:
json:
{"math":[1,3]} >>> {"math":4}
json array:
{"math":[[1,2],[3,4]]} >>> {"math":[4,6]}
The processor uses this Jsonnet configuration:
{
type: 'math',
settings: {
input_key: 'math',
output_key: 'math',
options: {
operation: 'add',
}
},
}
type MathOptions ¶
type MathOptions struct {
Operation string `json:"operation"`
}
MathOptions contains custom options for the Math processor:
Operation: the operation applied to the data; must be one of:
- add
- subtract
- divide
type Replace ¶
type Replace struct {
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
Options ReplaceOptions `json:"options"`
}
Replace processes data by replacing characters. The processor supports these patterns:
json:
{"replace":"bar"} >>> {"replace":"baz"}
json array:
{"replace":["bar","bard"]} >>> {"replace":["baz","bazd"]}
data:
bar >>> baz
The processor uses this Jsonnet configuration:
{
type: 'replace',
settings: {
input_key: 'replace',
output_key: 'replace',
options: {
old: 'r',
new: 'z',
}
},
}
type ReplaceOptions ¶
type ReplaceOptions struct {
Old string `json:"old"`
New string `json:"new"`
Count int `json:"count"`
}
ReplaceOptions contains custom options for the Replace processor:
Old: the character(s) to replace in the data
New: the character(s) that replace Old
Count (optional): the number of replacements to make; defaults to -1, which replaces all matches
type Slicer ¶ added in v0.2.0
Slicer is an interface for applying processors to slices of bytes.
func MakeAllSlicers ¶ added in v0.2.0
MakeAllSlicers accepts an array of Config and returns populated Slicers. This is a convenience function for loading many Slicers.
type Time ¶
type Time struct {
Condition condition.OperatorConfig `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
Options TimeOptions `json:"options"`
}
Time processes data by converting time values between formats. The processor supports these patterns:
json:
{"time":1639877490.061} >>> {"time":"2021-12-19T01:31:30.000000Z"}
json array:
{"time":[1639877490.061,1651705967]} >>> {"time":["2021-12-19T01:31:30.000000Z","2022-05-04T23:12:47.000000Z"]}
The processor uses this Jsonnet configuration:
{
type: 'time',
settings: {
input_key: 'time',
output_key: 'time',
options: {
input_format: 'unix',
output_format: '2006-01-02T15:04:05',
}
},
}
type TimeOptions ¶
type TimeOptions struct {
InputFormat string `json:"input_format"`
InputLocation string `json:"input_location"`
OutputFormat string `json:"output_format"`
OutputLocation string `json:"output_location"`
}
TimeOptions contains custom options for the Time processor:
InputFormat: the time format of the input; must be one of:
- pattern-based layouts (https://gobyexample.com/time-formatting-parsing)
- unix: epoch
- unix_milli: epoch milliseconds
- unix_nano: epoch nanoseconds
- now: current time
InputLocation (optional): the time zone abbreviation for the input; defaults to UTC
OutputFormat: the time format of the output; must be one of:
- pattern-based layouts (https://gobyexample.com/time-formatting-parsing)
OutputLocation (optional): the time zone abbreviation for the output; defaults to UTC