Учебники

Параллелизм в Python — пул потоков

Предположим, нам пришлось создать большое количество потоков для наших многопоточных задач. Это будет в вычислительном отношении самым дорогим, поскольку может быть много проблем с производительностью из-за слишком большого количества потоков. Основной проблемой может быть ограничение пропускной способности. Мы можем решить эту проблему, создав пул потоков. Пул потоков может быть определен как группа предварительно созданных и свободных потоков, которые готовы к выполнению работы. Создание пула потоков предпочтительнее, чем создание новых потоков для каждой задачи, когда нам нужно выполнить большое количество задач. Пул потоков может управлять одновременным выполнением большого количества потоков следующим образом:

  • Если поток в пуле потоков завершает свое выполнение, то этот поток можно использовать повторно.

  • Если поток завершается, будет создан другой поток для его замены.

Если поток в пуле потоков завершает свое выполнение, то этот поток можно использовать повторно.

Если поток завершается, будет создан другой поток для его замены.

Модуль Python — Concurrent.futures

Стандартная библиотека Python включает модуль concurrent.futures . Этот модуль был добавлен в Python 3.2 для предоставления разработчикам высокоуровневого интерфейса для запуска асинхронных задач. Это уровень абстракции в верхней части потокового и многопроцессорного модулей Python, обеспечивающий интерфейс для выполнения задач с использованием пула потоков или процессов.

В наших последующих разделах мы узнаем о различных классах модуля concurrent.futures.

Executor Class

Executor — это абстрактный класс Python-модуля concurrent.futures . Его нельзя использовать напрямую, и нам нужно использовать один из следующих конкретных подклассов:

  • ThreadPoolExecutor
  • ProcessPoolExecutor

ThreadPoolExecutor — конкретный подкласс

Это один из конкретных подклассов класса Executor. Подкласс использует многопоточность, и мы получаем пул потоков для отправки задач. Этот пул назначает задачи доступным потокам и планирует их запуск.

Как создать ThreadPoolExecutor?

С помощью модуля concurrent.futures и его конкретного подкласса Executor мы можем легко создать пул потоков. Для этого нам нужно создать ThreadPoolExecutor с тем количеством потоков, которое мы хотим в пуле. По умолчанию это число 5. Затем мы можем отправить задачу в пул потоков. Когда мы отправляем () задачу, мы возвращаемся в будущее . У объекта Future есть метод done () , который сообщает, разрешено ли будущее. При этом значение было установлено для этого конкретного будущего объекта. Когда задача завершается, исполнитель пула потоков устанавливает значение для будущего объекта.

пример

from concurrent.futures import ThreadPoolExecutor
from time import sleep
def task(message):
   sleep(2)
   return message

def main():
   executor = ThreadPoolExecutor(5)
   future = executor.submit(task, ("Completed"))
   print(future.done())
   sleep(2)
   print(future.done())
   print(future.result())
if __name__ == '__main__':
main()

Выход

False
True
Completed

В приведенном выше примере ThreadPoolExecutor был создан с 5 потоками. Затем задача, которая будет ожидать сообщения в течение 2 секунд, передается исполнителю пула потоков. Как видно из выходных данных, задача не завершается до 2 секунд, поэтому первый вызов done () вернет False. Через 2 секунды задача будет выполнена, и мы получим результат будущего, вызвав для него метод result () .

Создание ThreadPoolExecutor — Менеджер контекста

Другой способ создания ThreadPoolExecutor — с помощью диспетчера контекста. Он работает аналогично методу, использованному в приведенном выше примере. Основным преимуществом использования контекстного менеджера является то, что он выглядит синтаксически хорошо. Создание экземпляра может быть сделано с помощью следующего кода —

with ThreadPoolExecutor(max_workers = 5) as executor

пример

Следующий пример заимствован из документации по Python. В этом примере, прежде всего, необходимо импортировать модуль concurrent.futures . Затем создается функция с именем load_url (), которая загружает запрошенный URL. Затем функция создает ThreadPoolExecutor с 5 потоками в пуле. ThreadPoolExecutor был использован как менеджер контекста. Мы можем получить результат будущего, вызвав для него метод result () .

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
   'http://www.cnn.com/',
   'http://europe.wsj.com/',
   'http://www.bbc.co.uk/',
   'http://some-made-up-domain.com/']

def load_url(url, timeout):
   with urllib.request.urlopen(url, timeout = timeout) as conn:
   return conn.read()

with concurrent.futures.ThreadPoolExecutor(max_workers = 5) as executor:

   future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
   for future in concurrent.futures.as_completed(future_to_url):
   url = future_to_url[future]
   try:
      data = future.result()
   except Exception as exc:
      print('%r generated an exception: %s' % (url, exc))
   else:
      print('%r page is %d bytes' % (url, len(data)))

Выход

Ниже приведен вывод приведенного выше скрипта Python —

'http://some-made-up-domain.com/' generated an exception: <urlopen error [Errno 11004] getaddrinfo failed>
'http://www.foxnews.com/' page is 229313 bytes
'http://www.cnn.com/' page is 168933 bytes
'http://www.bbc.co.uk/' page is 283893 bytes
'http://europe.wsj.com/' page is 938109 bytes

Использование функции Executor.map ()

Функция Python map () широко используется в ряде задач. Одна из таких задач — применить определенную функцию к каждому элементу итерируемых элементов. Точно так же мы можем отобразить все элементы итератора в функцию и передать их как независимые задания в ThreadPoolExecutor . Рассмотрим следующий пример скрипта Python, чтобы понять, как работает функция.

пример

В этом примере ниже функция map используется для применения функции square () к каждому значению в массиве значений.

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
values = [2,3,4,5]
def square(n):
   return n * n
def main():
   with ThreadPoolExecutor(max_workers = 3) as executor:
      results = executor.map(square, values)
for result in results:
      print(result)
if __name__ == '__main__':
   main()

Выход

Приведенный выше скрипт Python генерирует следующий вывод: