{ "cells": [ { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "import re\n", "import os\n", "from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType, IntegerType, ArrayType\n", "from pyspark.sql.functions import count, lit, desc\n", "from pyspark.sql import SparkSession" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "os.environ['JAVA_HOME'] = \"/usr/lib/jvm/java-11-openjdk-amd64\"\n", "os.environ['JRE_HOME'] = \"/usr/lib/jvm/java-11-openjdk-amd64/jre\"" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "mediawiki_history_path = \"/data_ext/users/nws8519/mw-repo-lifecycles/wiki_activity_data/yearly_activity_files/\"" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "# Note: string unescaping and array conversion is done later\n", "mediawiki_history_schema = StructType([\n", "\n", " StructField(\"wiki_db\", StringType(), nullable = False),\n", " StructField(\"event_entity\", StringType(), nullable = False),\n", " StructField(\"event_type\", StringType(), nullable = False),\n", " StructField(\"event_timestamp\", StringType(), nullable = True),\n", " StructField(\"event_comment_escaped\", StringType(), nullable = True),\n", "\n", " StructField(\"event_user_id\", LongType(), nullable = True),\n", " StructField(\"event_user_text_historical_escaped\", StringType(), nullable = True),\n", " StructField(\"event_user_text_escaped\", StringType(), nullable = True),\n", " StructField(\"event_user_blocks_historical_string\", StringType(), nullable = True),\n", " StructField(\"event_user_blocks_string\", StringType(), nullable = True),\n", " StructField(\"event_user_groups_historical_string\", StringType(), nullable = True),\n", " StructField(\"event_user_groups_string\", StringType(), nullable = True),\n", " StructField(\"event_user_is_bot_by_historical_string\", StringType(), 
nullable = True),\n", " StructField(\"event_user_is_bot_by_string\", StringType(), nullable = True),\n", " StructField(\"event_user_is_created_by_self\", BooleanType(), nullable = True),\n", " StructField(\"event_user_is_created_by_system\", BooleanType(), nullable = True),\n", " StructField(\"event_user_is_created_by_peer\", BooleanType(), nullable = True),\n", " StructField(\"event_user_is_anonymous\", BooleanType(), nullable = True),\n", " StructField(\"event_user_registration_timestamp\", StringType(), nullable = True),\n", " StructField(\"event_user_creation_timestamp\", StringType(), nullable = True),\n", " StructField(\"event_user_first_edit_timestamp\", StringType(), nullable = True),\n", " StructField(\"event_user_revision_count\", LongType(), nullable = True),\n", " StructField(\"event_user_seconds_since_previous_revision\", LongType(), nullable = True),\n", "\n", " StructField(\"page_id\", LongType(), nullable = True),\n", " StructField(\"page_title_historical_escaped\", StringType(), nullable = True),\n", " StructField(\"page_title_escaped\", StringType(), nullable = True),\n", " StructField(\"page_namespace_historical\", IntegerType(), nullable = True),\n", " StructField(\"page_namespace_is_content_historical\", BooleanType(), nullable = True),\n", " StructField(\"page_namespace\", IntegerType(), nullable = True),\n", " StructField(\"page_namespace_is_content\", BooleanType(), nullable = True),\n", " StructField(\"page_is_redirect\", BooleanType(), nullable = True),\n", " StructField(\"page_is_deleted\", BooleanType(), nullable = True),\n", " StructField(\"page_creation_timestamp\", StringType(), nullable = True),\n", " StructField(\"page_first_edit_timestamp\", StringType(), nullable = True),\n", " StructField(\"page_revision_count\", LongType(), nullable = True),\n", " StructField(\"page_seconds_since_previous_revision\", LongType(), nullable = True),\n", "\n", " StructField(\"user_id\", LongType(), nullable = True),\n", " 
StructField(\"user_text_historical_escaped\", StringType(), nullable = True),\n", " StructField(\"user_text_escaped\", StringType(), nullable = True),\n", " StructField(\"user_blocks_historical_string\", StringType(), nullable = True),\n", " StructField(\"user_blocks_string\", StringType(), nullable = True),\n", " StructField(\"user_groups_historical_string\", StringType(), nullable = True),\n", " StructField(\"user_groups_string\", StringType(), nullable = True),\n", " StructField(\"user_is_bot_by_historical_string\", StringType(), nullable = True),\n", " StructField(\"user_is_bot_by_string\", StringType(), nullable = True),\n", " StructField(\"user_is_created_by_self\", BooleanType(), nullable = True),\n", " StructField(\"user_is_created_by_system\", BooleanType(), nullable = True),\n", " StructField(\"user_is_created_by_peer\", BooleanType(), nullable = True),\n", " StructField(\"user_is_anonymous\", BooleanType(), nullable = True),\n", " StructField(\"user_registration_timestamp\", StringType(), nullable = True),\n", " StructField(\"user_creation_timestamp\", StringType(), nullable = True),\n", " StructField(\"user_first_edit_timestamp\", StringType(), nullable = True),\n", "\n", " StructField(\"revision_id\", LongType(), nullable = True),\n", " StructField(\"revision_parent_id\", LongType(), nullable = True),\n", " StructField(\"revision_minor_edit\", BooleanType(), nullable = True),\n", " StructField(\"revision_deleted_parts_string\", StringType(), nullable = True),\n", " StructField(\"revision_deleted_parts_are_suppressed\", BooleanType(), nullable = True),\n", " StructField(\"revision_text_bytes\", LongType(), nullable = True),\n", " StructField(\"revision_text_bytes_diff\", LongType(), nullable = True),\n", " StructField(\"revision_text_sha1\", StringType(), nullable = True),\n", " StructField(\"revision_content_model\", StringType(), nullable = True),\n", " StructField(\"revision_content_format\", StringType(), nullable = True),\n", " 
StructField(\"revision_is_deleted_by_page_deletion\", BooleanType(), nullable = True),\n", " StructField(\"revision_deleted_by_page_deletion_timestamp\", StringType(), nullable = True),\n", " StructField(\"revision_is_identity_reverted\", BooleanType(), nullable = True),\n", " StructField(\"revision_first_identity_reverting_revision_id\", LongType(), nullable = True),\n", " StructField(\"revision_seconds_to_identity_revert\", LongType(), nullable = True),\n", " StructField(\"revision_is_identity_revert\", BooleanType(), nullable = True),\n", " StructField(\"revision_is_from_before_page_creation\", BooleanType(), nullable = True),\n", " StructField(\"revision_tags_string\", StringType(), nullable = True)\n", "])" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'/usr/lib/jvm/java-11-openjdk-amd64'" ] }, "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], "source": [ "os.environ['JAVA_HOME']" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "Setting default log level to \"WARN\".\n", "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", "25/01/16 12:17:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... 
using builtin-java classes where applicable\n" ] } ], "source": [ "spark = SparkSession.builder.appName('activityData').config(\"spark.driver.extraJavaOptions\", \"-Djava.home=/usr/lib/jvm/java-11-openjdk-amd64\").getOrCreate()\n", "\n", "# Note: It's important to set .option(\"quote\", \"\") to prevent Spark from automatically using double-quotes to quote text\n", "mediawiki_history_raw = spark.read.option(\"delimiter\", \"\\t\").option(\"quote\", \"\").schema(mediawiki_history_schema).csv(mediawiki_history_path)" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "\n", "# Unescaping and array-splitting UDFs\n", "def unescape(str):\n", " if (str is None):\n", " return None\n", " else:\n", " return str.replace(\"\\\\n\", \"\\n\").replace(\"\\\\r\", \"\\r\").replace(\"\\\\t\", \"\\t\")\n", "# The comma splitter applies a negative lookbehind for \\ to prevent splitting escaped commas\n", "def toArray(str):\n", " if (str is None):\n", " return []\n", " else:\n", " return [s.strip().replace(\"\\\\,\", \",\") for s in re.split(\"(?" 
] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "spark.udf.register(\"unescape\", unescape, StringType())\n", "spark.udf.register(\"to_array\", toArray, ArrayType(StringType(), False))" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [], "source": [ "mediawiki_history = mediawiki_history_raw.selectExpr(\n", " \n", " \"wiki_db\",\n", " \"event_entity\",\n", " \"event_type\",\n", " \"event_timestamp\",\n", " \"unescape(event_comment_escaped) AS event_comment\",\n", " \n", " \"event_user_id\",\n", " \"unescape(event_user_text_historical_escaped) AS event_user_text_historical\",\n", " \"unescape(event_user_text_escaped) AS event_user_text\",\n", " \"to_array(event_user_blocks_historical_string) AS event_user_blocks_historical\",\n", " \"to_array(event_user_blocks_string) AS event_user_blocks\",\n", " \"to_array(event_user_groups_historical_string) AS event_user_groups_historical\",\n", " \"to_array(event_user_groups_string) AS event_user_groups\",\n", " \"to_array(event_user_is_bot_by_historical_string) AS event_user_is_bot_by_historical\",\n", " \"to_array(event_user_is_bot_by_string) AS event_user_is_bot_by\",\n", " \"event_user_is_created_by_self\",\n", " \"event_user_is_created_by_system\",\n", " \"event_user_is_created_by_peer\",\n", " \"event_user_is_anonymous\",\n", " \"event_user_registration_timestamp\",\n", " \"event_user_creation_timestamp\",\n", " \"event_user_first_edit_timestamp\",\n", " \"event_user_revision_count\",\n", " \"event_user_seconds_since_previous_revision\",\n", " \n", " \"page_id\",\n", " \"unescape(page_title_historical_escaped) AS page_title_historical\",\n", " \"unescape(page_title_escaped) AS page_title\",\n", " \"page_namespace_historical\",\n", " \"page_namespace_is_content_historical\",\n", " \"page_namespace\",\n", " \"page_namespace_is_content\",\n", " \"page_is_redirect\",\n", " \"page_is_deleted\",\n", " \"page_creation_timestamp\",\n", " 
\"page_first_edit_timestamp\",\n", " \"page_revision_count\",\n", " \"page_seconds_since_previous_revision\",\n", " \n", " \"user_id\",\n", " \"unescape(user_text_historical_escaped) AS user_text_historical\",\n", " \"unescape(user_text_escaped) AS user_text\",\n", " \"to_array(user_blocks_historical_string) AS user_blocks_historical\",\n", " \"to_array(user_blocks_string) AS user_blocks\",\n", " \"to_array(user_groups_historical_string) AS user_groups_historical\",\n", " \"to_array(user_groups_string) AS user_groups\",\n", " \"to_array(user_is_bot_by_historical_string) AS user_is_bot_by_historical\",\n", " \"to_array(user_is_bot_by_string) AS user_is_bot_by\",\n", " \"user_is_created_by_self\",\n", " \"user_is_created_by_system\",\n", " \"user_is_created_by_peer\",\n", " \"user_is_anonymous\",\n", " \"user_registration_timestamp\",\n", " \"user_creation_timestamp\",\n", " \"user_first_edit_timestamp\",\n", " \n", " \"revision_id\",\n", " \"revision_parent_id\",\n", " \"revision_minor_edit\",\n", " \"to_array(revision_deleted_parts_string) AS revision_deleted_parts\",\n", " \"revision_deleted_parts_are_suppressed\",\n", " \"revision_text_bytes\",\n", " \"revision_text_bytes_diff\",\n", " \"revision_text_sha1\",\n", " \"revision_content_model\",\n", " \"revision_content_format\",\n", " \"revision_is_deleted_by_page_deletion\",\n", " \"revision_deleted_by_page_deletion_timestamp\",\n", " \"revision_is_identity_reverted\",\n", " \"revision_first_identity_reverting_revision_id\",\n", " \"revision_seconds_to_identity_revert\",\n", " \"revision_is_identity_revert\",\n", " \"revision_is_from_before_page_creation\",\n", " \"to_array(revision_tags_string) AS revision_tags\"\n", ")\n" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'\\nmediawiki_history. where(\"event_entity = \\'revision\\' and event_type = \\'create\\'\"). selectExpr(\"wiki_db\", \"SUBSTR(event_timestamp, 0, 7) as month\"). 
where(\"month = \\'2019-12\\'\"). groupBy(\"wiki_db\", \"month\"). agg(count(lit(1)).alias(\"revision_count\")). sort(desc(\"revision_count\")). show(10, False)\\n'" ] }, "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ "'''\n", "mediawiki_history. \\\n", " where(\"event_entity = 'revision' and event_type = 'create'\"). \\\n", " selectExpr(\"wiki_db\", \"SUBSTR(event_timestamp, 0, 7) as month\"). \\\n", " where(\"month = '2019-12'\"). \\\n", " groupBy(\"wiki_db\", \"month\"). \\\n", " agg(count(lit(1)).alias(\"revision_count\")). \\\n", " sort(desc(\"revision_count\")). \\\n", " show(10, False)\n", "'''" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "ERROR:root:KeyboardInterrupt while sending command. (64 + 56) / 8283]\n", "Traceback (most recent call last):\n", " File \"/opt/conda-analytics/lib/python3.10/site-packages/py4j/java_gateway.py\", line 1038, in send_command\n", " response = connection.send_command(command)\n", " File \"/opt/conda-analytics/lib/python3.10/site-packages/py4j/clientserver.py\", line 511, in send_command\n", " answer = smart_decode(self.stream.readline()[:-1])\n", " File \"/opt/conda-analytics/lib/python3.10/socket.py\", line 717, in readinto\n", " return self._sock.recv_into(b)\n", "KeyboardInterrupt\n" ] }, { "ename": "KeyboardInterrupt", "evalue": "", "output_type": "error", "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mKeyboardInterrupt\u001b[0m Traceback (most recent call last)", "Cell \u001b[0;32mIn[15], line 6\u001b[0m\n\u001b[1;32m 2\u001b[0m activity_count_df \u001b[38;5;241m=\u001b[39m activity_count_df\u001b[38;5;241m.\u001b[39mselectExpr(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mwiki_db\u001b[39m\u001b[38;5;124m\"\u001b[39m, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mSUBSTR(event_timestamp, 0, 10) as 
day\u001b[39m\u001b[38;5;124m\"\u001b[39m, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mevent_entity\u001b[39m\u001b[38;5;124m\"\u001b[39m, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mevent_type\u001b[39m\u001b[38;5;124m\"\u001b[39m)\n\u001b[1;32m 3\u001b[0m activity_count_df \u001b[38;5;241m=\u001b[39m activity_count_df\u001b[38;5;241m.\u001b[39mgroupBy(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mwiki_db\u001b[39m\u001b[38;5;124m\"\u001b[39m, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mday\u001b[39m\u001b[38;5;124m\"\u001b[39m, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mevent_entity\u001b[39m\u001b[38;5;124m\"\u001b[39m, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mevent_type\u001b[39m\u001b[38;5;124m\"\u001b[39m)\u001b[38;5;241m.\u001b[39magg(count(lit(\u001b[38;5;241m1\u001b[39m))\u001b[38;5;241m.\u001b[39malias(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mactivity_count\u001b[39m\u001b[38;5;124m\"\u001b[39m))\n\u001b[1;32m 4\u001b[0m \u001b[43mactivity_count_df\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m\\\u001b[49m\n\u001b[1;32m 5\u001b[0m \u001b[43m \u001b[49m\u001b[43msort\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdesc\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mday\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m)\u001b[49m\u001b[43m)\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m \u001b[49m\u001b[43m\\\u001b[49m\n\u001b[0;32m----> 6\u001b[0m \u001b[43m \u001b[49m\u001b[43mshow\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;241;43m10\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43;01mFalse\u001b[39;49;00m\u001b[43m)\u001b[49m\n", "File \u001b[0;32m/opt/conda-analytics/lib/python3.10/site-packages/pyspark/sql/dataframe.py:947\u001b[0m, in \u001b[0;36mDataFrame.show\u001b[0;34m(self, n, truncate, vertical)\u001b[0m\n\u001b[1;32m 887\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mshow\u001b[39m(\u001b[38;5;28mself\u001b[39m, n: \u001b[38;5;28mint\u001b[39m 
\u001b[38;5;241m=\u001b[39m \u001b[38;5;241m20\u001b[39m, truncate: Union[\u001b[38;5;28mbool\u001b[39m, \u001b[38;5;28mint\u001b[39m] \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mTrue\u001b[39;00m, vertical: \u001b[38;5;28mbool\u001b[39m \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mFalse\u001b[39;00m) \u001b[38;5;241m-\u001b[39m\u001b[38;5;241m>\u001b[39m \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[1;32m 888\u001b[0m \u001b[38;5;250m \u001b[39m\u001b[38;5;124;03m\"\"\"Prints the first ``n`` rows to the console.\u001b[39;00m\n\u001b[1;32m 889\u001b[0m \n\u001b[1;32m 890\u001b[0m \u001b[38;5;124;03m .. versionadded:: 1.3.0\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 945\u001b[0m \u001b[38;5;124;03m name | Bob\u001b[39;00m\n\u001b[1;32m 946\u001b[0m \u001b[38;5;124;03m \"\"\"\u001b[39;00m\n\u001b[0;32m--> 947\u001b[0m \u001b[38;5;28mprint\u001b[39m(\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_show_string\u001b[49m\u001b[43m(\u001b[49m\u001b[43mn\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mtruncate\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mvertical\u001b[49m\u001b[43m)\u001b[49m)\n", "File \u001b[0;32m/opt/conda-analytics/lib/python3.10/site-packages/pyspark/sql/dataframe.py:978\u001b[0m, in \u001b[0;36mDataFrame._show_string\u001b[0;34m(self, n, truncate, vertical)\u001b[0m\n\u001b[1;32m 969\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mValueError\u001b[39;00m:\n\u001b[1;32m 970\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m PySparkTypeError(\n\u001b[1;32m 971\u001b[0m error_class\u001b[38;5;241m=\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mNOT_BOOL\u001b[39m\u001b[38;5;124m\"\u001b[39m,\n\u001b[1;32m 972\u001b[0m message_parameters\u001b[38;5;241m=\u001b[39m{\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 975\u001b[0m },\n\u001b[1;32m 976\u001b[0m )\n\u001b[0;32m--> 978\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m 
\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_jdf\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mshowString\u001b[49m\u001b[43m(\u001b[49m\u001b[43mn\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mint_truncate\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mvertical\u001b[49m\u001b[43m)\u001b[49m\n", "File \u001b[0;32m/opt/conda-analytics/lib/python3.10/site-packages/py4j/java_gateway.py:1321\u001b[0m, in \u001b[0;36mJavaMember.__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1314\u001b[0m args_command, temp_args \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_build_args(\u001b[38;5;241m*\u001b[39margs)\n\u001b[1;32m 1316\u001b[0m command \u001b[38;5;241m=\u001b[39m proto\u001b[38;5;241m.\u001b[39mCALL_COMMAND_NAME \u001b[38;5;241m+\u001b[39m\\\n\u001b[1;32m 1317\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mcommand_header \u001b[38;5;241m+\u001b[39m\\\n\u001b[1;32m 1318\u001b[0m args_command \u001b[38;5;241m+\u001b[39m\\\n\u001b[1;32m 1319\u001b[0m proto\u001b[38;5;241m.\u001b[39mEND_COMMAND_PART\n\u001b[0;32m-> 1321\u001b[0m answer \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mgateway_client\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43msend_command\u001b[49m\u001b[43m(\u001b[49m\u001b[43mcommand\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1322\u001b[0m return_value \u001b[38;5;241m=\u001b[39m get_return_value(\n\u001b[1;32m 1323\u001b[0m answer, \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mgateway_client, \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mtarget_id, \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mname)\n\u001b[1;32m 1325\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m temp_arg \u001b[38;5;129;01min\u001b[39;00m temp_args:\n", "File \u001b[0;32m/opt/conda-analytics/lib/python3.10/site-packages/py4j/java_gateway.py:1038\u001b[0m, in 
\u001b[0;36mGatewayClient.send_command\u001b[0;34m(self, command, retry, binary)\u001b[0m\n\u001b[1;32m 1036\u001b[0m connection \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_get_connection()\n\u001b[1;32m 1037\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m-> 1038\u001b[0m response \u001b[38;5;241m=\u001b[39m \u001b[43mconnection\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43msend_command\u001b[49m\u001b[43m(\u001b[49m\u001b[43mcommand\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1039\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m binary:\n\u001b[1;32m 1040\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m response, \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_create_connection_guard(connection)\n", "File \u001b[0;32m/opt/conda-analytics/lib/python3.10/site-packages/py4j/clientserver.py:511\u001b[0m, in \u001b[0;36mClientServerConnection.send_command\u001b[0;34m(self, command)\u001b[0m\n\u001b[1;32m 509\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[1;32m 510\u001b[0m \u001b[38;5;28;01mwhile\u001b[39;00m \u001b[38;5;28;01mTrue\u001b[39;00m:\n\u001b[0;32m--> 511\u001b[0m answer \u001b[38;5;241m=\u001b[39m smart_decode(\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mstream\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mreadline\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m[:\u001b[38;5;241m-\u001b[39m\u001b[38;5;241m1\u001b[39m])\n\u001b[1;32m 512\u001b[0m logger\u001b[38;5;241m.\u001b[39mdebug(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mAnswer received: \u001b[39m\u001b[38;5;132;01m{0}\u001b[39;00m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;241m.\u001b[39mformat(answer))\n\u001b[1;32m 513\u001b[0m \u001b[38;5;66;03m# Happens when a the other end is dead. 
There might be an empty\u001b[39;00m\n\u001b[1;32m 514\u001b[0m \u001b[38;5;66;03m# answer before the socket raises an error.\u001b[39;00m\n", "File \u001b[0;32m/opt/conda-analytics/lib/python3.10/socket.py:717\u001b[0m, in \u001b[0;36mSocketIO.readinto\u001b[0;34m(self, b)\u001b[0m\n\u001b[1;32m 715\u001b[0m \u001b[38;5;28;01mwhile\u001b[39;00m \u001b[38;5;28;01mTrue\u001b[39;00m:\n\u001b[1;32m 716\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m--> 717\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_sock\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mrecv_into\u001b[49m\u001b[43m(\u001b[49m\u001b[43mb\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 718\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m timeout:\n\u001b[1;32m 719\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_timeout_occurred \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mTrue\u001b[39;00m\n", "\u001b[0;31mKeyboardInterrupt\u001b[0m: " ] }, { "name": "stderr", "output_type": "stream", "text": [ "25/01/16 12:21:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.\n", "25/01/16 12:21:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.\n", "25/01/16 12:21:37 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.\n", "25/01/16 12:21:37 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. 
Will not spill but return 0.\n", "[Stage 0:==> (397 + 56) / 8283]\r" ] } ], "source": [ "activity_count_df = mediawiki_history.where(\"event_user_is_bot_by_historical is not null and event_user_is_bot_by is not null\")\n", "activity_count_df = activity_count_df.selectExpr(\"wiki_db\", \"SUBSTR(event_timestamp, 0, 10) as day\", \"event_entity\", \"event_type\")\n", "activity_count_df = activity_count_df.groupBy(\"wiki_db\", \"day\", \"event_entity\", \"event_type\").agg(count(lit(1)).alias(\"activity_count\"))\n", "activity_count_df.\\\n", " sort(desc(\"day\")). \\\n", " show(10, False)" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------------+--------------+\n", "|wiki_db |revision_count|\n", "+------------+--------------+\n", "|kwwiki |16625 |\n", "|kowikiquote |6779 |\n", "|zuwiktionary|3595 |\n", "+------------+--------------+\n", "\n" ] } ], "source": [ "'''\n", "filtered_df = mediawiki_history. \\\n", " where(\"event_entity = 'user' and event_type='create'\"). \\\n", " drop(\"event_user_blocks_historical\") .\\\n", " selectExpr(\"wiki_db\", \"SUBSTR(event_timestamp, 0, 7) as month\"). \\\n", " where(\"event_user_is_bot_by_historical is not null and event_user_is_bot_by is not null\"). \\\n", " groupBy(\"wiki_db\"). \\\n", " agg(count(lit(1)).alias(\"revision_count\")). \\\n", " sort(desc(\"revision_count\")). \\\n", " show(10, False)\n", "'''" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "ename": "Py4JJavaError", "evalue": "An error occurred while calling o228.save.\n: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: tsv. 
Please find packages at `https://spark.apache.org/third-party-projects.html`.\n\tat org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:725)\n\tat org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)\n\tat org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)\n\tat org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:873)\n\tat org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:260)\n\tat org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)\n\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: java.lang.ClassNotFoundException: tsv.DefaultSource\n\tat java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)\n\tat java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)\n\tat java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)\n\tat org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)\n\tat scala.util.Try$.apply(Try.scala:213)\n\tat 
org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)\n\tat scala.util.Failure.orElse(Try.scala:224)\n\tat org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)\n\t... 16 more\n", "output_type": "error", "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mPy4JJavaError\u001b[0m Traceback (most recent call last)", "Cell \u001b[0;32mIn[25], line 1\u001b[0m\n\u001b[0;32m----> 1\u001b[0m \u001b[43mfiltered_df\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mwrite\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mformat\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mtsv\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m)\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43msave\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mtest.tsv\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m)\u001b[49m\n", "File \u001b[0;32m/opt/conda-analytics/lib/python3.10/site-packages/pyspark/sql/readwriter.py:1463\u001b[0m, in \u001b[0;36mDataFrameWriter.save\u001b[0;34m(self, path, format, mode, partitionBy, **options)\u001b[0m\n\u001b[1;32m 1461\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_jwrite\u001b[38;5;241m.\u001b[39msave()\n\u001b[1;32m 1462\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[0;32m-> 1463\u001b[0m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_jwrite\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43msave\u001b[49m\u001b[43m(\u001b[49m\u001b[43mpath\u001b[49m\u001b[43m)\u001b[49m\n", "File \u001b[0;32m/opt/conda-analytics/lib/python3.10/site-packages/py4j/java_gateway.py:1322\u001b[0m, in \u001b[0;36mJavaMember.__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1316\u001b[0m command \u001b[38;5;241m=\u001b[39m 
proto\u001b[38;5;241m.\u001b[39mCALL_COMMAND_NAME \u001b[38;5;241m+\u001b[39m\\\n\u001b[1;32m 1317\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mcommand_header \u001b[38;5;241m+\u001b[39m\\\n\u001b[1;32m 1318\u001b[0m args_command \u001b[38;5;241m+\u001b[39m\\\n\u001b[1;32m 1319\u001b[0m proto\u001b[38;5;241m.\u001b[39mEND_COMMAND_PART\n\u001b[1;32m 1321\u001b[0m answer \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mgateway_client\u001b[38;5;241m.\u001b[39msend_command(command)\n\u001b[0;32m-> 1322\u001b[0m return_value \u001b[38;5;241m=\u001b[39m \u001b[43mget_return_value\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 1323\u001b[0m \u001b[43m \u001b[49m\u001b[43manswer\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mgateway_client\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mtarget_id\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mname\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1325\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m temp_arg \u001b[38;5;129;01min\u001b[39;00m temp_args:\n\u001b[1;32m 1326\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mhasattr\u001b[39m(temp_arg, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m_detach\u001b[39m\u001b[38;5;124m\"\u001b[39m):\n", "File \u001b[0;32m/opt/conda-analytics/lib/python3.10/site-packages/pyspark/errors/exceptions/captured.py:179\u001b[0m, in \u001b[0;36mcapture_sql_exception..deco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 177\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mdeco\u001b[39m(\u001b[38;5;241m*\u001b[39ma: Any, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkw: Any) \u001b[38;5;241m-\u001b[39m\u001b[38;5;241m>\u001b[39m Any:\n\u001b[1;32m 178\u001b[0m 
\u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m--> 179\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43mf\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43ma\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkw\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 180\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m Py4JJavaError \u001b[38;5;28;01mas\u001b[39;00m e:\n\u001b[1;32m 181\u001b[0m converted \u001b[38;5;241m=\u001b[39m convert_exception(e\u001b[38;5;241m.\u001b[39mjava_exception)\n", "File \u001b[0;32m/opt/conda-analytics/lib/python3.10/site-packages/py4j/protocol.py:326\u001b[0m, in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m 324\u001b[0m value \u001b[38;5;241m=\u001b[39m OUTPUT_CONVERTER[\u001b[38;5;28mtype\u001b[39m](answer[\u001b[38;5;241m2\u001b[39m:], gateway_client)\n\u001b[1;32m 325\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m answer[\u001b[38;5;241m1\u001b[39m] \u001b[38;5;241m==\u001b[39m REFERENCE_TYPE:\n\u001b[0;32m--> 326\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m Py4JJavaError(\n\u001b[1;32m 327\u001b[0m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mAn error occurred while calling \u001b[39m\u001b[38;5;132;01m{0}\u001b[39;00m\u001b[38;5;132;01m{1}\u001b[39;00m\u001b[38;5;132;01m{2}\u001b[39;00m\u001b[38;5;124m.\u001b[39m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;241m.\u001b[39m\n\u001b[1;32m 328\u001b[0m \u001b[38;5;28mformat\u001b[39m(target_id, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m.\u001b[39m\u001b[38;5;124m\"\u001b[39m, name), value)\n\u001b[1;32m 329\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 330\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m Py4JError(\n\u001b[1;32m 331\u001b[0m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mAn error occurred while calling 
\u001b[39m\u001b[38;5;132;01m{0}\u001b[39;00m\u001b[38;5;132;01m{1}\u001b[39;00m\u001b[38;5;132;01m{2}\u001b[39;00m\u001b[38;5;124m. Trace:\u001b[39m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[38;5;132;01m{3}\u001b[39;00m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;241m.\u001b[39m\n\u001b[1;32m 332\u001b[0m \u001b[38;5;28mformat\u001b[39m(target_id, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m.\u001b[39m\u001b[38;5;124m\"\u001b[39m, name, value))\n", "\u001b[0;31mPy4JJavaError\u001b[0m: An error occurred while calling o228.save.\n: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: tsv. Please find packages at `https://spark.apache.org/third-party-projects.html`.\n\tat org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:725)\n\tat org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)\n\tat org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)\n\tat org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:873)\n\tat org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:260)\n\tat org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat 
py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)\n\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: java.lang.ClassNotFoundException: tsv.DefaultSource\n\tat java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)\n\tat java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)\n\tat java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)\n\tat org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)\n\tat scala.util.Try$.apply(Try.scala:213)\n\tat org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)\n\tat scala.util.Failure.orElse(Try.scala:224)\n\tat org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)\n\t... 16 more\n" ] } ], "source": [ "activity_count_df.write.format(\"csv\").save(\"011625_dab_yearly.csv\")" ] } ], "metadata": { "kernelspec": { "display_name": "base", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.15" } }, "nbformat": 4, "nbformat_minor": 2 }