Documentation
¶
Index ¶
- Variables
- func GetNodeInfoPublishConfig() routing.NodeInfoPublisherIntervalConfig
- type AuthenticatorsFactory
- type AuthenticatorsFactoryFunc
- type Compute
- type ComputeConfig
- type ComputeConfigParams
- type ConfigLabelsProvider
- type ExecutorsFactory
- type ExecutorsFactoryFunc
- type Factory
- type FactoryFunc
- type FeatureConfig
- type JobSelectionPolicy
- type NetworkConfig
- type Node
- type NodeConfig
- type NodeDependencyInjector
- type PublishersFactory
- type PublishersFactoryFunc
- type Requester
- type RequesterConfig
- type RequesterConfigParams
- type RuntimeLabelsProvider
- type StorageProvidersFactory
- type StorageProvidersFactoryFunc
Constants ¶
This section is empty.
Variables ¶
View Source
var DefaultComputeConfig = ComputeConfigParams{ PhysicalResourcesProvider: compute_system.NewPhysicalCapacityProvider(), DefaultJobResourceLimits: models.Resources{ CPU: 0.1, Memory: 100 * 1024 * 1024, }, JobNegotiationTimeout: 3 * time.Minute, MinJobExecutionTimeout: 500 * time.Millisecond, MaxJobExecutionTimeout: model.NoJobTimeout, DefaultJobExecutionTimeout: model.NoJobTimeout, LogRunningExecutionsInterval: 10 * time.Second, JobSelectionPolicy: NewDefaultJobSelectionPolicy(), LocalPublisher: types.LocalPublisherConfig{ Directory: path.Join(config.GetStoragePath(), "bacalhau-local-publisher"), }, }
View Source
var DefaultNodeInfoPublishConfig = routing.NodeInfoPublisherIntervalConfig{ Interval: 30 * time.Second, EagerPublishInterval: 5 * time.Second, EagerPublishDuration: 30 * time.Second, }
View Source
var DefaultRequesterConfig = RequesterConfigParams{ JobDefaults: transformer.JobDefaults{ ExecutionTimeout: model.NoJobTimeout, }, HousekeepingBackgroundTaskInterval: 30 * time.Second, NodeRankRandomnessRange: 5, OverAskForBidsFactor: 3, MinBacalhauVersion: models.BuildVersionInfo{ Major: "1", Minor: "0", GitVersion: "v1.0.4", }, EvalBrokerVisibilityTimeout: 60 * time.Second, EvalBrokerInitialRetryDelay: 1 * time.Second, EvalBrokerSubsequentRetryDelay: 30 * time.Second, EvalBrokerMaxRetryCount: 10, WorkerCount: runtime.NumCPU(), WorkerEvalDequeueTimeout: 5 * time.Second, WorkerEvalDequeueBaseBackoff: 1 * time.Second, WorkerEvalDequeueMaxBackoff: 30 * time.Second, S3PreSignedURLDisabled: false, S3PreSignedURLExpiration: 30 * time.Minute, TranslationEnabled: false, }
View Source
var TestNodeInfoPublishConfig = routing.NodeInfoPublisherIntervalConfig{ Interval: 30 * time.Second, EagerPublishInterval: 10 * time.Millisecond, EagerPublishDuration: 5 * time.Second, }
TestNodeInfoPublishConfig speeds up node announcements in tests by eagerly publishing node info at a much shorter interval (10ms) than the default.
View Source
var TestRequesterConfig = RequesterConfigParams{ JobDefaults: transformer.JobDefaults{ ExecutionTimeout: 30 * time.Second, }, HousekeepingBackgroundTaskInterval: 30 * time.Second, NodeRankRandomnessRange: 5, OverAskForBidsFactor: 3, MinBacalhauVersion: models.BuildVersionInfo{ Major: "1", Minor: "0", GitVersion: "v1.0.4", }, EvalBrokerVisibilityTimeout: 5 * time.Second, EvalBrokerInitialRetryDelay: 100 * time.Millisecond, EvalBrokerSubsequentRetryDelay: 100 * time.Millisecond, EvalBrokerMaxRetryCount: 3, WorkerCount: 3, WorkerEvalDequeueTimeout: 200 * time.Millisecond, WorkerEvalDequeueBaseBackoff: 20 * time.Millisecond, WorkerEvalDequeueMaxBackoff: 200 * time.Millisecond, TranslationEnabled: false, S3PreSignedURLDisabled: false, S3PreSignedURLExpiration: 30 * time.Minute, }
Functions ¶
func GetNodeInfoPublishConfig ¶ added in v1.0.4
func GetNodeInfoPublishConfig() routing.NodeInfoPublisherIntervalConfig
Types ¶
type AuthenticatorsFactory ¶ added in v1.2.1
type AuthenticatorsFactory = Factory[authn.Authenticator]
func NewStandardAuthenticatorsFactory ¶ added in v1.2.1
func NewStandardAuthenticatorsFactory() AuthenticatorsFactory
type AuthenticatorsFactoryFunc ¶ added in v1.2.1
type AuthenticatorsFactoryFunc = FactoryFunc[authn.Authenticator]
type Compute ¶
type Compute struct {
// Visible for testing
ID string
LocalEndpoint compute.Endpoint
Capacity capacity.Tracker
ExecutionStore store.ExecutionStore
Executors executor.ExecutorProvider
Storages storage.StorageProvider
Bidder compute.Bidder
// contains filtered or unexported fields
}
func NewComputeNode ¶
func NewComputeNode( ctx context.Context, nodeID string, cleanupManager *system.CleanupManager, apiServer *publicapi.Server, config ComputeConfig, storagePath string, storages storage.StorageProvider, executors executor.ExecutorProvider, publishers publisher.PublisherProvider, computeCallback compute.Callback, ) (*Compute, error)
type ComputeConfig ¶
type ComputeConfig struct {
// Capacity config
TotalResourceLimits models.Resources
QueueResourceLimits models.Resources
JobResourceLimits models.Resources
DefaultJobResourceLimits models.Resources
IgnorePhysicalResourceLimits bool
// JobNegotiationTimeout is the default timeout for holding a bid on a job.
JobNegotiationTimeout time.Duration
// MinJobExecutionTimeout is the default minimum execution timeout this compute node supports. Jobs with
// lower timeout requirements will not be bid on.
MinJobExecutionTimeout time.Duration
// MaxJobExecutionTimeout default value for the maximum execution timeout this compute node supports. Jobs with
// higher timeout requirements will not be bid on.
MaxJobExecutionTimeout time.Duration
// DefaultJobExecutionTimeout default value for the execution timeout this compute node will assign to jobs with
// no timeout requirement defined.
DefaultJobExecutionTimeout time.Duration
// JobExecutionTimeoutClientIDBypassList is the list of clients that are allowed to bypass the job execution timeout
// check.
JobExecutionTimeoutClientIDBypassList []string
// Bid strategies config
JobSelectionPolicy JobSelectionPolicy
// Interval at which running executions are logged
LogRunningExecutionsInterval time.Duration
// How many messages to buffer in the log stream channel
LogStreamBufferSize int
FailureInjectionConfig model.FailureInjectionComputeConfig
BidSemanticStrategy bidstrategy.SemanticBidStrategy
BidResourceStrategy bidstrategy.ResourceBidStrategy
ExecutionStore store.ExecutionStore
LocalPublisher types.LocalPublisherConfig
}
func NewComputeConfigWith ¶
func NewComputeConfigWith(params ComputeConfigParams) (ComputeConfig, error)
func NewComputeConfigWithDefaults ¶
func NewComputeConfigWithDefaults() (ComputeConfig, error)
type ComputeConfigParams ¶
type ComputeConfigParams struct {
// Capacity config
TotalResourceLimits models.Resources
QueueResourceLimits models.Resources
JobResourceLimits models.Resources
DefaultJobResourceLimits models.Resources
PhysicalResourcesProvider capacity.Provider
IgnorePhysicalResourceLimits bool
// Timeout config
JobNegotiationTimeout time.Duration
MinJobExecutionTimeout time.Duration
MaxJobExecutionTimeout time.Duration
DefaultJobExecutionTimeout time.Duration
JobExecutionTimeoutClientIDBypassList []string
// Bid strategies config
JobSelectionPolicy JobSelectionPolicy
// Interval at which running executions are logged
LogRunningExecutionsInterval time.Duration
// How many messages to buffer in the log stream channel
LogStreamBufferSize int
FailureInjectionConfig model.FailureInjectionComputeConfig
BidSemanticStrategy bidstrategy.SemanticBidStrategy
BidResourceStrategy bidstrategy.ResourceBidStrategy
ExecutionStore store.ExecutionStore
LocalPublisher types.LocalPublisherConfig
}
type ConfigLabelsProvider ¶ added in v1.2.1
type ConfigLabelsProvider struct {
// contains filtered or unexported fields
}
type ExecutorsFactory ¶
func NewPluginExecutorFactory ¶ added in v1.0.4
func NewPluginExecutorFactory() ExecutorsFactory
func NewStandardExecutorsFactory ¶
func NewStandardExecutorsFactory() ExecutorsFactory
type ExecutorsFactoryFunc ¶
type ExecutorsFactoryFunc = FactoryFunc[executor.Executor]
type Factory ¶ added in v1.2.1
type Factory[P provider.Providable] interface { Get(ctx context.Context, nodeConfig NodeConfig) (provider.Provider[P], error) }
Factory is an interface for injecting dependencies into the stack.
type FactoryFunc ¶ added in v1.2.1
type FactoryFunc[P provider.Providable] func(ctx context.Context, nodeConfig NodeConfig) (provider.Provider[P], error)
FactoryFunc is a function type that implements Factory, making it easier to create new factory implementations from plain functions.
func (FactoryFunc[P]) Get ¶ added in v1.2.1
func (f FactoryFunc[P]) Get(ctx context.Context, nodeConfig NodeConfig) (provider.Provider[P], error)
type FeatureConfig ¶ added in v0.3.29
type JobSelectionPolicy ¶ added in v1.0.4
type JobSelectionPolicy struct {
// this describes whether we should run a job based on
// where the data is located, i.e. whether the data is "local"
// or "anywhere"
Locality semantic.JobSelectionDataLocality `json:"locality"`
// should we reject jobs that don't specify any data
// the default is "accept"
RejectStatelessJobs bool `json:"reject_stateless_jobs"`
// should we accept jobs that specify networking
// the default is "reject"
AcceptNetworkedJobs bool `json:"accept_networked_jobs"`
// external hooks that decide if we should take on the job or not
// if either of these is given, it will override the data locality settings
ProbeHTTP string `json:"probe_http,omitempty"`
ProbeExec string `json:"probe_exec,omitempty"`
}
JobSelectionPolicy describes the rules for how a compute node selects an incoming job.
func NewDefaultJobSelectionPolicy ¶ added in v1.0.4
func NewDefaultJobSelectionPolicy() JobSelectionPolicy
type NetworkConfig ¶ added in v1.2.1
type NetworkConfig struct {
Type string
Libp2pHost host.Host // only set if using libp2p transport, nil otherwise
ReconnectDelay time.Duration
// NATS config for requesters to be reachable by compute nodes
Port int
AdvertisedAddress string
Orchestrators []string
// Storage directory for NATS features that require it
StoreDir string
// AuthSecret is a secret string that clients must use to connect. It is
// only used by NATS servers; clients should supply the auth secret as the
// user part of their Orchestrator URL.
AuthSecret string
// NATS config for requester nodes to connect with each other
ClusterName string
ClusterPort int
ClusterAdvertisedAddress string
ClusterPeers []string
}
func (*NetworkConfig) Validate ¶ added in v1.2.1
func (c *NetworkConfig) Validate() error
type Node ¶
type Node struct {
// Visible for testing
ID string
APIServer *publicapi.Server
ComputeNode *Compute
RequesterNode *Requester
CleanupManager *system.CleanupManager
IPFSClient ipfs.Client
Libp2pHost host.Host // only set if using libp2p transport, nil otherwise
}
func (*Node) IsComputeNode ¶
IsComputeNode returns true if the node is a compute node
func (*Node) IsRequesterNode ¶
IsRequesterNode returns true if the node is a requester node
type NodeConfig ¶
type NodeConfig struct {
NodeID string
IPFSClient ipfs.Client
CleanupManager *system.CleanupManager
HostAddress string
APIPort uint16
RequesterAutoCert string
RequesterAutoCertCache string
RequesterTLSCertificateFile string
RequesterTLSKeyFile string
DisabledFeatures FeatureConfig
ComputeConfig ComputeConfig
RequesterNodeConfig RequesterConfig
APIServerConfig publicapi.Config
AuthConfig types.AuthConfig
IsRequesterNode bool
IsComputeNode bool
Labels map[string]string
NodeInfoPublisherInterval routing.NodeInfoPublisherIntervalConfig
DependencyInjector NodeDependencyInjector
AllowListedLocalPaths []string
NodeInfoStoreTTL time.Duration
NetworkConfig NetworkConfig
}
Node configuration
func (*NodeConfig) Validate ¶ added in v1.2.1
func (c *NodeConfig) Validate() error
type NodeDependencyInjector ¶
type NodeDependencyInjector struct {
StorageProvidersFactory StorageProvidersFactory
ExecutorsFactory ExecutorsFactory
PublishersFactory PublishersFactory
AuthenticatorsFactory AuthenticatorsFactory
}
Lazy node dependency injector that generates instances of different components on demand, based on the configuration provided.
func NewExecutorPluginNodeDependencyInjector ¶ added in v1.0.4
func NewExecutorPluginNodeDependencyInjector() NodeDependencyInjector
func NewStandardNodeDependencyInjector ¶
func NewStandardNodeDependencyInjector() NodeDependencyInjector
type PublishersFactory ¶
func NewStandardPublishersFactory ¶
func NewStandardPublishersFactory() PublishersFactory
type PublishersFactoryFunc ¶
type PublishersFactoryFunc = FactoryFunc[publisher.Publisher]
type Requester ¶
type Requester struct {
// Visible for testing
Endpoint requester.Endpoint
EndpointV2 *orchestrator.BaseEndpoint
JobStore jobstore.Store
NodeInfoStore routing.NodeInfoStore
NodeDiscoverer orchestrator.NodeDiscoverer
// contains filtered or unexported fields
}
func NewRequesterNode ¶
type RequesterConfig ¶
type RequesterConfig struct {
RequesterConfigParams
}
func NewRequesterConfigWith ¶
func NewRequesterConfigWith(params RequesterConfigParams) (RequesterConfig, error)
func NewRequesterConfigWithDefaults ¶
func NewRequesterConfigWithDefaults() (RequesterConfig, error)
type RequesterConfigParams ¶
type RequesterConfigParams struct {
JobDefaults transformer.JobDefaults
HousekeepingBackgroundTaskInterval time.Duration
NodeRankRandomnessRange int
OverAskForBidsFactor uint
JobSelectionPolicy JobSelectionPolicy
ExternalValidatorWebhook *url.URL
FailureInjectionConfig model.FailureInjectionRequesterConfig
// minimum version of compute nodes that the requester will accept and route jobs to
MinBacalhauVersion models.BuildVersionInfo
RetryStrategy orchestrator.RetryStrategy
// evaluation broker config
EvalBrokerVisibilityTimeout time.Duration
EvalBrokerInitialRetryDelay time.Duration
EvalBrokerSubsequentRetryDelay time.Duration
EvalBrokerMaxRetryCount int
// worker config
WorkerCount int
WorkerEvalDequeueTimeout time.Duration
WorkerEvalDequeueBaseBackoff time.Duration
WorkerEvalDequeueMaxBackoff time.Duration
// Should the orchestrator attempt to translate jobs?
TranslationEnabled bool
S3PreSignedURLDisabled bool
S3PreSignedURLExpiration time.Duration
JobStore jobstore.Store
DefaultPublisher string
}
type RuntimeLabelsProvider ¶ added in v1.2.1
type RuntimeLabelsProvider struct{}
type StorageProvidersFactory ¶
func NewStandardStorageProvidersFactory ¶
func NewStandardStorageProvidersFactory() StorageProvidersFactory
Standard implementations used in production and when testing production behavior.
type StorageProvidersFactoryFunc ¶
type StorageProvidersFactoryFunc = FactoryFunc[storage.Storage]
Source Files
¶
Click to show internal directories.
Click to hide internal directories.