Статьи

RateLimiter — открытие Google Guava

Класс RateLimiter был недавно добавлен в библиотеки Guava (начиная с 13.0) и уже входит в число моих любимых инструментов. Посмотрите, что говорит JavaDoc:
Давайте начнем с простого примера. Скажем, у нас есть длительный процесс, который должен транслировать свое продвижение на поставляемый слушатель:

01
02
03
04
05
06
07
08
09
10
11
12
def longRunning(listener: Listener) {
    var processed = 0
    for(item <- items) {
        //..do work...
        processed += 1
        listener.progressChanged(100.0 * processed / items.size)
    }
}
 
trait Listener {
    def progressChanged(percentProgress: Double)
}

Пожалуйста, простите мне императивный стиль этого кода Scala, но это не главное. Проблема, которую я хочу выделить, становится очевидной, когда мы запускаем наше приложение с каким-то конкретным слушателем:

1
2
3
4
5
6
7
class ConsoleListener extends Listener {
    def progressChanged(percentProgress: Double) {
        println('Progress: ' + percentProgress)
    }
}
 
longRunning(new ConsoleListener)

Представьте, что longRunning() обрабатывает миллионы items но каждая итерация занимает доли секунды. Количество сообщений журнала просто безумно, не говоря уже о том, что вывод на консоль, вероятно, занимает гораздо больше времени, чем сама обработка. Вы, вероятно, сталкивались с такой проблемой несколько раз и имеете простой обходной путь:

1
2
3
if(processed % 100 == 0) {
    listener.progressChanged(100.0 * processed / items.size)
}

Там я это исправил! Мы печатаем прогресс только каждую сотую итерацию. Однако у этого подхода есть несколько недостатков:

  • код загрязнен несвязанной логикой
  • нет никакой гарантии, что каждая сотая итерация будет достаточно медленной…
  • … или, может быть, все еще слишком медленно?

Чего мы действительно хотим достичь, так это ограничить частоту обновления прогресса (скажем, два раза в секунду). Хорошо, углубляясь в кроличью нору:

01
02
03
04
05
06
07
08
09
10
11
12
def longRunning(listener: Listener) {
    var processed = 0
    var lastUpdateTimestamp = 0L
    for(item <- items) {
        //..do work...
        processed += 1
        if(System.currentTimeMillis() - lastUpdateTimestamp > 500) {
            listener.progressChanged(100.0 * processed / items.size)
            lastUpdateTimestamp = System.currentTimeMillis()
        }
    }
}

У вас также есть чувство, что мы идем в неверном направлении? Дамы и господа, я даю вам RateLimiter :

1
2
3
4
5
6
7
8
9
var processed = 0
val limiter = RateLimiter.create(2)
for (item <- items) {
    //..do work...
    processed += 1
    if (limiter.tryAcquire()) {
        listener.progressChanged(100.0 * processed / items.size)
    }
}

Становиться лучше? Если API не понятен: мы сначала создаем RateLimiter с 2 разрешениями в секунду. Это означает, что мы можем получить до двух разрешений в течение одной секунды, и если мы попытаемся делать это чаще, tryAcquire() вернет false (или поток заблокирует, если вместо этого используется tryAcquire() 1 ). Таким образом, приведенный выше код гарантирует, что слушатель не будет вызываться более двух раз в секунду.
В качестве бонуса, если вы хотите полностью избавиться от несвязанного кода регулирования из бизнес-логики, шаблон декоратора на помощь. Сначала давайте создадим слушателя, который оборачивает другого (конкретного) слушателя и делегирует ему только с заданной скоростью:

01
02
03
04
05
06
07
08
09
10
class RateLimitedListener(target: Listener) extends Listener {
 
    val limiter = RateLimiter.create(2)
 
    def progressChanged(percentProgress: Double) {
        if (limiter.tryAcquire()) {
            target.progressChanged(percentProgress)
        }
    }
}

Что лучше всего в шаблоне декоратора , так это то, что и код, использующий слушателя, и конкретная реализация не знают о декораторе. Также клиентский код стал намного проще (по сути мы вернулись к оригиналу):

01
02
03
04
05
06
07
08
09
10
def longRunning(listener: Listener) {
    var processed = 0
    for (item <- items) {
        //..do work...
        processed += 1
        listener.progressChanged(100.0 * processed / items.size)
    }
}
 
longRunning(new RateLimitedListener(new ConsoleListener))

Но мы только поцарапали поверхность, где можно использовать RateLimiter ! Допустим, мы хотим избежать вышеупомянутой атаки типа «отказ в обслуживании» или замедлить автоматизацию клиентов нашего API. Это очень просто с RateLimiter и фильтром сервлета:

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
@WebFilter(urlPatterns=Array('/*'))
class RateLimiterFilter extends Filter {
 
    val limiter = RateLimiter.create(100)
 
    def init(filterConfig: FilterConfig) {}
 
    def doFilter(request: ServletRequest, response: ServletResponse, chain: FilterChain) {
        if(limiter.tryAcquire()) {
            chain.doFilter(request, response)
        } else {
            response.asInstanceOf[HttpServletResponse].sendError(SC_TOO_MANY_REQUESTS)
        }
    }
 
    def destroy() {}
}

Еще один информативный образец. На этот раз мы ограничиваем наш API для обработки не более 100 запросов в секунду (конечно, RateLimiter является поточно- RateLimiter ). Все HTTP-запросы, которые проходят через наш фильтр, подлежат ограничению скорости. Если мы не можем обработать входящий запрос, мы отправляем HTTP 429 — код ошибки Too Many Requests (пока не доступен в спецификации сервлета). В качестве альтернативы вы можете заблокировать клиента на некоторое время, а не отказываться от него. Это также довольно просто:

1
2
3
4
def doFilter(request: ServletRequest, response: ServletResponse, chain: FilterChain) {
    limiter.acquire()
    chain.doFilter(request, response)
}

limiter.acquire() будет блокировать до тех пор, пока необходимо поддерживать желаемый лимит в 100 запросов в секунду. Еще одной альтернативой является использование tryAcquire() с таймаутом (блокировка до заданного количества времени). Подход блокировки лучше, если вы хотите избежать отправки ошибок клиенту. Однако при высокой нагрузке легко представить, что почти все потоки HTTP заблокированы в ожидании RateLimiter , что в конечном итоге приводит к RateLimiter , что контейнер сервлета отклоняет соединения. Таким образом, отбрасывание клиентов можно избежать лишь частично.
Этот фильтр является хорошей отправной точкой для создания более сложных решений. Карта ограничителей скорости по IP или имени пользователя являются хорошими примерами.

То, что мы еще не рассмотрели, — это получение более одного разрешения одновременно. Оказывается, что RateLimiter также можно использовать, например, для ограничения пропускной способности сети или объема отправляемых / принимаемых данных. Представьте, что вы создали поисковый сервлет и хотите навязать, что в секунду возвращается не более 1000 результатов. В каждом запросе пользователь решает, сколько результатов он хочет получить за ответ: это может быть 500 запросов, каждый из которых содержит 2 результата, или 1 огромный запрос, запрашивающий 1000 результатов одновременно. Но не более 1000 результатов в среднем за секунду. Пользователи могут свободно использовать свою квоту по своему усмотрению:

01
02
03
04
05
06
07
08
09
10
11
@WebFilter(urlPatterns = Array ('/search'))
class SearchServlet extends HttpServlet {
 
    val limiter = RateLimiter.create(1000)
 
    override def doGet(req: HttpServletRequest, resp: HttpServletResponse) {
        val resultsCount = req.getParameter('results').toInt
        limiter.acquire(resultsCount)
        //process and return results...
    }
}

По умолчанию мы получаем acquire() одно разрешение на вызов. Неблокирующий сервлет вызвал бы limiter.tryAcquire(resultsCount) и проверил бы результаты, вы уже знаете это. Если вы заинтересованы в ограничении скорости сетевого трафика, не забудьте проверить мое десятикратное увеличение пропускной способности сервера с помощью асинхронной обработки Servlet 3.0 . RateLimiter из-за природы блокировки не очень подходит для написания масштабируемых серверов загрузки / выгрузки с регулированием.

Последний пример, которым я хотел бы поделиться с вами, — это регулирование клиентского кода, чтобы избежать перегрузки сервера, с которым мы говорим. Представьте себе процесс пакетного импорта / экспорта, который тысячи раз вызывает несколько серверов для обмена данными. Если мы не ограничиваем клиент и не ограничиваем скорость на стороне сервера, сервер может быть перегружен и аварийно завершить работу. RateLimiter еще раз очень полезен:

1
2
3
4
5
6
7
8
val limiter = RateLimiter.create(20)
 
def longRunning() {
    for (item <- items) {
        limiter.acquire()
        server.sync(item)
    }
}

Этот образец очень похож на первый. Разница в том, что на этот раз мы блокируем, а не отбрасываем пропущенные разрешения. Благодаря блокировке внешний вызов server.sync(item) не будет перегружать сторонний сервер, вызывая его не более 20 раз в секунду. Конечно, если у вас есть несколько потоков, взаимодействующих с сервером, все они могут использовать один и тот же RateLimiter .

Упаковать:

  • RateLimiter позволяет выполнять определенные действия не чаще, чем с заданной частотой
  • Это небольшой и легкий класс (без участия потоков!). Вы можете создать тысячи ограничителей скорости (для каждого клиента?) Или использовать один из нескольких потоков.
  • Мы не рассмотрели функцию прогрева — если RateLimiter долгое время не использовался, он будет постепенно увеличивать допустимую частоту в течение настроенного времени до настроенного максимального значения вместо того, чтобы разрешать максимальную частоту с самого начала.

У меня есть ощущение, что мы скоро вернемся к этому классу. Я надеюсь, что вы найдете это полезным в вашем следующем проекте!

1 — Я использую Guava 14.0-SNAPSHOT. Если стабильная tryAcquire(1, 0, TimeUnit.MICROSECONDS) 14.0 не доступна к тому времени, когда вы читаете это, вы должны использовать более подробный tryAcquire(1, 0, TimeUnit.MICROSECONDS) вместо tryAcquire() и tryAcquire(1, 0, TimeUnit.MICROSECONDS) вместо tryAcquire() acquire() .

Приятного кодирования и не забудьте поделиться!

Ссылка: RateLimiter — знакомство с Google Guava от нашего партнера по JCG Томаша Нуркевича в блоге Java и соседей .