Documentation
¶
Overview ¶
Example (DNS) ¶
package main
import (
"context"
"fmt"
"github.com/brexhq/substation/config"
"github.com/brexhq/substation/process"
)
func main() {
capsule := config.NewCapsule()
capsule.SetData([]byte(`{"ip":"8.8.8.8"}`))
// in native Substation applications configuration is handled by compiling Jsonnet and loading JSON into the application
cfg := []config.Config{
{
Type: "dns",
Settings: map[string]interface{}{
"input_key": "ip",
"output_key": "domains",
"options": map[string]interface{}{
"function": "reverse_lookup",
},
},
},
}
applicators, err := process.MakeApplicators(cfg)
if err != nil {
// handle err
panic(err)
}
//nolint: errcheck // errors are ignored in case processing fails in a single applicator
defer process.CloseApplicators(context.TODO(), applicators...)
for _, applicator := range applicators {
// applies the DNS processor to the capsule
capsule, err = applicator.Apply(context.TODO(), capsule)
if err != nil {
// handle err
panic(err)
}
}
fmt.Println(string(capsule.Data()))
}
Output:
Example (IPDatabase) ¶
package main
import (
"context"
"fmt"
"github.com/brexhq/substation/config"
"github.com/brexhq/substation/process"
)
func main() {
capsule := config.NewCapsule()
capsule.SetData([]byte(`{"ip":"8.8.8.8"}`))
// in native Substation applications configuration is handled by compiling Jsonnet and loading JSON into the application
cfg := []config.Config{
{
Type: "ip_database",
Settings: map[string]interface{}{
"input_key": "ip",
"output_key": "as",
"options": map[string]interface{}{
"function": "maxmind_asn",
"database_options": map[string]interface{}{
"type": "maxmind_asn",
"settings": map[string]interface{}{
// the location of the IP enrichment database can be either a path on local disk, an HTTP(S) URL, or an AWS S3 URL
"database": "location://path/to/maxmind.mmdb",
"language": "en",
},
},
},
},
},
{
Type: "ip_database",
Settings: map[string]interface{}{
"input_key": "ip",
"output_key": "geo",
"options": map[string]interface{}{
"function": "maxmind_city",
"database_options": map[string]interface{}{
"type": "maxmind_city",
"settings": map[string]interface{}{
// the location of the IP enrichment database can be either a path on local disk, an HTTP(S) URL, or an AWS S3 URL
"database": "location://path/to/maxmind.mmdb",
"language": "en",
},
},
},
},
},
}
applicators, err := process.MakeApplicators(cfg)
if err != nil {
// handle err
panic(err)
}
//nolint: errcheck // errors are ignored in case processing fails in a single applicator
defer process.CloseApplicators(context.TODO(), applicators...)
for _, applicator := range applicators {
// applies the IPDatabase processors to the capsule
capsule, err = applicator.Apply(context.TODO(), capsule)
if err != nil {
// handle err
panic(err)
}
}
fmt.Println(string(capsule.Data()))
}
Output:
Index ¶
- func Apply(ctx context.Context, capsule config.Capsule, apps ...Applicator) (config.Capsule, error)
- func ApplyBatch(ctx context.Context, batch []config.Capsule, apps ...BatchApplicator) ([]config.Capsule, error)
- func ApplyByte(ctx context.Context, data []byte, apps ...Applicator) ([]byte, error)
- func CloseApplicators(ctx context.Context, apps ...Applicator) error
- func CloseBatchApplicators(ctx context.Context, apps ...BatchApplicator) error
- type Aggregate
- type AggregateOptions
- type Applicator
- type Base64
- type Base64Options
- type BatchApplicator
- type Capture
- type CaptureOptions
- type Case
- type CaseOptions
- type Concat
- type ConcatOptions
- type Convert
- type ConvertOptions
- type Copy
- type Count
- type DNS
- type DNSOptions
- type Delete
- type Domain
- type DomainOptions
- type Drop
- type DynamoDB
- type DynamoDBOptions
- type Expand
- type Flatten
- type FlattenOptions
- type ForEach
- type ForEachOptions
- type Group
- type GroupOptions
- type Gzip
- type GzipOptions
- type Hash
- type HashOptions
- type IPDatabase
- type IPDatabaseOptions
- type Insert
- type InsertOptions
- type Lambda
- type LambdaOptions
- type Math
- type MathOptions
- type Pipeline
- type PipelineOptions
- type PrettyPrint
- type PrettyPrintOptions
- type Replace
- type ReplaceOptions
- type Split
- type SplitOptions
- type Time
- type TimeOptions
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Apply ¶ added in v0.4.0
Apply accepts one or many Applicators and applies processors in series to encapsulated data.
func ApplyBatch ¶ added in v0.4.0
func ApplyBatch(ctx context.Context, batch []config.Capsule, apps ...BatchApplicator) ([]config.Capsule, error)
ApplyBatch accepts one or many BatchApplicators and applies processors in series to a slice of encapsulated data.
func ApplyByte ¶ added in v0.4.0
ApplyByte is a convenience function for applying one or many Applicators to bytes.
func CloseApplicators ¶ added in v0.7.0
func CloseApplicators(ctx context.Context, apps ...Applicator) error
CloseApplicators closes all Applicators and returns an error if any close fails.
func CloseBatchApplicators ¶ added in v0.7.0
func CloseBatchApplicators(ctx context.Context, apps ...BatchApplicator) error
CloseBatchApplicators closes all BatchApplicators and returns an error if any close fails.
Types ¶
type Aggregate ¶ added in v0.4.0
type Aggregate struct {
Options AggregateOptions `json:"options"`
Condition condition.Config `json:"condition"`
OutputKey string `json:"output_key"`
}
Aggregate processes data by buffering and aggregating it into a single item.
Data is processed by aggregating it into in-memory buffers until the configured count or size of the aggregate meets a threshold and new data is produced. This supports multiple data aggregation patterns:
- concatenate batches of data with a separator value
- store batches of data in a JSON array
- organize nested JSON in a JSON array based on unique keys
The processor supports these patterns:
JSON array:
foo bar baz qux >>> {"aggregate":["foo","bar","baz","qux"]}
{"foo":"bar"} {"baz":"qux"} >>> {"aggregate":[{"foo":"bar"},{"baz":"qux"}]}
data:
foo bar baz qux >>> foo\nbar\nbaz\nqux
{"foo":"bar"} {"baz":"qux"} >>> {"foo":"bar"}\n{"baz":"qux"}
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "aggregate",
"settings": {
"options": {
"max_count": 1000,
"max_size": 1000
},
"output_key": "aggregate.-1"
}
}
type AggregateOptions ¶ added in v0.4.0
type AggregateOptions struct {
AggregateKey string `json:"aggregate_key"`
Separator string `json:"separator"`
MaxCount int `json:"max_count"`
MaxSize int `json:"max_size"`
}
AggregateOptions contains custom options settings for the Aggregate processor:
AggregateKey (optional): JSON key-value that is used to organize aggregated data defaults to empty string, only applies to JSON Separator (optional): string that separates aggregated data defaults to empty string, only applies to data MaxCount (optional): maximum number of items stored in a buffer when aggregating data defaults to 1000 MaxSize (optional): maximum size, in bytes, of items stored in a buffer when aggregating data defaults to 10000 (10KB)
type Applicator ¶ added in v0.4.0
type Applicator interface {
Apply(context.Context, config.Capsule) (config.Capsule, error)
Close(context.Context) error
}
Applicator is an interface for applying a processor to encapsulated data.
func ApplicatorFactory ¶ added in v0.4.0
func ApplicatorFactory(cfg config.Config) (Applicator, error)
ApplicatorFactory returns a configured Applicator from a config. This is the recommended method for retrieving ready-to-use Applicators.
func MakeApplicators ¶ added in v0.4.0
func MakeApplicators(cfg []config.Config) ([]Applicator, error)
MakeApplicators accepts multiple processor configs and returns populated Applicators. This is a convenience function for generating many Applicators.
type Base64 ¶ added in v0.2.0
type Base64 struct {
Options Base64Options `json:"options"`
Condition condition.Config `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Base64 processes data by converting it to and from base64 encoding. The processor supports these patterns:
JSON:
{"base64":"Zm9v"} >>> {"base64":"foo"}
data:
Zm9v >>> foo
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "base64",
"settings": {
"options": {
"direction": "from"
},
"input_key": "base64",
"output_key": "base64"
}
}
type Base64Options ¶ added in v0.2.0
type Base64Options struct {
Direction string `json:"direction"`
}
Base64Options contains custom options for the Base64 processor:
Direction: direction of the encoding must be one of: to: encode to base64 from: decode from base64
type BatchApplicator ¶ added in v0.4.0
type BatchApplicator interface {
ApplyBatch(context.Context, []config.Capsule) ([]config.Capsule, error)
Close(context.Context) error
}
BatchApplicator is an interface for applying a processor to a slice of encapsulated data.
func BatchApplicatorFactory ¶ added in v0.4.0
func BatchApplicatorFactory(cfg config.Config) (BatchApplicator, error)
BatchApplicatorFactory returns a configured BatchApplicator from a config. This is the recommended method for retrieving ready-to-use BatchApplicators.
func MakeBatchApplicators ¶ added in v0.4.0
func MakeBatchApplicators(cfg []config.Config) ([]BatchApplicator, error)
MakeBatchApplicators accepts multiple processor configs and returns populated BatchApplicators. This is a convenience function for generating many BatchApplicators.
type Capture ¶
type Capture struct {
Options CaptureOptions `json:"options"`
Condition condition.Config `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Capture processes data by capturing values using regular expressions. The processor supports these patterns:
JSON:
{"capture":"foo@qux.com"} >>> {"capture":"foo"}
{"capture":"foo@qux.com"} >>> {"capture":["f","o","o"]}
data:
foo@qux.com >>> foo
bar quux >>> {"foo":"bar","qux":"quux"}
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "capture",
"settings": {
"options": {
"expression": "^([^@]*)@.*$",
"function": "find"
},
"input_key": "capture",
"output_key": "capture"
}
}
func (Capture) Apply ¶ added in v0.4.0
Apply processes encapsulated data with the Capture processor.
type CaptureOptions ¶
type CaptureOptions struct {
Expression string `json:"expression"`
Function string `json:"function"`
Count int `json:"count"`
}
CaptureOptions contains custom options for the Capture processor:
Expression: regular expression used to capture values Function: type of regular expression applied must be one of: find: applies the Find(String)?Submatch function find_all: applies the FindAll(String)?Submatch function (see count) named_group: applies the Find(String)?Submatch function and stores values as JSON using subexpressions Count (optional): used for repeating capture groups defaults to match all capture groups
type Case ¶
type Case struct {
Options CaseOptions `json:"options"`
Condition condition.Config `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Case processes data by changing the case of a string or byte slice. The processor supports these patterns:
JSON:
{"case":"foo"} >>> {"case":"FOO"}
data:
foo >>> FOO
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "case",
"settings": {
"options": {
"case": "upper"
},
"input_key": "case",
"output_key": "case"
}
}
func (Case) ApplyBatch ¶ added in v0.4.0
ApplyBatch processes a slice of encapsulated data with the Case processor. Conditions are optionally applied to the data to enable processing.
type CaseOptions ¶
type CaseOptions struct {
Case string `json:"case"`
}
CaseOptions contains custom options for the Case processor:
Case: case to convert the string or byte slice to must be one of: upper lower snake (strings only)
type Concat ¶
type Concat struct {
Options ConcatOptions `json:"options"`
Condition condition.Config `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Concat processes data by concatenating multiple values together with a separator. The processor supports these patterns:
JSON:
{"concat":["foo","bar"]} >>> {"concat":"foo.bar"}
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "concat",
"settings": {
"options": {
"separator": "."
},
"input_key": "concat",
"output_key": "concat"
}
}
type ConcatOptions ¶
type ConcatOptions struct {
Separator string `json:"separator"`
}
ConcatOptions contains custom options for the Concat processor:
Separator: string that separates the concatenated values
type Convert ¶
type Convert struct {
Options ConvertOptions `json:"options"`
Condition condition.Config `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Convert processes data by converting values between types (e.g., string to integer, integer to float). The processor supports these patterns:
JSON:
{"convert":"true"} >>> {"convert":true}
{"convert":"-123"} >>> {"convert":-123}
{"convert":123} >>> {"convert":"123"}
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "convert",
"settings": {
"options": {
"type": "bool"
},
"input_key": "convert",
"output_key": "convert"
}
}
func (Convert) Apply ¶ added in v0.4.0
Apply processes encapsulated data with the Convert processor.
type ConvertOptions ¶
type ConvertOptions struct {
Type string `json:"type"`
}
ConvertOptions contains custom options for the Convert processor:
Type: type that the value is converted to must be one of: bool (boolean) int (integer) float uint (unsigned integer) string
type Copy ¶
type Copy struct {
Condition condition.Config `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Copy processes data by copying it. The processor supports these patterns:
JSON:
{"hello":"world"} >>> {"hello":"world","goodbye":"world"}
from JSON:
{"hello":"world"} >>> world
to JSON:
world >>> {"hello":"world"}
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "copy",
"settings": {
"input_key": "hello",
"output_key": "goodbye"
}
}
func (Copy) ApplyBatch ¶ added in v0.4.0
ApplyBatch processes a slice of encapsulated data with the Copy processor. Conditions are optionally applied to the data to enable processing.
type Count ¶
type Count struct{}
Count processes data by counting it.
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "count"
}
func (Count) ApplyBatch ¶ added in v0.4.0
ApplyBatch processes a slice of encapsulated data with the Count processor. Conditions are optionally applied to the data to enable processing.
type DNS ¶ added in v0.7.0
type DNS struct {
Options DNSOptions `json:"options"`
Condition condition.Config `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
DNS processes data by querying domains or IP addresses in the Domain Name System (DNS). By default, this processor can take up to 1 second per DNS query and may have significant impact on end-to-end data processing latency. If Substation is running in AWS Lambda with Kinesis, then this latency can be mitigated by increasing the parallelization factor of the Lambda (https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html).
The processor supports these patterns:
JSON:
{"ip":"8.8.8.8"} >>> {"ip":"8.8.8.8","domains":["dns.google."]}
{"domain":"dns.google"} >>> {"domain":"dns.google","ips":["8.8.4.4","8.8.8.8","2001:4860:4860::8844","2001:4860:4860::8888"]}
data:
8.8.8.8 >>> dns.google.
dns.google >>> 8.8.4.4
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "dns",
"settings": {
"options": {
"function": "reverse_lookup"
},
"input_key": "ip",
"output_key": "domains"
}
}
func (DNS) Apply ¶ added in v0.7.0
Apply processes encapsulated data with the DNS processor. nolint: gocognit
func (DNS) ApplyBatch ¶ added in v0.7.0
ApplyBatch processes a slice of encapsulated data with the DNS processor. Conditions are optionally applied to the data to enable processing.
type DNSOptions ¶ added in v0.7.0
type DNSOptions struct {
Function string `json:"function"`
Timeout int `json:"timeout"`
ErrorOnFailure bool `json:"error_on_failure"`
}
DNSOptions contains custom options for the DNS processor.
Function: Type of query made to DNS. Must be one of: forward_lookup (retrieve IP addresses associated with a domain) reverse_lookup (retrieve domains associated with an IP address) query_txt (retrieve TXT records for a domain) Timeout (optional): Amount of time to wait (in milliseconds) for a response. Defaults to 1000 milliseconds (1 second). ErrorOnFailure (optional): If set to true, then errors from the DNS request will cause the processor to fail. Defaults to false.
type Delete ¶
type Delete struct {
Condition condition.Config `json:"condition"`
InputKey string `json:"input_key"`
}
Delete processes data by deleting JSON keys. The processor supports these patterns:
JSON:
{"foo":"bar","baz":"qux"} >>> {"foo":"bar"}
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "delete",
"settings": {
"input_key": "delete"
}
}
type Domain ¶
type Domain struct {
Options DomainOptions `json:"options"`
Condition condition.Config `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Domain processes data by parsing fully qualified domain names into labels. The processor supports these patterns:
JSON:
{"domain":"example.com"} >>> {"domain":"example.com","tld":"com"}
data:
example.com >>> com
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "domain",
"settings": {
"options": {
"function": "tld"
},
"input_key": "domain",
"output_key": "tld"
}
}
type DomainOptions ¶
type DomainOptions struct {
Function string `json:"function"`
}
DomainOptions contains custom options for the Domain processor:
Function: domain processing function applied to the data must be one of: tld domain subdomain
type Drop ¶
Drop processes data by "dropping" it -- the data is entirely removed and not emitted.
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "drop"
}
func (Drop) ApplyBatch ¶ added in v0.4.0
ApplyBatch processes a slice of encapsulated data with the Drop processor. Conditions are optionally applied to the data to enable processing.
type DynamoDB ¶
type DynamoDB struct {
Options DynamoDBOptions `json:"options"`
Condition condition.Config `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
DynamoDB processes data by querying a DynamoDB table and returning all matched items as an array of JSON objects. The input must be a JSON object containing a partition key ("PK") and optionally containing a sort key ("SK"). This processor uses the DynamoDB Query operation, refer to the DynamoDB documentation for the Query operation's request syntax and key condition expression patterns:
The processor supports these patterns:
JSON:
{"ddb":{"PK":"foo"}} >>> {"ddb":[{"foo":"bar"}]}
{"ddb":{"PK":"foo","SK":"baz"}} >>> {"ddb":[{"foo":"bar"}]}
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "dynamodb",
"settings": {
"options": {
"table": "foo-table",
"key_condition_expression": "PK = :pk and begins_with(SK, :sk)",
"limit": 1,
"scan_index_forward": true
},
"input_key": "ddb",
"output_key": "ddb"
}
}
func (DynamoDB) Apply ¶ added in v0.4.0
Apply processes encapsulated data with the DynamoDB processor.
type DynamoDBOptions ¶
type DynamoDBOptions struct {
Table string `json:"table"`
KeyConditionExpression string `json:"key_condition_expression"`
Limit int64 `json:"limit"`
ScanIndexForward bool `json:"scan_index_forward"`
}
DynamoDBOptions contains custom options settings for the DynamoDB processor (https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Query.html#API_Query_RequestSyntax):
Table: DynamoDB table to query KeyConditionExpression: key condition expression (see documentation) Limit (optional): maximum number of items to evaluate defaults to evaluating all items ScanIndexForward (optional): specifies the order of index traversal must be one of: true (default): traversal is performed in ascending order false: traversal is performed in descending order
type Expand ¶
type Expand struct {
Condition condition.Config `json:"condition"`
InputKey string `json:"input_key"`
}
Expand processes data by creating individual events from objects in arrays. The processor supports these patterns:
JSON:
{"expand":[{"foo":"bar"}],"baz":"qux"} >>> {"foo":"bar","baz":"qux"}
data:
[{"foo":"bar"}] >>> {"foo":"bar"}
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "expand",
"settings": {
"input_key": "expand"
}
}
type Flatten ¶
type Flatten struct {
Options FlattenOptions `json:"options"`
Condition condition.Config `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Flatten processes data by flattening JSON arrays. The processor supports these patterns:
JSON:
{"flatten":["foo",["bar"]]} >>> {"flatten":["foo","bar"]}
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "flatten",
"settings": {
"input_key": "flatten",
"output_key": "flatten"
}
}
func (Flatten) Apply ¶ added in v0.4.0
Apply processes encapsulated data with the Flatten processor.
type FlattenOptions ¶
type FlattenOptions struct {
Deep bool `json:"deep"`
}
FlattenOptions contains custom options settings for the Flatten processor:
Deep (optional): deeply flattens nested arrays
type ForEach ¶ added in v0.3.0
type ForEach struct {
Options ForEachOptions `json:"options"`
Condition condition.Config `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
ForEach processes data by iterating and applying a processor to each element in a JSON array. The processor supports these patterns:
JSON:
{"input":["ABC","DEF"]} >>> {"input":["ABC","DEF"],"output":["abc","def"]}
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "for_each",
"settings": {
"options": {
"processor": {
"type": "case",
"settings": {
"options": {
"case": "lower"
}
}
}
},
"input_key": "input",
"output_key": "output.-1"
}
}
func (ForEach) Apply ¶ added in v0.4.0
Apply processes encapsulated data with the ForEach processor.
JSON values are treated as arrays and the configured processor is applied to each element in the array. If multiple processors need to be applied to each element, then the Pipeline processor should be used to create a nested data processing workflow. For example:
ForEach -> Pipeline -> [Copy, Delete, Copy]
type ForEachOptions ¶ added in v0.3.0
ForEachOptions contains custom options for the ForEach processor:
Processor: processor applied to the data
type Group ¶ added in v0.2.0
type Group struct {
Options GroupOptions `json:"options"`
Condition condition.Config `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Group processes data by grouping JSON arrays into an array of tuples or array of JSON objects. The processor supports these patterns:
JSON array:
{"group":[["foo","bar"],[111,222]]} >>> {"group":[["foo",111],["bar",222]]}
{"group":[["foo","bar"],[111,222]]} >>> {"group":[{"name":"foo","size":111},{"name":"bar","size":222}]}
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "group",
"settings": {
"input_key": "group",
"output_key": "group"
}
}
func (Group) ApplyBatch ¶ added in v0.4.0
ApplyBatch processes a slice of encapsulated data with the Group processor. Conditions are optionally applied to the data to enable processing.
type GroupOptions ¶ added in v0.2.0
type GroupOptions struct {
Keys []string `json:"keys"`
}
GroupOptions contains custom options for the Group processor:
Keys (optional): path where values from InputKey are written to, creating new JSON objects
type Gzip ¶ added in v0.2.0
type Gzip struct {
Options GzipOptions `json:"options"`
Condition condition.Config `json:"condition"`
}
Gzip processes data by compressing or decompressing gzip. The processor supports these patterns:
data:
[31 139 8 0 0 0 0 0 0 255 74 203 207 7 4 0 0 255 255 33 101 115 140 3 0 0 0] >>> foo
foo >>> [31 139 8 0 0 0 0 0 0 255 74 203 207 7 4 0 0 255 255 33 101 115 140 3 0 0 0]
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "gzip",
"settings": {
"options": {
"direction": "from"
}
}
}
func (Gzip) ApplyBatch ¶ added in v0.4.0
ApplyBatch processes a slice of encapsulated data with the Gzip processor. Conditions are optionally applied to the data to enable processing.
type GzipOptions ¶ added in v0.2.0
type GzipOptions struct {
Direction string `json:"direction"`
}
GzipOptions contains custom options settings for the Gzip processor:
Direction: direction of the compression must be one of: to: compress data to gzip from: decompress data from gzip
type Hash ¶
type Hash struct {
Options HashOptions `json:"options"`
Condition condition.Config `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Hash processes data by calculating hashes. The processor supports these patterns:
JSON:
{"hash":"foo"} >>> {"hash":"acbd18db4cc2f85cedef654fccc4a4d8"}
data:
foo >>> acbd18db4cc2f85cedef654fccc4a4d8
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "hash",
"settings": {
"options": {
"algorithm": "md5"
},
"input_key": "hash",
"output_key": "hash"
}
}
func (Hash) ApplyBatch ¶ added in v0.4.0
ApplyBatch processes a slice of encapsulated data with the Hash processor. Conditions are optionally applied to the data to enable processing.
type HashOptions ¶
type HashOptions struct {
Algorithm string `json:"algorithm"`
}
HashOptions contains custom options for the Hash processor:
Algorithm: hashing algorithm applied to the data must be one of: md5 sha256
type IPDatabase ¶ added in v0.7.0
type IPDatabase struct {
Options IPDatabaseOptions `json:"options"`
Condition condition.Config `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
// IgnoreClose overrides attempts to close the processor.
IgnoreClose bool `json:"ignore_close"`
}
IPDatabase processes data by querying IP addresses in enrichment databases, including geographic location (geo) and autonomous system (asn) databases. The processor supports multiple database providers and can be reused if multiple databases need to be queried.
IP address information is abstracted from each enrichment database into a single record that contains these categories:
- asn (autonomous system information)
- geo (location information)
Enrichment databases are selected based on the naming convention [provider]_[database_name]. For example, maxmind_city returns information from any MaxMind City database. These database providers are supported:
- IP2Location
- MaxMind ASN (GeoLite2)
- MaxMind City (GeoIP2 or GeoLite2)
The processor supports these patterns:
JSON:
{"ip":"8.8.8.8"} >>> {"ip":"8.8.8.8","as":{"number":15169,"organization":"GOOGLE"}}
{"ip":"8.8.8.8"} >>> {"ip":"8.8.8.8","geo":{"continent":"North America","country":"United States","latitude":37.751,"longitude":-97.822,"accuracy_radius":1000,"timezone":"America/Chicago"}}
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "ip_database",
"settings": {
"options": {
"function": "maxmind_city",
"database_options": {
"type": "maxmind_city",
"settings": {
"database": "location://path/to/maxmind.mmdb"
}
}
},
"input_key": "ip",
"output_key": "geo"
}
}
func (IPDatabase) Apply ¶ added in v0.7.0
Apply processes encapsulated data with the IPDatabase processor.
func (IPDatabase) ApplyBatch ¶ added in v0.7.0
func (p IPDatabase) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)
ApplyBatch processes a slice of encapsulated data with the IPDatabase processor. Conditions are optionally applied to the data to enable processing.
type IPDatabaseOptions ¶ added in v0.7.0
type IPDatabaseOptions struct {
Function string `json:"function"`
/*
DatabaseOptions is a configuration passed directly to the internal IP database package. Similar to processors, each database has its own config requirements. See internal/ip/database for more information.
Each database is lazy loaded on first invocation and can be loaded from a path on local disk, an HTTP(S) URL, or an AWS S3 URL.
*/
DatabaseOptions config.Config `json:"database_options"`
}
IPDatabaseOptions contains custom options for the IPDatabase processor.
type Insert ¶
type Insert struct {
Options InsertOptions `json:"options"`
Condition condition.Config `json:"condition"`
OutputKey string `json:"output_key"`
}
Insert processes data by inserting a value into a JSON object. The processor supports these patterns:
JSON:
{"foo":"bar"} >>> {"foo":"bar","baz":"qux"}
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "insert",
"settings": {
"options": {
"value": "qux"
},
"output_key": "baz"
}
}
type InsertOptions ¶
type InsertOptions struct {
Value interface{} `json:"value"`
}
InsertOptions contains custom options for the Insert processor:
Value: value to insert
type Lambda ¶
type Lambda struct {
Options LambdaOptions `json:"options"`
Condition condition.Config `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Lambda processes data by synchronously invoking an AWS Lambda and returning the payload. The average latency of synchronously invoking a Lambda function is 10s of milliseconds, but latency can take 100s to 1000s of milliseconds depending on the function and may have significant impact on end-to-end data processing latency. If Substation is running in AWS Lambda with Kinesis, then this latency can be mitigated by increasing the parallelization factor of the Lambda (https://docs.aws.amazon.com/lambda/latest/dg/with-kinesis.html).
The input key's value must be a JSON object that contains settings for the Lambda. It is recommended to use the copy and insert processors to create the JSON object before calling this processor and to use the delete processor to remove the JSON object after calling this processor.
The processor supports these patterns:
JSON:
{"foo":"bar","lambda":{"lookup":"baz"}} >>> {"foo":"bar","lambda":{"baz":"qux"}}
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "lambda",
"settings": {
"options": {
"function": "foo-function"
},
"input_key": "lambda",
"output_key": "lambda"
}
}
type LambdaOptions ¶
type LambdaOptions struct {
Function string `json:"function"`
ErrorOnFailure bool `json:"error_on_failure"`
}
LambdaOptions contains custom options settings for the Lambda processor:
Function: Lambda function to invoke ErrorOnFailure (optional): if set to true, then errors from the invoked Lambda will cause the processor to fail defaults to false
type Math ¶
type Math struct {
Options MathOptions `json:"options"`
Condition condition.Config `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Math processes data by applying mathematic operations. The processor supports these patterns:
JSON:
{"math":[1,3]} >>> {"math":4}
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "math",
"settings": {
"options": {
"operation": "add"
},
"input_key": "math",
"output_key": "math"
}
}
func (Math) ApplyBatch ¶ added in v0.4.0
ApplyBatch processes a slice of encapsulated data with the Math processor. Conditions are optionally applied to the data to enable processing.
type MathOptions ¶
type MathOptions struct {
Operation string `json:"operation"`
}
MathOptions contains custom options for the Math processor:
Operation: operator applied to the data must be one of: add subtract multiply divide
type Pipeline ¶ added in v0.3.0
type Pipeline struct {
Options PipelineOptions `json:"options"`
Condition condition.Config `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Pipeline processes data by applying a series of processors. This processor should be used when data requires complex processing outside of the boundaries of any data structures (see tests for examples). The processor supports these patterns:
JSON:
{"pipeline":"H4sIAMpcy2IA/wXAIQ0AAACAsLbY93csBiFlc4wDAAAA"} >>> {"pipeline":"foo"}
data:
H4sIAMpcy2IA/wXAIQ0AAACAsLbY93csBiFlc4wDAAAA >>> foo
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "pipeline",
"settings": {
"options": {
"processors": [
{
"type": "base64",
"settings": {
"options": {
"direction": "from"
}
}
},
{
"type": "gzip",
"settings": {
"options": {
"direction": "from"
}
}
}
]
},
"input_key": "pipeline",
"output_key": "pipeline"
}
}
func (Pipeline) Apply ¶ added in v0.4.0
Apply processes encapsulated data with the Pipeline processor.
Applicators only accept encapsulated data, so when processing JSON the input value is converted from Result to its string representation to bytes and put into a new capsule. The conversion from Result to string is safe for strings and objects, but not arrays (e.g., ["foo","bar"]).
If the input is an array, then an error is raised; the input should be run through the ForEach processor (which can encapsulate the Pipeline processor).
type PipelineOptions ¶ added in v0.3.0
PipelineOptions contains custom options for the Pipeline processor:
Processors: array of processors applied to the data
type PrettyPrint ¶ added in v0.4.0
type PrettyPrint struct {
Options PrettyPrintOptions `json:"options"`
Condition condition.Config `json:"condition"`
}
PrettyPrint processes data by applying or reversing prettyprint formatting to JSON. This processor has significant limitations when used to reverse prettyprint, including:
- cannot support multi-core processing
- invalid input will cause unpredictable results
It is strongly recommended to _not_ use this processor unless absolutely necessary; a more reliable solution is to modify the source application emitting the multi-line JSON object so that it outputs a single-line object instead.
The processor supports these patterns:
data:
{
"foo": "bar"
} >>> {"foo":"bar"}
{"foo":"bar"} >>> {
"foo": "bar"
}
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "pretty_print",
"settings": {
"options": {
"direction": "from"
}
}
}
func (PrettyPrint) Apply ¶ added in v0.4.0
Apply processes encapsulated data with the PrettyPrint processor.
Applying prettyprint formatting is handled by the gjson PrettyPrint modifier and is applied to the root JSON object.
This _does not_ support reversing prettyprint formatting; this support is unnecessary for multi-line JSON objects that are stored in a single byte array.
func (PrettyPrint) ApplyBatch ¶ added in v0.4.0
func (p PrettyPrint) ApplyBatch(ctx context.Context, capsules []config.Capsule) ([]config.Capsule, error)
ApplyBatch processes a slice of encapsulated data with the PrettyPrint processor.
Applying prettyprint formatting is handled by the gjson PrettyPrint modifier and is applied to the root JSON object.
Reversing prettyprint formatting is handled by iterating incoming data per byte and pushing the bytes to a stack. When an equal number of open and close curly brackets ( { } ) are observed, then the stack of bytes has JSON compaction applied and the result is emitted as a new object.
type PrettyPrintOptions ¶ added in v0.4.0
type PrettyPrintOptions struct {
Direction string `json:"direction"`
}
PrettyPrintOptions contains custom options settings for the PrettyPrint processor:
Direction: the direction of the pretty transformation; must be one of: to (applies prettyprint formatting), from (reverses prettyprint formatting).
type Replace ¶
type Replace struct {
Options ReplaceOptions `json:"options"`
Condition condition.Config `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Replace processes data by replacing characters. The processor supports these patterns:
JSON:
{"replace":"bar"} >>> {"replace":"baz"}
data:
bar >>> baz
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "replace",
"settings": {
"options": {
"old": "r",
"new": "z"
},
"input_key": "replace",
"output_key": "replace"
}
}
func (Replace) Apply ¶ added in v0.4.0
Apply processes encapsulated data with the Replace processor.
type ReplaceOptions ¶
type ReplaceOptions struct {
Old string `json:"old"`
New string `json:"new"`
Count int `json:"count"`
}
ReplaceOptions contains custom options for the Replace processor:
Old: the character(s) to replace in the data. New: the character(s) that replace Old. Count (optional): the number of replacements to make; defaults to -1, which replaces all matches.
type Split ¶ added in v0.4.0
type Split struct {
Options SplitOptions `json:"options"`
Condition condition.Config `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Split processes data by splitting it into multiple elements or items. The processor supports these patterns:
JSON:
{"split":"foo.bar"} >>> {"split":["foo","bar"]}
data:
foo\nbar\nbaz\nqux >>> foo bar baz qux
{"foo":"bar"}\n{"baz":"qux"} >>> {"foo":"bar"} {"baz":"qux"}
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "split",
"settings": {
"options": {
"separator": "."
},
"input_key": "split",
"output_key": "split"
}
}
func (Split) ApplyBatch ¶ added in v0.4.0
ApplyBatch processes a slice of encapsulated data with the Split processor. Conditions are optionally applied to the data to enable processing.
type SplitOptions ¶ added in v0.4.0
type SplitOptions struct {
Separator string `json:"separator"`
}
SplitOptions contains custom options settings for the Split processor:
Separator: string that separates aggregated data
type Time ¶
type Time struct {
Options TimeOptions `json:"options"`
Condition condition.Config `json:"condition"`
InputKey string `json:"input_key"`
OutputKey string `json:"output_key"`
}
Time processes data by converting time values between formats. The processor supports these patterns:
JSON:
{"time":1639877490.061} >>> {"time":"2021-12-19T01:31:30.061000Z"}
data:
1639877490.061 >>> 2021-12-19T01:31:30.061000Z
When loaded with a factory, the processor uses this JSON configuration:
{
"type": "time",
"settings": {
"options": {
"input_format": "unix",
"output_format": "2006-01-02T15:04:05.000000Z"
},
"input_key": "time",
"output_key": "time"
}
}
func (Time) ApplyBatch ¶ added in v0.4.0
ApplyBatch processes a slice of encapsulated data with the Time processor. Conditions are optionally applied to the data to enable processing.
type TimeOptions ¶
type TimeOptions struct {
InputFormat string `json:"input_format"`
OutputFormat string `json:"output_format"`
InputLocation string `json:"input_location"`
OutputLocation string `json:"output_location"`
}
TimeOptions contains custom options for the Time processor:
InputFormat: the time format of the input; must be one of: a pattern-based layout (https://gobyexample.com/time-formatting-parsing), unix (epoch; supports fractions of a second), unix_milli (epoch milliseconds), now (current time). OutputFormat: the time format of the output; must be one of: a pattern-based layout (https://gobyexample.com/time-formatting-parsing), unix (epoch), unix_milli (epoch milliseconds). InputLocation (optional): the time zone abbreviation for the input; defaults to UTC. OutputLocation (optional): the time zone abbreviation for the output; defaults to UTC.