В предыдущем посте о потоках Python я кратко упомянул, что потоки не подходят для задач, связанных с ЦП, и вместо этого следует использовать многопроцессорность. Здесь я хочу продемонстрировать это с помощью показателей производительности, а также показать, что создание нескольких процессов в Python так же просто, как создание нескольких потоков.
Во-первых, давайте выберем простое вычисление, которое будет использоваться для сравнительного анализа. Я не хочу, чтобы это было полностью искусственным, поэтому я буду использовать тупую версию факторизации — разбив число на основные факторы. Вот очень наивная и неоптимизированная функция, которая принимает число и возвращает список факторов:
def factorize_naive(n):
""" A naive factorization method. Take integer 'n', return list of
factors.
"""
if n < 2:
return []
factors = []
p = 2
while True:
if n == 1:
return factors
r = n % p
if r == 0:
factors.append(p)
n = n / p
elif p * p >= n:
factors.append(n)
return factors
elif p > 2:
# Advance in steps of 2 over odd numbers
p += 2
else:
# If p == 2, get to 3
p += 1
assert False, "unreachable"
Теперь, в качестве основы для бенчмаркинга, я буду использовать следующий последовательный (однопотоковый) факторизатор, который берет список чисел для факторизации и возвращает dict, отображающий число в свой список факторов:
def serial_factorizer(nums):
return {n: factorize_naive(n) for n in nums}
Версия с резьбой следует. Также требуется список чисел для разложения, а также количество создаваемых потоков. Затем он разделяет список на части и назначает каждый кусок отдельному потоку:
def threaded_factorizer(nums, nthreads):
def worker(nums, outdict):
""" The worker function, invoked in a thread. 'nums' is a
list of numbers to factor. The results are placed in
outdict.
"""
for n in nums:
outdict[n] = factorize_naive(n)
# Each thread will get 'chunksize' nums and its own output dict
chunksize = int(math.ceil(len(nums) / float(nthreads)))
threads = []
outs = [{} for i in range(nthreads)]
for i in range(nthreads):
# Create each thread, passing it its chunk of numbers to factor
# and output dict.
t = threading.Thread(
target=worker,
args=(nums[chunksize * i:chunksize * (i + 1)],
outs[i]))
threads.append(t)
t.start()
# Wait for all threads to finish
for t in threads:
t.join()
# Merge all partial output dicts into a single dict and return it
return {k: v for out_d in outs for k, v in out_d.iteritems()}
Обратите внимание, что интерфейс между основным и рабочим потоками очень прост. У каждого рабочего потока есть определенный объем работы, после которого он просто возвращается. Таким образом, единственное, что делает основной поток, это запускает потоки nthreads с подходящими аргументами и затем ожидает их завершения.
Я проверил эталонный анализатор последовательных и потоковых потоков с 2, 4 и 8 потоками. Эталоном был факторизация постоянного набора больших чисел, чтобы минимизировать различия из-за случайных случайностей. Все тесты проводились на моем ноутбуке Ubuntu 10.04 с процессором Intel Core i7-2820MQ (4 физических ядра, гиперпоточные).
Вот результаты:
Горизонтальная ось — время в секундах, поэтому более короткие столбцы означают более быстрое выполнение. Да, разделение вычислений на несколько потоков на самом деле медленнее, чем последовательная реализация, и чем больше потоков используется, тем медленнее оно становится.
Это может быть немного удивительно, если вы не знакомы с тем, как реализованы потоки Python и GIL (Global Interpreter Lock). Чтобы понять, почему это происходит, вы вряд ли сможете лучше, чем прочитать статьи и презентации Дейва Бизли на эту тему. Его работа настолько обширна и доступна, что я не вижу смысла повторять что-либо из этого (кроме выводов) здесь.
Давайте теперь сделаем то же самое, только с процессами вместо потоков. Отличный многопроцессорный модуль Python делает процессы такими же простыми для запуска и управления, как и потоки. На самом деле, он предоставляет очень похожие API для модуля потоков. Вот многопроцессорный факторизатор:
def mp_factorizer(nums, nprocs):
def worker(nums, out_q):
""" The worker function, invoked in a process. 'nums' is a
list of numbers to factor. The results are placed in
a dictionary that's pushed to a queue.
"""
outdict = {}
for n in nums:
outdict[n] = factorize_naive(n)
out_q.put(outdict)
# Each process will get 'chunksize' nums and a queue to put his out
# dict into
out_q = Queue()
chunksize = int(math.ceil(len(nums) / float(nprocs)))
procs = []
for i in range(nprocs):
p = multiprocessing.Process(
target=worker,
args=(nums[chunksize * i:chunksize * (i + 1)],
out_q))
procs.append(p)
p.start()
# Collect all results into a single result dict. We know how many dicts
# with results to expect.
resultdict = {}
for i in range(nprocs):
resultdict.update(out_q.get())
# Wait for all worker processes to finish
for p in procs:
p.join()
return resultdict
Единственная реальная разница здесь по сравнению с потоковым решением заключается в том, что выходные данные передаются от рабочего к основному потоку / процессу. С помощью многопроцессорной обработки мы не можем просто передать диктант подпроцессу и ожидать, что его модификации будут видны в другом процессе. Есть несколько подходов для решения этой проблемы. Одним из них является использование синхронизированного словаря из multiprocessing.managers.SyncManager. Тот, который я выбрал, — просто создать очередь для каждого процесса. Рабочий процесс помещает свои выходные данные в свою очередь, и эта очередь видна из основного процесса. Я решил создать одну очередь для каждого рабочего процесса, а не одну очередь, общую для всех процессов, чтобы минимизировать возможные конфликты.
Я запустил тот же тест, добавив время выполнения из mp_factorizer на график:
Как видите, есть хорошие ускорения. Самая быстрая многопроцессорная версия (разделенная на 8 процессов) работает в 3,1 раза быстрее, чем серийная версия. Хотя мой ЦП имеет только 4 физических ядра (и пара аппаратных «потоков» в каждом ядре совместно использует много ресурсов выполнения), 8-процессовая версия работает быстрее, что, вероятно, связано с тем, что ОС не выделяет ЦП оптимально между «тяжелыми» задачами. Другая причина того, что ускорение немного отличается от 4х, заключается в том, что работа не распределяется равномерно между подпроцессами. Некоторые цифры значительно быстрее, чем другие, и в настоящее время не уделяется внимания распределению нагрузки между рабочими.
Это интересные темы для изучения, но выходящие за рамки этого поста. Для наших нужд лучший совет — провести тесты и выбрать лучшую стратегию распараллеливания в соответствии с результатами.
Цель этого поста была двоякой. Во-первых, чтобы легко продемонстрировать, как потоки Python вредны для ускорения вычислений с привязкой к процессору (на самом деле они очень хороши для замедления их!), В то время как многопроцессорная обработка использует многоядерный процессор параллельно, как и ожидалось. , Во-вторых, чтобы показать, что многопроцессорность делает написание параллельного кода так же просто, как использование потоков. При синхронизации объектов между процессами нужно выполнить немного больше работы, чем между потоками, но в остальном это очень похожий код. И если вы спросите меня, что синхронизация объектов сложнее, это хорошо , потому что чем меньше объектов, тем лучше. Это основная причина, почему многопроцессное программирование часто считается более безопасным и менее подверженным ошибкам, чем многопоточное программирование.
Источник: http://eli.thegreenplace.net/2012/01/16/python-parallelizing-cpu-bound-tasks-with-multiprocessing/

