MySQL数据迁移TcaplusDB实践
小编:啊南 243阅读 2020.11.21
随着业务数据量的剧增,传统MySQL在数据存储上变得越来越吃力,NoSQL因其良好的性能、扩展性、稳定性逐渐成为业务选型的首要考虑。TcaplusDB是腾讯云推出的一款全托管NoSQL数据库服务,旨在为客户提供极致的数据据存储体验,详细信息请参考官方文档。本文主要介绍如何将MySQL数据迁移到TcaplusDB。
2. 迁移说明MySQL与TcaplusDB属于异构数据库,数据迁移之前需要考虑两者间数据的差异。
2.1 术语和概念
TcaplusDB |
MySQL |
---|---|
集群 |
数据库 |
表格组 |
N/A |
表 |
表 |
记录 |
行 |
字段 |
列 |
特性 |
TcaplusDB |
MySQL |
---|---|---|
数据模型 |
Key-Value, JSON格式 |
结构化数据 |
数据类型 |
丰富 |
丰富 |
分片 |
Yes |
N/A |
扩展性 |
优 |
低 |
SQL |
类SQL |
ANSI |
主键 |
Yes |
Yes |
外键 |
N/A |
Yes |
事务 |
N/A |
Yes |
二进制数据 |
Yes |
Yes |
周边生态 |
弱 |
强 |
TcaplusDB支持两种协议来表述数据Schema, 分别为TDR (Tencent Data Representation)和Google Protobuf, 不同协议的数据类型在定义时有所区别,但在底层是统一的。
数据类型 |
TcaplusDB (TDR) |
TcaplusDB (Protobuf) |
MySQL |
---|---|---|---|
整形 |
int64,uint64 |
int64,uint64 |
BIGINT |
整形 |
int32,uint32 |
int32,uint32 |
INT,INTEGER |
整形 |
int16,uint16 |
N/A |
SMALLINT |
整形 |
int8,uint18 |
N/A |
TINYINT |
浮点型 |
float, double |
float,double |
FLOAT, DOUBLE |
定点型 |
N/A |
N/A |
DECIMAL |
位 |
N/A |
N/A |
BIT |
字符型 |
char |
bytes |
CHAR |
字符串 |
string |
string |
VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT |
二进制 |
array char |
bytes |
TINYBLOB, BLOB, MEDIUMBLOB,LONGBLOB, VARBINARY, BINARY |
布尔型 |
bool |
bool |
BOOLEAN |
数组列表 |
array修饰 |
repeated修饰 |
N/A |
结构体 |
struct |
message |
N/A |
枚举 |
N/A |
enum |
ENUM |
共同体 |
union |
N/A |
N/A |
TcaplusDB与MySQL属于异构数据库,数据迁移有一些限制。
序号 |
限制项 |
说明 |
---|---|---|
1 |
不支持外键迁移 |
TcaplusDB没有外键,如果MySQL有定义外键迁移时外键对应列直接映射成TcaplusDB字段,不会维护原有外键关联表信息 |
2 |
不支持存储过程 |
如果MySQL定义有存储过程,迁移时将被忽略 |
3 |
数据类型转换 |
对于TcaplusDB不支持的数据类型,需要进行转换,如Decimal转成TcaplusDB长整形,日期类的转换成字符串类型等 |
4 |
迁移网络环境 |
本文只介绍同是腾讯云环境下MySQL迁移TcaplusDB场景,MySQL与TcaplusDB同属腾讯云一个地域 |
5 |
实时迁移删除操作限制 |
删除操作可能存在删空记录情况,需要避免后续离线迁移重新把待删除的记录写到表中,产生脏数据现象,具体做法是如果删除的是一条空记录把记录写到另一张待删除的表,待离线全量迁移完成后进行对账,把脏数据从业务表删除 |
6 |
MySQL数据订阅 |
开启数据订阅功能,需要修改数据源MySQL实例的参数,涉及重启实例,会影响当前存在的连接,用户需要评估重启期间断连对业务的影响 |
对于线上业务迁移,涉及实时数据迁移和离线存量数据迁移。以游戏举例,如果业务同意停服迁移,则直接采用离线全量数据一次性迁移最好,低价低; 如果业务不同意停服迁移,则需要同时考虑实时和离线。
2.5.1 实时数据迁移场景MySQL实时数据迁移通用的方案是基于binlog进行实时数据采集,捕获数据更新操作如insert、update、delete。腾讯云目前针对MySQL实例支持数据订阅功能可实时捕获MySQL数据变更,数据订阅相关文档可参考官网文档。
数据订阅功能支持将数据采集传输至Kafka,这里会介绍如何用腾讯云CKafka实现数据流传输,同时借助腾讯云SCF无服务函数来消费CKafka数据并写到TcaplusDB。
2.5.2 离线数据迁移场景MySQL离线数据迁移涉及存量数据的搬迁,存量数据搬迁需要考虑如何避免线上业务影响,如在业务低峰期迁移、从备机拉数据。存量数据导出并导入异构平台方案这里介绍两种:
- 方案一: 用Select直接查询备机,以一定格式(如约定好分隔符间隔各字段值)将数据导出到本地文件,然后通过离线大数据批量解析(e.g., Map/Reduce, Spark)文件将数据按TcaplusDB数据格式写入TcaplusDB。这里涉及到的腾讯云产品:腾讯云COS用于存储导出的数据文件,腾讯云EMR用于从COS拉取数据文件进行批量解析并写入到TcaplusDB。此方案涉及开发数据文件解析代码。本文模拟数据量比较小,直接将导出的数据写到TcaplusDB即可,暂不涉及EMR。
- 方案二: 用mysqldump从备机批量dump数据到文件,文件数据格式是SQL格式(INSERT语句),然后再把导出数据重新Load到新的MySQL,产生binlog,再按实时数据迁移方案把数据写到TcaplusDB。这里涉及的腾讯云产品:腾讯云COS存储数据文件,腾讯云MySQL实例存储新的load数据,腾讯云DTS服务数据订阅功能实时采集binlog, 腾讯云CKafka作为消息队列中间件,腾讯云SCF用于消费数据写到TcaplusDB。
迁移场景 |
资源名称 |
资源说明 |
---|---|---|
实时迁移 |
腾讯云CVM实例 |
用于跑数据订阅程序 |
实时迁移 |
腾讯云CDB for MySQL |
用于数据源模拟 |
实时迁移 |
腾讯云DTS服务数据订阅 |
实时订阅MySQL数据 |
实时迁移 |
腾讯云CKafka |
消息队列,解藕订阅/消费过程 |
实时迁移 |
腾讯云SCF |
消费CKafka数据 |
实时迁移 |
腾讯云TcaplusDB |
数据存储平台 |
离线迁移 |
腾讯云COS |
数据文件存储 |
离线迁移 |
腾讯云CDB for MySQL |
用于中间临时数据存储 |
迁移场景 |
开发项 |
---|---|
实时迁移 |
数据订阅程序 |
实时迁移 |
SCF消费订阅数据程序 |
离线迁移 |
批量导出MySQL数据和批量解析程序 |
所有资源统一申请到腾讯云上海地域。
4.1.1 示例表结构- MySQL示例表
表信息 |
|||
---|---|---|---|
库表名 |
库: tcaplus |
表: test |
|
序号 |
字段名 |
字段类型 |
字段说明 |
1 |
player_id |
bigint(20) |
primary key |
2 |
player_name |
varchar(128) |
|
3 |
player_info |
varbinary(2048) |
JSON字符串二进制写入 |
player_info的JSON原始数据结构如下:
序号 |
字段名 |
字段类型 |
---|---|---|
1 |
player_email |
varchar(64) |
2 |
player_phone |
varchar(32) |
- TcpalusDB示例表
表信息 |
||||
---|---|---|---|---|
集群:tw_tcaplus |
表格组: tw_group_1 |
表名: test |
||
序号 |
字段名 |
字段类型 |
字段说明 |
|
1 |
player_id |
int64 |
primary key |
|
2 |
player_name |
string |
||
3 |
player_info |
struct |
参考MySQL表player_info说明 |
- CVM实例: 申请一个2C4G的实例
- MySQL数据源: 申请一个CDB for MySQL实例用于模拟业务原始数据,规格:2C4G高可用版,并创建好库表(库:tcaplus, 表:test,编码:utf8mb4),申请指引参考官方文档,创建表DDL语句如下:
CREATE TABLE `test` ( `player_id` bigint(20) NOT NULL, `player_name` varchar(128) DEFAULT NULL, `player_info` varbinary(2048) DEFAULT NULL, PRIMARY KEY (`player_id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
- CKafka实例: 申请CKafka实例,规格:标准入门版,具体参考官方文档。
- TcaplusDB: 创建一个TcaplusDB表,具体参考创建表指南。表相关信息:表所在集群名:tw_cluster, 表所在表格组名:tw_group_1,表名:test。TcaplusDB表定义Schema如下:
syntax = "proto3"; // Specify the version of the protocol buffers language import "tcaplusservice.optionv1.proto"; // Use the public definitions of TcaplusDB by importing them. message test { // Define a TcaplusDB table with message // Specify the primary keys with the option tcaplusservice.tcaplus_primary_key // The primary key of a TcaplusDB table has a limit of 4 fields option(tcaplusservice.tcaplus_primary_key) = "player_id"; // primary key fields int64 player_id = 1; //ordinary fields string player_name = 2; info player_info = 3; } message info { string player_email = 1; string player_phone = 2; }
- SCF: 在云函数控制台新建一个Python3.6版的空白模板函数,函数名:tw_migrate。具体指引参考官方文档。
- 字符集: 统一为utf8mb4。
- 库表: 从控制台登录实例后,检查库表创建是否OK, 如下所示:
- 专用账户: 创建迁移专用账户tw_tcaplus, 主机授权暂时设置为%,允许所有主机通过此账户来访问实例,如下所示:
- Topic: 创建一个迁移专用的topic, topic名为tw_migrate,如下所示:
- TcaplusDB表: 进TcpalusDB控制台,查看表格列表,如下所示:
- 订阅通道创建: 在数据传输服务控制台新建一个数据订阅实例,如下:
- 订阅配置: 初始化订阅配置,选择MySQL实例作为数据源,选择VPC和子网(注意要与MySQL实例同属一个网络),在同步类型处只选择数据更新,库表任务处选择test表作为订阅的表。具体如下所示:
- 订阅详情: 配置好后,点击启动会去修改MySQL实例参数并重启MySQL实例,订阅信息如下:
- 注意 : 订阅配置过程会对MySQL实例修改一些binlog配置参数,修改过程会kill当前已有连接并重启MySQL实例,业务需要关注这点
数据订阅好后,需要把订阅的数据拉取到CKafka, 这里需要开发相应的程序,示例程序可参考官方文档。程序依赖的相关组件可从官方文档获取。
4.2.5.1 主要依赖- 示例程序: KafkaDemo.java, 参考第6章节资源下载
- 数据订阅SDK:binlogsdk-2.8.2-jar-with-dependencies.jar, 参考第6章节资源下载
- SLF4J组件:SLF4J.zip, 参考第6章节资源下载
- kafka-clients:kafka-clients-1.1.0.jar, 参考第6章节资源下载
- Json-simple:json-simple-1.1.jar, 参考第6章节资源下载
- JDK1.8: 直接安全openjdk, 命令:yum install -y java-1.8.0-openjdk-devel。
- 数据订阅信息获取:查看上述数据订阅详情截图的信息,主要是通道ID、服务IP、服务端口;
- 示例程序修改: 修改上述下载的示例程序KafkaDemo.java,替换相关配置为自己的信息
//替换TOPIC为申请的topic名,这里为tw_migrate final String TOPIC = "tw_migrate"; Properties props = new Properties(); //替换CKafka实例的连接信息,进Ckafka控制查看实例信息中的内网IP与端口 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.17.16.6:9092"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); final Producer4.2.5.3 程序编译producer = new KafkaProducer (props); //替换SecretId为自己腾讯云账号下申请的密钥信息 context.setSecretId("xxx"); //替换SecretKey为自己腾讯云账号下申请的密钥信息 context.setSecretKey("xxx"); //替换Region信息 context.setRegion("ap-shanghai"); // 替换数据订阅连接信息,从详情页获取的ip,port context.setServiceIp("172.17.16.2"); context.setServicePort(7507); final DefaultSubscribeClient client = new DefaultSubscribeClient(context); // 填写对应要同步的数据库和表名 //替换订阅的库名 final String targetDatabase = "tcaplus"; client.addClusterListener(listener); // 替换订阅的通道名,订阅详情页获取的 dts-channel 配置信息,填写到此处 client.askForGUID("dts-channel-5ljfLZmhFvnTCQx9"); client.start();
上传上述下载的组件包到CVM实例,并执行如下编译命令:
javac -cp binlogsdk-2.8.2-jar-with-dependencies.jar:kafka-clients-1.1.0.jar:slf4j-api-1.7.25.jar:slf4j-log4j12-1.7.2.jar:json-simple-1.1.jar -encoding UTF-8 KafkaDemo.java4.2.5.4 程序执行
java -XX:-UseGCOverheadLimit -Xms2g -Xmx2g -classpath .:binlogsdk-2.6.0-release.jar:kafka-clients-1.1.0.jar:slf4j-api-1.7.25.jar:slf4j-log4j12-1.7.2.jar:json-simple-1.1.jar KafkaDemo4.2.5.5 程序验证
在MySQL实例中插入一条示例数据,验证数据是否能订阅成功。插入MySQL数据这里用Python3程序来模拟,代码如下:
import json import MySQLdb #替换DB连接信息,从已申请的MySQL实例中获取 db = MySQLdb.connect("172.17.16.17", "xxx", "xxx", "tcaplus", charset='utf8' ) cursor = db.cursor() #模拟一个json结构数据转成字符串后以byte形式写入MySQL info={"player_phone":"123456","player_email":"[email protected]"} info_bin = json.dumps(info).encode('utf-8') sql = "INSERT INTO test (player_id, player_name, player_info) VALUES (%s, %s, %s)" val = (7, 'June', info_bin,) cursor.execute(sql, val) db.commit() db.close()
打开另一个CVM终端,执行上述程序,并查看4.2.5.4启动程序的输出,如下所示:
ckafka_subscribe从上图来看说明数据订阅程序已经捕获到MySQL的数据插入动作。
4.2.6 数据消费数据消费通过腾讯云SCF来实现。SCF支持创建CKafka触发器,借助触发器机制可实时捕获CKafka的数据流,只要有数据发布到Ckafka指定topic, 会触发SCF自动拉取Topic新进的数据。触发器如下所示:
scf_triggerSCF捕获到数据后,解析捕获的数据包并转换成TcaplusDB能识别的JSON记录格式,再通过调用TcaplusDB Python RESTful SDK接口把JSON记录写到TcaplusDB表。具体代码可在SCF控制台上传代码ZIP包即可, 下载地址 。代码关键逻辑:
- 捕获插入操作: 针对数据是INSERT操作类型的,转换成TcaplusDB的AddRecord操作,即新增一条记录
- 捕获删除操作: 针对数据是DELETE操作类型的,转换成TcaplusDB的DeleteRecord操作,即删除一条记录
- 捕获更新操作: 针对数据是UPDATE操作类型的,转换成TcaplusDB的SetRecord操作,即更新一条记录
- 脏数据注意事项: 对于删除操作,由于是实时迁移,全量数据暂未同步到TcaplusDB,所以可能会存在删除一条空记录的情况,需要针对删除为空记录场景时把待删除的记录先保存到另一张待删除表,等全量数据迁移至TcaplusDB后,进行一次全量对账,即检查待删除表中的记录是否重新通过 离线迁移方式写到了业务TcaplusDB表,如果是则需要把业务表中的记录进行删除,避免脏数据的出现。
通过SCF转换写入到TcaplusDB的数据,如下所示:
tcaplus_data4.3 迁移总结上面实现并验证了实时迁移数据流管道,通过数据订阅捕获MySQL增删改事件并实时通过订阅程序传输到Ckafka, 同时通过SCF触发器机制捕获CKafka的输入数据流并拉取解析最后写到TcaplusDB。针对删除操作,为避免空删场景,把删除时错误码为261(数据记录不存在)的单独处理,即把这部分数据重新写到新的一张待删除表,待全量迁移完成后再统一对账清理这部分脏数据。
5. 离线迁移方案离线迁移主要有两种方式: 一种是dump方式把表数据dump成SQL文件形式,文件内容为Insert格式,然后可以把SQL文件回写到另一临时MySQL实例产生Binlog走实时迁移方案; 另一种是select方式,从表中查数据出来以指定格式保存到文本文件, 如JSON格式行,通过腾讯云批量解析的方式写到TcaplusDB。
5.1 Dump方式迁移5.1.1 Dump表数据dump全表数据可以用如下命令:
#替换MySQL连接账户名和密码 mysqldump -h172.17.16.17 -u[db_user] -p[db_password] -P3306 -B tcaplus --tables test --skip-opt >test.sql
将上述导出test.sql文件重新Load到新的临时MySQL实例,产生Binlog后用实时迁移方案来进行数据采集,参考第4章节所述,这里不再展开。
5.2 Select方式迁移5.2.1 Select表数据Select方式可以选择数据输出格式如JSON,如果原表设计有时间字段可以将时间字段设置为索引,并按时间段进行数据导出避免一次导出全量数据。这里以导出全量数据举例,借助JSON_OBJECT函数可以导出TcaplusDB的JSON格式,如下命令所示:
#替换用户名和密码,行之间间隔符}}}{{{ mysql -h172.17.16.17 -u[db_user] -p[db_password] -P3306 -D tcaplus -Ne "select JSON_OBJECT('player_id',player_id,'player_name',player_name,'player_info',player_info) from test" >test.json
test.json示例文件格式如下:
{"player_id": 1, "player_info": "base64:type15:eyJwbGF5ZXJfZW1haWwiOiAiMUB0ZXN0LmNvbSIsICJwbGF5ZXJfcGhvbmUiOiAiMTIzNDU2NiJ9", "player_name": "ball"}
从上述示例格式来看,MySQL底层对Varbinary数据类型会自动转成base64编码,在解析时需要把这个base64进行解码转换成TcaplusDB的字符串格式。
5.2.2 数据解析在2.5.2章节介绍了离线数据迁移场景,如果业务表数据量很大,为加快导入TcaplusDB速度,可考虑批量解析,批量解析文件目前业界用得较多的方案是用Spark或Map/Reduce进行文件解析将解析后的数据写入到TcaplusDB,后续针对批量解析这块单独介绍,这里只简单介绍上述导出的JSON文件导入到TcaplusDB。
JSON文件解析采用Python进行,同时引入TcaplusDB Python RESTful SDK,SDK使用方法参考官方文档。示例代码如下:
import json import base64 #引入TcaplusRestClient类 from tcaplusdb_client.tcaplusdb_rest_client import TcaplusRestClient client = None #替换TcaplusDB集群访问地址 endpoint = "http://172.17.16.15" #替换TcaplusDB访问ID access_id = 20 #替换TcaplusDB集群密码 access_password = "xxx" #替换TcaplusDB表格组ID tablegroup_id = 1 #替换TcaplusDB表名 table_name = "test" def init(): #初始化TcaplusDB客户端 global client client = TcaplusRestClient(endpoint, access_id, access_passwd) client.SetTargetTable(table_group_id=tablegroup_id, table_name=table_name) def write(): #从文件读数据并写入TcaplusDB filename='test.json' with open(filename,'r') as f: lines = f.readlines() for line in lines: json_str = json.loads(line) player_info = json_str['player_info'].replace("base64:type15:","") #将base64编码的字段反解码成字符串并转换成JSON格式 json_str['player_info'] = json.loads(base64.b64decode(player_info)) stat, resp = client.AddRecord(json_str) if __name__ == '__main__': init() write()5.2.3 数据COS存储
对于MySQL导出的数据文件可以放腾讯云COS存储,方便其它组件拉取数据进行处理。COS相关介绍可参考官方文档。这里介绍Python SDK操作方法,具体使用手册可参考官方文档。
- SDK安装
pip install -U cos-python-sdk-v5
- COS上传和下载
from qcloud_cos import CosConfig from qcloud_cos import CosS3Client #替换COS所在Region名 cosRegionName='ap-shanghai' #替换腾讯云账户的密钥信息 SECRET_ID='xxx' SECRET_KEY='xxx' scheme='https' config = CosConfig(Region=cosRegionName, SecretId=SECRET_ID, SecretKey=SECRET\_KEY,Scheme=scheme) client = CosS3Client(config) # 创建bucket, 注意格式为migrate名-appid (appid是自己腾讯云账号下的appid) response = client.create_bucket( Bucket='migrate-1258272208' ) #上传文件到COS #替换bucket名,要上传的文件名及Key, PartSize指定分包大小(单位MB),MAXThread指定并发上传的线程数 response = client.upload_file( Bucket='migrate-1258272208', LocalFilePath='1.txt', Key='1.txt', PartSize=1, MAXThread=10, EnableMD5=False ) #从COS下载文件 #替换Bucket名 response = client.get_object( Bucket='migrate-1258272208', Key='1.txt', ) response['Body'].get_stream_to_file('output.txt')6. 资源下载
迁移场景 |
依赖资源 |
资源下载地址 |
资源用途 |
---|---|---|---|
实时迁移 |
tcaplus_tes.sql |
定义数据源表结构 |
|
实时迁移 |
test.proto |
定义TcaplusDB表结构 |
|
实时迁移 |
mysql_demo.py |
用于模拟写入MySQL数据,依赖mysqlclient库,参考网上资料安装 |
|
实时迁移 |
KafkaDemo.java |
数据订阅程序,从数据订阅管道拉取binlog捕获数据并解析写入到CKafka |
|
实时迁移 |
binlogsdk-2.8.2-jar-with-dependencies.jar |
KafkaDemo依赖,binlog捕获SDK |
|
实时迁移 |
SLF4J.zip |
KafkaDemo依赖,日志组件 |
|
实时迁移 |
kafka-clients-1.1.0.jar |
KafkaDemo依赖,Kafka客户端 |
|
实时迁移 |
json-simple-1.1.jar |
KafkaDemo依赖,json处理组件 |
|
实时迁移 |
scf_migrate_tcaplusdb.zip |
SCF程序,从Ckafka摘取数据并写入TcaplusDB |
|
离线迁移 |
tcaplusdb-restapi-python-sdk-3.0.tgz |
TcpalusDB Python RESTful SDK API, 基于包装好的RESTful 接口进行TcaplusDB数据操作 |
本文介绍了MySQL数据迁移TcaplusDB的两种方案: 实时和离线迁移。实时迁移采用订阅MySQL binlog的方式将数据订阅到CKafka, 通过SCF拉取CKafka数据进行实时写入到TcaplusDB。离线迁移可以采用Dump和Select两种方式进行数据导出并将导出文件解析写入到TcaplusDB。
相关推荐
- MySQL|Aborted connection 日志分析 一 前言作为运维DBA,我们经常会在数据库的err.log中查看到如下种类的报错信息:[Warning] Aborted connection xx to db: db user: xxx host: hostname (Got an error reading communication packets)[Warning] Aborted connection xx to db:unconnected user: …
- 3DMAX提示和技巧 本主题标识使用 Civil View 的一些重要提示和技巧。常规使用屏幕分辨率至少为 1280x1024 的 Civil View。低于此分辨率时,一些面板将占用过多屏幕空间。 将视口设置为线框显示以达到最佳性能。 要尽可能简化用户界面,请在单个视口中工作并关闭 3ds Max 命令面…