Documentation ¶
Overview ¶
Package ccb implements the wire protocol for HTCondor's Condor Connection Broker (CCB). It is shared by CCB clients (requesters), CCB listeners (targets that register to be reachable), and CCB servers (brokers).
The protocol is described in the HTCondor C++ sources src/ccb/ccb_client.cpp, ccb_listener.cpp and ccb_server.cpp. Each control message is a single CEDAR message carrying one ClassAd. The reverse-connect "hello" is a raw command integer followed by a ClassAd in one message (no security handshake), so it can be delivered to an ordinary CEDAR command socket.
Index ¶
- Constants
- Variables
- func AdBool(ad *classad.ClassAd, name string) (bool, bool)
- func AdInt(ad *classad.ClassAd, name string) (int64, bool)
- func AdString(ad *classad.ClassAd, name string) string
- func ContactString(brokerAddr string, ccbid uint64) string
- func Dial(ctx context.Context, contacts []addresses.CCBContact, opts DialOptions) (net.Conn, error)
- func GenerateConnectID() (string, error)
- func NewAd(fields map[string]any) *classad.ClassAd
- func ReadControlAd(ctx context.Context, s *stream.Stream) (*classad.ClassAd, error)
- func ReadReverseConnectAd(ctx context.Context, msg *message.Message, cmd int) (*classad.ClassAd, error)
- func SplitBrokerList(s string) []string
- func WriteControlAd(ctx context.Context, s *stream.Stream, ad *classad.ClassAd) error
- func WriteReverseConnect(ctx context.Context, s *stream.Stream, connectID, requestID, myAddr string) error
- type ConnHandler
- type DialOptions
- type Listener
- type ListenerConfig
- type SharedPortEndpointConfig
- type StreamingUnsupportedError
Constants ¶
const (
CommandRegister = commands.CCB_REGISTER // 67
CommandRequest = commands.CCB_REQUEST // 68
CommandReverseConnect = commands.CCB_REVERSE_CONNECT // 69
CommandAlive = commands.ALIVE // 441
)
Command integers (mirrors condor_commands.h via the commands package).
const (
AttrCommand = "Command"
AttrCCBID = "CCBID"
AttrClaimID = "ClaimId"
AttrRequestID = "RequestID"
AttrMyAddress = "MyAddress"
AttrResult = "Result"
AttrErrorString = "ErrorString"
AttrName = "Name"
// Streaming/proxy extension (new; ignored by stock HTCondor).
AttrCCBStreaming = "CCBStreaming" // server -> peer: capability advertisement
AttrCCBStreamingRequired = "CCBStreamingRequired" // requester -> server: this request needs proxying
AttrProxyMode = "ProxyMode" // server -> requester: reply will be proxied on this socket
AttrCCBStreamingUnsupported = "CCBStreamingUnsupported" // server -> requester: typed "not supported" failure
)
ClassAd attribute names (must match condor_attributes.h exactly).
Variables ¶
var StreamingMinVersion = version.CondorVersion{Major: 25, Minor: 12, Sub: 0}
StreamingMinVersion is the minimum broker $CondorVersion$ that is assumed to support streaming/proxy mode when no explicit capability flag is available. golang-ccb advertises a version at or above this; older C++ CCB servers fall below it, so a private requester fails fast instead of sending a request the old server would mishandle.
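The version gate described above amounts to an ordered comparison of the three version components. The following is a minimal sketch, not the package's implementation: condorVersion is a hypothetical local mirror of version.CondorVersion (its field types are assumed to be int), and atLeast is an illustrative helper.

```go
package main

import "fmt"

// condorVersion is a hypothetical local mirror of version.CondorVersion;
// the int field types are an assumption made for this sketch.
type condorVersion struct {
	Major, Minor, Sub int
}

// atLeast reports whether v is greater than or equal to min, comparing
// Major first, then Minor, then Sub.
func atLeast(v, min condorVersion) bool {
	if v.Major != min.Major {
		return v.Major > min.Major
	}
	if v.Minor != min.Minor {
		return v.Minor > min.Minor
	}
	return v.Sub >= min.Sub
}

func main() {
	// Same value as StreamingMinVersion in the documentation above.
	streamingMin := condorVersion{Major: 25, Minor: 12, Sub: 0}

	older := condorVersion{Major: 25, Minor: 11, Sub: 9}
	newer := condorVersion{Major: 26, Minor: 0, Sub: 0}

	fmt.Println(atLeast(older, streamingMin)) // below the minimum
	fmt.Println(atLeast(newer, streamingMin)) // at or above the minimum
}
```

A requester that needs proxying would treat a broker below the minimum as lacking streaming support, unless the broker sent an explicit capability flag.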
Functions ¶
func ContactString ¶
ContactString builds a CCB contact "<brokerAddr>#<ccbid>" (broker address without angle brackets), matching CCBServer::CCBIDToContactString.
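As an illustration of the contact format, the sketch below reimplements the described behavior locally. contactString is a hypothetical stand-in, not the package function; rendering the id in decimal and stripping exactly one pair of surrounding brackets are assumptions of this sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// contactString is a hypothetical stand-in for ccb.ContactString. It removes
// one pair of surrounding angle brackets from the broker address and appends
// "#" followed by the CCB id (rendered in decimal, an assumption here).
func contactString(brokerAddr string, ccbid uint64) string {
	addr := strings.TrimSuffix(strings.TrimPrefix(brokerAddr, "<"), ">")
	return fmt.Sprintf("%s#%d", addr, ccbid)
}

func main() {
	// The address below is a documentation-only example value.
	fmt.Println(contactString("<192.0.2.10:9618>", 42))
}
```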
func Dial ¶
func Dial(ctx context.Context, contacts []addresses.CCBContact, opts DialOptions) (net.Conn, error)
Dial reaches a target daemon through the CCB broker(s) named by contacts, using connection reversal, and returns a net.Conn connected to the target (the reverse-connect hello already consumed and validated). The caller then runs the normal CEDAR client handshake over the returned connection.
When more than one broker contact is given, Dial uses a Happy-Eyeballs-style algorithm: it tries brokers in randomized order, starting an attempt to the next broker after Stagger has elapsed (or immediately if an outstanding attempt has already failed), and returns the first connection that succeeds. Remaining attempts are cancelled. This bounds the latency impact of a slow or dead CCB server in a multi-CCB pool. Each attempt uses its own connect id, so concurrent attempts to different brokers do not collide.
func GenerateConnectID ¶
GenerateConnectID returns a fresh connect id: 20 random bytes rendered as 40 hex characters, matching CCBClient's connect-id generation.
func NewAd ¶
NewAd builds a ClassAd from a map of attribute values. Supported value types are string, int, int64, uint64, and bool.
func ReadControlAd ¶
ReadControlAd reads a single control message (one ClassAd) from the stream.
func ReadReverseConnectAd ¶
func ReadReverseConnectAd(ctx context.Context, msg *message.Message, cmd int) (*classad.ClassAd, error)
ReadReverseConnectAd reads the ClassAd of a reverse-connect hello when the command integer has already been consumed from msg (e.g. by a dispatching server). It validates that cmd is CCB_REVERSE_CONNECT.
func SplitBrokerList ¶
SplitBrokerList splits a CCB broker list as written in configuration (e.g. the value of CCB_ADDRESS), which is separated by commas and/or whitespace, into individual broker addresses.
func WriteControlAd ¶
WriteControlAd writes a single control message (one ClassAd terminated by end-of-message) on the stream. Used for CCB_REGISTER, CCB_REQUEST forwarding, result reports, heartbeats and replies.
func WriteReverseConnect ¶
func WriteReverseConnect(ctx context.Context, s *stream.Stream, connectID, requestID, myAddr string) error
WriteReverseConnect writes the raw reverse-connect hello: the CCB_REVERSE_CONNECT command integer followed by a ClassAd, in one message. The ClassAd carries ClaimId (the connect id), RequestID and MyAddress.
Types ¶
type ConnHandler ¶
ConnHandler is called with each inbound reverse connection a Listener accepts on behalf of the registered daemon. By the time it is called, the reverse-connect hello has been sent; the connection is now an ordinary inbound CEDAR command socket on which the caller is the server (the remote requester will drive DC_AUTHENTICATE). The handler owns the conn and must close it.
type DialOptions ¶
type DialOptions struct {
// Security is the credential/config used to authenticate to the CCB
// broker. Its Command is overridden with CCB_REQUEST. Required.
Security *security.SecurityConfig
// MyAddress, if set, is the reverse-connect address advertised to the
// target (HTCondor sinful, e.g. "<1.2.3.4:5678>"). If empty, the address
// of the local reverse-connect listener is used. Ignored in proxy mode.
MyAddress string
// ProxyReturnAddr, if set, switches the dial to streaming/proxy mode: the
// requester is itself private, so instead of listening it asks the broker
// to proxy. The value is the requester's own CCB sinful (carrying a ccbid),
// sent as MyAddress so the broker recognizes proxy mode.
ProxyReturnAddr string
// RequireStreaming makes proxy mode mandatory: if the broker does not
// support streaming, the dial fails fast rather than falling back.
RequireStreaming bool
// TargetDesc is a human-readable description of the target (debugging).
TargetDesc string
// ListenAddr is the bind address for the reverse-connect listener in
// standard mode (default ":0"). Ignored when SharedPortEndpoint is set.
ListenAddr string
// SharedPortEndpoint, if set, makes the requester receive the
// target's reverse connection through a shared-port endpoint (a Unix socket
// that a condor_shared_port daemon forwards connections to) instead of
// opening its own TCP listen socket. This lets a requester whose only
// inbound path is a shared port still be reached by the reverse connection.
// Ignored in proxy mode (ProxyReturnAddr set).
SharedPortEndpoint *SharedPortEndpointConfig
// Timeout bounds the whole dial (default 30s).
Timeout time.Duration
// Stagger is the Happy-Eyeballs delay between starting attempts to
// successive brokers (default 250ms). A new broker attempt is also started
// immediately whenever an outstanding attempt fails. Set to a negative
// value to force fully-sequential dialing (one broker at a time).
Stagger time.Duration
}
DialOptions configures a CCB reverse-connect dial.
type Listener ¶
type Listener struct {
// contains filtered or unexported fields
}
Listener maintains a persistent registration with one or more CCB brokers and services reverse-connect requests from any of them. Each broker is handled by an independent registration (its own connection, heartbeat, and reconnect loop), so one broker going down does not affect the others.
func NewListener ¶
func NewListener(cfg ListenerConfig) *Listener
NewListener creates a Listener for all configured brokers.
func (*Listener) BrokerSupportsStreaming ¶
BrokerSupportsStreaming reports whether any currently-registered broker advertised streaming support. Returns false if no broker is registered.
func (*Listener) Contact ¶
Contact returns a single assigned contact (the first registered broker's), for convenience in the common single-broker case. Empty until registered.
func (*Listener) ContactList ¶
ContactList returns Contacts joined by spaces, i.e. the value to use for the "ccbid" parameter of a sinful string.
func (*Listener) Contacts ¶
Contacts returns the CCB contact strings ("addr#id") currently assigned by the brokers this listener is registered with, one per registered broker. Together they are the ccbid list to advertise in the daemon's sinful. Brokers that are not currently registered are omitted.
func (*Listener) NumRegistered ¶
NumRegistered returns how many of the configured brokers are currently registered.
type ListenerConfig ¶
type ListenerConfig struct {
// BrokerAddrs are the CCB broker addresses ("host:port", brackets
// optional) to register with. A daemon in a pool with multiple CCB
// servers registers with all of them, and the union of the assigned
// contacts (see Contacts) forms the ccbid list in the daemon's sinful.
BrokerAddrs []string
// BrokerAddr is a convenience for the single-broker case. If set, it is
// appended to BrokerAddrs.
BrokerAddr string
// Security authenticates registration to the broker. Its Command is
// overridden with CCB_REGISTER. Required.
Security *security.SecurityConfig
// Handler receives each reverse-connected inbound connection. Required.
Handler ConnHandler
// Name identifies this daemon to the broker (debugging only).
Name string
// HeartbeatInterval is how often to send ALIVE (default 1200s, min 30s).
HeartbeatInterval time.Duration
// ReconnectInterval is how long to wait before re-registering after a
// broker connection drops (default 60s).
ReconnectInterval time.Duration
// DialTimeout bounds reverse-connect dials to requesters (default 30s).
DialTimeout time.Duration
}
ListenerConfig configures a CCB Listener (a daemon registering itself with one or more brokers so private peers can reach it).
type SharedPortEndpointConfig ¶
type SharedPortEndpointConfig struct {
// SharedPortAddr is the address of the condor_shared_port daemon that fronts
// this endpoint. It is used to build the advertised return address
// "<host:port?sock=NAME>" so the target reverse-connects through the shared
// port. Required.
SharedPortAddr string
// SocketDir is the directory in which the shared_port daemon looks for endpoint
// sockets (its DAEMON_SOCKET_DIR). The endpoint socket is created here so
// the daemon can forward connections to it. If empty, the socket is created
// in a temporary directory -- only useful when the shared_port daemon shares
// that filesystem location, so prefer setting it explicitly.
SocketDir string
// SocketName is the name of the endpoint socket. If empty,
// a random ("anonymous") name is generated.
SocketName string
}
SharedPortEndpointConfig configures a requester's incoming reverse-connect port when it is managed by a condor_shared_port daemon rather than a private TCP listen socket.
type StreamingUnsupportedError ¶
StreamingUnsupportedError indicates the broker cannot honor a required streaming/proxy request.
func (*StreamingUnsupportedError) Error ¶
func (e *StreamingUnsupportedError) Error() string