pool

package
v0.0.0-...-fda0a1a Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Sep 9, 2019 License: MIT Imports: 6 Imported by: 2

README

Pipes Pool

Report Docs Version

Pipes Pool provides a worker pool implementation to the Pipes library enabling concurrent execution of pipelines as well as the ability to control troughput and resource utilization.

For now Pipes Pool is a proof of concept and should not be used in production yet.

Features

  • Concurrent execution of entire pipelines.
  • Access to Pipeline during execution.
  • Ability to limit level of concurrency.
  • Graceful shutdown of processes.
  • Worker state and Job information.
  • Ability to register callback functions to be executed by workers.

Installation

Get the source with go get:

$ go get github.com/cbergoon/pipes-pool

Then import the library in your project:

import "github.com/cbergoon/pipes"
import "github.com/cbergoon/pipes-dl"
import "github.com/cbergoon/pipes-pool"

Documentation

A Pipes Pool ...

Example Usage

package main

import (
	"fmt"
	"log"
	"time"

	pipesdl "github.com/cbergoon/pipes-dl"
	pipespool "github.com/cbergoon/pipes-pool"
)

func main() {
	// MaxWorker controls number of workers available
	MaxWorker := 10

	// create and start Dispatcher
	dispatcher := pipespool.NewDispatcher(MaxWorker, true)
	dispatcher.Run()

	var jobs []*pipespool.Job
	for i := 0; i < 15; i++ {
		source := `CREATE PIPELINE "MyPipeline";

		ADD "Alfa" OF "Generator" OUTPUTS = ("Out1");
		ADD "Beta" OF "DynamicJS"
		  INPUTS = ("In1")
		  OUTPUTS = ("Out")
		  SET "src" = 'Out = In1;',
			"gg" = "kk";
		ADD SINK "Charlie" OF "Printer" INPUTS = ("In");
		
		CONNECT "Alfa":"Out1" TO "Beta":"In1";
		CONNECT "Beta":"Out" TO "Charlie":"In";`

		l := pipesdl.NewLexer(source)
		p := pipesdl.NewParser(l)

		pd, err := p.ParseProgram()
		if err != nil {
			log.Fatal(err)
		}

		work := &pipespool.Job{
			ID:                 int64(i),
			InitialState:       make(map[string]string),
			PipelineDefinition: pd,
		}
		jobs = append(jobs, work)
		go func() {
			for {
				if jobs[0].FlowPipeline != nil {
					fmt.Println(jobs[0].FlowPipeline.GetPipelineState())
					time.Sleep(time.Millisecond * 1000)
				}
			}
		}()
		dispatcher.JobQueue <- work
	}

	time.Sleep(time.Second * 10)
}

Contributions

All contributions are welcome.

License

This project is licensed under the MIT License.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Dispatcher

type Dispatcher struct {
	Workers    []*Worker
	WorkerPool chan chan *Job

	JobQueue chan *Job

	LivePipelineState bool
	// contains filtered or unexported fields
}

Dispatcher handles routing jobs to workers

func NewDispatcher

func NewDispatcher(maxW int, livePipelineState bool) *Dispatcher

NewDispatcher creates a new Dispatcher

func NewDispatcherWithPlugins

func NewDispatcherWithPlugins(maxW int, livePipelineState bool, pluginDir string) (*Dispatcher, error)

NewDispatcherWithPlugins creates a new Dispatcher with plugin search information

func (*Dispatcher) Run

func (d *Dispatcher) Run()

Run creates workers and builds pool

func (*Dispatcher) RunBlocking

func (d *Dispatcher) RunBlocking()

Run creates workers and builds pool

func (*Dispatcher) Stop

func (d *Dispatcher) Stop()

type Job

type Job struct {
	ID                 int64
	InitialState       map[string]string
	PipelineDefinition *pipeline.PipelineDefinition
	FlowPipeline       *pipeline.FlowPipeline
	PipelineCallback   func(state pipeline.PipelineState)
}

Job defines the task to be executed by worker

type Worker

type Worker struct {
	ID int

	WorkerPool chan chan *Job
	JobChannel chan *Job

	LivePipelineState bool

	DispatcherRef *Dispatcher
	// contains filtered or unexported fields
}

Worker represents the worker that executes the job

func NewWorker

func NewWorker(workerID int, livePipelineState bool, workerPool chan chan *Job, dispatcher *Dispatcher) Worker

NewWorker creates a new worker using WorkerPool from Dispatcher

func (*Worker) Start

func (w *Worker) Start()

Start method starts the run loop of the worker

func (*Worker) Stop

func (w *Worker) Stop()

Stop exits the run loop of the worker

type WorkerError

type WorkerError struct {
	WorkerID int

	Error error

	ErrorTime    time.Time
	ErrorMessage string
	Content      string
}

WorkerError represents error encountered by worker.

type WorkerState

type WorkerState struct {
	WorkerID  int
	IsRunning bool
	StartTime time.Time

	IsRunningJob bool      //TODO (cbergoon): Implement
	JobStartTime time.Time //TODO (cbergoon): Implement
	JobEndTime   time.Time //TODO (cbergoon): Implement

	CurrentJob *Job

	Errors []*WorkerError
}

WorkerState Represents state information of reporting worker.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL