1
0

trying to run it in background

This commit is contained in:
Matthew Gaughan 2025-01-16 12:29:04 -06:00
parent 02043006e0
commit 64ffe9f80d
3 changed files with 260 additions and 1831 deletions

View File

@ -2,7 +2,7 @@
"cells": [ "cells": [
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 1, "execution_count": 3,
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
@ -15,7 +15,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 2, "execution_count": 4,
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
@ -25,16 +25,16 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 3, "execution_count": 5,
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
"mediawiki_history_path = \"/data/users/mgaughan/mw-repo-lifecycles/wiki_activity_data/test\"" "mediawiki_history_path = \"/data_ext/users/nws8519/mw-repo-lifecycles/wiki_activity_data/yearly_activity_files/\""
] ]
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 4, "execution_count": 6,
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
@ -120,7 +120,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 5, "execution_count": 7,
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
@ -129,7 +129,7 @@
"'/usr/lib/jvm/java-11-openjdk-amd64'" "'/usr/lib/jvm/java-11-openjdk-amd64'"
] ]
}, },
"execution_count": 5, "execution_count": 7,
"metadata": {}, "metadata": {},
"output_type": "execute_result" "output_type": "execute_result"
} }
@ -140,7 +140,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 6, "execution_count": 8,
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
@ -149,7 +149,7 @@
"text": [ "text": [
"Setting default log level to \"WARN\".\n", "Setting default log level to \"WARN\".\n",
"To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
"25/01/10 10:42:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n" "25/01/16 12:17:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
] ]
} }
], ],
@ -162,7 +162,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 7, "execution_count": 9,
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
@ -183,7 +183,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 8, "execution_count": 10,
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
@ -192,7 +192,7 @@
"<function __main__.toArray(str)>" "<function __main__.toArray(str)>"
] ]
}, },
"execution_count": 8, "execution_count": 10,
"metadata": {}, "metadata": {},
"output_type": "execute_result" "output_type": "execute_result"
} }
@ -204,7 +204,7 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 9, "execution_count": 11,
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
@ -289,32 +289,22 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 10, "execution_count": 12,
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
"name": "stderr", "data": {
"output_type": "stream", "text/plain": [
"text": [ "'\\nmediawiki_history. where(\"event_entity = \\'revision\\' and event_type = \\'create\\'\"). selectExpr(\"wiki_db\", \"SUBSTR(event_timestamp, 0, 7) as month\"). where(\"month = \\'2019-12\\'\"). groupBy(\"wiki_db\", \"month\"). agg(count(lit(1)).alias(\"revision_count\")). sort(desc(\"revision_count\")). show(10, False)\\n'"
" \r" ]
] },
}, "execution_count": 12,
{ "metadata": {},
"name": "stdout", "output_type": "execute_result"
"output_type": "stream",
"text": [
"+------------+-------+--------------+\n",
"|wiki_db |month |revision_count|\n",
"+------------+-------+--------------+\n",
"|kwwiki |2019-12|1079 |\n",
"|kowikiquote |2019-12|146 |\n",
"|zuwiktionary|2019-12|135 |\n",
"+------------+-------+--------------+\n",
"\n"
]
} }
], ],
"source": [ "source": [
"'''\n",
"mediawiki_history. \\\n", "mediawiki_history. \\\n",
" where(\"event_entity = 'revision' and event_type = 'create'\"). \\\n", " where(\"event_entity = 'revision' and event_type = 'create'\"). \\\n",
" selectExpr(\"wiki_db\", \"SUBSTR(event_timestamp, 0, 7) as month\"). \\\n", " selectExpr(\"wiki_db\", \"SUBSTR(event_timestamp, 0, 7) as month\"). \\\n",
@ -322,55 +312,63 @@
" groupBy(\"wiki_db\", \"month\"). \\\n", " groupBy(\"wiki_db\", \"month\"). \\\n",
" agg(count(lit(1)).alias(\"revision_count\")). \\\n", " agg(count(lit(1)).alias(\"revision_count\")). \\\n",
" sort(desc(\"revision_count\")). \\\n", " sort(desc(\"revision_count\")). \\\n",
" show(10, False)" " show(10, False)\n",
"'''"
] ]
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": 30, "execution_count": 15,
"metadata": {}, "metadata": {},
"outputs": [ "outputs": [
{ {
"name": "stderr", "name": "stderr",
"output_type": "stream", "output_type": "stream",
"text": [ "text": [
"[Stage 27:===> (2 + 33) / 35]\r" "ERROR:root:KeyboardInterrupt while sending command. (64 + 56) / 8283]\n",
"Traceback (most recent call last):\n",
" File \"/opt/conda-analytics/lib/python3.10/site-packages/py4j/java_gateway.py\", line 1038, in send_command\n",
" response = connection.send_command(command)\n",
" File \"/opt/conda-analytics/lib/python3.10/site-packages/py4j/clientserver.py\", line 511, in send_command\n",
" answer = smart_decode(self.stream.readline()[:-1])\n",
" File \"/opt/conda-analytics/lib/python3.10/socket.py\", line 717, in readinto\n",
" return self._sock.recv_into(b)\n",
"KeyboardInterrupt\n"
] ]
}, },
{ {
"name": "stdout", "ename": "KeyboardInterrupt",
"output_type": "stream", "evalue": "",
"text": [ "output_type": "error",
"+------------+----------+------------+\n", "traceback": [
"|wiki_db |day |action_count|\n", "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"+------------+----------+------------+\n", "\u001b[0;31mKeyboardInterrupt\u001b[0m Traceback (most recent call last)",
"|kwwiki |2024-11-30|6 |\n", "Cell \u001b[0;32mIn[15], line 6\u001b[0m\n\u001b[1;32m 2\u001b[0m activity_count_df \u001b[38;5;241m=\u001b[39m activity_count_df\u001b[38;5;241m.\u001b[39mselectExpr(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mwiki_db\u001b[39m\u001b[38;5;124m\"\u001b[39m, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mSUBSTR(event_timestamp, 0, 10) as day\u001b[39m\u001b[38;5;124m\"\u001b[39m, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mevent_entity\u001b[39m\u001b[38;5;124m\"\u001b[39m, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mevent_type\u001b[39m\u001b[38;5;124m\"\u001b[39m)\n\u001b[1;32m 3\u001b[0m activity_count_df \u001b[38;5;241m=\u001b[39m activity_count_df\u001b[38;5;241m.\u001b[39mgroupBy(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mwiki_db\u001b[39m\u001b[38;5;124m\"\u001b[39m, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mday\u001b[39m\u001b[38;5;124m\"\u001b[39m, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mevent_entity\u001b[39m\u001b[38;5;124m\"\u001b[39m, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mevent_type\u001b[39m\u001b[38;5;124m\"\u001b[39m)\u001b[38;5;241m.\u001b[39magg(count(lit(\u001b[38;5;241m1\u001b[39m))\u001b[38;5;241m.\u001b[39malias(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mactivity_count\u001b[39m\u001b[38;5;124m\"\u001b[39m))\n\u001b[1;32m 4\u001b[0m \u001b[43mactivity_count_df\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m\\\u001b[49m\n\u001b[1;32m 5\u001b[0m \u001b[43m \u001b[49m\u001b[43msort\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdesc\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mday\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m)\u001b[49m\u001b[43m)\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m \u001b[49m\u001b[43m\\\u001b[49m\n\u001b[0;32m----> 6\u001b[0m \u001b[43m \u001b[49m\u001b[43mshow\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;241;43m10\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m 
\u001b[49m\u001b[38;5;28;43;01mFalse\u001b[39;49;00m\u001b[43m)\u001b[49m\n",
"|kowikiquote |2024-11-29|3 |\n", "File \u001b[0;32m/opt/conda-analytics/lib/python3.10/site-packages/pyspark/sql/dataframe.py:947\u001b[0m, in \u001b[0;36mDataFrame.show\u001b[0;34m(self, n, truncate, vertical)\u001b[0m\n\u001b[1;32m 887\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mshow\u001b[39m(\u001b[38;5;28mself\u001b[39m, n: \u001b[38;5;28mint\u001b[39m \u001b[38;5;241m=\u001b[39m \u001b[38;5;241m20\u001b[39m, truncate: Union[\u001b[38;5;28mbool\u001b[39m, \u001b[38;5;28mint\u001b[39m] \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mTrue\u001b[39;00m, vertical: \u001b[38;5;28mbool\u001b[39m \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mFalse\u001b[39;00m) \u001b[38;5;241m-\u001b[39m\u001b[38;5;241m>\u001b[39m \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[1;32m 888\u001b[0m \u001b[38;5;250m \u001b[39m\u001b[38;5;124;03m\"\"\"Prints the first ``n`` rows to the console.\u001b[39;00m\n\u001b[1;32m 889\u001b[0m \n\u001b[1;32m 890\u001b[0m \u001b[38;5;124;03m .. versionadded:: 1.3.0\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 945\u001b[0m \u001b[38;5;124;03m name | Bob\u001b[39;00m\n\u001b[1;32m 946\u001b[0m \u001b[38;5;124;03m \"\"\"\u001b[39;00m\n\u001b[0;32m--> 947\u001b[0m \u001b[38;5;28mprint\u001b[39m(\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_show_string\u001b[49m\u001b[43m(\u001b[49m\u001b[43mn\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mtruncate\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mvertical\u001b[49m\u001b[43m)\u001b[49m)\n",
"|kwwiki |2024-11-29|17 |\n", "File \u001b[0;32m/opt/conda-analytics/lib/python3.10/site-packages/pyspark/sql/dataframe.py:978\u001b[0m, in \u001b[0;36mDataFrame._show_string\u001b[0;34m(self, n, truncate, vertical)\u001b[0m\n\u001b[1;32m 969\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m \u001b[38;5;167;01mValueError\u001b[39;00m:\n\u001b[1;32m 970\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m PySparkTypeError(\n\u001b[1;32m 971\u001b[0m error_class\u001b[38;5;241m=\u001b[39m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mNOT_BOOL\u001b[39m\u001b[38;5;124m\"\u001b[39m,\n\u001b[1;32m 972\u001b[0m message_parameters\u001b[38;5;241m=\u001b[39m{\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 975\u001b[0m },\n\u001b[1;32m 976\u001b[0m )\n\u001b[0;32m--> 978\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_jdf\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mshowString\u001b[49m\u001b[43m(\u001b[49m\u001b[43mn\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mint_truncate\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mvertical\u001b[49m\u001b[43m)\u001b[49m\n",
"|zuwiktionary|2024-11-29|1 |\n", "File \u001b[0;32m/opt/conda-analytics/lib/python3.10/site-packages/py4j/java_gateway.py:1321\u001b[0m, in \u001b[0;36mJavaMember.__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1314\u001b[0m args_command, temp_args \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_build_args(\u001b[38;5;241m*\u001b[39margs)\n\u001b[1;32m 1316\u001b[0m command \u001b[38;5;241m=\u001b[39m proto\u001b[38;5;241m.\u001b[39mCALL_COMMAND_NAME \u001b[38;5;241m+\u001b[39m\\\n\u001b[1;32m 1317\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mcommand_header \u001b[38;5;241m+\u001b[39m\\\n\u001b[1;32m 1318\u001b[0m args_command \u001b[38;5;241m+\u001b[39m\\\n\u001b[1;32m 1319\u001b[0m proto\u001b[38;5;241m.\u001b[39mEND_COMMAND_PART\n\u001b[0;32m-> 1321\u001b[0m answer \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mgateway_client\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43msend_command\u001b[49m\u001b[43m(\u001b[49m\u001b[43mcommand\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1322\u001b[0m return_value \u001b[38;5;241m=\u001b[39m get_return_value(\n\u001b[1;32m 1323\u001b[0m answer, \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mgateway_client, \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mtarget_id, \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mname)\n\u001b[1;32m 1325\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m temp_arg \u001b[38;5;129;01min\u001b[39;00m temp_args:\n",
"|kwwiki |2024-11-28|38 |\n", "File \u001b[0;32m/opt/conda-analytics/lib/python3.10/site-packages/py4j/java_gateway.py:1038\u001b[0m, in \u001b[0;36mGatewayClient.send_command\u001b[0;34m(self, command, retry, binary)\u001b[0m\n\u001b[1;32m 1036\u001b[0m connection \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_get_connection()\n\u001b[1;32m 1037\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m-> 1038\u001b[0m response \u001b[38;5;241m=\u001b[39m \u001b[43mconnection\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43msend_command\u001b[49m\u001b[43m(\u001b[49m\u001b[43mcommand\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1039\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m binary:\n\u001b[1;32m 1040\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m response, \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_create_connection_guard(connection)\n",
"|kowikiquote |2024-11-28|2 |\n", "File \u001b[0;32m/opt/conda-analytics/lib/python3.10/site-packages/py4j/clientserver.py:511\u001b[0m, in \u001b[0;36mClientServerConnection.send_command\u001b[0;34m(self, command)\u001b[0m\n\u001b[1;32m 509\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[1;32m 510\u001b[0m \u001b[38;5;28;01mwhile\u001b[39;00m \u001b[38;5;28;01mTrue\u001b[39;00m:\n\u001b[0;32m--> 511\u001b[0m answer \u001b[38;5;241m=\u001b[39m smart_decode(\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mstream\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mreadline\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m[:\u001b[38;5;241m-\u001b[39m\u001b[38;5;241m1\u001b[39m])\n\u001b[1;32m 512\u001b[0m logger\u001b[38;5;241m.\u001b[39mdebug(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mAnswer received: \u001b[39m\u001b[38;5;132;01m{0}\u001b[39;00m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;241m.\u001b[39mformat(answer))\n\u001b[1;32m 513\u001b[0m \u001b[38;5;66;03m# Happens when a the other end is dead. There might be an empty\u001b[39;00m\n\u001b[1;32m 514\u001b[0m \u001b[38;5;66;03m# answer before the socket raises an error.\u001b[39;00m\n",
"|kwwiki |2024-11-27|45 |\n", "File \u001b[0;32m/opt/conda-analytics/lib/python3.10/socket.py:717\u001b[0m, in \u001b[0;36mSocketIO.readinto\u001b[0;34m(self, b)\u001b[0m\n\u001b[1;32m 715\u001b[0m \u001b[38;5;28;01mwhile\u001b[39;00m \u001b[38;5;28;01mTrue\u001b[39;00m:\n\u001b[1;32m 716\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m--> 717\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_sock\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mrecv_into\u001b[49m\u001b[43m(\u001b[49m\u001b[43mb\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 718\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m timeout:\n\u001b[1;32m 719\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_timeout_occurred \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mTrue\u001b[39;00m\n",
"|kowikiquote |2024-11-26|2 |\n", "\u001b[0;31mKeyboardInterrupt\u001b[0m: "
"|kwwiki |2024-11-26|138 |\n",
"|zuwiktionary|2024-11-26|1 |\n",
"+------------+----------+------------+\n",
"only showing top 10 rows\n",
"\n"
] ]
}, },
{ {
"name": "stderr", "name": "stderr",
"output_type": "stream", "output_type": "stream",
"text": [ "text": [
" \r" "25/01/16 12:21:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.\n",
"25/01/16 12:21:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.\n",
"25/01/16 12:21:37 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.\n",
"25/01/16 12:21:37 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.\n",
"[Stage 0:==> (397 + 56) / 8283]\r"
] ]
} }
], ],
"source": [ "source": [
"activity_count_df = mediawiki_history.where(\"event_user_is_bot_by_historical is not null and event_user_is_bot_by is not null\")\n", "activity_count_df = mediawiki_history.where(\"event_user_is_bot_by_historical is not null and event_user_is_bot_by is not null\")\n",
"activity_count_df = activity_count_df.selectExpr(\"wiki_db\", \"SUBSTR(event_timestamp, 0, 10) as day\")\n", "activity_count_df = activity_count_df.selectExpr(\"wiki_db\", \"SUBSTR(event_timestamp, 0, 10) as day\", \"event_entity\", \"event_type\")\n",
"activity_count_df = activity_count_df.groupBy(\"wiki_db\", \"day\", \"event_\" \"event_type\").agg(count(lit(1)).alias(\"activity_count\"))\n", "activity_count_df = activity_count_df.groupBy(\"wiki_db\", \"day\", \"event_entity\", \"event_type\").agg(count(lit(1)).alias(\"activity_count\"))\n",
"activity_count_df.\\\n", "activity_count_df.\\\n",
" sort(desc(\"day\")). \\\n", " sort(desc(\"day\")). \\\n",
" show(10, False)" " show(10, False)"
@ -397,6 +395,7 @@
} }
], ],
"source": [ "source": [
"'''\n",
"filtered_df = mediawiki_history. \\\n", "filtered_df = mediawiki_history. \\\n",
" where(\"event_entity = 'user' and event_type='create'\"). \\\n", " where(\"event_entity = 'user' and event_type='create'\"). \\\n",
" drop(\"event_user_blocks_historical\") .\\\n", " drop(\"event_user_blocks_historical\") .\\\n",
@ -405,7 +404,8 @@
" groupBy(\"wiki_db\"). \\\n", " groupBy(\"wiki_db\"). \\\n",
" agg(count(lit(1)).alias(\"revision_count\")). \\\n", " agg(count(lit(1)).alias(\"revision_count\")). \\\n",
" sort(desc(\"revision_count\")). \\\n", " sort(desc(\"revision_count\")). \\\n",
" show(10, False)" " show(10, False)\n",
"'''"
] ]
}, },
{ {
@ -430,7 +430,7 @@
} }
], ],
"source": [ "source": [
"filtered_df.write.format(\"csv\").save(\"test.csv\")" "activity_count_df.write.format(\"csv\").save(\"011625_dab_yearly.csv\")"
] ]
} }
], ],

