Documentation ¶
Overview ¶
Package distributed provides distributed training strategies and coordination mechanisms for multi-node machine learning workloads in the Zerfoo framework.
Index ¶
- type AllReduceStrategy
- func (s *AllReduceStrategy[T]) AllReduceGradients(gradients map[string]*tensor.TensorNumeric[T]) error
- func (s *AllReduceStrategy[T]) Barrier() error
- func (s *AllReduceStrategy[T]) BroadcastTensor(t *tensor.TensorNumeric[T], rootRank int) error
- func (s *AllReduceStrategy[T]) Init(rank, size int, coordinatorAddress string) error
- func (s *AllReduceStrategy[T]) Rank() int
- func (s *AllReduceStrategy[T]) Shutdown()
- func (s *AllReduceStrategy[T]) Size() int
- type CoordinatorClient
- type Dialer
- type GrpcServer
- type InternalStrategy
- type ListenerFactory
- type Logger
- type NetworkManager
- type ServerManager
- type ServiceClientFactory
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type AllReduceStrategy ¶
AllReduceStrategy implements a hierarchical AllReduce: gradients are first reduced within each node, then across node leaders, and the result is broadcast back to every rank.
func NewAllReduceStrategy ¶
func NewAllReduceStrategy[T tensor.Numeric](
	localStrategy, crossNodeStrategy InternalStrategy[T],
) *AllReduceStrategy[T]
NewAllReduceStrategy creates a new AllReduceStrategy.
func (*AllReduceStrategy[T]) AllReduceGradients ¶
func (s *AllReduceStrategy[T]) AllReduceGradients(gradients map[string]*tensor.TensorNumeric[T]) error
AllReduceGradients performs hierarchical all-reduce on gradients.
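The hierarchical reduce-then-broadcast pattern can be illustrated with a self-contained sketch that sums plain float64 slices in memory. The function and data layout below are illustrative assumptions, not part of the package API; the real strategy exchanges tensors over gRPC between workers.

```go
package main

import "fmt"

// hierarchicalAllReduce sums gradients in the local-then-cross-node
// pattern described above. nodes[i] holds the gradient slices of all
// workers on node i; worker 0 of each node acts as its leader.
func hierarchicalAllReduce(nodes [][][]float64) {
	// Stage 1: reduce within each node onto that node's leader.
	leaders := make([][]float64, len(nodes))
	for i, workers := range nodes {
		leader := workers[0]
		for _, g := range workers[1:] {
			for j, v := range g {
				leader[j] += v
			}
		}
		leaders[i] = leader
	}
	// Stage 2: all-reduce across node leaders.
	total := make([]float64, len(leaders[0]))
	for _, l := range leaders {
		for j, v := range l {
			total[j] += v
		}
	}
	// Stage 3: broadcast the result back to every worker on every node.
	for _, workers := range nodes {
		for _, g := range workers {
			copy(g, total)
		}
	}
}

func main() {
	nodes := [][][]float64{
		{{1, 1}, {2, 2}}, // node 0: two workers
		{{3, 3}, {4, 4}}, // node 1: two workers
	}
	hierarchicalAllReduce(nodes)
	fmt.Println(nodes[0][0], nodes[1][1]) // [10 10] [10 10]
}
```

The two-stage reduction is what makes the strategy hierarchical: only node leaders participate in the (typically slower) cross-node exchange.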
func (*AllReduceStrategy[T]) Barrier ¶
func (s *AllReduceStrategy[T]) Barrier() error
Barrier synchronizes all workers across all nodes.
func (*AllReduceStrategy[T]) BroadcastTensor ¶
func (s *AllReduceStrategy[T]) BroadcastTensor(t *tensor.TensorNumeric[T], rootRank int) error
BroadcastTensor broadcasts a tensor from the root rank to all other ranks in the distributed system. The tensor is first broadcast within the root's local node, then across node leaders, and finally within each local node to ensure all ranks receive the broadcasted tensor.
func (*AllReduceStrategy[T]) Init ¶
func (s *AllReduceStrategy[T]) Init(rank, size int, coordinatorAddress string) error
Init initializes the hierarchical strategy.
func (*AllReduceStrategy[T]) Rank ¶
func (s *AllReduceStrategy[T]) Rank() int
Rank returns the rank from the local strategy.
func (*AllReduceStrategy[T]) Shutdown ¶
func (s *AllReduceStrategy[T]) Shutdown()
Shutdown gracefully closes all connections.
func (*AllReduceStrategy[T]) Size ¶
func (s *AllReduceStrategy[T]) Size() int
Size returns the size from the local strategy.
type CoordinatorClient ¶
type CoordinatorClient interface {
	RegisterWorker(ctx context.Context, in *pb.RegisterWorkerRequest, opts ...grpc.CallOption) (*pb.RegisterWorkerResponse, error)
	UnregisterWorker(ctx context.Context, in *pb.UnregisterWorkerRequest, opts ...grpc.CallOption) (*pb.UnregisterWorkerResponse, error)
	Heartbeat(ctx context.Context, in *pb.HeartbeatRequest, opts ...grpc.CallOption) (*pb.HeartbeatResponse, error)
}
CoordinatorClient is an interface for a client of the coordinator service.
type GrpcServer ¶
type GrpcServer interface {
	RegisterService(desc *grpc.ServiceDesc, impl interface{})
	Serve(lis net.Listener) error
	Stop()
	GracefulStop()
}
GrpcServer is an interface for a gRPC server.
type InternalStrategy ¶
type InternalStrategy[T tensor.Numeric] interface {
	// Init initializes the strategy.
	Init(rank int, size int, coordinatorAddress string) error
	// AllReduceGradients performs an all-reduce operation on the gradients.
	AllReduceGradients(gradients map[string]*tensor.TensorNumeric[T]) error
	// Barrier blocks until all workers have reached the barrier.
	Barrier() error
	// BroadcastTensor broadcasts a tensor from the root to all other workers.
	BroadcastTensor(t *tensor.TensorNumeric[T], rootRank int) error
	// Rank returns the rank of the current worker.
	Rank() int
	// Size returns the total number of workers.
	Size() int
	// Shutdown cleans up the resources used by the strategy.
	Shutdown()
}
InternalStrategy defines the interface for a distributed training strategy.
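To show the shape an implementation must take, here is a degenerate single-worker stub in which every collective is a no-op, since there is nobody to communicate with. Gradients are plain float64 slices instead of the package's tensor.TensorNumeric values, a simplification made for this sketch; a stub like this can stand in for a real strategy in local runs.

```go
package main

import "fmt"

// singleWorker is a sketch of a strategy for a one-worker "cluster".
type singleWorker struct{ rank, size int }

func (s *singleWorker) Init(rank, size int, coordinatorAddress string) error {
	s.rank, s.size = rank, size
	return nil
}

// AllReduceGradients is the identity with a single worker: there is
// nothing to reduce against, so the gradients are left untouched.
func (s *singleWorker) AllReduceGradients(gradients map[string][]float64) error {
	return nil
}

func (s *singleWorker) Barrier() error { return nil }

func (s *singleWorker) BroadcastTensor(t []float64, rootRank int) error {
	return nil // the root already holds the only copy
}

func (s *singleWorker) Rank() int  { return s.rank }
func (s *singleWorker) Size() int  { return s.size }
func (s *singleWorker) Shutdown()  {}

func main() {
	var s singleWorker
	_ = s.Init(0, 1, "")
	fmt.Println(s.Rank(), s.Size()) // 0 1
}
```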
type ListenerFactory ¶
ListenerFactory is a function that creates a new net.Listener.
type Logger ¶
type Logger interface {
Printf(format string, v ...interface{})
}
Logger is an interface for logging.
type NetworkManager ¶
type NetworkManager interface {
	// ConnectToPeers establishes connections to all other workers in the cluster.
	ConnectToPeers(peers []string, selfRank int, timeout time.Duration) ([]pb.DistributedServiceClient, []*grpc.ClientConn, error)
	// CloseConnections closes all the given connections.
	CloseConnections(conns []*grpc.ClientConn)
}
NetworkManager is an interface for managing network connections between workers.
func NewNetworkManager ¶
func NewNetworkManager(dialer Dialer, clientFactory ServiceClientFactory) NetworkManager
NewNetworkManager creates a new NetworkManager.
type ServerManager ¶
type ServerManager interface {
	Start(workerAddress string, service interface{}, serviceDesc *grpc.ServiceDesc) error
	Stop()
	GracefulStop()
	SetLogger(logger Logger)
}
ServerManager is an interface for managing the gRPC server of a worker.
func NewServerManager ¶
func NewServerManager(grpcServer GrpcServer, listenerFactory ListenerFactory) ServerManager
NewServerManager creates a new ServerManager.
type ServiceClientFactory ¶
type ServiceClientFactory func(cc *grpc.ClientConn) pb.DistributedServiceClient
ServiceClientFactory is a function that creates a new DistributedServiceClient.
Directories ¶

Path | Synopsis
---|---
coordinator | Package coordinator provides a distributed training coordinator.