Documentation
¶
Overview ¶
Package stream creates and manages a full Benthos logical stream, consisting of an input layer of sources, a buffer layer, a processing pipelines layer, and an output layer of sinks.
Inputs -> Buffer -> Processing Pipelines -> Outputs
Example (SplitToBatch) ¶
ExampleSplitToBatch demonstrates running a Kafka to Kafka stream where each incoming message is parsed as a line delimited blob of payloads and the payloads are sent on as a single batch of messages.
package main
import (
"bytes"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/Jeffail/benthos/lib/message"
"github.com/Jeffail/benthos/lib/types"
)
// SplitToBatch is a types.Processor implementation that reads a single message
// containing line delimited payloads and splits the payloads into a single
// batch of messages per line.
type SplitToBatch struct{}

// ProcessMessage splits messages of a batch by lines and sends them onwards as
// a batch of messages.
func (p SplitToBatch) ProcessMessage(m types.Message) ([]types.Message, types.Response) {
	// Collect every newline-delimited payload from every part of the batch.
	var lines [][]byte
	m.Iter(func(_ int, part []byte) error {
		for _, line := range bytes.Split(part, []byte("\n")) {
			lines = append(lines, line)
		}
		return nil
	})
	// Forward all collected lines together as a single batched message.
	return []types.Message{message.New(lines)}, nil
}
// ExampleSplitToBatch demonstrates running a Kafka to Kafka stream where each
// incoming message is parsed as a line delimited blob of payloads and the
// payloads are sent on as a single batch of messages.
func main() {
	conf := NewConfig()

	// Consume from one Kafka topic...
	conf.Input.Type = "kafka"
	conf.Input.Kafka.Addresses = []string{
		"localhost:9092",
	}
	conf.Input.Kafka.Topic = "example_topic_one"

	// ...and produce to another.
	conf.Output.Type = "kafka"
	conf.Output.Kafka.Addresses = []string{
		"localhost:9092",
	}
	conf.Output.Kafka.Topic = "example_topic_two"

	// Each logical pipeline thread constructs its own SplitToBatch processor.
	s, err := New(conf, OptAddProcessors(func() (types.Processor, error) {
		return SplitToBatch{}, nil
	}))
	if err != nil {
		panic(err)
	}
	// Give the stream up to one second to shut down gracefully on exit.
	defer s.Stop(time.Second)

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

	// Block until a termination signal is received. A direct channel receive
	// replaces the redundant single-case select (staticcheck S1000).
	<-sigChan
	log.Println("Received SIGTERM, the service is closing.")
}
Example (SplitToMessages) ¶
ExampleSplitToMessages demonstrates running a Kafka to Kafka stream where each incoming message is parsed as a line delimited blob of payloads and the payloads are sent on as a single message per payload.
package main
import (
"bytes"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/Jeffail/benthos/lib/message"
"github.com/Jeffail/benthos/lib/types"
)
// SplitToMessages is a types.Processor implementation that reads a single
// message containing line delimited payloads and splits the payloads into a
// single message per line.
type SplitToMessages struct{}

// ProcessMessage splits messages of a batch by lines and sends them onwards as
// an individual message per payload.
func (p SplitToMessages) ProcessMessage(m types.Message) ([]types.Message, types.Response) {
	// Gather each newline-delimited payload from all parts of the batch.
	var payloads [][]byte
	m.Iter(func(_ int, part []byte) error {
		payloads = append(payloads, bytes.Split(part, []byte("\n"))...)
		return nil
	})
	// Wrap every payload in its own single-part message.
	out := make([]types.Message, 0, len(payloads))
	for _, payload := range payloads {
		out = append(out, message.New([][]byte{payload}))
	}
	return out, nil
}
// ExampleSplitToMessages demonstrates running a Kafka to Kafka stream where
// each incoming message is parsed as a line delimited blob of payloads and the
// payloads are sent on as a single message per payload.
func main() {
	conf := NewConfig()

	// Consume from one Kafka topic...
	conf.Input.Type = "kafka"
	conf.Input.Kafka.Addresses = []string{
		"localhost:9092",
	}
	conf.Input.Kafka.Topic = "example_topic_one"

	// ...and produce to another.
	conf.Output.Type = "kafka"
	conf.Output.Kafka.Addresses = []string{
		"localhost:9092",
	}
	conf.Output.Kafka.Topic = "example_topic_two"

	// Each logical pipeline thread constructs its own SplitToMessages processor.
	s, err := New(conf, OptAddProcessors(func() (types.Processor, error) {
		return SplitToMessages{}, nil
	}))
	if err != nil {
		panic(err)
	}
	// Give the stream up to one second to shut down gracefully on exit.
	defer s.Stop(time.Second)

	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)

	// Block until a termination signal is received. A direct channel receive
	// replaces the redundant single-case select (staticcheck S1000).
	<-sigChan
	log.Println("Received SIGTERM, the service is closing.")
}
Index ¶
- func OptAddInputPipelines(pipes ...types.PipelineConstructorFunc) func(*Type)
- func OptAddOutputPipelines(pipes ...types.PipelineConstructorFunc) func(*Type)
- func OptAddProcessors(procs ...types.ProcessorConstructorFunc) func(*Type)
- func OptOnClose(onClose func()) func(*Type)
- func OptSetLogger(log log.Modular) func(*Type)
- func OptSetManager(mgr types.Manager) func(*Type)
- func OptSetStats(stats metrics.Type) func(*Type)
- type Config
- type Type
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func OptAddInputPipelines ¶
func OptAddInputPipelines(pipes ...types.PipelineConstructorFunc) func(*Type)
OptAddInputPipelines adds additional pipelines that will be constructed for each input of the Benthos stream.
func OptAddOutputPipelines ¶
func OptAddOutputPipelines(pipes ...types.PipelineConstructorFunc) func(*Type)
OptAddOutputPipelines adds additional pipelines that will be constructed for each output of the Benthos stream.
func OptAddProcessors ¶
func OptAddProcessors(procs ...types.ProcessorConstructorFunc) func(*Type)
OptAddProcessors adds additional processors that will be constructed for each logical thread of the processing pipeline layer of the Benthos stream.
func OptOnClose ¶
func OptOnClose(onClose func()) func(*Type)
OptOnClose sets a closure to be called when the stream closes.
func OptSetLogger ¶
func OptSetLogger(log log.Modular) func(*Type)
OptSetLogger sets the logging output to be used by all components of the stream.
func OptSetManager ¶
func OptSetManager(mgr types.Manager) func(*Type)
OptSetManager sets the service manager to be used by all components of the stream.
func OptSetStats ¶
func OptSetStats(stats metrics.Type) func(*Type)
OptSetStats sets the metrics aggregator to be used by all components of the stream.
Types ¶
type Config ¶
// Config is a configuration struct for a Benthos stream, with one field per
// layer of the stream: Inputs -> Buffer -> Processing Pipelines -> Outputs.
type Config struct {
	// Input configures the source layer of the stream.
	Input input.Config `json:"input" yaml:"input"`
	// Buffer configures the buffer layer between inputs and processing.
	Buffer buffer.Config `json:"buffer" yaml:"buffer"`
	// Pipeline configures the processing pipelines layer.
	Pipeline pipeline.Config `json:"pipeline" yaml:"pipeline"`
	// Output configures the sink layer of the stream.
	Output output.Config `json:"output" yaml:"output"`
}
Config is a configuration struct for a Benthos stream.