1
0

spark updates

This commit is contained in:
Matthew Gaughan 2025-01-09 15:40:47 -06:00
parent 092306d777
commit 75729e27ad
2 changed files with 100 additions and 22 deletions

View File

@ -15,6 +15,7 @@ def decompress(filepath):
def decompress_directory(directory_name): def decompress_directory(directory_name):
# Traverse the directory # Traverse the directory
OSErrors = 0
for root, dirs, files in os.walk(directory_name): for root, dirs, files in os.walk(directory_name):
for file in files: for file in files:
if file.endswith('.bz2'): if file.endswith('.bz2'):
@ -22,7 +23,13 @@ def decompress_directory(directory_name):
filepath = os.path.join(root, file) filepath = os.path.join(root, file)
print(filepath) print(filepath)
# Apply the decompress function # Apply the decompress function
#try:
decompress(filepath) decompress(filepath)
#except OSError:
#OSErrors += 1
#print(f"OSError @ {filepath}")
return OSErrors
def cleanup(directory_name): def cleanup(directory_name):
for root, dirs, files in os.walk(directory_name): for root, dirs, files in os.walk(directory_name):
@ -35,5 +42,6 @@ def cleanup(directory_name):
if __name__ == "__main__": if __name__ == "__main__":
#batch_parallel_for_single() #batch_parallel_for_single()
decompress_directory(FILE_LOC_PREFIX) decompression_errors = decompress_directory(FILE_LOC_PREFIX)
print(f"We had {decompression_errors} OSErrors during decompression.")
#cleanup(FILE_LOC_PREFIX) #cleanup(FILE_LOC_PREFIX)

View File

@ -29,7 +29,7 @@
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [],
"source": [ "source": [
"mediawiki_history_path = \"/data/users/mgaughan/mw-repo-lifecycles/wiki_activity_data/single_activity_files\"" "mediawiki_history_path = \"/data/users/mgaughan/mw-repo-lifecycles/wiki_activity_data/test\""
] ]
}, },
{ {
@ -149,7 +149,7 @@
"text": [ "text": [
"Setting default log level to \"WARN\".\n", "Setting default log level to \"WARN\".\n",
"To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n", "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n",
"25/01/08 11:39:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n" "25/01/09 14:31:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable\n"
] ]
} }
], ],
@ -303,21 +303,13 @@
"name": "stdout", "name": "stdout",
"output_type": "stream", "output_type": "stream",
"text": [ "text": [
"+--------------+-------+--------------+\n", "+------------+-------+--------------+\n",
"|wiki_db |month |revision_count|\n", "|wiki_db |month |revision_count|\n",
"+--------------+-------+--------------+\n", "+------------+-------+--------------+\n",
"|euwiki |2019-12|356355 |\n", "|kwwiki |2019-12|1079 |\n",
"|cewiki |2019-12|229351 |\n", "|kowikiquote |2019-12|146 |\n",
"|elwiktionary |2019-12|227666 |\n", "|zuwiktionary|2019-12|135 |\n",
"|cywiki |2019-12|139174 |\n", "+------------+-------+--------------+\n",
"|tgwiki |2019-12|65694 |\n",
"|zh_min_nanwiki|2019-12|59755 |\n",
"|bnwiki |2019-12|55698 |\n",
"|elwiki |2019-12|49604 |\n",
"|dewiktionary |2019-12|47897 |\n",
"|urwiki |2019-12|45793 |\n",
"+--------------+-------+--------------+\n",
"only showing top 10 rows\n",
"\n" "\n"
] ]
} }
@ -335,15 +327,93 @@
}, },
{ {
"cell_type": "code", "cell_type": "code",
"execution_count": null, "execution_count": 29,
"metadata": {}, "metadata": {},
"outputs": [], "outputs": [
"source": [] {
"name": "stdout",
"output_type": "stream",
"text": [
"+------------+-----------+--------------+\n",
"|event_entity|event_type |revision_count|\n",
"+------------+-----------+--------------+\n",
"|revision |create |267553 |\n",
"|user |create |26999 |\n",
"|page |create |25214 |\n",
"|page |delete |9842 |\n",
"|page |create-page|9365 |\n",
"|user |rename |1738 |\n",
"|page |move |1657 |\n",
"|user |alterblocks|87 |\n",
"|page |restore |20 |\n",
"|user |altergroups|2 |\n",
"+------------+-----------+--------------+\n",
"\n"
]
}
],
"source": [
"mediawiki_history. \\\n",
" where(\"event_user_is_bot_by_historical is not null and event_user_is_bot_by is not null\"). \\\n",
" groupBy(\"wikidb\", \"event_entity\", \"event_type\"). \\\n",
" agg(count(lit(1)).alias(\"revision_count\")). \\\n",
" sort(desc(\"revision_count\")). \\\n",
" show(20, False)"
]
},
{
"cell_type": "code",
"execution_count": 27,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"+------------+--------------+\n",
"|wiki_db |revision_count|\n",
"+------------+--------------+\n",
"|kwwiki |16625 |\n",
"|kowikiquote |6779 |\n",
"|zuwiktionary|3595 |\n",
"+------------+--------------+\n",
"\n"
]
}
],
"source": [
"mediawiki_history. \\\n",
" where(\"event_entity = 'user' and event_type='create'\"). \\\n",
" selectExpr(\"wiki_db\", \"SUBSTR(event_timestamp, 0, 7) as month\"). \\\n",
" where(\"event_user_is_bot_by_historical is not null and event_user_is_bot_by is not null\"). \\\n",
" groupBy(\"wiki_db\"). \\\n",
" agg(count(lit(1)).alias(\"revision_count\")). \\\n",
" sort(desc(\"revision_count\")). \\\n",
" show(10, False)"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"None\n"
]
}
],
"source": [
"#df.write.format(\"csv\").save(filepath)\n",
"print(bot_user_creation)"
]
} }
], ],
"metadata": { "metadata": {
"kernelspec": { "kernelspec": {
"display_name": "Python 3", "display_name": "base",
"language": "python", "language": "python",
"name": "python3" "name": "python3"
}, },