点击上方关注,all in ai中国
在这篇文章中,我将讨论apache spark以及如何在其中创建简单但强大的etl管道。您将了解spark如何提供api以将不同的数据格式转换为数据帧和sql以进行分析,以及如何将一个数据源转换为另一个数据源。
什么是apache spark?
根据维基百科:
apache spark是一个开源的分布式通用集群计算框架。 spark提供了一个接口,用于使用隐式数据并行和容错来编程整个集群。
官方网站:
apache spark是用于大规模数据处理的统一分析引擎。
简而言之,apache spark是一个用于处理、查询和分析大数据的框架。由于计算是在内存中完成的,因此它比mapreduce等竞争对手要快好几倍。以每天产生数tb的数据的速率,需要一种能够以高速提供实时分析的解决方案。一些spark功能是:
它比传统的大规模数据处理框架快100倍。易于使用,因为您可以在python、r和scala中编写spark应用程序。它为sql、steaming和graph计算提供了库。apache spark组件
spark 核心
它包含spark的基本功能,如任务调度、内存管理,与存储的交互等。
spark sql
它是一组用于与结构化数据交互的库。它使用类似sql的界面与各种格式的数据进行交互,如csv、json、parquet等。
spark streaming
spark streaming是一个spark组件,支持处理实时数据流。实时流,如股票数据、天气数据、日志和其他各种。
mlib
mlib是spark提供的一套机器学习算法,用于监督和无监督学习
graphx
它是apache spark用于图形和图形并行计算的api。它扩展了spark rdd api,允许我们创建一个带有附加到每个顶点和边缘的任意属性的有向图。它为etl、探索性分析和迭代图计算提供了统一的工具。
spark集群管理器
spark支持以下资源/集群管理器:
spark standalone - spark附带的简单集群管理器apache mesos - 一个也可以运行hadoop应用程序的通用集群管理器。apache hadoop yarn - hadoop 2中的资源管理器kubernetes - 一个开源系统,用于自动化容器化应用程序的部署、扩展和管理。设置和安装
从这里下载apache spark的二进制文件。您必须在系统上安装scala,并且还应设置其路径。
对于本教程,我们使用的是在2019年5月发布的2.4.3版。将文件夹移到/ usr / local中
mv spark-2.4.3-bin-hadoop2.7 / usr / local / spark
然后导出scala和spark的路径。
#scala path
export path=/usr/local/scala/bin:$path
#apache spark path
export path=/usr/local/spark/bin:$path
通过在终端上运行spark-shell命令来调用spark shell。如果一切顺利,你会看到如下所示:
它加载基于scala的shell。由于我们将使用python语言,因此我们必须安装pyspark。
pip install pyspark
安装完成后,您可以通过在终端中运行命令pyspark来调用它:
您找到了一个典型的python shell,但它加载了spark库。
用python开发
让我们开始编写我们的第一个程序。
from pyspark.sql import sparksession
from pyspark.sql import sqlcontext
if __name__ == __main__:
scspark = sparksession \
.builder \
.appname(reading csv) \
.getorcreate()
我们导入了两个库:sparksession和sqlcontext。
sparksession是编写spark应用程序的入口点。它允许您与spark提供的dataset和dataframe api进行交互。我们通过调用appname来设置应用程序名称。 getorcreate()方法返回应用程序的新sparksession或返回现有的sparksession。
我们的下一个目标是读取csv文件。我创建了一个示例csv文件,名为data.csv,如下所示:
name,age,country
adnan,40,pakistan
maaz,9,pakistan
musab,4,pakistan
ayesha,32,pakistan
和代码:
if __name__ == __main__:
scspark = sparksession \
.builder \
.appname(reading csv) \
.getorcreate()
data_file = /development/petprojects/learningspark/data.csv
sdfdata = scspark.read.csv(data_file, header=true, sep=,).cache()
print(total records = {}.format(sdfdata.count()))
sdfdata.show()
我设置了文件路径,然后调用.read.csv来读取csv文件。参数是不言自明的。 .cache()缓存返回resultset,从而提高性能。当我运行该程序时,它返回如下所示的内容:
看起来有趣,不是吗?现在,如果我想读取数据帧中的多个文件,该怎么办?让我们创建另一个文件,我将其称为data1.csv,它如下所示:
1
2
3
4
五
姓名,年龄,国家
诺琳,23,英格兰
阿米尔,9,巴基斯坦
诺曼,4,巴基斯坦
拉希德,12,巴基斯坦
我只需要这样做:
data_file =/development/petprojects/learningspark/data*.csv它将读取所有以csv类型的数据开头的文件。
它将如何读取与模式和转储结果匹配的所有csv文件:
如您所见,它将csv中的所有数据转储到单个数据帧中。
但有一点,只有当所有csv都遵循某种模式时,这种转储才有效。如果您有一个具有不同列名的csv,那么它将返回以下消息。
19/06/04 18:59:05 warn csvdatasource: number of column in csv header is not equal to number of fields in the schema:
header length: 3, schema size: 17
csv file: file:///development/petprojects/learningspark/data.csv
如您所见,spark抱怨不能处理不同的csv文件。
您可以使用dataframe执行许多操作,但spark为您提供了更简单、更熟悉的接口来使用sqlcontext操作数据。它是sparksql的网关,它允许您使用类似sql的查询来获得所需的结果。
在我们进一步发展之前,让我们先玩一些真实的数据。为此,我们使用的是从kaggle得到的超市销售数据。在我们尝试sql查询之前,让我们尝试按性别对记录进行分组。我们正在处理etl的提取部分。
data_file = /development/petprojects/learningspark/supermarket_sales.csv
sdfdata = scspark.read.csv(data_file, header=true, sep=,).cache()
gender = sdfdata.groupby(gender).count()
print(gender.show())
当你运行时,它会返回如下内容:
groupby()按给定列对数据进行分组。在我们的例子中,它是性别列。
sparksql允许您使用类似sql的查询来访问数据。
sdfdata.registertemptable(sales)
output = scspark.sql(select * from sales)
output.show()
首先,我们从dataframe中创建一个临时表。为此,使用了registertamptable。在我们的例子中,表名是sales。完成后,您可以在其上使用典型的sql查询。在我们的例子中,它是select * from sales。
或者类似下面的内容:
output = scspark.sql(select * from sales where `unit price` < 15 and quantity < 10)
output.show()
甚至是聚合值。
output = scspark.sql(select count(*) as total, city from sales group by city)
output.show()
非常灵活,对吗?
我们刚刚完成了etl的变换部分。
最后是etl的加载部分。如果要保存转换后的数据怎么办?您会有很多的可用的选项,rdbms、xml或json。
output.write.format(json).save(filtered.json)
运行时,sparks会创建以下文件夹/文件结构。
它创建了一个具有文件名称的文件夹,在我们的例子中是filtered.json。然后,名为successtells的文件是否成功运行。如果失败,则生成名为failure的文件。然后,您在此处找到多个文件。多个文件的原因是每个工作都涉及在文件中写入的操作。如果要创建单个文件(不建议使用),则可以使用合并来收集所有分区中的数据并将其减少到单个数据帧。
output.coalesce(1).write.format(json).save(filtered.json)
它将输出以下数据:
{ “总”:328, “城市”: “内比都”}
{ “总”:332, “城市”: “曼德勒”}
{ “总”:340, “城市”: “仰光”}
mysql和apache spark集成
上述数据帧包含转换后的数据。我们希望将这些数据加载到mysql中,以便进一步使用,例如可视化或显示在应用程序上。
首先,我们需要mysql连接器库来与spark进行交互。我们将从mysql网站下载连接器并将其放在一个文件夹中。我们将修改sparksession以包含jar文件。
scspark = sparksession \
.builder \
.appname(reading csv) \
.config(spark.driver.extraclasspath, /usr/local/spark/jars/mysql-connector-java-8.0.16.jar) \
.getorcreate()
输出现在如下所示:
output = scspark.sql(select count(*) as total, city from sales group by city)
output.show()
output.write.format(jdbc).options(
url=jdbc:mysql://localhost/spark,
driver=com.mysql.cj.jdbc.driver,
dbtable=city_info,
user=root,
password=root).mode(append).save()
在运行脚本之前,我在数据库中创建了所需的db和表。如果一切顺利,您应该看到如下结果:
如您所见,spark可以更轻松地将数据从一个数据源传输到另一个数据源。
结论
apache spark是一个非常苛刻且有用的大数据工具,可以帮助您轻松编写etl。 您可以加载pb级的数据,并且可以通过设置多个节点的集群轻松地处理这些数据。本教程只是为您提供apache spark编写etl的基本思想。 您应该检查文档和其他资源以深入挖掘。
河北挤压造粒机 复混肥造粒机 东衡自动化
春装新品 女士服装t恤 性感小开叉显瘦网纱蕾丝长袖打底衫批发
三好色织提花酒店宾馆纯棉吸水会所沙滩礼品毛巾厂家直销
河南实心锥喷嘴厂家, 售后保障
厂家直销 精密针规 塞规 通止规 精度高
动手系列:在Apache Spark和Python中创建您的第一个ETL管道
VantageFX招商(秒返82美金一手)
山西医院水处理设备 哪安装医院水处理系统?华蓝净化设备
OEM专业代工 宝宝儿童肉酥 肉松
长治匀质泡沫板匀质泡沫板,匀质泡沫板,匀质泡沫板价格匀质泡
扭簧批发价格
尚赫公司什么时候上市的代理门槛.有什么好处
塔城地区撰写乡村振兴规划@公司@砺锋集团
Redmi K20 Pro第七代屏幕指纹好在哪儿?官方深度科普
智能交通系统8寸工业平板电脑
泊头【福宇畜牧】批发供应母猪产床保育床肥猪保胎栏
浔阳不锈钢方矩管
西城泰柚饰面板生产厂家,实木三层复合地板
纯软件交付还是一体机交付,这是个问题吗?
淋浴花洒报价-新泰市永丰五金-九牧淋浴花洒报价