Documentation
¶
Index ¶
- Variables
- func GetNodeInfoPublishConfig() routing.NodeInfoPublisherIntervalConfig
- func NewBidder(config ComputeConfig, publishers publisher.PublisherProvider, ...) compute.Bidder
- type AuthenticatorsFactory
- type AuthenticatorsFactoryFunc
- type Compute
- type ComputeConfig
- type ComputeConfigParams
- type ConfigLabelsProvider
- type ExecutorsFactory
- type ExecutorsFactoryFunc
- type Factory
- type FactoryFunc
- type FeatureConfig
- type JobSelectionPolicy
- type NetworkConfig
- type Node
- type NodeConfig
- type NodeDependencyInjector
- type PublishersFactory
- type PublishersFactoryFunc
- type Requester
- type RequesterConfig
- type RequesterConfigParams
- type RuntimeLabelsProvider
- type StorageProvidersFactory
- type StorageProvidersFactoryFunc
Constants ¶
This section is empty.
Variables ¶
View Source
// DefaultComputeConfig is the package-level default ComputeConfigParams value.
// It sets per-job default resource limits (CPU 0.1, Memory 100 * 1024 * 1024),
// a 3-minute negotiation timeout, a 500ms minimum execution timeout, and uses
// model.NoJobTimeout for both the maximum and default execution timeouts.
// Control-plane settings publish on the "heartbeat" topic every 15s.
// NOTE(review): LocalPublisher.Directory is built with path.Join, which always
// joins with forward slashes. If config.GetStoragePath() returns an OS
// filesystem path, path/filepath.Join is probably what is intended — confirm.
var DefaultComputeConfig = ComputeConfigParams{ PhysicalResourcesProvider: compute_system.NewPhysicalCapacityProvider(), DefaultJobResourceLimits: models.Resources{ CPU: 0.1, Memory: 100 * 1024 * 1024, }, JobNegotiationTimeout: 3 * time.Minute, MinJobExecutionTimeout: 500 * time.Millisecond, MaxJobExecutionTimeout: model.NoJobTimeout, DefaultJobExecutionTimeout: model.NoJobTimeout, LogRunningExecutionsInterval: 10 * time.Second, JobSelectionPolicy: NewDefaultJobSelectionPolicy(), LocalPublisher: types.LocalPublisherConfig{ Directory: path.Join(config.GetStoragePath(), "bacalhau-local-publisher"), }, ControlPlaneSettings: types.ComputeControlPlaneConfig{ InfoUpdateFrequency: types.Duration(60 * time.Second), ResourceUpdateFrequency: types.Duration(30 * time.Second), HeartbeatFrequency: types.Duration(15 * time.Second), HeartbeatTopic: "heartbeat", }, }
View Source
// DefaultNodeInfoPublishConfig is the default
// routing.NodeInfoPublisherIntervalConfig value: a 30s Interval, a 5s
// EagerPublishInterval and a 30s EagerPublishDuration.
var DefaultNodeInfoPublishConfig = routing.NodeInfoPublisherIntervalConfig{ Interval: 30 * time.Second, EagerPublishInterval: 5 * time.Second, EagerPublishDuration: 30 * time.Second, }
View Source
// DefaultRequesterConfig is the package-level default RequesterConfigParams
// value. WorkerCount is taken from runtime.NumCPU(), evaluated once when this
// package variable is initialized. DefaultApprovalState is
// models.NodeMembership.APPROVED. The control-plane settings use the
// "heartbeat" topic, with a 30s HeartbeatCheckFrequency and a 30s
// NodeDisconnectedAfter.
var DefaultRequesterConfig = RequesterConfigParams{ JobDefaults: transformer.JobDefaults{ ExecutionTimeout: model.NoJobTimeout, }, HousekeepingBackgroundTaskInterval: 30 * time.Second, HousekeepingTimeoutBuffer: 2 * time.Minute, NodeRankRandomnessRange: 5, OverAskForBidsFactor: 3, MinBacalhauVersion: models.BuildVersionInfo{ Major: "1", Minor: "0", GitVersion: "v1.0.4", }, EvalBrokerVisibilityTimeout: 60 * time.Second, EvalBrokerInitialRetryDelay: 1 * time.Second, EvalBrokerSubsequentRetryDelay: 30 * time.Second, EvalBrokerMaxRetryCount: 10, WorkerCount: runtime.NumCPU(), WorkerEvalDequeueTimeout: 5 * time.Second, WorkerEvalDequeueBaseBackoff: 1 * time.Second, WorkerEvalDequeueMaxBackoff: 30 * time.Second, S3PreSignedURLDisabled: false, S3PreSignedURLExpiration: 30 * time.Minute, TranslationEnabled: false, ControlPlaneSettings: types.RequesterControlPlaneConfig{ HeartbeatCheckFrequency: types.Duration(30 * time.Second), HeartbeatTopic: "heartbeat", NodeDisconnectedAfter: types.Duration(30 * time.Second), }, DefaultApprovalState: models.NodeMembership.APPROVED, }
View Source
var TestNodeInfoPublishConfig = routing.NodeInfoPublisherIntervalConfig{ Interval: 30 * time.Second, EagerPublishInterval: 10 * time.Millisecond, EagerPublishDuration: 5 * time.Second, }
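TestNodeInfoPublishConfig speeds up node announcements for tests by using a 10ms EagerPublishInterval and a 5s EagerPublishDuration (DefaultNodeInfoPublishConfig uses 5s and 30s respectively).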
View Source
// TestRequesterConfig is a RequesterConfigParams value with much shorter
// timings than DefaultRequesterConfig (presumably for use in tests — confirm):
// a 30s job execution timeout instead of model.NoJobTimeout, a 100ms
// housekeeping timeout buffer, 100ms eval-broker retry delays with at most 3
// retries, and a fixed WorkerCount of 3 instead of runtime.NumCPU().
var TestRequesterConfig = RequesterConfigParams{ JobDefaults: transformer.JobDefaults{ ExecutionTimeout: 30 * time.Second, }, HousekeepingBackgroundTaskInterval: 30 * time.Second, HousekeepingTimeoutBuffer: 100 * time.Millisecond, NodeRankRandomnessRange: 5, OverAskForBidsFactor: 3, MinBacalhauVersion: models.BuildVersionInfo{ Major: "1", Minor: "0", GitVersion: "v1.0.4", }, EvalBrokerVisibilityTimeout: 5 * time.Second, EvalBrokerInitialRetryDelay: 100 * time.Millisecond, EvalBrokerSubsequentRetryDelay: 100 * time.Millisecond, EvalBrokerMaxRetryCount: 3, WorkerCount: 3, WorkerEvalDequeueTimeout: 200 * time.Millisecond, WorkerEvalDequeueBaseBackoff: 20 * time.Millisecond, WorkerEvalDequeueMaxBackoff: 200 * time.Millisecond, TranslationEnabled: false, S3PreSignedURLDisabled: false, S3PreSignedURLExpiration: 30 * time.Minute, ControlPlaneSettings: types.RequesterControlPlaneConfig{ HeartbeatCheckFrequency: types.Duration(30 * time.Second), HeartbeatTopic: "heartbeat", NodeDisconnectedAfter: types.Duration(30 * time.Second), }, DefaultApprovalState: models.NodeMembership.APPROVED, }
Functions ¶
func GetNodeInfoPublishConfig ¶ added in v1.0.4
func GetNodeInfoPublishConfig() routing.NodeInfoPublisherIntervalConfig
func NewBidder ¶ added in v1.3.1
func NewBidder( config ComputeConfig, publishers publisher.PublisherProvider, storages storage.StorageProvider, executors executor.ExecutorProvider, runningCapacityTracker capacity.Tracker, nodeID string, executionStore store.ExecutionStore, computeCallback compute.Callback, bufferRunner *compute.ExecutorBuffer, apiServer *publicapi.Server, calculator capacity.UsageCalculator, ) compute.Bidder
Types ¶
type AuthenticatorsFactory ¶ added in v1.2.1
type AuthenticatorsFactory = Factory[authn.Authenticator]
func NewStandardAuthenticatorsFactory ¶ added in v1.2.1
func NewStandardAuthenticatorsFactory() AuthenticatorsFactory
type AuthenticatorsFactoryFunc ¶ added in v1.2.1
type AuthenticatorsFactoryFunc = FactoryFunc[authn.Authenticator]
type Compute ¶
// Compute is the type returned (as *Compute) by NewComputeNode. Its exported
// fields are the components listed below; it also has unexported fields that
// are not shown here.
type Compute struct {
// Visible for testing
ID string
LocalEndpoint compute.Endpoint
Capacity capacity.Tracker
ExecutionStore store.ExecutionStore
Executors executor.ExecutorProvider
Storages storage.StorageProvider
Bidder compute.Bidder
ManagementClient *compute.ManagementClient
// contains filtered or unexported fields
}
func NewComputeNode ¶
func NewComputeNode( ctx context.Context, nodeID string, cleanupManager *system.CleanupManager, apiServer *publicapi.Server, config ComputeConfig, storagePath string, storages storage.StorageProvider, executors executor.ExecutorProvider, publishers publisher.PublisherProvider, computeCallback compute.Callback, managementProxy compute.ManagementEndpoint, configuredLabels map[string]string, heartbeatClient *heartbeat.HeartbeatClient, ) (*Compute, error)
type ComputeConfig ¶
// ComputeConfig is the compute configuration type returned by
// NewComputeConfigWith and NewComputeConfigWithDefaults. It has the same
// visible fields as ComputeConfigParams, except that it has no
// PhysicalResourcesProvider.
type ComputeConfig struct {
// Capacity config.
TotalResourceLimits models.Resources
JobResourceLimits models.Resources
DefaultJobResourceLimits models.Resources
IgnorePhysicalResourceLimits bool
// JobNegotiationTimeout is the default timeout value to hold a bid for a job.
JobNegotiationTimeout time.Duration
// MinJobExecutionTimeout is the default value for the minimum execution
// timeout this compute node supports. Jobs with lower timeout requirements
// will not be bid on.
MinJobExecutionTimeout time.Duration
// MaxJobExecutionTimeout is the default value for the maximum execution
// timeout this compute node supports. Jobs with higher timeout requirements
// will not be bid on.
MaxJobExecutionTimeout time.Duration
// DefaultJobExecutionTimeout is the default value for the execution timeout
// this compute node will assign to jobs with no timeout requirement defined.
DefaultJobExecutionTimeout time.Duration
// JobExecutionTimeoutClientIDBypassList is the list of clients that are
// allowed to bypass the job execution timeout check.
JobExecutionTimeoutClientIDBypassList []string
// Bid strategies config.
JobSelectionPolicy JobSelectionPolicy
// LogRunningExecutionsInterval configures the logging of running executions.
LogRunningExecutionsInterval time.Duration
// LogStreamBufferSize is how many messages to buffer in the log stream channel.
LogStreamBufferSize int
FailureInjectionConfig model.FailureInjectionComputeConfig
BidSemanticStrategy bidstrategy.SemanticBidStrategy
BidResourceStrategy bidstrategy.ResourceBidStrategy
ExecutionStore store.ExecutionStore
LocalPublisher types.LocalPublisherConfig
ControlPlaneSettings types.ComputeControlPlaneConfig
}
func NewComputeConfigWith ¶
func NewComputeConfigWith(params ComputeConfigParams) (ComputeConfig, error)
func NewComputeConfigWithDefaults ¶
func NewComputeConfigWithDefaults() (ComputeConfig, error)
type ComputeConfigParams ¶
// ComputeConfigParams is the parameter struct accepted by
// NewComputeConfigWith; DefaultComputeConfig is a value of this type. It has
// the same visible fields as ComputeConfig plus PhysicalResourcesProvider.
type ComputeConfigParams struct {
// Capacity config.
TotalResourceLimits models.Resources
JobResourceLimits models.Resources
DefaultJobResourceLimits models.Resources
PhysicalResourcesProvider capacity.Provider
IgnorePhysicalResourceLimits bool
// Timeout config. The ComputeConfig fields of the same names carry
// per-field descriptions.
JobNegotiationTimeout time.Duration
MinJobExecutionTimeout time.Duration
MaxJobExecutionTimeout time.Duration
DefaultJobExecutionTimeout time.Duration
JobExecutionTimeoutClientIDBypassList []string
// Bid strategies config.
JobSelectionPolicy JobSelectionPolicy
// LogRunningExecutionsInterval configures the logging of running executions.
LogRunningExecutionsInterval time.Duration
// LogStreamBufferSize is how many messages to buffer in the log stream channel.
LogStreamBufferSize int
FailureInjectionConfig model.FailureInjectionComputeConfig
BidSemanticStrategy bidstrategy.SemanticBidStrategy
BidResourceStrategy bidstrategy.ResourceBidStrategy
ExecutionStore store.ExecutionStore
LocalPublisher types.LocalPublisherConfig
ControlPlaneSettings types.ComputeControlPlaneConfig
}
type ConfigLabelsProvider ¶ added in v1.2.1
type ConfigLabelsProvider struct {
// contains filtered or unexported fields
}
type ExecutorsFactory ¶
func NewPluginExecutorFactory ¶ added in v1.0.4
func NewPluginExecutorFactory() ExecutorsFactory
func NewStandardExecutorsFactory ¶
func NewStandardExecutorsFactory() ExecutorsFactory
type ExecutorsFactoryFunc ¶
type ExecutorsFactoryFunc = FactoryFunc[executor.Executor]
type Factory ¶ added in v1.2.1
type Factory[P provider.Providable] interface { Get(ctx context.Context, nodeConfig NodeConfig) (provider.Provider[P], error) }
Interfaces to inject dependencies into the stack
type FactoryFunc ¶ added in v1.2.1
type FactoryFunc[P provider.Providable] func(ctx context.Context, nodeConfig NodeConfig) (provider.Provider[P], error)
Functions that implement the factories for easier creation of new implementations
func (FactoryFunc[P]) Get ¶ added in v1.2.1
func (f FactoryFunc[P]) Get(ctx context.Context, nodeConfig NodeConfig) (provider.Provider[P], error)
type FeatureConfig ¶ added in v0.3.29
type JobSelectionPolicy ¶ added in v1.0.4
type JobSelectionPolicy struct {
// Locality describes whether we should run a job based on where its data
// is located, i.e. only if the data is "local", or if the data is
// "anywhere".
Locality semantic.JobSelectionDataLocality `json:"locality"`
// RejectStatelessJobs controls whether we reject jobs that don't specify
// any data. The default is to accept them.
RejectStatelessJobs bool `json:"reject_stateless_jobs"`
// AcceptNetworkedJobs controls whether we accept jobs that specify
// networking. The default is to reject them.
AcceptNetworkedJobs bool `json:"accept_networked_jobs"`
// ProbeHTTP and ProbeExec are external hooks that decide whether we should
// take on the job or not. If either of these is given, it overrides the
// data locality settings.
ProbeHTTP string `json:"probe_http,omitempty"`
ProbeExec string `json:"probe_exec,omitempty"`
}
JobSelectionPolicy describes the rules for how a compute node selects an incoming job.
func NewDefaultJobSelectionPolicy ¶ added in v1.0.4
func NewDefaultJobSelectionPolicy() JobSelectionPolicy
type NetworkConfig ¶ added in v1.2.1
// NetworkConfig holds the network transport settings of a node. It is used as
// the NetworkConfig field of NodeConfig and has a Validate method.
type NetworkConfig struct {
// Type selects the transport. NOTE(review): the comments below mention
// libp2p and NATS, so those are presumably the accepted values — confirm
// against Validate.
Type string
Libp2pHost host.Host // only set if using libp2p transport, nil otherwise
ReconnectDelay time.Duration
// NATS config for requesters to be reachable by compute nodes.
Port int
AdvertisedAddress string
Orchestrators []string
// StoreDir is the storage directory for NATS features that require it.
StoreDir string
// AuthSecret is a secret string that clients must use to connect. NATS servers
// must supply this config, while clients can also supply it as the user part
// of their Orchestrator URL.
AuthSecret string
// NATS config for requester nodes to connect with each other.
ClusterName string
ClusterPort int
ClusterAdvertisedAddress string
// When using NATS, never set this value unless you are connecting multiple requester
// nodes together. This should never reference this current running instance (e.g.
// don't use localhost).
ClusterPeers []string
}
func (*NetworkConfig) Validate ¶ added in v1.2.1
func (c *NetworkConfig) Validate() error
type Node ¶
// Node is the top-level node value. ComputeNode and RequesterNode reference
// the Compute and Requester types of this package; the IsComputeNode and
// IsRequesterNode methods report whether the node is a compute node or a
// requester node.
type Node struct {
// Visible for testing
ID string
APIServer *publicapi.Server
ComputeNode *Compute
RequesterNode *Requester
CleanupManager *system.CleanupManager
IPFSClient ipfs.Client
Libp2pHost host.Host // only set if using libp2p transport, nil otherwise
}
func (*Node) IsComputeNode ¶
IsComputeNode returns true if the node is a compute node
func (*Node) IsRequesterNode ¶
IsRequesterNode returns true if the node is a requester node
type NodeConfig ¶
type NodeConfig struct {
NodeID string
IPFSClient ipfs.Client
CleanupManager *system.CleanupManager
HostAddress string
APIPort uint16
// Requester certificate/TLS settings.
// NOTE(review): presumably these configure TLS for the requester's API
// server — confirm against the code that consumes them.
RequesterAutoCert string
RequesterAutoCertCache string
RequesterTLSCertificateFile string
RequesterTLSKeyFile string
RequesterSelfSign bool
DisabledFeatures FeatureConfig
// Role-specific configuration, using the ComputeConfig and RequesterConfig
// types of this package.
ComputeConfig ComputeConfig
RequesterNodeConfig RequesterConfig
APIServerConfig publicapi.Config
AuthConfig types.AuthConfig
// NOTE(review): the node role is expressed both by NodeType and by the
// IsRequesterNode/IsComputeNode booleans; confirm which one is
// authoritative and whether they must be kept in sync.
NodeType models.NodeType
IsRequesterNode bool
IsComputeNode bool
Labels map[string]string
// NodeInfoPublisherInterval has the same type as
// DefaultNodeInfoPublishConfig and TestNodeInfoPublishConfig.
NodeInfoPublisherInterval routing.NodeInfoPublisherIntervalConfig
DependencyInjector NodeDependencyInjector
AllowListedLocalPaths []string
NodeInfoStoreTTL time.Duration
NetworkConfig NetworkConfig
}
NodeConfig holds the configuration for a node.
func (*NodeConfig) Validate ¶ added in v1.2.1
func (c *NodeConfig) Validate() error
type NodeDependencyInjector ¶
type NodeDependencyInjector struct {
StorageProvidersFactory StorageProvidersFactory
ExecutorsFactory ExecutorsFactory
PublishersFactory PublishersFactory
AuthenticatorsFactory AuthenticatorsFactory
}
NodeDependencyInjector is a lazy dependency injector that generates instances of different components on demand, based on the configuration provided.
func NewExecutorPluginNodeDependencyInjector ¶ added in v1.0.4
func NewExecutorPluginNodeDependencyInjector() NodeDependencyInjector
func NewStandardNodeDependencyInjector ¶
func NewStandardNodeDependencyInjector() NodeDependencyInjector
type PublishersFactory ¶
func NewStandardPublishersFactory ¶
func NewStandardPublishersFactory() PublishersFactory
type PublishersFactoryFunc ¶
type PublishersFactoryFunc = FactoryFunc[publisher.Publisher]
type Requester ¶
// Requester is the type returned (as *Requester) by NewRequesterNode. Its
// exported fields are listed below; it also has unexported fields that are not
// shown here.
type Requester struct {
// Visible for testing
Endpoint requester.Endpoint
EndpointV2 *orchestrator.BaseEndpoint
JobStore jobstore.Store
// We need a reference to the node info store until libp2p is removed
NodeInfoStore routing.NodeInfoStore
NodeDiscoverer orchestrator.NodeDiscoverer
// contains filtered or unexported fields
}
func NewRequesterNode ¶
func NewRequesterNode( ctx context.Context, nodeID string, apiServer *publicapi.Server, requesterConfig RequesterConfig, storageProvider storage.StorageProvider, authnProvider authn.Provider, nodeInfoStore routing.NodeInfoStore, computeProxy compute.Endpoint, nodeManager *manager.NodeManager, ) (*Requester, error)
type RequesterConfig ¶
// RequesterConfig is the requester configuration type returned by
// NewRequesterConfigWith and NewRequesterConfigWithDefaults.
type RequesterConfig struct {
// RequesterConfigParams is embedded, so all of its fields are promoted and
// can be accessed directly on a RequesterConfig value.
RequesterConfigParams
}
func NewRequesterConfigWith ¶
func NewRequesterConfigWith(params RequesterConfigParams) (RequesterConfig, error)
func NewRequesterConfigWithDefaults ¶
func NewRequesterConfigWithDefaults() (RequesterConfig, error)
type RequesterConfigParams ¶
// RequesterConfigParams is the parameter struct accepted by
// NewRequesterConfigWith and embedded in RequesterConfig.
// DefaultRequesterConfig and TestRequesterConfig are values of this type.
type RequesterConfigParams struct {
JobDefaults transformer.JobDefaults
HousekeepingBackgroundTaskInterval time.Duration
HousekeepingTimeoutBuffer time.Duration
NodeRankRandomnessRange int
OverAskForBidsFactor uint
JobSelectionPolicy JobSelectionPolicy
ExternalValidatorWebhook *url.URL
FailureInjectionConfig model.FailureInjectionRequesterConfig
// MinBacalhauVersion is the minimum version of compute nodes that the
// requester will accept and route jobs to.
MinBacalhauVersion models.BuildVersionInfo
RetryStrategy orchestrator.RetryStrategy
// Evaluation broker config.
EvalBrokerVisibilityTimeout time.Duration
EvalBrokerInitialRetryDelay time.Duration
EvalBrokerSubsequentRetryDelay time.Duration
EvalBrokerMaxRetryCount int
// Worker config.
WorkerCount int
WorkerEvalDequeueTimeout time.Duration
WorkerEvalDequeueBaseBackoff time.Duration
WorkerEvalDequeueMaxBackoff time.Duration
// TranslationEnabled controls whether the orchestrator should attempt to
// translate jobs.
TranslationEnabled bool
S3PreSignedURLDisabled bool
S3PreSignedURLExpiration time.Duration
JobStore jobstore.Store
DefaultPublisher string
// DefaultApprovalState is the membership state given to new nodes when they
// join the cluster. Both DefaultRequesterConfig and TestRequesterConfig set
// it to APPROVED. We will provide an option to set this to PENDING for
// production or for when operators are ready to control node approval.
DefaultApprovalState models.NodeMembershipState
ControlPlaneSettings types.RequesterControlPlaneConfig
}
type RuntimeLabelsProvider ¶ added in v1.2.1
type RuntimeLabelsProvider struct{}
type StorageProvidersFactory ¶
func NewStandardStorageProvidersFactory ¶
func NewStandardStorageProvidersFactory() StorageProvidersFactory
Standard implementations used in prod and when testing prod behavior
type StorageProvidersFactoryFunc ¶
type StorageProvidersFactoryFunc = FactoryFunc[storage.Storage]
Source Files
¶
Click to show internal directories.
Click to hide internal directories.