diff --git a/src/lib/spark-warehouse/bot_isolation.ipynb b/src/lib/spark-warehouse/bot_isolation.ipynb index 3c0236d..a20eeb5 100644 --- a/src/lib/spark-warehouse/bot_isolation.ipynb +++ b/src/lib/spark-warehouse/bot_isolation.ipynb @@ -361,7 +361,9 @@ "25/01/16 12:21:31 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.\n", "25/01/16 12:21:37 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.\n", "25/01/16 12:21:37 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.\n", - "[Stage 0:==> (397 + 56) / 8283]\r" + "25/01/16 12:29:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.\n", + "25/01/16 12:29:01 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.\n", + "[Stage 0:==> (447 + 56) / 8283]\r" ] } ], diff --git a/src/lib/spark-warehouse/bot_isolation.py b/src/lib/spark-warehouse/bot_isolation.py index 648d82d..de73e18 100644 --- a/src/lib/spark-warehouse/bot_isolation.py +++ b/src/lib/spark-warehouse/bot_isolation.py @@ -23,7 +23,7 @@ def toArray(str): if __name__ == "__main__": - mediawiki_history_path = "/data_ext/users/nws8519/mw-repo-lifecycles/wiki_activity_data/yearly_activity_files/" + mediawiki_history_path = "/data_ext/users/nws8519/mw-repo-lifecycles/wiki_activity_data/single_activity_files/" # Note: string unescaping and array conversion is done later mediawiki_history_schema = StructType([ @@ -190,6 +190,7 @@ if __name__ == "__main__": "to_array(revision_tags_string) AS revision_tags" ) + mediawiki_history = mediawiki_history.repartitionByRange(300, "wiki_db") activity_count_df = mediawiki_history.where("event_user_is_bot_by_historical is not null and event_user_is_bot_by is not null") activity_count_df = activity_count_df.selectExpr("wiki_db", "SUBSTR(event_timestamp, 0, 10) as day", "event_entity", "event_type") activity_count_df = activity_count_df.groupBy("wiki_db", "day", "event_entity", "event_type").agg(count(lit(1)).alias("activity_count")) @@ -197,4 +198,4 @@ if __name__ == "__main__": sort(desc("day")). \ show(10, False) - activity_count_df.write.format("csv").save("011625_dab_yearly.csv") \ No newline at end of file + activity_count_df.write.format("csv").save("011625_dab_single.csv") \ No newline at end of file