process

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 19, 2022 License: MIT Imports: 17 Imported by: 0

README

process

Contains interfaces and methods for atomically processing data. Processors can be applied to bytes and channels of bytes; for JSON data, some processors are array-aware and will automatically process data within arrays.

The package can be used like this:

package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/process"
)

func main() {
	processor := process.Insert{
		Options: process.InsertOptions{
			Value: "bar",
		},
		Output: process.Output{
			Key: "foo",
		},
	}

	ctx := context.TODO()
	data := []byte(`{"hello":"world"}`)
	processed, err := processor.Byte(ctx, data)
	if err != nil {
		panic(err)
	}

	fmt.Println(string(data))
	fmt.Println(string(processed))
}

In Substation applications, processors adhere to these rules:

  • share a common configuration syntax
    • input: settings that control where input is located (e.g., input.key)
    • output: settings that control where output is placed (e.g., output.key)
    • options: settings that control runtime options for the processor
  • applied via conditions (condition)
  • operate on JSON data

processors

Processor Description
Capture Applies a capturing regular expression
Case Modifies the case of a string
Concat Modifies the case of a string
Convert Converts a value to a new type (e.g., string to integer)
Copy Copies a value from one JSON key to another
Count Count data in a channel
Delete Deletes a JSON key
Domain Parses a fully qualified domai name into separate labels (e.g., top-level domain, subdomain)
Drop Drops data from a channel
DynamoDB Runs a query on a DynamoDB table and returns matched items
Expand Expands JSON arrays into individual objects
Flatten Flattens an array of values, including deeply nested arrays
Hash Calculates the hash of a value
Insert Inserts a value into a JSON key
Lambda Synchronously invokes an AWS Lambda and returns the results
Math Performs mathematical operations (e.g., add three values, subtract two values)
Replace Replaces characters within a string
Time Converts time values between formats
Zip Concatenates arrays into tuples or JSON objects
capture

Processes data by applying a capturing regular expression. This processor is array-aware and can output one or many values that are automatically stored as values or arrays of elements.

The processor uses this Jsonnet configuration:

{
  type: 'capture',
  settings: {
    // if the value is "bar", then this returns ["b","a","r"]
    input: {
      key: 'foo',
    },
    output: {
      key: 'processed',
    },
    options: {
      expression: '(.{1})'
      count: 3,
    }
  },
}
case

Processes data by converting the case of a string. This processor is array-aware and supports these options:

  • upper: converts to uppercase
  • lower: converts to lowercase
  • snake: converts to snake case

The processor uses this Jsonnet configuration:

{
  type: 'case',
  settings: {
    // if the value is "bar", then this returns "BAR"
    input: {
      key: 'foo',
    },
    output: {
      key: 'processed',
    },
    options: {
      case: 'upper',
    }
  },
}
concat

Processes data by concatenating multiple values together with a separator. This processor is array-aware.

The processor uses this Jsonnet configuration:

{
  type: 'concat',
  settings: {
    // if the values are "baz" and "qux", then this returns "baz.qux"
    input: {
      keys: ['foo','bar'],
    },
    output: {
      key: 'processed',
    },
    options: {
      separator: '.',
    }
  },
}
convert

Processes data by converting values between types (e.g., string to integer, integer to float). This processor is array-aware and supports these types:

  • bool: boolean
  • int: integer
  • float: float
  • uint: uinteger
  • string: string

The processor uses this Jsonnet configuration:

{
  type: 'convert',
  settings: {
    // if the value is "100", then this returns 100
    input: {
      key: 'foo',
    },
    output: {
      key: 'processed',
    },
    options: {
      type: 'int',
    }
  },
}
copy

Processes data by copying values between JSON keys, preserving the original JSON key.

The processor uses this Jsonnet configuration:

{
  type: 'copy',
  settings: {
    input: {
      key: 'foo',
    },
    output: {
      key: 'processed',
    },
  },
}
count

