From 8c934d93c5c5bf45dfacc8638aea6ddf17d54175 Mon Sep 17 00:00:00 2001
From: Matthew Gaughan
Date: Tue, 7 Jan 2025 13:08:28 -0600
Subject: [PATCH] working on bringing in activity data

---
 src/helper_scripts/decompression_script.py  |  39 +
 src/helper_scripts/dump_collector.py        |  19 +-
 src/lib/spark-warehouse/bot_isolation.ipynb | 307 ++++++
 .../pandas_bot_isolation.ipynb              | 997 ++++++++++++++++++
 4 files changed, 1353 insertions(+), 9 deletions(-)
 create mode 100644 src/helper_scripts/decompression_script.py
 create mode 100644 src/lib/spark-warehouse/bot_isolation.ipynb
 create mode 100644 src/lib/spark-warehouse/pandas_bot_isolation.ipynb

diff --git a/src/helper_scripts/decompression_script.py b/src/helper_scripts/decompression_script.py
new file mode 100644
index 0000000..24d30ed
--- /dev/null
+++ b/src/helper_scripts/decompression_script.py
@@ -0,0 +1,39 @@
+import requests
+import bz2
+import shutil
+import os
+
+FILE_LOC_PREFIX = "/data/users/mgaughan/mw-repo-lifecycles/wiki_activity_data/single_activity_files"
+
+def decompress(filepath):
+    decompressed_filepath = filepath[:-4]
+    with bz2.BZ2File(filepath) as fr, open(decompressed_filepath, "wb") as fw:
+        shutil.copyfileobj(fr, fw)
+    print(f"Decompressed {decompressed_filepath}")
+    os.remove(filepath)
+    print(f"Deleted {filepath}")
+
+def decompress_directory(directory_name):
+    # Traverse the directory
+    for root, dirs, files in os.walk(directory_name):
+        for file in files:
+            if file.endswith('.bz2'):
+                # Full path to the file
+                filepath = os.path.join(root, file)
+                print(filepath)
+                # Apply the decompress function
+                decompress(filepath)
+
+def cleanup(directory_name):
+    for root, dirs, files in os.walk(directory_name):
+        for file in files:
+            if file.endswith('.bz2'):
+                filepath = os.path.join(root, file)
+                os.remove(filepath)
+                print(f"Deleted {filepath}")
+
+
+if __name__ == "__main__":
+    #batch_parallel_for_single()
+    decompress_directory(FILE_LOC_PREFIX)
+    #cleanup(FILE_LOC_PREFIX)
\ No newline at end of file
diff --git a/src/helper_scripts/dump_collector.py b/src/helper_scripts/dump_collector.py
index 4534be5..85f0a87 100644
--- a/src/helper_scripts/dump_collector.py
+++ b/src/helper_scripts/dump_collector.py
@@ -192,11 +192,12 @@ ALL_PROJECTS = [
 DUMP = "2024-11"
 test_url = f"https://dumps.wikimedia.org/other/mediawiki_history/{DUMP}/aawiki/{DUMP}.aawiki.all-time.tsv.bz2"
 DUMP_LOC_PREFIX = f"https://dumps.wikimedia.org/other/mediawiki_history/{DUMP}/"
-FILE_LOC_PREFIX = "/data/users/mgaughan/mw-repo-lifecycles/wiki_activity_data/"
+FILE_LOC_PREFIX = "/data/users/mgaughan/mw-repo-lifecycles/wiki_activity_data/compressed_yearly_activity_files/"
 
 YEARS = ['2010', '2011', '2012', '2013', '2014', '2015', '2016', '2017', '2018', '2019', '2020', '2021', '2022', '2023', '2024']
 
 def download_file(url):
+    print(f"Trying to get {url}")
     response = requests.get(url)
     if "content-disposition" in response.headers:
         content_disposition = response.headers["content-disposition"]
@@ -213,24 +214,24 @@ def decompress(filepath):
     with bz2.BZ2File(filepath) as fr, open(decompressed_filepath,"wb") as fw:
         shutil.copyfileobj(fr,fw)
     print(f"Decompressed {decompressed_filepath}")
-    #os.remove(filepath)
+    os.remove(filepath)
     print(f"Deleted {filepath}")
 
-def batch_parallel_for_yearly():
+def batch_for_yearly():
     urls = []
     for entry in YEARLY_PROJECTS:
         for year in YEARS:
             urls.append(f"{DUMP_LOC_PREFIX}{entry}/{DUMP}.{entry}.{year}.tsv.bz2")
-
-
-
+    for url in urls:
+        try:
+            download_file(url)
+        except Exception:
+            print(f"error! {url}")
 
 def batch_parallel_for_single():
     urls = []
     for entry in ALL_PROJECTS:
-        if ALL_PROJECTS.index(entry) < ALL_PROJECTS.index("skwikiquote"):
-            continue
         if entry not in YEARLY_PROJECTS and entry not in MONTHLY_PROJECTS:
             urls.append(f"{DUMP_LOC_PREFIX}{entry}/{DUMP}.{entry}.all-time.tsv.bz2")
     for url in urls:
@@ -262,7 +263,7 @@ def decompress_directory(directory_name):
             decompress(filepath)
 
 if __name__ == "__main__":
-    batch_parallel_for_single()
+    batch_for_yearly()
     #decompress_directory(FILE_LOC_PREFIX)
     #file = download_file(test_url)
     #decompress("/data/users/mgaughan/mw-repo-lifecycles/wiki_activity_data/2024-11.zuwiktionary.all-time.tsv.bz2")
\ No newline at end of file
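Note that both batch functions above still fetch the URL list one request at a time, despite the "parallel" in batch_parallel_for_single's name. If parallel fetching is the eventual goal, a minimal sketch along these lines could work, assuming the download_file defined in dump_collector.py and using only the standard-library concurrent.futures (the worker count is an illustrative guess; dumps.wikimedia.org may throttle aggressive clients):

from concurrent.futures import ThreadPoolExecutor, as_completed

def batch_download(urls, max_workers=4):
    # Submit every download to a small thread pool; the work is
    # network-bound, so threads suffice without multiprocessing.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(download_file, url): url for url in urls}
        for future in as_completed(futures):
            url = futures[future]
            try:
                future.result()
            except Exception as e:
                print(f"error! {url}: {e}")

Called as batch_download(urls) from either batch function, this preserves the existing per-URL error reporting while overlapping the downloads.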
{url}") def batch_parallel_for_single(): urls = [] for entry in ALL_PROJECTS: - if ALL_PROJECTS.index(entry) < ALL_PROJECTS.index("skwikiquote"): - continue if entry not in YEARLY_PROJECTS and entry not in MONTHLY_PROJECTS: urls.append(f"{DUMP_LOC_PREFIX}{entry}/{DUMP}.{entry}.all-time.tsv.bz2") for url in urls: @@ -262,7 +263,7 @@ def decompress_directory(directory_name): decompress(filepath) if __name__ == "__main__": - batch_parallel_for_single() + batch_for_yearly() #decompress_directory(FILE_LOC_PREFIX) #file = download_file(test_url) #decompress("/data/users/mgaughan/mw-repo-lifecycles/wiki_activity_data/2024-11.zuwiktionary.all-time.tsv.bz2") \ No newline at end of file diff --git a/src/lib/spark-warehouse/bot_isolation.ipynb b/src/lib/spark-warehouse/bot_isolation.ipynb new file mode 100644 index 0000000..ed551f5 --- /dev/null +++ b/src/lib/spark-warehouse/bot_isolation.ipynb @@ -0,0 +1,307 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "ename": "ModuleNotFoundError", + "evalue": "No module named 'pyspark.sql.SparkSession'", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mModuleNotFoundError\u001b[0m Traceback (most recent call last)", + "Cell \u001b[0;32mIn[8], line 4\u001b[0m\n\u001b[1;32m 2\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mpyspark\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01msql\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mtypes\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m StructType, StructField, StringType, LongType, BooleanType, IntegerType, ArrayType\n\u001b[1;32m 3\u001b[0m \u001b[38;5;28;01mfrom\u001b[39;00m \u001b[38;5;21;01mpyspark\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01msql\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mfunctions\u001b[39;00m \u001b[38;5;28;01mimport\u001b[39;00m count, lit, desc\n\u001b[0;32m----> 4\u001b[0m \u001b[38;5;28;01mimport\u001b[39;00m \u001b[38;5;21;01mpyspark\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01msql\u001b[39;00m\u001b[38;5;21;01m.\u001b[39;00m\u001b[38;5;21;01mSparkSession\u001b[39;00m \u001b[38;5;28;01mas\u001b[39;00m \u001b[38;5;21;01mSparkSession\u001b[39;00m\n", + "\u001b[0;31mModuleNotFoundError\u001b[0m: No module named 'pyspark.sql.SparkSession'" + ] + } + ], + "source": [ + "import re\n", + "from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType, IntegerType, ArrayType\n", + "from pyspark.sql.functions import count, lit, desc\n", + "import pyspark.sql.SparkSession as SparkSession" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "mediawiki_history_path = \"/data/users/mgaughan/mw-repo-lifecycles/wiki_activity_data/single_activity_files\"" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "# Note: string unescaping and array conversion is done later\n", + "mediawiki_history_schema = StructType([\n", + "\n", + " StructField(\"wiki_db\", StringType(), nullable = False),\n", + " StructField(\"event_entity\", StringType(), nullable = False),\n", + " StructField(\"event_type\", StringType(), nullable = False),\n", + " StructField(\"event_timestamp\", StringType(), nullable = True),\n", + " StructField(\"event_comment_escaped\", StringType(), nullable = True),\n", + "\n", + " 
StructField(\"event_user_id\", LongType(), nullable = True),\n", + " StructField(\"event_user_text_historical_escaped\", StringType(), nullable = True),\n", + " StructField(\"event_user_text_escaped\", StringType(), nullable = True),\n", + " StructField(\"event_user_blocks_historical_string\", StringType(), nullable = True),\n", + " StructField(\"event_user_blocks_string\", StringType(), nullable = True),\n", + " StructField(\"event_user_groups_historical_string\", StringType(), nullable = True),\n", + " StructField(\"event_user_groups_string\", StringType(), nullable = True),\n", + " StructField(\"event_user_is_bot_by_historical_string\", StringType(), nullable = True),\n", + " StructField(\"event_user_is_bot_by_string\", StringType(), nullable = True),\n", + " StructField(\"event_user_is_created_by_self\", BooleanType(), nullable = True),\n", + " StructField(\"event_user_is_created_by_system\", BooleanType(), nullable = True),\n", + " StructField(\"event_user_is_created_by_peer\", BooleanType(), nullable = True),\n", + " StructField(\"event_user_is_anonymous\", BooleanType(), nullable = True),\n", + " StructField(\"event_user_registration_timestamp\", StringType(), nullable = True),\n", + " StructField(\"event_user_creation_timestamp\", StringType(), nullable = True),\n", + " StructField(\"event_user_first_edit_timestamp\", StringType(), nullable = True),\n", + " StructField(\"event_user_revision_count\", LongType(), nullable = True),\n", + " StructField(\"event_user_seconds_since_previous_revision\", LongType(), nullable = True),\n", + "\n", + " StructField(\"page_id\", LongType(), nullable = True),\n", + " StructField(\"page_title_historical_escaped\", StringType(), nullable = True),\n", + " StructField(\"page_title_escaped\", StringType(), nullable = True),\n", + " StructField(\"page_namespace_historical\", IntegerType(), nullable = True),\n", + " StructField(\"page_namespace_is_content_historical\", BooleanType(), nullable = True),\n", + " StructField(\"page_namespace\", IntegerType(), nullable = True),\n", + " StructField(\"page_namespace_is_content\", BooleanType(), nullable = True),\n", + " StructField(\"page_is_redirect\", BooleanType(), nullable = True),\n", + " StructField(\"page_is_deleted\", BooleanType(), nullable = True),\n", + " StructField(\"page_creation_timestamp\", StringType(), nullable = True),\n", + " StructField(\"page_first_edit_timestamp\", StringType(), nullable = True),\n", + " StructField(\"page_revision_count\", LongType(), nullable = True),\n", + " StructField(\"page_seconds_since_previous_revision\", LongType(), nullable = True),\n", + "\n", + " StructField(\"user_id\", LongType(), nullable = True),\n", + " StructField(\"user_text_historical_escaped\", StringType(), nullable = True),\n", + " StructField(\"user_text_escaped\", StringType(), nullable = True),\n", + " StructField(\"user_blocks_historical_string\", StringType(), nullable = True),\n", + " StructField(\"user_blocks_string\", StringType(), nullable = True),\n", + " StructField(\"user_groups_historical_string\", StringType(), nullable = True),\n", + " StructField(\"user_groups_string\", StringType(), nullable = True),\n", + " StructField(\"user_is_bot_by_historical_string\", StringType(), nullable = True),\n", + " StructField(\"user_is_bot_by_string\", StringType(), nullable = True),\n", + " StructField(\"user_is_created_by_self\", BooleanType(), nullable = True),\n", + " StructField(\"user_is_created_by_system\", BooleanType(), nullable = True),\n", + " StructField(\"user_is_created_by_peer\", 
+    "    StructField(\"user_is_anonymous\", BooleanType(), nullable = True),\n",
+    "    StructField(\"user_registration_timestamp\", StringType(), nullable = True),\n",
+    "    StructField(\"user_creation_timestamp\", StringType(), nullable = True),\n",
+    "    StructField(\"user_first_edit_timestamp\", StringType(), nullable = True),\n",
+    "\n",
+    "    StructField(\"revision_id\", LongType(), nullable = True),\n",
+    "    StructField(\"revision_parent_id\", LongType(), nullable = True),\n",
+    "    StructField(\"revision_minor_edit\", BooleanType(), nullable = True),\n",
+    "    StructField(\"revision_deleted_parts_string\", StringType(), nullable = True),\n",
+    "    StructField(\"revision_deleted_parts_are_suppressed\", BooleanType(), nullable = True),\n",
+    "    StructField(\"revision_text_bytes\", LongType(), nullable = True),\n",
+    "    StructField(\"revision_text_bytes_diff\", LongType(), nullable = True),\n",
+    "    StructField(\"revision_text_sha1\", StringType(), nullable = True),\n",
+    "    StructField(\"revision_content_model\", StringType(), nullable = True),\n",
+    "    StructField(\"revision_content_format\", StringType(), nullable = True),\n",
+    "    StructField(\"revision_is_deleted_by_page_deletion\", BooleanType(), nullable = True),\n",
+    "    StructField(\"revision_deleted_by_page_deletion_timestamp\", StringType(), nullable = True),\n",
+    "    StructField(\"revision_is_identity_reverted\", BooleanType(), nullable = True),\n",
+    "    StructField(\"revision_first_identity_reverting_revision_id\", LongType(), nullable = True),\n",
+    "    StructField(\"revision_seconds_to_identity_revert\", LongType(), nullable = True),\n",
+    "    StructField(\"revision_is_identity_revert\", BooleanType(), nullable = True),\n",
+    "    StructField(\"revision_is_from_before_page_creation\", BooleanType(), nullable = True),\n",
+    "    StructField(\"revision_tags_string\", StringType(), nullable = True)\n",
+    "])"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "spark = SparkSession.builder.getOrCreate()\n",
+    "\n",
+    "# Note: it's important to set .option(\"quote\", \"\") to prevent Spark from automatically using double-quotes to quote text\n",
+    "mediawiki_history_raw = spark.read.option(\"delimiter\", \"\\t\").option(\"quote\", \"\").schema(mediawiki_history_schema).csv(mediawiki_history_path)"
\"\\t\").option(\"quote\", \"\").schema(mediawiki_history_schema).csv(mediawiki_history_path)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "# Unescaping and array-splitting UDFs\n", + "def unescape(str):\n", + " if (str is None):\n", + " return None\n", + " else:\n", + " return str.replace(\"\\\\n\", \"\\n\").replace(\"\\\\r\", \"\\r\").replace(\"\\\\t\", \"\\t\")\n", + "# The comma splitter applies a negative lookahead for \\ to prevent splitting escaped commas\n", + "def toArray(str):\n", + " if (str is None):\n", + " return []\n", + " else:\n", + " return [s.strip().replace(\"\\\\,\", \",\") for s in re.split(\"(?