View File

@ -0,0 +1,200 @@
import re
import os
from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType, IntegerType, ArrayType
from pyspark.sql.functions import count, lit, desc
from pyspark.sql import SparkSession
# Export the Java 11 install locations into this process's environment.
os.environ.update({
    'JAVA_HOME': "/usr/lib/jvm/java-11-openjdk-amd64",
    'JRE_HOME': "/usr/lib/jvm/java-11-openjdk-amd64/jre",
})
# Unescaping and array-splitting UDFs
def unescape(str):
    r"""Convert the literal two-character escapes ``\n``, ``\r`` and ``\t`` into real control characters.

    Args:
        str: Escaped text, or ``None``. The name shadows the builtin; it is kept
            so existing keyword callers keep working.

    Returns:
        The text with the three escapes replaced (in the order newline,
        carriage return, tab), or ``None`` when ``str`` is ``None``.
    """
    if str is None:
        return None
    text = str
    # Applied one after another, in this fixed order.
    for escaped, control_char in (("\\n", "\n"), ("\\r", "\r"), ("\\t", "\t")):
        text = text.replace(escaped, control_char)
    return text
# The splitter uses a negative lookbehind for a backslash, so escaped commas (\,) are not treated as separators
def toArray(str):
    r"""Split a comma-separated string into a list of trimmed elements.

    The input is first passed through ``unescape``; it is then split on every
    comma that is not immediately preceded by a backslash. Each piece is
    stripped of surrounding whitespace and any remaining ``\,`` is turned
    back into a plain comma.

    Args:
        str: The encoded list, or ``None``.

    Returns:
        A list of strings; an empty list when ``str`` is ``None``.
    """
    if str is None:
        return []
    pieces = re.split("(?<!\\\\),", unescape(str))
    return [piece.strip().replace("\\,", ",") for piece in pieces]
