# --- Scraped-page metadata (site chrome captured along with the file) ---
# File: wikia_userroles_scraper/userroles_from_listusers.py
# Last commit: 2018-06-02 14:56:49 -07:00
# 204 lines, 6.6 KiB, Python, executable file
# -----------------------------------------------------------------------
#!/usr/bin/env python3
# Scrape the Wikia userroles api
# Copyright (C) 2018 Nathan TeBlunthuis
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
import argparse
import csv
import json
import sys
import time
import re
import os
from importlib import reload
from json.decoder import JSONDecodeError
from os import path
from scraper_utils import prepare_output, read_wikilist, add_parser_arguments
import requests
reload(sys)
roles = ['bot', 'sysop', 'bureaucrat', 'staff', 'rollback', # 'util',
'helper', 'vstf', 'checkuser-global', 'bot-global',
'council', 'authenticated', 'checkuser', 'chatmoderator',
'adminmentor', 'steward', 'oversight', 'founder', 'rollbacker', 'checkuser', 'researcher']
class ListUserAPI():
def __init__(self, url_root, wikitype):
self.wikitype = wikitype
if self.wikitype == "wikia":
self._api_url = url_root + 'index.php?action=ajax&rs=ListusersAjax::axShowUsers'
else: # wikitype == "wikipedia"
self._api_url = url_root + 'api.php'
def _fetch_http(self, url, params):
if self.wikitype == "wikia":
response = requests.get(url=url, params=params, headers={
'Accept-encoding': 'gzip'})
return(response.text)
else: # wikitype == "wikipedia"
response = requests.get(url=url, params=params)
return(response)
def call(self, params):
response = self._fetch_http(self._api_url, params)
if self.wikitype == "wikia":
return json.loads(response)
else:
return response.json()
def write_user_csvfile(output_file, user_list):
csvfile = csv.writer(output_file, delimiter='\t',
quotechar='"', quoting=csv.QUOTE_NONNUMERIC)
# construct and output the header
csvfile.writerow(['username', 'groups',
'edits', 'last.logged', 'last.edited'])
for user in user_list:
csvfile.writerow(user)
def get_administrators_for_wiki(wikiname, url_root, wikitype="wikia"):
increment_size = 500
offset = 0
if wikitype == "wikia":
query = {'groups': 'bot,sysop,bureaucrat,',
'edits': 0,
'limit': increment_size,
'offset': offset,
'numOrder': 1,
'order': 'username:asc'}
else: # wikitype == "wikipedia"
query = {'action': 'query',
'list': 'allusers',
'augroup': "|".join(roles),
'auprop': 'groups',
'aulimit': 500,
'format': 'json'}
# FIND THE CORRECT URL (there may be redirects)
if wikitype == "wikia":
url_root = requests.get(url_root).url
re_str = "^http://(community|www).wikia.com/"
if re.match(re_str, url_root):
# api_url
# 'http://community.wikia.com/wiki/Community_Central:Not_a_valid_Wikia':
print("ERROR: %s no longer exists" % wikiname)
return "deleted"
try:
wiki = ListUserAPI(url_root, wikitype=wikitype)
rv = wiki.call(query)
except requests.ConnectionError as e:
print("ERROR: cannot read the event log: %s" % wikiname)
notauthorized.append(wikiname)
return "notauthorized"
except JSONDecodeError as e:
print("ERROR: cannot read the event log: %s" % wikiname)
notauthorized.append(wikiname)
return "notauthorized"
output_file = open("{0}/{1}.tsv".format(output_path, wikiname), 'w')
if wikitype == "wikia":
raw_userlist = rv['aaData']
while (rv['iTotalRecords'] + offset) < rv['iTotalDisplayRecords']:
# increment the offset and make a new query
offset = offset + increment_size
query['offset'] = offset
rv = wiki.call(query)
raw_userlist.extend(rv['aaData'])
print("Another one: offset is %s" % offset)
# go through and edit the html output of the json
processed_userlist = []
for row in raw_userlist:
row[0] = re.sub(r'^.*?<a href=.*?>(.*?)<.*$', r'\1', row[0])
# work around change in wikia api that removed last.logged
if len(row) < 5:
row.append(row[3])
row[3] = None
row[4] = re.sub(r'^.*oldid=(\d+)".*$', r'\1', row[4])
row[4] = re.sub(r'^\-$', r'', row[4])
processed_userlist.append(row)
write_user_csvfile(output_file, processed_userlist)
output_file.close()
else:
raw_userlist = rv['query']['allusers']
outlines = ['\t'.join(["username", "groups"])]
while 'continue' in rv:
query['continue'] = str(rv['continue'])
query['aufrom'] = str(rv['continue']['aufrom'])
rv = wiki.call(query)
raw_userlist = rv['query']['allusers']
outlines.extend(
['\t'.join([q['name'], ','.join(q['groups'])]) for q in raw_userlist])
output_file.write('\n'.join(outlines))
output_file.flush()
outlines = []
# open and then send data to the output data file
# the call is
# %run userroles_from_listusers.py --sep=\\t --nuke_old ../identifyWikis/wikiteamWikilist.tsv /com/projects/messagewalls/userroles/listusers
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description="Get user roles for Wikis from the Mediawiki list users API")
parser = add_parser_arguments(parser)
args = parser.parse_args()
output_path = args.output
header = not args.no_header
prepare_output(output_path, args.nuke_old)
wikilist = read_wikilist(args)
deleted = []
notauthorized = []
files = [os.path.join(output_path, i) for i in os.listdir(output_path)]
for wiki, url, wikitype in wikilist:
if "{0}.{1}".format(path.join(output_path, wiki), 'tsv') in files:
print("SKIPPING: file \"%s\" already exists)" % wiki)
continue
print("Processing wiki: %s" % wiki)
result = get_administrators_for_wiki(wiki, url, wikitype=wikitype)
if result == "deleted":
deleted.append(wiki)
elif result == "notauthorized":
notauthorized.append(wiki)
else:
pass
time.sleep(1)
df = open("allusers_error_deleted.txt", 'w')
df.write('\n'.join(deleted))
df.close()
na = open("allusers_error_notauthorized.txt", 'w')
na.write('\n'.join(notauthorized))
na.close()