process

package v0.2.0
Published: Jun 17, 2022 License: MIT Imports: 20 Imported by: 0

README

process

Package process contains interfaces and methods for atomically processing data. Each processor defines its own data processing patterns, but a set of common patterns is shared among most processors:

  • processing JSON values, including arrays
  • processing bytes

The package can be used like this:

package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/process"
)

func main() {
	processor := process.Insert{
		Options: process.InsertOptions{
			Value: "bar",
		},
		OutputKey: "foo",
	}

	ctx := context.TODO()
	data := []byte(`{"hello":"world"}`)
	processed, err := processor.Byte(ctx, data)
	if err != nil {
		panic(err)
	}

	fmt.Println(string(data))      // the original data
	fmt.Println(string(processed)) // the processed data, with "foo":"bar" inserted
}

Information for each processor is available in the GoDoc.

Documentation

Index

Constants

const Base64InvalidAlphabet = errors.Error("Base64InvalidAlphabet")

Base64InvalidAlphabet is returned when the Base64 processor is configured with an invalid alphabet setting.

const Base64InvalidDirection = errors.Error("Base64InvalidDirection")

Base64InvalidDirection is returned when the Base64 processor is configured with an invalid direction setting.

const Base64InvalidSettings = errors.Error("Base64InvalidSettings")

Base64InvalidSettings is returned when the Base64 processor is configured with invalid Input and Output settings.

const ByteInvalidFactoryConfig = errors.Error("ByteInvalidFactoryConfig")

ByteInvalidFactoryConfig is returned when an unsupported Byter is referenced in ByterFactory.

const CaptureInvalidSettings = errors.Error("CaptureInvalidSettings")

CaptureInvalidSettings is returned when the Capture processor is configured with invalid Input and Output settings.

const CaseInvalidSettings = errors.Error("CaseInvalidSettings")

CaseInvalidSettings is returned when the Case processor is configured with invalid Input and Output settings.

const ConcatInvalidSettings = errors.Error("ConcatInvalidSettings")

ConcatInvalidSettings is returned when the Concat processor is configured with invalid Input and Output settings.

const ConvertInvalidSettings = errors.Error("ConvertInvalidSettings")

ConvertInvalidSettings is returned when the Convert processor is configured with invalid Input and Output settings.

const CopyInvalidSettings = errors.Error("CopyInvalidSettings")

CopyInvalidSettings is returned when the Copy processor is configured with invalid Input and Output settings.

const DeleteInvalidSettings = errors.Error("DeleteInvalidSettings")

DeleteInvalidSettings is returned when the Delete processor is configured with invalid Input and Output settings.

const DomainInvalidSettings = errors.Error("DomainInvalidSettings")

DomainInvalidSettings is returned when the Domain processor is configured with invalid Input and Output settings.

const DomainNoSubdomain = errors.Error("DomainNoSubdomain")

DomainNoSubdomain is returned when a domain without a subdomain is processed.

const DynamoDBInvalidSettings = errors.Error("DynamoDBInvalidSettings")

DynamoDBInvalidSettings is returned when the DynamoDB processor is configured with invalid Input and Output settings.

const ExpandInvalidSettings = errors.Error("ExpandInvalidSettings")

ExpandInvalidSettings is returned when the Expand processor is configured with invalid Input and Output settings.

const FlattenInvalidSettings = errors.Error("FlattenInvalidSettings")

FlattenInvalidSettings is returned when the Flatten processor is configured with invalid Input and Output settings.

const GroupInvalidSettings = errors.Error("GroupInvalidSettings")

GroupInvalidSettings is returned when the Group processor is configured with invalid Input and Output settings.

const GzipInvalidDirection = errors.Error("GzipInvalidDirection")

GzipInvalidDirection is returned when the Gzip processor is configured with an invalid direction setting.

const GzipInvalidSettings = errors.Error("GzipInvalidSettings")

GzipInvalidSettings is returned when the Gzip processor is configured with invalid Input and Output settings.

const HashInvalidSettings = errors.Error("HashInvalidSettings")

HashInvalidSettings is returned when the Hash processor is configured with invalid Input and Output settings.

const HashUnsupportedAlgorithm = errors.Error("HashUnsupportedAlgorithm")

HashUnsupportedAlgorithm is returned when the Hash processor is configured with an unsupported algorithm.

