indexer

package
v0.16.8
Published: May 5, 2025 License: Apache-2.0 Imports: 12 Imported by: 0

README

Node Indexers Manager

Initially, each node periodically reports its data (capacity, uptime, location, etc.) to the chain. The graphql-processor then processes the chain events and dumps this data, along with data for farms, contracts, and twins, into a postgres database that serves both graphql-api and proxy-api. This works well, but the chain is not the best place to store larger data such as gpu/dmi. The Node-Indexers solve this by calling the nodes periodically, at a configurable interval, to fetch the data and store it in the same postgres database, where the apis can serve it. For now only proxy-api serves it.

The indexer structure

Each indexer has two clients:

  • Database: a client to the postgres db.
  • RmbClient: an rmb client used to make the node calls.

Three channels:

  • IdChan: collects the twin ids of the nodes the indexer will call.
  • ResultChan: collects the results returned by the rmb calls to the nodes.
  • BatchChan: transfers batches of results that are ready to be upserted directly.

Four types of workers:

  • Finder: this worker queries the database to filter nodes and pushes their twin ids to the IdChan.
  • Getter: this worker pops twin ids from the IdChan, calls the node with the RmbClient to get the data, and pushes the result to the ResultChan.
  • Batcher: this worker collects results from the ResultChan into batches and sends them to the BatchChan.
  • Upserter: this worker gets batches from the BatchChan and updates/inserts them into the Database.

The Indexer struct is generic, and each indexer's functionality differs from the others based on its Work. A Work is a struct that implements the Work interface, which has three methods:

  • Finders: returns a map from finder name to interval, which decides which finders this indexer should use.
  • Get: prepares the payload for the rmb call and parses the response into ready-to-store db model data.
  • Upsert: calls the equivalent db upsert method, with the ability to remove old, expired data.

Registered Indexers

  1. Gpu indexer:
    • Function: query the gpu list on the node.
    • Interval: 60 min
    • Other triggers: new node is added (check every 5m).
    • Default caller worker number: 5
    • Dump table: node_gpu
  2. Health indexer:
    • Function: decide the node health based on its internal state.
    • Interval: 5 min
    • Default caller worker number: 100
    • Dump table: health_report
  3. Dmi indexer:
    • Function: collect some hardware data from the node.
    • Interval: 1 day
    • Other triggers: new node is added (check every 5m).
    • Default caller worker number: 1
    • Dump table: dmi
  4. Speed indexer:
    • Function: get the node's network upload/download speed, tested against an iperf server.
    • Interval: 5 min
    • Default caller worker number: 100
    • Dump table: speed
  5. Ipv6 indexer:
    • Function: decide if the node has ipv6 or not.
    • Interval: 1 day
    • Default caller worker number: 10
    • Dump table: node_ipv6
  6. Workloads indexer:
    • Function: get the number of workloads on each node.
    • Interval: 1 hour
    • Default caller worker number: 10
    • Dump table: node_workloads
  7. Features indexer:
    • Function: get the supported features on each node.
    • Interval: 1 day
    • Default caller worker number: 10
    • Dump table: node_features

Documentation

Constants

const (
	DmiCallCmd = "zos.system.dmi"
)

Variables

This section is empty.

Functions

This section is empty.

Types

type CPUUtilizationPercent added in v0.15.16

type CPUUtilizationPercent struct {
	HostTotal    float64 `json:"host_total"`
	HostUser     float64 `json:"host_user"`
	HostSystem   float64 `json:"host_system"`
	RemoteTotal  float64 `json:"remote_total"`
	RemoteUser   float64 `json:"remote_user"`
	RemoteSystem float64 `json:"remote_system"`
}

type DMIWork

type DMIWork struct {
	// contains filtered or unexported fields
}

func NewDMIWork

func NewDMIWork(interval uint) *DMIWork

func (*DMIWork) Finders

func (w *DMIWork) Finders() map[string]time.Duration

func (*DMIWork) Get

func (w *DMIWork) Get(ctx context.Context, rmb *peer.RpcClient, twinId uint32) ([]types.Dmi, error)

func (*DMIWork) Upsert

func (w *DMIWork) Upsert(ctx context.Context, db db.Database, batch []types.Dmi) error

type FeatureWork added in v0.15.18

type FeatureWork struct {
	// contains filtered or unexported fields
}

func NewFeatureWork added in v0.15.18

func NewFeatureWork(interval uint) *FeatureWork

func (*FeatureWork) Finders added in v0.15.18

func (w *FeatureWork) Finders() map[string]time.Duration

func (*FeatureWork) Get added in v0.15.18

func (w *FeatureWork) Get(ctx context.Context, rmb *peer.RpcClient, twinId uint32) ([]types.NodeFeatures, error)

func (*FeatureWork) Upsert added in v0.15.18

func (w *FeatureWork) Upsert(ctx context.Context, db db.Database, batch []types.NodeFeatures) error

type Finder

type Finder func(context.Context, time.Duration, db.Database, chan uint32)
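
A function of this shape could be written as a ticker loop that pushes twin ids into the channel on every interval. The sketch below is an assumption about how such a finder might look, not the package's code: `idSource` is a hypothetical stand-in for `db.Database`, and `periodicFinder`, `staticSource`, and `collect` are names invented for the example.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// idSource is a hypothetical stand-in for the db.Database parameter.
type idSource interface {
	TwinIDs(ctx context.Context) ([]uint32, error)
}

// finder mirrors the shape of the Finder type, using the stand-in source.
type finder func(ctx context.Context, interval time.Duration, src idSource, idChan chan uint32)

// periodicFinder pushes every twin id once immediately and then again on
// each tick, until the context is cancelled.
func periodicFinder(ctx context.Context, interval time.Duration, src idSource, idChan chan uint32) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		ids, err := src.TwinIDs(ctx)
		if err == nil {
			for _, id := range ids {
				select {
				case idChan <- id:
				case <-ctx.Done():
					return
				}
			}
		}
		// Wait for the next round or for shutdown.
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return
		}
	}
}

// Compile-time check that periodicFinder has the finder shape.
var _ finder = periodicFinder

// staticSource always returns the same twin ids.
type staticSource []uint32

func (s staticSource) TwinIDs(context.Context) ([]uint32, error) { return s, nil }

// collect runs the finder against twins 1 and 2 and reads n ids from it.
func collect(n int) []uint32 {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // stops the finder goroutine once enough ids are read
	idChan := make(chan uint32)
	go periodicFinder(ctx, 10*time.Millisecond, staticSource{1, 2}, idChan)
	got := make([]uint32, 0, n)
	for len(got) < n {
		got = append(got, <-idChan)
	}
	return got
}

func main() {
	fmt.Println(collect(4))
}
```

Selecting on `ctx.Done()` at both the send and the wait keeps the goroutine from leaking when the indexer is stopped while the channel is full or the ticker is idle.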

type GPUWork

type GPUWork struct {
	// contains filtered or unexported fields
}

func NewGPUWork

func NewGPUWork(interval uint, db db.Database) *GPUWork

func (*GPUWork) Finders

func (w *GPUWork) Finders() map[string]time.Duration

func (*GPUWork) Get

func (w *GPUWork) Get(ctx context.Context, rmb *peer.RpcClient, twinId uint32) ([]types.NodeGPU, error)

func (*GPUWork) Upsert

func (w *GPUWork) Upsert(ctx context.Context, db db.Database, batch []types.NodeGPU) error

type HealthWork

type HealthWork struct {
	// contains filtered or unexported fields
}

func NewHealthWork

func NewHealthWork(interval uint) *HealthWork

func (*HealthWork) Finders

func (w *HealthWork) Finders() map[string]time.Duration

func (*HealthWork) Get

func (w *HealthWork) Get(ctx context.Context, rmb *peer.RpcClient, twinId uint32) ([]types.HealthReport, error)

func (*HealthWork) Upsert

func (w *HealthWork) Upsert(ctx context.Context, db db.Database, batch []types.HealthReport) error

type Indexer

type Indexer[T any] struct {
	// contains filtered or unexported fields
}

func NewIndexer

func NewIndexer[T any](
	work Work[T],
	name string,
	db db.Database,
	rmb *peer.RpcClient,
	worker uint,
) *Indexer[T]

func (*Indexer[T]) Start

func (i *Indexer[T]) Start(ctx context.Context)

type IperfResult added in v0.15.16

type IperfResult struct {
	UploadSpeed   float64               `json:"upload_speed"`   // in bit/sec
	DownloadSpeed float64               `json:"download_speed"` // in bit/sec
	NodeID        uint32                `json:"node_id"`
	NodeIpv4      string                `json:"node_ip"`
	TestType      string                `json:"test_type"`
	Error         string                `json:"error"`
	CpuReport     CPUUtilizationPercent `json:"cpu_report"`
}

type Ipv6Work added in v0.15.1

type Ipv6Work struct {
	// contains filtered or unexported fields
}

func NewIpv6Work added in v0.15.1

func NewIpv6Work(interval uint) *Ipv6Work

func (*Ipv6Work) Finders added in v0.15.1

func (w *Ipv6Work) Finders() map[string]time.Duration

func (*Ipv6Work) Get added in v0.15.1

func (w *Ipv6Work) Get(ctx context.Context, rmb *peer.RpcClient, id uint32) ([]types.HasIpv6, error)

func (*Ipv6Work) Upsert added in v0.15.1

func (w *Ipv6Work) Upsert(ctx context.Context, db db.Database, batch []types.HasIpv6) error

type LocationWork added in v0.16.5

type LocationWork struct {
	// contains filtered or unexported fields
}

func NewLocationWork added in v0.16.5

func NewLocationWork(interval uint) *LocationWork

func (*LocationWork) Finders added in v0.16.5

func (w *LocationWork) Finders() map[string]time.Duration

func (*LocationWork) Get added in v0.16.5

func (*LocationWork) Upsert added in v0.16.5

func (w *LocationWork) Upsert(ctx context.Context, db db.Database, batch []types.NodeLocation) error

type SpeedWork

type SpeedWork struct {
	// contains filtered or unexported fields
}

func NewSpeedWork

func NewSpeedWork(interval uint) *SpeedWork

func (*SpeedWork) Finders

func (w *SpeedWork) Finders() map[string]time.Duration

func (*SpeedWork) Get

func (w *SpeedWork) Get(ctx context.Context, rmb *peer.RpcClient, twinId uint32) ([]types.Speed, error)

func (*SpeedWork) Upsert

func (w *SpeedWork) Upsert(ctx context.Context, db db.Database, batch []types.Speed) error

type TaskResult added in v0.15.16

type TaskResult struct {
	Name        string      `json:"name"`
	Description string      `json:"description"`
	Timestamp   uint64      `json:"timestamp"`
	Result      interface{} `json:"result"`
}
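
Since `Result` is an untyped `interface{}`, a caller has to convert it to a concrete type after decoding. One common approach, sketched below, is to round-trip the field through JSON. That `Result` holds a list of `IperfResult` values, the `decodeIperf` helper, and the sample payload are assumptions for illustration; the struct definitions are copied from this page.

```go
package main

import (
	"encoding/json"
	"fmt"
)

type CPUUtilizationPercent struct {
	HostTotal    float64 `json:"host_total"`
	HostUser     float64 `json:"host_user"`
	HostSystem   float64 `json:"host_system"`
	RemoteTotal  float64 `json:"remote_total"`
	RemoteUser   float64 `json:"remote_user"`
	RemoteSystem float64 `json:"remote_system"`
}

type IperfResult struct {
	UploadSpeed   float64               `json:"upload_speed"`   // in bit/sec
	DownloadSpeed float64               `json:"download_speed"` // in bit/sec
	NodeID        uint32                `json:"node_id"`
	NodeIpv4      string                `json:"node_ip"`
	TestType      string                `json:"test_type"`
	Error         string                `json:"error"`
	CpuReport     CPUUtilizationPercent `json:"cpu_report"`
}

type TaskResult struct {
	Name        string      `json:"name"`
	Description string      `json:"description"`
	Timestamp   uint64      `json:"timestamp"`
	Result      interface{} `json:"result"`
}

// decodeIperf parses a task result and converts its untyped Result field
// into a slice of IperfResult by re-marshalling it (hypothetical helper).
func decodeIperf(raw []byte) (TaskResult, []IperfResult, error) {
	var task TaskResult
	if err := json.Unmarshal(raw, &task); err != nil {
		return task, nil, fmt.Errorf("decoding task result: %w", err)
	}
	inner, err := json.Marshal(task.Result)
	if err != nil {
		return task, nil, fmt.Errorf("re-encoding result field: %w", err)
	}
	var runs []IperfResult
	if err := json.Unmarshal(inner, &runs); err != nil {
		return task, nil, fmt.Errorf("decoding iperf results: %w", err)
	}
	return task, runs, nil
}

func main() {
	// Made-up payload; the field names follow the struct tags above.
	raw := []byte(`{"name":"iperf","timestamp":1700000000,"result":[
		{"upload_speed":1500000,"download_speed":3000000,"node_id":12,
		 "node_ip":"192.0.2.1","test_type":"tcp","error":""}]}`)
	task, runs, err := decodeIperf(raw)
	if err != nil {
		panic(err)
	}
	for _, r := range runs {
		// Speeds are reported in bit/sec; divide by 1e6 for Mbit/s.
		fmt.Printf("%s node=%d up=%.1f Mbit/s down=%.1f Mbit/s\n",
			task.Name, r.NodeID, r.UploadSpeed/1e6, r.DownloadSpeed/1e6)
	}
}
```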

type Work

type Work[T any] interface {
	Finders() map[string]time.Duration
	Get(ctx context.Context, rmb *peer.RpcClient, id uint32) ([]T, error)
	Upsert(ctx context.Context, db db.Database, batch []T) error
}

type WorkloadWork added in v0.15.3

type WorkloadWork struct {
	// contains filtered or unexported fields
}

func NewWorkloadWork added in v0.15.3

func NewWorkloadWork(interval uint) *WorkloadWork

func (*WorkloadWork) Finders added in v0.15.3

func (w *WorkloadWork) Finders() map[string]time.Duration

func (*WorkloadWork) Get added in v0.15.3

func (w *WorkloadWork) Get(ctx context.Context, rmb *peer.RpcClient, twinId uint32) ([]types.NodesWorkloads, error)

func (*WorkloadWork) Upsert added in v0.15.3

func (w *WorkloadWork) Upsert(ctx context.Context, db db.Database, batch []types.NodesWorkloads) error
