process

package
v0.8.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 11, 2023 License: MIT Imports: 28 Imported by: 0

README

process

Contains interfaces and methods for processing data. Processors contain methods for processing data singletons and batches (slices).

Documentation

Overview

Example (Applier)
package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

// main demonstrates building a single "copy" applier and running it against
// one capsule.
func main() {
	ctx := context.TODO()

	// The "copy" processor duplicates the value of key "foo" into key "bar".
	settings := map[string]interface{}{
		"key":     "foo",
		"set_key": "bar",
	}
	copyCfg := config.Config{Type: "copy", Settings: settings}

	// Ask the factory for an applier built from the configuration.
	applier, err := process.NewApplier(copyCfg)
	if err != nil {
		// handle err
		panic(err)
	}

	// Wrap the input JSON in a capsule, then run the applier over it.
	input := config.NewCapsule()
	input.SetData([]byte(`{"foo":"fizz"}`))

	result, err := applier.Apply(ctx, input)
	if err != nil {
		// handle err
		panic(err)
	}

	fmt.Println(string(result.Data()))
}
Output:

{"foo":"fizz","bar":"fizz"}
Example (Batcher)
package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

// main demonstrates building a "copy" batcher and running it over a batch of
// capsules.
func main() {
	// The "copy" processor duplicates the value of key "foo" into key "bar".
	copyCfg := config.Config{
		Type: "copy",
		Settings: map[string]interface{}{
			"key":     "foo",
			"set_key": "bar",
		},
	}

	// Ask the factory for a batcher built from the configuration.
	batcher, err := process.NewBatcher(copyCfg)
	if err != nil {
		// handle err
		panic(err)
	}

	// Assemble a batch of two capsules, each populated as
	// {"foo":"fizz","bar":"buzz"}.
	var batch []config.Capsule
	item := config.NewCapsule()
	for n := 0; n < 2; n++ {
		_ = item.Set("foo", "fizz")
		_ = item.Set("bar", "buzz")
		batch = append(batch, item)
	}

	// Run the batcher over the whole batch in one call.
	if batch, err = batcher.Batch(context.TODO(), batch...); err != nil {
		// handle err
		panic(err)
	}

	// Print every processed capsule on its own line.
	for i := range batch {
		fmt.Println(string(batch[i].Data()))
	}
}
Output:

{"foo":"fizz","bar":"fizz"}
{"foo":"fizz","bar":"fizz"}
Example (DNS)
package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

// main demonstrates a reverse DNS lookup using the "dns" processor.
func main() {
	ctx := context.TODO()

	// The capsule carries the address to be resolved.
	capsule := config.NewCapsule()
	capsule.SetData([]byte(`{"addr":"8.8.8.8"}`))

	// Apply a reverse_lookup DNS query to "addr". The source and destination
	// keys use the same "key" / "set_key" names as every other processor
	// configuration in these examples.
	//
	// NOTE(review): other processors in these examples select their behavior
	// through an options "type" entry; confirm that "function" is still the
	// name the dns processor expects.
	cfg := []config.Config{
		{
			Type: "dns",
			Settings: map[string]interface{}{
				"key":     "addr",
				"set_key": "domains",
				"options": map[string]interface{}{
					"function": "reverse_lookup",
				},
			},
		},
	}

	appliers, err := process.NewAppliers(cfg...)
	if err != nil {
		// handle err
		panic(err)
	}

	//nolint: errcheck // errors are ignored in case processing fails in a single applier
	defer process.CloseAppliers(ctx, appliers...)

	// Run the appliers in series, feeding each one the previous result.
	for _, app := range appliers {
		capsule, err = app.Apply(ctx, capsule)
		if err != nil {
			// handle err
			panic(err)
		}
	}

	fmt.Println(string(capsule.Data()))
}
Example (IPDatabase)
package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

// main demonstrates enriching an IP address with the "ip_database" processor
// configured for a MaxMind City database.
func main() {
	ctx := context.TODO()

	// The capsule carries the address to be looked up.
	capsule := config.NewCapsule()
	capsule.SetData([]byte(`{"addr":"8.8.8.8"}`))

	// Settings for the database itself. The location of the IP enrichment
	// database can be either a path on local disk, an HTTP(S) URL, or an
	// AWS S3 URL.
	dbSettings := map[string]interface{}{
		"database": "location://path/to/maxmind.mmdb",
		"language": "en",
	}

	// Look up "addr" in the MaxMind City database.
	cfgs := []config.Config{
		{
			Type: "ip_database",
			Settings: map[string]interface{}{
				"key":     "addr",
				"set_key": "geo",
				"options": map[string]interface{}{
					"type":     "maxmind_city",
					"settings": dbSettings,
				},
			},
		},
	}

	appliers, err := process.NewAppliers(cfgs...)
	if err != nil {
		// handle err
		panic(err)
	}

	//nolint: errcheck // errors are ignored in case processing fails in a single applier
	defer process.CloseAppliers(ctx, appliers...)

	// Run the appliers in series, feeding each one the previous result.
	for i := range appliers {
		if capsule, err = appliers[i].Apply(ctx, capsule); err != nil {
			// handle err
			panic(err)
		}
	}

	fmt.Println(string(capsule.Data()))
}
Example (KVStore)
package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

