Spark RDD Operations

We will run everything from Google Drive. First prepare the notebook: save the code below as a .ipynb file, then put it into your Google Drive.

(Spark on Colaboratory TEMPLATE)

```json
{
  "nbformat": 4,
  "nbformat_minor": 0,
  "metadata": {
    "colab": {
      "name": "Spark on Colaboratory TEMPLATE 20190413",
      "version": "0.3.2",
      "provenance": [],
      "collapsed_sections": []
    },
    "kernelspec": {
      "name": "python3",
      "display_name": "Python 3"
    },
    "accelerator": "GPU"
  },
  "cells": [
    {
      "metadata": {
        "id": "kvD4HBMi0ohY",
        "colab_type": "text"
      },
      "cell_type": "markdown",
      "source": [
        "# First, run this cell and wait till it completes.\n",
        "## You need to run this each time you need Spark.\n",
        "## Installing Spark in Google Colab takes around 1~2 minutes; when you see \"Ready to use Spark!\", you can go to the next step."
      ]
    },
    {
      "metadata": {
        "id": "fUhBhrGmyAvs",
        "colab_type": "code",
        "colab": {
          "base_uri": "https://localhost:8080/",
          "height": 36
        },
        "outputId": "1d6c9dbe-3075-4802-d690-ba760a777f77"
      },
      "cell_type": "code",
      "source": [
        "# Install Java, Spark, and Findspark\n",
        "# This installs Apache Spark 2.3.3, Java 8, and Findspark, a library that makes it easy for Python to find Spark.\n",
        "\n",
        "!apt-get install openjdk-8-jdk-headless -qq > /dev/null\n",
        "!wget -q http://apache.osuosl.org/spark/spark-2.3.3/spark-2.3.3-bin-hadoop2.7.tgz\n",
        "!tar xf spark-2.3.3-bin-hadoop2.7.tgz\n",
        "!pip install -q findspark\n",
        "\n",
        "# Set Environment Variables\n",
        "# Set the locations where Spark and Java are installed.\n",
        "\n",
        "import os\n",
        "os.environ[\"JAVA_HOME\"] = \"/usr/lib/jvm/java-8-openjdk-amd64\"\n",
        "os.environ[\"SPARK_HOME\"] = \"/content/spark-2.3.3-bin-hadoop2.7\"\n",
        "\n",
        "# Start a SparkContext\n",
        "# This will start a local Spark context.\n",
        "\n",
        "import findspark\n",
        "findspark.init()\n",
        "\n",
        "import pyspark\n",
        "sc = pyspark.SparkContext()\n",
        "\n",
        "print(\"Ready to use Spark!\")"
      ],
      "execution_count": 1,
      "outputs": [
        {
          "output_type": "stream",
          "text": [
            "Ready to use Spark!\n"
          ],
          "name": "stdout"
        }
      ]
    },
    {
      "metadata": {
        "id": "T3ULPx4Y1LiR",
        "colab_type": "text"
      },
      "cell_type": "markdown",
      "source": [
        "# Use Spark!\n",
        "That's all there is to it - you're ready to use Spark!"
      ]
    },
    {
      "metadata": {
        "id": "XJp8ZI-VzYEz",
        "colab_type": "code",
        "outputId": "d5083084-09c2-4e3f-da2a-abbc621f8aa7",
        "colab": {
          "base_uri": "https://localhost:8080/",
          "height": 206
        }
      },
      "cell_type": "code",
      "source": [
        "#df = spark.createDataFrame([{\"hello\": \"world\"} for x in range(1000)])\n",
        "#df.show(3)\n",
        "sc"
      ],
      "execution_count": 2,
      "outputs": [
        {
          "output_type": "execute_result",
          "data": {
            "text/html": [
              "\n",
              "        <div>\n",
              "            <p><b>SparkContext</b></p>\n",
              "\n",
              "            <p><a href=\"http://7a14a535e00b:4040\">Spark UI</a></p>\n",
              "\n",
              "            <dl>\n",
              "              <dt>Version</dt>\n",
              "                <dd>v2.3.3</dd>\n",
              "              <dt>Master</dt>\n",
              "                <dd>local[*]</dd>\n",
              "              <dt>AppName</dt>\n",
              "                <dd>pyspark-shell</dd>\n",
              "            </dl>\n",
              "        </div>\n",
              "        "
            ],
            "text/plain": [
              "<SparkContext master=local[*] appName=pyspark-shell>"
            ]
          },
          "metadata": {
            "tags": []
          },
          "execution_count": 2
        }
      ]
    },
    {
      "metadata": {
        "id": "o7H1FTk4cDzF",
        "colab_type": "code",
        "colab": {}
      },
      "cell_type": "code",
      "source": [
        ""
      ],
      "execution_count": 0,
      "outputs": []
    }
  ]
}
```



Open the .ipynb file with Colaboratory.

After opening, it looks like this. Run the code above first; it has to be run once every time you open the notebook.



Once both cells have finished running, you can get started.

sc
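
Evaluating sc on its own displays the SparkContext summary from the template's output. If you prefer a programmatic check, the context exposes a few attributes; a minimal sketch:

print(sc.version)   # Spark version, here 2.3.3
print(sc.master)    # master URL, here local[*]
print(sc.appName)   # application name, here pyspark-shell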

Next, upload the data file to use (this is the step that differs from running locally):

# upload file from local machine
from google.colab import files
uploaded = files.upload()
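
files.upload() returns a dict mapping each uploaded filename to its raw bytes, so you can sanity-check what arrived before handing the name to Spark. A minimal check, assuming you uploaded log.txt:

# confirm the filename and size of each uploaded file
for name, data in uploaded.items():
    print(name, len(data), "bytes")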


inputRDD = sc.textFile("log.txt")

# Transformations (lazy: nothing is read from log.txt until an action runs)
errorsRDD = inputRDD.filter(lambda x: "Error" in x)
warningsRDD = inputRDD.filter(lambda x: "Warning" in x)
badLinesRDD = errorsRDD.union(warningsRDD)
In [10]:
#Action
inputRDD.count()
Out[10]:
5220
In [11]:
errorsRDD.count()
Out[11]:
195
In [12]:
warningsRDD.count()
Out[12]:
4496
In [13]:
badLinesRDD.count()
Out[13]:
4691
In [16]:
badLinesRDD.take(5)
Out[16]:
['2013-10-22 03:38:10 4588 InnoDB: Error: Table "mysql"."innodb_table_stats" not found.',
 '2013-10-22 03:38:10 4588 InnoDB: Error: Table "mysql"."innodb_table_stats" not found.',
 '2013-10-22 03:38:10 4588 InnoDB: Error: Fetch of persistent statistics requested for table "member"."memberdata" but the required system tables mysql.innodb_table_stats and mysql.innodb_index_stats are not present or have unexpected structure. Using transient stats instead.',
 '2013-10-22 03:38:20 26c0 InnoDB: Error: Table "mysql"."innodb_table_stats" not found.',
 '2013-10-22 03:46:24 23bc InnoDB: Error: Table "mysql"."innodb_table_stats" not found.']
In [17]:
# Two actions used here: count() and take()
print("Input had " + str(badLinesRDD.count()) + " concerning lines")
print("Here are 10 examples:")
for line in badLinesRDD.take(10):
    print(line)
Input had 4691 concerning lines
Here are 10 examples:
2013-10-22 03:38:10 4588 InnoDB: Error: Table "mysql"."innodb_table_stats" not found.
2013-10-22 03:38:10 4588 InnoDB: Error: Table "mysql"."innodb_table_stats" not found.
2013-10-22 03:38:10 4588 InnoDB: Error: Fetch of persistent statistics requested for table "member"."memberdata" but the required system tables mysql.innodb_table_stats and mysql.innodb_index_stats are not present or have unexpected structure. Using transient stats instead.
2013-10-22 03:38:20 26c0 InnoDB: Error: Table "mysql"."innodb_table_stats" not found.
2013-10-22 03:46:24 23bc InnoDB: Error: Table "mysql"."innodb_table_stats" not found.
2013-10-22 03:46:24 23bc InnoDB: Error: Table "mysql"."innodb_table_stats" not found.
2013-10-22 03:46:24 23bc InnoDB: Error: Fetch of persistent statistics requested for table "4c"."memberdata" but the required system tables mysql.innodb_table_stats and mysql.innodb_index_stats are not present or have unexpected structure. Using transient stats instead.
2013-10-22 03:46:34 26c0 InnoDB: Error: Table "mysql"."innodb_table_stats" not found.
2013-10-23 18:35:07 564 InnoDB: Error: Table "mysql"."innodb_table_stats" not found.
2013-10-23 18:35:07 564 InnoDB: Error: Fetch of persistent statistics requested for table "4c"."memberdata" but the required system tables mysql.innodb_table_stats and mysql.innodb_index_stats are not present or have unexpected structure. Using transient stats instead.
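
Two things are worth noting here. First, union() concatenates without deduplicating, which is why the counts add up exactly: 195 + 4496 = 4691. Second, an RDD is recomputed from its lineage for every action by default, so the count() and take(10) above each re-read and re-filter log.txt. When an RDD is reused like this, caching it is the usual fix; a minimal sketch using the names above:

badLinesRDD.cache()   # ask Spark to keep this RDD in memory once computed
badLinesRDD.count()   # first action: reads log.txt, filters, and fills the cache
badLinesRDD.take(10)  # second action: served from the cached partitions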

Operations on a single RDD

In [20]:
##filter
VV = sc.parallelize([2, 18, 9, 22, 17, 24, 8, 12, 27])
DD = VV.filter(lambda x: x % 3 == 0) # transformation
DD.collect() # action
Out[20]:
[18, 9, 24, 12, 27]
In [21]:
##map
nums = sc.parallelize(["a", 2, 3, 4])
squared = nums.map(lambda x: x*2)  # x*2 doubles the numbers; for the string "a" it is repetition, hence 'aa'
squared.collect()
Out[21]:
['aa', 4, 6, 8]
In [22]:
lines = sc.parallelize(["hello world", "hi"])
words = lines.map(lambda line: line.split(" "))
words.collect() # map() is one-to-one: two input lines give two output elements
Out[22]:
[['hello', 'world'], ['hi']]
In [23]:
lines = sc.parallelize(["hello world", "hi"])
words2 = lines.flatMap(lambda line: line.split(" "))
words2.collect() # flatMap() flattens the results, so the output count need not match the input count
Out[23]:
['hello', 'world', 'hi']
In [24]:
rdd = sc.parallelize([2, 3, 4])
rss1 = rdd.flatMap(lambda x: range(1, x))
rss1.collect()
Out[24]:
[1, 1, 2, 1, 2, 3]
In [27]:
x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])]) # two elements to start with
x1 = x.flatMap(lambda x: x)
x2 = x1.flatMap(lambda x: x)
In [28]:
x1.collect() # flattened once: now 4 elements
Out[28]:
['a', ['x', 'y', 'z'], 'b', ['p', 'r']]
In [29]:
x2.collect() # flattened twice: now 7 elements
Out[29]:
['a', 'x', 'y', 'z', 'b', 'p', 'r']
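
flatMap() followed by a per-key reduction is the classic word-count pattern, and it shows why the flattening matters. A minimal sketch (the input strings here are illustrative):

lines = sc.parallelize(["hello world", "hello spark"])
counts = (lines.flatMap(lambda line: line.split(" "))  # one element per word
               .map(lambda w: (w, 1))                  # pair each word with a count of 1
               .reduceByKey(lambda a, b: a + b))       # sum the counts per word
counts.collect()   # [('hello', 2), ('world', 1), ('spark', 1)] (order may vary)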
In [71]:
#sample

nums = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
new_nums = nums.sample(False, 0.5)  # without replacement; each element is kept with probability 0.5
new_nums.collect()
Out[71]:
[3, 6, 7, 8]
In [63]:
for n in new_nums.take(5):
    print(n)
1
2
3
4
5
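
The take(5) output above comes from an earlier execution (In [63]) with a different random draw, which is why it does not match Out[71]: sample() draws a fresh random subset on every evaluation. If you need reproducible results, sample() accepts an optional seed; a minimal sketch:

fixed = nums.sample(False, 0.5, seed=42)  # fixed seed: same pseudo-random draw every run
fixed.collect()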

