Статьи

Демонстрация верблюда для простого сервиса Worklfow от Amazon

В предыдущем посте я объяснил, почему сервис AWS SWF хорош, и анонсировал новый компонент Camel SWF. Теперь документация по компонентам готова, и вот упрощенная полностью рабочая демонстрация. Он состоит из трех независимых маршрутов Camel:
Производитель рабочего процесса позволяет нам взаимодействовать с рабочим процессом. Он может начать выполнение нового рабочего процесса, запросить его состояние, отправить сигналы работающему рабочему процессу или завершить и отменить его. В нашей демонстрации WorkflowProducer запускает маршрут, который планирует 10 выполнений рабочего процесса, где каждое выполнение получает в качестве аргумента число.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.ofbizian.swf.demo;
 
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.main.Main;
 
public class WorkflowProducer {
 
    public static String COMMON_OPTIONS =
        "accessKey=XXX"
        + "&secretKey=XXX"
        + "&domainName=demo"
        + "&workflowList=demo-wlist"
        + "&activityList=demo-alist"
        + "&version=1.0"
        + "&clientConfiguration.endpoint=swf.eu-west-12.amazonaws.com";
 
    public static void main(String[] args) throws Exception {
        Main main = new Main();
        main.enableHangupSupport();
        WorkflowProducerRoute route = new WorkflowProducerRoute();
        main.addRouteBuilder(route);
        main.run();
    }
 
    static class WorkflowProducerRoute extends RouteBuilder {
 
        @Override
        public void configure() throws Exception {
 
            from("timer://workflowProducer?repeatCount=10")
                    .setBody(property("CamelTimerCounter"))
                    .to("aws-swf://workflow?" + COMMON_OPTIONS + "&eventName=calculator")
                    .log("SENT WORKFLOW TASK ${body}");
        }
    }
}

После того, как выполнение рабочего процесса запланировано, нам нужен процесс, который решит, каковы следующие шаги для него. В Camel это делается с помощью Workflow Consumer. Потребитель рабочего процесса представляет логику рабочего процесса. Когда он запускается, он начинает опросить задачи решения рабочего процесса и обрабатывает их. В дополнение к обработке задач принятия решений потребительский маршрут рабочего процесса также будет получать сигналы (отправленные от производителя рабочего процесса) или запросы состояния. Основная цель потребителя рабочего процесса — это планирование задач действий для выполнения с использованием производителей действий. На самом деле задачи деятельности могут быть запланированы только из потока, запущенного потребителем рабочего процесса.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.ofbizian.swf.demo;
 
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.aws.swf.SWFConstants;
import org.apache.camel.main.Main;
 
public class WorkflowConsumer {
 
    public static void main(String[] args) throws Exception {
        Main main = new Main();
        main.enableHangupSupport();
        WorkflowConsumerRoute route = new WorkflowConsumerRoute();
        main.addRouteBuilder(route);
        main.run();
    }
 
    static class WorkflowConsumerRoute extends RouteBuilder {
 
        @Override
        public void configure() throws Exception {
 
            from("aws-swf://workflow?" + WorkflowProducer.COMMON_OPTIONS + "&eventName=calculator")
 
                    .choice()
                        .when(header(SWFConstants.ACTION).isEqualTo(SWFConstants.SIGNAL_RECEIVED_ACTION))
                            .log("Signal received ${body}")
 
                        .when(header(SWFConstants.ACTION).isEqualTo(SWFConstants.GET_STATE_ACTION))
                            .log("State asked ${body}")
 
                        .when(header(SWFConstants.ACTION).isEqualTo(SWFConstants.EXECUTE_ACTION))
 
                            .setBody(simple("${body[0]}"))
                            .log("EXECUTION TASK RECEIVED ${body}")
 
                            .filter(simple("${body} > 5"))
                                .to("aws-swf://activity?" + WorkflowProducer.COMMON_OPTIONS + "&eventName=incrementor");
        }
    }
 
}

Логика нашего демо-решения проста: если входящий аргумент больше 5, мы планируем выполнение задачи. В противном случае рабочий процесс завершится, так как другие задачи не будут выполнены. Обратите внимание, что он также имеет ветви для обработки сигналов и событий запроса состояния.

Окончательный вариант нашего распределенного (так как состоит из трех независимых приложений) приложения рабочего процесса — это ActivityConsumer, который фактически выполняет некоторые вычисления. Он имеет простейшую возможную реализацию: увеличивает заданный аргумент и возвращает его.

01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.ofbizian.swf.demo;
 
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.main.Main;
 
public class ActivityConsumer {
 
    public static void main(String[] args) throws Exception {
        Main main = new Main();
        main.enableHangupSupport();
        ActivityConsumerRoute route = new ActivityConsumerRoute();
        main.addRouteBuilder(route);
        main.run();
    }
 
    static class ActivityConsumerRoute extends RouteBuilder {
 
        @Override
        public void configure() throws Exception {
from("aws-swf://activity?" + WorkflowProducer.COMMON_OPTIONS + "&eventName=incrementor")
                    .setBody(simple("${body[0]}"))
                    .log("RECEIVED ACTIVITY TASK ${body}")
                    .setBody(simple("${body}++"));
        }
    }
}

Все, что вам нужно сделать для запуска этой демонстрации, это создать соответствующий домен рабочего процесса и добавить ваш ключ / секрет к маршруту.