crawler: cache pickle'd array

pictuga 2021-09-11 13:10:42 +02:00
parent 52e73331b8
commit f2efd56e8f
1 changed file with 56 additions and 49 deletions


@@ -16,6 +16,7 @@
 # with this program. If not, see <https://www.gnu.org/licenses/>.

 import os
+import pickle
 import random
 import re
 import sys
@@ -457,37 +458,46 @@ class CacheHandler(BaseHandler):
     def load(self, url):
         try:
-            out = list(self.cache[url])
+            data = pickle.loads(self.cache[url])

         except KeyError:
-            out = [None, None, unicode(), bytes(), 0]
+            data = None

-        if sys.version_info[0] >= 3:
-            out[2] = email.message_from_string(out[2] or unicode()) # headers
-
-        else:
-            out[2] = mimetools.Message(StringIO(out[2] or unicode()))
+        else:
+            if sys.version_info[0] >= 3:
+                data['headers'] = email.message_from_string(data['headers'] or unicode()) # headers
+
+            else:
+                data['headers'] = mimetools.Message(StringIO(data['headers'] or unicode()))

-        return out
+        return data

-    def save(self, url, code, msg, headers, data, timestamp):
-        self.cache[url] = (code, msg, unicode(headers), data, timestamp)
+    def save(self, key, data):
+        data['headers'] = unicode(data['headers'])
+        self.cache[key] = pickle.dumps(data, 0)

-    def is_cached(self, url):
-        return self.load(url)[0] is not None
+    def is_cached(self, key):
+        return self.load(key) is not None

     def cached_response(self, req):
         # this does NOT check whether it's already cached, use with care
-        (code, msg, headers, data, timestamp) = self.load(req.get_full_url())
+        data = self.load(req.get_full_url())

         # return the cache as a response
-        resp = addinfourl(BytesIO(data), headers, req.get_full_url(), code)
-        resp.msg = msg
+        resp = addinfourl(BytesIO(data['data']), data['headers'], req.get_full_url(), data['code'])
+        resp.msg = data['msg']

         return resp

     def save_response(self, req, resp):
         data = resp.read()
-        self.save(req.get_full_url(), resp.code, resp.msg, resp.headers, data, time.time())
+        self.save(req.get_full_url(), {
+            'code': resp.code,
+            'msg': resp.msg,
+            'headers': resp.headers,
+            'data': data,
+            'timestamp': time.time()
+        })

         fp = BytesIO(data)
         old_resp = resp
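
For reference, the new storage format round-trips roughly like this. A minimal, self-contained sketch, assuming a plain dict stands in for the real cache backends and the header string is made up:

    import email
    import pickle
    import time

    cache = {}  # stand-in for the SQLite/MySQL/CappedDict backends below

    # save(): headers are flattened to a string, then the whole dict is pickled
    entry = {
        'code': 200,
        'msg': 'OK',
        'headers': 'etag: "abc123"\n',  # hypothetical header block
        'data': b'<html>...</html>',
        'timestamp': time.time(),
    }
    cache['https://example.com/'] = pickle.dumps(entry, 0)

    # load(): unpickle, then re-parse the header string into a Message object
    data = pickle.loads(cache['https://example.com/'])
    data['headers'] = email.message_from_string(data['headers'])
    print(data['code'], data['headers']['etag'])  # 200 "abc123"

Protocol 0 produces ASCII-only output and is readable by both Python 2 and 3, which presumably is why it is pinned here instead of a newer binary protocol.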
@@ -497,13 +507,14 @@ class CacheHandler(BaseHandler):
         return resp

     def http_request(self, req):
-        (code, msg, headers, data, timestamp) = self.load(req.get_full_url())
+        data = self.load(req.get_full_url())

-        if 'etag' in headers:
-            req.add_unredirected_header('If-None-Match', headers['etag'])
+        if data is not None:
+            if 'etag' in data['headers']:
+                req.add_unredirected_header('If-None-Match', data['headers']['etag'])

-        if 'last-modified' in headers:
-            req.add_unredirected_header('If-Modified-Since', headers.get('last-modified'))
+            if 'last-modified' in data['headers']:
+                req.add_unredirected_header('If-Modified-Since', data['headers']['last-modified'])

         return req
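
To place http_request and http_open (next hunk) in context: urllib calls http_request on every registered handler before sending, then tries each handler's http_open until one returns a response. A no-op sketch of that protocol, not the real CacheHandler:

    from urllib import request

    class NoopCacheHandler(request.BaseHandler):
        def http_request(self, req):
            # pre-flight hook: a cache handler adds If-None-Match /
            # If-Modified-Since here to turn the fetch into a revalidation
            return req

        def http_open(self, req):
            # returning None defers to the next handler (the real
            # HTTPHandler); returning a response would short-circuit it
            return None

    opener = request.build_opener(NoopCacheHandler())
    # opener.open('https://example.com/') now flows through both hooks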
@@ -512,33 +523,33 @@ class CacheHandler(BaseHandler):
         # If 'None' is returned, try your chance with the next-available handler
         # If a 'resp' is returned, stop there, and proceed with 'http_response'

-        (code, msg, headers, data, timestamp) = self.load(req.get_full_url())
+        data = self.load(req.get_full_url())
+
+        if data is None:
+            # cache empty, refresh
+            return None

         # some info needed to process everything
-        cache_control = parse_http_list(headers.get('cache-control', ()))
-        cache_control += parse_http_list(headers.get('pragma', ()))
+        cache_control = parse_http_list(data['headers'].get('cache-control', ()))
+        cache_control += parse_http_list(data['headers'].get('pragma', ()))

         cc_list = [x for x in cache_control if '=' not in x]
         cc_values = parse_keqv_list([x for x in cache_control if '=' in x])

-        cache_age = time.time() - timestamp
+        cache_age = time.time() - data['timestamp']

         # list in a simple way what to do when
         if self.force_min == -2:
-            if code is not None:
+            if data['code'] is not None:
                 # already in cache, perfect, use cache
                 return self.cached_response(req)

             else:
                 # raise an error, via urllib handlers
-                resp = addinfourl(BytesIO(), headers, req.get_full_url(), 409)
+                resp = addinfourl(BytesIO(), data['headers'], req.get_full_url(), 409)
                 resp.msg = 'Conflict'
                 return resp

-        elif code is None:
-            # cache empty, refresh
-            return None
-
         elif self.force_min == -1:
             # force use cache
             return self.cached_response(req)
@@ -547,7 +558,7 @@ class CacheHandler(BaseHandler):
             # force refresh
             return None

-        elif code == 301 and cache_age < 7*24*3600:
+        elif data['code'] == 301 and cache_age < 7*24*3600:
             # "301 Moved Permanently" has to be cached...as long as we want
             # (awesome HTTP specs), let's say a week (why not?). Use force_min=0
             # if you want to bypass this (needed for a proper refresh)
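
The cache-control handling above leans on two small urllib helpers; their behaviour is easy to check in isolation (the header values here are invented):

    from urllib.request import parse_http_list, parse_keqv_list

    headers = {'cache-control': 'max-age=300, no-cache'}  # hypothetical

    cache_control = parse_http_list(headers.get('cache-control', ''))
    # -> ['max-age=300', 'no-cache']

    cc_list = [x for x in cache_control if '=' not in x]
    cc_values = parse_keqv_list([x for x in cache_control if '=' in x])

    print(cc_list)    # ['no-cache']
    print(cc_values)  # {'max-age': '300'}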
@@ -641,7 +652,7 @@ class SQLiteCache(BaseCache):
         self.con = sqlite3.connect(filename, detect_types=sqlite3.PARSE_DECLTYPES, check_same_thread=False)

         with self.con:
-            self.con.execute('CREATE TABLE IF NOT EXISTS data (url UNICODE PRIMARY KEY, code INT, msg UNICODE, headers UNICODE, data BLOB, timestamp INT)')
+            self.con.execute('CREATE TABLE IF NOT EXISTS data (ky UNICODE PRIMARY KEY, data BLOB, timestamp INT)')
             self.con.execute('pragma journal_mode=WAL')

         self.trim()
@@ -653,21 +664,17 @@ class SQLiteCache(BaseCache):
         with self.con:
             self.con.execute('DELETE FROM data WHERE timestamp <= ( SELECT timestamp FROM ( SELECT timestamp FROM data ORDER BY timestamp DESC LIMIT 1 OFFSET ? ) foo )', (CACHE_SIZE,))

-    def __getitem__(self, url):
-        row = self.con.execute('SELECT * FROM data WHERE url=?', (url,)).fetchone()
+    def __getitem__(self, key):
+        row = self.con.execute('SELECT * FROM data WHERE ky=?', (key,)).fetchone()

         if not row:
             raise KeyError

-        return row[1:]
+        return row[1]

-    def __setitem__(self, url, value): # value = (code, msg, headers, data, timestamp)
-        value = list(value)
-        value[3] = sqlite3.Binary(value[3]) # data
-        value = tuple(value)
-
+    def __setitem__(self, key, data):
         with self.con:
-            self.con.execute('INSERT INTO data VALUES (?,?,?,?,?,?) ON CONFLICT(url) DO UPDATE SET code=?, msg=?, headers=?, data=?, timestamp=?', (url,) + value + value)
+            self.con.execute('INSERT INTO data VALUES (?,?,?) ON CONFLICT(ky) DO UPDATE SET data=?, timestamp=?', (key, data, time.time(), data, time.time()))


 try:
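
The reduced schema and upsert can be exercised standalone (in-memory database, placeholder blob). Note that ON CONFLICT ... DO UPDATE requires SQLite 3.24+:

    import sqlite3
    import time

    con = sqlite3.connect(':memory:')
    con.execute('CREATE TABLE IF NOT EXISTS data (ky UNICODE PRIMARY KEY, data BLOB, timestamp INT)')

    blob = b'placeholder for a pickled entry'  # hypothetical content
    with con:
        con.execute('INSERT INTO data VALUES (?,?,?) ON CONFLICT(ky) DO UPDATE SET data=?, timestamp=?',
            ('https://example.com/', blob, time.time(), blob, time.time()))

    row = con.execute('SELECT * FROM data WHERE ky=?', ('https://example.com/',)).fetchone()
    print(row[1] == blob)  # True: callers now get a single blob, not a row slice

Since Python 3's sqlite3 accepts bytes for BLOB columns directly, the old sqlite3.Binary() wrapping could be dropped along with the per-column layout. The MySQL backend below does the same with its own dialect, ON DUPLICATE KEY UPDATE.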
@@ -684,7 +691,7 @@ class MySQLCacheHandler(BaseCache):
         self.host = host

         with self.cursor() as cursor:
-            cursor.execute('CREATE TABLE IF NOT EXISTS data (url VARCHAR(255) NOT NULL PRIMARY KEY, code INT, msg TEXT, headers TEXT, data BLOB, timestamp INT)')
+            cursor.execute('CREATE TABLE IF NOT EXISTS data (ky VARCHAR(255) NOT NULL PRIMARY KEY, data MEDIUMBLOB, timestamp INT)')

         self.trim()
@@ -695,20 +702,20 @@ class MySQLCacheHandler(BaseCache):
         with self.cursor() as cursor:
             cursor.execute('DELETE FROM data WHERE timestamp <= ( SELECT timestamp FROM ( SELECT timestamp FROM data ORDER BY timestamp DESC LIMIT 1 OFFSET %s ) foo )', (CACHE_SIZE,))

-    def __getitem__(self, url):
+    def __getitem__(self, key):
         cursor = self.cursor()
-        cursor.execute('SELECT * FROM data WHERE url=%s', (url,))
+        cursor.execute('SELECT * FROM data WHERE ky=%s', (key,))
         row = cursor.fetchone()

         if not row:
             raise KeyError

-        return row[1:]
+        return row[1]

-    def __setitem__(self, url, value): # (code, msg, headers, data, timestamp)
+    def __setitem__(self, key, data):
         with self.cursor() as cursor:
-            cursor.execute('INSERT INTO data VALUES (%s,%s,%s,%s,%s,%s) ON DUPLICATE KEY UPDATE code=%s, msg=%s, headers=%s, data=%s, timestamp=%s',
-                (url,) + value + value)
+            cursor.execute('INSERT INTO data VALUES (%s,%s,%s) ON DUPLICATE KEY UPDATE data=%s, timestamp=%s',
+                (key, data, time.time(), data, time.time()))


 class CappedDict(OrderedDict, BaseCache):
@@ -717,11 +724,11 @@ class CappedDict(OrderedDict, BaseCache):
         for i in range( max( len(self) - CACHE_SIZE , 0 )):
             self.popitem(False)

-    def __setitem__(self, key, value):
+    def __setitem__(self, key, data):
         # https://docs.python.org/2/library/collections.html#ordereddict-examples-and-recipes
         if key in self:
             del self[key]
-        OrderedDict.__setitem__(self, key, value)
+        OrderedDict.__setitem__(self, key, data)


 if 'CACHE' in os.environ:
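
Aside from the value-to-data rename, CappedDict is an insertion-ordered dict trimmed from the oldest end. A self-contained sketch with an artificially small cap:

    from collections import OrderedDict

    CACHE_SIZE = 3  # assumption: tiny cap, just for the demo

    class CappedDict(OrderedDict):
        def trim(self):
            # evict the oldest entries until the cap is respected
            for i in range(max(len(self) - CACHE_SIZE, 0)):
                self.popitem(last=False)

        def __setitem__(self, key, data):
            # delete-then-insert moves an existing key to the newest end
            if key in self:
                del self[key]
            OrderedDict.__setitem__(self, key, data)

    d = CappedDict()
    for k in 'abcd':
        d[k] = k.upper()

    d.trim()
    print(list(d))  # ['b', 'c', 'd']: 'a' was oldest, so it was evicted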