Compare commits


6 Commits

Author SHA1 Message Date
pictuga cb21871c35 crawler: clean up caching code 2021-11-08 22:02:23 +01:00
pictuga c71cf5d5ce caching: fix diskcache implementation 2021-11-08 21:57:43 +01:00
pictuga 44a6b2591d crawler: cleaner http header object import 2021-11-07 19:44:36 +01:00
pictuga a890536601 morss: comment code a bit 2021-11-07 18:26:07 +01:00
pictuga 8de309f2d4 caching: add diskcache backend 2021-11-07 18:15:20 +01:00
pictuga cbf7b3f77b caching: simplify sqlite code 2021-11-07 18:14:18 +01:00
5 changed files with 164 additions and 100 deletions

README.md

@@ -40,7 +40,7 @@ Some features of morss:
- Follow 301/meta redirects
- Recover xml feeds with corrupt encoding
- Supports gzip-compressed http content
- HTTP caching with 3 different backends (in-memory/sqlite/mysql)
- HTTP caching with different backends (in-memory/sqlite/mysql/redis/diskcache)
- Works as server/cli tool
- Deobfuscate various tracking links
@@ -60,8 +60,8 @@ Full installation (including optional dependencies)
pip install git+https://git.pictuga.com/pictuga/morss.git#[full]
```
The full install includes mysql and redis (possible cache backends). Otherwise,
only in-memory and sqlite3 caches are available.
The full install includes mysql, redis and diskcache (possible cache backends).
Otherwise, only in-memory and sqlite3 caches are available.
The dependency `lxml` takes fairly long to install (especially on Raspberry Pi, as
C code needs to be compiled). If possible on your distribution, try installing
@@ -390,12 +390,14 @@ will be cleared every time the program is run). Path can be defined with
environment variables: `MYSQL_USER`, `MYSQL_PWD`, `MYSQL_DB`, `MYSQL_HOST`
- `CACHE=redis`: Redis cache. Connection can be defined with the following
environment variables: `REDIS_HOST`, `REDIS_PORT`, `REDIS_DB`, `REDIS_PWD`
- `CACHE=diskcache`: disk-based cache. Target directory can be defined with
`DISKCAHE_DIR`.
To limit the size of the cache:
- `CACHE_SIZE` sets the target number of items in the cache (further items will
be deleted but the cache might be temporarily bigger than that). Defaults to 1k
entries.
entries. NB. When using `diskcache`, this is the cache max size in Bytes.
- `CACHE_LIFESPAN` (seconds) sets how often the cache must be trimmed (i.e. cut
down to the number of items set in `CACHE_SIZE`). Defaults to 1min.
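For illustration, a minimal sketch of wiring these variables together from Python before importing morss (the path and sizes are made up, `DISKCAHE_DIR` is spelled exactly as in this commit, and the `morss.caching` module path is assumed):

```python
import os

# The cache backend is selected at import time, so set these first
os.environ['CACHE'] = 'diskcache'
os.environ['DISKCAHE_DIR'] = '/var/cache/morss'    # spelling as committed
os.environ['CACHE_SIZE'] = str(50 * 1024 * 1024)   # bytes when CACHE=diskcache
os.environ['CACHE_LIFESPAN'] = '60'                # trim roughly every minute

from morss import caching   # assumed module path; default_cache now uses diskcache
```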

morss/caching.py

@@ -58,8 +58,8 @@ except ImportError:
class SQLiteCache(BaseCache):
def __init__(self, filename=':memory:'):
self.con = sqlite3.connect(filename, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
def __init__(self, path=':memory:'):
self.con = sqlite3.connect(path, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
with self.con:
self.con.execute('CREATE TABLE IF NOT EXISTS data (ky UNICODE PRIMARY KEY, data BLOB, timestamp INT)')
@@ -158,6 +158,29 @@ class RedisCacheHandler(BaseCache):
self.r.set(key, data)
try:
import diskcache # isort:skip
except ImportError:
pass
class DiskCacheHandler(BaseCache):
def __init__(self, directory=None, **kwargs):
self.cache = diskcache.Cache(directory=directory, eviction_policy='least-frequently-used', **kwargs)
def __del__(self):
self.cache.close()
def trim(self):
self.cache.cull()
def __getitem__(self, key):
return self.cache[key]
def __setitem__(self, key, data):
self.cache.set(key, data)
if 'CACHE' in os.environ:
if os.environ['CACHE'] == 'mysql':
default_cache = MySQLCacheHandler(
@@ -168,13 +191,9 @@ if 'CACHE' in os.environ:
)
elif os.environ['CACHE'] == 'sqlite':
if 'SQLITE_PATH' in os.environ:
path = os.getenv('SQLITE_PATH')
else:
path = ':memory:'
default_cache = SQLiteCache(path)
default_cache = SQLiteCache(
os.getenv('SQLITE_PATH', ':memory:')
)
elif os.environ['CACHE'] == 'redis':
default_cache = RedisCacheHandler(
@@ -184,5 +203,11 @@ if 'CACHE' in os.environ:
password = os.getenv('REDIS_PWD', None)
)
elif os.environ['CACHE'] == 'diskcache':
default_cache = DiskCacheHandler(
directory = os.getenv('DISKCAHE_DIR', '/tmp/morss-diskcache'),
size_limit = CACHE_SIZE # in Bytes
)
else:
default_cache = CappedDict()
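For reference, a short usage sketch of the new handler, assuming the `diskcache` package is installed; the directory and key below are placeholders:

```python
cache = DiskCacheHandler(directory='/tmp/morss-diskcache-demo',
                         size_limit=10 * 1024 * 1024)      # bytes, forwarded to diskcache.Cache

cache['https://example.com/feed'] = b'pickled response'    # stored via diskcache.Cache.set()
print(cache['https://example.com/feed'])

cache.trim()   # cull(): drop expired items, then evict least-frequently-used ones over size_limit
del cache      # __del__ closes the underlying diskcache.Cache
```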

morss/crawler.py

@@ -19,7 +19,6 @@ import os
import pickle
import random
import re
import sys
import time
import zlib
from cgi import parse_header
@@ -34,14 +33,14 @@ try:
# python 2
from urllib import quote
import mimetools
from mimetools import Message as message_from_string
from urllib2 import (BaseHandler, HTTPCookieProcessor, HTTPRedirectHandler,
Request, addinfourl, build_opener, parse_http_list,
parse_keqv_list)
from urlparse import urlparse, urlunparse
except ImportError:
# python 3
import email
from email import message_from_string
from urllib.parse import quote, urlparse, urlunparse
from urllib.request import (BaseHandler, HTTPCookieProcessor,
HTTPRedirectHandler, Request, addinfourl,
@@ -109,7 +108,7 @@ def adv_get(url, post=None, timeout=None, *args, **kwargs):
}
def custom_opener(follow=None, delay=None):
def custom_opener(follow=None, policy=None, force_min=None, force_max=None):
handlers = []
# as per urllib2 source code, these Handlers are added first
@@ -143,7 +142,7 @@ def custom_opener(follow=None, delay=None):
if follow:
handlers.append(AlternateHandler(MIMETYPE[follow]))
handlers.append(CacheHandler(force_min=delay))
handlers.append(CacheHandler(policy=policy, force_min=force_min, force_max=force_max))
return build_opener(*handlers)
@@ -427,31 +426,50 @@ class HTTPRefreshHandler(BaseHandler):
https_response = http_response
def error_response(code, msg, url=''):
# return an error as a response
resp = addinfourl(BytesIO(), message_from_string('\n\n'), url, code)
resp.msg = msg
return resp
class CacheHandler(BaseHandler):
" Cache based on etags/last-modified "
private_cache = False # Websites can indicate whether the page should be
# cached by CDNs (e.g. shouldn't be the case for
# private/confidential/user-specific pages.
# With this setting, decide whether (False) you want
# the cache to behave like a CDN (i.e. don't cache
# private pages), or (True) to behave like a end-cache
# private pages. If unsure, False is the safest bet.
privacy = 'private' # Websites can indicate whether the page should be cached
# by CDNs (e.g. shouldn't be the case for
# private/confidential/user-specific pages). With this
# setting, decide whether you want the cache to behave
# like a CDN (i.e. don't cache private pages, 'public'),
# or like an end-user cache (i.e. do cache private
# pages, 'private'). If unsure, 'public' is the safest
# bet, but many websites abuse this feature...
# NB. This overrides all the other min/max/policy settings.
handler_order = 499
def __init__(self, cache=None, force_min=None):
def __init__(self, cache=None, force_min=None, force_max=None, policy=None):
self.cache = cache or default_cache
self.force_min = force_min
# Servers indicate how long they think their content is "valid".
# With this parameter (force_min, expressed in seconds), we can
# override the validity period (i.e. bypassing http headers)
# Special values:
# -1: valid forever, i.e. use the cache no matter what (and fetch
# the page online if not present in cache)
# 0: valid zero second, i.e. force refresh
# -2: same as -1, i.e. use the cache no matter what, but do NOT
# fetch the page online if not present in cache, throw an
# error instead
self.force_max = force_max
self.policy = policy # can be cached/refresh/offline/None (default)
# Servers indicate how long they think their content is "valid". With
# this parameter (force_min/max, expressed in seconds), we can override
# the validity period (i.e. bypassing http headers)
# Special choices, via "policy":
# cached: use the cache no matter what (and fetch the page online if
# not present in cache)
# refresh: valid zero second, i.e. force refresh
# offline: same as cached, i.e. use the cache no matter what, but do
# NOT fetch the page online if not present in cache, throw an
# error instead
# None: just follow protocols
# sanity checks
assert self.force_max is None or self.force_max >= 0
assert self.force_min is None or self.force_min >= 0
assert self.force_max is None or self.force_min is None or self.force_max >= self.force_min
def load(self, url):
try:
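To make the new semantics concrete, a hedged sketch of how these parameters are meant to be combined through `custom_opener()` from earlier in this diff (the URL is a placeholder):

```python
# Cache only: serve from cache, or return the 409 error_response defined above
opener = custom_opener(policy='offline')

# Force refresh: ignore any cached copy
opener = custom_opener(policy='refresh')

# Follow HTTP headers, but treat entries younger than 5 min as always fresh
# and entries older than 1 h as always stale
opener = custom_opener(policy=None, force_min=5*60, force_max=60*60)
resp = opener.open('https://example.com/feed')
```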
@@ -461,10 +479,7 @@ class CacheHandler(BaseHandler):
data = None
else:
if sys.version_info[0] >= 3:
data['headers'] = email.message_from_string(data['headers'] or unicode()) # headers
else:
data['headers'] = mimetools.Message(StringIO(data['headers'] or unicode()))
data['headers'] = message_from_string(data['headers'] or unicode()) # headers
return data
@@ -472,18 +487,17 @@ class CacheHandler(BaseHandler):
data['headers'] = unicode(data['headers'])
self.cache[key] = pickle.dumps(data, 0)
def is_cached(self, key):
return self.load(key) is not None
def cached_response(self, req):
# this does NOT check whether it's already cached, use with care
def cached_response(self, req, fallback=None):
data = self.load(req.get_full_url())
# return the cache as a response
resp = addinfourl(BytesIO(data['data']), data['headers'], req.get_full_url(), data['code'])
resp.msg = data['msg']
if data is not None:
# return the cache as a response
resp = addinfourl(BytesIO(data['data']), data['headers'], req.get_full_url(), data['code'])
resp.msg = data['msg']
return resp
return resp
else:
return fallback
def save_response(self, req, resp):
data = resp.read()
@@ -491,7 +505,7 @@ class CacheHandler(BaseHandler):
self.save(req.get_full_url(), {
'code': resp.code,
'msg': resp.msg,
'headers': resp.headers,
'headers': str(resp.headers),
'data': data,
'timestamp': time.time()
})
@@ -520,60 +534,74 @@ class CacheHandler(BaseHandler):
# If 'None' is returned, try your chance with the next-available handler
# If a 'resp' is returned, stop there, and proceed with 'http_response'
# Here, we try to see whether we want to use data from cache (i.e.
# return 'resp'), or whether we want to refresh the content (return
# 'None')
data = self.load(req.get_full_url())
if data is None:
# cache empty, refresh
if data is not None:
# some info needed to process everything
cache_control = parse_http_list(data['headers'].get('cache-control', ()))
cache_control += parse_http_list(data['headers'].get('pragma', ()))
cc_list = [x for x in cache_control if '=' not in x]
cc_values = parse_keqv_list([x for x in cache_control if '=' in x])
cache_age = time.time() - data['timestamp']
# list in a simple way what to do in special cases
if data is not None and 'private' in cc_list and self.privacy == 'public':
# private data but public cache, do not use cache
# privacy concern, so handled first and foremost
# (and doesn't need to be addressed anymore afterwards)
return None
# some info needed to process everything
cache_control = parse_http_list(data['headers'].get('cache-control', ()))
cache_control += parse_http_list(data['headers'].get('pragma', ()))
elif self.policy == 'offline':
# use cache, or return an error
return self.cached_response(
req,
error_response(409, 'Conflict', req.get_full_url())
)
cc_list = [x for x in cache_control if '=' not in x]
cc_values = parse_keqv_list([x for x in cache_control if '=' in x])
elif self.policy == 'cached':
# use cache, or fetch online
return self.cached_response(req, None)
cache_age = time.time() - data['timestamp']
# list in a simple way what to do when
if self.force_min == -2:
if data['code'] is not None:
# already in cache, perfect, use cache
return self.cached_response(req)
else:
# raise an error, via urllib handlers
resp = addinfourl(BytesIO(), data['headers'], req.get_full_url(), 409)
resp.msg = 'Conflict'
return resp
elif self.force_min == -1:
# force use cache
return self.cached_response(req)
elif self.force_min == 0:
elif self.policy == 'refresh':
# force refresh
return None
elif data is None:
# we have already settled all the cases that don't need the cache.
# all the following ones need the cached item
return None
elif self.force_max is not None and cache_age > self.force_max:
# older than we want, refresh
return None
elif self.force_min is not None and cache_age < self.force_min:
# recent enough, use cache
return self.cached_response(req)
elif data['code'] == 301 and cache_age < 7*24*3600:
# "301 Moved Permanently" has to be cached...as long as we want
# (awesome HTTP specs), let's say a week (why not?). Use force_min=0
# if you want to bypass this (needed for a proper refresh)
return self.cached_response(req)
elif (self.force_min is None or self.force_min > 0) and ('no-cache' in cc_list or 'no-store' in cc_list or ('private' in cc_list and not self.private_cache)):
# kindly follow web servers indications, refresh
# if the same settings are used all along, this section shouldn't be
# of any use, since the page woudln't be cached in the first place
# the check is only performed "just in case"
elif self.force_min is None and ('no-cache' in cc_list or 'no-store' in cc_list):
# kindly follow web servers' indications and refresh. If the same
# settings are used all along, this section shouldn't be of any use,
# since the page wouldn't have been cached in the first place; the
# check is only performed "just in case"
# NB. NOT respected if force_min is set
return None
elif 'max-age' in cc_values and int(cc_values['max-age']) > cache_age:
# server says it's still fine (and we trust him, if not, use force_min=0), use cache
return self.cached_response(req)
elif self.force_min is not None and self.force_min > cache_age:
# still recent enough for us, use cache
# server says it's still fine (and we trust it; if not, use the overrides), use cache
return self.cached_response(req)
else:
@@ -584,19 +612,19 @@ class CacheHandler(BaseHandler):
# code for after-fetch, to know whether to save to hard-drive (if sticking to http headers' will)
# NB. It might re-save requests pulled from cache, which will re-set the time() to the latest, i.e. lengthen its useful life
if resp.code == 304 and self.is_cached(resp.url):
if resp.code == 304 and resp.url in self.cache:
# we are hopefully the first after the HTTP handler, so no need
# to re-run all the *_response
# here: cached page, returning from cache
return self.cached_response(req)
elif ('cache-control' in resp.headers or 'pragma' in resp.headers) and self.force_min is None:
elif self.force_min is None and ('cache-control' in resp.headers or 'pragma' in resp.headers):
cache_control = parse_http_list(resp.headers.get('cache-control', ()))
cache_control += parse_http_list(resp.headers.get('pragma', ()))
cc_list = [x for x in cache_control if '=' not in x]
if 'no-cache' in cc_list or 'no-store' in cc_list or ('private' in cc_list and not self.private_cache):
if 'no-cache' in cc_list or 'no-store' in cc_list or ('private' in cc_list and self.privacy == 'public'):
# kindly follow web servers indications (do not save & return)
return resp
@@ -618,6 +646,8 @@ if 'IGNORE_SSL' in os.environ:
if __name__ == '__main__':
import sys
req = adv_get(sys.argv[1] if len(sys.argv) > 1 else 'https://morss.it')
if sys.flags.interactive:

morss/morss.py

@@ -194,21 +194,20 @@ def ItemFill(item, options, feedurl='/', fast=False):
log(item.link)
# download
delay = -1
if fast or options.fast:
if fast or options.cache:
# force cache, don't fetch
delay = -2
policy = 'offline'
elif options.force:
# force refresh
delay = 0
policy = 'refresh'
else:
delay = 24*60*60 # 24h
policy = None
try:
req = crawler.adv_get(url=item.link, delay=delay, timeout=TIMEOUT)
req = crawler.adv_get(url=item.link, policy=policy, force_min=24*60*60, timeout=TIMEOUT)
except (IOError, HTTPException) as e:
log('http error')
@@ -266,11 +265,17 @@ def FeedFetch(url, options):
# fetch feed
delay = DELAY
if options.force:
delay = 0
if options.cache:
policy = 'offline'
elif options.force:
policy = 'refresh'
else:
policy = None
try:
req = crawler.adv_get(url=url, post=options.post, follow=('rss' if not options.items else None), delay=delay, timeout=TIMEOUT * 2)
req = crawler.adv_get(url=url, post=options.post, follow=('rss' if not options.items else None), policy=policy, force_min=5*60, force_max=60*60, timeout=TIMEOUT * 2)
except (IOError, HTTPException):
raise MorssException('Error downloading feed')
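The same option-to-policy mapping appears in both `ItemFill` and `FeedFetch`; condensed below as an illustrative helper (the function itself is not part of morss):

```python
def pick_policy(options):
    if options.cache:       # cache-only run: never hit the network
        return 'offline'
    elif options.force:     # forced refresh: bypass the cache
        return 'refresh'
    else:
        return None         # follow HTTP caching headers, within force_min/force_max
```

Only the freshness windows differ: feed pages are considered fresh for at least 5 minutes and at most 1 hour, while article pages are reused for up to 24 hours.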
@@ -324,7 +329,7 @@ def FeedGather(rss, url, options):
max_time = 0
if options.newest:
# :newest take the newest items
# :newest takes the newest items (instead of order of appearance)
now = datetime.now(tz.tzutc())
sorted_items = sorted(rss.items, key=lambda x:x.updated or x.time or now, reverse=True)
@@ -333,6 +338,7 @@ def FeedGather(rss, url, options):
sorted_items = list(rss.items)
for i, item in enumerate(sorted_items):
# hard cap
if time.time() - start_time > lim_time >= 0 or i + 1 > lim_item >= 0:
log('dropped')
item.remove()
@@ -345,6 +351,7 @@ def FeedGather(rss, url, options):
item = ItemFix(item, options, url)
# soft cap
if time.time() - start_time > max_time >= 0 or i + 1 > max_item >= 0:
if not options.proxy:
if ItemFill(item, options, url, True) is False:

setup.py

@@ -14,7 +14,7 @@ setup(
license = 'AGPL v3',
packages = [package_name],
install_requires = ['lxml', 'bs4', 'python-dateutil', 'chardet'],
extras_require = {'full': ['pymysql', 'redis']},
extras_require = {'full': ['pymysql', 'redis', 'diskcache']},
package_data = {package_name: ['feedify.ini']},
data_files = [
('share/' + package_name, ['README.md', 'LICENSE']),