Пул процесса может быть создан и использован так же, как мы создали и использовали пул потоков. Пул процессов может быть определен как группа предварительно созданных и незанятых процессов, которые готовы к работе. Создание пула процессов предпочтительнее, чем создание новых процессов для каждой задачи, когда нам нужно выполнить большое количество задач.
Модуль Python — Concurrent.futures
Стандартная библиотека Python имеет модуль, называемый concurrent.futures . Этот модуль был добавлен в Python 3.2 для предоставления разработчикам высокоуровневого интерфейса для запуска асинхронных задач. Это уровень абстракции в верхней части потокового и многопроцессорного модулей Python, обеспечивающий интерфейс для выполнения задач с использованием пула потоков или процессов.
В наших последующих разделах мы рассмотрим различные подклассы модуля concurrent.futures.
Executor Class
Executor — это абстрактный класс Python-модуля concurrent.futures . Его нельзя использовать напрямую, и нам нужно использовать один из следующих конкретных подклассов:
- ThreadPoolExecutor
- ProcessPoolExecutor
ProcessPoolExecutor — конкретный подкласс
Это один из конкретных подклассов класса Executor. Он использует мульти-обработку, и мы получаем пул процессов для отправки задач. Этот пул назначает задачи доступным процессам и планирует их запуск.
Как создать ProcessPoolExecutor?
С помощью модуля concurrent.futures и его конкретного подкласса Executor мы можем легко создать пул процессов. Для этого нам нужно создать ProcessPoolExecutor с количеством процессов, которые мы хотим в пуле. По умолчанию это число 5. Затем следует отправить задачу в пул процессов.
пример
Теперь мы рассмотрим тот же пример, который мы использовали при создании пула потоков, с той лишь разницей, что теперь мы будем использовать ProcessPoolExecutor вместо ThreadPoolExecutor .
from concurrent.futures import ProcessPoolExecutor from time import sleep def task(message): sleep(2) return message def main(): executor = ProcessPoolExecutor(5) future = executor.submit(task, ("Completed")) print(future.done()) sleep(2) print(future.done()) print(future.result()) if __name__ == '__main__': main()
Выход
False False Completed
В приведенном выше примере Process PoolExecutor был создан с 5 потоками. Затем задача, которая будет ожидать сообщения в течение 2 секунд, передается исполнителю пула процессов. Как видно из выходных данных, задача не завершается до 2 секунд, поэтому первый вызов done () вернет False. Через 2 секунды задача будет выполнена, и мы получим результат будущего, вызвав для него метод result () .
Создание ProcessPoolExecutor — Менеджер контекста
Другой способ создания экземпляра ProcessPoolExecutor — с помощью диспетчера контекста. Он работает аналогично методу, использованному в приведенном выше примере. Основным преимуществом использования контекстного менеджера является то, что он выглядит синтаксически хорошо. Создание экземпляра может быть сделано с помощью следующего кода —
with ProcessPoolExecutor(max_workers = 5) as executor
пример
Для лучшего понимания мы берем тот же пример, который использовался при создании пула потоков. В этом примере нам нужно начать с импорта модуля concurrent.futures . Затем создается функция с именем load_url (), которая загружает запрошенный URL. Затем создается ProcessPoolExecutor с 5 числами потоков в пуле. Process PoolExecutor был использован как менеджер контекста. Мы можем получить результат будущего, вызвав для него метод result () .
import concurrent.futures from concurrent.futures import ProcessPoolExecutor import urllib.request URLS = ['http://www.foxnews.com/', 'http://www.cnn.com/', 'http://europe.wsj.com/', 'http://www.bbc.co.uk/', 'http://some-made-up-domain.com/'] def load_url(url, timeout): with urllib.request.urlopen(url, timeout = timeout) as conn: return conn.read() def main(): with concurrent.futures.ProcessPoolExecutor(max_workers=5) as executor: future_to_url = {executor.submit(load_url, url, 60): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: data = future.result() except Exception as exc: print('%r generated an exception: %s' % (url, exc)) else: print('%r page is %d bytes' % (url, len(data))) if __name__ == '__main__': main()
Выход
Приведенный выше скрипт Python сгенерирует следующий вывод:
'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed> 'http://www.foxnews.com/' page is 229476 bytes 'http://www.cnn.com/' page is 165323 bytes 'http://www.bbc.co.uk/' page is 284981 bytes 'http://europe.wsj.com/' page is 967575 bytes
Использование функции Executor.map ()
Функция Python map () широко используется для выполнения ряда задач. Одна из таких задач — применить определенную функцию к каждому элементу итерируемых элементов. Точно так же мы можем отобразить все элементы итератора в функцию и отправить их как независимые задания в ProcessPoolExecutor . Рассмотрим следующий пример скрипта Python, чтобы понять это.
пример
Мы рассмотрим тот же пример, который мы использовали при создании пула потоков с помощью функции Executor.map () . В приведенном ниже примере функция map используется для применения функции square () к каждому значению в массиве значений.
from concurrent.futures import ProcessPoolExecutor from concurrent.futures import as_completed values = [2,3,4,5] def square(n): return n * n def main(): with ProcessPoolExecutor(max_workers = 3) as executor: results = executor.map(square, values) for result in results: print(result) if __name__ == '__main__': main()
Выход
Приведенный выше скрипт Python сгенерирует следующий вывод
4 9 16 25
Когда использовать ProcessPoolExecutor и ThreadPoolExecutor?
Теперь, когда мы изучили оба класса Executor — ThreadPoolExecutor и ProcessPoolExecutor, нам нужно знать, когда использовать какого исполнителя. Нам нужно выбрать ProcessPoolExecutor в случае связанных с CPU рабочих нагрузок и ThreadPoolExecutor в случае связанных с I / O рабочих нагрузок.
Если мы используем ProcessPoolExecutor , то нам не нужно беспокоиться о GIL, поскольку он использует многопроцессорность. Более того, время выполнения будет меньше по сравнению с ThreadPoolExecution . Рассмотрим следующий пример скрипта Python, чтобы понять это.
пример
import time import concurrent.futures value = [8000000, 7000000] def counting(n): start = time.time() while n > 0: n -= 1 return time.time() - start def main(): start = time.time() with concurrent.futures.ProcessPoolExecutor() as executor: for number, time_taken in zip(value, executor.map(counting, value)): print('Start: {} Time taken: {}'.format(number, time_taken)) print('Total time taken: {}'.format(time.time() - start)) if __name__ == '__main__': main()
Выход
Start: 8000000 Time taken: 1.5509998798370361 Start: 7000000 Time taken: 1.3259999752044678 Total time taken: 2.0840001106262207 Example- Python script with ThreadPoolExecutor: import time import concurrent.futures value = [8000000, 7000000] def counting(n): start = time.time() while n > 0: n -= 1 return time.time() - start def main(): start = time.time() with concurrent.futures.ThreadPoolExecutor() as executor: for number, time_taken in zip(value, executor.map(counting, value)): print('Start: {} Time taken: {}'.format(number, time_taken)) print('Total time taken: {}'.format(time.time() - start)) if __name__ == '__main__': main()
Выход
Start: 8000000 Time taken: 3.8420000076293945 Start: 7000000 Time taken: 3.6010000705718994 Total time taken: 3.8480000495910645
Из выводов обеих программ, приведенных выше, мы видим разницу во времени выполнения при использовании ProcessPoolExecutor и ThreadPoolExecutor .