from load_accounts import *
from urllib.parse import urlparse

import polars as pl


def run_preprocess():
    # accounts = pl.concat(
    #     read_accounts_file("data/accounts.feather"),
    #     read_accounts_file("data/account_lookup_2023.feather")
    # )
    accounts = read_accounts_file(
        "data/account_lookup_compressed.feather"
    ).unique(["account", "server"])

    # Write a parsed accounts file for R to use
    a = accounts.with_columns(
        # Decode punycode (IDNA) hostnames from the account URL
        pl.col("url").map_elements(
            lambda x: urlparse(x).netloc.encode().decode("idna")
        ).alias("host"),
        # Cheap regex check on the raw JSON string for a "moved" object
        pl.col("data_string").str.contains(r'"moved": \{').alias("has_moved"),
        pl.col("data").struct.field("suspended"),
    )
    a_save = a.drop(["data", "data_string"])
    a_save.select(
        sorted(a_save.columns)
    ).write_ipc("data/scratch/accounts.feather")

    # Re-parse the JSON now that we know the rows are all moved accounts
    moved_accounts = a.filter(pl.col("has_moved")).with_columns(
        pl.col("data_string").str.json_decode().alias("data")
    ).with_columns(
        pl.col("data").struct.field("moved")
    ).drop_nulls("moved").with_columns(
        pl.col("moved").struct.field("acct").alias("moved_acct"),
    ).with_columns(
        # Split "user@server" handles; bare usernames stay on the same server
        pl.when(
            pl.col("moved_acct").str.contains("@")
        ).then(
            pl.col("moved_acct").str.split("@").list.get(1)
        ).otherwise(
            pl.col("server")
        ).alias("moved_server"),
        pl.when(
            pl.col("moved_acct").str.contains("@")
        ).then(
            pl.col("moved_acct").str.split("@").list.get(0)
        ).otherwise(
            pl.col("moved_acct")
        ).alias("moved_acct"),
    )

    number_of_accounts = len(a)
    popular_servers = a.group_by("server").count().sort("count", descending=True)

    common_moves = moved_accounts.group_by(
        ["server", "moved_server"]
    ).count().sort("count", descending=True)
    common_moves.write_ipc("data/scratch/moved_accounts.feather")
    # Export as an edge list with Source/Target column names
    common_moves.rename({
        "server": "Source",
        "moved_server": "Target",
    }).write_csv("data/scratch/moved_accounts.csv")

    maccounts = moved_accounts.select(["account", "server", "moved_server", "moved_acct"])
    maccounts.write_ipc("data/scratch/individual_moved_accounts.feather")
    popular_servers.write_ipc("data/scratch/popular_servers.feather")

    jm = pl.read_json("data/joinmastodon.json")
    jm.write_ipc("data/scratch/joinmastodon.feather")

    read_metadata_file("data/metadata-2024-01-31.feather").drop(
        ["data", "data_string"]
    ).write_ipc("data/scratch/metadata.feather")
    read_metadata_file("data/metadata_2023-10-01.feather").drop(
        ["data", "data_string"]
    ).write_ipc("data/scratch/metadata-2023-10-01.feather")

    # Profile accounts: default has_moved/suspended to False
    profile_accounts = read_accounts_file("data/profiles_local.feather")
    p = profile_accounts.with_columns(
        pl.col("url").map_elements(
            lambda x: urlparse(x).netloc.encode().decode("idna")
        ).alias("host"),
        pl.col("username").alias("account"),
        pl.lit(False).alias("has_moved"),
        pl.lit(False).alias("suspended"),
    ).drop(
        ["data", "data_string"]
    )
    p.select(sorted(p.columns)).write_ipc("data/scratch/accounts_processed_profiles.feather")

    # Combine both account sources, keeping only accounts whose URL host
    # matches their server, deduplicated on (account, server)
    all_accounts = pl.scan_ipc([
        "data/scratch/accounts.feather",
        # "data/scratch/accounts_processed_recent.feather",
        "data/scratch/accounts_processed_profiles.feather",
    ]).collect()
    all_accounts.filter(
        pl.col("host").eq(pl.col("server"))
    ).unique(["account", "server"]).write_ipc("data/scratch/all_accounts.feather")


if __name__ == "__main__":
    run_preprocess()