irpas技术客

大数据Spark(python版)_Acegem_python spark

网络 6300

大数据 大数据,Spark,Hadoop,python,pyspark 大数据Spark(python版) 前言(环境说明):1、下载和安装1)安装java JDK2)安装Hadoop(伪分布式)3)安装Spark(Local模式)附:其他安装(依个人需要)4)安装HBase(伪分布式) 2、配置相关文件1)修改Spark的配置文件spark-env.sh2)修改环境变量 ~/.bashrc文件 3、验证Spark是否安装成功4、Spark和Hadoop的交互5、在pyspark中运行代码0)前言1)启动2)pyspark常用命令 6、python脚本的pyspark简单例子例1:统计文本中某个词出现的次数例2:并行化处理列表结论: 7、Spark和HBase的交互1. 配置

大数据,Spark,Hadoop,python,pyspark 大数据Spark(python版) 前言(环境说明):

Spark默认语言Scala,也支持Python语言接口(pyspark),下面是python中使用spark的教程。 首先,Spark需要JDK、Hadoop生态支持。 个人环境: Linux:Ubuntu 16.04 JDK:1.8 Hadoop:3.1.3 Spark:2.4.0

Hadoop和Hbase的运行都是有三种模式:单机模式、伪分布式模式、分布式模式。

单机模式:在一台计算机上安装和使用,不涉及数据的分布式存储;伪分布式模式:在一台计算机上模拟一个小的集群;分布式模式:使用多台计算机实现物理意义上的分布式存储。

而Spark的部署模式有四种:单机模式(Local模式)、Standalone模式(使用Spark自带的简单集群管理器)、YARN模式(使用YARN作为集群管理器)、Mesos模式(使用Mesos作为集群管理器)。

我们可在单机上采用 Hadoop(伪分布式)+ Spark(Local模式)进行Hadoop和spark组合环境的搭建。 注意: 安装配置或启动使用Spark + Hadoop时,都要切换成新创建的名为hadoop的用户,详见下面 1、下载和安装 中的 2)安装Hadoop(伪分布式)

1、下载和安装 1)安装java JDK

见我的另一篇博文 https://blog.csdn.net/Acegem/article/details/120852985?spm=1001.2014.3001.5502

2)安装Hadoop(伪分布式)

见我的另一篇博文 https://blog.csdn.net/Acegem/article/details/122880274?spm=1001.2014.3001.5502

3)安装Spark(Local模式)

官网下载:https://spark.apache.org/downloads.html 选择Spark 2.4.0版本即可,下载spark-2.4.0-bin-without-hadoop.tgz,如下: 安装Spark:

sudo tar -zxvf spark-2.4.0-bin-without-hadoop.tgz -C /usr/local/ # 将下载好的spark-2.4.0-bin-without-hadoop.tg解压到 /usr/local下 cd /usr/local sudo mv ./spark-2.4.0-bin-without-hadoop/ ./spark sudo chown -R hadoop:hadoop ./spark # 此处的 hadoop 为你的用户名 附:其他安装(依个人需要) 4)安装HBase(伪分布式)

见我的另一篇博文 https://blog.csdn.net/Acegem/article/details/123189212

2、配置相关文件 1)修改Spark的配置文件spark-env.sh

先复制一份由Spark安装文件自带的配置文件模板。

cd /usr/local/spark cp ./conf/spark-env.sh.template ./conf/spark-env.sh

再编辑spark-env.sh文件(vim ./conf/spark-env.sh),在第一行添加以下配置信息:

export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)

有了上面的配置信息以后,Spark就可以把数据存储到Hadoop分布式文件系统(HDFS)中,也可以从HDFS中读取数据。如果没有配置上面的信息,Spark就只能读写本地数据,无法读取HDFS中的数据。

2)修改环境变量 ~/.bashrc文件

注意:上面切换了hadoop用户,这里的~/.bashrc指的是hadoop用户的环境变量./bashrc,环境变量需要包含如下部分

JAVA_HOMEHADOOP_HOMESPARK_HOMEPYTHONPATHPYSPARK_PYTHONPATH 相关~/.bashrc 变量整理如下: #=========== java env ======# export JAVA_HOME=/usr/local/java/jdk1.8.0_271 export JRE_HOME=${JAVA_HOME}/jre export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib export PATH=$PATH:${JAVA_HOME}/bin:${JRE_HOME}/bin #=========== big data env ======# export HADOOP_HOME=/usr/local/hadoop export SPARK_HOME=/usr/local/spark export PYTHONPATH=$PYTHONPATH:$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.7-src.zip export PYSPARK_PYTHON=python3 export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$SPARK_HOME/bin:/usr/local/hbase/bin #===== python(anaconda) env 此处省略===== #

记得 source ~/.bashrc使更改生效。 上面配置了HBase的环境,如果需要HBase可参考…,也可先不设置。

3、验证Spark是否安装成功

配置完成后就可以直接使用,不需要像Hadoop运行启动命令。 通过运行Spark自带的示例SparkPi,可以验证Spark是否安装成功。

cd /usr/local/spark bin/run-example SparkPi

执行时会输出非常多的运行信息,输出的最终结果不容易找到,为了从大量的输出信息中快速找到我们想要的执行结果,可以通过 grep 命令进行过滤:

cd /usr/local/spark bin/run-example SparkPi | grep "Pi is"

过滤后的运行结果: 注:有时候需要下面的命令才能成功过滤(命令中的 2>&1 可以将所有的信息都输出到 stdout 中,否则由于输出日志的性质,还是会输出到屏幕中):

cd /usr/local/spark bin/run-example SparkPi 2>&1 | grep "Pi is"

同样过滤后的运行结果:

4、Spark和Hadoop的交互

上面已经完成了在单机上的“Hadoop(伪分布式)+ Spark(Local模式)”方式的搭建。 接下来,只需要启动Hadoop的HDFS,Spark就可以对HDFS中的数据进行读取或写入操作了。 再次强调: 启动使用Spark + Hadoop时,都要切换成新创建的名为hadoop的用户,详见上面 1、下载和安装 中的 2)安装Hadoop(伪分布式),否则下面会因为权限问题导致启动失败。 (1)切换hadoop用户:

su hadoop

回车后让输入密码,前面我设置的密码也叫hadoop (2)启动并登录ssh:

sudo service ssh start ssh localhost

(3)启动Hadoop的HDFS:

$ cd /usr/local/hadoop $ ./sbin/start-dfs.sh

若启动成功,会列出如下进程:NameNode、DataNode、SecondaryNameNode。如下: 也可以使用$ jps 命令查看进程,是否启动成功。

5、在pyspark中运行代码 0)前言

Spark支持Scala和Python,由于Spark框架本身就是使用Scala语言开发的,所以使用spark-shell命令会默认进入Scala的交互式运行环境。如果要进入Python的交互式执行环境,则需要执行pyspark命令。 看下安装spark软件的usr/local/spark/ 目录下内容: 可看到有python和R语言的接口,因为这两个是做大数据分析和人工智能非常火的语言。 再看下 usr/local/spark/python/ 目录下内容: 可看到pyspark目录。 我们知道,软件将代码编译后放在bin目录下, 再看下 usr/local/spark/bin 目录下内容: 可以看到可执行二进制文件 pyspark。

1)启动 $ cd /usr/local/spark $ ./bin/pyspark

(注:也可 $ cd /usr/local/spark/bin 再 ./pyspark,但不能 $ cd /usr/local/spark/bin 再 pyspark) 结果如下: 现在,就可以在里面输入python代码进行调试了,如下: 其中,输入exit()退出交互界面。

2)pyspark常用命令

(1)采用Local模式,在4个CPU核心上启动pyspark:

$ cd /usr/local/spark $ ./bin/pyspark --master local[4]

或者可以在CLASSPATH中添加 code.jar,命令如下:

$ cd /usr/local/spark $ ./bin/pyspark --master local[4] --jars code.jar

结果: (2)执行 ./bin/pyspark --help 获取完整选项列表,结果如下:

6、python脚本的pyspark简单例子

首先切换到hadoop用户:

su hadoop

因为之前配置的spark环境变量是在hadoop用户下的 ~/.bashrc,下面所有的代码、例子的运行都要在之前配置好的hadoop用户下执行python3 py文件名或python py文件名来执行代码。

再来了解一下sc的代码生成: 写法1:

from pyspark import SparkConf, SparkContext # 创建SparkConf对象,并给对象赋值 conf = SparkConf().setMaster("local").setAppName("My app") # 创建SparkContext对象,不妨命名为sc sc = SparkContext(conf=conf)

写法2:

from pyspark import SparkContext sc = SparkContext(appName="My app", master="local") 例1:统计文本中某个词出现的次数

word_count.py:分别统计 /usr/local/spark/README.md 文本中字母’a’和字母’b’出现的次数。

from pyspark import SparkConf, SparkContext # 创建SparkConf对象,并给对象赋值 conf = SparkConf().setMaster("local").setAppName("My app") # 创建SparkContext对象,不妨命名为sc sc = SparkContext(conf=conf) """ spark创建的sc,其功能之一是调用自带的textFile()函数来加载本地文件和HDFS文件创建RDD,如下面的 sc.textFile : """ logFile = "file:///usr/local/spark/README.md" # 加载本地文件生成RDD logData = sc.textFile(logFile, 2).cache() # 有的人也习惯将代码写成:rdd = sc.textFile(logFile, 2).cache() numAs = logData.filter(lambda line: 'a' in line).count() numBs = logData.filter(lambda line: 'b' in line).count() print('Lines with a:', numAs) print('Lines with a:', numBs)

打开终端直接执行 python3 word_count.py即可,运行结果:

Lines with a: 62 Lines with a: 31

【知识点】: 知识点1):spark创建的sc对象,可以加载本地文件和HDFS文件创建RDD。这里用Spark自带的本地文件README.md文件测试。加载HDFS文件和本地文件都是使用textFile()函数,区别是添加前缀(hdfs://和file:///)进行标识。

textFile = sc.textFile("file:///usr/local/spark/README.md") # # 有的人也习惯将代码写成:rdd = sc.textFile("file:///usr/local/spark/README.md")

知识点2):一些简单的RDD操作:

# python3 # 获取RDD文件textFile的第一行内容 textFile.first() # 获取RDD文件textFile所有项的计数 textFile.count() # 抽取含有“Spark”的行,返回一个新的RDD lineWithSpark = textFile.filter(lambda line: 'Spark' in line) # Scala写法:lineWithSpark = textFile.filter(line => line.contains("Spark")) # 统计新的RDD的行数 lineWithSpark.count()

可以通过组合RDD操作进行组合,可以实现简易MapReduce操作

// Scala //找出文本中每行的最多单词数 textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)

更多使用操作可以参考官方文档:https://spark.apache.org/docs/latest/rdd-programming-guide.html

【附】:运行方式除了直接python3 word_count.py外;还可以通过Spark自带的Spark-submit将py脚本提交到Spark中执行。如下: /usr/local/spark/bin/spark-submit /你的路径/word_count.py, 但该方式的执行过程默认会产生很多其他信息,为了避免信息干扰直接看到运行结果,可以修改log4j的日志信息显示级别,修改配置如下:

cd /usr/local/spark/conf sudo cp log4j.properties.template log4j.properties sudo vim log4j.properties

将log4j.properties里面的 log4j.rootCategory=INFO, console改成log4j.rootCategory=ERROR, console,再去执行/usr/local/spark/bin/spark-submit /你的路径/word_count.py便不会显示太多的输出信息了。 注: log4j.rootCategory=INFO, console // 显示所有信息 log4j.rootCategory=ERROR, console // 只会显示报错信息

例2:并行化处理列表 from pyspark import SparkConf, SparkContext # 创建SparkConf对象,并给对象赋值 conf = SparkConf().setMaster("local").setAppName("My app") # 创建SparkContext对象,不妨命名为sc sc = SparkContext(conf=conf) """ spark创建的sc,其功能之二是调用自带的parallelize()函数来加载自定义的变量来创建RDD,如下面的 sc.parallelize: (sc还有其他很多函数和功能,不全举例了) """ array = [1, 2, 3, 4, 5] rdd = sc.parallelize(array) rdd.foreach(print)

运行结果:

1 2 3 4 5

或者为乱序,因为是并行处理的,如下等等情况:

3 1 5 4 2 结论:

由例1和例2可看出:RDD是将某个将要被处理运算的目标(文件、变量等)转化成RDD,以实现大数据的高效运算,这就是Spark的RDD算子的作用。

7、Spark和HBase的交互 1. 配置

按照本文开头提到的安装好HBase后,要进行Spark与HBase的交互,需进行如下的配置:


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Python #spark