Статьи

Многопроцессорность. Часть 2. Определения классов

Этот пост является второй частью серии статей Стивена Лотта о многопроцессорности.  Прочитайте Часть 1 .

Модуль
многопроцессорной обработки включает в себя общий
класс
Process , который можно использовать для переноса простой функции.

Функция должна быть разработана для работы с очередями, конвейерами или другими методами синхронизации.

Однако есть преимущество в определении класса, который изящно обрабатывает функции генератора. Если у нас есть многопроцессорная обработка с поддержкой генератора, мы можем (1) написать наши алгоритмы как генераторы и затем (2) тривиально связать процессы с очередями для улучшения масштабируемости.

Мы смотрим на создание «конвейеров» обработки с использованием очередей. Таким образом, мы можем легко обрабатывать обработку с несколькими производителями и несколькими потребителями (разветвления, разветвления), что повышает параллелизм.

У нас есть три варианта использования: производитель, потребитель и потребитель-производитель.

Режиссер

Производитель получает данные откуда-то и заполняет ими очередь. Это источник, который подает данные в конвейер.

class ProducerProcess( Process ):
    """Produces items into a Queue.
    
    The "target" must be a generator function which yields
    pickable items.
    """
    def __init__( self, group=None, target=None, name=None, args=None, kwargs=None, output_queue=None, consumers=0 ):
        super( ProducerProcess, self ).__init__( name=name )
        self.target= target
        self.args= args if args is not None else []
        self.kwargs= kwargs if kwargs is not None else {}
        self.output_queue= output_queue
        self.consumers= consumers
    def run( self ):
        target= self.target
        for item in target(*self.args, **self.kwargs):
            self.output_queue.put( item )
        for x in range(self.consumers):
            self.output_queue.put( None )
        self.output_queue.close()

Этот класс обернет «целевую» функцию, которая
должна быть генератором. Каждое полученное значение помещается в «output_queue». Когда исходные данные заканчиваются, в очередь помещается достаточное количество сторожевых токенов, чтобы удовлетворить всех потребителей.

потребитель

Потребитель получает данные из очереди и выполняет некоторую окончательную обработку. Возможно, он загружает базу данных или пишет файл. Это приемник, который потребляет данные в конвейере.

class ConsumerProcess( Process ):
    """Consumes items from a Queue.
    
    The "target" must be a function which expects an iterable as it's
    only argument.  Therefore, the args value is not used here.
    """
    def __init__( self, group=None, target=None, name=None, kwargs=None, input_queue=None, producers=0 ):
        super( ConsumerProcess, self ).__init__( name=name )
        self.target= target
        self.kwargs= kwargs if kwargs is not None else {}
        self.input_queue= input_queue
        self.producers= producers
    def items( self ):
        while self.producers != 0:
            for item in iter( self.input_queue.get, None ):
                yield item
            self.producers -= 1
    def run( self ):
        target= self.target
        target( self.items(), **self.kwargs )

Этот класс обернет «целевую» функцию, которая должна быть готова к работе с любой итерацией. Каждое значение из очереди будет передано целевой функции для обработки. Когда от производителей было получено достаточное количество жетонов, это прекращает обработку.

Потребитель-Producer

Середина конвейера обработки — процессы потребителя-производителя, которые потребляют из одной очереди, а продукты — в другую очередь.

class ConsumerProducerProcess( Process ):
    """Consumes items from a Queue and produces items onto a Queue.
    
    The "target" must be a generator function which yields
    pickable items and which expects an iterable as it's
    only argument.  Therefore, the args value is not used here.
    """
    def __init__( self, group=None, target=None, name=None, kwargs=None, input_queue=None, producers=0, output_queue=None, consumers=0 ):
        super( ConsumerProducerProcess, self ).__init__( name=name )
        self.target= target
        self.kwargs= kwargs if kwargs is not None else {}
        self.input_queue= input_queue
        self.producers= producers
        self.output_queue= output_queue
        self.consumers= consumers
    def items( self ):
        while self.producers != 0:
            for item in iter( self.input_queue.get, None ):
                yield item
            self.producers -= 1
    def run( self ):
        target= self.target
        for item in target(self.items(), **self.kwargs):
            self.output_queue.put( item )
        for x in range(self.consumers):
            self.output_queue.put( None )
        self.output_queue.close()

Этот класс обернет «целевую» функцию, которая должна быть функцией-генератором, которая потребляет итерацию.

Каждое значение из очереди предоставляется целевому генератору. Каждое значение, полученное генератором, отправляется в очередь вывода. Входная сторона подсчитывает дозорных, чтобы знать, когда остановиться. Выходная сторона производит достаточно часовых, чтобы предупредить последующие процессы.

Целевые функции

Функция производителя должна быть функцией генератора этой формы

def prod( *args ):
    for item in some_function(*args):
       yield item

Функция потребителя выглядит так:

def cons( source ):
    for item in source:
       final_disposition(item)

Наконец, функция потребитель-производитель выглядит следующим образом.

def cons_prod( source ):
    for item in source:
       next_value= transform(item)
       yield next_value

Эти функции могут быть протестированы и отлажены следующим образом.

for final in consumer( cons_prod( producer( *args ) ) ):
    print( final )

Таким образом, мы уверены, что наш алгоритм правильный, прежде чем пытаться масштабировать его с помощью многопроцессорной обработки.

Источник: http://slott-softwarearchitect.blogspot.com/2012/02/multiprocessing-goodness-part-2-class.html