Processes data by counting data in a channel. The output of this processor is {"count":N}, where N is the number of bytes that were in the channel.

The processor uses this Jsonnet configuration:

{
  type: 'count',
}
delete

Processes data by deleting JSON keys. Any keys nested under the provided key are deleted.

The processor uses this Jsonnet configuration:

{
  type: 'delete',
  settings: {
    // if "foo" is in the JSON object, then this processor deletes it
    input: {
      key: 'foo',
    },
  },
}
domain

Processes data by parsing fully qualified domain names into separate labels. This processor is array-aware and supports these options:

  • tld: top-level domain (e.g., com)
  • domain: tld + one label (e.g., brex.com)
  • subdomain: subdomain (e.g., www)

The processor uses this Jsonnet configuration:

{
  type: 'domain',
  settings: {
    // if the value is "www.brex.com", then this returns "brex.com'
    input: {
      key: 'foo',
    },
    output: {
      key: 'processed',
    },
    options: {
      function: 'domain',
    }
  },
}
drop

Processes data by dropping it from a channel.

The processor uses this Jsonnet configuration:

{
  type: 'drop',
}
dynamodb

Processes data by querying DynamoDB and returning all matched items as an array of JSON objects. This processor is array-aware.

We recommend referring to the documentation for querying DynamoDB when working with this processor. Note that DynamoDB is designed for single-digit millisecond latency, but latency can takes 10s of milliseconds which can have significant impact on total event latency. If Substation is running in AWS Lambda with Kinesis, then this latency can be mitigated by increasing the parallelization factor of the Lambda.

The processor uses this Jsonnet configuration:

{
  type: 'dynamodb',
  settings: {
    // if the value is "bar", then this queries DynamoDB by using "bar" as the paritition key value for the attribute "pk" and returns the last indexed item from the table.
    input: {
      partition_key: 'foo',
    },
    output: {
      key: 'processed',
    },
    options: {
      table: 'foo-table',
      key_condition_expression: 'pk = :partitionkeyval',
      // multiple items can be returned by changing limit
      limit: 1,
      // the order of the returned items can be changed by excluding scan_index_forward or setting it to false.
      scan_index_forward: true,
    }
  },
}
expand

Processes data by expanding data in JSON arrays into individual events. This processor can optionally retain keys outside of the JSON array and insert them into the new events.

The processor uses this Jsonnet configuration:

{
  type: 'expand',
  settings: {
    // if the original event is {"foo":[{"bar":"baz"}],"qux":"quux"}, then this expands to create the event {"bar":"baz","qux":"quux"}
    input: {
      key: 'foo',
    },
    options: {
      retain: ['qux'],
    }
  },
}
flatten

Processes data by flattening JSON arrays. This processor can optionally deeply flatten arrays.

The processor uses this Jsonnet configuration:

{
  type: 'flatten',
  settings: {
    // if the value is [1,2,[3,4,[5,6]]], then this returns [1,2,3,4,5,6]
    input: {
      key: 'foo',
    },
    output: {
      key: 'processed',
    }
    options: {
      deep: true,
    }
  },
}
hash

Processes data by calculating its hash. This processor is array-aware and supports these algorithms:

  • md5
  • sha256

The processor uses this Jsonnet configuration:

{
  type: 'hash',
  settings: {
    // calculates sha256 hash of value in "foo"
    // use "@this" to calculate the hash of entire JSON object
    input: {
      key: 'foo',
    },
    output: {
      key: 'processed',
    }
    options: {
      algorithm: 'sha256',
    }
  },
}
insert

Processes data by inserting a value into a JSON object. This processor supports any type of value.

The processor uses this Jsonnet configuration:

{
  type: 'insert',
  settings: {
    // inserts value "foo" into key "processed"
    output: {
      key: 'processed',
    }
    options: {
      value: 'foo',
    }
  },
}
lambda

