Documentation
¶
Overview ¶
Package db implements generic connection to MongoDB, and contains subpackages for specific methods of connection.
Index ¶
- Constants
- func AKSCallback(ctx context.Context, _ *mopt.OIDCArgs) (*mopt.OIDCCredential, error)
- func ApplyFlags(opts *mopt.FindOneOptionsBuilder, flags int)
- func CanIgnoreError(err error) bool
- func FilterError(stopOnError bool, err error) error
- func GetCollections(database *mongo.Database, name string) (*mongo.Cursor, error)
- func GetIndexes(coll *mongo.Collection) (*mongo.Cursor, error)
- func GetTimeseriesCollNameFromBucket(bucketCollName string) (string, error)
- func MongoCanAcceptLiteralZeroTimestamp(version Version) bool
- func NewMongoWriteConcern(writeConcern string, cs *connstring.ConnString) (wc *wcwrapper.WriteConcern, err error)
- func NewReadPreference(rp string, cs *connstring.ConnString) (*readpref.ReadPref, error)
- func OpTimeIsEmpty(opTime OpTime) bool
- func OpTimeLessThan(lhs OpTime, rhs OpTime) bool
- func TimeseriesBucketNeedsMixedSchema(err error) bool
- type BSONSource
- type BufferedBulkInserter
- func (bb *BufferedBulkInserter) CanDoZeroTimestamp() bool
- func (bb *BufferedBulkInserter) Delete(ctx context.Context, selector, replacement bson.D) (*mongo.BulkWriteResult, error)
- func (bb *BufferedBulkInserter) Flush(ctx context.Context) (*mongo.BulkWriteResult, error)
- func (bb *BufferedBulkInserter) Insert(ctx context.Context, doc any) (*mongo.BulkWriteResult, error)
- func (bb *BufferedBulkInserter) InsertRaw(ctx context.Context, rawBytes []byte) (*mongo.BulkWriteResult, error)
- func (bb *BufferedBulkInserter) Replace(ctx context.Context, selector, replacement bson.D) (*mongo.BulkWriteResult, error)
- func (bb *BufferedBulkInserter) ResetBulk()
- func (bb *BufferedBulkInserter) SetBypassDocumentValidation(bypass bool) *BufferedBulkInserter
- func (bb *BufferedBulkInserter) SetOrdered(ordered bool) *BufferedBulkInserter
- func (bb *BufferedBulkInserter) SetUpsert(upsert bool) *BufferedBulkInserter
- func (bb *BufferedBulkInserter) TryFlush(ctx context.Context) (*mongo.BulkWriteResult, error)
- func (bb *BufferedBulkInserter) Update(ctx context.Context, selector bson.D, update bson.D) (*mongo.BulkWriteResult, error)
- type CollectionInfo
- type DecodedBSONSource
- type DeferredQuery
- type NodeType
- type OpTime
- type Oplog
- type RawDocSource
- type SessionProvider
- func (sp *SessionProvider) Close()
- func (sp *SessionProvider) CreateCollection(dbName, collName string) error
- func (sp *SessionProvider) DB(name string) *mongo.Database
- func (sp *SessionProvider) DatabaseNames() ([]string, error)
- func (sp *SessionProvider) DropCollection(dbName, collName string) error
- func (sp *SessionProvider) DropDatabase(dbName string) error
- func (sp *SessionProvider) FindOne(db, collection string, skip int, query any, sort any, into any, flags int) error
- func (sp *SessionProvider) GetNodeType() (NodeType, error)
- func (sp *SessionProvider) GetSession() (*mongo.Client, error)
- func (sp *SessionProvider) IsAtlasProxy() bool
- func (sp *SessionProvider) IsMongos() (bool, error)
- func (sp *SessionProvider) IsReplicaSet() (bool, error)
- func (sp *SessionProvider) Run(command any, out any, name string) error
- func (sp *SessionProvider) RunApplyOpsCreateIndex(C, DB string, index bson.D, UUID *bson.Binary, result *any) error
- func (sp *SessionProvider) RunString(commandName string, out any, name string) error
- func (sp *SessionProvider) ServerVersion() (string, error)
- func (sp *SessionProvider) ServerVersionArray() (Version, error)
- type Version
Constants ¶
const ( Snapshot = 1 << iota LogReplay Prefetch )
Query flags.
const ( None sessionFlag = 0 Monotonic sessionFlag = 1 << iota DisableSocketTimeout )
Session flags.
const ( // ignorable errors. ErrDuplicateKeyCode = 11000 ErrFailedDocumentValidation = 121 ErrUnacknowledgedWrite = "unacknowledged write" // ErrCannotInsertTimeseriesBucketsWithMixedSchema can be handled by turning TimeseriesBucketsWithMixedSchema off. ErrCannotInsertTimeseriesBucketsWithMixedSchema = 408 )
const (
DefaultTestPort = "33333"
)
Default port for integration tests.
const MAX_MESSAGE_SIZE_BYTES = 48000000
The default value of maxMessageSizeBytes See: https://docs.mongodb.com/manual/reference/command/hello/#mongodb-data-hello.maxMessageSizeBytes
const (
MaxBSONSize = 16 * 1024 * 1024 // 16MB - maximum BSON document size
)
MongoDB enforced limits.
const (
WarningNonPrimaryMongosConnection = "Warning: using a non-primary readPreference with a " +
"connection to mongos may produce inconsistent duplicates or miss some documents."
)
Variables ¶
This section is empty.
Functions ¶
func AKSCallback ¶
AKSCallback is a callback function that can be used to authenticate with Azure Kubernetes Service. See https://github.com/pmeredit/atlas-azure-fed-auth for testing, speficially the go test with AKS.
func ApplyFlags ¶
func ApplyFlags(opts *mopt.FindOneOptionsBuilder, flags int)
ApplyFlags applies flags to the given query session.
func CanIgnoreError ¶
Returns whether the tools can continue when encountering the given error. Currently, only DuplicateKeyErrors are ignorable.
func FilterError ¶
FilterError determines whether an error needs to be propagated back to the user or can be continued through. If an error cannot be ignored, a non-nil error is returned. If an error can be continued through, it is logged and nil is returned.
func GetCollections ¶
Assumes that mongo.Database will normalize legacy names to omit database name as required by the Enumerate Collections spec.
func GetIndexes ¶
func GetIndexes(coll *mongo.Collection) (*mongo.Cursor, error)
GetIndexes returns an iterator to thethe raw index info for a collection by using the listIndexes command if available, or by falling back to querying against system.indexes (pre-3.0 systems). nil is returned if the collection does not exist.
func GetTimeseriesCollNameFromBucket ¶
GetTimeseriesCollNameFromBucket returns a timeseries collection name from its bucket collection name.
func MongoCanAcceptLiteralZeroTimestamp ¶
MongoCanAcceptLiteralZeroTimestamp indicates whether the given server version can accept a literal zero timestamp in a query. See SERVER-88750 and TOOLS-3540.
func NewMongoWriteConcern ¶
func NewMongoWriteConcern( writeConcern string, cs *connstring.ConnString, ) (wc *wcwrapper.WriteConcern, err error)
NewMongoWriteConcern takes a string (from the command line writeConcern option) and a ConnString object (from the command line uri option) and returns a WriteConcern. If both are provided, preference is given to the command line writeConcern option. If neither is provided, the default 'majority' write concern is constructed.
func NewReadPreference ¶
func NewReadPreference(rp string, cs *connstring.ConnString) (*readpref.ReadPref, error)
NewReadPreference takes a string (command line read preference argument) and a ConnString (from the command line URI argument) and returns a ReadPref. If both are provided, preference is given to the command line argument. If both are empty, a default read preference of primary will be returned.
func OpTimeIsEmpty ¶
OpTimeIsEmpty returns true if opTime is uninitialized, false otherwise.
func OpTimeLessThan ¶
OpTimeLessThan returns true if lhs comes before rhs, false otherwise. We first check if both the terms exist. If they don't or they're equal, we compare just the timestamps.
func TimeseriesBucketNeedsMixedSchema ¶
Returns a boolean based on whether the given error indicates that this timeseries collection needs to be updated to set `timeseriesBucketsMayHaveMixedSchemaData` to `true`.
Types ¶
type BSONSource ¶
type BSONSource struct {
Stream io.ReadCloser
MaxBSONSize uint32
// contains filtered or unexported fields
}
BSONSource reads documents from the underlying io.ReadCloser, Stream which wraps a stream of BSON documents.
func NewBSONSource ¶
func NewBSONSource(in io.ReadCloser) *BSONSource
NewBSONSource creates a BSONSource with a reusable I/O buffer.
func NewBufferlessBSONSource ¶
func NewBufferlessBSONSource(in io.ReadCloser) *BSONSource
NewBufferlessBSONSource creates a BSONSource without a reusable I/O buffer.
func (*BSONSource) Close ¶
func (bs *BSONSource) Close() error
Close closes the BSONSource, rendering it unusable for I/O. It returns an error, if any.
func (*BSONSource) Err ¶
func (bs *BSONSource) Err() error
func (*BSONSource) LoadNext ¶
func (bs *BSONSource) LoadNext() []byte
LoadNext reads and returns the next BSON document in the stream. If the BSONSource was created with NewBSONSource then each returned []byte will be a slice of a single reused I/O buffer. If the BSONSource was created with NewBufferlessBSONSource then each returend []byte will be individually allocated.
func (*BSONSource) SetMaxBSONSize ¶
func (bs *BSONSource) SetMaxBSONSize(size uint32)
type BufferedBulkInserter ¶
type BufferedBulkInserter struct {
// contains filtered or unexported fields
}
BufferedBulkInserter implements a bufio.Writer-like design for queuing up documents and inserting them in bulk when the given doc limit (or max message size) is reached. Must be flushed at the end to ensure that all documents are written.
func NewUnorderedBufferedBulkInserter ¶
func NewUnorderedBufferedBulkInserter( collection *mongo.Collection, docLimit int, serverVersion Version, ) *BufferedBulkInserter
NewUnorderedBufferedBulkInserter returns an initialized BufferedBulkInserter for performing unordered bulk writes.
func (*BufferedBulkInserter) CanDoZeroTimestamp ¶
func (bb *BufferedBulkInserter) CanDoZeroTimestamp() bool
func (*BufferedBulkInserter) Delete ¶
func (bb *BufferedBulkInserter) Delete( ctx context.Context, selector, replacement bson.D, ) (*mongo.BulkWriteResult, error)
Delete adds a document to the buffer for bulk removal. If the buffer becomes full, the bulk delete is performed, returning any error that occurs.
func (*BufferedBulkInserter) Flush ¶
func (bb *BufferedBulkInserter) Flush(ctx context.Context) (*mongo.BulkWriteResult, error)
Flush writes all buffered documents in one bulk write and then resets the buffer.
func (*BufferedBulkInserter) Insert ¶
func (bb *BufferedBulkInserter) Insert( ctx context.Context, doc any, ) (*mongo.BulkWriteResult, error)
Insert adds a document to the buffer for bulk insertion. If the buffer becomes full, the bulk write is performed, returning any error that occurs.
func (*BufferedBulkInserter) InsertRaw ¶
func (bb *BufferedBulkInserter) InsertRaw( ctx context.Context, rawBytes []byte, ) (*mongo.BulkWriteResult, error)
InsertRaw adds a document, represented as raw bson bytes, to the buffer for bulk insertion. If the buffer becomes full, the bulk write is performed, returning any error that occurs.
func (*BufferedBulkInserter) Replace ¶
func (bb *BufferedBulkInserter) Replace( ctx context.Context, selector, replacement bson.D, ) (*mongo.BulkWriteResult, error)
Replace adds a document to the buffer for bulk replacement. If the buffer becomes full, the bulk write is performed, returning any error that occurs.
func (*BufferedBulkInserter) ResetBulk ¶
func (bb *BufferedBulkInserter) ResetBulk()
throw away the old bulk and init a new one.
func (*BufferedBulkInserter) SetBypassDocumentValidation ¶
func (bb *BufferedBulkInserter) SetBypassDocumentValidation(bypass bool) *BufferedBulkInserter
func (*BufferedBulkInserter) SetOrdered ¶
func (bb *BufferedBulkInserter) SetOrdered(ordered bool) *BufferedBulkInserter
func (*BufferedBulkInserter) SetUpsert ¶
func (bb *BufferedBulkInserter) SetUpsert(upsert bool) *BufferedBulkInserter
func (*BufferedBulkInserter) TryFlush ¶
func (bb *BufferedBulkInserter) TryFlush(ctx context.Context) (*mongo.BulkWriteResult, error)
TryFlush writes all buffered documents in one bulk write without resetting the buffer.
func (*BufferedBulkInserter) Update ¶
func (bb *BufferedBulkInserter) Update( ctx context.Context, selector bson.D, update bson.D, ) (*mongo.BulkWriteResult, error)
Update adds a document to the buffer for bulk update. If the buffer becomes full, the bulk write is performed, returning any error that occurs.
type CollectionInfo ¶
type CollectionInfo struct {
Name string `bson:"name"`
Type string `bson:"type"`
Options bson.D `bson:"options"`
Info bson.M `bson:"info"`
}
func GetCollectionInfo ¶
func GetCollectionInfo(coll *mongo.Collection) (*CollectionInfo, error)
func (*CollectionInfo) GetUUID ¶
func (ci *CollectionInfo) GetUUID() string
func (*CollectionInfo) IsSystemCollection ¶
func (ci *CollectionInfo) IsSystemCollection() bool
func (*CollectionInfo) IsTimeseries ¶
func (ci *CollectionInfo) IsTimeseries() bool
func (*CollectionInfo) IsView ¶
func (ci *CollectionInfo) IsView() bool
type DecodedBSONSource ¶
type DecodedBSONSource struct {
RawDocSource
// contains filtered or unexported fields
}
DecodedBSONSource reads documents from the underlying io.ReadCloser, Stream which wraps a stream of BSON documents.
func NewDecodedBSONSource ¶
func NewDecodedBSONSource(ds RawDocSource) *DecodedBSONSource
func (*DecodedBSONSource) Err ¶
func (dbs *DecodedBSONSource) Err() error
Err returns any error in the DecodedBSONSource or its RawDocSource.
func (*DecodedBSONSource) Next ¶
func (dbs *DecodedBSONSource) Next(result any) bool
NextGBSON unmarshals the next BSON document into result using the official go driver. Returns true if no errors are encountered and false otherwise. This function does NOT zero out the result before writing to it.
type DeferredQuery ¶
type DeferredQuery struct {
Coll *mongo.Collection
Filter any
Hint any
LogReplay bool
}
DeferredQuery represents a deferred query.
type OpTime ¶
OpTime represents the values to uniquely identify an oplog entry. An OpTime must always have a timestamp, but may or may not have a term. The hash is set uniquely up until (and including) version 4.0, but is set to zero in version 4.2+ with plans to remove it soon (see SERVER-36334).
func GetOpTimeFromOplogEntry ¶
GetOpTimeFromOplogEntry returns an OpTime struct from the relevant fields in an Oplog struct.
type Oplog ¶
type Oplog struct {
Timestamp bson.Timestamp `bson:"ts"`
Term *int64 `bson:"t"`
Version int `bson:"v"`
Operation string `bson:"op"`
Namespace string `bson:"ns"`
Object bson.D `bson:"o"`
Query bson.D `bson:"o2,omitempty"`
UI *bson.Binary `bson:"ui,omitempty"`
LSID bson.Raw `bson:"lsid,omitempty"`
TxnNumber *int64 `bson:"txnNumber,omitempty"`
PrevOpTime bson.Raw `bson:"prevOpTime,omitempty"`
MultiOpType *int `bson:"multiOpType,omitempty"`
}
Oplog represents a MongoDB oplog document.
type RawDocSource ¶
RawDocSource wraps basic functions for reading a BSON source file.
type SessionProvider ¶
Used to manage database sessions.
func NewSessionProvider ¶
func NewSessionProvider(opts options.ToolOptions) (*SessionProvider, error)
NewSessionProvider constructs a session provider, including a connected client.
func (*SessionProvider) Close ¶
func (sp *SessionProvider) Close()
Close closes the master session in the connection pool.
func (*SessionProvider) CreateCollection ¶
func (sp *SessionProvider) CreateCollection(dbName, collName string) error
func (*SessionProvider) DB ¶
func (sp *SessionProvider) DB(name string) *mongo.Database
DB provides a database with the default read preference.
func (*SessionProvider) DatabaseNames ¶
func (sp *SessionProvider) DatabaseNames() ([]string, error)
DatabaseNames returns a slice containing the names of all the databases on the connected server.
func (*SessionProvider) DropCollection ¶
func (sp *SessionProvider) DropCollection(dbName, collName string) error
func (*SessionProvider) DropDatabase ¶
func (sp *SessionProvider) DropDatabase(dbName string) error
func (*SessionProvider) FindOne ¶
func (sp *SessionProvider) FindOne( db, collection string, skip int, query any, sort any, into any, flags int, ) error
FindOne returns the first document in the collection and database that matches the query after skip, sort and query flags are applied.
func (*SessionProvider) GetNodeType ¶
func (sp *SessionProvider) GetNodeType() (NodeType, error)
GetNodeType checks if the connected SessionProvider is a mongos, standalone, or replset, by looking at the result of calling isMaster.
func (*SessionProvider) GetSession ¶
func (sp *SessionProvider) GetSession() (*mongo.Client, error)
Returns a mongo.Client connected to the database server for which the session provider is configured.
func (*SessionProvider) IsAtlasProxy ¶
func (sp *SessionProvider) IsAtlasProxy() bool
IsAtlasProxy checks if the connected SessionProvider is an atlas proxy.
func (*SessionProvider) IsMongos ¶
func (sp *SessionProvider) IsMongos() (bool, error)
IsMongos returns true if the connected server is a mongos.
func (*SessionProvider) IsReplicaSet ¶
func (sp *SessionProvider) IsReplicaSet() (bool, error)
IsReplicaSet returns a boolean which is true if the connected server is part of a replica set.
func (*SessionProvider) Run ¶
func (sp *SessionProvider) Run(command any, out any, name string) error
func (*SessionProvider) RunApplyOpsCreateIndex ¶
func (sp *SessionProvider) RunApplyOpsCreateIndex( C, DB string, index bson.D, UUID *bson.Binary, result *any, ) error
RunApplyOpsCreateIndex will create index using applyOps. For versions that support collection UUIDs (<3.6) it uses an insert to system indexes. Later versions use the createIndexes command.
func (*SessionProvider) RunString ¶
func (sp *SessionProvider) RunString(commandName string, out any, name string) error
func (*SessionProvider) ServerVersion ¶
func (sp *SessionProvider) ServerVersion() (string, error)
func (*SessionProvider) ServerVersionArray ¶
func (sp *SessionProvider) ServerVersionArray() (Version, error)