一、scrapy源码流程
- 流程要点:
1、执行CrawlerProcess构造方法
2、CrawlerProcess对象(含有配置文件)的spiders
2.1、为每个爬虫创建一个Crawler
2.2、执行d=Crawler.crawl(...)
d.addBoth(_done)
2.3、CrawlerProcess对象._active={d,}
3、dd=defer.DeferredList(self._active)
dd.addBoth(self._stop_reactor) #self._stop_reactor ==>reactor.stop()
reactor.run()
实现原理同twisted
- Twisted示例
"""Download several pages concurrently with Twisted, then stop the reactor."""
import warnings

from twisted.internet import defer
from twisted.internet import reactor
# ResponseFailed is re-exported by the public twisted.web.client module, so
# the private twisted.web._newclient import is not needed.
from twisted.web.client import Agent, ResponseFailed, readBody

warnings.filterwarnings("ignore", category=DeprecationWarning)


@defer.inlineCallbacks
def task(url):
    """Request *url* and wait until its body has been read and handled.

    The returned Deferred fires once the whole callback chain on the
    request has run, including the body download started in ``download``.
    """
    agent = Agent(reactor)
    v = agent.request(b'GET', url.encode("utf-8"))
    # Only successful responses go to download(); a Failure has no ``.code``.
    v.addCallback(download)
    # Connection-level failures (DNS, refused, ...) are reported here.
    v.addErrback(handle_error)
    yield v


def download(response):
    """Print the status code and start reading the response body.

    Returns the body Deferred so the request's callback chain is paused
    until the body has been handled; without the return the reactor could
    be stopped before any body was printed.
    """
    print(response.code)
    r = readBody(response)
    r.addCallbacks(handle_response, handle_error)
    return r


def stop(*args, **kwargs):
    """Stop the reactor; accepts and ignores whatever the Deferred passes."""
    reactor.stop()


def handle_response(body):
    """Handle a fully downloaded body (here: just print the bytes)."""
    print(body)


def handle_error(failure):
    """Report a failed request or a failed body download."""
    if isinstance(failure.value, ResponseFailed):
        print("HTTP请求失败:", failure.value.response)
    else:
        print("发生未知错误:", failure.value)


if __name__ == '__main__':
    url_list = [
        'http://www.sogou.com',
        'http://www.baidu.com',
        'http://www.cnblogs.com',
    ]
    _active = set()
    for url in url_list:
        d = task(url)
        _active.add(d)
    # Fires once every task Deferred has fired, whatever the outcome.
    dd = defer.DeferredList(_active)
    dd.addBoth(stop)
    reactor.run()
*原案例使用的是getPage,该函数自Twisted 16.7.0起已被弃用,因此本示例改用Agent
二、Twisted使用
1、调用模块的功能原理如下:
from twisted.internet import reactor #事件循环(终止条件,所有的socket都已经移除)
from twisted.web.client import Agent #socket对象(如果下载完成,自动从事件循环中移除)
from twisted.internet import defer #defer.Deferred 特殊的socket对象(不会发请求,手动移除)
示例1:
实现基本流程,此时事件循环没有终止
# reactor: the event loop (in this tutorial's model it ends once every
#          socket has been removed).
# Agent:   the socket-like object (removed from the loop automatically when
#          the download completes).
# defer:   defer.Deferred is a special socket-like object (it sends no
#          request and must be removed by hand).
from twisted.internet import defer
from twisted.internet import reactor
from twisted.web.client import Agent, readBody

# Steps shown by this example:
#   1. create the socket with Agent
#   2. add the socket to the event loop
#   3. start the event loop (it sends the request and receives the response)
# Nothing here ever calls reactor.stop(), so the loop keeps running after
# the response has arrived.


def hand_response(body):
    """Print a downloaded body (only needed if the body is read)."""
    print(body)


def response(response):
    """Print the HTTP status code of the response.

    To also print the body, call readBody(response) and attach
    hand_response as its callback.
    """
    print(response.code)


@defer.inlineCallbacks
def task():
    """Send one GET request and wait for its response."""
    target = 'http://www.baidu.com'
    client = Agent(reactor)
    pending = client.request(b'GET', target.encode('utf-8'))
    print(type(pending))
    pending.addCallback(response)
    yield pending


