Getting Started with DTLE: Unidirectional, Aggregation, and Scatter Replication
Published: 2019-03-09


Environment

Hostname  IP Address     Services Deployed
study01   10.186.65.68   DTLE, MySQL
study02   10.186.65.71   DTLE, MySQL
study03   10.186.65.72   DTLE, MySQL

Note: if you are not deploying with containers, first install three or more MySQL instances and enable binlog and GTID.
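The binlog and GTID prerequisites above can be met with a my.cnf fragment like the following (a minimal sketch; server_id must be unique per instance, and ROW binlog format is what CDC tools such as DTLE consume):

```ini
[mysqld]
server_id                = 1         # must differ on every instance
log_bin                  = mysql-bin # enable binary logging
binlog_format            = ROW       # row-based events for CDC
gtid_mode                = ON
enforce_gtid_consistency = ON
log_slave_updates        = ON        # keep GTIDs in the binlog on replicas
```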

DTLE Overview

dtle (Data-Transformation-le) is a CDC (Change Data Capture) tool developed and open-sourced by ActionTech (上海爱可生信息技术股份有限公司). Its main features are:

  1. Multiple data transfer modes

    • Link compression
    • Homogeneous and heterogeneous transfer
    • Transfer across network boundaries
  2. Multiple data processing modes

    • Database-, table-, and row-level data filtering
  3. Multiple data channel modes

    • Many-to-many data transfer
    • Loopback transfer
  4. Multiple sources/targets

    • MySQL - MySQL data transfer
    • MySQL - Kafka data transfer
  5. Cluster mode

    • Reliable metadata storage
    • Automatic task assignment
    • Automatic failover

Unidirectional Replication / Aggregation / Scatter

Common replication scenarios supported by DTLE:

  1. By source/target

    • One source to one target (unidirectional replication)
    • Multiple sources to one target (aggregation replication)
    • One source to multiple targets (scatter replication)
  2. By network type

    • Data transfer within a network
    • Data transfer across networks (link compression / link throttling can be used)
  3. By cluster scale

    • A single dtle instance handling a single data channel
    • A dtle cluster handling multiple data channels


DTLE Unidirectional Replication

  1. Download the DTLE RPM package
https://github.com/actiontech/dtle/releases/tag/v3.21.03.0
  2. Install the RPM package
[root@study01 ~]# rpm -ivh dtle-3.21.03.0.x86_64.rpm --prefix=/data/dtle
  3. Edit the consul configuration file; if no install path is specified during installation, it defaults to the /etc/dtle directory

[root@study01 dtle]# pwd
/data/dtle/etc/dtle
[root@study01 dtle]# cat consul.hcl
# Rename for each node
node_name = "consul0"   # consul node name; must be unique within a cluster
data_dir = "/data/dtle/var/lib/consul"
ui = true
disable_update_check = true
# Address that should be bound to for internal cluster communications
bind_addr = "0.0.0.0"
# Address to which Consul will bind client interfaces, including the HTTP and DNS servers
client_addr = "10.186.65.68"    # local IP address
advertise_addr = "10.186.65.68" # local IP address
ports = {
  # Customize if necessary. -1 means disable.
  #dns = -1
  #server = 8300
  #http = 8500
  #serf_wan = -1
  #serf_lan = 8301
}
limits = {
  http_max_conns_per_client = 4096
}
server = true
# For single node
bootstrap_expect = 1    # single consul node
# For 3-node cluster
#bootstrap_expect = 3   # three consul nodes; enable for a cluster deployment
#retry_join = ["127.0.0.1", "127.0.0.2", "127.0.0.3"] # will use default serf port
log_level = "INFO"
log_file = "/data/dtle/var/log/consul/"

  4. Edit the nomad configuration file; if no install path is specified during installation, it defaults to the /etc/dtle directory

[root@study01 dtle]# ls
consul.hcl  nomad.hcl
[root@study01 dtle]# pwd
/data/dtle/etc/dtle
[root@study01 dtle]# cat nomad.hcl
name = "nomad0" # rename for each node; must be unique within a cluster
datacenter = "dc1"
data_dir  = "/data/dtle/var/lib/nomad"
plugin_dir = "/data/dtle/usr/share/dtle/nomad-plugin"
log_level = "Info"
log_file = "/data/dtle/var/log/nomad/"
disable_update_check = true
bind_addr = "0.0.0.0"   # listen address
# change ports if multiple nodes run on a same machine
ports {
  http = 4646
  rpc  = 4647
  serf = 4648
}
addresses {
  # Default to `bind_addr`. Or set individually here.
  #http = "127.0.0.1"
  #rpc  = "127.0.0.1"
  #serf = "127.0.0.1"
}
advertise {
  http = "10.186.65.68:4646"
  rpc  = "10.186.65.68:4647"
  serf = "10.186.65.68:4648"
}
server {
  enabled          = true   # enable the server role
  bootstrap_expect = 1
  # Set bootstrap_expect to 3 for multiple (high-availability) nodes.
  # Multiple nomad nodes will join with consul.
}
client {
  enabled = true
  options = {
    "driver.blacklist" = "docker,exec,java,mock,qemu,rawexec,rkt"
  }
  # Will auto join other server with consul.
}
consul {
  # dtle-plugin and nomad itself use consul separately.
  # nomad uses consul for server_auto_join and client_auto_join.
  # Only one consul can be set here. Write the nearest here,
  #   e.g. the one runs on the same machine with the nomad server.
  address = "10.186.65.68:8500" # consul address for this client
}
plugin "dtle" {
  config {
    log_level = "Info" # repeat nomad log level here
    data_dir = "/data/dtle/var/lib/nomad"
    nats_bind = "10.186.65.68:8193"
    nats_advertise = "10.186.65.68:8193"
    # Repeat the consul address above.
    consul = "10.186.65.68:8500"    # consul address the plugin connects to
    # By default, API compatibility layer is disabled.
    #api_addr = "127.0.0.1:8190"   # for compatibility API
    nomad_addr = "10.186.65.68:4646" # compatibility API need to access a nomad server
    publish_metrics = false
    stats_collection_interval = 15
  }
}

  5. Start the consul and nomad services

systemctl restart dtle-consul.service
systemctl restart dtle-nomad.service
[root@study01 dtle]# ps -ef | grep dtle
dtle      8580     1  1 14:21 ?        00:00:00 /data/dtle/usr/bin/consul agent -config-file=/data/dtle/etc/dtle/consul.hcl
dtle      8660     1 10 14:22 ?        00:00:00 /data/dtle/usr/bin/nomad agent -config /data/dtle/etc/dtle/nomad.hcl
root      8720  2787  0 14:22 pts/0    00:00:00 grep --color=auto dtle

  6. Log in to the nomad and consul web UIs to check

    • nomad web UI: http://IP:4646
    • consul web UI: http://IP:8500
  7. On the source MySQL instance, create a test database and table, and insert a few rows

mysql> create database test;
mysql> use test;
mysql> create table t1 (id int primary key);
mysql> insert into t1 values(1),(2),(3);
mysql> select * from t1;
  8. Create the job JSON file

[root@study01 dtle]# cat job.json
{
  "Job": {
    "ID": "job1",
    "Datacenters": ["dc1"],
    "TaskGroups": [{
      "Name": "src",
      "Tasks": [{
        "Name": "src",
        "Driver": "dtle",
        "Config": {
          "Gtid": "",
          "ReplicateDoDb": [{
            "TableSchema": "test",
            "Tables": [{
              "TableName": "t1"
            }]
          }],
          "ConnectionConfig": {
            "Host": "10.186.65.68",
            "Port": 3333,
            "User": "test",
            "Password": "test"
          }
        }
      }]
    }, {
      "Name": "dest",
      "Tasks": [{
        "Name": "dest",
        "Driver": "dtle",
        "Config": {
          "ConnectionConfig": {
            "Host": "10.186.65.72",
            "Port": 4444,
            "User": "test",
            "Password": "test"
          }
        }
      }]
    }]
  }
}

  • "ID" is the job name; "ReplicateDoDb" lists the schema and tables to replicate; the first "ConnectionConfig" is the source connection info (10.186.65.68) and the second is the target's (10.186.65.72). JSON does not support comments, so keep annotations out of the actual file.
  9. Call the nomad agent's HTTP interface with curl to create the job

[root@study01 dtle]# curl -XPOST "http://10.186.65.68:4646/v1/jobs" -d @job.json -s | jq
{
  "EvalID": "a551d0fc-5de9-9e7b-3673-21b115919646",
  "EvalCreateIndex": 151,
  "JobModifyIndex": 150,
  "Warnings": "",
  "Index": 151,
  "LastContact": 0,
  "KnownLeader": false
}

  • -d @ takes the path of the job JSON file; the command is run from the file's directory here, so no absolute path is needed
  • jq extracts fields from the returned JSON, similar to what grep does for plain text; it must be installed separately
  10. Check the status of the created job

[root@study01 dtle]# curl -XGET "http://10.186.65.68:4646/v1/job/job1" -s | jq '.Status'

  • the path is /v1/job/{ID of the created job}
  11. Connect to the target database and check whether the data has been replicated

mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| dtle               |  # holds a table recording the job's replicated GTID set
| information_schema |
| mysql              |
| performance_schema |
| sys                |
| test               |  # the replicated database
| universe           |
+--------------------+
mysql> select * from test.t1;
+----+
| id |
+----+
|  1 |
|  2 |
|  3 |
+----+

  12. Insert a few more rows on the source database and test again

mysql> use test;
mysql> insert into t1 values(7),(8),(9);

Now check the target database again for the incremental rows:

mysql> select * from test.t1;
+----+
| id |
+----+
|  1 |
|  2 |
|  3 |
|  7 |
|  8 |
|  9 |
+----+

HTTP API and the nomad Command-Line Tool

Relationship between nomad and consul

nomad itself uses consul for multi-node registration and discovery, while the dtle nomad plugin uses consul to store job metadata. In other words, when a nomad agent creates a job, its metadata and replication position are recorded in consul. Deleting a job is therefore not enough on its own: the corresponding consul records must be deleted as well. If they are not, a job created later under the same name will, by default, resume replication from the position recorded in consul.
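The two-step cleanup described above can be sketched as follows (a minimal sketch using the Python standard library, assuming the default nomad/consul ports; the helper names are illustrative, not part of dtle):

```python
# Full removal of a dtle job takes two DELETE calls: one to nomad
# (the job itself) and one to consul (the stored position/metadata).
import urllib.request

def cleanup_urls(nomad_addr: str, consul_addr: str, job_id: str):
    """Build the two DELETE endpoints for purging a job and its metadata."""
    return [
        f"http://{nomad_addr}/v1/job/{job_id}?purge=true",   # nomad: stop and purge the job
        f"http://{consul_addr}/v1/kv/dtle/{job_id}?recurse", # consul: drop stored metadata
    ]

def delete_job(nomad_addr: str, consul_addr: str, job_id: str):
    for url in cleanup_urls(nomad_addr, consul_addr, job_id):
        req = urllib.request.Request(url, method="DELETE")
        with urllib.request.urlopen(req) as resp:
            print(url, resp.status)

# Example (requires reachable nomad and consul agents):
#   delete_job("10.186.65.68:4646", "10.186.65.68:8500", "job1")
```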

HTTP API

The curl commands above actually call the HTTP interface of the nomad agent, submitting the local job.json file to it.
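The same submission can be done without curl. The sketch below builds a job body in the shape of job.json above and POSTs it to the nomad agent's /v1/jobs endpoint (standard library only; the address and connection values are illustrative):

```python
# Minimal sketch of submitting a dtle job to the nomad HTTP API,
# equivalent to `curl -XPOST .../v1/jobs -d @job.json`.
import json
import urllib.request

def job_payload(job_id: str, schema: str, table: str,
                src: dict, dest: dict) -> dict:
    """Build a minimal dtle job body matching the job.json layout above."""
    return {"Job": {
        "ID": job_id,
        "Datacenters": ["dc1"],
        "TaskGroups": [
            {"Name": "src", "Tasks": [{"Name": "src", "Driver": "dtle",
                "Config": {"Gtid": "",
                           "ReplicateDoDb": [{"TableSchema": schema,
                                              "Tables": [{"TableName": table}]}],
                           "ConnectionConfig": src}}]},
            {"Name": "dest", "Tasks": [{"Name": "dest", "Driver": "dtle",
                "Config": {"ConnectionConfig": dest}}]},
        ],
    }}

def submit(nomad_addr: str, payload: dict) -> dict:
    """POST the job body to the nomad agent; returns EvalID etc. on success."""
    req = urllib.request.Request(
        f"http://{nomad_addr}/v1/jobs",
        data=json.dumps(payload).encode(),
        headers={"Content-Type": "application/json"},
        method="POST")
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)

# Example (requires a reachable nomad agent):
#   body = job_payload("job1", "test", "t1",
#                      {"Host": "10.186.65.68", "Port": 3333, "User": "test", "Password": "test"},
#                      {"Host": "10.186.65.72", "Port": 4444, "User": "test", "Password": "test"})
#   print(submit("10.186.65.68:4646", body))
```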

Using the nomad command-line tool

The nomad tool starts a job from an HCL-format file; the JSON format cannot be used here. Template files are installed under /usr/share/dtle/scripts by default.

  1. First delete the existing job via curl

[root@study01 dtle]# curl -XDELETE 10.186.65.68:4646/v1/job/job1?purge=true
{"EvalID":"c3e69b56-eaab-fa02-afbb-a15e0b395170","EvalCreateIndex":198,"JobModifyIndex":197,"VolumeEvalID":"","VolumeEvalIndex":0,"Index":198,"LastContact":0,"KnownLeader":false}

  2. After deleting the job, clear the consul records with

[root@study01 dtle]# curl -XDELETE "10.186.65.68:8500/v1/kv/dtle/job1?recurse"
true

  3. Create a job with the nomad command

    • Copy an HCL template and modify it
    cp usr/share/dtle/scripts/example.job.hcl .
    • Edit the HCL file
    [root@study01 dtle]# mv example.job.hcl job.hcl
    [root@study01 dtle]# vim job.hcl    # edit the replication/source/target settings
  4. Run the job from this file with the nomad command

[root@study01 dtle]# ./usr/bin/nomad job run job.hcl
==> Monitoring evaluation "21c79d55"
    Evaluation triggered by job "job1"
    Evaluation within deployment: "c5aeae1d"
    Allocation "8640b0af" created: node "e1be240c", group "dest"
    Allocation "e44c8d29" created: node "e1be240c", group "src"
    Evaluation status changed: "pending" -> "complete"
==> Evaluation "21c79d55" finished with status "complete"

  5. Check the status of the job just created

[root@study01 dtle]# ./usr/bin/nomad job status
ID    Type     Priority  Status   Submit Date
job1  service  50        running  2021-05-07T16:26:21+08:00

  6. Insert data on the source and verify it on the target
  7. Stop the job and delete it

[root@study01 dtle]# ./usr/bin/nomad job stop -purge job1
==> Monitoring evaluation "dce8f4f3"
    Evaluation triggered by job "job1"
    Evaluation within deployment: "c5aeae1d"
    Evaluation status changed: "pending" -> "complete"
==> Evaluation "dce8f4f3" finished with status "complete"

  • The consul records still have to be deleted manually
[root@study01 dtle]# curl -XDELETE "10.186.65.68:8500/v1/kv/dtle/job1?recurse"
Other nomad commands

  1. List server nodes

[root@study01 dtle]# ./usr/bin/nomad server members
Name           Address       Port  Status  Leader  Protocol  Build   Datacenter  Region
nomad0.global  10.186.65.68  4648  alive   true    2         0.11.1  dc1         global

  2. List client nodes

[root@study01 dtle]# ./usr/bin/nomad node status
ID        DC   Name    Class   Drain  Eligibility  Status
e1be240c  dc1  nomad0  <none>  false  eligible     ready

  3. Inspect a specific job

[root@study01 dtle]# ./usr/bin/nomad job status {jobname}

  4. Show the nomad version

[root@study01 dtle]# ./usr/bin/nomad version
Nomad v0.11.1 (b43457070037800fcc8442c8ff095ff4005dab33)

  5. Show the dtle plugin version on a given node

[root@study01 dtle]# ./usr/bin/nomad node status -verbose e1be240c | grep dtle
dtle      true      true     Healthy   2021-05-07T14:22:17+08:00
driver.dtle               = 1
driver.dtle.full_version  = 3.21.03.0-3.21.03.x-2df8ad7
driver.dtle.version       = 3.21.03.0

  6. Here the nomad command acts as an HTTP client connecting to the nomad agent; if the agent is not at the default address, specify it, e.g. --address=http://127.0.0.1:4646

[root@study01 dtle]# ./usr/bin/nomad node status --address=http://10.186.65.68:4646
ID        DC   Name    Class   Drain  Eligibility  Status
e1be240c  dc1  nomad0  <none>  false  eligible     ready

MySQL Aggregation Replication

Prepare two source MySQL instances and one target MySQL instance. Record the GTIDs first, then create the test database and tables.

  1. Operations on the src1 database

mysql> show master status\G
Executed_Gtid_Set: 60144e94-aa1c-11eb-9d16-02000aba4144:1-140
mysql> use test;
mysql> create table t2(id int);
mysql> insert into t2 values(1),(2),(3);
mysql> select * from test.t2;
+------+
| id   |
+------+
|    1 |
|    2 |
|    3 |
+------+

  2. Operations on the src2 database

mysql> show master status \G
Executed_Gtid_Set: efc79686-af16-11eb-bbaa-02000aba4147:1-135
mysql> use test;
mysql> create table t2(id int);
mysql> insert into t2 values(4),(5),(6);

  3. Create the configuration file for src1

[root@study01 dtle]# cat job_src1.hcl
job "job1" {
  datacenters = ["dc1"]

  group "src" {
    task "src" {
      driver = "dtle"
      config {
        ReplicateDoDb = [{
          TableSchema = "test"
          Tables = [{
            TableName = "t2"
          }]
        }]
        DropTableIfExists = false
        Gtid = "60144e94-aa1c-11eb-9d16-02000aba4144:1-140"
        ChunkSize = 2000
        ConnectionConfig = {
          Host = "10.186.65.68"
          Port = 3333
          User = "test"
          Password = "test"
        }
      }
    }
  }
  group "dest" {
    task "dest" {
      driver = "dtle"
      config {
        ConnectionConfig = {
          Host = "10.186.65.72"
          Port = 4444
          User = "test"
          Password = "test"
        }
        # For a kafka job, do not set ConnectionConfig in dest task. Set KafkaConfig instead.
        #KafkaConfig = {
        #  Topic = "kafka1"
        #  Brokers = ["127.0.0.1:9192", "127.0.0.1:9092"]
        #  Converter = "json"
        #}
      }
    }
  }

  reschedule {
    # By default, nomad will unlimitedly reschedule a failed task.
    # We limit it to once per 30min here.
    attempts = 1
    interval = "30m"
    unlimited = false
  }
}

  4. Create the configuration file for src2

[root@study01 dtle]# cat job_src2.hcl
job "job2" {
  datacenters = ["dc1"]

  group "src" {
    task "src" {
      driver = "dtle"
      config {
        ReplicateDoDb = [{
          TableSchema = "test"
          Tables = [{
            TableName = "t2"
          }]
        }]
        DropTableIfExists = false
        Gtid = "efc79686-af16-11eb-bbaa-02000aba4147:1-135"
        ChunkSize = 2000
        ConnectionConfig = {
          Host = "10.186.65.71"
          Port = 5555
          User = "test"
          Password = "test"
        }
      }
    }
  }
  group "dest" {
    task "dest" {
      driver = "dtle"
      config {
        ConnectionConfig = {
          Host = "10.186.65.72"
          Port = 4444
          User = "test"
          Password = "test"
        }
        # For a kafka job, do not set ConnectionConfig in dest task. Set KafkaConfig instead.
        #KafkaConfig = {
        #  Topic = "kafka1"
        #  Brokers = ["127.0.0.1:9192", "127.0.0.1:9092"]
        #  Converter = "json"
        #}
      }
    }
  }

  reschedule {
    # By default, nomad will unlimitedly reschedule a failed task.
    # We limit it to once per 30min here.
    attempts = 1
    interval = "30m"
    unlimited = false
  }
}

  • The two files differ in the job name, the GTID, and the source connection address
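Since only three fields differ between the two files, generating one job file per source can be sketched like this (the template string is abbreviated for illustration; in practice it would hold the full job_src1.hcl content with placeholders):

```python
# Minimal sketch: render one HCL job file per source by substituting the
# three fields that differ (job name, GTID, source connection).
TEMPLATE = """\
job "{job_id}" {{
  # ... same structure as job_src1.hcl, with:
  #   Gtid = "{gtid}"
  #   source ConnectionConfig: Host = "{host}", Port = {port}
}}
"""

SOURCES = [
    {"job_id": "job1", "gtid": "60144e94-aa1c-11eb-9d16-02000aba4144:1-140",
     "host": "10.186.65.68", "port": 3333},
    {"job_id": "job2", "gtid": "efc79686-af16-11eb-bbaa-02000aba4147:1-135",
     "host": "10.186.65.71", "port": 5555},
]

def render(src: dict) -> str:
    """Fill the per-source fields into the job template."""
    return TEMPLATE.format(**src)

# Example usage (writes job_job1.hcl and job_job2.hcl):
#   for s in SOURCES:
#       with open(f"job_{s['job_id']}.hcl", "w") as f:
#           f.write(render(s))
```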
  5. Create the jobs

[root@study01 dtle]# ./usr/bin/nomad job run job_src1.hcl
==> Monitoring evaluation "28ae70e4"
    Evaluation triggered by job "job1"
    Evaluation within deployment: "95f9a76b"
    Allocation "d07787a4" created: node "e1be240c", group "dest"
    Allocation "f46d9291" created: node "e1be240c", group "src"
    Evaluation status changed: "pending" -> "complete"
==> Evaluation "28ae70e4" finished with status "complete"
[root@study01 dtle]# ./usr/bin/nomad job run job_src2.hcl
==> Monitoring evaluation "1d2da1f7"
    Evaluation triggered by job "job2"
    Evaluation within deployment: "716b11a5"
    Allocation "2dd7fe5c" created: node "e1be240c", group "src"
    Allocation "724ec296" created: node "e1be240c", group "dest"
    Evaluation status changed: "pending" -> "complete"
==> Evaluation "1d2da1f7" finished with status "complete"

  6. Verify the data on the target database

mysql> select * from test.t2;
+------+
| id   |
+------+
|    1 |
|    2 |
|    3 |
|    4 |
|    5 |
|    6 |
+------+

  7. Write data on either source, then check the target for the incremental data

mysql> insert into t2 values (7);
Query OK, 1 row affected (0.01 sec)
mysql> select * from test.t2;
+------+
| id   |
+------+
|    1 |
|    2 |
|    3 |
|    7 |
+------+
4 rows in set (0.01 sec)

  8. Verify the data on the target database

mysql> select * from test.t2;
+------+
| id   |
+------+
|    1 |
|    2 |
|    3 |
|    4 |
|    5 |
|    6 |
|    7 |
+------+
7 rows in set (0.01 sec)

MySQL Scatter Replication

Prepare three MySQL instances: one source and two targets. Rows are replicated to different target databases according to a condition.

  1. First record the source database's current GTID

mysql> show master status \G
Executed_Gtid_Set: 60144e94-aa1c-11eb-9d16-02000aba4144:1-143

  2. Create a test table and insert some data

mysql> use test;
mysql> create table t3(id int);
mysql> insert into t3 values(1),(2);

  3. Create the dst1 configuration file

[root@study01 dtle]# cat job_dst1.hcl
job "dst1" {
  datacenters = ["dc1"]

  group "src" {
    task "src" {
      driver = "dtle"
      config {
        ReplicateDoDb = [{
          TableSchema = "test"
          Tables = [{
            TableName = "t3"
            Where = "id<10"
          }]
        }]
        DropTableIfExists = false
        Gtid = "60144e94-aa1c-11eb-9d16-02000aba4144:1-143"
        ChunkSize = 2000
        ConnectionConfig = {
          Host = "10.186.65.68"
          Port = 3333
          User = "test"
          Password = "test"
        }
      }
    }
  }
  group "dest" {
    task "dest" {
      driver = "dtle"
      config {
        ConnectionConfig = {
          Host = "10.186.65.71"
          Port = 5555
          User = "test"
          Password = "test"
        }
        # For a kafka job, do not set ConnectionConfig in dest task. Set KafkaConfig instead.
        #KafkaConfig = {
        #  Topic = "kafka1"
        #  Brokers = ["127.0.0.1:9192", "127.0.0.1:9092"]
        #  Converter = "json"
        #}
      }
    }
  }

  reschedule {
    # By default, nomad will unlimitedly reschedule a failed task.
    # We limit it to once per 30min here.
    attempts = 1
    interval = "30m"
    unlimited = false
  }
}
  4. Create the dst2 configuration file

The two configuration files differ in the job name, the where condition, and the target connection info.
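For reference, job_dst2.hcl would differ from job_dst1.hcl only in these spots (a sketch inferred from the description above; the complementary where condition and the second target's port are assumptions based on the connection info used earlier in this article):

```hcl
job "dst2" {                      # different job name
  # ... same as job_dst1.hcl, except:
  #
  # in group "src", the table filter uses the complementary condition:
  #       Tables = [{
  #         TableName = "t3"
  #         Where = "id>=10"      # assumed complement of dst1's "id<10"
  #       }]
  #
  # and group "dest" points at the second target instance:
  #       ConnectionConfig = {
  #         Host = "10.186.65.72"
  #         Port = 4444           # assumed, matching the target used earlier
  #         User = "test"
  #         Password = "test"
  #       }
}
```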

  5. Create the jobs

[root@study01 dtle]# ./usr/bin/nomad job run job_dst1.hcl
[root@study01 dtle]# ./usr/bin/nomad job run job_dst2.hcl
[root@study01 dtle]# ./usr/bin/nomad status
ID    Type     Priority  Status   Submit Date
dst1  service  50        running  2021-05-07T20:13:05+08:00
dst2  service  50        running  2021-05-07T20:13:10+08:00

  6. Verify the data: rows with id less than 10 land on the 71 instance, and rows with id greater than or equal to 10 land on the 72 instance

  7. Insert data on the source, then check each target

mysql> insert into t3 values(9),(10);
mysql> select * from test.t3;
+------+
| id   |
+------+
|    1 |
|    2 |
|    9 |
|   10 |
+------+

  8. Check the data on 71

mysql> select * from test.t3;
+------+
| id   |
+------+
|    1 |
|    2 |
|    9 |
+------+

  9. Check the data on 72

mysql> select * from test.t3;
+------+
| id   |
+------+
|   10 |
+------+
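The routing rule that the two Where conditions implement can be expressed compactly (a sketch mirroring this example's logic only; the function name is illustrative, not a dtle API):

```python
# dtle evaluates each job's Where expression against every row;
# with "id<10" on dst1 and "id>=10" on dst2, each row lands on exactly
# one target. This mirrors that predicate for the t3 example.
def route(row_id: int) -> str:
    """Return which target instance a t3 row lands on."""
    return "10.186.65.71" if row_id < 10 else "10.186.65.72"

# Example: rows 1, 2, 9 go to 10.186.65.71; row 10 goes to 10.186.65.72.
```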

First published on the author's personal blog: easydb.net · WeChat official account: easydb

