Статьи

Исследовательский анализ данных в функциональном стиле Python

Вот несколько приемов работы с выдержками из файла журнала. Мы смотрим на некоторые выдержки Enterprise Splunk. Мы можем возиться со Splunk, пытаясь изучить данные. Или мы можем получить простое извлечение и возиться с данными в Python.

Проведение различных экспериментов в Python кажется более эффективным, чем попытка провести такую ​​исследовательскую игру в Splunk. Прежде всего потому, что нет никаких границ в том, что мы можем сделать с данными. Мы можем создать очень сложные статистические модели в одном месте.

Теоретически, мы можем сделать много исследований в Splunk. Имеет множество отчетных и аналитических функций.

Но…

Используя Splunk, мы знаем, что ищем. Во многих случаях мы не знаем, что мы ищем: мы исследуем. У нас могут быть некоторые признаки того, что несколько транзакций API RESTful медленны, но не намного Как мы продолжаем?

Шаг первый — получить необработанные данные в формате CSV. Что теперь?

Чтение необработанных данных

Начнем с обертывания объекта CSV.DictReader некоторыми дополнительными функциями.

Объектно-ориентированные пуристы будут возражать против этой стратегии. «Почему бы просто не расширить DictReader?» они спрашивают. У меня нет отличного ответа. Я склоняюсь к функциональному программированию и получающейся в результате ортогональности компонентов. При чисто ОО-подходе мы должны использовать более сложные миксины для достижения этой цели.

Наша общая структура для обработки журналов такова.

with open("somefile.csv") as source:
    rdr = csv.DictReader(source)
  

Это позволяет нам читать выдержку в формате CSV в формате CSV. Мы можем перебирать строки в читателе. Вот трюк № 1. Это не действительно очень сложно, но мне это нравится.

with open("somefile.csv") as source:
    rdr = csv.DictReader(source)
    for row in rdr:
        print( "{host} {ResponseTime} {source} {Service}".format_map(row) )
  

Мы можем — в ограниченной степени — представлять необработанные данные в удобном формате. Если мы хотим украсить вывод, мы можем изменить формат строки. Может быть «{host: 30s} {ReponseTime: 8s} {source: s}» или что-то в этом роде.

фильтрация

Распространенной ситуацией является то, что мы извлекли слишком много, и нам нужно только увидеть подмножество. Мы можем изменить фильтр Splunk, но мы ненавидим чрезмерную загрузку, прежде чем мы закончим наше исследование. Гораздо проще фильтровать в Python. Как только мы узнали, что нам нужно, мы можем завершить в Splunk.

with open("somefile.csv") as source:
    rdr = csv.DictReader(source)
    rdr_perf_log = (row for row in rdr if row['source'] == 'perf_log')
    for row in rdr_perf_log:
        print( "{host} {ResponseTime} {Service}".format_map(row) )
  

Мы внедрили выражение-генератор, которое будет фильтровать исходные строки, что позволит нам работать со значимым подмножеством.

проекция

В некоторых случаях у нас будут дополнительные столбцы исходных данных, которые мы на самом деле не хотим использовать. Мы удалим эти данные, сделав прогноз каждой строки.

В принципе, Splunk никогда не создает пустой столбец. Однако журналы RESTful API могут привести к наборам данных с огромным количеством уникальных заголовков столбцов, основанных на суррогатных ключах, которые являются частью URI запроса. В этих столбцах будет одна строка данных из одного запроса, который использовал этот суррогатный ключ. Для каждого второго ряда в этом столбце нет ничего полезного. Жизнь намного проще, если мы уберем пустые столбцы из каждой строки.

Мы также можем сделать это с помощью выражения генератора, но оно становится немного длиннее. Функция генератора несколько проще для чтения.

def project(reader):
    for row in reader:
        yield {k:v for k,v in row.items() if v}
  

Мы создали новый словарь строк из подмножества элементов в исходном считывателе. Мы можем использовать это, чтобы обернуть вывод нашего фильтра.

with open("somefile.csv") as source:
    rdr = csv.DictReader(source)
    rdr_perf_log = (row for row in rdr if row['source'] == 'perf_log')
    for row in project(rdr_perf_log):
        print( "{host} {ResponseTime} {Service}".format_map(row) )
  

Это уменьшит неиспользуемые столбцы, которые видны внутри оператора for.

Изменение обозначения

Запись строки [‘source’] станет неуклюжей. Работать с типами гораздо приятнее. Простое пространство имен, чем словарь. Это позволяет нам использовать row.source.

Вот крутой трюк, чтобы создать что-то более полезное.

rdr_ns = (types.SimpleNamespace(**row) for row in reader)
    

Мы можем сложить это в нашу последовательность шагов, как это.

with open("somefile.csv") as source:
    rdr = csv.DictReader(source)
    rdr_perf_log = (row for row in rdr if row['source'] == 'perf_log')
    rdr_proj = project(rdr_perf_log)
    rdr_ns = (types.SimpleNamespace(**row) for row in rdr_proj)
    for row in rdr_ns:
        print( "{host} {ResponseTime} {Service}".format_map(vars(row)) )
    

Обратите внимание на небольшое изменение нашего метода format_map (). Мы добавили функцию vars () для извлечения словаря из атрибутов SimpleNamespace.

Мы могли бы написать это как функцию, чтобы сохранить синтаксическую симметрию с другими функциями.

def ns_reader(reader):
    return (types.SimpleNamespace(**row) for row in reader)
    

Действительно, мы могли бы написать это как лямбда-конструкцию, которая используется как функция.

ns_reader = lambda reader: (types.SimpleNamespace(**row) for row in reader)
    

Хотя функции ns_reader () и лямбда-ns_reader () используются одинаково, написать строку документа и модульный тест doctest для лямбды немного сложнее. По этой причине, лямбда, вероятно, следует избегать.

Мы можем использовать карту (лямбда-строка: types.SimpleNamespace (** row), читатель). Некоторые люди предпочитают это перед выражением генератора.

Мы могли бы использовать правильный оператор for с внутренним оператором yield, но, кажется, нет ничего полезного в том, чтобы делать большие операторы из мелочей.

У нас много вариантов, потому что Python предлагает так много функциональных возможностей программирования. Мы не часто рассматриваем Python как функциональный язык. Тем не менее, у нас есть множество способов справиться с простым отображением.

Сопоставления: преобразования и производные данные

У нас часто будет список преобразований данных, которые довольно очевидны. Кроме того, у нас будет растущий список элементов производных данных. Производные элементы будут динамическими и основаны на различных гипотезах, которые мы проверяем. Каждый раз, когда у нас есть эксперимент или вопрос, мы можем изменить полученные данные.

Каждый из этих этапов: фильтрация, проекция, преобразование и деривация являются этапами в части «карта» конвейера сокращения карты. Мы могли бы создать ряд небольших функций и применить их с помощью map (). Поскольку мы обновляем объект с состоянием, мы не можем использовать функцию общего map (). Если бы мы хотели добиться более чистого функционального стиля программирования, мы бы использовали неизменяемый namedtuple вместо изменяемого SimpleNamespace.

def convert(reader):
    for row in reader:
        row._time = datetime.datetime.strptime(row.Time, "%Y-%m-%dT%H:%M:%S.%F%Z")
        row.response_time = float(row.ResponseTime)
        yield row
    

По мере изучения, мы будем корректировать тело этой функции преобразования. Возможно, мы начнем с некоторого минимального набора преобразований и дериваций. Мы расширим это с некоторыми «это правильно?» вид вещей. Мы достанем немного, когда обнаружим, что они не работают.

Наша общая обработка выглядит так:

