⑴ 如何用python写一个协程
作者:LittleCoder
链接:https://www.hu.com/question/54483694/answer/139785021
来源:知乎
着作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
yield`和`yield from`的区别
`yield`题主肯定不陌生,而`yield from`是PEP 380中新增的一个特性。
PEP 380的名字是嵌套子迭代器的语法糖(我喜欢这么翻译,原文是:Syntax for Delegating to a Subgenerator)。
既然是语法糖,那么肯定本来是有别的写法的,这里给出本来的写法:
def subgen():
for i in range(3):
yield 'subgen: %s' % i
return 'subgen returned'def gen():
r = yield from subgen()
print('r = "%s"' % r)
yield rdef gen_without_yield_from():
sg = subgen()
try:
while 1:
yield sg.send(None)
except StopIteration as e:
yield e.valueprint('* [gen] get all values')for v in gen_without_yield_from():
print('get value: %s' % v)print('* [gen_without_yield_from] get all values')for v in gen_without_yield_from():
print('get value: %s' % v)
不难看出,`yield`子迭代器是把子迭代器直接传递出去,`yield from`子迭代器是把子迭代器的值一个一个传出去。
虽然实际把子迭代器当做一个对象直接传递出去也没有问题,也有使用场景(生成迭代器的迭代器)。
但在协程中相较于这个令人愉快的语法糖而言,直接传递就显得没有必要且碍事了。
毕竟我希望使用一个子迭代器是把子迭代器中的代码都运行一遍而不是直接就把这个子迭代器传出来让我自己操作。
所以如果你把子迭代器直接传了出去,asyncio就判断你在做一件奇怪的事情并报了错。
那么,回到问题,给出的程序要怎么通过`yield`调用呢?
# 源程序@asyncio.coroutinedef hello():
print("Hello world!")
yield from asyncio.sleep(1)
print("Hello again!")# 使用[email protected] hello():
print("Hello world!")
for v in asyncio.sleep(1):
yield v
print("Hello again!")
协程和迭代器的区别
举个比喻,迭代器和协程就像火药和枪械,利用火药的特性辅助各种其他东西才造出了枪械。
迭代器就最简单的本质而言就是一个可以暂停的程序。
那么就有这样一个合理的联想,我是不是可以节省下所有不必要的例如等待网站响应的等待时间。
就是我把我的请求发过去以后就把这个程序暂停下来,开启别的程序,等到响应来了再叫我回到这个程序。
那么等待网站响应的时间也就完全没有浪费了,比原来傻傻的等着网站响应真是优秀了许多。
这就是协程。
所以,为什么看上去都是`generator`,迭代器不会天生成为协程呢?
因为没有一个知道什么时候应该叫你回到这个程序的人。
这个人就是`event_loop`(消息循环)。
回到问题,协程是否可以脱离`event_loop`(消息循环)调用。
讲道理是不可以的,但合理联想一下是不是一直不停的告诉程序又到你了就行了。
像这样:
@asyncio.coroutinedef gen():
for i in range(3):
yield ifor i in gen():
print(i)print('end')
的确有些协程这样是可以运行的(这些协程为什么要写成协程?)。
但终究你是在不应该告诉程序到你的时候告诉了他这件事情。
所以显然获取数据的话当时数据根本没有传到,`sleep`的话就根本没有了`sleep`的效果。
只是看上去能够运行,实际完全没有用。
asyncio还为此特地加了一个断言,如果你这样调用`asyncio.sleep`,asyncio会发现你在伪装消息循环骗他。
协程的原理
这是另一个看上去能够运行,实际上完全没有用的事情。
这虽然不是你想问的问题,但你已经碰到了也迟早会意识到,所以一并讲了。
这个问题应该是这样的:为什么我写出来的协程完全没有协程的效果?
import time, [email protected] sleep(symbol, i):
time.sleep(i)
print('[%s] finished')loop = asyncio.get_event_loop()tasks = [sleep('A', 2), sleep('B', 2)]loop.run_until_complete(asyncio.wait(tasks))loop.close()
看到这里你起码可以简单的讲出来,因为显然我们在傻傻的等。
我们没有在开始等待的时候把程序暂停下来,然后在等待结束后继续运行程序,我们一心一意的在等。
我们真的`time.sleep`了两秒,而不是去做了两秒其他的事情。
你有各种选择,可以花式等待。我这里给你两个最基本的例子:
* get请求
* 同步变为协程(线程池)
get请求
为了让你更好的了解asyncio,我从最底层的socket开始写一个协程的get请求给你。
为了模拟延时很大的网站,我在本地开了一个延时服务器,这是服务器程序。
import tornado.ioloopimport tornado.webfrom tornado.gen import coroutine, sleepclass MainHandler(tornado.web.RequestHandler):
@coroutine
def get(self, waitTime=3):
yield sleep(int(waitTime))
self.write('you have waited for %ss' % waitTime)if __name__ == "__main__":
application = tornado.web.Application([
('/([0-9])', MainHandler),
], debug=True)
application.listen(5000)
try:
tornado.ioloop.IOLoop.current().start()
except:
tornado.ioloop.IOLoop.current().stop()
记得打开了这个服务器再运行下面的程序。
import socket, asyncio, timedata = 'GET /%s HTTP/1.1\r\n\r\n'loop = asyncio.get_event_loop()@asyncio.coroutinedef get(i):
future = asyncio.futures.Future(loop=loop)
s = socket.socket()
s.connect(('127.0.0.1', 5000))
s.sendall((data % i).encode('utf8'))
s.setblocking(False)
def callback(future):
future.set_result(s.recv(999).split(b'\r\n\r\n')[-1])
loop.add_reader(s.fileno(), callback, future)
r = yield from future
print('Return value: %s' % r)tasks = [get(3), get(3)]loop.run_until_complete(asyncio.wait(tasks))loop.close()
同步变为协程(线程池)
这里拿sleep模拟耗时的程序,原理就是开了5个新的线程处理耗时程序。
当然实际的`asyncio.sleep`只需要告诉消息循环一定时间后叫醒我就好了。
import asyncio, sleep, [email protected] sleep(i):
executor = concurrent.futures.ThreadPoolExecutor(5)
future = asyncio.futures.wrap_future(executor.submit(time.sleep, i), loop=loop)
yield from future
print('Slept for %s seconds' % i)tasks = [sleep(3), sleep(3)]loop.run_until_complete(asyncio.wait(tasks))loop.close()
⑵ python 中的协程是怎么实现多任务的
协程也称为微线程,是在一个线程中,通过不断的切换任务函数实现了多任务的效果。
协程在python实现的原理主要是通过yield这个关键字实现
但是真正在开发时,可以不需要自己实现,可以通过很多成熟的第三方模块来实现协程,比如greenlet,gevent等模块。多线程的课程我记得是在黑马程序员里面找的,一套,还有资料。
⑶ python中多进程+协程的使用以及为什么要用它
前面讲了为什么python里推荐用多进程而不是多线程,但是多进程也有其自己的限制:相比线程更加笨重、切换耗时更长,并且在python的多进程下,进程数量不推荐超过CPU核心数(一个进程只有一个GIL,所以一个进程只能跑满一个CPU),因为一个进程占用一个CPU时能充分利用机器的性能,但是进程多了就会出现频繁的进程切换,反而得不偿失。
不过特殊情况(特指IO密集型任务)下,多线程是比多进程好用的。
举个例子:给你200W条url,需要你把每个url对应的页面抓取保存起来,这种时候,单单使用多进程,效果肯定是很差的。为什么呢?
例如每次请求的等待时间是2秒,那么如下(忽略cpu计算时间):
1、单进程+单线程:需要2秒*200W=400W秒==1111.11个小时==46.3天,这个速度明显是不能接受的
2、单进程+多线程:例如我们在这个进程中开了10个多线程,比1中能够提升10倍速度,也就是大约4.63天能够完成200W条抓取,请注意,这里的实际执行是:线程1遇见了阻塞,CPU切换到线程2去执行,遇见阻塞又切换到线程3等等,10个线程都阻塞后,这个进程就阻塞了,而直到某个线程阻塞完成后,这个进程才能继续执行,所以速度上提升大约能到10倍(这里忽略了线程切换带来的开销,实际上的提升应该是不能达到10倍的),但是需要考虑的是线程的切换也是有开销的,所以不能无限的启动多线程(开200W个线程肯定是不靠谱的)
3、多进程+多线程:这里就厉害了,一般来说也有很多人用这个方法,多进程下,每个进程都能占一个cpu,而多线程从一定程度上绕过了阻塞的等待,所以比单进程下的多线程又更好使了,例如我们开10个进程,每个进程里开20W个线程,执行的速度理论上是比单进程开200W个线程快10倍以上的(为什么是10倍以上而不是10倍,主要是cpu切换200W个线程的消耗肯定比切换20W个进程大得多,考虑到这部分开销,所以是10倍以上)。
还有更好的方法吗?答案是肯定的,它就是:
4、协程,使用它之前我们先讲讲what/why/how(它是什么/为什么用它/怎么使用它)
what:
协程是一种用户级的轻量级线程。协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:
协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
在并发编程中,协程与线程类似,每个协程表示一个执行单元,有自己的本地数据,与其它协程共享全局数据和其它资源。
why:
目前主流语言基本上都选择了多线程作为并发设施,与线程相关的概念是抢占式多任务(Preemptive multitasking),而与协程相关的是协作式多任务。
不管是进程还是线程,每次阻塞、切换都需要陷入系统调用(system call),先让CPU跑操作系统的调度程序,然后再由调度程序决定该跑哪一个进程(线程)。
而且由于抢占式调度执行顺序无法确定的特点,使用线程时需要非常小心地处理同步问题,而协程完全不存在这个问题(事件驱动和异步程序也有同样的优点)。
因为协程是用户自己来编写调度逻辑的,对CPU来说,协程其实是单线程,所以CPU不用去考虑怎么调度、切换上下文,这就省去了CPU的切换开销,所以协程在一定程度上又好于多线程。
how:
python里面怎么使用协程?答案是使用gevent,使用方法:看这里
使用协程,可以不受线程开销的限制,我尝试过一次把20W条url放在单进程的协程里执行,完全没问题。
所以最推荐的方法,是多进程+协程(可以看作是每个进程里都是单线程,而这个单线程是协程化的)
多进程+协程下,避开了CPU切换的开销,又能把多个CPU充分利用起来,这种方式对于数据量较大的爬虫还有文件读写之类的效率提升是巨大的。
小例子:
[python]view plain
#-*-coding=utf-8-*-
importrequests
importgevent
fromgeventimportmonkey;monkey.patch_all()
importsys
reload(sys)
sys.setdefaultencoding('utf8')
deffetch(url):
try:
s=requests.Session()
r=s.get(url,timeout=1)#在这里抓取页面
exceptException,e:
printe
return''
defprocess_start(url_list):
tasks=[]
forurlinurl_list:
tasks.append(gevent.spawn(fetch,url))
gevent.joinall(tasks)#使用协程来执行
deftask_start(filepath,flag=100000):#每10W条url启动一个进程
withopen(filepath,'r')asreader:#从给定的文件中读取url
url=reader.readline().strip()
url_list=[]#这个list用于存放协程任务
i=0#计数器,记录添加了多少个url到协程队列
whileurl!='':
i+=1
url_list.append(url)#每次读取出url,将url添加到队列
ifi==flag:#一定数量的url就启动一个进程并执行
p=Process(target=process_start,args=(url_list,))
p.start()
url_list=[]#重置url队列
i=0#重置计数器
url=reader.readline().strip()
ifurl_listnot[]:#若退出循环后任务队列里还有url剩余
p=Process(target=process_start,args=(url_list,))#把剩余的url全都放到最后这个进程来执行
p.start()
if__name__=='__main__':
task_start('./testData.txt')#读取指定文件
细心的同学会发现:上面的例子中隐藏了一个问题:进程的数量会随着url数量的增加而不断增加,我们在这里不使用进程池multiprocessing.Pool来控制进程数量的原因是multiprocessing.Pool和gevent有冲突不能同时使用,但是有兴趣的同学可以研究一下gevent.pool这个协程池。
⑷ python协程gevent怎么用
在学习gevent之前,你肯定要知道你学的这个东西是什么。
官方描述gevent
gevent is a coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libev event loop.
翻译:gevent是一个基于协程的Python网络库。我们先理解这句,也是这次学习的重点——协程。
wiki描述协程
与子例程一样,协程也是一种程序组件。相对子例程而言,协程更为一般和灵活,但在实践中使用没有子例程那样广泛。子例程的起始处是惟一的入口点,一旦退出即完成了子例程的执行,子例程的一个实例只会返回一次;协程可以通过yield来调用其它协程。通过yield方式转移执行权的协程之间不是调用者与被调用者的关系,而是彼此对称、平等的。协程允许多个入口点,可以在指定位置挂起和恢复执行。
没看懂?没关系,我也没看懂,不过算是有点线索:子例程。
子例程
过程有两种,一种叫子例程(Subroutine),通常叫Sub;另一种叫函数(Function)。底层实现机制是一样的,区别在于,Sub只执行操作,没有返回值;Function不但执行操作,并且有返回值。用过VB的应该会比较清楚这点。(原谅我用了网络)说到底子例程就是过程,我们一般叫它函数。
说到函数,我就想吐槽了,不明白为什么要叫函数。很多时候我们写一个函数是为了封装、模块化某个功能,它是一个功能、或者说是一个过程。因为它包含的是类似于流程图那样的具体逻辑,先怎样做,然后怎样做;如果遇到A情况则怎样,如果遇到B情况又怎样。个人觉得还是叫过程比较好,叫做函数就让人很纠结了,难道因为回归到底层还是计算问题,出于数学的角度把它称为函数?这个略坑啊!为了符合大家的口味,我还是称之为函数好了(其实我也习惯叫函数了%>_
讲到函数,我们就往底层深入一点,看看下面的代码:
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def a():
print "a start"
b()
print "a end"
def b():
print "b start"
c()
print "b end"
def c():
print "c start"
print "c end"
if __name__ == "__main__":
a()
a start
b start
c start
c end
b end
a end
对于这样的结果大家肯定不会意外的。每当函数被调用,就会在栈中开辟一个栈空间,调用结束后再回收该空间。
假设一个这样的场景:有个讲台,每个人都可以上去发表言论,但是每次讲台只能站一个人。现在a在上面演讲,当他说到“大家好!”的时候,b有个紧急通知要告诉大家,所以a就先下来让b讲完通知,然后a再上讲台继续演讲。如果用函数的思想模拟这个问题,堆栈示意图是这样的:
那什么东西有这样的能力呢?我们很快就可以想到进程、线程,但是你真的想使用进程、线程如此重量级的东西在这么简单的程序上吗?野蛮的抢占式机制和笨重的上下文切换!
还有一种程序组件,那就是协程。它能保留上一次调用时的状态,每次重新进入该过程的时候,就相当于回到上一次离开时所处逻辑流的位置。协程的起始处是第一个入口点,在协程里,返回点之后是接下来的入口点。协程的生命期完全由他们的使用的需要决定。每个协程在用yield命令向另一个协程交出控制时都尽可能做了更多的工作,放弃控制使得另一个协程从这个协程停止的地方开始,接下来的每次协程被调用时,都是从协程返回(或yield)的位置接着执行。
从上面这些你就可以知道其实协程是模拟了多线程(或多进程)的操作,多线程在切换的时候都会有一个上下文切换,在退出的时候将现场保存起来,等到下一次进入的时候从保存的现场开始,继续执行。
看下协程是怎样实现的:
Python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import random
from time import sleep
from greenlet import greenlet
from Queue import Queue
queue = Queue(1)
@greenlet
def procer():
chars = ['a', 'b', 'c', 'd', 'e']
global queue
while True:
char = random.choice(chars)
queue.put(char)
print "Proced: ", char
sleep(1)
consumer.switch()
@greenlet
def consumer():
global queue
while True:
char = queue.get()
print "Consumed: ", char
sleep(1)
procer.switch()
if __name__ == "__main__":
procer.run()
consumer.run()
应用场景
我们一直都在大谈协程是什么样一个东西,却从没有提起协程用来干嘛,这个其实大家分析一下就能够知道。从上面的生产者——消费者问题应该能看出,它分别有两个任务,假设交给两个人去执行,但每次只能允许一个人行动。当缓冲区满的时候,生产者是出于等待状态的,这个时候可以将执行任务的权利转交给消费者,当缓冲区空得时候,消费者是出于等待状态的,这个时候可以将执行任务的权利转交给生产者,是不是很容易联想到多任务切换?然后想到线程?最后想到高并发?
但同学们又会问,既然有了线程为什么还要协程呢?因为线程是系统级别的,在做切换的时候消耗是特别大的,具体为什么这么大等我研究好了再告诉你;同时线程的切换是由CPU决定的,可能你刚好执行到一个地方的时候就要被迫终止,这个时候你需要用各种措施来保证你的数据不出错,所以线程对于数据安全的操作是比较复杂的。而协程是用户级别的切换,且切换是由自己控制,不受外力终止。
总结
协程其实模拟了人类活动的一种过程。例如:你准备先写文档,然后修复bug。这时候接到电话说这个bug很严重,必须立即修复(可以看作CPU通知)。于是你暂停写文档,开始去填坑,终于你把坑填完了,你回来写文档,这个时候你肯定是接着之前写的文档继续,难道你要把之前写的给删了,重新写?这就是协程。那如果是子例程呢?那你就必须重新写了,因为退出之后,栈帧就会被弹出销毁,再次调用就是开辟新的栈空间了。
总结:协程就是用户态下的线程,是人们在有了进程、线程之后仍觉得效率不够,而追求的又一种高并发解决方案。为什么说是用户态,是因为操作系统并不知道它的存在,它是由程序员自己控制、互相协作的让出控制权而不是像进程、线程那样由操作系统调度决定是否让出控制权。
⑸ python中的协程是怎么实现多任务的
协程也称为微线程,是在一个线程中,通过不断的切换任务函数实现了多任务的效果。
协程在python实现的原理主要是通过yield这个关键字实现
但是真正在开发时,可以不需要自己实现,可以通过很多成熟的第三方模块来实现协程,比如greenlet,gevent等模块。黑马程序员可学习Python哦,有免费的学习视频,学习路线图,学习工具!
⑹ python里怎么实现多个协程一起执行,只要完
需要使用新的函数as_completed()来实现,可以把多个并发的协程一起给它,但它把返回的结果变成一个生成器,每次返回一个协程的结果,与函数wait()一样,执行协程是乱序的,不会等所有协程执行完成才返回。例子:
importasyncio
asyncdefphase(i):
print('inphase{}'.format(i))
awaitasyncio.sleep(0.5-(0.1*i))
print('donewithphase{}'.format(i))
return'phase{}result'.format(i)
asyncdefmain(num_phases):
print('startingmain')
phases=[
phase(i)
foriinrange(num_phases)
]
print('waitingforphasestocomplete')
results=[]
fornext_to_completeinasyncio.as_completed(phases):
answer=awaitnext_to_complete
print('receivedanswer{!r}'.format(answer))
results.append(answer)
print('results:{!r}'.format(results))
returnresults
event_loop=asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(3))
finally:
event_loop.close()
结果输出如下:starting main
waiting for phases to complete
in phase 2
in phase 1
in phase 0
done with phase 2
received answer 'phase 2 result'
done with phase 1
received answer 'phase 1 result'
done with phase 0
received answer 'phase 0 result'
results: ['phase 2 result', 'phase 1 result', 'phase 0 result']
⑺ python里怎么实现多个协程一起执行,只要完成
importasyncio
asyncdefphase(i):
print('inphase{}'.format(i))
awaitasyncio.sleep(0.5-(0.1*i))
print('donewithphase{}'.format(i))
return'phase{}result'.format(i)
asyncdefmain(num_phases):
print('startingmain')
phases=[
phase(i)
foriinrange(num_phases)
]
print('waitingforphasestocomplete')
results=[]
fornext_to_completeinasyncio.as_completed(phases):
answer=awaitnext_to_complete
print('receivedanswer{!r}'.format(answer))
results.append(answer)
print('results:{!r}'.format(results))
returnresults
event_loop=asyncio.get_event_loop()
try:
event_loop.run_until_complete(main(3))
finally:
event_loop.close()