machine

package module
v0.9.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 23, 2020 License: MIT Imports: 18 Imported by: 13

README ΒΆ

Go GoDoc Go Report Card Codacy Badge Codacy Badge Gitter chat

Machine is a library for creating data workflows. These workflows can be either very concise or quite complex, even allowing for cycles for flows that need retry or self healing mechanisms.

Installation

Add the primary library to your project

  go get -u github.com/whitaker-io/machine

Download the command to generate a boilerplate project (WIP/alpha state)

  go get -u github.com/whitaker-io/machine/cmd
  cd $GOSRC/github.com/whitaker-io/machine/cmd
  go build -a -o pkg/machine -ldflags "-s -w"
  sudo ln -s pkg/machine /usr/local/bin/machine

Add the different Subscription Implementations

  go get -u github.com/whitaker-io/machine/subscriptions/kafka
  go get -u github.com/whitaker-io/machine/subscriptions/redis
  go get -u github.com/whitaker-io/machine/subscriptions/pubsub
  go get -u github.com/whitaker-io/machine/subscriptions/sqs

Documentation

Docs


Example

Redis Subscription with basic receive -> process -> send Stream


  // logger allows for logs to be transmitted to your log provider
  var logger machine.Logger

  // logStore allows for running a cluster and handles communication
  var logStore machine.LogStore

  // pool is a redigo Pool for a redis cluster to read the stream from
  // see also the Google Pub/Sub, Kafka, and SQS implementations
  var pool *redigo.Pool

  redisStream := redis.New(pool, logger)
  
  // NewPipe creates a pipe in which you can run multiple streams
  // the id is the instance identifier for the cluster
  p := NewPipe(uuid.New().String(), logger, logStore, fiber.Config{
    ReadTimeout: time.Second,
    WriteTimeout: time.Second,
    BodyLimit: 4 * 1024 * 1024,
    DisableKeepalive: true,
  })

  // StreamSubscription takes an instance of machine.Subscription
  // and a time interval in which to read
  // the id here needs to be the same for all the nodes for the clustering to work
  builder := p.StreamSubscription("unique_stream_id", redisStream, 5*time.Millisecond,
    &Option{FIFO: boolP(false)},
    &Option{Injectable: boolP(true)},
    &Option{Metrics: boolP(true)},
    &Option{Span: boolP(false)},
    &Option{BufferSize: intP(0)},
  ).Builder()

  builder.Map("unique_id2", 
      func(m Data) error {
        var err error

        // ...do some processing

        return err
      },
    ).
    Transmit("unique_id3", 
      func(d []Data) error {
        // send a copy of the data somewhere

        return nil
      },
    )

  // Run requires a context, the port to run the fiber.App,
  // and the timeout for graceful shutdown
  if err := p.Run(context.Background(), ":5000", 10 * time.Second); err != nil {
    // Run will return an error in the case that 
    // one of the paths is not terminated (i.e. missing a Transmit)
    panic(err)
  }

🀝 Contributing

Contributions, issues and feature requests are welcome.
Feel free to check issues page if you want to contribute.
Check the contributing guide.

Author

πŸ‘€ Jonathan Whitaker

Show your support

Please ⭐️ this repository if this project helped you!


License

Machine is provided under the MIT License.

The MIT License (MIT)

Copyright (c) 2020 Jonathan Whitaker

Documentation ΒΆ

Index ΒΆ

Constants ΒΆ

This section is empty.

Variables ΒΆ

This section is empty.

Functions ΒΆ

This section is empty.

Types ΒΆ

type Applicative ΒΆ added in v0.3.0

type Applicative func(Data) error

Applicative type for applying a change to a typed.Typed

type Builder ΒΆ

type Builder interface {
	Map(id string, a Applicative, options ...*Option) Builder
	FoldLeft(id string, f Fold, options ...*Option) Builder
	FoldRight(id string, f Fold, options ...*Option) Builder
	Fork(id string, f Fork, options ...*Option) (Builder, Builder)
	Link(id, target string, options ...*Option)
	Transmit(id string, s Sender, options ...*Option)
}

Builder interface for plotting out the data flow of the system

type Data ΒΆ added in v0.4.0

type Data typed.Typed

Data wrapper on typed.Typed

type Fold ΒΆ added in v0.5.0

type Fold func(Data, Data) Data

Fold type for folding a 2 Data into a single element

type Fork ΒΆ added in v0.5.0

type Fork func(list []*Packet) (a, b []*Packet)

Fork func for splitting a payload into 2

var (
	// ForkDuplicate is a SplitHandler that sends data to both outputs
	ForkDuplicate Fork = func(payload []*Packet) (a, b []*Packet) {
		payload2 := []*Packet{}
		buf := &bytes.Buffer{}
		enc, dec := gob.NewEncoder(buf), gob.NewDecoder(buf)

		_ = enc.Encode(payload)
		_ = dec.Decode(&payload2)

		for i, packet := range payload {
			payload2[i].span = packet.span
		}

		return payload, payload2
	}

	// ForkError is a SplitHandler for splitting errors from successes
	ForkError Fork = func(payload []*Packet) (s, f []*Packet) {
		s = []*Packet{}
		f = []*Packet{}

		for _, packet := range payload {
			if packet.Error != nil {
				f = append(f, packet)
			} else {
				s = append(s, packet)
			}
		}

		return s, f
	}
)

type ForkRule ΒΆ added in v0.5.0

type ForkRule func(Data) bool

ForkRule provides a SplitHandler for splitting based on the return bool

func (ForkRule) Handler ΒΆ added in v0.5.0

func (r ForkRule) Handler(payload []*Packet) (t, f []*Packet)

