arbiter

package
v0.0.0-...-552cffb Latest
Warning

This package is not in the latest version of its module.

Published: Sep 12, 2024 License: Apache-2.0 Imports: 23 Imported by: 0

README

Arbiter

Arbiter is a tool for incrementally syncing data from Kafka to TiDB.

The complete import process is as follows:

  1. Read Binlog data, encoded as Protobuf, from Kafka.
  2. Once the accumulated data reaches a size limit, construct SQL statements from the Binlog and write them to the downstream concurrently (note: Arbiter will split upstream transactions, so their atomicity is not preserved downstream).
  3. Save the checkpoint.
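The read–batch–write cycle above can be sketched as follows. This is a simplified illustration, not Arbiter's actual internals; Binlog, syncLoop, and the callback signatures are hypothetical stand-ins:

```go
package main

import "fmt"

// Binlog is a hypothetical stand-in for one decoded Protobuf binlog item.
type Binlog struct {
	CommitTS int64
	Data     []byte
}

// syncLoop reads binlogs from in, flushes a batch to the downstream once the
// accumulated payload reaches maxBytes, then saves the checkpoint. The tail
// batch is flushed when the channel is closed.
func syncLoop(in <-chan Binlog, maxBytes int, write func([]Binlog) error, saveCheckpoint func(ts int64) error) error {
	var batch []Binlog
	var size int
	flush := func() error {
		if len(batch) == 0 {
			return nil
		}
		if err := write(batch); err != nil {
			return err
		}
		ts := batch[len(batch)-1].CommitTS
		batch, size = nil, 0
		return saveCheckpoint(ts)
	}
	for b := range in {
		batch = append(batch, b)
		size += len(b.Data)
		if size >= maxBytes {
			if err := flush(); err != nil {
				return err
			}
		}
	}
	return flush()
}

func main() {
	in := make(chan Binlog, 2)
	in <- Binlog{CommitTS: 405809779094585347, Data: []byte("row")}
	close(in)
	err := syncLoop(in, 1024, func(b []Binlog) error { return nil },
		func(ts int64) error { fmt.Println("checkpoint saved at ts", ts); return nil })
	if err != nil {
		fmt.Println("sync error:", err)
	}
}
```

Note that the checkpoint is saved only after a batch has been written, which is why a crash can leave data newer than ts partially applied (status 1 below).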

Checkpoint

Arbiter writes a checkpoint record to the table tidb_binlog.arbiter_checkpoint in the downstream TiDB.

mysql> select * from tidb_binlog.arbiter_checkpoint;
+-------------+--------------------+--------+
| topic_name  | ts                 | status |
+-------------+--------------------+--------+
| test_kafka4 | 405809779094585347 |      1 |
+-------------+--------------------+--------+
  • topic_name: the name of the Kafka topic to consume.
  • ts: the checkpoint timestamp.
  • status:
    • 0: all Binlog data with commit timestamp <= ts has been synced to the downstream.
    • 1: Arbiter is running, or quit unexpectedly; Binlog data with timestamps greater than ts may be only partially synced to the downstream.

Monitor

Arbiter supports metrics collection via Prometheus.

Metrics

  • binlog_arbiter_checkpoint_tso (Gauge)

    Corresponding to ts in table tidb_binlog.arbiter_checkpoint

  • binlog_arbiter_query_duration_time (Histogram)

    Bucketed histogram of the time needed to write to the downstream. Labels:

    • type: exec (time taken to execute the SQL) or commit (time taken to commit it)
  • binlog_arbiter_event (Counter)

    Event times counter. Labels:

    • type: e.g. DDL Insert Update Delete Txn
  • binlog_arbiter_queue_size (Gauge)

    Queue size. Labels:

    • name: e.g. kafka_reader loader_input
  • binlog_arbiter_txn_latency_seconds (Histogram)

    Bucketed histogram of the duration between the time data is written to the downstream and the commit time of the upstream transaction (the physical part of commitTS).

Documentation

Index

Constants

View Source
const (
	// StatusNormal means server quit normally, data <= ts is synced to downstream
	StatusNormal int = 0
	// StatusRunning means server running or quit abnormally; part of the data may or may not have been synced to downstream
	StatusRunning int = 1
)

Variables

Registry is the metrics registry of the server

Functions

This section is empty.

Types

type Checkpoint

type Checkpoint interface {
	Save(ts int64, status int) error
	Load() (ts int64, status int, err error)
}

Checkpoint is able to save and load checkpoints

func NewCheckpoint

func NewCheckpoint(db *gosql.DB, topicName string) (Checkpoint, error)

NewCheckpoint creates a Checkpoint
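To illustrate the contract, here is a minimal in-memory implementation of the interface. It is hypothetical and for illustration only; the real implementation created by NewCheckpoint persists to the tidb_binlog.arbiter_checkpoint table:

```go
package main

import "fmt"

// Checkpoint mirrors the interface above.
type Checkpoint interface {
	Save(ts int64, status int) error
	Load() (ts int64, status int, err error)
}

// memCheckpoint is a hypothetical in-memory Checkpoint for illustration.
type memCheckpoint struct {
	ts     int64
	status int
}

func (c *memCheckpoint) Save(ts int64, status int) error {
	c.ts, c.status = ts, status
	return nil
}

func (c *memCheckpoint) Load() (int64, int, error) {
	return c.ts, c.status, nil
}

func main() {
	var cp Checkpoint = &memCheckpoint{}
	cp.Save(405809779094585347, 1) // running: data after ts may be partial
	cp.Save(405809779094585347, 0) // normal quit: all data <= ts synced
	ts, status, _ := cp.Load()
	fmt.Println(ts, status)
}
```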

type Config

type Config struct {
	*flag.FlagSet `json:"-"`
	LogLevel      string `toml:"log-level" json:"log-level"`
	ListenAddr    string `toml:"addr" json:"addr"`
	LogFile       string `toml:"log-file" json:"log-file"`
	OpenSaramaLog bool   `toml:"open-sarama-log" json:"open-sarama-log"`

	Up   UpConfig   `toml:"up" json:"up"`
	Down DownConfig `toml:"down" json:"down"`

	Metrics Metrics `toml:"metrics" json:"metrics"`
	// contains filtered or unexported fields
}

Config is the configuration of Server

func NewConfig

func NewConfig() *Config

NewConfig returns an instance of configuration

func (*Config) Parse

func (cfg *Config) Parse(args []string) error

Parse parses all config from command-line flags, environment vars or the configuration file

func (*Config) String

func (cfg *Config) String() string

type DownConfig

type DownConfig struct {
	Host     string `toml:"host" json:"host"`
	Port     int    `toml:"port" json:"port"`
	User     string `toml:"user" json:"user"`
	Password string `toml:"password" json:"password"`

	WorkerCount int  `toml:"worker-count" json:"worker-count"`
	BatchSize   int  `toml:"batch-size" json:"batch-size"`
	SafeMode    bool `toml:"safe-mode" json:"safe-mode"`
}

DownConfig is configuration of downstream

type Metrics

type Metrics struct {
	Addr     string `toml:"addr" json:"addr"`
	Interval int    `toml:"interval" json:"interval"`
}

Metrics is configuration of metrics

type Server

type Server struct {
	// contains filtered or unexported fields
}

Server is the server to load data to mysql

func NewServer

func NewServer(cfg *Config) (srv *Server, err error)

NewServer creates a Server

func (*Server) Close

func (s *Server) Close() error

Close closes the Server

func (*Server) Run

func (s *Server) Run() error

Run runs the Server; it quits once it encounters an error or the Server is closed

type UpConfig

type UpConfig struct {
	KafkaAddrs   string `toml:"kafka-addrs" json:"kafka-addrs"`
	KafkaVersion string `toml:"kafka-version" json:"kafka-version"`

	InitialCommitTS   int64  `toml:"initial-commit-ts" json:"initial-commit-ts"`
	Topic             string `toml:"topic" json:"topic"`
	MessageBufferSize int    `toml:"message-buffer-size" json:"message-buffer-size"`
	SaramaBufferSize  int    `toml:"sarama-buffer-size" json:"sarama-buffer-size"`
}

UpConfig is configuration of upstream
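Putting the toml tags of Config, UpConfig, DownConfig, and Metrics together, a configuration file might look like this. All values are illustrative, not recommended defaults:

```toml
addr = "0.0.0.0:8251"
log-level = "info"

[up]
kafka-addrs = "127.0.0.1:9092"
kafka-version = "0.8.2.0"
topic = "test_kafka4"
initial-commit-ts = 0

[down]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""
worker-count = 16
batch-size = 20
safe-mode = false

[metrics]
addr = ""
interval = 15
```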
