Documentation
¶
Overview ¶
Package machine - Copyright © 2020 Jonathan Whitaker <github@whitaker.io>.
Use of this source code is governed by an MIT-style license that can be found in the LICENSE file.
Package machine - Copyright © 2020 Jonathan Whitaker <github@whitaker.io>.
Use of this source code is governed by an MIT-style license that can be found in the LICENSE file.
Index ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Machine ¶
type Machine[T any] interface { // Name returns the name of the Machine path. Useful for debugging or reasoning about the path. Name() string // Then apply a mutation to each individual element of the payload. Then(a ...Monad[T]) Machine[T] // Recurse applies a recursive function to the payload through a Y Combinator. // f is a function used by the Y Combinator to perform a recursion // on the payload. // Example: // // func(f Monad[int]) Monad[int] { // return func(x int) int { // if x <= 0 { // return 1 // } else { // return x * f(x-1) // } // } // } Recurse(x Monad[Monad[T]]) Machine[T] // Memoize applies a recursive function to the payload through a Y Combinator // and memoizes the results based on the index func. // f is a function used by the Y Combinator to perform a recursion // on the payload. // Example: // // func(f Monad[int]) Monad[int] { // return func(x int) int { // if x <= 0 { // return 1 // } else { // return x * f(x-1) // } // } // } Memoize(x Monad[Monad[T]], index func(T) string) Machine[T] // Or runs all of the functions until one succeeds or sends the payload to the right branch Or(x ...Filter[T]) (Machine[T], Machine[T]) // And runs all of the functions and if one doesnt succeed sends the payload to the right branch And(x ...Filter[T]) (Machine[T], Machine[T]) // Filter splits the data into multiple stream branches If(f Filter[T]) (Machine[T], Machine[T]) // Select applies a series of Filters to the payload and returns a list of Builders // the last one being for any unmatched payloads. Select(fns ...Filter[T]) []Machine[T] // Tee duplicates the data into multiple stream branches. Tee(func(T) (a, b T)) (Machine[T], Machine[T]) // While creates a loop in the stream based on the filter While(x Filter[T]) (loop, out Machine[T]) // Drop terminates the data from further processing without passing it on Drop() // Distribute is a function used for fanout Distribute(Edge[T]) Machine[T] // Output provided channel Output() chan T // contains filtered or unexported methods }
Machine is the interface provided for creating a data processing stream.
func New ¶
func New[T any](name string, input chan T, options ...Option) (startFn func(context.Context), x Machine[T])
New is a function for creating a new Machine.
name string input chan T option *Option[T]
Call the startFn returned by New to start the Machine once built.
type Monad ¶
type Monad[T any] func(d T) T
Monad is a function that is applied to data and used for transformations
type Option ¶
type Option interface {
// contains filtered or unexported methods
}
Option is used to configure the machine
OptionFIF0 controls the processing order of the datas If set to true the system will wait for one data to be processed before starting the next.
func OptionAttributes ¶ added in v3.0.9
OptionAttributes apply the slog.Attr's to the machine metrics and spans Do not override the "name", "type", "duration", "error", or "value" attributes
func OptionBufferSize ¶
OptionBufferSize sets the buffer size on the edge channels between the vertices, this setting can be useful when processing large amounts of data with FIFO turned on.
func OptionFlush ¶ added in v3.2.0
OptionFlush attempts to send all data to the flushFN before exiting after the gracePeriod has expired Im looking for a good way to make this type specific, but want to avoid having to add separate option settings for the Transform function.