dcpConn

package
v0.0.0-...-5951314 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 4, 2026 License: Apache-2.0 Imports: 21 Imported by: 0

Documentation

Index

Constants

View Source
// Single-bit uint32 flag values.
// NOTE(review): presumably these are OR-ed into StreamReq.Flags when
// requesting a stream — confirm against the caller.
const (
	ActiveOnly          = uint32(0x10)
	TillLatest          = uint32(0x04)
	IgnorePurgeRollback = uint32(0x80)
)
View Source
// Command codes of type resCommandCode (the type of DcpEvent.Opcode).
// Note that DCP_SEQ_NUM and DCP_SEQ_NUMBER are two names for the same
// value, 0x48.
const (
	DCP_NO_RES        = resCommandCode(0x00)
	DCP_SEQ_NUM       = resCommandCode(0x48)
	DCP_OPEN_CONN     = resCommandCode(0x50) // Open a DCP connection with a name
	DCP_SELECT_BUCKET = resCommandCode(0x89) // Select bucket
	DCP_ADDSTREAM     = resCommandCode(0x51) // DCP stream addition
	DCP_BUFFERACK     = resCommandCode(0x5d) // DCP Buffer Acknowledgement
	DCP_CONTROL       = resCommandCode(0x5e) // Set flow control params
	DCP_NOOP          = resCommandCode(0x5c) // dcp noop
	DCP_FLUSH         = resCommandCode(0x5a)

	DCP_SNAPSHOT_MARKER = resCommandCode(0x56) //snapshot marker

	// Document-level events.
	DCP_DELETION   = resCommandCode(0x58)
	DCP_EXPIRATION = resCommandCode(0x59)
	DCP_MUTATION   = resCommandCode(0x57)

	DCP_FAILOVER = resCommandCode(0x54)
	// DCP_SEQ_NUMBER has the same value (0x48) as DCP_SEQ_NUM above.
	DCP_SEQ_NUMBER = resCommandCode(0x48)

	// Stream lifecycle.
	DCP_STREAMREQ   = resCommandCode(0x53)
	DCP_STREAM_END  = resCommandCode(0x55)
	DCP_CLOSESTREAM = resCommandCode(0x52)

	DCP_SYSTEM_EVENT = resCommandCode(0x5F)
	DCP_ADV_SEQNUM   = resCommandCode(0x64)

	DCP_HELO = resCommandCode(0x1F)

	// SASL authentication.
	DCP_SASL_AUTH  = resCommandCode(0x21)
	SASL_AUTH_LIST = resCommandCode(0x20)
)
View Source
// Values of collectionEvent (the type of DcpEvent.EventType).
const (
	COLLECTION_CREATE  = collectionEvent(0x00) // Collection has been created
	COLLECTION_DROP    = collectionEvent(0x01) // Collection has been dropped
	COLLECTION_FLUSH   = collectionEvent(0x02) // Collection has been flushed
	SCOPE_CREATE       = collectionEvent(0x03) // Scope has been created
	SCOPE_DROP         = collectionEvent(0x04) // Scope has been dropped
	COLLECTION_CHANGED = collectionEvent(0x05) // Collection has changed

	OSO_SNAPSHOT_START = collectionEvent(0x06) // OSO snapshot start
	OSO_SNAPSHOT_END   = collectionEvent(0x07) // OSO snapshot end

	// NOTE(review): presumably the fallback for unrecognised events — confirm.
	EVENT_UNKNOWN = collectionEvent(0xFF)
)
View Source
// Values of status (the type of DcpEvent.Status and StreamReq.Status).
//
// Several names deliberately share a numeric value, so the meaning of a
// raw code depends on the context it was received in:
//   - 0x01: KEY_ENOENT and FORCED_CLOSED
//   - 0x02: STATE_CHANGED and KEY_EEXISTS
//   - 0x07: FILTER_DELETED and NOT_MY_VBUCKET
const (
	// Can add more status later
	SUCCESS         = status(0x00)
	KEY_ENOENT      = status(0x01)
	FORCED_CLOSED   = status(0x01) // for stream end status
	STATE_CHANGED   = status(0x02)
	KEY_EEXISTS     = status(0x02)
	E2BIG           = status(0x03)
	EINVAL          = status(0x04)
	NOT_STORED      = status(0x05)
	DELTA_BADVAL    = status(0x06)
	FILTER_DELETED  = status(0x07) // Filter is deleted
	NOT_MY_VBUCKET  = status(0x07)
	// NOTE(review): identifier is spelled "PRIVILAGE"; it is exported, so
	// renaming it would break callers.
	LOST_PRIVILAGE  = status(0x08)
	ERANGE          = status(0x22)
	ROLLBACK        = status(0x23)
	UNKNOWN_COMMAND = status(0x81)
	ENOMEM          = status(0x82)
	TMPFAIL         = status(0x86)
	UNKNOWN         = status(0xFF)
)
View Source
// Values of heloCommand.
// NOTE(review): presumably feature codes advertised with DCP_HELO — confirm.
const (
	FEATURE_COLLECTIONS = heloCommand(0x12)
	FEATURE_XERROR      = heloCommand(0x07)
)
View Source
// Untyped integer constants: RAW is zero and the others are distinct
// single bits.
// NOTE(review): presumably datatype bits matching DcpEvent.Datatype — confirm.
const (
	RAW    = 0x00
	JSON   = 0x01
	SNAPPY = 0x02
	XATTR  = 0x04
)
View Source
const (
	// SystemScope is the literal scope name "_system".
	// NOTE(review): presumably a reserved, internal scope — confirm.
	SystemScope = "_system"
)

Variables

View Source
// Sentinel errors created with errors.New.
var (
	// ErrConnClosed reports a closed connection; its message asks the
	// caller to retry the request.
	ErrConnClosed          = errors.New("connection closed. Retry request")
	// ErrFetchingSeqNum reports a failure while fetching a sequence number.
	ErrFetchingSeqNum      = errors.New("error fetching seq number")
	// ErrFetchingFailoverLog reports a failure while fetching a FailoverLog.
	ErrFetchingFailoverLog = errors.New("error fetching FailoverLog")
	// ErrTimeout reports that a request timed out.
	ErrTimeout             = errors.New("request timeout")
)
View Source
// Sentinel errors created with errors.New.
// NOTE(review): presumably returned by DcpConsumer.StartStreamReq — confirm.
var (
	// ErrInvalidRequest reports an invalid request.
	ErrInvalidRequest  = errors.New("invalid Request")
	// ErrAlreadyInflight reports that a stream request is already in flight.
	ErrAlreadyInflight = errors.New("stream request is already in flight")
)
View Source
var (
	// ErrDecodingComponentID reports a failure to decode a
	// keyspaceComponentId.
	ErrDecodingComponentID = errors.New("error decoding keyspaceComponentId")
)

Functions

func GetVbUUID

func GetVbUUID(seqNo uint64, failoverLog FailoverLog) (uint64, int)

func LEB128Dec

func LEB128Dec(data []byte) ([]byte, uint32)

LEB128Dec decodes the encoded value according to the LEB128 uint32 scheme. It returns the decoded key as a byte stream and the collectionID as a uint32 value.

Types

type Config

// Config is the configuration passed to GetDcpConsumer and
// GetDcpConsumerWithContext. Every field is JSON-tagged.
type Config struct {
	// Mode is one of InfoMode, StreamRequestMode or MixedMode.
	Mode            Mode                  `json:"mode"`
	ClientName      string                `json:"client_name"`
	BucketName      string                `json:"bucket_name"`
	// NOTE(review): presumably the address of the KV node to connect to — confirm.
	KvAddressStruct *notifier.NodeAddress `json:"kv_address"`

	// DcpConfig holds optional settings keyed by ConfigKey
	// (KeyOnly, IncludeXattr).
	DcpConfig  map[ConfigKey]interface{} `json:"dcp_config"`
	// NOTE(review): presumably the interval of a periodic sequence-number
	// check — confirm.
	SeqChecker time.Duration             `json:"seq_checker"`
}

func (Config) String

func (c Config) String() string

type ConfigKey

// ConfigKey is the key type of the Config.DcpConfig map.
type ConfigKey int
const (
	// KeyOnly specifies whether to open connection with key only
	KeyOnly ConfigKey = iota
	// IncludeXattr is the ConfigKey with value 1.
	// NOTE(review): presumably requests extended attributes on the
	// connection — confirm.
	IncludeXattr
)

type DcpConsumer

// DcpConsumer is the interface returned by GetDcpConsumer and
// GetDcpConsumerWithContext.
type DcpConsumer interface {
	// NOTE(review): blocking semantics are not visible here — confirm what
	// Wait waits for.
	Wait() error

	// Stream-request management. Pause and Stop identify a request by its
	// id and vbno (cf. StreamReq.ID and StreamReq.Vbno).
	StartStreamReq(sr *StreamReq) error
	PauseStreamReq(id uint16, vbno uint16)
	StopStreamReq(id uint16, vbno uint16)

	// Both lookups return maps keyed by uint16.
	// NOTE(review): presumably keyed by vbucket number — confirm.
	GetSeqNumber(collectionID string) (map[uint16]uint64, error)
	GetFailoverLog(vbs []uint16) (map[uint16]FailoverLog, error)

	TlsSettingsChange(config *notifier.TlsConfig)

	GetRuntimeStats() common.StatsInterface
	// CloseDcpConsumer returns a slice of *DcpEvent.
	// NOTE(review): presumably the events not yet delivered — confirm.
	CloseDcpConsumer() []*DcpEvent
}

func GetDcpConsumer

func GetDcpConsumer(config Config, tlsConfig *notifier.TlsConfig, sendChannel chan<- *DcpEvent, dcpEventPool *sync.Pool) DcpConsumer

func GetDcpConsumerWithContext

func GetDcpConsumerWithContext(ctx context.Context, config Config, tlsConfig *notifier.TlsConfig, sendChannel chan<- *DcpEvent, dcpEventPool *sync.Pool) DcpConsumer

type DcpEvent

// DcpEvent is the message type sent on the channel given to
// GetDcpConsumer. It has a Reset method and is used together with a
// *sync.Pool passed to the same constructor.
type DcpEvent struct {
	// Opcode is one of the resCommandCode constants (DCP_MUTATION, ...).
	Opcode   resCommandCode
	Datatype dcpDatatype
	Vbno     uint16
	ID       uint16
	Version  uint32

	Vbuuid       uint64
	Key, Value   []byte
	Cas          uint64
	ScopeID      uint32
	CollectionID uint32
	Seqno        uint64
	// Status is one of the status constants (SUCCESS, ROLLBACK, ...).
	Status       status
	Type         valueType
	Expiry       uint32

	// EventType is one of the collectionEvent constants.
	EventType   collectionEvent
	ManifestUID string
	FailoverLog FailoverLog

	Keyspace    *common.MarshalledData[application.Keyspace]
	SystemXattr map[string]xattrVal
	UserXattr   map[string]xattrVal
	// NOTE(review): presumably the stream request this event belongs to — confirm.
	SrRequest   *StreamReq
	// contains filtered or unexported fields
}

func (*DcpEvent) Reset

func (event *DcpEvent) Reset()

type FailoverLog

// FailoverLog is a slice of two-element uint64 arrays.
// NOTE(review): presumably each entry is a (vbuuid, seqno) pair — confirm
// the element order against GetVbUUID and Pop.
type FailoverLog [][2]uint64

func (FailoverLog) Pop

func (failoverLog FailoverLog) Pop(seq uint64) (FailoverLog, uint64, uint64)

type Mode

// Mode is the type of Config.Mode.
type Mode uint8
const (
	// InfoMode is 0, StreamRequestMode is 1 and MixedMode is 2 (iota).
	// NOTE(review): presumably info-only, streaming-only and both
	// respectively — confirm.
	InfoMode Mode = iota
	StreamRequestMode
	MixedMode
)

type RequestType

// RequestType is the type of StreamReq.RequestType.
type RequestType int8
const (
	// Request_Collections is 0, Request_Scope is 1 and Request_Bucket is 2
	// (iota).
	// NOTE(review): presumably selects whether a stream covers listed
	// collections, one scope, or the whole bucket — confirm.
	Request_Collections RequestType = iota
	Request_Scope
	Request_Bucket
)

type StreamReq

// StreamReq describes a stream request passed to
// DcpConsumer.StartStreamReq. Fields tagged `json:"-"` are omitted from
// its JSON encoding.
type StreamReq struct {
	ID      uint16 `json:"-"`
	Version uint32 `json:"-"`

	// Status gives the status of the current request
	Status status `json:"status"`

	RequestType   RequestType `json:"-"`
	CollectionIDs []string    `json:"-"` // array of collection ids
	ScopeID       string      `json:"-"` // scope id
	ManifestUID   string      `json:"-"` // manifest id

	Vbno        uint16      `json:"vb_no"`
	Flags       uint32      `json:"flags"`
	// StartSeq is encoded under the JSON key "seq_num_received".
	StartSeq    uint64      `json:"seq_num_received"`
	EndSeq      uint64      `json:"end_seq_no"`
	Vbuuid      uint64      `json:"vb_uuid"`
	FailoverLog FailoverLog `json:"-"`

	LastStreamSuccessTime   time.Time `json:"stream_success_time"`
	LastStreamRequestedTime time.Time `json:"stream_requested_time"`
	// contains filtered or unexported fields
}

func (*StreamReq) Copy

func (sr *StreamReq) Copy() *StreamReq

func (*StreamReq) String

func (sr *StreamReq) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL