Очень распространенное сомнение у разработчиков, плохо знакомых с Python, — как правильно использовать его потоки. В частности, большое количество вопросов о StackOverflow показывают, что люди чаще всего борются с двумя аспектами:
- Как остановить / убить поток
- Как безопасно передавать данные в поток и обратно
У меня уже есть блог пост прикосновений по этим вопросам прямо здесь , но я чувствую , что это слишком конкретные задачи для сокетов, и более простой и общий пост было бы уместно. Я предполагаю, что читатель имеет базовые знания о потоках Python, т.е., по крайней мере, изучил документацию .
Итак, без лишних слов, вот пример реализации «рабочего» потока. Можно задавать задачи, где каждая задача является именем каталога, и она выполняет полезную работу. Эта работа рекурсивно перечисляет все файлы, содержащиеся в данном каталоге и его подкаталогах.
import os, time import threading, Queue class WorkerThread(threading.Thread): """ A worker thread that takes directory names from a queue, finds all files in them recursively and reports the result. Input is done by placing directory names (as strings) into the Queue passed in dir_q. Output is done by placing tuples into the Queue passed in result_q. Each tuple is (thread name, dirname, [list of files]). Ask the thread to stop by calling its join() method. """ def __init__(self, dir_q, result_q): super(WorkerThread, self).__init__() self.dir_q = dir_q self.result_q = result_q self.stoprequest = threading.Event() def run(self): # As long as we weren't asked to stop, try to take new tasks from the # queue. The tasks are taken with a blocking 'get', so no CPU # cycles are wasted while waiting. # Also, 'get' is given a timeout, so stoprequest is always checked, # even if there's nothing in the queue. while not self.stoprequest.isSet(): try: dirname = self.dir_q.get(True, 0.05) filenames = list(self._files_in_dir(dirname)) self.result_q.put((self.name, dirname, filenames)) except Queue.Empty: continue def join(self, timeout=None): self.stoprequest.set() super(WorkerThread, self).join(timeout) def _files_in_dir(self, dirname): """ Given a directory name, yields the names of all files (not dirs) contained in this directory and its sub-directories. """ for path, dirs, files in os.walk(dirname): for file in files: yield os.path.join(path, file)
Для опытных программистов должно быть достаточно просмотреть код, его документацию и комментарии. Для людей с меньшим опытом, я уточню. Давайте рассмотрим две проблемы, упомянутые в начале статьи.
Во-первых, убивая нить. Это достигнуто, вежливо прося, чтобы нить умерла. Присоединиться метод Thread переопределен, и перед вызовом фактического присоединиться родительского класса, он «устанавливает» self.stoprequest атрибут, который является threading.Event . Основной цикл в потоке в перспективе метод проверяет этот флаг, и выходит , когда он установлен. Вы можете думать о многопоточности. Событие как синхронизированный логический флаг. Имейте в виду, что метод join вызывается в контексте основного потока, а тело метода run выполняется в контексте рабочего потока.
Во-вторых, передача данных в и из потока. Лучше всего это сделать с помощью объектов Queue из модуля Queue (да, это Queue.Queue — немного неловко, я согласен. В Python 3 это было исправлено, и модуль назван в нижнем регистре — queue ). Когда создается рабочий поток, ему дается ссылка на одну очередь для ввода и одну очередь для вывода. Объекты очереди могут безопасно разделяться между потоками (фактически, любым количеством потоков) и предоставлять синхронизированный интерфейс очереди FIFO.
Вероятно, самая важная часть кода, которую нужно понять, это следующие строки:
while not self.stoprequest.isSet(): try: dirname = self.dir_q.get(True, 0.05) ... # do work except Queue.Empty: continue
Это простой способ ожидания в двух условиях одновременно. Поток работает только в том случае, если его не просят остановить, и в очереди есть работа. Поэтому метод Queue.get используется следующим образом:
- Его аргумент блокировки имеет значение true, что означает, что вызов будет блокировать рабочий поток до тех пор, пока в очереди не появится элемент, или…
- Таймаут аргумент также устанавливается, что означает , что GET будет блокировать для в большинстве таймаут секунд. Если в течение этого времени в очереди не появилось ни одного элемента, метод выдает исключение Queue.Empty .
Итак, у нас есть способ как ждать в очереди, не тратя циклов ЦП ( get использует специальные сервисы ОС, расположенные глубоко под ним, чтобы реализовать ожидание без вращения), так и время от времени проверять событие stoprequest . Однако есть только одна ошибка. Если выполнение работы занимает много времени, следующая проверка стоп-запроса может быть отложена. Если это важно для вашего приложения, подумайте о том, чтобы выполнять работу короткими порциями, проверяя стоп-запрос после каждого порции.
Вот простой код, который использует этот рабочий поток. Он создает пул потоков с 4-мя потоками, передает их работу и ожидает получения всех результатов:
def main(args): # Create a single input and a single output queue for all threads. dir_q = Queue.Queue() result_q = Queue.Queue() # Create the "thread pool" pool = [WorkerThread(dir_q=dir_q, result_q=result_q) for i in range(4)] # Start all threads for thread in pool: thread.start() # Give the workers some work to do work_count = 0 for dir in args: if os.path.exists(dir): work_count += 1 dir_q.put(dir) print 'Assigned %s dirs to workers' % work_count # Now get all the results while work_count > 0: # Blocking 'get' from a Queue. result = result_q.get() print 'From thread %s: %s files found in dir %s' % ( result[0], len(result[2]), result[1]) work_count -= 1 # Ask threads to die and wait for them to do it for thread in pool: thread.join() if __name__ == '__main__': import sys main(sys.argv[1:])
Все рабочие потоки в пуле имеют одинаковую очередь ввода и очередь вывода. Там нет абсолютно никаких проблем с этим. Напротив, как вы можете видеть, это позволяет очень простой реализации пула потоков быть достаточно функциональной.
Пожалуйста, рассматривайте этот образец как шаблон, а не как универсальное решение для всех возможных сценариев многопоточности. Параллелизм — сложная тема, и при проектировании даже самых простых многопоточных программ требуется много компромиссов.
Наконец, обратите внимание, что такие рабочие потоки в Python полезны только в том случае, если работа, которую они выполняют, не связана с процессором. Поток, показанный здесь, является хорошим примером — перечисление каталогов и файлов — это, в основном, задача, связанная с вводом / выводом (ну, в меньшей степени, если у вас действительно быстрый SSD). Другими хорошими кандидатами являются сокетный ввод-вывод, взаимодействие с пользователем и все, что связано с сетью (т. Е. Выборка данных из служб HTTP или RPC).
Связанные с процессором задачи не подходят для потоков Python из-за глобальной блокировки интерпретатора (GIL). Параллельные вычисления в Python должны выполняться в нескольких процессах , а не в потоках. В следующей статье я расскажу, как использовать многопроцессорный модуль для аналогичного управления рабочими процессами.
Похожие сообщения:
- Как (не) установить время ожидания для вычисления в Python
- Python — распараллеливание связанных с процессором задач с многопроцессорностью
- Пример кода — поток клиента сокета в Python
- Создание потоков в Win32 C / C ++ программировании
- потоки, последовательные порты, ядро, emacs и другие овощи