Статьи

Функциональное программирование Боже — Python на помощь


Вот ситуация.


Продавец прислал нам три отдельных файла, которые необходимо объединить.
Более 70 000 записей каждая. Это файлы CSV, поэтому
положение столбцов
не имеет большого значения. Столбца
Имя (в строке 1) является то , что имеет значение.

Я посмотрел на три решения.
Два из которых просто в порядке. Третьим было функциональное программирование, которое было очень круто.
Вариант 1 — ОС Сортировка / Слияние

Чтобы привести файлы в согласованный порядок, нам нужно отсортировать.
Однако сортировка Linux смещена в сторону столбцов, которые известны позиционно.

Итак, нам нужно использовать
шаблон
оформления Decorate-Sort-Undecorate . Итак, у нас есть сценарий оболочки что-то вроде следующего.

decorate.py a.csv | sort >a_sorted.csv
decorate.py b.csv | sort >b_sorted.csv
decorate.py c.csv | sort >c_sorted.csv
sort -m a_sorted.csv b_sorted.csv c_sorted.csv | undecorate.py >result.csv

Это хорошо работает, потому что decorate.py и undecorate.py такие простые программы. Вот decorate.py.

from __future__ import print_function
import csv
import sys
with open(sys.argv[1],"rb") as source:
rdr= csv.DictReader( source )
for row in rdr:
  print( row['the key'], row )


Undecorate похож.
Он использует метод
str.partition () для удаления оформления.

Обратите внимание, что начальные шаги «украшения» могут выполняться одновременно, что приводит к некоторому сокращению времени.
Это хорошо масштабируется. Это не использует много памяти; управление параллелизмом ОС означает, что оно использует каждое доступное ядро.

Я не тестировал это, кстати.
Вариант 2 — Большой D-память


Поскольку файлы не очень большие, они помещаются в памяти.
Это тоже довольно просто.

import csv
from collections import defaultdict
# build the result set
result = defaultdict( dict )
for f in ( 'a.csv', 'b.csv', 'c.csv' ):
with open( f, 'rb' ) as source:
  rdr = csv.DictReader( source )
  for row in rdr:
      result[row['key']].update( row )
# find the column titles
keys = set()
for row in result:
  keys |= set( result[row].keys() )
# write the result set
with open( 'output.csv', 'wb' ) as target:
  wtr= csv.DictWriter( target, sorted(keys) )
  wtr.writerow( dict(zip(keys,keys)) )
  for row in result:
      wtr.writerow( result[row] )


Это не так уж плохо.
Однако для безумно больших файлов он не будет хорошо масштабироваться.

Истекшее время для реальных файлов (которые были заархивированы, добавляя обработку, не относящуюся к этой публикации) было 218 секунд на моем маленьком ноутбуке.
Вариант 3 — Функциональное программирование


Подход к функциональному программированию — это немного больше кода, чем вариант 1. Но это круто и очень расширяемо.
Он предлагает больше гибкости без ограничения памяти большого словаря.

Начнем с конца.

Мы делаем слияние из 3 файлов.
Алгоритм слияния двух файлов действительно прост. Однако алгоритм
слияния
n- файлов не так прост. Мы можем легко создать
слияние
n- файлов как композицию из
n- парных слияний.

Вот как это должно выглядеть.
with open('temp.csv','wb') as output:
  wtr= csv.DictWriter( output, sorted(fieldNames) )
  wtr.writerow( dict( zip( fieldNames, fieldNames )))
  for row in merge( merge( s1, s2 ), s3 ):
      wtr.writerow( row )

Мы выполняем
слияние (merge (s1, s2), s3), чтобы составить слияние из 3 файлов из 2 слияний из 2 файлов. И да, это
может быть так просто.

Композитная сортировка


Чтобы быть «компонуемыми», мы должны написать функции итератора, которые читают и записывают данные одного типа.
В нашем случае, так как мы используем
DictReader , наши различные функции должны работать с итерацией над диктовками, что приводит к диктатам.

Для объединения входные данные должны быть отсортированы.
Вот наш составной вид.
def key_sort( source, key='key' ):
  def get_key( x ):
     return int(x[key])
  for row in sorted(source, key=get_key ):
     yield row

Да, нам нужно предварительно обработать ключи, они не являются простым текстом; они числа.

Composable 2-File Merge

Композиционное слияние имеет похожую схему.
Это цикл по входам, и он дает выходы того же типа.

def merge( f1, f2, key='key' ):
"""Merge two sequences of row dictionaries on a key column."""
r1, r2 = None, None
try:
r1= f1.next()
r2= f2.next()
while True:
    if r1[key] == r2[key]:
        r1.update(r2)
        yield r1
        r1, r2 = None, None
        r1= f1.next()
        r2= f2.next()
    elif r1[key] < r2[key]:
        yield r1
        r1= None
        r1= f1.next()
    elif r1[key] > r2[key]:
        yield r2
        r2= None
        r2= f2.next()
    else:
        raise Exception # Yes, this is impossible
except StopIteration:
    pass
if r1 is not None:
    yield r1
    for r1 in f1:
       yield r1
elif r2 is not None:
    yield r2
    for r2 in f2:
        yield r2
else:
    pass # Exhausted with an exact match.


Это работает за 214 секунд.
Не большое улучшение во времени. Тем не менее, улучшение гибкости является выдающимся. И элегантная простота восхитительна. Удивительно, что управление многопоточным состоянием полностью осуществляется через абстракцию функции генератора / итератора.

Кроме того, это демонстрирует, что большая часть времени уходит на чтение сжатых CSV-файлов и запись окончательного выходного CSV-файла.
Фактический алгоритм слияния не доминирует над сложностью.