Cache HTTP content using a custom Handler

Much, much cleaner. Nothing comparable.
master
pictuga 2015-04-06 23:26:12 +08:00
parent 006478d451
commit 32aa96afa7
4 changed files with 264 additions and 226 deletions

View File

@@ -119,7 +119,7 @@ Using cache and passing arguments:
 ```python
 >>> import morss
 >>> url = 'http://feeds.bbci.co.uk/news/rss.xml'
->>> cache = '/tmp/morss-cache' # cache folder, needs write permission
+>>> cache = '/tmp/morss-cache.db' # sqlite cache location
 >>> options = {'csv':True, 'md':True}
 >>> xml_string = morss.process(url, cache, options)
 >>> xml_string[:50]
@@ -130,16 +130,15 @@ Using cache and passing arguments:
 Doing it step-by-step:
 ```python
-import morss
+import morss, morss.crawler
 url = 'http://newspaper.example/feed.xml'
 options = morss.Options(csv=True, md=True) # arguments
-cache_path = '/tmp/morss-cache' # cache folder, needs write permission
-url, cache = morss.Init(url, cache_path, options) # properly create folders and objects
-rss = morss.Fetch(url, cache, options) # this only grabs the RSS feed
+morss.crawler.sqlite_default = '/tmp/morss-cache.db' # sqlite cache location
+rss = morss.Fetch(url, options) # this only grabs the RSS feed
 rss = morss.Before(rss, options) # applies first round of options
-rss = morss.Gather(rss, url, cache, options) # this fills the feed and cleans it up
+rss = morss.Gather(rss, url, options) # this fills the feed and cleans it up
 rss = morss.After(rss, options) # applies second round of options
 output = morss.Format(rss, options) # formats final feed

View File

@@ -1,20 +1,26 @@
+import sys
 import ssl
 import socket
 from gzip import GzipFile
-from io import BytesIO
+from io import BytesIO, StringIO
+import re
+import sqlite3
+import time

 try:
     from urllib2 import BaseHandler, addinfourl, parse_keqv_list, parse_http_list
+    import mimetools
 except ImportError:
     from urllib.request import BaseHandler, addinfourl, parse_keqv_list, parse_http_list
+    import email

-import re

 try:
     basestring
 except NameError:
-    basestring = str
+    basestring = unicode = str
+    buffer = memoryview

 MIMETYPE = {
@@ -186,27 +192,195 @@ class HTTPRefreshHandler(BaseHandler):
     https_response = http_response


-class EtagHandler(BaseHandler):
-    def __init__(self, cache="", etag=None, lastmodified=None):
-        self.cache = cache
-        self.etag = etag
-        self.lastmodified = lastmodified
+class NotInCache(IOError):
+    pass
+
+
+class BaseCacheHandler(BaseHandler):
+    " Cache based on etags/last-modified. Inherit from this to implement actual storage "
+
+    private_cache = False # False to behave like a CDN (or if you just don't care), True like a PC
+    handler_order = 499
+
+    def __init__(self, force_min=None):
+        self.force_min = force_min # force_min (seconds) to bypass http headers, -1 forever, 0 never, -2 do nothing if not in cache, -3 is like -2 but raises an error
+
+    def _load(self, url):
+        out = list(self.load(url))
+
+        if sys.version > '3':
+            out[2] = email.message_from_string(out[2] or unicode()) # headers
+        else:
+            out[2] = mimetools.Message(StringIO(out[2] or unicode()))
+
+        out[3] = out[3] or bytes() # data
+        out[4] = out[4] or 0 # timestamp
+
+        return out
+
+    def load(self, url):
+        " Return the basic vars (code, msg, headers, data, timestamp) "
+        return (None, None, None, None, None)
+
+    def _save(self, url, code, msg, headers, data, timestamp):
+        headers = unicode(headers)
+        self.save(url, code, msg, headers, data, timestamp)
+
+    def save(self, url, code, msg, headers, data, timestamp):
+        " Save values to disk "
+        pass

     def http_request(self, req):
-        if self.cache:
-            if self.etag:
-                req.add_unredirected_header('If-None-Match', self.etag)
-            if self.lastmodified:
-                req.add_unredirected_header('If-Modified-Since', self.lastmodified)
+        (code, msg, headers, data, timestamp) = self._load(req.get_full_url())
+
+        if 'etag' in headers:
+            req.add_unredirected_header('If-None-Match', headers['etag'])
+
+        if 'last-modified' in headers:
+            req.add_unredirected_header('If-Modified-Since', headers.get('last-modified'))

         return req

+    def http_open(self, req):
+        (code, msg, headers, data, timestamp) = self._load(req.get_full_url())
+
+        # some info needed to process everything
+        cache_control = parse_http_list(headers.get('cache-control', ()))
+        cache_control += parse_http_list(headers.get('pragma', ()))
+
+        cc_list = [x for x in cache_control if '=' not in x]
+        cc_values = parse_keqv_list([x for x in cache_control if '=' in x])
+
+        cache_age = time.time() - timestamp
+
+        # list in a simple way what to do when
+        if self.force_min in (-2, -3):
+            if code is not None:
+                # already in cache, perfect, use cache
+                pass
+
+            else:
+                # ok then...
+                if self.force_min == -2:
+                    headers['morss'] = 'from_cache'
+                    resp = addinfourl(BytesIO(), headers, req.get_full_url(), 409)
+                    resp.msg = 'Conflict'
+                    return resp
+
+                elif self.force_min == -3:
+                    raise NotInCache()
+
+        elif code is None:
+            # cache empty, refresh
+            return None
+
+        elif self.force_min == -1:
+            # force use cache
+            pass
+
+        elif self.force_min == 0:
+            # force refresh
+            return None
+
+        elif self.force_min is None and ('no-cache' in cc_list
+                or 'no-store' in cc_list
+                or ('private' in cc_list and not self.private_cache)):
+            # kindly follow web servers indications, refresh
+            return None
+
+        elif 'max-age' in cc_values and int(cc_values['max-age']) > cache_age:
+            # server says it's still fine (and we trust him, if not, use force_min=0), use cache
+            pass
+
+        elif self.force_min is not None and self.force_min > cache_age:
+            # still recent enough for us, use cache
+            pass
+
+        else:
+            # according to the www, we have to refresh when nothing is said
+            return None
+
+        # return the cache as a response
+        headers['morss'] = 'from_cache' # TODO delete the morss header from incoming pages, to avoid websites messing up with us
+        resp = addinfourl(BytesIO(data), headers, req.get_full_url(), code)
+        resp.msg = msg
+
+        return resp
+
+    def http_response(self, req, resp):
+        # code for after-fetch, to know whether to save to hard-drive (if sticking to http headers' will)
+        if resp.code == 304:
+            return resp
+
+        if ('cache-control' in resp.headers or 'pragma' in resp.headers) and self.force_min is None:
+            cache_control = parse_http_list(resp.headers.get('cache-control', ()))
+            cache_control += parse_http_list(resp.headers.get('pragma', ()))
+
+            cc_list = [x for x in cache_control if '=' not in x]
+
+            if 'no-cache' in cc_list or 'no-store' in cc_list or ('private' in cc_list and not self.private_cache):
+                # kindly follow web servers indications
+                return resp
+
+        if resp.headers.get('morss') == 'from_cache':
+            # it comes from cache, so no need to save it again
+            return resp
+
+        # save to disk
+        data = resp.read()
+        self._save(req.get_full_url(), resp.code, resp.msg, resp.headers, data, time.time())
+
+        fp = BytesIO(data)
+        old_resp = resp
+        resp = addinfourl(fp, old_resp.headers, old_resp.url, old_resp.code)
+        resp.msg = old_resp.msg
+
+        return resp
+
     def http_error_304(self, req, fp, code, msg, headers):
-        if self.etag:
-            headers.addheader('etag', self.etag)
-        if self.lastmodified:
-            headers.addheader('last-modified', self.lastmodified)
-        resp = addinfourl(BytesIO(self.cache), headers, req.get_full_url(), 200)
+        (code, msg, headers, data, timestamp) = self._load(req.get_full_url())
+
+        resp = addinfourl(BytesIO(data), headers, req.get_full_url(), code)
+        resp.msg = msg
+
         return resp

     https_request = http_request
+    https_open = http_open
+    https_response = http_response
+
+
+sqlite_default = ':memory:'
+
+
+class SQliteCacheHandler(BaseCacheHandler):
+    def __init__(self, force_min=-1, filename=None):
+        BaseCacheHandler.__init__(self, force_min)
+
+        self.con = sqlite3.connect(filename or sqlite_default, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
+        self.con.execute('create table if not exists data (url unicode PRIMARY KEY, code int, msg unicode, headers unicode, data bytes, timestamp int)')
+        self.con.commit()
+
+    def __del__(self):
+        self.con.close()
+
+    def load(self, url):
+        row = self.con.execute('select * from data where url=?', (url,)).fetchone()
+        if not row:
+            return (None, None, None, None, None)
+
+        return row[1:]
+
+    def save(self, url, code, msg, headers, data, timestamp):
+        data = buffer(data)
+
+        if self.con.execute('select code from data where url=?', (url,)).fetchone():
+            self.con.execute('update data set code=?, msg=?, headers=?, data=?, timestamp=? where url=?',
+                (code, msg, headers, data, timestamp, url))
+
+        else:
+            self.con.execute('insert into data values (?,?,?,?,?,?)', (url, code, msg, headers, data, timestamp))
+
+        self.con.commit()
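Not part of the commit, but for orientation: `BaseCacheHandler` is meant to be subclassed ("Inherit from this to implement actual storage"), with `load()`/`save()` exchanging the tuple `(code, msg, headers, data, timestamp)`. Below is a minimal sketch of a custom backend, assuming the `morss.crawler` module layout above; the `DictCacheHandler` name and its in-memory dict are purely illustrative:

```python
from morss import crawler  # module changed in this commit

try:
    from urllib.request import build_opener  # Python 3
except ImportError:
    from urllib2 import build_opener  # Python 2


class DictCacheHandler(crawler.BaseCacheHandler):
    " Toy storage backend: keeps responses in a per-process dict "

    def __init__(self, force_min=None):
        crawler.BaseCacheHandler.__init__(self, force_min)
        self._store = {}

    def load(self, url):
        # must return (code, msg, headers, data, timestamp), all None when missing
        return self._store.get(url, (None, None, None, None, None))

    def save(self, url, code, msg, headers, data, timestamp):
        # headers arrive already serialized to a string by _save()
        self._store[url] = (code, msg, headers, data, timestamp)


# serve cached copies for up to 5 minutes, regardless of http cache headers
opener = build_opener(DictCacheHandler(force_min=5 * 60))
print(opener.open('http://feeds.bbci.co.uk/news/rss.xml').read()[:50])
```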

View File

@@ -94,20 +94,20 @@ def format_string(string, getter, error=False):
     return out


-def pre_worker(url, cache):
+def pre_worker(url):
     if urlparse(url).netloc == 'itunes.apple.com':
         match = re.search('/id([0-9]+)(\?.*)?$', url)
         if match:
             iid = match.groups()[0]
             redirect = 'https://itunes.apple.com/lookup?id={id}'.format(id=iid)
-            cache.set('redirect', redirect)
+            return redirect
+
+    return None


 class Builder(object):
-    def __init__(self, link, data=None, cache=False):
+    def __init__(self, link, data=None):
         self.link = link
-        self.cache = cache
         self.data = data

         if self.data is None:

View File

@@ -140,112 +140,16 @@ def parseOptions(options):
     return out


-class Cache:
-    """ Light, error-prone caching system. """
-
-    def __init__(self, folder=None, key='cache', lifespan=10 * 24 * 3600):
-        self._key = key
-        self._dir = folder
-        self._lifespan = lifespan
-
-        self._cache = {}
-
-        if self._dir is None:
-            self._hash = "NO CACHE"
-            return
-
-        maxsize = os.statvfs('./').f_namemax - len(self._dir) - 1 - 4 # ".tmp"
-        self._hash = quote_plus(self._key)[:maxsize]
-
-        self._file = self._dir + '/' + self._hash
-        self._file_tmp = self._file + '.tmp'
-
-        try:
-            data = open(self._file).read()
-            if data:
-                self._cache = json.loads(data)
-        except IOError:
-            pass
-        except ValueError:
-            log('JSON cache parse fail')
-
-    def __del__(self):
-        self.save()
-
-    def __contains__(self, key):
-        return key in self._cache
-
-    def get(self, key):
-        if key in self._cache:
-            self._cache[key]['last'] = time.time()
-            return self._cache[key]['value']
-        else:
-            return None
-
-    def set(self, key, content):
-        if sys.version > '3' and isinstance(content, bytes):
-            content = content.decode('utf-8')
-        self._cache[key] = {'last': time.time(), 'value': content}
-
-    __getitem__ = get
-    __setitem__ = set
-
-    def save(self):
-        if len(self._cache) == 0 or self._dir is None:
-            return
-
-        if not os.path.exists(self._dir):
-            os.makedirs(self._dir)
-
-        for i in list(self._cache.keys()):
-            if time.time() - self._cache[i]['last'] > self._lifespan > -1:
-                del self._cache[i]
-
-        out = json.dumps(self._cache, indent=4)
-
-        try:
-            open(self._file_tmp, 'w+').write(out)
-            os.rename(self._file_tmp, self._file)
-        except IOError:
-            log('failed to write cache to tmp file')
-        except OSError:
-            log('failed to move cache to file')
-
-    def last(self, key):
-        if key not in self._cache:
-            return -1
-        return self._cache[key]['last']
-
-    def age(self, key):
-        if key not in self._cache:
-            return -1
-        return time.time() - self.last(key)
-
-    def new(self, *arg, **karg):
-        """ Returns a Cache object in the same directory """
-        if arg[0] != self._key:
-            return Cache(self._dir, *arg, **karg)
-        else:
-            return self
-
-
 default_handlers = [crawler.GZIPHandler(), crawler.UAHandler(DEFAULT_UA),
     crawler.AutoRefererHandler(), crawler.HTTPEquivHandler(),
     crawler.HTTPRefreshHandler(), crawler.EncodingFixHandler()]

-def accept_handler(*kargs):
+def custom_handler(accept, delay=DELAY):
     handlers = default_handlers[:]
-    handlers.append(crawler.ContentNegociationHandler(*kargs))
-    return handlers
-
-def etag_handler(accept, strict, cache, etag, lastmodified):
-    handlers = default_handlers[:]
-    handlers.append(crawler.ContentNegociationHandler(accept, strict))
-    handlers.append(crawler.EtagHandler(cache, etag, lastmodified))
-    return handlers
+    handlers.append(crawler.ContentNegociationHandler(accept))
+    handlers.append(crawler.SQliteCacheHandler(delay))
+
+    return build_opener(*handlers)


 def Fix(item, feedurl='/'):
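For context (also not part of the commit): `custom_handler()` now returns a ready-made opener, and its `delay` argument is handed to `SQliteCacheHandler` as `force_min`, so the special values documented in `crawler.py` apply directly. A hedged sketch of how the rest of this file drives it, assuming the module paths `morss.morss` and `morss.crawler`:

```python
from morss.morss import custom_handler, DELAY, TIMEOUT
from morss import crawler

url = 'http://feeds.bbci.co.uk/news/rss.xml'

# regular fetch: reuse the sqlite-cached copy while it is younger than DELAY seconds
con = custom_handler(('xml', 'html'), DELAY).open(url, timeout=TIMEOUT * 2)
xml = con.read()

# "fast" mode (delay=-3, as Fill() uses below): never hits the network and
# raises crawler.NotInCache when the page was never fetched before
try:
    con = custom_handler(('html', 'text/*'), -3).open(url, timeout=TIMEOUT)
except crawler.NotInCache:
    print('not in cache yet')
```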
@@ -315,7 +219,7 @@ def Fix(item, feedurl='/'):
     return item


-def Fill(item, cache, options, feedurl='/', fast=False):
+def Fill(item, options, feedurl='/', fast=False):
     """ Returns True when it has done its best """

     if not item.link:
@@ -364,58 +268,44 @@ def Fill(item, cache, options, feedurl='/', fast=False):
         log('no used link')
         return True

-    # check cache and previous errors
-    if link in cache:
-        content = cache.get(link)
-        match = re.search(r'^error-([a-z]{2,10})$', content)
-        if match:
-            if cache.age(link) < DELAY and not options.theforce:
-                log('cached error: %s' % match.groups()[0])
-                return True
-            else:
-                log('ignored old error: %s' % match.groups()[0])
-        else:
-            log('cached')
-            item.push_content(cache.get(link))
-            return True
-
-    # super-fast mode
+    # download
+    delay = -1
+
     if fast:
+        # super-fast mode
+        delay = -3
+
+    try:
+        con = custom_handler(('html', 'text/*'), delay).open(link, timeout=TIMEOUT)
+        data = con.read()
+
+    except crawler.NotInCache:
         log('skipped')
         return False

-    # download
-    try:
-        con = build_opener(*accept_handler(('html', 'text/*'), True)).open(link, timeout=TIMEOUT)
-        data = con.read()
     except (IOError, HTTPException) as e:
-        log('http error: %s' % e.message)
-        cache.set(link, 'error-http')
+        log('http error')
         return True

     contenttype = con.info().get('Content-Type', '').split(';')[0]
     if contenttype not in MIMETYPE['html'] and contenttype != 'text/plain':
         log('non-text page')
-        cache.set(link, 'error-type')
         return True

     out = breadability.readable.Article(data, url=con.url).readable

     if options.hungry or count_words(out) > max(count_content, count_desc):
         item.push_content(out)
-        cache.set(link, out)
     else:
         log('link not bigger enough')
-        cache.set(link, 'error-length')
         return True

     return True


-def Init(url, cache_path, options):
-    # url clean up
-    log(url)
+def Fetch(url, options):
+    # basic url clean-up
     if url is None:
         raise MorssException('No url provided')
@@ -428,91 +318,66 @@ def Init(url, cache_path, options):
     if isinstance(url, bytes):
         url = url.decode()

-    # cache
-    cache = Cache(cache_path, url)
-    log(cache._hash)
-
-    return (url, cache)
-
-
-def Fetch(url, cache, options):
     # do some useful facebook work
-    feedify.pre_worker(url, cache)
-
-    if 'redirect' in cache:
-        url = cache.get('redirect')
+    pre = feedify.pre_worker(url)
+    if pre:
+        url = pre
         log('url redirect')
         log(url)

     # fetch feed
-    if not options.theforce and 'xml' in cache and cache.age('xml') < DELAY and 'style' in cache:
-        log('xml cached')
-        xml = cache.get('xml')
-        style = cache.get('style')
-    else:
-        try:
-            opener = etag_handler(('xml', 'html'), False, cache.get(url), cache.get('etag'), cache.get('lastmodified'))
-            con = build_opener(*opener).open(url, timeout=TIMEOUT * 2)
-            xml = con.read()
-        except (HTTPError) as e:
-            raise MorssException('Error downloading feed (HTTP Error %s)' % e.code)
-        except (crawler.InvalidCertificateException) as e:
-            raise MorssException('Error downloading feed (Invalid SSL Certificate)')
-        except (IOError, HTTPException):
-            raise MorssException('Error downloading feed')
-
-        cache.set('xml', xml)
-        cache.set('etag', con.info().get('etag'))
-        cache.set('lastmodified', con.info().get('last-modified'))
-
-        contenttype = con.info().get('Content-Type', '').split(';')[0]
-
-        if url.startswith('https://itunes.apple.com/lookup?id='):
-            style = 'itunes'
-        elif xml.startswith(b'<?xml') or contenttype in MIMETYPE['xml']:
-            style = 'normal'
-        elif feedify.supported(url):
-            style = 'feedify'
-        elif contenttype in MIMETYPE['html']:
-            style = 'html'
-        else:
-            style = 'none'
-            log(contenttype)
-
-        cache.set('style', style)
-
-    # decide what to do
-    log(style)
-
-    if style == 'itunes':
-        link = json.loads(xml)['results'][0]['feedUrl']
+    delay = DELAY
+
+    if options.theforce:
+        delay = 0
+
+    try:
+        con = custom_handler(('xml', 'html'), delay).open(url, timeout=TIMEOUT * 2)
+        xml = con.read()
+
+    except (HTTPError) as e:
+        raise MorssException('Error downloading feed (HTTP Error %s)' % e.code)
+
+    except (crawler.InvalidCertificateException) as e:
+        raise MorssException('Error downloading feed (Invalid SSL Certificate)')
+
+    except (IOError, HTTPException):
+        raise MorssException('Error downloading feed')
+
+    contenttype = con.info().get('Content-Type', '').split(';')[0]
+
+    if url.startswith('https://itunes.apple.com/lookup?id='):
+        link = json.loads(xml.decode('utf-8', 'replace'))['results'][0]['feedUrl']
         log('itunes redirect: %s' % link)
-        return Fetch(link, cache.new(link), options)
-    elif style == 'normal':
+        return Fetch(link, options)
+
+    elif xml.startswith(b'<?xml') or contenttype in MIMETYPE['xml']:
         rss = feeds.parse(xml)
-    elif style == 'feedify':
-        feed = feedify.Builder(url, xml, cache)
+
+    elif feedify.supported(url):
+        feed = feedify.Builder(url, xml)
         feed.build()
         rss = feed.feed
-    elif style == 'html':
+
+    elif contenttype in MIMETYPE['html']:
         match = lxml.html.fromstring(xml).xpath(
             "//link[@rel='alternate'][@type='application/rss+xml' or @type='application/atom+xml']/@href")
         if len(match):
             link = urljoin(url, match[0])
             log('rss redirect: %s' % link)
-            return Fetch(link, cache.new(link), options)
+            return Fetch(link, options)
         else:
             log('no-link html')
             raise MorssException('Link provided is an HTML page, which doesn\'t link to a feed')
+
     else:
         log('random page')
+        log(contenttype)
         raise MorssException('Link provided is not a valid feed')

-    cache.save()
     return rss


-def Gather(rss, url, cache, options):
+def Gather(rss, url, options):
     size = len(rss.items)
     start_time = time.time()
@@ -549,12 +414,12 @@ def Gather(rss, url, cache, options):
         if time.time() - start_time > max_time >= 0 or i + 1 > max_item >= 0:
             if not options.proxy:
-                if Fill(item, cache, options, url, True) is False:
+                if Fill(item, options, url, True) is False:
                     item.remove()
             return

         else:
             if not options.proxy:
-                Fill(item, cache, options, url)
+                Fill(item, options, url)

     queue = Queue()
@@ -567,7 +432,6 @@ def Gather(rss, url, cache, options):
         queue.put([i, item])

     queue.join()
-    cache.save()

     if options.ad:
         new = rss.items.append()
@@ -663,10 +527,10 @@ def process(url, cache=None, options=None):
         options = []

     options = Options(options)

-    url, cache = Init(url, cache, options)
-    rss = Fetch(url, cache, options)
+    if cache: crawler.sqlite_default = cache
+    rss = Fetch(url, options)
     rss = Before(rss, options)
-    rss = Gather(rss, url, cache, options)
+    rss = Gather(rss, url, options)
     rss = After(rss, options)

     return Format(rss, options)
@@ -728,10 +592,10 @@ def cgi_app(environ, start_response):
     else:
         headers['content-type'] = 'text/xml'

-    url, cache = Init(url, os.getcwd() + '/cache', options)
+    crawler.sqlite_default = os.getcwd() + '/morss-cache.db'

     # get the work done
-    rss = Fetch(url, cache, options)
+    rss = Fetch(url, options)

     if headers['content-type'] == 'text/xml':
         headers['content-type'] = rss.mimetype
@@ -739,7 +603,7 @@
     start_response(headers['status'], list(headers.items()))

     rss = Before(rss, options)
-    rss = Gather(rss, url, cache, options)
+    rss = Gather(rss, url, options)
     rss = After(rss, options)
     out = Format(rss, options)
@@ -795,10 +659,11 @@ def cli_app():
     global DEBUG
     DEBUG = options.debug

-    url, cache = Init(url, os.path.expanduser('~/.cache/morss'), options)
-    rss = Fetch(url, cache, options)
+    crawler.sqlite_default = os.path.expanduser('~/.cache/morss-cache.db')
+
+    rss = Fetch(url, options)
     rss = Before(rss, options)
-    rss = Gather(rss, url, cache, options)
+    rss = Gather(rss, url, options)
     rss = After(rss, options)
     out = Format(rss, options)