const InsertInvalidSettings = errors.Error("InsertInvalidSettings")

InsertInvalidSettings is returned when the Insert processor is configured with invalid Input and Output settings.

const LambdaInvalidSettings = errors.Error("LambdaInvalidSettings")

LambdaInvalidSettings is returned when the Lambda processor is configured with invalid Input and Output settings.

const MathInvalidSettings = errors.Error("MathInvalidSettings")

MathInvalidSettings is returned when the Math processor is configured with invalid Input and Output settings.

const ReplaceInvalidSettings = errors.Error("ReplaceInvalidSettings")

ReplaceInvalidSettings is returned when the Replace processor is configured with invalid Input and Output settings.

const SliceInvalidFactoryConfig = errors.Error("SliceInvalidFactoryConfig")

SliceInvalidFactoryConfig is returned when an unsupported Slicer is referenced in SlicerFactory.

const TimeInvalidSettings = errors.Error("TimeInvalidSettings")

TimeInvalidSettings is returned when the Time processor is configured with invalid Input and Output settings.

Variables

This section is empty.

Functions

func Byte

func Byte(ctx context.Context, byters []Byter, data []byte) ([]byte, error)

Byte accepts an array of Byters and applies all processors to the data.

func NewSlice added in v0.2.0

func NewSlice(s *[][]byte) [][]byte

NewSlice returns a new slice of byte slices with a minimum capacity of 10.

func Slice added in v0.2.0

func Slice(ctx context.Context, slicers []Slicer, slice [][]byte) ([][]byte, error)

Slice accepts an array of Slicers and applies all processors to the data.

Types

type Base64 added in v0.2.0

type Base64 struct {
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
	Options   Base64Options            `json:"options"`
}

Base64 processes data by converting it to and from base64. The processor supports these patterns:

json:
  	{"base64":"Zm9v"} >>> {"base64":"foo"}
json array:
	{"base64":["Zm9v","YmFy"]} >>> {"base64":["foo","bar"]}
data:
	Zm9v >>> foo

The processor uses this Jsonnet configuration:

{
	type: 'base64',
	settings: {
		input_key: 'base64',
		output_key: 'base64',
		options: {
			direction: 'from',
		}
	},
}

func (Base64) Byte added in v0.2.0

func (p Base64) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Base64 processor.

func (Base64) Slice added in v0.2.0

func (p Base64) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Base64 processor. Conditions are optionally applied on the bytes to enable processing.

type Base64Options added in v0.2.0

type Base64Options struct {
	Direction string `json:"direction"`
	Alphabet  string `json:"alphabet"`
}

Base64Options contains custom options for the Base64 processor:

Direction:
	the direction of the encoding
	must be one of:
		to: encode to base64
		from: decode from base64
