process

package v0.3.0

Published: Jul 18, 2022 License: MIT Imports: 23 Imported by: 0

README

process

Contains interfaces and methods for atomically processing data. Each processor defines its own data processing patterns, but there are a set of common patterns shared among most processors:

  • processing JSON values, including arrays
  • processing bytes

The package can be used like this:

package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/process"
)

func main() {
	processor := process.Insert{
		Options: process.InsertOptions{
			Value: "bar",
		},
		OutputKey: "foo",
	}

	ctx := context.TODO()
	data := []byte(`{"hello":"world"}`)
	processed, err := processor.Byte(ctx, data)
	if err != nil {
		panic(err)
	}

	fmt.Println(string(data))
	fmt.Println(string(processed))
}

Information for each processor is available in the GoDoc.

Documentation

Index

Constants

View Source
const Base64JSONDecodedBinary = errors.Error("Base64JSONDecodedBinary")

Base64JSONDecodedBinary is returned when the Base64 processor is configured to decode output to JSON, but the output contains binary data and cannot be written as valid JSON.

View Source
const ByteInvalidFactoryConfig = errors.Error("ByteInvalidFactoryConfig")

ByteInvalidFactoryConfig is returned when an unsupported Byte is referenced in ByteFactory.

View Source
const DeleteInvalidSettings = errors.Error("DeleteInvalidSettings")

DeleteInvalidSettings is returned when the Delete processor is configured with invalid Input and Output settings.

View Source
const DomainNoSubdomain = errors.Error("DomainNoSubdomain")

DomainNoSubdomain is returned when a domain without a subdomain is processed.

View Source
const PipelineArrayInput = errors.Error("PipelineArrayInput")

PipelineArrayInput is returned when the Pipeline processor is configured to process JSON, but the input is an array. Array values are not supported by this processor; instead, the input should be run through the ForEach processor (which can encapsulate the Pipeline processor).

View Source
const ProcessorInvalidSettings = errors.Error("ProcessorInvalidSettings")

ProcessorInvalidSettings is returned when a processor is configured with invalid settings. Common causes include improper input and output settings (e.g., missing keys) and missing required options.

View Source
const SliceInvalidFactoryConfig = errors.Error("SliceInvalidFactoryConfig")

SliceInvalidFactoryConfig is returned when an unsupported Slice is referenced in SliceFactory.

Variables

This section is empty.

Functions

func Byte

func Byte(ctx context.Context, byters []Byter, data []byte) ([]byte, error)

Byte accepts an array of Byters and applies all processors to the data.

func NewSlice added in v0.2.0

func NewSlice(s *[][]byte) [][]byte

NewSlice returns a slice of byte slices with a minimum capacity of 10.

func Slice added in v0.2.0

func Slice(ctx context.Context, slicers []Slicer, slice [][]byte) ([][]byte, error)

Slice accepts an array of Slicers and applies all processors to the data.

Types

type Base64 added in v0.2.0

type Base64 struct {
	Options   Base64Options            `json:"options"`
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
}

Base64 processes data by converting it to and from base64. The processor supports these patterns:

JSON:
	{"base64":"Zm9v"} >>> {"base64":"foo"}
data:
	Zm9v >>> foo

The processor uses this Jsonnet configuration:

{
	type: 'base64',
	settings: {
		options: {
			direction: 'from',
		},
		input_key: 'base64',
		output_key: 'base64',
	},
}

func (Base64) Byte added in v0.2.0

func (p Base64) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Base64 processor.

func (Base64) Slice added in v0.2.0

func (p Base64) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Base64 processor. Conditions are optionally applied on the bytes to enable processing.

type Base64Options added in v0.2.0

type Base64Options struct {
	Direction string `json:"direction"`
}

Base64Options contains custom options for the Base64 processor:

Direction:
	the direction of the encoding
	must be one of:
		to: encode to base64
		from: decode from base64

type Byter

type Byter interface {
	Byte(context.Context, []byte) ([]byte, error)
}

Byter is an interface for applying processors to bytes.

func ByterFactory

func ByterFactory(cfg config.Config) (Byter, error)

ByterFactory loads a Byter from a Config. This is the recommended function for retrieving ready-to-use Byters.

func MakeAllByters

func MakeAllByters(cfg []config.Config) ([]Byter, error)

MakeAllByters accepts an array of Config and returns populated Byters. This is a convenience function for loading many Byters.

type Capture

type Capture struct {
	Options   CaptureOptions           `json:"options"`
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
}

Capture processes data by capturing values using regular expressions. The processor supports these patterns:

JSON:
	{"capture":"foo@qux.com"} >>> {"capture":"foo"}
	{"capture":"foo@qux.com"} >>> {"capture":["f","o","o"]}
data:
	foo@qux.com >>> foo
	bar quux >>> {"foo":"bar","qux":"quux"}

The processor uses this Jsonnet configuration:

{
	type: 'capture',
	settings: {
		options: {
			expression: '^([^@]*)@.*$',
			_function: 'find',
		},
		input_key: 'capture',
		output_key: 'capture',
	},
}

func (Capture) Byte

func (p Capture) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Capture processor.

func (Capture) Slice added in v0.2.0

func (p Capture) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Capture processor. Conditions are optionally applied on the bytes to enable processing.

type CaptureOptions

type CaptureOptions struct {
	Expression string `json:"expression"`
	Function   string `json:"function"`
	Count      int    `json:"count"`
}

CaptureOptions contains custom options for the Capture processor:

Expression:
	the regular expression used to capture values
Function:
	the type of regular expression applied
	must be one of:
		find: applies the Find(String)?Submatch function
		find_all: applies the FindAll(String)?Submatch function (see count)
		named_group: applies the Find(String)?Submatch function and stores values as JSON using subexpressions
Count (optional):
	used for repeating capture groups
	defaults to match all capture groups

type Case

type Case struct {
	Options   CaseOptions              `json:"options"`
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
}

Case processes data by changing the case of a string or byte slice. The processor supports these patterns:

JSON:
	{"case":"foo"} >>> {"case":"FOO"}
data:
	foo >>> FOO

The processor uses this Jsonnet configuration:

{
	type: 'case',
	settings: {
		options: {
			case: 'upper',
		},
		input_key: 'case',
		output_key: 'case',
	},
}

func (Case) Byte

func (p Case) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Case processor.

func (Case) Slice added in v0.2.0

func (p Case) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Case processor. Conditions are optionally applied on the bytes to enable processing.

type CaseOptions

type CaseOptions struct {
	Case string `json:"case"`
}

CaseOptions contains custom options for the Case processor:

Case:
	the case to convert the string or byte to
	must be one of:
		upper
		lower
		snake (strings only)

type Concat

type Concat struct {
	Options   ConcatOptions            `json:"options"`
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
}

Concat processes data by concatenating multiple values together with a separator. The processor supports these patterns:

JSON:
	{"concat":["foo","bar"]} >>> {"concat":"foo.bar"}

The processor uses this Jsonnet configuration:

{
	type: 'concat',
	settings: {
		options: {
			separator: '.',
		},
		input_key: 'concat',
		output_key: 'concat',
	},
}

func (Concat) Byte

func (p Concat) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Concat processor.

func (Concat) Slice added in v0.2.0

func (p Concat) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Concat processor. Conditions are optionally applied on the bytes to enable processing.

type ConcatOptions

type ConcatOptions struct {
	Separator string `json:"separator"`
}

ConcatOptions contains custom options for the Concat processor:

Separator:
	the string that separates the concatenated values

type Convert

type Convert struct {
	Options   ConvertOptions           `json:"options"`
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
}

Convert processes data by converting values between types (e.g., string to integer, integer to float). The processor supports these patterns:

JSON:
	{"convert":"true"} >>> {"convert":true}
	{"convert":"-123"} >>> {"convert":-123}
	{"convert":123} >>> {"convert":"123"}

The processor uses this Jsonnet configuration:

{
	type: 'convert',
	settings: {
		options: {
			type: 'bool',
		},
		input_key: 'convert',
		output_key: 'convert',
	},
}

func (Convert) Byte

func (p Convert) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Convert processor.

func (Convert) Slice added in v0.2.0

func (p Convert) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Convert processor. Conditions are optionally applied on the bytes to enable processing.

type ConvertOptions

type ConvertOptions struct {
	Type string `json:"type"`
}

ConvertOptions contains custom options for the Convert processor:

Type:
	the type that the value should be converted to
	must be one of:
		bool (boolean)
		int (integer)
		float
		uint (unsigned integer)
		string

type Copy

type Copy struct {
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
}

Copy processes data by copying it. The processor supports these patterns:

JSON:
	{"hello":"world"} >>> {"hello":"world","goodbye":"world"}
from JSON:
	{"hello":"world"} >>> world
to JSON:
	world >>> {"hello":"world"}

The processor uses this Jsonnet configuration:

{
	type: 'copy',
	settings: {
		input_key: 'hello',
		output_key: 'goodbye',
	},
}

func (Copy) Byte

func (p Copy) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Copy processor.

func (Copy) Slice added in v0.2.0

func (p Copy) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Copy processor. Conditions are optionally applied on the bytes to enable processing.

type Count

type Count struct{}

Count processes data by counting it.

The processor uses this Jsonnet configuration:

{
	type: 'count',
}

func (Count) Slice added in v0.2.0

func (p Count) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Count processor. Conditions are optionally applied on the bytes to enable processing.

type Delete

type Delete struct {
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
}

Delete processes data by deleting JSON keys. The processor supports these patterns:

JSON:
	{"foo":"bar","baz":"qux"} >>> {"foo":"bar"}

The processor uses this Jsonnet configuration:

{
	type: 'delete',
	settings: {
		input_key: 'delete',
	}
}

func (Delete) Byte

func (p Delete) Byte(ctx context.Context, object []byte) ([]byte, error)

Byte processes bytes with the Delete processor.

func (Delete) Slice added in v0.2.0

func (p Delete) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Delete processor. Conditions are optionally applied on the bytes to enable processing.

type Domain

type Domain struct {
	Options   DomainOptions            `json:"options"`
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
}

Domain processes data by parsing fully qualified domain names into labels. The processor supports these patterns:

JSON:
	{"domain":"example.com"} >>> {"domain":"example.com","tld":"com"}
data:
	example.com >>> com

The processor uses this Jsonnet configuration:

{
	type: 'domain',
	settings: {
		input_key: 'domain',
		output_key: 'tld',
		options: {
			_function: 'tld',
		}
	},
}

func (Domain) Byte

func (p Domain) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Domain processor.

func (Domain) Slice added in v0.2.0

func (p Domain) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Domain processor. Conditions are optionally applied on the bytes to enable processing.

type DomainOptions

type DomainOptions struct {
	Function string `json:"function"`
}

DomainOptions contains custom options for the Domain processor:

Function:
	the domain processing function to apply to the data
	must be one of:
		tld
		domain
		subdomain
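The 'tld' function returns the rightmost label of the domain. The naive helper below is a hypothetical illustration, not the processor's implementation: effective TLDs such as "co.uk" span multiple labels, so real fully qualified domain name parsing should consult the public suffix list.

```go
package main

import (
	"fmt"
	"strings"
)

// tld returns the last dot-separated label of a domain name.
// Naive sketch: it does not consult the public suffix list.
func tld(domain string) string {
	labels := strings.Split(domain, ".")
	return labels[len(labels)-1]
}

func main() {
	fmt.Println(tld("example.com")) // com
}
```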

type Drop

type Drop struct {
	Condition condition.OperatorConfig `json:"condition"`
}

Drop processes data by dropping it from a data channel. The processor uses this Jsonnet configuration:

{
	type: 'drop',
}

func (Drop) Slice added in v0.2.0

func (p Drop) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Drop processor. Conditions are optionally applied on the bytes to enable processing.

type DynamoDB

type DynamoDB struct {
	Options   DynamoDBOptions          `json:"options"`
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
}

DynamoDB processes data by querying a DynamoDB table and returning all matched items as an array of JSON objects. The processor supports these patterns:

JSON:
	{"ddb":{"PK":"foo"}} >>> {"ddb":[{"foo":"bar"}]}

The processor uses this Jsonnet configuration:

{
	type: 'dynamodb',
	settings: {
		// the input key is expected to be a map containing a partition key ("PK") and an optional sort key ("SK")
		// if the value of the PK is "foo", then this queries DynamoDB by using "foo" as the partition key value for the table attribute "pk" and returns the last indexed item from the table.
		options: {
			table: 'foo-table',
			key_condition_expression: 'pk = :partitionkeyval',
			limit: 1,
			scan_index_forward: true,
		},
		input_key: 'ddb',
		output_key: 'ddb',
	},
}

func (DynamoDB) Byte

func (p DynamoDB) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the DynamoDB processor.

func (DynamoDB) Slice added in v0.2.0

func (p DynamoDB) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the DynamoDB processor. Conditions are optionally applied on the bytes to enable processing.

type DynamoDBOptions

type DynamoDBOptions struct {
	Table                  string `json:"table"`
	KeyConditionExpression string `json:"key_condition_expression"`
	Limit                  int64  `json:"limit"`
	ScanIndexForward       bool   `json:"scan_index_forward"`
}

DynamoDBOptions contains custom options settings for the DynamoDB processor (https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Query.html#API_Query_RequestSyntax):

Table:
	table to query
KeyConditionExpression:
	key condition expression (see documentation)
Limit (optional):
	maximum number of items to evaluate
	defaults to evaluating all items
ScanIndexForward (optional):
	specifies the order of index traversal
	must be one of:
		true (default): traversal is performed in ascending order
		false: traversal is performed in descending order

type Expand

type Expand struct {
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
}

Expand processes data by creating individual events from objects in JSON arrays. The processor supports these patterns:

JSON:
	{"expand":[{"foo":"bar"}],"baz":"qux"} >>> {"foo":"bar","baz":"qux"}

The processor uses this Jsonnet configuration:

{
	type: 'expand',
	settings: {
		input_key: 'expand',
	},
}

func (Expand) Slice added in v0.2.0

func (p Expand) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Expand processor. Conditions are optionally applied on the bytes to enable processing.
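The documented pattern can be sketched with encoding/json; the expand helper below is a hypothetical illustration, not the processor's implementation:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// expand splits the JSON array under key into individual events,
// merging the remaining top-level keys into each one.
func expand(data []byte, key string) ([][]byte, error) {
	var event map[string]interface{}
	if err := json.Unmarshal(data, &event); err != nil {
		return nil, err
	}

	items, ok := event[key].([]interface{})
	if !ok {
		return nil, fmt.Errorf("%s is not an array", key)
	}
	delete(event, key)

	var out [][]byte
	for _, item := range items {
		merged := map[string]interface{}{}
		for k, v := range event {
			merged[k] = v
		}
		if obj, ok := item.(map[string]interface{}); ok {
			for k, v := range obj {
				merged[k] = v
			}
		}
		b, err := json.Marshal(merged)
		if err != nil {
			return nil, err
		}
		out = append(out, b)
	}
	return out, nil
}

func main() {
	events, err := expand([]byte(`{"expand":[{"foo":"bar"}],"baz":"qux"}`), "expand")
	if err != nil {
		panic(err)
	}
	for _, e := range events {
		fmt.Println(string(e)) // {"baz":"qux","foo":"bar"}
	}
}
```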

type Flatten

type Flatten struct {
	Options   FlattenOptions           `json:"options"`
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
}

Flatten processes data by flattening JSON arrays. The processor supports these patterns:

JSON:
	{"flatten":["foo",["bar"]]} >>> {"flatten":["foo","bar"]}

The processor uses this Jsonnet configuration:

{
	type: 'flatten',
	settings: {
		input_key: 'flatten',
		output_key: 'flatten',
	},
}

func (Flatten) Byte

func (p Flatten) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Flatten processor.

func (Flatten) Slice added in v0.2.0

func (p Flatten) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Flatten processor. Conditions are optionally applied on the bytes to enable processing.

type FlattenOptions

type FlattenOptions struct {
	Deep bool `json:"deep"`
}

FlattenOptions contains custom options settings for the Flatten processor:

Deep (optional):
	deeply flattens nested arrays

type ForEach added in v0.3.0

type ForEach struct {
	Options   ForEachOptions           `json:"options"`
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
}

ForEach processes data by iterating and applying a processor to each element in a JSON array. The processor supports these patterns:

JSON:
	{"input":["ABC","DEF"]} >>> {"input":["ABC","DEF"],"output":["abc","def"]}

The processor uses this Jsonnet configuration:

{
	type: 'for_each',
	settings: {
		options: {
			processor: {
				type: 'case',
				settings: {
					options: {
						case: 'lower',
					}
				}
			},
		},
		input_key: 'input',
		output_key: 'output.-1',
	},
}

func (ForEach) Byte added in v0.3.0

func (p ForEach) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the ForEach processor.

Data is processed by iterating an input JSON array, encapsulating the elements in a temporary JSON object, and running the configured processor. This technique avoids parsing errors when handling arrays that contain JSON objects, such as:

{"for_each":[{"foo":"bar"},{"foo":"baz"}]}

The temporary JSON object uses the configured processor's name as its key (e.g., "case"). If the configured processor has keys set (e.g., "foo"), then the keys are concatenated (e.g., "case.foo"). The example above produces this temporary JSON during processing:

{"case":{"foo":"bar"}}
{"case":{"foo":"baz"}}

func (ForEach) Slice added in v0.3.0

func (p ForEach) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the ForEach processor. Conditions are optionally applied on the bytes to enable processing.

type ForEachOptions added in v0.3.0

type ForEachOptions struct {
	Processor config.Config
}

ForEachOptions contains custom options for the ForEach processor:

Processor:
	processor to apply to the data

type Group added in v0.2.0

type Group struct {
	Options   GroupOptions             `json:"options"`
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
}

Group processes data by grouping JSON arrays into an array of tuples or array of JSON objects. The processor supports these patterns:

JSON array:
	{"group":[["foo","bar"],[111,222]]} >>> {"group":[["foo",111],["bar",222]]}
	{"group":[["foo","bar"],[111,222]]} >>> {"group":[{"name":foo","size":111},{"name":"bar","size":222}]}

The processor uses this Jsonnet configuration:

{
	type: 'group',
	settings: {
		input_key: 'group',
		output_key: 'group',
	},
}

func (Group) Byte added in v0.2.0

func (p Group) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Group processor.

func (Group) Slice added in v0.2.0

func (p Group) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Group processor. Conditions are optionally applied on the bytes to enable processing.

type GroupOptions added in v0.2.0

type GroupOptions struct {
	Keys []string `json:"keys"`
}

GroupOptions contains custom options for the Group processor:

Keys (optional):
	where values from Inputs.Keys are written to, creating new JSON objects

type Gzip added in v0.2.0

type Gzip struct {
	Options   GzipOptions              `json:"options"`
	Condition condition.OperatorConfig `json:"condition"`
}

Gzip processes data by compressing or decompressing gzip. The processor supports these patterns:

data:
	[31 139 8 0 0 0 0 0 0 255 74 203 207 7 4 0 0 255 255 33 101 115 140 3 0 0 0] >>> foo
	foo >>> [31 139 8 0 0 0 0 0 0 255 74 203 207 7 4 0 0 255 255 33 101 115 140 3 0 0 0]

The processor uses this Jsonnet configuration:

{
	type: 'gzip',
	settings: {
		options: {
			direction: 'from',
		},
	},
}

func (Gzip) Byte added in v0.2.0

func (p Gzip) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Gzip processor.

func (Gzip) Slice added in v0.2.0

func (p Gzip) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Gzip processor. Conditions are optionally applied on the bytes to enable processing.

type GzipOptions added in v0.2.0

type GzipOptions struct {
	Direction string `json:"direction"`
}

GzipOptions contains custom options settings for the Gzip processor:

Direction:
	the direction of the compression
	must be one of:
		to: compress data to gzip
		from: decompress data from gzip

type Hash

type Hash struct {
	Options   HashOptions              `json:"options"`
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
}

Hash processes data by calculating hashes. The processor supports these patterns:

JSON:
	{"hash":"foo"} >>> {"hash":"acbd18db4cc2f85cedef654fccc4a4d8"}
data:
	foo >>> acbd18db4cc2f85cedef654fccc4a4d8

The processor uses this Jsonnet configuration:

{
	type: 'hash',
	settings: {
		options: {
			algorithm: 'md5',
		},
		input_key: 'hash',
		output_key: 'hash',
	},
}

func (Hash) Byte

func (p Hash) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Hash processor.

func (Hash) Slice added in v0.2.0

func (p Hash) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Hash processor. Conditions are optionally applied on the bytes to enable processing.

type HashOptions

type HashOptions struct {
	Algorithm string `json:"algorithm"`
}

HashOptions contains custom options for the Hash processor:

Algorithm:
	the hashing algorithm to apply
	must be one of:
		md5
		sha256

type Insert

type Insert struct {
	Options   InsertOptions            `json:"options"`
	Condition condition.OperatorConfig `json:"condition"`
	OutputKey string                   `json:"output_key"`
}

Insert processes data by inserting a value into a JSON object. The processor supports these patterns:

JSON:
	{"foo":"bar"} >>> {"foo":"bar","baz":"qux"}

The processor uses this Jsonnet configuration:

{
	type: 'insert',
	settings: {
		options: {
			value: 'qux',
		},
		output_key: 'baz',
	},
}

func (Insert) Byte

func (p Insert) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Insert processor.

func (Insert) Slice added in v0.2.0

func (p Insert) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Insert processor. Conditions are optionally applied on the bytes to enable processing.

type InsertOptions

type InsertOptions struct {
	Value interface{} `json:"value"`
}

InsertOptions contains custom options for the Insert processor:

Value:
	the value to insert

type Lambda

type Lambda struct {
	Options   LambdaOptions            `json:"options"`
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
}

Lambda processes data by synchronously invoking an AWS Lambda function and returning the payload. Synchronous invocation typically takes tens of milliseconds, but can take hundreds to thousands of milliseconds depending on the function, which can significantly impact total event latency. If Substation is running in AWS Lambda with Kinesis, then this latency can be mitigated by increasing the parallelization factor of the Lambda (https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html).

The input key's value must be a JSON object that contains settings for the Lambda. It is recommended to use the copy and insert processors to create the JSON object before calling this processor and to use the delete processor to remove the JSON object after calling this processor.

The processor supports these patterns:

JSON:
	{"foo":"bar","lambda":{"lookup":"baz"}} >>> {"foo":"bar","lambda":{"baz":"qux"}}

The processor uses this Jsonnet configuration:

{
	type: 'lambda',
	settings: {
		options: {
			function: 'foo-function',
		},
		input_key: 'lambda',
		output_key: 'lambda',
	},
}

func (Lambda) Byte

func (p Lambda) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Lambda processor.

func (Lambda) Slice added in v0.2.0

func (p Lambda) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Lambda processor. Conditions are optionally applied on the bytes to enable processing.

type LambdaOptions

type LambdaOptions struct {
	Function       string `json:"function"`
	ErrorOnFailure bool   `json:"error_on_failure"`
}

LambdaOptions contains custom options settings for the Lambda processor:

Function:
	function to invoke
ErrorOnFailure (optional):
	if set to true, then errors from the invoked Lambda will cause the processor to fail
	defaults to false

type Math

type Math struct {
	Options   MathOptions              `json:"options"`
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
}

Math processes data by applying mathematical operations. The processor supports these patterns:

JSON:
	{"math":[1,3]} >>> {"math":4}

The processor uses this Jsonnet configuration:

{
	type: 'math',
	settings: {
		options: {
			operation: 'add',
		},
		input_key: 'math',
		output_key: 'math',
	},
}

func (Math) Byte

func (p Math) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Math processor.

func (Math) Slice added in v0.2.0

func (p Math) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Math processor. Conditions are optionally applied on the bytes to enable processing.

type MathOptions

type MathOptions struct {
	Operation string `json:"operation"`
}

MathOptions contains custom options for the Math processor:

Operation:
	the operator applied to the data
	must be one of:
		add
		subtract
		multiply
		divide

type Pipeline added in v0.3.0

type Pipeline struct {
	Options   PipelineOptions          `json:"options"`
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
}

Pipeline processes data by applying a series of processors. This processor should be used when data requires complex processing outside of the boundaries of any data structures (see tests for examples). The processor supports these patterns:

JSON:
	{"pipeline":"H4sIAMpcy2IA/wXAIQ0AAACAsLbY93csBiFlc4wDAAAA"} >>> {"pipeline":"foo"}
data:
	H4sIAMpcy2IA/wXAIQ0AAACAsLbY93csBiFlc4wDAAAA >>> foo

The processor uses this Jsonnet configuration:

{
	type: 'pipeline',
	settings: {
		options: {
			processors: [
				{
					type: 'base64',
					settings: {
						options: {
							direction: 'from',
						}
					}
				},
				{
					type: 'gzip',
					settings: {
						options: {
							direction: 'from',
						}
					}
				},
			]
		},
		input_key: 'pipeline',
		output_key: 'pipeline',
	},
}

func (Pipeline) Byte added in v0.3.0

func (p Pipeline) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Pipeline processor.

Byters only accept bytes, so when processing JSON the input value is converted from a Result to its string representation and then to bytes. The conversion from Result to string is safe for strings and objects, but not for arrays (e.g., ["foo","bar"]).

If the input is an array, then an error is raised; the input should be run through the ForEach processor (which can encapsulate the Pipeline processor).

func (Pipeline) Slice added in v0.3.0

func (p Pipeline) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Pipeline processor. Conditions are optionally applied on the bytes to enable processing.

type PipelineOptions added in v0.3.0

type PipelineOptions struct {
	Processors []config.Config
}

PipelineOptions contains custom options for the Pipeline processor:

Processors:
	array of processors to apply to the data

type Replace

type Replace struct {
	Options   ReplaceOptions           `json:"options"`
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
}

Replace processes data by replacing characters. The processor supports these patterns:

JSON:
	{"replace":"bar"} >>> {"replace":"baz"}
data:
	bar >>> baz

The processor uses this Jsonnet configuration:

{
	type: 'replace',
	settings: {
		options: {
			old: 'r',
			new: 'z',
		},
		input_key: 'replace',
		output_key: 'replace',
	},
}

func (Replace) Byte

func (p Replace) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Replace processor.

func (Replace) Slice added in v0.2.0

func (p Replace) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Replace processor. Conditions are optionally applied on the bytes to enable processing.

type ReplaceOptions

type ReplaceOptions struct {
	Old   string `json:"old"`
	New   string `json:"new"`
	Count int    `json:"count"`
}

ReplaceOptions contains custom options for the Replace processor:

Old:
	the character(s) to replace in the data
New:
	the character(s) that replace Old
Count (optional):
	the number of replacements to make
	defaults to -1, which replaces all matches

type Slicer added in v0.2.0

type Slicer interface {
	Slice(context.Context, [][]byte) ([][]byte, error)
}

Slicer is an interface for applying processors to slices of bytes.

func MakeAllSlicers added in v0.2.0

func MakeAllSlicers(cfg []config.Config) ([]Slicer, error)

MakeAllSlicers accepts an array of Config and returns populated Slicers. This is a convenience function for loading many Slicers.

func SlicerFactory added in v0.2.0

func SlicerFactory(cfg config.Config) (Slicer, error)

SlicerFactory loads a Slicer from a Config. This is the recommended function for retrieving ready-to-use Slicers.

type Time

type Time struct {
	Options   TimeOptions              `json:"options"`
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
}

Time processes data by converting time values between formats. The processor supports these patterns:

JSON:
	{"time":1639877490.061} >>> {"time":"2021-12-19T01:31:30.061000Z"}
data:
	1639877490.061 >>> 2021-12-19T01:31:30.061000Z

The processor uses this Jsonnet configuration:

{
	type: 'time',
	settings: {
		options: {
			input_format: 'unix',
			output_format: '2006-01-02T15:04:05.000000Z',
		},
		input_key: 'time',
		output_key: 'time',
	},
}

func (Time) Byte

func (p Time) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Time processor.

func (Time) Slice added in v0.2.0

func (p Time) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Time processor. Conditions are optionally applied on the bytes to enable processing.

type TimeOptions

type TimeOptions struct {
	InputFormat    string `json:"input_format"`
	OutputFormat   string `json:"output_format"`
	InputLocation  string `json:"input_location"`
	OutputLocation string `json:"output_location"`
}

TimeOptions contains custom options for the Time processor:

InputFormat:
	time format of the input
	must be one of:
		pattern-based layouts (https://gobyexample.com/time-formatting-parsing)
		unix: epoch (supports fractions of a second)
		unix_milli: epoch milliseconds
		now: current time
OutputFormat:
	time format of the output
	must be one of:
		pattern-based layouts (https://gobyexample.com/time-formatting-parsing)
		unix: epoch
		unix_milli: epoch milliseconds
InputLocation (optional):
	the time zone abbreviation for the input
	defaults to UTC
OutputLocation (optional):
	the time zone abbreviation for the output
	defaults to UTC
