1. python 多線程與多進程問題
監控一個信號就起一個線程與進程處理。這樣的邏輯是不太合適的。所有的資源都是有限的,如果這樣浪費很快會資源管理失控。
常規的做法是起一個線程池,或者是進程池。 使用線程還是進程取決於你處理的信號的類型。如果計算量大,則需要進程池,如果只是設備等待,比如網路數據收發,則線程也勉強夠用。
信號過來後處理方法有兩種,一種是實時處理,這個沒有好辦法,可以用「微線程」的辦法做,盡量減少處理周期。另外一種是允許少量的延遲。那麼通常的做法是用隊列。將信號放到線程或者是進程池的消息隊列里。然後再由後者分配。
還有一種高效的處理方法,根據信號的值做hash,然後自動分發到不同的CPU或者是伺服器。這個就算是大規模並發處理機制。
通常情況下,比如一個WEB伺服器,它需要獲取一個請求,然後處理響應,可以使用線程模型,或者是進程模型。也是使用典型的池的方法。一個Pool的大於,取決於你的計算 機的計算 能力,內存大小,以及你的並發訪問數量。
所要要啟用多少個呢?假設你的一個信號的處理周期是1秒,你同時有100個信號進來,那麼就需要100個線程或者是進程。
線程數過多,表面上處理能力在增加,不過延遲也在增加,失敗率也會增加。
2. python 線程阻塞了怎麼處理
我用thread和threading.thread測試了,都不存在你所說的問題。
time.sleep的c源碼(python2.6.8/Moles/timemole.c: floatsleep函數)我也看了,每一個分支都有Py_BEGIN_ALLOW_THREADS(即允許python解釋器運行在其他線程上)。
綜上,要麼是你的代碼寫錯了,要麼是你沒有看明白現象——只是你以為沒有產生新線程。
3. python多線程幾種方法實現
Python進階(二十六)-多線程實現同步的四種方式
臨界資源即那些一次只能被一個線程訪問的資源,典型例子就是列印機,它一次只能被一個程序用來執行列印功能,因為不能多個線程同時操作,而訪問這部分資源的代碼通常稱之為臨界區。
鎖機制
threading的Lock類,用該類的acquire函數進行加鎖,用realease函數進行解鎖
import threadingimport timeclass Num:
def __init__(self):
self.num = 0
self.lock = threading.Lock() def add(self):
self.lock.acquire()#加鎖,鎖住相應的資源
self.num += 1
num = self.num
self.lock.release()#解鎖,離開該資源
return num
n = Num()class jdThread(threading.Thread):
def __init__(self,item):
threading.Thread.__init__(self)
self.item = item def run(self):
time.sleep(2)
value = n.add()#將num加1,並輸出原來的數據和+1之後的數據
print(self.item,value)for item in range(5):
t = jdThread(item)
t.start()
t.join()#使線程一個一個執行
當一個線程調用鎖的acquire()方法獲得鎖時,鎖就進入「locked」狀態。每次只有一個線程可以獲得鎖。如果此時另一個線程試圖獲得這個鎖,該線程就會變為「blocked」狀態,稱為「同步阻塞」(參見多線程的基本概念)。
直到擁有鎖的線程調用鎖的release()方法釋放鎖之後,鎖進入「unlocked」狀態。線程調度程序從處於同步阻塞狀態的線程中選擇一個來獲得鎖,並使得該線程進入運行(running)狀態。
信號量
信號量也提供acquire方法和release方法,每當調用acquire方法的時候,如果內部計數器大於0,則將其減1,如果內部計數器等於0,則會阻塞該線程,知道有線程調用了release方法將內部計數器更新到大於1位置。
import threadingimport timeclass Num:
def __init__(self):
self.num = 0
self.sem = threading.Semaphore(value = 3) #允許最多三個線程同時訪問資源
def add(self):
self.sem.acquire()#內部計數器減1
self.num += 1
num = self.num
self.sem.release()#內部計數器加1
return num
n = Num()class jdThread(threading.Thread):
def __init__(self,item):
threading.Thread.__init__(self)
self.item = item def run(self):
time.sleep(2)
value = n.add()
print(self.item,value)for item in range(100):
4. python多線程的幾種方法
Python進階(二十六)-多線程實現同步的四種方式
臨界資源即那些一次只能被一個線程訪問的資源,典型例子就是列印機,它一次只能被一個程序用來執行列印功能,因為不能多個線程同時操作,而訪問這部分資源的代碼通常稱之為臨界區。
鎖機制
threading的Lock類,用該類的acquire函數進行加鎖,用realease函數進行解鎖
import threadingimport timeclass Num:
def __init__(self):
self.num = 0
self.lock = threading.Lock() def add(self):
self.lock.acquire()#加鎖,鎖住相應的資源
self.num += 1
num = self.num
self.lock.release()#解鎖,離開該資源
return num
n = Num()class jdThread(threading.Thread):
def __init__(self,item):
threading.Thread.__init__(self)
self.item = item def run(self):
time.sleep(2)
value = n.add()#將num加1,並輸出原來的數據和+1之後的數據
print(self.item,value)for item in range(5):
t = jdThread(item)
t.start()
t.join()#使線程一個一個執行
當一個線程調用鎖的acquire()方法獲得鎖時,鎖就進入「locked」狀態。每次只有一個線程可以獲得鎖。如果此時另一個線程試圖獲得這個鎖,該線程就會變為「blocked」狀態,稱為「同步阻塞」(參見多線程的基本概念)。
直到擁有鎖的線程調用鎖的release()方法釋放鎖之後,鎖進入「unlocked」狀態。線程調度程序從處於同步阻塞狀態的線程中選擇一個來獲得鎖,並使得該線程進入運行(running)狀態。
信號量
信號量也提供acquire方法和release方法,每當調用acquire方法的時候,如果內部計數器大於0,則將其減1,如果內部計數器等於0,則會阻塞該線程,知道有線程調用了release方法將內部計數器更新到大於1位置。
import threadingimport timeclass Num:
def __init__(self):
self.num = 0
self.sem = threading.Semaphore(value = 3) #允許最多三個線程同時訪問資源
def add(self):
self.sem.acquire()#內部計數器減1
self.num += 1
num = self.num
self.sem.release()#內部計數器加1
return num
n = Num()class jdThread(threading.Thread):
def __init__(self,item):
threading.Thread.__init__(self)
self.item = item def run(self):
time.sleep(2)
value = n.add()
print(self.item,value)for item in range(100):
5. Python多線程的一些問題
python提供了兩個模塊來實現多線程thread 和threading ,thread 有一些缺點,在threading 得到了彌補,為了不浪費你和時間,所以我們直接學習threading 就可以了。
繼續對上面的例子進行改造,引入threadring來同時播放音樂和視頻:
#coding=utf-8import threadingfrom time import ctime,sleepdef music(func): for i in range(2): print "I was listening to %s. %s" %(func,ctime())
sleep(1)def move(func): for i in range(2): print "I was at the %s! %s" %(func,ctime())
sleep(5)
threads = []
t1 = threading.Thread(target=music,args=(u'愛情買賣',))
threads.append(t1)
t2 = threading.Thread(target=move,args=(u'阿凡達',))
threads.append(t2)if __name__ == '__main__': for t in threads:
t.setDaemon(True)
t.start() print "all over %s" %ctime()
import threading
首先導入threading 模塊,這是使用多線程的前提。
threads = []
t1 = threading.Thread(target=music,args=(u'愛情買賣',))
threads.append(t1)
創建了threads數組,創建線程t1,使用threading.Thread()方法,在這個方法中調用music方法target=music,args方法對music進行傳參。 把創建好的線程t1裝到threads數組中。
接著以同樣的方式創建線程t2,並把t2也裝到threads數組。
for t in threads:
t.setDaemon(True)
t.start()
最後通過for循環遍歷數組。(數組被裝載了t1和t2兩個線程)
setDaemon()
setDaemon(True)將線程聲明為守護線程,必須在start() 方法調用之前設置,如果不設置為守護線程程序會被無限掛起。子線程啟動後,父線程也繼續執行下去,當父線程執行完最後一條語句print "all over %s" %ctime()後,沒有等待子線程,直接就退出了,同時子線程也一同結束。
start()
開始線程活動。
運行結果:
>>> ========================= RESTART ================================
>>> I was listening to 愛情買賣. Thu Apr 17 12:51:45 2014 I was at the 阿凡達! Thu Apr 17 12:51:45 2014 all over Thu Apr 17 12:51:45 2014
從執行結果來看,子線程(muisc 、move )和主線程(print "all over %s" %ctime())都是同一時間啟動,但由於主線程執行完結束,所以導致子線程也終止。
繼續調整程序:
...if __name__ == '__main__': for t in threads:
t.setDaemon(True)
t.start()
t.join() print "all over %s" %ctime()
我們只對上面的程序加了個join()方法,用於等待線程終止。join()的作用是,在子線程完成運行之前,這個子線程的父線程將一直被阻塞。
注意: join()方法的位置是在for循環外的,也就是說必須等待for循環里的兩個進程都結束後,才去執行主進程。
運行結果:
>>> ========================= RESTART ================================
>>> I was listening to 愛情買賣. Thu Apr 17 13:04:11 2014 I was at the 阿凡達! Thu Apr 17 13:04:11 2014I was listening to 愛情買賣. Thu Apr 17 13:04:12 2014I was at the 阿凡達! Thu Apr 17 13:04:16 2014all over Thu Apr 17 13:04:21 2014
從執行結果可看到,music 和move 是同時啟動的。
開始時間4分11秒,直到調用主進程為4分22秒,總耗時為10秒。從單線程時減少了2秒,我們可以把music的sleep()的時間調整為4秒。
...def music(func): for i in range(2): print "I was listening to %s. %s" %(func,ctime())
sleep(4)
...
子線程啟動11分27秒,主線程運行11分37秒。
雖然music每首歌曲從1秒延長到了4 ,但通多程線的方式運行腳本,總的時間沒變化。
6. python線程如何捕獲中斷信號
import threadtext = Nonedef get_input(): global text text = raw_input()def main(): while True: print "running" if text != None: breakthread.start_new_thread(get_input,())main()
global全局變數吧?看看上面這段代碼是我常用的套路,main()是一個無限循環一直輸出running,如果你按下回車讓get_input()線程里的text變成不是None了,那麼main()就斷了,希望對你有幫助
7. python實現了多線程,如果使用了命令kill把正在運行的進程kill掉的話,可能某些線程會出錯,怎麼解決
主線程捕獲kill的信號以後去終止其他線程, 等其他線程完成以後, 再在主線程中退出.不過更現代的實現並發的方法是使用非同步, 而不是多線程.python實現了多線程,如果使用了命令kill把正在運行的進程kill掉的話,可能某些線程會出錯,怎麼解決?
8. python 怎麼讓程序接受ctrl + c終止信號
花了一天時間用python為服務寫了個壓力測試。很簡單,多線程向伺服器發請求。但寫完之後發現如果中途想停下來,按Ctrl+C達不到效果,自然想到要用信號處理函數捕捉信號,使線程都停下來,問題解決的方法請往下看:
復制代碼代碼如下:
#!/bin/env python
# -*- coding: utf-8 -*-
#filename: peartest.py
import threading, signal
is_exit = False
def doStress(i, cc):
global is_exit
idx = i
while not is_exit:
if (idx < 10000000):
print "thread[%d]: idx=%d"%(i, idx)
idx = idx + cc
else:
break
print "thread[%d] complete."%i
def handler(signum, frame):
global is_exit
is_exit = True
print "receive a signal %d, is_exit = %d"%(signum, is_exit)
if __name__ == "__main__":
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
cc = 5
for i in range(cc):
t = threading.Thread(target=doStress, args=(i,cc))
t.start()
上面是一個模擬程序,並不真正向服務發送請求,而代之以在一千萬以內,每個線程每隔並發數個(cc個)列印一個整數。很明顯,當所有線程都完成自己的任務後,進程會正常退出。但如果我們中途想退出(試想一個壓力測試程序,在中途已經發現了問題,需要停止測試),該腫么辦?你當然可以用ps查找到進程號,然後kill -9殺掉,但這樣太繁瑣了,捕捉Ctrl+C是最自然的想法。上面示常式序中已經捕捉了這個信號,並修改全局變數is_exit,線程中會檢測這個變數,及時退出。
但事實上這個程序並不work,當你按下Ctrl+C時,程序照常運行,並無任何響應。網上搜了一些資料,明白是python的子線程如果不是daemon的話,主線程是不能響應任何中斷的。但設為daemon後主線程會隨之退出,接著整個進程很快就退出了,所以還需要在主線程中檢測各個子線程的狀態,直到所有子線程退出後自己才退出,因此上例29行之後的代碼可以修改為:
復制代碼代碼如下:
threads=[]
for i in range(cc):
t = threading.Thread(target=doStress, args=(i, cc))
t.setDaemon(True)
threads.append(t)
t.start()
for i in range(cc):
threads[i].join()
重新試一下,問題依然沒有解決,進程還是沒有響應Ctrl+C,這是因為join()函數同樣會waiting在一個鎖上,使主線程無法捕獲信號。因此繼續修改,調用線程的isAlive()函數判斷線程是否完成:
復制代碼代碼如下:
while 1:
alive = False
for i in range(cc):
alive = alive or threads[i].isAlive()
if not alive:
break
這樣修改後,程序完全按照預想運行了:可以順利的列印每個線程應該列印的所有數字,也可以中途用Ctrl+C終結整個進程。完整的代碼如下:
復制代碼代碼如下:
#!/bin/env python
# -*- coding: utf-8 -*-
#filename: peartest.py
import threading, signal
is_exit = False
def doStress(i, cc):
global is_exit
idx = i
while not is_exit:
if (idx < 10000000):
print "thread[%d]: idx=%d"%(i, idx)
idx = idx + cc
else:
break
if is_exit:
print "receive a signal to exit, thread[%d] stop."%i
else:
print "thread[%d] complete."%i
def handler(signum, frame):
global is_exit
is_exit = True
print "receive a signal %d, is_exit = %d"%(signum, is_exit)
if __name__ == "__main__":
signal.signal(signal.SIGINT, handler)
signal.signal(signal.SIGTERM, handler)
cc = 5
threads = []
for i in range(cc):
t = threading.Thread(target=doStress, args=(i,cc))
t.setDaemon(True)
threads.append(t)
t.start()
while 1:
alive = False
for i in range(cc):
alive = alive or threads[i].isAlive()
if not alive:
break
其實,如果用python寫一個服務,也需要這樣,因為負責服務的那個線程是永遠在那裡接收請求的,不會退出,而如果你想用Ctrl+C殺死整個服務,跟上面的壓力測試程序是一個道理。總結一下,python多線程中要響應Ctrl+C的信號以殺死整個進程,需要:
1.把所有子線程設為Daemon;
2.使用isAlive()函數判斷所有子線程是否完成,而不是在主線程中用join()函數等待完成;
3.寫一個響應Ctrl+C信號的函數,修改全局變數,使得各子線程能夠檢測到,並正常退出。
9. 深入解析Python中的線程同步方法
深入解析Python中的線程同步方法
同步訪問共享資源
在使用線程的時候,一個很重要的問題是要避免多個線程對同一變數或其它資源的訪問沖突。一旦你稍不留神,重疊訪問、在多個線程中修改(共享資源)等這些操作會導致各種各樣的問題;更嚴重的是,這些問題一般只會在比較極端(比如高並發、生產伺服器、甚至在性能更好的硬體設備上)的情況下才會出現。
比如有這樣一個情況:需要追蹤對一事件處理的次數
counter = 0
def process_item(item):
global counter
... do something with item ...
counter += 1
如果你在多個線程中同時調用這個函數,你會發現counter的值不是那麼准確。在大多數情況下它是對的,但有時它會比實際的少幾個。
出現這種情況的原因是,計數增加操作實際上分三步執行:
解釋器獲取counter的當前值計算新值將計算的新值回寫counter變數
考慮一下這種情況:在當前線程獲取到counter值後,另一個線程搶佔到了CPU,然後同樣也獲取到了counter值,並進一步將counter值重新計算並完成回寫;之後時間片重新輪到當前線程(這里僅作標識區分,並非實際當前),此時當前線程獲取到counter值還是原來的,完成後續兩步操作後counter的值實際只加上1。
另一種常見情況是訪問不完整或不一致狀態。這類情況主要發生在一個線程正在初始化或更新數據時,另一個進程卻嘗試讀取正在更改的數據。
原子操作
實現對共享變數或其它資源的同步訪問最簡單的方法是依靠解釋器的原子操作。原子操作是在一步完成執行的操作,在這一步中其它線程無法獲得該共享資源。
通常情況下,這種同步方法只對那些只由單個核心數據類型組成的共享資源有效,譬如,字元串變數、數字、列表或者字典等。下面是幾個線程安全的操作:
讀或者替換一個實例屬性讀或者替換一個全局變數從列表中獲取一項元素原位修改一個列表(例如:使用append增加一個列表項)從字典中獲取一項元素原位修改一個字典(例如:增加一個字典項、調用clear方法)
注意,上面提到過,對一個變數或者屬性進行讀操作,然後修改它,最終將其回寫不是線程安全的。因為另外一個線程會在這個線程讀完卻沒有修改或回寫完成之前更改這個共享變數/屬性。
鎖
鎖是Python的threading模塊提供的最基本的同步機制。在任一時刻,一個鎖對象可能被一個線程獲取,或者不被任何線程獲取。如果一個線程嘗試去獲取一個已經被另一個線程獲取到的鎖對象,那麼這個想要獲取鎖對象的線程只能暫時終止執行直到鎖對象被另一個線程釋放掉。
鎖通常被用來實現對共享資源的同步訪問。為每一個共享資源創建一個Lock對象,當你需要訪問該資源時,調用acquire方法來獲取鎖對象(如果其它線程已經獲得了該鎖,則當前線程需等待其被釋放),待資源訪問完後,再調用release方法釋放鎖:
lock = Lock()
lock.acquire() #: will block if lock is already held
... access shared resource
lock.release()
注意,即使在訪問共享資源的過程中出錯了也應該釋放鎖,可以用try-finally來達到這一目的:
lock.acquire()
try:
... access shared resource
finally:
lock.release() #: release lock, no matter what
在Python 2.5及以後的版本中,你可以使用with語句。在使用鎖的時候,with語句會在進入語句塊之前自動的獲取到該鎖對象,然後在語句塊執行完成後自動釋放掉鎖:
from __future__ import with_statement #: 2.5 only
with lock:
... access shared resource
acquire方法帶一個可選的等待標識,它可用於設定當有其它線程佔有鎖時是否阻塞。如果你將其值設為False,那麼acquire方法將不再阻塞,只是如果該鎖被佔有時它會返回False:
if not lock.acquire(False):
... 鎖資源失敗
else:
try:
... access shared resource
finally:
lock.release()
你可以使用locked方法來檢查一個鎖對象是否已被獲取,注意不能用該方法來判斷調用acquire方法時是否會阻塞,因為在locked方法調用完成到下一條語句(比如acquire)執行之間該鎖有可能被其它線程佔有。
if not lock.locked():
#: 其它線程可能在下一條語句執行之前佔有了該鎖
lock.acquire() #: 可能會阻塞
簡單鎖的缺點
標準的鎖對象並不關心當前是哪個線程佔有了該鎖;如果該鎖已經被佔有了,那麼任何其它嘗試獲取該鎖的線程都會被阻塞,即使是佔有鎖的這個線程。考慮一下下面這個例子:
lock = threading.Lock()
def get_first_part():
lock.acquire()
try:
... 從共享對象中獲取第一部分數據
finally:
lock.release()
return data
def get_second_part():
lock.acquire()
try:
... 從共享對象中獲取第二部分數據
finally:
lock.release()
return data
示例中,我們有一個共享資源,有兩個分別取這個共享資源第一部分和第二部分的函數。兩個訪問函數都使用了鎖來確保在獲取數據時沒有其它線程修改對應的共享數據。
現在,如果我們想添加第三個函數來獲取兩個部分的數據,我們將會陷入泥潭。一個簡單的方法是依次調用這兩個函數,然後返回結合的結果:
def get_both_parts():
first = get_first_part()
seconde = get_second_part()
return first, second
這里的問題是,如有某個線程在兩個函數調用之間修改了共享資源,那麼我們最終會得到不一致的數據。最明顯的解決方法是在這個函數中也使用lock:
def get_both_parts():
lock.acquire()
try:
first = get_first_part()
seconde = get_second_part()
finally:
lock.release()
return first, second
然而,這是不可行的。裡面的兩個訪問函數將會阻塞,因為外層語句已經佔有了該鎖。為了解決這個問題,你可以通過使用標記在訪問函數中讓外層語句釋放鎖,但這樣容易失去控制並導致出錯。幸運的是,threading模塊包含了一個更加實用的鎖實現:re-entrant鎖。
Re-Entrant Locks (RLock)
RLock類是簡單鎖的另一個版本,它的特點在於,同一個鎖對象只有在被其它的線程佔有時嘗試獲取才會發生阻塞;而簡單鎖在同一個線程中同時只能被佔有一次。如果當前線程已經佔有了某個RLock鎖對象,那麼當前線程仍能再次獲取到該RLock鎖對象。
lock = threading.Lock()
lock.acquire()
lock.acquire() #: 這里將會阻塞
lock = threading.RLock()
lock.acquire()
lock.acquire() #: 這里不會發生阻塞
RLock的主要作用是解決嵌套訪問共享資源的問題,就像前面描述的示例。要想解決前面示例中的問題,我們只需要將Lock換為RLock對象,這樣嵌套調用也會OK.
lock = threading.RLock()
def get_first_part():
... see above
def get_second_part():
... see above
def get_both_parts():
... see above
這樣既可以單獨訪問兩部分數據也可以一次訪問兩部分數據而不會被鎖阻塞或者獲得不一致的數據。
注意RLock會追蹤遞歸層級,因此記得在acquire後進行release操作。
Semaphores
信號量是一個更高級的鎖機制。信號量內部有一個計數器而不像鎖對象內部有鎖標識,而且只有當佔用信號量的線程數超過信號量時線程才阻塞。這允許了多個線程可以同時訪問相同的代碼區。
semaphore = threading.BoundedSemaphore()
semaphore.acquire() #: counter減小
... 訪問共享資源
semaphore.release() #: counter增大
當信號量被獲取的時候,計數器減小;當信號量被釋放的時候,計數器增大。當獲取信號量的時候,如果計數器值為0,則該進程將阻塞。當某一信號量被釋放,counter值增加為1時,被阻塞的線程(如果有的話)中會有一個得以繼續運行。
信號量通常被用來限制對容量有限的資源的訪問,比如一個網路連接或者資料庫伺服器。在這類場景中,只需要將計數器初始化為最大值,信號量的實現將為你完成剩下的事情。
max_connections = 10
semaphore = threading.BoundedSemaphore(max_connections)
如果你不傳任何初始化參數,計數器的值會被初始化為1.
Python的threading模塊提供了兩種信號量實現。Semaphore類提供了一個無限大小的信號量,你可以調用release任意次來增大計數器的值。為了避免錯誤出現,最好使用BoundedSemaphore類,這樣當你調用release的次數大於acquire次數時程序會出錯提醒。
線程同步
鎖可以用在線程間的同步上。threading模塊包含了一些用於線程間同步的類。
Events
一個事件是一個簡單的同步對象,事件表示為一個內部標識(internal flag),線程等待這個標識被其它線程設定,或者自己設定、清除這個標識。
event = threading.Event()
#: 一個客戶端線程等待flag被設定
event.wait()
#: 服務端線程設置或者清除flag
event.set()
event.clear()
一旦標識被設定,wait方法就不做任何處理(不會阻塞),當標識被清除時,wait將被阻塞直至其被重新設定。任意數量的線程可能會等待同一個事件。
Conditions
條件是事件對象的高級版本。條件表現為程序中的某種狀態改變,線程可以等待給定條件或者條件發生的信號。
下面是一個簡單的生產者/消費者實例。首先你需要創建一個條件對象:
#: 表示一個資源的附屬項
condition = threading.Condition()
生產者線程在通知消費者線程有新生成資源之前需要獲得條件:
#: 生產者線程
... 生產資源項
condition.acquire()
... 將資源項添加到資源中
condition.notify() #: 發出有可用資源的信號
condition.release()
消費者必須獲取條件(以及相關聯的鎖),然後嘗試從資源中獲取資源項:
#: 消費者線程
condition.acquire()
while True:
...從資源中獲取資源項
if item:
break
condition.wait() #: 休眠,直至有新的資源
condition.release()
... 處理資源
wait方法釋放了鎖,然後將當前線程阻塞,直到有其它線程調用了同一條件對象的notify或者notifyAll方法,然後又重新拿到鎖。如果同時有多個線程在等待,那麼notify方法只會喚醒其中的一個線程,而notifyAll則會喚醒全部線程。
為了避免在wait方法處阻塞,你可以傳入一個超時參數,一個以秒為單位的浮點數。如果設置了超時參數,wait將會在指定時間返回,即使notify沒被調用。一旦使用了超時,你必須檢查資源來確定發生了什麼。
注意,條件對象關聯著一個鎖,你必須在訪問條件之前獲取這個鎖;同樣的,你必須在完成對條件的訪問時釋放這個鎖。在生產代碼中,你應該使用try-finally或者with.
可以通過將鎖對象作為條件構造函數的參數來讓條件關聯一個已經存在的鎖,這可以實現多個條件公用一個資源:
lock = threading.RLock()
condition_1 = threading.Condition(lock)
condition_2 = threading.Condition(lock)
互斥鎖同步
我們先來看一個例子:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time, threading
# 假定這是你的銀行存款:
balance = 0
muxlock = threading.Lock()
def change_it(n):
# 先存後取,結果應該為0:
global balance
balance = balance + n
balance = balance - n
def run_thread(n):
# 循環次數一旦多起來,最後的數字就變成非0
for i in range(100000):
change_it(n)
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t3 = threading.Thread(target=run_thread, args=(9,))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
print balance
結果 :
[/data/web/test_python]$ python multhread_threading.py
0
[/data/web/test_python]$ python multhread_threading.py
61
[/data/web/test_python]$ python multhread_threading.py
0
[/data/web/test_python]$ python multhread_threading.py
24
上面的例子引出了多線程編程的最常見問題:數據共享。當多個線程都修改某一個共享數據的時候,需要進行同步控制。
線程同步能夠保證多個線程安全訪問競爭資源,最簡單的同步機制是引入互斥鎖。互斥鎖為資源引入一個狀態:鎖定/非鎖定。某個線程要更改共享數據時,先將其鎖定,此時資源的狀態為「鎖定」,其他線程不能更改;直到該線程釋放資源,將資源的狀態變成「非鎖定」,其他的線程才能再次鎖定該資源。互斥鎖保證了每次只有一個線程進行寫入操作,從而保證了多線程情況下數據的正確性。
threading模塊中定義了Lock類,可以方便的處理鎖定:
#創建鎖mutex = threading.Lock()
#鎖定mutex.acquire([timeout])
#釋放mutex.release()
其中,鎖定方法acquire可以有一個超時時間的可選參數timeout。如果設定了timeout,則在超時後通過返回值可以判斷是否得到了鎖,從而可以進行一些其他的處理。
使用互斥鎖實現上面的例子的代碼如下:
balance = 0
muxlock = threading.Lock()
def change_it(n):
# 獲取鎖,確保只有一個線程操作這個數
muxlock.acquire()
global balance
balance = balance + n
balance = balance - n
# 釋放鎖,給其他被阻塞的線程繼續操作
muxlock.release()
def run_thread(n):
for i in range(10000):
change_it(n)
加鎖後的結果,就能確保數據正確:
[/data/web/test_python]$ python multhread_threading.py
0
[/data/web/test_python]$ python multhread_threading.py
0
[/data/web/test_python]$ python multhread_threading.py
0
[/data/web/test_python]$ python multhread_threading.py
0
10. python線程間通信的問題,回答有加分!300
pyqt的線程之間的通信是通過信號to槽來實現的,首先你在線程類裡面聲明一個全局槽比如:
classimThread(QtCore.QThread):
imslot=QtCore.pyqtSignal()
這里是要重點注意,上面的是沒有任何參數的一個信號,如果你需要參數的話,你可以在裡面添加參數類型,例如:
imslot1=QtCore.pyqtSignal(str)#這是一個帶字元串參數的信號
imslot2=QtCore.pyqtSignal(int)#這是一個帶整型參數的信號
imslot3=QtCore.pyqtSignal(bool)#這是一個帶布爾參數的信號
當然了,如果你需要多個參數的話,同樣地往裡面加就是了,qt也沒有要求參數必須是同類型的,所以可以這樣:
imslot1=QtCore.pyqtSignal(str,int)#這是一個帶整型和字元串的參數信號
imslot2=QtCore.pyqtSignal(int,str,str)#這是一個帶整型和兩個字元串的參數信號
imslot3=QtCore.pyqtSignal(bool,str)#這是一個帶布爾和字元串的參數信號
在線程的run方法裡面來定義執行信號:
self.imslot.emit()
這里也是需要重點注意的是,上面這個介面是沒有參數的,如果你是要參數的話,是需要這樣寫:
self.imslot1[str].emit('hello')
self.imslot2[int].emit(1)
self.imslot3[bool].emit(False)
多參數的是這樣
self.imslot1[str,int].emit('hello',1)
self.imslot2[int,str,str].emit(1,"hello","world")
self.imslot3[bool,str].emit(False,'hello')
以上就是在線程類裡面完成信號定義了,接下來就是邏輯層成定義一個函數槽來連接線程類裡面的信號,這個也很簡單,比如我在主線程類裡面定義一個方法:
defimSlot():
print'ok'
以上這個是槽函數,接下來是實現信號槽的連接
imThread.imslot.connect('imSlot')
這個就是信號槽的連接方式,當然了,這個是沒有參數的一個信號槽,那麼帶參數的怎麼寫呢?也很簡單!首先定義一個槽函數:
defimSlot(para):
printpara
這個是帶參數的槽函數,下面是:
imThread.imslot[str].connect('imSlot')
以上就是線程之間的方法了,子線程在執行的通行經過執行信號的話,子線程可以安全地執行而不會出現GUI主線程卡死的情況了。