Thread Gather()
Roughly twice as fast. Ctrl-C no longer works in the terminal and I don't know how to fix it yet; using threading.active_count() or the like didn't work as expected. New setting (THREADS) to change the number of threads to use.
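For context on the Ctrl-C problem mentioned above: once the workers are daemon threads and the main thread blocks in Queue.join(), CPython 2 is most likely stuck in an uninterruptible C-level lock acquire, so SIGINT is never raised as KeyboardInterrupt and Ctrl-C appears dead. This commit does not fix that; the sketch below is only one common workaround, not part of the diff: swap the blocking join for a polling loop. The worker/queue names mirror the diff, the per-item work is a placeholder, and unfinished_tasks is an undocumented attribute of Queue.Queue in CPython 2.

import Queue
import threading
import time

THREADS = 10

def worker(q):
	while True:
		item = q.get()
		# placeholder for the real per-item work (Fix/Fill in morss)
		q.task_done()

q = Queue.Queue()

for i in range(THREADS):
	t = threading.Thread(target=worker, args=(q,))
	t.daemon = True	# daemon threads die when the main thread exits
	t.start()

for item in range(100):
	q.put(item)

# q.join() would block in C code where Ctrl-C cannot interrupt the main thread.
# Polling with a short sleep keeps the main thread responsive to SIGINT.
try:
	while q.unfinished_tasks:
		time.sleep(0.1)
except KeyboardInterrupt:
	pass	# main thread exits, daemon workers are torn down with it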
morss.py
@@ -4,6 +4,9 @@ import os
 import os.path
 import time
 
+import Queue
+import threading
+
 from fnmatch import fnmatch
 from base64 import b64encode, b64decode
 import re
@@ -33,6 +36,7 @@ MAX_ITEM = 50	# cache-only beyond
 MAX_TIME = 7	# cache-only after (in sec)
 DELAY = 10*60	# xml cache & ETag cache (in sec)
 TIMEOUT = 2	# http timeout (in sec)
+THREADS = 10	# number of threads (1 for single-threaded)
 
 DEBUG = False
 HOLD = False
@@ -497,10 +501,16 @@ def Gather(url, cachePath, options):
 		return False
 
 	size = len(rss.items)
+	startTime = time.time()
+
 
 	# set
-	startTime = time.time()
-	for i, item in enumerate(rss.items):
+	def runner(queue):
+		while True:
+			worker(*queue.get())
+			queue.task_done()
+
+	def worker(i, item):
 		if options.progress:
 			if MAX_ITEM == -1:
 				print '%s/%s' % (i+1, size)
@@ -511,7 +521,7 @@ def Gather(url, cachePath, options):
 		if time.time() - startTime > LIM_TIME >= 0 or i+1 > LIM_ITEM >= 0:
 			log('dropped')
 			item.remove()
-			continue
+			return
 
 		item = Fix(item, url)
 
@@ -519,7 +529,7 @@ def Gather(url, cachePath, options):
 			if not options.proxy:
 				if Fill(item, cache, url, True) is False:
 					item.remove()
-					continue
+					return
 		else:
 			if not options.proxy:
 				Fill(item, cache, url)
@@ -531,6 +541,19 @@ def Gather(url, cachePath, options):
 			if not options.keep:
 				del item.desc
 
+	queue = Queue.Queue()
+
+	for i in range(THREADS):
+		t = threading.Thread(target=runner, args=(queue,))
+		t.daemon = True
+		t.start()
+
+	for i, item in enumerate(rss.items):
+		queue.put([i, item])
+
+	queue.join()
+	cache.save()
+
 	log(len(rss.items))
 	log(time.time() - startTime)
 
@@ -584,6 +607,8 @@ if __name__ == '__main__':
 		if 'o'+user_id not in token:
 			facebook.set('o'+user_id, ltoken)
 
+			facebook.save()
+
 		if 'REQUEST_URI' in os.environ:
 			print 'Status: 200'
 			print 'Content-Type: text/plain'
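Read as a whole rather than as a diff, the new code path in Gather() is a plain producer/consumer pool: THREADS daemon threads run runner(), which pops (i, item) pairs off a Queue and hands them to worker(), while the main thread enqueues every feed item, waits on queue.join(), and then saves the cache. A minimal standalone sketch of that shape (the items list and the worker body are placeholders here; Fix/Fill/log/cache belong to the real module):

import Queue
import threading

THREADS = 10	# number of threads (1 for single-threaded)

def runner(queue):
	while True:
		worker(*queue.get())	# queue.put([i, item]) unpacks back into worker(i, item)
		queue.task_done()

def worker(i, item):
	pass	# placeholder: the real code runs Fix()/Fill() on each item

items = ['item0', 'item1', 'item2']

queue = Queue.Queue()

for i in range(THREADS):
	t = threading.Thread(target=runner, args=(queue,))
	t.daemon = True	# workers must not keep the process alive after the main thread exits
	t.start()

for i, item in enumerate(items):
	queue.put([i, item])

queue.join()	# returns once every queued item has been marked task_done()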