간단한 파이썬 루프를 어떻게 병렬화합니까?
이것은 아마도 사소한 질문이지만 파이썬에서 다음 루프를 어떻게 병렬화합니까?
# setup output lists
output1 = list()
output2 = list()
output3 = list()
for j in range(0, 10):
# calc individual parameter value
parameter = j * offset
# call the calculation
out1, out2, out3 = calc_stuff(parameter = parameter)
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
파이썬에서 단일 스레드를 시작하는 방법을 알고 있지만 결과를 "수집"하는 방법을 모르겠습니다.
이 경우 가장 쉬운 방법은 여러 프로세스도 좋습니다. 현재 Linux를 사용하고 있지만 코드는 Windows 및 Mac에서 잘 실행되어야합니다.
이 코드를 병렬화하는 가장 쉬운 방법은 무엇입니까?
CPython에서 다중 스레드를 사용하면 GIL (Global Interpreter Lock)으로 인해 순수 Python 코드의 성능이 향상되지 않습니다. multiprocessing
대신 모듈을 사용하는 것이 좋습니다 .
pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))
대화식 인터프리터에서는 작동하지 않습니다.
GIL 주위의 일반적인 FUD를 피하려면 :이 예제에서 스레드를 사용하면 아무런 이점이 없습니다. 당신이 원하는 그들이 문제의 전체 무리를 피하기 때문에, 여기가 아닌 스레드 프로세스를 사용합니다.
간단한 for 루프를 병렬화하기 위해 joblib 은 다중 처리의 원시 사용에 많은 가치를 제공합니다. 짧은 구문뿐만 아니라 매우 빠른 경우 (오버 헤드를 제거하기 위해) 자식 프로세스의 트레이스 백을 캡처 할 때 투명한 반복 묶음과 같은 것들이 오류보고를 더 잘 수행합니다.
면책 조항 : 나는 joblib의 최초 저자입니다.
이 코드를 병렬화하는 가장 쉬운 방법은 무엇입니까?
정말 같은 나는 concurrent.futures
이것에 대한, Python3에서 사용할 수있는 버전 3.2 이후 - 2.6 및 2.7에 백 포트를 통해 PyPi .
스레드 또는 프로세스를 사용하고 정확히 동일한 인터페이스를 사용할 수 있습니다.
멀티 프로세싱
이것을 futuretest.py 파일에 넣으십시오.
import concurrent.futures
import time, random # add some random sleep time
offset = 2 # you don't supply these so
def calc_stuff(parameter=None): # these are examples.
sleep_time = random.choice([0, 1, 2, 3, 4, 5])
time.sleep(sleep_time)
return parameter / 2, sleep_time, parameter * parameter
def procedure(j): # just factoring out the
parameter = j * offset # procedure
# call the calculation
return calc_stuff(parameter=parameter)
def main():
output1 = list()
output2 = list()
output3 = list()
start = time.time() # let's see how long this takes
# we can swap out ProcessPoolExecutor for ThreadPoolExecutor
with concurrent.futures.ProcessPoolExecutor() as executor:
for out1, out2, out3 in executor.map(procedure, range(0, 10)):
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
finish = time.time()
# these kinds of format strings are only available on Python 3.6:
# time to upgrade!
print(f'original inputs: {repr(output1)}')
print(f'total time to execute {sum(output2)} = sum({repr(output2)})')
print(f'time saved by parallelizing: {sum(output2) - (finish-start)}')
print(f'returned in order given: {repr(output3)}')
if __name__ == '__main__':
main()
출력은 다음과 같습니다.
$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4])
time saved by parallellizing: 27.68999981880188
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]
멀티 스레딩
이제 변경 ProcessPoolExecutor
에 ThreadPoolExecutor
, 다시 모듈을 실행합니다
$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1])
time saved by parallellizing: 13.992000102996826
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]
이제 멀티 스레딩과 멀티 프로세싱을 모두 완료했습니다!
성능 및 두 가지를 함께 사용하는 것에주의하십시오.
샘플링이 너무 작아 결과를 비교할 수 없습니다.
그러나 Windows는 포크 기능을 지원하지 않으므로 각 새 프로세스를 시작하는 데 시간이 걸리기 때문에 멀티 스레딩이 일반적으로, 특히 Windows에서 멀티 프로세싱보다 빠를 것이라고 생각합니다. Linux 또는 Mac에서는 아마도 더 가까이있을 것입니다.
여러 프로세스 내에 여러 스레드를 중첩 할 수 있지만 여러 스레드를 사용하여 여러 프로세스를 분리하지 않는 것이 좋습니다.
from joblib import Parallel, delayed
import multiprocessing
inputs = range(10)
def processInput(i):
return i * i
num_cores = multiprocessing.cpu_count()
results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
print(results)
위의 내용은 내 컴퓨터에서 아름답게 작동합니다 (우분투, 패키지 joblib는 사전 설치되었지만를 통해 설치할 수 있습니다 pip install joblib
).
https://blog.dominodatalab.com/simple-parallelization/ 에서 가져온
Ray 를 사용하면 많은 장점이 있습니다 .
- 여러 코드 (동일한 코드로) 외에도 여러 머신을 병렬 처리 할 수 있습니다.
- 공유 메모리 및 제로 카피 직렬화를 통한 수치 데이터의 효율적인 처리
- 분산 스케줄링으로 높은 작업 처리량.
- 결함 허용.
귀하의 경우 Ray를 시작하고 원격 기능을 정의 할 수 있습니다
import ray
ray.init()
@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
# Do something.
return 1, 2, 3
그런 다음 병렬로 호출하십시오.
output1, output2, output3 = [], [], []
# Launch the tasks.
for j in range(10):
id1, id2, id3 = calc_stuff.remote(parameter=j)
output1.append(id1)
output2.append(id2)
output3.append(id3)
# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)
클러스터에서 동일한 예제를 실행하려면 ray.init () 호출 만 변경하면됩니다. 관련 문서는 여기 에서 찾을 수 있습니다 .
Ray 개발을 돕고 있습니다.
왜 하나의 전역 목록을 보호하기 위해 스레드와 하나의 뮤텍스를 사용하지 않습니까?
import os
import re
import time
import sys
import thread
from threading import Thread
class thread_it(Thread):
def __init__ (self,param):
Thread.__init__(self)
self.param = param
def run(self):
mutex.acquire()
output.append(calc_stuff(self.param))
mutex.release()
threads = []
output = []
mutex = thread.allocate_lock()
for j in range(0, 10):
current = thread_it(j * offset)
threads.append(current)
current.start()
for t in threads:
t.join()
#here you have output list filled with data
명심하십시오, 당신은 당신의 가장 느린 스레드만큼 빠를 것입니다
나는 joblib
나에게 매우 유용하다는 것을 알았다 . 다음 예를 참조하십시오.
from joblib import Parallel, delayed
def yourfunction(k):
s=3.14*k*k
print "Area of a circle with a radius ", k, " is:", s
element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1,10))
n_jobs = -1 : 사용 가능한 모든 코어 사용
병렬 처리의 매우 간단한 예는
from multiprocessing import Process
output1 = list()
output2 = list()
output3 = list()
def yourfunction():
for j in range(0, 10):
# calc individual parameter value
parameter = j * offset
# call the calculation
out1, out2, out3 = calc_stuff(parameter=parameter)
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
if __name__ == '__main__':
p = Process(target=pa.yourfunction, args=('bob',))
p.start()
p.join()
이것은 파이썬에서 멀티 프로세싱 및 병렬 / 분산 컴퓨팅을 구현할 때 유용 할 수 있습니다.
techila 패키지 사용에 관한 YouTube 자습서
Techila는 분산 컴퓨팅 미들웨어로 techila 패키지를 사용하여 Python과 직접 통합됩니다. 패키지의 복숭아 함수는 루프 구조를 병렬화하는 데 유용 할 수 있습니다. (다음 코드 스 니펫은 Techila 커뮤니티 포럼에서 제공 )
techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
jobs = jobcount # Number of Jobs in the Project
)
비동기 함수가 있다고 가정 해 봅시다.
async def work_async(self, student_name: str, code: str, loop):
"""
Some async function
"""
# Do some async procesing
That needs to be run on a large array. Some attributes are being passed to the program and some are used from property of dictionary element in the array.
async def process_students(self, student_name: str, loop):
market = sys.argv[2]
subjects = [...] #Some large array
batchsize = 5
for i in range(0, len(subjects), batchsize):
batch = subjects[i:i+batchsize]
await asyncio.gather(*(self.work_async(student_name,
sub['Code'],
loop)
for sub in batch))
Have a look at this;
http://docs.python.org/library/queue.html
This might not be the right way to do it, but I'd do something like;
Actual code;
from multiprocessing import Process, JoinableQueue as Queue
class CustomWorker(Process):
def __init__(self,workQueue, out1,out2,out3):
Process.__init__(self)
self.input=workQueue
self.out1=out1
self.out2=out2
self.out3=out3
def run(self):
while True:
try:
value = self.input.get()
#value modifier
temp1,temp2,temp3 = self.calc_stuff(value)
self.out1.put(temp1)
self.out2.put(temp2)
self.out3.put(temp3)
self.input.task_done()
except Queue.Empty:
return
#Catch things better here
def calc_stuff(self,param):
out1 = param * 2
out2 = param * 4
out3 = param * 8
return out1,out2,out3
def Main():
inputQueue = Queue()
for i in range(10):
inputQueue.put(i)
out1 = Queue()
out2 = Queue()
out3 = Queue()
processes = []
for x in range(2):
p = CustomWorker(inputQueue,out1,out2,out3)
p.daemon = True
p.start()
processes.append(p)
inputQueue.join()
while(not out1.empty()):
print out1.get()
print out2.get()
print out3.get()
if __name__ == '__main__':
Main()
Hope that helps.
thanks @iuryxavier
from multiprocessing import Pool
from multiprocessing import cpu_count
def add_1(x):
return x + 1
if __name__ == "__main__":
pool = Pool(cpu_count())
results = pool.map(add_1, range(10**12))
pool.close() # 'TERM'
pool.join() # 'KILL'
참고URL : https://stackoverflow.com/questions/9786102/how-do-i-parallelize-a-simple-python-loop
'Programming' 카테고리의 다른 글
AngularJS에서 지시문에서 지시문 추가 (0) | 2020.05.10 |
---|---|
helper와 helper_method는 무엇을합니까? (0) | 2020.05.10 |
Java 스트림을 1 및 1 요소로 필터링 (0) | 2020.05.09 |
Mac에 R 설치-경고 메시지 :“C”를 사용하여 LC_CTYPE 설정에 실패했습니다 (0) | 2020.05.09 |
이중 중괄호와 AngularJS-Twig 충돌 (0) | 2020.05.09 |