worker

package
v0.5.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 20, 2025 License: Apache-2.0 Imports: 39 Imported by: 0

Documentation

Index

Constants

View Source
const (
	LabelStep = "step"
	LabelData = "data"
)

Variables

View Source
var (
	UpdateUserRecommendTotal = promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: "gorse",
		Subsystem: "worker",
		Name:      "update_user_recommend_total",
	})
	OfflineRecommendStepSecondsVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "gorse",
		Subsystem: "worker",
		Name:      "offline_recommend_step_seconds",
	}, []string{LabelStep})
	OfflineRecommendTotalSeconds = promauto.NewGauge(prometheus.GaugeOpts{
		Namespace: "gorse",
		Subsystem: "worker",
		Name:      "offline_recommend_total_seconds",
	})
	MemoryInuseBytesVec = promauto.NewGaugeVec(prometheus.GaugeOpts{
		Namespace: "gorse",
		Subsystem: "worker",
		Name:      "memory_inuse_bytes",
	}, []string{LabelData})
)

Functions

This section is empty.

Types

type HealthStatus

type HealthStatus struct {
	Ready               bool
	DataStoreError      error
	CacheStoreError     error
	DataStoreConnected  bool
	CacheStoreConnected bool
}

type ItemCache

type ItemCache struct {
	Client data.Database
	Data   sync.Map
}

ItemCache is alias of map[string]data.Item.

func NewItemCache

func NewItemCache(client data.Database) *ItemCache

NewItemCache creates a new ItemCache.

func (*ItemCache) GetMap

func (c *ItemCache) GetMap(itemIds []string) (map[string]*data.Item, error)

func (*ItemCache) GetSlice

func (c *ItemCache) GetSlice(itemIds []string) ([]*data.Item, error)

type Pipeline

type Pipeline struct {
	Config                   *config.Config
	CacheClient              cache.Database
	DataClient               data.Database
	Tracer                   *monitor.Monitor
	Jobs                     int
	MatrixFactorizationItems *logics.MatrixFactorizationItems
	MatrixFactorizationUsers *logics.MatrixFactorizationUsers
	ClickThroughRateModel    ctr.FactorizationMachines
	// contains filtered or unexported fields
}

func (*Pipeline) Recommend

func (p *Pipeline) Recommend(users []data.User, progress func(completed, throughput int))

type Worker

type Worker struct {
	Pipeline
	// contains filtered or unexported fields
}

Worker manages states of a worker node.

func NewWorker

func NewWorker(
	masterHost string,
	masterPort int,
	httpHost string,
	httpPort int,
	jobs int,
	cacheFile string,
	tlsConfig *util.TLSConfig,
	interval time.Duration,
) *Worker

NewWorker creates a new worker node.

func (*Worker) Pull

func (w *Worker) Pull()

Pull user index and ranking model from master.

func (*Worker) Serve

func (w *Worker) Serve()

Serve as a worker node.

func (*Worker) ServeHTTP

func (w *Worker) ServeHTTP()

ServeHTTP serves Prometheus metrics and API.

func (*Worker) Sync

func (w *Worker) Sync()

Sync this worker to the master.

func (*Worker) WorkerName

func (w *Worker) WorkerName() (string, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL