import re

from scrapy import log
from scrapy.http import Request
from scrapy.selector import Selector

from source import Source
from FourmiCrawler.items import Result


# [TODO]: values can be '128.', perhaps remove the dot in that case?
# [TODO]: properties have references and comments which do not exist in the
# Result item, but should be included eventually.
class NIST(Source):
    """
    NIST Scraper plugin

    This plugin manages searching for a chemical on the NIST website
    and parsing the resulting page if the chemical exists on NIST.
    """
    website = "http://webbook.nist.gov/*"
    search = 'cgi/cbook.cgi?Name=%s&Units=SI&cTP=on'
    cfg = {}

    def __init__(self, config={}):
        """
        Initialization of the NIST scraper.
        :param config: configuration variables for this scraper, must contain
                       a 'reliability' key.
        """
        Source.__init__(self, config)
        self.ignore_list = set()
        self.cfg = config
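
    # Minimal sketch of the configuration this scraper expects; the
    # 'reliability' value is illustrative, any project-defined label works:
    #   NIST(config={'reliability': 'High'})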

    def parse(self, response):
        """
        This function is called when a Response matching the 'website'
        pattern is available for parsing.
        :param response: the Scrapy Response object to be parsed
        :return: a list of Result items and Request objects
        """
        sel = Selector(response)

        title = sel.xpath('head/title/text()').extract()[0]
        if title == 'Name Not Found':
            log.msg('NIST: Chemical not found!', level=log.ERROR)
            return
        if title not in self.ignore_list:
            self.ignore_list.add(title)
            log.msg('NIST emit synonym: %s' % title, level=log.DEBUG)
            self._spider.get_synonym_requests(title)

        requests = []
        requests.extend(self.parse_generic_info(sel))

        symbol_table = {}
        tds = sel.xpath('//table[@class="symbol_table"]/tr/td')
        for (symbol_td, name_td) in zip(tds[::2], tds[1::2]):
            symbol = ''.join(symbol_td.xpath('node()').extract())
            name = name_td.xpath('text()').extract()[0]
            symbol_table[symbol] = name
            log.msg('NIST symbol: |%s|, name: |%s|' % (symbol, name),
                    level=log.DEBUG)

        for table in sel.xpath('//table[@class="data"]'):
            summary = table.xpath('@summary').extract()[0]
            if summary == 'One dimensional data':
                log.msg('NIST table: Aggregate data', level=log.DEBUG)
                requests.extend(
                    self.parse_aggregate_data(table, symbol_table))
            elif table.xpath('tr/th="Initial Phase"').extract()[0] == '1':
                log.msg('NIST table: Enthalpy/entropy of phase transition',
                        level=log.DEBUG)
                requests.extend(self.parse_transition_data(table, summary))
            elif table.xpath('tr[1]/td'):
                log.msg('NIST table: Horizontal table', level=log.DEBUG)
            elif summary == 'Antoine Equation Parameters':
                log.msg('NIST table: Antoine Equation Parameters',
                        level=log.DEBUG)
                requests.extend(self.parse_antoine_data(table, summary))
            elif len(table.xpath('tr[1]/th')) == 5:
                log.msg('NIST table: generic 5 columns', level=log.DEBUG)
                # Symbol (unit) Temperature (K) Method Reference Comment
                requests.extend(self.parse_generic_data(table, summary))
            elif len(table.xpath('tr[1]/th')) == 4:
                log.msg('NIST table: generic 4 columns', level=log.DEBUG)
                # Symbol (unit) Temperature (K) Reference Comment
                requests.extend(self.parse_generic_data(table, summary))
            else:
                log.msg('NIST table: NOT SUPPORTED', level=log.WARNING)
                continue  # Assume unsupported

        return requests

    def parse_generic_info(self, sel):
        """
        This function parses: synonyms, chemical formula, molecular weight,
        InChI, InChIKey and CAS registry number.
        :param sel: a Selector object of the entire page in the original
                    response
        :return: a list of Result items
        """
        ul = sel.xpath('body/ul[li/strong="IUPAC Standard InChI:"]')
        li = ul.xpath('li')

        raw_synonyms = ul.xpath('li[strong="Other names:"]/text()').extract()
        for synonym in raw_synonyms[0].strip().split(';\n'):
            log.msg('NIST synonym: %s' % synonym, level=log.DEBUG)
            self.ignore_list.add(synonym)
            self._spider.get_synonym_requests(synonym)

        data = {}

        raw_formula = ul.xpath('li[strong/a="Formula"]//text()').extract()
        data['Chemical formula'] = ''.join(raw_formula[2:]).strip()

        raw_mol_weight = ul.xpath('li[strong/a="Molecular weight"]/text()')
        data['Molecular weight'] = raw_mol_weight.extract()[0].strip()

        raw_inchi = ul.xpath('li[strong="IUPAC Standard InChI:"]//tt/text()')
        data['IUPAC Standard InChI'] = raw_inchi.extract()[0]

        raw_inchikey = ul.xpath('li[strong="IUPAC Standard InChIKey:"]'
                                '/tt/text()')
        data['IUPAC Standard InChIKey'] = raw_inchikey.extract()[0]

        raw_cas_number = ul.xpath('li[strong="CAS Registry Number:"]/text()')
        data['CAS Registry Number'] = raw_cas_number.extract()[0].strip()

        requests = []
        for key, value in data.iteritems():
            result = self.newresult(
                attribute=key,
                value=value
            )
            requests.append(result)

        return requests
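
    # Illustrative shape of the generic-info Results emitted above (values
    # are placeholders, not taken from an actual NIST page):
    #   attribute='Molecular weight',    value='78.1118'
    #   attribute='CAS Registry Number', value='71-43-2'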

    def parse_aggregate_data(self, table, symbol_table):
        """
        This function parses the table(s) which contain possible links to
        individual data points.
        :param table: a Selector object of the table to be parsed
        :param symbol_table: a dictionary containing translations of raw HTML
                             tags to human readable names
        :return: a list of Result items and Request objects
        """
        results = []
        for tr in table.xpath('tr[td]'):
            extra_data_url = tr.xpath('td[last()][a="Individual data points"]'
                                      '/a/@href').extract()
            if extra_data_url:
                request = Request(url=self.website[:-1] + extra_data_url[0],
                                  callback=self.parse_individual_datapoints)
                results.append(request)
                continue

            data = []
            for td in tr.xpath('td'):
                data.append(''.join(td.xpath('node()').extract()))

            name = symbol_table[data[0]]
            condition = ''
            m = re.match(r'(.*) at (.*)', name)
            if m:
                name = m.group(1)
                condition = m.group(2)

            result = self.newresult(
                attribute=name,
                value=data[1] + ' ' + data[2],
                conditions=condition
            )
            log.msg('NIST: |%s|' % data, level=log.DEBUG)
            results.append(result)

        return results
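
    # Some aggregate rows carry a condition inside the property name, which
    # the '(.*) at (.*)' match above splits off. Illustrative example (the
    # name is assumed, not verbatim NIST output):
    #   'Enthalpy of vaporization at Standard conditions'
    #   -> attribute='Enthalpy of vaporization', conditions='Standard conditions'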

    def parse_transition_data(self, table, summary):
        """
        This function parses the table containing properties regarding phase
        changes.
        :param table: a Selector object of the table to be parsed
        :param summary: the name of the property
        :return: a list of Result items
        """
        results = []

        tr_unit = ''.join(table.xpath('tr[1]/th[1]/node()').extract())
        m = re.search(r'\((.*)\)', tr_unit)
        unit = '!'
        if m:
            unit = m.group(1)

        for tr in table.xpath('tr[td]'):
            tds = tr.xpath('td/text()').extract()
            result = self.newresult(
                attribute=summary,
                value=tds[0] + ' ' + unit,
                conditions='%s K, (%s -> %s)' % (tds[1], tds[2], tds[3])
            )
            results.append(result)

        return results
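
    # The unit comes from the parenthesised part of the first column header;
    # a sketch with an assumed header string:
    #   re.search(r'\((.*)\)', 'Delta H (kJ/mol)').group(1)  ->  'kJ/mol'
    # '!' remains as a sentinel when no parenthesised unit can be found.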

    def parse_generic_data(self, table, summary):
        """
        Parses the common tables with 4 or 5 columns. Assumes they are of the
        form:
        Symbol (unit)|Temperature (K)|Method|Reference|Comment
        Symbol (unit)|Temperature (K)|Reference|Comment
        :param table: a Selector object of the table to be parsed
        :param summary: the name of the property
        :return: a list of Result items
        """
        results = []

        tr_unit = ''.join(table.xpath('tr[1]/th[1]/node()').extract())
        m = re.search(r'\((.*)\)', tr_unit)
        unit = '!'
        if m:
            unit = m.group(1)

        for tr in table.xpath('tr[td]'):
            tds = tr.xpath('td/text()').extract()
            result = self.newresult(
                attribute=summary,
                value=tds[0] + ' ' + unit,
                conditions='%s K' % tds[1]
            )
            results.append(result)

        return results

    def parse_antoine_data(self, table, summary):
        """
        This function parses the table containing parameters for the Antoine
        equation.
        :param table: a Selector object of the table to be parsed
        :param summary: the name of the property
        :return: a list of Result items
        """
        results = []
        for tr in table.xpath('tr[td]'):
            tds = tr.xpath('td/text()').extract()
            result = self.newresult(
                attribute=summary,
                value='A=%s, B=%s, C=%s' % (tds[1], tds[2], tds[3]),
                conditions='%s K' % tds[0]
            )
            results.append(result)

        return results

    def parse_individual_datapoints(self, response):
        """
        This function parses the 'individual data points' page linked from
        the aggregate data table(s).
        :param response: the Scrapy Response object to be parsed
        :return: a list of Result items
        """
        sel = Selector(response)
        table = sel.xpath('//table[@class="data"]')[0]
        results = []

        name = table.xpath('@summary').extract()[0]
        condition = ''
        m = re.match(r'(.*) at (.*)', name)
        if m:
            name = m.group(1)
            condition = m.group(2)

        tr_unit = ''.join(table.xpath('tr[1]/th[1]/node()').extract())
        m = re.search(r'\((.*)\)', tr_unit)
        unit = '!'
        if m:
            unit = m.group(1)

        for tr in table.xpath('tr[td]'):
            tds = tr.xpath('td/text()').extract()
            uncertainty = ''
            m = re.search('Uncertainty assigned by TRC = (.*?) ', tds[-1])
            if m:
                uncertainty = '+- %s ' % m.group(1)
            # [TODO]: get the plusminus sign working in here
            result = self.newresult(
                attribute=name,
                value='%s %s%s' % (tds[0], uncertainty, unit),
                conditions=condition
            )
            results.append(result)

        return results
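
    # Sketch of the uncertainty extraction above, using an assumed comment
    # string (not copied from a real NIST page):
    #   m = re.search('Uncertainty assigned by TRC = (.*?) ',
    #                 'Uncertainty assigned by TRC = 0.02 kJ/mol')
    #   m.group(1)  ->  '0.02', so the value becomes e.g. '30.8 +- 0.02 kJ/mol'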

    def newresult(self, attribute, value, conditions=''):
        """
        Builds a Result item for this source, filling in the default values.
        :param attribute: the name of the attribute
        :param value: the value of the attribute
        :param conditions: optional conditions regarding the value
        :return: a Result item
        """
        return Result({
            'attribute': attribute,
            'value': value,
            'source': 'NIST',
            'reliability': self.cfg['reliability'],
            'conditions': conditions
        })

    def new_compound_request(self, compound):
        """
        This function is called when a new synonym is returned to the spider
        to generate new requests.
        :param compound: the name of the compound to search for
        :return: a Request for the NIST search page, or None if the compound
                 was already handled
        """
        if compound not in self.ignore_list:
            self.ignore_list.add(compound)
            return Request(url=self.website[:-1] + self.search % compound,
                           callback=self.parse)
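
# Example of the search URL new_compound_request builds ('methane' is an
# arbitrary compound used only for illustration):
#   http://webbook.nist.gov/cgi/cbook.cgi?Name=methane&Units=SI&cTP=on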