We're creating a fresh archive because the history for our old chapter includes API keys, data files, and other material we can't share.
178 lines
5.6 KiB
Python
178 lines
5.6 KiB
Python
from collections import Counter
|
|
from datetime import datetime
|
|
import json
|
|
import argparse
|
|
import csv
|
|
import random
|
|
|
|
# Fixed seed so get_country's random tie-break between equally-frequent
# countries is reproducible from run to run.
random.seed(2017)
|
|
|
|
def main():
    """Convert a file of newline-delimited Scopus abstract JSON records
    into a tab-separated file, one cleaned record per line.

    Reads the input path from -i and writes the TSV to -o. Column names
    are taken from the first record's keys.
    """
    parser = argparse.ArgumentParser(description='Change a big ugly abstract file to a nice CSV')
    parser.add_argument('-i', help='Abstract file')
    parser.add_argument('-o', help='TSV output file')
    args = parser.parse_args()

    # newline='' is required for files handed to csv writers (prevents
    # extra blank lines / \r\r\n on Windows).
    with open(args.i, 'r') as infile, open(args.o, 'w', newline='') as outfile:
        # The first record supplies the field names for the header.
        try:
            first_line = clean_abstract(json.loads(next(infile)))
        except StopIteration:
            # Empty input file: nothing to write (previously crashed with
            # an uncaught StopIteration).
            return
        output = csv.DictWriter(outfile, first_line.keys(), delimiter='\t')
        output.writeheader()
        output.writerow(first_line)
        for line in infile:
            output.writerow(clean_abstract(json.loads(line)))
|
|
|
|
|
|
def clean_abstract(json_response):
    """Flatten one Scopus abstracts-retrieval API response into a flat
    dict of the fields kept in the TSV output.

    Key order matters: main() builds the DictWriter header from the first
    record's keys.

    Raises KeyError or TypeError when a record is missing required fields
    (the original wrapped the dict build in `except KeyError/TypeError:
    raise`, which re-raised unchanged — a no-op, removed here).
    """
    result = json_response['abstracts-retrieval-response']
    head = result['item']['bibrecord']['head']
    return {
        'modal_country': get_country(head),
        'abstract': get_abstract(result),
        'title': get_title(result),
        'source_title': get_source_title(head),
        'language': result['language']['@xml:lang'],
        'first_ASJC_subject_area': get_subject(result, '$'),
        'first_ASJC_classification': get_subject(result, '@code'),
        'first_CPX_class': get_CPX_class(head, 'classification-description'),
        'date': to_date(result['coredata']['prism:coverDate']),
        'aggregation_type': if_exists('prism:aggregationType', result['coredata'], else_val='NA'),
        'eid': result['coredata']['eid'],
        'cited_by_count': result['coredata']['citedby-count'],
        'num_citations': get_citation_count(result),
    }
|
|
|
|
def get_citation_count(result):
    """Return the record's '@refcount' (number of cited references),
    or None when the bibliography tail is absent."""
    try:
        tail = result['item']['bibrecord']['tail']
        return tail['bibliography']['@refcount']
    except TypeError:
        # 'tail' is None for some records; subscripting None raises TypeError.
        return None
|
|
|
|
def get_title(result):
    """Return the record's title.

    Raises KeyError when 'coredata' or 'dc:title' is missing. (The
    original wrapped the lookup in `except KeyError: raise`, which is a
    no-op — the bare lookup behaves identically.)
    """
    return result['coredata']['dc:title']
|
|
|
|
|
|
def get_source_title(head):
    """Return the source (journal/proceedings) title.

    Raises KeyError when 'source' or 'sourcetitle' is missing. (The
    original's `except KeyError: raise` re-raised unchanged — a no-op,
    removed here.)
    """
    return head['source']['sourcetitle']
|
|
|
|
def get_abstract(result):
    """Return the abstract text with newlines flattened to spaces, or
    None when the record has no 'dc:description'.

    The try body is narrowed to the one lookup that can raise KeyError;
    `.replace` cannot, so behavior is unchanged.
    """
    try:
        abstract = result['coredata']['dc:description']
    except KeyError:
        return None
    # Keep the TSV one-record-per-line by stripping embedded newlines.
    return abstract.replace('\n', ' ')
|
|
|
|
def get_auth_names(head):
    """Return 'Given Surname' preferred names for every author in every
    author group, or None when the record has no author-group.

    Bug fix: the original's `except KeyError` printed the head and then
    fell through to the loop with `auth_info` unbound, crashing with a
    NameError. Returning None matches get_aff_info's convention for the
    same missing-key case.
    """
    try:
        auth_info = [group['author'] for group in make_list(head['author-group'])]
    except KeyError:
        print(head)
        return None
    auth_names = []
    for auth_group in auth_info:
        for auth in make_list(auth_group):
            auth_names.append('{} {}'.format(
                auth['preferred-name']['ce:given-name'],
                auth['preferred-name']['ce:surname']))
    return auth_names
|
|
|
|
def get_country(head):
    """Return the modal affiliation country across the record's authors
    (one vote per author), or None when no affiliation info exists.

    Ties between equally-frequent countries are broken with
    random.choice (module-level seed makes this reproducible).

    Fixes: removed the dead `modes = Counter` assignment (it bound the
    Counter class itself and was immediately overwritten), and replaced
    the O(n^2) repeated `.count()` scan with a single Counter pass.
    """
    all_countries = get_aff_info(head, 'country')
    if not all_countries:
        return None
    counts = Counter(all_countries)
    max_count = max(counts.values())
    modes = [country for country, n in counts.items() if n == max_count]
    return random.choice(modes)
|
|
|
|
def get_aff_info(head, affiliation_key):
    """Collect one affiliation value per author across all author groups.

    Returns a list containing `affiliation_key`'s value repeated once per
    author in each group ('' for groups without affiliation info), or
    None when the record has no 'author-group' at all.
    """
    aff_info = []
    try:
        authors = make_list(head['author-group'])
    except KeyError:
        # Record has no author-group section at all.
        return None
    for x in authors:
        try:
            num_auth = len(make_list(x['author']))
        except KeyError:
            # Apparently there are things called "collaborations", which don't have affiliation info.
            # I'm just skipping them
            continue
        except TypeError:
            # And apparently "None" appears in the author list for no reason. :)
            continue
        try:
            # NOTE(review): assumes x['affiliation'] is a dict — a list of
            # multiple affiliations would raise an uncaught TypeError here.
            # Confirm against the upstream data if that case can occur.
            curr_inst = x['affiliation'][affiliation_key]
            # Add one instance for each author from this institution
            aff_info += [curr_inst] * num_auth
        except KeyError:
            # If there isn't affiliation info for these authors, return empty str
            aff_info += [''] * num_auth
    return aff_info
|
|
|
|
def get_keywords(head):
    """Return the record's author keywords as a list, or None when the
    record has no author-keywords section.

    Raises KeyError when 'citation-info' itself is missing (that lookup
    is deliberately outside the try).
    """
    cite_info = head['citation-info']
    try:
        raw = make_list(cite_info['author-keywords']['author-keyword'])
    except KeyError:
        return None
    # A lone keyword arrives as a bare string; several arrive as dicts
    # keyed by '$'.
    if len(raw) == 1:
        return list(raw)
    return [entry['$'] for entry in raw]
|
|
|
|
def get_subject(result, key):
    """Return `key` from the first listed ASJC subject area.

    On KeyError the whole record is printed for debugging before the
    error is re-raised.
    """
    try:
        areas = make_list(result['subject-areas']['subject-area'])
        # Materialize every entry before indexing (as the original did),
        # so a malformed later entry still triggers the KeyError path.
        values = [area[key] for area in areas]
        return values[0]
    except KeyError:
        print(result)
        raise
|
|
|
|
def get_CPX_class(head, class_key):
    """Return `class_key` from the first CPXCLASS classification, or
    None when there is no CPXCLASS group or the entry lacks the key.

    On KeyError in the traversal the classification group is printed for
    debugging before re-raising.

    Consistency fix: 'classifications' is now wrapped in make_list like
    every other possibly-singular node in this file; previously a lone
    dict here would be iterated as its keys and crash with an uncaught
    TypeError.
    """
    try:
        groups = make_list(head['enhancement']['classificationgroup']['classifications'])
        for group in groups:
            if group['@type'] == 'CPXCLASS':
                try:
                    return [c[class_key] for c in make_list(group['classification'])][0]
                except (KeyError, TypeError):
                    return None
    except KeyError:
        print(head['enhancement']['classificationgroup'])
        raise
    # No CPXCLASS group present.
    return None
|
|
|
|
def to_date(date_string):
    """Parse a YYYY-MM-DD string into a datetime."""
    fmt = '%Y-%m-%d'
    return datetime.strptime(date_string, fmt)
|
|
|
|
|
|
def if_exists(key, dictionary, else_val=None):
    """Return dictionary[key], or else_val when the key is missing.

    Idiom fix: this is exactly dict.get, so use it instead of the
    hand-rolled try/except.
    """
    return dictionary.get(key, else_val)
|
|
|
|
def make_list(list_or_dict):
    """Normalize API fields that are a bare value when singular and a
    list when plural: always return a list."""
    if isinstance(list_or_dict, list):
        return list_or_dict
    return [list_or_dict]
|
|
|
|
# Run the conversion only when executed as a script, not on import.
if __name__ == '__main__':
    main()
|