Compare commits

...

3 Commits

Author SHA1 Message Date
pictuga 000a5cda7a Move cache code to its own file 2021-09-11 13:20:34 +02:00
pictuga f2efd56e8f crawler: cache pickle'd array 2021-09-11 13:10:42 +02:00
pictuga 52e73331b8 Make mysql optdep 2021-09-11 12:12:51 +02:00
3 changed files with 208 additions and 172 deletions

morss/cache.py 100644 (new file, +163)

@@ -0,0 +1,163 @@
# This file is part of morss
#
# Copyright (C) 2013-2020 pictuga <contact@pictuga.com>
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <https://www.gnu.org/licenses/>.

import os
import pickle
import time
import threading
from collections import OrderedDict

CACHE_SIZE = int(os.getenv('CACHE_SIZE', 1000)) # max number of items in cache (default: 1k items)
CACHE_LIFESPAN = int(os.getenv('CACHE_LIFESPAN', 60)) # how often to auto-clear the cache (default: 1min)


class BaseCache:
    """ Subclasses must behave like a dict """

    def trim(self):
        pass

    def autotrim(self, delay=CACHE_LIFESPAN):
        # trim the cache every so often
        self.trim()

        t = threading.Timer(delay, self.autotrim)
        t.daemon = True
        t.start()

    def __contains__(self, url):
        try:
            self[url]

        except KeyError:
            return False

        else:
            return True


try:
    import sqlite3 # isort:skip
except ImportError:
    pass


class SQLiteCache(BaseCache):
    def __init__(self, filename=':memory:'):
        self.con = sqlite3.connect(filename, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)

        with self.con:
            self.con.execute('CREATE TABLE IF NOT EXISTS data (ky UNICODE PRIMARY KEY, data BLOB, timestamp INT)')
            self.con.execute('pragma journal_mode=WAL')

        self.trim()

    def __del__(self):
        self.con.close()

    def trim(self):
        with self.con:
            self.con.execute('DELETE FROM data WHERE timestamp <= ( SELECT timestamp FROM ( SELECT timestamp FROM data ORDER BY timestamp DESC LIMIT 1 OFFSET ? ) foo )', (CACHE_SIZE,))

    def __getitem__(self, key):
        row = self.con.execute('SELECT * FROM data WHERE ky=?', (key,)).fetchone()

        if not row:
            raise KeyError

        return row[1]

    def __setitem__(self, key, data):
        with self.con:
            self.con.execute('INSERT INTO data VALUES (?,?,?) ON CONFLICT(ky) DO UPDATE SET data=?, timestamp=?', (key, data, time.time(), data, time.time()))


try:
    import pymysql.cursors # isort:skip
except ImportError:
    pass


class MySQLCacheHandler(BaseCache):
    def __init__(self, user, password, database, host='localhost'):
        self.user = user
        self.password = password
        self.database = database
        self.host = host

        with self.cursor() as cursor:
            cursor.execute('CREATE TABLE IF NOT EXISTS data (ky VARCHAR(255) NOT NULL PRIMARY KEY, data MEDIUMBLOB, timestamp INT)')

        self.trim()

    def cursor(self):
        return pymysql.connect(host=self.host, user=self.user, password=self.password, database=self.database, charset='utf8', autocommit=True).cursor()

    def trim(self):
        with self.cursor() as cursor:
            cursor.execute('DELETE FROM data WHERE timestamp <= ( SELECT timestamp FROM ( SELECT timestamp FROM data ORDER BY timestamp DESC LIMIT 1 OFFSET %s ) foo )', (CACHE_SIZE,))

    def __getitem__(self, key):
        cursor = self.cursor()
        cursor.execute('SELECT * FROM data WHERE ky=%s', (key,))
        row = cursor.fetchone()

        if not row:
            raise KeyError

        return row[1]

    def __setitem__(self, key, data):
        with self.cursor() as cursor:
            cursor.execute('INSERT INTO data VALUES (%s,%s,%s) ON DUPLICATE KEY UPDATE data=%s, timestamp=%s',
                (key, data, time.time(), data, time.time()))


class CappedDict(OrderedDict, BaseCache):
    def trim(self):
        if CACHE_SIZE >= 0:
            for i in range( max( len(self) - CACHE_SIZE , 0 )):
                self.popitem(False)

    def __setitem__(self, key, data):
        # https://docs.python.org/2/library/collections.html#ordereddict-examples-and-recipes
        if key in self:
            del self[key]

        OrderedDict.__setitem__(self, key, data)


if 'CACHE' in os.environ:
    if os.environ['CACHE'] == 'mysql':
        default_cache = MySQLCacheHandler(
            user = os.getenv('MYSQL_USER'),
            password = os.getenv('MYSQL_PWD'),
            database = os.getenv('MYSQL_DB'),
            host = os.getenv('MYSQL_HOST', 'localhost')
        )

    elif os.environ['CACHE'] == 'sqlite':
        if 'SQLITE_PATH' in os.environ:
            path = os.getenv('SQLITE_PATH')

        else:
            path = ':memory:'

        default_cache = SQLiteCache(path)

else:
    default_cache = CappedDict()
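
Taken together, each backend stores (key, blob, timestamp) rows and exposes plain dict semantics, while trim()/autotrim() cap growth at CACHE_SIZE. A minimal sketch of driving a backend directly (key and value are made up for illustration, not part of the diff):

    # Illustrative only: exercising the dict-like API of morss/cache.py
    from morss.cache import SQLiteCache

    cache = SQLiteCache(':memory:')  # or CappedDict(), or MySQLCacheHandler(...)
    cache.autotrim()                 # re-arms itself every CACHE_LIFESPAN seconds on a daemon timer

    cache['http://example.com/feed'] = b'opaque blob'   # any bytes payload
    print('http://example.com/feed' in cache)           # True, via BaseCache.__contains__
    print(cache['http://example.com/feed'])             # b'opaque blob'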

morss/crawler.py 100644

@@ -16,10 +16,10 @@
 # with this program. If not, see <https://www.gnu.org/licenses/>.

 import os
+import pickle
 import random
 import re
 import sys
-import threading
 import time
 import zlib
 from cgi import parse_header
@@ -28,6 +28,8 @@ from io import BytesIO, StringIO
 import chardet

+from .cache import default_cache
+
 try:
     # python 2
     from urllib import quote
@@ -53,10 +55,6 @@ except NameError:
     basestring = unicode = str

-CACHE_SIZE = int(os.getenv('CACHE_SIZE', 1000)) # max number of items in cache (default: 1k items)
-CACHE_LIFESPAN = int(os.getenv('CACHE_LIFESPAN', 60)) # how often to auto-clear the cache (default: 1min)
-
 MIMETYPE = {
     'xml': ['text/xml', 'application/xml', 'application/rss+xml', 'application/rdf+xml', 'application/atom+xml', 'application/xhtml+xml'],
     'rss': ['application/rss+xml', 'application/rdf+xml', 'application/atom+xml'],
@@ -457,37 +455,46 @@ class CacheHandler(BaseHandler):
     def load(self, url):
         try:
-            out = list(self.cache[url])
+            data = pickle.loads(self.cache[url])

         except KeyError:
-            out = [None, None, unicode(), bytes(), 0]
+            data = None

-        if sys.version_info[0] >= 3:
-            out[2] = email.message_from_string(out[2] or unicode()) # headers
-        else:
-            out[2] = mimetools.Message(StringIO(out[2] or unicode()))
+        else:
+            if sys.version_info[0] >= 3:
+                data['headers'] = email.message_from_string(data['headers'] or unicode()) # headers
+            else:
+                data['headers'] = mimetools.Message(StringIO(data['headers'] or unicode()))

-        return out
+        return data

-    def save(self, url, code, msg, headers, data, timestamp):
-        self.cache[url] = (code, msg, unicode(headers), data, timestamp)
+    def save(self, key, data):
+        data['headers'] = unicode(data['headers'])
+        self.cache[key] = pickle.dumps(data, 0)

-    def is_cached(self, url):
-        return self.load(url)[0] is not None
+    def is_cached(self, key):
+        return self.load(key) is not None

     def cached_response(self, req):
         # this does NOT check whether it's already cached, use with care
-        (code, msg, headers, data, timestamp) = self.load(req.get_full_url())
+        data = self.load(req.get_full_url())

         # return the cache as a response
-        resp = addinfourl(BytesIO(data), headers, req.get_full_url(), code)
-        resp.msg = msg
+        resp = addinfourl(BytesIO(data['data']), data['headers'], req.get_full_url(), data['code'])
+        resp.msg = data['msg']

         return resp

     def save_response(self, req, resp):
         data = resp.read()
-        self.save(req.get_full_url(), resp.code, resp.msg, resp.headers, data, time.time())
+        self.save(req.get_full_url(), {
+            'code': resp.code,
+            'msg': resp.msg,
+            'headers': resp.headers,
+            'data': data,
+            'timestamp': time.time()
+            })

         fp = BytesIO(data)
         old_resp = resp
@@ -497,13 +504,14 @@ class CacheHandler(BaseHandler):
         return resp

     def http_request(self, req):
-        (code, msg, headers, data, timestamp) = self.load(req.get_full_url())
+        data = self.load(req.get_full_url())

-        if 'etag' in headers:
-            req.add_unredirected_header('If-None-Match', headers['etag'])
+        if data is not None:
+            if 'etag' in data['headers']:
+                req.add_unredirected_header('If-None-Match', data['headers']['etag'])

-        if 'last-modified' in headers:
-            req.add_unredirected_header('If-Modified-Since', headers.get('last-modified'))
+            if 'last-modified' in data['headers']:
+                req.add_unredirected_header('If-Modified-Since', data['headers']['last-modified'])

         return req
@@ -512,33 +520,33 @@ class CacheHandler(BaseHandler):
         # If 'None' is returned, try your chance with the next-available handler
         # If a 'resp' is returned, stop there, and proceed with 'http_response'

-        (code, msg, headers, data, timestamp) = self.load(req.get_full_url())
+        data = self.load(req.get_full_url())
+
+        if data is None:
+            # cache empty, refresh
+            return None

         # some info needed to process everything
-        cache_control = parse_http_list(headers.get('cache-control', ()))
-        cache_control += parse_http_list(headers.get('pragma', ()))
+        cache_control = parse_http_list(data['headers'].get('cache-control', ()))
+        cache_control += parse_http_list(data['headers'].get('pragma', ()))

         cc_list = [x for x in cache_control if '=' not in x]
         cc_values = parse_keqv_list([x for x in cache_control if '=' in x])

-        cache_age = time.time() - timestamp
+        cache_age = time.time() - data['timestamp']

         # list in a simple way what to do when
         if self.force_min == -2:
-            if code is not None:
+            if data['code'] is not None:
                 # already in cache, perfect, use cache
                 return self.cached_response(req)

             else:
                 # raise an error, via urllib handlers
-                resp = addinfourl(BytesIO(), headers, req.get_full_url(), 409)
+                resp = addinfourl(BytesIO(), data['headers'], req.get_full_url(), 409)
                 resp.msg = 'Conflict'
                 return resp

-        elif code is None:
-            # cache empty, refresh
-            return None
-
         elif self.force_min == -1:
             # force use cache
             return self.cached_response(req)
@@ -547,7 +555,7 @@ class CacheHandler(BaseHandler):
             # force refresh
             return None

-        elif code == 301 and cache_age < 7*24*3600:
+        elif data['code'] == 301 and cache_age < 7*24*3600:
             # "301 Moved Permanently" has to be cached...as long as we want
             # (awesome HTTP specs), let's say a week (why not?). Use force_min=0
             # if you want to bypass this (needed for a proper refresh)
@@ -604,142 +612,6 @@ class CacheHandler(BaseHandler):
     https_response = http_response


-class BaseCache:
-    """ Subclasses must behave like a dict """
-
-    def trim(self):
-        pass
-
-    def autotrim(self, delay=CACHE_LIFESPAN):
-        # trim the cache every so often
-        self.trim()
-
-        t = threading.Timer(delay, self.autotrim)
-        t.daemon = True
-        t.start()
-
-    def __contains__(self, url):
-        try:
-            self[url]
-
-        except KeyError:
-            return False
-
-        else:
-            return True
-
-
-import sqlite3 # isort:skip
-
-
-class SQLiteCache(BaseCache):
-    def __init__(self, filename=':memory:'):
-        self.con = sqlite3.connect(filename, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)
-
-        with self.con:
-            self.con.execute('CREATE TABLE IF NOT EXISTS data (url UNICODE PRIMARY KEY, code INT, msg UNICODE, headers UNICODE, data BLOB, timestamp INT)')
-            self.con.execute('pragma journal_mode=WAL')
-
-        self.trim()
-
-    def __del__(self):
-        self.con.close()
-
-    def trim(self):
-        with self.con:
-            self.con.execute('DELETE FROM data WHERE timestamp <= ( SELECT timestamp FROM ( SELECT timestamp FROM data ORDER BY timestamp DESC LIMIT 1 OFFSET ? ) foo )', (CACHE_SIZE,))
-
-    def __getitem__(self, url):
-        row = self.con.execute('SELECT * FROM data WHERE url=?', (url,)).fetchone()
-
-        if not row:
-            raise KeyError
-
-        return row[1:]
-
-    def __setitem__(self, url, value): # value = (code, msg, headers, data, timestamp)
-        value = list(value)
-        value[3] = sqlite3.Binary(value[3]) # data
-        value = tuple(value)
-
-        with self.con:
-            self.con.execute('INSERT INTO data VALUES (?,?,?,?,?,?) ON CONFLICT(url) DO UPDATE SET code=?, msg=?, headers=?, data=?, timestamp=?', (url,) + value + value)
-
-
-import pymysql.cursors # isort:skip
-
-
-class MySQLCacheHandler(BaseCache):
-    def __init__(self, user, password, database, host='localhost'):
-        self.user = user
-        self.password = password
-        self.database = database
-        self.host = host
-
-        with self.cursor() as cursor:
-            cursor.execute('CREATE TABLE IF NOT EXISTS data (url VARCHAR(255) NOT NULL PRIMARY KEY, code INT, msg TEXT, headers TEXT, data BLOB, timestamp INT)')
-
-        self.trim()
-
-    def cursor(self):
-        return pymysql.connect(host=self.host, user=self.user, password=self.password, database=self.database, charset='utf8', autocommit=True).cursor()
-
-    def trim(self):
-        with self.cursor() as cursor:
-            cursor.execute('DELETE FROM data WHERE timestamp <= ( SELECT timestamp FROM ( SELECT timestamp FROM data ORDER BY timestamp DESC LIMIT 1 OFFSET %s ) foo )', (CACHE_SIZE,))
-
-    def __getitem__(self, url):
-        cursor = self.cursor()
-        cursor.execute('SELECT * FROM data WHERE url=%s', (url,))
-        row = cursor.fetchone()
-
-        if not row:
-            raise KeyError
-
-        return row[1:]
-
-    def __setitem__(self, url, value): # (code, msg, headers, data, timestamp)
-        with self.cursor() as cursor:
-            cursor.execute('INSERT INTO data VALUES (%s,%s,%s,%s,%s,%s) ON DUPLICATE KEY UPDATE code=%s, msg=%s, headers=%s, data=%s, timestamp=%s',
-                (url,) + value + value)
-
-
-class CappedDict(OrderedDict, BaseCache):
-    def trim(self):
-        if CACHE_SIZE >= 0:
-            for i in range( max( len(self) - CACHE_SIZE , 0 )):
-                self.popitem(False)
-
-    def __setitem__(self, key, value):
-        # https://docs.python.org/2/library/collections.html#ordereddict-examples-and-recipes
-        if key in self:
-            del self[key]
-
-        OrderedDict.__setitem__(self, key, value)
-
-
-if 'CACHE' in os.environ:
-    if os.environ['CACHE'] == 'mysql':
-        default_cache = MySQLCacheHandler(
-            user = os.getenv('MYSQL_USER'),
-            password = os.getenv('MYSQL_PWD'),
-            database = os.getenv('MYSQL_DB'),
-            host = os.getenv('MYSQL_HOST', 'localhost')
-        )
-
-    elif os.environ['CACHE'] == 'sqlite':
-        if 'SQLITE_PATH' in os.environ:
-            path = os.getenv('SQLITE_PATH')
-
-        else:
-            path = ':memory:'
-
-        default_cache = SQLiteCache(path)
-
-else:
-    default_cache = CappedDict()
-
-
 if 'IGNORE_SSL' in os.environ:
     import ssl
     ssl._create_default_https_context = ssl._create_unverified_context
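
The crawler side of the change replaces the five-field (code, msg, headers, data, timestamp) tuple, which needed one database column per field, with a single dict pickled into one blob; that is what lets cache.py use the uniform (ky, data, timestamp) schema above. Roughly, the round trip works like this (sample values invented for illustration):

    # Illustrative round trip of the new cache entry format
    import pickle
    import time

    entry = {
        'code': 200,
        'msg': 'OK',
        'headers': 'etag: "abc123"',   # headers are stringified before pickling, as in save()
        'data': b'<rss>...</rss>',
        'timestamp': time.time(),
    }

    blob = pickle.dumps(entry, 0)      # protocol 0, as in CacheHandler.save()
    restored = pickle.loads(blob)      # as in CacheHandler.load()
    assert restored['code'] == 200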

setup.py 100644

@@ -13,7 +13,8 @@ setup(
     download_url = 'https://git.pictuga.com/pictuga/morss',
     license = 'AGPL v3',
     packages = [package_name],
-    install_requires = ['lxml', 'bs4', 'python-dateutil', 'chardet', 'pymysql'],
+    install_requires = ['lxml', 'bs4', 'python-dateutil', 'chardet'],
+    extras_require = {'full': ['pymysql']},
     package_data = {package_name: ['feedify.ini']},
     data_files = [
         ('share/' + package_name, ['README.md', 'LICENSE']),
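
With pymysql demoted to an extra, a bare install no longer pulls in the MySQL driver. Assuming the package is published under the repository name, the MySQL cache backend would presumably be enabled with:

    pip install morss[full]

while a plain pip install keeps only the core dependencies (lxml, bs4, python-dateutil, chardet).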