#!/usr/bin/env python3
# Scrape the Wikia userroles api

# Copyright (C) 2018 Nathan TeBlunthuis

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.

# Standard library
import argparse
import csv
import json
import os
import re
import sys
import time
from importlib import reload
from json.decoder import JSONDecodeError
from os import path

# Third-party
import requests

# Local
from scraper_utils import add_parser_arguments, prepare_output, read_wikilist

reload(sys)
|
|
|
|
roles = ['bot', 'sysop', 'bureaucrat', 'staff', 'rollback', # 'util',
|
|
'helper', 'vstf', 'checkuser-global', 'bot-global',
|
|
'council', 'authenticated', 'checkuser', 'chatmoderator',
|
|
'adminmentor', 'steward', 'oversight', 'founder', 'rollbacker', 'checkuser', 'researcher']
|
|
|
|
|
|
class ListUserAPI():
|
|
|
|
def __init__(self, url_root, wikitype):
|
|
self.wikitype = wikitype
|
|
if self.wikitype == "wikia":
|
|
self._api_url = url_root + 'index.php?action=ajax&rs=ListusersAjax::axShowUsers'
|
|
else: # wikitype == "wikipedia"
|
|
self._api_url = url_root + 'api.php'
|
|
|
|
def _fetch_http(self, url, params):
|
|
if self.wikitype == "wikia":
|
|
response = requests.get(url=url, params=params, headers={
|
|
'Accept-encoding': 'gzip'})
|
|
return(response.text)
|
|
else: # wikitype == "wikipedia"
|
|
response = requests.get(url=url, params=params)
|
|
return(response)
|
|
|
|
def call(self, params):
|
|
response = self._fetch_http(self._api_url, params)
|
|
if self.wikitype == "wikia":
|
|
return json.loads(response)
|
|
else:
|
|
return response.json()
|
|
|
|
|
|
def write_user_csvfile(output_file, user_list):
|
|
csvfile = csv.writer(output_file, delimiter='\t',
|
|
quotechar='"', quoting=csv.QUOTE_NONNUMERIC)
|
|
|
|
# construct and output the header
|
|
csvfile.writerow(['username', 'groups',
|
|
'edits', 'last.logged', 'last.edited'])
|
|
|
|
for user in user_list:
|
|
csvfile.writerow(user)
|
|
|
|
|
|
def get_administrators_for_wiki(wikiname, url_root, wikitype="wikia"):
|
|
increment_size = 500
|
|
offset = 0
|
|
|
|
if wikitype == "wikia":
|
|
|
|
query = {'groups': 'bot,sysop,bureaucrat,',
|
|
'edits': 0,
|
|
'limit': increment_size,
|
|
'offset': offset,
|
|
'numOrder': 1,
|
|
'order': 'username:asc'}
|
|
|
|
else: # wikitype == "wikipedia"
|
|
query = {'action': 'query',
|
|
'list': 'allusers',
|
|
'augroup': "|".join(roles),
|
|
'auprop': 'groups',
|
|
'aulimit': 500,
|
|
'format': 'json'}
|
|
|
|
# FIND THE CORRECT URL (there may be redirects)
|
|
|
|
if wikitype == "wikia":
|
|
url_root = requests.get(url_root).url
|
|
re_str = "^http://(community|www).wikia.com/"
|
|
if re.match(re_str, url_root):
|
|
# api_url
|
|
# 'http://community.wikia.com/wiki/Community_Central:Not_a_valid_Wikia':
|
|
print("ERROR: %s no longer exists" % wikiname)
|
|
|
|
return "deleted"
|
|
try:
|
|
wiki = ListUserAPI(url_root, wikitype=wikitype)
|
|
rv = wiki.call(query)
|
|
|
|
except requests.ConnectionError as e:
|
|
print("ERROR: cannot read the event log: %s" % wikiname)
|
|
notauthorized.append(wikiname)
|
|
return "notauthorized"
|
|
|
|
except JSONDecodeError as e:
|
|
print("ERROR: cannot read the event log: %s" % wikiname)
|
|
notauthorized.append(wikiname)
|
|
return "notauthorized"
|
|
|
|
output_file = open("{0}/{1}.tsv".format(output_path, wikiname), 'w')
|
|
if wikitype == "wikia":
|
|
raw_userlist = rv['aaData']
|
|
|
|
while (rv['iTotalRecords'] + offset) < rv['iTotalDisplayRecords']:
|
|
# increment the offset and make a new query
|
|
offset = offset + increment_size
|
|
query['offset'] = offset
|
|
rv = wiki.call(query)
|
|
raw_userlist.extend(rv['aaData'])
|
|
print("Another one: offset is %s" % offset)
|
|
|
|
# go through and edit the html output of the json
|
|
processed_userlist = []
|
|
for row in raw_userlist:
|
|
row[0] = re.sub(r'^.*?<a href=.*?>(.*?)<.*$', r'\1', row[0])
|
|
|
|
# work around change in wikia api that removed last.logged
|
|
if len(row) < 5:
|
|
row.append(row[3])
|
|
row[3] = None
|
|
|
|
row[4] = re.sub(r'^.*oldid=(\d+)".*$', r'\1', row[4])
|
|
row[4] = re.sub(r'^\-$', r'', row[4])
|
|
processed_userlist.append(row)
|
|
|
|
write_user_csvfile(output_file, processed_userlist)
|
|
output_file.close()
|
|
|
|
else:
|
|
raw_userlist = rv['query']['allusers']
|
|
outlines = ['\t'.join(["username", "groups"])]
|
|
while 'continue' in rv:
|
|
query['continue'] = str(rv['continue'])
|
|
query['aufrom'] = str(rv['continue']['aufrom'])
|
|
rv = wiki.call(query)
|
|
raw_userlist = rv['query']['allusers']
|
|
outlines.extend(
|
|
['\t'.join([q['name'], ','.join(q['groups'])]) for q in raw_userlist])
|
|
output_file.write('\n'.join(outlines))
|
|
output_file.flush()
|
|
outlines = []
|
|
|
|
# open and then send data to the output data file
|
|
|
|
# the call is
|
|
# %run userroles_from_listusers.py --sep=\\t --nuke_old ../identifyWikis/wikiteamWikilist.tsv /com/projects/messagewalls/userroles/listusers
|
|
|
|
if __name__ == '__main__':
|
|
parser = argparse.ArgumentParser(
|
|
description="Get user roles for Wikis from the Mediawiki list users API")
|
|
|
|
parser = add_parser_arguments(parser)
|
|
args = parser.parse_args()
|
|
output_path = args.output
|
|
header = not args.no_header
|
|
|
|
prepare_output(output_path, args.nuke_old)
|
|
|
|
wikilist = read_wikilist(args)
|
|
deleted = []
|
|
notauthorized = []
|
|
|
|
files = [os.path.join(output_path, i) for i in os.listdir(output_path)]
|
|
|
|
for wiki, url, wikitype in wikilist:
|
|
if "{0}.{1}".format(path.join(output_path, wiki), 'tsv') in files:
|
|
print("SKIPPING: file \"%s\" already exists)" % wiki)
|
|
continue
|
|
print("Processing wiki: %s" % wiki)
|
|
|
|
result = get_administrators_for_wiki(wiki, url, wikitype=wikitype)
|
|
if result == "deleted":
|
|
deleted.append(wiki)
|
|
elif result == "notauthorized":
|
|
notauthorized.append(wiki)
|
|
else:
|
|
pass
|
|
time.sleep(1)
|
|
|
|
df = open("allusers_error_deleted.txt", 'w')
|
|
df.write('\n'.join(deleted))
|
|
df.close()
|
|
|
|
na = open("allusers_error_notauthorized.txt", 'w')
|
|
na.write('\n'.join(notauthorized))
|
|
na.close()
|