// main demonstrates writing a value into a KV store and reading it back with
// two "kv_store" processors.
func main() {
	ctx := context.TODO()

	capsule := config.NewCapsule()
	capsule.SetData([]byte(`{"foo":"bar"}`))

	// memoryKV returns the KV store options used by both processors. If the
	// KV options are identical across processors, then the same KV store is
	// used in each call, so both configurations are built from this single
	// definition. Giving processors different options allows for the use of
	// multiple KV stores.
	memoryKV := func() map[string]interface{} {
		return map[string]interface{}{
			"type": "memory",
			"settings": map[string]interface{}{
				"capacity": 10,
			},
		}
	}

	// The value from key "foo" is first set into the KV store and then
	// retrieved from the KV store and set into key "baz".
	cfg := []config.Config{
		{
			Type: "kv_store",
			Settings: map[string]interface{}{
				"key":     "foo",
				"set_key": "setter",
				"options": map[string]interface{}{
					"type":       "set",
					"kv_options": memoryKV(),
				},
			},
		},
		{
			Type: "kv_store",
			Settings: map[string]interface{}{
				"key":     "setter",
				"set_key": "baz",
				"options": map[string]interface{}{
					"type":       "get",
					"kv_options": memoryKV(),
				},
			},
		},
	}

	appliers, err := process.NewAppliers(cfg...)
	if err != nil {
		// handle err
		panic(err)
	}

	//nolint: errcheck // errors are ignored in case processing fails in a single applier
	defer process.CloseAppliers(ctx, appliers...)

	// Run the appliers in series: the "set" processor first, then "get".
	for _, app := range appliers {
		capsule, err = app.Apply(ctx, capsule)
		if err != nil {
			// handle err
			panic(err)
		}
	}

	fmt.Println(string(capsule.Data()))
}

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Apply added in v0.4.0

func Apply(ctx context.Context, capsule config.Capsule, appliers ...Applier) (config.Capsule, error)

Apply applies processors in series to encapsulated data.

func ApplyBytes added in v0.8.0

func ApplyBytes(ctx context.Context, data []byte, appliers ...Applier) ([]byte, error)

ApplyBytes is a convenience function for applying processors in series to bytes.

Example
package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

// main demonstrates running an applier directly over bytes with
// process.ApplyBytes.
func main() {
	// The "copy" processor duplicates the value of key "foo" into key "bar".
	settings := map[string]interface{}{
		"key":     "foo",
		"set_key": "bar",
	}
	copyCfg := config.Config{Type: "copy", Settings: settings}

	// Ask the factory for an applier built from the configuration.
	applier, err := process.NewApplier(copyCfg)
	if err != nil {
		// handle err
		panic(err)
	}

	// ApplyBytes takes the input as bytes and returns the processed bytes.
	in := []byte(`{"foo":"fizz"}`)
	out, err := process.ApplyBytes(context.TODO(), in, applier)
	if err != nil {
		// handle err
		panic(err)
	}

	fmt.Println(string(out))
}
Output:

{"foo":"fizz","bar":"fizz"}

func Batch added in v0.8.0

func Batch(ctx context.Context, batch []config.Capsule, batchers ...Batcher) ([]config.Capsule, error)

Batch accepts one or more batchers and applies processors in series to encapsulated data.

func BatchBytes added in v0.8.0

func BatchBytes(ctx context.Context, data [][]byte, batchers ...Batcher) ([][]byte, error)

BatchBytes is a convenience function for applying processors in series to bytes.

Example
package main