task()
reactor.run()
2、让事件循环终止
示例2:
# The imports are part of the example so it can be run on its own.
from twisted.internet import defer
from twisted.internet import reactor
from twisted.web.client import Agent, readBody


def hand_response(body):
    """Print a downloaded body (only needed if the body is read)."""
    print(body)


def response(response):
    """Print the HTTP status code of the response.

    To also print the body, call readBody(response) and attach
    hand_response as its callback.
    """
    print(response.code)


def request_failed(failure):
    """Report a request that could not be completed."""
    print('request failed:', failure.value)


@defer.inlineCallbacks
def task():
    """Send one GET request and wait until it has been handled."""
    url = 'http://www.baidu.com'
    agent = Agent(reactor)
    d = agent.request(b'GET', url.encode('utf-8'))
    # Handle the failure case too, so an unreachable host is reported
    # instead of being left as an unhandled error on the Deferred.
    d.addCallbacks(response, request_failed)
    yield d


def done(*args, **kwargs):
    """Stop the event loop; arguments from the Deferred are ignored."""
    reactor.stop()


d = task()
# DeferredList fires once every Deferred in the list has fired; that is the
# signal used to stop the reactor.
dd = defer.DeferredList([d, ])
dd.addBoth(done)
reactor.run()
3、特殊的socket对象defer:使用defer.Deferred()让事件循环保持运行(hold住),再通过计数器在所有请求完成后手动触发它,从而终止事件循环
示例3:
from twisted.internet import defer
from twisted.internet import reactor
from twisted.web.client import Agent

# Pages each spider downloads.
URLS = (
    "http://www.baidu.com",
    "http://www.cnblogs.com",
    "http://www.bing.com",
)


def response(response):
    """Print the HTTP status code of a successful response."""
    print(response.code)


def request_failed(failure):
    """Report a request that could not be completed."""
    print('request failed:', failure.value)


def _make_countdown(total, close):
    """Return a callback that fires *close* after being called *total* times.

    Every spider gets its own counter and its own ``close`` Deferred. With a
    single module-level counter and Deferred shared by all spiders, only the
    most recently created Deferred is ever fired, so the DeferredList over
    the spiders never completes and the reactor never stops.
    """
    remaining = total

    def _one_finished(_result):
        nonlocal remaining
        remaining -= 1
        if remaining == 0:
            close.callback(None)

    return _one_finished


@defer.inlineCallbacks
def task():
    """Play the role of one spider's start_requests.

    Sends every request in URLS, then waits on a manually fired Deferred
    that keeps this spider "open" until all of its requests have finished.
    """
    close = defer.Deferred()
    one_finished = _make_countdown(len(URLS), close)
    agent = Agent(reactor)
    for url in URLS:
        d = agent.request(b'GET', url.encode('utf-8'))
        d.addCallbacks(response, request_failed)
        # Count failures as well as successes, otherwise one unreachable
        # host would keep the spider (and the reactor) alive forever.
        d.addBoth(one_finished)
    yield close


def done(*args, **kwargs):
    """Stop the event loop; arguments from the Deferred are ignored."""
    reactor.stop()


# One task() call per spider.
spider1 = task()
spider2 = task()
dd = defer.DeferredList([spider1, spider2])
dd.addBoth(done)
reactor.run()
三、自定义框架
疑问,defer.DeferredList()作用:
defer.DeferredList()是Twisted中的一个函数,用于创建一个Deferred对象列表,并在所有Deferred对象都触发时返回一个Deferred对象。它的作用是允许你同时处理多个Deferred对象,并在它们全部完成时获得通知
具体来说,defer.DeferredList()接受一个Deferred对象的列表作为参数,并返回一个新的Deferred对象。当传入的所有Deferred对象都触发时,返回的Deferred对象会触发,其回调函数会接收一个列表作为参数,列表
中包含了原始Deferred对象触发时传递的结果或Failure对象。
例子:
from twisted.internet import defer, reactor


def _delayed_result(delay, message):
    """Return a Deferred that fires with *message* after *delay* seconds."""
    d = defer.Deferred()
    reactor.callLater(delay, d.callback, message)
    return d


def task1():
    """Finish after 2 seconds."""
    return _delayed_result(2, "Task 1 done,")


def task2():
    """Finish after 3 seconds."""
    return _delayed_result(3, "Task 2 done")


def task3():
    """Finish after 1 second."""
    return _delayed_result(1, "Task 3 done")


