distributed

package
v0.3.0
Published: Aug 25, 2025 License: Apache-2.0 Imports: 8 Imported by: 0

Documentation

Overview

Package distributed provides distributed training strategies and coordination mechanisms for multi-node machine learning workloads in the Zerfoo framework.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type AllReduceStrategy

type AllReduceStrategy[T tensor.Numeric] struct {
	// contains filtered or unexported fields
}

AllReduceStrategy implements a hierarchical AllReduce algorithm that composes a local (intra-node) strategy with a cross-node strategy.

func NewAllReduceStrategy

func NewAllReduceStrategy[T tensor.Numeric](
	localStrategy, crossNodeStrategy InternalStrategy[T],
) *AllReduceStrategy[T]

NewAllReduceStrategy creates a new AllReduceStrategy.

func (*AllReduceStrategy[T]) AllReduceGradients

func (s *AllReduceStrategy[T]) AllReduceGradients(gradients map[string]*tensor.TensorNumeric[T]) error

AllReduceGradients performs hierarchical all-reduce on gradients.

func (*AllReduceStrategy[T]) Barrier

func (s *AllReduceStrategy[T]) Barrier() error

Barrier synchronizes all workers across all nodes.

func (*AllReduceStrategy[T]) BroadcastTensor

func (s *AllReduceStrategy[T]) BroadcastTensor(t *tensor.TensorNumeric[T], rootRank int) error

BroadcastTensor broadcasts a tensor from the root rank to all other ranks in the distributed system. The tensor is first broadcast within the root's local node, then across node leaders, and finally within each local node, so that every rank receives the tensor.

func (*AllReduceStrategy[T]) Init

func (s *AllReduceStrategy[T]) Init(rank, size int, coordinatorAddress string) error

Init initializes the hierarchical strategy.

func (*AllReduceStrategy[T]) Rank

func (s *AllReduceStrategy[T]) Rank() int

Rank returns the rank from the local strategy.

func (*AllReduceStrategy[T]) Shutdown

func (s *AllReduceStrategy[T]) Shutdown()

Shutdown gracefully closes all connections.

func (*AllReduceStrategy[T]) Size

func (s *AllReduceStrategy[T]) Size() int

Size returns the size from the local strategy.
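
A minimal usage sketch of the strategy is shown below. The concrete local and cross-node strategies, as well as the zerfoo import paths, are assumptions; this page documents only the interface they must satisfy.

package example

import (
	"github.com/zerfoo/zerfoo/distributed" // assumed import path
	"github.com/zerfoo/zerfoo/tensor"      // assumed import path
)

// syncGradients performs one synchronization round. The localStrategy and
// crossNodeStrategy arguments are hypothetical concrete implementations of
// InternalStrategy supplied by the caller.
func syncGradients[T tensor.Numeric](
	localStrategy, crossNodeStrategy distributed.InternalStrategy[T],
	rank, size int,
	coordinatorAddress string,
	gradients map[string]*tensor.TensorNumeric[T],
) error {
	s := distributed.NewAllReduceStrategy(localStrategy, crossNodeStrategy)
	defer s.Shutdown()

	// Initialize with the assigned rank, world size, and coordinator address.
	if err := s.Init(rank, size, coordinatorAddress); err != nil {
		return err
	}

	// All-reduce the locally computed gradients across every worker.
	if err := s.AllReduceGradients(gradients); err != nil {
		return err
	}

	// Wait for all workers before starting the next training step.
	return s.Barrier()
}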

type CoordinatorClient

type CoordinatorClient interface {
	RegisterWorker(ctx context.Context, in *pb.RegisterWorkerRequest, opts ...grpc.CallOption) (*pb.RegisterWorkerResponse, error)
	UnregisterWorker(ctx context.Context, in *pb.UnregisterWorkerRequest, opts ...grpc.CallOption) (*pb.UnregisterWorkerResponse, error)
	Heartbeat(ctx context.Context, in *pb.HeartbeatRequest, opts ...grpc.CallOption) (*pb.HeartbeatResponse, error)
}

CoordinatorClient is an interface for a client of the coordinator service.
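
A hedged sketch of a worker's registration and heartbeat loop against this interface (using the standard context and time packages plus the generated pb package, whose import path is assumed). The request messages are left empty because their fields are not documented here; a real worker would populate identity, address, and related fields.

// registerAndHeartbeat registers the worker, sends periodic heartbeats until
// the context is cancelled, and unregisters on the way out.
func registerAndHeartbeat(ctx context.Context, client distributed.CoordinatorClient) error {
	if _, err := client.RegisterWorker(ctx, &pb.RegisterWorkerRequest{}); err != nil {
		return err
	}
	// Best-effort unregister; errors are ignored during shutdown.
	defer client.UnregisterWorker(context.Background(), &pb.UnregisterWorkerRequest{})

	ticker := time.NewTicker(5 * time.Second) // heartbeat interval is a placeholder
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if _, err := client.Heartbeat(ctx, &pb.HeartbeatRequest{}); err != nil {
				return err
			}
		}
	}
}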

type Dialer

type Dialer func(ctx context.Context, target string) (*grpc.ClientConn, error)

Dialer is a function that creates a gRPC client connection.
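
For example, a Dialer backed by the standard gRPC client (google.golang.org/grpc with the credentials/insecure package) might look like the sketch below; plaintext transport is for illustration only.

// insecureDialer satisfies Dialer using grpc.DialContext with plaintext
// transport credentials; production deployments would configure TLS instead.
var insecureDialer distributed.Dialer = func(ctx context.Context, target string) (*grpc.ClientConn, error) {
	return grpc.DialContext(ctx, target,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	)
}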

type GrpcServer

type GrpcServer interface {
	RegisterService(desc *grpc.ServiceDesc, impl interface{})
	Serve(lis net.Listener) error
	Stop()
	GracefulStop()
}

GrpcServer is an interface for a gRPC server.
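
The standard *grpc.Server already provides these four methods, so it satisfies GrpcServer directly:

// Compile-time check: *grpc.Server implements GrpcServer, so the result of
// grpc.NewServer() can be passed to NewServerManager as-is.
var _ distributed.GrpcServer = grpc.NewServer()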

type InternalStrategy

type InternalStrategy[T tensor.Numeric] interface {
	// Init initializes the strategy.
	Init(rank int, size int, coordinatorAddress string) error
	// AllReduceGradients performs an all-reduce operation on the gradients.
	AllReduceGradients(gradients map[string]*tensor.TensorNumeric[T]) error
	// Barrier blocks until all workers have reached the barrier.
	Barrier() error
	// BroadcastTensor broadcasts a tensor from the root to all other workers.
	BroadcastTensor(t *tensor.TensorNumeric[T], rootRank int) error
	// Rank returns the rank of the current worker.
	Rank() int
	// Size returns the total number of workers.
	Size() int
	// Shutdown cleans up the resources used by the strategy.
	Shutdown()
}

InternalStrategy defines the interface for a distributed training strategy.
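
As a sketch of the contract, a hypothetical single-process no-op implementation (useful for local testing, not part of this package, and assuming the zerfoo tensor import path) could look like:

// noopStrategy is a single-worker strategy: with one worker there is nothing
// to reduce, broadcast, or wait for.
type noopStrategy[T tensor.Numeric] struct {
	rank, size int
}

func (n *noopStrategy[T]) Init(rank, size int, _ string) error {
	n.rank, n.size = rank, size
	return nil
}

func (n *noopStrategy[T]) AllReduceGradients(map[string]*tensor.TensorNumeric[T]) error { return nil }
func (n *noopStrategy[T]) Barrier() error                                               { return nil }
func (n *noopStrategy[T]) BroadcastTensor(*tensor.TensorNumeric[T], int) error          { return nil }
func (n *noopStrategy[T]) Rank() int                                                    { return n.rank }
func (n *noopStrategy[T]) Size() int                                                    { return n.size }
func (n *noopStrategy[T]) Shutdown()                                                    {}

// Compile-time check that the sketch satisfies the interface.
var _ distributed.InternalStrategy[float32] = (*noopStrategy[float32])(nil)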

type ListenerFactory

type ListenerFactory func(network, address string) (net.Listener, error)

ListenerFactory is a function that creates a new net.Listener.
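
net.Listen from the standard library already has this signature, so it can be used as a ListenerFactory directly:

// Plain TCP listeners via the standard library.
var listen distributed.ListenerFactory = net.Listen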

type Logger

type Logger interface {
	Printf(format string, v ...interface{})
}

Logger is an interface for logging.
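
A *log.Logger from the standard library satisfies this interface, for example:

// Route worker logs to stderr with a prefix; any type with a matching
// Printf method works equally well.
var logger distributed.Logger = log.New(os.Stderr, "worker: ", log.LstdFlags)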

type NetworkManager

type NetworkManager interface {
	// ConnectToPeers establishes connections to all other workers in the cluster.
	ConnectToPeers(peers []string, selfRank int, timeout time.Duration) ([]pb.DistributedServiceClient, []*grpc.ClientConn, error)
	// CloseConnections closes all the given connections.
	CloseConnections(conns []*grpc.ClientConn)
}

NetworkManager is an interface for managing network connections between workers.

func NewNetworkManager

func NewNetworkManager(dialer Dialer, clientFactory ServiceClientFactory) NetworkManager

NewNetworkManager creates a new NetworkManager.
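
A sketch of wiring a NetworkManager from a Dialer and a ServiceClientFactory. It assumes the protoc-generated constructor pb.NewDistributedServiceClient exists and uses plaintext gRPC transport for illustration; neither assumption is confirmed by this page.

// connectPeers dials every peer address and returns the service clients
// together with the underlying connections (to be closed via CloseConnections).
func connectPeers(peers []string, selfRank int) ([]pb.DistributedServiceClient, []*grpc.ClientConn, error) {
	dialer := distributed.Dialer(func(ctx context.Context, target string) (*grpc.ClientConn, error) {
		// Plaintext transport for illustration only.
		return grpc.DialContext(ctx, target,
			grpc.WithTransportCredentials(insecure.NewCredentials()),
		)
	})

	factory := distributed.ServiceClientFactory(func(cc *grpc.ClientConn) pb.DistributedServiceClient {
		// pb.NewDistributedServiceClient is assumed to be the generated
		// client constructor for DistributedService.
		return pb.NewDistributedServiceClient(cc)
	})

	nm := distributed.NewNetworkManager(dialer, factory)
	return nm.ConnectToPeers(peers, selfRank, 30*time.Second)
}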

type ServerManager

type ServerManager interface {
	Start(workerAddress string, service interface{}, serviceDesc *grpc.ServiceDesc) error
	Stop()
	GracefulStop()
	SetLogger(logger Logger)
}

ServerManager is an interface for managing the gRPC server of a worker.

func NewServerManager

func NewServerManager(grpcServer GrpcServer, listenerFactory ListenerFactory) ServerManager

NewServerManager creates a new ServerManager.
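
A sketch of starting a worker's gRPC server through a ServerManager. The service implementation type and the generated descriptor name pb.DistributedService_ServiceDesc are assumptions (typical protoc-gen-go-grpc output), not confirmed by this page.

// startServer wires a ServerManager from a fresh gRPC server and the standard
// net.Listen, then starts serving the given service implementation.
func startServer(workerAddress string, svc pb.DistributedServiceServer) (distributed.ServerManager, error) {
	sm := distributed.NewServerManager(grpc.NewServer(), net.Listen)
	sm.SetLogger(log.New(os.Stderr, "server: ", log.LstdFlags))

	// pb.DistributedService_ServiceDesc is assumed to be the generated
	// grpc.ServiceDesc for DistributedService.
	if err := sm.Start(workerAddress, svc, &pb.DistributedService_ServiceDesc); err != nil {
		return nil, err
	}

	// Call sm.GracefulStop (or sm.Stop) on shutdown.
	return sm, nil
}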

type ServiceClientFactory

type ServiceClientFactory func(cc *grpc.ClientConn) pb.DistributedServiceClient

ServiceClientFactory is a function that creates a new DistributedServiceClient.

Directories

Path	Synopsis
coordinator	Package coordinator provides a distributed training coordinator.
