# This file is part of morss # # Copyright (C) 2013-2020 pictuga # # This program is free software: you can redistribute it and/or modify it under # the terms of the GNU Affero General Public License as published by the Free # Software Foundation, either version 3 of the License, or (at your option) any # later version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more # details. # # You should have received a copy of the GNU Affero General Public License along # with this program. If not, see . import csv import json import re from copy import deepcopy from datetime import datetime from fnmatch import fnmatch import dateutil.parser import lxml.html from dateutil import tz from lxml import etree from .readabilite import parse as html_parse from .util import * json.encoder.c_make_encoder = None try: # python 2 from ConfigParser import RawConfigParser from StringIO import StringIO except ImportError: # python 3 from configparser import RawConfigParser from io import StringIO try: # python 2 basestring except NameError: # python 3 basestring = unicode = str def parse_rules(filename=None): if not filename: filename = pkg_path('feedify.ini') config = RawConfigParser() config.read(filename) rules = dict([(x, dict(config.items(x))) for x in config.sections()]) for section in rules.keys(): # for each ruleset for arg in rules[section].keys(): # for each rule if rules[section][arg].startswith('file:'): path = data_path('www', rules[section][arg][5:]) file_raw = open(path).read() file_clean = re.sub('<[/?]?(xsl|xml)[^>]+?>', '', file_raw) rules[section][arg] = file_clean elif '\n' in rules[section][arg]: rules[section][arg] = rules[section][arg].split('\n')[1:] return rules def parse(data, url=None, encoding=None, ruleset=None): " Determine which ruleset to use " if ruleset is not None: rulesets = [ruleset] else: rulesets = parse_rules().values() parsers = [FeedXML, FeedHTML, FeedJSON] # 1) Look for a ruleset based on path if url is not None: for ruleset in rulesets: if 'path' in ruleset: for path in ruleset['path']: if fnmatch(url, path): parser = [x for x in parsers if x.mode == ruleset.get('mode')][0] # FIXME what if no mode specified? return parser(data, ruleset, encoding=encoding) # 2) Try each and every parser # 3) Look for working ruleset for given parser # 3a) See if parsing works # 3b) See if .items matches anything for parser in parsers: try: feed = parser(data, encoding=encoding) except (ValueError, SyntaxError): # parsing did not work pass else: # parsing worked, now we try the rulesets ruleset_candidates = [x for x in rulesets if x.get('mode') in (parser.mode, None) and 'path' not in x] # 'path' as they should have been caught beforehands # try anyway if no 'mode' specified for ruleset in ruleset_candidates: feed.rules = ruleset try: feed.items[0] except (AttributeError, IndexError, TypeError): # parsing and or item picking did not work out pass else: # it worked! return feed raise TypeError('no way to handle this feed') class ParserBase(object): def __init__(self, data=None, rules=None, parent=None, encoding=None): if rules is None: rules = parse_rules()[self.default_ruleset] self.rules = rules if data is None: data = rules['base'] self.parent = parent self.encoding = encoding self.root = self.parse(data) def parse(self, raw): pass def remove(self): # delete oneslf pass def tostring(self, **k): # output in its input format # to output in sth fancy (json, csv, html), change class type with .convert first pass def torss(self, **k): return self.convert(FeedXML).tostring(**k) def tojson(self, **k): return self.convert(FeedJSON).tostring(**k) def tocsv(self, encoding='unicode'): out = StringIO() c = csv.writer(out, dialect=csv.excel) for item in self.items: c.writerow([getattr(item, x) for x in item.dic]) out.seek(0) out = out.read() if encoding != 'unicode': out = out.encode(encoding) return out def tohtml(self, **k): return self.convert(FeedHTML).tostring(**k) def convert(self, TargetParser): target = TargetParser() if type(self) == TargetParser and self.rules == target.rules: # check both type *AND* rules (e.g. when going from freeform xml to rss) return self for attr in target.dic: if attr == 'items': for item in self.items: target.append(item) else: setattr(target, attr, getattr(self, attr)) return target # RULE-BASED FUNCTIONS def rule_search(self, rule): # xpath, return the first one only try: return self.rule_search_all(rule)[0] except IndexError: return None def rule_search_all(self, rule): # xpath, return all raw matches (useful to find feed items) pass def rule_search_last(self, rule): # xpath, return only the first raw match try: return self.rule_search_all(rule)[-1] except IndexError: return None def rule_create(self, rule): # create node based on rule # (duplicate, copy existing (or template) or create from scratch, if possible) # --> might want to create node_duplicate helper fns pass def rule_remove(self, rule): # remove node from its parent. Returns nothing pass def rule_set(self, rule, value): # set the value. Returns nothing pass def rule_str(self, rule): # GETs inside (pure) text from it pass # PARSERS def time_prs(self, x): # parse try: return parse_time(x) except ValueError: return None def time_fmt(self, x): # format try: time = parse_time(x) return time.strftime(self.rules.get('timeformat', self.default_timeformat)) except ValueError: pass default_timeformat = "%D" # HELPERS def get_raw(self, rule_name): # get the raw output, for self.get_raw('items') if rule_name not in self.rules: return [] return self.rule_search_all(self.rules[rule_name]) def get(self, rule_name): # simple function to get nice text from the rule name # for use in @property, ie. self.get('title') if rule_name not in self.rules: return None return self.rule_str(self.rules[rule_name]) or None def set(self, rule_name, value): # simple function to set nice text from the rule name. Returns nothing if rule_name not in self.rules: return if value is None: self.rmv(rule_name) return try: self.rule_set(self.rules[rule_name], value) except AttributeError: # does not exist, have to create it try: self.rule_create(self.rules[rule_name]) except AttributeError: # no way to create it, give up pass else: self.rule_set(self.rules[rule_name], value) def rmv(self, rule_name): # easy deleter if rule_name not in self.rules: return self.rule_remove(self.rules[rule_name]) class ParserXML(ParserBase): default_ruleset = 'rss-channel' mode = 'xml' mimetype = ['text/xml', 'application/xml', 'application/rss+xml', 'application/rdf+xml', 'application/atom+xml', 'application/xhtml+xml'] NSMAP = {'atom': 'http://www.w3.org/2005/Atom', 'atom03': 'http://purl.org/atom/ns#', 'rdf': 'http://www.w3.org/1999/02/22-rdf-syntax-ns#', 'content': 'http://purl.org/rss/1.0/modules/content/', 'rssfake': 'http://purl.org/rss/1.0/'} def parse(self, raw): parser = etree.XMLParser(recover=True, remove_blank_text=True, remove_pis=True) # remove_blank_text needed for pretty_print return etree.fromstring(raw, parser) def remove(self): return self.root.getparent().remove(self.root) def tostring(self, encoding='unicode', **k): return etree.tostring(self.root, encoding=encoding, method='xml', **k) def _rule_parse(self, rule): test = re.search(r'^(.*)/@([a-z]+)$', rule) # to match //div/a/@href return test.groups() if test else (rule, None) def _resolve_ns(self, rule): # shortname to full name match = re.search(r'^([^:]+):([^:]+)$', rule) # to match fakerss:content if match: match = match.groups() if match[0] in self.NSMAP: return "{%s}%s" % (self.NSMAP[match[0]], match[1].lower()) return rule @staticmethod def _inner_html(xml): return (xml.text or '') + ''.join([etree.tostring(child, encoding='unicode') for child in xml]) @staticmethod def _clean_node(xml): if xml is not None: if len(xml): [xml.remove(child) for child in xml] xml.text = None def rule_search_all(self, rule): try: match = self.root.xpath(rule, namespaces=self.NSMAP) if isinstance(match, str): # some xpath rules return a single string instead of an array (e.g. concatenate() ) return [match,] else: return match except etree.XPathEvalError: return [] def rule_create(self, rule): # duplicate, copy from template or create from scratch rrule, key = self._rule_parse(rule) # try recreating based on the rule (for really basic rules, ie. plain RSS) `/feed/item` if re.search(r'^[a-zA-Z0-9/:]+$', rrule): chain = rrule.strip('/').split('/') current = self.root if rrule[0] == '/': # we skip the first chain-element, as we _start_ from the first/root one # i.e. for "/rss/channel/title" we only keep "/channel/title" chain = chain[1:] for (i, node) in enumerate(chain): test = current.find(self._resolve_ns(node)) if test is not None and i < len(chain) - 1: # yay, go on current = test else: # opps need to create element = etree.Element(self._resolve_ns(node)) current.append(element) current = element return current # try duplicating from existing (works well with fucked up structures) match = self.rule_search_last(rrule) if match: element = deepcopy(match) match.getparent().append(element) return element return None def rule_remove(self, rule): rrule, key = self._rule_parse(rule) match = self.rule_search(rrule) if match is None: return elif key is not None: if key in match.attrib: del match.attrib[key] else: match.getparent().remove(match) def rule_set(self, rule, value): rrule, key = self._rule_parse(rule) match = self.rule_search(rrule) html_rich = ('atom' in rule or self.rules.get('mode') == 'html') \ and rule in [self.rules.get('item_desc'), self.rules.get('item_content')] if key is not None: match.attrib[key] = value else: if html_rich: self._clean_node(match) match.append(lxml.html.fragment_fromstring(value, create_parent='div')) if self.rules.get('mode') == 'html': match.find('div').drop_tag() # not supported by lxml.etree else: # i.e. if atom match.attrib['type'] = 'xhtml' else: if match is not None and len(match): self._clean_node(match) match.attrib['type'] = 'html' match.text = value def rule_str(self, rule): match = self.rule_search(rule) html_rich = ('atom' in rule or self.mode == 'html') \ and rule in [self.rules.get('item_desc'), self.rules.get('item_content')] if isinstance(match, etree._Element): if html_rich: # atom stuff return self._inner_html(match) else: return etree.tostring(match, method='text', encoding='unicode').strip() else: return match # might be None is no match class ParserHTML(ParserXML): default_ruleset = 'html' mode = 'html' mimetype = ['text/html', 'application/xhtml+xml'] def parse(self, raw): return html_parse(raw, encoding=self.encoding) def tostring(self, encoding='unicode', **k): return lxml.html.tostring(self.root, encoding=encoding, method='html', **k) def rule_search_all(self, rule): try: # do proper "class" matching (too "heavy" to type as-it in rules) pattern = r'\[class=([^\]]+)\]' repl = r'[@class and contains(concat(" ", normalize-space(@class), " "), " \1 ")]' rule = re.sub(pattern, repl, rule) match = self.root.xpath(rule) if isinstance(match, str): # for some xpath rules, see XML parser return [match,] else: return match except etree.XPathEvalError: return [] def rule_create(self, rule): # try duplicating from existing (works well with fucked up structures) rrule, key = self._rule_parse(rule) match = self.rule_search_last(rule) if match is not None: element = deepcopy(match) match.getparent().append(element) else: raise AttributeError('no way to create item') def parse_time(value): # parsing per se if value is None or value == 0: time = None elif isinstance(value, basestring): if re.match(r'^[0-9]+$', value): time = datetime.fromtimestamp(int(value)) else: time = dateutil.parser.parse(value) elif isinstance(value, int): time = datetime.fromtimestamp(value) elif isinstance(value, datetime): time = value else: time = None # add default time zone if none set if time is not None and time.tzinfo is None: time = time.replace(tzinfo=tz.tzutc()) return time class ParserJSON(ParserBase): default_ruleset = 'json' mode = 'json' mimetype = ['application/json', 'application/javascript', 'text/javascript'] def parse(self, raw): return json.loads(raw) def remove(self): # impossible to "delete" oneself per se but can clear all its items for attr in self.root: del self.root[attr] def tostring(self, encoding='unicode', **k): dump = json.dumps(self.root, ensure_ascii=False, **k) # ensure_ascii = False to have proper (unicode) string and not \u00 if encoding != 'unicode': return dump.encode(encoding) else: return dump def _rule_parse(self, rule): return rule.split(".") def rule_search_all(self, rule): try: rrule = self._rule_parse(rule) cur = self.root for node in rrule: if node == '[]': break else: cur = cur[node] return cur if isinstance(cur, list) else [cur,] except (AttributeError, KeyError): return [] def rule_create(self, rule): # create from scracth rrule = self._rule_parse(rule) cur = self.root for (i, node) in enumerate(rrule): if rrule[i+1] == '[]': if node in cur and isinstance(cur[node], list): cur[node].append({}) else: cur[node] = [{}] return else: if node in cur: # yay, go on cur = cur[node] else: # opps need to create cur[node] = {} def rule_remove(self, rule): if '[]' in rule: raise ValueError('not supported') # FIXME rrule = self._rule_parse(rule) cur = self.root try: for node in rrule[:-1]: cur = cur[node] del cur[rrule[-1]] except KeyError: # nothing to delete pass def rule_set(self, rule, value): if '[]' in rule: raise ValueError('not supported') # FIXME rrule = self._rule_parse(rule) cur = self.root for node in rrule[:-1]: cur = cur[node] cur[rrule[-1]] = value def rule_str(self, rule): out = self.rule_search(rule) return out.replace('\n', '
') if out else out def wrap_uniq(wrapper_fn_name): " Wraps the output of the function with the specified function " # This is called when parsing "wrap_uniq('wrap_item')" def decorator(func): # This is called when parsing "@wrap_uniq('wrap_item')" def wrapped_func(self, *args, **kwargs): # This is called when the wrapped function is called output = func(self, *args, **kwargs) output_id = id(output) try: return self._map[output_id] except (KeyError, AttributeError): if not hasattr(self, '_map'): self._map = {} wrapper_fn = getattr(self, wrapper_fn_name) obj = wrapper_fn(output) self._map[output_id] = obj return obj return wrapped_func return decorator class Feed(object): itemsClass = property(lambda x: Item) # because Item is define below, i.e. afterwards dic = ('title', 'desc', 'items') title = property( lambda f: f.get('title'), lambda f,x: f.set('title', x), lambda f: f.rmv('title') ) description = desc = property( lambda f: f.get('desc'), lambda f,x: f.set('desc', x), lambda f: f.rmv('desc') ) items = property( lambda f: f ) def append(self, new=None): self.rule_create(self.rules['items']) item = self.items[-1] for attr in self.itemsClass.dic: try: setattr(item, attr, getattr(new, attr)) except AttributeError: try: setattr(item, attr, new[attr]) except (KeyError, IndexError, TypeError): pass return item def wrap_item(self, item): return self.itemsClass(item, self.rules, self) @wrap_uniq('wrap_item') def __getitem__(self, key): return self.get_raw('items')[key] def __delitem__(self, key): self[key].remove() def __len__(self): return len(self.get_raw('items')) class Item(object): dic = ('title', 'link', 'desc', 'content', 'time', 'updated') def __init__(self, xml=None, rules=None, parent=None): self._id = self._gen_id(xml) self.root = xml self.rules = rules self.parent = parent @staticmethod def _gen_id(xml=None, *args, **kwargs): return id(xml) title = property( lambda f: f.get('item_title'), lambda f,x: f.set('item_title', x), lambda f: f.rmv('item_title') ) link = property( lambda f: f.get('item_link'), lambda f,x: f.set('item_link', x), lambda f: f.rmv('item_link') ) description = desc = property( lambda f: f.get('item_desc'), lambda f,x: f.set('item_desc', x), lambda f: f.rmv('item_desc') ) content = property( lambda f: f.get('item_content'), lambda f,x: f.set('item_content', x), lambda f: f.rmv('item_content') ) time = property( lambda f: f.time_prs(f.get('item_time')), lambda f,x: f.set('item_time', f.time_fmt(x)), lambda f: f.rmv('item_time') ) updated = property( lambda f: f.time_prs(f.get('item_updated')), lambda f,x: f.set('item_updated', f.time_fmt(x)), lambda f: f.rmv('item_updated') ) class ItemXML(Item, ParserXML): pass class FeedXML(Feed, ParserXML): itemsClass = ItemXML def root_siblings(self): out = [] current = self.root.getprevious() while current is not None: out.append(current) current = current.getprevious() return out def tostring(self, encoding='unicode', **k): # override needed due to "getroottree" inclusion # and to add stylesheet stylesheets = [x for x in self.root_siblings() if isinstance(x, etree.PIBase) and x.target == 'xml-stylesheet'] for stylesheet in stylesheets: # remove all stylesheets present (be that ours or others') self.root.append(stylesheet) # needed as we can't delete root siblings https://stackoverflow.com/a/60232366 self.root.remove(stylesheet) self.root.addprevious(etree.PI('xml-stylesheet', 'type="text/xsl" href="/sheet.xsl"')) return etree.tostring(self.root.getroottree(), encoding=encoding, method='xml', **k) class ItemHTML(Item, ParserHTML): pass class FeedHTML(Feed, ParserHTML): itemsClass = ItemHTML class ItemJSON(Item, ParserJSON): def remove(self): rrule = self._rule_parse(self.rules['items']) cur = self.parent.root for node in rrule: if node == '[]': cur.remove(self.root) return cur = cur[node] class FeedJSON(Feed, ParserJSON): itemsClass = ItemJSON if __name__ == '__main__': import sys from . import crawler req = crawler.adv_get(sys.argv[1] if len(sys.argv) > 1 else 'https://www.nytimes.com/', follow='rss') feed = parse(req['data'], url=req['url'], encoding=req['encoding']) if sys.flags.interactive: print('>>> Interactive shell: try using `feed`') else: for item in feed.items: print(item.title, item.link)