Pyspark

若工作场景涉及大数据,则需要使用pyspark。找了三天,暂时发现Tomasz Drabas, Denny Lee的《Learning PySpark》应该是一本较适合的书,感知其难度适中。发现此书不介绍环境搭建。Bill Chambers and Matei Zaharia的《Spark: The Definitive Guide》也是,难度适中,但不涉及环境搭建。不涉及环境搭建是不行的,因为可能版本演进太快,可用代码在不同环境无法运行。比如,anaconda里面conda install -c conda-forge pyspark搭建的环境无法运行《Spark: The Definitive Guide》里的代码,也许涉及本地安装等变量的设置问题而无法运行等,暂不清楚。还是应该找本包括环境搭建的书。琳达贵的《python+spark2.0+hadoop机器学习与大数据实战》包含环境搭建的过程,需要内存和固态空间,我电脑不一定跟得上,试着安装吧。2019-08-21装好虚拟机装好linux装好hadoop,目前很卡。

20190826: 最近在学习scrapy爬虫,今天看点这本书,明天看点那个博客,因此笔记就很杂乱。启示是:要先列一个提纲,这样即使看的书、知识比较杂也好,都可以把笔记写在对应位置。

网易云课堂有一免费课程《Spark编程基础》,我接下来的笔记大纲则遵从此课程。当然,有可能会找其他资料等补充此笔记大纲。

1. 大数据计数概括

1.1 大数据时代

1.2 大数据的概念

1.3 大数据的影响

1.4 大数据的关键技术

1.5 答案数据计算模式

1.6 代表大数据技术之Hadoop

1.7 代表大数据技术之Spark

1.8 代表大数据技术之Flink和Beam

2. Spark的设计与运行原理

2.1 Spark概述

2.2 Spark生态系统

2.3 基本概念和架构设计

2.4 Spark运行原理(RDD概念、操作和特性)

2.5 RDD运行原理(RDD之间的依赖关系)

2.6 RDD运行原理(阶段的划分和RDD运行过程)

2.7 Spark的部署和应用方式

3. Spark环境搭建和使用方法

3.1 安装Spark

3.1.1 单机安装Spark

参考:

  1. 在Windows中使用VirtualBox安装Ubuntu

  2. Hadoop安装教程_单机/伪分布式配置_Hadoop2.6.0/Ubuntu14.04

  3. Spark2.1.0入门:Spark的安装和使用

  4. Spark 2.0分布式集群环境搭建(Python版)

注:

3.2 克隆主机

参考:

  1. 从零开始的Hadoop大数据集群(伪)搭建,全免费VirtualBox虚拟机Ubuntu版,学习向,超详细---(一)
  2. 从零开始的Hadoop大数据集群(伪)搭建,全免费VirtualBox虚拟机Ubuntu版,学习向,超详细---(二)
  3. 从零开始的Hadoop大数据集群(伪)搭建,全免费VirtualBox虚拟机Ubuntu版,学习向,超详细---(三)

备注: * 关于克隆主机,出现在参考2,注意副本类型选择完全复制。经测试,复制出的副本,除计算机名字,完全就是被复制计算机的副本,具有被复制计算机全部功能。因此,为效率必须安装主从均相同的软件和环境并配置共性的文件,之后再进行虚拟机复制。

3.3 各主机组成集群

现在到了最易混淆的地方了,怎么将各台虚拟机组成集群。有虚拟机名(像我是Ubuntu、Ubuntu0、Ubuntu1、Ubuntu2)、主机名(也叫hostname,像我将Ubuntu0、1,2分别取名mater、slave1、slave2)。 参考:

  1. virtualBox里Ubuntu设置静态IP
  2. Hadoop 2.7分布式集群环境搭建
  3. vmware 虚拟机ping win10宿主机
  4. Ubuntu安装SSH服务器故障分析及解决办法
  5. Spark 2.0分布式集群环境搭建(Python版)

注:

还是没有解决,初始建立的ubuntu能通过ssh localhost连接到本地localhost,然后重新复制的也能,那么不求甚解地重新复制系统吧。另virtualBox搭建hadoop集群(1)虚拟机网络配置我猜测它的网络设置也许靠谱。百度时候应盯住“virtualbox hadoop”来搜索也许还有更合适的带图的步骤详细的示例(为什么不“virtualbox spark”呢,因为会过滤掉很多只装hadoop集群的例子,这些例子也很宝贵----20190916)。

3.2 在pyspark中运行代码

3.3 开发Spark独立应用程序

3.4 Spark集群环境搭建

3.5 在集群上运行Spark应用程序

4. RDD编程基础

4.1 RDD创建

从本地文件系统中加载数据创建RDD的示例:

如:我在公司32号机我用户根目录下放入word.txt文件,文件内容如下:

Hadoop is good
Spark is fast
Spark is better

运行如下命令:

>>> lines = sc.textFile("file:///home/chenfy/word.txt")
>>> lines.foreach(print)
[Stage 0:>                                                          (0 + 2) / 2]Spark is better
Hadoop is good
Spark is fast
>>>

将文件放入集群HDFS可采用命令如hdfs dfs -put /home/hdfs/files/text.txt /input,前面为服务器中本地文件路径,后面为HDFS文件路径。一般,我们很可能不知道集群HDFS路径,那么可采用su hdfs来切换到HDFS用户,其提示我输入密码,我无此权限,于是作罢。

从分布式文件系统HDFS中加载数据:

lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.text")  #主机名localhost 端口号9000,来源于配置。 
lines = sc.textFile("/user/hadoop/word.text")
lines = sc.textFile("word.txt")

注:三条语句完全等价,可以使用其中任意一种方式。这条或者能在公司集群端口运行,可惜暂不知道怎么将我创建的word.txt放到集群分布式文件系统。

注:可能关于集群各种路径是难点,本地,集群hdfs、集群中某台主机的本地、集群jupyter工作路径等等。

通过并行集合(组合)创建RDD

>>> array = [1, 2, 3, 4, 5]
>>> rdd = sc.parallelize(array)
>>> rdd.foreach(print)
[Stage 0:>                                                        (0 + 40) / 40]1
2
4
5
3
>>>   

4.2 RDD操作(转换操作之filter、map、flatMap)

每次转换生成一个新的RDD,但RDD是不能更改的。 多次转换即一个有向无环图。转换操作是惰性的,只记录这个转换需要进行的操作,不真正计算。

操作 含义
filter(func) 筛选出满足函数func的元素,并返回一个新的数据集
map(func) 将每个元素传递到函数func中,并将结果返回为一个新的数据集
flatMap(func) 与map()相似,但每个输入元素都可以映射到0或多个输出结果
groupByKey() 应用于(K,V)键值对的数据集时,返回一个新的(K,Iterable)形式的数据集
reduceByKey(func) 应用于(K,V)键值对的数据集时,返回一个新的(K,V)形式的数据集,其中每个值是将每个key传递到函数func中进行聚合后的结果

filter(func):筛选出满足函数func的元素,并返回一个新的数据集。

>>> lines = sc.textFile("file:///home/chenfy/word.txt")
>>> linesWithSpark = lines.filter(lambda line: "Spark" in line)
>>> linesWithSpark.foreach(print)
[Stage 0:>                                                          (0 + 2) / 2]Spark is better
Spark is fast
>>>  

map(func):将每个元素传递到func函数中,并将结果返回为一个新的数据集:

>>> data = [1, 2, 3, 4,5]
>>> rdd1 = sc.parallelize(data)
>>> rdd2 = rdd1.map(lambda x:x+10)
>>> rdd2.foreach(print)
14
12
11
15
13
>>> 
>>> lines = sc.textFile("file:///home/chenfy/word.txt")
>>> words = lines.map(lambda line: line.split(" "))
>>> words.foreach(print)
['Spark', 'is', 'better']
['Hadoop', 'is', 'good']
['Spark', 'is', 'fast']
>>> 

flatMap(func)map(func)得到的元素可能为列表、集合等内部结构,flatMap(func)将之进一步拆散(“拍扁”)。

>>> lines = sc.textFile("file:///home/chenfy/word.txt")
>>> words = lines.flatMap(lambda line: line.split(" "))
>>> words.foreach(print)
Hadoop
is
good
Spark
is
fast
Spark
is
better
>>> 

4.3 RDD操作(转换操作之groupByKey、reduceByKey)

groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K,Iterable)形式的数据集。

>>> words = sc.parallelize([("Hadoop", 1), ("is", 1), ("good", 1), ("Spark", 1), ("is", 1), ("fast", 1), ("Spark", 1), ("is", 1), ("better", 1)])
>>> words1 = words.groupByKey()
>>> words1.foreach(print)
[Stage 4:============================================>            (31 + 9) / 40]('good', <pyspark.resultiterable.ResultIterable object at 0x7f697a08ff28>)
('better', <pyspark.resultiterable.ResultIterable object at 0x7f697a08ff28>)
('is', <pyspark.resultiterable.ResultIterable object at 0x7f697a08ff28>)
('Hadoop', <pyspark.resultiterable.ResultIterable object at 0x7f697a08ff28>)
('fast', <pyspark.resultiterable.ResultIterable object at 0x7f697a08ff28>)
('Spark', <pyspark.resultiterable.ResultIterable object at 0x7f697a08ff28>)
>>> 

reduceByKey(func):在groupByKey()基础上,对Iterable运用func函数,得到新的键值对(K,Vnew)形式。

4.4 RDD操作(行动操作)

Spark程序徐执行行动操作时,才会执行真正的计算,从文件中加载数据,完成一次次的转换操作,最终完成行动操作得到结果(注:若之前的转换操作有错误,直到代码运行到行动操作时才报错)。

表 常用的RDD行动操作API

操作 含义
count() 返回数据集中的元素个数
collect() 以数组的形式返回数据集中的所有元素
first() 返回数据集中的第一个元素
take(n) 以数组的心事返回数据集中的前n各元素
reduce(func) 通过func(输入两个参数并返回一个值)聚合数据集中的元素
foreach(func) 将数据集中的每个元素传递到函数func中运行
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.count()
5
>>> rdd.first()
1
>>> rdd.take(3)
[1, 2, 3]
>>> rdd.reduce(lambda a, b: a+b)
15
>>> rdd.collect()
[1, 2, 3, 4, 5]
>>> rdd.foreach(lambda elem: print(elem))
1
2
5
3
4
>>> 

4.5 持久化

.persist()可进行持久化,后面再用到就会比较快。因为在内存中运算最快,可在此函数中添加参数MEMORY_ONLY.persist(MEMORY_ONLY)等价于.cache()。持久化会占用内存资源,后面不用的话应去持久化,采用.unpersist()

注意:这里的持久化是标记为持久化,只有触发行动操作才真正触发持久化。

4.6 分区(分区的作用和原则、设置分区的方法)

RDD分区的作用:1.增加并行度;2.减少通信开销。

RDD分区原则:使得分区的个数尽量等于集群中CPU核心(core)数目。

设置分区的个数:

>>> list = [1, 2, 3, 4, 5]
>>> rdd = sc.parallelize(list, 2)  #设置了两个分区

也可使用reparitition方法重新设置分区个数:

>>> data = sc.parallelize([1, 2, 3, 4, 5], 2)
>>> len(data.glom().collect())   #显示data这个RDD的分区数量
2
>>> rdd = data.repartition(1)    #对data这个RDD重新分区
>>> len(rdd.glom().collect())    #显示data这个RDD的分区数量
1
>>> 

4.7 分区(自定义分区方法)

动态调整分区大小很有必要,分区数量可在具体函数如sc.parallelize中设置。

Spark提供了自带的HashPartitioner(哈希分区)与RangePartitioner(区域分区)能满足大多数应用场景的需求。与此同时,可自定义函数规定哪些数据分在什么区。

实例:根据key值的最后一位数字,写到不同文件。例如:

10写入到part-00000
11写入到part-00001
.
.
.
19写入到part-00009

自定义函数如下函数(文件名为TestPartitioner.py):

from pyspark import SparkConf, SparkContext

def MyPartitioner(key):
    print("MyPatitioner is running")
    print('The key is %d' % key)
    return key%10

def main():
    print("The main function is running")
    conf = SparkConf().setMaster("local").setAppName("MyApp")
    sc = SparkContext(conf=conf)
    data = sc.parallelize(range(10), 5)
    # 转成键值对数据,方便使用partitionBy函数
    # 对data数据进行重新分区
    # 转回只含键值的数据
    data.map(lambda x: (x,1))\
        .partitionBy(10, MyPartitioner)\
        .map(lambda x: x[0])\
        .saveAsTextFile("file:///home/chenfy/patitioner")
        # 这里是目录地址,而不是文件地址

if __name__ == '__main__':   # 判断本文件是单独执行则执行以下程序,如果只调用本文件的一部分,则不执行以下程序。
    main()

运行TestPartitioner.py命令及输出:

[chenfy@entrobus32 ~]$ python3 TestPartitioner.py
The main function is running
19/09/16 13:41:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
[Stage 0:>                                                          (0 + 1) / 5]MyPatitioner is running
The key is 0
MyPatitioner is running
The key is 1
MyPatitioner is running
The key is 2
MyPatitioner is running
The key is 3
[Stage 0:=======================>                                   (2 + 1) / 5]MyPatitioner is running
The key is 4
MyPatitioner is running
The key is 5
MyPatitioner is running
The key is 6
MyPatitioner is running
The key is 7
MyPatitioner is running
The key is 8
MyPatitioner is running
The key is 9
[chenfy@entrobus32 ~]$   

4.8 一个综合实例

假设有一个本地文件word.txt,里面包含多行文本,每行文本由多个单词构成,单词之间用空格分隔。可以使用如下语句进行词频统计(即统计每个单词出现的次数):

>>> lines = sc.textFile("file:///home/chenfy/word.txt")
>>> wordCount = lines.flatMap(lambda line: line.split(" ")).\
... map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
>>> print(wordCount.collect())
[('is', 3), ('good', 1), ('Spark', 2), ('Hadoop', 1), ('fast', 1), ('better', 1)]
>>> 

4.9 键值对RDD的创建

从文件加载

>>> lines = sc.textFile("file:///home/chenfy/word.txt")
>>> pairRDD = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1))
>>> pairRDD.foreach(print)
('Hadoop', 1)
('is', 1)
('good', 1)
('Spark', 1)
('is', 1)
('fast', 1)
('Spark', 1)
('is', 1)
('better', 1)
>>> 

通过并行集合(列表)创建RDD

>>> list = ["Hadoop", "Spark", "Hive", "Spark"]
>>> rdd = sc.parallelize(list)
>>> pairRDD = rdd.map(lambda word: (word, 1))
>>> pairRDD.foreach(print)
('Spark', 1)
('Hadoop', 1)
('Spark', 1)
('Hive', 1)
>>> 

4.10 常用的键值对RDD转换操作(reduceByKey和groupByKey)

reduceByKey(func):使用func函数合并具有相同键的值。

>>> pairRDD = sc.parallelize([("Hadoop", 1), ("Spark", 1), ("Hive", 1), ("Spark", 1)])
>>> pairRDD.reduceByKey(lambda a, b: a+b).foreach(print)
('Spark', 2)
('Hadoop', 1)
('Hive', 1)
>>> 

groupByKey():对具有相同键的值进行分组。

>>> list = [("spark", 1), ("spark", 2), ("hadoop", 3), ("hadoop", 5)]
>>> pairRDD = sc.parallelize(list)
>>> pairRDD.groupByKey()
PythonRDD[23] at RDD at PythonRDD.scala:53
>>> pairRDD.groupByKey().foreach(print)
('spark', <pyspark.resultiterable.ResultIterable object at 0x7fef172ccf28>)
('hadoop', <pyspark.resultiterable.ResultIterable object at 0x7feeae3f2a90>)
>>> 

