workers

package
v0.0.7
Published: May 4, 2026 License: AGPL-3.0 Imports: 14 Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	ErrWorkerNotFound    = fmt.Errorf("worker not found")
	ErrWorkerBanned      = fmt.Errorf("worker is banned")
	ErrWorkerBusy        = fmt.Errorf("worker is busy")
	ErrWorkerJobMismatch = fmt.Errorf("worker job mismatch")
	ErrJobNotFound       = fmt.Errorf("job not found")
)
var DefaultWorkerBanRules = &WorkerBanRules{
	BanTimeout:          30 * time.Minute,
	FailuresToGetBanned: 3,
}

DefaultWorkerBanRules provides the default ban rules for workers

Functions

func DecodeWorkerAuthToken

func DecodeWorkerAuthToken(bToken types.HexBytes) (*ethereum.ECDSASignature, time.Time, error)

DecodeWorkerAuthToken decodes a worker authentication token into its signature and timestamp. It returns an error if the token is invalid.

func EncodeWorkerAuthToken

func EncodeWorkerAuthToken(signature *ethereum.ECDSASignature, timestamp time.Time) (types.HexBytes, error)

EncodeWorkerAuthToken encodes a worker authentication token from the provided signature and timestamp. It concatenates the signature bytes with the timestamp encoded as a byte slice and returns the resulting token. It returns an error if the signature is nil or has an invalid length.
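
The token layout described above (signature bytes followed by a fixed-length timestamp) can be sketched with the standard library alone. This is only an illustration: the names encodeToken and decodeToken, the 65-byte signature length, and the timestamp layout are assumptions, not the package's actual constants.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Illustrative values only: the package's real signature length and
// timestampLen are not shown in this documentation and may differ.
const (
	sigLen   = 65                                    // assumed: r (32) + s (32) + v (1)
	tsLayout = "2006-01-02T15:04:05.000000000Z07:00" // assumed fixed-width layout
	tsLen    = 30                                    // length of tsLayout output in UTC
)

var sampleTime = time.Date(2026, 5, 4, 10, 0, 0, 42, time.UTC)

// encodeToken appends the fixed-length UTC timestamp to a copy of the signature.
func encodeToken(sig []byte, ts time.Time) ([]byte, error) {
	if len(sig) != sigLen {
		return nil, errors.New("invalid signature length")
	}
	token := append([]byte{}, sig...)
	return append(token, ts.UTC().Format(tsLayout)...), nil
}

// decodeToken splits a token back into signature bytes and timestamp.
func decodeToken(token []byte) ([]byte, time.Time, error) {
	if len(token) != sigLen+tsLen {
		return nil, time.Time{}, errors.New("invalid token length")
	}
	ts, err := time.Parse(tsLayout, string(token[sigLen:]))
	if err != nil {
		return nil, time.Time{}, fmt.Errorf("invalid timestamp: %w", err)
	}
	return token[:sigLen], ts, nil
}

func main() {
	token, err := encodeToken(make([]byte, sigLen), sampleTime)
	if err != nil {
		panic(err)
	}
	_, ts, err := decodeToken(token)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(token), ts.Equal(sampleTime))
}
```

Because both parts have a fixed length, the decoder can split the token by offset without a separator.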

func TimestampToSufix

func TimestampToSufix(t time.Time) types.HexBytes

TimestampToSufix encodes a time.Time value into a byte slice that can be used as the suffix of a worker authentication token. The timestamp is formatted using the RFC3339FixedNano format and converted to a byte slice. The resulting byte slice has a fixed length defined by timestampLen.
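
To see why a fixed-width layout matters for a suffix of constant length, compare it with the standard time.RFC3339Nano layout, which drops trailing zeros in the fractional seconds. The layout string and the helper timestampSuffix below are assumptions made for this sketch; the package's RFC3339FixedNano definition is not shown here.

```go
package main

import (
	"fmt"
	"time"
)

// fixedNanoLayout is a hypothetical fixed-width variant of time.RFC3339Nano.
// It always prints nine fractional digits.
const fixedNanoLayout = "2006-01-02T15:04:05.000000000Z07:00"

var (
	noFraction   = time.Date(2026, 5, 4, 10, 0, 0, 0, time.UTC)
	withFraction = time.Date(2026, 5, 4, 10, 0, 0, 120_000_000, time.UTC)
)

// timestampSuffix formats t in UTC with the fixed-width layout.
func timestampSuffix(t time.Time) []byte {
	return []byte(t.UTC().Format(fixedNanoLayout))
}

func main() {
	// RFC3339Nano output length depends on the fractional part.
	fmt.Println(len(noFraction.Format(time.RFC3339Nano)), len(withFraction.Format(time.RFC3339Nano))) // 20 23
	// The fixed-width layout always yields the same length in UTC.
	fmt.Println(len(timestampSuffix(noFraction)), len(timestampSuffix(withFraction))) // 30 30
}
```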

func ValidWorkerAddress

func ValidWorkerAddress(address string) (common.Address, error)

ValidWorkerAddress checks if the provided address is a valid Ethereum address.

func VerifyWorkerHexToken

func VerifyWorkerHexToken(hexToken, strWorkerAddr string, seqAddr common.Address) (bool, time.Time, error)

VerifyWorkerHexToken is a wrapper around VerifyWorkerToken that accepts a hex-encoded token and a string representation of the worker address.

func VerifyWorkerToken

func VerifyWorkerToken(bToken types.HexBytes, workerAddr, seqAddr common.Address) (bool, time.Time, error)

VerifyWorkerToken verifies a worker authentication token against the worker's address and the sequencer's address. It decodes the token to obtain the signature and timestamp, builds the expected signature message from the sequencer address and timestamp, and validates the signature against the worker address using that message. It returns a boolean indicating whether the signature is valid, together with the timestamp. If any step fails, it returns an error.
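
The verification flow above (rebuild the expected message from the sequencer address and timestamp, then check the signature) can be illustrated as follows. Note the substitutions: this sketch uses Ed25519 from the standard library instead of Ethereum ECDSA signatures, and the message format and the names authMessage, sign, verify, and newWorkerKey are hypothetical.

```go
package main

import (
	"crypto/ed25519"
	"crypto/rand"
	"fmt"
)

// authMessage builds the message a worker signs. The format is an assumption;
// the point is that it binds the sequencer address and the timestamp.
func authMessage(sequencerAddr, timestamp string) []byte {
	return []byte("auth:" + sequencerAddr + ":" + timestamp)
}

// newWorkerKey creates a key pair standing in for the worker's identity.
func newWorkerKey() (ed25519.PublicKey, ed25519.PrivateKey) {
	pub, priv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		panic(err)
	}
	return pub, priv
}

// sign produces the worker's signature over the auth message.
func sign(priv ed25519.PrivateKey, sequencerAddr, timestamp string) []byte {
	return ed25519.Sign(priv, authMessage(sequencerAddr, timestamp))
}

// verify rebuilds the expected message and checks the signature against it.
func verify(pub ed25519.PublicKey, sig []byte, sequencerAddr, timestamp string) bool {
	return ed25519.Verify(pub, authMessage(sequencerAddr, timestamp), sig)
}

func main() {
	pub, priv := newWorkerKey()
	ts := "2026-05-04T10:00:00.000000000Z"
	sig := sign(priv, "0xSequencerA", ts)
	fmt.Println(verify(pub, sig, "0xSequencerA", ts)) // true
	fmt.Println(verify(pub, sig, "0xSequencerB", ts)) // false
}
```

Including the sequencer address in the signed message means a token issued for one sequencer does not validate against another.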

func WorkerAuthTokenData

func WorkerAuthTokenData(sequencerAddress common.Address, timestamp time.Time) (string, string, types.HexBytes)

WorkerAuthTokenData prepares the data required to generate a worker auth token for the current sequencer. It encodes the provided timestamp as the token suffix and also includes it in the signature message. It returns the signature message, the timestamp formatted as a string, and the token suffix as a byte slice.

func WorkerNameFromAddress

func WorkerNameFromAddress(address string) (string, error)

WorkerNameFromAddress generates a worker name by masking all but the last 4 hexadecimal characters of the provided Ethereum address as string.
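
A minimal sketch of this kind of masking is shown below. The mask character, the handling of the 0x prefix, and the function name maskAddress are assumptions; the package's actual output format is not documented here.

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
	"strings"
)

// sampleAddr is an arbitrary 20-byte address used for illustration.
const sampleAddr = "0x52908400098527886E0F7030069857D2E4169EE7"

// maskAddress hides all but the last 4 hexadecimal characters of an address.
// Assumption: the mask character is '*' and the "0x" prefix is dropped.
func maskAddress(address string) (string, error) {
	hexPart := strings.TrimPrefix(address, "0x")
	if len(hexPart) != 40 {
		return "", errors.New("expected 40 hexadecimal characters")
	}
	if _, err := hex.DecodeString(hexPart); err != nil {
		return "", fmt.Errorf("invalid address: %w", err)
	}
	return strings.Repeat("*", 36) + hexPart[36:], nil
}

func main() {
	name, err := maskAddress(sampleAddr)
	if err != nil {
		panic(err)
	}
	fmt.Println(name)
}
```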

func WorkerSeedToUUID

func WorkerSeedToUUID(seed string) (*uuid.UUID, error)

WorkerSeedToUUID converts a worker seed string into a UUID. It uses the first 16 bytes of the SHA256 hash of the seed to create a UUID.
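
The derivation can be sketched with the standard library only. The function below formats the first 16 bytes of the SHA256 hash in the usual 8-4-4-4-12 UUID text form; it returns a string rather than a *uuid.UUID, and the name seedToUUID is hypothetical.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// seedToUUID hashes the seed and formats the first 16 bytes as a UUID string.
// The bytes are used as-is; no version or variant bits are set in this sketch.
func seedToUUID(seed string) string {
	sum := sha256.Sum256([]byte(seed))
	b := sum[:16]
	return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16])
}

func main() {
	// The same seed always yields the same identifier.
	fmt.Println(seedToUUID("worker-seed") == seedToUUID("worker-seed")) // true
	fmt.Println(seedToUUID("worker-seed"))
}
```

A deterministic mapping like this lets the same seed identify the same worker across restarts.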

Types

type JobsManager

type JobsManager struct {
	FailedJobs    chan *WorkerJob // Channel to handle failed jobs
	JobTimeout    time.Duration   // Duration after which a job is considered timed out
	WorkerManager *WorkerManager  // Reference to the worker manager for job tracking
	// contains filtered or unexported fields
}

JobsManager manages worker jobs, including job registration, completion, and timeout handling. It also interacts with the worker manager to track worker availability and job results.

func NewJobsManager

func NewJobsManager(storage *storage.Storage, jobTimeout time.Duration, banRules *WorkerBanRules, tickerInterval ...time.Duration) *JobsManager

NewJobsManager creates a new jobs manager with the specified job timeout and ticker interval. If no ticker interval is provided, it defaults to 10 seconds. It initializes an internal worker manager with default ban rules.

func (*JobsManager) CompleteJob

func (jm *JobsManager) CompleteJob(voteID types.VoteID, success bool) *WorkerJob

CompleteJob marks a job as completed, either successfully or with failure. It is called when a worker finishes a job. It looks up the job by its vote ID, updates the worker manager with the result, and removes the job from the pending jobs map. If the job is not found, it logs a warning and returns nil. If the job failed, it is also sent to the failed jobs channel for further processing.
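
The completion path described above could look roughly like this. It is a simplified sketch: the types job and jobs are hypothetical, vote IDs are plain strings, the worker manager update is omitted, and the non-blocking send is a design choice of this example rather than documented behavior.

```go
package main

import (
	"fmt"
	"sync"
)

type job struct {
	voteID string
	worker string
}

type jobs struct {
	mu      sync.Mutex
	pending map[string]*job // keyed by vote ID
	failed  chan *job
}

// complete removes the job from the pending map and, on failure,
// forwards it to the failed channel. It returns nil for unknown jobs.
func (j *jobs) complete(voteID string, success bool) *job {
	j.mu.Lock()
	jb, ok := j.pending[voteID]
	if ok {
		delete(j.pending, voteID)
	}
	j.mu.Unlock()
	if !ok {
		return nil
	}
	if !success {
		select {
		case j.failed <- jb:
		default: // assumption: drop the notification if nobody can receive it
		}
	}
	return jb
}

func main() {
	j := &jobs{
		pending: map[string]*job{"vote-1": {voteID: "vote-1", worker: "0xabc"}},
		failed:  make(chan *job, 1),
	}
	fmt.Println(j.complete("vote-1", false) != nil) // true
	fmt.Println((<-j.failed).voteID)                // vote-1
	fmt.Println(j.complete("vote-1", true) == nil)  // true
}
```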

func (*JobsManager) IsWorkerAvailable

func (jm *JobsManager) IsWorkerAvailable(workerAddr string) (bool, error)

IsWorkerAvailable checks if a worker is available for a new job. It verifies if the worker exists, is not banned, and does not have any pending jobs.
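
The three checks can be sketched as below. The pool type and its fields are hypothetical, and returning sentinel errors that mirror ErrWorkerNotFound, ErrWorkerBanned, and ErrWorkerBusy is an assumption about how the result is reported.

```go
package main

import (
	"errors"
	"fmt"
)

// Local stand-ins for the package's exported sentinel errors.
var (
	errWorkerNotFound = errors.New("worker not found")
	errWorkerBanned   = errors.New("worker is banned")
	errWorkerBusy     = errors.New("worker is busy")
)

type pool struct {
	banned  map[string]bool   // worker address -> currently banned
	pending map[string]string // worker address -> vote ID in progress
}

// available reports whether the worker exists, is not banned,
// and has no pending job.
func (p *pool) available(addr string) (bool, error) {
	banned, known := p.banned[addr]
	if !known {
		return false, errWorkerNotFound
	}
	if banned {
		return false, errWorkerBanned
	}
	if _, busy := p.pending[addr]; busy {
		return false, errWorkerBusy
	}
	return true, nil
}

func main() {
	p := &pool{
		banned:  map[string]bool{"0xa": false, "0xb": true, "0xc": false},
		pending: map[string]string{"0xc": "vote-1"},
	}
	ok, err := p.available("0xa")
	fmt.Println(ok, err) // true <nil>
	_, err = p.available("0xb")
	fmt.Println(errors.Is(err, errWorkerBanned)) // true
}
```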

func (*JobsManager) Job

func (jm *JobsManager) Job(workerAddr string, voteID types.VoteID) (*WorkerJob, error)

Job retrieves a job by its vote ID and verifies that it is assigned to the specified worker. If the job is found and matches the worker, it is returned.

func (*JobsManager) RegisterJob

func (jm *JobsManager) RegisterJob(workerAddr string, voteID types.VoteID) (*WorkerJob, error)

RegisterJob registers a new job for a worker. It checks that the worker is available and not banned. If so, it creates a new job with the provided vote ID, assigns it to the worker, sets an expiration time for the job, and adds the job to the pending jobs map. If the worker is banned, it returns nil.
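
Registration with an expiration time might be sketched as follows. The tracker type, string vote IDs, and the busy check are simplifications for illustration; the ban check is left out.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errBusy = errors.New("worker is busy")

type job struct {
	VoteID     string
	Address    string
	Timestamp  time.Time
	Expiration time.Time
}

type tracker struct {
	mu      sync.Mutex
	timeout time.Duration
	pending map[string]*job // keyed by vote ID
	busy    map[string]bool // worker addresses with a pending job
}

func newTracker(timeout time.Duration) *tracker {
	return &tracker{timeout: timeout, pending: map[string]*job{}, busy: map[string]bool{}}
}

// register creates a job that expires after the configured timeout.
func (t *tracker) register(addr, voteID string) (*job, error) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.busy[addr] {
		return nil, errBusy
	}
	now := time.Now()
	j := &job{VoteID: voteID, Address: addr, Timestamp: now, Expiration: now.Add(t.timeout)}
	t.pending[voteID] = j
	t.busy[addr] = true
	return j, nil
}

func main() {
	t := newTracker(30 * time.Second)
	j, err := t.register("0xabc", "vote-1")
	fmt.Println(err, j.Expiration.Sub(j.Timestamp)) // <nil> 30s
	_, err = t.register("0xabc", "vote-2")
	fmt.Println(err) // worker is busy
}
```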

func (*JobsManager) Start

func (jm *JobsManager) Start(ctx context.Context)

Start initializes the jobs manager, starts the worker manager, and begins a goroutine to periodically check for job timeouts. It uses a context to manage the lifecycle of the jobs manager, allowing it to be stopped gracefully.

func (*JobsManager) Stop

func (jm *JobsManager) Stop()

Stop gracefully stops the jobs manager, clearing all pending jobs and stopping the worker manager. It uses sync.Once to ensure that the failed jobs channel is closed only once, so repeated calls to Stop do not panic by closing an already closed channel.
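
The close-once pattern can be shown in isolation. The manager type below is a hypothetical stand-in with only the channel and the sync.Once guard.

```go
package main

import (
	"fmt"
	"sync"
)

type manager struct {
	failed    chan string
	closeOnce sync.Once
}

// stop closes the channel at most once, so calling it repeatedly is safe.
func (m *manager) stop() {
	m.closeOnce.Do(func() { close(m.failed) })
}

func main() {
	m := &manager{failed: make(chan string)}
	m.stop()
	m.stop() // no panic: the close already happened
	_, open := <-m.failed
	fmt.Println(open) // false
}
```

Without the guard, the second close would panic, since closing a closed channel is a runtime error in Go.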

type Worker

type Worker struct {
	Address string
	Name    string // Name of the worker for identification
	// contains filtered or unexported fields
}

Worker represents a worker that processes jobs.

func (*Worker) GetBannedUntil

func (w *Worker) GetBannedUntil() time.Time

GetBannedUntil returns the ban expiration time as a time.Time

func (*Worker) IsBanned

func (w *Worker) IsBanned(rules *WorkerBanRules) bool

IsBanned checks if the worker is banned based on the provided rules

func (*Worker) SetBannedUntil

func (w *Worker) SetBannedUntil(t time.Time)

SetBannedUntil sets the ban expiration time atomically
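
One way to store a timestamp atomically is to keep its Unix nanoseconds in an atomic integer. The sketch below assumes that approach; the worker type here is a stand-in and the package's unexported fields may be organized differently.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

var sampleTime = time.Date(2026, 5, 4, 10, 30, 0, 0, time.UTC)

type worker struct {
	bannedUntil atomic.Int64 // Unix nanoseconds; 0 means "not banned"
}

// setBannedUntil stores the ban expiration without needing a mutex.
func (w *worker) setBannedUntil(t time.Time) {
	if t.IsZero() {
		w.bannedUntil.Store(0)
		return
	}
	w.bannedUntil.Store(t.UnixNano())
}

// getBannedUntil loads the ban expiration, or the zero time if unset.
func (w *worker) getBannedUntil() time.Time {
	n := w.bannedUntil.Load()
	if n == 0 {
		return time.Time{}
	}
	return time.Unix(0, n)
}

func main() {
	w := &worker{}
	fmt.Println(w.getBannedUntil().IsZero()) // true
	w.setBannedUntil(sampleTime)
	fmt.Println(w.getBannedUntil().Equal(sampleTime)) // true
}
```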

func (*Worker) SetConsecutiveFails

func (w *Worker) SetConsecutiveFails() int

SetConsecutiveFails returns the current consecutive failure count. Despite its name, it takes no arguments and does not modify the count.

type WorkerBanRules

type WorkerBanRules struct {
	BanTimeout          time.Duration // Duration for which the worker is banned
	FailuresToGetBanned int           // Maximum consecutive failed jobs before banning
}

WorkerBanRules defines the rules for banning workers. It includes the duration for which a worker is banned and the maximum number of consecutive failed jobs before a worker is banned.

type WorkerInfo

type WorkerInfo struct {
	Address      string `json:"address"`
	Name         string `json:"name"`
	SuccessCount int64  `json:"successCount"`
	FailedCount  int64  `json:"failedCount"`
}
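
The struct tags above determine the JSON field names. The following sketch copies the type definition and encodes a value with made-up sample data; the helper encode is not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// WorkerInfo is copied from the type definition above.
type WorkerInfo struct {
	Address      string `json:"address"`
	Name         string `json:"name"`
	SuccessCount int64  `json:"successCount"`
	FailedCount  int64  `json:"failedCount"`
}

// encode marshals a WorkerInfo to its JSON text form.
func encode(w WorkerInfo) string {
	b, err := json.Marshal(w)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	info := WorkerInfo{Address: "0xabc", Name: "worker-1", SuccessCount: 3, FailedCount: 1}
	fmt.Println(encode(info))
	// {"address":"0xabc","name":"worker-1","successCount":3,"failedCount":1}
}
```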

type WorkerJob

type WorkerJob struct {
	VoteID     types.VoteID
	Address    string
	Timestamp  time.Time
	Expiration time.Time // When the job should expire
}

WorkerJob represents a job assigned to a worker. It contains the vote ID, worker address, timestamp, and expiration time.

type WorkerManager

type WorkerManager struct {
	// contains filtered or unexported fields
}

WorkerManager manages workers and their ban status. It tracks workers, bans them based on rules, and resets their status after the ban period.

func NewWorkerManager

func NewWorkerManager(stg *storage.Storage, rules *WorkerBanRules, tickerInterval ...time.Duration) *WorkerManager

NewWorkerManager creates a new worker manager with the specified ban rules. It initializes the worker map and sets up the context for managing workers. An optional ticker interval can be provided; defaults to 10 seconds if not specified.

func (*WorkerManager) AddWorker

func (wm *WorkerManager) AddWorker(address, name string) *Worker

AddWorker adds a new worker to the manager. If the worker already exists, it returns the existing worker without adding a new one. If it's a new worker, it initializes a new worker instance, stores it in the worker map, and returns the worker instance.
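
The get-or-create behavior described above can be sketched with a mutex-protected map. The registry type is hypothetical; the package may use a different container internally.

```go
package main

import (
	"fmt"
	"sync"
)

type worker struct {
	Address string
	Name    string
}

type registry struct {
	mu      sync.Mutex
	workers map[string]*worker // keyed by address
}

func newRegistry() *registry {
	return &registry{workers: map[string]*worker{}}
}

// add returns the existing worker for the address, or stores a new one.
func (r *registry) add(address, name string) *worker {
	r.mu.Lock()
	defer r.mu.Unlock()
	if w, ok := r.workers[address]; ok {
		return w
	}
	w := &worker{Address: address, Name: name}
	r.workers[address] = w
	return w
}

func main() {
	r := newRegistry()
	first := r.add("0xabc", "alpha")
	second := r.add("0xabc", "beta")
	fmt.Println(first == second, second.Name) // true alpha
}
```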

func (*WorkerManager) BannedWorkers

func (wm *WorkerManager) BannedWorkers() []*Worker

BannedWorkers returns a slice of workers that are currently banned based on the ban rules. It iterates through all workers in the manager, checks if they are banned according to the rules, and collects them in a slice.

func (*WorkerManager) GetWorker

func (wm *WorkerManager) GetWorker(address string) (*Worker, bool)

GetWorker retrieves a worker by its address. If the worker exists, it returns the worker instance and a boolean indicating success. If the worker does not exist, it returns nil and false.

func (*WorkerManager) ListWorkerStats

func (wm *WorkerManager) ListWorkerStats() ([]*WorkerInfo, error)

ListWorkerStats retrieves the statistics for all workers managed by the WorkerManager. It returns a slice of WorkerInfo containing the address, name, success count, and failed count for each worker. If there is an error retrieving the statistics, it returns an error.

func (*WorkerManager) ResetWorker

func (wm *WorkerManager) ResetWorker(address string)

ResetWorker resets the worker's status by creating a new worker instance with the same address. This effectively clears the worker's consecutive fails and banned status, allowing the worker to be reused without any previous restrictions. It is typically called when a worker has been banned and the ban period has expired, or when a worker needs to be reset for any reason.

func (*WorkerManager) SetBanDuration

func (wm *WorkerManager) SetBanDuration(address string)

SetBanDuration sets the ban duration for a worker. It updates the worker's ban expiration time to the current time plus the ban timeout defined in the rules. This effectively bans the worker for the specified duration, preventing it from processing jobs until the ban period expires.

func (*WorkerManager) Start

func (wm *WorkerManager) Start(ctx context.Context)

Start initializes the worker manager, setting up a context for managing workers. It starts a goroutine that periodically checks for banned workers, bans them if necessary, and resets their status after the ban period.

func (*WorkerManager) Stop

func (wm *WorkerManager) Stop()

Stop stops the worker manager, cancels the context, and clears all workers. It ensures that all workers are removed and no further actions are taken. This is typically called when the application is shutting down or when the worker manager is no longer needed.

func (*WorkerManager) WorkerResult

func (wm *WorkerManager) WorkerResult(address string, success bool) error

WorkerResult updates the worker's status based on the result of a job. If the job was successful, it resets the worker's consecutive fails to zero. If the job failed, it increments the worker's consecutive fails count. This method uses atomic operations to ensure thread safety.
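
The counting logic can be sketched with an atomic counter. The types below are stand-ins, and the comparison used to decide a ban (count greater than or equal to FailuresToGetBanned) is an assumption; the package may apply the threshold differently.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type banRules struct {
	FailuresToGetBanned int
}

type worker struct {
	consecutiveFails atomic.Int64
}

// result resets the counter on success and increments it on failure.
// It returns the updated count.
func (w *worker) result(success bool) int64 {
	if success {
		w.consecutiveFails.Store(0)
		return 0
	}
	return w.consecutiveFails.Add(1)
}

// shouldBan applies the assumed threshold rule.
func (w *worker) shouldBan(rules banRules) bool {
	return w.consecutiveFails.Load() >= int64(rules.FailuresToGetBanned)
}

func main() {
	rules := banRules{FailuresToGetBanned: 3}
	w := &worker{}
	w.result(false)
	w.result(false)
	fmt.Println(w.shouldBan(rules)) // false
	w.result(false)
	fmt.Println(w.shouldBan(rules)) // true
	w.result(true)
	fmt.Println(w.shouldBan(rules)) // false
}
```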

func (*WorkerManager) WorkerStats

func (wm *WorkerManager) WorkerStats(address string) (*WorkerInfo, error)

WorkerStats retrieves the statistics for a specific worker by its address. The statistics include the worker's name, success count, and failed count. If the worker does not exist, it returns an error indicating that the worker was not found.
