Compare commits

..

6 Commits

Author SHA1 Message Date
pictuga cb21871c35 crawler: clean up caching code
continuous-integration/drone/push Build is passing Details
2021-11-08 22:02:23 +01:00
pictuga c71cf5d5ce caching: fix diskcache implementation 2021-11-08 21:57:43 +01:00
pictuga 44a6b2591d crawler: cleaner http header object import 2021-11-07 19:44:36 +01:00
pictuga a890536601 morss: comment code a bit 2021-11-07 18:26:07 +01:00
pictuga 8de309f2d4 caching: add diskcache backend 2021-11-07 18:15:20 +01:00
pictuga cbf7b3f77b caching: simplify sqlite code 2021-11-07 18:14:18 +01:00
5 changed files with 164 additions and 100 deletions

View File

@ -40,7 +40,7 @@ Some features of morss:
- Follow 301/meta redirects - Follow 301/meta redirects
- Recover xml feeds with corrupt encoding - Recover xml feeds with corrupt encoding
- Supports gzip-compressed http content - Supports gzip-compressed http content
- HTTP caching with 3 different backends (in-memory/sqlite/mysql) - HTTP caching with different backends (in-memory/sqlite/mysql/redis/diskcache)
- Works as server/cli tool - Works as server/cli tool
- Deobfuscate various tracking links - Deobfuscate various tracking links
@ -60,8 +60,8 @@ Full installation (including optional dependencies)
pip install git+https://git.pictuga.com/pictuga/morss.git#[full] pip install git+https://git.pictuga.com/pictuga/morss.git#[full]
``` ```
The full install includes mysql and redis (possible cache backends). Otherwise, The full install includes mysql, redis and diskcache (possible cache backends).
only in-memory and sqlite3 caches are available. Otherwise, only in-memory and sqlite3 caches are available.
The dependency `lxml` is fairly long to install (especially on Raspberry Pi, as The dependency `lxml` is fairly long to install (especially on Raspberry Pi, as
C code needs to be compiled). If possible on your distribution, try installing C code needs to be compiled). If possible on your distribution, try installing
@ -390,12 +390,14 @@ will be cleared every time the program is run). Path can be defined with
environment variables: `MYSQL_USER`, `MYSQL_PWD`, `MYSQL_DB`, `MYSQL_HOST` environment variables: `MYSQL_USER`, `MYSQL_PWD`, `MYSQL_DB`, `MYSQL_HOST`
- `CACHE=redis`: Redis cache. Connection can be defined with the following - `CACHE=redis`: Redis cache. Connection can be defined with the following
environment variables: `REDIS_HOST`, `REDIS_PORT`, `REDIS_DB`, `REDIS_PWD` environment variables: `REDIS_HOST`, `REDIS_PORT`, `REDIS_DB`, `REDIS_PWD`
- `CACHE=diskcache`: disk-based cache. Target directory canbe defined with
`DISKCAHE_DIR`.
To limit the size of the cache: To limit the size of the cache:
- `CACHE_SIZE` sets the target number of items in the cache (further items will - `CACHE_SIZE` sets the target number of items in the cache (further items will
be deleted but the cache might be temporarily bigger than that). Defaults to 1k be deleted but the cache might be temporarily bigger than that). Defaults to 1k
entries. entries. NB. When using `diskcache`, this is the cache max size in Bytes.
- `CACHE_LIFESPAN` (seconds) sets how often the cache must be trimmed (i.e. cut - `CACHE_LIFESPAN` (seconds) sets how often the cache must be trimmed (i.e. cut
down to the number of items set in `CACHE_SIZE`). Defaults to 1min. down to the number of items set in `CACHE_SIZE`). Defaults to 1min.

View File

@ -58,8 +58,8 @@ except ImportError:
class SQLiteCache(BaseCache): class SQLiteCache(BaseCache):
def __init__(self, filename=':memory:'): def __init__(self, path=':memory:'):
self.con = sqlite3.connect(filename, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False) self.con = sqlite3.connect(path, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
with self.con: with self.con:
self.con.execute('CREATE TABLE IF NOT EXISTS data (ky UNICODE PRIMARY KEY, data BLOB, timestamp INT)') self.con.execute('CREATE TABLE IF NOT EXISTS data (ky UNICODE PRIMARY KEY, data BLOB, timestamp INT)')
@ -158,6 +158,29 @@ class RedisCacheHandler(BaseCache):
self.r.set(key, data) self.r.set(key, data)
try:
import diskcache # isort:skip
except ImportError:
pass
class DiskCacheHandler(BaseCache):
def __init__(self, directory=None, **kwargs):
self.cache = diskcache.Cache(directory=directory, eviction_policy='least-frequently-used', **kwargs)
def __del__(self):
self.cache.close()
def trim(self):
self.cache.cull()
def __getitem__(self, key):
return self.cache[key]
def __setitem__(self, key, data):
self.cache.set(key, data)
if 'CACHE' in os.environ: if 'CACHE' in os.environ:
if os.environ['CACHE'] == 'mysql': if os.environ['CACHE'] == 'mysql':
default_cache = MySQLCacheHandler( default_cache = MySQLCacheHandler(
@ -168,13 +191,9 @@ if 'CACHE' in os.environ:
) )
elif os.environ['CACHE'] == 'sqlite': elif os.environ['CACHE'] == 'sqlite':
if 'SQLITE_PATH' in os.environ: default_cache = SQLiteCache(
path = os.getenv('SQLITE_PATH') os.getenv('SQLITE_PATH', ':memory:')
)
else:
path = ':memory:'
default_cache = SQLiteCache(path)
elif os.environ['CACHE'] == 'redis': elif os.environ['CACHE'] == 'redis':
default_cache = RedisCacheHandler( default_cache = RedisCacheHandler(
@ -184,5 +203,11 @@ if 'CACHE' in os.environ:
password = os.getenv('REDIS_PWD', None) password = os.getenv('REDIS_PWD', None)
) )
elif os.environ['CACHE'] == 'diskcache':
default_cache = DiskCacheHandler(
directory = os.getenv('DISKCAHE_DIR', '/tmp/morss-diskcache'),
size_limit = CACHE_SIZE # in Bytes
)
else: else:
default_cache = CappedDict() default_cache = CappedDict()

View File

@ -19,7 +19,6 @@ import os
import pickle import pickle
import random import random
import re import re
import sys
import time import time
import zlib import zlib
from cgi import parse_header from cgi import parse_header
@ -34,14 +33,14 @@ try:
# python 2 # python 2
from urllib import quote from urllib import quote
import mimetools from mimetools import Message as message_from_string
from urllib2 import (BaseHandler, HTTPCookieProcessor, HTTPRedirectHandler, from urllib2 import (BaseHandler, HTTPCookieProcessor, HTTPRedirectHandler,
Request, addinfourl, build_opener, parse_http_list, Request, addinfourl, build_opener, parse_http_list,
parse_keqv_list) parse_keqv_list)
from urlparse import urlparse, urlunparse from urlparse import urlparse, urlunparse
except ImportError: except ImportError:
# python 3 # python 3
import email from email import message_from_string
from urllib.parse import quote, urlparse, urlunparse from urllib.parse import quote, urlparse, urlunparse
from urllib.request import (BaseHandler, HTTPCookieProcessor, from urllib.request import (BaseHandler, HTTPCookieProcessor,
HTTPRedirectHandler, Request, addinfourl, HTTPRedirectHandler, Request, addinfourl,
@ -109,7 +108,7 @@ def adv_get(url, post=None, timeout=None, *args, **kwargs):
} }
def custom_opener(follow=None, delay=None): def custom_opener(follow=None, policy=None, force_min=None, force_max=None):
handlers = [] handlers = []
# as per urllib2 source code, these Handelers are added first # as per urllib2 source code, these Handelers are added first
@ -143,7 +142,7 @@ def custom_opener(follow=None, delay=None):
if follow: if follow:
handlers.append(AlternateHandler(MIMETYPE[follow])) handlers.append(AlternateHandler(MIMETYPE[follow]))
handlers.append(CacheHandler(force_min=delay)) handlers.append(CacheHandler(policy=policy, force_min=force_min, force_max=force_max))
return build_opener(*handlers) return build_opener(*handlers)
@ -427,31 +426,50 @@ class HTTPRefreshHandler(BaseHandler):
https_response = http_response https_response = http_response
def error_response(code, msg, url=''):
# return an error as a response
resp = addinfourl(BytesIO(), message_from_string('\n\n'), url, code)
resp.msg = msg
return resp
class CacheHandler(BaseHandler): class CacheHandler(BaseHandler):
" Cache based on etags/last-modified " " Cache based on etags/last-modified "
private_cache = False # Websites can indicate whether the page should be privacy = 'private' # Websites can indicate whether the page should be cached
# cached by CDNs (e.g. shouldn't be the case for # by CDNs (e.g. shouldn't be the case for
# private/confidential/user-specific pages. # private/confidential/user-specific pages. With this
# With this setting, decide whether (False) you want # setting, decide whether you want the cache to behave
# the cache to behave like a CDN (i.e. don't cache # like a CDN (i.e. don't cache private pages, 'public'),
# private pages), or (True) to behave like a end-cache # or to behave like a end-user private pages
# private pages. If unsure, False is the safest bet. # ('private'). If unsure, 'public' is the safest bet,
# but many websites abuse this feature...
# NB. This overrides all the other min/max/policy settings.
handler_order = 499 handler_order = 499
def __init__(self, cache=None, force_min=None): def __init__(self, cache=None, force_min=None, force_max=None, policy=None):
self.cache = cache or default_cache self.cache = cache or default_cache
self.force_min = force_min self.force_min = force_min
# Servers indicate how long they think their content is "valid". self.force_max = force_max
# With this parameter (force_min, expressed in seconds), we can self.policy = policy # can be cached/refresh/offline/None (default)
# override the validity period (i.e. bypassing http headers)
# Special values: # Servers indicate how long they think their content is "valid". With
# -1: valid forever, i.e. use the cache no matter what (and fetch # this parameter (force_min/max, expressed in seconds), we can override
# the page online if not present in cache) # the validity period (i.e. bypassing http headers)
# 0: valid zero second, i.e. force refresh # Special choices, via "policy":
# -2: same as -1, i.e. use the cache no matter what, but do NOT # cached: use the cache no matter what (and fetch the page online if
# fetch the page online if not present in cache, throw an # not present in cache)
# error instead # refresh: valid zero second, i.e. force refresh
# offline: same as cached, i.e. use the cache no matter what, but do
# NOT fetch the page online if not present in cache, throw an
# error instead
# None: just follow protocols
# sanity checks
assert self.force_max is None or self.force_max >= 0
assert self.force_min is None or self.force_min >= 0
assert self.force_max is None or self.force_min is None or self.force_max >= self.force_min
def load(self, url): def load(self, url):
try: try:
@ -461,10 +479,7 @@ class CacheHandler(BaseHandler):
data = None data = None
else: else:
if sys.version_info[0] >= 3: data['headers'] = message_from_string(data['headers'] or unicode()) # headers
data['headers'] = email.message_from_string(data['headers'] or unicode()) # headers
else:
data['headers'] = mimetools.Message(StringIO(data['headers'] or unicode()))
return data return data
@ -472,18 +487,17 @@ class CacheHandler(BaseHandler):
data['headers'] = unicode(data['headers']) data['headers'] = unicode(data['headers'])
self.cache[key] = pickle.dumps(data, 0) self.cache[key] = pickle.dumps(data, 0)
def is_cached(self, key): def cached_response(self, req, fallback=None):
return self.load(key) is not None
def cached_response(self, req):
# this does NOT check whether it's already cached, use with care
data = self.load(req.get_full_url()) data = self.load(req.get_full_url())
# return the cache as a response if data is not None:
resp = addinfourl(BytesIO(data['data']), data['headers'], req.get_full_url(), data['code']) # return the cache as a response
resp.msg = data['msg'] resp = addinfourl(BytesIO(data['data']), data['headers'], req.get_full_url(), data['code'])
resp.msg = data['msg']
return resp
return resp else:
return fallback
def save_response(self, req, resp): def save_response(self, req, resp):
data = resp.read() data = resp.read()
@ -491,7 +505,7 @@ class CacheHandler(BaseHandler):
self.save(req.get_full_url(), { self.save(req.get_full_url(), {
'code': resp.code, 'code': resp.code,
'msg': resp.msg, 'msg': resp.msg,
'headers': resp.headers, 'headers': str(resp.headers),
'data': data, 'data': data,
'timestamp': time.time() 'timestamp': time.time()
}) })
@ -520,60 +534,74 @@ class CacheHandler(BaseHandler):
# If 'None' is returned, try your chance with the next-available handler # If 'None' is returned, try your chance with the next-available handler
# If a 'resp' is returned, stop there, and proceed with 'http_response' # If a 'resp' is returned, stop there, and proceed with 'http_response'
# Here, we try to see whether we want to use data from cache (i.e.
# return 'resp'), or whether we want to refresh the content (return
# 'None')
data = self.load(req.get_full_url()) data = self.load(req.get_full_url())
if data is None: if data is not None:
# cache empty, refresh # some info needed to process everything
cache_control = parse_http_list(data['headers'].get('cache-control', ()))
cache_control += parse_http_list(data['headers'].get('pragma', ()))
cc_list = [x for x in cache_control if '=' not in x]
cc_values = parse_keqv_list([x for x in cache_control if '=' in x])
cache_age = time.time() - data['timestamp']
# list in a simple way what to do in special cases
if data is not None and 'private' in cc_list and self.privacy == 'public':
# private data but public cache, do not use cache
# privacy concern, so handled first and foremost
# (and doesn't need to be addressed anymore afterwards)
return None return None
# some info needed to process everything elif self.policy == 'offline':
cache_control = parse_http_list(data['headers'].get('cache-control', ())) # use cache, or return an error
cache_control += parse_http_list(data['headers'].get('pragma', ())) return self.cached_response(
req,
error_response(409, 'Conflict', req.get_full_url())
)
cc_list = [x for x in cache_control if '=' not in x] elif self.policy == 'cached':
cc_values = parse_keqv_list([x for x in cache_control if '=' in x]) # use cache, or fetch online
return self.cached_response(req, None)
cache_age = time.time() - data['timestamp'] elif self.policy == 'refresh':
# list in a simple way what to do when
if self.force_min == -2:
if data['code'] is not None:
# already in cache, perfect, use cache
return self.cached_response(req)
else:
# raise an error, via urllib handlers
resp = addinfourl(BytesIO(), data['headers'], req.get_full_url(), 409)
resp.msg = 'Conflict'
return resp
elif self.force_min == -1:
# force use cache
return self.cached_response(req)
elif self.force_min == 0:
# force refresh # force refresh
return None return None
elif data is None:
# we have already settled all the cases that don't need the cache.
# all the following ones need the cached item
return None
elif self.force_max is not None and cache_age > self.force_max:
# older than we want, refresh
return None
elif self.force_min is not None and cache_age < self.force_min:
# recent enough, use cache
return self.cached_response(req)
elif data['code'] == 301 and cache_age < 7*24*3600: elif data['code'] == 301 and cache_age < 7*24*3600:
# "301 Moved Permanently" has to be cached...as long as we want # "301 Moved Permanently" has to be cached...as long as we want
# (awesome HTTP specs), let's say a week (why not?). Use force_min=0 # (awesome HTTP specs), let's say a week (why not?). Use force_min=0
# if you want to bypass this (needed for a proper refresh) # if you want to bypass this (needed for a proper refresh)
return self.cached_response(req) return self.cached_response(req)
elif (self.force_min is None or self.force_min > 0) and ('no-cache' in cc_list or 'no-store' in cc_list or ('private' in cc_list and not self.private_cache)): elif self.force_min is None and ('no-cache' in cc_list or 'no-store' in cc_list):
# kindly follow web servers indications, refresh # kindly follow web servers indications, refresh if the same
# if the same settings are used all along, this section shouldn't be # settings are used all along, this section shouldn't be of any use,
# of any use, since the page woudln't be cached in the first place # since the page woudln't be cached in the first place the check is
# the check is only performed "just in case" # only performed "just in case"
# NB. NOT respected if force_min is set
return None return None
elif 'max-age' in cc_values and int(cc_values['max-age']) > cache_age: elif 'max-age' in cc_values and int(cc_values['max-age']) > cache_age:
# server says it's still fine (and we trust him, if not, use force_min=0), use cache # server says it's still fine (and we trust him, if not, use overrides), use cache
return self.cached_response(req)
elif self.force_min is not None and self.force_min > cache_age:
# still recent enough for us, use cache
return self.cached_response(req) return self.cached_response(req)
else: else:
@ -584,19 +612,19 @@ class CacheHandler(BaseHandler):
# code for after-fetch, to know whether to save to hard-drive (if stiking to http headers' will) # code for after-fetch, to know whether to save to hard-drive (if stiking to http headers' will)
# NB. It might re-save requests pulled from cache, which will re-set the time() to the latest, i.e. lenghten its useful life # NB. It might re-save requests pulled from cache, which will re-set the time() to the latest, i.e. lenghten its useful life
if resp.code == 304 and self.is_cached(resp.url): if resp.code == 304 and resp.url in self.cache:
# we are hopefully the first after the HTTP handler, so no need # we are hopefully the first after the HTTP handler, so no need
# to re-run all the *_response # to re-run all the *_response
# here: cached page, returning from cache # here: cached page, returning from cache
return self.cached_response(req) return self.cached_response(req)
elif ('cache-control' in resp.headers or 'pragma' in resp.headers) and self.force_min is None: elif self.force_min is None and ('cache-control' in resp.headers or 'pragma' in resp.headers):
cache_control = parse_http_list(resp.headers.get('cache-control', ())) cache_control = parse_http_list(resp.headers.get('cache-control', ()))
cache_control += parse_http_list(resp.headers.get('pragma', ())) cache_control += parse_http_list(resp.headers.get('pragma', ()))
cc_list = [x for x in cache_control if '=' not in x] cc_list = [x for x in cache_control if '=' not in x]
if 'no-cache' in cc_list or 'no-store' in cc_list or ('private' in cc_list and not self.private_cache): if 'no-cache' in cc_list or 'no-store' in cc_list or ('private' in cc_list and self.privacy == 'public'):
# kindly follow web servers indications (do not save & return) # kindly follow web servers indications (do not save & return)
return resp return resp
@ -618,6 +646,8 @@ if 'IGNORE_SSL' in os.environ:
if __name__ == '__main__': if __name__ == '__main__':
import sys
req = adv_get(sys.argv[1] if len(sys.argv) > 1 else 'https://morss.it') req = adv_get(sys.argv[1] if len(sys.argv) > 1 else 'https://morss.it')
if sys.flags.interactive: if sys.flags.interactive:

View File

@ -194,21 +194,20 @@ def ItemFill(item, options, feedurl='/', fast=False):
log(item.link) log(item.link)
# download # download
delay = -1
if fast or options.fast: if fast or options.cache:
# force cache, don't fetch # force cache, don't fetch
delay = -2 policy = 'offline'
elif options.force: elif options.force:
# force refresh # force refresh
delay = 0 policy = 'refresh'
else: else:
delay = 24*60*60 # 24h policy = None
try: try:
req = crawler.adv_get(url=item.link, delay=delay, timeout=TIMEOUT) req = crawler.adv_get(url=item.link, policy=policy, force_min=24*60*60, timeout=TIMEOUT)
except (IOError, HTTPException) as e: except (IOError, HTTPException) as e:
log('http error') log('http error')
@ -266,11 +265,17 @@ def FeedFetch(url, options):
# fetch feed # fetch feed
delay = DELAY delay = DELAY
if options.force: if options.cache:
delay = 0 policy = 'offline'
elif options.force:
policy = 'refresh'
else:
policy = None
try: try:
req = crawler.adv_get(url=url, post=options.post, follow=('rss' if not options.items else None), delay=delay, timeout=TIMEOUT * 2) req = crawler.adv_get(url=url, post=options.post, follow=('rss' if not options.items else None), policy=policy, force_min=5*60, force_max=60*60, timeout=TIMEOUT * 2)
except (IOError, HTTPException): except (IOError, HTTPException):
raise MorssException('Error downloading feed') raise MorssException('Error downloading feed')
@ -324,7 +329,7 @@ def FeedGather(rss, url, options):
max_time = 0 max_time = 0
if options.newest: if options.newest:
# :newest take the newest items # :newest take the newest items (instead of appearing order)
now = datetime.now(tz.tzutc()) now = datetime.now(tz.tzutc())
sorted_items = sorted(rss.items, key=lambda x:x.updated or x.time or now, reverse=True) sorted_items = sorted(rss.items, key=lambda x:x.updated or x.time or now, reverse=True)
@ -333,6 +338,7 @@ def FeedGather(rss, url, options):
sorted_items = list(rss.items) sorted_items = list(rss.items)
for i, item in enumerate(sorted_items): for i, item in enumerate(sorted_items):
# hard cap
if time.time() - start_time > lim_time >= 0 or i + 1 > lim_item >= 0: if time.time() - start_time > lim_time >= 0 or i + 1 > lim_item >= 0:
log('dropped') log('dropped')
item.remove() item.remove()
@ -345,6 +351,7 @@ def FeedGather(rss, url, options):
item = ItemFix(item, options, url) item = ItemFix(item, options, url)
# soft cap
if time.time() - start_time > max_time >= 0 or i + 1 > max_item >= 0: if time.time() - start_time > max_time >= 0 or i + 1 > max_item >= 0:
if not options.proxy: if not options.proxy:
if ItemFill(item, options, url, True) is False: if ItemFill(item, options, url, True) is False:

View File

@ -14,7 +14,7 @@ setup(
license = 'AGPL v3', license = 'AGPL v3',
packages = [package_name], packages = [package_name],
install_requires = ['lxml', 'bs4', 'python-dateutil', 'chardet'], install_requires = ['lxml', 'bs4', 'python-dateutil', 'chardet'],
extras_require = {'full': ['pymysql', 'redis']}, extras_require = {'full': ['pymysql', 'redis', 'diskcache']},
package_data = {package_name: ['feedify.ini']}, package_data = {package_name: ['feedify.ini']},
data_files = [ data_files = [
('share/' + package_name, ['README.md', 'LICENSE']), ('share/' + package_name, ['README.md', 'LICENSE']),