Статьи

Google AppEngine: API очередей задач

Очереди задач

com.google.appengine.api.taskqueue

С помощью очередей задач пользователь может инициировать запрос, чтобы приложения выполняли работу вне этого запроса; они являются мощным инструментом для фоновой работы.
Кроме того, вы можете организовать работу в небольшие, отдельные блоки (задачи). Затем приложение вставляет эти задачи в одну или несколько очередей на основе конфигурации очереди и обрабатывает их в порядке FIFO. Вот диаграмма, которую я взял из презентации Google IO, которая иллюстрирует вставку задачи высокого уровня в очередь:

Конфигурация очереди

1. Push-очереди (по умолчанию):

Push-очередь будет обрабатывать задачи на основе скорости обработки, заданной в определении очереди (см. Ниже). App Engine автоматически управляет временем жизни этих очередей (созданием, удалением и т. Д.) И регулирует мощность обработки в соответствии с вашей конфигурацией и объемом обработки. Они могут использоваться только в App Engine (внутри вашего приложения).

2. Потяните очереди:

Разрешить потребителю задач арендовать задачи в определенное время в течение определенного периода времени. Они доступны как внутри, так и снаружи через API REST Task Queue. В этом случае, однако, GAE не управляет жизненным циклом и скоростью обработки очередей автоматически, это зависит от разработчика. Бэкэнд также имеет доступ к этим очередям.

Задания
 

Они представляют собой единицу работы, выполняемой приложением. Эти запросы являются идемпотентными, то есть они уникальны в очереди и, согласно документации Google, не могут быть вызваны более одного раза одновременно (если не произойдет какое-то странное внутреннее состояние ошибки).
Экземпляры класса TaskOptions , задачи состоят из URL и полезной нагрузки, которая может быть простой строкой, двоичным объектом (byte []) или экземпляром DeferredTask. DeferredTask – это в основном Runnable. Это позволяет вам связывать задачи вместе. Наша команда должна была сделать это, чтобы смоделировать задачи длительного выполнения, когда максимальный лимит исполнения GAE составлял 30 секунд. В настоящее время задача должна завершить выполнение и отправить значение ответа HTTP между 200–299 в течение 10 минут после исходного запроса. Этот крайний срок отделен от пользовательских запросов, которые имеют 60-секундный крайний срок.
Кроме того, t просит использовать маркеры для контроля скорости выполнения задач. При каждом вызове задачи используется токен. Эта модель аренды (получение токена) обычно относится к брокерским системам или системам передачи сообщений и позволяет пользователям контролировать скорость выполнения этих задач (см. Ниже о настройке очередей).
Наконец, очень важной особенностью API очереди задач является то, что он имеет автоматические повторные попытки задач. Вы можете настроить это с параметром RetriesOptions при создании объекта TaskOptions .

Задача внутри транзакции

Задачи могут быть поставлены в очередь как часть транзакции хранилища данных. Вставка (не выполнение) будет гарантирована, если транзакция была успешно завершена. Единственное предостережение в том, что транзакционные задачи не могут иметь пользовательских имен, и в одной транзакции может быть не более 5 вставок в очереди задач.

конфигурация
 

Очереди настраиваются через queue.xml. Если опущено, используется очередь по умолчанию с конфигурацией по умолчанию. Поскольку очереди извлечения предназначены для более сложных задач, они должны быть специально настроены (нет очереди извлечения по умолчанию).
Конфигурация очереди приложения применяется ко всем версиям приложения. Вы можете переопределить это поведение для push-очередей, используя целевой параметр в queue.xml. Это используется, если вам нужны разные версии вашего приложения (разные сайты) с разной конфигурацией обработки очереди.

Вот некоторые вещи, которые вам разрешено настраивать (более обширная документация):

bucket-size : скорость обработки очереди, когда в очереди много задач и скорость высока (только push). (Предупреждение: сервер разработки игнорирует это значение)

max-concurrent-запросы : максимальное количество задач, которые могут быть выполнены в любой момент времени в указанной очереди (только push).

режим : будь то толчок или тянуть.

имя : имя очереди

скорость : как часто задачи обрабатываются в этой очереди (s = секунды, m = минуты, h = часы, d = дни). Если 0, очередь считается приостановленной. (Предупреждение: сервер разработки игнорирует это значение)

target : нацелить задачу на конкретный сервер или версию приложения.

1
2
3
4
5
6
7
8
9
<queue-entries>
<!--Set the number of max concurrent requests to 10-->  
  <queue>    
     <name>optimize-queue</name>                
     <rate>20/s</rate>  
     <bucket-size>40</bucket-size>      
     <max-concurrent-requests>10</max-concurrent-requests>    
  </queue>
</queue-entries>

Образец кода
 

Это очень простой пример. Как я уже говорил, очереди задач в основном являются обработчиками URL. В этом сервлете GET будет обрабатывать поставленную задачу. Задача отправит POST к этому же сервлету и выполнит метод doPost (), выполняющий задачу. В данном случае это просто простой счетчик. Обратите внимание, что счетчик является изменчивым свойством. Если вы получите доступ к этому сервлету как запрос GET, он поставит в очередь другую задачу. Таким образом, вы увидите, что счетчик увеличивается с помощью обеих задач.
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class TaskQInfo extends HttpServlet {
                      
   private static volatile int TASK_COUNTER = 0;
                      
 
   // Executed by user menu click
   public void doGet(HttpServletRequest req, HttpServletResponse resp)
        throws IOException {
                       
        // Build a task using the TaskOptions Builder pattern from ** above
        Queue queue = QueueFactory.getDefaultQueue();
        queue.add(withUrl("/taskq_demo").method(TaskOptions.Method.POST));
                      
        resp.getWriter().println("Task have been added to default queue...");
                      
        resp.getWriter().println("Refresh this page to add another count task");
   }
                      
   // Executed by TaskQueue
   @Override
   protected void doPost(HttpServletRequest req, HttpServletResponse resp)
       throws ServletException, IOException {
                      
       // This is the body of the task
       for(int i = 0; i < 1000; i++) {
             log.info("Processing: " + req.getHeader("X-AppEngine-TaskName") + "-" +          
                     TASK_COUNTER++);
                      
             try {
                // Sleep for a second (if the rate is set to 1/s this will allow at
                // most 1 more task to be processed)
                      
                Thread.sleep(1000);
             } catch (InterruptedException e) { // ignore}
       }
   }
}

Очереди задач позволяют вам достичь некоторого уровня параллелизма в вашем приложении, вызывая фоновые процессы по требованию. Для очень длительных задач вы можете взглянуть на бэкэнды App Engine , которые в основном являются специальными экземплярами App Engine без ограничения времени запроса.

Ссылка: Google AppEngine: API очередей задач от нашего партнера JCG Луиса Атенсио из блога Reflective Thought .