Вот ситуация.
Продавец прислал нам три отдельных файла, которые необходимо объединить. Более 70 000 записей каждая. Это файлы CSV, поэтому положение столбцов
не имеет большого значения. Столбца
Имя (в строке 1) является то , что имеет значение.
Я посмотрел на три решения. Два из которых просто в порядке. Третьим было функциональное программирование, которое было очень круто.
Вариант 1 — ОС Сортировка / Слияние
Чтобы привести файлы в согласованный порядок, нам нужно отсортировать. Однако сортировка Linux смещена в сторону столбцов, которые известны позиционно.
Итак, нам нужно использовать
шаблон оформления Decorate-Sort-Undecorate . Итак, у нас есть сценарий оболочки что-то вроде следующего.
decorate.py a.csv | sort >a_sorted.csv decorate.py b.csv | sort >b_sorted.csv decorate.py c.csv | sort >c_sorted.csv sort -m a_sorted.csv b_sorted.csv c_sorted.csv | undecorate.py >result.csv
Это хорошо работает, потому что decorate.py и undecorate.py такие простые программы. Вот decorate.py.
from __future__ import print_function import csv import sys with open(sys.argv[1],"rb") as source: rdr= csv.DictReader( source ) for row in rdr: print( row['the key'], row )
Undecorate похож. Он использует метод
str.partition () для удаления оформления.
Обратите внимание, что начальные шаги «украшения» могут выполняться одновременно, что приводит к некоторому сокращению времени. Это хорошо масштабируется. Это не использует много памяти; управление параллелизмом ОС означает, что оно использует каждое доступное ядро.
Я не тестировал это, кстати.
Вариант 2 — Большой D-память
Поскольку файлы не очень большие, они помещаются в памяти. Это тоже довольно просто.
import csv from collections import defaultdict # build the result set result = defaultdict( dict ) for f in ( 'a.csv', 'b.csv', 'c.csv' ): with open( f, 'rb' ) as source: rdr = csv.DictReader( source ) for row in rdr: result[row['key']].update( row ) # find the column titles keys = set() for row in result: keys |= set( result[row].keys() ) # write the result set with open( 'output.csv', 'wb' ) as target: wtr= csv.DictWriter( target, sorted(keys) ) wtr.writerow( dict(zip(keys,keys)) ) for row in result: wtr.writerow( result[row] )
Это не так уж плохо. Однако для безумно больших файлов он не будет хорошо масштабироваться.
Истекшее время для реальных файлов (которые были заархивированы, добавляя обработку, не относящуюся к этой публикации) было 218 секунд на моем маленьком ноутбуке.
Вариант 3 — Функциональное программирование
Подход к функциональному программированию — это немного больше кода, чем вариант 1. Но это круто и очень расширяемо. Он предлагает больше гибкости без ограничения памяти большого словаря.
Начнем с конца.
Мы делаем слияние из 3 файлов. Алгоритм слияния двух файлов действительно прост. Однако алгоритм
слияния n- файлов не так прост. Мы можем легко создать
слияние n- файлов как композицию из
n- парных слияний.
Вот как это должно выглядеть.
with open('temp.csv','wb') as output: wtr= csv.DictWriter( output, sorted(fieldNames) ) wtr.writerow( dict( zip( fieldNames, fieldNames ))) for row in merge( merge( s1, s2 ), s3 ): wtr.writerow( row )
Мы выполняем
слияние (merge (s1, s2), s3), чтобы составить слияние из 3 файлов из 2 слияний из 2 файлов. И да, это
может быть так просто.
Композитная сортировка
Чтобы быть «компонуемыми», мы должны написать функции итератора, которые читают и записывают данные одного типа. В нашем случае, так как мы используем
DictReader , наши различные функции должны работать с итерацией над диктовками, что приводит к диктатам.
Для объединения входные данные должны быть отсортированы. Вот наш составной вид.
def key_sort( source, key='key' ): def get_key( x ): return int(x[key]) for row in sorted(source, key=get_key ): yield row
Да, нам нужно предварительно обработать ключи, они не являются простым текстом; они числа.
Composable 2-File Merge
Композиционное слияние имеет похожую схему. Это цикл по входам, и он дает выходы того же типа.
def merge( f1, f2, key='key' ): """Merge two sequences of row dictionaries on a key column.""" r1, r2 = None, None try: r1= f1.next() r2= f2.next() while True: if r1[key] == r2[key]: r1.update(r2) yield r1 r1, r2 = None, None r1= f1.next() r2= f2.next() elif r1[key] < r2[key]: yield r1 r1= None r1= f1.next() elif r1[key] > r2[key]: yield r2 r2= None r2= f2.next() else: raise Exception # Yes, this is impossible except StopIteration: pass if r1 is not None: yield r1 for r1 in f1: yield r1 elif r2 is not None: yield r2 for r2 in f2: yield r2 else: pass # Exhausted with an exact match.
Это работает за 214 секунд. Не большое улучшение во времени. Тем не менее, улучшение гибкости является выдающимся. И элегантная простота восхитительна. Удивительно, что управление многопоточным состоянием полностью осуществляется через абстракцию функции генератора / итератора.
Кроме того, это демонстрирует, что большая часть времени уходит на чтение сжатых CSV-файлов и запись окончательного выходного CSV-файла. Фактический алгоритм слияния не доминирует над сложностью.