def callback(results):
    """Print the outcome of every task.

    *results* is the list DeferredList passes on: one entry per Deferred,
    in the order the Deferreds were given.
    """
    print("All tasks completed")
    for result in results:
        print(result)


def errback(failure):
    """Report an error raised while handling the combined result."""
    print("An error occurred:", failure)


if __name__ == "__main__":
    deferred_list = defer.DeferredList([task1(), task2(), task3()])
    deferred_list.addCallback(callback)
    deferred_list.addErrback(errback)
    # Stop the reactor once the results have been handled; without this the
    # script keeps running after printing them.
    deferred_list.addBoth(lambda _: reactor.stop())
    reactor.run()
结果:
1、自定义的框架
import queue

from twisted.internet import defer
from twisted.internet import reactor
from twisted.web.client import Agent, readBody


class Request(object):
    """Pairs a URL with the callback that should receive its response."""

    def __init__(self, url, callback):
        self.url, self.callback = url, callback


class ChoutiSpider(object):
    """Example spider that seeds the crawl with two start pages."""

    name = "chouti"

    def start_requests(self):
        """Yield one Request per start URL, handled by ``parse``."""
        for page in ('http://www.baidu.com', 'http://www.bing.com'):
            yield Request(page, self.parse)

    def parse(self, response):
        """Print the response; this version yields no follow-up requests."""
        print('response', response)


# Shared queue of requests waiting to be downloaded.
Q = queue.Queue()


class Engine(object):
    """Queues a spider's start requests and keeps the crawl open."""

    def __init__(self):
        self._close = None
        self.spider = None

    @defer.inlineCallbacks
    def crawl(self, spider):
        """Put every start request on the queue, then wait on ``_close``.

        Nothing in this version takes requests off the queue or fires
        ``_close``, so the returned Deferred stays pending.
        """
        self.spider = spider
        for pending in spider.start_requests():
            Q.put(pending)
        self._close = defer.Deferred()
        yield self._close


spider = ChoutiSpider()
engine = Engine()
_active = set()
d = engine.crawl(spider)
_active.add(d)

dd = defer.DeferredList(_active)
dd.addBoth(lambda _result: reactor.stop())

reactor.run()
*还未实现从队列取数据发请求
2、框架实现发送请求
import queue
import types

from twisted.internet import defer
from twisted.internet import reactor
from twisted.web.client import Agent, readBody


class Request(object):
    """Pairs a URL with the callback that should receive its response."""

    def __init__(self, url, callback):
        self.url = url
        self.callback = callback


def hand_response(body):
    """Print a fully downloaded response body."""
    print(body)


class HttpResponse(object):
    """Wraps the Twisted response together with the request that caused it.

    ``d_content`` is the Deferred returned by readBody(); attach a callback
    to it to receive the body.
    """

    def __init__(self, content, request):
        try:
            self.content = content
            self.request = request
            self.url = request.url
            self.d_content = readBody(content)
        except Exception as e:
            print(e)


class ChoutiSpider(object):
    """Example spider: two start pages, each response yields a follow-up."""

    name = "chouti"

    def start_requests(self):
        """Yield one Request per start URL, handled by ``parse``."""
        start_url = ['http://www.baidu.com', 'http://www.bing.com']
        for url in start_url:
            yield Request(url, self.parse)

    def parse(self, response):
        """Print the body when it arrives and yield a follow-up request.

        The engine removes the finished request from its in-flight list,
        collects the requests yielded here, and goes back to the queue.
        """
        # The body is printed from a callback because it arrives later.
        response.d_content.addCallback(hand_response)
        # NOTE(review): every response yields another request handled by
        # this same method, so the queue never runs dry and the crawl does
        # not end by itself; add a stop condition for real use.
        yield Request('http://www.cnblogs.com', callback=self.parse)


# Shared queue of requests waiting to be downloaded.
Q = queue.Queue()


class Engine(object):
    """Downloads queued requests with a cap on concurrent downloads."""

    def __init__(self):
        self._close = None      # Deferred fired when the crawl is finished
        self.max = 5            # maximum number of concurrent downloads
        self.crawlling = []     # requests currently being downloaded

    def get_response_callback(self, content, request):
        """Hand a finished download to the spider and queue what it yields."""
        self.crawlling.remove(request)
        print('response&req:', content, request)
        rep = HttpResponse(content, request)
        result = request.callback(rep)  # the spider's parse method
        if isinstance(result, types.GeneratorType):
            for req in result:
                Q.put(req)

    def _handle_failure(self, failure, request):
        """Report a failed download/parse and free its concurrency slot.

        Without this a failed request would stay in ``crawlling`` forever
        and the crawl could never be closed.
        """
        if request in self.crawlling:
            self.crawlling.remove(request)
        print('request failed:', request.url, failure.value)

    def _next_request(self):
        """Start downloads up to the concurrency limit, or close the crawl.

        When the queue is empty and nothing is in flight, fire ``_close``.
        """
        # qsize is a method: comparing the bound method itself to 0 is
        # always False, which meant the crawl was never closed.
        if Q.empty() and not self.crawlling:
            # Several scheduled calls can observe the idle state; a Deferred
            # may only be fired once.
            if not self._close.called:
                self._close.callback(None)
            return
        while len(self.crawlling) < self.max:
            try:
                req = Q.get(block=False)  # do not wait on an empty queue
            except queue.Empty:
                return
            self.crawlling.append(req)
            agent = Agent(reactor)
            d = agent.request(b'GET', req.url.encode('utf-8'))
            # Download finished: call the spider's callback and queue any
            # new requests it produced.
            d.addCallback(self.get_response_callback, req)
            d.addErrback(self._handle_failure, req)
            # A slot has been freed, so look at the queue again.
            d.addCallback(lambda _: reactor.callLater(0, self._next_request))

    @defer.inlineCallbacks
    def crawl(self, spider):
        """Queue the spider's start requests and wait for the crawl to end."""
        for request in spider.start_requests():
            Q.put(request)
        self._close = defer.Deferred()
        # Start pulling requests from the queue on the next reactor turn.
        reactor.callLater(0, self._next_request)
        yield self._close


spider = ChoutiSpider()

_active = set()
engine = Engine()
d = engine.crawl(spider)
_active.add(d)

dd = defer.DeferredList(_active)
dd.addBoth(lambda a: reactor.stop())

reactor.run()
3、tiny爬虫框架
封装到类,模仿scrapy的流程
结构:
from engine import Request


class ChoutiSpider(object):
    """Spider that requests two start pages and prints each response."""

    name = 'chouti'

    def start_requests(self):
        """Yield one Request per start URL, each handled by ``parse``."""
        yield from (
            Request(page, self.parse)
            for page in ('http://www.baidu.com', 'http://www.bing.com')
        )

    def parse(self, response):
        """Print the response object; no follow-up requests are produced.

        To print the body, attach response.hand_response as a callback on
        response.d_content; to crawl further, yield new Request objects.
        """
        print('chouti', response)
from engine import Request


class CnblogsSpider(object):
    """Spider that requests one start page and prints the response."""

    name = 'cnblogs'

    def start_requests(self):
        """Yield one Request per start URL, each handled by ``parse``."""
        yield from (
            Request(page, self.parse)
            for page in ('http://www.cnblogs.com',)
        )

    def parse(self, response):
        """Print the response object; no follow-up requests are produced."""
        print('cnblogs', response)
import importlib
import queue
import types

from twisted.internet import defer
from twisted.internet import reactor
from twisted.web.client import Agent, readBody


class Request(object):
    """Holds what the user asked for: a URL and the callback for its response."""

    def __init__(self, url, callback):
        self.url = url
        self.callback = callback


class HttpResponse(object):
    """Wraps the Twisted response together with the request that caused it.

    ``d_content`` is the Deferred returned by readBody(); attach a callback
    to it (for example ``hand_response``) to receive the body.
    """

    def __init__(self, content, request):
        try:
            self.content = content
            self.request = request
            self.url = request.url
            self.d_content = readBody(content)
        except Exception as e:
            print(e)

    def hand_response(self, body):
        """Print a fully downloaded response body."""
        print(body)


class CrawlerProcess(object):
    """Collects one crawl per spider and runs the event loop until all end."""

    def __init__(self):
        self._active = set()

    def crawl(self, spider_cls_path):
        """Start a crawl for the spider class at the dotted *spider_cls_path*."""
        crawler = Crawler()
        d = crawler.crawl(spider_cls_path)
        self._active.add(d)

    def start(self):
        """Run the reactor until every registered crawl has finished."""
        dd = defer.DeferredList(self._active)
        dd.addBoth(lambda _: reactor.stop())
        reactor.run()


class Crawler(object):
    """Creates the engine and the spider for one crawl and ties them together."""

    def _create_engine(self):
        return ExecutionEngine()

    def _create_spider(self, spider_cls_path):
        """Import and instantiate the class named by 'package.module.Class'."""
        module_path, cls_name = spider_cls_path.rsplit('.', maxsplit=1)
        m = importlib.import_module(module_path)
        cls = getattr(m, cls_name)
        return cls()

    @defer.inlineCallbacks
    def crawl(self, spider_cls_path):
        """Queue the spider's start requests, then wait for the engine to close."""
        engine = self._create_engine()
        spider = self._create_spider(spider_cls_path)
        start_requests = iter(spider.start_requests())
        yield engine.open_spider(start_requests)
        yield engine.start()


class Scheduler(object):
    """Queue of requests waiting to be downloaded."""

    def __init__(self):
        self.q = queue.Queue()

    def enqueue_request(self, req):
        self.q.put(req)

    def open(self):
        pass

    def next_request(self):
        """Return the next queued request, or None when the queue is empty."""
        try:
            return self.q.get(block=False)
        except queue.Empty:
            return None

    def size(self):
        """Return the number of queued requests."""
        return self.q.qsize()


class ExecutionEngine(object):
    """Downloads scheduled requests with a cap on concurrent downloads."""

    def __init__(self):
        self._close = None      # Deferred fired when the crawl is finished
        self.scheduler = None   # created in open_spider()
        self.max = 5            # maximum number of concurrent downloads
        self.crawlling = []     # requests currently being downloaded

    def get_response_callback(self, content, request):
        """Hand a finished download to the spider and schedule what it yields."""
        self.crawlling.remove(request)
        response = HttpResponse(content, request)
        result = request.callback(response)
        if isinstance(result, types.GeneratorType):
            for req in result:
                self.scheduler.enqueue_request(req)

    def _handle_failure(self, failure, request):
        """Report a failed download/parse and free its concurrency slot.

        Without this a failed request would stay in ``crawlling`` forever
        and the crawl could never be closed.
        """
        if request in self.crawlling:
            self.crawlling.remove(request)
        print('request failed:', request.url, failure.value)

    def _next_request(self):
        """Start downloads up to the concurrency limit, or close the crawl.

        When the scheduler is empty and nothing is in flight, fire ``_close``.
        """
        # size is a method: comparing the bound method itself to 0 is always
        # False, which meant the crawl was never closed.
        if self.scheduler.size() == 0 and not self.crawlling:
            # Several scheduled calls can observe the idle state; a Deferred
            # may only be fired once.
            if not self._close.called:
                self._close.callback(None)
            return
        while len(self.crawlling) < self.max:
            req = self.scheduler.next_request()
            if not req:
                return
            self.crawlling.append(req)
            agent = Agent(reactor)
            d = agent.request(b'GET', req.url.encode('utf-8'))
            d.addCallback(self.get_response_callback, req)
            d.addErrback(self._handle_failure, req)
            # A slot has been freed, so look at the scheduler again.
            d.addCallback(lambda _: reactor.callLater(0, self._next_request))

    @defer.inlineCallbacks
    def open_spider(self, start_requests):
        """Create the scheduler, queue the start requests, schedule the first pull."""
        self.scheduler = Scheduler()
        yield self.scheduler.open()
        for req in start_requests:
            self.scheduler.enqueue_request(req)
        # Runs on a later reactor turn, after start() has created _close.
        reactor.callLater(0, self._next_request)

    @defer.inlineCallbacks
    def start(self):
        """Wait until _next_request() reports that the crawl is finished."""
        self._close = defer.Deferred()
        yield self._close


class Commond(object):
    """Entry point: crawl every configured spider in one event loop."""

    def run(self):
        crawl_process = CrawlerProcess()
        spider_clss_path_list = [
            'spider.chouti.ChoutiSpider',
            'spider.cnblogs.CnblogsSpider',
        ]
        for spider_clss_path in spider_clss_path_list:
            crawl_process.crawl(spider_clss_path)
        crawl_process.start()


if __name__ == "__main__":
    cmd = Commond()
    cmd.run()
运行结果: