導航:首頁 > 編程語言 > python添加非同步函數

python添加非同步函數

發布時間:2022-04-18 23:57:33

python 非同步是什麼意思

非同步是計算機多線程的非同步處理。與同步處理相對,非同步處理不用阻塞當前線程來等待處理完成,而是允許後續操作,直至其它線程將處理完成,並回調通知此線程。

㈡ python里怎麼實現非同步調用

本文實例講述了python使用multiprocessing模塊實現帶回調函數的非同步調用方法。分享給大家供大家參考。具體分析如下:
multipressing模塊是python 2.6版本加入的,通過這個模塊可以輕松實現非同步調用
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=1)
# Start a worker processes.
result = pool.apply_async(f, [10], callback)
# Evaluate "f(10)" asynchronously calling callback when finished.
希望本文所述對大家的Python程序設計有所幫助。

㈢ 如何看待 Python 3.5支持Async/Await非同步編程

根據Python增強提案(PEP) 第0492號, Python 3.5將通過async和await語法增加對協程的支持。該提案目的是使協程成為Python語言的原生特性,並「建立一種普遍、易用的非同步編程思維模型。」

這個新提議中聲明一個協程的語法如下:

async def read_data(db):
pass

async是明確將函數聲明為協程的關鍵字,即便沒有使用await表達式。這樣的函數執行時會返回一個協程對象。

在協程函數內部,可在某個表達式之前使用await關鍵字來暫停協程的執行,以等待某進程完成:

async def read_data(db):
data = await db.fetch('SELECT ...')
...

由於增強版生成器的存在,Python中其實早已有了協程的形式,例如當yield或yield from聲明在Python生成器內部出現,該生成器就會被當作協程。

以下示例展示基於生成器的協程的用法:

>>> def createGenerator():
... mylist = range(3)
... for i in mylist:
... yield i*i
...
>>> mygenerator = createGenerator()
>>> for i in mygenerator:
... print(i)
0
1
4

以上代碼中,每當生成器在for循環中被調用,該生成器中的for循環就會返回一個新的值。

關於await用法的更多示例請參見上文提到的PEP #0492.

這個關於協程的新提案想明確地把生成器與協程區分開,這么做有如下好處:

㈣ python2.7怎麼實現非同步

改進之前
之前,我的查詢步驟很簡單,就是:
前端提交查詢請求 --> 建立資料庫連接 --> 新建游標 --> 執行命令 --> 接受結果 --> 關閉游標、連接
這幾大步驟的順序執行。
這裡面當然問題很大:
建立資料庫連接實際上就是新建一個套接字。這是進程間通信的幾種方法里,開銷最大的了。
在「執行命令」和「接受結果」兩個步驟中,線程在阻塞在資料庫內部的運行過程中,資料庫連接和游標都處於閑置狀態。
這樣一來,每一次查詢都要順序的新建資料庫連接,都要阻塞在資料庫返回結果的過程中。當前端提交大量查詢請求時,查詢效率肯定是很低的。
第一次改進
之前的模塊里,問題最大的就是第一步——建立資料庫連接套接字了。如果能夠一次性建立連接,之後查詢能夠反復服用這個連接就好了。
所以,首先應該把資料庫查詢模塊作為一個單獨的守護進程去執行,而前端app作為主進程響應用戶的點擊操作。那麼兩條進程怎麼傳遞消息呢?翻了幾天Python文檔,終於構思出來:用隊列queue作為生產者(web前端)向消費者(資料庫後端)傳遞任務的渠道。生產者,會與SQL命令一起,同時傳遞一個管道pipe的連接對象,作為任務完成後,回傳結果的渠道。確保,任務的接收方與發送方保持一致。
作為第二個問題的解決方法,可以使用線程池來並發獲取任務隊列中的task,然後執行命令並回傳結果。
第二次改進
第一次改進的效果還是很明顯的,不用任何測試手段。直接點擊頁面鏈接,可以很直觀地感覺到反應速度有很明顯的加快。
但是對於第二個問題,使用線程池還是有些欠妥當。因為,CPython解釋器存在GIL問題,所有線程實際上都在一個解釋器進程里調度。線程稍微開多一點,解釋器進程就會頻繁的切換線程,而線程切換的開銷也不小。線程多一點,甚至會出現「抖動」問題(也就是剛剛喚醒一個線程,就進入掛起狀態,剛剛換到棧幀或內存的上下文,又被換回內存或者磁碟),效率大大降低。也就是說,線程池的並發量很有限。
試過了多進程、多線程,只能在單個線程里做文章了。
Python中的asyncio庫
Python里有大量的協程庫可以實現單線程內的並發操作,比如Twisted、Gevent等等。Python官方在3.5版本里提供了asyncio庫同樣可以實現協程並發。asyncio庫大大降低了Python中協程的實現難度,就像定義普通函數那樣就可以了,只是要在def前面多加一個async關鍵詞。async def函數中,需要阻塞在其他async def函數的位置前面可以加上await關鍵詞。
import asyncio
async def wait():
await asyncio.sleep(2)
async def execute(task):
process_task(task)
await wait()
continue_job()
async def函數的執行稍微麻煩點。需要首先獲取一個loop對象,然後由這個對象代為執行async def函數。
loop = asyncio.get_event_loop()
loop.run_until_complete(execute(task))
loop.close()
loop在執行execute(task)函數時,如果遇到await關鍵字,就會暫時掛起當前協程,轉而去執行其他阻塞在await關鍵詞的協程,從而實現協程並發。
不過需要注意的是,run_until_complete()函數本身是一個阻塞函數。也就是說,當前線程會等候一個run_until_complete()函數執行完畢之後,才會繼續執行下一部函數。所以下面這段代碼並不能並發執行。
for task in task_list:
loop.run_until_complete(task)
對與這個問題,asyncio庫也有相應的解決方案:gather函數。
loop = asyncio.get_event_loop()
tasks = [asyncio.ensure_future(execute(task))
for task in task_list]
loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
當然了,async def函數的執行並不只有這兩種解決方案,還有call_soon與run_forever的配合執行等等,更多內容還請參考官方文檔。
Python下的I/O多路復用
協程,實際上,也存在上下文切換,只不過開銷很輕微。而I/O多路復用則完全不存在這個問題。
目前,linux上比較火的I/O多路復用API要算epoll了。Tornado,就是通過調用C語言封裝的epoll庫,成功解決了C10K問題(當然還有Pypy的功勞)。
在Linux里查文檔,可以看到epoll只有三類函數,調用起來比較方便易懂。
創建epoll對象,並返回其對應的文件描述符(file descriptor)。
int epoll_create(int size);
int epoll_create1(int flags);
控制監聽事件。第一個參數epfd就對應於前面命令創建的epoll對象的文件描述符;第二個參數表示該命令要執行的動作:監聽事件的新增、修改或者刪除;第三個參數,是要監聽的文件對應的描述符;第四個,代表要監聽的事件。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
等候。這是一個阻塞函數,調用者會等候內核通知所注冊的事件被觸發。
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
int epoll_pwait(int epfd, struct epoll_event *events,
int maxevents, int timeout,
const sigset_t *sigmask);
在Python的select庫里:
select.epoll()對應於第一類創建函數;
epoll.register(),epoll.unregister(),epoll.modify()均是對控制函數epoll_ctl的封裝;
epoll.poll()則是對等候函數epoll_wait的封裝。
Python里epoll相關API的最大問題應該是在epoll.poll()。相比於其所封裝的epoll_wait,用戶無法手動指定要等候的事件,也就是後者的第二個參數struct epoll_event *events。沒法實現精確控制。因此只能使用替代方案:select.select()函數。
根據Python官方文檔,select.select(rlist, wlist, xlist[, timeout])是對Unix系統中select函數的直接調用,與C語言API的傳參很接近。前三個參數都是列表,其中的元素都是要注冊到內核的文件描述符。如果想用自定義類,就要確保實現了fileno()方法。
其分別對應於:
rlist: 等候直到可讀
wlist: 等候直到可寫
xlist: 等候直到異常。這個異常的定義,要查看系統文檔。
select.select(),類似於epoll.poll(),先注冊文件和事件,然後保持等候內核通知,是阻塞函數。
實際應用
Psycopg2庫支持對非同步和協程,但和一般情況下的用法略有區別。普通資料庫連接支持不同線程中的不同游標並發查詢;而非同步連接則不支持不同游標的同時查詢。所以非同步連接的不同游標之間必須使用I/O復用方法來協調調度。
所以,我的大致實現思路是這樣的:首先並發執行大量協程,從任務隊列中提取任務,再向連接池請求連接,創建游標,然後執行命令,並返回結果。在獲取游標和接受查詢結果之前,均要阻塞等候內核通知連接可用。
其中,連接池返回連接時,會根據引用連接的協程數量,返回負載最輕的連接。這也是自己定義AsyncConnectionPool類的目的。
我的代碼位於:bottle-blog/dbservice.py
存在問題
當然了,這個流程目前還一些問題。
首先就是每次輪詢拿到任務之後,都會走這么一個流程。
獲取連接 --> 新建游標 --> 執行任務 --> 關閉游標 --> 取消連接引用
本來,最好的情況應該是:在輪詢之前,就建好游標;在輪詢時,直接等候內核通知,執行相應任務。這樣可以減少輪詢時的任務量。但是如果協程提前對應好連接,那就不能保證在獲取任務時,保持各連接負載均衡了。
所以這一塊,還有工作要做。
還有就是epoll沒能用上,有些遺憾。
以後打算寫點C語言的內容,或者用Python/C API,或者用Ctypes包裝共享庫,來實現epoll的調用。
最後,請允許我吐槽一下Python的epoll相關文檔:簡直太弱了!!!必須看源碼才能弄清楚功能。

㈤ python 非同步請求的時候怎麼添加代理

有幾種方法。一種是設置環境變數http_proxy,它會自動訪問這個。 另外一種是你使用urllib2的時候,在參數里加上代理。還有一個是urllib上指定。

比如
import urllib
urllib.urlopen(某網站,proxyes={'http:':"某代理IP地址:代理的埠"})

使用QT時,它的瀏覽器設置代理要在瀏覽器初始化參數里指定。

㈥ python 多線程 怎麼改成非同步

python使用multiprocessing模塊實現帶回調函數的非同步調用方法。分享給大家供大家參考。具體分析如下:
multipressing模塊是python 2.6版本加入的,通過這個模塊可以輕松實現非同步調用
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=1)
# Start a worker processes.
result = pool.apply_async(f, [10], callback)
# Evaluate "f(10)" asynchronously calling callback when finished.
希望本文所述對大家的Python程序設計有所幫助。

㈦ python非同步有哪些方式

yield相當於return,他將相應的值返回給調用next()或者send()的調用者,從而交出了CPU使用權,而當調用者再次調用next()或者send()的時候,又會返回到yield中斷的地方,如果send有參數,還會將參數返回給yield賦值的變數,如果沒有就和next()一樣賦值為None。但是這里會遇到一個問題,就是嵌套使用generator時外層的generator需要寫大量代碼,看如下示例:
注意以下代碼均在Python3.6上運行調試

#!/usr/bin/env python# encoding:utf-8def inner_generator():
i = 0
while True:
i = yield i if i > 10: raise StopIterationdef outer_generator():
print("do something before yield")
from_inner = 0
from_outer = 1
g = inner_generator()
g.send(None) while 1: try:
from_inner = g.send(from_outer)
from_outer = yield from_inner except StopIteration: breakdef main():
g = outer_generator()
g.send(None)
i = 0
while 1: try:
i = g.send(i + 1)
print(i) except StopIteration: breakif __name__ == '__main__':
main()041

為了簡化,在Python3.3中引入了yield from

yield from

使用yield from有兩個好處,

1、可以將main中send的參數一直返回給最里層的generator,
2、同時我們也不需要再使用while循環和send (), next()來進行迭代。

我們可以將上邊的代碼修改如下:

def inner_generator():
i = 0
while True:
i = yield i if i > 10: raise StopIterationdef outer_generator():
print("do something before coroutine start") yield from inner_generator()def main():
g = outer_generator()
g.send(None)
i = 0
while 1: try:
i = g.send(i + 1)
print(i) except StopIteration: breakif __name__ == '__main__':
main()

執行結果如下:

do something before coroutine start123456789101234567891011

這里inner_generator()中執行的代碼片段我們實際就可以認為是協程,所以總的來說邏輯圖如下:

我們都知道Python由於GIL(Global Interpreter Lock)原因,其線程效率並不高,並且在*nix系統中,創建線程的開銷並不比進程小,因此在並發操作時,多線程的效率還是受到了很大制約的。所以後來人們發現通過yield來中斷代碼片段的執行,同時交出了cpu的使用權,於是協程的概念產生了。在Python3.4正式引入了協程的概念,代碼示例如下:

import asyncio# Borrowed from http://curio.readthedocs.org/en/latest/[email protected] countdown(number, n):
while n > 0:
print('T-minus', n, '({})'.format(number)) yield from asyncio.sleep(1)
n -= 1loop = asyncio.get_event_loop()
tasks = [
asyncio.ensure_future(countdown("A", 2)),
asyncio.ensure_future(countdown("B", 3))]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()12345678910111213141516

示例顯示了在Python3.4引入兩個重要概念協程和事件循環,
通過修飾符@asyncio.coroutine定義了一個協程,而通過event loop來執行tasks中所有的協程任務。之後在Python3.5引入了新的async & await語法,從而有了原生協程的概念。

async & await

在Python3.5中,引入了aync&await 語法結構,通過」aync def」可以定義一個協程代碼片段,作用類似於Python3.4中的@asyncio.coroutine修飾符,而await則相當於」yield from」。

先來看一段代碼,這個是我剛開始使用async&await語法時,寫的一段小程序。

#!/usr/bin/env python# encoding:utf-8import asyncioimport requestsimport time


async def wait_download(url):
response = await requets.get(url)
print("get {} response complete.".format(url))


async def main():
start = time.time()
await asyncio.wait([
wait_download("http://www.163.com"),
wait_download("http://www.mi.com"),
wait_download("http://www.google.com")])
end = time.time()
print("Complete in {} seconds".format(end - start))


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

這里會收到這樣的報錯:

Task exception was never retrieved
future: <Task finished coro=<wait_download() done, defined at asynctest.py:9> exception=TypeError("object Response can't be used in 'await' expression",)>
Traceback (most recent call last):
File "asynctest.py", line 10, in wait_download
data = await requests.get(url)
TypeError: object Response can't be used in 'await' expression123456

這是由於requests.get()函數返回的Response對象不能用於await表達式,可是如果不能用於await,還怎麼樣來實現非同步呢?
原來Python的await表達式是類似於」yield from」的東西,但是await會去做參數檢查,它要求await表達式中的對象必須是awaitable的,那啥是awaitable呢? awaitable對象必須滿足如下條件中其中之一:

1、A native coroutine object returned from a native coroutine function .

原生協程對象

2、A generator-based coroutine object returned from a function decorated with types.coroutine() .

types.coroutine()修飾的基於生成器的協程對象,注意不是Python3.4中asyncio.coroutine

3、An object with an await method returning an iterator.

實現了await method,並在其中返回了iterator的對象

根據這些條件定義,我們可以修改代碼如下:

#!/usr/bin/env python# encoding:utf-8import asyncioimport requestsimport time


async def download(url): # 通過async def定義的函數是原生的協程對象
response = requests.get(url)
print(response.text)


async def wait_download(url):
await download(url) # 這里download(url)就是一個原生的協程對象
print("get {} data complete.".format(url))


async def main():
start = time.time()
await asyncio.wait([
wait_download("http://www.163.com"),
wait_download("http://www.mi.com"),
wait_download("http://www.google.com")])
end = time.time()
print("Complete in {} seconds".format(end - start))


loop = asyncio.get_event_loop()
loop.run_until_complete(main())27282930

好了現在一個真正的實現了非同步編程的小程序終於誕生了。
而目前更牛逼的非同步是使用uvloop或者pyuv,這兩個最新的Python庫都是libuv實現的,可以提供更加高效的event loop。

uvloop和pyuv

pyuv實現了Python2.x和3.x,但是該項目在github上已經許久沒有更新了,不知道是否還有人在維護。
uvloop只實現了3.x, 但是該項目在github上始終活躍。

它們的使用也非常簡單,以uvloop為例,只需要添加以下代碼就可以了

import asyncioimport uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())123

㈧ 如何利用python asyncio編寫非同步爬蟲

不兼容 最明顯的是print變成了函數 最重要的變化一是python2里的str變為了python3里的byte,而str由unicode str取代,因此一些網路編程,hash加密的函數需要將參數encode處理。 第二點是大量的python2庫沒有被移植到python3,以request為例

㈨ Ppython await是什麼

await的解釋:

await用來聲明程序掛起。

比如非同步程序執行到某一步時需要等待的時間很長,就將此掛起,去執行其他的非同步程序。

await 後面只能跟非同步程序或有__await__屬性的對象,因為非同步程序與一般程序不同。

程序解釋:

假設有兩個非同步函數async a,async b,a中的某一步有await,

當程序碰到關鍵字await b()後,非同步程序掛起後去執行另一個非同步b程序,就是從函數內部跳出去執行其他函數,

當掛起條件消失後,不管b是否執行完,要馬上從b程序中跳出來,回到原程序執行原來的操作。

如果await後面跟的b函數不是非同步函數,那麼操作就只能等b執行完再返回,無法在b執行的過程中返回。

如果要在b執行完才返回,也就不需要用await關鍵字了,直接調用b函數就行。

所以這就需要await後面跟的是非同步函數了。

在一個非同步函數中,可以不止一次掛起,也就是可以用多個await。

更多Python知識,請關註:Python自學網!!

㈩ python 開發經驗怎麼xie

當我開始學習Python的時候,有些事我希望我一早就知道。我花費了很多時間才學會這些東西。我想要把這些重點都編纂到一篇文章當中。這篇文章的目標讀者,是剛剛開始學習Python語言的有經驗的程序員,想要跳過前幾個月研究Python使用的那些他們已經在用的類似工具。包管理和標准工具這兩節對於初學者來說同樣很有幫助。
我的經驗主要基於Python 2.7,但是大多數的工具對任何版本都有效。
如果你從來沒有使用過Python,我強烈建議你閱讀Python introction,因為你需要知道基本的語法和類型。
包管理
Python世界最棒的地方之一,就是大量的第三方程序包。同樣,管理這些包也非常容易。按照慣例,會在 requirements.txt 文件中列出項目所需要的包。每個包佔一行,通常還包含版本號。這里有一個例子,本博客使用Pelican:

1
2
3

pelican==3.3
Markdown
pelican-extended-sitemap==1.0.0

Python 程序包有一個缺陷是,它們默認會進行全局安裝。我們將要使用一個工具,使我們每個項目都有一個獨立的環境,這個工具叫virtualenv。我們同樣要安裝一個更高級的包管理工具,叫做pip,他可以和virtualenv配合工作。
首先,我們需要安裝pip。大多數python安裝程序已經內置了easy_install(python默認的包管理工具),所以我們就使用easy_install pip來安裝pip。這應該是你最後一次使用easy_install 了。如果你並沒有安裝easy_install ,在linux系統中,貌似從python-setuptools 包中可以獲得。
如果你使用的Python版本高於等於3.3, 那麼Virtualenv 已經是標准庫的一部分了,所以沒有必要再去安裝它了。
下一步,你希望安裝virtualenv和virtualenvwrapper。Virtualenv使你能夠為每個項目創造一個獨立的環境。尤其是當你的不同項目使用不同版本的包時,這一點特別有用。Virtualenv wrapper 提供了一些不錯的腳本,可以讓一些事情變得容易。

1

sudo pip install virtualenvwrapper

當virtualenvwrapper安裝後,它會把virtualenv列為依賴包,所以會自動安裝。
打開一個新的shell,輸入mkvirtualenv test 。如果你打開另外一個shell,則你就不在這個virtualenv中了,你可以通過workon test 來啟動。如果你的工作完成了,可以使用deactivate 來停用。

IPython
IPython是標准Python互動式的編程環境的一個替代品,支持自動補全,文檔快速訪問,以及標准互動式編程環境本應該具備的很多其他功能。
當你處在一個虛擬環境中的時候,可以很簡單的使用pip install ipython 來進行安裝,在命令行中使用ipython 來啟動

另一個不錯的功能是」筆記本」,這個功能需要額外的組件。安裝完成後,你可以使用ipython notebook,而且會有一個不錯的網頁UI,你可以創建筆記本。這在科學計算領域很流行。

測試
我推薦使用nose或是py.test。我大部分情況下用nose。它們基本上是類似的。我將講解nose的一些細節。
這里有一個人為創建的可笑的使用nose進行測試的例子。在一個以test_開頭的文件中的所有以test_開頭的函數,都會被調用:

1
2

def test_equality():
assert True == False

不出所料,當運行nose的時候,我們的測試沒有通過。

1
2
3
4
5
6
7
8
9
10
11
12
13

(test)jhaddad@jons-mac-pro ~VIRTUAL_ENV/src$ nosetests
F
======================================================================
FAIL: test_nose_example.test_equality
----------------------------------------------------------------------
Traceback (most recent call last):
File "/Users/jhaddad/.virtualenvs/test/lib/python2.7/site-packages/nose/case.py", line 197, in runTest
self.test(*self.arg)
File "/Users/jhaddad/.virtualenvs/test/src/test_nose_example.py", line 3, in test_equality
assert True == False
AssertionError
----------------------------------------------------------------------

nose.tools中同樣也有一些便捷的方法可以調用

1
2
3

from nose.tools import assert_true
def test_equality():
assert_true(False)

如果你想使用更加類似JUnit的方法,也是可以的:

1
2
3
4
5
6
7
8
9
10

from nose.tools import assert_true
from unittest import TestCase
class ExampleTest(TestCase):
def setUp(self): # setUp & tearDown are both available
self.blah = False
def test_blah(self):
self.assertTrue(self.blah)

開始測試:

1
2
3
4
5
6
7
8
9
10
11
12
13
14

(test)jhaddad@jons-mac-pro ~VIRTUAL_ENV/src$ nosetests
F
======================================================================
FAIL: test_blah (test_nose_example.ExampleTest)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/Users/jhaddad/.virtualenvs/test/src/test_nose_example.py", line 11, in test_blah
self.assertTrue(self.blah)
AssertionError: False is not true
----------------------------------------------------------------------
Ran 1 test in 0.003s
FAILED (failures=1)

卓越的Mock庫包含在Python 3 中,但是如果你在使用Python 2,可以使用pypi來獲取。這個測試將進行一個遠程調用,但是這次調用將耗時10s。這個例子顯然是人為捏造的。我們使用mock來返回樣本數據而不是真正的進行調用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14

import mock
from mock import patch
from time import sleep
class Sweetness(object):
def slow_remote_call(self):
sleep(10)
return "some_data" # lets pretend we get this back from our remote api call
def test_long_call():
s = Sweetness()
result = s.slow_remote_call()
assert result == "some_data"

當然,我們的測試需要很長的時間。

1
2
3
4
5

(test)jhaddad@jons-mac-pro ~VIRTUAL_ENV/src$ nosetests test_mock.py
Ran 1 test in 10.001s
OK

太慢了!因此我們會問自己,我們在測試什麼?我們需要測試遠程調用是否有用,還是我們要測試當我們獲得數據後要做什麼?大多數情況下是後者。讓我們擺脫這個愚蠢的遠程調用吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

import mock
from mock import patch
from time import sleep
class Sweetness(object):
def slow_remote_call(self):
sleep(10)
return "some_data" # lets pretend we get this back from our remote api call
def test_long_call():
s = Sweetness()
with patch.object(s, "slow_remote_call", return_value="some_data"):
result = s.slow_remote_call()
assert result == "some_data"

好吧,讓我們再試一次:

1
2
3
4
5
6

(test)jhaddad@jons-mac-pro ~VIRTUAL_ENV/src$ nosetests test_mock.py
.
----------------------------------------------------------------------
Ran 1 test in 0.001s
OK

好多了。記住,這個例子進行了荒唐的簡化。就我個人來講,我僅僅會忽略從遠程系統的調用,而不是我的資料庫調用。
nose-progressive是一個很好的模塊,它可以改善nose的輸出,讓錯誤在發生時就顯示出來,而不是留到最後。如果你的測試需要花費一定的時間,那麼這是件好事。
pip install nose-progressive 並且在你的nosetests中添加--with-progressive
調試
iPDB是一個極好的工具,我已經用它查出了很多匪夷所思的bug。pip install ipdb 安裝該工具,然後在你的代碼中import ipdb; ipdb.set_trace(),然後你會在你的程序運行時,獲得一個很好的互動式提示。它每次執行程序的一行並且檢查變數。

python內置了一個很好的追蹤模塊,幫助我搞清楚發生了什麼。這里有一個沒什麼用的python程序:

1
2
3

a = 1
b = 2
a = b

這里是對這個程序的追蹤結果:

1
2
3
4
5
6
7

(test)jhaddad@jons-mac-pro ~VIRTUAL_ENV/src$ python -m trace --trace tracing.py 1 ?
--- molename: tracing, funcname: <mole>
tracing.py(1): a = 1
tracing.py(2): b = 2
tracing.py(3): a = b
--- molename: trace, funcname: _unsettrace
trace.py(80): sys.settrace(None)

當你想要搞清楚其他程序的內部構造的時候,這個功能非常有用。如果你以前用過strace,它們的工作方式很相像
在一些場合,我使用pycallgraph來追蹤性能問題。它可以創建函數調用時間和次數的圖表。

最後,objgraph對於查找內存泄露非常有用。這里有一篇關於如何使用它查找內存泄露的好文。
Gevent
Gevent 是一個很好的庫,封裝了Greenlets,使得Python具備了非同步調用的功能。是的,非常棒。我最愛的功能是Pool,它抽象了非同步調用部分,給我們提供了可以簡單使用的途徑,一個非同步的map()函數:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

from gevent import monkey
monkey.patch_all()
from time import sleep, time
def fetch_url(url):
print "Fetching %s" % url
sleep(10)
print "Done fetching %s" % url
from gevent.pool import Pool
urls = ["http://test.com", "http://bacon.com", "http://eggs.com"]
p = Pool(10)
start = time()
p.map(fetch_url, urls)
print time() - start

非常重要的是,需要注意這段代碼頂部對gevent monkey進行的補丁,如果沒有它的話,就不能正確的運行。如果我們讓Python連續調用 fetch_url 3次,通常我們期望這個過程花費30秒時間。使用gevent:

1
2
3
4
5
6
7
8

(test)jhaddad@jons-mac-pro ~VIRTUAL_ENV/src$ python g.py
Fetching http://test.com
Fetching http://bacon.com
Fetching http://eggs.com
Done fetching http://test.com
Done fetching http://bacon.com
Done fetching http://eggs.com
10.001791954

如果你有很多資料庫調用或是從遠程URLs獲取,這是非常有用的。我並不是很喜歡回調函數,所以這一抽象對我來說效果很好。

閱讀全文

與python添加非同步函數相關的資料

熱點內容
安卓qq瀏覽器怎麼轉換到ios 瀏覽:292
不同編譯器的庫可以調用嗎 瀏覽:455
灰度信託基金加密 瀏覽:421
宿遷程序員兼職網上接單 瀏覽:924
電視編譯器怎麼設置 瀏覽:276
手機如何解壓漢字密碼的壓縮包 瀏覽:701
為什麼很多程序員愛用vim 瀏覽:828
安卓手機怎麼連接寶華韋健音響 瀏覽:555
12星座製作解壓球 瀏覽:867
java調用oracle數據 瀏覽:827
怎麼在伺服器上上傳小程序源碼 瀏覽:304
空中加油通達信指標公式源碼 瀏覽:38
分卷解壓只解壓了一部分 瀏覽:760
php網站自動登錄 瀏覽:705
合肥凌達壓縮機招聘 瀏覽:965
怎麼找到文件夾的圖標 瀏覽:237
linuxc編程pdf百度雲 瀏覽:192
會計pdf下載 瀏覽:835
c開源cf源碼 瀏覽:951
如何取消掉添加進app資源庫 瀏覽:732