Running Python Jobs on a Spark Cluster


Today I tried running a Python job on the Spark cluster I had just finished setting up. I hit a few problems, and working through them gave me a somewhat better (if still shallow) understanding of how a Spark cluster operates and is configured; it no longer feels as vague as before. I'm recording the process here, and I plan to keep exploring the Hadoop ecosystem and the Spark parallel computing framework.

First, the environment. The cluster has five nodes and was set up with Cloudera Manager, so Hadoop here is Cloudera's CDH distribution; the way I understand it, CDH is to Hadoop roughly what CentOS is to Linux. The other components, such as HBase and Hive, were also installed through Cloudera. I noticed that a senior labmate had already downloaded a Spark parcel onto the server, so I used it without checking the version, only to find out later that it was 0.9, which was a bit embarrassing since Spark had already reached 1.4 by then. Version 0.9 has no spark-submit and no native R API, but that doesn't matter for this exercise: just follow the 0.9 documentation, and redeploy only if you really need features from a newer release.

First, a quick note on Spark's four run modes: local mode, Standalone mode, Apache Mesos, and Hadoop YARN. The job below uses the Standalone master; a sketch of how the mode is selected follows.
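This is only a minimal sketch, not this cluster's actual configuration: the mode is picked by the master URL passed to setMaster, and the Mesos host and YARN string below are illustrative assumptions.

from pyspark import SparkConf

# Only one of these would be set in a real job; the spark:// URL is the one
# used in this post, the others are illustrative alternatives.
conf = SparkConf()
conf.setMaster("local[*]")                       # local mode: one machine, all cores
# conf.setMaster("spark://192.168.2.241:7077")   # Standalone cluster master
# conf.setMaster("mesos://mesos-host:5050")      # Mesos master (hypothetical host)
# conf.setMaster("yarn-client")                  # YARN (Spark 0.9 era syntax)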

Below I use a simple Python job, SimpleApp.py, as an example and record how the script is run.

from pyspark import SparkContext, SparkConf

# Point the job at the standalone master and give it a name.
conf = SparkConf()
conf.setMaster("spark://192.168.2.241:7077")
conf.setAppName("test application")

# Input file on HDFS (see the notes on the path below).
logFile = "hdfs://hadoop241:8020/user/root/testfile"

sc = SparkContext(conf=conf)
logData = sc.textFile(logFile).cache()

# Count the lines containing 'a' and the lines containing 'b'.
numAs = logData.filter(lambda s: 'a' in s).count()
numBs = logData.filter(lambda s: 'b' in s).count()
print "Lines with a:%i,lines with b:%i" % (numAs, numBs)

The main issue here is the configuration for connecting to the cluster, i.e. the conf part of the code above. First you have to connect to the cluster's master node, and note how the master is written:

spark://192.168.2.241:7077

The spark:// prefix is mandatory; without it you get a "could not parse master URL" error, i.e. Spark cannot interpret the master address. As for the port number, on this installation it can be looked up via the SPARK_MASTER_PORT variable in the configuration under /etc/spark/conf (the location of the config files depends on how Spark was installed, so check your own setup).

Another point is the input path:

logFile = "hdfs://hadoop241:8020/user/root/testfile"

I noticed that by default the file is read from HDFS, so the file to be processed has to be put onto HDFS first (e.g. with hadoop fs -put testfile /user/root/). Also note how the path is written: what I use here is an absolute path on HDFS, but a relative path works as well. The testfile used here contains only two lines, just to check that the job runs correctly:
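As a minimal sketch of the two spellings (the app name is made up, and the scheme-less form assumes Spark picks up the cluster's Hadoop configuration, so that the default filesystem is this HDFS):

from pyspark import SparkContext, SparkConf

conf = SparkConf().setMaster("spark://192.168.2.241:7077").setAppName("path demo")
sc = SparkContext(conf=conf)

# Full HDFS URI: namenode host and port written out explicitly.
logData = sc.textFile("hdfs://hadoop241:8020/user/root/testfile")

# Scheme-less path: resolved against the default filesystem (fs.defaultFS),
# which on this cluster points at the same namenode, so it reads the same file.
logData = sc.textFile("/user/root/testfile")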

stay hungry,stay foolish
steve jobs

Then run:

$ pyspark SimpleApp.py
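For comparison: since this cluster runs Spark 0.9 there is no spark-submit, but on Spark 1.x the same script would normally be launched with it, roughly like this (assuming spark-submit is on the PATH):

$ spark-submit --master spark://192.168.2.241:7077 SimpleApp.py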

The output is shown below; from it you can observe how the computation tasks are allocated and scheduled across the cluster.

15/07/31 16:22:27 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/07/31 16:22:27 INFO Remoting: Starting remoting
15/07/31 16:22:27 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@hadoop241:34248]
15/07/31 16:22:27 INFO spark.SparkEnv: Registering BlockManagerMaster
15/07/31 16:22:27 INFO storage.DiskBlockManager: Created local directory at /tmp/spark-local-20150731162227-804b
15/07/31 16:22:27 INFO storage.MemoryStore: MemoryStore started with capacity 294.9 MB.
15/07/31 16:22:27 INFO network.ConnectionManager: Bound socket to port 42522 with id = ConnectionManagerId(hadoop241,42522)
15/07/31 16:22:27 INFO storage.BlockManagerMaster: Trying to register BlockManager
15/07/31 16:22:27 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hadoop241:42522 with 294.9 MB RAM
15/07/31 16:22:27 INFO storage.BlockManagerMaster: Registered BlockManager
15/07/31 16:22:27 INFO spark.HttpServer: Starting HTTP Server
15/07/31 16:22:27 INFO spark.SparkEnv: Registering MapOutputTracker
15/07/31 16:22:27 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-c6a0d067-075c-493b-81c8-754f569a91b5
15/07/31 16:22:27 INFO spark.HttpServer: Starting HTTP Server
...
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor added: app-20150731162228-0018/0 on worker-20150728175302-hadoop246-7078 (hadoop246:7078) with 16 cores
15/07/31 16:22:28 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150731162228-0018/0 on hostPort hadoop246:7078 with 16 cores, 512.0 MB RAM
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor added: app-20150731162228-0018/1 on worker-20150728175303-hadoop245-7078 (hadoop245:7078) with 8 cores
15/07/31 16:22:28 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150731162228-0018/1 on hostPort hadoop245:7078 with 8 cores, 512.0 MB RAM
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor added: app-20150731162228-0018/2 on worker-20150728175303-hadoop254-7078 (hadoop254:7078) with 8 cores
15/07/31 16:22:28 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150731162228-0018/2 on hostPort hadoop254:7078 with 8 cores, 512.0 MB RAM
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor added: app-20150731162228-0018/3 on worker-20150728175302-hadoop241-7078 (hadoop241:7078) with 8 cores
15/07/31 16:22:28 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150731162228-0018/3 on hostPort hadoop241:7078 with 8 cores, 512.0 MB RAM
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor added: app-20150731162228-0018/4 on worker-20150728175302-hadoop217-7078 (hadoop217:7078) with 8 cores
15/07/31 16:22:28 INFO cluster.SparkDeploySchedulerBackend: Granted executor ID app-20150731162228-0018/4 on hostPort hadoop217:7078 with 8 cores, 512.0 MB RAM
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor updated: app-20150731162228-0018/3 is now RUNNING
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor updated: app-20150731162228-0018/2 is now RUNNING
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor updated: app-20150731162228-0018/1 is now RUNNING
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor updated: app-20150731162228-0018/0 is now RUNNING
15/07/31 16:22:28 INFO client.AppClient$ClientActor: Executor updated: app-20150731162228-0018/4 is now RUNNING
15/07/31 16:22:28 INFO storage.MemoryStore: Block broadcast_0 stored as values to memory (estimated size 122.7 KB, free 294.8 MB)
15/07/31 16:22:29 INFO scheduler.DAGScheduler: Got job 0 (count at SimpleApp.py:13) with 2 output partitions (allowLocal=false)
15/07/31 16:22:29 INFO scheduler.DAGScheduler: Final stage: Stage 0 (count at SimpleApp.py:13)
15/07/31 16:22:29 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/07/31 16:22:29 INFO scheduler.DAGScheduler: Missing parents: List()
15/07/31 16:22:29 INFO scheduler.DAGScheduler: Submitting Stage 0 (PythonRDD[2] at count at SimpleApp.py:13), which has no missing parents
15/07/31 16:22:29 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 0 (PythonRDD[2] at count at SimpleApp.py:13)
15/07/31 16:22:29 INFO scheduler.TaskSetManager: Starting task 0.0:0 as TID 0 on executor 0: hadoop246 (PROCESS_LOCAL)
15/07/31 16:22:29 INFO scheduler.TaskSetManager: Starting task 0.0:1 as TID 1 on executor 0: hadoop246 (PROCESS_LOCAL)
15/07/31 16:22:30 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hadoop246:34291 with 294.9 MB RAM
15/07/31 16:22:30 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hadoop217:35324 with 294.9 MB RAM
15/07/31 16:22:30 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hadoop241:54770 with 294.9 MB RAM
15/07/31 16:22:31 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hadoop254:52135 with 294.9 MB RAM
15/07/31 16:22:31 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Registering block manager hadoop245:16696 with 294.9 MB RAM
15/07/31 16:22:31 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_1_0 in memory on hadoop246:34291 (size: 208.0 B, free: 294.9 MB)
15/07/31 16:22:31 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_1_1 in memory on hadoop246:34291 (size: 176.0 B, free: 294.9 MB)
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 1)
15/07/31 16:22:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 0.0 from pool
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Completed ResultTask(0, 0)
15/07/31 16:22:32 INFO spark.SparkContext: Job finished: count at SimpleApp.py:13, took 2.550761544 s
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Got job 1 (count at SimpleApp.py:14) with 2 output partitions (allowLocal=false)
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Final stage: Stage 1 (count at SimpleApp.py:14)
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Missing parents: List()
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Submitting Stage 1 (PythonRDD[3] at count at SimpleApp.py:14), which has no missing parents
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 1 (PythonRDD[3] at count at SimpleApp.py:14)
15/07/31 16:22:32 INFO scheduler.TaskSetManager: Starting task 1.0:0 as TID 2 on executor 0: hadoop246 (PROCESS_LOCAL)
15/07/31 16:22:32 INFO scheduler.TaskSetManager: Starting task 1.0:1 as TID 3 on executor 0: hadoop246 (PROCESS_LOCAL)
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Completed ResultTask(1, 1)
15/07/31 16:22:32 INFO scheduler.DAGScheduler: Completed ResultTask(1, 0)
15/07/31 16:22:32 INFO scheduler.TaskSchedulerImpl: Remove TaskSet 1.0 from pool
15/07/31 16:22:32 INFO spark.SparkContext: Job finished: count at SimpleApp.py:14, took 0.04234127 s
Lines with a:1,lines with b:1

