本文共 15700 字,大约阅读时间需要 52 分钟。
主机名 | IP地址 | 部署服务 |
---|---|---|
study01 | 10.186.65.68 | DTLE、MySQL |
study02 | 10.186.65.71 | DTLE、MySQL |
study03 | 10.186.65.72 | DTLE、MySQL |
Ps:如果不适用容器进行部署,首先安装三台或以上MySQL实例,并开启binlog以及GTID
dtle (Data-Transformation-le) 是上海爱可⽣信息技术股份有限公司 开发并开源的 CDC ⼯具. 其功能特点是:
多种数据传输模式
多种数据处理模式
多种数据通道模式
多种源/⽬标端
集群模式
DTLE支持的常见复制场景如下
按数据源/数据目标划分
按网络类型划分
按集群规模划分
https://github.com/actiontech/dtle/releases/tag/v3.21.03.0
[root@study01 ~]# rpm -ivh dtle-3.21.03.0.x86_64.rpm --prefix=/data/dtle
[root@study01 dtle]# pwd/data/dtle/etc/dtle[root@study01 dtle]# cat consul.hcl # Rename for each nodenode_name = "consul0" #定义consul名称,多集群名字不能重复data_dir = "/data/dtle/var/lib/consul"ui = truedisable_update_check = true# Address that should be bound to for internal cluster communicationsbind_addr = "0.0.0.0"# Address to which Consul will bind client interfaces, including the HTTP and DNS serversclient_addr = "10.186.65.68" #本机ip地址advertise_addr = "10.186.65.68" #本机ip地址ports = { # Customize if necessary. -1 means disable. #dns = -1 #server = 8300 #http = 8500 #serf_wan = -1 #serf_lan = 8301}limits = { http_max_conns_per_client = 4096}server = true# For single nodebootstrap_expect = 1 #一台consul# For 3-node cluster#bootstrap_expect = 3 #三台consul,部署集群开启#retry_join = ["127.0.0.1", "127.0.0.2", "127.0.0.3"] # will use default serf portlog_level = "INFO"log_file = "/data/dtle/var/log/consul/"
root@study01 dtle]# lsconsul.hcl nomad.hcl[root@study01 dtle]# pwd/data/dtle/etc/dtle[root@study01 dtle]# cat nomad.hclname = "nomad0" # rename for each node 定义nomad名字,部署集群名字不可重复datacenter = "dc1"data_dir = "/data/dtle/var/lib/nomad"plugin_dir = "/data/dtle/usr/share/dtle/nomad-plugin"log_level = "Info"log_file = "/data/dtle/var/log/nomad/"disable_update_check = truebind_addr = "0.0.0.0" #监听地址# change ports if multiple nodes run on a same machineports { http = 4646 rpc = 4647 serf = 4648}addresses { # Default to `bind_addr`. Or set individually here. #http = "127.0.0.1" #rpc = "127.0.0.1" #serf = "127.0.0.1"}advertise { http = "10.186.65.68:4646" rpc = "10.186.65.68:4647" serf = "10.186.65.68:4648"}server { enabled = true #服务端开启 bootstrap_expect = 1 # Set bootstrap_expect to 3 for multiple (high-availablity) nodes. # Multiple nomad nodes will join with consul.}client { enabled = true options = { "driver.blacklist" = "docker,exec,java,mock,qemu,rawexec,rkt" } # Will auto join other server with consul.}consul { # dtle-plugin and nomad itself use consul separately. # nomad uses consul for server_auto_join and client_auto_join. # Only one consul can be set here. Write the nearest here, # e.g. the one runs on the same machine with the nomad server. address = "10.186.65.68:8500" #客户端开启}plugin "dtle" { config { log_level = "Info" # repeat nomad log level here data_dir = "/data/dtle/var/lib/nomad" nats_bind = "10.186.65.68:8193" nats_advertise = "10.186.65.68:8193" # Repeat the consul address above. consul = "10.186.65.68:8500" #对接consul地址 # By default, API compatibility layer is disabled. #api_addr = "127.0.0.1:8190" # for compatibility API nomad_addr = "10.186.65.68:4646" # compatibility API need to access a nomad server publish_metrics = false stats_collection_interval = 15 }}
systemctl restart dtle-consul.servicesystemctl restart dtle-consul.service[root@study01 dtle]# ps -ef | grep dtledtle 8580 1 1 14:21 ? 00:00:00 /data/dtle/usr/bin/consul agent -config-file=/data/dtle/etc/dtle/consul.hcldtle 8660 1 10 14:22 ? 00:00:00 /data/dtle/usr/bin/nomad agent -config /data/dtle/etc/dtle/nomad.hclroot 8720 2787 0 14:22 pts/0 00:00:00 grep --color=auto dtle
登录nomad和consul的Web页面进行查看
在源端MySQL实例创建测试库和测试表,并在表中插入数据
mysql> create database test;mysql> use test;mysql> create table t1 (id int primary key);mysql> insert into t1 values(1),(2),(3);mysql> select * from t1;
[root@study01 dtle]# cat job.json{"Job": {"ID": "job1", #job名字"Datacenters": ["dc1"],"TaskGroups": [{"Name": "src","Tasks": [{"Name": "src","Driver": "dtle","Config": {"Gtid": "","ReplicateDoDb": [{"TableSchema": "test", #复制的库"Tables": [{"TableName": "t1" #复制的表}]}],"ConnectionConfig": {"Host": "10.186.65.68", #源端连接信息"Port": 3333,"User": "test","Password": "test"}}}]}, {"Name": "dest","Tasks": [{"Name": "dest","Driver": "dtle","Config": {"ConnectionConfig": {"Host": "10.186.65.72", #目标端连接信息"Port": 4444,"User": "test","Password": "test"}}}]}]}}
[root@study01 dtle]# curl -XPOST "http://10.186.65.68:4646/v1/jobs" -d @job.json -s | jq{ "EvalID": "a551d0fc-5de9-9e7b-3673-21b115919646", "EvalCreateIndex": 151, "JobModifyIndex": 150, "Warnings": "", "Index": 151, "LastContact": 0, "KnownLeader": false}
[root@study01 dtle]# curl -XGET "http://10.186.65.68:4646/v1/job/job1" -s | jq '.Status'
mysql> show databases;+--------------------+| Database |+--------------------+| dtle | #有一张二进制的表。用于记录job的GTID号| information_schema || mysql || performance_schema || sys || test | #同步过来的库| universe |+--------------------+mysql> select * from test.t1;+----+| id |+----+| 1 || 2 || 3 |+----+
mysql> use test;mysql> insert into t1 values(7),(8),(9);
此时我们在到目标端数据库查看数据是否有增量数据进来
mysql> select * from test.t1;+----+| id |+----+| 1 || 2 || 3 || 7 || 8 || 9 |+----+
nomad 本体使⽤consul进⾏多节点注册和发现,dtle nomad 插件使⽤consul进⾏任务元数据储存,也就是说,当我们的nomad agent创建一个job的时候会通过consul进行记录元数据,以及复制位置点,如果我们要删除一个job的时候,不单单只是把job给删除,还需要删除consul中的记录,如果不删除consul中的记录,那么下次创建job的时候默认还是会从consul中记录的位置点开始复制。
实际上我们使用curl命令调用的是nomad agent端的HTTP端的接口,将我们本地的job.json文件提交到nomad agent端
nomad工具启动一个job使用的是hcl格式的文件,不可以使用json格式,模板默认放在/usr/share/dtle/scripts下面
[root@study01 dtle]# curl -XDELETE 10.186.65.68:4646/v1/job/job1?purge=true{"EvalID":"c3e69b56-eaab-fa02-afbb-a15e0b395170","EvalCreateIndex":198,"JobModifyIndex":197,"VolumeEvalID":"","VolumeEvalIndex":0,"Index":198,"LastContact":0,"KnownLeader":false}
[root@study01 dtle]# curl -XDELETE "10.186.65.68:8500/v1/kv/dtle/job1?recurse"true
使用nomad命令创建一个job
cp usr/share/dtle/scripts/example.job.hcl .
[root@study01 dtle]# mv example.job.hcl job.hcl[root@study01 dtle]# vim job.hcl #修改复制\源\目标信息
使用nomad命令指定这个文件创建job
[root@study01 dtle]# ./usr/bin/nomad job run job.hcl==> Monitoring evaluation "21c79d55" Evaluation triggered by job "job1" Evaluation within deployment: "c5aeae1d" Allocation "8640b0af" created: node "e1be240c", group "dest" Allocation "e44c8d29" created: node "e1be240c", group "src" Evaluation status changed: "pending" -> "complete"==> Evaluation "21c79d55" finished with status "complete"
[root@study01 dtle]# ./usr/bin/nomad job statusID Type Priority Status Submit Datejob1 service 50 running 2021-05-07T16:26:21+08:00
[root@study01 dtle]# ./usr/bin/nomad job stop -purge job1==> Monitoring evaluation "dce8f4f3" Evaluation triggered by job "job1" Evaluation within deployment: "c5aeae1d" Evaluation status changed: "pending" -> "complete"==> Evaluation "dce8f4f3" finished with status "complete"
[root@study01 dtle]# curl -XDELETE "10.186.65.68:8500/v1/kv/dtle/job1?recurse"
[root@study01 dtle]# ./usr/bin/nomad server membersName Address Port Status Leader Protocol Build Datacenter Regionnomad0.global 10.186.65.68 4648 alive true 2 0.11.1 dc1 global
[root@study01 dtle]# ./usr/bin/nomad node statusID DC Name Class Drain Eligibility Statuse1be240c dc1 nomad0false eligible ready
[root@study01 dtle]# ./usr/bin/nomad job status {jobname}
[root@study01 dtle]# ./usr/bin/nomad versionNomad v0.11.1 (b43457070037800fcc8442c8ff095ff4005dab33)
[root@study01 dtle]# ./usr/bin/nomad node status -verbose e1be240c | grep dtledtle true true Healthy 2021-05-07T14:22:17+08:00driver.dtle = 1driver.dtle.full_version = 3.21.03.0-3.21.03.x-2df8ad7driver.dtle.version = 3.21.03.0
[root@study01 dtle]# ./usr/bin/nomad node status --address=http://10.186.65.68:4646ID DC Name Class Drain Eligibility Statuse1be240c dc1 nomad0false eligible ready
准备两个源端MySQL一个目标端MySQL,先记录GTID,再创建测试库和测试表
mysql> show master status\GExecuted_Gtid_Set: 60144e94-aa1c-11eb-9d16-02000aba4144:1-140mysql> use test;mysql> create table t2(id int);mysql> insert into t2 values(1),(2),(3);mysql> select * from test.t2;+------+| id |+------+| 1 || 2 || 3 |+------+
mysql> show master status \GExecuted_Gtid_Set: efc79686-af16-11eb-bbaa-02000aba4147:1-135mysql> use test;mysql> create table t2(id int);mysql> insert into t2 values(4),(5),(6);
[root@study01 dtle]# cat job_src1.hcljob "job1" { datacenters = ["dc1"] group "src" { task "src" { driver = "dtle" config { ReplicateDoDb = [{ TableSchema = "test" Tables = [{ TableName = "t2" }] }] DropTableIfExists = false Gtid = "60144e94-aa1c-11eb-9d16-02000aba4144:1-140" ChunkSize = 2000 ConnectionConfig = { Host = "10.186.65.68" Port = 3333 User = "test" Password = "test" } } } } group "dest" { task "dest" { driver = "dtle" config { ConnectionConfig = { Host = "10.186.65.72" Port = 4444 User = "test" Password = "test" } # For a kafka job, do not set ConnectionConfig in dest task. Set KafkaConfig instead. #KafkaConfig = { # Topic = "kafka1" # Brokers = ["127.0.0.1:9192", "127.0.0.1:9092"] # Converter = "json" #} } } } reschedule { # By default, nomad will unlimitedly reschedule a failed task. # We limit it to once per 30min here. attempts = 1 interval = "30m" unlimited = false }}
[root@study01 dtle]# cat job_src2.hcljob "job2" { datacenters = ["dc1"] group "src" { task "src" { driver = "dtle" config { ReplicateDoDb = [{ TableSchema = "test" Tables = [{ TableName = "t2" }] }] DropTableIfExists = false Gtid = "efc79686-af16-11eb-bbaa-02000aba4147:1-135" ChunkSize = 2000 ConnectionConfig = { Host = "10.186.65.71" Port = 5555 User = "test" Password = "test" } } } } group "dest" { task "dest" { driver = "dtle" config { ConnectionConfig = { Host = "10.186.65.72" Port = 4444 User = "test" Password = "test" } # For a kafka job, do not set ConnectionConfig in dest task. Set KafkaConfig instead. #KafkaConfig = { # Topic = "kafka1" # Brokers = ["127.0.0.1:9192", "127.0.0.1:9092"] # Converter = "json" #} } } } reschedule { # By default, nomad will unlimitedly reschedule a failed task. # We limit it to once per 30min here. attempts = 1 interval = "30m" unlimited = false }}
[root@study01 dtle]# ./usr/bin/nomad job run job_src1.hcl==> Monitoring evaluation "28ae70e4" Evaluation triggered by job "job1" Evaluation within deployment: "95f9a76b" Allocation "d07787a4" created: node "e1be240c", group "dest" Allocation "f46d9291" created: node "e1be240c", group "src" Evaluation status changed: "pending" -> "complete"==> Evaluation "28ae70e4" finished with status "complete"[root@study01 dtle]# ./usr/bin/nomad job run job_src2.hcl==> Monitoring evaluation "1d2da1f7" Evaluation triggered by job "job2" Evaluation within deployment: "716b11a5" Allocation "2dd7fe5c" created: node "e1be240c", group "src" Allocation "724ec296" created: node "e1be240c", group "dest" Evaluation status changed: "pending" -> "complete"==> Evaluation "1d2da1f7" finished with status "complete"
mysql> select * from test.t2;+------+| id |+------+| 1 || 2 || 3 || 4 || 5 || 6 |+------+
mysql> insert into t2 values (7);Query OK, 1 row affected (0.01 sec)mysql> select * from test.t2;+------+| id |+------+| 1 || 2 || 3 || 7 |+------+4 rows in set (0.01 sec)
mysql> select * from test.t2;+------+| id |+------+| 1 || 2 || 3 || 4 || 5 || 6 || 7 |+------+7 rows in set (0.01 sec)
准备三台MySQL实例,一台源端,两台目标端。根据条件,分别复制到不同的目标端数据库
mysql> show master status \GExecuted_Gtid_Set: 60144e94-aa1c-11eb-9d16-02000aba4144:1-143
mysql> use test;mysql> create table t3(id int);mysql> insert into t3 values(1),(2);
[root@study01 dtle]# cat job_dst1.hcljob "dst1" { datacenters = ["dc1"] group "src" { task "src" { driver = "dtle" config { ReplicateDoDb = [{ TableSchema = "test" Tables = [{ TableName = "t3" Where = "id<10" }] }] DropTableIfExists = false Gtid = "60144e94-aa1c-11eb-9d16-02000aba4144:1-143" ChunkSize = 2000 ConnectionConfig = { Host = "10.186.65.68" Port = 3333 User = "test" Password = "test" } } } } group "dest" { task "dest" { driver = "dtle" config { ConnectionConfig = { Host = "10.186.65.71" Port = 5555 User = "test" Password = "test" } # For a kafka job, do not set ConnectionConfig in dest task. Set KafkaConfig instead. #KafkaConfig = { # Topic = "kafka1" # Brokers = ["127.0.0.1:9192", "127.0.0.1:9092"] # Converter = "json" #} } } } reschedule { # By default, nomad will unlimitedly reschedule a failed task. # We limit it to once per 30min here. attempts = 1 interval = "30m" unlimited = false }}
两个配置文件不同的地方有,job名字,where条件,以及目标端的连接信息
[root@study01 dtle]# ./usr/bin/nomad job run job_dst1.hcl[root@study01 dtle]# ./usr/bin/nomad job run job_dst2.hcl[root@study01 dtle]# ./usr/bin/nomad statusID Type Priority Status Submit Datedst1 service 50 running 2021-05-07T20:13:05+08:00dst2 service 50 running 2021-05-07T20:13:10+08:00
验证数据,小于10的会在71这台数据库,大于等于10的会在72这台数据库
在源端插入数据,然后分别到目标端进行查看
mysql> insert into t3 values(9),(10);mysql> select * from test.t3;+------+| id |+------+| 1 || 2 || 9 || 10 |+------+
mysql> select * from test.t3;+------+| id |+------+| 1 || 2 || 9 |+------+
mysql> select * from test.t3;+------+| id |+------+| 10 |+------+
easydb.net
微信公众号:easydb
关注我,不走丢!转载地址:http://vlopz.baihongyu.com/