Processes data by synchronously invoking an AWS Lambda and returning the results as a JSON object. This processor optionally treats errors in the invoked Lambda as errors in the processor (by default, if errors occur then they are ignored and the input data is returned).

Note that the average latency of synchronously invoking a Lambda function is 10s of milliseconds, but latency can take 100s to 1000s of milliseconds depending on the function which can have significant impact on total event latency. If Substation is running in AWS Lambda with Kinesis, then this latency can be mitigated by increasing the parallelization factor of the Lambda.

The processor uses this Jsonnet configuration:

{
  type: 'lambda',
  settings: {
    // creates an AWS Lambda payload that maps keys from the input JSON object to keys in the payload
    input: {
      payload: [
        {
          key: 'foo',
          payload_key: 'ip_address',
        }
      ],
    },
    output: {
      key: 'processed',
    }
    options: {
      function: 'foo-function',
    }
  },
}
math

Processes data by applying mathematical operations to multiple values. This processor supports these operations:

  • add
  • subtract

The processor uses this Jsonnet configuration:

{
  type: 'math',
  settings: {
    // if the values are 5 and 10, then this returns 15
    input: {
      keys: ['foo','bar'],
    },
    output: {
      key: 'processed',
    }
    options: {
      operation: 'add',
    }
  },
}
replace

Processes data by replacing substrings in string values. This processor is array-aware.

The processor uses this Jsonnet configuration:

{
  type: 'replace',
  settings: {
    // if the value is "bar", then this returns "baz"
    // if the value is "barbar", then this returns "bazbar"
    input: {
      key: 'foo',
    },
    output: {
      key: 'processed',
    }
    options: {
      old: 'bar',
      new: 'baz',
      count: 1,  // defaults to 0, which replaces all substring matches
    }
  },
}
time

Processes data by converting time values between formats. This processor is array-aware and supports these time formats:

  • pattern-based layouts
  • unix: epoch
  • unix_milli: epoch milliseconds
  • unix_nano: epoch nanoseconds
  • now: current time

The processor uses this Jsonnet configuration:

{
  type: 'time',
  settings: {
    // if the value is 0, then this returns "1970-01-01T12:00:00"
    input: {
      key: 'foo',
    },
    output: {
      key: 'processed',
    }
    options: {
      input_format: 'epoch',
      output_format: '2006-01-02T15:04:05',
    }
  },
}
zip

Processes data by concatenating JSON arrays into an array of tuples or array of JSON objects.

For processing into an array of tuples, use this Jsonnet configuration:

{
  type: 'zip',
  settings: {
    // if the values are ["foo","bar"] and [123,456], then this returns [["foo",123],["bar",456]]
    input: {
      keys: ['names','sizes'],
    },
    output: {
      key: 'processed',
    }
  },
}

For processing into an array of JSON objects, use this Jsonnet configuration:

{
  type: 'zip',
  settings: {
    // if the values are ["foo","bar"] and [123,456], then this returns [{"name":"foo","size":123},{"name":"bar","size":456}]
    input: {
      keys: ['names','sizes'],
    },
    output: {
      key: 'processed',
    }
    options: {
      keys: ['name','size'],
    }
  },
}

Documentation

Index

Constants

View Source
const ByteInvalidFactoryConfig = errors.Error("ByteInvalidFactoryConfig")

ByteInvalidFactoryConfig is used when an unsupported Byte is referenced in ByteFactory

View Source
const ChannelInvalidFactoryConfig = errors.Error("ChannelInvalidFactoryConfig")

ChannelInvalidFactoryConfig is used when an unsupported Channel is referenced in ChannelFactory

View Source
const DomainNoSubdomain = errors.Error("DomainNoSubdomain")

DomainNoSubdomain is used when a domain without a subdomain is processed

Variables

This section is empty.

Functions

func Byte

func Byte(ctx context.Context, byters []Byter, data []byte) ([]byte, error)

Byte accepts an array of Byters and applies all processors to the data.

func Channel

func Channel(ctx context.Context, channelers []Channeler, ch <-chan []byte) (<-chan []byte, error)

Channel accepts a channel of bytes and applies all processors to data in the channel.

func ReadOnlyChannel

func ReadOnlyChannel(ch chan []byte) <-chan []byte

ReadOnlyChannel turns a write/read channel into a read-only channel.

Types

type Byter

type Byter interface {
	Byte(context.Context, []byte) ([]byte, error)
}

Byter is an interface for applying processors to bytes.

func ByterFactory

func ByterFactory(cfg Config) (Byter, error)

ByterFactory loads a Byter from a Config. This is the recommended function for retrieving ready-to-use Byters.

func MakeAllByters

func MakeAllByters(cfg []Config) ([]Byter, error)

MakeAllByters accepts an array of Config and returns populated Byters. This a conveience function for loading many Byters.

type Capture

type Capture struct {
	Condition condition.OperatorConfig `mapstructure:"condition"`
	Input     Input                    `mapstructure:"input"`
	Output    Output                   `mapstructure:"output"`
	Options   CaptureOptions           `mapstructure:"options"`
}

Capture implements the Byter and Channeler interfaces and applies a capturing regular expression to data. More information is available in the README.

func (Capture) Byte

func (p Capture) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes a byte slice with this processor

func (Capture) Channel

func (p Capture) Channel(ctx context.Context, ch <-chan []byte) (<-chan []byte, error)

Channel processes a data channel of bytes with this processor. Conditions can be optionally applied on the channel data to enable processing.

type CaptureOptions

type CaptureOptions struct {
	Expression string `mapstructure:"expression"`
	Count      int    `mapstructure:"count"`
}

CaptureOptions contain custom options settings for this processor.

Expression: the capturing regular expression Count (optional): the number of captures to return; defaults to 0, which returns all captures

type Case

type Case struct {
	Condition condition.OperatorConfig `mapstructure:"condition"`
	Input     Input                    `mapstructure:"input"`
	Output    Output                   `mapstructure:"output"`
	Options   CaseOptions              `mapstructure:"options"`
}

Case implements the Byter and Channeler interfaces and converts the case of a string. More information is available in the README.

func (Case) Byte

func (p Case) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes a byte slice with this processor

func (Case) Channel

func (p Case) Channel(ctx context.Context, ch <-chan []byte) (<-chan []byte, error)

Channel processes a data channel of bytes with this processor. Conditions can be optionally applied on the channel data to enable processing.

type CaseOptions

type CaseOptions struct {
	Case string `mapstructure:"case"`
}

CaseOptions contain custom options settings for this processor.

Case: the case to convert the string to; one of: upper, lower, or snake

type Channeler

type Channeler interface {
	Channel(context.Context, <-chan []byte) (<-chan []byte, error)
}

Channeler is an interface for applying processors to channels of bytes.

func ChannelerFactory

func ChannelerFactory(cfg Config) (Channeler, error)

ChannelerFactory loads Channeler from a Config. This is the recommended function for retrieving ready-to-use Channelers.

func MakeAllChannelers

func MakeAllChannelers(cfg []Config) ([]Channeler, error)

MakeAllChannelers accepts an array of Config and returns populated Channelers. This a conveience function for loading many Channelers.

type Concat

type Concat struct {
	Condition condition.OperatorConfig `mapstructure:"condition"`
	Input     Inputs                   `mapstructure:"input"`
	Output    Output                   `mapstructure:"output"`
	Options   ConcatOptions            `mapstructure:"options"`
}

Concat implements the Byter and Channeler interfaces and concatenates multiple JSON keys into a single value with a separator character. More information is available in the README.

func (Concat) Byte

func (p Concat) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes a byte slice with this processor

func (Concat) Channel

