datasetworker

package
v0.3.0 Latest
Warning

This package is not in the latest version of its module.

Published: Aug 25, 2023 | License: Apache-2.0, MIT | Imports: 22 | Imported by: 0

Documentation


Constants

This section is empty.

Variables

var ErrorMessageKey = map[WorkType]string{
	WorkTypeScan: "error_message",
	WorkTypePack: "error_message",
	WorkTypeDag:  "dag_gen_error_message",
}
var WorkModel = map[WorkType]func() any{
	WorkTypeScan: func() any { return &model.Source{} },
	WorkTypePack: func() any { return &model.PackJob{} },
	WorkTypeDag:  func() any { return &model.Source{} },
}
var WorkStateKey = map[WorkType]string{
	WorkTypeScan: "scanning_state",
	WorkTypePack: "packing_state",
	WorkTypeDag:  "dag_gen_state",
}
var WorkerIDKey = map[WorkType]string{
	WorkTypeScan: "scanning_worker_id",
	WorkTypePack: "packing_worker_id",
	WorkTypeDag:  "dag_gen_worker_id",
}
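
The four maps above are all keyed by WorkType, so one work type selects a matching state key, worker ID key, and error message key. The following is a minimal sketch of that lookup, not code from this package: it declares local copies of the documented types and maps so that it runs on its own, and columnsFor is a hypothetical helper. The values look like database column names, but the documentation does not say how they are used.

```go
package main

import "fmt"

// WorkType mirrors the string type documented in this package.
type WorkType string

const (
	WorkTypeScan WorkType = "scan"
	WorkTypePack WorkType = "pack"
	WorkTypeDag  WorkType = "ExportDag"
)

// Local copies of the documented maps, so the sketch needs no imports
// beyond the standard library.
var workStateKey = map[WorkType]string{
	WorkTypeScan: "scanning_state",
	WorkTypePack: "packing_state",
	WorkTypeDag:  "dag_gen_state",
}

var workerIDKey = map[WorkType]string{
	WorkTypeScan: "scanning_worker_id",
	WorkTypePack: "packing_worker_id",
	WorkTypeDag:  "dag_gen_worker_id",
}

var errorMessageKey = map[WorkType]string{
	WorkTypeScan: "error_message",
	WorkTypePack: "error_message",
	WorkTypeDag:  "dag_gen_error_message",
}

// columnsFor is a hypothetical helper (not part of the package) that
// gathers the three keys registered for a work type. ok is false when
// the work type has no entry.
func columnsFor(t WorkType) (state, worker, errMsg string, ok bool) {
	state, ok = workStateKey[t]
	if !ok {
		return "", "", "", false
	}
	return state, workerIDKey[t], errorMessageKey[t], true
}

func main() {
	for _, t := range []WorkType{WorkTypeScan, WorkTypePack, WorkTypeDag} {
		state, worker, errMsg, _ := columnsFor(t)
		fmt.Printf("%s: %s, %s, %s\n", t, state, worker, errMsg)
	}
}
```

Note that WorkTypeScan and WorkTypePack share the "error_message" key, while WorkTypeDag has its own.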

Functions

This section is empty.

Types

type Config added in v0.3.0

type Config struct {
	Concurrency    int
	ExitOnComplete bool
	EnableScan     bool
	EnablePack     bool
	EnableDag      bool
	ExitOnError    bool
}

type Thread added in v0.3.0

type Thread struct {
	// contains filtered or unexported fields
}

func (*Thread) ExportDag added in v0.3.0

func (w *Thread) ExportDag(ctx context.Context, source model.Source) error

ExportDag exports a Directed Acyclic Graph (DAG) for a given source. The function takes a source, iterates through the related directories (as rows from the database), and constructs the DAG in the form of a CAR (Content Addressable Archive) file. This CAR file represents the block structure of the data.

The function:
- Initializes necessary components such as writers and calculators
- Iterates through the directories linked with the source and fetches blocks
- Writes the blocks into a CAR file
- Closes the CAR file and renames it appropriately
- Saves the CAR meta-information into the database

Parameters:
- ctx context.Context: The context to control cancellations and timeouts.
- source model.Source: The source for which the DAG needs to be generated.

The function performs several database and file system operations, each of which might result in an error. Errors are wrapped with context information and returned.

Returns:
- error: Standard error interface; nil if no error occurred during execution.
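
The "write, close, then rename" sequence and the wrapped errors described above can be illustrated with the standard library alone. This is a sketch under stated assumptions, not the package's implementation: it writes raw byte slices rather than a real CAR file, and writeThenRename, demo, the temporary file pattern, and the file name "example.car" are all hypothetical.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeThenRename writes every block to a temporary file in dir, closes
// it, and only then renames it to finalName. Each failure is wrapped
// with context, and the temporary file is removed on error.
func writeThenRename(dir, finalName string, blocks [][]byte) (string, error) {
	tmp, err := os.CreateTemp(dir, "export-*.tmp")
	if err != nil {
		return "", fmt.Errorf("create temporary file: %w", err)
	}
	for i, b := range blocks {
		if _, err := tmp.Write(b); err != nil {
			tmp.Close()
			os.Remove(tmp.Name())
			return "", fmt.Errorf("write block %d: %w", i, err)
		}
	}
	if err := tmp.Close(); err != nil {
		os.Remove(tmp.Name())
		return "", fmt.Errorf("close temporary file: %w", err)
	}
	final := filepath.Join(dir, finalName)
	if err := os.Rename(tmp.Name(), final); err != nil {
		os.Remove(tmp.Name())
		return "", fmt.Errorf("rename to %s: %w", final, err)
	}
	return final, nil
}

// demo runs the sketch in a throwaway directory and reports the final
// file's contents and how many files remain in the directory.
func demo() (string, int, error) {
	dir, err := os.MkdirTemp("", "dag-sketch")
	if err != nil {
		return "", 0, err
	}
	defer os.RemoveAll(dir)

	blocks := [][]byte{[]byte("block-1"), []byte("block-2")}
	path, err := writeThenRename(dir, "example.car", blocks)
	if err != nil {
		return "", 0, err
	}
	data, err := os.ReadFile(path)
	if err != nil {
		return "", 0, err
	}
	entries, err := os.ReadDir(dir)
	if err != nil {
		return "", 0, err
	}
	return string(data), len(entries), nil
}

func main() {
	contents, files, err := demo()
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Printf("contents=%q files=%d\n", contents, files)
}
```

Renaming only after a successful close means a reader never sees a partially written file under its final name.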

func (*Thread) Name added in v0.3.0

func (w *Thread) Name() string

func (*Thread) Start added in v0.3.0

func (w *Thread) Start(ctx context.Context) ([]service.Done, service.Fail, error)

Start initializes and starts the execution of a worker thread. This function:
1. Creates a cancellable context derived from the input context.
2. Registers the worker with a health check service, providing a state function for reporting its status.
3. Launches separate goroutines to report health status, clean up old health check records, execute the worker's task, and handle cleanup.
4. Returns channels that are closed when the health reporting, health check cleanup, worker execution, and worker cleanup are complete.

Parameters:

ctx : The parent context for this thread, used to propagate cancellations.

Returns:

[]service.Done : A slice of channels that are closed when respective components of the worker complete their execution.
service.Fail   : A channel that receives an error if the worker encounters a failure during its execution.
error          : An error is returned if the worker fails to register with the health check service. Otherwise, it returns nil.
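
The shape of Start's return values (a slice of completion channels plus a failure channel) can be sketched as follows. This is a simplified illustration, not the package's code: it launches two goroutines instead of the four described above, omits the health check registration, and defines Done and Fail locally as assumed stand-ins for service.Done and service.Fail, whose real definitions are not shown in this documentation. The names start, runOnce, okTask, and failingTask are hypothetical.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Done and Fail are assumed stand-ins for service.Done and service.Fail.
type Done = <-chan struct{}
type Fail = <-chan error

// start derives a cancellable context, runs task in one goroutine and
// cleanup in another, and returns one Done channel per goroutine.
func start(ctx context.Context, task func(context.Context) error) ([]Done, Fail) {
	ctx, cancel := context.WithCancel(ctx)
	taskDone := make(chan struct{})
	cleanupDone := make(chan struct{})
	fail := make(chan error, 1) // buffered so the task goroutine never blocks

	go func() {
		defer close(taskDone)
		if err := task(ctx); err != nil && !errors.Is(err, context.Canceled) {
			fail <- err
		}
	}()

	go func() {
		defer close(cleanupDone)
		<-taskDone
		cancel() // release the derived context once the task has finished
	}()

	return []Done{taskDone, cleanupDone}, fail
}

// runOnce waits for every Done channel, then reports a failure if any.
func runOnce(task func(context.Context) error) error {
	done, fail := start(context.Background(), task)
	for _, d := range done {
		<-d
	}
	select {
	case err := <-fail:
		return err
	default:
		return nil
	}
}

func okTask(ctx context.Context) error      { return nil }
func failingTask(ctx context.Context) error { return errors.New("boom") }

func main() {
	fmt.Println("ok task:", runOnce(okTask))
	fmt.Println("failing task:", runOnce(failingTask))
}
```

Because the error is sent before the task's Done channel is closed, a caller that waits on all Done channels first can then check the failure channel without blocking.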

type WorkType added in v0.3.0

type WorkType string
const (
	WorkTypeNone WorkType = ""
	WorkTypeScan WorkType = "scan"
	WorkTypePack WorkType = "pack"
	WorkTypeDag  WorkType = "ExportDag"
)

type Worker added in v0.3.0

type Worker struct {
	// contains filtered or unexported fields
}

func NewWorker added in v0.3.0

func NewWorker(db *gorm.DB, config Config) *Worker

func (Worker) Run added in v0.3.0

func (w Worker) Run(ctx context.Context) error

Run initializes and starts a set of worker threads based on the Concurrency specified in the configuration. This function:
1. Creates an array of worker threads, each having a unique identifier.
2. Initializes each thread with a shared set of dependencies (e.g., database, logger) and individual configuration.
3. Invokes the StartServers function to run all the threads, passing the initialized threads and a logger.

Parameters:

ctx : The context under which all the worker threads are run, used to propagate cancellations.

Returns:

error : An error is returned if the StartServers function encounters an issue while starting the threads. Otherwise, it returns nil.
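
The first two steps of Run (one thread per unit of Concurrency, each with its own identifier and a shared configuration) might look roughly like this. It is a sketch under assumptions: Config here mirrors only the Concurrency field, the thread type and newThreads are hypothetical, and the index-based identifier is a placeholder because the documentation does not say how the real identifiers are generated.

```go
package main

import "fmt"

// Config mirrors only the Concurrency field of the documented Config.
type Config struct {
	Concurrency int
}

// thread is a hypothetical stand-in for the package's Thread type,
// whose fields are unexported.
type thread struct {
	id  string
	cfg Config
}

// newThreads builds one thread per unit of Concurrency. A negative
// value is treated as zero. The "worker-N" identifier is an assumption
// made for this sketch.
func newThreads(cfg Config) []thread {
	n := cfg.Concurrency
	if n < 0 {
		n = 0
	}
	threads := make([]thread, 0, n)
	for i := 0; i < n; i++ {
		threads = append(threads, thread{
			id:  fmt.Sprintf("worker-%d", i),
			cfg: cfg,
		})
	}
	return threads
}

func main() {
	for _, t := range newThreads(Config{Concurrency: 3}) {
		fmt.Println(t.id)
	}
}
```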
