package process
v0.6.1
Published: Dec 5, 2022 License: MIT Imports: 26 Imported by: 0

README

process

Package process contains interfaces and methods for processing data. Processors contain methods for processing individual data items ("singletons") and batches (slices). Each processor defines its own data processing patterns, but there is a common set of patterns shared among most processors:

  • processing unstructured data
  • processing JSON objects

The package can be used like this (more examples are also available):

package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/process"
)

func main() {
	proc := process.Insert{
		OutputKey: "baz",
		Options: process.InsertOptions{
			Value: "qux",
		},
	}

	data := []byte(`{"foo":"bar"}`)
	data, err := process.ApplyByte(context.TODO(), data, proc)
	if err != nil {
		panic(err)
	}

	fmt.Printf("%s\n", data)
}

Information for each processor is available in the GoDoc.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Apply added in v0.4.0

func Apply(ctx context.Context, capsule config.Capsule, apps ...Applicator) (config.Capsule, error)

Apply accepts one or many Applicators and applies processors in series to encapsulated data.

func ApplyBatch added in v0.4.0

func ApplyBatch(ctx context.Context, batch []config.Capsule, apps ...BatchApplicator) ([]config.Capsule, error)

ApplyBatch accepts one or many BatchApplicators and applies processors in series to a slice of encapsulated data.

func ApplyByte added in v0.4.0

func ApplyByte(ctx context.Context, data []byte, apps ...Applicator) ([]byte, error)

ApplyByte is a convenience function for applying one or many Applicators to bytes.

Types

type Aggregate added in v0.4.0

type Aggregate struct {
	Options   AggregateOptions `json:"options"`
	Condition condition.Config `json:"condition"`
	OutputKey string           `json:"output_key"`
}

Aggregate processes data by buffering and aggregating it into a single item.

Data is processed by aggregating it into in-memory buffers until the configured count or size threshold is met, at which point new data is produced. This supports multiple data aggregation patterns:

- concatenate batches of data with a separator value

- store batches of data in a JSON array

- organize nested JSON in a JSON array based on unique keys

The processor supports these patterns:

JSON array:
	foo bar baz qux >>> {"aggregate":["foo","bar","baz","qux"]}
	{"foo":"bar"} {"baz":"qux"} >>> {"aggregate":[{"foo":"bar"},{"baz":"qux"}]}
data:
	foo bar baz qux >>> foo\nbar\nbaz\nqux
	{"foo":"bar"} {"baz":"qux"} >>> {"foo":"bar"}\n{"baz":"qux"}

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "aggregate",
	"settings": {
		"options": {
			"max_count": 1000,
			"max_size": 1000
		},
		"output_key": "aggregate.-1"
	}
}

func (Aggregate) ApplyBatch added in v0.4.0

func (p Aggregate) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the Aggregate processor. Conditions are optionally applied to the data to enable processing.

type AggregateOptions added in v0.4.0

type AggregateOptions struct {
	AggregateKey string `json:"aggregate_key"`
	Separator    string `json:"separator"`
	MaxCount     int    `json:"max_count"`
	MaxSize      int    `json:"max_size"`
}

AggregateOptions contains custom options settings for the Aggregate processor:

AggregateKey (optional):
	JSON key-value that is used to organize aggregated data
	defaults to empty string, only applies to JSON
Separator (optional):
	string that separates aggregated data
	defaults to empty string, only applies to data
MaxCount (optional):
	maximum number of items stored in a buffer when aggregating data
	defaults to 1000
MaxSize (optional):
	maximum size, in bytes, of items stored in a buffer when aggregating data
	defaults to 10000 (10KB)

type Applicator added in v0.4.0

type Applicator interface {
	Apply(context.Context, config.Capsule) (config.Capsule, error)
}

Applicator is an interface for applying a processor to encapsulated data.

func ApplicatorFactory added in v0.4.0

func ApplicatorFactory(cfg config.Config) (Applicator, error)

ApplicatorFactory returns a configured Applicator from a config. This is the recommended method for retrieving ready-to-use Applicators.

func MakeApplicators added in v0.4.0

func MakeApplicators(cfg []config.Config) ([]Applicator, error)

MakeApplicators accepts multiple processor configs and returns populated Applicators. This is a convenience function for generating many Applicators.

type Base64 added in v0.2.0

type Base64 struct {
	Options   Base64Options    `json:"options"`
	Condition condition.Config `json:"condition"`
	InputKey  string           `json:"input_key"`
	OutputKey string           `json:"output_key"`
}

Base64 processes data by converting it to and from base64 encoding. The processor supports these patterns:

JSON:
	{"base64":"Zm9v"} >>> {"base64":"foo"}
data:
	Zm9v >>> foo

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "base64",
	"settings": {
		"options": {
			"direction": "from"
		},
		"input_key": "base64",
		"output_key": "base64"
	}
}

func (Base64) Apply added in v0.4.0

func (p Base64) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error)

Apply processes encapsulated data with the Base64 processor.

func (Base64) ApplyBatch added in v0.4.0

func (p Base64) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the Base64 processor. Conditions are optionally applied to the data to enable processing.

type Base64Options added in v0.2.0

type Base64Options struct {
	Direction string `json:"direction"`
}

Base64Options contains custom options for the Base64 processor:

Direction:
	direction of the encoding
	must be one of:
		to: encode to base64
		from: decode from base64

type BatchApplicator added in v0.4.0

type BatchApplicator interface {
	ApplyBatch(context.Context, []config.Capsule) ([]config.Capsule, error)
}

BatchApplicator is an interface for applying a processor to a slice of encapsulated data.

func BatchApplicatorFactory added in v0.4.0

func BatchApplicatorFactory(cfg config.Config) (BatchApplicator, error)

BatchApplicatorFactory returns a configured BatchApplicator from a config. This is the recommended method for retrieving ready-to-use BatchApplicators.

func MakeBatchApplicators added in v0.4.0

func MakeBatchApplicators(cfg []config.Config) ([]BatchApplicator, error)

MakeBatchApplicators accepts multiple processor configs and returns populated BatchApplicators. This is a convenience function for generating many BatchApplicators.

type Capture

type Capture struct {
	Options   CaptureOptions   `json:"options"`
	Condition condition.Config `json:"condition"`
	InputKey  string           `json:"input_key"`
	OutputKey string           `json:"output_key"`
}

Capture processes data by capturing values using regular expressions. The processor supports these patterns:

JSON:
	{"capture":"foo@qux.com"} >>> {"capture":"foo"}
	{"capture":"foo@qux.com"} >>> {"capture":["f","o","o"]}
data:
	foo@qux.com >>> foo
	bar quux >>> {"foo":"bar","qux":"quux"}

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "capture",
	"settings": {
		"options": {
			"expression": "^([^@]*)@.*$",
			"function": "find"
		},
		"input_key": "capture",
		"output_key": "capture"
	}
}

func (Capture) Apply added in v0.4.0

func (p Capture) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error)

Apply processes encapsulated data with the Capture processor.

func (Capture) ApplyBatch added in v0.4.0

func (p Capture) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the Capture processor. Conditions are optionally applied to the data to enable processing.

type CaptureOptions

type CaptureOptions struct {
	Expression string `json:"expression"`
	Function   string `json:"function"`
	Count      int    `json:"count"`
}

CaptureOptions contains custom options for the Capture processor:

Expression:
	regular expression used to capture values
Function:
	type of regular expression applied
	must be one of:
		find: applies the Find(String)?Submatch function
		find_all: applies the FindAll(String)?Submatch function (see count)
		named_group: applies the Find(String)?Submatch function and stores values as JSON using subexpressions
Count (optional):
	used for repeating capture groups
	defaults to match all capture groups

type Case

type Case struct {
	Options   CaseOptions      `json:"options"`
	Condition condition.Config `json:"condition"`
	InputKey  string           `json:"input_key"`
	OutputKey string           `json:"output_key"`
}

Case processes data by changing the case of a string or byte slice. The processor supports these patterns:

JSON:
	{"case":"foo"} >>> {"case":"FOO"}
data:
	foo >>> FOO

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "case",
	"settings": {
		"options": {
			"case": "upper"
		},
		"input_key": "case",
		"output_key": "case"
	}
}

func (Case) Apply added in v0.4.0

func (p Case) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error)

Apply processes encapsulated data with the Case processor.

func (Case) ApplyBatch added in v0.4.0

func (p Case) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the Case processor. Conditions are optionally applied to the data to enable processing.

type CaseOptions

type CaseOptions struct {
	Case string `json:"case"`
}

CaseOptions contains custom options for the Case processor:

Case:
	case to convert the string or byte to
	must be one of:
		upper
		lower
		snake (strings only)

type Concat

type Concat struct {
	Options   ConcatOptions    `json:"options"`
	Condition condition.Config `json:"condition"`
	InputKey  string           `json:"input_key"`
	OutputKey string           `json:"output_key"`
}

Concat processes data by concatenating multiple values together with a separator. The processor supports these patterns:

JSON:
	{"concat":["foo","bar"]} >>> {"concat":"foo.bar"}

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "concat",
	"settings": {
		"options": {
			"separator": "."
		},
		"input_key": "concat",
		"output_key": "concat"
	}
}

func (Concat) Apply added in v0.4.0

func (p Concat) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error)

Apply processes encapsulated data with the Concat processor.

func (Concat) ApplyBatch added in v0.4.0

func (p Concat) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the Concat processor. Conditions are optionally applied to the data to enable processing.

type ConcatOptions

type ConcatOptions struct {
	Separator string `json:"separator"`
}

ConcatOptions contains custom options for the Concat processor:

Separator:
	string that separates the concatenated values

type Convert

type Convert struct {
	Options   ConvertOptions   `json:"options"`
	Condition condition.Config `json:"condition"`
	InputKey  string           `json:"input_key"`
	OutputKey string           `json:"output_key"`
}

Convert processes data by converting values between types (e.g., string to integer, integer to float). The processor supports these patterns:

JSON:
	{"convert":"true"} >>> {"convert":true}
	{"convert":"-123"} >>> {"convert":-123}
	{"convert":123} >>> {"convert":"123"}

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "convert",
	"settings": {
		"options": {
			"type": "bool"
		},
		"input_key": "convert",
		"output_key": "convert"
	}
}

func (Convert) Apply added in v0.4.0

func (p Convert) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error)

Apply processes encapsulated data with the Convert processor.

func (Convert) ApplyBatch added in v0.4.0

func (p Convert) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the Convert processor. Conditions are optionally applied to the data to enable processing.

type ConvertOptions

type ConvertOptions struct {
	Type string `json:"type"`
}

ConvertOptions contains custom options for the Convert processor:

Type:
	type that the value is converted to
	must be one of:
		bool (boolean)
		int (integer)
		float
		uint (unsigned integer)
		string

type Copy

type Copy struct {
	Condition condition.Config `json:"condition"`
	InputKey  string           `json:"input_key"`
	OutputKey string           `json:"output_key"`
}

Copy processes data by copying it. The processor supports these patterns:

JSON:
	{"hello":"world"} >>> {"hello":"world","goodbye":"world"}
from JSON:
	{"hello":"world"} >>> world
to JSON:
	world >>> {"hello":"world"}

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "copy",
	"settings": {
		"input_key": "hello",
		"output_key": "goodbye"
	}
}

func (Copy) Apply added in v0.4.0

func (p Copy) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error)

Apply processes encapsulated data with the Copy processor.

func (Copy) ApplyBatch added in v0.4.0

func (p Copy) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the Copy processor. Conditions are optionally applied to the data to enable processing.

type Count

type Count struct{}

Count processes data by counting it.

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "count"
}

func (Count) ApplyBatch added in v0.4.0

func (p Count) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the Count processor. Conditions are optionally applied to the data to enable processing.

type Delete

type Delete struct {
	Condition condition.Config `json:"condition"`
	InputKey  string           `json:"input_key"`
}

Delete processes data by deleting JSON keys. The processor supports these patterns:

JSON:
	{"foo":"bar","baz":"qux"} >>> {"foo":"bar"}

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "delete",
	"settings": {
		"input_key": "delete"
	}
}

func (Delete) Apply added in v0.4.0

func (p Delete) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error)

Apply processes encapsulated data with the Delete processor.

func (Delete) ApplyBatch added in v0.4.0

func (p Delete) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the Delete processor. Conditions are optionally applied to the data to enable processing.

type Domain

type Domain struct {
	Options   DomainOptions    `json:"options"`
	Condition condition.Config `json:"condition"`
	InputKey  string           `json:"input_key"`
	OutputKey string           `json:"output_key"`
}

Domain processes data by parsing fully qualified domain names into labels. The processor supports these patterns:

JSON:
	{"domain":"example.com"} >>> {"domain":"example.com","tld":"com"}
data:
	example.com >>> com

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "domain",
	"settings": {
		"options": {
			"function": "tld"
		},
		"input_key": "domain",
		"output_key": "tld"
	}
}

func (Domain) Apply added in v0.4.0

func (p Domain) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error)

Apply processes encapsulated data with the Domain processor.

func (Domain) ApplyBatch added in v0.4.0

func (p Domain) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the Domain processor. Conditions are optionally applied to the data to enable processing.

type DomainOptions

type DomainOptions struct {
	Function string `json:"function"`
}

DomainOptions contains custom options for the Domain processor:

Function:
	domain processing function applied to the data
	must be one of:
		tld
		domain
		subdomain

type Drop

type Drop struct {
	Condition condition.Config `json:"condition"`
}

Drop processes data by "dropping" it: the data is entirely removed and not emitted.

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "drop"
}

func (Drop) ApplyBatch added in v0.4.0

func (p Drop) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the Drop processor. Conditions are optionally applied to the data to enable processing.

type DynamoDB

type DynamoDB struct {
	Options   DynamoDBOptions  `json:"options"`
	Condition condition.Config `json:"condition"`
	InputKey  string           `json:"input_key"`
	OutputKey string           `json:"output_key"`
}

DynamoDB processes data by querying a DynamoDB table and returning all matched items as an array of JSON objects. The input must be a JSON object containing a partition key ("PK") and optionally containing a sort key ("SK"). This processor uses the DynamoDB Query operation, refer to the DynamoDB documentation for the Query operation's request syntax and key condition expression patterns:

- https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Query.html#API_Query_RequestSyntax

- https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Query.html#Query.KeyConditionExpressions

The processor supports these patterns:

JSON:
	{"ddb":{"PK":"foo"}} >>> {"ddb":[{"foo":"bar"}]}
	{"ddb":{"PK":"foo","SK":"baz"}} >>> {"ddb":[{"foo":"bar"}]}

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "dynamodb",
	"settings": {
		"options": {
			"table": "foo-table",
			"key_condition_expression": "PK = :pk and begins_with(SK, :sk)",
			"limit": 1,
			"scan_index_forward": true
		},
		"input_key": "ddb",
		"output_key": "ddb"
	}
}

func (DynamoDB) Apply added in v0.4.0

func (p DynamoDB) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error)

Apply processes encapsulated data with the DynamoDB processor.

func (DynamoDB) ApplyBatch added in v0.4.0

func (p DynamoDB) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the DynamoDB processor. Conditions are optionally applied to the data to enable processing.

type DynamoDBOptions

type DynamoDBOptions struct {
	Table                  string `json:"table"`
	KeyConditionExpression string `json:"key_condition_expression"`
	Limit                  int64  `json:"limit"`
	ScanIndexForward       bool   `json:"scan_index_forward"`
}

DynamoDBOptions contains custom options settings for the DynamoDB processor (https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Query.html#API_Query_RequestSyntax):

Table:
	DynamoDB table to query
KeyConditionExpression:
	key condition expression (see documentation)
Limit (optional):
	maximum number of items to evaluate
	defaults to evaluating all items
ScanIndexForward (optional):
	specifies the order of index traversal
	must be one of:
		true (default): traversal is performed in ascending order
		false: traversal is performed in descending order

type Expand

type Expand struct {
	Condition condition.Config `json:"condition"`
	InputKey  string           `json:"input_key"`
}

Expand processes data by creating individual events from objects in arrays. The processor supports these patterns:

JSON:
	{"expand":[{"foo":"bar"}],"baz":"qux"} >>> {"foo":"bar","baz":"qux"}
data:
	[{"foo":"bar"}] >>> {"foo":"bar"}

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "expand",
	"settings": {
		"input_key": "expand"
	}
}

func (Expand) ApplyBatch added in v0.4.0

func (p Expand) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the Expand processor. Conditions are optionally applied to the data to enable processing.

type Flatten

type Flatten struct {
	Options   FlattenOptions   `json:"options"`
	Condition condition.Config `json:"condition"`
	InputKey  string           `json:"input_key"`
	OutputKey string           `json:"output_key"`
}

Flatten processes data by flattening JSON arrays. The processor supports these patterns:

JSON:
	{"flatten":["foo",["bar"]]} >>> {"flatten":["foo","bar"]}

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "flatten",
	"settings": {
		"input_key": "flatten",
		"output_key": "flatten"
	}
}

func (Flatten) Apply added in v0.4.0

func (p Flatten) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error)

Apply processes encapsulated data with the Flatten processor.

func (Flatten) ApplyBatch added in v0.4.0

func (p Flatten) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the Flatten processor. Conditions are optionally applied to the data to enable processing.

type FlattenOptions

type FlattenOptions struct {
	Deep bool `json:"deep"`
}

FlattenOptions contains custom options settings for the Flatten processor:

Deep (optional):
	deeply flattens nested arrays

type ForEach added in v0.3.0

type ForEach struct {
	Options   ForEachOptions   `json:"options"`
	Condition condition.Config `json:"condition"`
	InputKey  string           `json:"input_key"`
	OutputKey string           `json:"output_key"`
}

ForEach processes data by iterating and applying a processor to each element in a JSON array. The processor supports these patterns:

JSON:
	{"input":["ABC","DEF"]} >>> {"input":["ABC","DEF"],"output":["abc","def"]}

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "for_each",
	"settings": {
		"options": {
			"processor": {
				"type": "case",
				"settings": {
					"options": {
						"case": "lower"
					}
				}
			}
		},
		"input_key": "input",
		"output_key": "output.-1"
	}
}

func (ForEach) Apply added in v0.4.0

func (p ForEach) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error)

Apply processes encapsulated data with the ForEach processor.

JSON values are treated as arrays and the configured processor is applied to each element in the array. If multiple processors need to be applied to each element, then the Pipeline processor should be used to create a nested data processing workflow. For example:

ForEach -> Pipeline -> [Copy, Delete, Copy]

func (ForEach) ApplyBatch added in v0.4.0

func (p ForEach) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the ForEach processor. Conditions are optionally applied to the data to enable processing.

type ForEachOptions added in v0.3.0

type ForEachOptions struct {
	Processor config.Config
}

ForEachOptions contains custom options for the ForEach processor:

Processor:
	processor applied to the data

type Group added in v0.2.0

type Group struct {
	Options   GroupOptions     `json:"options"`
	Condition condition.Config `json:"condition"`
	InputKey  string           `json:"input_key"`
	OutputKey string           `json:"output_key"`
}

Group processes data by grouping JSON arrays into an array of tuples or array of JSON objects. The processor supports these patterns:

JSON array:
	{"group":[["foo","bar"],[111,222]]} >>> {"group":[["foo",111],["bar",222]]}
	{"group":[["foo","bar"],[111,222]]} >>> {"group":[{"name":"foo","size":111},{"name":"bar","size":222}]}

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "group",
	"settings": {
		"input_key": "group",
		"output_key": "group"
	}
}

func (Group) Apply added in v0.4.0

func (p Group) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error)

Apply processes encapsulated data with the Group processor.

func (Group) ApplyBatch added in v0.4.0

func (p Group) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the Group processor. Conditions are optionally applied to the data to enable processing.

type GroupOptions added in v0.2.0

type GroupOptions struct {
	Keys []string `json:"keys"`
}

GroupOptions contains custom options for the Group processor:

Keys (optional):
	path where values from InputKey are written to, creating new JSON objects

type Gzip added in v0.2.0

type Gzip struct {
	Options   GzipOptions      `json:"options"`
	Condition condition.Config `json:"condition"`
}

Gzip processes data by compressing or decompressing gzip. The processor supports these patterns:

data:
	[31 139 8 0 0 0 0 0 0 255 74 203 207 7 4 0 0 255 255 33 101 115 140 3 0 0 0] >>> foo
	foo >>> [31 139 8 0 0 0 0 0 0 255 74 203 207 7 4 0 0 255 255 33 101 115 140 3 0 0 0]

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "gzip",
	"settings": {
		"options": {
			"direction": "from"
		}
	}
}

func (Gzip) Apply added in v0.4.0

func (p Gzip) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error)

Apply processes encapsulated data with the Gzip processor.

func (Gzip) ApplyBatch added in v0.4.0

func (p Gzip) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the Gzip processor. Conditions are optionally applied to the data to enable processing.

type GzipOptions added in v0.2.0

type GzipOptions struct {
	Direction string `json:"direction"`
}

GzipOptions contains custom options settings for the Gzip processor:

Direction:
	direction of the compression
	must be one of:
		to: compress data to gzip
		from: decompress data from gzip

type Hash

type Hash struct {
	Options   HashOptions      `json:"options"`
	Condition condition.Config `json:"condition"`
	InputKey  string           `json:"input_key"`
	OutputKey string           `json:"output_key"`
}

Hash processes data by calculating hashes. The processor supports these patterns:

JSON:
	{"hash":"foo"} >>> {"hash":"acbd18db4cc2f85cedef654fccc4a4d8"}
data:
	foo >>> acbd18db4cc2f85cedef654fccc4a4d8

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "hash",
	"settings": {
		"options": {
			"algorithm": "md5"
		},
		"input_key": "hash",
		"output_key": "hash"
	}
}

func (Hash) Apply added in v0.4.0

func (p Hash) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error)

Apply processes encapsulated data with the Hash processor.

func (Hash) ApplyBatch added in v0.4.0

func (p Hash) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the Hash processor. Conditions are optionally applied to the data to enable processing.

type HashOptions

type HashOptions struct {
	Algorithm string `json:"algorithm"`
}

HashOptions contains custom options for the Hash processor:

Algorithm:
	hashing algorithm applied to the data
	must be one of:
		md5
		sha256

type Insert

type Insert struct {
	Options   InsertOptions    `json:"options"`
	Condition condition.Config `json:"condition"`
	OutputKey string           `json:"output_key"`
}

Insert processes data by inserting a value into a JSON object. The processor supports these patterns:

JSON:
	{"foo":"bar"} >>> {"foo":"bar","baz":"qux"}

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "insert",
	"settings": {
		"options": {
			"value": "qux"
		},
		"output_key": "baz"
	}
}

func (Insert) Apply added in v0.4.0

func (p Insert) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error)

Apply processes encapsulated data with the Insert processor.

func (Insert) ApplyBatch added in v0.4.0

func (p Insert) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the Insert processor. Conditions are optionally applied to the data to enable processing.

type InsertOptions

type InsertOptions struct {
	Value interface{} `json:"value"`
}

InsertOptions contains custom options for the Insert processor:

Value:
	value to insert

type Lambda

type Lambda struct {
	Options   LambdaOptions    `json:"options"`
	Condition condition.Config `json:"condition"`
	InputKey  string           `json:"input_key"`
	OutputKey string           `json:"output_key"`
}

Lambda processes data by synchronously invoking an AWS Lambda function and returning the payload. The average latency of a synchronous invocation is tens of milliseconds, but it can reach hundreds to thousands of milliseconds depending on the function, which can have a significant impact on total event latency. If Substation is running in AWS Lambda with Kinesis, then this latency can be mitigated by increasing the parallelization factor of the Lambda (https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html).

The input key's value must be a JSON object that contains settings for the Lambda. It is recommended to use the copy and insert processors to create the JSON object before calling this processor and to use the delete processor to remove the JSON object after calling this processor.

The processor supports these patterns:

JSON:
	{"foo":"bar","lambda":{"lookup":"baz"}} >>> {"foo":"bar","lambda":{"baz":"qux"}}

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "lambda",
	"settings": {
		"options": {
			"function": "foo-function"
		},
		"input_key": "lambda",
		"output_key": "lambda"
	}
}

func (Lambda) Apply added in v0.4.0

func (p Lambda) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error)

Apply processes encapsulated data with the Lambda processor.

func (Lambda) ApplyBatch added in v0.4.0

func (p Lambda) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the Lambda processor. Conditions are optionally applied to the data to enable processing.

type LambdaOptions

type LambdaOptions struct {
	Function       string `json:"function"`
	ErrorOnFailure bool   `json:"error_on_failure"`
}

LambdaOptions contains custom options settings for the Lambda processor:

Function:
	Lambda function to invoke
ErrorOnFailure (optional):
	if set to true, then errors from the invoked Lambda will cause the processor to fail
	defaults to false

type Math

type Math struct {
	Options   MathOptions      `json:"options"`
	Condition condition.Config `json:"condition"`
	InputKey  string           `json:"input_key"`
	OutputKey string           `json:"output_key"`
}

Math processes data by applying mathematical operations. The processor supports these patterns:

JSON:
	{"math":[1,3]} >>> {"math":4}

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "math",
	"settings": {
		"options": {
			"operation": "add"
		},
		"input_key": "math",
		"output_key": "math"
	}
}

func (Math) Apply added in v0.4.0

func (p Math) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error)

Apply processes encapsulated data with the Math processor.

func (Math) ApplyBatch added in v0.4.0

func (p Math) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the Math processor. Conditions are optionally applied to the data to enable processing.

type MathOptions

type MathOptions struct {
	Operation string `json:"operation"`
}

MathOptions contains custom options for the Math processor:

Operation:
	operator applied to the data
	must be one of:
		add
		subtract
		multiply
		divide
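The fold over the input array can be sketched in plain Go. This is an illustrative sketch, not the processor's implementation; the helper name applyMath is hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// applyMath folds an operation over a slice of numbers, mirroring the
// Math processor's pattern for {"math":[1,3]} with operation "add".
func applyMath(op string, vals []float64) float64 {
	result := vals[0]
	for _, v := range vals[1:] {
		switch op {
		case "add":
			result += v
		case "subtract":
			result -= v
		case "multiply":
			result *= v
		case "divide":
			result /= v
		}
	}
	return result
}

func main() {
	var obj map[string][]float64
	json.Unmarshal([]byte(`{"math":[1,3]}`), &obj)
	fmt.Println(applyMath("add", obj["math"])) // 4
}
```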

type Pipeline added in v0.3.0

type Pipeline struct {
	Options   PipelineOptions  `json:"options"`
	Condition condition.Config `json:"condition"`
	InputKey  string           `json:"input_key"`
	OutputKey string           `json:"output_key"`
}

Pipeline processes data by applying a series of processors. This processor should be used when data requires complex processing outside of the boundaries of any data structures (see tests for examples). The processor supports these patterns:

JSON:
	{"pipeline":"H4sIAMpcy2IA/wXAIQ0AAACAsLbY93csBiFlc4wDAAAA"} >>> {"pipeline":"foo"}
data:
	H4sIAMpcy2IA/wXAIQ0AAACAsLbY93csBiFlc4wDAAAA >>> foo

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "pipeline",
	"settings": {
		"options": {
			"processors": [
				{
					"type": "base64",
					"settings": {
						"options": {
							"direction": "from"
						}
					}
				},
				{
					"type": "gzip",
					"settings": {
						"options": {
							"direction": "from"
						}
					}
				}
			]
		},
		"input_key": "pipeline",
		"output_key": "pipeline"
	}
}

func (Pipeline) Apply added in v0.4.0

func (p Pipeline) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error)

Apply processes encapsulated data with the Pipeline processor.

Applicators only accept encapsulated data, so when processing JSON the input value is converted from a Result to its string representation, then to bytes, and put into a new capsule. The conversion from Result to string is safe for strings and objects, but not for arrays (e.g., ["foo","bar"]).

If the input is an array, then an error is raised; the input should instead be run through the ForEach processor (which can encapsulate the Pipeline processor).

func (Pipeline) ApplyBatch added in v0.4.0

func (p Pipeline) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the Pipeline processor. Conditions are optionally applied to the data to enable processing.

type PipelineOptions added in v0.3.0

type PipelineOptions struct {
	Processors []config.Config
}

PipelineOptions contains custom options for the Pipeline processor:

Processors:
	array of processors applied to the data

type PrettyPrint added in v0.4.0

type PrettyPrint struct {
	Options   PrettyPrintOptions `json:"options"`
	Condition condition.Config   `json:"condition"`
}

PrettyPrint processes data by applying or reversing prettyprint formatting to JSON. This processor has significant limitations when used to reverse prettyprint, including:

  • cannot support multi-core processing
  • invalid input will cause unpredictable results

It is strongly recommended _not_ to use this processor unless absolutely necessary; a more reliable solution is to modify the source application emitting the multi-line JSON object so that it outputs a single-line object instead.

The processor supports these patterns:

data:
	{
		"foo": "bar"
	} >>> {"foo":"bar"}

	{"foo":"bar"} >>> {
		"foo": "bar"
	}

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "pretty_print",
	"settings": {
		"options": {
			"direction": "from"
		}
	}
}

func (PrettyPrint) Apply added in v0.4.0

func (p PrettyPrint) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error)

Apply processes encapsulated data with the PrettyPrint processor.

Applying prettyprint formatting is handled by the gjson PrettyPrint modifier and is applied to the root JSON object.

This _does not_ support reversing prettyprint formatting; that support is unnecessary for multi-line JSON objects that are stored in a single byte array.

func (PrettyPrint) ApplyBatch added in v0.4.0

func (p PrettyPrint) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the PrettyPrint processor.

Applying prettyprint formatting is handled by the gjson PrettyPrint modifier and is applied to the root JSON object.

Reversing prettyprint formatting is handled by iterating incoming data per byte and pushing the bytes to a stack. When an equal number of open and close curly brackets ( { } ) are observed, then the stack of bytes has JSON compaction applied and the result is emitted as a new object.

type PrettyPrintOptions added in v0.4.0

type PrettyPrintOptions struct {
	Direction string `json:"direction"`
}

PrettyPrintOptions contains custom options for the PrettyPrint processor:

Direction:
	direction of the prettyprint transformation
	must be one of:
		to: applies prettyprint formatting
		from: reverses prettyprint formatting

type Replace

type Replace struct {
	Options   ReplaceOptions   `json:"options"`
	Condition condition.Config `json:"condition"`
	InputKey  string           `json:"input_key"`
	OutputKey string           `json:"output_key"`
}

Replace processes data by replacing characters. The processor supports these patterns:

JSON:
	{"replace":"bar"} >>> {"replace":"baz"}
data:
	bar >>> baz

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "replace",
	"settings": {
		"options": {
			"old": "r",
			"new": "z"
		},
		"input_key": "replace",
		"output_key": "replace"
	}
}

func (Replace) Apply added in v0.4.0

func (p Replace) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error)

Apply processes encapsulated data with the Replace processor.

func (Replace) ApplyBatch added in v0.4.0

func (p Replace) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the Replace processor. Conditions are optionally applied to the data to enable processing.

type ReplaceOptions

type ReplaceOptions struct {
	Old   string `json:"old"`
	New   string `json:"new"`
	Count int    `json:"count"`
}

ReplaceOptions contains custom options for the Replace processor:

Old:
	character(s) to replace in the data
New:
	character(s) that replace Old
Count (optional):
	number of replacements to make
	defaults to -1, which replaces all matches

type Split added in v0.4.0

type Split struct {
	Options   SplitOptions     `json:"options"`
	Condition condition.Config `json:"condition"`
	InputKey  string           `json:"input_key"`
	OutputKey string           `json:"output_key"`
}

Split processes data by splitting it into multiple elements or items. The processor supports these patterns:

JSON:
	{"split":"foo.bar"} >>> {"split":["foo","bar"]}
data:
	foo\nbar\nbaz\nqux >>> foo bar baz qux
	{"foo":"bar"}\n{"baz":"qux"} >>> {"foo":"bar"} {"baz":"qux"}

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "split",
	"settings": {
		"options": {
			"separator": "."
		},
		"input_key": "split",
		"output_key": "split"
	}
}

func (Split) Apply added in v0.4.0

func (p Split) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error)

Apply processes encapsulated data with the Split processor.

func (Split) ApplyBatch added in v0.4.0

func (p Split) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the Split processor. Conditions are optionally applied to the data to enable processing.

type SplitOptions added in v0.4.0

type SplitOptions struct {
	Separator string `json:"separator"`
}

SplitOptions contains custom options for the Split processor:

Separator:
	string that the data is split on

type Time

type Time struct {
	Options   TimeOptions      `json:"options"`
	Condition condition.Config `json:"condition"`
	InputKey  string           `json:"input_key"`
	OutputKey string           `json:"output_key"`
}

Time processes data by converting time values between formats. The processor supports these patterns:

JSON:
	{"time":1639877490.061} >>> {"time":"2021-12-19T01:31:30.061000Z"}
data:
	1639877490.061 >>> 2021-12-19T01:31:30.061000Z

When loaded with a factory, the processor uses this JSON configuration:

{
	"type": "time",
	"settings": {
		"options": {
			"input_format": "unix",
			"output_format": "2006-01-02T15:04:05.000000Z"
		},
		"input_key": "time",
		"output_key": "time"
	}
}

func (Time) Apply added in v0.4.0

func (p Time) Apply(ctx context.Context, capsule config.Capsule) (config.Capsule, error)

Apply processes encapsulated data with the Time processor.

func (Time) ApplyBatch added in v0.4.0

func (p Time) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)

ApplyBatch processes a slice of encapsulated data with the Time processor. Conditions are optionally applied to the data to enable processing.

type TimeOptions

type TimeOptions struct {
	InputFormat    string `json:"input_format"`
	OutputFormat   string `json:"output_format"`
	InputLocation  string `json:"input_location"`
	OutputLocation string `json:"output_location"`
}

TimeOptions contains custom options for the Time processor:

InputFormat:
	time format of the input
	must be one of:
		pattern-based layouts (https://gobyexample.com/time-formatting-parsing)
		unix: epoch (supports fractions of a second)
		unix_milli: epoch milliseconds
		now: current time
OutputFormat:
	time format of the output
	must be one of:
		pattern-based layouts (https://gobyexample.com/time-formatting-parsing)
		unix: epoch
		unix_milli: epoch milliseconds
InputLocation (optional):
	time zone abbreviation for the input
	defaults to UTC
OutputLocation (optional):
	time zone abbreviation for the output
	defaults to UTC