Alphabet:
	the base64 alphabet to use, either std (https://www.rfc-editor.org/rfc/rfc4648.html#section-4) or url (https://www.rfc-editor.org/rfc/rfc4648.html#section-5)
	defaults to std

type Byter

type Byter interface {
	Byte(context.Context, []byte) ([]byte, error)
}

Byter is an interface for applying processors to bytes.

func ByterFactory

func ByterFactory(cfg config.Config) (Byter, error)

ByterFactory loads a Byter from a Config. This is the recommended function for retrieving ready-to-use Byters.

func MakeAllByters

func MakeAllByters(cfg []config.Config) ([]Byter, error)

MakeAllByters accepts an array of Config and returns populated Byters. This is a convenience function for loading many Byters.

type Capture

type Capture struct {
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
	Options   CaptureOptions           `json:"options"`
}

Capture processes data by capturing values using regular expressions. The processor supports these patterns:

json:
	{"capture":"foo@qux.com"} >>> {"capture":"foo"}
	{"capture":"foo@qux.com"} >>> {"capture":["f","o","o"]}
json array:
	{"capture":["foo@qux.com","bar@qux.com"]} >>> {"capture":["foo","bar"]}
	{"capture":["foo@qux.com","bar@qux.com"]} >>> {"capture":[["f","o","o"],["b","a","r"]]}
data:
	foo@qux.com >>> foo
	bar quux >>> {"foo":"bar","qux":"quux"}

The processor uses this Jsonnet configuration:

{
	type: 'capture',
	settings: {
		input_key: 'capture',
		output_key: 'capture',
		options: {
			expression: '^([^@]*)@.*$',
			_function: 'find',
		},
	},
}

func (Capture) Byte

func (p Capture) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Capture processor.

func (Capture) Slice added in v0.2.0

func (p Capture) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Capture processor. Conditions are optionally applied on the bytes to enable processing.

type CaptureOptions

type CaptureOptions struct {
	Expression string `json:"expression"`
	Function   string `json:"function"`
	Count      int    `json:"count"`
}

CaptureOptions contains custom options for the Capture processor:

Expression:
	the regular expression used to capture values
Function:
	the type of regular expression applied
	must be one of:
		find: applies the Find(String)?Submatch function
		find_all: applies the FindAll(String)?Submatch function (see count)
		named_group: applies the Find(String)?Submatch function and stores values as JSON using subexpressions
Count (optional):
	used for repeating capture groups
	defaults to match all capture groups

type Case

type Case struct {
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
	Options   CaseOptions              `json:"options"`
}

Case processes data by changing the case of a string or byte slice. The processor supports these patterns:

json:
	{"case":"foo"} >>> {"case":"FOO"}
json array:
	{"case":["foo","bar"]} >>> {"case":["FOO","BAR"]}
data:
	foo >>> FOO

The processor uses this Jsonnet configuration:

{
	type: 'case',
	settings: {
		// if the value is "foo", then this returns "FOO"
		input_key: 'case',
		output_key: 'case',
		options: {
			case: 'upper',
		}
	},
}

func (Case) Byte

func (p Case) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Case processor.

func (Case) Slice added in v0.2.0

func (p Case) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Case processor. Conditions are optionally applied on the bytes to enable processing.

type CaseOptions

type CaseOptions struct {
	Case string `json:"case"`
}

CaseOptions contains custom options for the Case processor:

Case:
	the case to convert the string or byte to
	must be one of:
		upper
		lower
		snake (strings only)

type Concat

type Concat struct {
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
	Options   ConcatOptions            `json:"options"`
}

Concat processes data by concatenating multiple values together with a separator. The processor supports these patterns:

json:
	{"concat":["foo","bar"]} >>> {"concat":"foo.bar"}
json array:
	{"concat":[["foo","baz"],["bar","qux"]]} >>> {"concat":["foo.bar","baz.qux"]}

The processor uses this Jsonnet configuration:

{
	type: 'concat',
	settings: {
		input_key: 'concat',
		output_key: 'concat',
		options: {
			separator: '.',
		}
	},
}

func (Concat) Byte

func (p Concat) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Concat processor.

func (Concat) Slice added in v0.2.0

func (p Concat) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Concat processor. Conditions are optionally applied on the bytes to enable processing.

type ConcatOptions

type ConcatOptions struct {
	Separator string `json:"separator"`
}

ConcatOptions contains custom options for the Concat processor:

Separator:
	the string that separates the concatenated values

type Convert

type Convert struct {
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
	Options   ConvertOptions           `json:"options"`
}

Convert processes data by converting values between types (e.g., string to integer, integer to float). The processor supports these patterns:

json:
	{"convert":"true"} >>> {"convert":true}
	{"convert":"-123"} >>> {"convert":-123}
	{"convert":123} >>> {"convert":"123"}
json array:
	{"convert":["true","false"]} >>> {"convert":[true,false]}
	{"convert":["-123","-456"]} >>> {"convert":[-123,-456]}
	{"convert":[123,123.456]} >>> {"convert":["123","123.456"]}

The processor uses this Jsonnet configuration:

{
	type: 'convert',
	settings: {
		input_key: 'convert',
		output_key: 'convert',
		options: {
			type: 'bool',
		}
	},
}

func (Convert) Byte

func (p Convert) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Convert processor.

func (Convert) Slice added in v0.2.0

func (p Convert) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Convert processor. Conditions are optionally applied on the bytes to enable processing.

type ConvertOptions

type ConvertOptions struct {
	Type string `json:"type"`
}

ConvertOptions contains custom options for the Convert processor:

Type:
	the type that the value should be converted to
	must be one of:
		bool (boolean)
		int (integer)
		float
		uint (unsigned integer)
		string

type Copy

type Copy struct {
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
}

Copy processes data by copying it. The processor supports these patterns:

json:
	{"hello":"world"} >>> {"hello":"world","goodbye":"world"}
from json:
	{"hello":"world"} >>> world
to json:
	world >>> {"hello":"world"}

The processor uses this Jsonnet configuration:

{
	type: 'copy',
	settings: {
		input_key: 'hello',
		output_key: 'goodbye',
	},
}

func (Copy) Byte

func (p Copy) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Copy processor.

func (Copy) Slice added in v0.2.0

func (p Copy) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Copy processor. Conditions are optionally applied on the bytes to enable processing.

type Count

type Count struct{}

Count processes data by counting it.

The processor uses this Jsonnet configuration:

{
	type: 'count',
}

func (Count) Slice added in v0.2.0

func (p Count) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Count processor. Conditions are optionally applied on the bytes to enable processing.

type Delete

type Delete struct {
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
}

Delete processes data by deleting JSON keys. The processor supports these patterns:

json:
  	{"foo":"bar","baz":"qux"} >>> {"foo":"bar"}

The processor uses this Jsonnet configuration:

{
	type: 'delete',
	settings: {
		input_key: 'delete',
	}
}

func (Delete) Byte

func (p Delete) Byte(ctx context.Context, object []byte) ([]byte, error)

Byte processes bytes with the Delete processor.

func (Delete) Slice added in v0.2.0

func (p Delete) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Delete processor. Conditions are optionally applied on the bytes to enable processing.

type Domain

type Domain struct {
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
	Options   DomainOptions            `json:"options"`
}

Domain processes data by parsing fully qualified domain names into labels. The processor supports these patterns:

json:
	{"domain":"example.com"} >>> {"domain":"example.com","tld":"com"}
json array:
	{"domain":["example.com","example.top"]} >>> {"domain":["example.com","example.top"],"tld":["com","top"]}
data:
	example.com >>> com

The processor uses this Jsonnet configuration:

{
	type: 'domain',
	settings: {
		input_key: 'domain',
		output_key: 'tld',
		options: {
			_function: 'tld',
		}
	},
}

func (Domain) Byte

func (p Domain) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Domain processor.

func (Domain) Slice added in v0.2.0

func (p Domain) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Domain processor. Conditions are optionally applied on the bytes to enable processing.

type DomainOptions

type DomainOptions struct {
	Function string `json:"function"`
}

DomainOptions contains custom options for the Domain processor:

Function:
	the domain processing function to apply to the data
	must be one of:
		tld
		domain
		subdomain

type Drop

type Drop struct {
	Condition condition.OperatorConfig `json:"condition"`
}

Drop processes data by dropping it from a data channel. The processor uses this Jsonnet configuration:

{
	type: 'drop',
}

func (Drop) Slice added in v0.2.0

func (p Drop) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Drop processor. Conditions are optionally applied on the bytes to enable processing.

type DynamoDB

type DynamoDB struct {
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
	Options   DynamoDBOptions          `json:"options"`
}

DynamoDB processes data by querying a DynamoDB table and returning all matched items as an array of JSON objects. The processor supports these patterns:

json:
	{"ddb":{"PK":"foo"}} >>> {"ddb":[{"foo":"bar"}]}

The processor uses this Jsonnet configuration:

{
	type: 'dynamodb',
	settings: {
		// the input key is expected to be a map containing a partition key ("PK") and an optional sort key ("SK")
		input_key: 'ddb',
		output_key: 'ddb',
		// if the value of the PK is "foo", then this queries DynamoDB by using "foo" as the partition key value for the table attribute "pk" and returns the last indexed item from the table.
		options: {
			table: 'foo-table',
			key_condition_expression: 'pk = :partitionkeyval',
			limit: 1,
			scan_index_forward: true,
		}
	},
}

func (DynamoDB) Byte

func (p DynamoDB) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the DynamoDB processor.

func (DynamoDB) Slice added in v0.2.0

func (p DynamoDB) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the DynamoDB processor. Conditions are optionally applied on the bytes to enable processing.

type DynamoDBOptions

type DynamoDBOptions struct {
	Table                  string `json:"table"`
	KeyConditionExpression string `json:"key_condition_expression"`
	Limit                  int64  `json:"limit"`
	ScanIndexForward       bool   `json:"scan_index_forward"`
}

DynamoDBOptions contains custom options settings for the DynamoDB processor (https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Query.html#API_Query_RequestSyntax):

Table:
	table to query
KeyConditionExpression:
	key condition expression (see documentation)
Limit (optional):
	maximum number of items to evaluate
	defaults to evaluating all items
ScanIndexForward (optional):
	specifies the order of index traversal
	must be one of:
		true (default): traversal is performed in ascending order
		false: traversal is performed in descending order

type Expand

type Expand struct {
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	Options   ExpandOptions            `json:"options"`
}

Expand processes data by creating individual events from objects in JSON arrays. The processor supports these patterns:

json array:
	{"expand":[{"foo":"bar"}],"baz":"qux"} >>> {"foo":"bar","baz":"qux"}

The processor uses this Jsonnet configuration:

{
	type: 'expand',
	settings: {
		input_key: 'expand',
		options: {
			retain: ['baz'],
		}
	},
}

func (Expand) Slice added in v0.2.0

func (p Expand) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Expand processor. Conditions are optionally applied on the bytes to enable processing.

type ExpandOptions

type ExpandOptions struct {
	Retain []string `json:"retain"` // retain fields found anywhere in input
}

ExpandOptions contains custom options settings for the Expand processor:

Retain (optional):
	array of JSON keys to retain from the original object

type Flatten

type Flatten struct {
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
	Options   FlattenOptions           `json:"options"`
}

Flatten processes data by flattening JSON arrays. The processor supports these patterns:

json:
	{"flatten":["foo",["bar"]]} >>> {"flatten":["foo","bar"]}

The processor uses this Jsonnet configuration:

{
	type: 'flatten',
	settings: {
		input_key: 'flatten',
		output_key: 'flatten',
	},
}

func (Flatten) Byte

func (p Flatten) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Flatten processor.

func (Flatten) Slice added in v0.2.0

func (p Flatten) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Flatten processor. Conditions are optionally applied on the bytes to enable processing.

type FlattenOptions

type FlattenOptions struct {
	Deep bool `json:"deep"`
}

FlattenOptions contains custom options settings for the Flatten processor:

Deep (optional):
	deeply flattens nested arrays

type Group added in v0.2.0

type Group struct {
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
	Options   GroupOptions             `json:"options"`
}

Group processes data by grouping JSON arrays into an array of tuples or array of JSON objects. The processor supports these patterns:

json array:
	{"group":[["foo","bar"],[111,222]]} >>> {"group":[["foo",111],["bar",222]]}
	{"group":[["foo","bar"],[111,222]]} >>> {"group":[{"name":foo","size":111},{"name":"bar","size":222}]}

The processor uses this Jsonnet configuration:

{
	type: 'group',
	settings: {
		input_key: 'group',
		output_key: 'group',
	},
}

func (Group) Byte added in v0.2.0

func (p Group) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Group processor.

func (Group) Slice added in v0.2.0

func (p Group) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Group processor. Conditions are optionally applied on the bytes to enable processing.

type GroupOptions added in v0.2.0

type GroupOptions struct {
	Keys []string `json:"keys"`
}

GroupOptions contains custom options for the Group processor:

Keys (optional):
	the JSON keys that grouped values are written to, creating an array of JSON objects instead of an array of tuples

type Gzip added in v0.2.0

type Gzip struct {
	Condition condition.OperatorConfig `json:"condition"`
	Options   GzipOptions              `json:"options"`
}

Gzip processes data by compressing or decompressing gzip. The processor supports these patterns:

data:
	[31 139 8 0 0 0 0 0 0 255 74 203 207 7 4 0 0 255 255 33 101 115 140 3 0 0 0] >>> foo
	foo >>> [31 139 8 0 0 0 0 0 0 255 74 203 207 7 4 0 0 255 255 33 101 115 140 3 0 0 0]

The processor uses this Jsonnet configuration:

{
	type: 'gzip',
	settings: {
		options: {
			direction: 'from',
		}
	},
}

func (Gzip) Byte added in v0.2.0

func (p Gzip) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Gzip processor.

func (Gzip) Slice added in v0.2.0

func (p Gzip) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Gzip processor. Conditions are optionally applied on the bytes to enable processing.

type GzipOptions added in v0.2.0

type GzipOptions struct {
	Direction string `json:"direction"`
}

GzipOptions contains custom options settings for the Gzip processor:

Direction:
	the direction of the compression
	must be one of:
		to: compress data to gzip
		from: decompress data from gzip

type Hash

type Hash struct {
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
	Options   HashOptions              `json:"options"`
}

Hash processes data by calculating hashes. The processor supports these patterns:

json:
	{"hash":"foo"} >>> {"hash":"acbd18db4cc2f85cedef654fccc4a4d8"}
json array:
	{"hash":["foo","bar"]} >>> {"hash":["acbd18db4cc2f85cedef654fccc4a4d8","37b51d194a7513e45b56f6524f2d51f2"]}
data:
	foo >>> acbd18db4cc2f85cedef654fccc4a4d8

The processor uses this Jsonnet configuration:

{
	type: 'hash',
	settings: {
		input_key: 'hash',
		output_key: 'hash',
		options: {
			algorithm: 'md5',
		}
	},
}

func (Hash) Byte

func (p Hash) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Hash processor.

func (Hash) Slice added in v0.2.0

func (p Hash) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Hash processor. Conditions are optionally applied on the bytes to enable processing.

type HashOptions

type HashOptions struct {
	Algorithm string `json:"algorithm"`
}

HashOptions contains custom options for the Hash processor:

Algorithm:
	the hashing algorithm to apply
	must be one of:
		md5
		sha256

type Insert

type Insert struct {
	Condition condition.OperatorConfig `json:"condition"`
	OutputKey string                   `json:"output_key"`
	Options   InsertOptions            `json:"options"`
}

Insert processes data by inserting a value into a JSON object. The processor supports these patterns:

json:
	{"foo":"bar"} >>> {"foo":"bar","baz":"qux"}

The processor uses this Jsonnet configuration:

{
	type: 'insert',
	settings: {
		output_key: 'baz',
		options: {
			value: 'qux',
		}
	},
}

func (Insert) Byte

func (p Insert) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Insert processor.

func (Insert) Slice added in v0.2.0

func (p Insert) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Insert processor. Conditions are optionally applied on the bytes to enable processing.

type InsertOptions

type InsertOptions struct {
	Value interface{} `json:"value"`
}

InsertOptions contains custom options for the Insert processor:

Value:
	the value to insert

type Lambda

type Lambda struct {
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
	Options   LambdaOptions            `json:"options"`
}

Lambda processes data by synchronously invoking an AWS Lambda function and returning the payload. Synchronous invocation typically adds tens of milliseconds of latency, but depending on the function it can add hundreds to thousands of milliseconds, which can significantly impact total event latency. If Substation is running in AWS Lambda with Kinesis, then this latency can be mitigated by increasing the parallelization factor of the Lambda (https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html).

The input key's value must be a JSON object that contains settings for the Lambda. It is recommended to use the copy and insert processors to create the JSON object before calling this processor and to use the delete processor to remove the JSON object after calling this processor.

The processor supports these patterns:

json:
	{"foo":"bar","lambda":{"lookup":"baz"}} >>> {"foo":"bar","lambda":{"baz":"qux"}}

The processor uses this Jsonnet configuration:

{
	type: 'lambda',
	settings: {
		input_key: 'lambda',
		output_key: 'lambda',
		options: {
			function: 'foo-function',
		}
	},
}

func (Lambda) Byte

func (p Lambda) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Lambda processor.

func (Lambda) Slice added in v0.2.0

func (p Lambda) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Lambda processor. Conditions are optionally applied on the bytes to enable processing.

type LambdaOptions

type LambdaOptions struct {
	Function       string `json:"function"`
	ErrorOnFailure bool   `json:"error_on_failure"`
}

LambdaOptions contains custom options settings for the Lambda processor:

Function:
	function to invoke
ErrorOnFailure (optional):
	if set to true, then errors from the invoked Lambda will cause the processor to fail
	defaults to false

type Math

type Math struct {
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
	Options   MathOptions              `json:"options"`
}

Math processes data by applying mathematical operations. The processor supports these patterns:

json:
	{"math":[1,3]} >>> {"math":4}
json array:
	{"math":[[1,2],[3,4]]} >>> {"math":[4,6]}

The processor uses this Jsonnet configuration:

{
	type: 'math',
	settings: {
		input_key: 'math',
		output_key: 'math',
		options: {
			operation: 'add',
		}
	},
}

func (Math) Byte

func (p Math) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Math processor.

func (Math) Slice added in v0.2.0

func (p Math) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Math processor. Conditions are optionally applied on the bytes to enable processing.

type MathOptions

type MathOptions struct {
	Operation string `json:"operation"`
}

MathOptions contains custom options for the Math processor:

Operation:
	the operator applied to the data
	must be one of:
		add
		subtract
		divide

type Replace

type Replace struct {
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
	Options   ReplaceOptions           `json:"options"`
}

Replace processes data by replacing characters. The processor supports these patterns:

json:
	{"replace":"bar"} >>> {"replace":"baz"}
json array:
	{"replace":["bar","bard"]} >>> {"replace":["baz","bazd"]}
data:
	bar >>> baz

The processor uses this Jsonnet configuration:

{
	type: 'replace',
	settings: {
		input_key: 'replace',
		output_key: 'replace',
		options: {
			old: 'r',
			new: 'z',
		}
	},
}

func (Replace) Byte

func (p Replace) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Replace processor.

func (Replace) Slice added in v0.2.0

func (p Replace) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Replace processor. Conditions are optionally applied on the bytes to enable processing.

type ReplaceOptions

type ReplaceOptions struct {
	Old   string `json:"old"`
	New   string `json:"new"`
	Count int    `json:"count"`
}

ReplaceOptions contains custom options for the Replace processor:

Old:
	the character(s) to replace in the data
New:
	the character(s) that replace Old
Count (optional):
	the number of replacements to make
	defaults to -1, which replaces all matches

type Slicer added in v0.2.0

type Slicer interface {
	Slice(context.Context, [][]byte) ([][]byte, error)
}

Slicer is an interface for applying processors to slices of bytes.

func MakeAllSlicers added in v0.2.0

func MakeAllSlicers(cfg []config.Config) ([]Slicer, error)

MakeAllSlicers accepts an array of Config and returns populated Slicers. This is a convenience function for loading many Slicers.

func SlicerFactory added in v0.2.0

func SlicerFactory(cfg config.Config) (Slicer, error)

SlicerFactory loads a Slicer from a Config. This is the recommended function for retrieving ready-to-use Slicers.

type Time

type Time struct {
	Condition condition.OperatorConfig `json:"condition"`
	InputKey  string                   `json:"input_key"`
	OutputKey string                   `json:"output_key"`
	Options   TimeOptions              `json:"options"`
}

Time processes data by converting time values between formats. The processor supports these patterns:

json:
	{"time":1639877490.061} >>> {"time":"2021-12-19T01:31:30.000000Z"}
json array:
	{"time":[1639877490.061,1651705967]} >>> {"time":["2021-12-19T01:31:30.000000Z","2022-05-04T23:12:47.000000Z"]}

The processor uses this Jsonnet configuration:

{
	type: 'time',
	settings: {
		input_key: 'time',
		output_key: 'time',
		options: {
			input_format: 'unix',
			output_format: '2006-01-02T15:04:05',
		}
	},
}

func (Time) Byte

func (p Time) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes bytes with the Time processor.

func (Time) Slice added in v0.2.0

func (p Time) Slice(ctx context.Context, s [][]byte) ([][]byte, error)

Slice processes a slice of bytes with the Time processor. Conditions are optionally applied on the bytes to enable processing.

type TimeOptions

type TimeOptions struct {
	InputFormat    string `json:"input_format"`
	InputLocation  string `json:"input_location"`
	OutputFormat   string `json:"output_format"`
	OutputLocation string `json:"output_location"`
}

TimeOptions contains custom options for the Time processor:

InputFormat:
	time format of the input
	must be one of:
		pattern-based layouts (https://gobyexample.com/time-formatting-parsing)
		unix: epoch
		unix_milli: epoch milliseconds
		unix_nano: epoch nanoseconds
		now: current time
InputLocation (optional):
	the time zone abbreviation for the input
	defaults to UTC
OutputFormat:
	time format of the output
	must be one of:
		pattern-based layouts (https://gobyexample.com/time-formatting-parsing)
OutputLocation (optional):
	the time zone abbreviation for the output
	defaults to UTC
