В предыдущем посте о потоках Python я кратко упомянул, что потоки не подходят для задач, связанных с ЦП, и вместо этого следует использовать многопроцессорность. Здесь я хочу продемонстрировать это с помощью показателей производительности, а также показать, что создание нескольких процессов в Python так же просто, как создание нескольких потоков.
Во-первых, давайте выберем простое вычисление, которое будет использоваться для сравнительного анализа. Я не хочу, чтобы это было полностью искусственным, поэтому я буду использовать тупую версию факторизации — разбив число на основные факторы. Вот очень наивная и неоптимизированная функция, которая принимает число и возвращает список факторов:
def factorize_naive(n): """ A naive factorization method. Take integer 'n', return list of factors. """ if n < 2: return [] factors = [] p = 2 while True: if n == 1: return factors r = n % p if r == 0: factors.append(p) n = n / p elif p * p >= n: factors.append(n) return factors elif p > 2: # Advance in steps of 2 over odd numbers p += 2 else: # If p == 2, get to 3 p += 1 assert False, "unreachable"
Теперь, в качестве основы для бенчмаркинга, я буду использовать следующий последовательный (однопотоковый) факторизатор, который берет список чисел для факторизации и возвращает dict, отображающий число в свой список факторов:
def serial_factorizer(nums): return {n: factorize_naive(n) for n in nums}
Версия с резьбой следует. Также требуется список чисел для разложения, а также количество создаваемых потоков. Затем он разделяет список на части и назначает каждый кусок отдельному потоку:
def threaded_factorizer(nums, nthreads): def worker(nums, outdict): """ The worker function, invoked in a thread. 'nums' is a list of numbers to factor. The results are placed in outdict. """ for n in nums: outdict[n] = factorize_naive(n) # Each thread will get 'chunksize' nums and its own output dict chunksize = int(math.ceil(len(nums) / float(nthreads))) threads = [] outs = [{} for i in range(nthreads)] for i in range(nthreads): # Create each thread, passing it its chunk of numbers to factor # and output dict. t = threading.Thread( target=worker, args=(nums[chunksize * i:chunksize * (i + 1)], outs[i])) threads.append(t) t.start() # Wait for all threads to finish for t in threads: t.join() # Merge all partial output dicts into a single dict and return it return {k: v for out_d in outs for k, v in out_d.iteritems()}
Обратите внимание, что интерфейс между основным и рабочим потоками очень прост. У каждого рабочего потока есть определенный объем работы, после которого он просто возвращается. Таким образом, единственное, что делает основной поток, это запускает потоки nthreads с подходящими аргументами и затем ожидает их завершения.
Я проверил эталонный анализатор последовательных и потоковых потоков с 2, 4 и 8 потоками. Эталоном был факторизация постоянного набора больших чисел, чтобы минимизировать различия из-за случайных случайностей. Все тесты проводились на моем ноутбуке Ubuntu 10.04 с процессором Intel Core i7-2820MQ (4 физических ядра, гиперпоточные).
Вот результаты:
Горизонтальная ось — время в секундах, поэтому более короткие столбцы означают более быстрое выполнение. Да, разделение вычислений на несколько потоков на самом деле медленнее, чем последовательная реализация, и чем больше потоков используется, тем медленнее оно становится.
Это может быть немного удивительно, если вы не знакомы с тем, как реализованы потоки Python и GIL (Global Interpreter Lock). Чтобы понять, почему это происходит, вы вряд ли сможете лучше, чем прочитать статьи и презентации Дейва Бизли на эту тему. Его работа настолько обширна и доступна, что я не вижу смысла повторять что-либо из этого (кроме выводов) здесь.
Давайте теперь сделаем то же самое, только с процессами вместо потоков. Отличный многопроцессорный модуль Python делает процессы такими же простыми для запуска и управления, как и потоки. На самом деле, он предоставляет очень похожие API для модуля потоков. Вот многопроцессорный факторизатор:
def mp_factorizer(nums, nprocs): def worker(nums, out_q): """ The worker function, invoked in a process. 'nums' is a list of numbers to factor. The results are placed in a dictionary that's pushed to a queue. """ outdict = {} for n in nums: outdict[n] = factorize_naive(n) out_q.put(outdict) # Each process will get 'chunksize' nums and a queue to put his out # dict into out_q = Queue() chunksize = int(math.ceil(len(nums) / float(nprocs))) procs = [] for i in range(nprocs): p = multiprocessing.Process( target=worker, args=(nums[chunksize * i:chunksize * (i + 1)], out_q)) procs.append(p) p.start() # Collect all results into a single result dict. We know how many dicts # with results to expect. resultdict = {} for i in range(nprocs): resultdict.update(out_q.get()) # Wait for all worker processes to finish for p in procs: p.join() return resultdict
Единственная реальная разница здесь по сравнению с потоковым решением заключается в том, что выходные данные передаются от рабочего к основному потоку / процессу. С помощью многопроцессорной обработки мы не можем просто передать диктант подпроцессу и ожидать, что его модификации будут видны в другом процессе. Есть несколько подходов для решения этой проблемы. Одним из них является использование синхронизированного словаря из multiprocessing.managers.SyncManager. Тот, который я выбрал, — просто создать очередь для каждого процесса. Рабочий процесс помещает свои выходные данные в свою очередь, и эта очередь видна из основного процесса. Я решил создать одну очередь для каждого рабочего процесса, а не одну очередь, общую для всех процессов, чтобы минимизировать возможные конфликты.
Я запустил тот же тест, добавив время выполнения из mp_factorizer на график:
Как видите, есть хорошие ускорения. Самая быстрая многопроцессорная версия (разделенная на 8 процессов) работает в 3,1 раза быстрее, чем серийная версия. Хотя мой ЦП имеет только 4 физических ядра (и пара аппаратных «потоков» в каждом ядре совместно использует много ресурсов выполнения), 8-процессовая версия работает быстрее, что, вероятно, связано с тем, что ОС не выделяет ЦП оптимально между «тяжелыми» задачами. Другая причина того, что ускорение немного отличается от 4х, заключается в том, что работа не распределяется равномерно между подпроцессами. Некоторые цифры значительно быстрее, чем другие, и в настоящее время не уделяется внимания распределению нагрузки между рабочими.
Это интересные темы для изучения, но выходящие за рамки этого поста. Для наших нужд лучший совет — провести тесты и выбрать лучшую стратегию распараллеливания в соответствии с результатами.
Цель этого поста была двоякой. Во-первых, чтобы легко продемонстрировать, как потоки Python вредны для ускорения вычислений с привязкой к процессору (на самом деле они очень хороши для замедления их!), В то время как многопроцессорная обработка использует многоядерный процессор параллельно, как и ожидалось. , Во-вторых, чтобы показать, что многопроцессорность делает написание параллельного кода так же просто, как использование потоков. При синхронизации объектов между процессами нужно выполнить немного больше работы, чем между потоками, но в остальном это очень похожий код. И если вы спросите меня, что синхронизация объектов сложнее, это хорошо , потому что чем меньше объектов, тем лучше. Это основная причина, почему многопроцессное программирование часто считается более безопасным и менее подверженным ошибкам, чем многопоточное программирование.
Источник: http://eli.thegreenplace.net/2012/01/16/python-parallelizing-cpu-bound-tasks-with-multiprocessing/