Статьи

Распараллеливание связанных с процессором задач с многопроцессорностью в Python

В предыдущем посте о потоках Python я кратко упомянул, что потоки не подходят для задач, связанных с ЦП, и вместо этого следует использовать многопроцессорность. Здесь я хочу продемонстрировать это с помощью показателей производительности, а также показать, что создание нескольких процессов в Python так же просто, как создание нескольких потоков.

Во-первых, давайте выберем простое вычисление, которое будет использоваться для сравнительного анализа. Я не хочу, чтобы это было полностью искусственным, поэтому я буду использовать тупую версию факторизации — разбив число на основные факторы. Вот очень наивная и неоптимизированная функция, которая принимает число и возвращает список факторов:

def factorize_naive(n):
    """ A naive factorization method. Take integer 'n', return list of
        factors.
    """
    if n < 2:
        return []
    factors = []
    p = 2

    while True:
        if n == 1:
            return factors

        r = n % p
        if r == 0:
            factors.append(p)
            n = n / p
        elif p * p >= n:
            factors.append(n)
            return factors
        elif p > 2:
            # Advance in steps of 2 over odd numbers
            p += 2
        else:
            # If p == 2, get to 3
            p += 1
    assert False, "unreachable"

Теперь, в качестве основы для бенчмаркинга, я буду использовать следующий последовательный (однопотоковый) факторизатор, который берет список чисел для факторизации и возвращает dict, отображающий число в свой список факторов:

def serial_factorizer(nums):
    return {n: factorize_naive(n) for n in nums}

Версия с резьбой следует. Также требуется список чисел для разложения, а также количество создаваемых потоков. Затем он разделяет список на части и назначает каждый кусок отдельному потоку:

def threaded_factorizer(nums, nthreads):
    def worker(nums, outdict):
        """ The worker function, invoked in a thread. 'nums' is a
            list of numbers to factor. The results are placed in
            outdict.
        """
        for n in nums:
            outdict[n] = factorize_naive(n)

    # Each thread will get 'chunksize' nums and its own output dict
    chunksize = int(math.ceil(len(nums) / float(nthreads)))
    threads = []
    outs = [{} for i in range(nthreads)]

    for i in range(nthreads):
        # Create each thread, passing it its chunk of numbers to factor
        # and output dict.
        t = threading.Thread(
                target=worker,
                args=(nums[chunksize * i:chunksize * (i + 1)],
                      outs[i]))
        threads.append(t)
        t.start()

    # Wait for all threads to finish
    for t in threads:
        t.join()

    # Merge all partial output dicts into a single dict and return it
    return {k: v for out_d in outs for k, v in out_d.iteritems()}

Обратите внимание, что интерфейс между основным и рабочим потоками очень прост. У каждого рабочего потока есть определенный объем работы, после которого он просто возвращается. Таким образом, единственное, что делает основной поток, это запускает потоки nthreads с подходящими аргументами и затем ожидает их завершения.

Я проверил эталонный анализатор последовательных и потоковых потоков с 2, 4 и 8 потоками. Эталоном был факторизация постоянного набора больших чисел, чтобы минимизировать различия из-за случайных случайностей. Все тесты проводились на моем ноутбуке Ubuntu 10.04 с процессором Intel Core i7-2820MQ (4 физических ядра, гиперпоточные).

Вот результаты:

http://eli.thegreenplace.net/wp-content/uploads/2012/01/serial_vs_threaded.png

Горизонтальная ось — время в секундах, поэтому более короткие столбцы означают более быстрое выполнение. Да, разделение вычислений на несколько потоков на самом деле медленнее, чем последовательная реализация, и чем больше потоков используется, тем медленнее оно становится.

Это может быть немного удивительно, если вы не знакомы с тем, как реализованы потоки Python и GIL (Global Interpreter Lock). Чтобы понять, почему это происходит, вы вряд ли сможете лучше, чем прочитать статьи и презентации Дейва Бизли на эту тему. Его работа настолько обширна и доступна, что я не вижу смысла повторять что-либо из этого (кроме выводов) здесь.

Давайте теперь сделаем то же самое, только с процессами вместо потоков. Отличный многопроцессорный модуль Python делает процессы такими же простыми для запуска и управления, как и потоки. На самом деле, он предоставляет очень похожие API для модуля потоков. Вот многопроцессорный факторизатор:

def mp_factorizer(nums, nprocs):
    def worker(nums, out_q):
        """ The worker function, invoked in a process. 'nums' is a
            list of numbers to factor. The results are placed in
            a dictionary that's pushed to a queue.
        """
        outdict = {}
        for n in nums:
            outdict[n] = factorize_naive(n)
        out_q.put(outdict)

    # Each process will get 'chunksize' nums and a queue to put his out
    # dict into
    out_q = Queue()
    chunksize = int(math.ceil(len(nums) / float(nprocs)))
    procs = []

    for i in range(nprocs):
        p = multiprocessing.Process(
                target=worker,
                args=(nums[chunksize * i:chunksize * (i + 1)],
                      out_q))
        procs.append(p)
        p.start()

    # Collect all results into a single result dict. We know how many dicts
    # with results to expect.
    resultdict = {}
    for i in range(nprocs):
        resultdict.update(out_q.get())

    # Wait for all worker processes to finish
    for p in procs:
        p.join()

    return resultdict

Единственная реальная разница здесь по сравнению с потоковым решением заключается в том, что выходные данные передаются от рабочего к основному потоку / процессу. С помощью многопроцессорной обработки мы не можем просто передать диктант подпроцессу и ожидать, что его модификации будут видны в другом процессе. Есть несколько подходов для решения этой проблемы. Одним из них является использование синхронизированного словаря из multiprocessing.managers.SyncManager. Тот, который я выбрал, — просто создать очередь для каждого процесса. Рабочий процесс помещает свои выходные данные в свою очередь, и эта очередь видна из основного процесса. Я решил создать одну очередь для каждого рабочего процесса, а не одну очередь, общую для всех процессов, чтобы минимизировать возможные конфликты.

Я запустил тот же тест, добавив время выполнения из mp_factorizer на график:

http://eli.thegreenplace.net/wp-content/uploads/2012/01/serial_vs_threaded_vs_mp.png

Как видите, есть хорошие ускорения. Самая быстрая многопроцессорная версия (разделенная на 8 процессов) работает в 3,1 раза быстрее, чем серийная версия. Хотя мой ЦП имеет только 4 физических ядра (и пара аппаратных «потоков» в каждом ядре совместно использует много ресурсов выполнения), 8-процессовая версия работает быстрее, что, вероятно, связано с тем, что ОС не выделяет ЦП оптимально между «тяжелыми» задачами. Другая причина того, что ускорение немного отличается от 4х, заключается в том, что работа не распределяется равномерно между подпроцессами. Некоторые цифры значительно быстрее, чем другие, и в настоящее время не уделяется внимания распределению нагрузки между рабочими.

Это интересные темы для изучения, но выходящие за рамки этого поста. Для наших нужд лучший совет — провести тесты и выбрать лучшую стратегию распараллеливания в соответствии с результатами.

Цель этого поста была двоякой. Во-первых, чтобы легко продемонстрировать, как потоки Python вредны для ускорения вычислений с привязкой к процессору (на самом деле они очень хороши для замедления их!), В то время как многопроцессорная обработка использует многоядерный процессор параллельно, как и ожидалось. , Во-вторых, чтобы показать, что многопроцессорность делает написание параллельного кода так же просто, как использование потоков. При синхронизации объектов между процессами нужно выполнить немного больше работы, чем между потоками, но в остальном это очень похожий код. И если вы спросите меня, что синхронизация объектов сложнее, это хорошо , потому что чем меньше объектов, тем лучше. Это основная причина, почему многопроцессное программирование часто считается более безопасным и менее подверженным ошибкам, чем многопоточное программирование.

 

Источник: http://eli.thegreenplace.net/2012/01/16/python-parallelizing-cpu-bound-tasks-with-multiprocessing/