Spark RDD Operations

This will run from Google Drive, so prepare the Drive first: create a notebook (an .ipynb file, e.g. Spark_on_Colaboratory_TEMPLATE) in your Drive with the setup code below as its first cell. The cell installs Java 8, Apache Spark 2.3.3, and Findspark (a library that lets Python locate Spark), sets the environment variables, and starts a local SparkContext. It has to be run every time you need Spark and takes about 1-2 minutes in Google Colab; when it prints "Ready to use Spark!" you can move on to the next step. The template's second cell simply evaluates `sc` to confirm the session is up.

```python
# Install Java, Spark, and Findspark.
# This installs Apache Spark 2.3.3, Java 8, and Findspark,
# a library that makes it easy for Python to find Spark.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.3.3/spark-2.3.3-bin-hadoop2.7.tgz
!tar xf spark-2.3.3-bin-hadoop2.7.tgz
!pip install -q findspark

# Set environment variables to the locations where Java and Spark were installed.
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.3-bin-hadoop2.7"

# Start a local Spark session.
import findspark
findspark.init()

import pyspark
sc = pyspark.SparkContext()

print("Ready to use Spark!")
```

Open the uploaded notebook with Colaboratory.

After opening it, it will look like this. Run the setup cell above first; it has to be run every time you open the notebook.


Once both cells have finished running, you can start.

sc

Evaluating `sc` shows the SparkContext summary: Version v2.3.3, Master local[*], AppName pyspark-shell, plus a link to the Spark UI.

Next, upload the data file (this is the part that differs from working on a local machine):

# upload file from local machine
from google.colab import files
uploaded = files.upload()
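`files.upload()` returns a dict mapping each uploaded filename to its contents as bytes, and the files land in the Colab working directory, so `sc.textFile("log.txt")` below can read the upload directly. A minimal check that the upload succeeded:

```python
# 'uploaded' maps filename -> file contents (bytes).
for name, data in uploaded.items():
    print(name, len(data), "bytes")
```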


inputRDD = sc.textFile("log.txt")

#Transformations
errorsRDD = inputRDD.filter(lambda x: "Error" in x)
warningsRDD = inputRDD.filter(lambda x: "Warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)
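filter() and union() are transformations, so they are lazy: nothing is read or computed until an action runs. Because badLinesRDD feeds several actions below, each of which would otherwise re-read and re-filter log.txt, it can be worth caching; a minimal sketch with the standard RDD persistence API:

```python
# Keep badLinesRDD in memory after the first action materializes it.
badLinesRDD.cache()
```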
In [10]:
#Action
inputRDD.count()
Out[10]:
5220
In [11]:
errorsRDD.count()
Out[11]:
195
In [12]:
warningsRDD.count()
Out[12]:
4496
In [13]:
badLinesRDD.count()
Out[13]:
4691
In [16]:
badLinesRDD.take(5)
Out[16]:
['2013-10-22 03:38:10 4588 InnoDB: Error: Table "mysql"."innodb_table_stats" not found.',
 '2013-10-22 03:38:10 4588 InnoDB: Error: Table "mysql"."innodb_table_stats" not found.',
 '2013-10-22 03:38:10 4588 InnoDB: Error: Fetch of persistent statistics requested for table "member"."memberdata" but the required system tables mysql.innodb_table_stats and mysql.innodb_index_stats are not present or have unexpected structure. Using transient stats instead.',
 '2013-10-22 03:38:20 26c0 InnoDB: Error: Table "mysql"."innodb_table_stats" not found.',
 '2013-10-22 03:46:24 23bc InnoDB: Error: Table "mysql"."innodb_table_stats" not found.']
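Note that take(n) ships only the first n elements back to the driver, so it is far cheaper than collect() on a large RDD; first() is the one-element shorthand:

```python
# Cheap peek at a single element; equivalent to badLinesRDD.take(1)[0].
badLinesRDD.first()
```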
In [17]:
#Two actions used here: count() and take()
print("Input had " + str(badLinesRDD.count()) + " concerning lines")
print("Here are 10 examples:")
for line in badLinesRDD.take(10):
    print(line)
Input had 4691 concerning lines
Here are 10 examples:
2013-10-22 03:38:10 4588 InnoDB: Error: Table "mysql"."innodb_table_stats" not found.
2013-10-22 03:38:10 4588 InnoDB: Error: Table "mysql"."innodb_table_stats" not found.
2013-10-22 03:38:10 4588 InnoDB: Error: Fetch of persistent statistics requested for table "member"."memberdata" but the required system tables mysql.innodb_table_stats and mysql.innodb_index_stats are not present or have unexpected structure. Using transient stats instead.
2013-10-22 03:38:20 26c0 InnoDB: Error: Table "mysql"."innodb_table_stats" not found.
2013-10-22 03:46:24 23bc InnoDB: Error: Table "mysql"."innodb_table_stats" not found.
2013-10-22 03:46:24 23bc InnoDB: Error: Table "mysql"."innodb_table_stats" not found.
2013-10-22 03:46:24 23bc InnoDB: Error: Fetch of persistent statistics requested for table "4c"."memberdata" but the required system tables mysql.innodb_table_stats and mysql.innodb_index_stats are not present or have unexpected structure. Using transient stats instead.
2013-10-22 03:46:34 26c0 InnoDB: Error: Table "mysql"."innodb_table_stats" not found.
2013-10-23 18:35:07 564 InnoDB: Error: Table "mysql"."innodb_table_stats" not found.
2013-10-23 18:35:07 564 InnoDB: Error: Fetch of persistent statistics requested for table "4c"."memberdata" but the required system tables mysql.innodb_table_stats and mysql.innodb_index_stats are not present or have unexpected structure. Using transient stats instead.
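As a side note, the same tally can be computed in one pass with countByValue(); a minimal sketch over the same inputRDD (a line containing both words is counted twice, just as the union above counts it twice):

```python
# Tag each line with every level it mentions, then count the tags in one action.
counts = (inputRDD
          .flatMap(lambda line: [t for t in ("Error", "Warning") if t in line])
          .countByValue())
print(dict(counts))  # {'Error': 195, 'Warning': 4496}
```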

Operations on a single RDD

In [20]:
##filter
VV = sc.parallelize([2, 18, 9, 22, 17, 24, 8, 12, 27])
DD = VV.filter(lambda x: x % 3 == 0) #Transformation
DD.collect() #Action
Out[20]:
[18, 9, 24, 12, 27]
In [21]:
##map
nums = sc.parallelize(["a", 2, 3, 4])
doubled = nums.map(lambda x: x*2) #doubles each element; "a"*2 is "aa" (sequence repetition)
doubled.collect()
Out[21]:
['aa', 4, 6, 8]
In [22]:
lines = sc.parallelize(["hello world", "hi"])
words = lines.map(lambda line: line.split(" "))
words.collect() #map(): two elements in, exactly two elements out
Out[22]:
[['hello', 'world'], ['hi']]
In [23]:
lines = sc.parallelize(["hello world", "hi"])
words2 = lines.flatMap(lambda line: line.split(" "))
words2.collect() #flatMap(): the number of outputs need not match the inputs
Out[23]:
['hello', 'world', 'hi']
In [24]:
rdd = sc.parallelize([2, 3, 4])
rss1 = rdd.flatMap(lambda x: range(1, x)) #2 -> [1], 3 -> [1, 2], 4 -> [1, 2, 3]
rss1.collect()
Out[24]:
[1, 1, 2, 1, 2, 3]
In [27]:
x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])]) #two elements to start
x1 = x.flatMap(lambda pair: pair)
x2 = x1.flatMap(lambda item: item)
In [28]:
x1.collect() #now 4 elements
Out[28]:
['a', ['x', 'y', 'z'], 'b', ['p', 'r']]
In [29]:
x2.collect() #now 7 elements
Out[29]:
['a', 'x', 'y', 'z', 'b', 'p', 'r']
In [71]:
##sample

nums = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
new_nums = nums.sample(False, 0.5) #each element is kept with probability 0.5
new_nums.collect()
Out[71]:
[3, 6, 7, 8]
In [63]:
for n in new_nums.take(5):
    print(n)
1
2
3
4
5

(Note the execution counts: In [63] ran before In [71], against an earlier draw of new_nums, which is why these printed values differ from the collect() output above.)
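Because sample() draws a fresh random subset on each call, results vary from run to run; the optional third argument seeds the generator to make a draw reproducible:

```python
# Seeded sample: repeated runs produce the same subset.
nums.sample(False, 0.5, seed=42).collect()
```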

