玩转从MySQL到DynamoDB的迁移(二)
这周我们继续讲从MySQL到DynamoDB的数据库迁移的第二个方案:使用EMR, Amazon Kinesis和Lambda来提供更灵活以及更精确的控制。如果你的环境有一个MySQL的只读副本(Replica),那么从Replica转存数据则是一个更好的方案。
改变主键(Key)的设计:
当你决定将数据库从RDBMS迁移到NoSQL的时候,你需要针对性能和成本节约的方向寻找一个更合适的NoSQL主键设计。
和方案1一样,假设MySQL数据源有一个组合主键(customerid+ordered+producid)。将MySQL的记录通过Customerid(hash键) 和orderid(排序键)组织到更少的DynamDB条目中,另外,通过转换在MySQL中的productid列到DynamoDB的属性名,并移除组合键的最后一列(productid),并且设置属性值为quantity。 这个转换方法降低了条目的数量。你可以用更少的读容量单位(Read Capacity Unit)读取到同样数量的信息,这种方法可以帮助节约成本和改善性能。
更多的信息请查看怎样计算读/写容量单位(https://docs.aws.amazon.com/zh_cn/amazondynamodb/latest/developerguide/HowItWorks.ReadWriteCapacityMode.html)
迁移步骤
方案2有两个迁移路径,分别是:
- 批量处理:导出MySQL数据,上传到Amazon S3,然后导入到DynamoDB。
- 实时处理:在MySQL中抓取变更的数据,发送插入/更新/删除事务到Amazon Kinesis Streams,并且触发Lambda函数来发送数据到DynamoDB。
为了保证数据的一致性和完整性,抓取和发送数据到Amazon Kinesis 流应该在批量处理之前开始。直到EMR的批量处理结束之前,Lambda函数应该待命并且Amazon Kinesis 流应该保留抓取到的数据。以下是执行顺序:
- 开始实时发送数据到Amazon Kinesis Streams
- 实时处理一开始,同时开始批量处理
- 在批量处理结束时,触发Lambda函数来执行put_item从Amazon Kinesis Streams发送数据到DynamoDB
- 修改应用终端节点从MySQL到DynamoDB
步骤1:抓取正在变化的数据并放入Amazon Kinesis Streams
首先,创建一个Amazon Kinesis Stream 来保存来自MySQL事务处理的数据。基于批量迁移过程估算时间并且设置Amazon Kinesis stream 的“数据保留期限”值。为了数据的完整性,保留期限应该要设置足够长时间来保存所有批量处理结束时的事务处理数据。然而我们不需要选择最长保存期限,这个期限取决于数据迁移的总量。
在MySQL的配置中,使用BinLogStreamReader模块设置binlog_format到行来抓取事务。Log_bin参数必须开启为binlog。
更多信息请查看AWS数据库博客(https://aws.amazon.com/cn/blogs/database/streaming-changes-in-a-database-with-amazon-kinesis/)
[mysqld]
secure-file-priv = ""
log_bin=/data/binlog/binlog
binlog_format=ROW
server-id = 1
以下是抓取事务处理数据并且发送到Amazon Kinesis Streams的Python代码示例:
#!/usr/bin/env python
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
DeleteRowsEvent,
UpdateRowsEvent,
WriteRowsEvent,
)
def main():
kinesis = boto3.client("kinesis")
stream = BinLogStreamReader(
connection_settings= {
"host": "<host IP address>",
"port": <port number>,
"user": "<user name>",
"passwd": "<password>"},
server_id=100,
blocking=True,
resume_stream=True,
only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent])
for binlogevent in stream:
for row in binlogevent.rows:
event = {"schema": binlogevent.schema,
"table": binlogevent.table,
"type": type(binlogevent).__name__,
"row": row
}
kinesis.put_record(StreamName="<Amazon Kinesis stream name>", Data=json.dumps(event), PartitionKey="default")
print json.dumps(event)
if __name__ == "__main__":
main()
以下代码是Python脚本生成的JSON示例数据。这类属性用JSON语句定义事务处理记录:
WriteRowsEvent = INSERT
UpdateRowsEvent = UPDATE
DeleteRowsEvent = DELETE
{"table": "purchase_temp", "row": {"values": {"orderid": "orderidA1", "quantity": 100, "customerid": "customeridA74187", "productid": "productid1"}}, "type": "WriteRowsEvent", "schema": "test"}
{"table": "purchase_temp", "row": {"before_values": {"orderid": "orderid1", "quantity": 1, "customerid": "customerid74187", "productid": "productid1"}, "after_values": {"orderid": "orderid1", "quantity": 99, "customerid": "customerid74187", "productid": "productid1"}}, "type": "UpdateRowsEvent", "schema": "test"}
{"table": "purchase_temp", "row": {"values": {"orderid": "orderid100", "quantity": 1, "customerid": "customerid74187", "productid": "productid1"}}, "type": "DeleteRowsEvent", "schema": "test"}
步骤2:从MySQL转存数据到DynamoDB
最容易的方法是使用DMS(Data Migration Service),这个服务最近更新之后可以指定S3作为DMS的目标。对于S3作为目标来说,完整数据加载和CDC数据加载都会以csv格式写入到S3。然而,CDC不是非常支持UPDATE和DELET语句。
更多信息请查看用S3作为数据迁移服务的目标(https://docs.aws.amazon.com/zh_cn/dms/latest/userguide/CHAP_Target.S3.html)。
另一个方法是使用自己定义脚本同时运行INTO OUTFILE SQL分句和aws s3 sync CLI命令上传数据到Amazon S3。两种方法的相似程度处决于你的服务器容量和本地网络带宽。你可能也会发现一些有用的第三方工具,比如pt-archiver。
SELECT * FROM purchase WHERE <condition_1>
INTO OUTFILE '/data/export/purchase/1.csv' FIELDS TERMINATED BY ',' ESCAPED BY '\\' LINES TERMINATED BY '\n';
SELECT * FROM purchase WHERE <condition_2>
INTO OUTFILE '/data/export/purchase/2.csv' FIELDS TERMINATED BY ',' ESCAPED BY '\\' LINES TERMINATED BY '\n';
...
SELECT * FROM purchase WHERE <condition_n>
INTO OUTFILE '/data/export/purchase/n.csv' FIELDS TERMINATED BY ',' ESCAPED BY '\\' LINES TERMINATED BY '\n';
对于这类case我推荐使用aws s3 sync。这个命令和s3 Multi-part upload 功能从内网连接合作。模式匹配能够排除和包含特殊的文件。另外,如果同步进程在处理过程中崩溃,你不需要再次上传相同的文件。同步命令会比较本地以及s3版本的文件大小和修改时间,并且同步本地未上传的部分以及修改s3上的时间。
$ aws s3 sync /data/export/purchase/ s3://<your bucket name>/purchase/
$ aws s3 sync /data/export/<other path_1>/ s3://<your bucket name>/<other path_1>/
...
$ aws s3 sync /data/export/<other path_n>/ s3://<your bucket name>/<other path_n>/
在所有数据上传到S3之后,有两种方法放入DynamoDB:
- 对外部表使用Hive。
- 写MapReduce代码。
在外部表使用Hive
针对S3上的数据创建一个Hive外部表,并根据DynamoDB表将其插入到另一个外部标中,使用属性“org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler”,为了提高生产率和可伸缩性,将考虑使用Brickhouse(github资源链接:https://github.com/klout/brickhouse),它是针对Hive的UDF集合。
下面的示例代码假定DynamoDB的Hive表是使用产品列创建的,该产品列的类型为ARRAY<STRING>, 聚合productid和quantity列,按customerid和orderid分组,并与Brickhouse提供的CollectUDAF列一起插入产品列。
hive> DROP TABLE purchase_ext_s3;
--- To read data from S3
hive> CREATE EXTERNAL TABLE purchase_ext_s3 (
customerid string,
orderid string,
productid string,
quantity string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION 's3://<your bucket name>/purchase/';
Hive> drop table purchase_ext_dynamodb ;
--- To connect to DynamoDB table
Hive> CREATE EXTERNAL TABLE purchase_ext_dynamodb (
customerid STRING, orderid STRING, products ARRAY<STRING>)
STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
TBLPROPERTIES ("dynamodb.table.name" = "purchase",
"dynamodb.column.mapping" = "customerid:customerid,orderid:orderid,products:products");
--- Batch-puts to DynamoDB using Brickhouse
hive> add jar /<jar file path>/brickhouse-0.7.1-SNAPSHOT.jar ;
hive> create temporary function collect as 'brickhouse.udf.collect.CollectUDAF';
hive> INSERT INTO purchase_ext_dynamodb
select customerid as customerid , orderid as orderid
,collect(concat(productid,':' ,quantity)) as products
from purchase_ext_s3
group by customerid, orderid;
但是,DynamoDBStorageHandler class不支持MAP, LIST, BOOLEAN和NULL数据类型,所以选择了ARRAY<STRING>数据类型。Hive中的ARRAY<STRING>数据类型的products列匹配DynamoDB中的StringSet类型属性。示例代码展示了Brickhouse怎样针对聚合多个记录到DynamoDB中的StringSet属性。
Python MapReduce和Hadoop流:
一个映射器任务从S3上的输入数据中读取每个记录,并将输入键值对映射到中间键值对。它将 S3 中的源数据分为由 TAB 字符 (“\t”) 分隔的两个部分 (关键部分和值部分)。映射器数据按其中间键 (customerid和orderid) 排序, 并发送给减速器。记录将在减速器步骤中放入 DynamoDB 中。
#!/usr/bin/env python
import sys
# get all lines from stdin
for line in sys.stdin:
line = line.strip()
cols = line.split(',')
# divide source data into Key and attribute part.
# example output : “cusotmer1,order1 product1,10”
print '%s,%s\t%s,%s' % (cols[0],cols[1],cols[2],cols[3] )
通常,reduce任务在映射处理(键值对或者列表值)之后接收产生的输出,然后针对每个键值在列表值上执行操作。
在这个case里,减速器基于STDIN/STDOUT/Hadoop流使用Python编写。枚举型数据不可用。减速器接收在映射器中使用中间键集(customerid和orderid(cols[0],col[1])) 排序和整理过的数据,并且将特定键值的所有属性存储在item_data字典中。每次新的中间键来自 sys. stdin 时, item _ data 字典中的属性都会放入或刷新 DynamoDB。
#!/usr/bin/env python
import sys
import boto.dynamodb
# create connection to DynamoDB
current_keys = None
conn = boto.dynamodb.connect_to_region( '<region>', aws_access_key_id='<access key id>', aws_secret_access_key='<secret access key>')
table = conn.get_table('<dynamodb table name>')
item_data = {}
# input comes from STDIN emitted by Mapper
for line in sys.stdin:
line = line.strip()
dickeys, items = line.split('\t')
products = items.split(',')
if current_keys == dickeys:
item_data[pr0ducts[0]]=products[1]
else:
if current_keys:
try:
mykeys = current_keys.split(',')
item = table.new_item(hash_key=mykeys[0],range_key=mykeys[1], attrs=item_data )
item.put()
except Exception ,e:
print 'Exception occurred! :', e.message,'==> Data:' , mykeys
item_data = {}
item_data[pr0ducts[00]]=products[1]
current_keys = dickeys
# put last data
if current_keys == dickeys:
print 'Last one:' , current_keys #, item_data
try:
mykeys = dickeys.split(',')
item = table.new_item(hash_key=mykeys[0] , range_key=mykeys[1], attrs=item_data )
item.put()
except Exception ,e:
print 'Exception occurred! :', e.message, '==> Data:' , mykeys
运行MapReduce作业时,链接到EMR主节点并且运行Hadoop流作业。根据EMR版本,文件hadoop-streaming.jar的位置和名字可能会有区别。当减速器运行时出现的异常信息存储在-output选项制定的目录中。哈希键和范围键值也会被记录用来无额定那些数据会导致异常或者错误。
$ hadoop fs -rm -r s3://<bucket name>/<output path>
$ hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
-input s3://<bucket name>/<input path> -output s3://<bucket name>/<output path>\
-file /<local path>/mapper.py -mapper /<local path>/mapper.py \
-file /<local path>/reducer.py -reducer /<local path>/reducer.py
在我使用上述脚本和自己产生测试数据的迁移经验中,我发现以下的结果,包括数据大小和完成迁移所花的时间。
以下截图显示写容量的性能结果:
值得注意的是性能结果是灵活的,可能因服务器容量,网络带宽,并行度,转换逻辑,程序语言以及其他条件而不同。所有配置的写入容量单位,都是由MapReduce作业使用,用于数据导入,所以增加EMR集群的大小和DynamoDB表的写入容量单位,任务完成的时间就越短。基于java的MapReduce代码对于功能和MapReduce框架将会更灵活。
步骤3:Amazon Lamda通过从Amazon Kinesis读取数据更新DynamoDB
在Lambda控制台上,选取创建一个Lambda函数和Kinesis-process-record-python蓝本。下一步,在配置触发页面,选定你刚才创建的流。
Lambda函数必须有包含权限的IAM角色去读取Amazon Kinesis的数据并且放入DynamoDB。
Lambda函数能够通过查找类别属性识别记录的事务的类别,事务类别确定转换和更新的方法。
例如,当一个JSON记录传递给函数时,这个函数将查找类型属性。它还检查DynamoDB表中的现有项是否与传入记录有相同的秘钥,如果不同,则必须检索现有项并将其保存在字典条目中。在将条目字典放回DynamoDB表之前,将新的更新信息命令应用于该条目字典。这样可以防止现有项被传入记录覆盖。
from __future__ import print_function
import base64
import json
import boto3
print('Loading function')
client = boto3.client('dynamodb')
def lambda_handler(event, context):
#print("Received event: " + json.dumps(event, indent=2))
for record in event['Records']:
# Amazon Kinesis data is base64-encoded so decode here
payload = base64.b64decode(record['kinesis']['data'])
print("Decoded payload: " + payload)
data = json.loads(payload)
# user logic for data triggered by WriteRowsEvent
if data["type"] == "WriteRowsEvent":
my_table = data["table"]
my_hashkey = data["row"]["values"]["customerid"]
my_rangekey = data["row"]["values"]["orderid"]
my_productid = data["row"]["values"]["productid"]
my_quantity = str( data["row"]["values"]["quantity"] )
try:
response = client.get_item( Key={'customerid':{'S':my_hashkey} , 'orderid':{'S':my_rangekey}} ,TableName = my_table )
if 'Item' in response:
item = response['Item']
item[data["row"]["values"]["productid"]] = {"S":my_quantity}
result1 = client.put_item(Item = item , TableName = my_table )
else:
item = { 'customerid':{'S':my_hashkey} , 'orderid':{'S':my_rangekey} , my_productid :{"S":my_quantity} }
result2 = client.put_item( Item = item , TableName = my_table )
except Exception, e:
print( 'WriteRowsEvent Exception ! :', e.message , '==> Data:' ,data["row"]["values"]["customerid"] , data["row"]["values"]["orderid"] )
# user logic for data triggered by UpdateRowsEvent
if data["type"] == "UpdateRowsEvent":
my_table = data["table"]
# user logic for data triggered by DeleteRowsEvent
if data["type"] == "DeleteRowsEvent":
my_table = data["table"]
return 'Successfully processed {} records.'.format(len(event['Records']))
步骤4:切换应用程序终端节点到DynamoDB
当数据库从MySQL到DynamoDB时应用程序代码需要重构,以下代码段主要侧重于连接和查询部分。
了解更多使用DynamoDB和AWS开发工具包进行编程请查看:https://docs.aws.amazon.com/zh_cn/amazondynamodb/latest/developerguide/Programming.html
以下示例代码显示一种常见的连接MySQL和返回数据:
import java.sql.* ;
...
try {
Connection conn = DriverManager.getConnection("jdbc:mysql://<host name>/<database name>" , "<user>" , "<password>");
stmt = conn.createStatement();
String sql = "SELECT quantity as quantity FROM purchase WHERE customerid = '<customerid>' and orderid = '<orderid>' and productid = '<productid>'";
ResultSet rs = stmt.executeQuery(sql);
while(rs.next()){
int quantity = rs.getString("quantity"); //Retrieve by column name
System.out.print("quantity: " + quantity); //Display values
}
} catch (SQLException ex) {
// handle any errors
System.out.println("SQLException: " + ex.getMessage());}
...
==== Output ====
quantity:1
以下的步骤是从DynamoDB返回数据条目:
1. 创建一个DynamoDB 实例
2. 创建一个表实例
3. 添加withHashKey和withRangeKeyCondition方法到querySpec的实例中。
4. 使用上一步创建的querySpec实例执行查询方法,数据条目以json格式返回,因此使用getJSON方法查找条目中的特定属性。
...
DynamoDB dynamoDB = new DynamoDB( new AmazonDynamoDBClient(new ProfileCredentialsProvider()));
Table table = dynamoDB.getTable("purchase");
QuerySpec querySpec = new QuerySpec()
.withHashKey("customerid" , "customer1") // hashkey name and its value
.withRangeKeyCondition(new RangeKeyCondition("orderid").eq("order1") ) ; // Ranage key and its condition value
ItemCollection<QueryOutcome> items = table.query(querySpec);
Iterator<Item> iterator = items.iterator();
while (iterator.hasNext()) {
Item item = iterator.next();
System.out.println(("quantity: " + item.getJSON("product1")); //
}
...
==== Output ====
quantity:1
结论:在这篇博客中,我介绍了两种从MySQL到DynamoDB在迁移期间无缝迁移数据和最小化宕机时间,方案1使用DMS,方案2使用了EMR,Amazon Kinesis和Lambda。我也展示了如何根据数据库特征转换键值设计,以提高读写性能并且降低成本。每个方案都有优点和缺点,因此最佳方案处决于客户需求。本文中的示例代码不足以使一个完整、高效和可靠的数据迁移代码库在许多不同的环境中重复使用。可以从这两个方案中一个开始,但是实际环境中的其他变量需要根据实际情况进行设计。
原文地址:
https://amazonaws-china.com/cn/blogs/big-data/near-zero-downtime-migration-from-mysql-to-dynamodb/
Tag:RDS