We're creating a fresh archive because the history for our old chapter includes API keys, data files, and other material we can't share.
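
The script keeps the key itself out of the code by importing it from a separate module (the line "from scopus_api import key as API_KEY" near the top of the listing below). One way to make that work without ever committing the key is to keep scopus_api.py as a small local file that is excluded from version control; the sketch that follows is only an illustration of that idea, with the environment-variable name and placeholder value assumed rather than taken from the original project.

# scopus_api.py -- local only; exclude this file from version control (e.g., via .gitignore)
import os

# The Scopus API key used by the search script; the environment-variable name is illustrative.
key = os.environ.get('SCOPUS_API_KEY', 'replace-with-your-scopus-key')

The full script follows.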
import requests
from datetime import datetime
from scopus_api import key as API_KEY
import json
import os
import logging
import re

logging.basicConfig(level=logging.DEBUG)

RETRY_COUNT = 5
TIMEOUT_SECS = 10

# Initialize a global session object
s = requests.Session()
s.headers.update({'X-ELS-APIKey': API_KEY,
                  'X-ELS-ResourceVersion': 'XOCS',
                  'Accept': 'application/json'})

def get_token(location_id=None):
    '''Given a location_id, gets an authentication token.'''
    print('Getting a token')
    api_resource = 'http://api.elsevier.com/authenticate'
    # Parameters
    payload = {'platform': 'SCOPUS',
               'choice': location_id}
    r = s.get(api_resource, params=payload)
    r.raise_for_status()
    s.headers['X-ELS-AuthToken'] = r.json()['authenticate-response']['authtoken']

def get_search_results(query, output_file, results_per_call=200,
                       tot_results=None, year=None, sort='+title', citation_call=False):
    '''Handles getting search results. Takes a query and an output
    file. Writes as many of the search results as possible to the
    output file as JSON dictionaries, one per line.'''
    result_set = []
    results_added = 0

    def curr_call(start=0, count=results_per_call):
        '''Shorthand for the current call: DRY'''
        return make_search_call(query, start=start,
                                count=count, year=year, sort=sort)

    if tot_results is None:
        # Call the API initially to figure out how many results there are,
        # and keep the first page of results.
        initial_results = curr_call(count=results_per_call)
        tot_results = int(initial_results['search-results']['opensearch:totalResults'])
        result_set.append((initial_results, sort))
        results_added += results_per_call
    logging.debug("Total results: {}".format(tot_results))

    if tot_results == 0:
        return None
    if tot_results > 5000:
        # If this is just one year, we can't get any more granular, and
        # we need to return what we can.
        if tot_results > 10000:
            print("{} results for {}. We can only retrieve 10,000".format(tot_results, year))
            first_half = last_half = 5000
        else:
            # Get half, and correct for odd # of results
            first_half = tot_results // 2 + tot_results % 2
            last_half = tot_results // 2
        # Break the search into the first half and the last half of the results.
        get_search_results(query, output_file,
                           year=year,
                           tot_results=first_half)
        # Get the other half, sorted in the opposite direction so the two
        # calls approach the 5,000-result window from both ends.
        get_search_results(query, output_file,
                           year=year,
                           tot_results=last_half, sort='-title')
    # If there are 5000 or fewer to retrieve, then get them
    else:
        logging.debug('Retrieving {} results'.format(tot_results))
        # As long as there are more citations to retrieve, then do it, and write
        # them to the file
        while results_added < tot_results:
            # If we are near the end, then only get as many results as are left.
            to_retrieve = min(results_per_call, (tot_results - results_added))
            curr_results = curr_call(start=results_added, count=to_retrieve)
            result_set.append((curr_results, sort))
            results_added += results_per_call
    # This is hacky, but I'm doing it
    # If this is a citation call, then construct metadata to be written with the result
    if citation_call:
        metadata = {'parent_eid': re.match(r'refeid\((.*)\)', query).group(1)}
    else:
        metadata = {}
    write_results(result_set, output_file, metadata)

def write_results(result_set, output_file, metadata={}):
    '''Writes each entry in the result set to the output file as a JSON
    dictionary, one per line, adding any metadata key/value pairs to each
    entry; pages fetched with a descending sort are reversed first.'''
    for x in result_set:
        search_json = x[0]
        to_reverse = x[1].startswith('-')
        try:
            results = [x for x in search_json['search-results']['entry']]
        except KeyError:
            raise
        if to_reverse:
            results = results[::-1]
        for x in results:
            for k, v in metadata.items():
                x[k] = v
            json.dump(x, output_file)
            output_file.write('\n')


def make_search_call(query, start=0, count=200,
                     sort='+title', year=None,
                     retry_limit=RETRY_COUNT,
                     timeout_secs=TIMEOUT_SECS):
    '''Makes one call to the Scopus search API and returns the parsed JSON
    response, retrying on timeouts and refreshing the auth token on a 401.'''
    api_resource = "https://api.elsevier.com/content/search/scopus"
    # Parameters
    payload = {'query': query,
               'count': count,
               'start': start,
               'sort': sort,
               'date': year}
    for _ in range(retry_limit):
        try:
            r = s.get(api_resource,
                      params=payload,
                      timeout=timeout_secs)
            logging.debug(r.url)
            if r.status_code == 401:
                get_token()
                continue
            if r.status_code == 400:
                raise requests.exceptions.HTTPError('Bad request; possibly you aren\'t connected to an institution with Scopus access?')
            break
        except requests.exceptions.Timeout:
            pass
    else:
        raise requests.exceptions.Timeout('Timeout Error')

    r.raise_for_status()
    return r.json()


def get_cited_by(eid, output_file):
    '''Retrieves everything that cites the document with the given eid and
    writes the results to the output file.'''
    return get_search_results('refeid({})'.format(eid), output_file, results_per_call=200,
                              citation_call=True)


def get_abstract(eid, retry_limit=RETRY_COUNT,
                 timeout_secs=TIMEOUT_SECS):
    '''Retrieves the abstract record for the given eid, returning the raw
    response body as text, or None if no record is found.'''
    api_resource = "http://api.elsevier.com/content/abstract/eid/{}".format(eid)
    # Parameters
    payload = {}
    for _ in range(retry_limit):
        try:
            r = s.get(api_resource,
                      params=payload,
                      timeout=timeout_secs)
            if r.status_code == 401:
                get_token()
                continue
            if r.status_code == 400:
                raise requests.exceptions.HTTPError('Bad request; possibly you aren\'t connected to an institution with Scopus access?')
            break
        except requests.exceptions.Timeout:
            pass
    else:
        raise requests.exceptions.Timeout('Timeout Error')
    if r.status_code == 404:
        return None
    r.raise_for_status()
    return r.content.decode('utf-8')
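
The listing doesn't show how these functions are driven, so here is a hypothetical usage sketch; the query string, output file name, and year are placeholders invented for illustration rather than taken from the original project.

# Hypothetical driver: authenticate up front (make_search_call would also refresh the
# token on a 401), then stream one year of search results to disk, one JSON object per line.
if __name__ == '__main__':
    get_token()
    with open('scopus_results.jsonl', 'w') as output_file:
        get_search_results('TITLE-ABS-KEY(reproducibility)', output_file, year=2015)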