Documentation
¶
Index ¶
- Variables
- func CreateMessageSerDeRegistry() (*ncl.MessageSerDeRegistry, error)
- func NewBidder(config ComputeConfig, publishers publisher.PublisherProvider, ...) compute.Bidder
- type AuthenticatorsFactory
- type AuthenticatorsFactoryFunc
- type Compute
- type ComputeConfig
- type ComputeConfigParams
- type ConfigLabelsProvider
- type ExecutorsFactory
- type ExecutorsFactoryFunc
- type Factory
- type FactoryFunc
- type FeatureConfig
- type JobSelectionPolicy
- type NetworkConfig
- type Node
- type NodeConfig
- type NodeDependencyInjector
- type PublishersFactory
- type PublishersFactoryFunc
- type Requester
- type RequesterConfig
- type RequesterConfigParams
- type RuntimeLabelsProvider
- type StorageProvidersFactory
- type StorageProvidersFactoryFunc
Constants ¶
This section is empty.
Variables ¶
View Source
var DefaultRequesterConfig = RequesterConfigParams{ JobDefaults: transformer.JobDefaults{ TotalTimeout: models.NoTimeout, }, HousekeepingBackgroundTaskInterval: 30 * time.Second, HousekeepingTimeoutBuffer: 2 * time.Minute, NodeRankRandomnessRange: 5, OverAskForBidsFactor: 3, MinBacalhauVersion: models.BuildVersionInfo{ Major: "1", Minor: "0", GitVersion: "v1.0.4", }, EvalBrokerVisibilityTimeout: 60 * time.Second, EvalBrokerInitialRetryDelay: 1 * time.Second, EvalBrokerSubsequentRetryDelay: 30 * time.Second, EvalBrokerMaxRetryCount: 10, WorkerCount: runtime.NumCPU(), WorkerEvalDequeueTimeout: 5 * time.Second, WorkerEvalDequeueBaseBackoff: 1 * time.Second, WorkerEvalDequeueMaxBackoff: 30 * time.Second, S3PreSignedURLDisabled: false, S3PreSignedURLExpiration: 30 * time.Minute, TranslationEnabled: false, ControlPlaneSettings: types.RequesterControlPlaneConfig{ HeartbeatCheckFrequency: types.Duration(30 * time.Second), HeartbeatTopic: "heartbeat", NodeDisconnectedAfter: types.Duration(30 * time.Second), }, NodeInfoStoreTTL: 10 * time.Minute, DefaultApprovalState: models.NodeMembership.APPROVED, }
View Source
var TestRequesterConfig = RequesterConfigParams{ JobDefaults: transformer.JobDefaults{ TotalTimeout: 30 * time.Second, }, HousekeepingBackgroundTaskInterval: 30 * time.Second, HousekeepingTimeoutBuffer: 100 * time.Millisecond, NodeRankRandomnessRange: 5, OverAskForBidsFactor: 3, MinBacalhauVersion: models.BuildVersionInfo{ Major: "1", Minor: "0", GitVersion: "v1.0.4", }, EvalBrokerVisibilityTimeout: 5 * time.Second, EvalBrokerInitialRetryDelay: 100 * time.Millisecond, EvalBrokerSubsequentRetryDelay: 100 * time.Millisecond, EvalBrokerMaxRetryCount: 3, WorkerCount: 3, WorkerEvalDequeueTimeout: 200 * time.Millisecond, WorkerEvalDequeueBaseBackoff: 20 * time.Millisecond, WorkerEvalDequeueMaxBackoff: 200 * time.Millisecond, NodeOverSubscriptionFactor: 1.5, TranslationEnabled: false, S3PreSignedURLDisabled: false, S3PreSignedURLExpiration: 30 * time.Minute, ControlPlaneSettings: types.RequesterControlPlaneConfig{ HeartbeatCheckFrequency: types.Duration(30 * time.Second), HeartbeatTopic: "heartbeat", NodeDisconnectedAfter: types.Duration(30 * time.Second), }, DefaultApprovalState: models.NodeMembership.APPROVED, }
Functions ¶
func CreateMessageSerDeRegistry ¶ added in v1.5.0
func CreateMessageSerDeRegistry() (*ncl.MessageSerDeRegistry, error)
CreateMessageSerDeRegistry creates a new message serializer/deserializer (SerDe) registry for payloads.
func NewBidder ¶ added in v1.3.1
func NewBidder( config ComputeConfig, publishers publisher.PublisherProvider, storages storage.StorageProvider, executors executor.ExecutorProvider, runningCapacityTracker capacity.Tracker, nodeID string, executionStore store.ExecutionStore, computeCallback compute.Callback, bufferRunner *compute.ExecutorBuffer, apiServer *publicapi.Server, calculator capacity.UsageCalculator, ) compute.Bidder
Types ¶
type AuthenticatorsFactory ¶ added in v1.2.1
type AuthenticatorsFactory = Factory[authn.Authenticator]
func NewStandardAuthenticatorsFactory ¶ added in v1.2.1
func NewStandardAuthenticatorsFactory(userKey *baccrypto.UserKey) AuthenticatorsFactory
type AuthenticatorsFactoryFunc ¶ added in v1.2.1
type AuthenticatorsFactoryFunc = FactoryFunc[authn.Authenticator]
type Compute ¶
type Compute struct {
// Visible for testing
ID string
LocalEndpoint compute.Endpoint
Capacity capacity.Tracker
ExecutionStore store.ExecutionStore
Executors executor.ExecutorProvider
Storages storage.StorageProvider
Bidder compute.Bidder
ManagementClient *compute.ManagementClient
// contains filtered or unexported fields
}
func NewComputeNode ¶
func NewComputeNode( ctx context.Context, nodeID string, apiServer *publicapi.Server, cfg types.BacalhauConfig, config ComputeConfig, storages storage.StorageProvider, executors executor.ExecutorProvider, publishers publisher.PublisherProvider, natsConn *nats.Conn, computeCallback compute.Callback, managementProxy compute.ManagementEndpoint, configuredLabels map[string]string, messageSerDeRegistry *ncl.MessageSerDeRegistry, ) (*Compute, error)
type ComputeConfig ¶
type ComputeConfig struct {
// Capacity config
TotalResourceLimits models.Resources
JobResourceLimits models.Resources
DefaultJobResourceLimits models.Resources
IgnorePhysicalResourceLimits bool
// JobNegotiationTimeout is the default timeout for holding a bid for a job.
JobNegotiationTimeout time.Duration
// MinJobExecutionTimeout is the default value for the minimum execution timeout this compute node supports.
// Jobs with lower timeout requirements will not be bid on.
MinJobExecutionTimeout time.Duration
// MaxJobExecutionTimeout is the default value for the maximum execution timeout this compute node supports.
// Jobs with higher timeout requirements will not be bid on.
MaxJobExecutionTimeout time.Duration
// DefaultJobExecutionTimeout is the default execution timeout this compute node assigns to jobs that
// define no timeout requirement.
DefaultJobExecutionTimeout time.Duration
// JobExecutionTimeoutClientIDBypassList is the list of clients that are allowed to bypass the job execution timeout
// check.
JobExecutionTimeoutClientIDBypassList []string
// Bid strategies config
JobSelectionPolicy JobSelectionPolicy
// logging running executions
LogRunningExecutionsInterval time.Duration
// How many messages to buffer in the log stream channel
LogStreamBufferSize int
FailureInjectionConfig models.FailureInjectionComputeConfig
BidSemanticStrategy bidstrategy.SemanticBidStrategy
BidResourceStrategy bidstrategy.ResourceBidStrategy
ExecutionStore store.ExecutionStore
LocalPublisher types.LocalPublisherConfig
ControlPlaneSettings types.ComputeControlPlaneConfig
}
func NewComputeConfigWith ¶
func NewComputeConfigWith(executionDir string, params ComputeConfigParams) (ComputeConfig, error)
func NewComputeConfigWithDefaults ¶
func NewComputeConfigWithDefaults(executionDir string) (ComputeConfig, error)
type ComputeConfigParams ¶
type ComputeConfigParams struct {
// Capacity config
TotalResourceLimits models.Resources
JobResourceLimits models.Resources
DefaultJobResourceLimits models.Resources
PhysicalResourcesProvider capacity.Provider
IgnorePhysicalResourceLimits bool
// Timeout config
JobNegotiationTimeout time.Duration
MinJobExecutionTimeout time.Duration
MaxJobExecutionTimeout time.Duration
DefaultJobExecutionTimeout time.Duration
JobExecutionTimeoutClientIDBypassList []string
// Bid strategies config
JobSelectionPolicy JobSelectionPolicy
// logging running executions
LogRunningExecutionsInterval time.Duration
// How many messages to buffer in the log stream channel
LogStreamBufferSize int
FailureInjectionConfig models.FailureInjectionComputeConfig
BidSemanticStrategy bidstrategy.SemanticBidStrategy
BidResourceStrategy bidstrategy.ResourceBidStrategy
ExecutionStore store.ExecutionStore
LocalPublisher types.LocalPublisherConfig
ControlPlaneSettings types.ComputeControlPlaneConfig
}
func NewDefaultComputeParam ¶ added in v1.3.2
func NewDefaultComputeParam(storagePath string) ComputeConfigParams
type ConfigLabelsProvider ¶ added in v1.2.1
type ConfigLabelsProvider struct {
// contains filtered or unexported fields
}
type ExecutorsFactory ¶
func NewPluginExecutorFactory ¶ added in v1.0.4
func NewPluginExecutorFactory(pluginPath string) ExecutorsFactory
func NewStandardExecutorsFactory ¶
func NewStandardExecutorsFactory(cfg types.DockerCacheConfig) ExecutorsFactory
type ExecutorsFactoryFunc ¶
type ExecutorsFactoryFunc = FactoryFunc[executor.Executor]
type Factory ¶ added in v1.2.1
type Factory[P provider.Providable] interface { Get(ctx context.Context, nodeConfig NodeConfig) (provider.Provider[P], error) }
Factory is an interface for injecting dependencies into the stack.
type FactoryFunc ¶ added in v1.2.1
type FactoryFunc[P provider.Providable] func(ctx context.Context, nodeConfig NodeConfig) (provider.Provider[P], error)
FactoryFunc is a function type that implements Factory, making it easier to create new implementations.
func (FactoryFunc[P]) Get ¶ added in v1.2.1
func (f FactoryFunc[P]) Get(ctx context.Context, nodeConfig NodeConfig) (provider.Provider[P], error)
type FeatureConfig ¶ added in v0.3.29
type JobSelectionPolicy ¶ added in v1.0.4
type JobSelectionPolicy struct {
// Locality describes whether we should run a job based on
// where its data is located, i.e. whether the data is "local"
// or "anywhere".
Locality semantic.JobSelectionDataLocality `json:"locality"`
// Whether to reject jobs that don't specify any data.
// The default is "accept".
RejectStatelessJobs bool `json:"reject_stateless_jobs"`
// Whether to accept jobs that specify networking.
// The default is "reject".
AcceptNetworkedJobs bool `json:"accept_networked_jobs"`
// External hooks that decide whether we should take on the job.
// If either of these is given, it overrides the data locality settings.
ProbeHTTP string `json:"probe_http,omitempty"`
ProbeExec string `json:"probe_exec,omitempty"`
}
JobSelectionPolicy describes the rules for how a compute node selects incoming jobs.
func NewDefaultJobSelectionPolicy ¶ added in v1.0.4
func NewDefaultJobSelectionPolicy() JobSelectionPolicy
type NetworkConfig ¶ added in v1.2.1
type NetworkConfig struct {
// NATS config for requesters to be reachable by compute nodes
Port int
AdvertisedAddress string
Orchestrators []string
// Storage directory for NATS features that require it
StoreDir string
// AuthSecret is a secret string that clients must use to connect. NATS servers
// must supply this config, while clients can also supply it as the user part
// of their Orchestrator URL.
AuthSecret string
// NATS config for requester nodes to connect with each other
ClusterName string
ClusterPort int
ClusterAdvertisedAddress string
// When using NATS, never set this value unless you are connecting multiple requester
// nodes together. This should never reference this current running instance (e.g.
// don't use localhost).
ClusterPeers []string
}
func (*NetworkConfig) Validate ¶ added in v1.2.1
func (c *NetworkConfig) Validate() error
type Node ¶
type Node struct {
// Visible for testing
ID string
APIServer *publicapi.Server
ComputeNode *Compute
RequesterNode *Requester
CleanupManager *system.CleanupManager
}
func NewNode ¶
func NewNode( ctx context.Context, bacalhauConfig types.BacalhauConfig, config NodeConfig, fsr *repo.FsRepo, ) (*Node, error)
func (*Node) IsComputeNode ¶
IsComputeNode returns true if the node is a compute node
func (*Node) IsRequesterNode ¶
IsRequesterNode returns true if the node is a requester node
type NodeConfig ¶
type NodeConfig struct {
NodeID string
CleanupManager *system.CleanupManager
HostAddress string
APIPort uint16
RequesterAutoCert string
RequesterAutoCertCache string
RequesterTLSCertificateFile string
RequesterTLSKeyFile string
RequesterSelfSign bool
DisabledFeatures FeatureConfig
ComputeConfig ComputeConfig
RequesterNodeConfig RequesterConfig
APIServerConfig publicapi.Config
AuthConfig types.AuthConfig
NodeType models.NodeType
IsRequesterNode bool
IsComputeNode bool
Labels map[string]string
DependencyInjector NodeDependencyInjector
AllowListedLocalPaths []string
NetworkConfig NetworkConfig
}
NodeConfig holds the configuration of a node.
func (*NodeConfig) Validate ¶ added in v1.2.1
func (c *NodeConfig) Validate() error
type NodeDependencyInjector ¶
type NodeDependencyInjector struct {
StorageProvidersFactory StorageProvidersFactory
ExecutorsFactory ExecutorsFactory
PublishersFactory PublishersFactory
AuthenticatorsFactory AuthenticatorsFactory
}
NodeDependencyInjector is a lazy node dependency injector that generates instances of different components on demand, based on the provided configuration.
func NewExecutorPluginNodeDependencyInjector ¶ added in v1.0.4
func NewExecutorPluginNodeDependencyInjector( cfg types.BacalhauConfig, userKey *baccrypto.UserKey, pluginPath string, ) NodeDependencyInjector
func NewStandardNodeDependencyInjector ¶
func NewStandardNodeDependencyInjector(cfg types.BacalhauConfig, userKey *baccrypto.UserKey) NodeDependencyInjector
type PublishersFactory ¶
func NewStandardPublishersFactory ¶
func NewStandardPublishersFactory(cfg types.BacalhauConfig) PublishersFactory
type PublishersFactoryFunc ¶
type PublishersFactoryFunc = FactoryFunc[publisher.Publisher]
type Requester ¶
type Requester struct {
// Visible for testing
Endpoint *orchestrator.BaseEndpoint
JobStore jobstore.Store
// We need a reference to the node info store until libp2p is removed
NodeInfoStore routing.NodeInfoStore
NodeDiscoverer orchestrator.NodeDiscoverer
// contains filtered or unexported fields
}
func NewRequesterNode ¶
func NewRequesterNode( ctx context.Context, nodeID string, apiServer *publicapi.Server, nodeConfig NodeConfig, metricsConfig types.MetricsConfig, requesterConfig RequesterConfig, transportLayer *nats_transport.NATSTransport, computeProxy compute.Endpoint, messageSerDeRegistry *ncl.MessageSerDeRegistry, ) (*Requester, error)
type RequesterConfig ¶
type RequesterConfig struct {
RequesterConfigParams
}
func NewRequesterConfigWith ¶
func NewRequesterConfigWith(params RequesterConfigParams) (RequesterConfig, error)
func NewRequesterConfigWithDefaults ¶
func NewRequesterConfigWithDefaults() (RequesterConfig, error)
type RequesterConfigParams ¶
type RequesterConfigParams struct {
JobDefaults transformer.JobDefaults
HousekeepingBackgroundTaskInterval time.Duration
HousekeepingTimeoutBuffer time.Duration
NodeRankRandomnessRange int
OverAskForBidsFactor uint
JobSelectionPolicy JobSelectionPolicy
ExternalValidatorWebhook *url.URL
FailureInjectionConfig models.FailureInjectionRequesterConfig
// minimum version of compute nodes that the requester will accept and route jobs to
MinBacalhauVersion models.BuildVersionInfo
RetryStrategy orchestrator.RetryStrategy
// evaluation broker config
EvalBrokerVisibilityTimeout time.Duration
EvalBrokerInitialRetryDelay time.Duration
EvalBrokerSubsequentRetryDelay time.Duration
EvalBrokerMaxRetryCount int
// worker config
WorkerCount int
WorkerEvalDequeueTimeout time.Duration
WorkerEvalDequeueBaseBackoff time.Duration
WorkerEvalDequeueMaxBackoff time.Duration
// scheduler config
SchedulerQueueBackoff time.Duration
NodeOverSubscriptionFactor float64
// Should the orchestrator attempt to translate jobs?
TranslationEnabled bool
S3PreSignedURLDisabled bool
S3PreSignedURLExpiration time.Duration
JobStore jobstore.Store
NodeInfoStoreTTL time.Duration
DefaultPublisher string
// DefaultApprovalState is the membership state assigned to new nodes when they join the
// cluster. Both the default and the test configurations use APPROVED. We will provide an
// option to set this to PENDING for production, or for when operators are ready to control
// node approval.
DefaultApprovalState models.NodeMembershipState
ControlPlaneSettings types.RequesterControlPlaneConfig
}
type RuntimeLabelsProvider ¶ added in v1.2.1
type RuntimeLabelsProvider struct{}
type StorageProvidersFactory ¶
func NewStandardStorageProvidersFactory ¶
func NewStandardStorageProvidersFactory(cfg types.BacalhauConfig) StorageProvidersFactory
Standard implementations used in prod and when testing prod behavior
type StorageProvidersFactoryFunc ¶
type StorageProvidersFactoryFunc = FactoryFunc[storage.Storage]
Source Files
¶
Click to show internal directories.
Click to hide internal directories.