博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Data Lake Analytics,大数据的ETL神器!
阅读量:6574 次
发布时间:2019-06-24

本文共 14595 字,大约阅读时间需要 48 分钟。

  hot3.png

0. Data Lake Analytics(简称DLA)介绍

数据湖(Data Lake)是时下大数据行业热门的概念:。基于数据湖做分析,可以不用做任何ETL、数据搬迁等前置过程,实现跨各种异构数据源进行大数据关联分析,从而极大的节省成本和提升用户体验。关于Data Lake的概念。

终于,阿里云现在也有了自己的数据湖分析产品:

可以点击申请使用(目前公测阶段还属于邀测模式),体验本教程分析OTS数据之旅。
产品文档:

1. ETL介绍

ETL()就是Extract、Transfrom、Load即抽取、转换、加载,是传统数仓和大数据的重要工具。

抽取:就是从源系统抽取需要的数据,这些源系统是同构或异构的:比如Excel表格、XML文件、关系型数据库。

转换:源系统的数据按照分析目的,转换成目标系统要求的格式,或者做数据清洗和数据加工。
加载:把转换后的数据装载到目标数据库,作为联机分析、数据挖掘、数据展示的基础。

整个ETL过程就像是在源系统和目标系统之间构建一个管道,数据在这个管道里源源不断的流动。

2. DLA与ETL

Data Placement Optimization(数据摆放优化)是目前云平台上的业务系统的主流架构方向和思路。架构师们会从读写性能、稳定性、强一致性、成本、易用性、开发效率等方面来考量不同存储引擎给业务上带来的好处,从而实现整个业务系统的完美的平衡状态。

而这种跨异构数据源之间的数据搬迁,却不是一件容易的事情。很多ELT工具基本上属于框架级别,需要自己开发不少的辅助工具;同时表达能力也较弱,无法满足很多场景;另外对异构数据源的抽象和兼容性也不是那么完美。

反观DLA,无论从哪方面来看,DLA都完美的契合ETL的需求场景。下图是DLA的简易架构图,DLA一开始就是基于“MPP计算引擎+存储计算分离+弹性高可用+异构数据集源”等架构原则来设计的,支持各种异构数据源读写是DLA的核心目标!

image.png | left | 706x724

通过连接异构数据源来执行select + join + subQuery等逻辑实现Extract,通过Filter+ Project + Aggregation + Sort + Functions等实现数据流转换和映射Transform,而通过insert实现Load,下面是一个例子:

--基本格式insert into target_table (col1, col2, col3, ....)  --需要导入的列以及列的顺序select c1, c2, c3, ....                            --需要与导入列的类型兼容,顺序要确认清楚from ...                         --可以是任何你想要查询的数据目标where ...--下面是一个例子insert into target_table (id, name, age)  select s1.pk1, s2.name, s1.age            from source_table1 s1join source_table2 s2on s1.sid = s2.sidwhere s1.xxx = 'yyy'

下面我们就尝试往不同的数据源导入数据吧。

3. 实际测试(以TableStore:为例)

  • 准备DLA账号(已有测试账号)

    • 测试集群:上海region;
    • 账号账号:DLA测试账号;
  • 准备两个来源表(两个TPC-H的OSS表,customer和nation),用来做join和数据查询;
  • 准备一个TableStore()的目标表;
  • 执行导入SQL,写入数据后校验结果;

a)两个来源表定义:

mysql> show create database tpch_50x_text;+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| Database      | Create Database                                                                                                                                                        |+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| tpch_50x_text | CREATE DATABASE `tpch_50x_text`WITH DBPROPERTIES (    catalog = 'hive',    location = 'oss://${您的bucket}/datasets/tpch/50x/text_date/')COMMENT '' |+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------+1 row in set (0.02 sec)mysql> show tables;+------------+| Table_Name |+------------+| customer   || nation     |+------------+2 rows in set (0.03 sec)mysql> show create table customer;+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| Table    | Create Table                                                                                                                                                                                                                                                                                                                                                                           |+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| customer | CREATE EXTERNAL TABLE `tpch_50x_text`.`customer` (    `c_custkey` int,    `c_name` string,    `c_address` string,    `c_nationkey` int,    `c_phone` string,    `c_acctbal` double,    `c_mktsegment` string,    `c_comment` string)ROW FORMAT DELIMITED    FIELDS TERMINATED BY '|'STORED AS `TEXTFILE`LOCATION 'oss://${您的bucket}/datasets/tpch/50x/text_date/customer_text' |+----------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+1 row in set (0.90 sec)mysql> show create table nation;+------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| Table      | Create Table                                                                                                                                                                                                                                    |+------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| nation     | CREATE EXTERNAL TABLE `tpch_50x_text`.`nation` (    `n_nationkey` int,    `n_name` string,    `n_regionkey` int,    `n_comment` string)ROW FORMAT DELIMITED    FIELDS TERMINATED BY '|'STORED AS `TEXTFILE`LOCATION 'oss://${您的bucket}/datasets/tpch/50x/text_date/nation_text' |+------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+1 row in set (0.73 sec)

b)准备TableStore的库和表

## 建库mysql> show create database etl_ots_test;+--------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| Database     | Create Database                                                                                                                                                                |+--------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| etl_ots_test | CREATE DATABASE `etl_ots_test`WITH DBPROPERTIES (    catalog = 'ots',    location = 'https://${您的instance}.cn-shanghai.ots-internal.aliyuncs.com',    instance = '${您的instance}')COMMENT '' |+--------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+1 row in set (0.02 sec)## 使用库mysql> use etl_ots_test;Database changed## 建表mysql> show create table test_insert;+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| Table       | Create Table                                                                                                                                                                          |+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+| test_insert | CREATE EXTERNAL TABLE `test_insert` (    `id1_int` int NOT NULL COMMENT '客户id主键',    `c_address` varchar(20) NULL COMMENT '客户的地址',    `c_acctbal` double NULL COMMENT '客户的account balance',    PRIMARY KEY (`id1_int`))COMMENT ''             |+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+1 row in set (0.03 sec)

以下是实际数据的截图:

image.png | left | 747x416

image.png | left | 747x601

c)开始导入数据,确保导入字段顺序和类型兼容性

## 检查数据,都是空的mysql> select * from etl_ots_test.test_insert;Empty set (0.31 sec)
mysql> use tpch_50x_text;Database changed## 查询下nation数据,其中CANADA的nationkey是3,后续要找这个数据mysql> select n_nationkey, n_name from nation;+-------------+----------------+| n_nationkey | n_name         |+-------------+----------------+|           0 | ALGERIA        ||           1 | ARGENTINA      ||           2 | BRAZIL         ||           3 | CANADA         ||           4 | EGYPT          ||           5 | ETHIOPIA       ||           6 | FRANCE         ||           7 | GERMANY        ||           8 | INDIA          ||           9 | INDONESIA      ||          10 | IRAN           ||          11 | IRAQ           ||          12 | JAPAN          ||          13 | JORDAN         ||          14 | KENYA          ||          15 | MOROCCO        ||          16 | MOZAMBIQUE     ||          17 | PERU           ||          18 | CHINA          ||          19 | ROMANIA        ||          20 | SAUDI ARABIA   ||          21 | VIETNAM        ||          22 | RUSSIA         ||          23 | UNITED KINGDOM ||          24 | UNITED STATES  |+-------------+----------------+25 rows in set (0.37 sec)## 查询下customer数据,我们只关注nationkey=3以及c_mktsegment='BUILDING'的数据mysql> select count(*) from customer where c_nationkey = 3 and c_mktsegment = 'BUILDING';+----------+| count(*) |+----------+|    60350 |+----------+1 row in set (0.66 sec)## 查询下customer数据,我们只关注nationkey=3以及c_mktsegment='BUILDING'的数据mysql> select * from customer where c_nationkey = 3 and c_mktsegment = 'BUILDING' order by c_custkey limit 3;+-----------+--------------------+-------------------------+-------------+-----------------+-----------+--------------+----------------------------------------------------------------------------------------------------+| c_custkey | c_name             | c_address               | c_nationkey | c_phone         | c_acctbal | c_mktsegment | c_comment                                                                                          |+-----------+--------------------+-------------------------+-------------+-----------------+-----------+--------------+----------------------------------------------------------------------------------------------------+|        13 | Customer#000000013 | nsXQu0oVjD7PM659uC3SRSp |           3 | 13-761-547-5974 |   3857.34 | BUILDING     | ounts sleep carefully after the close frays. carefully bold notornis use ironic requests. blithely ||        27 | Customer#000000027 | IS8GIyxpBrLpMT0u7       |           3 | 13-137-193-2709 |   5679.84 | BUILDING     |  about the carefully ironic pinto beans. accoun                                                    ||        40 | Customer#000000040 | gOnGWAyhSV1ofv          |           3 | 13-652-915-8939 |    1335.3 | BUILDING     | rges impress after the slyly ironic courts. foxes are. blithely                                    |+-----------+--------------------+-------------------------+-------------+-----------------+-----------+--------------+----------------------------------------------------------------------------------------------------+3 rows in set (0.78 sec)

导入之前我们想清楚需求:把国家是'CANADA'的,客户的market segmentation为'BUILDING'的客户找到,然后对c_custkey排序,选择前10条数据,然后选择他们的c_custkey、c_address、c_acctbal三列,清晰到OTS的test_insert表中,以备后续使用

##先查询下数据,看看有几条数据mysql> select c.c_custkey, c.c_address, c.c_acctbal     -> from tpch_50x_text.customer c    -> join tpch_50x_text.nation n     -> on c.c_nationkey = n.n_nationkey    -> where n.n_name = 'CANADA'     -> and c.c_mktsegment = 'BUILDING'     -> order by c.c_custkey    -> limit 10;+-----------+--------------------------------+-----------+| c_custkey | c_address                      | c_acctbal |+-----------+--------------------------------+-----------+|        13 | nsXQu0oVjD7PM659uC3SRSp        |   3857.34 ||        27 | IS8GIyxpBrLpMT0u7              |   5679.84 ||        40 | gOnGWAyhSV1ofv                 |    1335.3 ||        64 | MbCeGY20kaKK3oalJD,OT          |   -646.64 ||       255 | I8Wz9sJBZTnEFG08lhcbfTZq3S     |   3196.07 ||       430 | s2yfPEGGOqHfgkVSs5Rs6 qh,SuVmR |   7905.17 ||       726 | 4w7DOLtN9Hy,xzZMR              |   6253.81 ||       905 | f iyVEgCU2lZZPCebx5bGp5        |   -600.73 ||      1312 | f5zgMB4MHLMSHaX0tDduHAmVd4     |    9459.5 ||      1358 | t23gsl4TdVXqTZha DioEHIq5w7y   |   5149.23 |+-----------+--------------------------------+-----------+10 rows in set (1.09 sec)##开始导入mysql> insert into etl_ots_test.test_insert (id1_int ,c_address, c_acctbal)    -> select c.c_custkey, c.c_address, c.c_acctbal     -> from tpch_50x_text.customer c    -> join tpch_50x_text.nation n     -> on c.c_nationkey = n.n_nationkey    -> where n.n_name = 'CANADA'     -> and c.c_mktsegment = 'BUILDING'     -> order by c.c_custkey    -> limit 10;+------+| rows |+------+|   10 |+------+1 row in set (2.14 sec)## 验证结果,没有问题:mysql> select * from etl_ots_test.test_insert;+---------+--------------------------------+-----------+| id1_int | c_address                      | c_acctbal |+---------+--------------------------------+-----------+|      13 | nsXQu0oVjD7PM659uC3SRSp        |   3857.34 ||      27 | IS8GIyxpBrLpMT0u7              |   5679.84 ||      40 | gOnGWAyhSV1ofv                 |    1335.3 ||      64 | MbCeGY20kaKK3oalJD,OT          |   -646.64 ||     255 | I8Wz9sJBZTnEFG08lhcbfTZq3S     |   3196.07 ||     430 | s2yfPEGGOqHfgkVSs5Rs6 qh,SuVmR |   7905.17 ||     726 | 4w7DOLtN9Hy,xzZMR              |   6253.81 ||     905 | f iyVEgCU2lZZPCebx5bGp5        |   -600.73 ||    1312 | f5zgMB4MHLMSHaX0tDduHAmVd4     |    9459.5 ||    1358 | t23gsl4TdVXqTZha DioEHIq5w7y   |   5149.23 |+---------+--------------------------------+-----------+10 rows in set (0.27 sec)

d)注意点:

虽然有ETL工具快速导入导出,但也有些问题需要注意的,比如:

  • 如果导入任务时间太长,请走异步模式,否则连接断开可能会影响任务正常运行;
  • TableStore目前的insert是根据主键覆盖,主键不会去重判断的,请务必不能对你正常的数据表做插入;
  • 目前DLA和TableStore的事务能力还不够,可能会出现中断,已导入的数据不会清楚,需要自行清理;
  • 列的个数和列的类型,需要自己对齐保障,否则会报错;

4. 其他数据源导入

整个过程是不是很简单?是不是想要导入其他场景的数据源?对DLA而言,底层任何数据源都以相同方式处理,只要确保其他数据源的库、表在DLA中正常创建,就可以正常的读写,实现ETL啦!赶紧试试吧!

其他相关的文档:

  • 使用Data Lake Analytics从OSS清洗数据到AnalyticDB:
  • DLA相关技术文档:
  • Data Lake Analytics使用场景:
  • OLAP on TableStore——基于Data Lake Analytics的Serverless SQL大数据分析
  • 使用Data Lake Analytics 分析OSS数据:
  • Data Lake Analytics数据库的连接方式:
  • Data Lake Analytics分析RDS数据:

作者:

本文为云栖社区原创内容,未经允许不得转载。

转载于:https://my.oschina.net/yunqi/blog/3016773

你可能感兴趣的文章
Eclipse Java注释模板设置
查看>>
基于gmap.net制作离线地图下载器
查看>>
Docker网络的基本功能操作示例
查看>>
淘宝静态页面
查看>>
Dockerfile Tomcat镜像制作
查看>>
自适应备忘录 demo
查看>>
Sharepoint 2010弹出对话框
查看>>
静态类(C#)
查看>>
linux vi
查看>>
K:栈和队列的比较
查看>>
PHP中获取当前页面的完整URL
查看>>
【模板】左偏树(可并堆)
查看>>
Django框架之路由层、视图层
查看>>
深入分析escape()、encodeURI()、encodeURIComponent()的区别及示例
查看>>
正则查找文章内容关键字
查看>>
JS绘制拓扑图示例 (JTopo)
查看>>
世界最大电子展明年将移植到深圳
查看>>
iOS图片浏览器 - XLPhotoBrowser(类似微信多图片浏览效果)
查看>>
pymysql 单独获取表的栏位名称
查看>>
安卓srcCompat弄死我了
查看>>