203 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			203 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import re
 | 
						|
import os
 | 
						|
from pyspark.sql.types import StructType, StructField, StringType, LongType, BooleanType, IntegerType, ArrayType
 | 
						|
#from pyspark.sql.functions import count, lit, desc, col, array
 | 
						|
import pyspark.sql.functions as F
 | 
						|
from pyspark.sql import SparkSession
 | 
						|
 | 
						|
os.environ['JAVA_HOME'] = "/usr/lib/jvm/java-11-openjdk-amd64"
 | 
						|
os.environ['JRE_HOME'] = "/usr/lib/jvm/java-11-openjdk-amd64/jre"
 | 
						|
 | 
						|
# Unescaping and array-splitting UDFs
 | 
						|
def unescape(str):
 | 
						|
    if (str is None):
 | 
						|
        return None
 | 
						|
    else:
 | 
						|
        return str.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t")
 | 
						|
# The comma splitter applies a negative lookahead for \ to prevent splitting escaped commas
 | 
						|
def toArray(str):
 | 
						|
    if (str is None):
 | 
						|
        return []
 | 
						|
    else:
 | 
						|
        return [s.strip().replace("\\,", ",") for s in re.split("(?<!\\\\),", unescape(str))]
 | 
						|
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
 | 
						|
    mediawiki_history_path = "/data_ext/users/nws8519/mw-repo-lifecycles/wiki_activity_data/yearly_activity_files/"
 | 
						|
 | 
						|
    # Note: string unescaping and array conversion is done later
 | 
						|
    mediawiki_history_schema = StructType([
 | 
						|
 | 
						|
        StructField("wiki_db", StringType(), nullable = False),
 | 
						|
        StructField("event_entity", StringType(), nullable = False),
 | 
						|
        StructField("event_type", StringType(), nullable = False),
 | 
						|
        StructField("event_timestamp", StringType(), nullable = True),
 | 
						|
        StructField("event_comment_escaped", StringType(), nullable = True),
 | 
						|
 | 
						|
        StructField("event_user_id", LongType(), nullable = True),
 | 
						|
        StructField("event_user_text_historical_escaped", StringType(), nullable = True),
 | 
						|
        StructField("event_user_text_escaped", StringType(), nullable = True),
 | 
						|
        StructField("event_user_blocks_historical_string", StringType(), nullable = True),
 | 
						|
        StructField("event_user_blocks_string", StringType(), nullable = True),
 | 
						|
        StructField("event_user_groups_historical_string", StringType(), nullable = True),
 | 
						|
        StructField("event_user_groups_string", StringType(), nullable = True),
 | 
						|
        StructField("event_user_is_bot_by_historical_string", StringType(), nullable = True),
 | 
						|
        StructField("event_user_is_bot_by_string", StringType(), nullable = True),
 | 
						|
        StructField("event_user_is_created_by_self", BooleanType(), nullable = True),
 | 
						|
        StructField("event_user_is_created_by_system", BooleanType(), nullable = True),
 | 
						|
        StructField("event_user_is_created_by_peer", BooleanType(), nullable = True),
 | 
						|
        StructField("event_user_is_anonymous", BooleanType(), nullable = True),
 | 
						|
        StructField("event_user_registration_timestamp", StringType(), nullable = True),
 | 
						|
        StructField("event_user_creation_timestamp", StringType(), nullable = True),
 | 
						|
        StructField("event_user_first_edit_timestamp", StringType(), nullable = True),
 | 
						|
        StructField("event_user_revision_count", LongType(), nullable = True),
 | 
						|
        StructField("event_user_seconds_since_previous_revision", LongType(), nullable = True),
 | 
						|
 | 
						|
        StructField("page_id", LongType(), nullable = True),
 | 
						|
        StructField("page_title_historical_escaped", StringType(), nullable = True),
 | 
						|
        StructField("page_title_escaped", StringType(), nullable = True),
 | 
						|
        StructField("page_namespace_historical", IntegerType(), nullable = True),
 | 
						|
        StructField("page_namespace_is_content_historical", BooleanType(), nullable = True),
 | 
						|
        StructField("page_namespace", IntegerType(), nullable = True),
 | 
						|
        StructField("page_namespace_is_content", BooleanType(), nullable = True),
 | 
						|
        StructField("page_is_redirect", BooleanType(), nullable = True),
 | 
						|
        StructField("page_is_deleted", BooleanType(), nullable = True),
 | 
						|
        StructField("page_creation_timestamp", StringType(), nullable = True),
 | 
						|
        StructField("page_first_edit_timestamp", StringType(), nullable = True),
 | 
						|
        StructField("page_revision_count", LongType(), nullable = True),
 | 
						|
        StructField("page_seconds_since_previous_revision", LongType(), nullable = True),
 | 
						|
 | 
						|
        StructField("user_id", LongType(), nullable = True),
 | 
						|
        StructField("user_text_historical_escaped",  StringType(), nullable = True),
 | 
						|
        StructField("user_text_escaped", StringType(), nullable = True),
 | 
						|
        StructField("user_blocks_historical_string", StringType(), nullable = True),
 | 
						|
        StructField("user_blocks_string", StringType(), nullable = True),
 | 
						|
        StructField("user_groups_historical_string", StringType(), nullable = True),
 | 
						|
        StructField("user_groups_string", StringType(), nullable = True),
 | 
						|
        StructField("user_is_bot_by_historical_string", StringType(), nullable = True),
 | 
						|
        StructField("user_is_bot_by_string", StringType(), nullable = True),
 | 
						|
        StructField("user_is_created_by_self", BooleanType(), nullable = True),
 | 
						|
        StructField("user_is_created_by_system", BooleanType(), nullable = True),
 | 
						|
        StructField("user_is_created_by_peer", BooleanType(), nullable = True),
 | 
						|
        StructField("user_is_anonymous", BooleanType(), nullable = True),
 | 
						|
        StructField("user_registration_timestamp", StringType(), nullable = True),
 | 
						|
        StructField("user_creation_timestamp", StringType(), nullable = True),
 | 
						|
        StructField("user_first_edit_timestamp", StringType(), nullable = True),
 | 
						|
 | 
						|
        StructField("revision_id", LongType(), nullable = True),
 | 
						|
        StructField("revision_parent_id", LongType(), nullable = True),
 | 
						|
        StructField("revision_minor_edit", BooleanType(), nullable = True),
 | 
						|
        StructField("revision_deleted_parts_string", StringType(), nullable = True),
 | 
						|
        StructField("revision_deleted_parts_are_suppressed", BooleanType(), nullable = True),
 | 
						|
        StructField("revision_text_bytes", LongType(), nullable = True),
 | 
						|
        StructField("revision_text_bytes_diff", LongType(), nullable = True),
 | 
						|
        StructField("revision_text_sha1", StringType(), nullable = True),
 | 
						|
        StructField("revision_content_model", StringType(), nullable = True),
 | 
						|
        StructField("revision_content_format", StringType(), nullable = True),
 | 
						|
        StructField("revision_is_deleted_by_page_deletion", BooleanType(), nullable = True),
 | 
						|
        StructField("revision_deleted_by_page_deletion_timestamp", StringType(), nullable = True),
 | 
						|
        StructField("revision_is_identity_reverted", BooleanType(), nullable = True),
 | 
						|
        StructField("revision_first_identity_reverting_revision_id", LongType(), nullable = True),
 | 
						|
        StructField("revision_seconds_to_identity_revert", LongType(), nullable = True),
 | 
						|
        StructField("revision_is_identity_revert", BooleanType(), nullable = True),
 | 
						|
        StructField("revision_is_from_before_page_creation", BooleanType(), nullable = True),
 | 
						|
        StructField("revision_tags_string", StringType(), nullable = True)
 | 
						|
    ])
 | 
						|
 | 
						|
    spark = SparkSession.builder.appName('activityData').config("spark.driver.extraJavaOptions", "-Djava.home=/usr/lib/jvm/java-11-openjdk-amd64").getOrCreate()
 | 
						|
 | 
						|
    # Note: It's important to set .option("quote", "") to prevent spark to automaticallu use double-quotes to quote text
 | 
						|
    mediawiki_history_raw = spark.read.option("delimiter", "\t").option("quote", "").schema(mediawiki_history_schema).csv(mediawiki_history_path)
 | 
						|
 | 
						|
    spark.udf.register("unescape", unescape, StringType())
 | 
						|
    spark.udf.register("to_array", toArray, ArrayType(StringType(), False))
 | 
						|
 | 
						|
    mediawiki_history = mediawiki_history_raw.selectExpr(
 | 
						|
    
 | 
						|
    "wiki_db",
 | 
						|
    "event_entity",
 | 
						|
    "event_type",
 | 
						|
    "event_timestamp",
 | 
						|
    "unescape(event_comment_escaped) AS event_comment",
 | 
						|
    
 | 
						|
    "event_user_id",
 | 
						|
    "unescape(event_user_text_historical_escaped) AS event_user_text_historical",
 | 
						|
    "unescape(event_user_text_escaped) AS event_user_text",
 | 
						|
    "to_array(event_user_blocks_historical_string) AS event_user_blocks_historical",
 | 
						|
    "to_array(event_user_blocks_string) AS event_user_blocks",
 | 
						|
    "to_array(event_user_groups_historical_string) AS event_user_groups_historical",
 | 
						|
    "to_array(event_user_groups_string) AS event_user_groups",
 | 
						|
    "to_array(event_user_is_bot_by_historical_string) AS event_user_is_bot_by_historical",
 | 
						|
    "to_array(event_user_is_bot_by_string) AS event_user_is_bot_by",
 | 
						|
    "event_user_is_created_by_self",
 | 
						|
    "event_user_is_created_by_system",
 | 
						|
    "event_user_is_created_by_peer",
 | 
						|
    "event_user_is_anonymous",
 | 
						|
    "event_user_registration_timestamp",
 | 
						|
    "event_user_creation_timestamp",
 | 
						|
    "event_user_first_edit_timestamp",
 | 
						|
    "event_user_revision_count",
 | 
						|
    "event_user_seconds_since_previous_revision",
 | 
						|
    
 | 
						|
    "page_id",
 | 
						|
    "unescape(page_title_historical_escaped) AS page_title_historical",
 | 
						|
    "unescape(page_title_escaped) AS page_title",
 | 
						|
    "page_namespace_historical",
 | 
						|
    "page_namespace_is_content_historical",
 | 
						|
    "page_namespace",
 | 
						|
    "page_namespace_is_content",
 | 
						|
    "page_is_redirect",
 | 
						|
    "page_is_deleted",
 | 
						|
    "page_creation_timestamp",
 | 
						|
    "page_first_edit_timestamp",
 | 
						|
    "page_revision_count",
 | 
						|
    "page_seconds_since_previous_revision",
 | 
						|
    
 | 
						|
    "user_id",
 | 
						|
    "unescape(user_text_historical_escaped) AS user_text_historical",
 | 
						|
    "unescape(user_text_escaped) AS user_text",
 | 
						|
    "to_array(user_blocks_historical_string) AS user_blocks_historical",
 | 
						|
    "to_array(user_blocks_string) AS user_blocks",
 | 
						|
    "to_array(user_groups_historical_string) AS user_groups_historical",
 | 
						|
    "to_array(user_groups_string) AS user_groups",
 | 
						|
    "to_array(user_is_bot_by_historical_string) AS user_is_bot_by_historical",
 | 
						|
    "to_array(user_is_bot_by_string) AS user_is_bot_by",
 | 
						|
    "user_is_created_by_self",
 | 
						|
    "user_is_created_by_system",
 | 
						|
    "user_is_created_by_peer",
 | 
						|
    "user_is_anonymous",
 | 
						|
    "user_registration_timestamp",
 | 
						|
    "user_creation_timestamp",
 | 
						|
    "user_first_edit_timestamp",
 | 
						|
    
 | 
						|
    "revision_id",
 | 
						|
    "revision_parent_id",
 | 
						|
    "revision_minor_edit",
 | 
						|
    "to_array(revision_deleted_parts_string) AS revision_deleted_parts",
 | 
						|
    "revision_deleted_parts_are_suppressed",
 | 
						|
    "revision_text_bytes",
 | 
						|
    "revision_text_bytes_diff",
 | 
						|
    "revision_text_sha1",
 | 
						|
    "revision_content_model",
 | 
						|
    "revision_content_format",
 | 
						|
    "revision_is_deleted_by_page_deletion",
 | 
						|
    "revision_deleted_by_page_deletion_timestamp",
 | 
						|
    "revision_is_identity_reverted",
 | 
						|
    "revision_first_identity_reverting_revision_id",
 | 
						|
    "revision_seconds_to_identity_revert",
 | 
						|
    "revision_is_identity_revert",
 | 
						|
    "revision_is_from_before_page_creation",
 | 
						|
    "to_array(revision_tags_string) AS revision_tags"
 | 
						|
    )
 | 
						|
 | 
						|
    mediawiki_history = mediawiki_history.repartitionByRange(300, "wiki_db")
 | 
						|
    activity_count_df = mediawiki_history.filter(F.col("event_user_is_bot_by")==F.array())
 | 
						|
    #activity_count_df = mediawiki_history.where("event_user_is_bot_by_historical is empty and event_user_is_bot_by is empty")
 | 
						|
    activity_count_df = activity_count_df.selectExpr("wiki_db", "SUBSTR(event_timestamp, 0, 10) as day", "event_entity", "event_type")
 | 
						|
    activity_count_df = activity_count_df.groupBy("wiki_db", "day", "event_entity", "event_type").agg(F.count(F.lit(1)).alias("activity_count"))
 | 
						|
    activity_count_df.\
 | 
						|
        sort(F.desc("day")). \
 | 
						|
        show(10, False)
 | 
						|
 | 
						|
    activity_count_df.write.format("csv").save("020925_nonbot_yearly.csv") |