groupByKey()reduceByKey(func)的区别:

4.11 常用的PairRDD转换操作(keys、values、sortByKey、mapValues、join)

.keys():将Pair RDD中的key返回形成一个新的RDD。

.values():将Pair RDD中的value返回形成一个新的RDD。

.sortByKey():返回一个根据key排序的RDD(看示例,似乎默认参数False降序,但True似乎不能升序?)。

>>> list = [("Hadoop", 1), ("Spark", 1), ("Hive", 1), ("Spark", 1)]
>>> pairRDD = sc.parallelize(list)
>>> pairRDD.keys().foreach(print)
Spark
Hadoop
Spark
Hive
>>> pairRDD.values().foreach(print)
1
1
1
1
>>> pairRDD.foreach(print)
('Spark', 1)
('Hive', 1)
('Spark', 1)
('Hadoop', 1)
>>> pairRDD.sortByKey().foreach(print)
('Hadoop', 1)
('Hive', 1)
('Spark', 1)
('Spark', 1)
>>> pairRDD.sortByKey(True).foreach(print)
('Hadoop', 1)
('Spark', 1)
('Spark', 1)
('Hive', 1)
>>> pairRDD.sortByKey(False).foreach(print)
('Hadoop', 1)
('Hive', 1)
('Spark', 1)
('Spark', 1)
>>> 

.sortByKey().sortBy()的区别:

.mapValues(func):对键值对RDD中的每个value都应用同一个函数,但是key不变化。

>>> list = [("Hadoop", 1), ("Spark", 1), ("Hive", 1), ("Spark", 1)]
>>> pairRDD = sc.parallelize(list)
>>> pairRDD1 = pairRDD.mapValues(lambda x: x+1)
>>> pairRDD1.foreach(print)
('Spark', 2)
('Hive', 2)
('Spark', 2)
('Hadoop', 2)
>>> 

.join:内连接。对于给定的两输入数据集(K,v1)(k,v2),只有在两个数据集中都存在的key才会被输出,最终得到一个(k,(v1,v22))类型的数据集。

>>> pairRDD1 = sc.parallelize([("spark", 1), ("spark", 2), ("hadoop", 3), ("hadoop", 5)])
>>> pairRDD2 = sc.parallelize([("spark", "fast")])
>>> pairRDD3 = pairRDD1.join(pairRDD2)
>>> pairRDD3.foreach(print)
[Stage 72:===========================>                           (40 + 40) / 80]('spark', (1, 'fast'))
('spark', (2, 'fast'))
>>>

4.12 一个综合实例

题目:给定一组键值对("spark", 2), ("hadoop",6), ("hadoop", 4), ("spark", 6),键值对的key表示图书名称,value表示某天图书销量,请计算每个键对应的平均值,也就是计算每种图书的每天平均销量。

>>> rdd = sc.parallelize([("spark", 2), ("hadoop", 6), ("hadoop", 4), ("spark", 6)])
>>> rdd.mapValues(lambda x: (x, 1)).\
... reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])).\
... mapValues(lambda x: x[0]/x[1]).collect()
[('hadoop', 5.0), ('spark', 4.0)]
>>> 

4.13 文件数据读写

4.13.1 本地文件系统的读写

从本地文件中读取数据创建RDD

>>> textFile = sc.textFile("file:///home/chenfy/word.txt")
>>> textFile.first()
'Hadoop is good'
>>> 

把RDD写入到文本文件中

>>> textFile.saveAsTextFile("file:///home/chenfy/writeback")
>>> exit()
[chenfy@entrobus32 ~]$ cd /home/chenfy/writeback/
[chenfy@entrobus32 writeback]$ ls
part-00000  part-00001  _SUCCESS
[chenfy@entrobus32 writeback]$ 

注:可以看出存储的是一个文件路径而非单个文件。

再次把数据加载在RDD中

>>> textFile = sc.textFile("file:///home/chenfy/writeback")
>>> textFile.first()
'Spark is better'                                                               
>>> 

4.13.2 分布式文件系统HDFS的数据读写

暂不清楚如何进入HDFS,需要怎样的权限等。

分布式文件HDFS的数据读写,也是采用textFile()方法。该方法可传入一本地或HDFS的目录地址或文件地址。如果是文件地址,加载文件;如果是目录地址,加载该目录下所有问价的数据。

>>>textFile = sc.textFile("hdfs://localhost:9000/usr/hadoop/word.txt")
>>>textFile.first()

把RDD中的数据保存到HDFS文件中

>>>textFile = sc.textFile("word.txt")
>>>textFile.saveAsTextFile("writeback")

4.14 读写HBase数据(HBase介绍)

HBase数据库是四维数据库,由行键、列族、列限定符、版本时间戳定位具体数据(四维定位);而关系型数据库如MySQL仅由行键、列限定符即可确定数据(二维定位)。

4.15 读写HBase数据(创建一个HBase表)

HBase的安装(伪分布式模式)详见厦门大学数据库实验室HBase安装教程

启动Hadoop(为什么需要启动Hadoop,因为HBase依赖Hadoop的分布式环境。)

以下启动Hadoop的命令并不能成功,可能跟权限有关。但查到32号机确实有Hadoop,暂时先跳过此节。

[chenfy@entrobus32 ~]$ which hadoop
/usr/bin/hadoop
[chenfy@entrobus32 ~]$ cd /usr/bin/hadoop
-bash: cd: /usr/bin/hadoop: 不是目录
[chenfy@entrobus32 ~]$ ls -l|grep 'hadoop'
[chenfy@entrobus32 ~]$ ls -l|grep 'hadoop'
[chenfy@entrobus32 ~]$ cd /usr/bin/hadoop
-bash: cd: /usr/bin/hadoop: 不是目录
[chenfy@entrobus32 ~]$ ^C
[chenfy@entrobus32 ~]$ start-dfs.sh start-mapreted.sh
-bash: start-dfs.sh: 未找到命令
[chenfy@entrobus32 ~]$ start-all.sh
-bash: start-all.sh: 未找到命令
[chenfy@entrobus32 ~]$ hadoop-daemon.sh start namenode
-bash: hadoop-daemon.sh: 未找到命令
[chenfy@entrobus32 ~]$ 
$ cd /usr/local/hadoop
$ ./sbin/start-all.sh

启动HBase

$ cd /usr/local/hbase
$ ./bin/start-hbase.sh  //启动HBase
$ ./bin/hbase shell  //启动hbase shell
hbase> disable 'student'  # 使student表失效
hbase> drop 'student'   # 删除student表

创建列族信息

hbase> create 'student', 'info'

录入student表第一个学生记录

hbase> put 'student', '1', 'info:name', 'Xueqian'
hbase> put 'student', '1', 'info:gender', 'F'
hbase> put 'student', '1', 'info:age', '23'

录入student表第二个学生记录

hbase> put 'student', '2', 'info:name', 'Weiliang'
hbase> put 'student', '2', 'info:gender', 'M'
hbase> put 'student', '2', 'info:age', '24'

4.16 读写HBase数据(配置Spark)

应导入jar包,包括所有以‘hbase’开头的jar包、guava-12.0.1.jar、htrace-core-3.1.0-incubating.jar和protobuf-java-2.5.0.jar到spark安装目录下。

$ cd /usr/local/spark/jars
$ mkdir hbase
$ cd hbase
$ cp /usr/local/hbase/lib/hbase*.jar ./
$ cp /usr/local/hbase/lib/guava-12.0.1.jar ./
$ cp /usr/local/hbase/lib/htrace-core-3.1.0-incubating.jar ./
$ cp /usr/local/hbase/lib/protobuf-java-2.5.0.jar ./

