
updates to scripts

Matthew Gaughan 2025-01-15 14:42:48 -06:00
parent 75729e27ad
commit 02043006e0
2 changed files with 65 additions and 39 deletions

View File

@@ -2,8 +2,9 @@ import requests
 import bz2
 import shutil
 import os
+import sys
-FILE_LOC_PREFIX = "/data/users/mgaughan/mw-repo-lifecycles/wiki_activity_data/yearly_activity_files/"
+#FILE_LOC_PREFIX = "/data_ext/users/nws8519/mw-repo-lifecycles/wiki_activity_data/yearly_activity_files/"
 
 def decompress(filepath):
     decompressed_filepath = filepath[:-4]
@@ -23,11 +24,13 @@ def decompress_directory(directory_name):
             filepath = os.path.join(root, file)
             print(filepath)
             # Apply the decompress function
-            #try:
-            decompress(filepath)
-            #except OSError:
-                #OSErrors += 1
-                #print(f"OSError @ {filepath}")
+            try:
+                decompress(filepath)
+            except OSError:
+                OSErrors += 1
+                print(f"OSError @ {filepath}")
+            os.remove(filepath)
+            print(f"Deleted {filepath}")
     return OSErrors
@@ -42,6 +45,6 @@ def cleanup(directory_name):
 if __name__ == "__main__":
     #batch_parallel_for_single()
-    decompression_errors = decompress_directory(FILE_LOC_PREFIX)
+    decompression_errors = decompress_directory(sys.argv[1])
     print(f"We had {decompression_errors} OSErrors during decompression.")
     #cleanup(FILE_LOC_PREFIX)
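
Taken together, the script changes replace the hardcoded FILE_LOC_PREFIX with a command-line argument, enable the previously commented-out try/except around decompression, and delete each compressed file after it is handled. A minimal sketch of how the updated script reads end-to-end; the body of decompress() is outside these hunks, so its bz2/shutil internals below are an assumption based on the file's imports:

import bz2
import os
import shutil
import sys

def decompress(filepath):
    # assumed helper body: strip the ".bz2" suffix and inflate the stream
    decompressed_filepath = filepath[:-4]
    with bz2.BZ2File(filepath) as src, open(decompressed_filepath, "wb") as dst:
        shutil.copyfileobj(src, dst)

def decompress_directory(directory_name):
    OSErrors = 0
    for root, dirs, files in os.walk(directory_name):
        for file in files:
            filepath = os.path.join(root, file)
            print(filepath)
            try:
                decompress(filepath)
            except OSError:
                OSErrors += 1
                print(f"OSError @ {filepath}")
            # new in this commit: remove the compressed original afterwards
            os.remove(filepath)
            print(f"Deleted {filepath}")
    return OSErrors

if __name__ == "__main__":
    decompression_errors = decompress_directory(sys.argv[1])
    print(f"We had {decompression_errors} OSErrors during decompression.")

After this commit the script would be invoked along the lines of python decompress.py /path/to/yearly_activity_files/ (the script name here is hypothetical; only the directory argument is new).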

View File

@@ -149,7 +149,7 @@
      "text": [
       "Setting default log level to \"WARN\".\n",
       "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
-      "25/01/09 14:31:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
+      "25/01/10 10:42:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
      ]
     }
    ],
@@ -327,43 +327,58 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 29,
+   "execution_count": 30,
    "metadata": {},
    "outputs": [
+    {
+     "name": "stderr",
+     "output_type": "stream",
+     "text": [
+      "[Stage 27:===>                                                  (2 + 33) / 35]\r"
+     ]
+    },
     {
      "name": "stdout",
      "output_type": "stream",
      "text": [
-      "+------------+-----------+--------------+\n",
-      "|event_entity|event_type |revision_count|\n",
-      "+------------+-----------+--------------+\n",
-      "|revision    |create     |267553        |\n",
-      "|user        |create     |26999         |\n",
-      "|page        |create     |25214         |\n",
-      "|page        |delete     |9842          |\n",
-      "|page        |create-page|9365          |\n",
-      "|user        |rename     |1738          |\n",
-      "|page        |move       |1657          |\n",
-      "|user        |alterblocks|87            |\n",
-      "|page        |restore    |20            |\n",
-      "|user        |altergroups|2             |\n",
-      "+------------+-----------+--------------+\n",
+      "+------------+----------+------------+\n",
+      "|wiki_db     |day       |action_count|\n",
+      "+------------+----------+------------+\n",
+      "|kwwiki      |2024-11-30|6           |\n",
+      "|kowikiquote |2024-11-29|3           |\n",
+      "|kwwiki      |2024-11-29|17          |\n",
+      "|zuwiktionary|2024-11-29|1           |\n",
+      "|kwwiki      |2024-11-28|38          |\n",
+      "|kowikiquote |2024-11-28|2           |\n",
+      "|kwwiki      |2024-11-27|45          |\n",
+      "|kowikiquote |2024-11-26|2           |\n",
+      "|kwwiki      |2024-11-26|138         |\n",
+      "|zuwiktionary|2024-11-26|1           |\n",
+      "+------------+----------+------------+\n",
+      "only showing top 10 rows\n",
       "\n"
      ]
+    },
+    {
+     "name": "stderr",
+     "output_type": "stream",
+     "text": [
+      "                                                                                \r"
+     ]
     }
    ],
    "source": [
-    "mediawiki_history. \\\n",
-    "    where(\"event_user_is_bot_by_historical is not null and event_user_is_bot_by is not null\"). \\\n",
-    "    groupBy(\"wikidb\", \"event_entity\", \"event_type\"). \\\n",
-    "    agg(count(lit(1)).alias(\"revision_count\")). \\\n",
-    "    sort(desc(\"revision_count\")). \\\n",
-    "    show(20, False)"
+    "activity_count_df = mediawiki_history.where(\"event_user_is_bot_by_historical is not null and event_user_is_bot_by is not null\")\n",
+    "activity_count_df = activity_count_df.selectExpr(\"wiki_db\", \"SUBSTR(event_timestamp, 0, 10) as day\")\n",
+    "activity_count_df = activity_count_df.groupBy(\"wiki_db\", \"day\", \"event_\" \"event_type\").agg(count(lit(1)).alias(\"activity_count\"))\n",
+    "activity_count_df.\\\n",
+    "    sort(desc(\"day\")). \\\n",
+    "    show(10, False)"
    ]
   },
   {
    "cell_type": "code",
-   "execution_count": 27,
+   "execution_count": 17,
    "metadata": {},
    "outputs": [
    {
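
The rewritten cell above switches from counting revisions by entity and type to counting actions per wiki per day. Note that in the committed source the adjacent string literals "event_" "event_type" concatenate to the single argument "event_event_type" in Python, which is not a mediawiki_history column; the stdout block (wiki_db/day/action_count) appears to predate that edit. A hedged sketch of the aggregation the cell appears to intend, assuming the mediawiki_history DataFrame loaded earlier in the notebook:

from pyspark.sql.functions import count, desc, lit

activity_count_df = (
    mediawiki_history
    .where("event_user_is_bot_by_historical is not null and event_user_is_bot_by is not null")
    .selectExpr("wiki_db", "SUBSTR(event_timestamp, 0, 10) as day", "event_type")
    .groupBy("wiki_db", "day", "event_type")  # "event_type" as one literal
    .agg(count(lit(1)).alias("activity_count"))
    .sort(desc("day"))
)
activity_count_df.show(10, False)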
@@ -382,8 +397,9 @@
     }
    ],
    "source": [
-    "mediawiki_history. \\\n",
+    "filtered_df = mediawiki_history. \\\n",
     "    where(\"event_entity = 'user' and event_type='create'\"). \\\n",
+    "    drop(\"event_user_blocks_historical\") .\\\n",
     "    selectExpr(\"wiki_db\", \"SUBSTR(event_timestamp, 0, 7) as month\"). \\\n",
     "    where(\"event_user_is_bot_by_historical is not null and event_user_is_bot_by is not null\"). \\\n",
     "    groupBy(\"wiki_db\"). \\\n",
@@ -394,26 +410,33 @@
   },
   {
    "cell_type": "code",
-   "execution_count": 26,
+   "execution_count": 25,
    "metadata": {},
    "outputs": [
    {
-    "name": "stdout",
-    "output_type": "stream",
-    "text": [
-     "None\n"
+    "ename": "Py4JJavaError",
+    "evalue": "An error occurred while calling o228.save.\n: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: tsv. Please find packages at `https://spark.apache.org/third-party-projects.html`.\n\tat org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:725)\n\tat org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)\n\tat org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)\n\tat org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:873)\n\tat org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:260)\n\tat org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)\n\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: java.lang.ClassNotFoundException: tsv.DefaultSource\n\tat java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)\n\tat java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)\n\tat java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)\n\tat org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)\n\tat scala.util.Try$.apply(Try.scala:213)\n\tat org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)\n\tat scala.util.Failure.orElse(Try.scala:224)\n\tat org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)\n\t... 16 more\n",
+    "output_type": "error",
+    "traceback": [
+     "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
+     "\u001b[0;31mPy4JJavaError\u001b[0m                             Traceback (most recent call last)",
+     "Cell \u001b[0;32mIn[25], line 1\u001b[0m\n\u001b[0;32m----> 1\u001b[0m \u001b[43mfiltered_df\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mwrite\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mformat\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mtsv\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m)\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43msave\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mtest.tsv\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m)\u001b[49m\n",
+     "File \u001b[0;32m/opt/conda-analytics/lib/python3.10/site-packages/pyspark/sql/readwriter.py:1463\u001b[0m, in \u001b[0;36mDataFrameWriter.save\u001b[0;34m(self, path, format, mode, partitionBy, **options)\u001b[0m\n\u001b[1;32m 1461\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_jwrite\u001b[38;5;241m.\u001b[39msave()\n\u001b[1;32m 1462\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[0;32m-> 1463\u001b[0m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_jwrite\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43msave\u001b[49m\u001b[43m(\u001b[49m\u001b[43mpath\u001b[49m\u001b[43m)\u001b[49m\n",
+     "File \u001b[0;32m/opt/conda-analytics/lib/python3.10/site-packages/py4j/java_gateway.py:1322\u001b[0m, in \u001b[0;36mJavaMember.__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m 1316\u001b[0m command \u001b[38;5;241m=\u001b[39m proto\u001b[38;5;241m.\u001b[39mCALL_COMMAND_NAME \u001b[38;5;241m+\u001b[39m\\\n\u001b[1;32m 1317\u001b[0m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mcommand_header \u001b[38;5;241m+\u001b[39m\\\n\u001b[1;32m 1318\u001b[0m args_command \u001b[38;5;241m+\u001b[39m\\\n\u001b[1;32m 1319\u001b[0m proto\u001b[38;5;241m.\u001b[39mEND_COMMAND_PART\n\u001b[1;32m 1321\u001b[0m answer \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mgateway_client\u001b[38;5;241m.\u001b[39msend_command(command)\n\u001b[0;32m-> 1322\u001b[0m return_value \u001b[38;5;241m=\u001b[39m \u001b[43mget_return_value\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 1323\u001b[0m \u001b[43m \u001b[49m\u001b[43manswer\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mgateway_client\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mtarget_id\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mname\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1325\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m temp_arg \u001b[38;5;129;01min\u001b[39;00m temp_args:\n\u001b[1;32m 1326\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28mhasattr\u001b[39m(temp_arg, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m_detach\u001b[39m\u001b[38;5;124m\"\u001b[39m):\n",
+     "File \u001b[0;32m/opt/conda-analytics/lib/python3.10/site-packages/pyspark/errors/exceptions/captured.py:179\u001b[0m, in \u001b[0;36mcapture_sql_exception.<locals>.deco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m 177\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mdeco\u001b[39m(\u001b[38;5;241m*\u001b[39ma: Any, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkw: Any) \u001b[38;5;241m-\u001b[39m\u001b[38;5;241m>\u001b[39m Any:\n\u001b[1;32m 178\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m:\n\u001b[0;32m--> 179\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43mf\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43ma\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkw\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 180\u001b[0m \u001b[38;5;28;01mexcept\u001b[39;00m Py4JJavaError \u001b[38;5;28;01mas\u001b[39;00m e:\n\u001b[1;32m 181\u001b[0m converted \u001b[38;5;241m=\u001b[39m convert_exception(e\u001b[38;5;241m.\u001b[39mjava_exception)\n",
+     "File \u001b[0;32m/opt/conda-analytics/lib/python3.10/site-packages/py4j/protocol.py:326\u001b[0m, in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m 324\u001b[0m value \u001b[38;5;241m=\u001b[39m OUTPUT_CONVERTER[\u001b[38;5;28mtype\u001b[39m](answer[\u001b[38;5;241m2\u001b[39m:], gateway_client)\n\u001b[1;32m 325\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m answer[\u001b[38;5;241m1\u001b[39m] \u001b[38;5;241m==\u001b[39m REFERENCE_TYPE:\n\u001b[0;32m--> 326\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m Py4JJavaError(\n\u001b[1;32m 327\u001b[0m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mAn error occurred while calling \u001b[39m\u001b[38;5;132;01m{0}\u001b[39;00m\u001b[38;5;132;01m{1}\u001b[39;00m\u001b[38;5;132;01m{2}\u001b[39;00m\u001b[38;5;124m.\u001b[39m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;241m.\u001b[39m\n\u001b[1;32m 328\u001b[0m \u001b[38;5;28mformat\u001b[39m(target_id, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m.\u001b[39m\u001b[38;5;124m\"\u001b[39m, name), value)\n\u001b[1;32m 329\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[1;32m 330\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m Py4JError(\n\u001b[1;32m 331\u001b[0m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mAn error occurred while calling \u001b[39m\u001b[38;5;132;01m{0}\u001b[39;00m\u001b[38;5;132;01m{1}\u001b[39;00m\u001b[38;5;132;01m{2}\u001b[39;00m\u001b[38;5;124m. Trace:\u001b[39m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[38;5;132;01m{3}\u001b[39;00m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[38;5;124m\"\u001b[39m\u001b[38;5;241m.\u001b[39m\n\u001b[1;32m 332\u001b[0m \u001b[38;5;28mformat\u001b[39m(target_id, \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m.\u001b[39m\u001b[38;5;124m\"\u001b[39m, name, value))\n",
+     "\u001b[0;31mPy4JJavaError\u001b[0m: An error occurred while calling o228.save.\n: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: tsv. Please find packages at `https://spark.apache.org/third-party-projects.html`.\n\tat org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:725)\n\tat org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)\n\tat org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)\n\tat org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:873)\n\tat org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:260)\n\tat org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:243)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)\n\tat py4j.Gateway.invoke(Gateway.java:282)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)\n\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: java.lang.ClassNotFoundException: tsv.DefaultSource\n\tat java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)\n\tat java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)\n\tat java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)\n\tat org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)\n\tat scala.util.Try$.apply(Try.scala:213)\n\tat org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)\n\tat scala.util.Failure.orElse(Try.scala:224)\n\tat org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)\n\t... 16 more\n"
     ]
    }
   ],
   "source": [
-   "#df.write.format(\"csv\").save(filepath)\n",
-   "print(bot_user_creation)"
+   "filtered_df.write.format(\"csv\").save(\"test.csv\")"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
-  "display_name": "base",
+  "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
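
A note on the error output retained in the committed cell: the traceback comes from an earlier run that called filtered_df.write.format("tsv").save("test.tsv"), and Spark ships no built-in "tsv" data source, hence the DATA_SOURCE_NOT_FOUND. The committed source line already switches to format("csv"); if tab-separated output is actually wanted, the built-in csv writer's sep option covers it. A minimal sketch, assuming filtered_df from the cell above:

# Spark has no "tsv" format; write TSV through the csv data source instead.
filtered_df.write \
    .format("csv") \
    .option("sep", "\t") \
    .option("header", True) \
    .save("test.tsv")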