導航:首頁 > 編程語言 > python多線程pool

python多線程pool

發布時間:2022-05-10 09:35:11

1. python中的多進程與多線程/分布式該如何使用

Python提供了非常好用的多進程包multiprocessing,你只需要定義一個函數,Python會替你完成其他所有事情。
藉助這個包,可以輕松完成從單進程到並發執行的轉換。
1、新建單一進程
如果我們新建少量進程,可以如下:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
p = multiprocessing.Process(target=func, args=("hello", ))
p.start()
p.join()
print "Sub-process done."12345678910111213
2、使用進程池
是的,你沒有看錯,不是線程池。它可以讓你跑滿多核CPU,而且使用方法非常簡單。
注意要用apply_async,如果落下async,就變成阻塞版本了。
processes=4是最多並發進程數量。
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
for i in xrange(10):
msg = "hello %d" %(i)
pool.apply_async(func, (msg, ))
pool.close()
pool.join()
print "Sub-process(es) done."12345678910111213141516
3、使用Pool,並需要關注結果
更多的時候,我們不僅需要多進程執行,還需要關注每個進程的執行結果,如下:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
return "done " + msg
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
result = []
for i in xrange(10):
msg = "hello %d" %(i)
result.append(pool.apply_async(func, (msg, )))
pool.close()
pool.join()
for res in result:
print res.get()
print "Sub-process(es) done."
2014.12.25更新
根據網友評論中的反饋,在Windows下運行有可能崩潰(開啟了一大堆新窗口、進程),可以通過如下調用來解決:
multiprocessing.freeze_support()1
附錄(自己的腳本):
#!/usr/bin/python
import threading
import subprocess
import datetime
import multiprocessing
def dd_test(round, th):
test_file_arg = 'of=/zbkc/test_mds_crash/1m_%s_%s_{}' %(round, th)
command = "seq 100 | xargs -i dd if=/dev/zero %s bs=1M count=1" %test_file_arg
print command
subprocess.call(command,shell=True,stdout=open('/dev/null','w'),stderr=subprocess.STDOUT)
def mds_stat(round):
p = subprocess.Popen("zbkc mds stat", shell = True, stdout = subprocess.PIPE)
out = p.stdout.readlines()
if out[0].find('active') != -1:
command = "echo '0205pm %s round mds status OK, %s' >> /round_record" %(round, datetime.datetime.now())
command_2 = "time (ls /zbkc/test_mds_crash/) 2>>/round_record"
command_3 = "ls /zbkc/test_mds_crash | wc -l >> /round_record"
subprocess.call(command,shell=True)
subprocess.call(command_2,shell=True)
subprocess.call(command_3,shell=True)
return 1
else:
command = "echo '0205 %s round mds status abnormal, %s, %s' >> /round_record" %(round, out[0], datetime.datetime.now())
subprocess.call(command,shell=True)
return 0
#threads = []
for round in range(1, 1600):
pool = multiprocessing.Pool(processes = 10) #使用進程池
for th in range(10):
# th_name = "thread-" + str(th)
# threads.append(th_name) #添加線程到線程列表
# threading.Thread(target = dd_test, args = (round, th), name = th_name).start() #創建多線程任務
pool.apply_async(dd_test, (round, th))
pool.close()
pool.join()
#等待線程完成
# for t in threads:
# t.join()
if mds_stat(round) == 0:
subprocess.call("zbkc -s",shell=True)
break

2. python 多線程 怎麼改成非同步

python使用multiprocessing模塊實現帶回調函數的非同步調用方法。分享給大家供大家參考。具體分析如下:
multipressing模塊是python 2.6版本加入的,通過這個模塊可以輕松實現非同步調用
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
pool = Pool(processes=1)
# Start a worker processes.
result = pool.apply_async(f, [10], callback)
# Evaluate "f(10)" asynchronously calling callback when finished.
希望本文所述對大家的Python程序設計有所幫助。

3. python 多線程和多進程的區別 mutiprocessing theading

在socketserver服務端代碼中有這么一句:

server = socketserver.ThreadingTCPServer((ip,port), MyServer)

ThreadingTCPServer這個類是一個支持多線程和TCP協議的socketserver,它的繼承關系是這樣的:

class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass

右邊的TCPServer實際上是主要的功能父類,而左邊的ThreadingMixIn則是實現了多線程的類,ThreadingTCPServer自己本身則沒有任何代碼。

MixIn在Python的類命名中很常見,稱作「混入」,戲稱「亂入」,通常為了某種重要功能被子類繼承。

我們看看一下ThreadingMixIn的源代碼:

class ThreadingMixIn:

daemon_threads = False

def process_request_thread(self, request, client_address):
try:
self.finish_request(request, client_address)
self.shutdown_request(request)
except:
self.handle_error(request, client_address)
self.shutdown_request(request)

def process_request(self, request, client_address):

t = threading.Thread(target = self.process_request_thread,
args = (request, client_address))
t.daemon = self.daemon_threads
t.start()

在ThreadingMixIn類中,其實就定義了一個屬性,兩個方法。其中的process_request()方法實際調用的正是Python內置的多線程模塊threading。這個模塊是Python中所有多線程的基礎,socketserver本質上也是利用了這個模塊。

socketserver通過threading模塊,實現了多線程任務處理能力,可以同時為多個客戶提供服務。

那麼,什麼是線程,什麼是進程?

進程是程序(軟體,應用)的一個執行實例,每個運行中的程序,可以同時創建多個進程,但至少要有一個。每個進程都提供執行程序所需的所有資源,都有一個虛擬的地址空間、可執行的代碼、操作系統的介面、安全的上下文(記錄啟動該進程的用戶和許可權等等)、唯一的進程ID、環境變數、優先順序類、最小和最大的工作空間(內存空間)。進程可以包含線程,並且每個進程必須有至少一個線程。每個進程啟動時都會最先產生一個線程,即主線程,然後主線程會再創建其他的子線程。

線程,有時被稱為輕量級進程(Lightweight Process,LWP),是程序執行流的最小單元。一個標準的線程由線程ID,當前指令指針(PC),寄存器集合和堆棧組成。另外,線程是進程中的一個實體,是被系統獨立調度和分派的基本單位,線程自己不獨立擁有系統資源,但它可與同屬一個進程的其它線程共享該進程所擁有的全部資源。每一個應用程序都至少有一個進程和一個線程。在單個程序中同時運行多個線程完成不同的被劃分成一塊一塊的工作,稱為多線程。

舉個例子,某公司要生產一種產品,於是在生產基地建設了很多廠房,每個廠房內又有多條流水生產線。所有廠房配合將整個產品生產出來,單個廠房內的流水線負責生產所屬廠房的產品部件,每個廠房都擁有自己的材料庫,廠房內的生產線共享這些材料。公司要實現生產必須擁有至少一個廠房一條生產線。換成計算機的概念,那麼這家公司就是應用程序,廠房就是應用程序的進程,生產線就是某個進程的一個線程。

線程的特點:

線程是一個execution context(執行上下文),即一個cpu執行時所需要的一串指令。假設你正在讀一本書,沒有讀完,你想休息一下,但是你想在回來時繼續先前的進度。有一個方法就是記下頁數、行數與字數這三個數值,這些數值就是execution context。如果你的室友在你休息的時候,使用相同的方法讀這本書。你和她只需要這三個數字記下來就可以在交替的時間共同閱讀這本書了。

線程的工作方式與此類似。CPU會給你一個在同一時間能夠做多個運算的幻覺,實際上它在每個運算上只花了極少的時間,本質上CPU同一時刻只能幹一件事,所謂的多線程和並發處理只是假象。CPU能這樣做是因為它有每個任務的execution context,就像你能夠和你朋友共享同一本書一樣。

進程與線程區別:

4. Python多線程是什麼意思

簡單地說就是作為可能是僅有的支持多線程的解釋型語言(perl的多線程是殘疾,PHP沒有多線程),Python的多線程是有compromise的,在任意時間只有一個Python解釋器在解釋Python bytecode。
UPDATE:如評論指出,Ruby也是有thread支持的,而且至少Ruby MRI是有GIL的。
如果你的代碼是CPU密集型,多個線程的代碼很有可能是線性執行的。所以這種情況下多線程是雞肋,效率可能還不如單線程因為有context switch
但是:如果你的代碼是IO密集型,多線程可以明顯提高效率。例如製作爬蟲(我就不明白為什麼Python總和爬蟲聯系在一起…不過也只想起來這個例子…),絕大多數時間爬蟲是在等待socket返回數據。這個時候C代碼里是有release GIL的,最終結果是某個線程等待IO的時候其他線程可以繼續執行。
反過來講:你就不應該用Python寫CPU密集型的代碼…效率擺在那裡…
如果確實需要在CPU密集型的代碼里用concurrent,就去用multiprocessing庫。這個庫是基於multi process實現了類multi thread的API介面,並且用pickle部分地實現了變數共享。
再加一條,如果你不知道你的代碼到底算CPU密集型還是IO密集型,教你個方法:
multiprocessing這個mole有一個mmy的sub mole,它是基於multithread實現了multiprocessing的API。
假設你使用的是multiprocessing的Pool,是使用多進程實現了concurrency
from multiprocessing import Pool
如果把這個代碼改成下面這樣,就變成多線程實現concurrency
from multiprocessing.mmy import Pool
兩種方式都跑一下,哪個速度快用哪個就行了。
UPDATE:
剛剛才發現concurrent.futures這個東西,包含ThreadPoolExecutor和ProcessPoolExecutor,可能比multiprocessing更簡單

5. 定義了一個函數和一個列表,如何讓python多線程執行呢,python高手在嗎 告訴我失敗原因,如何修改。

提示錯誤時因為你通過map的話list取的話是一個P1對象,是一個列表,傳到P ointsTuple2的話是一整個列表,但是你定義的函數需要5個參數,才會報那樣的錯,改的方法有兩個,下面這個改參數為tuple,在把參數賦值,或者加個中介,希望有幫助
修改成一個參數:
# -*- coding: utf-8 -*-
def PointsTuple2(tuple1):
R,L,ALPHA,M,ANGLE=tuple1
lst=[]
pitch=2.0*np.pi*R/np.tan(np.radians(ALPHA))
pitch1=2.0*np.pi*11.5/np.tan(np.radians(7.16))
newM=int(np.ceil(M*L/pitch))
DtaPitch=0
if pitch > pitch1 :
DtaPitch= pitch-pitch1
else:
DtaPitch=0
for i in range(newM):
ii=float(i)
x=R*np.cos(2.0*np.pi*ii/M+np.radians(ANGLE))
y=R*np.sin(2.0*np.pi*ii/M+np.radians(ANGLE))
z=ii*(pitch-DtaPitch)/M
pointCoordinates=(x,y,z)
lst.append(pointCoordinates)
Points=tuple(lst)
print 'PointsNum:',len(Points),'DtaPitch:',DtaPitch,'newM:',newM,'R=%f,pitch=%f'%(R,pitch)
return Points
#
import numpy as np
from multiprocessing.mmy import Pool as ThreadPool
pool = ThreadPool(4)
#
global newM,L1,L2,L3,ALPHA1,M1
L1,L2,L3,ALPHA1,M1,newM=912.0,938.0,991.0,7.16,102.0,0.0
dAngle=12.629916088
ddAngle=(36.0-dAngle)/8.0 #意欲將陰線分為8個網格
tuple1=(11.5,L1,ALPHA1,M1,11.685041956)
Points1=PointsTuple2(tuple1)
tuple2=(11.5,L1,ALPHA1,M1,11.685041956)
Points2=PointsTuple2(tuple2)
#對比多線程
P1=(11.5,912.0,7.16,102.0,11.685041956)
P2=(11.5,L1,ALPHA1,M1,23.954074918)
P3=(11.85,L2,ALPHA1,M1,11.685041956)
P4=(11.85,L2,ALPHA1,M1,23.954074918)
list1=[P1,P2,P3,P4]
results9=pool.map(PointsTuple2,list1)
pool.close()
pool.join()
另外這個運行可能有點問題(google的樣本,可以借鑒一下他的寫法):
from multiprocessing import Pool
import numpy as np
pool = Pool(processes=2) # I like to calculate only 2 FFTs parallel
# in every time step, therefor 2 processes
def Splitter(args):
'''I have to pass 2 arguments'''
return makeSomething(*args)
def makeSomething(a,b):
'''mmy function instead of the one with the FFT'''
return a*b
def RungeK():
# ...
# a lot of code which create the vectors A and B and calculates
# one Kunge-Kutta step for them
# ...
pool = Pool(processes=2)
n = 20 # Just something for the example
A = np.arange(50000)
B = np.ones_like(A)
for i in xrange(n): # loop over the time steps
A *= np.mean(B)*B - A
B *= np.sqrt(A)
results = pool.map(Splitter, [(A, 3), (B, 2)])
A = results[0]
B = results[1]
pool.close()
print np.mean(A) # Some output
print np.max(B)
if __name__== '__main__':
RungeK()

6. python 怎麼實現多線程的

線程也就是輕量級的進程,多線程允許一次執行多個線程,Python是多線程語言,它有一個多線程包,GIL也就是全局解釋器鎖,以確保一次執行單個線程,一個線程保存GIL並在將其傳遞給下一個線程之前執行一些操作,也就產生了並行執行的錯覺。

7. python 多進程和多線程配合

由於python的多線程中存在PIL鎖,因此python的多線程不能利用多核,那麼,由於現在的計算機是多核的,就不能充分利用計算機的多核資源。但是python中的多進程是可以跑在不同的cpu上的。因此,嘗試了多進程+多線程的方式,來做一個任務。比如:從中科大的鏡像源中下載多個rpm包。
#!/usr/bin/pythonimport reimport commandsimport timeimport multiprocessingimport threadingdef download_image(url):
print '*****the %s rpm begin to download *******' % url
commands.getoutput('wget %s' % url)def get_rpm_url_list(url):
commands.getoutput('wget %s' % url)
rpm_info_str = open('index.html').read()

regu_mate = '(?<=<a href=")(.*?)(?=">)'
rpm_list = re.findall(regu_mate, rpm_info_str)

rpm_url_list = [url + rpm_name for rpm_name in rpm_list] print 'the count of rpm list is: ', len(rpm_url_list) return rpm_url_
def multi_thread(rpm_url_list):
threads = [] # url = 'https://mirrors.ustc.e.cn/centos/7/os/x86_64/Packages/'
# rpm_url_list = get_rpm_url_list(url)
for index in range(len(rpm_url_list)): print 'rpm_url is:', rpm_url_list[index]
one_thread = threading.Thread(target=download_image, args=(rpm_url_list[index],))
threads.append(one_thread)

thread_num = 5 # set threading pool, you have put 4 threads in it
while 1:
count = min(thread_num, len(threads)) print '**********count*********', count ###25,25,...6707%25

res = [] for index in range(count):
x = threads.pop()
res.append(x) for thread_index in res:
thread_index.start() for j in res:
j.join() if not threads:
def multi_process(rpm_url_list):
# process num at the same time is 4
process = []
rpm_url_group_0 = []
rpm_url_group_1 = []
rpm_url_group_2 = []
rpm_url_group_3 = [] for index in range(len(rpm_url_list)): if index % 4 == 0:
rpm_url_group_0.append(rpm_url_list[index]) elif index % 4 == 1:
rpm_url_group_1.append(rpm_url_list[index]) elif index % 4 == 2:
rpm_url_group_2.append(rpm_url_list[index]) elif index % 4 == 3:
rpm_url_group_3.append(rpm_url_list[index])
rpm_url_groups = [rpm_url_group_0, rpm_url_group_1, rpm_url_group_2, rpm_url_group_3] for each_rpm_group in rpm_url_groups:
each_process = multiprocessing.Process(target = multi_thread, args = (each_rpm_group,))
process.append(each_process) for one_process in process:
one_process.start() for one_process in process:
one_process.join()# for each_url in rpm_url_list:# print '*****the %s rpm begin to download *******' %each_url## commands.getoutput('wget %s' %each_url)
def main():
url = 'https://mirrors.ustc.e.cn/centos/7/os/x86_64/Packages/'
url_paas = 'http://mirrors.ustc.e.cn/centos/7.3.1611/paas/x86_64/openshift-origin/'
url_paas2 ='http://mirrors.ustc.e.cn/fedora/development/26/Server/x86_64/os/Packages/u/'

start_time = time.time()
rpm_list = get_rpm_url_list(url_paas) print multi_process(rpm_list) # print multi_thread(rpm_list)
#print multi_process()
# print multi_thread(rpm_list)
# for index in range(len(rpm_list)):
# print 'rpm_url is:', rpm_list[index]
end_time = time.time() print 'the download time is:', end_time - start_timeprint main()123456789101112131415161718

代碼的功能主要是這樣的:
main()方法中調用get_rpm_url_list(base_url)方法,獲取要下載的每個rpm包的具體的url地址。其中base_url即中科大基礎的鏡像源的地址,比如:http://mirrors.ustc.e.cn/centos/7.3.1611/paas/x86_64/openshift-origin/,這個地址下有幾十個rpm包,get_rpm_url_list方法將每個rpm包的url地址拼出來並返回。
multi_process(rpm_url_list)啟動多進程方法,在該方法中,會調用多線程方法。該方法啟動4個多進程,將上面方法得到的rpm包的url地址進行分組,分成4組,然後每一個組中的rpm包再最後由不同的線程去執行。從而達到了多進程+多線程的配合使用。
代碼還有需要改進的地方,比如多進程啟動的進程個數和rpm包的url地址分組是硬編碼,這個還需要改進,畢竟,不同的機器,適合同時啟動的進程個數是不同的。

8. 如何多線程(多進程)加速while循環(語言-python)

import numpy as np
import os
import sys
import multiprocessing as mp
import time

def MCS(input_data, med):
#t1 = time.perf_counter()
left = 0
lp = 0

while True:
lp = lp + 1
data_pool = input_data + left
output_data = med * 0.05 * data_pool / (10000 + med)
output_data = np.where(output_data > data_pool, data_pool, output_data)
left = data_pool - output_data
cri = (input_data - output_data) / input_data * 100
#print(lp, data_pool, output_data, cri)
if cri <= 1:
break
t2 = time.perf_counter()
#print(f'Finished in {t2 - t1} seconds')

if __name__ == "__main__":
pool = mp.Pool(processes=5)
tasks = []
for i in np.linspace(0.4, 0.6, num = 10):
tasks.append([100, i])
t1 = time.perf_counter()
pool.starmap(MCS, tasks)
#pool.apply_async(MCS, args=(100, 0.4))
t2 = time.perf_counter()
#pool.join()
#pool.close()
for i in np.linspace(0.4, 0.6, num = 10):
MCS(100, i)
t3 = time.perf_counter()

print(f'Finished in {t2 - t1} seconds')
print(f'Finished in {t3 - t2} seconds')

原因可能是只運行了一個例子,
如圖測試了10個例子,測試結果如下
Finished in 15.062450630997773 seconds
Finished in 73.1936681799998 seconds
並行確實有一定的加速。

9. python 多線程池的用法

你的意思是不是:只禁用腳本但保持物體有效? 那可以用GetComponent<YourClass>().enabled =false; //YourClass是你要禁用的腳本類 以上是C#的語法,JS的用: GetComponent("YourClass").enable=false; //"YourClass"是你要禁用的腳本名

閱讀全文

與python多線程pool相關的資料

熱點內容
摩托車在哪裡app看考題 瀏覽:356
蘋果5app在哪裡設置 瀏覽:737
如何查看伺服器的磁碟使用 瀏覽:165
python蒙特卡洛模型投點圖 瀏覽:330
安卓手機屬於什麼介面 瀏覽:742
微信群推廣網站源碼 瀏覽:764
九江離鷹潭源碼 瀏覽:719
python可以當作函數的返回值 瀏覽:422
地鐵逃生體驗服怎麼進入安卓 瀏覽:833
齊魯工惠app的中獎記錄在哪裡 瀏覽:759
linuxkill命令詳解 瀏覽:103
dhcp伺服器動態分配地址 瀏覽:265
門禁卡加密了能破解嗎 瀏覽:215
在哪裡下載百度網盤app 瀏覽:917
伺服器要升級什麼意思 瀏覽:831
銀行還房貸解壓方法 瀏覽:702
伺服器主機辦公如何提速 瀏覽:920
cad列印為pdf 瀏覽:418
賣手錶的app哪裡可以賣 瀏覽:55
放管伺服器怎麼辦理 瀏覽:631