with open("somefile.csv") as source:
    rdr = csv.DictReader(source)
    rdr_perf_log = (row for row in rdr if row['source'] == 'perf_log')
    rdr_proj = project(rdr_perf_log)
    rdr_ns = (types.SimpleNamespace(**row) for row in rdr_proj)
    rdr_converted = convert(rdr_ns)
    for row in rdr_converted:
        row.start_time = row._time - datetime.timedelta(seconds=row.response_time)
        row.service = some_mapping(row.Service)
        print( "{host:30s} {start_time:%H:%M:%S} {response_time:6.3f} {service}".format_map(vars(row)) )
    

Обратите внимание, что изменения в теле нашего оператора for. Наша функция convert () выдает значения, в которых мы уверены. Мы добавили несколько дополнительных переменных в цикл for, в которых мы не уверены на 100%. Посмотрим, будут ли они полезными (или даже правильными) перед обновлением функции convert ().

Сокращения

Когда дело доходит до сокращений, мы можем использовать немного другой стиль обработки. Нам нужно реорганизовать наш предыдущий пример и превратить его в функцию генератора.

def converted_log(some_file):
    with open(some_file) as source:
        rdr = csv.DictReader(source)
        rdr_perf_log = (row for row in rdr if row['source'] == 'perf_log')
        rdr_proj = project(rdr_perf_log)
        rdr_ns = (types.SimpleNamespace(**row) for row in rdr_proj)
        rdr_converted = convert(rdr_ns)
        for row in rdr_converted:
            row.start_time = row._time - datetime.timedelta(seconds=row.response_time)
            row.service = some_mapping(row.Service)
            yield row

Мы заменили print () на выход.

Вот другая часть этого рефакторинга.

for row in converted_log("somefile.csv"):
    print( "{host:30s} {start_time:%H:%M:%S} {response_time:6.3f} {service}".format_map(vars(row)) )
    

В идеале все наши программы выглядят так. Мы используем функцию генератора для получения данных. Окончательное отображение данных хранится совершенно отдельно. Это позволяет нам более свободно осуществлять рефакторинг и изменять обработку.

Теперь мы можем делать такие вещи, как собирать строки в объекты Counter () или, возможно, вычислять некоторую статистику. Мы можем использовать defaultdict (список) для группировки строк по сервису.

by_service= defaultdict(list)
for row in converted_log("somefile.csv"):
    by_service[row.service] = row.response_time
for svc in sorted(by_service):
    m = statistics.mean( by_service[svc] )
    print( "{svc:15s} {m:.2f}".format_map(vars()) )
    

Мы решили создать конкретный список объектов здесь. Мы можем использовать itertools для группировки времени ответа по сервису. Это похоже на правильное функциональное программирование, но реализация указывает на некоторые ограничения в Pythonic форме функционального программирования. Либо мы должны сортировать данные (создавая объект списка), либо мы должны создавать списки по мере группировки данных. Чтобы сделать несколько разных статистических данных, часто проще сгруппировать данные, создав конкретные списки.

Вместо того, чтобы просто печатать объект строки, мы сейчас делаем две вещи.

  1. Создайте несколько локальных переменных, таких как svc и m. Мы можем легко добавить дисперсию или другие меры.
  2. Используйте функцию vars () без аргументов, которая создает словарь из локальных переменных.

Такое использование vars () без аргументов, которое ведет себя как locals (), является удобным трюком. Это позволяет нам просто создавать любые локальные переменные, которые мы хотим, и включать их в форматированный вывод. Мы можем взломать столько разных видов статистических показателей, сколько мы считаем актуальными.

Теперь, когда наш основной цикл обработки предназначен для строки в convert_log («somefile.csv»), мы можем исследовать множество альтернатив обработки в крошечном, легко модифицируемом скрипте. Мы можем изучить ряд гипотез, чтобы определить, почему некоторые транзакции API RESTful выполняются медленно, а другие — быстро.