escheduler

package module
v0.1.63
Published: Jul 29, 2024 License: Apache-2.0 Imports: 26 Imported by: 0

README

eScheduler 📅

eScheduler is a distributed task scheduling framework that supports multiple load-balancing strategies. Clients can add and remove tasks dynamically, and the framework monitors task status to keep assignments up to date.

Features ✨

  • Distributed Scheduling: Efficiently schedules tasks across multiple workers.
  • Load Balancing: Utilizes least-load and sticky strategies for balanced task distribution.
  • Priority Queue: Executes tasks based on their priority levels.
  • Kubernetes Integration: Supports rolling updates by ensuring all tasks are running before proceeding.
  • Mutex-Based Worker Registration: Prevents worker overload by limiting the number of workers registered at any given time.

How It Works 🛠️

  1. Initial Setup: Workers gather at a barrier to prevent premature scheduling.
  2. Re-Balancing: Workers re-balance after an initial wait period.
  3. Task Execution: Workers start tasks based on priority.
  4. Load Distribution: Least-load algorithm ensures no worker is overburdened.
  5. Sticky Strategy: Minimizes changes during reassignments to maintain stability.
  6. Worker Registration: Uses a mutex to check worker count before registration in etcd.
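Steps 4 and 5 (least-load distribution plus the sticky strategy) can be pictured with a self-contained sketch. This illustrates the idea only and is not the framework's actual algorithm:

```go
package main

import (
	"fmt"
	"sort"
)

// rebalance keeps each task on its previous worker when that worker is
// still alive (sticky), and sends the remaining tasks to whichever
// worker currently holds the fewest tasks (least-load).
func rebalance(workers []string, tasks []string, old map[string]string) map[string]string {
	alive := make(map[string]bool)
	load := make(map[string]int)
	for _, w := range workers {
		alive[w] = true
		load[w] = 0
	}
	assign := make(map[string]string)
	var orphans []string
	for _, t := range tasks {
		if w, ok := old[t]; ok && alive[w] {
			assign[t] = w // sticky: leave the assignment unchanged
			load[w]++
		} else {
			orphans = append(orphans, t) // new task, or its worker died
		}
	}
	for _, t := range orphans {
		// pick the least-loaded worker (ties broken by name for determinism)
		ws := append([]string(nil), workers...)
		sort.Slice(ws, func(i, j int) bool {
			if load[ws[i]] != load[ws[j]] {
				return load[ws[i]] < load[ws[j]]
			}
			return ws[i] < ws[j]
		})
		assign[t] = ws[0]
		load[ws[0]]++
	}
	return assign
}

func main() {
	old := map[string]string{"t1": "w1", "t2": "w1", "t3": "w2"}
	got := rebalance([]string{"w1", "w2"}, []string{"t1", "t2", "t3", "t4"}, old)
	fmt.Println(got["t4"]) // w2: the new task lands on the least-loaded worker
}
```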

Getting Started 🚀

To get started with eScheduler, clone the repository and follow the setup instructions in the installation guide.

License 📄

This project is licensed under the Apache-2.0 License - see the LICENSE file for details.

Contributing 🤝

We welcome contributions! Please see our contributing guidelines for more details.

Contact ✉️

For any inquiries or support, please open an issue on the GitHub repository.


Made with ❤️ by the eScheduler team.

Documentation

Index

Constants

View Source
const (
	WorkerStatusNew         = "new"          // 0
	WorkerStatusRegistered  = "registered"   // 1
	WorkerStatusInBarrier   = "in_barrier"   // 2
	WorkerStatusLeftBarrier = "left_barrier" // 3
	WorkerStatusDead        = "dead"         // 4
)
View Source
const (
	ActionNew     = 1
	ActionDeleted = 2
)
View Source
const (
	ReasonFirstSchedule = "first schedule"
)
View Source
const (
	WorkerValueRunning = "running"
)

Variables

View Source
var (
	ErrWorkerNumExceedMaximum = errors.New("worker num exceed maximum")
)

Functions

func GetLocalIP

func GetLocalIP() (ipv4 string, err error)

GetLocalIP returns the local IPv4 address of the host.

func GetWorkerBarrierLeftKey added in v0.1.22

func GetWorkerBarrierLeftKey(rootName string) string

GetWorkerBarrierLeftKey returns the barrier-left key, e.g. /kline-pump-20220628/worker_barrier_left

func GetWorkerBarrierName added in v0.1.8

func GetWorkerBarrierName(rootName string) string

GetWorkerBarrierName returns the worker barrier key, e.g. /kline-pump/20220628/worker_barrier

func NewPathParser added in v0.1.60

func NewPathParser(rootName string) *pathParser

Types

type Coordinator added in v0.1.60

type Coordinator struct {
	sync.Mutex
	// contains filtered or unexported fields
}

func NewCoordinator added in v0.1.60

func NewCoordinator(replicas int) *Coordinator

func (*Coordinator) GetReBalanceResult added in v0.1.60

func (c *Coordinator) GetReBalanceResult(workers []string, generatedTaskMap map[string]Task, oldAssignMap map[string][]string) (toDeleteWorkerAllTask []string, toDeleteTask map[string][]string, toAddTask map[string][]string, err error)

GetReBalanceResult computes a re-balance plan. workers is the current list of online worker names; generatedTaskMap is the current task set, keyed by task ID; oldAssignMap is the current assignment state, where each key looks like /Root/task/worker-0/task-abbr-1 and the value holds the task's raw data. It returns the workers whose tasks should all be deleted, the tasks to delete per worker, and the tasks to add per worker.

type Generator added in v0.1.8

type Generator func(ctx context.Context) ([]Task, error)

type Hash added in v0.1.60

type Hash func(data []byte) uint32

type HashRing added in v0.1.60

type HashRing struct {
	// contains filtered or unexported fields
}

func NewHashRing added in v0.1.60

func NewHashRing(replicas int, fn Hash) *HashRing

func (*HashRing) Empty added in v0.1.60

func (r *HashRing) Empty() bool

Empty reports whether the hash ring has no nodes.

func (*HashRing) Get added in v0.1.60

func (r *HashRing) Get(key string) string

Get returns the node that key maps to on the ring.

func (*HashRing) Reset added in v0.1.60

func (r *HashRing) Reset(nodes ...string)

Reset empties the hash ring and then adds the given nodes.
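A minimal consistent-hash ring in the spirit of HashRing, with crc32 as the hash function and each node placed replicas times on the ring. This is an illustrative re-implementation, not the package's code:

```go
package main

import (
	"fmt"
	"hash/crc32"
	"sort"
	"strconv"
)

// ring places each node `replicas` times; Get walks clockwise from the
// key's hash to the first point and returns that point's owner.
type ring struct {
	replicas int
	points   []uint32
	owner    map[uint32]string
}

func newRing(replicas int) *ring {
	return &ring{replicas: replicas, owner: make(map[uint32]string)}
}

// Reset empties the ring and then adds the given nodes.
func (r *ring) Reset(nodes ...string) {
	r.points = r.points[:0]
	r.owner = make(map[uint32]string)
	for _, n := range nodes {
		for i := 0; i < r.replicas; i++ {
			p := crc32.ChecksumIEEE([]byte(strconv.Itoa(i) + n))
			r.points = append(r.points, p)
			r.owner[p] = n
		}
	}
	sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
}

// Empty reports whether the ring has no nodes.
func (r *ring) Empty() bool { return len(r.points) == 0 }

// Get returns the node owning key, or "" when the ring is empty.
func (r *ring) Get(key string) string {
	if r.Empty() {
		return ""
	}
	h := crc32.ChecksumIEEE([]byte(key))
	i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
	if i == len(r.points) {
		i = 0 // wrap around to the first point
	}
	return r.owner[r.points[i]]
}

func main() {
	r := newRing(3)
	r.Reset("worker-0", "worker-1")
	fmt.Println(r.Get("task-42"), r.Empty())
}
```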

type Master added in v0.1.41

type Master struct {
	Node

	// balancer
	RoundRobinBalancer balancer.Balancer
	HashBalancer       balancer.Balancer
	// contains filtered or unexported fields
}

func NewMaster added in v0.1.41

func NewMaster(config MasterConfig, node Node) (*Master, error)

NewMaster creates a scheduler master.

func (*Master) Campaign added in v0.1.58

func (m *Master) Campaign(ctx context.Context) (err error)

Campaign

@Description: During the campaign a child context is created; its lifetime covers: 1. becoming leader; 2. watching changes to the workers and notifying the schedule channel; 3. periodic scheduling; 4. logging.
@receiver s
@param ctx
@return error

func (*Master) ElectionKey added in v0.1.58

func (s *Master) ElectionKey() string

func (*Master) NotifySchedule added in v0.1.41

func (m *Master) NotifySchedule(request string)

func (*Master) Start added in v0.1.41

func (m *Master) Start()

Start runs an endless loop that keeps campaigning for election. Lifecycle: 1. if the outer ctx is done, the scheduler stops; 2. if leadership passes to another node, the leader ctx is cancelled.

func (*Master) Stop added in v0.1.41

func (m *Master) Stop()

type MasterConfig added in v0.1.41

type MasterConfig struct {
	// Interval configures interval of schedule task.
	// If Interval is <= 0, the default 60 seconds Interval will be used.
	Interval      time.Duration
	Timeout       time.Duration // The maximum time to schedule once
	Generator     Generator
	ReBalanceWait time.Duration
	Replicas      int // replica count per node in consistent hash ring
}

func (*MasterConfig) Validation added in v0.1.41

func (sc *MasterConfig) Validation() error
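A configuration sketch for MasterConfig, assuming the package is imported under the name escheduler; the concrete values are illustrative only:

```go
cfg := escheduler.MasterConfig{
	Interval: time.Minute,      // <= 0 falls back to the documented 60s default
	Timeout:  30 * time.Second, // maximum time for one scheduling round
	Generator: func(ctx context.Context) ([]escheduler.Task, error) {
		// produce the desired task set on every scheduling round
		return []escheduler.Task{{ID: "task-1", Raw: []byte("payload")}}, nil
	},
	ReBalanceWait: 10 * time.Second,
	Replicas:      50, // virtual nodes per worker on the consistent hash ring
}
if err := cfg.Validation(); err != nil {
	// handle an invalid configuration
}
```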

type Node

type Node struct {
	EtcdConfig clientv3.Config
	RootName   string
	// TTL configures the session's TTL in seconds.
	// If TTL is <= 0, the default 60 seconds TTL will be used.
	TTL int64 // worker registered in etcd

	MaxNumNodes int    // total number of workers plus 1 scheduler
	Name        string // if not set, defaults to {ip}-{pid}
	// contains filtered or unexported fields
}

func (*Node) GetDefaultName added in v0.1.31

func (n *Node) GetDefaultName() (string, error)

func (*Node) Validate added in v0.1.57

func (n *Node) Validate() error

type RawData

type RawData []byte

type Task

type Task struct {
	P     float64
	Key   string  // if not empty, hash-based re-balancing will be used
	Group string
	ID    string  // a short name that uniquely identifies the task
	Raw   RawData // task value, []byte
}

type TaskChange

type TaskChange struct {
	Action int // 1 new 2 deleted
	Task
}

func (TaskChange) CreatedTask added in v0.1.14

func (t TaskChange) CreatedTask() (Task, bool)

func (TaskChange) DeletedTask added in v0.1.14

func (t TaskChange) DeletedTask() (string, bool)

func (TaskChange) String added in v0.1.18

func (t TaskChange) String() string

type WatchEvent added in v0.1.14

type WatchEvent interface {
	CreatedTask() (Task, bool)
	DeletedTask() (string, bool)
}
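A consumer of WatchTask typically branches on the two interface methods. The sketch below mirrors the WatchEvent contract with local stand-in types to show the dispatch pattern; the real events come from TaskChange values:

```go
package main

import "fmt"

// taskStub stands in for the framework's Task type.
type taskStub struct{ ID string }

// watchEvent mirrors the WatchEvent interface documented above.
type watchEvent interface {
	CreatedTask() (taskStub, bool)
	DeletedTask() (string, bool)
}

// created and deleted stand in for TaskChange with Action 1 or 2.
type created struct{ t taskStub }

func (c created) CreatedTask() (taskStub, bool) { return c.t, true }
func (c created) DeletedTask() (string, bool)   { return "", false }

type deleted struct{ id string }

func (d deleted) CreatedTask() (taskStub, bool) { return taskStub{}, false }
func (d deleted) DeletedTask() (string, bool)   { return d.id, true }

// handle dispatches one event the way a worker loop would.
func handle(ev watchEvent) string {
	if t, ok := ev.CreatedTask(); ok {
		return "start " + t.ID
	}
	if id, ok := ev.DeletedTask(); ok {
		return "stop " + id
	}
	return "ignore"
}

func main() {
	fmt.Println(handle(created{taskStub{"t1"}})) // start t1
	fmt.Println(handle(deleted{"t2"}))           // stop t2
}
```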

type Watcher added in v0.1.26

type Watcher struct {
	EventChan <-chan *clientv3.Event // output event channel

	IncipientKVs []*mvccpb.KeyValue // initial kv with prefix
	// contains filtered or unexported fields
}

func NewWatcher added in v0.1.26

func NewWatcher(ctx context.Context, client *clientv3.Client, pathPrefix string) (*Watcher, error)

NewWatcher On choosing the revision to watch from: when watching a single key, use CreateRevision to replay from history, or ModRevision to start from the latest entry (which is returned immediately). When watching a prefix, Revision must be used; to watch only changes made after now, start from the cluster's current Revision+1.

type Worker

type Worker struct {
	Node
	// contains filtered or unexported fields
}

func NewWorker

func NewWorker(node Node) (*Worker, error)

NewWorker creates a worker.

func (*Worker) Add added in v0.1.57

func (w *Worker) Add(task Task)

func (*Worker) Del added in v0.1.57

func (w *Worker) Del(id string)

func (*Worker) IsAllRunning added in v0.1.57

func (w *Worker) IsAllRunning() bool

func (*Worker) SetStatus added in v0.1.57

func (w *Worker) SetStatus(status string)

func (*Worker) Start

func (w *Worker) Start()

func (*Worker) Status added in v0.1.16

func (w *Worker) Status() string

func (*Worker) Stop

func (w *Worker) Stop()

func (*Worker) Tasks added in v0.1.23

func (w *Worker) Tasks(ctx context.Context) (map[string]struct{}, error)

func (*Worker) TryLeaveBarrier added in v0.1.16

func (w *Worker) TryLeaveBarrier(d time.Duration) bool

func (*Worker) WatchTask added in v0.1.16

func (w *Worker) WatchTask() <-chan WatchEvent

type WorkerTask added in v0.1.49

type WorkerTask struct {
	Task
	// contains filtered or unexported fields
}