func (p Concat) Channel(ctx context.Context, ch <-chan []byte) (<-chan []byte, error)

Channel processes a data channel of bytes with this processor. Conditions can be optionally applied on the channel data to enable processing.

type ConcatOptions

type ConcatOptions struct {
	Separator string `mapstructure:"separator"`
}

ConcatOptions contain custom options settings for this processor.

Separator: the string that separates the concatenated values.

type Config

type Config struct {
	Type     string
	Settings map[string]interface{}
}

Config contains arbitrary JSON settings for Processors loaded via mapstructure.

type Convert

type Convert struct {
	Condition condition.OperatorConfig `mapstructure:"condition"`
	Input     Input                    `mapstructure:"input"`
	Output    Output                   `mapstructure:"output"`
	Options   ConvertOptions           `mapstructure:"options"`
}

Convert implements the Byter and Channeler interfaces and converts values between types. More information is available in the README.

func (Convert) Byte

func (p Convert) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes a byte slice with this processor

func (Convert) Channel

func (p Convert) Channel(ctx context.Context, ch <-chan []byte) (<-chan []byte, error)

Channel processes a data channel of bytes with this processor. Conditions can be optionally applied on the channel data to enable processing.

type ConvertOptions

type ConvertOptions struct {
	Type string `mapstructure:"type"`
}

ConvertOptions contain custom options settings for this processor.

Type: the type that the value should be converted to; one of: bool (boolean), int (integer), float, uint (unsigned integer), string

type Copy

type Copy struct {
	Condition condition.OperatorConfig `mapstructure:"condition"`
	Input     Input                    `mapstructure:"input"`
	Output    Output                   `mapstructure:"output"`
}

Copy implements the Byter and Channeler interfaces and copies values between JSON keys, preserving the original JSON key. More information is available in the README.

func (Copy) Byte

func (p Copy) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes a byte slice with this processor

func (Copy) Channel

func (p Copy) Channel(ctx context.Context, ch <-chan []byte) (<-chan []byte, error)

Channel processes a data channel of bytes with this processor. Conditions can be optionally applied on the channel data to enable processing.

type Count

type Count struct{}

Count implements the Channeler interfaces and counts all data put into the channel. More information is available in the README.

func (Count) Channel

func (p Count) Channel(ctx context.Context, ch <-chan []byte) (<-chan []byte, error)

Channel processes a data channel of bytes with this processor.

type Delete

type Delete struct {
	Condition condition.OperatorConfig `mapstructure:"condition"`
	Input     Input                    `mapstructure:"input"`
}

Delete implements the Byter and Channeler interfaces and deletes JSON keys. More information is available in the README.

func (Delete) Byte

func (p Delete) Byte(ctx context.Context, object []byte) ([]byte, error)

Byte processes a byte slice with this processor

func (Delete) Channel

func (p Delete) Channel(ctx context.Context, ch <-chan []byte) (<-chan []byte, error)

Channel processes a data channel of bytes with this processor. Conditions can be optionally applied on the channel data to enable processing.

type Domain

type Domain struct {
	Condition condition.OperatorConfig `mapstructure:"condition"`
	Input     Input                    `mapstructure:"input"`
	Output    Output                   `mapstructure:"output"`
	Options   DomainOptions            `mapstructure:"options"`
}

Domain implements the Byter and Channeler interfaces and parses fully qualified domain names into separate labels. More information is available in the README.

func (Domain) Byte

func (p Domain) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes a byte slice with this processor

func (Domain) Channel

func (p Domain) Channel(ctx context.Context, ch <-chan []byte) (<-chan []byte, error)

Channel processes a data channel of bytes with this processor. Conditions can be optionally applied on the channel data to enable processing.

type DomainOptions

type DomainOptions struct {
	Function string `mapstructure:"function"`
}

DomainOptions contain custom options settings for this processor.

Function: the domain processing function to apply to the data; one of: tld, domain, or subdomain

type Drop

type Drop struct {
	Condition condition.OperatorConfig `mapstructure:"condition"`
}

Drop implements the Channeler interface and drops any matched data. More information is available in the README.

func (Drop) Channel

func (p Drop) Channel(ctx context.Context, ch <-chan []byte) (<-chan []byte, error)

Channel processes a data channel of bytes with this processor. Conditions can be optionally applied on the channel data to enable processing.

type DynamoDB

type DynamoDB struct {
	Condition condition.OperatorConfig `mapstructure:"condition"`
	Input     DynamoDBInput            `mapstructure:"input"`
	Output    Output                   `mapstructure:"output"`
	Options   DynamoDBOptions          `mapstructure:"options"`
}

DynamoDB implements the Byter and Channeler interfaces and queries DynamoDB and returns all matched items as an array of JSON objects. More information is available in the README.

func (DynamoDB) Byte

func (p DynamoDB) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes a byte slice with this processor

func (DynamoDB) Channel

func (p DynamoDB) Channel(ctx context.Context, ch <-chan []byte) (<-chan []byte, error)

Channel processes a data channel of bytes with this processor. Conditions can be optionally applied on the channel data to enable processing.

type DynamoDBInput

type DynamoDBInput struct {
	PartitionKey string `mapstructure:"partition_key"`
	SortKey      string `mapstructure:"sort_key"`
}

DynamoDBInput contains custom input settings for this processor.

PartitionKey: the JSON key that is used as the paritition key value for the DynamoDB query SortKey (optional): the JSON key that is used as the sort /range key value for the DynamoDB query

type DynamoDBOptions

type DynamoDBOptions struct {
	Table                  string `mapstructure:"table"`
	KeyConditionExpression string `mapstructure:"key_condition_expression"`
	Limit                  int64  `mapstructure:"limit"`
	ScanIndexForward       bool   `mapstructure:"scan_index_forward"`
}

DynamoDBOptions contain custom options settings for this processor. Refer to the DynamoDB API query documentation for more information: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Query.html.

A common use for this processor is to return the most recent item from a DynamoDB table based on partition and sort keys. This can be achieved by setting Limit to 1 and ScanIndexForward to false.

Table: the DynamoDB table to query KeyConditionExpression: the DynamoDB key condition expression Limit (optional): the number of result items to return ScanIndexForward (optional): reverses the order of item results

type Expand

type Expand struct {
	Condition condition.OperatorConfig `mapstructure:"condition"`
	Input     Input                    `mapstructure:"input"`
	Options   ExpandOptions            `mapstructure:"options"`
}

Expand implements the Channeler interface and expands data in JSON arrays into individual events. More information is available in the README.

func (Expand) Channel

func (p Expand) Channel(ctx context.Context, ch <-chan []byte) (<-chan []byte, error)

Channel processes a data channel of bytes with this processor. Conditions can be optionally applied on the channel data to enable processing.

type ExpandOptions

type ExpandOptions struct {
	Retain []string `mapstructure:"retain"` // retain fields found anywhere in input
}

ExpandOptions contain custom options settings for this processor.

Retain: array of JSON keys to retain from the original object and insert into the new objects

type Flatten

type Flatten struct {
	Condition condition.OperatorConfig `mapstructure:"condition"`
	Input     Input                    `mapstructure:"input"`
	Output    Output                   `mapstructure:"output"`
	Options   FlattenOptions           `mapstructure:"options"`
}

Flatten implements the Byter and Channeler interfaces and flattens JSON arrays. More information is available in the README.

func (Flatten) Byte

func (p Flatten) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes a byte slice with this processor

func (Flatten) Channel

func (p Flatten) Channel(ctx context.Context, ch <-chan []byte) (<-chan []byte, error)

Channel processes a data channel of bytes with this processor. Conditions can be optionally applied on the channel data to enable processing.

type FlattenOptions

type FlattenOptions struct {
	Deep bool `mapstructure:"deep"`
}

FlattenOptions contain custom options settings for this processor.

Deep: deeply flattens nested arrays.

type Hash

type Hash struct {
	Condition condition.OperatorConfig `mapstructure:"condition"`
	Input     Input                    `mapstructure:"input"`
	Output    Output                   `mapstructure:"output"`
	Options   HashOptions              `mapstructure:"options"`
}

Hash implements the Byter and Channeler interfaces and calculates the hash of data. More information is available in the README.

func (Hash) Byte

func (p Hash) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes a byte slice with this processor

func (Hash) Channel

func (p Hash) Channel(ctx context.Context, ch <-chan []byte) (<-chan []byte, error)

Channel processes a data channel of bytes with this processor. Conditions can be optionally applied on the channel data to enable processing.

type HashOptions

type HashOptions struct {
	Algorithm string `mapstructure:"algorithm"`
}

HashOptions contain custom options settings for this processor.

Algorithm: the algorithm to apply.

type Input

type Input struct {
	Key string `mapstructure:"key"`
}

Input is the default input setting for processors that accept a single JSON key. This can be overriden by each processor.

type Inputs

type Inputs struct {
	Keys []string `mapstructure:"keys"`
}

Inputs is the default input setting for processors that accept multiple JSON keys. This can be overriden by each processor.

type Insert

type Insert struct {
	Condition condition.OperatorConfig `mapstructure:"condition"`
	Output    Output                   `mapstructure:"output"`
	Options   InsertOptions            `mapstructure:"options"`
}

Insert implements the Byter and Channeler interfaces and inserts a value into a JSON object. More information is available in the README.

func (Insert) Byte

func (p Insert) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes a byte slice with this processor

func (Insert) Channel

func (p Insert) Channel(ctx context.Context, ch <-chan []byte) (<-chan []byte, error)

Channel processes a data channel of bytes with this processor. Conditions can be optionally applied on the channel data to enable processing.

type InsertOptions

type InsertOptions struct {
	Value interface{} `mapstructure:"value"`
}

InsertOptions contain custom options settings for this processor.

Value: the value to insert.

type Lambda

type Lambda struct {
	Condition condition.OperatorConfig `mapstructure:"condition"`
	Input     LambdaInput              `mapstructure:"input"`
	Output    Output                   `mapstructure:"output"`
	Options   LambdaOptions            `mapstructure:"options"`
}

Lambda implements the Byter and Channeler interfaces and synchronously invokes an AWS Lambda. More information is available in the README.

func (Lambda) Byte

func (p Lambda) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes a byte slice with this processor

func (Lambda) Channel

func (p Lambda) Channel(ctx context.Context, ch <-chan []byte) (<-chan []byte, error)

Channel processes a data channel of bytes with this processor. Conditions can be optionally applied on the channel data to enable processing.

type LambdaInput

type LambdaInput struct {
	Payload []struct {
		Key        string `mapstructure:"key"`
		PayloadKey string `mapstructure:"payload_key"`
	} `mapstructure:"payload"`
}

LambdaInput contain custom options settings for this processor.

Payload: maps values from a JSON object (Key) to values in the AWS Lambda payload (PayloadKey)

type LambdaOptions

type LambdaOptions struct {
	Function string `mapstructure:"function"`
	Errors   bool   `mapstructure:"errors"`
}

LambdaOptions contain custom options settings for this processor.

Function: the name of the AWS Lambda function to invoke. Errors: if true, then errors from the invoked Lambda will cause this processor to fail (defaults to false).

type Math

type Math struct {
	Condition condition.OperatorConfig `mapstructure:"condition"`
	Input     Inputs                   `mapstructure:"input"`
	Output    Output                   `mapstructure:"output"`
	Options   MathOptions              `mapstructure:"options"`
}

Math implements the Byter and Channeler interfaces and applies mathematical operations to data. More information is available in the README.

func (Math) Byte

func (p Math) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes a byte slice with this processor

func (Math) Channel

func (p Math) Channel(ctx context.Context, ch <-chan []byte) (<-chan []byte, error)

Channel processes a data channel of bytes with this processor. Conditions can be optionally applied on the channel data to enable processing.

type MathOptions

type MathOptions struct {
	Operation string `mapstructure:"operation"`
}

MathOptions contain custom options settings for this processor.

Operation: the math operation applied to the data.

type Output

type Output struct {
	Key string `mapstructure:"key"`
}

Output is the default output setting for processors that produce a single JSON key. This can be overriden by each processor.

type Replace

type Replace struct {
	Condition condition.OperatorConfig `mapstructure:"condition"`
	Input     Input                    `mapstructure:"input"`
	Output    Output                   `mapstructure:"output"`
	Options   ReplaceOptions           `mapstructure:"options"`
}

Replace implements the Byter and Channeler interfaces and replaces substrings in string values. More information is available in the README.

func (Replace) Byte

func (p Replace) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes a byte slice with this processor

func (Replace) Channel

func (p Replace) Channel(ctx context.Context, ch <-chan []byte) (<-chan []byte, error)

Channel processes a data channel of bytes with this processor. Conditions can be optionally applied on the channel data to enable processing.

type ReplaceOptions

type ReplaceOptions struct {
	Old   string `mapstructure:"old"`
	New   string `mapstructure:"new"`
	Count int    `mapstructure:"count"`
}

ReplaceOptions contain custom options settings for this processor.

Old: the substring to replace New: the substring that replaces Count: the number of replacements to make; defaults to 0, which replaces all substrings

type Time

type Time struct {
	Condition condition.OperatorConfig `mapstructure:"condition"`
	Input     Input                    `mapstructure:"input"`
	Output    Output                   `mapstructure:"output"`
	Options   TimeOptions              `mapstructure:"options"`
}

Time implements the Byter and Channeler interfaces and converts time values between formats. More information is available in the README.

func (Time) Byte

func (p Time) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes a byte slice with this processor

func (Time) Channel

func (p Time) Channel(ctx context.Context, ch <-chan []byte) (<-chan []byte, error)

Channel processes a data channel of bytes with this processor. Conditions can be optionally applied on the channel data to enable processing.

type TimeOptions

type TimeOptions struct {
	InputFormat    string `mapstructure:"input_format"`
	InputLocation  string `mapstructure:"output_location"`
	OutputFormat   string `mapstructure:"output_format"`
	OutputLocation string `mapstructure:"output_location"`
}

TimeOptions contain custom options settings for this processor.

InputFormat: time format of the input InputLocation (optional): the time zone abbreviation for the input; if empty, then defaults to UTC OutputFormat: time format of the output OutputLocation (optional): the time zone abbreviation for the output; if empty, then defaults to UTC

type Zip

type Zip struct {
	Condition condition.OperatorConfig `mapstructure:"condition"`
	Input     Inputs                   `mapstructure:"input"`
	Output    Output                   `mapstructure:"output"`
	Options   ZipOptions               `mapstructure:"options"`
}

Zip implements the Byter and Channeler interfaces and concatenates JSON arrays into an array of tuples or array of JSON objects. More information is available in the README.

func (Zip) Byte

func (p Zip) Byte(ctx context.Context, data []byte) ([]byte, error)

Byte processes a byte slice with this processor

func (Zip) Channel

func (p Zip) Channel(ctx context.Context, ch <-chan []byte) (<-chan []byte, error)

Channel processes a data channel of bytes with this processor. Conditions can be optionally applied on the channel data to enable processing.

type ZipOptions

type ZipOptions struct {
	Keys []string `mapstructure:"keys"`
}

ZipOptions contain custom options settings for this processor.

Keys: location where elements from the input keys are written to; this creates JSON objects (optional)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL