cdc

Version: v0.0.0-...-b5aee91
Published: Dec 27, 2025 License: MIT Imports: 16 Imported by: 0

README

Go MongoDB CDC

Go MongoDB CDC captures and processes real-time changes from MongoDB using Change Streams.

Features

  • Real-time Data Capture - Captures INSERT, UPDATE, DELETE and REPLACE operations as they occur, using the MongoDB Change Streams API
  • Scalable Partition System - Intelligent partition management that distributes workload across multiple workers
  • Automatic Failover - Automatic partition transfer and load balancing between workers
  • Oplog Rollover Protection - Automatic re-snapshot mechanism when oplog history is lost, ensuring zero data loss (similar to Debezium's snapshot.mode = "when_needed")
  • Smart Resume Token Recovery - Automatic detection and recovery from expired/invalid resume tokens with graceful fallback
  • Universal Hash Distribution - Advanced multi-hash algorithm ensuring optimal distribution for any ID pattern
  • Bootstrap Mode - Full collection scanning feature for processing existing data on first run
  • Advanced Checkpoint System - Comprehensive checkpoint mechanism to prevent data loss
  • Sharded Cluster Support - Works with both replica sets and sharded clusters
  • Comprehensive Metrics - Detailed performance and system metrics exposed in Prometheus format
  • Easy Configuration - Simple configuration via YAML or JSON files
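
To make the partition idea above concrete, here is a minimal sketch of how a document ID could be mapped to one of a fixed number of partitions. It is an illustration only: the README does not describe the library's actual hashing, so the FNV-1a hash and the `partitionFor` helper are assumptions.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a document ID to a partition in [0, totalPartition).
// Assumption: FNV-1a stands in for the library's own hashing, which this
// README does not describe. totalPartition must be greater than zero.
func partitionFor(documentID string, totalPartition int) int {
	h := fnv.New32a()
	h.Write([]byte(documentID)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(totalPartition))
}

func main() {
	// Count how many of 1000 synthetic IDs land in each of 4 partitions.
	counts := make([]int, 4)
	for i := 0; i < 1000; i++ {
		counts[partitionFor(fmt.Sprintf("doc-%d", i), 4)]++
	}
	fmt.Println(counts)
}
```

Because the mapping depends on the partition count, changing that count would move documents between partitions, which is consistent with the warning about `partition.totalPartition` in the configuration section.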

Example

package main

import (
	"context"
	"log"
	"time"

	cdc "github.com/Trendyol/go-mongo-cdc"
	"github.com/Trendyol/go-mongo-cdc/config"
	"github.com/Trendyol/go-mongo-cdc/logger"
	"github.com/Trendyol/go-mongo-cdc/mongo/message"
	"github.com/Trendyol/go-mongo-cdc/stream"
)

func main() {
	cfg := config.Config{
		MongoDB: config.MongoDB{
			Connection: config.Connection{
				URI:        "localhost:27017",
				Database:   "exampleDB",
				Collection: "exampleCollection",
			},
			ConnectionPool: config.ConnectionPool{
				MaxPoolSize:   100,
				MinPoolSize:   5,
				MaxIdleTimeMS: 300000,
			},
			Timeouts: config.Timeouts{
				ConnectTimeoutMS:         30000,
				ServerSelectionTimeoutMS: 60000,
				SocketTimeoutMS:          120000,
			},
		},
		Metric: config.MetricConfig{
			Port: 8080,
		},
		Checkpoint: config.CheckpointConfig{
			TokenSaveInterval:       10 * time.Second,
			ChangeStreamSaveCount:   500,
			BootstrapSaveCount:      5000,
			BootstrapSaveInterval:   10 * time.Second,
			BootstrapQueryBatchSize: 5000,
		},
		Partition: config.PartitionConfig{
			ConsumerGroup:          "myapp",
			HeartbeatInterval:      10 * time.Second,
			WorkerTimeout:          90 * time.Second,
			RebalanceCheckInterval: 10 * time.Second,
			TotalPartition:         30,
		},
		Logger: config.LoggerConfig{LogLevel: "debug"},
	}

	connector, err := cdc.NewConnector(cfg, ProcessChangeEvent)
	if err != nil {
		log.Fatalf("failed to create connector: %v", err)
	}

	defer connector.Close()

	ctx := context.Background()
	connector.Start(ctx)
}

func ProcessChangeEvent(lc *stream.ListenerContext) error {
	select {
	case <-lc.Context.Done():
		logger.Log.Info("Shutdown signal received, stopping event processing")
		return lc.Context.Err()
	default:
	}

	switch lc.Message.OperationType {
	case message.OperationInsert, message.OperationUpdate, message.OperationReplace:
		if lc.Message.FullDocument != nil {
			logger.Log.Info("Document changed - operation: %s, document: %v, partitionId: %d", string(lc.Message.OperationType), lc.Message.DocumentID, lc.PartitionID)
		}
	case message.OperationDelete:
		logger.Log.Info("Document deleted - documentId: %v, partitionId: %d", lc.Message.DocumentID, lc.PartitionID)
	}

	if err := lc.Ack(); err != nil {
		logger.Log.Error("Failed to acknowledge message: %v", err)
		return err
	}
	return nil
}
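
The listener above checks `lc.Context.Done()`, so the context passed to `Start` is what signals shutdown. The following sketch shows one way to cancel that context on SIGINT or SIGTERM. It does not import the library: `runner` mirrors only the `Start` and `Close` methods of the `Connector` interface, `fakeConnector` is a hypothetical stand-in, and it assumes `Start` blocks until the context is cancelled, which this README does not confirm.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// runner mirrors the Start and Close methods of the Connector interface.
type runner interface {
	Start(ctx context.Context)
	Close()
}

// fakeConnector is a hypothetical stand-in for a real connector.
// Assumption: Start blocks until the context is cancelled.
type fakeConnector struct{ closed bool }

func (f *fakeConnector) Start(ctx context.Context) { <-ctx.Done() }
func (f *fakeConnector) Close()                    { f.closed = true }

// runUntilDone starts r and calls Close once Start returns.
func runUntilDone(ctx context.Context, r runner) {
	defer r.Close()
	r.Start(ctx)
}

// runDemo runs the fake connector until SIGINT, SIGTERM, or a 50ms demo
// timeout, and reports whether Close was called.
func runDemo() bool {
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()
	ctx, cancel := context.WithTimeout(ctx, 50*time.Millisecond)
	defer cancel()

	c := &fakeConnector{}
	runUntilDone(ctx, c)
	return c.closed
}

func main() {
	fmt.Println("closed:", runDemo())
}
```

In a real service the demo timeout would be dropped, leaving the signal-aware context as the only source of cancellation.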

Usage

$ go get github.com/Trendyol/go-mongo-cdc

Configuration

MongoDB Configuration
Connection Settings (mongodb.connection)

| Variable | Type | Required | Default | Description |
|---|---|---|---|---|
| mongodb.connection.uri | string | yes | - | MongoDB connection URI |
| mongodb.connection.database | string | yes | - | MongoDB database name |
| mongodb.connection.collection | string | yes | - | MongoDB collection |
| mongodb.connection.username | string | no | - | MongoDB username for authentication |
| mongodb.connection.password | string | no | - | MongoDB password for authentication |

Connection Pool Settings (mongodb.connectionPool)

| Variable | Type | Required | Default | Description |
|---|---|---|---|---|
| mongodb.connectionPool.maxPoolSize | uint64 | no | 100 | Maximum number of connections in the pool |
| mongodb.connectionPool.minPoolSize | uint64 | no | 5 | Minimum number of connections to maintain |
| mongodb.connectionPool.maxIdleTimeMS | int64 | no | 300000 | Maximum time a connection can remain idle (5 min) |

Timeout Settings (mongodb.timeouts)

| Variable | Type | Required | Default | Description |
|---|---|---|---|---|
| mongodb.timeouts.connectTimeoutMS | int64 | no | 30000 | Connection timeout in milliseconds |
| mongodb.timeouts.serverSelectionTimeoutMS | int64 | no | 60000 | Server selection timeout in milliseconds |
| mongodb.timeouts.socketTimeoutMS | int64 | no | 120000 | Socket timeout in milliseconds |

Metric Configuration

Note: Monitoring the oplog is critical for Change Streams. If the oplog window is too small or oplog history is lost, Change Streams may fail. We recommend tracking oplog health (size, used percentage, window) with MongoDB's native metrics, exposed through MongoDB Exporter or MongoDB's built-in monitoring tools.

| Variable | Type | Required | Default | Description |
|---|---|---|---|---|
| metric.port | int | no | 8080 | Prometheus metrics port |

Checkpoint Configuration

| Variable | Type | Required | Default | Description |
|---|---|---|---|---|
| checkpoint.type | string | no | auto | Checkpoint mode: "auto" (periodic) or "manual" (explicit Commit() call required) |
| checkpoint.tokenSaveInterval | time.Duration | no | 10s | Token save interval (only for auto mode) |
| checkpoint.tokenSaveTimeout | time.Duration | no | 10s | Token save timeout |
| checkpoint.changeStreamSaveCount | int | no | 500 | Change stream batch size |
| checkpoint.bootstrapSaveCount | int | no | 2500 | Number of documents to process before saving |
| checkpoint.bootstrapSaveInterval | time.Duration | no | 10s | Bootstrap checkpoint save interval (only for auto mode) |
| checkpoint.bootstrapQueryBatchSize | int | no | 2500 | Bootstrap batch size |
| checkpoint.idleHeartbeatInterval | time.Duration | no | 3m | Idle heartbeat interval |
| checkpoint.maxIdleTime | time.Duration | no | 15m | Maximum idle time before partition release |

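
The count and interval settings above suggest a "whichever comes first" trigger. The sketch below shows that pattern under the assumption that a checkpoint is saved after `saveCount` events or once `saveInterval` has elapsed since the last save. The `saveTrigger` type is hypothetical and is not taken from the library; the README does not state how the two settings interact.

```go
package main

import (
	"fmt"
	"time"
)

// saveTrigger decides when a checkpoint is due: after saveCount events or
// once saveInterval has passed since the last save, whichever comes first.
// Assumption: this combination is illustrative, not the library's logic.
type saveTrigger struct {
	saveCount    int
	saveInterval time.Duration
	pending      int
	lastSave     time.Time
}

// observe records one processed event and reports whether to save now.
func (s *saveTrigger) observe(now time.Time) bool {
	s.pending++
	if s.pending >= s.saveCount || now.Sub(s.lastSave) >= s.saveInterval {
		s.pending = 0
		s.lastSave = now
		return true
	}
	return false
}

// simulate feeds the given number of events at a single instant, so only
// the count threshold can fire, and returns how many saves were triggered.
func simulate(events, saveCount int) int {
	now := time.Now()
	t := &saveTrigger{saveCount: saveCount, saveInterval: 10 * time.Second, lastSave: now}
	saves := 0
	for i := 0; i < events; i++ {
		if t.observe(now) {
			saves++
		}
	}
	return saves
}

func main() {
	fmt.Println("saves:", simulate(1200, 500))
}
```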
Partition Configuration

| Variable | Type | Required | Default | Description |
|---|---|---|---|---|
| partition.heartbeatInterval | time.Duration | no | 10s | Worker heartbeat interval |
| partition.workerTimeout | time.Duration | no | 90s | Worker timeout duration |
| partition.workersCollection | string | no | workers | Workers collection name |
| partition.partitionsCollection | string | no | partition_assignments | Partition assignments collection |
| partition.rebalanceCheckInterval | time.Duration | no | 10s | Partition rebalance check interval |
| partition.totalPartition | int | no | 15 | Total number of partitions. Important: This value is stored on first initialization and cannot be changed afterwards. Changing it after initialization will cause system startup failures, because it disrupts the partition distribution balance across workers. |
| partition.consumerGroup | string | yes | - | Unique consumer group name for this CDC application. Each CDC application monitoring the same MongoDB collection must use a different consumer group name (e.g., elasticsearch, kafka, myapp). Collection names will be suffixed with the consumer group (e.g., workers_elasticsearch, partition_assignments_kafka). |

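
As a small illustration of the naming rule and timeout settings above, the sketch below builds the suffixed collection names and checks whether a worker's heartbeat is stale. `collectionName` follows the examples given for `partition.consumerGroup`; `workerExpired` is an assumption about how `partition.workerTimeout` might be applied, and both helper names are hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// collectionName appends the consumer group to a base collection name,
// following the README examples such as workers_elasticsearch.
func collectionName(base, consumerGroup string) string {
	return base + "_" + consumerGroup
}

// workerExpired reports whether a worker's last heartbeat is older than
// workerTimeout. Assumption: the library's real liveness check may differ.
func workerExpired(lastHeartbeat, now time.Time, workerTimeout time.Duration) bool {
	return now.Sub(lastHeartbeat) > workerTimeout
}

func main() {
	fmt.Println(collectionName("workers", "elasticsearch"))
	fmt.Println(collectionName("partition_assignments", "kafka"))

	now := time.Now()
	// A heartbeat two minutes old exceeds the default 90s worker timeout.
	fmt.Println(workerExpired(now.Add(-2*time.Minute), now, 90*time.Second))
}
```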
Logger Configuration

| Variable | Type | Required | Default | Description |
|---|---|---|---|---|
| logger.logLevel | string | no | info | Log level |

Graceful Shutdown Configuration

| Variable | Type | Required | Default | Description |
|---|---|---|---|---|
| gracefulShutdownTimeout | time.Duration | no | 10s | Maximum time to wait for in-flight events to complete on shutdown |

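
A bounded wait like the one `gracefulShutdownTimeout` describes can be sketched with a `sync.WaitGroup` and a timer. This is a generic Go pattern shown under the assumption that in-flight events are tracked by a wait group; `waitWithTimeout` and `demoDrain` are hypothetical names and do not reflect the library's internals.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitWithTimeout waits for wg to finish and reports whether it did so
// within timeout. On timeout the helper goroutine keeps waiting in the
// background until wg is eventually released.
func waitWithTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

// demoDrain simulates one in-flight event that takes 50ms and waits up to
// 5s for it, reporting whether the work drained in time.
func demoDrain() bool {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(50 * time.Millisecond) // simulated in-flight event
	}()
	return waitWithTimeout(&wg, 5*time.Second)
}

func main() {
	fmt.Println("drained:", demoDrain())
}
```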
Configuration Example
cdcconfig:
  mongodb:
    connection:
      uri: "localhost:27017"
      database: exampleDB
      collection: exampleCollection
    connectionPool:
      maxPoolSize: 100
      minPoolSize: 3
      maxIdleTimeMS: 300000

  metric:
    port: 8085

  checkpoint:
    bootstrapSaveCount: 5000
    bootstrapQueryBatchSize: 5000
    bootstrapSaveInterval: 10s
    tokenSaveInterval: 10s
    changeStreamSaveCount: 500

  partition:
    heartbeatInterval: 10s
    workerTimeout: 90s
    rebalanceCheckInterval: 10s
    totalPartition: 15
    workersCollection: "workers"
    partitionsCollection: "partition_assignments"
    consumerGroup: "myapp"

  logger:
    logLevel: "debug"

gracefulShutdownTimeout: 5s

appPort: :8080

Exposed Metrics

| Metric Name | Type | Description | Labels |
|---|---|---|---|
| go_mongo_cdc_insert_total | Counter | Total number of INSERT operations processed | N/A |
| go_mongo_cdc_update_total | Counter | Total number of UPDATE operations processed | N/A |
| go_mongo_cdc_delete_total | Counter | Total number of DELETE operations processed | N/A |
| go_mongo_cdc_replace_total | Counter | Total number of REPLACE operations processed | N/A |
| go_mongo_cdc_cdc_latency_seconds | Gauge | Current CDC latency in seconds | N/A |
| go_mongo_cdc_checkpoint_save_total | Counter | Total number of successful checkpoint saves | N/A |
| go_mongo_cdc_checkpoint_save_error_total | Counter | Total number of checkpoint save errors | N/A |
| go_mongo_cdc_checkpoint_save_latency_seconds | Gauge | Current checkpoint save latency in seconds | N/A |
| go_mongo_cdc_bootstrap_document_total | Counter | Total number of documents processed during bootstrap | N/A |
| go_mongo_cdc_bootstrap_active | Gauge | Bootstrap active status (1=active, 0=inactive) | N/A |
| go_mongo_cdc_active_partition_count | Gauge | Number of currently active partitions assigned to this worker | N/A |
| go_mongo_cdc_partition_acquire_total | Counter | Total number of partition acquisitions | N/A |
| go_mongo_cdc_partition_release_total | Counter | Total number of partition releases | N/A |
| go_mongo_cdc_resume_token_expired_total | Counter | Total number of expired resume tokens | N/A |
| go_mongo_cdc_change_stream_error_total | Counter | Total number of change stream errors | N/A |
| go_mongo_cdc_change_stream_restart_total | Counter | Total number of change stream restarts | N/A |
| go_mongo_cdc_last_event_time_seconds | Gauge | Unix timestamp of the last processed event | N/A |
| go_mongo_cdc_event_lag_seconds | Gauge | Duration in seconds since the last event was processed | N/A |
| go_mongo_cdc_listener_latency_seconds | Gauge | Listener function execution latency in seconds | N/A |
| go_mongo_cdc_time_since_last_checkpoint_seconds | Gauge | Time since last checkpoint was saved in seconds | N/A |
| go_mongo_cdc_listener_error_total | Counter | Total number of listener function errors | N/A |
| go_mongo_cdc_partition_rebalance_total | Counter | Total number of partition rebalance operations | N/A |
| go_mongo_cdc_build_info | Gauge | Build information | version, go_version |

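
For quick checks without a full Prometheus setup, an unlabeled metric can be read from the text exposition output with a few lines of Go. The sketch below parses a sample payload; the sample values are made up for illustration, `metricValue` is a hypothetical helper, and it only handles metrics without labels (which covers every metric above except `go_mongo_cdc_build_info`).

```go
package main

import (
	"bufio"
	"fmt"
	"strconv"
	"strings"
)

// metricValue returns the value of an unlabeled metric from Prometheus
// text exposition output. Comment lines starting with '#' are skipped.
func metricValue(exposition, name string) (float64, bool) {
	sc := bufio.NewScanner(strings.NewReader(exposition))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}
		fields := strings.Fields(line)
		if len(fields) >= 2 && fields[0] == name {
			v, err := strconv.ParseFloat(fields[1], 64)
			if err != nil {
				return 0, false
			}
			return v, true
		}
	}
	return 0, false
}

func main() {
	// Hypothetical sample output; real values come from the metrics port.
	sample := "# TYPE go_mongo_cdc_insert_total counter\n" +
		"go_mongo_cdc_insert_total 42\n" +
		"go_mongo_cdc_event_lag_seconds 1.5\n"

	if v, ok := metricValue(sample, "go_mongo_cdc_event_lag_seconds"); ok {
		fmt.Println("event lag seconds:", v)
	}
}
```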

Examples

Contributing

Go MongoDB CDC is always open to direct contributions. For more information, please see our Contribution Guideline document.

License

Released under the MIT License.

Documentation

Types

type Connector

type Connector interface {
	Start(ctx context.Context)
	Close()
	Commit()
	CommitBootstrap(partitionID int)
	SetEventHandler(handler models.EventHandler)
}

func NewConnector

func NewConnector(cfg config.Config, listenerFunc stream.ListenerFunc) (Connector, error)

func NewConnectorWithConfigFile

func NewConnectorWithConfigFile(
	ctx context.Context,
	configFilePath string,
	listenerFunc stream.ListenerFunc,
) (Connector, error)
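
When `checkpoint.type` is "manual", the configuration table says an explicit `Commit()` call is required. One possible pattern is to commit once per batch of handled events, sketched below. The code does not import the library: `committer` mirrors only the `Commit` method of the `Connector` interface, and `countingCommitter` and `batchCommitter` are hypothetical helpers.

```go
package main

import "fmt"

// committer mirrors the Commit method of the Connector interface.
type committer interface {
	Commit()
}

// countingCommitter is a hypothetical stand-in that records Commit calls.
type countingCommitter struct{ commits int }

func (c *countingCommitter) Commit() { c.commits++ }

// batchCommitter calls Commit once every batchSize handled events.
// It is not safe for concurrent use; guard it with a mutex if the
// listener may run on several goroutines.
type batchCommitter struct {
	target    committer
	batchSize int
	pending   int
}

// handled records one successfully processed event.
func (b *batchCommitter) handled() {
	b.pending++
	if b.pending >= b.batchSize {
		b.target.Commit()
		b.pending = 0
	}
}

func main() {
	c := &countingCommitter{}
	b := &batchCommitter{target: c, batchSize: 100}
	for i := 0; i < 250; i++ {
		b.handled()
	}
	fmt.Println("commits:", c.commits)
}
```

Events handled after the last full batch stay uncommitted in this sketch, so a real application would likely also commit on shutdown or on a timer.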

Directories

| Path | Synopsis |
|---|---|
| example/simple | command |
| internal | |
| mongo | |
