Статьи

Повышение производительности агрегатора кормов

В этой публикации я покажу вам, как использовать функции Task Parallel Library (TPL) и функции PLINQ для повышения производительности простого агрегатора RSS-каналов. Я буду использовать только самые базовые классы .NET, с которых почти каждый разработчик начинает изучать параллельное программирование. Конечно, мы также измерим, как каждая оптимизация влияет на производительность агрегатора кормов.

Кормовой агрегатор

Наш агрегатор кормов работает следующим образом:

  1. Загрузить список блогов
  2. Скачать RSS-ленту
  3. Разобрать фид XML
  4. Добавить новые сообщения в базу данных

Наш агрегатор каналов запускается планировщиком задач через каждые 15 минут в качестве примера.

Мы начнем наше путешествие с серийной реализации агрегатора кормов. Второй шаг — использовать параллелизм задач и распараллеливать загрузку и анализ каналов. И наш последний шаг — использовать параллелизм данных для распараллеливания операций базы данных.

Мы будем использовать класс Секундомер, чтобы измерить, сколько времени требуется агрегатору для загрузки и вставки всех сообщений из всех зарегистрированных блогов. После каждого запуска мы очищаем таблицу сообщений в базе данных.

Последовательная агрегация

Прежде чем приступить к параллельной работе, давайте взглянем на последовательную реализацию агрегатора каналов. Все задачи выполняются одна за другой.

internal class FeedClient

{

    private readonly INewsService _newsService;

    private const int FeedItemContentMaxLength = 255;

 

    public FeedClient()

    {

         ObjectFactory.Initialize(container =>

         {

             container.PullConfigurationFromAppConfig = true;

         });

 

        _newsService = ObjectFactory.GetInstance<INewsService>();

    }

 

    public void Execute()

    {

        var blogs = _newsService.ListPublishedBlogs();

 

        for (var index = 0; index <blogs.Count; index++)

        {

             ImportFeed(blogs[index]);

        }

    }

 

    private void ImportFeed(BlogDto blog)

    {

        if(blog == null)

            return;

        if (string.IsNullOrEmpty(blog.RssUrl))

            return;

 

        var uri = new Uri(blog.RssUrl);

        SyndicationContentFormat feedFormat;

 

        feedFormat = SyndicationDiscoveryUtility.SyndicationContentFormatGet(uri);

 

        if (feedFormat == SyndicationContentFormat.Rss)

            ImportRssFeed(blog);

        if (feedFormat == SyndicationContentFormat.Atom)

            ImportAtomFeed(blog);            

    }

 

    private void ImportRssFeed(BlogDto blog)

    {

        var uri = new Uri(blog.RssUrl);

        var feed = RssFeed.Create(uri);

 

        foreach (var item in feed.Channel.Items)

        {

            SaveRssFeedItem(item, blog.Id, blog.CreatedById);

        }

    }

 

    private void ImportAtomFeed(BlogDto blog)

    {

        var uri = new Uri(blog.RssUrl);

        var feed = AtomFeed.Create(uri);

 

        foreach (var item in feed.Entries)

        {

            SaveAtomFeedEntry(item, blog.Id, blog.CreatedById);

        }

    }

}

Последовательная реализация агрегатора каналов загружает и вставляет все сообщения за 25,46 секунды.

Задача параллелизма

Параллелизм задач означает, что отдельные задачи выполняются параллельно. Вы можете узнать больше о параллелизме задач на страницах MSDN Страница Параллелизм задач (Task Parallel Library) и страница Википедии Параллелизм задач . Хотя поиск частей кода, которые могут безопасно выполняться параллельно без проблем синхронизации, — задача не из легких, на этот раз нам повезло. Импорт и анализ каналов — идеальный кандидат для параллельных задач.

Мы можем безопасно распараллелить импорт фидов, поскольку задачи импорта не разделяют ресурсы и, следовательно, им также не требуется синхронизация. После получения списка блогов мы перебираем коллекцию и запускаем новое задание TPL для каждого объединения каналов.

internal class FeedClient

{

    private readonly INewsService _newsService;

    private const int FeedItemContentMaxLength = 255;

 

    public FeedClient()

    {

         ObjectFactory.Initialize(container =>

         {

             container.PullConfigurationFromAppConfig = true;

         });

 

        _newsService = ObjectFactory.GetInstance<INewsService>();

    }

 

    public void Execute()

    {

        var blogs = _newsService.ListPublishedBlogs();       

        var tasks = new Task[blogs.Count];

 

        for (var index = 0; index <blogs.Count; index++)

        {

            tasks[index] = new Task(ImportFeed, blogs[index]);

            tasks[index].Start();

        }

 

        Task.WaitAll(tasks);

    }

 

    private void ImportFeed(object blogObject)

    {

        if(blogObject == null)

            return;

        var blog = (BlogDto)blogObject;

        if (string.IsNullOrEmpty(blog.RssUrl))

            return;

 

        var uri = new Uri(blog.RssUrl);

        SyndicationContentFormat feedFormat;

 

        feedFormat = SyndicationDiscoveryUtility.SyndicationContentFormatGet(uri);

 

        if (feedFormat == SyndicationContentFormat.Rss)

            ImportRssFeed(blog);

        if (feedFormat == SyndicationContentFormat.Atom)

            ImportAtomFeed(blog);           

    }

 

    private void ImportRssFeed(BlogDto blog)

    {

         var uri = new Uri(blog.RssUrl);

         var feed = RssFeed.Create(uri);

 

        foreach (var item in feed.Channel.Items)

         {

             SaveRssFeedItem(item, blog.Id, blog.CreatedById);

         }

    }

    private void ImportAtomFeed(BlogDto blog)

    {

        var uri = new Uri(blog.RssUrl);

        var feed = AtomFeed.Create(uri);

 

        foreach (var item in feed.Entries)

        {

            SaveAtomFeedEntry(item, blog.Id, blog.CreatedById);

        }

    }

}

Вы должны заметить первые признаки силы TPL. Мы внесли только небольшие изменения в наш код, чтобы распараллелить агрегирование фидов блогов. На моей машине эта модификация дает некоторое повышение производительности — время составляет 17,57 секунды.

Параллелизм данных

Есть еще один способ распараллеливания действий. В предыдущем разделе был представлен параллелизм на основе задач или операций, в этом разделе представлен параллелизм на основе данных. На странице MSDN Параллелизм данных (Task Parallel Library) параллелизм данных относится к сценарию, в котором одна и та же операция выполняется одновременно с элементами в исходной коллекции или массиве.

В нашем коде у нас есть независимые коллекции, которые мы можем обрабатывать параллельно — импортированные записи фидов. Поскольку проверка наличия записи в фиде и вставка ее, если она отсутствует в базе данных, не влияет на другие записи, импортированная коллекция записей фида является идеальным кандидатом для распараллеливания.

internal class FeedClient

{

    private readonly INewsService _newsService;

    private const int FeedItemContentMaxLength = 255;

 

    public FeedClient()

    {

         ObjectFactory.Initialize(container =>

         {

             container.PullConfigurationFromAppConfig = true;

         });

 

        _newsService = ObjectFactory.GetInstance<INewsService>();

    }

 

    public void Execute()

    {

        var blogs = _newsService.ListPublishedBlogs();       

        var tasks = new Task[blogs.Count];

 

        for (var index = 0; index <blogs.Count; index++)

        {

            tasks[index] = new Task(ImportFeed, blogs[index]);

            tasks[index].Start();

        }

 

        Task.WaitAll(tasks);

    }

 

    private void ImportFeed(object blogObject)

    {

        if(blogObject == null)

            return;

        var blog = (BlogDto)blogObject;

        if (string.IsNullOrEmpty(blog.RssUrl))

            return;

 

        var uri = new Uri(blog.RssUrl);

        SyndicationContentFormat feedFormat;

 

        feedFormat = SyndicationDiscoveryUtility.SyndicationContentFormatGet(uri);

 

        if (feedFormat == SyndicationContentFormat.Rss)

            ImportRssFeed(blog);

        if (feedFormat == SyndicationContentFormat.Atom)

            ImportAtomFeed(blog);           

    }

 

    private void ImportRssFeed(BlogDto blog)

    {

        var uri = new Uri(blog.RssUrl);

        var feed = RssFeed.Create(uri);

 

        feed.Channel.Items.AsParallel().ForAll(a =>

        {

            SaveRssFeedItem(a, blog.Id, blog.CreatedById);

        });

     }

 

     private void ImportAtomFeed(BlogDto blog)

     {

        var uri = new Uri(blog.RssUrl);

        var feed = AtomFeed.Create(uri);

 

        feed.Entries.AsParallel().ForAll(a =>

        {

             SaveAtomFeedEntry(a, blog.Id, blog.CreatedById);

        });

     }

}

Мы снова сделали небольшие изменения, и в результате мы распараллелили проверку и сохранение элементов фида. Это изменение было ориентировано на данные, поскольку мы применили одну и ту же операцию ко всем элементам в коллекции. На моей машине я снова получил лучшую производительность. Время сейчас 11,22 секунды.

Полученные результаты

Давайте представим результаты наших измерений (числа даны в секундах).

Результаты агрегации фидов

Как мы видим, с параллелизмом задачи агрегация каналов занимает примерно на 25% меньше времени, чем в исходном случае. При добавлении параллелизма данных к параллелизму задач наша агрегация занимает примерно в 2,3 раза меньше времени, чем в исходном случае.

Подробнее о TPL и PLINQ

Добавление параллелизма в ваше приложение может быть очень сложной задачей. Вы должны тщательно найти части своего кода, где вы можете безопасно перейти к параллельной обработке, и даже в этом случае вы должны измерить эффекты параллельной обработки, чтобы выяснить, работает ли параллельный код лучше. Если вы не будете осторожны, проблемы, с которыми вы столкнетесь позже, будут хуже, чем те, которые вы видели раньше (представьте, что ошибка возникает в среднем только один раз за 10000 запусков кода).

Параллельное программирование — это то, что трудно игнорировать. Эффективные программы способны использовать несколько ядер процессоров. Используя TPL, вы также можете установить степень параллелизма, чтобы ваше приложение не использовало все вычислительные ядра и оставляло одно или несколько из них свободными для хост-системы и других процессов. И в TPL есть еще много вещей, которые облегчают вам запуск и параллельное программирование.

В следующей основной версии все языки .NET будут иметь встроенную поддержку параллельного программирования. Будут также новые языковые конструкции, поддерживающие параллельное программирование. В настоящее время вы можете скачать Visual Studio Async, чтобы получить представление о том, что будет дальше.

Вывод

Параллельное программирование очень сложно, но хорошие инструменты, предлагаемые Visual Studio и .NET Framework, облегчают нам задачу. В этой публикации мы начали с агрегатора каналов, который импортирует элементы каналов в последовательном режиме. За два шага мы распараллелили импорт фида и вставку записей, увеличив производительность в 2,3 раза. Хотя это число относится к моей тестовой среде, оно ясно показывает, что параллельное программирование может значительно повысить производительность вашего приложения.