此外,在Spark 2.0以上版本中,缺少把HBase数据转换成Python可读取数据的jar包,需要另行下载。可以访问下面地址下载spark-examples_2.11-1.6.0-typesafe-001.jar。下载以后保存到“/usr/local/spark/jars/hbase”目录中。

使用vim编辑器打开spark-env.sh文件,设置Spark的spark-env.sh文件,告诉Spark可以在哪个路径下找到HBase相关的jar文件,命令如下:

$ cd /usr/local/spark/conf
$ vim spark-env.sh

打开spark-env.sh文件以后,可以在文件最前面增加下面一行内容:

export
SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoopclasspath):$(/usr/local/hbase/bin/hbaseclasspath):/usr/local/spark/jars/hbase/*

这样,后面编译和运行过程才不会出错。

用SparkContext提供的newAPIHadoopRDD API将表的内容以RDD的形式加载到Spark中。

SparkOperateHBase.py:

#!/usr/bin/env python3
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("ReadHBase")
sc = SparkContext(conf=conf)
host = 'localhost'
table = 'student'
conf = {"hbase.zookeeper.quorum":host,"hbase.mapreduce.inputtable":table}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat", "org.apche.hadoop.hbase.io.ImmutableBytesWritable", "org.apache.hadoop.hbase.client.Result", keyConverter=keyConv, valueConverter=valueConver, conf=conf)
count = hbase_rdd.count()
hbase_rdd.cache()
output = hbase_rdd.collect()
for (k,v) in output:
    print(k, v)

执行结果:

1 {"qualifier":"age", "timestamp":"1545728145163", "columnFamily":"info", "row":"1", "type":"Put", "value":"23"}
{"qualifier":"gender", "timestamp":"1545728114020", "columnFamily":"info", "row":"1", "type":"Put", "value":"F"}
{"qualifier":"name", "timestamp":"1545728100663", "columnFamily":"info", "row":"1", "type":"Put", "value":"Xueqian"}
2 {"qualifier":"age", "timestamp":"1545728168727", "columnFamily":"info", "row":"2", "type":"Put", "value":"24"}
...

执行该代码文件,命令如下:

$ cd /usr/local/spark/mycode/rdd
$ /usr/local/spaark/bin/spark-submit SparkOperateHBase.py

4.17 读写HBase数据(编写程序读取HBase数据)

4.18 读写HBase数据(编写程序向HBase写入数据)

公司32号机可通过hbase shell启动hbase。

[chenfy@entrobus32 ~]$ hbase shell
Java HotSpot(TM) 64-Bit Server VM warning: Using incremental CMS is deprecated and will likely be removed in a future release
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/lib/hbase/lib/phoenix-4.14.0-cdh5.13.2-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/lib/hbase/lib/phoenix-4.14.0-cdh5.13.2-hive.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/lib/hbase/lib/phoenix-4.14.0-cdh5.13.2-pig.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/lib/hbase/lib/phoenix-4.14.0-cdh5.13.2-thin-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.13.3-1.cdh5.13.3.p0.2/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 1.2.0-cdh5.13.3, rUnknown, Sat Mar 17 04:43:46 PDT 2018

hbase(main):001:0> 

list命令可显示当前HBase数据库中有哪些已经创建好的表。

hbase(main):002:0> list
TABLE                                                                                        
0 row(s) in 0.0690 seconds

=> []
hbase(main):003:0> 

如果想创建一个student表需保证student表不存在。否则需用下面语句删除student表:

hbase> disable 'student'
hbase> drop 'student'

自然,现在情况没有student表,那么将显示:

hbase(main):003:0> disable 'student'

ERROR: Table student does not exist.

Start disable of named table:
  hbase> disable 't1'
  hbase> disable 'ns1:t1'


hbase(main):004:0> 

下面来创建一个student表,并录入如下数据:

id name gender age
1 Xueqian F 23
2 Weiliang M 24

结论:32号机无创建表的权限。

4.19 案例1:求TOP值

任务描述,file1.txt及file2.txt均含有4列数据,名称分别为orderid、userid、payment、productid。求Top N个payment值。 file1.txt:

1,1768,50,155
2,1218,600,211
3,2239,788,242
4,3101,28,599
5,4899,290,129
6,3110,54,1201
7,4436,259,877
8,2369,7890,27

file2.txt:

100,4287,226,233
101,  6562,489,124
102,1124  ,33,17
103,3267,159,179
104,4569,57,125
105,1438,37,116

代码文件TopN.py,其内部内容为:

#!usr/bin/env python3

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("ReadHBase")
sc=SparkContext(conf=conf)
# 读取file文件夹,将读取其内file1.txt及file2.txt
lines = sc.textFile("file:///home/chenfy/file")
# 仅保留非空行,仅保留含4个数字的行
result1 = lines.filter(lambda line: (len(line.strip())>0) and (len(line.split(","))==4))
# 保留第三列
result2 = result1.map(lambda x: x.split(",")[2])
# 第三列转化为数字和空字符的键值对
result3 = result2.map(lambda x: (int(x), ""))
# 分区调整为1
result4 = result3.repartition(1)
# 按键降序排列
result5 = result4.sortByKey(False)
# 丢弃值(空字符)
result6 = result5.map(lambda x: x[0])
# 取前5
result7 = result6.take(5)
# aaa
for a in result7:
    print(a)

运行过程:

[chenfy@entrobus32 ~]$ python3 TopN.py
19/09/16 16:54:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
7890                                                                            
788
600
489
290
[chenfy@entrobus32 ~]$ 

正如手工排序将得到的结果。

4.20 案例2:文件排序

任务描述: 有多个输入文件(file1.txt、file2.txt、file3.txt),每个文件中的每一行内容均为一个整数。要求读取所有文件中的整数,进行排序后输出到一个新的文件中,输出的内容个数为每行两个整数,第一个整数位第二个整数的排序位次,第二个整数为原待排序的整数。 file1.txt:

33
37
12
40

file2.txt:

4
16
39
5

file3.txt:

1
45
25

任务内容很好理解,输出文件不示例。 代码文件FileSort.py:

#!/usr/bin/env python3

from pyspark import SparkConf, SparkContext

index = 0
def getindex():
    global index
    index += 1
    return index

def main():
    conf = SparkConf().setMaster("local[1]").setAppName("FileSort")
    sc = SparkContext(conf=conf)
    # 读取/home/chenfy/filesort/路径下所有file开头的txt文件
    lines = sc.textFile("file:///home/chenfy/filesort/file*.txt")
    index = 0
    # 仅保留含有非空字符的行
    result1 = lines.filter(lambda line: (len(line.strip())>0))
    # 将字符形式的数字转化成数字、空字符组成的键值对
    result2 = result1.map(lambda x: (int(x.strip()), ""))
    # 改成一个分区
    result3 = result2.repartition(1)
    # 将键值对按键升序排列
    result4 = result3.sortByKey(True)
    # 丢弃值(空字符)
    result5 = result4.map(lambda x: x[0])
    # 在数字前面加索引,索引按getindex所示从0开始顺次递增
    result6 = result5.map(lambda x: (getindex(), x))
    # 打印排序并加索引的结果
    result6.foreach(print)
    # 结果存入/home/chenfy/sortresult路径
    result6.saveAsTextFile("file:///home/chenfy/filesort/sortresult")

if __name__ == '__main__':
    main()

运行结果:

[chenfy@entrobus32 filesort]$ python3 FileSort.py
19/09/16 17:45:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
[Stage 0:>                                                          (0 + 2) / 3](1, 1)
(2, 4)
(3, 5)
(4, 12)
(5, 16)
(6, 16)
(7, 16)
(8, 16)
(9, 16)
(10, 25)
(11, 33)
(12, 37)
(13, 39)
(14, 39)
(15, 39)
(16, 39)
(17, 39)
(18, 40)
(19, 45)
(20, 54)
(21, 54)
(22, 54)
(23, 54)
[chenfy@entrobus32 filesort]$

显然,结果符合预期。

4.21 案例3:二次排序

任务要求: 对于一个给定文件(file1.txt),请对数据进行排序,首先根据第一列数据降序排序,如第一列数据相等,则根据第二列数据降序排序。 file1.txt:

5 3
1 6
4 9
8 3
4 7
5 6
3 2

输出结果:

8 3
5 6
5 3
4 9
4 7
3 2
1 6

SecondarySortKey.py:

from operator import gt
from pyspark import SparkContext, SparkConf

class SecondarySortKey():
    def __init__(self, k):
        self.column1 = k[0]
        self.column2 = k[1]

    def __gt__(self, other):
        if other.column1 == self.column1:
            # 当第一列值相等时,比较第二列
            return gt(self.column2, other.column2)
        else:
            # 否则只比较第一列
            return gt(self.column1, other.column1)

def main():
    conf = SparkConf().setAppName('spark_sort').setMaster('local[1]')
    sc = SparkContext(conf=conf)
    file = "file:///home/chenfy/secondarysort/file1.txt"
    rdd1 = sc.textFile(file)
    # 过滤空行
    rdd2 = rdd1.filter(lambda x: (len(x.strip())>0))
    # 在每行前面加这两个数字本身作为键
    rdd3 = rdd2.map(lambda x: ((int(x.split(" ")[0]), int(x.split(" ")[1])), x))
    # (num1, num2)作为键,但spark不会知道这样的键该怎么排序,SecondarySortKey函数告诉spark:键的第一值相同就看第二个值,否则只看第一个值。
    # map函数要求保留字符形式的数字本身。
    rdd4 = rdd3.map(lambda x: (SecondarySortKey(x[0]), x[1]))
    # 根据上步获得信息排序
    rdd5 = rdd4.sortByKey(False)
    # 丢弃键保留值(字符形式的数字本身)
    rdd6 = rdd5.map(lambda x: x[1])
    # 打印结果,方便debug
    rdd6.foreach(print)

if __name__ == '__main__':
    main()

运行结果:

[chenfy@entrobus32 secondarysort]$ python3 SecondarySortApp.py
19/09/16 20:33:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
[Stage 0:>                                                          (0 + 1) / 1]8 3
5 6
5 3
4 9
4 7
3 2
1 6
[chenfy@entrobus32 secondarysort]$ 

显然,与要求结果同。

5. Spark SQL

5.1 Spark SQL简介

为什么推出Spark SQL:

Spark SQL之前系统的不足:

Spark SQL填补了这个鸿沟:

5.2 DataFrame概述

5.3 DataFrame的创建

可以通过如下语句创建一个SparkSession对象:

from pyspark import SparkConteext, SparkConf
from pyspark.sql import SparkSession
spark = SparkSession.builder.config(conf=conf)

实际上,在启动 进入pyspark以后,pyspark就默认提供了一个SparkContext对象(名称为sc)和一个SparkSession对象(名称为spark)。

在创建DataFrame时,可以使用spark.read操作,从不同类型的文件中加载数据创建DataFrame,例如:

或者也可以对应使用如下格式的语句:

例如,读取本地/home/chenfy/people.json文件:

>>> df = spark.read.json("file:///home/chenfy/resources/people.json")
>>> df.show()                                                                   
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

>>> 

5.4 DataFrame的保存

可使用spark.write操作,把一个DataFrame操作保存成不同格式的文件:

也可对应使用如下格式的语句:

下面从示例文件people.json中创建一个DataFrame,将其保存到令一json文件。然后仅选取name列,把该列数据保存到文本文件:

>>> peopleDF = spark.read.format("json").\
... load("file:///home/chenfy/resources/people.json")
>>> peopleDF.select("name", "age").write.format("json").\
... save("file:///home/chenfy/people_new_json")
>>> peopleDF.select("name").write.format("text").\
... save("file:///home/chenfy/people_new_text")
>>> exit()
[chenfy@entrobus32 resources]$ cd ~
[chenfy@entrobus32 ~]$ cd people_new_text
[chenfy@entrobus32 people_new_text]$ ls
part-00000-2d3a6add-79ee-4156-8692-b85f0e259950-c000.txt  _SUCCESS
[chenfy@entrobus32 people_new_text]$ cd ~
[chenfy@entrobus32 ~]$ cd people_new_json
[chenfy@entrobus32 people_new_json]$ ls
part-00000-f5299bd1-fa0b-4c24-9d91-da8619cb6717-c000.json  _SUCCESS

注:从紧上示例可以看出,保存的将是路径。保存路径的最后一级是保存的文件夹名(即使最后一级是*.json等),将自动创建此文件夹并在此文件夹中生产语句定义了的文件名、文件格式。

5.5 DataFrame的常用操作

>>> df = spark.read.json("file:///home/chenfy/resources/people.json")
>>> df.printSchema()                                                            
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

>>> df.select(df["name"], df["age"]+1).show()
+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+

>>> df.filter(df["age"]>20).show()
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+

>>> df.groupBy("age").count().show()
+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+

>>> df.sort(df["age"].desc()).show()
+----+-------+
| age|   name|
+----+-------+
|  30|   Andy|
|  19| Justin|
|null|Michael|
+----+-------+

>>> df.sort(df["age"].desc(), df["name"].asc()).show()
+----+-------+
| age|   name|
+----+-------+
|  30|   Andy|
|  19| Justin|
|null|Michael|
+----+-------+

>>>

5.6 利用反射机制推断RDD模式

注:5.6、5.7我看标题有点懵。DataFrame相对于RDD多了schema,怎么传递schema信息给DataFrame呢,林老师给了5.6、5.7两种方法。

将people1.txt加载到内存中生成一个DataFrame,并查询其中的数据。 people.txt:

Michael, 29
Andy, 30
Justin, 19
>>> from pyspark.sql import Row
>>> people = spark.sparkContext.\
... textFile("file:///home/chenfy/resources/people1.txt").\
... map(lambda line: line.split(",")).\
... map(lambda p: Row(name=p[0], age=int(p[1])))
>>> schemaPeople = spark.createDataFrame(people)
# 必须注册为临时表才能供下面的查询使用
# 之后sql语句的表的名字正是来自这里
>>> schemaPeople.createOrReplaceTempView("people")                              
>>> personsDF = spark.sql("select name, age from people where age > 20")
19/09/17 10:43:20 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
19/09/17 10:43:20 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
19/09/17 10:43:22 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
# DataFrame中的每一个元素都是一行记录,包含name和age两个字段,分别用p.name和p.age来获取值。
>>> personsRDD = personsDF.rdd.map(lambda p: "Name: " + p.name + ", Age: " + str(p.age))
>>> personsRDD.foreach(print)
Name: Michael, Age: 29
Name: Andy, Age: 30
>>> 

5.7 使用编程方式定义RDD模式

>>> from pyspark.types import *
>>> from pyspark.sql import Row
# 生成表头
>>> schemaString = "name age"
>>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(" ")]
>>> schema = StructType(fields)
# 生成表中的记录
>>> lines = spark.sparkContext.\
... textFile("file:///home/chenfy/resources/people1.txt")
>>> parts = lines.map(lambda x: x.split(","))
>>> people = parts.map(lambda p: Row(p[0], p[1].strip()))
# 将表头和表中的记录拼装在一起
>>> schemaPeople = spark.createDataFrame(people, schema)
# 注册为临时表供sql查询使用
>>> schemaPeople.createOrReplaceTempView("people")
>>> results = spark.sql("SELECT name, age FROM people")
>>> results.show()
+-------+---+
|   name|age|
+-------+---+
|Michael| 29|
|   Andy| 30|
| Justin| 19|
+-------+---+

>>> 

5.8 MySQL数据库准备工作

我并不知道能登录的账户。之后从同事处要到34号机上的MySQL账号:

[chenfy@entrobus32 ~]$ mysql -h10.18.0.34 -ucfy -p
Enter password: 
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 1003
Server version: 5.7.27 MySQL Community Server (GPL)

Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> create database spark;
Query OK, 1 row affected (0.00 sec)

mysql> use spark;
Database changed
mysql> create table student (id int(4), name char(20), gender char(4), age int(4));
Query OK, 0 rows affected (0.35 sec)

mysql> insert into student values(1, 'Xueqian', 'F', 23);
Query OK, 1 row affected (0.08 sec)

mysql> insert into student values(2, 'Weiliang', 'M', 24);
Query OK, 1 row affected (0.04 sec)

mysql> select * from student;
+------+----------+--------+------+
| id   | name     | gender | age  |
+------+----------+--------+------+
|    1 | Xueqian  | F      |   23 |
|    2 | Weiliang | M      |   24 |
+------+----------+--------+------+
2 rows in set (0.00 sec)

mysql>exit;
jdbcDF = spark.read\
.format("jdbc")\
.option("driver","com.mysql.jdbc.Driver")\
.option("url","jdbc:mysql://10.18.0.34:3306/spark")\
.option("dbtable","student")\
.option("user","cfy")\
.option("password", "123456")\
.load()

不能成功,换成下面也暂时不能成功,报错未有很明确信息。

password = 123456
from pyspark import SparkContext
from pyspark.sql import SQLContext,Row
sqlContext=SQLContext(sc)
jdbcDF = sqlContext.read.format('jdbc').options(url="jdbc:mysql://10.18.0.34/spark?user=cfy&password=password",dbtable="student").load()
# jdbcDF = sqlContext.read.jdbc(url="jdbc:mysql://10.18.0.34?user=cfy&password=password",table="spark.student")

本章后面内容只能先暂时放一放。

5.9 使用Spark SQL读写数据库

6. Spark Streaming

静态数据和流数据:

6.1 流计算概述

对于一个流计算系统来说,它应该达到如下需求: * 高性能:处理大数据的基本要求,如每秒处理几十万条数据 * 海量式:支持TB级甚至时PB级的数据规模 * 实时性:保证较低的延迟时间,达到秒级别,甚至是毫秒级别 * 分布式:支持大数据的基本架构,必须能够平滑扩展 * 易用性:能快速进行开发和部署 * 可靠性:能可靠地处理流数据

流计算框架:商业级:IBM InfoSphere Streams和IBM StreamBase;开源:Twitter Storm和Yahoo! S4(Simple Scalaable Streaming System);公司自身的:Facebook Puma、Dstream和银河流数据处理平台。

流数据处理流程:数据实时采集、数据实时计算、实时查询服务

6.2 Spark Streaming

Spark Streaming可整合多种输入数据源,如Kafka、 Flume、HDFS,甚至是普通的TCP套接字。经处理后的 数据可存储至文件系统、数据库,或显示在仪表盘里。

Spark Streaming最主要的抽象是DStream(Discretized Stream,离散化数据 流),表示连续不断的数据流。在内部实现上,Spark Streaming的输入数据按 照时间片(如1秒)分成一段一段,每一段数据转换为Spark中的RDD,这些分 段就是Dstream,并且对DStream的操作都最终转变为对相应的RDD的操作。

6.3 DStream操作概述

编写Spark Streaming程序的基本步骤是:

6.4 文件流

终端窗口,启动pyspark,输入以下命令,程序将开始自动进入循环监听状态,屏幕上会显示一堆的信息:

>>> from pyspark import SparkContext
>>> from pyspark.streaming import StreamingContext
>>> ssc = StreamingContext(sc,10)
>>> lines = ssc.textFileStream("file:///home/chenfy/streaming/logfile")
>>> words = lines.flatMap(lambda line: linesplit(' '))
>>> wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a+b)
>>> wordCounts.pprint()
>>> ssc.start()
-------------------------------------------                                     it
Time: 2019-09-17 20:03:40
-------------------------------------------

在/home/chenfy/streaming/logfile目录下新家一个log.txt文件,就可以在监听窗口显示词频统计结果。(注:以上代码暂不明白什么意思。)

也可采用独立应用程序方式船舰文件流:

#!usr/bin/env python3

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext

conf = SparkConf()
conf.setAppName('TestDStream')
conf.setMaster('local[2]')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10)
lines = ssc.textFileStream('file:///home/chenfy/streaming/logfile')
words = lines.flatMap(lambda line: line.split(' '))
wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda a, b: a+b)
wordCounts.pprint()
ssc.start()
ssc.awaitTermination()
cd /home/chenfy/streaming/logfile/
spark-submit FileStreaming.py

也能运行,输出持续不断,但不知道什么意思。

6.5 套接字流(使用NC程序产生数据)

使用 套接字流作为数据: 创建并在/home/chenfy/streaming/socket/NetworkWordCount.py文件输入如下内容:

#!/usr/bin/env python3

from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ = "__main__":
    if len(sys.argv) != 3:
        print("Usage: NetWorkWordCount.py <hostname> <port>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 1)
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(' '))\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda a, b: a+b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

不知怎么,没有像林老师说的一样得到词频统计,也没报错。

6.6 套接字流(使用Socket编程实现自定义数据流)

下面我们再前进一步,把数据源头的产生方式修改一下,不要使用nc程序,而是采用以下自己编写的程序产生Socket数据源:

#!usr/bin/env python3
import socket
# 生成socket对象
server = socket.socket()
# 绑定ip和端口,端口可随意指定,
# 但为了NetworkWordCount.py能监听到,其执行语句应写同一端口
server.bind(('localhost', 8888))
# 监听绑定的窗口
server.listen(1)
while 1:
    # 为了方便识别,打印一个“我在等待”
    print("I'm waiting the connect...")
    # 这里用两个值接收,因为连接上之后使用的是客户端发来请求的这个实例。
    # 所以下面的传输要使用conn实例操作
    conn, addr = server.accept()
    # 打印连接成功
    print("Connect success! Connection is from %s." % addr[0])
    # 打印正在发送数据
    print('Sending data...')
    conn.send('I love hadoop, I love spark, hadoop is good and spark is fast.'.encode())
    conn.close()
    print('Connection is broken.')

启动Socket服务端,即DataSourceSocket.py程序:

spark-submit DataSourceSocket.py

启动客户端,即NetworkWordCount.py程序:

spark-submit NetworkWordCount.py localhost 8888

将在客户端输出DataSourceSocket.py程序中conn.send语句发送内容的词频统计。

6.7 RDD队列流

import time
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingQueueStream")
    ssc = StreamingContext(sc,2)
    # 创建一个队列,通过该队列可以把RDD推给一个RDD队列
    rddQueue = []
    for i in range(5):
        rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]
        time.sleep(1)

    # 创建一个RDD队列六
    inputStream = ssc.queueStream(rddQueue)
    mappedStream = inputStream.map(lambda x: (x % 10, 1))
    reducedStream = mappedStream.reduceByKey(lambda a, b: a+b)
    reducedStream.pprint()
    ssc.start()
    ssc.stop(stopSparkContext=True, stopGraceFully=True)

成功运行,不知什么意思。

6.8 使用Apache Kafka作为Spark Streaming数据源(准备工作)

Kafka简介:

Kafka安装:(暂略)

Kafka启动:未检测到32号机装了Kafka,暂略。

6.9 使用Apache Kafka作为Spark Streaming数据源(编写流计算程序)

6.10 DStream无状态转换操作

DStream转换操作:

无状态转换操作实例:之前“套接字流”部分介绍的词频统计,就是采用无状态转换,每次统计,都是只统计当前批次到达的单词的词频,和之前批次无关,不会进行累计。

无状态转换操作包括:

6.11 DStream有状态转换操作(滑动窗口转换操作)

有状态转换操作包括:

滑动窗口转换操作包括:

/home/chenfy/streaming/socket/WindowedNetworkWordCount.py内容如下:

#!/usr/bin/env python3
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: WindowedNetworkWordCount.py <hostname> <port>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingWindowedWordCount")
    ssc = StreamingContext(sc,10)
    ssc.checkpoint("file:///home/chenfy/streaming/socket/checkpoint")
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    counts = lines.flatMap(lambda line: line.split(" "))\
    .map(lambda word: (word, 1))\
    .reduceByKeyAndWindow(lambda x, y: x+y, lambda x, y: x-y, 30, 10)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

注:无报错,但不知为何并不会输出词频统计。

6.12 DStream有状态转换操作(updateStateByKey操作)

需要在跨批次之间维护状态时,就必须使用updateStateByKey操作

词频统计实例:

对于有状态转换操作而言,本批次的词频统计,会在之前批次的词频统计结果的基础上进行不断累加,所以,最终统计得到的词频,是所有批次的单词的总的词频统计结果

#!/usr/bin/env python3
from __future__ import print_function
import sys
from pysaprk import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: NetworkWordCountStateful.py <hostname> <port>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingStatefulNetworkWordCount")
    ssc = StreamingContext(sc,1)
    ssc.checkpoint("file:///home/chenfy/streaming/stateful/")
    # RDD with initial state (key, value) pairs
    initialStateRDD = sc.parallelize([(u'hello', 1), (u'world', 1)])

    def updateFunc(new_values, last_sum):
        return sum(new_values) + (last_sum or 0)
    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    running_counts = lines.flatMap(lambda line: line.split(" "))\
        .map(lambda word: (word, 1))\
        .updateStateByKey(updateFunc, initialRDD=initialStateRDD)
    running_counts.pprint()
    ssc.start()
    ssc.awaitTermination()

注:按示例运行未报错,但没有得到相应结果。

6.13 输出操作

暂无法连接到MySQL,需要sudo权限安装PyMySQL,略。

7. Structured Streaming程序的基本步骤

7.1 概述

7.1.1 基本概念

7.1.2 两种处理模型

1.微批处理:

2.持续处理:

7.2 编写Structued Streaming程序的基本操作

编写Structured Streaming程序的基本步骤包括:

实施任务:一个包含很多行英文语句的数据流源源不断到达,Structured Streaming程序对每行英文语句进行拆分,并统计每个单词出现的频率

把以上几个步骤的代码写入文件StructuredNetworkWordCount.py,在执行此文件之前需要启动HDFS(我未有权限或暂不知道路径)

7.3 File源

7.4 Kafka源

Kafka源是流处理最理想的输入源,因为它可以保证实时和容错。

7.5 Socket源和Rate源

Socket源从一个本地或远程主机的某个端口服务上读取数据,数据 的编码为UTF8。因为Socket源使用内存保存读取到的所有数据, 并且远端服务不能保证数据在出错后可以使用检查点或者指定当前 已处理的偏移量来重放数据,所以,它无法提供端到端的容错保障。 Socket源一般仅用于测试或学习用途。

Rate源可每秒生成特定个数的数据行,每个数据行包括时间戳和值字 段。时间戳是消息发送的时间,值是从开始到当前消息发送的总个数, 从0开始。Rate源一般用来作为调试或性能基准测试。

7.6 输出操作

输出模式用于指定写入接收器的内容,主要有以下几种:

8. Spark MLlib

8.1 Spark MLlib简介

Spark 机器学习库从1.2 版本以后被分为两个包:

MLlib目前支持4种常见的机器学习问题:分类、回归、聚类和协同过滤

离散数据 连续数据
监督学习 Classification
LogisticRegression(with Elastic)
SVM
DecisionTree
RandomForest
GBT
NaiveBayes
MultilayerPerceptron
OneVsRest
Regression
LinearRegression(with Elastic)

DecisionTree
RandomForest
GBT
AFTSurvivalRegression
IsotoniRegression

无监督学习 Clustering
KMeans
GaussianMixture
LDA
PowerIterationClustering
BisectingKMeans
Dimenssionality Reduction
matrix factorization
PCA
SVD
ALS
WLS

8.2 机器学习流水线

PipeLine:翻译为流水线或者管道。流水线将多个工作流阶段(转换器和估计器)连接在一起,形成机器学习的工作流,并获得结果输出。

要构建一个 Pipeline流水线,首先需要定义 Pipeline 中的各 个流水线阶段PipelineStage(包括转换器和评估器),比 如指标提取和转换模型训练等。有了这些处理特定问题的转 换器和评估器,就可以按照具体的处理逻辑有序地组织 PipelineStages 并创建一个Pipeline。

pipeline = Pipeline(stages=[stage1, stage2, stage3])

然后就可以把训练数据集作为输入参数,调用 Pipeline 实例 的 fit 方法来开始以流的方式来处理源训练数据。这个调用 会返回一个 PipelineModel 类实例,进而被用来预测测试数 据的标签。

注:完全跟scikit-learn一样。

如下构建一个典型的机器学习过程来具体介绍流水线的运用。任务为:查找所有包含“spark”的句子,即将包含“spark”的句子的标签设为1,没有“spark”的句子的标签设为0。

/home/chenfy/ml/logs_pipe.py文件内容如下:

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("WordCount").getOrCreate()

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# Prepare training data from a list of (id, text, label) tuples.
training = spark.createDataFrame([
    (0, "a b c d e spark", 1.0),
    (1, "b d", 0.0),
    (2, "spark f g h", 1.0),
    (3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# define some stages for the pipeline
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)

# construt the pipeline
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# fit model
model =  pipeline.fit(training)

# Prepare test data
test = spark.createDataFrame([
    (4, "spark i j k"),
    (5, "l m n"),
    (6, "spark hadoop spark"),
    (7, "apache hadoop"),
], ["id", "text"])

prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
    rid, text, prob, prediction = row
    print("(%d, %s)-->prob=%s, prediction=%f." % (rid, text, str(prob), prediction))

运行:

[chenfy@entrobus32 ml]$ spark-submit logs_pipe.py
.
.
.
(4, spark i j k)-->prob=[0.1596407738787475,0.8403592261212525], prediction=1.000000.
(5, l m n)-->prob=[0.8378325685476744,0.16216743145232562], prediction=0.000000.
(6, spark hadoop spark)-->prob=[0.06926633132976037,0.9307336686702395], prediction=1.000000.
(7, apache hadoop)-->prob=[0.9821575333444218,0.01784246665557808], prediction=0.000000.
.
.
.

8.3 特征提取:TF-IDF

特征提取的作用和意义完全是机器学习内容,略,这里仅示例spark代码。

过程描述:

/home/chenfy/tf_idf.py内容如下:

# if not in pyspark commandline, make sure add below these two lines.
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("TF-IDF").getOrCreate()


from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0, "I heard about Spark and I love Spark"),
    (0, "I wish java could use case classes"),
    (0, "Logistic regression models are neat"),
]).toDF("label", "sentence")

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)
wordsData.show()

hashingTF = HashingTF(inputCol='words', outputCol='rawFeatures', numFeatures=2000)
featurizedData = hashingTF.transform(wordsData)
featurizedData.select("words", "rawFeatures").show(truncate=False)

idf = IDF(inputCol='rawFeatures', outputCol='features')
idfModel = idf.fit(featurizedData)

rescaledData = idfModel.transform(featurizedData)
rescaledData.select("features", "label").show(truncate=False)

运行输出:

[chenfy@entrobus32 ml]$ spark-submit tf_idf.py
.
.
.

+-----+--------------------+--------------------+
|label|            sentence|               words|
+-----+--------------------+--------------------+
|    0|I heard about Spa...|[i, heard, about,...|
|    0|I wish java could...|[i, wish, java, c...|
|    0|Logistic regressi...|[logistic, regres...|
+-----+--------------------+--------------------+

.
.
.

+---------------------------------------------+---------------------------------------------------------------------+
|words                                        |rawFeatures                                                          |
+---------------------------------------------+---------------------------------------------------------------------+
|[i, heard, about, spark, and, i, love, spark]|(2000,[240,333,1105,1329,1357,1777],[1.0,1.0,2.0,2.0,1.0,1.0])       |
|[i, wish, java, could, use, case, classes]   |(2000,[213,342,489,495,1329,1809,1967],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])|
|[logistic, regression, models, are, neat]    |(2000,[286,695,1138,1193,1604],[1.0,1.0,1.0,1.0,1.0])                |
+---------------------------------------------+---------------------------------------------------------------------+

.
.
.
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|features                                                                                                                                                                       |label|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|(2000,[240,333,1105,1329,1357,1777],[0.6931471805599453,0.6931471805599453,1.3862943611198906,0.5753641449035617,0.6931471805599453,0.6931471805599453])                       |0    |
|(2000,[213,342,489,495,1329,1809,1967],[0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.28768207245178085,0.6931471805599453,0.6931471805599453])|0    |
|(2000,[286,695,1138,1193,1604],[0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453,0.6931471805599453])                                               |0    |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
.
.
.

8.4 特征转换:标签和索引转化

一般,机器学习的输入,如有字符,需转化成数字或数字的组合,方能作为机器学习函数的输入。这就是标签转索引;标签转索引成功训练模型后,希望重新变为方便理解的字符,这样就是索引转标签。

Spark scikit-learn
StringIndexer.transoform() LabelEncoder().transoform()
IndexToString.transoform() LabelEncoder().inverse_transoform()
OneHotEncoder.transoform() OneHotEncoder().transoform()
VectorIndexer.transoform()
可根据maxCategories参数判断类别变量,并仅将类别变量onehot

8.5 逻辑斯蒂回归分类器

# if not in pyspark commandline, make sure add below these two lines.
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("LOGISTIC_IRIS").getOrCreate()

from pyspark.ml.linalg import Vector, Vectors
from pyspark.sql import Row, functions
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString, StringIndexer, \
VectorIndexer, HashingTF, Tokenizer
from pyspark.ml.classification import LogisticRegression, \
LogisticRegressionModel, BinaryLogisticRegressionSummary, LogisticRegression

def f(x):
    rel = {}
    rel['features'] = Vectors.\
    dense(float(x[0]), float(x[1]), float(x[2]), float(x[3]))
    rel['label']  = str(x[4])
    return rel

data = spark.sparkContext.\
textFile("file:///home/chenfy/ml/iris.txt").\
map(lambda line: line.split(',')).\
map(lambda p: Row(**f(p))).toDF()

data.show()

labelIndexer = StringIndexer().\
setInputCol("label").\
setOutputCol("indexedLabel").\
fit(data)
featureIndexer = VectorIndexer().\
setInputCol("features").\
setOutputCol("indexedFeatures").\
fit(data)


lr = LogisticRegression().\
setLabelCol("indexedLabel").\
setFeaturesCol("indexedFeatures").\
setMaxIter(100).\
setRegParam(0.3).\
setElasticNetParam(0.8)
print("LogisticRegression parameters:\n" + lr.explainParams())

labelConverter = IndexToString().\
setInputCol("prediction").\
setOutputCol("predictedLabel").\
setLabels(labelIndexer.labels)
lrPipeline = Pipeline().\
setStages([labelIndexer, featureIndexer, lr, labelConverter])

trainingData, testData = data.randomSplit([0.7, 0.3])
lrPipelineModel = lrPipeline.fit(trainingData)
lrPredictions = lrPipelineModel.transform(testData)

preRel = lrPredictions.select(
    "predictedLabel",
    "label",
    "features",
    "probability"
).collect()

for item in preRel:
    print(str(item["label"]) + ',' + \
    str(item["features"]) + '-->prob=' + \
    str(item["probability"]) + 'predictedLabel' + \
    str(item["predictedLabel"])
    )

evaluator = MulticlassClassificationEvaluator().\
setLabelCol("indexedLabel").\
setPredictionCol("prediction")
lrAccuray = evaluator.evaluate(lrPredictions)
print("Accuray: {}".format(lrAccuray))

lrModel = lrPipelineModel.stages[2]
print("Coefficients: \n" + str(lrModel.coefficientMatrix) + \
"\nIntercept: " + str(lrModel.interceptVector) + \
"\nnumClasses: " + str(lrModel.numClasses) + \
"\nnumFeatures: " + str(lrModel.numFeatures))

运行过程:

[chenfy@entrobus32 ml]$ spark-submit iris_logs.py


.
.
.
Accuray: 0.708498023715415
Coefficients: 
3 X 4 CSRMatrix
(0,2) -0.2475
(0,3) -0.254
(1,3) 0.351
Intercept: [0.8275049304783856,-0.195156119496122,-0.6323488109822636]
numClasses: 3
numFeatures: 4
.
.
.

注:可能也应像scikit-learn等中一样设置seed等以达到重现效果,三次运行出现了0.46~0.94的精确度。

8.6 决策树分类器

仍然上节数据集

/home/chenfy/ml/iris_dt.py内容如下:

# if not in pyspark commandline, make sure add below these two lines.
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("DT_IRIS").getOrCreate()

from pyspark.ml.classification import DecisionTreeClassificationModel
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline,PipelineModel
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.linalg import Vector,Vectors
from pyspark.sql import Row
from pyspark.ml.feature import IndexToString,StringIndexer,VectorIndexer

def f(x):
    rel = {}
    rel['features']=Vectors.\ 
    dense(float(x[0]),float(x[1]),float(x[2]),float(x[3]))
    rel['label'] = str(x[4])
    return rel
data = spark.sparkContext.\
textFile("file:///home/chenfy/ml/iris.txt").\
map(lambda line: line.split(',')).\
map(lambda p: Row(**f(p))).\
toDF()

labelIndexer = StringIndexer().\
setInputCol("label").\
setOutputCol("indexedLabel").\
fit(data)
featureIndexer = VectorIndexer().\
setInputCol("features").\
setOutputCol("indexedFeatures").\
setMaxCategories(4).\
fit(data) >>> labelConverter = IndexToString().\
setInputCol("prediction").\
setOutputCol("predictedLabel").\
setLabels(labelIndexer.labels)
trainingData, testData = data.randomSplit([0.7, 0.3])

dtClassifier = DecisionTreeClassifier().\
setLabelCol("indexedLabel").\
setFeaturesCol("indexedFeatures")

dtPipeline = Pipeline().\
setStages([labelIndexer, featureIndexer, dtClassifier, labelConverter])
dtPipelineModel = dtPipeline.fit(trainingData)
dtPredictions = dtPipelineModel.transform(testData)
dtPredictions.select("predictedLabel", "label", "features").show(20)

evaluator = MulticlassClassificationEvaluator().\
setLabelCol("indexedLabel").\
setPredictionCol("prediction")
dtAccuracy = evaluator.evaluate(dtPredictions)
print("Accuracy: ".format(dtAccuracy))

treeModelClassifier = dtPipelineModel.stages[2]
print("Learned classification tree model:\n" + \
str(treeModelClassifier.toDebugString)) 

运行过程:

[chenfy@entrobus32 ml]$ spark-submit iris_dt.py
.
.
.
Accuracy: 0.9554738562091503
Learned classification tree model:
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_e0e08e2e4629) of depth 5 with 15 nodes
  If (feature 2 <= 2.5999999999999996)
   Predict: 0.0
  Else (feature 2 > 2.5999999999999996)
   If (feature 3 <= 1.7000000000000002)
    If (feature 2 <= 5.15)
     If (feature 2 <= 4.85)
      Predict: 1.0
     Else (feature 2 > 4.85)
      If (feature 1 <= 2.25)
       Predict: 2.0
      Else (feature 1 > 2.25)
       Predict: 1.0
    Else (feature 2 > 5.15)
     Predict: 2.0
   Else (feature 3 > 1.7000000000000002)
    If (feature 2 <= 4.85)
     If (feature 0 <= 5.95)
      Predict: 1.0
     Else (feature 0 > 5.95)
      Predict: 2.0
    Else (feature 2 > 4.85)
     Predict: 2.0
.
.
.