我现在想尝试使用Apache Spark

虽然现在有点晚了,但我一直没有去尝试过Apache Spark,所以我决定试一试。
我想要理解它,并尝试使用机器学习库做些什么的原因是,我想更深入地学习并尝试一些东西。
这只是我的学习笔记。

顺便提一下,关于MLlib的部分,我也参考了这篇文章。

スクリーンショット 2016-07-24 1.01.06.png

安装设定

    • java 1.8

 

    • mac

 

    apache spark 1.6系
    1. 可以从这里下载Spark。

 

    解压缩并设置SPARK_HOME,并将路径添加到bin目录。
> tar xfv spark-1.6.1-bin-hadoop2.6.tgz

> mv spark-1.6.1-bin-hadoop2.6 spark-1.6.1

export SPARK_HOME=/usr/local/project/apache-spark/spark-1.6.1
export PATH=$PATH:$SPARK_HOME/bin

运行以下命令来引用.bashrc文件:
source ~/.bashrc

Chinese translation:
运行以下命令来引用.bashrc文件:
source ~/.bashrc

    我会确认路径是否通畅。
> spark-shell

Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_25)
Type in expressions to have them evaluated.
Type :help for more information.
Spark context available as sc.
16/07/23 23:53:02 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/07/23 23:53:03 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/07/23 23:53:09 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/07/23 23:53:09 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
16/07/23 23:53:13 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/07/23 23:53:14 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
16/07/23 23:53:20 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/07/23 23:53:20 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
SQL context available as sqlContext.

scala>

当启动时出现这种情况,暂时算是设置完成了。

简单调整

建议您尝试简单地按照这个页面的快速入门内容进行操作,这样会很好。

> pyspark

> textFile = sc.textFile("README.md")

> textFile.count()

>  textFile.first()

可以试着查看”快速开始”的内容。

这个也是在Quick start里写的,我试过了。

>>> def max(a, b):
...   if a > b:
...     return a
...   else:
...     return b
...
>>> textFile.map(lambda line: len(line.split())).reduce(max)
>>> wordCounts.collect()
16/07/25 12:58:36 INFO SparkContext: Starting job: collect at <stdin>:1
16/07/25 12:58:36 INFO DAGScheduler: Registering RDD 9 (reduceByKey at <stdin>:1)
16/07/25 12:58:36 INFO DAGScheduler: Got job 5 (collect at <stdin>:1) with 2 output partitions
16/07/25 12:58:36 INFO DAGScheduler: Final stage: ResultStage 6 (collect at <stdin>:1)
16/07/25 12:58:36 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 5)
16/07/25 12:58:36 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 5)
16/07/25 12:58:36 INFO DAGScheduler: Submitting ShuffleMapStage 5 (PairwiseRDD[9] at reduceByKey at <stdin>:1), which has no missing parents
16/07/25 12:58:36 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 8.2 KB, free 195.2 KB)
16/07/25 12:58:36 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 5.2 KB, free 200.4 KB)
16/07/25 12:58:36 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on localhost:54829 (size: 5.2 KB, free: 511.1 MB)
16/07/25 12:58:36 INFO SparkContext: Created broadcast 6 from broadcast at DAGScheduler.scala:1006
16/07/25 12:58:36 INFO DAGScheduler: Submitting 2 missing tasks from ShuffleMapStage 5 (PairwiseRDD[9] at reduceByKey at <stdin>:1)
16/07/25 12:58:36 INFO TaskSchedulerImpl: Adding task set 5.0 with 2 tasks
16/07/25 12:58:36 INFO TaskSetManager: Starting task 0.0 in stage 5.0 (TID 9, localhost, partition 0,PROCESS_LOCAL, 2149 bytes)
16/07/25 12:58:36 INFO TaskSetManager: Starting task 1.0 in stage 5.0 (TID 10, localhost, partition 1,PROCESS_LOCAL, 2149 bytes)
16/07/25 12:58:36 INFO Executor: Running task 0.0 in stage 5.0 (TID 9)
16/07/25 12:58:36 INFO Executor: Running task 1.0 in stage 5.0 (TID 10)
16/07/25 12:58:36 INFO HadoopRDD: Input split: file:/usr/local/project/apache-spark/spark-1.6.2/README.md:1679+1680
16/07/25 12:58:36 INFO HadoopRDD: Input split: file:/usr/local/project/apache-spark/spark-1.6.2/README.md:0+1679
/usr/local/project/apache-spark/spark-1.6.2/python/lib/pyspark.zip/pyspark/shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
/usr/local/project/apache-spark/spark-1.6.2/python/lib/pyspark.zip/pyspark/shuffle.py:58: UserWarning: Please install psutil to have better support with spilling
16/07/25 12:58:36 INFO PythonRunner: Times: total = 70, boot = 12, init = 10, finish = 48
16/07/25 12:58:36 INFO PythonRunner: Times: total = 65, boot = 6, init = 10, finish = 49
16/07/25 12:58:36 INFO Executor: Finished task 0.0 in stage 5.0 (TID 9). 2318 bytes result sent to driver
16/07/25 12:58:36 INFO Executor: Finished task 1.0 in stage 5.0 (TID 10). 2318 bytes result sent to driver
16/07/25 12:58:36 INFO TaskSetManager: Finished task 0.0 in stage 5.0 (TID 9) in 150 ms on localhost (1/2)
16/07/25 12:58:36 INFO TaskSetManager: Finished task 1.0 in stage 5.0 (TID 10) in 146 ms on localhost (2/2)
16/07/25 12:58:36 INFO TaskSchedulerImpl: Removed TaskSet 5.0, whose tasks have all completed, from pool
16/07/25 12:58:36 INFO DAGScheduler: ShuffleMapStage 5 (reduceByKey at <stdin>:1) finished in 0.154 s
16/07/25 12:58:36 INFO DAGScheduler: looking for newly runnable stages
16/07/25 12:58:36 INFO DAGScheduler: running: Set()
16/07/25 12:58:36 INFO DAGScheduler: waiting: Set(ResultStage 6)
16/07/25 12:58:36 INFO DAGScheduler: failed: Set()
16/07/25 12:58:36 INFO DAGScheduler: Submitting ResultStage 6 (PythonRDD[12] at collect at <stdin>:1), which has no missing parents
16/07/25 12:58:36 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 5.1 KB, free 205.4 KB)
16/07/25 12:58:36 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 3.2 KB, free 208.6 KB)
16/07/25 12:58:36 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on localhost:54829 (size: 3.2 KB, free: 511.1 MB)
16/07/25 12:58:36 INFO SparkContext: Created broadcast 7 from broadcast at DAGScheduler.scala:1006
16/07/25 12:58:36 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 6 (PythonRDD[12] at collect at <stdin>:1)
16/07/25 12:58:36 INFO TaskSchedulerImpl: Adding task set 6.0 with 2 tasks
16/07/25 12:58:36 INFO TaskSetManager: Starting task 0.0 in stage 6.0 (TID 11, localhost, partition 0,NODE_LOCAL, 1894 bytes)
16/07/25 12:58:36 INFO TaskSetManager: Starting task 1.0 in stage 6.0 (TID 12, localhost, partition 1,NODE_LOCAL, 1894 bytes)
16/07/25 12:58:36 INFO Executor: Running task 0.0 in stage 6.0 (TID 11)
16/07/25 12:58:36 INFO Executor: Running task 1.0 in stage 6.0 (TID 12)
16/07/25 12:58:36 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
16/07/25 12:58:36 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 2 blocks
16/07/25 12:58:36 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 10 ms
16/07/25 12:58:36 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 10 ms
16/07/25 12:58:36 INFO PythonRunner: Times: total = 10, boot = -97, init = 106, finish = 1
16/07/25 12:58:36 INFO Executor: Finished task 1.0 in stage 6.0 (TID 12). 3633 bytes result sent to driver
16/07/25 12:58:36 INFO PythonRunner: Times: total = 15, boot = -101, init = 116, finish = 0
16/07/25 12:58:36 INFO Executor: Finished task 0.0 in stage 6.0 (TID 11). 3853 bytes result sent to driver
16/07/25 12:58:36 INFO TaskSetManager: Finished task 1.0 in stage 6.0 (TID 12) in 61 ms on localhost (1/2)
16/07/25 12:58:36 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID 11) in 64 ms on localhost (2/2)
16/07/25 12:58:36 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool
16/07/25 12:58:36 INFO DAGScheduler: ResultStage 6 (collect at <stdin>:1) finished in 0.070 s
16/07/25 12:58:36 INFO DAGScheduler: Job 5 finished: collect at <stdin>:1, took 0.294915 s
[(u'when', 1), (u'R,', 1), (u'including', 3), (u'computation', 1), (u'using:', 1), (u'guidance', 2), (u'Scala,', 1), (u'environment', 1), (u'only', 1), (u'rich', 1), (u'Apache', 1), (u'sc.parallelize(range(1000)).count()', 1), (u'Building', 1), (u'guide,', 1), (u'return', 2), (u'Please', 3), (u'Try', 1), (u'not', 1), (u'Spark', 13), (u'scala>', 1), (u'Note', 1), (u'cluster.', 1), (u'./bin/pyspark', 1), (u'params', 1), (u'through', 1), (u'GraphX', 1), (u'[run', 1), (u'abbreviated', 1), (u'[project', 2), (u'##', 8), (u'library', 1), (u'see', 1), (u'"local"', 1), (u'[Apache', 1), (u'will', 1), (u'#', 1), (u'processing,', 1), (u'for', 11), (u'[building', 1), (u'provides', 1), (u'print', 1), (u'supports', 2), (u'built,', 1), (u'[params]`.', 1), (u'available', 1), (u'run', 7), (u'tests](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools).', 1), (u'This', 2), (u'Hadoop,', 2), (u'Tests', 1), (u'example:', 1), (u'-DskipTests', 1), (u'Maven](http://maven.apache.org/).', 1), (u'programming', 1), (u'running', 1), (u'against', 1), (u'site,', 1), (u'comes', 1), (u'package.', 1), (u'and', 10), (u'package.)', 1), (u'prefer', 1), (u'documentation,', 1), (u'submit', 1), (u'tools', 1), (u'use', 3), (u'from', 1), (u'For', 2), (u'./bin/run-example', 2), (u'fast', 1), (u'systems.', 1), (u'<http://spark.apache.org/>', 1), (u'Hadoop-supported', 1), (u'way', 1), (u'README', 1), (u'MASTER', 1), (u'engine', 1), (u'building', 2), (u'usage', 1), (u'instance:', 1), (u'with', 3), (u'protocols', 1), (u'And', 1), (u'this', 1), (u'setup', 1), (u'shell:', 2), (u'project', 1), (u'following', 2), (u'distribution', 1), (u'detailed', 2), (u'have', 1), (u'stream', 1), (u'is', 6), (u'higher-level', 1), (u'tests', 2), (u'1000:', 2), (u'sample', 1), (u'["Specifying', 1), (u'Alternatively,', 1), (u'file', 1), (u'need', 1), (u'You', 3), (u'instructions.', 1), (u'different', 1), (u'programs,', 1), (u'storage', 1), (u'same', 1), (u'machine', 1), (u'Running', 1), (u'which', 2), (u'you', 4), (u'A', 1), (u'About', 1), (u'sc.parallelize(1', 1), (u'locally.', 1), (u'Hive', 2), (u'optimized', 1), (u'uses', 1), (u'Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version)', 1), (u'variable', 1), (u'The', 1), (u'data', 1), (u'a', 8), (u'"yarn"', 1), (u'Thriftserver', 1), (u'processing.', 1), (u'./bin/spark-shell', 1), (u'Python', 2), (u'Spark](#building-spark).', 1), (u'clean', 1), (u'the', 21), (u'requires', 1), (u'talk', 1), (u'help', 1), (u'Hadoop', 3), (u'high-level', 1), (u'find', 1), (u'web', 1), (u'Shell', 2), (u'how', 2), (u'graph', 1), (u'run:', 1), (u'should', 2), (u'to', 14), (u'module,', 1), (u'given.', 1), (u'directory.', 1), (u'must', 1), (u'SparkPi', 2), (u'do', 2), (u'Programs', 1), (u'Many', 1), (u'YARN,', 1), (u'using', 2), (u'Example', 1), (u'Once', 1), (u'HDFS', 1), (u'Because', 1), (u'name', 1), (u'Testing', 1), (u'refer', 2), (u'Streaming', 1), (u'SQL', 2), (u'them,', 1), (u'analysis.', 1), (u'set', 2), (u'Scala', 2), (u'thread,', 1), (u'individual', 1), (u'examples', 2), (u'changed', 1), (u'runs.', 1), (u'Pi', 1), (u'More', 1), (u'Python,', 2), (u'Versions', 1), (u'its', 1), (u'version', 1), (u'wiki](https://cwiki.apache.org/confluence/display/SPARK).', 1), (u'`./bin/run-example', 1), (u'Configuration', 1), (u'command,', 2), (u'can', 6), (u'core', 1), (u'Guide](http://spark.apache.org/docs/latest/configuration.html)', 1), (u'MASTER=spark://host:7077', 1), (u'Documentation', 1), (u'downloaded', 1), (u'distributions.', 1), (u'Spark.', 1), (u'Spark"](http://spark.apache.org/docs/latest/building-spark.html).', 1), (u'["Building', 1), (u'`examples`', 2), (u'on', 5), (u'package', 1), (u'of', 5), (u'APIs', 1), (u'pre-built', 1), (u'Big', 1), (u'or', 3), (u'learning,', 1), (u'locally', 2), (u'overview', 1), (u'one', 2), (u'(You', 1), (u'Online', 1), (u'versions', 1), (u'your', 1), (u'threads.', 1), (u'>>>', 1), (u'spark://', 1), (u'contains', 1), (u'system', 1), (u'start', 1), (u'build/mvn', 1), (u'basic', 1), (u'configure', 1), (u'that', 2), (u'N', 1), (u'"local[N]"', 1), (u'DataFrames,', 1), (u'particular', 2), (u'be', 2), (u'an', 3), (u'easiest', 1), (u'Interactive', 2), (u'cluster', 2), (u'page](http://spark.apache.org/documentation.html)', 1), (u'<class>', 1), (u'example', 3), (u'are', 1), (u'Data.', 1), (u'mesos://', 1), (u'computing', 1), (u'URL,', 1), (u'in', 5), (u'general', 2), (u'To', 2), (u'at', 2), (u'1000).count()', 1), (u'if', 4), (u'built', 1), (u'no', 1), (u'Java,', 1), (u'MLlib', 1), (u'also', 4), (u'other', 1), (u'build', 3), (u'online', 1), (u'several', 1), (u'[Configuration', 1), (u'class', 2), (u'programs', 2), (u'documentation', 3), (u'It', 2), (u'graphs', 1), (u'./dev/run-tests', 1), (u'first', 1), (u'latest', 1)]
>>>

试着按照Spark编程指南逐步进行操作。

从开始开始

rdd = sc.parallelize(range(1, 10)).map(lambda x: (x, 'a' * x))
rdd.saveAsSequenceFile('~/hoge.txt')
sorted(sc.sequenceFile('~/hoge.txt').collect())
[(1, u'a'), (2, u'aa'), (3, u'aaa'), (4, u'aaaa'), (5, u'aaaaa'), (6, u'aaaaaa'), (7, u'aaaaaaa'), (8, u'aaaaaaaa'), (9, u'aaaaaaaaa')]

机器学习库

边看这里边做

协同过滤 – 基于RDD的API常常在推荐引擎等领域被提及,所以我们来试试看做一下。

测试数据可以在[这里](https://github.com/apache/spark/blob/master/data/mllib/als/test.data)找到。

最后通过predict方法对数据进行预测,因为测试数据是3、2、5.0,所以希望能够得到接近5的值。

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
data = sc.textFile("data/mllib/als/test.data")
ratings = data.map(lambda l: l.split(',')).map(lambda l: Rating(int(l[0]), int(l[1]), float(l[2])))
rank = 10
numIterations = 10
model = ALS.train(ratings, rank, numIterations)
model.predict(3, 2) 
・・・・・・・・・・・・・・・・
・・・・
4.996948080474724

因为是4.99,所以几乎是5。总的来说,我认为结果是不错的。

关于MLlib以外的示例,我们将在另一篇文章中详细介绍。
此文暂时告一段落。

广告
将在 10 秒后关闭
bannerAds