import (
	"context"
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

// main demonstrates running a batcher directly over a slice of byte slices
// with process.BatchBytes.
func main() {
	// The "copy" processor duplicates the value of key "foo" into key "bar".
	settings := map[string]interface{}{
		"key":     "foo",
		"set_key": "bar",
	}
	copyCfg := config.Config{Type: "copy", Settings: settings}

	// Ask the factory for a batcher built from the configuration.
	batcher, err := process.NewBatcher(copyCfg)
	if err != nil {
		// handle err
		panic(err)
	}

	// The batcher is applied to a slice of bytes; here the batch holds a
	// single item.
	items := [][]byte{[]byte(`{"foo":"fizz"}`)}
	if items, err = process.BatchBytes(context.TODO(), items, batcher); err != nil {
		// handle err
		panic(err)
	}

	// Print every processed item on its own line.
	for i := range items {
		fmt.Println(string(items[i]))
	}
}
Output:

{"foo":"fizz","bar":"fizz"}

func CloseAppliers added in v0.8.0

func CloseAppliers(ctx context.Context, appliers ...Applier) error

CloseAppliers closes all appliers and returns an error if any close fails.

func CloseBatchers added in v0.8.0

func CloseBatchers(ctx context.Context, batchers ...Batcher) error

CloseBatchers closes all batchers and returns an error if any close fails.

Types

type Applier added in v0.8.0

// Applier is the interface for processors that operate on a single capsule.
type Applier interface {
	// Apply takes a capsule and returns a capsule, or an error.
	Apply(context.Context, config.Capsule) (config.Capsule, error)
	// Close closes the applier and reports any error from doing so.
	Close(context.Context) error
}

func NewApplier added in v0.8.0

func NewApplier(cfg config.Config) (Applier, error)

NewApplier returns a configured Applier from a processor configuration.

Example
package main

import (
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

// main demonstrates creating one applier from a processor configuration and
// printing it.
func main() {
	// The "copy" processor duplicates the value of key "foo" into key "bar".
	settings := map[string]interface{}{
		"key":     "foo",
		"set_key": "bar",
	}
	copyCfg := config.Config{Type: "copy", Settings: settings}

	// Ask the factory for an applier built from the configuration.
	applier, err := process.NewApplier(copyCfg)
	if err != nil {
		// handle err
		panic(err)
	}

	// Printing the applier shows the configuration it was built with.
	fmt.Println(applier)
}
Output:

{"condition":{"operator":"","inspectors":null},"key":"foo","set_key":"bar","ignore_close":false,"ignore_errors":false}

func NewAppliers added in v0.8.0

func NewAppliers(cfg ...config.Config) ([]Applier, error)

NewAppliers accepts one or more processor configurations and returns configured appliers.

Example
package main

import (
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

// main demonstrates creating a slice of appliers from processor
// configurations and printing each one.
func main() {
	// The "copy" processor duplicates the value of key "foo" into key "bar".
	settings := map[string]interface{}{
		"key":     "foo",
		"set_key": "bar",
	}
	copyCfg := config.Config{Type: "copy", Settings: settings}

	// NewAppliers accepts one or more configurations; a single one is
	// passed here.
	appliers, err := process.NewAppliers(copyCfg)
	if err != nil {
		// handle err
		panic(err)
	}

	// Print every applier on its own line.
	for i := range appliers {
		fmt.Println(appliers[i])
	}
}
Output:

{"condition":{"operator":"","inspectors":null},"key":"foo","set_key":"bar","ignore_close":false,"ignore_errors":false}

type Batcher added in v0.8.0

// Batcher is the interface for processors that operate on a batch (slice) of
// capsules.
type Batcher interface {
	// Batch takes zero or more capsules and returns a slice of capsules, or
	// an error.
	Batch(context.Context, ...config.Capsule) ([]config.Capsule, error)
	// Close closes the batcher and reports any error from doing so.
	Close(context.Context) error
}

func NewBatcher added in v0.8.0

func NewBatcher(cfg config.Config) (Batcher, error)

NewBatcher returns a configured Batcher from a processor configuration.

Example
package main

import (
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

// main demonstrates creating one batcher from a processor configuration and
// printing it.
func main() {
	// The "copy" processor duplicates the value of key "foo" into key "bar".
	settings := map[string]interface{}{
		"key":     "foo",
		"set_key": "bar",
	}
	copyCfg := config.Config{Type: "copy", Settings: settings}

	// Ask the factory for a batcher built from the configuration.
	batcher, err := process.NewBatcher(copyCfg)
	if err != nil {
		// handle err
		panic(err)
	}

	// Printing the batcher shows the configuration it was built with.
	fmt.Println(batcher)
}
Output:

{"condition":{"operator":"","inspectors":null},"key":"foo","set_key":"bar","ignore_close":false,"ignore_errors":false}

func NewBatchers added in v0.8.0

func NewBatchers(cfg ...config.Config) ([]Batcher, error)

NewBatchers accepts one or more processor configurations and returns configured batchers.

Example
package main

import (
	"fmt"

	"github.com/brexhq/substation/config"
	"github.com/brexhq/substation/process"
)

// main demonstrates creating a slice of batchers from processor
// configurations and printing each one.
func main() {
	// The "copy" processor duplicates the value of key "foo" into key "bar".
	settings := map[string]interface{}{
		"key":     "foo",
		"set_key": "bar",
	}
	copyCfg := config.Config{Type: "copy", Settings: settings}

	// NewBatchers accepts one or more configurations; a single one is
	// passed here.
	batchers, err := process.NewBatchers(copyCfg)
	if err != nil {
		// handle err
		panic(err)
	}

	// Print every batcher on its own line.
	for i := range batchers {
		fmt.Println(batchers[i])
	}
}
Output:

{"condition":{"operator":"","inspectors":null},"key":"foo","set_key":"bar","ignore_close":false,"ignore_errors":false}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL