Documentation ¶
Overview ¶
Package call implements an RPC mechanism.
Index ¶
- Constants
- Variables
- func RoundRobin() *roundRobin
- func Serve(ctx context.Context, l net.Listener, hmap *HandlerMap, opts ServerOptions) error
- func ServeOn(ctx context.Context, conn net.Conn, hmap *HandlerMap, opts ServerOptions)
- type Balancer
- type CallOptions
- type ClientOptions
- type Connection
- type Endpoint
- type Handler
- type HandlerMap
- type MethodKey
- type NetEndpoint
- type NetworkAddress
- type Resolver
- type ServerOptions
- type Version
Constants ¶
const (
	// CommunicationError is the type of the error returned by a call when some
	// communication error is encountered, typically a process or network
	// error. Check for it via errors.Is(err, call.CommunicationError).
	CommunicationError transportError = iota

	// Unreachable is the type of the error returned by a call when every
	// server is unreachable. Check for it via errors.Is(err, call.Unreachable).
	Unreachable
)
Variables ¶
var Missing = Version{"__tombstone__"}
Missing is the version associated with a value that does not exist in the store.
TODO(rgrandl): this should be the same as the one in gke/internal/store/store.go. Should we move the version into a common file? Right now we have a duplicate version struct that is used both by the gke/store and stub/resolver.
Functions ¶
func Serve ¶
func Serve(ctx context.Context, l net.Listener, hmap *HandlerMap, opts ServerOptions) error
Serve starts listening for connections and requests on l. Handlers to handle incoming requests are found in hmap.
func ServeOn ¶
func ServeOn(ctx context.Context, conn net.Conn, hmap *HandlerMap, opts ServerOptions)
ServeOn serves client requests received over an already established network connection with a client. This can be useful in tests or when using custom networking transports.
Types ¶
type Balancer ¶
type Balancer interface {
// Update updates the current set of endpoints from which the Balancer can
// pick. Before Update is called for the first time, the set of endpoints
// is empty.
Update(endpoints []Endpoint)
// Pick picks an endpoint. Pick is guaranteed to return an endpoint that
// was passed to the most recent call of Update. If there are no endpoints,
// then Pick returns an error that includes Unreachable.
Pick(CallOptions) (Endpoint, error)
}
A Balancer picks the endpoint on which an RPC client performs a call. A Balancer should only be used by a single goroutine.
TODO(mwhittaker): Right now, balancers have no load information about endpoints. In the short term, we can at least add information about the number of pending requests for every endpoint.
TODO(mwhittaker): Right now, we pass a balancer the set of all endpoints. We instead probably want to pass it only the endpoints for which we have a connection. This means we may have to form connections more eagerly.
TODO(mwhittaker): We may want to guarantee that Update() is never called with an empty list of addresses. If we don't have addresses, then we don't need to do balancing.
func BalancerFunc ¶
func BalancerFunc(pick func([]Endpoint, CallOptions) (Endpoint, error)) Balancer
BalancerFunc returns a stateless, purely functional load balancer that uses the provided picking function.
type CallOptions ¶
type CallOptions struct {
// ShardKey, if not 0, is the shard key that a Balancer can use to route a
// call. A Balancer can always choose to ignore the ShardKey.
//
// TODO(mwhittaker): Figure out a way to have 0 be a valid shard key. Could
// change to *uint64 for example.
ShardKey uint64
// Balancer, if not nil, is the Balancer to use for a call, instead of the
// Balancer that the client was constructed with (provided in
// ClientOptions).
Balancer Balancer
}
CallOptions are call-specific options.
type ClientOptions ¶
type ClientOptions struct {
// Load balancer. Defaults to RoundRobin() if nil.
Balancer Balancer
// Logger. Defaults to a logger that logs to stderr.
Logger logtype.Logger
// If non-zero, each call will optimistically spin for a given duration
// before blocking, waiting for the results.
OptimisticSpinDuration time.Duration
// If non-zero, all writes smaller than this limit are flattened into
// a single buffer before being written on the connection.
WriteFlattenLimit int
}
ClientOptions are the options to configure an RPC client.
type Connection ¶
type Connection interface {
// Call makes an RPC over a Connection.
Call(context.Context, MethodKey, []byte, CallOptions) ([]byte, error)
// Close closes a connection. Pending invocations of Call are cancelled and
// return an error. All future invocations of Call fail and return an error
// immediately. Close can be called more than once.
Close()
}
Connection allows a client to send RPCs.
func Connect ¶
func Connect(ctx context.Context, resolver Resolver, opts ClientOptions) (Connection, error)
Connect creates a connection to the servers at the endpoints returned by the resolver.
type Endpoint ¶
type Endpoint interface {
// Dial returns a network connection to the endpoint.
Dial(ctx context.Context) (net.Conn, error)
// Address returns the address of the endpoint. If two endpoints have the
// same address, then they are guaranteed to represent the same endpoint.
// But, two endpoints with different addresses may also represent the same
// endpoint (e.g., TCP("golang.org:http") and TCP("golang.org:80")).
Address() string
}
An endpoint is a dialable entity with an address. For example, TCP("localhost:8000") returns an endpoint that dials the TCP server running on localhost:8000, and Unix("/tmp/unix.sock") returns an endpoint that dials the Unix socket /tmp/unix.sock.
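Endpoints do not have to be backed by a real network. The sketch below shows a hypothetical in-memory endpoint built on net.Pipe, which may be handy in tests. The Endpoint interface is copied from above; pipeEndpoint, echoOnce, and roundTrip are illustrative names, and the "pipe://" address format is an assumption. In a real test, the accept callback could hand the server half of the pipe to ServeOn instead of the echo function used here.

```go
package main

import (
	"context"
	"fmt"
	"net"
)

// Endpoint mirrors the interface documented above.
type Endpoint interface {
	Dial(ctx context.Context) (net.Conn, error)
	Address() string
}

// pipeEndpoint is a hypothetical in-memory endpoint. Each Dial creates a
// net.Pipe, hands the server half to accept, and returns the client half.
type pipeEndpoint struct {
	name   string
	accept func(server net.Conn)
}

func (p pipeEndpoint) Dial(ctx context.Context) (net.Conn, error) {
	if err := ctx.Err(); err != nil {
		return nil, err
	}
	client, server := net.Pipe()
	go p.accept(server)
	return client, nil
}

func (p pipeEndpoint) Address() string { return "pipe://" + p.name }

// echoOnce reads one message from conn and writes it back.
func echoOnce(conn net.Conn) {
	defer conn.Close()
	buf := make([]byte, 64)
	n, err := conn.Read(buf)
	if err != nil {
		return
	}
	conn.Write(buf[:n])
}

// roundTrip dials ep, sends msg, and returns the reply.
func roundTrip(ep Endpoint, msg string) (string, error) {
	conn, err := ep.Dial(context.Background())
	if err != nil {
		return "", err
	}
	defer conn.Close()
	if _, err := conn.Write([]byte(msg)); err != nil {
		return "", err
	}
	buf := make([]byte, 64)
	n, err := conn.Read(buf)
	if err != nil {
		return "", err
	}
	return string(buf[:n]), nil
}

func main() {
	var ep Endpoint = pipeEndpoint{name: "echo", accept: echoOnce}
	reply, err := roundTrip(ep, "ping")
	fmt.Println(ep.Address(), reply, err)
}
```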
type Handler ¶
Handler is a function that handles remote procedure calls. Regular application errors should be serialized in the returned bytes. A Handler should only return a non-nil error if the handler was not able to execute successfully.
type HandlerMap ¶
type HandlerMap struct {
// contains filtered or unexported fields
}
HandlerMap is a mapping from MethodKey to a Handler. The zero value for a HandlerMap is an empty map.
func (*HandlerMap) Set ¶
func (hm *HandlerMap) Set(component, method string, handler Handler)
Set registers a handler for the specified method of component.
type MethodKey ¶
type MethodKey [16]byte
MethodKey identifies a particular method on a component (formed by fingerprinting the component and method name).
func MakeMethodKey ¶
MakeMethodKey returns the fingerprint for the specified method on component.
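One plausible way to derive a 16-byte fingerprint is sketched below. The choice of SHA-256, the "component.method" input format, and the makeMethodKey name are assumptions made for illustration; the documentation above does not specify the hash.

```go
package main

import (
	"crypto/sha256"
	"fmt"
)

// MethodKey mirrors the type documented above.
type MethodKey [16]byte

// makeMethodKey is a hypothetical fingerprint function: it hashes
// "component.method" with SHA-256 and keeps the first 16 bytes.
func makeMethodKey(component, method string) MethodKey {
	sum := sha256.Sum256([]byte(component + "." + method))
	var key MethodKey
	copy(key[:], sum[:]) // copies len(key) == 16 bytes
	return key
}

func main() {
	get := makeMethodKey("Cache", "Get")
	put := makeMethodKey("Cache", "Put")
	fmt.Printf("%x\n%x\n", get, put)

	// Fingerprints are deterministic, so client and server can compute the
	// same key independently. Arrays are comparable, so keys work as map keys.
	fmt.Println(get == makeMethodKey("Cache", "Get"), get == put)
}
```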
type NetEndpoint ¶
type NetEndpoint struct {
Net string // e.g., "tcp", "udp", "unix"
Addr string // e.g., "localhost:8000", "/tmp/unix.sock"
}
NetEndpoint is an Endpoint that implements Dial using net.Dial.
func TCP ¶
func TCP(address string) NetEndpoint
TCP returns a TCP endpoint. The provided address is passed to net.Dial. For example:
TCP("golang.org:http")
TCP("192.0.2.1:http")
TCP("198.51.100.1:80")
TCP("[2001:db8::1]:domain")
TCP("[fe80::1%lo0]:53")
TCP(":80")
func Unix ¶
func Unix(filename string) NetEndpoint
Unix returns an endpoint that uses Unix sockets. The provided filename is passed to net.Dial. For example:
Unix("unix.sock")
Unix("/tmp/unix.sock")
func (NetEndpoint) Address ¶
func (ne NetEndpoint) Address() string
Address implements the Endpoint interface.
type NetworkAddress ¶
type NetworkAddress string
A NetworkAddress is a string of the form <network>://<address> (e.g., "tcp://localhost:8000", "unix:///tmp/unix.sock").
func (NetworkAddress) Split ¶
func (na NetworkAddress) Split() (network string, address string, err error)
Split splits the network and address from a NetworkAddress. For example,
NetworkAddress("tcp://localhost:80").Split() // "tcp", "localhost:80"
NetworkAddress("unix://unix.sock").Split() // "unix", "unix.sock"
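The splitting behavior can be approximated with strings.Cut. This is a sketch, not the package's implementation: splitNetworkAddress is a hypothetical free function, and the error message is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// splitNetworkAddress is a hypothetical stand-in for Split. It separates a
// string of the form <network>://<address> at the first "://".
func splitNetworkAddress(na string) (network, address string, err error) {
	network, address, ok := strings.Cut(na, "://")
	if !ok {
		return "", "", fmt.Errorf("%q is not of the form <network>://<address>", na)
	}
	return network, address, nil
}

func main() {
	for _, na := range []string{
		"tcp://localhost:80",
		"unix:///tmp/unix.sock",
		"localhost:80", // missing network: reported as an error
	} {
		network, address, err := splitNetworkAddress(na)
		fmt.Printf("%q %q %v\n", network, address, err)
	}
}
```

Note that an absolute Unix socket path yields three slashes in a row: two from the separator and one from the path.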
type Resolver ¶
type Resolver interface {
// IsConstant returns whether a resolver is constant. A constant resolver
// returns a fixed set of endpoints that doesn't change over time. A
// non-constant resolver manages a set of endpoints that does change over
// time.
IsConstant() bool
// Resolve returns a resolver's set of dialable endpoints. For non-constant
// resolvers, this set of endpoints may change over time. Every snapshot of
// the set of endpoints is assigned a unique version. If you call the
// Resolve method with a nil version, Resolve returns the current set of
// endpoints and its version. If you call the Resolve method with a non-nil
// version, then a Resolver either:
// 1. Blocks until the latest set of endpoints has a version newer than
// the one provided, returning the new set of endpoints and a new
// version.
// 2. Returns the same version, indicating that the Resolve should
// be called again after an appropriate delay.
//
// Example:
// if !resolver.IsConstant() {
// // Perform an unversioned, non-blocking Resolve to get the
// // latest set of endpoints and its version.
// endpoints, version, err := resolver.Resolve(ctx, nil)
//
// // Perform a versioned Resolve that either (1) blocks until a set
// // of endpoints exists with a version newer than `version`, or
// // (2) returns `version`, indicating that the Resolve should be
// // called again after an appropriate delay.
// newEndpoints, newVersion, err := resolver.Resolve(ctx, version)
// }
//
// If the resolver is constant, then Resolve only needs to be called once
// with a nil version. The returned set of endpoints will never change, and
// the returned version is nil.
//
// if resolver.IsConstant() {
// // endpoints1 == endpoints2, and version1 == version2 == nil.
// endpoints1, version1, err := resolver.Resolve(ctx, nil)
// endpoints2, version2, err := resolver.Resolve(ctx, nil)
// }
Resolve(ctx context.Context, version *Version) ([]Endpoint, *Version, error)
}
A Resolver manages a potentially changing set of endpoints. For example:
- A DNS resolver might resolve a hostname like "google.com" into a set of IP addresses like ["74.125.142.106", "74.125.142.103", "74.125.142.99"].
- A Kubernetes Service resolver might resolve a service name like "backend" into the IP addresses of the pods that implement the service.
- A unix resolver might resolve a directory name like "/tmp/workers" into the set of unix socket files within the directory.
A Resolver can safely be used concurrently from multiple goroutines.
Example usage:
	func printAddrs(ctx context.Context, resolver Resolver) error {
		var version *Version
		for ctx.Err() == nil {
			// Declare the results with := so the example compiles.
			endpoints, newVersion, err := resolver.Resolve(ctx, version)
			if err != nil {
				return err
			}
			version = newVersion

			for _, endpoint := range endpoints {
				fmt.Println(endpoint.Address())
			}

			// A constant resolver never changes, so one Resolve is enough.
			if resolver.IsConstant() {
				return nil
			}
		}
		return ctx.Err()
	}
func NewConstantResolver ¶
NewConstantResolver returns a new resolver that returns the provided set of endpoints.
func NewFileResolver ¶
NewFileResolver returns a new resolver that returns a given set of endpoints when a given file is created.
type ServerOptions ¶
type ServerOptions struct {
// Logger. Defaults to a logger that logs to stderr.
Logger logtype.Logger
// Tracer. Defaults to a discarding tracer.
Tracer trace.Tracer
// If non-zero, calls on the server are inlined and a new goroutine is
// launched only if the call takes longer than the provided duration.
InlineHandlerDuration time.Duration
// If non-zero, all writes smaller than this limit are flattened into
// a single buffer before being written on the connection.
WriteFlattenLimit int
}
ServerOptions are the options to configure an RPC server.
type Version ¶
type Version struct {
Opaque string
}
Version is the version associated with a resolver's set of endpoints. Versions are opaque entities and should not be inspected or interpreted. Versions should only ever be constructed by calling a resolver's Resolve method and should only ever be used by being passed to the same resolver's Resolve method.