Handler func for providing a SplitHandler

type HealthInfo ΒΆ added in v0.7.0

type HealthInfo struct {
	StreamID    string    `json:"stream_id"`
	LastPayload time.Time `json:"last_payload"`
	// contains filtered or unexported fields
}

HealthInfo type for giving info on the stream

type InjectionCallback ΒΆ added in v0.7.0

type InjectionCallback func(logs ...*Log)

InjectionCallback func to run when the LogStore decides to restart the flow of orphaned Data

type Log ΒΆ added in v0.7.0

type Log struct {
	OwnerID    string    `json:"owner_id"`
	StreamID   string    `json:"stream_id"`
	VertexID   string    `json:"vertex_id"`
	VertexType string    `json:"vertex_type"`
	State      string    `json:"state"`
	Packet     *Packet   `json:"packet"`
	When       time.Time `json:"when"`
}

Log type for holding the data that is recorded from the streams

type LogStore ΒΆ added in v0.7.0

type LogStore interface {
	Join(id string, callback InjectionCallback, streamIDs ...string) error
	Write(logs ...*Log)
	Leave(id string) error
}

LogStore type for managing cluster state

type Logger ΒΆ added in v0.7.0

type Logger interface {
	Error(...interface{})
	Info(...interface{})
}

Logger type for accepting log messages

type Option ΒΆ added in v0.2.0

type Option struct {
	FIFO       *bool
	Injectable *bool
	BufferSize *int
	Span       *bool
	Metrics    *bool
}

Option type for holding machine settings

type Packet ΒΆ

type Packet struct {
	ID    string `json:"id"`
	Data  Data   `json:"data"`
	Error error  `json:"error"`
	// contains filtered or unexported fields
}

Packet type that holds information traveling through the machine

type Pipe ΒΆ added in v0.7.0

type Pipe struct {
	// contains filtered or unexported fields
}

Pipe type for holding the server information for running http servers

func NewPipe ΒΆ added in v0.7.0

func NewPipe(id string, logger Logger, store LogStore, config ...fiber.Config) *Pipe

NewPipe func for creating a new server instance

func (*Pipe) Load ΒΆ added in v0.9.0

func (pipe *Pipe) Load(lc *Serialization) error

Load method loads a stream based on github.com/traefik/yaegi

func (*Pipe) Run ΒΆ added in v0.7.0

func (pipe *Pipe) Run(ctx context.Context, port string, gracePeriod time.Duration) error

Run func to start the server

func (*Pipe) Stream ΒΆ added in v0.8.0

func (pipe *Pipe) Stream(stream Stream) Builder

Stream func for registering a Stream with the Pipe

func (*Pipe) StreamHTTP ΒΆ added in v0.7.0

func (pipe *Pipe) StreamHTTP(id string, opts ...*Option) Builder

StreamHTTP func for creating a Stream at the path /stream/<id>

func (*Pipe) StreamSubscription ΒΆ added in v0.7.0

func (pipe *Pipe) StreamSubscription(id string, sub Subscription, interval time.Duration, opts ...*Option) Builder

StreamSubscription func for creating a Stream at the that reads from a subscription

func (*Pipe) Use ΒΆ added in v0.7.0

func (pipe *Pipe) Use(args ...interface{})

Use Wraps fiber.App.Use

Use registers a middleware route that will match requests with the provided prefix (which is optional and defaults to "/").

app.Use(func(c *fiber.Ctx) error {
   return c.Next()
})
app.Use("/api", func(c *fiber.Ctx) error {
   return c.Next()
})
app.Use("/api", handler, func(c *fiber.Ctx) error {
   return c.Next()
})

This method will match all HTTP verbs: GET, POST, PUT, HEAD etc...

type Retriever ΒΆ added in v0.3.0

type Retriever func(context.Context) chan []Data

Retriever type for providing the data to flow into the system

type Sender ΒΆ added in v0.3.0

type Sender func([]Data) error

Sender type for sending data out of the system

type Serialization ΒΆ added in v0.9.0

type Serialization struct {
	ID       string         `json:"id,omitempty" mapstructure:"id,omitempty"`
	Type     string         `json:"type,omitempty" mapstructure:"type,omitempty"`
	Symbol   string         `json:"symbol,omitempty" mapstructure:"symbol,omitempty"`
	Script   string         `json:"script,omitempty" mapstructure:"script,omitempty"`
	Interval time.Duration  `json:"interval,omitempty" mapstructure:"interval,omitempty"`
	Options  []*Option      `json:"options,omitempty" mapstructure:"options,omitempty"`
	To       string         `json:"to,omitempty" mapstructure:"to,omitempty"`
	Next     *Serialization `json:"next,omitempty" mapstructure:"next,omitempty"`
	Left     *Serialization `json:"left,omitempty" mapstructure:"left,omitempty"`
	Right    *Serialization `json:"right,omitempty" mapstructure:"right,omitempty"`
}

Serialization type for holding information about github.com/traefik/yaegi based streams

type Stream ΒΆ added in v0.5.0

type Stream interface {
	ID() string
	Run(ctx context.Context, recorders ...recorder) error
	Inject(ctx context.Context, events map[string][]*Packet)
	Builder() Builder
}

Stream interface for Running and injecting data

func NewStream ΒΆ added in v0.5.0

func NewStream(id string, retriever Retriever, options ...*Option) Stream

NewStream func for providing a Stream

type Subscription ΒΆ added in v0.7.0

type Subscription interface {
	Read(ctx context.Context) []Data
	Close() error
}

Subscription interface for creating a pull based stream

Directories ΒΆ

Path Synopsis
cmd module
common module
edge
http module
pubsub module
telemetry module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL