Преимущество многопроцессорности заключается в том, что несколько процессов работают над проблемой. Если мы разбиваем большую проблему на небольшие параллельные шаги, мы часто можем получить результаты за меньшее время, более эффективно используя ЦП. В частности, мы хотим использовать не пользовательское время, когда наш процесс может ожидать что-то в сети или ожидать завершения физического ввода-вывода.
Существуют ограничения на ускорение, предлагаемое многопроцессорностью. Как только загрузка достигает 100% ядер, мы не можем двигаться быстрее. Однако существует множество процессов, которые выполняют много операций ввода-вывода или много доступа к сети; мы можем использовать многопроцессорный модуль Python
для более эффективного использования нашего процессора.
Самый простой подход к многопроцессорной обработке — это использование философии конвейера оболочки. Разбейте обработку на небольшие шаги, каждый из которых читает из исходного потока и записывает в выходной поток. Давняя традиция здесь — читать из `
sys.stdin` и писать
в` sys.stdout` . Тем не менее, многопроцессорный модуль дает нам инструменты для достижения этой цели с относительно небольшими усилиями.
Однако вместо того, чтобы использовать простой канал, нам нужно использовать
многопроцессорность . На языке оболочки мы можем иметь `
func1 | func2 | func3 `.
Для многопроцессорных целей у нас было бы нечто более сложное на вид.
q1 = Queue() q2 = Queue() p1 = Process( target=func1, kwargs=dict(output=q1)) p2 = Process( target=func2, kwargs=dict(input=q1, output=q2)) p3 = Process( target=func3, kwargs=dict(input=q2)) p1.start() p2.start() p3.start()
В то время как многословный, он намекает на более обобщенный подход, чтобы три процесса передавали данные.
прекращение
Вопрос один из прекращения. Большинство многопроцессорных пакетов (например, многопроцессорные и сельдерейные) предполагают, что ваш конвейер обработки имеет довольно длительный срок службы. Из-за этого предполагается, что вы можете определить, что он бездействует, и завершить его по одному процессу за раз.
Это не плохое предположение и, вероятно, охватывает большое количество вариантов использования.
Это, однако, не покрывает простой подобный оболочке `
func1 | func2 | func3 `очень хорошо подходит для использования.
Почему бы и нет?
Потому что мы не можем легко сказать, когда очередь закрыта навсегда и все. Производящий процесс может закрыть очередь, но это не та часть информации, которая отображается в конце очереди потребителя.
Очереди разработаны, чтобы быть длительными и иметь многократные продукты. Нет простого способа узнать, что очередь больше не нужна. Каждый производитель должен будет попытаться закрыть Очередь,
и Очередь должна знать предполагаемое количество производителей. Если процессы являются динамическими, то число производителей может не иметь фиксированного заранее известного предела.
Поэтому подход состоит в том, чтобы поместить объект-дозор в очередь. Таким образом, потребитель знает, что производство закончено. Это может освободить ресурсы и выйти вежливо.
Fan-Out и Fan-In
Проблема со стражем в очереди нескольких производителей состоит в том, что будет несколько стражей, по одному от каждого производителя. И, конечно же, с многопользовательской очередью, для каждого потребителя должен быть один дозорный.
Если производители придерживаются правила «дозорный на потребителя» и потребители знают, что следует ожидать «дозорного на каждого производителя», то мы можем легко создать динамические многопроцессорные сети, которые запускаются и выключаются быстро и чисто.
Случай использования
Вот пример использования. Мы хотим сделать
whois анализ по IP-адресам в журнале.
Если у нас есть простой цикл для анализа журнала и выполнения
запроса whois на каждом IP-адресе хоста, обработка будет медленной. Он практически не использует ЦП, поскольку тратит почти все свое время на ожидание ввода из журнала, ожидание whois или ожидание записи буферов в выходной файл.
Если мы создадим простой трехэтапный конвейер (
parse | whois | report ), тогда мы получим некоторое улучшение по прошествии времени, но — действительно —
шаг whois убивает пропускную способность.
Нам нужен способ одновременной работы с дюжиной
запросов whois . Это приводит нас к многопроцессорности, разветвлению и разветвлению.
Вот что мы хотим.
def analyze_ip( logs ): user_queue = Queue() report_queue= Queue() user_from_log= ProducerProcess( name='book_users', target=book_users, args=(logs,), output_queue=user_queue, consumers=12 ) user_from_log.start() workers= [] for worker in range(12): get_details= ConsumerProducerProcess( name='user_whois', target=user_whois, kwargs=dict(LIVE=False), input_queue=user_queue, output_queue=report_queue, producers=1, consumers=1 ) get_details.start() workers.append(get_details) report= ConsumerProcess( name='final_report', target=final_report, input_queue=report_queue, producers=12 ) report.start() user_from_log.join() for w in workers: w.join() report.join()
Это будет выполнять несколько одновременных
запросов к whois , связывая много-много ресурсов и (мы надеемся) насыщать процессор реальной работой.
Это показывает разветвление от одного
ProducerProcess до десятка
экземпляров ConsumerProducerProcess . Он показывает фан-вход из
ConsumerProducerProcess в один
ConsumerProcess, который записывает окончательный отчет.
Это тривиально увеличивается (или выполняется) путем изменения количества процессов в середине.
Важно то, что задействованы реальные функции (
book_users ,
user_whois и
final_report) являются относительно тривиальными функциями генератора, которые используют исходные данные (файлы журнала или записи очереди) и выдают результаты (записи очереди или файл отчета.)
Также важным является тот факт, что он закрывается чисто. Когда вход достигает конца файла, дозорные значения помещаются в очереди, чтобы просочиться и привести к упорядоченному завершению процесса.
Источник: http://slott-softwarearchitect.blogspot.com/2012/02/multiprocessing-goodness-part-1-use.html