if __name__ == "__main__":
    # Script entry point: read the tab-separated history files under
    # mediawiki_history_path, decode escaped/array columns through the UDFs,
    # count events per (wiki_db, day, event_entity, event_type), print a
    # 10-row preview and write the counts out as CSV.

    # Input location handed to spark.read...csv() below.
    mediawiki_history_path = "/data_ext/users/nws8519/mw-repo-lifecycles/wiki_activity_data/yearly_activity_files/"
    # Note: string unescaping and array conversion is done later
    mediawiki_history_schema = StructType([
        StructField("wiki_db", StringType(), nullable = False),
        StructField("event_entity", StringType(), nullable = False),
        StructField("event_type", StringType(), nullable = False),
        StructField("event_timestamp", StringType(), nullable = True),
        StructField("event_comment_escaped", StringType(), nullable = True),
        StructField("event_user_id", LongType(), nullable = True),
        StructField("event_user_text_historical_escaped", StringType(), nullable = True),
        StructField("event_user_text_escaped", StringType(), nullable = True),
        StructField("event_user_blocks_historical_string", StringType(), nullable = True),
        StructField("event_user_blocks_string", StringType(), nullable = True),
        StructField("event_user_groups_historical_string", StringType(), nullable = True),
        StructField("event_user_groups_string", StringType(), nullable = True),
        StructField("event_user_is_bot_by_historical_string", StringType(), nullable = True),
        StructField("event_user_is_bot_by_string", StringType(), nullable = True),
        StructField("event_user_is_created_by_self", BooleanType(), nullable = True),
        StructField("event_user_is_created_by_system", BooleanType(), nullable = True),
        StructField("event_user_is_created_by_peer", BooleanType(), nullable = True),
        StructField("event_user_is_anonymous", BooleanType(), nullable = True),
        StructField("event_user_registration_timestamp", StringType(), nullable = True),
        StructField("event_user_creation_timestamp", StringType(), nullable = True),
        StructField("event_user_first_edit_timestamp", StringType(), nullable = True),
        StructField("event_user_revision_count", LongType(), nullable = True),
        StructField("event_user_seconds_since_previous_revision", LongType(), nullable = True),
        StructField("page_id", LongType(), nullable = True),
        StructField("page_title_historical_escaped", StringType(), nullable = True),
        StructField("page_title_escaped", StringType(), nullable = True),
        StructField("page_namespace_historical", IntegerType(), nullable = True),
        StructField("page_namespace_is_content_historical", BooleanType(), nullable = True),
        StructField("page_namespace", IntegerType(), nullable = True),
        StructField("page_namespace_is_content", BooleanType(), nullable = True),
        StructField("page_is_redirect", BooleanType(), nullable = True),
        StructField("page_is_deleted", BooleanType(), nullable = True),
        StructField("page_creation_timestamp", StringType(), nullable = True),
        StructField("page_first_edit_timestamp", StringType(), nullable = True),
        StructField("page_revision_count", LongType(), nullable = True),
        StructField("page_seconds_since_previous_revision", LongType(), nullable = True),
        StructField("user_id", LongType(), nullable = True),
        StructField("user_text_historical_escaped", StringType(), nullable = True),
        StructField("user_text_escaped", StringType(), nullable = True),
        StructField("user_blocks_historical_string", StringType(), nullable = True),
        StructField("user_blocks_string", StringType(), nullable = True),
        StructField("user_groups_historical_string", StringType(), nullable = True),
        StructField("user_groups_string", StringType(), nullable = True),
        StructField("user_is_bot_by_historical_string", StringType(), nullable = True),
        StructField("user_is_bot_by_string", StringType(), nullable = True),
        StructField("user_is_created_by_self", BooleanType(), nullable = True),
        StructField("user_is_created_by_system", BooleanType(), nullable = True),
        StructField("user_is_created_by_peer", BooleanType(), nullable = True),
        StructField("user_is_anonymous", BooleanType(), nullable = True),
        StructField("user_registration_timestamp", StringType(), nullable = True),
        StructField("user_creation_timestamp", StringType(), nullable = True),
        StructField("user_first_edit_timestamp", StringType(), nullable = True),
        StructField("revision_id", LongType(), nullable = True),
        StructField("revision_parent_id", LongType(), nullable = True),
        StructField("revision_minor_edit", BooleanType(), nullable = True),
        StructField("revision_deleted_parts_string", StringType(), nullable = True),
        StructField("revision_deleted_parts_are_suppressed", BooleanType(), nullable = True),
        StructField("revision_text_bytes", LongType(), nullable = True),
        StructField("revision_text_bytes_diff", LongType(), nullable = True),
        StructField("revision_text_sha1", StringType(), nullable = True),
        StructField("revision_content_model", StringType(), nullable = True),
        StructField("revision_content_format", StringType(), nullable = True),
        StructField("revision_is_deleted_by_page_deletion", BooleanType(), nullable = True),
        StructField("revision_deleted_by_page_deletion_timestamp", StringType(), nullable = True),
        StructField("revision_is_identity_reverted", BooleanType(), nullable = True),
        StructField("revision_first_identity_reverting_revision_id", LongType(), nullable = True),
        StructField("revision_seconds_to_identity_revert", LongType(), nullable = True),
        StructField("revision_is_identity_revert", BooleanType(), nullable = True),
        StructField("revision_is_from_before_page_creation", BooleanType(), nullable = True),
        StructField("revision_tags_string", StringType(), nullable = True)
    ])
    spark = SparkSession.builder.appName('activityData').config("spark.driver.extraJavaOptions", "-Djava.home=/usr/lib/jvm/java-11-openjdk-amd64").getOrCreate()
    try:
        # Note: It's important to set .option("quote", "") to prevent Spark from automatically using double-quotes to quote text
        mediawiki_history_raw = spark.read.option("delimiter", "\t").option("quote", "").schema(mediawiki_history_schema).csv(mediawiki_history_path)
        # Expose the module-level Python helpers to Spark SQL under the names used in selectExpr below.
        spark.udf.register("unescape", unescape, StringType())
        spark.udf.register("to_array", toArray, ArrayType(StringType(), False))
        # Same columns as the raw schema, with *_escaped columns unescaped and
        # *_string columns split into arrays (and renamed without the suffix).
        mediawiki_history = mediawiki_history_raw.selectExpr(
            "wiki_db",
            "event_entity",
            "event_type",
            "event_timestamp",
            "unescape(event_comment_escaped) AS event_comment",
            "event_user_id",
            "unescape(event_user_text_historical_escaped) AS event_user_text_historical",
            "unescape(event_user_text_escaped) AS event_user_text",
            "to_array(event_user_blocks_historical_string) AS event_user_blocks_historical",
            "to_array(event_user_blocks_string) AS event_user_blocks",
            "to_array(event_user_groups_historical_string) AS event_user_groups_historical",
            "to_array(event_user_groups_string) AS event_user_groups",
            "to_array(event_user_is_bot_by_historical_string) AS event_user_is_bot_by_historical",
            "to_array(event_user_is_bot_by_string) AS event_user_is_bot_by",
            "event_user_is_created_by_self",
            "event_user_is_created_by_system",
            "event_user_is_created_by_peer",
            "event_user_is_anonymous",
            "event_user_registration_timestamp",
            "event_user_creation_timestamp",
            "event_user_first_edit_timestamp",
            "event_user_revision_count",
            "event_user_seconds_since_previous_revision",
            "page_id",
            "unescape(page_title_historical_escaped) AS page_title_historical",
            "unescape(page_title_escaped) AS page_title",
            "page_namespace_historical",
            "page_namespace_is_content_historical",
            "page_namespace",
            "page_namespace_is_content",
            "page_is_redirect",
            "page_is_deleted",
            "page_creation_timestamp",
            "page_first_edit_timestamp",
            "page_revision_count",
            "page_seconds_since_previous_revision",
            "user_id",
            "unescape(user_text_historical_escaped) AS user_text_historical",
            "unescape(user_text_escaped) AS user_text",
            "to_array(user_blocks_historical_string) AS user_blocks_historical",
            "to_array(user_blocks_string) AS user_blocks",
            "to_array(user_groups_historical_string) AS user_groups_historical",
            "to_array(user_groups_string) AS user_groups",
            "to_array(user_is_bot_by_historical_string) AS user_is_bot_by_historical",
            "to_array(user_is_bot_by_string) AS user_is_bot_by",
            "user_is_created_by_self",
            "user_is_created_by_system",
            "user_is_created_by_peer",
            "user_is_anonymous",
            "user_registration_timestamp",
            "user_creation_timestamp",
            "user_first_edit_timestamp",
            "revision_id",
            "revision_parent_id",
            "revision_minor_edit",
            "to_array(revision_deleted_parts_string) AS revision_deleted_parts",
            "revision_deleted_parts_are_suppressed",
            "revision_text_bytes",
            "revision_text_bytes_diff",
            "revision_text_sha1",
            "revision_content_model",
            "revision_content_format",
            "revision_is_deleted_by_page_deletion",
            "revision_deleted_by_page_deletion_timestamp",
            "revision_is_identity_reverted",
            "revision_first_identity_reverting_revision_id",
            "revision_seconds_to_identity_revert",
            "revision_is_identity_revert",
            "revision_is_from_before_page_creation",
            "to_array(revision_tags_string) AS revision_tags"
        )
        # NOTE(review): both columns come from to_array, and toArray returns []
        # (never None) for a null input, so these `is not null` predicates look
        # like they cannot exclude any row. Confirm which bot filtering was
        # intended (e.g. a size() check) before relying on this step.
        activity_count_df = mediawiki_history.where("event_user_is_bot_by_historical is not null and event_user_is_bot_by is not null")
        # The first 10 characters of event_timestamp are used as the grouping day.
        activity_count_df = activity_count_df.selectExpr("wiki_db", "SUBSTR(event_timestamp, 0, 10) as day", "event_entity", "event_type")
        # show() and save() below are separate Spark actions; caching the
        # aggregate keeps the second one from re-reading and re-aggregating
        # the whole input.
        activity_count_df = activity_count_df.groupBy("wiki_db", "day", "event_entity", "event_type").agg(count(lit(1)).alias("activity_count")).cache()
        # Preview: 10 rows, most recent day first, without column truncation.
        activity_count_df \
            .sort(desc("day")) \
            .show(10, False)
        # No save mode is set, so Spark's default (error if the path exists)
        # applies: a rerun fails here unless the previous output was moved away.
        activity_count_df.write.format("csv").save("011625_dab_yearly.csv")
    finally:
        # Always release the Spark session, including when a step above fails.
        spark.stop()

File diff suppressed because it is too large Load Diff