This runs from Google Drive. First prepare Drive: save the code below as a .ipynb file (Spark_on_Colaboratory_TEMPLATE) and upload it to your Google Drive.
```json
{
"nbformat": 4,
"nbformat_minor": 0,
"metadata": {
"colab": {
"name": "Spark on Colaboratory TEMPLATE 20190413",
"version": "0.3.2",
"provenance": [],
"collapsed_sections": []
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3"
},
"accelerator": "GPU"
},
"cells": [
{
"metadata": {
"id": "kvD4HBMi0ohY",
"colab_type": "text"
},
"cell_type": "markdown",
"source": [
"# First, run this cell and wait till it complete.\n",
"## You need run this by each time when you need Spark.\n",
"## around 1~2 minutes for installing Spark in google colab, when you see \"Ready to use Spark!\", you can go next step."
]
},
{
"metadata": {
"id": "fUhBhrGmyAvs",
"colab_type": "code",
"colab": {
"base_uri": "https://localhost:8080/",
"height": 36
},
"outputId": "1d6c9dbe-3075-4802-d690-ba760a777f77"
},
"cell_type": "code",
"source": [
"# Install Java, Spark, and Findspark\n",
"# This installs Apache Spark 2.3.3, Java 8, and Findspark, a library that makes it easy for Python to find Spark.\n",
"\n",
"!apt-get install openjdk-8-jdk-headless -qq > /dev/null\n",
"!wget -q http://apache.osuosl.org/spark/spark-2.3.3/spark-2.3.3-bin-hadoop2.7.tgz\n",
"!tar xf spark-2.3.3-bin-hadoop2.7.tgz\n",
"!pip install -q findspark\n",
"\n",
"\n",
"#Set Environment Variables\n",
"# Set the locations where Spark and Java are installed.\n",
"\n",
"import os\n",
"os.environ[\"JAVA_HOME\"] = \"/usr/lib/jvm/java-8-openjdk-amd64\"\n",
"os.environ[\"SPARK_HOME\"] = \"/content/spark-2.3.3-bin-hadoop2.7\"\n",
"\n",
"# Start a SparkSession\n",
"# This will start a local Spark session.\n",
"\n",
"import findspark\n",
"findspark.init()\n",
"\n",
"import pyspark\n",
"sc = pyspark.SparkContext()\n",
"\n",
"print(\"Ready to use Spark!\")"
],
"execution_count": 1,
"outputs": [
{
"output_type": "stream",
"text": [
"Ready to use Spark!\n"
],
"name": "stdout"
}
]
},
{
"metadata": {
"id": "T3ULPx4Y1LiR",
"colab_type": "text"
},
"cell_type": "markdown",
"source": [
"# Use Spark!\n",
"That's all there is to it - you're ready to use Spark!"
]
},
{
"metadata": {
"id": "XJp8ZI-VzYEz",
"colab_type": "code",
"outputId": "d5083084-09c2-4e3f-da2a-abbc621f8aa7",
"colab": {
"base_uri": "https://localhost:8080/",
"height": 206
}
},
"cell_type": "code",
"source": [
"#df = spark.createDataFrame([{\"hello\": \"world\"} for x in range(1000)])\n",
"#df.show(3)\n",
"sc"
],
"execution_count": 2,
"outputs": [
{
"output_type": "execute_result",
"data": {
"text/html": [
"\n",
" "
]
},
"metadata": {
"tags": []
},
"execution_count": 2
}
]
},
{
"metadata": {
"id": "o7H1FTk4cDzF",
"colab_type": "code",
"colab": {}
},
"cell_type": "code",
"source": [
""
],
"execution_count": 0,
"outputs": []
}
]
}
```
\n",
"
\n",
" "
],
"text/plain": [
"SparkContext
\n", "\n", " \n", "\n", "- \n",
"
- Version \n", "
v2.3.3
\n",
" - Master \n", "
local[*]
\n",
" - AppName \n", "
pyspark-shell
\n",
"
Open it with Colaboratory. After opening, it will look like this. Run the code above first; it has to be run once every time you open the notebook. Once both code cells have finished, you are ready to start.

```python
sc
```
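As a quick sanity check (this cell is not part of the original template), you can ask the context for its version and run a tiny job; if you also want the DataFrame API, a SparkSession can be built on top of the existing context:

```python
# A minimal sketch, assuming the SparkContext `sc` created by the template above.
from pyspark.sql import SparkSession

print(sc.version)                        # should print 2.3.3 for this template
print(sc.parallelize(range(10)).sum())   # 45 -> the local[*] context is doing real work

# Optional: wrap the context in a SparkSession so DataFrame code also works,
# e.g. the commented-out spark.createDataFrame(...) cell in the template.
spark = SparkSession.builder.getOrCreate()
spark.createDataFrame([("hello", "world")], ["key", "value"]).show()
```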
Next, this part uploads the data file (this is the step that differs from working on a local machine):

```python
# upload file from local machine
from google.colab import files
uploaded = files.upload()
```
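If the data file already lives in your Google Drive, an alternative to uploading it on every run (not shown in the original post) is to mount Drive; the log.txt path below is only an illustration:

```python
# Sketch of the Drive-mount alternative; adjust the path to wherever log.txt sits in your Drive.
from google.colab import drive
drive.mount('/content/drive')

inputRDD = sc.textFile('/content/drive/My Drive/log.txt')
```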
```python
inputRDD = sc.textFile("log.txt")

# Transformations
errorsRDD = inputRDD.filter(lambda x: "Error" in x)
warningsRDD = inputRDD.filter(lambda x: "Warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)
```
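These four lines are all transformations, so nothing is actually read or filtered yet; Spark only records the lineage and waits for an action. If you want to try the pipeline without uploading anything, you can write a small made-up log.txt straight from the notebook (the log lines below are invented for illustration):

```python
# Create a tiny, invented log file so the actions below have something to work on.
sample_log = """2019-04-13 10:00:01 Info: job started
2019-04-13 10:00:02 Warning: disk space low
2019-04-13 10:00:03 Error: connection refused
2019-04-13 10:00:04 Info: retrying
2019-04-13 10:00:05 Error: connection refused again
"""
with open("log.txt", "w") as f:
    f.write(sample_log)
```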
```python
# Actions: each of these triggers an actual computation over log.txt
# (the exact numbers depend on the log.txt you uploaded)
inputRDD.count()
errorsRDD.count()
warningsRDD.count()
badLinesRDD.count()
badLinesRDD.take(5)
```
```python
# Two actions are used here: count() and take()
print("Input had " + str(badLinesRDD.count()) + " concerning lines")
print("Here are 10 examples:")
for line in badLinesRDD.take(10):
    print(line)
```
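Because badLinesRDD is used by two separate actions, Spark re-runs the whole read/filter/union lineage for each of them. A small optional improvement (not in the original notebook) is to persist the RDD before reusing it:

```python
# Optional: cache badLinesRDD so count() and take() don't each re-read and re-filter log.txt.
badLinesRDD.cache()   # equivalent to persist() with the default MEMORY_ONLY storage level

print("Input had " + str(badLinesRDD.count()) + " concerning lines")  # first action fills the cache
for line in badLinesRDD.take(10):                                     # second action reads from the cache
    print(line)
```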
```python
# filter
VV = sc.parallelize([2, 18, 9, 22, 17, 24, 8, 12, 27])
DD = VV.filter(lambda x: x % 3 == 0)  # Transformation
DD.collect()                          # Action
```
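Since the input list is fixed, the result of this collect() is deterministic; a quick way to convince yourself of what filter() keeps (not in the original) is to compare it with the equivalent plain-Python list comprehension:

```python
# filter() on an RDD behaves like a list comprehension with an `if` clause.
data = [2, 18, 9, 22, 17, 24, 8, 12, 27]
expected = [x for x in data if x % 3 == 0]   # [18, 9, 24, 12, 27]
result = sc.parallelize(data).filter(lambda x: x % 3 == 0).collect()
print(result == expected)                    # True: same elements, same order
```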
```python
# map: applies the lambda to every element, one output per input
nums = sc.parallelize(["a", 2, 3, 4])
squared = nums.map(lambda x: x*2)  # note: x*2 doubles each element ("a" -> "aa", 2 -> 4, ...)
squared.collect()
```
```python
lines = sc.parallelize(["hello world", "hi"])
words = lines.map(lambda line: line.split(" "))
words.collect()  # map(): two input elements give two output elements (each a list of words)
```
```python
lines = sc.parallelize(["hello world", "hi"])
words2 = lines.flatMap(lambda line: line.split(" "))
words2.collect()  # flatMap(): the number of outputs need not match the number of inputs
```
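The two cells above are the core contrast: map() produces exactly one output element per input element (here, one list per line), while flatMap() flattens those per-element results into a single sequence. A small side-by-side sketch (not in the original) with the deterministic results as comments:

```python
lines = sc.parallelize(["hello world", "hi"])

lines.map(lambda line: line.split(" ")).collect()
# -> [['hello', 'world'], ['hi']]   : 2 inputs, 2 outputs (each output is a list)

lines.flatMap(lambda line: line.split(" ")).collect()
# -> ['hello', 'world', 'hi']       : 2 inputs, 3 outputs (the lists are flattened)
```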
```python
rdd = sc.parallelize([2, 3, 4])
rss1 = rdd.flatMap(lambda x: range(1, x))
rss1.collect()
```
```python
x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])  # two elements to start with
x1 = x.flatMap(lambda x: x)
x2 = x1.flatMap(lambda x: x)

x1.collect()  # becomes 4 elements
x2.collect()  # becomes 7 elements
```
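The counts in the comments follow from flatMap() flattening exactly one level per call, and from the fact that iterating over a Python string yields its characters. A worked sketch (not in the original) showing where the 4 and the 7 come from:

```python
x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])   # 2 tuples

x.flatMap(lambda t: t).collect()
# -> ['a', ['x', 'y', 'z'], 'b', ['p', 'r']]   : each tuple is unpacked -> 4 elements

x.flatMap(lambda t: t).flatMap(lambda e: e).collect()
# -> ['a', 'x', 'y', 'z', 'b', 'p', 'r']       : strings iterate into characters,
#                                                lists into items -> 1 + 3 + 1 + 2 = 7
```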
```python
# sample
nums = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
new_nums = nums.sample(False, 0.5)  # each element is kept with probability 0.5
new_nums.collect()

for n in new_nums.take(5):
    print(n)
```
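sample(withReplacement, fraction) treats the fraction as a per-element probability, not an exact count, so both the content and the length of the result vary from run to run. If you need a repeatable sample (not shown in the original), pass the optional seed argument:

```python
nums = sc.parallelize(range(1, 11))

# Roughly half the elements, but the exact number varies between runs.
print(nums.sample(False, 0.5).collect())

# With a fixed seed the same sample comes back every time this cell is run.
print(nums.sample(False, 0.5, seed=42).collect())
```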