Documentation
¶
Index ¶
- Constants
- Variables
- func GetNodeInfoPublishConfig() routing.NodeInfoPublisherIntervalConfig
- type Compute
- type ComputeConfig
- type ComputeConfigParams
- type ExecutorsFactory
- type ExecutorsFactoryFunc
- type FeatureConfig
- type JobSelectionPolicy
- type Node
- type NodeConfig
- type NodeDependencyInjector
- type PublishersFactory
- type PublishersFactoryFunc
- type Requester
- type RequesterConfig
- type RequesterConfigParams
- type StorageProvidersFactory
- type StorageProvidersFactoryFunc
Constants ¶
View Source
const JobInfoTopic = "bacalhau-job-info"
View Source
const NodeInfoTopic = "bacalhau-node-info"
Variables ¶
View Source
var DefaultComputeConfig = ComputeConfigParams{ PhysicalResourcesProvider: compute_system.NewPhysicalCapacityProvider(), DefaultJobResourceLimits: models.Resources{ CPU: 0.1, Memory: 100 * 1024 * 1024, }, ExecutorBufferBackoffDuration: 50 * time.Millisecond, JobNegotiationTimeout: 3 * time.Minute, MinJobExecutionTimeout: 500 * time.Millisecond, MaxJobExecutionTimeout: model.NoJobTimeout, DefaultJobExecutionTimeout: model.NoJobTimeout, LogRunningExecutionsInterval: 10 * time.Second, JobSelectionPolicy: NewDefaultJobSelectionPolicy(), }
View Source
var DefaultNodeInfoPublishConfig = routing.NodeInfoPublisherIntervalConfig{ Interval: 30 * time.Second, EagerPublishInterval: 5 * time.Second, EagerPublishDuration: 30 * time.Second, }
View Source
var DefaultRequesterConfig = RequesterConfigParams{ MinJobExecutionTimeout: 0 * time.Second, DefaultJobExecutionTimeout: model.NoJobTimeout, HousekeepingBackgroundTaskInterval: 30 * time.Second, NodeRankRandomnessRange: 5, OverAskForBidsFactor: 3, MinBacalhauVersion: models.BuildVersionInfo{ Major: "0", Minor: "3", GitVersion: "v0.3.26", }, EvalBrokerVisibilityTimeout: 60 * time.Second, EvalBrokerInitialRetryDelay: 1 * time.Second, EvalBrokerSubsequentRetryDelay: 30 * time.Second, EvalBrokerMaxRetryCount: 10, WorkerCount: runtime.NumCPU(), WorkerEvalDequeueTimeout: 5 * time.Second, WorkerEvalDequeueBaseBackoff: 1 * time.Second, WorkerEvalDequeueMaxBackoff: 30 * time.Second, }
View Source
var TestNodeInfoPublishConfig = routing.NodeInfoPublisherIntervalConfig{ Interval: 30 * time.Second, EagerPublishInterval: 10 * time.Millisecond, EagerPublishDuration: 5 * time.Second, }
TestNodeInfoPublishConfig speeds up node announcements for tests.
View Source
var TestRequesterConfig = RequesterConfigParams{ MinJobExecutionTimeout: 0 * time.Second, DefaultJobExecutionTimeout: 30 * time.Second, HousekeepingBackgroundTaskInterval: 30 * time.Second, NodeRankRandomnessRange: 5, OverAskForBidsFactor: 3, MinBacalhauVersion: models.BuildVersionInfo{ Major: "0", Minor: "3", GitVersion: "v0.3.26", }, EvalBrokerVisibilityTimeout: 5 * time.Second, EvalBrokerInitialRetryDelay: 100 * time.Millisecond, EvalBrokerSubsequentRetryDelay: 100 * time.Millisecond, EvalBrokerMaxRetryCount: 3, WorkerCount: 3, WorkerEvalDequeueTimeout: 200 * time.Millisecond, WorkerEvalDequeueBaseBackoff: 20 * time.Millisecond, WorkerEvalDequeueMaxBackoff: 200 * time.Millisecond, }
Functions ¶
func GetNodeInfoPublishConfig ¶ added in v1.0.4
func GetNodeInfoPublishConfig() routing.NodeInfoPublisherIntervalConfig
Types ¶
type Compute ¶
type Compute struct {
// Visible for testing
ID string
LocalEndpoint compute.Endpoint
Capacity capacity.Tracker
ExecutionStore store.ExecutionStore
Executors executor.ExecutorProvider
Storages storage.StorageProvider
LogServer *logstream.LogStreamServer
Bidder compute.Bidder
// contains filtered or unexported fields
}
func NewComputeNode ¶
func NewComputeNode( ctx context.Context, cleanupManager *system.CleanupManager, host host.Host, apiServer *publicapi.Server, config ComputeConfig, storages storage.StorageProvider, executors executor.ExecutorProvider, publishers publisher.PublisherProvider, fsRepo *repo.FsRepo, ) (*Compute, error)
func (*Compute) RegisterLocalComputeCallback ¶
type ComputeConfig ¶
type ComputeConfig struct {
// Capacity config
TotalResourceLimits models.Resources
QueueResourceLimits models.Resources
JobResourceLimits models.Resources
DefaultJobResourceLimits models.Resources
IgnorePhysicalResourceLimits bool
// How long the buffer backs off before polling the queue again for new jobs
ExecutorBufferBackoffDuration time.Duration
// JobNegotiationTimeout default timeout value to hold a bid for a job
JobNegotiationTimeout time.Duration
// MinJobExecutionTimeout default value for the minimum execution timeout this compute node supports. Jobs with
// lower timeout requirements will not be bid on.
MinJobExecutionTimeout time.Duration
// MaxJobExecutionTimeout default value for the maximum execution timeout this compute node supports. Jobs with
// higher timeout requirements will not be bid on.
MaxJobExecutionTimeout time.Duration
// DefaultJobExecutionTimeout default value for the execution timeout this compute node will assign to jobs with
// no timeout requirement defined.
DefaultJobExecutionTimeout time.Duration
// JobExecutionTimeoutClientIDBypassList is the list of clients that are allowed to bypass the job execution timeout
// check.
JobExecutionTimeoutClientIDBypassList []string
// Bid strategies config
JobSelectionPolicy JobSelectionPolicy
// logging running executions
LogRunningExecutionsInterval time.Duration
FailureInjectionConfig model.FailureInjectionComputeConfig
BidSemanticStrategy bidstrategy.SemanticBidStrategy
BidResourceStrategy bidstrategy.ResourceBidStrategy
ExecutionStore store.ExecutionStore
}
func NewComputeConfigWith ¶
func NewComputeConfigWith(params ComputeConfigParams) (config ComputeConfig)
func NewComputeConfigWithDefaults ¶
func NewComputeConfigWithDefaults() ComputeConfig
type ComputeConfigParams ¶
type ComputeConfigParams struct {
// Capacity config
TotalResourceLimits models.Resources
QueueResourceLimits models.Resources
JobResourceLimits models.Resources
DefaultJobResourceLimits models.Resources
PhysicalResourcesProvider capacity.Provider
IgnorePhysicalResourceLimits bool
ExecutorBufferBackoffDuration time.Duration
// Timeout config
JobNegotiationTimeout time.Duration
MinJobExecutionTimeout time.Duration
MaxJobExecutionTimeout time.Duration
DefaultJobExecutionTimeout time.Duration
JobExecutionTimeoutClientIDBypassList []string
// Bid strategies config
JobSelectionPolicy JobSelectionPolicy
// logging running executions
LogRunningExecutionsInterval time.Duration
FailureInjectionConfig model.FailureInjectionComputeConfig
BidSemanticStrategy bidstrategy.SemanticBidStrategy
BidResourceStrategy bidstrategy.ResourceBidStrategy
}
type ExecutorsFactory ¶
type ExecutorsFactory interface {
Get(ctx context.Context, nodeConfig NodeConfig) (executor.ExecutorProvider, error)
}
func NewPluginExecutorFactory ¶ added in v1.0.4
func NewPluginExecutorFactory() ExecutorsFactory
func NewStandardExecutorsFactory ¶
func NewStandardExecutorsFactory() ExecutorsFactory
type ExecutorsFactoryFunc ¶
type ExecutorsFactoryFunc func( ctx context.Context, nodeConfig NodeConfig, ) (executor.ExecutorProvider, error)
func (ExecutorsFactoryFunc) Get ¶
func (f ExecutorsFactoryFunc) Get( ctx context.Context, nodeConfig NodeConfig, ) (executor.ExecutorProvider, error)
type FeatureConfig ¶ added in v0.3.29
type JobSelectionPolicy ¶ added in v1.0.4
type JobSelectionPolicy struct {
// this describes if we should run a job based on
// where the data is located - i.e. if the data is "local"
// or if the data is "anywhere"
Locality semantic.JobSelectionDataLocality `json:"locality"`
// should we reject jobs that don't specify any data
// the default is "accept"
RejectStatelessJobs bool `json:"reject_stateless_jobs"`
// should we accept jobs that specify networking
// the default is "reject"
AcceptNetworkedJobs bool `json:"accept_networked_jobs"`
// external hooks that decide if we should take on the job or not
// if either of these is given, it will override the data locality settings
ProbeHTTP string `json:"probe_http,omitempty"`
ProbeExec string `json:"probe_exec,omitempty"`
}
JobSelectionPolicy describes the rules for how a compute node selects an incoming job.
func NewDefaultJobSelectionPolicy ¶ added in v1.0.4
func NewDefaultJobSelectionPolicy() JobSelectionPolicy
type Node ¶
type Node struct {
// Visible for testing
APIServer *publicapi.Server
ComputeNode *Compute
RequesterNode *Requester
NodeInfoStore routing.NodeInfoStore
CleanupManager *system.CleanupManager
IPFSClient ipfs.Client
Host host.Host
}
func (*Node) IsComputeNode ¶
IsComputeNode returns true if the node is a compute node
func (*Node) IsRequesterNode ¶
IsRequesterNode returns true if the node is a requester node
type NodeConfig ¶
type NodeConfig struct {
IPFSClient ipfs.Client
CleanupManager *system.CleanupManager
Host host.Host
HostAddress string
APIPort uint16
RequesterAutoCert string
RequesterAutoCertCache string
DisabledFeatures FeatureConfig
ComputeConfig ComputeConfig
RequesterNodeConfig RequesterConfig
APIServerConfig publicapi.Config
IsRequesterNode bool
IsComputeNode bool
Labels map[string]string
NodeInfoPublisherInterval routing.NodeInfoPublisherIntervalConfig
DependencyInjector NodeDependencyInjector
AllowListedLocalPaths []string
FsRepo *repo.FsRepo
}
Node configuration
type NodeDependencyInjector ¶
type NodeDependencyInjector struct {
StorageProvidersFactory StorageProvidersFactory
ExecutorsFactory ExecutorsFactory
PublishersFactory PublishersFactory
}
Lazy node dependency injector that generates instances of different components on demand, based on the configuration provided.
func NewExecutorPluginNodeDependencyInjector ¶ added in v1.0.4
func NewExecutorPluginNodeDependencyInjector() NodeDependencyInjector
func NewStandardNodeDependencyInjector ¶
func NewStandardNodeDependencyInjector() NodeDependencyInjector
type PublishersFactory ¶
type PublishersFactory interface {
Get(ctx context.Context, nodeConfig NodeConfig) (publisher.PublisherProvider, error)
}
func NewStandardPublishersFactory ¶
func NewStandardPublishersFactory() PublishersFactory
type PublishersFactoryFunc ¶
type PublishersFactoryFunc func(ctx context.Context, nodeConfig NodeConfig) (publisher.PublisherProvider, error)
func (PublishersFactoryFunc) Get ¶
func (f PublishersFactoryFunc) Get(ctx context.Context, nodeConfig NodeConfig) (publisher.PublisherProvider, error)
type Requester ¶
type Requester struct {
// Visible for testing
Endpoint requester.Endpoint
JobStore jobstore.Store
NodeDiscoverer orchestrator.NodeDiscoverer
// contains filtered or unexported fields
}
func NewRequesterNode ¶
func NewRequesterNode( ctx context.Context, host host.Host, apiServer *publicapi.Server, nodeConfig RequesterConfig, storageProviders storage.StorageProvider, nodeInfoStore routing.NodeInfoStore, gossipSub *libp2p_pubsub.PubSub, fsRepo *repo.FsRepo, ) (*Requester, error)
func (*Requester) RegisterLocalComputeEndpoint ¶
type RequesterConfig ¶
type RequesterConfig struct {
RequesterConfigParams
}
func NewRequesterConfigWith ¶
func NewRequesterConfigWith(params RequesterConfigParams) (config RequesterConfig)
func NewRequesterConfigWithDefaults ¶
func NewRequesterConfigWithDefaults() RequesterConfig
type RequesterConfigParams ¶
type RequesterConfigParams struct {
// Timeout config
MinJobExecutionTimeout time.Duration
DefaultJobExecutionTimeout time.Duration
HousekeepingBackgroundTaskInterval time.Duration
NodeRankRandomnessRange int
OverAskForBidsFactor uint
JobSelectionPolicy JobSelectionPolicy
ExternalValidatorWebhook *url.URL
FailureInjectionConfig model.FailureInjectionRequesterConfig
// minimum version of compute nodes that the requester will accept and route jobs to
MinBacalhauVersion models.BuildVersionInfo
RetryStrategy orchestrator.RetryStrategy
// evaluation broker config
EvalBrokerVisibilityTimeout time.Duration
EvalBrokerInitialRetryDelay time.Duration
EvalBrokerSubsequentRetryDelay time.Duration
EvalBrokerMaxRetryCount int
// worker config
WorkerCount int
WorkerEvalDequeueTimeout time.Duration
WorkerEvalDequeueBaseBackoff time.Duration
WorkerEvalDequeueMaxBackoff time.Duration
}
type StorageProvidersFactory ¶
type StorageProvidersFactory interface {
Get(ctx context.Context, nodeConfig NodeConfig) (storage.StorageProvider, error)
}
Interfaces to inject dependencies into the stack
func NewStandardStorageProvidersFactory ¶
func NewStandardStorageProvidersFactory() StorageProvidersFactory
Standard implementations used in prod and when testing prod behavior
type StorageProvidersFactoryFunc ¶
type StorageProvidersFactoryFunc func(ctx context.Context, nodeConfig NodeConfig) (storage.StorageProvider, error)
Functions that implement the factories for easier creation of new implementations
func (StorageProvidersFactoryFunc) Get ¶
func (f StorageProvidersFactoryFunc) Get(ctx context.Context, nodeConfig NodeConfig) (storage.StorageProvider, error)
Click to show internal directories.
Click to hide internal directories.