Compare commits: da81edc651...b888e068c0

2 Commits

Author | SHA1 | Date
---|---|---
pictuga | b888e068c0 |
pictuga | d6b90448f3 |
.drone.yml

@@ -10,6 +10,7 @@ steps:
   - pip3 install --no-cache-dir .[full] .[dev]
   - isort --check-only --diff .
   - pylint morss --rcfile=.pylintrc --disable=C,R,W --fail-under=8
+  - pytest --cov=morss tests
 
 ---
 kind: pipeline
morss/crawler.py

@@ -32,18 +32,18 @@ from .caching import default_cache
 try:
     # python 2
-    from urllib import quote
+    from urllib import quote, unquote
 
     from httplib import HTTPMessage
     from urllib2 import (BaseHandler, HTTPCookieProcessor, HTTPRedirectHandler,
                          Request, addinfourl, build_opener, parse_http_list,
                          parse_keqv_list)
-    from urlparse import urlparse, urlunparse
+    from urlparse import urlsplit
 
 except ImportError:
     # python 3
     from email import message_from_string
     from http.client import HTTPMessage
-    from urllib.parse import quote, urlparse, urlunparse
+    from urllib.parse import quote, unquote, urlsplit
     from urllib.request import (BaseHandler, HTTPCookieProcessor,
                                 HTTPRedirectHandler, Request, addinfourl,
                                 build_opener, parse_http_list, parse_keqv_list)
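
Both branches of the py2/py3 import shim now expose `unquote` next to `quote`, and `urlsplit` replaces the `urlparse`/`urlunparse` pair. The `unquote` import matters for the `sanitize_url` rewrite further down: unquoting before re-quoting keeps already-escaped URLs from being escaped twice. A minimal standalone sketch (the sample path is made up):

    from urllib.parse import quote, unquote

    path = '/already%20escaped path'
    print(quote(path.encode('utf-8')))           # /already%2520escaped%20path ('%' escaped again)
    print(quote(unquote(path).encode('utf-8')))  # /already%20escaped%20path (stable on re-runs)
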
@@ -151,22 +151,10 @@ def custom_opener(follow=None, policy=None, force_min=None, force_max=None):
     return build_opener(*handlers)
 
 
-def is_ascii(string):
-    # there's a native function in py3, but home-made fix for backward compatibility
-    try:
-        string.encode('ascii')
-
-    except UnicodeError:
-        return False
-
-    else:
-        return True
-
-
 def sanitize_url(url):
     # make sure the url is unicode, i.e. not bytes
     if isinstance(url, bytes):
-        url = url.decode()
+        url = url.decode('utf-8')
 
     # make sure there's a protocol (http://)
     if url.split(':', 1)[0] not in PROTOCOL:
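
The `is_ascii` helper is dropped outright: the rewritten `sanitize_url` in the next hunk encodes hostname, path, query and fragment unconditionally instead of testing each part for ASCII first. For reference, the "native function in py3" the removed comment alluded to is `str.isascii()` (Python 3.7+):

    # aside: native equivalent of the removed helper, py3.7+ only
    assert 'cafe'.isascii() is True
    assert 'café'.isascii() is False
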
@@ -175,22 +163,20 @@ def sanitize_url(url):
     # turns out some websites have really badly formatted urls (fix http:/badurl)
     url = re.sub('^(https?):/([^/])', r'\1://\2', url)
 
-    # escape spaces
-    url = url.replace(' ', '%20')
-
-    # escape non-ascii unicode characters
-    # https://stackoverflow.com/a/4391299
-    parts = list(urlparse(url))
-
-    for i in range(len(parts)):
-        if not is_ascii(parts[i]):
-            if i == 1:
-                parts[i] = parts[i].encode('idna').decode('ascii')
-
-            else:
-                parts[i] = quote(parts[i].encode('utf-8'))
-
-    return urlunparse(parts)
+    # escape non-ascii unicode characters (also encode spaces as %20)
+    parts = urlsplit(url)
+
+    parts = parts._replace(
+        netloc=parts.netloc.replace(
+            parts.hostname,
+            parts.hostname.encode('idna').decode('ascii')
+        ),
+        path=quote(unquote(parts.path).encode('utf-8')),
+        query=quote(unquote(parts.query).encode('utf-8')),
+        fragment=quote(unquote(parts.fragment).encode('utf-8')),
+    )
+
+    return parts.geturl()
 
 
 class RespDataHandler(BaseHandler):
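
Taken together, the new `sanitize_url` body IDNA-encodes the hostname and percent-encodes the path, query and fragment, round-tripping each through `unquote` so existing escapes stay stable. A self-contained sketch of just this logic (module context stripped; the example URL is invented):

    from urllib.parse import quote, unquote, urlsplit

    def sanitize(url):
        parts = urlsplit(url)
        parts = parts._replace(
            # IDNA-encode the hostname inside the netloc (preserves port/userinfo)
            netloc=parts.netloc.replace(
                parts.hostname,
                parts.hostname.encode('idna').decode('ascii')
            ),
            # unquote-then-quote so %-escapes are not doubled; spaces become %20
            path=quote(unquote(parts.path).encode('utf-8')),
            query=quote(unquote(parts.query).encode('utf-8')),
            fragment=quote(unquote(parts.fragment).encode('utf-8')),
        )
        return parts.geturl()

    print(sanitize('http://www.köln.de/some path/#köln'))
    # http://www.xn--kln-sna.de/some%20path/#k%C3%B6ln
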
setup.py (2 changes)

@@ -23,7 +23,7 @@ setup(
     install_requires = ['lxml', 'bs4', 'python-dateutil', 'chardet'],
     extras_require = {
         'full': ['pymysql', 'redis', 'diskcache', 'gunicorn', 'setproctitle'],
-        'dev': ['pylint']
+        'dev': ['pylint', 'pytest'],
     },
     python_requires = '>=2.7',
     package_data = {package_name: ['feedify.ini']},
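
With pytest in the dev extras, the CI step `pip3 install --no-cache-dir .[full] .[dev]` now pulls in the test runner. Note that the pipeline's `--cov=morss` flag is provided by the pytest-cov plugin, which is not listed here, so it presumably has to be available from elsewhere in the build environment.
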
tests/conftest.py (new file, +60)

import os
import os.path
import threading

import pytest

try:
    # python2
    from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
    from SimpleHTTPServer import SimpleHTTPRequestHandler

except:
    # python3
    from http.server import (BaseHTTPRequestHandler, HTTPServer,
                             SimpleHTTPRequestHandler)


class HTTPReplayHandler(SimpleHTTPRequestHandler):
    " Serves pages saved alongside with headers. See `curl --http1.1 -is http://...` "

    directory = os.path.join(os.path.dirname(__file__), './samples/')

    __init__ = BaseHTTPRequestHandler.__init__

    def do_GET(self):
        path = self.translate_path(self.path)

        if os.path.isdir(path):
            f = self.list_directory(path)

        else:
            f = open(path, 'rb')

        try:
            self.copyfile(f, self.wfile)

        finally:
            f.close()


class MuteHTTPServer(HTTPServer):
    def handle_error(self, request, client_address):
        # mute errors
        pass


def make_server(port=8888):
    print('Serving http://localhost:%s/' % port)
    return MuteHTTPServer(('', port), RequestHandlerClass=HTTPReplayHandler)


@pytest.fixture
def replay_server():
    httpd = make_server()
    thread = threading.Thread(target=httpd.serve_forever)
    thread.start()

    yield

    httpd.shutdown()
    thread.join()


if __name__ == '__main__':
    httpd = make_server()
    httpd.serve_forever()
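
Fixtures defined in `conftest.py` are auto-discovered by pytest, so any test can take `replay_server` as an argument and get the samples directory served on `localhost:8888` from a background thread for the duration of the test; the `__main__` block also lets the replay server be started by hand with `python tests/conftest.py`. A hypothetical consumer (test name and sample choice are illustrative):

    from urllib.request import urlopen

    def test_sample_is_served(replay_server):
        # the headers saved inside the sample file double as the real HTTP response
        body = urlopen('http://localhost:8888/feed-rss-channel-utf-8.txt').read()
        assert b'!TITLE!' in body
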
tests/samples/feed-atom-utf-8.txt (new file, +16)

HTTP/1.1 200 OK
Content-Type: text/xml; charset=utf-8

<?xml version='1.0' encoding='utf-8'?>
<feed xmlns="http://www.w3.org/2005/Atom">
  <title>!TITLE!</title>
  <subtitle>!DESC!</subtitle>
  <entry>
    <title>!ITEM_TITLE!</title>
    <summary>!ITEM_DESC!</summary>
    <content type="html">!ITEM_CONTENT!</content>
    <link href="!ITEM_LINK!"/>
    <updated>2022-01-01T00:00:01+01:00</updated>
    <published>2022-01-01T00:00:02+01:00</published>
  </entry>
</feed>
tests/samples/feed-atom03-utf-8.txt (new file, +15)

HTTP/1.1 200 OK
content-type: application/xml

<?xml version='1.0' encoding='utf-8' ?>
<feed version='0.3' xmlns='http://purl.org/atom/ns#'>
  <title>!TITLE!</title>
  <subtitle>!DESC!</subtitle>
  <entry>
    <title>!ITEM_TITLE!</title>
    <link rel='alternate' type='text/html' href='!ITEM_LINK!' />
    <summary>!ITEM_DESC!</summary>
    <content>!ITEM_CONTENT!</content>
    <issued>2022-01-01T00:00:01+01:00</issued> <!-- FIXME -->
  </entry>
</feed>
tests/samples/feed-html-utf-8.txt (new file, +22)

HTTP/1.1 200 OK
Content-Type: text/html; charset=utf-8

<html>
  <head></head>

  <body>
    <div id="header">
      <h1>!TITLE!</h1>
      <p>!DESC!</p>
    </div>

    <div id="content">
      <div class="item">
        <a target="_blank" href="!ITEM_LINK!">!ITEM_TITLE!</a>
        <div class="desc">!ITEM_DESC!</div>
        <div class="content">!ITEM_CONTENT!</div>
      </div>
    </div>

  </body>
</html>
tests/samples/feed-json-utf-8.txt (new file, +16)

HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8

{
  "title": "!TITLE!",
  "desc": "!DESC!",
  "items": [
    {
      "title": "!ITEM_TITLE!",
      "time": "2022-01-01T00:00:01+0100",
      "url": "!ITEM_LINK!",
      "desc": "!ITEM_DESC!",
      "content": "!ITEM_CONTENT!"
    }
  ]
}
tests/samples/feed-rss-channel-utf-8.txt (new file, +17)

HTTP/1.1 200 OK
Content-Type: text/xml; charset=utf-8

<?xml version='1.0' encoding='utf-8'?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" version="2.0">
  <channel>
    <title>!TITLE!</title>
    <description>!DESC!</description>
    <item>
      <title>!ITEM_TITLE!</title>
      <pubDate>Mon, 01 Jan 2022 00:00:01 +0100</pubDate>
      <link>!ITEM_LINK!</link>
      <description>!ITEM_DESC!</description>
      <content:encoded>!ITEM_CONTENT!</content:encoded>
    </item>
  </channel>
</rss>
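
Each sample is a verbatim HTTP exchange, status line and headers included, followed by a blank line and the body; the replay handler streams the file back byte-for-byte, and the `!TITLE!`-style sentinels are what the tests assert on. Per the handler's docstring, a new sample could be captured along these lines (URL and filename are placeholders):

    import subprocess

    raw = subprocess.check_output(
        ['curl', '--http1.1', '-is', 'http://example.com/feed'])

    with open('tests/samples/new-sample.txt', 'wb') as f:
        f.write(raw)
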
tests/test_feeds.py (new file, +106)

import pytest

from morss.crawler import adv_get
from morss.feeds import *


def get_feed(url):
    url = 'http://localhost:8888/%s' % url
    out = adv_get(url)
    feed = parse(out['data'], url=url, encoding=out['encoding'])
    return feed


def check_feed(feed):
    # NB. time and updated not covered
    assert feed.title == '!TITLE!'
    assert feed.desc == '!DESC!'
    assert feed[0] == feed.items[0]
    assert feed[0].title == '!ITEM_TITLE!'
    assert feed[0].link == '!ITEM_LINK!'
    assert '!ITEM_DESC!' in feed[0].desc # broader test due to possible inclusion of surrounding <div> in xml
    assert '!ITEM_CONTENT!' in feed[0].content


def check_output(feed):
    output = feed.tostring()
    assert '!TITLE!' in output
    assert '!DESC!' in output
    assert '!ITEM_TITLE!' in output
    assert '!ITEM_LINK!' in output
    assert '!ITEM_DESC!' in output
    assert '!ITEM_CONTENT!' in output


def check_change(feed):
    feed.title = '!TITLE2!'
    feed.desc = '!DESC2!'
    feed[0].title = '!ITEM_TITLE2!'
    feed[0].link = '!ITEM_LINK2!'
    feed[0].desc = '!ITEM_DESC2!'
    feed[0].content = '!ITEM_CONTENT2!'

    assert feed.title == '!TITLE2!'
    assert feed.desc == '!DESC2!'
    assert feed[0].title == '!ITEM_TITLE2!'
    assert feed[0].link == '!ITEM_LINK2!'
    assert '!ITEM_DESC2!' in feed[0].desc
    assert '!ITEM_CONTENT2!' in feed[0].content


def check_add(feed):
    feed.append({
        'title': '!ITEM_TITLE3!',
        'link': '!ITEM_LINK3!',
        'desc': '!ITEM_DESC3!',
        'content': '!ITEM_CONTENT3!',
    })

    assert feed[1].title == '!ITEM_TITLE3!'
    assert feed[1].link == '!ITEM_LINK3!'
    assert '!ITEM_DESC3!' in feed[1].desc
    assert '!ITEM_CONTENT3!' in feed[1].content


each_format = pytest.mark.parametrize('url', [
    'feed-rss-channel-utf-8.txt', 'feed-atom-utf-8.txt',
    'feed-atom03-utf-8.txt', 'feed-json-utf-8.txt', 'feed-html-utf-8.txt',
    ])

each_check = pytest.mark.parametrize('check', [
    check_feed, check_output, check_change, check_add,
    ])


@each_format
@each_check
def test_parse(replay_server, url, check):
    feed = get_feed(url)
    check(feed)


@each_format
@each_check
def test_convert_rss(replay_server, url, check):
    feed = get_feed(url)
    feed = feed.convert(FeedXML)
    check(feed)


@each_format
@each_check
def test_convert_json(replay_server, url, check):
    feed = get_feed(url)
    feed = feed.convert(FeedJSON)
    check(feed)


@each_format
@each_check
def test_convert_html(replay_server, url, check):
    feed = get_feed(url)
    feed = feed.convert(FeedHTML)
    if len(feed) > 1:
        # remove the 'blank' default html item
        del feed[0]
    check(feed)


@each_format
def test_convert_csv(replay_server, url):
    # only csv output, not csv feed, check is therefore different
    feed = get_feed(url)
    output = feed.tocsv()

    assert '!ITEM_TITLE!' in output
    assert '!ITEM_LINK!' in output
    assert '!ITEM_DESC!' in output
    assert '!ITEM_CONTENT!' in output
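
Stacking `@each_format` and `@each_check` expands to the Cartesian product, so each test carrying both markers runs 5 formats × 4 checks = 20 parameter combinations (the conversion tests re-run every check after converting to XML, JSON or HTML; `test_convert_csv` carries only `@each_format` and so runs 5 times). Pytest encodes both parameters in the generated test IDs, so single combinations can be picked out with `-k`, roughly like this (exact ID layout may vary):

    # e.g. test_parse[check_add-feed-json-utf-8.txt]
    # pytest tests/test_feeds.py -k "check_add and json"
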