Documentation
¶
Index ¶
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func FlattenFlow ¶
Example ¶
package main
import (
"time"
"github.com/OutOfBedlam/tine/engine"
_ "github.com/OutOfBedlam/tine/plugin/codec/csv"
_ "github.com/OutOfBedlam/tine/plugin/codec/json"
_ "github.com/OutOfBedlam/tine/plugin/flows/base"
_ "github.com/OutOfBedlam/tine/plugin/inlets/file"
_ "github.com/OutOfBedlam/tine/plugin/outlets/file"
)
// main builds a pipeline from an inline TOML configuration and runs it.
//
// The configuration declares a file inlet fed with three inline CSV rows,
// a flatten flow with name_infix "::", a select flow including "**", and
// a file outlet with path "-" and CSV format.
func main() {
	config := `
[[inlets.file]]
data = [
"a,1",
"b,2",
"c,3",
]
format = "csv"
[[flows.flatten]]
name_infix = "::"
[[flows.select]]
includes = ["**"]
[[outlets.file]]
path = "-"
format = "csv"
`

	// Testing aid only: replace the engine clock with a counter-driven one.
	// Each call advances by one second, the first call yielding 1721954798,
	// so the emitted timestamps are reproducible.
	var tick int64
	engine.Now = func() time.Time {
		tick++
		return time.Unix(1721954797+tick, 0)
	}

	// Assemble the pipeline from the configuration text.
	p, err := engine.New(engine.WithConfig(config))
	if err != nil {
		panic(err)
	}

	// Execute the pipeline; any failure aborts the example.
	err = p.Run()
	if err != nil {
		panic(err)
	}
}
Output: 1721954798,file::0,a 1721954798,file::1,1 1721954799,file::0,b 1721954799,file::1,2 1721954800,file::0,c 1721954800,file::1,3
func MergeFlow ¶
Example ¶
package main
import (
"time"
"github.com/OutOfBedlam/tine/engine"
_ "github.com/OutOfBedlam/tine/plugin/codec/csv"
_ "github.com/OutOfBedlam/tine/plugin/codec/json"
_ "github.com/OutOfBedlam/tine/plugin/flows/base"
_ "github.com/OutOfBedlam/tine/plugin/inlets/exec"
_ "github.com/OutOfBedlam/tine/plugin/inlets/file"
_ "github.com/OutOfBedlam/tine/plugin/outlets/file"
)
// main demonstrates the merge flow: a file inlet (one inline CSV row) and
// an exec inlet (running "echo hello world" with count = 1) feed a merge
// flow with wait_limit "1s", and the result goes to a file outlet with
// path "-" in JSON format.
func main() {
	config := `
[[inlets.file]]
data = [
"a,1",
]
format = "csv"
[[inlets.exec]]
commands = ["echo", "hello world"]
count = 1
trim_space = true
ignore_error = true
[[flows.merge]]
wait_limit = "1s"
[[outlets.file]]
path = "-"
format = "json"
`

	// Pin the engine clock to a single instant so the output can be
	// compared against a fixed expectation. Not needed in production code.
	fixed := time.Unix(1721954797, 0)
	engine.Now = func() time.Time {
		return fixed
	}

	// Build the pipeline from the configuration text.
	p, err := engine.New(engine.WithConfig(config))
	if err != nil {
		panic(err)
	}

	// Run the pipeline; any failure aborts the example.
	err = p.Run()
	if err != nil {
		panic(err)
	}
}
Output: {"_ts":1721954797,"exec.stdout":"hello world","file.0":"a","file.1":"1"}
func SelectFlow ¶ added in v0.0.2
Example ¶
package main
import (
"time"
"github.com/OutOfBedlam/tine/engine"
_ "github.com/OutOfBedlam/tine/plugin/codec/csv"
_ "github.com/OutOfBedlam/tine/plugin/codec/json"
_ "github.com/OutOfBedlam/tine/plugin/flows/base"
_ "github.com/OutOfBedlam/tine/plugin/inlets/file"
_ "github.com/OutOfBedlam/tine/plugin/outlets/file"
)
// main runs a pipeline whose select flow lists the patterns "**",
// "not-exist", "#_ts" and "1". Input is three inline CSV rows from a file
// inlet; output is CSV written by a file outlet with path "-".
func main() {
	config := `
[[inlets.file]]
data = [
"a,1",
"b,2",
"c,3",
]
format = "csv"
[[flows.select]]
includes = ["**", "not-exist", "#_ts", "1"]
[[outlets.file]]
path = "-"
format = "csv"
`

	// Testing aid only: a stepping clock. Every call returns a time one
	// second after the previous call, beginning at 1721954798, which keeps
	// the emitted timestamps reproducible.
	var calls int64
	nextTime := func() time.Time {
		calls++
		return time.Unix(1721954797+calls, 0)
	}
	engine.Now = nextTime

	// Construct the pipeline from the configuration text.
	p, err := engine.New(engine.WithConfig(config))
	if err != nil {
		panic(err)
	}

	// Run it; any failure aborts the example.
	if runErr := p.Run(); runErr != nil {
		panic(runErr)
	}
}
Output: file,1721954798,a,1,,1721954798,1 file,1721954799,b,2,,1721954799,2 file,1721954800,c,3,,1721954800,3
Example (Tag) ¶
package main
import (
"time"
"github.com/OutOfBedlam/tine/engine"
_ "github.com/OutOfBedlam/tine/plugin/codec/csv"
_ "github.com/OutOfBedlam/tine/plugin/codec/json"
_ "github.com/OutOfBedlam/tine/plugin/flows/base"
_ "github.com/OutOfBedlam/tine/plugin/inlets/file"
_ "github.com/OutOfBedlam/tine/plugin/outlets/file"
)
// main runs a pipeline whose select flow lists the patterns "#_in" and "*".
// Input is three inline CSV rows from a file inlet; output is CSV written
// by a file outlet with path "-".
func main() {
	config := `
[[inlets.file]]
data = [
"a,1",
"b,2",
"c,3",
]
format = "csv"
[[flows.select]]
includes = ["#_in", "*"]
[[outlets.file]]
path = "-"
format = "csv"
`

	// Testing aid only: swap in a clock that moves forward one second per
	// call (first value 1721954798) so repeated runs see identical times.
	var step int64
	engine.Now = func() time.Time {
		step++
		return time.Unix(1721954797+step, 0)
	}

	// Build the pipeline from the configuration text.
	p, err := engine.New(engine.WithConfig(config))
	if err != nil {
		panic(err)
	}

	// Execute the pipeline; any failure aborts the example.
	err = p.Run()
	if err != nil {
		panic(err)
	}
}
Output: file,a,1 file,b,2 file,c,3
func UpdateFlow ¶ added in v0.0.3
Example ¶
package main
import (
"time"
"github.com/OutOfBedlam/tine/engine"
_ "github.com/OutOfBedlam/tine/plugin/codec/csv"
_ "github.com/OutOfBedlam/tine/plugin/codec/json"
_ "github.com/OutOfBedlam/tine/plugin/flows/base"
_ "github.com/OutOfBedlam/tine/plugin/inlets/file"
_ "github.com/OutOfBedlam/tine/plugin/outlets/file"
)
// main demonstrates the update flow. A file inlet supplies three inline
// CSV rows with the field names my_name, my_int, my_float and flag; the
// update flow's "set" list then gives per-field name and/or value entries
// plus a value for the "_in" tag. A select flow ("#_in", "*") follows, and
// a file outlet writes the result as JSON.
func main() {
	config := `
[[inlets.file]]
data = [
"James,1,1.23,true",
"Jane,2,2.34,false",
"Scott,3,3.45,true",
]
fields = ["my_name", "my_int", "my_float", "flag"]
format = "csv"
[[flows.update]]
set = [
{ field = "my_name", name = "new_name" },
{ field = "my_int", value = 10 },
{ field = "my_float", value = 9.87, name = "new_float" },
{ field = "flag", value = true, name = "new_flag" },
{ tag = "_in", value = "mine" },
]
[[flows.select]]
includes = ["#_in", "*"]
[[outlets.file]]
format = "json"
`

	// Testing aid only: freeze the engine clock at one instant so the
	// output is reproducible.
	frozen := time.Unix(1721954797, 0)
	engine.Now = func() time.Time {
		return frozen
	}

	// Build the pipeline from the configuration text.
	p, err := engine.New(engine.WithConfig(config))
	if err != nil {
		panic(err)
	}

	// Run the pipeline; any failure aborts the example.
	if runErr := p.Run(); runErr != nil {
		panic(runErr)
	}
}
Output: {"_in":"mine","my_int":"10","new_flag":true,"new_float":9.87,"new_name":"James"} {"_in":"mine","my_int":"10","new_flag":true,"new_float":9.87,"new_name":"Jane"} {"_in":"mine","my_int":"10","new_flag":true,"new_float":9.87,"new_name":"Scott"}
Types ¶
This section is empty.
Click to show internal directories.
Click to hide internal directories.