Pyspark
若工作场景涉及大数据,则需要使用pyspark。找了三天,暂时发现Tomasz Drabas, Denny Lee的《Learning PySpark》应该是一本较适合的书,感知其难度适中。发现此书不介绍环境搭建。Bill Chambers and Matei Zaharia的《Spark: The Definitive Guide》也是,难度适中,但不涉及环境搭建。不涉及环境搭建是不行的,因为可能版本演进太快,可用代码在不同环境无法运行。比如,anaconda里面conda install -c conda-forge pyspark搭建的环境无法运行《Spark: The Definitive Guide》里的代码,也许涉及本地安装等变量的设置问题而无法运行等,暂不清楚。还是应该找本包括环境搭建的书。琳达贵的《python+spark2.0+hadoop机器学习与大数据实战》包含环境搭建的过程,需要内存和固态空间,我电脑不一定跟得上,试着安装吧。2019-08-21装好虚拟机装好linux装好hadoop,目前很卡。
20190826: 最近在学习scrapy爬虫,今天看点这本书,明天看点那个博客,因此笔记就很杂乱。启示是:要先列一个提纲,这样即使看的书、知识比较杂也好,都可以把笔记写在对应位置。
网易云课堂有一免费课程《Spark编程基础》,我接下来的笔记大纲则遵从此课程。当然,有可能会找其他资料等补充此笔记大纲。
1. 大数据计数概括
1.1 大数据时代
1.2 大数据的概念
1.3 大数据的影响
1.4 大数据的关键技术
1.5 答案数据计算模式
1.6 代表大数据技术之Hadoop
1.7 代表大数据技术之Spark
1.8 代表大数据技术之Flink和Beam
2. Spark的设计与运行原理
2.1 Spark概述
2.2 Spark生态系统
2.3 基本概念和架构设计
2.4 Spark运行原理(RDD概念、操作和特性)
2.5 RDD运行原理(RDD之间的依赖关系)
2.6 RDD运行原理(阶段的划分和RDD运行过程)
2.7 Spark的部署和应用方式
3. Spark环境搭建和使用方法
3.1 安装Spark
3.1.1 单机安装Spark
参考:
注:
- 需要说明的是因为安装时间与以上文章已有较长时间,并未完全遵从文章版本。我的版本:linux14.04、Hadoop 2.7.7、spark2.4.4、python3.4.3、Java JDK 1.8。 虽然单机成功运行pyspark,但其中涉及到py4j,并没有特意去安装,但在spark/python/lib里有py4j-0.10.7-src.zip,不知道怎么回事。
不知怎得,我linux系统中自带python2.7.7,而spark里的的为python3.4.3(随spark2.4.4自动安装的吗,为什么比Spark 2.0分布式集群环境搭建(Python版)里的python版本还低,但查阅spark官网,其只要求版本高于python2.6):
hadoop@cfy-VirtualBox:~$ python Python 2.7.6 (default, Nov 13 2018, 12:45:42) [GCC 4.8.4] on linux2 Type "help", "copyright", "credits" or "license" for more information. >>> exit() hadoop@cfy-VirtualBox:~$ cd /usr/local/spark hadoop@cfy-VirtualBox:/usr/local/spark$ bin/pyspark Python 3.4.3 (default, Nov 12 2018, 22:25:49) [GCC 4.8.4] on linux Type "help", "copyright", "credits" or "license" for more information. 19/09/15 17:18:52 WARN util.Utils: Your hostname, cfy-VirtualBox resolves to a loopback address: 127.0.1.1; using 192.168.0.119 instead (on interface eth0) 19/09/15 17:18:52 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address 19/09/15 17:18:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 2.4.4 /_/ Using Python version 3.4.3 (default, Nov 12 2018 22:25:49) SparkSession available as 'spark'. >>> exit() hadoop@cfy-VirtualBox:~$
3.2 克隆主机
参考:
- 从零开始的Hadoop大数据集群(伪)搭建,全免费VirtualBox虚拟机Ubuntu版,学习向,超详细---(一)
- 从零开始的Hadoop大数据集群(伪)搭建,全免费VirtualBox虚拟机Ubuntu版,学习向,超详细---(二)
- 从零开始的Hadoop大数据集群(伪)搭建,全免费VirtualBox虚拟机Ubuntu版,学习向,超详细---(三)
备注: * 关于克隆主机,出现在参考2,注意副本类型选择完全复制。经测试,复制出的副本,除计算机名字,完全就是被复制计算机的副本,具有被复制计算机全部功能。因此,为效率必须安装主从均相同的软件和环境并配置共性的文件,之后再进行虚拟机复制。
3.3 各主机组成集群
现在到了最易混淆的地方了,怎么将各台虚拟机组成集群。有虚拟机名(像我是Ubuntu、Ubuntu0、Ubuntu1、Ubuntu2)、主机名(也叫hostname,像我将Ubuntu0、1,2分别取名mater、slave1、slave2)。 参考:
- virtualBox里Ubuntu设置静态IP
- Hadoop 2.7分布式集群环境搭建
- vmware 虚拟机ping win10宿主机
- Ubuntu安装SSH服务器故障分析及解决办法
- Spark 2.0分布式集群环境搭建(Python版)
注:
- 关于参考1,设置静态IP方便下一步集群配置。
- 关于参考2,ping的时候注意目标虚拟机必须开机,我在这里浪费了一个下午。
关于参考3,虚拟机
ssh localhost
,需要虚拟机能ping 192.168.0.118
(192.168.0.118为localhost
的IP),可按参考3进行防火墙和网络保护。ssh localhost
成功的界面如下:hadoop@cfy-VirtualBox:~$ ssh localhost hadoop@localhost's password: Welcome to Ubuntu 14.04.6 LTS (GNU/Linux 4.4.0-148-generic x86_64) * Documentation: https://help.ubuntu.com/ 0 updates can be installed immediately. 0 of these updates are security updates. New release '16.04.6 LTS' available. Run 'do-release-upgrade' to upgrade to it. Your Hardware Enablement Stack (HWE) is supported until April 2019. Last login: Sun Sep 15 19:11:18 2019 from localhost hadoop@cfy-VirtualBox:~$ logout Connection to localhost closed. hadoop@cfy-VirtualBox:~$
关于参考4,能ping不代表能
ssh localhost
。
还是没有解决,初始建立的ubuntu能通过ssh localhost
连接到本地localhost,然后重新复制的也能,那么不求甚解地重新复制系统吧。另virtualBox搭建hadoop集群(1)虚拟机网络配置我猜测它的网络设置也许靠谱。百度时候应盯住“virtualbox hadoop”来搜索也许还有更合适的带图的步骤详细的示例(为什么不“virtualbox spark”呢,因为会过滤掉很多只装hadoop集群的例子,这些例子也很宝贵----20190916)。
3.2 在pyspark中运行代码
3.3 开发Spark独立应用程序
3.4 Spark集群环境搭建
3.5 在集群上运行Spark应用程序
4. RDD编程基础
4.1 RDD创建
从本地文件系统中加载数据创建RDD的示例:
如:我在公司32号机我用户根目录下放入word.txt文件,文件内容如下:
Hadoop is good
Spark is fast
Spark is better
运行如下命令:
>>> lines = sc.textFile("file:///home/chenfy/word.txt")
>>> lines.foreach(print)
[Stage 0:> (0 + 2) / 2]Spark is better
Hadoop is good
Spark is fast
>>>
将文件放入集群HDFS可采用命令如hdfs dfs -put /home/hdfs/files/text.txt /input
,前面为服务器中本地文件路径,后面为HDFS文件路径。一般,我们很可能不知道集群HDFS路径,那么可采用su hdfs
来切换到HDFS用户,其提示我输入密码,我无此权限,于是作罢。
从分布式文件系统HDFS中加载数据:
lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.text") #主机名localhost 端口号9000,来源于配置。
lines = sc.textFile("/user/hadoop/word.text")
lines = sc.textFile("word.txt")
注:三条语句完全等价,可以使用其中任意一种方式。这条或者能在公司集群端口运行,可惜暂不知道怎么将我创建的word.txt放到集群分布式文件系统。
注:可能关于集群各种路径是难点,本地,集群hdfs、集群中某台主机的本地、集群jupyter工作路径等等。
通过并行集合(组合)创建RDD
>>> array = [1, 2, 3, 4, 5]
>>> rdd = sc.parallelize(array)
>>> rdd.foreach(print)
[Stage 0:> (0 + 40) / 40]1
2
4
5
3
>>>
4.2 RDD操作(转换操作之filter、map、flatMap)
每次转换生成一个新的RDD,但RDD是不能更改的。 多次转换即一个有向无环图。转换操作是惰性的,只记录这个转换需要进行的操作,不真正计算。
操作 | 含义 |
---|---|
filter(func) | 筛选出满足函数func的元素,并返回一个新的数据集 |
map(func) | 将每个元素传递到函数func中,并将结果返回为一个新的数据集 |
flatMap(func) | 与map()相似,但每个输入元素都可以映射到0或多个输出结果 |
groupByKey() | 应用于(K,V)键值对的数据集时,返回一个新的(K,Iterable)形式的数据集 |
reduceByKey(func) | 应用于(K,V)键值对的数据集时,返回一个新的(K,V)形式的数据集,其中每个值是将每个key传递到函数func中进行聚合后的结果 |
filter(func)
:筛选出满足函数func的元素,并返回一个新的数据集。
>>> lines = sc.textFile("file:///home/chenfy/word.txt")
>>> linesWithSpark = lines.filter(lambda line: "Spark" in line)
>>> linesWithSpark.foreach(print)
[Stage 0:> (0 + 2) / 2]Spark is better
Spark is fast
>>>
map(func)
:将每个元素传递到func
函数中,并将结果返回为一个新的数据集:
>>> data = [1, 2, 3, 4,5]
>>> rdd1 = sc.parallelize(data)
>>> rdd2 = rdd1.map(lambda x:x+10)
>>> rdd2.foreach(print)
14
12
11
15
13
>>>
>>> lines = sc.textFile("file:///home/chenfy/word.txt")
>>> words = lines.map(lambda line: line.split(" "))
>>> words.foreach(print)
['Spark', 'is', 'better']
['Hadoop', 'is', 'good']
['Spark', 'is', 'fast']
>>>
flatMap(func)
:map(func)
得到的元素可能为列表、集合等内部结构,flatMap(func)
将之进一步拆散(“拍扁”)。
>>> lines = sc.textFile("file:///home/chenfy/word.txt")
>>> words = lines.flatMap(lambda line: line.split(" "))
>>> words.foreach(print)
Hadoop
is
good
Spark
is
fast
Spark
is
better
>>>
4.3 RDD操作(转换操作之groupByKey、reduceByKey)
groupByKey()
:应用于(K,V)
键值对的数据集时,返回一个新的(K,Iterable)
形式的数据集。
>>> words = sc.parallelize([("Hadoop", 1), ("is", 1), ("good", 1), ("Spark", 1), ("is", 1), ("fast", 1), ("Spark", 1), ("is", 1), ("better", 1)])
>>> words1 = words.groupByKey()
>>> words1.foreach(print)
[Stage 4:============================================> (31 + 9) / 40]('good', <pyspark.resultiterable.ResultIterable object at 0x7f697a08ff28>)
('better', <pyspark.resultiterable.ResultIterable object at 0x7f697a08ff28>)
('is', <pyspark.resultiterable.ResultIterable object at 0x7f697a08ff28>)
('Hadoop', <pyspark.resultiterable.ResultIterable object at 0x7f697a08ff28>)
('fast', <pyspark.resultiterable.ResultIterable object at 0x7f697a08ff28>)
('Spark', <pyspark.resultiterable.ResultIterable object at 0x7f697a08ff28>)
>>>
reduceByKey(func)
:在groupByKey()
基础上,对Iterable
运用func
函数,得到新的键值对(K,Vnew)
形式。
4.4 RDD操作(行动操作)
Spark程序徐执行行动操作时,才会执行真正的计算,从文件中加载数据,完成一次次的转换操作,最终完成行动操作得到结果(注:若之前的转换操作有错误,直到代码运行到行动操作时才报错)。
操作 | 含义 |
---|---|
count() | 返回数据集中的元素个数 |
collect() | 以数组的形式返回数据集中的所有元素 |
first() | 返回数据集中的第一个元素 |
take(n) | 以数组的心事返回数据集中的前n 各元素 |
reduce(func) | 通过func (输入两个参数并返回一个值)聚合数据集中的元素 |
foreach(func) | 将数据集中的每个元素传递到函数func 中运行 |
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.count()
5
>>> rdd.first()
1
>>> rdd.take(3)
[1, 2, 3]
>>> rdd.reduce(lambda a, b: a+b)
15
>>> rdd.collect()
[1, 2, 3, 4, 5]
>>> rdd.foreach(lambda elem: print(elem))
1
2
5
3
4
>>>
4.5 持久化
.persist()
可进行持久化,后面再用到就会比较快。因为在内存中运算最快,可在此函数中添加参数MEMORY_ONLY
。.persist(MEMORY_ONLY)
等价于.cache()
。持久化会占用内存资源,后面不用的话应去持久化,采用.unpersist()
。
注意:这里的持久化是标记为持久化,只有触发行动操作才真正触发持久化。
4.6 分区(分区的作用和原则、设置分区的方法)
RDD分区的作用:1.增加并行度;2.减少通信开销。
RDD分区原则:使得分区的个数尽量等于集群中CPU核心(core)数目。
设置分区的个数:
>>> list = [1, 2, 3, 4, 5]
>>> rdd = sc.parallelize(list, 2) #设置了两个分区
也可使用reparitition
方法重新设置分区个数:
>>> data = sc.parallelize([1, 2, 3, 4, 5], 2)
>>> len(data.glom().collect()) #显示data这个RDD的分区数量
2
>>> rdd = data.repartition(1) #对data这个RDD重新分区
>>> len(rdd.glom().collect()) #显示data这个RDD的分区数量
1
>>>
4.7 分区(自定义分区方法)
动态调整分区大小很有必要,分区数量可在具体函数如sc.parallelize
中设置。
Spark提供了自带的HashPartitioner(哈希分区)与RangePartitioner(区域分区)能满足大多数应用场景的需求。与此同时,可自定义函数规定哪些数据分在什么区。
实例:根据key值的最后一位数字,写到不同文件。例如:
10写入到part-00000
11写入到part-00001
.
.
.
19写入到part-00009
自定义函数如下函数(文件名为TestPartitioner.py):
from pyspark import SparkConf, SparkContext
def MyPartitioner(key):
print("MyPatitioner is running")
print('The key is %d' % key)
return key%10
def main():
print("The main function is running")
conf = SparkConf().setMaster("local").setAppName("MyApp")
sc = SparkContext(conf=conf)
data = sc.parallelize(range(10), 5)
# 转成键值对数据,方便使用partitionBy函数
# 对data数据进行重新分区
# 转回只含键值的数据
data.map(lambda x: (x,1))\
.partitionBy(10, MyPartitioner)\
.map(lambda x: x[0])\
.saveAsTextFile("file:///home/chenfy/patitioner")
# 这里是目录地址,而不是文件地址
if __name__ == '__main__': # 判断本文件是单独执行则执行以下程序,如果只调用本文件的一部分,则不执行以下程序。
main()
运行TestPartitioner.py命令及输出:
[chenfy@entrobus32 ~]$ python3 TestPartitioner.py
The main function is running
19/09/16 13:41:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
[Stage 0:> (0 + 1) / 5]MyPatitioner is running
The key is 0
MyPatitioner is running
The key is 1
MyPatitioner is running
The key is 2
MyPatitioner is running
The key is 3
[Stage 0:=======================> (2 + 1) / 5]MyPatitioner is running
The key is 4
MyPatitioner is running
The key is 5
MyPatitioner is running
The key is 6
MyPatitioner is running
The key is 7
MyPatitioner is running
The key is 8
MyPatitioner is running
The key is 9
[chenfy@entrobus32 ~]$
4.8 一个综合实例
假设有一个本地文件word.txt,里面包含多行文本,每行文本由多个单词构成,单词之间用空格分隔。可以使用如下语句进行词频统计(即统计每个单词出现的次数):
>>> lines = sc.textFile("file:///home/chenfy/word.txt")
>>> wordCount = lines.flatMap(lambda line: line.split(" ")).\
... map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
>>> print(wordCount.collect())
[('is', 3), ('good', 1), ('Spark', 2), ('Hadoop', 1), ('fast', 1), ('better', 1)]
>>>
4.9 键值对RDD的创建
从文件加载
>>> lines = sc.textFile("file:///home/chenfy/word.txt")
>>> pairRDD = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1))
>>> pairRDD.foreach(print)
('Hadoop', 1)
('is', 1)
('good', 1)
('Spark', 1)
('is', 1)
('fast', 1)
('Spark', 1)
('is', 1)
('better', 1)
>>>
通过并行集合(列表)创建RDD
>>> list = ["Hadoop", "Spark", "Hive", "Spark"]
>>> rdd = sc.parallelize(list)
>>> pairRDD = rdd.map(lambda word: (word, 1))
>>> pairRDD.foreach(print)
('Spark', 1)
('Hadoop', 1)
('Spark', 1)
('Hive', 1)
>>>
4.10 常用的键值对RDD转换操作(reduceByKey和groupByKey)
reduceByKey(func)
:使用func
函数合并具有相同键的值。
>>> pairRDD = sc.parallelize([("Hadoop", 1), ("Spark", 1), ("Hive", 1), ("Spark", 1)])
>>> pairRDD.reduceByKey(lambda a, b: a+b).foreach(print)
('Spark', 2)
('Hadoop', 1)
('Hive', 1)
>>>
groupByKey()
:对具有相同键的值进行分组。
>>> list = [("spark", 1), ("spark", 2), ("hadoop", 3), ("hadoop", 5)]
>>> pairRDD = sc.parallelize(list)
>>> pairRDD.groupByKey()
PythonRDD[23] at RDD at PythonRDD.scala:53
>>> pairRDD.groupByKey().foreach(print)
('spark', <pyspark.resultiterable.ResultIterable object at 0x7fef172ccf28>)
('hadoop', <pyspark.resultiterable.ResultIterable object at 0x7feeae3f2a90>)
>>>
groupByKey()
和reduceByKey(func)
的区别:
groupByKey()
对每个key进行操作,但只生成一个sequence
,groupByKey()
本身不能自定义函数,需要先用groupByKey()
生成RDD
,然后才能对此RDD通过map
进行性自定义函数操作。reduceByKey(func)
用于对每个key
对应的多个value进行merge
操作,最重要的是它能够在本地先进行merge
操作,并且merge
操作做可以通过函数自定义。>>> words = ["one", "two", "two", "three", "three", "three"] >>> wordPairsRDD = sc.parallelize(words).map(lambda word: (word, 1)) >>> wordCountsWithReduce = wordPairsRDD.reduceByKey(lambda a, b: a+b) >>> wordCountsWithReduce.foreach(print) ('one', 1) ('two', 2) ('three', 3) >>> wordCountsWithGroup = wordPairsRDD.groupByKey().\ ... map(lambda t: (t[0], sum(t[1]))) >>> wordCountsWithGroup.foreach(print) ('three', 3) ('two', 2) ('one', 1) >>>
4.11 常用的PairRDD转换操作(keys、values、sortByKey、mapValues、join)
.keys()
:将Pair RDD中的key
返回形成一个新的RDD。
.values()
:将Pair RDD中的value
返回形成一个新的RDD。
.sortByKey()
:返回一个根据key
排序的RDD(看示例,似乎默认参数False
降序,但True
似乎不能升序?)。
>>> list = [("Hadoop", 1), ("Spark", 1), ("Hive", 1), ("Spark", 1)]
>>> pairRDD = sc.parallelize(list)
>>> pairRDD.keys().foreach(print)
Spark
Hadoop
Spark
Hive
>>> pairRDD.values().foreach(print)
1
1
1
1
>>> pairRDD.foreach(print)
('Spark', 1)
('Hive', 1)
('Spark', 1)
('Hadoop', 1)
>>> pairRDD.sortByKey().foreach(print)
('Hadoop', 1)
('Hive', 1)
('Spark', 1)
('Spark', 1)
>>> pairRDD.sortByKey(True).foreach(print)
('Hadoop', 1)
('Spark', 1)
('Spark', 1)
('Hive', 1)
>>> pairRDD.sortByKey(False).foreach(print)
('Hadoop', 1)
('Hive', 1)
('Spark', 1)
('Spark', 1)
>>>
.sortByKey()
和.sortBy()
的区别:
.sortByKey()
是.sortBy()
的特殊形式,仅能对key
进行排序;而.sortBy()
既能对key
排序,又能对value
排序,甚至还能更精细比如对key
或value
的一部分进行排序,但需在参数中具体指明。>>> d1 = sc.parallelize([("c", 8), ("b", 25), ("c", 17), ("a", 42), \ ... ("b", 4), ("d", 9), ("e", 17), ("c", 2), ("f", 29), ("g", 21), ("b", 9)]) >>> d1.reduceByKey(lambda a, b: a+b).sortByKey(False).collect() [('g', 21), ('f', 29), ('e', 17), ('d', 9), ('c', 27), ('b', 38), ('a', 42)] >>> d1 = sc.parallelize([("c", 8), ("b", 25), ("c", 17), ("a", 42), \ ... ("b", 4), ("d", 9), ("e", 17), ("c", 2), ("f", 29), ("g", 21), ("b", 9)]) >>> d1.reduceByKey(lambda a, b: a+b).sortBy(lambda x: x, False).collect() [('g', 21), ('f', 29), ('e', 17), ('d', 9), ('c', 27), ('b', 38), ('a', 42)] >>> d1.reduceByKey(lambda a, b: a+b).sortBy(lambda x: x[0], False).collect() [('g', 21), ('f', 29), ('e', 17), ('d', 9), ('c', 27), ('b', 38), ('a', 42)] >>> d1.reduceByKey(lambda a, b: a+b).sortBy(lambda x: x[1], False).collect() [('a', 42), ('b', 38), ('f', 29), ('c', 27), ('g', 21), ('e', 17), ('d', 9)] >>>
.mapValues(func)
:对键值对RDD中的每个value
都应用同一个函数,但是key
不变化。
>>> list = [("Hadoop", 1), ("Spark", 1), ("Hive", 1), ("Spark", 1)]
>>> pairRDD = sc.parallelize(list)
>>> pairRDD1 = pairRDD.mapValues(lambda x: x+1)
>>> pairRDD1.foreach(print)
('Spark', 2)
('Hive', 2)
('Spark', 2)
('Hadoop', 2)
>>>
.join
:内连接。对于给定的两输入数据集(K,v1)
和(k,v2)
,只有在两个数据集中都存在的key
才会被输出,最终得到一个(k,(v1,v22))
类型的数据集。
>>> pairRDD1 = sc.parallelize([("spark", 1), ("spark", 2), ("hadoop", 3), ("hadoop", 5)])
>>> pairRDD2 = sc.parallelize([("spark", "fast")])
>>> pairRDD3 = pairRDD1.join(pairRDD2)
>>> pairRDD3.foreach(print)
[Stage 72:===========================> (40 + 40) / 80]('spark', (1, 'fast'))
('spark', (2, 'fast'))
>>>
4.12 一个综合实例
题目:给定一组键值对("spark", 2), ("hadoop",6), ("hadoop", 4), ("spark", 6)
,键值对的key
表示图书名称,value
表示某天图书销量,请计算每个键对应的平均值,也就是计算每种图书的每天平均销量。
>>> rdd = sc.parallelize([("spark", 2), ("hadoop", 6), ("hadoop", 4), ("spark", 6)])
>>> rdd.mapValues(lambda x: (x, 1)).\
... reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])).\
... mapValues(lambda x: x[0]/x[1]).collect()
[('hadoop', 5.0), ('spark', 4.0)]
>>>
4.13 文件数据读写
4.13.1 本地文件系统的读写
从本地文件中读取数据创建RDD
>>> textFile = sc.textFile("file:///home/chenfy/word.txt")
>>> textFile.first()
'Hadoop is good'
>>>
把RDD写入到文本文件中
>>> textFile.saveAsTextFile("file:///home/chenfy/writeback")
>>> exit()
[chenfy@entrobus32 ~]$ cd /home/chenfy/writeback/
[chenfy@entrobus32 writeback]$ ls
part-00000 part-00001 _SUCCESS
[chenfy@entrobus32 writeback]$
注:可以看出存储的是一个文件路径而非单个文件。
再次把数据加载在RDD中
>>> textFile = sc.textFile("file:///home/chenfy/writeback")
>>> textFile.first()
'Spark is better'
>>>
4.13.2 分布式文件系统HDFS的数据读写
暂不清楚如何进入HDFS,需要怎样的权限等。
分布式文件HDFS的数据读写,也是采用textFile()
方法。该方法可传入一本地或HDFS的目录地址或文件地址。如果是文件地址,加载文件;如果是目录地址,加载该目录下所有问价的数据。
>>>textFile = sc.textFile("hdfs://localhost:9000/usr/hadoop/word.txt")
>>>textFile.first()
把RDD中的数据保存到HDFS文件中
>>>textFile = sc.textFile("word.txt")
>>>textFile.saveAsTextFile("writeback")
4.14 读写HBase数据(HBase介绍)
HBase数据库是四维数据库,由行键、列族、列限定符、版本时间戳定位具体数据(四维定位);而关系型数据库如MySQL仅由行键、列限定符即可确定数据(二维定位)。
4.15 读写HBase数据(创建一个HBase表)
HBase的安装(伪分布式模式)详见厦门大学数据库实验室HBase安装教程。
启动Hadoop(为什么需要启动Hadoop,因为HBase依赖Hadoop的分布式环境。)
以下启动Hadoop的命令并不能成功,可能跟权限有关。但查到32号机确实有Hadoop,暂时先跳过此节。
[chenfy@entrobus32 ~]$ which hadoop
/usr/bin/hadoop
[chenfy@entrobus32 ~]$ cd /usr/bin/hadoop
-bash: cd: /usr/bin/hadoop: 不是目录
[chenfy@entrobus32 ~]$ ls -l|grep 'hadoop'
[chenfy@entrobus32 ~]$ ls -l|grep 'hadoop'
[chenfy@entrobus32 ~]$ cd /usr/bin/hadoop
-bash: cd: /usr/bin/hadoop: 不是目录
[chenfy@entrobus32 ~]$ ^C
[chenfy@entrobus32 ~]$ start-dfs.sh start-mapreted.sh
-bash: start-dfs.sh: 未找到命令
[chenfy@entrobus32 ~]$ start-all.sh
-bash: start-all.sh: 未找到命令
[chenfy@entrobus32 ~]$ hadoop-daemon.sh start namenode
-bash: hadoop-daemon.sh: 未找到命令
[chenfy@entrobus32 ~]$
$ cd /usr/local/hadoop
$ ./sbin/start-all.sh
启动HBase
$ cd /usr/local/hbase
$ ./bin/start-hbase.sh //启动HBase
$ ./bin/hbase shell //启动hbase shell
hbase> disable 'student' # 使student表失效
hbase> drop 'student' # 删除student表
创建列族信息
hbase> create 'student', 'info'
录入student表第一个学生记录
hbase> put 'student', '1', 'info:name', 'Xueqian'
hbase> put 'student', '1', 'info:gender', 'F'
hbase> put 'student', '1', 'info:age', '23'
录入student表第二个学生记录
hbase> put 'student', '2', 'info:name', 'Weiliang'
hbase> put 'student', '2', 'info:gender', 'M'
hbase> put 'student', '2', 'info:age', '24'
4.16 读写HBase数据(配置Spark)
应导入jar包,包括所有以‘hbase’开头的jar包、guava-12.0.1.jar、htrace-core-3.1.0-incubating.jar和protobuf-java-2.5.0.jar到spark安装目录下。
$ cd /usr/local/spark/jars
$ mkdir hbase
$ cd hbase
$ cp /usr/local/hbase/lib/hbase*.jar ./
$ cp /usr/local/hbase/lib/guava-12.0.1.jar ./
$ cp /usr/local/hbase/lib/htrace-core-3.1.0-incubating.jar ./
$ cp /usr/local/hbase/lib/protobuf-java-2.5.0.jar ./
此外,在Spark 2.0以上版本中,缺少把HBase数据转换成Python可读取数据的jar包,需要另行下载。可以访问下面地址下载spark-examples_2.11-1.6.0-typesafe-001.jar。下载以后保存到“/usr/local/spark/jars/hbase”目录中。
使用vim编辑器打开spark-env.sh文件,设置Spark的spark-env.sh文件,告诉Spark可以在哪个路径下找到HBase相关的jar文件,命令如下:
$ cd /usr/local/spark/conf
$ vim spark-env.sh
打开spark-env.sh文件以后,可以在文件最前面增加下面一行内容:
export
SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoopclasspath):$(/usr/local/hbase/bin/hbaseclasspath):/usr/local/spark/jars/hbase/*
这样,后面编译和运行过程才不会出错。
用SparkContext提供的newAPIHadoopRDD API将表的内容以RDD的形式加载到Spark中。
SparkOperateHBase.py:
#!/usr/bin/env python3
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("ReadHBase")
sc = SparkContext(conf=conf)
host = 'localhost'
table = 'student'
conf = {"hbase.zookeeper.quorum":host,"hbase.mapreduce.inputtable":table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat", "org.apche.hadoop.hbase.io.ImmutableBytesWritable", "org.apache.hadoop.hbase.client.Result", keyConverter=keyConv, valueConverter=valueConver, conf=conf)
count = hbase_rdd.count()
hbase_rdd.cache()
output = hbase_rdd.collect()
for (k,v) in output:
print(k, v)
执行结果:
1 {"qualifier":"age", "timestamp":"1545728145163", "columnFamily":"info", "row":"1", "type":"Put", "value":"23"}
{"qualifier":"gender", "timestamp":"1545728114020", "columnFamily":"info", "row":"1", "type":"Put", "value":"F"}
{"qualifier":"name", "timestamp":"1545728100663", "columnFamily":"info", "row":"1", "type":"Put", "value":"Xueqian"}
2 {"qualifier":"age", "timestamp":"1545728168727", "columnFamily":"info", "row":"2", "type":"Put", "value":"24"}
...
执行该代码文件,命令如下:
$ cd /usr/local/spark/mycode/rdd
$ /usr/local/spaark/bin/spark-submit SparkOperateHBase.py
4.17 读写HBase数据(编写程序读取HBase数据)
4.18 读写HBase数据(编写程序向HBase写入数据)
公司32号机可通过hbase shell
启动hbase。
[chenfy@entrobus32 ~]$ hbase shell
Java HotSpot(TM) 64-Bit Server VM warning: Using incremental CMS is deprecated and will likely be removed in a future release
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/lib/hbase/lib/phoenix-4.14.0-cdh5.13.2-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/lib/hbase/lib/phoenix-4.14.0-cdh5.13.2-hive.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/lib/hbase/lib/phoenix-4.14.0-cdh5.13.2-pig.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/lib/hbase/lib/phoenix-4.14.0-cdh5.13.2-thin-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.2.0-cdh5.13.3, rUnknown, Sat Mar 17 04:43:46 PDT 2018
hbase(main):001:0>
用list
命令可显示当前HBase数据库中有哪些已经创建好的表。
hbase(main):002:0> list
TABLE
0 row(s) in 0.0690 seconds
=> []
hbase(main):003:0>
如果想创建一个student表需保证student表不存在。否则需用下面语句删除student表:
hbase> disable 'student'
hbase> drop 'student'
自然,现在情况没有student表,那么将显示:
hbase(main):003:0> disable 'student'
ERROR: Table student does not exist.
Start disable of named table:
hbase> disable 't1'
hbase> disable 'ns1:t1'
hbase(main):004:0>
下面来创建一个student表,并录入如下数据:
id | name | gender | age |
---|---|---|---|
1 | Xueqian | F | 23 |
2 | Weiliang | M | 24 |
结论:32号机无创建表的权限。
4.19 案例1:求TOP值
任务描述,file1.txt及file2.txt均含有4列数据,名称分别为orderid、userid、payment、productid。求Top N个payment值。 file1.txt:
1,1768,50,155
2,1218,600,211
3,2239,788,242
4,3101,28,599
5,4899,290,129
6,3110,54,1201
7,4436,259,877
8,2369,7890,27
file2.txt:
100,4287,226,233
101, 6562,489,124
102,1124 ,33,17
103,3267,159,179
104,4569,57,125
105,1438,37,116
代码文件TopN.py
,其内部内容为:
#!usr/bin/env python3
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("ReadHBase")
sc=SparkContext(conf=conf)
# 读取file文件夹,将读取其内file1.txt及file2.txt
lines = sc.textFile("file:///home/chenfy/file")
# 仅保留非空行,仅保留含4个数字的行
result1 = lines.filter(lambda line: (len(line.strip())>0) and (len(line.split(","))==4))
# 保留第三列
result2 = result1.map(lambda x: x.split(",")[2])
# 第三列转化为数字和空字符的键值对
result3 = result2.map(lambda x: (int(x), ""))
# 分区调整为1
result4 = result3.repartition(1)
# 按键降序排列
result5 = result4.sortByKey(False)
# 丢弃值(空字符)
result6 = result5.map(lambda x: x[0])
# 取前5
result7 = result6.take(5)
# aaa
for a in result7:
print(a)
运行过程:
[chenfy@entrobus32 ~]$ python3 TopN.py
19/09/16 16:54:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
7890
788
600
489
290
[chenfy@entrobus32 ~]$
正如手工排序将得到的结果。
4.20 案例2:文件排序
任务描述: 有多个输入文件(file1.txt、file2.txt、file3.txt),每个文件中的每一行内容均为一个整数。要求读取所有文件中的整数,进行排序后输出到一个新的文件中,输出的内容个数为每行两个整数,第一个整数位第二个整数的排序位次,第二个整数为原待排序的整数。 file1.txt:
33
37
12
40
file2.txt:
4
16
39
5
file3.txt:
1
45
25
任务内容很好理解,输出文件不示例。 代码文件FileSort.py:
#!/usr/bin/env python3
from pyspark import SparkConf, SparkContext
index = 0
def getindex():
global index
index += 1
return index
def main():
conf = SparkConf().setMaster("local[1]").setAppName("FileSort")
sc = SparkContext(conf=conf)
# 读取/home/chenfy/filesort/路径下所有file开头的txt文件
lines = sc.textFile("file:///home/chenfy/filesort/file*.txt")
index = 0
# 仅保留含有非空字符的行
result1 = lines.filter(lambda line: (len(line.strip())>0))
# 将字符形式的数字转化成数字、空字符组成的键值对
result2 = result1.map(lambda x: (int(x.strip()), ""))
# 改成一个分区
result3 = result2.repartition(1)
# 将键值对按键升序排列
result4 = result3.sortByKey(True)
# 丢弃值(空字符)
result5 = result4.map(lambda x: x[0])
# 在数字前面加索引,索引按getindex所示从0开始顺次递增
result6 = result5.map(lambda x: (getindex(), x))
# 打印排序并加索引的结果
result6.foreach(print)
# 结果存入/home/chenfy/sortresult路径
result6.saveAsTextFile("file:///home/chenfy/filesort/sortresult")
if __name__ == '__main__':
main()
运行结果:
[chenfy@entrobus32 filesort]$ python3 FileSort.py
19/09/16 17:45:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
[Stage 0:> (0 + 2) / 3](1, 1)
(2, 4)
(3, 5)
(4, 12)
(5, 16)
(6, 16)
(7, 16)
(8, 16)
(9, 16)
(10, 25)
(11, 33)
(12, 37)
(13, 39)
(14, 39)
(15, 39)
(16, 39)
(17, 39)
(18, 40)
(19, 45)
(20, 54)
(21, 54)
(22, 54)
(23, 54)
[chenfy@entrobus32 filesort]$
显然,结果符合预期。
4.21 案例3:二次排序
任务要求: 对于一个给定文件(file1.txt),请对数据进行排序,首先根据第一列数据降序排序,如第一列数据相等,则根据第二列数据降序排序。 file1.txt:
5 3
1 6
4 9
8 3
4 7
5 6
3 2
输出结果:
8 3
5 6
5 3
4 9
4 7
3 2
1 6
SecondarySortKey.py:
from operator import gt
from pyspark import SparkContext, SparkConf
class SecondarySortKey():
def __init__(self, k):
self.column1 = k[0]
self.column2 = k[1]
def __gt__(self, other):
if other.column1 == self.column1:
# 当第一列值相等时,比较第二列
return gt(self.column2, other.column2)
else:
# 否则只比较第一列
return gt(self.column1, other.column1)
def main():
conf = SparkConf().setAppName('spark_sort').setMaster('local[1]')
sc = SparkContext(conf=conf)
file = "file:///home/chenfy/secondarysort/file1.txt"
rdd1 = sc.textFile(file)
# 过滤空行
rdd2 = rdd1.filter(lambda x: (len(x.strip())>0))
# 在每行前面加这两个数字本身作为键
rdd3 = rdd2.map(lambda x: ((int(x.split(" ")[0]), int(x.split(" ")[1])), x))
# (num1, num2)作为键,但spark不会知道这样的键该怎么排序,SecondarySortKey函数告诉spark:键的第一值相同就看第二个值,否则只看第一个值。
# map函数要求保留字符形式的数字本身。
rdd4 = rdd3.map(lambda x: (SecondarySortKey(x[0]), x[1]))
# 根据上步获得信息排序
rdd5 = rdd4.sortByKey(False)
# 丢弃键保留值(字符形式的数字本身)
rdd6 = rdd5.map(lambda x: x[1])
# 打印结果,方便debug
rdd6.foreach(print)
if __name__ == '__main__':
main()
运行结果:
[chenfy@entrobus32 secondarysort]$ python3 SecondarySortApp.py
19/09/16 20:33:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
[Stage 0:> (0 + 1) / 1]8 3
5 6
5 3
4 9
4 7
3 2
1 6
[chenfy@entrobus32 secondarysort]$
显然,与要求结果同。
5. Spark SQL
5.1 Spark SQL简介
- Spark SQL增加了DataFrame(即带有Schema信息的RDD),使用户可以在Spark SQL中执行SQL语句,数据既可以来自RDD,也可以是Hive、 HDFS、Cassandra等外部数据源,还可以是JSON格式的数据。
- Spark SQL目前支持Scala、Java、Python三种语言,支持SQL-92规范。
为什么推出Spark SQL:
Spark SQL之前系统的不足:
- 关系数据库已经很流行
- 关系数据库在大数据时代已经不能满足要求
- 首先,用户需要从不同数据源执行各种操作,包括结构化、半结构化和非结构化数据
- 其次,用户需要执行高级分析,比如机器学习和图像处理
- 在实际大数据应用中,经常需要融合关系查询和复杂分析算法(比如机器学习或图像处理),但是,缺少这样的系统
Spark SQL填补了这个鸿沟:
- 首先,可以提供DataFrame API,可以对内部和外部各种数据源执行各 种关系型操作
- 其次,可以支持大数据中的大量数据源和数据分析算法 Spark SQL可以融合:传统关系数据库的结构化数据管理能力和机器学 习算法的数据处理能力
5.2 DataFrame概述
- DataFrame的推出,让Spark具备了处理大规模结构化数据的能力,不仅比原有的RDD转化方式更加简单易用,而且获得了更高的计算性能
- Spark能够轻松实现从MySQL到DataFrame的转化,并且支持SQL查询
- RDD是分布式的 Java对象的集合,但是,对象内部结构对于RDD而言却是不可知的
- DataFrame是一种以RDD为基础的分布式数据集,提供了详细的结构信息
5.3 DataFrame的创建
- 从Spark2.0以上版本开始,Spark使用全新的SparkSession接口替代Spark1.6中的SQLContext及HiveContext接口来实现其对数据加载、转换、处理等功能。SparkSession实现了SQLContext及HiveContext所有功能。
- SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并且支持把DataFrame转换成SQLContext自身中的表, 然后使用SQL语句来操作数据。SparkSession亦提供了HiveQL以及其他依赖于Hive的功能的支持。
可以通过如下语句创建一个SparkSession对象:
from pyspark import SparkConteext, SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf=conf)
实际上,在启动 进入pyspark以后,pyspark就默认提供了一个SparkContext对象(名称为sc)和一个SparkSession对象(名称为spark)。
在创建DataFrame时,可以使用spark.read操作,从不同类型的文件中加载数据创建DataFrame,例如:
- spark.read.text("people.txt"):读取本地文件people.txt创建DataFramee。
- spark.read.json("people.json"):读取people.json文件创建DataFrame;在读取本地文件或HDFS文件时,要注意给出正确的文件路径。
- spark.read.parquet("people.parquet"):读取people.parquet文件创建DataFram。
或者也可以对应使用如下格式的语句:
- spark.read.format("text").load("people.txt")
- spark.read.format("json").load("people.json")
- spark.read.format("parquet").load("people.parquet")
例如,读取本地/home/chenfy/people.json文件:
>>> df = spark.read.json("file:///home/chenfy/resources/people.json")
>>> df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
>>>
5.4 DataFrame的保存
可使用spark.write操作,把一个DataFrame操作保存成不同格式的文件:
- df.write.text("people.txt")
- df.write.json("people.json")
- df.write.parquet("people.parquet")
也可对应使用如下格式的语句:
- df.write.format("text").save("people.txt")
- df.write.format("json").save("people.json")
- df.write.format("parquet").save("people.parquet")
下面从示例文件people.json中创建一个DataFrame,将其保存到令一json文件。然后仅选取name列,把该列数据保存到文本文件:
>>> peopleDF = spark.read.format("json").\
... load("file:///home/chenfy/resources/people.json")
>>> peopleDF.select("name", "age").write.format("json").\
... save("file:///home/chenfy/people_new_json")
>>> peopleDF.select("name").write.format("text").\
... save("file:///home/chenfy/people_new_text")
>>> exit()
[chenfy@entrobus32 resources]$ cd ~
[chenfy@entrobus32 ~]$ cd people_new_text
[chenfy@entrobus32 people_new_text]$ ls
part-00000-2d3a6add-79ee-4156-8692-b85f0e259950-c000.txt _SUCCESS
[chenfy@entrobus32 people_new_text]$ cd ~
[chenfy@entrobus32 ~]$ cd people_new_json
[chenfy@entrobus32 people_new_json]$ ls
part-00000-f5299bd1-fa0b-4c24-9d91-da8619cb6717-c000.json _SUCCESS
注:从紧上示例可以看出,保存的将是路径。保存路径的最后一级是保存的文件夹名(即使最后一级是*.json
等),将自动创建此文件夹并在此文件夹中生产语句定义了的文件名、文件格式。
5.5 DataFrame的常用操作
>>> df = spark.read.json("file:///home/chenfy/resources/people.json")
>>> df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
>>> df.select(df["name"], df["age"]+1).show()
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
>>> df.filter(df["age"]>20).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
>>> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
| 19| 1|
|null| 1|
| 30| 1|
+----+-----+
>>> df.sort(df["age"].desc()).show()
+----+-------+
| age| name|
+----+-------+
| 30| Andy|
| 19| Justin|
|null|Michael|
+----+-------+
>>> df.sort(df["age"].desc(), df["name"].asc()).show()
+----+-------+
| age| name|
+----+-------+
| 30| Andy|
| 19| Justin|
|null|Michael|
+----+-------+
>>>
5.6 利用反射机制推断RDD模式
注:5.6、5.7我看标题有点懵。DataFrame相对于RDD多了schema,怎么传递schema信息给DataFrame呢,林老师给了5.6、5.7两种方法。
将people1.txt加载到内存中生成一个DataFrame,并查询其中的数据。 people.txt:
Michael, 29
Andy, 30
Justin, 19
>>> from pyspark.sql import Row
>>> people = spark.sparkContext.\
... textFile("file:///home/chenfy/resources/people1.txt").\
... map(lambda line: line.split(",")).\
... map(lambda p: Row(name=p[0], age=int(p[1])))
>>> schemaPeople = spark.createDataFrame(people)
# 必须注册为临时表才能供下面的查询使用
# 之后sql语句的表的名字正是来自这里
>>> schemaPeople.createOrReplaceTempView("people")
>>> personsDF = spark.sql("select name, age from people where age > 20")
19/09/17 10:43:20 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
19/09/17 10:43:20 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
19/09/17 10:43:22 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
# DataFrame中的每一个元素都是一行记录,包含name和age两个字段,分别用p.name和p.age来获取值。
>>> personsRDD = personsDF.rdd.map(lambda p: "Name: " + p.name + ", Age: " + str(p.age))
>>> personsRDD.foreach(print)
Name: Michael, Age: 29
Name: Andy, Age: 30
>>>
5.7 使用编程方式定义RDD模式
>>> from pyspark.types import *
>>> from pyspark.sql import Row
# 生成表头
>>> schemaString = "name age"
>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(" ")]
>>> schema = StructType(fields)
# 生成表中的记录
>>> lines = spark.sparkContext.\
... textFile("file:///home/chenfy/resources/people1.txt")
>>> parts = lines.map(lambda x: x.split(","))
>>> people = parts.map(lambda p: Row(p[0], p[1].strip()))
# 将表头和表中的记录拼装在一起
>>> schemaPeople = spark.createDataFrame(people, schema)
# 注册为临时表供sql查询使用
>>> schemaPeople.createOrReplaceTempView("people")
>>> results = spark.sql("SELECT name, age FROM people")
>>> results.show()
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
>>>
5.8 MySQL数据库准备工作
我并不知道能登录的账户。之后从同事处要到34号机上的MySQL账号:
[chenfy@entrobus32 ~]$ mysql -h10.18.0.34 -ucfy -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 1003
Server version: 5.7.27 MySQL Community Server (GPL)
Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> create database spark;
Query OK, 1 row affected (0.00 sec)
mysql> use spark;
Database changed
mysql> create table student (id int(4), name char(20), gender char(4), age int(4));
Query OK, 0 rows affected (0.35 sec)
mysql> insert into student values(1, 'Xueqian', 'F', 23);
Query OK, 1 row affected (0.08 sec)
mysql> insert into student values(2, 'Weiliang', 'M', 24);
Query OK, 1 row affected (0.04 sec)
mysql> select * from student;
+------+----------+--------+------+
| id | name | gender | age |
+------+----------+--------+------+
| 1 | Xueqian | F | 23 |
| 2 | Weiliang | M | 24 |
+------+----------+--------+------+
2 rows in set (0.00 sec)
mysql>exit;
jdbcDF = spark.read\
.format("jdbc")\
.option("driver","com.mysql.jdbc.Driver")\
.option("url","jdbc:mysql://10.18.0.34:3306/spark")\
.option("dbtable","student")\
.option("user","cfy")\
.option("password", "123456")\
.load()
不能成功,换成下面也暂时不能成功,报错未有很明确信息。
password = 123456
from pyspark import SparkContext
from pyspark.sql import SQLContext,Row
sqlContext=SQLContext(sc)
jdbcDF = sqlContext.read.format('jdbc').options(url="jdbc:mysql://10.18.0.34/spark?user=cfy&password=password",dbtable="student").load()
# jdbcDF = sqlContext.read.jdbc(url="jdbc:mysql://10.18.0.34?user=cfy&password=password",table="spark.student")
本章后面内容只能先暂时放一放。
5.9 使用Spark SQL读写数据库
6. Spark Streaming
静态数据和流数据:
- 很多企业为了支持决策而分析而构建的数据仓库系统,其中存放的大量历史数据就是静态数据。技术人员可利用数据挖掘和OLAP(On-Line AnalyticaL Processing)分析工具从静态数据中找到对企业有价值的信息。
- 近年来,在Web应用、网络监控、传感器监控等领域,兴起了一种新的数据密集型应用--流数据,即数据以大量、快速、时变的流形式持续到达。实例如PM2.5检测、电子商务网站用户点击流。
流数据具有如下特征:
- 数据快速持续到达,潜在大小也许是无穷无尽的
- 数据来源众多,格式复杂
- 数据量大,但是不十分关注存储,一旦经过处理,要么被丢弃,要么被归档存储
- 注重数据的整体价值,不过分关注个别数据
- 数据顺序颠倒,或者不完整,系统无法控制将要处理的新到达的数据元素的顺序
- 对静态数据和流数据的处理,对应着两种截然不同发的计算模式:批量计算和实时计算。
- 批量计算:充裕时间处理静态数据,如Hadoop。
- 流计算不适合采用批量计算,因为流数据不适合用传统的关系模型建模
- 流数据必须采用实时计算,响应时间为秒级
- 数据量少时,不是问题,但是,在大数据时代,数据格式负责、来源众多、数据量巨大,对实时计算提出了很大的挑战。因此,针对流数据的实时计算--流计算,应运而生。
6.1 流计算概述
- 流计算:实时获取来自不同数据源的海量数据,经过实时分析处理,获取有价值的信息。
- 流计算秉承一个基本理念,即数据的价值随着时间的流逝而降低,如用户点击流。因此,当时间出现时就应立即进行处理,而不是缓存起来进行批量处理。为了及时处理流数据,就需要一个低延迟、可扩展、高可靠的的处理引擎。
对于一个流计算系统来说,它应该达到如下需求: * 高性能:处理大数据的基本要求,如每秒处理几十万条数据 * 海量式:支持TB级甚至时PB级的数据规模 * 实时性:保证较低的延迟时间,达到秒级别,甚至是毫秒级别 * 分布式:支持大数据的基本架构,必须能够平滑扩展 * 易用性:能快速进行开发和部署 * 可靠性:能可靠地处理流数据
流计算框架:商业级:IBM InfoSphere Streams和IBM StreamBase;开源:Twitter Storm和Yahoo! S4(Simple Scalaable Streaming System);公司自身的:Facebook Puma、Dstream和银河流数据处理平台。
流数据处理流程:数据实时采集、数据实时计算、实时查询服务
6.2 Spark Streaming
Spark Streaming可整合多种输入数据源,如Kafka、 Flume、HDFS,甚至是普通的TCP套接字。经处理后的 数据可存储至文件系统、数据库,或显示在仪表盘里。
Spark Streaming最主要的抽象是DStream(Discretized Stream,离散化数据 流),表示连续不断的数据流。在内部实现上,Spark Streaming的输入数据按 照时间片(如1秒)分成一段一段,每一段数据转换为Spark中的RDD,这些分 段就是Dstream,并且对DStream的操作都最终转变为对相应的RDD的操作。
6.3 DStream操作概述
编写Spark Streaming程序的基本步骤是:
- 1.通过创建输入DStream来定义输入源
- 2.通过对DStream应用转换操作和输出操作来定义流计算
- 3.用streamingContext.start()来开始接收数据和处理流程
- 4.通过streamingContext.awaitTermination()方法来等待处理结束(手动结束或因为错误而结束)
- 5.可以通过streamingContext.stop()来手动结束流计算 进程
6.4 文件流
终端窗口,启动pyspark,输入以下命令,程序将开始自动进入循环监听状态,屏幕上会显示一堆的信息:
>>> from pyspark import SparkContext
>>> from pyspark.streaming import StreamingContext
>>> ssc = StreamingContext(sc,10)
>>> lines = ssc.textFileStream("file:///home/chenfy/streaming/logfile")
>>> words = lines.flatMap(lambda line: linesplit(' '))
>>> wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a+b)
>>> wordCounts.pprint()
>>> ssc.start()
------------------------------------------- it
Time: 2019-09-17 20:03:40
-------------------------------------------
在/home/chenfy/streaming/logfile目录下新家一个log.txt文件,就可以在监听窗口显示词频统计结果。(注:以上代码暂不明白什么意思。)
也可采用独立应用程序方式船舰文件流:
#!usr/bin/env python3
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
conf = SparkConf()
conf.setAppName('TestDStream')
conf.setMaster('local[2]')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10)
lines = ssc.textFileStream('file:///home/chenfy/streaming/logfile')
words = lines.flatMap(lambda line: line.split(' '))
wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a+b)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()
cd /home/chenfy/streaming/logfile/
spark-submit FileStreaming.py
也能运行,输出持续不断,但不知道什么意思。
6.5 套接字流(使用NC程序产生数据)
使用 套接字流作为数据: 创建并在/home/chenfy/streaming/socket/NetworkWordCount.py文件输入如下内容:
#!/usr/bin/env python3
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ = "__main__":
if len(sys.argv) != 3:
print("Usage: NetWorkWordCount.py <hostname> <port>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingNetworkWordCount")
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
counts = lines.flatMap(lambda line: line.split(' '))\
.map(lambda word: (word, 1))\
.reduceByKey(lambda a, b: a+b)
counts.pprint()
ssc.start()
ssc.awaitTermination()
不知怎么,没有像林老师说的一样得到词频统计,也没报错。
6.6 套接字流(使用Socket编程实现自定义数据流)
下面我们再前进一步,把数据源头的产生方式修改一下,不要使用nc程序,而是采用以下自己编写的程序产生Socket数据源:
#!usr/bin/env python3
import socket
# 生成socket对象
server = socket.socket()
# 绑定ip和端口,端口可随意指定,
# 但为了NetworkWordCount.py能监听到,其执行语句应写同一端口
server.bind(('localhost', 8888))
# 监听绑定的窗口
server.listen(1)
while 1:
# 为了方便识别,打印一个“我在等待”
print("I'm waiting the connect...")
# 这里用两个值接收,因为连接上之后使用的是客户端发来请求的这个实例。
# 所以下面的传输要使用conn实例操作
conn, addr = server.accept()
# 打印连接成功
print("Connect success! Connection is from %s." % addr[0])
# 打印正在发送数据
print('Sending data...')
conn.send('I love hadoop, I love spark, hadoop is good and spark is fast.'.encode())
conn.close()
print('Connection is broken.')
启动Socket服务端,即DataSourceSocket.py程序:
spark-submit DataSourceSocket.py
启动客户端,即NetworkWordCount.py程序:
spark-submit NetworkWordCount.py localhost 8888
将在客户端输出DataSourceSocket.py程序中conn.send语句发送内容的词频统计。
6.7 RDD队列流
import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
sc = SparkContext(appName="PythonStreamingQueueStream")
ssc = StreamingContext(sc,2)
# 创建一个队列,通过该队列可以把RDD推给一个RDD队列
rddQueue = []
for i in range(5):
rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]
time.sleep(1)
# 创建一个RDD队列六
inputStream = ssc.queueStream(rddQueue)
mappedStream = inputStream.map(lambda x: (x % 10, 1))
reducedStream = mappedStream.reduceByKey(lambda a, b: a+b)
reducedStream.pprint()
ssc.start()
ssc.stop(stopSparkContext=True, stopGraceFully=True)
成功运行,不知什么意思。
6.8 使用Apache Kafka作为Spark Streaming数据源(准备工作)
Kafka简介:
- Kafka是一种高吞吐量的分布式发布订阅消息系统,用户通过Kafka系统可以发 布大量的消息,同时也能实时订阅消费消息
- Kafka可以同时满足在线实时处理和批量离线处
- 在公司的大数据生态 系统中,可以把 Kafka作为数据交换 枢纽,不同类型的分 布式系统(关系数据 库、NoSQL数据库、 流处理系统、批处理系统等),可以统一接入到Kafka,实现 和Hadoop各个组件之间的不同类型数据的实时高效交换
- Broker Kafka集群包含一个或多个服务器,这种服务器被称为broker
- Topic:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上,但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
- Partition:Partition是物理上的概念,每个Topic包含一个或多个Partition.
- Producer:负责发布消息到Kafka broker
- Consumer 消息消费者,向Kafka broker读取消息的客户端。
- Consumer Group:每个Consumer属于一个特定的Consumer Group(可为每个Consumer 指定group name,若不指定group name则属于默认的group)
Kafka安装:(暂略)
Kafka启动:未检测到32号机装了Kafka,暂略。
6.9 使用Apache Kafka作为Spark Streaming数据源(编写流计算程序)
6.10 DStream无状态转换操作
DStream转换操作:
- DStream无状态转换操作
- DStream有状态操作
无状态转换操作实例:之前“套接字流”部分介绍的词频统计,就是采用无状态转换,每次统计,都是只统计当前批次到达的单词的词频,和之前批次无关,不会进行累计。
无状态转换操作包括:
- map(func):对源DStream的每个元素,采用func函数进行转换,得到一个新的Dstream
- flatMap(func):与map相似,但是每个输入项可用被映射为0个或者多个输出项
- filter(func):返回一个新的DStream,仅包含源DStream中满足函数func的项
- repartition(numPartitions):通过创建更多或者更少的分区改变DStream的并行程度
- union(otherStream):返回一个新的DStream,包含源DStream和其他DStream的元素
- countByValue():应用于元素类型为K(注:按我理解即非键值对数据)的DStream上,返回一个(K,V)键值对类型的新DStream,每个键的值是在原DStream的每个RDD中的出现次数
- reduceByKey(func, [numTasks]):当在一个由(K,V)键值对组成的DStream上执行该操作时,返回一个新的由(K,V)键值对组成的DStream,每一个key对应的值均由给定的recuce函数(func)聚集起来
- join(otherStream, [numTasks]):当应用于两个DStream(一个包含(K,V)键 值对,一个包含(K,W)键值对),返回一个包含(K, (V, W))键值对的新Dstream
- cogroup(otherStream, [numTasks]):当应用于两个DStream(一个包含(K,V) 键值对,一个包含(K,W)键值对),返回一个包含(K, Seq[V], Seq[W])的元组(注:暂未理解cogroup的作用和输出)
- transform(func):通过对源DStream的每个RDD应用RDD-to-RDD函数,创建一个新的DStream。支持在新的DStream中做任何RDD操作
6.11 DStream有状态转换操作(滑动窗口转换操作)
有状态转换操作包括:
- 滑动窗口转换操作
- updateStateByKey操作
滑动窗口转换操作包括:
- window(windowLength, slideInterval) 基于源DStream产生的窗口化的批 数据,计算得到一个新的Dstream一些窗口转换操作的含义:
- countByWindow(windowLength, slideInterval) 返回流中元素的一个滑动窗口数
- reduceByWindow(func, windowLength, slideInterval) 返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数func必须满足结合律,从而可以支持并行计算
- countByValueAndWindow(windowLength, slideInterval, [numTasks]) 当应用到一个(K,V)键值对组成的DStream上, 返回一个由(K,V)键值对组成的新的DStream。每个key对应的值都是它们在滑动窗口中出现的频率 reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 应用到一个(K,V)键值对组成的DStream上时,会返回一个由(K,V)键值对组成的新的DStream。每一个key对应的值均由给定的reduce函数(func函数)进行聚合计算。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。可以通过numTasks参数的设置来指定不同的任务数
- reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]):更加高效的reduceByKeyAndWindow,每个窗口的reduce值,是基于先前窗口的reduce值进行增量计算得到的;它会对进入滑动窗口的新数据进行reduce操作,并对离开窗口的老数据进行“逆向reduce‖操作。但是,只能用于“可逆reduce函数”,即那些reduce函数都有一个对应的“逆向reduce函数”(以InvFunc参数传入)
/home/chenfy/streaming/socket/WindowedNetworkWordCount.py内容如下:
#!/usr/bin/env python3
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: WindowedNetworkWordCount.py <hostname> <port>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingWindowedWordCount")
ssc = StreamingContext(sc,10)
ssc.checkpoint("file:///home/chenfy/streaming/socket/checkpoint")
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda word: (word, 1))\
.reduceByKeyAndWindow(lambda x, y: x+y, lambda x, y: x-y, 30, 10)
counts.pprint()
ssc.start()
ssc.awaitTermination()
注:无报错,但不知为何并不会输出词频统计。
6.12 DStream有状态转换操作(updateStateByKey操作)
需要在跨批次之间维护状态时,就必须使用updateStateByKey操作
词频统计实例:
对于有状态转换操作而言,本批次的词频统计,会在之前批次的词频统计结果的基础上进行不断累加,所以,最终统计得到的词频,是所有批次的单词的总的词频统计结果
#!/usr/bin/env python3
from __future__ import print_function
import sys
from pysaprk import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=sys.stderr)
exit(-1)
sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
ssc = StreamingContext(sc,1)
ssc.checkpoint("file:///home/chenfy/streaming/stateful/")
# RDD with initial state (key, value) pairs
initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])
def updateFunc(new_values, last_sum):
return sum(new_values) + (last_sum or 0)
lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
running_counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda word: (word, 1))\
.updateStateByKey(updateFunc, initialRDD=initialStateRDD)
running_counts.pprint()
ssc.start()
ssc.awaitTermination()
注:按示例运行未报错,但没有得到相应结果。
6.13 输出操作
暂无法连接到MySQL,需要sudo权限安装PyMySQL,略。
7. Structured Streaming程序的基本步骤
7.1 概述
7.1.1 基本概念
- Structured Streaming的关键思想是将实时数据流视为一张正在不断添加数据的表
- 可以把流计算等同于在一个静态表上的批处理查询,Spark会在不断添加数据的无界输入表上运行计算,并进行增量查询
- 在无界表上对输入的查询将生成结果表,系统每隔一定的周期会 触发对无界表的计算并更新结果表
7.1.2 两种处理模型
1.微批处理:
- Structured Streaming默认使用微批处理执行模型,这意味着Spark流计算引擎会定期检查流数据源,并对自上一批次结束后到达的新数据执行批量查询
- 数据到达和得到处理并输出结果之间的延时超过100毫秒
2.持续处理:
- Spark从2.3.0版本开始引入了持续处理的试验性功能,可以实现流计算的毫秒级延迟
- 在持续处理模式下,Spark不再根据触发器来周期性启动任务,而是启动一系列的连续读取、处理和写入结果的长时间运行的任务
- Structured Streaming处理的数据跟Spark Streaming一样,也是源源不断的数据流,区别在于,Spark Streaming采用的数据抽象是DStream(本质上就是一系列RDD),而Structured Streaming采用的数据抽象是DataFrame。
- Structured Streaming可以使用Spark SQL的DataFrame/Dataset来处理数据流。虽然Spark SQL也是采用DataFrame作为数据抽象,但是,Spark SQL只能处理静态的数据,而Structured Streaming可以处理结构化的数据流。这样,Structured Streaming就将Spark SQL和Spark Streaming二者的特性结合了起来。
- Structured Streaming可以对DataFrame/Dataset应用前面章节提到的各种操作,包括select、where、groupBy、map、filter、 flatMap等。
- Spark Streaming只能实现秒级的实时响应,而Structured Streaming由于采用了全新的设计方式,采用微批处理模型时可以实现100毫秒级别的实时响应,采用持续处理模型时可以支持毫秒级的实时响应。
7.2 编写Structued Streaming程序的基本操作
编写Structured Streaming程序的基本步骤包括:
- 导入pyspark模块
- 创建SparkSession对象
- 创建输入数据源
- 定义流计算过程
- 启动流计算并输出结果
实施任务:一个包含很多行英文语句的数据流源源不断到达,Structured Streaming程序对每行英文语句进行拆分,并统计每个单词出现的频率
1.步骤1:导入pyspark模块
from pyspark.sql import SparkSession from pyspark.sql.functions import split from pyspark.sql.functions import explode
2.步骤2:创建SparkSession对象
if __name__ == "__main__": spark = SparkSession \ .builder \ .appName("StructuredNetworkWordCount") \ .getOrCreate() spark.sparkContext.setLogLevel('WARN')
3.步骤3:创建输入数据源 创建一个输入数据源,从“监听在本机(localhost)的9999端口上 的服务”那里接收文本数据,具体语句如下:
lines = spark \ .readStream \ .format("socket") \ .option("host", "localhost") \ .option("port", 9999) \ .load()
4.步骤4:定义流计算过程 有了输入数据源以后,接着需要定义相关的查询语句,具体如下:
words = lines.select(explode(split(lines.value, " ")).alias("word")) wordCounts = words.groupBy("word").count()
5.步骤5:启动流计算并输出结果 定义完查询语句后,下面就可以开始真正执行流计算,具体语句如下:
query = wordCounts \ .writeStream \ .outputMode("complete") \ .format("console") \ .trigger(processingTime="8 seconds") \ .start() query.awaitTermination()
把以上几个步骤的代码写入文件StructuredNetworkWordCount.py,在执行此文件之前需要启动HDFS(我未有权限或暂不知道路径)
7.3 File源
- File源(或称为“文件源”)以文件流的形式读取某个目录中的文件,支持的文件格式为csv、json、orc、parquet、text等。
- 需要注意的是,文件放置到给定目录的操作应当是原子性的,即不能长时间在给定目录内打开文件写入内容,而是应当采取大部分操作系统都支持的、通过写入到临时文件后移动文件到给定目录的方式来完成。
7.4 Kafka源
Kafka源是流处理最理想的输入源,因为它可以保证实时和容错。
7.5 Socket源和Rate源
Socket源从一个本地或远程主机的某个端口服务上读取数据,数据 的编码为UTF8。因为Socket源使用内存保存读取到的所有数据, 并且远端服务不能保证数据在出错后可以使用检查点或者指定当前 已处理的偏移量来重放数据,所以,它无法提供端到端的容错保障。 Socket源一般仅用于测试或学习用途。
Rate源可每秒生成特定个数的数据行,每个数据行包括时间戳和值字 段。时间戳是消息发送的时间,值是从开始到当前消息发送的总个数, 从0开始。Rate源一般用来作为调试或性能基准测试。
7.6 输出操作
输出模式用于指定写入接收器的内容,主要有以下几种:
- Append模式:只有结果表中自上次触发间隔后增加的新行,才会被写入外部存储器。这种模式一般适用于“不希望更改结果表 中现有行的内容”的使用场景。
- Complete模式:已更新的完整的结果表可被写入外部存储器。
- Update模式:只有自上次触发间隔后结果表中发生更新的行,才会被写入外部存储器。这种模式与Complete模式相比,输出较少,如果结果表的部分行没有更新,则不会输出任何内容。当查询不包括聚合时,这个模式等同于Append模式。
8. Spark MLlib
- 传统的机器学习算法,由于技术和单机存储的限制,只能 在少量数据上使用,依赖于数据抽样
- 大数据技术的出现,可以支持在全量数据上进行机器学习
- 机器学习算法涉及大量迭代计算
- 基于磁盘的MapReduce不适合进行大量迭代计算
- 基于内存的Spark比较适合进行大量迭代计算
- 需要注意的是,MLlib中只包含能够在集群上运行良好的并行算法,这一点很重要
- 有些经典的机器学习算法没有包含在其中,就是因为它们不能并行执行
- 相反地,一些较新的研究得出的算法因为适用于集群,也被包含在MLlib中,例如分布式随机森林算法、最小交替二乘算法。这样的选择使得MLlib中的每一个算法都适用于大规模数据集
- 如果是小规模数据集上训练各机器学习模型,最好还是在各个节点上使用单节点的机器学习算法库(比如Weka)
8.1 Spark MLlib简介
- MLlib是Spark的机器学习(Machine Learning)库,旨在简化机器学习的工程实践工作
- MLlib由一些通用的学习算法和工具组成,包括分类、回归、聚类、协同过滤、降维等,同时还包括底层的优化原语和高层的流水线(Pipeline)API,具体如下:
- 算法工具:常用的学习算法,如分类、回归、聚类和协 同过滤;
- 特征化工具:特征提取、转化、降维和选择工具;
- 流水线(Pipeline):用于构建、评估和调整机器学习工作流的工具;
- 持久性:保存和加载算法、模型和管道;
- 实用工具:线性代数、统计、数据处理等工具。
Spark 机器学习库从1.2 版本以后被分为两个包:
- spark.mllib包含基于RDD的原始算法API。Spark MLlib 历史比较长,在1.0 以前的版本即已经包含了,提供的算 法实现都是基于原始的 RDD
- spark.ml则提供了基于DataFrames高层次的API, 可以用来构建机器学习工作流(PipeLine)。ML Pipeline弥补了原始 MLlib库的不足,向用户提供了一个基于DataFrame的机器学习工作流式API套件
MLlib目前支持4种常见的机器学习问题:分类、回归、聚类和协同过滤
离散数据 | 连续数据 | |
---|---|---|
监督学习 | Classification LogisticRegression(with Elastic) SVM DecisionTree RandomForest GBT NaiveBayes MultilayerPerceptron OneVsRest |
Regression LinearRegression(with Elastic) DecisionTree RandomForest GBT AFTSurvivalRegression IsotoniRegression |
无监督学习 | Clustering KMeans GaussianMixture LDA PowerIterationClustering BisectingKMeans |
Dimenssionality Reduction matrix factorization PCA SVD ALS WLS |
8.2 机器学习流水线
PipeLine:翻译为流水线或者管道。流水线将多个工作流阶段(转换器和估计器)连接在一起,形成机器学习的工作流,并获得结果输出。
要构建一个 Pipeline流水线,首先需要定义 Pipeline 中的各 个流水线阶段PipelineStage(包括转换器和评估器),比 如指标提取和转换模型训练等。有了这些处理特定问题的转 换器和评估器,就可以按照具体的处理逻辑有序地组织 PipelineStages 并创建一个Pipeline。
pipeline = Pipeline(stages=[stage1, stage2, stage3])
然后就可以把训练数据集作为输入参数,调用 Pipeline 实例 的 fit 方法来开始以流的方式来处理源训练数据。这个调用 会返回一个 PipelineModel 类实例,进而被用来预测测试数 据的标签。
注:完全跟scikit-learn一样。
如下构建一个典型的机器学习过程来具体介绍流水线的运用。任务为:查找所有包含“spark”的句子,即将包含“spark”的句子的标签设为1,没有“spark”的句子的标签设为0。
/home/chenfy/ml/logs_pipe.py文件内容如下:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("WordCount").getOrCreate()
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
# Prepare training data from a list of (id, text, label) tuples.
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])
# define some stages for the pipeline
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
# construt the pipeline
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
# fit model
model = pipeline.fit(training)
# Prepare test data
test = spark.createDataFrame([
(4, "spark i j k"),
(5, "l m n"),
(6, "spark hadoop spark"),
(7, "apache hadoop"),
], ["id", "text"])
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
rid, text, prob, prediction = row
print("(%d, %s)-->prob=%s, prediction=%f." % (rid, text, str(prob), prediction))
运行:
[chenfy@entrobus32 ml]$ spark-submit logs_pipe.py
.
.
.
(4, spark i j k)-->prob=[0.1596407738787475,0.8403592261212525], prediction=1.000000.
(5, l m n)-->prob=[0.8378325685476744,0.16216743145232562], prediction=0.000000.
(6, spark hadoop spark)-->prob=[0.06926633132976037,0.9307336686702395], prediction=1.000000.
(7, apache hadoop)-->prob=[0.9821575333444218,0.01784246665557808], prediction=0.000000.
.
.
.
8.3 特征提取:TF-IDF
特征提取的作用和意义完全是机器学习内容,略,这里仅示例spark代码。
过程描述:
- 在下面的代码段中,我们以一组句子开始
- 首先使用分解器Tokenizer把句子划分为单个词语
- 对每一个句子(词袋),使用HashingTF将句子转换为 特征向量
- 最后使用IDF重新调整特征向量(这种转换通常可以提 高使用文本特征的性能
/home/chenfy/tf_idf.py内容如下:
# if not in pyspark commandline, make sure add below these two lines.
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("TF-IDF").getOrCreate()
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
sentenceData = spark.createDataFrame([
(0, "I heard about Spark and I love Spark"),
(0, "I wish java could use case classes"),
(0, "Logistic regression models are neat"),
]).toDF("label", "sentence")
tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
wordsData.show()
hashingTF = HashingTF(inputCol='words', outputCol='rawFeatures', numFeatures=2000)
featurizedData = hashingTF.transform(wordsData)
featurizedData.select("words", "rawFeatures").show(truncate=False)
idf = IDF(inputCol='rawFeatures', outputCol='features')
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)
rescaledData.select("features", "label").show(truncate=False)
运行输出:
[chenfy@entrobus32 ml]$ spark-submit tf_idf.py
.
.
.
+-----+--------------------+--------------------+
|label| sentence| words|
+-----+--------------------+--------------------+
| 0|I heard about Spa...|[i, heard, about,...|
| 0|I wish java could...|[i, wish, java, c...|
| 0|Logistic regressi...|[logistic, regres...|
+-----+--------------------+--------------------+
.
.
.
+---------------------------------------------+---------------------------------------------------------------------+
|words |rawFeatures |
+---------------------------------------------+---------------------------------------------------------------------+
|[i, heard, about, spark, and, i, love, spark]|(2000,[240,333,1105,1329,1357,1777],[1.0,1.0,2.0,2.0,1.0,1.0]) |
|[i, wish, java, could, use, case, classes] |(2000,[213,342,489,495,1329,1809,1967],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|[logistic, regression, models, are, neat] |(2000,[286,695,1138,1193,1604],[1.0,1.0,1.0,1.0,1.0]) |
+---------------------------------------------+---------------------------------------------------------------------+
.
.
.
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|features |label|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|(2000,[240,333,1105,1329,1357,1777],[0.6931471805599453,0.6931471805599453,1.3862943611198906,0.5753641449035617,0.6931471805599453,0.6931471805599453]) |0 |
|(2000,[213,342,489,495,1329,1809,1967],[0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.28768207245178085,0.6931471805599453,0.6931471805599453])|0 |
|(2000,[286,695,1138,1193,1604],[0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453]) |0 |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
.
.
.
8.4 特征转换:标签和索引转化
一般,机器学习的输入,如有字符,需转化成数字或数字的组合,方能作为机器学习函数的输入。这就是标签转索引;标签转索引成功训练模型后,希望重新变为方便理解的字符,这样就是索引转标签。
Spark | scikit-learn |
---|---|
StringIndexer.transoform() | LabelEncoder().transoform() |
IndexToString.transoform() | LabelEncoder().inverse_transoform() |
OneHotEncoder.transoform() | OneHotEncoder().transoform() |
VectorIndexer.transoform() 可根据maxCategories参数判断类别变量,并仅将类别变量onehot |
8.5 逻辑斯蒂回归分类器
# if not in pyspark commandline, make sure add below these two lines.
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("LOGISTIC_IRIS").getOrCreate()
from pyspark.ml.linalg import Vector, Vectors
from pyspark.sql import Row, functions
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString, StringIndexer, \
VectorIndexer, HashingTF, Tokenizer
from pyspark.ml.classification import LogisticRegression, \
LogisticRegressionModel, BinaryLogisticRegressionSummary, LogisticRegression
def f(x):
rel = {}
rel['features'] = Vectors.\
dense(float(x[0]), float(x[1]), float(x[2]), float(x[3]))
rel['label'] = str(x[4])
return rel
data = spark.sparkContext.\
textFile("file:///home/chenfy/ml/iris.txt").\
map(lambda line: line.split(',')).\
map(lambda p: Row(**f(p))).toDF()
data.show()
labelIndexer = StringIndexer().\
setInputCol("label").\
setOutputCol("indexedLabel").\
fit(data)
featureIndexer = VectorIndexer().\
setInputCol("features").\
setOutputCol("indexedFeatures").\
fit(data)
lr = LogisticRegression().\
setLabelCol("indexedLabel").\
setFeaturesCol("indexedFeatures").\
setMaxIter(100).\
setRegParam(0.3).\
setElasticNetParam(0.8)
print("LogisticRegression parameters:\n" + lr.explainParams())
labelConverter = IndexToString().\
setInputCol("prediction").\
setOutputCol("predictedLabel").\
setLabels(labelIndexer.labels)
lrPipeline = Pipeline().\
setStages([labelIndexer, featureIndexer, lr, labelConverter])
trainingData, testData = data.randomSplit([0.7, 0.3])
lrPipelineModel = lrPipeline.fit(trainingData)
lrPredictions = lrPipelineModel.transform(testData)
preRel = lrPredictions.select(
"predictedLabel",
"label",
"features",
"probability"
).collect()
for item in preRel:
print(str(item["label"]) + ',' + \
str(item["features"]) + '-->prob=' + \
str(item["probability"]) + 'predictedLabel' + \
str(item["predictedLabel"])
)
evaluator = MulticlassClassificationEvaluator().\
setLabelCol("indexedLabel").\
setPredictionCol("prediction")
lrAccuray = evaluator.evaluate(lrPredictions)
print("Accuray: {}".format(lrAccuray))
lrModel = lrPipelineModel.stages[2]
print("Coefficients: \n" + str(lrModel.coefficientMatrix) + \
"\nIntercept: " + str(lrModel.interceptVector) + \
"\nnumClasses: " + str(lrModel.numClasses) + \
"\nnumFeatures: " + str(lrModel.numFeatures))
运行过程:
[chenfy@entrobus32 ml]$ spark-submit iris_logs.py
.
.
.
Accuray: 0.708498023715415
Coefficients:
3 X 4 CSRMatrix
(0,2) -0.2475
(0,3) -0.254
(1,3) 0.351
Intercept: [0.8275049304783856,-0.195156119496122,-0.6323488109822636]
numClasses: 3
numFeatures: 4
.
.
.
注:可能也应像scikit-learn等中一样设置seed等以达到重现效果,三次运行出现了0.46~0.94的精确度。
8.6 决策树分类器
仍然上节数据集
/home/chenfy/ml/iris_dt.py内容如下:
# if not in pyspark commandline, make sure add below these two lines.
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("DT_IRIS").getOrCreate()
from pyspark.ml.classification import DecisionTreeClassificationModel
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline,PipelineModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vector,Vectors
from pyspark.sql import Row
from pyspark.ml.feature import IndexToString,StringIndexer,VectorIndexer
def f(x):
rel = {}
rel['features']=Vectors.\
dense(float(x[0]),float(x[1]),float(x[2]),float(x[3]))
rel['label'] = str(x[4])
return rel
data = spark.sparkContext.\
textFile("file:///home/chenfy/ml/iris.txt").\
map(lambda line: line.split(',')).\
map(lambda p: Row(**f(p))).\
toDF()
labelIndexer = StringIndexer().\
setInputCol("label").\
setOutputCol("indexedLabel").\
fit(data)
featureIndexer = VectorIndexer().\
setInputCol("features").\
setOutputCol("indexedFeatures").\
setMaxCategories(4).\
fit(data) >>> labelConverter = IndexToString().\
setInputCol("prediction").\
setOutputCol("predictedLabel").\
setLabels(labelIndexer.labels)
trainingData, testData = data.randomSplit([0.7, 0.3])
dtClassifier = DecisionTreeClassifier().\
setLabelCol("indexedLabel").\
setFeaturesCol("indexedFeatures")
dtPipeline = Pipeline().\
setStages([labelIndexer, featureIndexer, dtClassifier, labelConverter])
dtPipelineModel = dtPipeline.fit(trainingData)
dtPredictions = dtPipelineModel.transform(testData)
dtPredictions.select("predictedLabel", "label", "features").show(20)
evaluator = MulticlassClassificationEvaluator().\
setLabelCol("indexedLabel").\
setPredictionCol("prediction")
dtAccuracy = evaluator.evaluate(dtPredictions)
print("Accuracy: ".format(dtAccuracy))
treeModelClassifier = dtPipelineModel.stages[2]
print("Learned classification tree model:\n" + \
str(treeModelClassifier.toDebugString))
运行过程:
[chenfy@entrobus32 ml]$ spark-submit iris_dt.py
.
.
.
Accuracy: 0.9554738562091503
Learned classification tree model:
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_e0e08e2e4629) of depth 5 with 15 nodes
If (feature 2 <= 2.5999999999999996)
Predict: 0.0
Else (feature 2 > 2.5999999999999996)
If (feature 3 <= 1.7000000000000002)
If (feature 2 <= 5.15)
If (feature 2 <= 4.85)
Predict: 1.0
Else (feature 2 > 4.85)
If (feature 1 <= 2.25)
Predict: 2.0
Else (feature 1 > 2.25)
Predict: 1.0
Else (feature 2 > 5.15)
Predict: 2.0
Else (feature 3 > 1.7000000000000002)
If (feature 2 <= 4.85)
If (feature 0 <= 5.95)
Predict: 1.0
Else (feature 0 > 5.95)
Predict: 2.0
Else (feature 2 > 4.85)
Predict: 2.0
.
.
.