334 lines
12 KiB
Python
334 lines
12 KiB
Python
import re
|
|
|
|
from scrapy import log
|
|
from scrapy.http import Request
|
|
from scrapy.selector import Selector
|
|
|
|
from source import Source
|
|
from FourmiCrawler.items import Result
|
|
|
|
|
|
# [TODO]: values can be '128.', perhaps remove the dot in that case?
|
|
# [TODO]: properties have references and comments which do not exist in the
|
|
# Result item, but should be included eventually.
|
|
|
|
class NIST(Source):
|
|
"""
|
|
NIST Scraper plugin
|
|
This plugin manages searching for a chemical on the NIST website
|
|
and parsing the resulting page if the chemical exists on NIST.
|
|
"""
|
|
website = "http://webbook\.nist\.gov/.*"
|
|
|
|
search = 'cgi/cbook.cgi?Name=%s&Units=SI&cTP=on'
|
|
|
|
def __init__(self, config=None):
|
|
"""
|
|
Initialization of NIST scraper
|
|
:param config: configuration variables for this scraper, must contain
|
|
'reliability' key.
|
|
"""
|
|
Source.__init__(self, config)
|
|
self.ignore_list = set()
|
|
|
|
def parse(self, response):
|
|
"""
|
|
This function is called when a Response matching the variable
|
|
'website' is available for parsing the Response object.
|
|
:param response: The Scrapy Response object to be parsed
|
|
:return: a list of Result items and Request objects
|
|
"""
|
|
sel = Selector(response)
|
|
|
|
title = sel.xpath('head/title/text()').extract()[0]
|
|
if title == 'Name Not Found':
|
|
log.msg('NIST: Chemical not found!', level=log.ERROR)
|
|
return
|
|
if title not in self.ignore_list:
|
|
self.ignore_list.update(title)
|
|
log.msg('NIST emit synonym: %s' % title, level=log.DEBUG)
|
|
self._spider.get_synonym_requests(title)
|
|
|
|
requests = []
|
|
|
|
requests.extend(self.parse_generic_info(sel))
|
|
|
|
symbol_table = {}
|
|
tds = sel.xpath('//table[@class="symbol_table"]/tr/td')
|
|
for (symbol_td, name_td) in zip(tds[::2], tds[1::2]):
|
|
symbol = ''.join(symbol_td.xpath('node()').extract())
|
|
name = name_td.xpath('text()').extract()[0]
|
|
symbol_table[symbol] = name
|
|
log.msg('NIST symbol: |%s|, name: |%s|' % (symbol, name),
|
|
level=log.DEBUG)
|
|
|
|
requests.extend(self.parse_tables(sel, symbol_table))
|
|
|
|
return requests
|
|
|
|
def parse_tables(self, sel, symbol_table):
|
|
"""
|
|
This function identifies and distributes parsing of tables to other
|
|
functions below.
|
|
:param sel: A Selector object of the whole page
|
|
:param symbol_table: a dictionary containing translations of raw HTML
|
|
tags to human readable names
|
|
:return: a list of Result items and Requests
|
|
"""
|
|
requests = []
|
|
|
|
for table in sel.xpath('//table[@class="data"]'):
|
|
summary = table.xpath('@summary').extract()[0]
|
|
if summary == 'One dimensional data':
|
|
log.msg('NIST table: Aggregrate data', level=log.DEBUG)
|
|
requests.extend(
|
|
self.parse_aggregate_data(table, symbol_table))
|
|
elif table.xpath('tr/th="Initial Phase"').extract()[0] == '1':
|
|
log.msg('NIST table; Enthalpy/entropy of phase transition',
|
|
level=log.DEBUG)
|
|
requests.extend(self.parse_transition_data(table, summary))
|
|
elif table.xpath('tr[1]/td'):
|
|
log.msg('NIST table: Horizontal table', level=log.DEBUG)
|
|
elif summary == 'Antoine Equation Parameters':
|
|
log.msg('NIST table: Antoine Equation Parameters',
|
|
level=log.DEBUG)
|
|
requests.extend(self.parse_antoine_data(table, summary))
|
|
elif len(table.xpath('tr[1]/th')) == 5:
|
|
log.msg('NIST table: generic 5 columns', level=log.DEBUG)
|
|
# Symbol (unit) Temperature (K) Method Reference Comment
|
|
requests.extend(self.parse_generic_data(table, summary))
|
|
elif len(table.xpath('tr[1]/th')) == 4:
|
|
log.msg('NIST table: generic 4 columns', level=log.DEBUG)
|
|
# Symbol (unit) Temperature (K) Reference Comment
|
|
requests.extend(self.parse_generic_data(table, summary))
|
|
else:
|
|
log.msg('NIST table: NOT SUPPORTED', level=log.WARNING)
|
|
continue # Assume unsupported
|
|
return requests
|
|
|
|
def parse_generic_info(self, sel):
|
|
"""
|
|
This function parses: synonyms, chemical formula, molecular weight,
|
|
InChI, InChiKey, CAS number
|
|
:param sel: A Selector object of the entire page in the original
|
|
response
|
|
:return: a list of Result items
|
|
"""
|
|
ul = sel.xpath('body/ul[li/strong="IUPAC Standard InChI:"]')
|
|
|
|
raw_synonyms = ul.xpath('li[strong="Other names:"]/text()').extract()
|
|
for synonym in raw_synonyms[0].strip().split(';\n'):
|
|
log.msg('NIST synonym: %s' % synonym, level=log.DEBUG)
|
|
self.ignore_list.update(synonym)
|
|
self._spider.get_synonym_requests(synonym)
|
|
|
|
data = {}
|
|
|
|
raw_formula = ul.xpath('li[strong/a="Formula"]//text()').extract()
|
|
data['Chemical formula'] = ''.join(raw_formula[2:]).strip()
|
|
|
|
raw_mol_weight = ul.xpath('li[strong/a="Molecular weight"]/text()')
|
|
data['Molecular weight'] = raw_mol_weight.extract()[0].strip()
|
|
|
|
raw_inchi = ul.xpath('li[strong="IUPAC Standard InChI:"]//tt/text()')
|
|
data['IUPAC Standard InChI'] = raw_inchi.extract()[0]
|
|
|
|
raw_inchikey = ul.xpath('li[strong="IUPAC Standard InChIKey:"]'
|
|
'/tt/text()')
|
|
data['IUPAC Standard InChIKey'] = raw_inchikey.extract()[0]
|
|
|
|
raw_cas_number = ul.xpath('li[strong="CAS Registry Number:"]/text()')
|
|
data['CAS Registry Number'] = raw_cas_number.extract()[0].strip()
|
|
|
|
requests = []
|
|
for key, value in data.iteritems():
|
|
result = self.newresult(
|
|
attribute=key,
|
|
value=value
|
|
)
|
|
requests.append(result)
|
|
|
|
return requests
|
|
|
|
def parse_aggregate_data(self, table, symbol_table):
|
|
"""
|
|
This function parses the table(s) which contain possible links to
|
|
individual data points
|
|
:param table: a Selector object of the table to be parsed
|
|
:param symbol_table: a dictionary containing translations of raw HTML
|
|
tags to human readable names
|
|
:return: a list of Result items and Request objects
|
|
"""
|
|
results = []
|
|
for tr in table.xpath('tr[td]'):
|
|
extra_data_url = tr.xpath('td[last()][a="Individual data points"]'
|
|
'/a/@href').extract()
|
|
if extra_data_url:
|
|
request = Request(url=self.website[:-1] + extra_data_url[0],
|
|
callback=self.parse_individual_datapoints)
|
|
results.append(request)
|
|
continue
|
|
data = []
|
|
for td in tr.xpath('td'):
|
|
data.append(''.join(td.xpath('node()').extract()))
|
|
|
|
name = symbol_table[data[0]]
|
|
condition = ''
|
|
|
|
m = re.match(r'(.*) at (.*)', name)
|
|
if m:
|
|
name = m.group(1)
|
|
condition = m.group(2)
|
|
|
|
result = self.newresult(
|
|
attribute=name,
|
|
value=data[1] + ' ' + data[2],
|
|
conditions=condition
|
|
)
|
|
log.msg('NIST: |%s|' % data, level=log.DEBUG)
|
|
results.append(result)
|
|
return results
|
|
|
|
def parse_transition_data(self, table, summary):
|
|
"""
|
|
This function parses the table containing properties regarding phase
|
|
changes
|
|
:param table: a Selector object of the table to be parsed
|
|
:param summary: the name of the property
|
|
:return: a list of Result items
|
|
"""
|
|
results = []
|
|
|
|
unit = self.get_unit(table)
|
|
|
|
for tr in table.xpath('tr[td]'):
|
|
tds = tr.xpath('td/text()').extract()
|
|
result = self.newresult(
|
|
attribute=summary,
|
|
value=tds[0] + ' ' + unit,
|
|
conditions='%s K, (%s -> %s)' % (tds[1], tds[2], tds[3])
|
|
)
|
|
results.append(result)
|
|
|
|
return results
|
|
|
|
def parse_generic_data(self, table, summary):
|
|
"""
|
|
Parses the common tables of 4 and 5 rows. Assumes they are of the
|
|
form:
|
|
Symbol (unit)|Temperature (K)|Method|Reference|Comment
|
|
Symbol (unit)|Temperature (K)|Reference|Comment
|
|
:param table: a Selector object of the table to be parsed
|
|
:param summary: the name of the property
|
|
:return: a list of Result items
|
|
"""
|
|
results = []
|
|
|
|
unit = self.get_unit(table)
|
|
|
|
for tr in table.xpath('tr[td]'):
|
|
tds = tr.xpath('td/text()').extract()
|
|
result = self.newresult(
|
|
attribute=summary,
|
|
value=tds[0] + ' ' + unit,
|
|
conditions='%s K' % tds[1]
|
|
)
|
|
results.append(result)
|
|
return results
|
|
|
|
def parse_antoine_data(self, table, summary):
|
|
"""
|
|
This function parses the table containing parameters for the Antione
|
|
equation
|
|
:param table: a Selector object of the table to be parsed
|
|
:param summary: the name of the property
|
|
:return: a list of Result items
|
|
"""
|
|
results = []
|
|
|
|
for tr in table.xpath('tr[td]'):
|
|
tds = tr.xpath('td/text()').extract()
|
|
result = self.newresult(
|
|
attribute=summary,
|
|
value='A=%s, B=%s, C=%s' % (tds[1], tds[2], tds[3]),
|
|
conditions='%s K' % tds[0]
|
|
)
|
|
results.append(result)
|
|
|
|
return results
|
|
|
|
def parse_individual_datapoints(self, response):
|
|
"""
|
|
This function parses the 'individual data points' page linked from
|
|
the aggregate data table(s)
|
|
:param response: the Scrapy Response object to be parsed
|
|
:return: a list of Result items
|
|
"""
|
|
sel = Selector(response)
|
|
table = sel.xpath('//table[@class="data"]')[0]
|
|
|
|
results = []
|
|
|
|
name = table.xpath('@summary').extract()[0]
|
|
condition = ''
|
|
m = re.match(r'(.*) at (.*)', name)
|
|
if m:
|
|
name = m.group(1)
|
|
condition = m.group(2)
|
|
|
|
unit = self.get_unit(table)
|
|
|
|
for tr in table.xpath('tr[td]'):
|
|
tds = tr.xpath('td/text()').extract()
|
|
uncertainty = ''
|
|
m = re.search('Uncertainty assigned by TRC = (.*?) ', tds[-1])
|
|
if m:
|
|
uncertainty = '+- %s ' % m.group(1)
|
|
# [TODO]: get the plusminus sign working in here
|
|
result = self.newresult(
|
|
attribute=name,
|
|
value='%s %s%s' % (tds[0], uncertainty, unit),
|
|
conditions=condition
|
|
)
|
|
results.append(result)
|
|
|
|
return results
|
|
|
|
@staticmethod
|
|
def get_unit(table):
|
|
tr_unit = ''.join(table.xpath('tr[1]/th[1]/node()').extract())
|
|
m = re.search(r'\((.*)\)', tr_unit)
|
|
unit = '!'
|
|
if m:
|
|
unit = m.group(1)
|
|
|
|
return unit
|
|
|
|
def newresult(self, attribute, value, conditions=''):
|
|
"""
|
|
This function abstracts from the Result item and provides default
|
|
values
|
|
:param attribute: the name of the attribute
|
|
:param value: the value of the attribute
|
|
:param conditions: optional conditions regarding the value
|
|
:return: A Result item
|
|
"""
|
|
return Result({
|
|
'attribute': attribute,
|
|
'value': value,
|
|
'source': 'NIST',
|
|
'reliability': self.cfg['reliability'],
|
|
'conditions': conditions
|
|
})
|
|
|
|
def new_compound_request(self, compound):
|
|
"""
|
|
This function is called when a new synonym is returned to the spider
|
|
to generate new requests
|
|
:param compound: the name of the compound to search for
|
|
"""
|
|
if compound not in self.ignore_list:
|
|
self.ignore_list.update(compound)
|
|
return Request(url=self.website[:-2] + self.search % compound,
|
|
callback=self.parse)
|