Documentation
¶
Overview ¶
Package cbdatasource streams data from a Couchbase cluster. It is implemented using Couchbase DCP protocol and has auto-reconnecting and auto-restarting goroutines underneath the hood to provide a simple, high-level cluster-wide abstraction. By using cbdatasource, your application does not need to worry about connections or reconnections to individual server nodes or cluster topology changes, rebalance & failovers. The API starting point is NewBucketDataSource().
Index ¶
- Variables
- func ExponentialBackoffLoop(name string, f func() int, startSleepMS int, backoffFactor float32, ...)
- func ParseFailOverLog(body []byte) ([][]uint64, error)
- func UPROpen(mc *memcached.Client, name string, bufSize uint32) error
- type AllServerURLsConnectBucketError
- type AuthFailError
- type Bucket
- type BucketDataSource
- type BucketDataSourceOptions
- type BucketDataSourceStats
- type Receiver
- type VBucketMetaData
- type VBucketState
Constants ¶
This section is empty.
Variables ¶
var DefaultBucketDataSourceOptions = &BucketDataSourceOptions{
ClusterManagerBackoffFactor: 1.5,
ClusterManagerSleepInitMS: 100,
ClusterManagerSleepMaxMS: 1000,
DataManagerBackoffFactor: 1.5,
DataManagerSleepInitMS: 100,
DataManagerSleepMaxMS: 1000,
FeedBufferSizeBytes: 20000000,
FeedBufferAckThreshold: 0.2,
}
DefaultBucketDataSourceOptions defines the default options that will be used if nil is provided to NewBucketDataSource().
Functions ¶
func ExponentialBackoffLoop ¶
func ExponentialBackoffLoop(name string, f func() int, startSleepMS int, backoffFactor float32, maxSleepMS int)
ExponentialBackoffLoop invokes f() in a loop, sleeping in an exponential number of milliseconds in between invocations if needed. The provided f() function should return < 0 to stop the loop; >= 0 to continue the loop, where > 0 means there was progress which allows an immediate retry of f() with no sleeping. A return of < 0 is useful when f() will never make any future progress. Repeated attempts with no progress will have exponential backoff sleep times.
func ParseFailOverLog ¶
ParseFailOverLog parses a byte array to an array of [vbucketUUID, seqNum] pairs. It is exposed for testability.
Types ¶
type AllServerURLsConnectBucketError ¶
type AllServerURLsConnectBucketError struct {
ServerURLs []string
}
AllServerURLsConnectBucketError is the error type passed to Receiver.OnError() when the BucketDataSource failed to connect to all the serverURL's provided as a parameter to NewBucketDataSource(). The application, for example, may choose to BucketDataSource.Close() based on this error. Otherwise, the BucketDataSource will backoff and retry reconnecting to the serverURL's.
func (*AllServerURLsConnectBucketError) Error ¶
func (e *AllServerURLsConnectBucketError) Error() string
type AuthFailError ¶
AuthFailError is the error type passed to Receiver.OnError() when there is an auth request error to the Couchbase cluster or server node.
func (*AuthFailError) Error ¶
func (e *AuthFailError) Error() string
type Bucket ¶
type Bucket interface {
Close()
GetUUID() string
VBServerMap() *couchbase.VBucketServerMap
}
A Bucket interface defines the set of methods that cbdatasource needs from an abstract couchbase.Bucket. This separate interface allows for easier testability.
func ConnectBucket ¶
func ConnectBucket(serverURL, poolName, bucketName string, auth couchbase.AuthHandler) (Bucket, error)
ConnectBucket is the default function used by BucketDataSource to connect to a Couchbase cluster to retrieve Bucket information. It is exposed for testability and to allow applications to override or wrap via BucketDataSourceOptions.
type BucketDataSource ¶
type BucketDataSource interface {
// Use Start() to kickoff connectivity to a Couchbase cluster,
// after which calls will be made to the Receiver's methods.
Start() error
// Asynchronously request a cluster map refresh. A reason string
// of "" is valid.
Kick(reason string) error
// Returns an immutable snapshot of stats.
Stats(dest *BucketDataSourceStats) error
// Stops the underlying goroutines.
Close() error
}
BucketDataSource is the main control interface returned by NewBucketDataSource().
func NewBucketDataSource ¶
func NewBucketDataSource( serverURLs []string, poolName string, bucketName string, bucketUUID string, vbucketIDs []uint16, auth couchbase.AuthHandler, receiver Receiver, options *BucketDataSourceOptions) (BucketDataSource, error)
NewBucketDataSource is the main starting point for using the cbdatasource API. The application must supply an array of 1 or more serverURLs (or "seed" URL's) to Couchbase Server cluster-manager REST URL endpoints, like "http://localhost:8091". The BucketDataSource (after Start()'ing) will try each serverURL, in turn, until it can get a successful cluster map. Additionally, the application must supply a poolName & bucketName from where the BucketDataSource will retrieve data. The optional bucketUUID is double-checked by the BucketDataSource to ensure we have the correct bucket, and a bucketUUID of "" means skip the bucketUUID validation. An optional array of vbucketID numbers allows the application to specify which vbuckets to retrieve; and the vbucketIDs array can be nil which means all vbuckets are retrieved by the BucketDataSource. The optional auth parameter can be nil. The application must supply its own implementation of the Receiver interface (see the example program as a sample). The optional options parameter (which may be nil) allows the application to specify advanced parameters like backoff and retry-sleep values.
type BucketDataSourceOptions ¶
type BucketDataSourceOptions struct {
// Optional - used during UPR_OPEN stream start. If empty a
// random name will be automatically generated.
Name string
// Factor (like 1.5) to increase sleep time between retries
// in connecting to a cluster manager node.
ClusterManagerBackoffFactor float32
// Initial sleep time (millisecs) before first retry to cluster manager.
ClusterManagerSleepInitMS int
// Maximum sleep time (millisecs) between retries to cluster manager.
ClusterManagerSleepMaxMS int
// Factor (like 1.5) to increase sleep time between retries
// in connecting to a data manager node.
DataManagerBackoffFactor float32
// Initial sleep time (millisecs) before first retry to data manager.
DataManagerSleepInitMS int
// Maximum sleep time (millisecs) between retries to data manager.
DataManagerSleepMaxMS int
// Buffer size in bytes provided for UPR flow control.
FeedBufferSizeBytes uint32
// Used for UPR flow control and buffer-ack messages when this
// percentage of FeedBufferSizeBytes is reached.
FeedBufferAckThreshold float32
// Used for applications like backup which wish to control the
// last sequence number provided. Key is vbucketID, value is seqEnd.
SeqEnd map[uint16]uint64
// Optional function to connect to a couchbase cluster manager bucket.
// Defaults to ConnectBucket() function in this package.
ConnectBucket func(serverURL, poolName, bucketName string,
auth couchbase.AuthHandler) (Bucket, error)
// Optional function to connect to a couchbase data manager node.
// Defaults to memcached.Connect().
Connect func(protocol, dest string) (*memcached.Client, error)
}
BucketDataSourceOptions allows the application to provide configuration settings to NewBucketDataSource().
type BucketDataSourceStats ¶
type BucketDataSourceStats struct {
TotStart uint64
TotKick uint64
TotKickOk uint64
TotRefreshCluster uint64
TotRefreshClusterConnectBucket uint64
TotRefreshClusterConnectBucketErr uint64
TotRefreshClusterConnectBucketOk uint64
TotRefreshClusterBucketUUIDErr uint64
TotRefreshClusterVBMNilErr uint64
TotRefreshClusterKickWorkers uint64
TotRefreshClusterKickWorkersOk uint64
TotRefreshClusterAwokenClosed uint64
TotRefreshClusterAwokenStopped uint64
TotRefreshClusterAwokenRestart uint64
TotRefreshClusterAwoken uint64
TotRefreshClusterAllServerURLsConnectBucketErr uint64
TotRefreshClusterDone uint64
TotRefreshWorkers uint64
TotRefreshWorkersVBMNilErr uint64
TotRefreshWorkersVBucketIDErr uint64
TotRefreshWorkersServerIdxsErr uint64
TotRefreshWorkersMasterIdxErr uint64
TotRefreshWorkersMasterServerErr uint64
TotRefreshWorkersRemoveWorker uint64
TotRefreshWorkersAddWorker uint64
TotRefreshWorkersKickWorker uint64
TotRefreshWorkersCloseWorker uint64
TotRefreshWorkersDone uint64
TotWorkerStart uint64
TotWorkerDone uint64
TotWorkerBody uint64
TotWorkerBodyKick uint64
TotWorkerConnect uint64
TotWorkerConnectErr uint64
TotWorkerConnectOk uint64
TotWorkerAuth uint64
TotWorkerAuthErr uint64
TotWorkerAuthFail uint64
TotWorkerSelBktFail uint64
TotWorkerSelBktOk uint64
TotWorkerAuthOk uint64
TotWorkerUPROpenErr uint64
TotWorkerUPROpenOk uint64
TotWorkerTransmitStart uint64
TotWorkerTransmit uint64
TotWorkerTransmitErr uint64
TotWorkerTransmitOk uint64
TotWorkerTransmitDone uint64
TotWorkerReceiveStart uint64
TotWorkerReceive uint64
TotWorkerReceiveErr uint64
TotWorkerReceiveOk uint64
TotWorkerSendEndCh uint64
TotWorkerRecvEndCh uint64
TotWorkerHandleRecv uint64
TotWorkerHandleRecvErr uint64
TotWorkerHandleRecvOk uint64
TotRefreshWorker uint64
TotRefreshWorkerDone uint64
TotRefreshWorkerOk uint64
TotUPRDataChange uint64
TotUPRDataChangeStateErr uint64
TotUPRDataChangeMutation uint64
TotUPRDataChangeDeletion uint64
TotUPRDataChangeExpiration uint64
TotUPRDataChangeErr uint64
TotUPRDataChangeOk uint64
TotUPRCloseStream uint64
TotUPRCloseStreamRes uint64
TotUPRCloseStreamResStateErr uint64
TotUPRCloseStreamResErr uint64
TotUPRCloseStreamResOk uint64
TotUPRStreamReq uint64
TotUPRStreamReqWant uint64
TotUPRStreamReqRes uint64
TotUPRStreamReqResStateErr uint64
TotUPRStreamReqResFail uint64
TotUPRStreamReqResFailNotMyVBucket uint64
TotUPRStreamReqResFailERange uint64
TotUPRStreamReqResFailENoMem uint64
TotUPRStreamReqResRollback uint64
TotUPRStreamReqResRollbackStart uint64
TotUPRStreamReqResRollbackErr uint64
TotUPRStreamReqResWantAfterRollbackErr uint64
TotUPRStreamReqResKick uint64
TotUPRStreamReqResSuccess uint64
TotUPRStreamReqResSuccessOk uint64
TotUPRStreamReqResFLogErr uint64
TotUPRStreamEnd uint64
TotUPRStreamEndStateErr uint64
TotUPRStreamEndKick uint64
TotUPRSnapshot uint64
TotUPRSnapshotStateErr uint64
TotUPRSnapshotStart uint64
TotUPRSnapshotStartErr uint64
TotUPRSnapshotOk uint64
TotUPRNoop uint64
TotUPRControl uint64
TotUPRControlErr uint64
TotUPRBufferAck uint64
TotWantCloseRequestedVBucketErr uint64
TotWantClosingVBucketErr uint64
TotGetVBucketMetaData uint64
TotGetVBucketMetaDataUnmarshalErr uint64
TotGetVBucketMetaDataErr uint64
TotGetVBucketMetaDataOk uint64
TotSetVBucketMetaData uint64
TotSetVBucketMetaDataMarshalErr uint64
TotSetVBucketMetaDataErr uint64
TotSetVBucketMetaDataOk uint64
}
BucketDataSourceStats is filled by the BucketDataSource.Stats() method. All the metrics here prefixed with "Tot" are monotonic counters: they only increase.
func (*BucketDataSourceStats) AtomicCopyTo ¶
func (s *BucketDataSourceStats) AtomicCopyTo(r *BucketDataSourceStats, fn func(sv uint64, rv uint64) uint64)
AtomicCopyTo copies metrics from s to r (or, from source to result), and also applies an optional fn function. The fn is invoked with metrics from s and r, and can be used to compute additions, subtractions, negations, etc. When fn is nil, AtomicCopyTo behaves as a straight copier.
type Receiver ¶
type Receiver interface {
// Invoked in advisory fashion by the BucketDataSource when it
// encounters an error. The BucketDataSource will continue to try
// to "heal" and restart connections, etc, as necessary. The
// Receiver has a recourse during these error notifications of
// simply Close()'ing the BucketDataSource.
OnError(error)
// Invoked by the BucketDataSource when it has received a mutation
// from the data source. Receiver implementation is responsible
// for making its own copies of the key and request.
DataUpdate(vbucketID uint16, key []byte, seq uint64,
r *gomemcached.MCRequest) error
// Invoked by the BucketDataSource when it has received a deletion
// or expiration from the data source. Receiver implementation is
// responsible for making its own copies of the key and request.
DataDelete(vbucketID uint16, key []byte, seq uint64,
r *gomemcached.MCRequest) error
// An callback invoked by the BucketDataSource when it has
// received a start snapshot message from the data source. The
// Receiver implementation, for example, might choose to optimize
// persistence perhaps by preparing a batch write to
// application-specific storage.
SnapshotStart(vbucketID uint16, snapStart, snapEnd uint64, snapType uint32) error
// The Receiver should persist the value parameter of
// SetMetaData() for retrieval during some future call to
// GetMetaData() by the BucketDataSource. The metadata value
// should be considered "in-stream", or as part of the sequence
// history of mutations. That is, a later Rollback() to some
// previous sequence number for a particular vbucketID should
// rollback both persisted metadata and regular data.
SetMetaData(vbucketID uint16, value []byte) error
// GetMetaData() should return the opaque value previously
// provided by an earlier call to SetMetaData(). If there was no
// previous call to SetMetaData(), such as in the case of a brand
// new instance of a Receiver (as opposed to a restarted or
// reloaded Receiver), the Receiver should return (nil, 0, nil)
// for (value, lastSeq, err), respectively. The lastSeq should be
// the last sequence number received and persisted during calls to
// the Receiver's DataUpdate() & DataDelete() methods.
GetMetaData(vbucketID uint16) (value []byte, lastSeq uint64, err error)
// Invoked by the BucketDataSource when the datasource signals a
// rollback during stream initialization. Note that both data and
// metadata should be rolled back.
Rollback(vbucketID uint16, rollbackSeq uint64) error
}
A Receiver interface is implemented by the application, or the receiver of data. Calls to methods on this interface will be made by the BucketDataSource using multiple, concurrent goroutines, so the application should implement its own Receiver-side synchronizations if needed.
type VBucketMetaData ¶
type VBucketMetaData struct {
SeqStart uint64 `json:"seqStart"`
SeqEnd uint64 `json:"seqEnd"`
SnapStart uint64 `json:"snapStart"`
SnapEnd uint64 `json:"snapEnd"`
FailOverLog [][]uint64 `json:"failOverLog"`
}
VBucketMetaData is an internal struct is exposed to enable json marshaling.