Статьи

Темы Python: связь и остановка

Очень распространенное сомнение у разработчиков, плохо знакомых с Python, — как правильно использовать его потоки. В частности, большое количество вопросов о StackOverflow показывают, что люди чаще всего борются с двумя аспектами:

  1. Как остановить / убить поток
  2. Как безопасно передавать данные в поток и обратно

У меня уже есть блог пост прикосновений по этим вопросам прямо здесь , но я чувствую , что это слишком конкретные задачи для сокетов, и более простой и общий пост было бы уместно. Я предполагаю, что читатель имеет базовые знания о потоках Python, т.е., по крайней мере, изучил документацию .

Итак, без лишних слов, вот пример реализации «рабочего» потока. Можно задавать задачи, где каждая задача является именем каталога, и она выполняет полезную работу. Эта работа рекурсивно перечисляет все файлы, содержащиеся в данном каталоге и его подкаталогах.

import os, time
import threading, Queue

class WorkerThread(threading.Thread):
    """ A worker thread that takes directory names from a queue, finds all
        files in them recursively and reports the result.

        Input is done by placing directory names (as strings) into the
        Queue passed in dir_q.

        Output is done by placing tuples into the Queue passed in result_q.
        Each tuple is (thread name, dirname, [list of files]).

        Ask the thread to stop by calling its join() method.
    """
    def __init__(self, dir_q, result_q):
        super(WorkerThread, self).__init__()
        self.dir_q = dir_q
        self.result_q = result_q
        self.stoprequest = threading.Event()

    def run(self):
        # As long as we weren't asked to stop, try to take new tasks from the
        # queue. The tasks are taken with a blocking 'get', so no CPU
        # cycles are wasted while waiting.
        # Also, 'get' is given a timeout, so stoprequest is always checked,
        # even if there's nothing in the queue.
        while not self.stoprequest.isSet():
            try:
                dirname = self.dir_q.get(True, 0.05)
                filenames = list(self._files_in_dir(dirname))
                self.result_q.put((self.name, dirname, filenames))
            except Queue.Empty:
                continue

    def join(self, timeout=None):
        self.stoprequest.set()
        super(WorkerThread, self).join(timeout)

    def _files_in_dir(self, dirname):
        """ Given a directory name, yields the names of all files (not dirs)
            contained in this directory and its sub-directories.
        """
        for path, dirs, files in os.walk(dirname):
            for file in files:
                yield os.path.join(path, file)

Для опытных программистов должно быть достаточно просмотреть код, его документацию и комментарии. Для людей с меньшим опытом, я уточню. Давайте рассмотрим две проблемы, упомянутые в начале статьи.

Во-первых, убивая нить. Это достигнуто, вежливо прося, чтобы нить умерла. Присоединиться метод Thread переопределен, и перед вызовом фактического присоединиться родительского класса, он «устанавливает» self.stoprequest атрибут, который является threading.Event . Основной цикл в потоке в перспективе метод проверяет этот флаг, и выходит , когда он установлен. Вы можете думать о многопоточности. Событие как синхронизированный логический флаг. Имейте в виду, что метод join вызывается в контексте основного потока, а тело метода run выполняется в контексте рабочего потока.

Во-вторых, передача данных в и из потока. Лучше всего это сделать с помощью объектов Queue из модуля Queue (да, это Queue.Queue — немного неловко, я согласен. В Python 3 это было исправлено, и модуль назван в нижнем регистре — queue ). Когда создается рабочий поток, ему дается ссылка на одну очередь для ввода и одну очередь для вывода. Объекты очереди могут безопасно разделяться между потоками (фактически, любым количеством потоков) и предоставлять синхронизированный интерфейс очереди FIFO.

Вероятно, самая важная часть кода, которую нужно понять, это следующие строки:

while not self.stoprequest.isSet():
    try:
        dirname = self.dir_q.get(True, 0.05)
        ... # do work
    except Queue.Empty:
        continue

Это простой способ ожидания в двух условиях одновременно. Поток работает только в том случае, если его не просят остановить, и в очереди есть работа. Поэтому метод Queue.get используется следующим образом:

  • Его аргумент блокировки имеет значение true, что означает, что вызов будет блокировать рабочий поток до тех пор, пока в очереди не появится элемент, или…
  • Таймаут аргумент также устанавливается, что означает , что GET будет блокировать для в большинстве таймаут секунд. Если в течение этого времени в очереди не появилось ни одного элемента, метод выдает исключение Queue.Empty .

Итак, у нас есть способ как ждать в очереди, не тратя циклов ЦП ( get использует специальные сервисы ОС, расположенные глубоко под ним, чтобы реализовать ожидание без вращения), так и время от времени проверять событие stoprequest . Однако есть только одна ошибка. Если выполнение работы занимает много времени, следующая проверка стоп-запроса может быть отложена. Если это важно для вашего приложения, подумайте о том, чтобы выполнять работу короткими порциями, проверяя стоп-запрос после каждого порции.

Вот простой код, который использует этот рабочий поток. Он создает пул потоков с 4-мя потоками, передает их работу и ожидает получения всех результатов:

def main(args):
    # Create a single input and a single output queue for all threads.
    dir_q = Queue.Queue()
    result_q = Queue.Queue()

    # Create the "thread pool"
    pool = [WorkerThread(dir_q=dir_q, result_q=result_q) for i in range(4)]

    # Start all threads
    for thread in pool:
        thread.start()

    # Give the workers some work to do
    work_count = 0
    for dir in args:
        if os.path.exists(dir):
            work_count += 1
            dir_q.put(dir)

    print 'Assigned %s dirs to workers' % work_count

    # Now get all the results
    while work_count > 0:
        # Blocking 'get' from a Queue.
        result = result_q.get()
        print 'From thread %s: %s files found in dir %s' % (
            result[0], len(result[2]), result[1])
        work_count -= 1

    # Ask threads to die and wait for them to do it
    for thread in pool:
        thread.join()

if __name__ == '__main__':
    import sys
    main(sys.argv[1:])

Все рабочие потоки в пуле имеют одинаковую очередь ввода и очередь вывода. Там нет абсолютно никаких проблем с этим. Напротив, как вы можете видеть, это позволяет очень простой реализации пула потоков быть достаточно функциональной.

Пожалуйста, рассматривайте этот образец как шаблон, а не как универсальное решение для всех возможных сценариев многопоточности. Параллелизм — сложная тема, и при проектировании даже самых простых многопоточных программ требуется много компромиссов.

Наконец, обратите внимание, что такие рабочие потоки в Python полезны только в том случае, если работа, которую они выполняют, не связана с процессором. Поток, показанный здесь, является хорошим примером — перечисление каталогов и файлов — это, в основном, задача, связанная с вводом / выводом (ну, в меньшей степени, если у вас действительно быстрый SSD). Другими хорошими кандидатами являются сокетный ввод-вывод, взаимодействие с пользователем и все, что связано с сетью (т. Е. Выборка данных из служб HTTP или RPC).

Связанные с процессором задачи не подходят для потоков Python из-за глобальной блокировки интерпретатора (GIL). Параллельные вычисления в Python должны выполняться в нескольких процессах , а не в потоках. В следующей статье я расскажу, как использовать многопроцессорный модуль для аналогичного управления рабочими процессами.

Похожие сообщения:

  1. Как (не) установить время ожидания для вычисления в Python
  2. Python — распараллеливание связанных с процессором задач с многопроцессорностью
  3. Пример кода — поток клиента сокета в Python
  4. Создание потоков в Win32 C / C ++ программировании
  5. потоки, последовательные порты, ядро, emacs и другие овощи