Documentation
¶
Index ¶
- Variables
- func DecodeWorkerAuthToken(bToken types.HexBytes) (*ethereum.ECDSASignature, time.Time, error)
- func EncodeWorkerAuthToken(signature *ethereum.ECDSASignature, timestamp time.Time) (types.HexBytes, error)
- func TimestampToSufix(t time.Time) types.HexBytes
- func ValidWorkerAddress(address string) (common.Address, error)
- func VerifyWorkerHexToken(hexToken, strWorkerAddr string, seqAddr common.Address) (bool, time.Time, error)
- func VerifyWorkerToken(bToken types.HexBytes, workerAddr, seqAddr common.Address) (bool, time.Time, error)
- func WorkerAuthTokenData(sequencerAddress common.Address, timestamp time.Time) (string, string, types.HexBytes)
- func WorkerNameFromAddress(address string) (string, error)
- func WorkerSeedToUUID(seed string) (*uuid.UUID, error)
- type JobsManager
- func (jm *JobsManager) CompleteJob(voteID types.VoteID, success bool) *WorkerJob
- func (jm *JobsManager) IsWorkerAvailable(workerAddr string) (bool, error)
- func (jm *JobsManager) Job(workerAddr string, voteID types.VoteID) (*WorkerJob, error)
- func (jm *JobsManager) RegisterJob(workerAddr string, voteID types.VoteID) (*WorkerJob, error)
- func (jm *JobsManager) Start(ctx context.Context)
- func (jm *JobsManager) Stop()
- type Worker
- type WorkerBanRules
- type WorkerInfo
- type WorkerJob
- type WorkerManager
- func (wm *WorkerManager) AddWorker(address, name string) *Worker
- func (wm *WorkerManager) BannedWorkers() []*Worker
- func (wm *WorkerManager) GetWorker(address string) (*Worker, bool)
- func (wm *WorkerManager) ListWorkerStats() ([]*WorkerInfo, error)
- func (wm *WorkerManager) ResetWorker(address string)
- func (wm *WorkerManager) SetBanDuration(address string)
- func (wm *WorkerManager) Start(ctx context.Context)
- func (wm *WorkerManager) Stop()
- func (wm *WorkerManager) WorkerResult(address string, success bool) error
- func (wm *WorkerManager) WorkerStats(address string) (*WorkerInfo, error)
Constants ¶
This section is empty.
Variables ¶
var ( ErrWorkerNotFound = fmt.Errorf("worker not found") ErrWorkerBanned = fmt.Errorf("worker is banned") ErrWorkerBusy = fmt.Errorf("worker is busy") ErrWorkerJobMismatch = fmt.Errorf("worker job mismatch") ErrJobNotFound = fmt.Errorf("job not found") )
var DefaultWorkerBanRules = &WorkerBanRules{ BanTimeout: 30 * time.Minute, FailuresToGetBanned: 3, }
DefaultWorkerBanRules provides the default ban rules for workers
Functions ¶
func DecodeWorkerAuthToken ¶
DecodeWorkerAuthToken function decodes a worker authentication token into its signature and timestamp. If the token is invalid, an error is returned. It returns the decoded signature and timestamp.
func EncodeWorkerAuthToken ¶
func EncodeWorkerAuthToken(signature *ethereum.ECDSASignature, timestamp time.Time) (types.HexBytes, error)
EncodeWorkerAuthToken function encodes a worker authentication token from the provided signature and timestamp. It concatenates the signature bytes with the timestamp formatted as a byte slice. If the signature is nil or has an invalid length, an error is returned. It returns the encoded token as a byte slice.
func TimestampToSufix ¶
TimestampToSufix function encodes a time.Time value into a byte slice that can be used as a suffix for the worker authentication token. The timestamp is formatted using the RFC3339FixedNano format and converted to a byte slice. The resulting byte slice has a fixed length defined by timestampLen.
func ValidWorkerAddress ¶
ValidWorkerAddress checks if the provided address is a valid Ethereum address.
func VerifyWorkerHexToken ¶
func VerifyWorkerHexToken(hexToken, strWorkerAddr string, seqAddr common.Address) (bool, time.Time, error)
VerifyWorkerHexToken function is a wrapper to VerifyWorkerToken that accepts a hex-encoded token and a string representation of the worker address.
func VerifyWorkerToken ¶
func VerifyWorkerToken(bToken types.HexBytes, workerAddr, seqAddr common.Address) (bool, time.Time, error)
VerifyWorkerToken function verifies a worker authentication token against the worker's address and the sequencer's address. It decodes the token to get the signature and timestamp, then calculates the expected signature message with the sequencer address and timestamp. Then validates the signature against the worker address using the calculated message. It returns a boolean that indicates if the signature is valid or not and the timestamp. If something fails, returns an error.
func WorkerAuthTokenData ¶
func WorkerAuthTokenData(sequencerAddress common.Address, timestamp time.Time) (string, string, types.HexBytes)
WorkersAuthTokenData function prepares the required data to generate a worker authtoken for the current sequencer. It takes the timestamp provided and encode it as token suffix, but also includes it in the signature message. It returns the signature message, the timestamp formatted as a string, and the token suffix as a byte slice.
func WorkerNameFromAddress ¶
WorkerNameFromAddress generates a worker name by masking all but the last 4 hexadecimal characters of the provided Ethereum address as string.
Types ¶
type JobsManager ¶
type JobsManager struct {
FailedJobs chan *WorkerJob // Channel to handle failed jobs
JobTimeout time.Duration // Duration after which a job is considered timed out
WorkerManager *WorkerManager // Reference to the worker manager for job tracking
// contains filtered or unexported fields
}
JobsManager manages worker jobs, including job registration, completion, and timeout handling. It also interacts with the worker manager to track worker availability and job results.
func NewJobsManager ¶
func NewJobsManager(storage *storage.Storage, jobTimeout time.Duration, banRules *WorkerBanRules, tickerInterval ...time.Duration) *JobsManager
NewJobsManager creates a new jobs manager with the specified job timeout and ticker interval. If no ticker interval is provided, it defaults to 10 seconds. It initializes an internal worker manager with default ban rules.
func (*JobsManager) CompleteJob ¶
func (jm *JobsManager) CompleteJob(voteID types.VoteID, success bool) *WorkerJob
CompleteJob marks a job as completed, either successfully or with failure. It looks up the job by its vote ID, updates the worker manager with the result, and removes the job from the pending jobs map. If the job is not found, it logs a warning and returns nil. If the job is marked as failed, it sends the job to the failed jobs channel for further processing. This function is called when a worker completes a job, either successfully or with failure.
func (*JobsManager) IsWorkerAvailable ¶
func (jm *JobsManager) IsWorkerAvailable(workerAddr string) (bool, error)
IsWorkerAvailable checks if a worker is available for a new job. It verifies if the worker exists, is not banned, and does not have any pending jobs.
func (*JobsManager) Job ¶
Job retrieves a job by its vote ID and verifies that it is assigned to the specified worker. If the job is found and matches the worker, it is returned.
func (*JobsManager) RegisterJob ¶
RegisterJob registers a new job for a worker. It checks if the worker is available and not banned. If the worker is valid, it creates a new job with the provided vote ID, assigns it to the worker, and sets an expiration time for the job. The job is then added to the pending jobs map. If the worker is banned returns nil.
func (*JobsManager) Start ¶
func (jm *JobsManager) Start(ctx context.Context)
Start initializes the jobs manager, starts the worker manager, and begins a goroutine to periodically check for job timeouts. It uses a context to manage the lifecycle of the jobs manager, allowing it to be stopped gracefully.
func (*JobsManager) Stop ¶
func (jm *JobsManager) Stop()
Stop gracefully stops the jobs manager, clearing all pending jobs and stopping the worker manager. It also ensures that the failed jobs channel is closed only once using sync.Once. This prevents any potential concurrent write to a closed channel panic.
type Worker ¶
type Worker struct {
Address string
Name string // Name of the worker for identification
// contains filtered or unexported fields
}
Worker represents a Worker that processes jobs
func (*Worker) GetBannedUntil ¶
GetBannedUntil returns the ban expiration time as a time.Time
func (*Worker) IsBanned ¶
func (w *Worker) IsBanned(rules *WorkerBanRules) bool
IsBanned checks if the worker is banned based on the provided rules
func (*Worker) SetBannedUntil ¶
SetBannedUntil sets the ban expiration time atomically
func (*Worker) SetConsecutiveFails ¶
SetConsecutiveFails returns the current consecutive failure count
type WorkerBanRules ¶
type WorkerBanRules struct {
BanTimeout time.Duration // Duration for which the worker is banned
FailuresToGetBanned int // Maximum consecutive failed jobs before banning
}
WorkerBanRules defines the rules for banning workers. It includes the duration for which a worker is banned and the maximum number of consecutive failed jobs before a worker is banned.
type WorkerInfo ¶
type WorkerJob ¶
type WorkerJob struct {
VoteID types.VoteID
Address string
Timestamp time.Time
Expiration time.Time // When the job should expire
}
WorkerJob represents a job assigned to a worker. It contains the vote ID, worker address, timestamp, and expiration time.
type WorkerManager ¶
type WorkerManager struct {
// contains filtered or unexported fields
}
WorkerManager manages workers and their ban status. It tracks workers, bans them based on rules, and resets their status after the ban period.
func NewWorkerManager ¶
func NewWorkerManager(stg *storage.Storage, rules *WorkerBanRules, tickerInterval ...time.Duration) *WorkerManager
NewWorkerManager creates a new worker manager with the specified ban rules. It initializes the worker map and sets up the context for managing workers. An optional ticker interval can be provided; defaults to 10 seconds if not specified.
func (*WorkerManager) AddWorker ¶
func (wm *WorkerManager) AddWorker(address, name string) *Worker
AddWorker adds a new worker to the manager. If the worker already exists, it returns the existing worker without adding a new one. If it's a new worker, it initializes a new worker instance, stores it in the worker map, and returns the worker instance.
func (*WorkerManager) BannedWorkers ¶
func (wm *WorkerManager) BannedWorkers() []*Worker
BannedWorkers returns a slice of workers that are currently banned based on the ban rules. It iterates through all workers in the manager, checks if they are banned according to the rules, and collects them in a slice.
func (*WorkerManager) GetWorker ¶
func (wm *WorkerManager) GetWorker(address string) (*Worker, bool)
GetWorker retrieves a worker by its address. If the worker exists, it returns the worker instance and a boolean indicating success. If the worker does not exist, it returns nil and false.
func (*WorkerManager) ListWorkerStats ¶
func (wm *WorkerManager) ListWorkerStats() ([]*WorkerInfo, error)
ListWorkerStats retrieves the statistics for all workers managed by the WorkerManager. It returns a slice of WorkerInfo containing the address, name, success count, and failed count for each worker. If there is an error retrieving the statistics, it returns an error.
func (*WorkerManager) ResetWorker ¶
func (wm *WorkerManager) ResetWorker(address string)
ResetWorker resets the worker's status by creating a new worker instance with the same address. This effectively clears the worker's consecutive fails and banned status, allowing the worker to be reused without any previous restrictions. It is typically called when a worker has been banned and the ban period has expired, or when a worker needs to be reset for any reason.
func (*WorkerManager) SetBanDuration ¶
func (wm *WorkerManager) SetBanDuration(address string)
SetBanDuration sets the ban duration for a worker. It updates the worker's ban expiration time to the current time plus the ban timeout defined in the rules. This effectively bans the worker for the specified duration, preventing it from processing jobs until the ban period expires.
func (*WorkerManager) Start ¶
func (wm *WorkerManager) Start(ctx context.Context)
Start initializes the worker manager, setting up a context for managing workers. It starts a goroutine that periodically checks for banned workers, bans them if necessary, and resets their status after the ban period.
func (*WorkerManager) Stop ¶
func (wm *WorkerManager) Stop()
Stop stops the worker manager, cancels the context, and clears all workers. It ensures that all workers are removed and no further actions are taken. This is typically called when the application is shutting down or when the worker manager is no longer needed.
func (*WorkerManager) WorkerResult ¶
func (wm *WorkerManager) WorkerResult(address string, success bool) error
WorkerResult updates the worker's status based on the result of a job. If the job was successful, it resets the worker's consecutive fails to zero. If the job failed, it increments the worker's consecutive fails count. This method uses atomic operations to ensure thread safety.
func (*WorkerManager) WorkerStats ¶
func (wm *WorkerManager) WorkerStats(address string) (*WorkerInfo, error)
WorkerStats retrieves the statistics for a specific worker by its ID. The statistics include the worker's name, success count, and failed count. If the worker does not exist, it returns an error indicating that the worker was not found.