обзор
Начиная с Spring Framework 5.0 и Spring Boot 2.0, платформа поддерживает асинхронное программирование, как и AWS SDK, начиная с версии 2.0.
В этой статье я буду изучать использование асинхронного API DynamoDB и Spring Webflux путем создания простого реактивного REST-приложения. Допустим, нам нужно обработать HTTP-запросы для извлечения или сохранения какого-либо события (id: string, body: string). Событие будет сохранено в DynamoDB.
Возможно, будет проще просто посмотреть код на Github и следовать ему там.
Maven Зависимости
Начнем с зависимостей Maven для асинхронного SDK WebFlux и DynamoDB.
XML
xxxxxxxxxx
1
<dependencies>
2
<dependency>
3
<groupId>org.springframework.boot</groupId>
4
<artifactId>spring-boot-starter-webflux</artifactId>
5
<version>2.2.4.RELEASE</version>
6
</dependency>
7
<dependency>
8
<groupId>software.amazon.awssdk</groupId>
9
<artifactId>dynamodb</artifactId>
10
<version>2.10.76</version>
11
</dependency>
12
</dependencies>
DynamoDB Stuff
Конфигурация пружины
Простой конфиг, где мы установили соединение с DynamoDB. Для целей тестирования нам нужно указать, dynamoEndpoint но для реального приложения нам нужно указать регион AWS. DynamoEndpointбудет указывать на локальный экземпляр DynamoDB, который мы запустим во время тестов.
Джава
xxxxxxxxxx
1
2
public class AppConfig {
3
("${aws.accessKey}")
4
String accessKey;
5
("${aws.secretKey}")
7
String secretKey;
8
("${dynamodb.endpoint:}")
10
String dynamoEndpoint;
11
13
AwsBasicCredentials awsBasicCredentials(){
14
return AwsBasicCredentials.create(accessKey, secretKey);
15
}
16
18
DynamoDbAsyncClient dynamoDbAsyncClient(
19
AwsBasicCredentials awsBasicCredentials
20
){
21
DynamoDbAsyncClientBuilder clientBuilder = DynamoDbAsyncClient.builder();
22
clientBuilder
23
.credentialsProvider(StaticCredentialsProvider.create(awsBasicCredentials));
24
if(!dynamoEndpoint.isEmpty()){
25
clientBuilder.endpointOverride(URI.create(dynamoEndpoint));
26
}
27
return clientBuilder.build();
28
}
29
}
Application.yaml с подробностями подключения.
YAML
xxxxxxxxxx
1
aws
2
accessKeyany
3
secretKeyany
4
dynamodb
5
endpointhttp//localhost8000/
Реактивная служба DynamoDB
К сожалению, асинхронная версия AWS SDK пока не поддерживает DynamoDBMapper (вы можете отслеживать готовность картографа здесь ), поэтому создание таблиц, отправка запросов и анализ ответов должны выполняться API «низкого уровня».
Поэтому нам нужно создать DynamoDbService для обработки:
- Создание таблицы, если она не существует
- Сохранение и получение события
Джава
x
1
2
public class DynamoDbService {
3
public static final String TABLE_NAME = "events";
5
public static final String ID_COLUMN = "id";
6
public static final String BODY_COLUMN = "body";
7
final DynamoDbAsyncClient client;
9
11
public DynamoDbService(DynamoDbAsyncClient client) {
12
this.client = client;
13
}
14
//Creating table on startup if not exists
16
17
public void createTableIfNeeded() throws ExecutionException, InterruptedException {
18
ListTablesRequest request = ListTablesRequest
19
.builder()
20
.exclusiveStartTableName(TABLE_NAME)
21
.build();
22
CompletableFuture<ListTablesResponse> listTableResponse = client.listTables(request);
23
CompletableFuture<CreateTableResponse> createTableRequest = listTableResponse
25
.thenCompose(response -> {
26
boolean tableExist = response
27
.tableNames()
28
.contains(TABLE_NAME);
29
30
if (!tableExist) {
31
return createTable();
32
} else {
33
return CompletableFuture.completedFuture(null);
34
}
35
});
36
//Wait in synchronous manner for table creation
38
createTableRequest.get();
39
}
40
public CompletableFuture<PutItemResponse> saveEvent(Event event) {
42
Map<String, AttributeValue> item = new HashMap<>();
43
item.put(ID_COLUMN, AttributeValue.builder().s(event.getUuid()).build());
44
item.put(BODY_COLUMN, AttributeValue.builder().s(event.getBody()).build());
45
PutItemRequest putItemRequest = PutItemRequest.builder()
47
.tableName(TABLE_NAME)
48
.item(item)
49
.build();
50
return client.putItem(putItemRequest);
52
}
53
public CompletableFuture<Event> getEvent(String id) {
55
Map<String, AttributeValue> key = new HashMap<>();
56
key.put(ID_COLUMN, AttributeValue.builder().s(id).build());
57
GetItemRequest getRequest = GetItemRequest.builder()
59
.tableName(TABLE_NAME)
60
.key(key)
61
.attributesToGet(BODY_COLUMN)
62
.build();
63
return client.getItem(getRequest).thenApply(item -> {
65
if (!item.hasItem()) {
66
return null;
67
} else {
68
Map<String, AttributeValue> itemAttr = item.item();
69
String body = itemAttr.get(BODY_COLUMN).s();
70
return new Event(id, body);
71
}
72
});
73
}
74
private CompletableFuture<CreateTableResponse> createTable() {
76
KeySchemaElement keySchemaElement = KeySchemaElement
77
.builder()
78
.attributeName(ID_COLUMN)
79
.keyType(KeyType.HASH)
80
.build();
81
AttributeDefinition attributeDefinition = AttributeDefinition
83
.builder()
84
.attributeName(ID_COLUMN)
85
.attributeType(ScalarAttributeType.S)
86
.build();
87
88
CreateTableRequest request = CreateTableRequest.builder()
89
.tableName(TABLE_NAME)
90
.keySchema(keySchemaElement)
91
.attributeDefinitions(attributeDefinition)
92
.billingMode(BillingMode.PAY_PER_REQUEST)
93
.build();
94
return client.createTable(request);
96
}
97
}
Реактивный REST-контроллер
Простой контроллер с методом GET для извлечения событий по id и методом POST для сохранения событий в DynamoDB. Мы можем сделать это двумя способами - реализовать это с помощью аннотаций или избавиться от аннотаций и сделать это функциональным способом. Это не влияет на производительность, в большинстве случаев это абсолютно зависит от индивидуальных предпочтений, что использовать.
Аннотированные контроллеры
Джава
xxxxxxxxxx
1
2
("/event")
3
public class AnnotatedController {
4
final DynamoDbService dynamoDbService;
6
public AnnotatedController(DynamoDbService dynamoDbService) {
8
this.dynamoDbService = dynamoDbService;
9
}
10
(value = "/{eventId}", produces = MediaType.APPLICATION_JSON_VALUE)
12
public Mono<Event> getEvent( String eventId) {
13
CompletableFuture<Event> eventFuture = dynamoDbService.getEvent(eventId);
14
return Mono.fromCompletionStage(eventFuture);
15
}
16
(consumes = MediaType.APPLICATION_JSON_VALUE)
18
public void saveEvent( Event event) {
19
dynamoDbService.saveEvent(event);
20
}
21
}
Функциональные конечные точки
Это облегченная модель функционального программирования, в которой функции используются для маршрутизации и обработки запросов. Здесь я создал EventHandlerтолько для простоты чтения кода.
Джава
xxxxxxxxxx
1
2
public class HttpRouter {
3
5
public RouterFunction<ServerResponse> eventRouter(DynamoDbService dynamoDbService) {
6
EventHandler eventHandler = new EventHandler(dynamoDbService);
7
return RouterFunctions
8
.route(GET("/eventfn/{id}")
9
.and(accept(APPLICATION_JSON)), eventHandler::getEvent)
10
.andRoute(POST("/eventfn")
11
.and(accept(APPLICATION_JSON))
12
.and(contentType(APPLICATION_JSON)), eventHandler::saveEvent);
13
}
14
static class EventHandler {
16
private final DynamoDbService dynamoDbService;
17
public EventHandler(DynamoDbService dynamoDbService) {
19
this.dynamoDbService = dynamoDbService;
20
}
21
Mono<ServerResponse> getEvent(ServerRequest request) {
23
String eventId = request.pathVariable("id");
24
CompletableFuture<Event> eventGetFuture = dynamoDbService.getEvent(eventId);
25
Mono<Event> eventMono = Mono.fromFuture(eventGetFuture);
26
return ServerResponse.ok().body(eventMono, Event.class);
27
}
28
Mono<ServerResponse> saveEvent(ServerRequest request) {
30
Mono<Event> eventMono = request.bodyToMono(Event.class);
31
eventMono.map(dynamoDbService::saveEvent);
32
return ServerResponse.ok().build();
33
}
34
}
35
}
Spring DynamoDB Интеграционный тест
Maven Зависимости
Для запуска интеграционного теста с DynamoDB нам нужен DynamoDBLocal, который на самом деле не DynamoDB, а SQLite с реализованными интерфейсами DynamoDB поверх него.
XML
xxxxxxxxxx
1
<dependency>
2
<groupId>com.amazonaws</groupId>
3
<artifactId>DynamoDBLocal</artifactId>
4
<version>1.12.0</version>
5
<scope>test</scope>
6
</dependency>
7
<build>
9
<plugins>
10
<plugin>
11
<groupId>org.apache.maven.plugins</groupId>
12
<artifactId>maven-dependency-plugin</artifactId>
13
<version>2.10</version>
14
<executions>
15
<execution>
16
<id>copy</id>
17
<phase>test-compile</phase>
18
<goals>
19
<goal>copy-dependencies</goal>
20
</goals>
21
<configuration>
22
<includeScope>test</includeScope>
23
<includeTypes>so,dll,dylib</includeTypes>
24
<!--Keep an eye on output directory - it will be used for starting dynamodb-->
25
<outputDirectory>${project.basedir}/target/native-libs</outputDirectory>
26
</configuration>
27
</execution>
28
</executions>
29
</plugin>
30
</plugins>
31
</build>
32
<repositories>
34
<repository>
35
<id>dynamodb-local-oregon</id>
36
<name>DynamoDB Local Release Repository</name>
37
<url>https://s3-us-west-2.amazonaws.com/dynamodb-local/release</url>
38
</repository>
39
</repositories>
Сервер DynamoDB
Теперь нам нужно запустить DynamoDB перед тестовым запуском. Я предпочитаю делать это как JUnit Class Rule, но мы также можем делать это как бин.
Джава
xxxxxxxxxx
1
public class LocalDynamoDbRule extends ExternalResource {
2
protected DynamoDBProxyServer server;
4
public LocalDynamoDbRule() {
6
//here we set the path from "outputDirectory" of maven-dependency-plugin
7
System.setProperty("sqlite4java.library.path", "target/native-libs");
8
}
9
11
protected void before() throws Exception {
12
this.server = ServerRunner
13
.createServerFromCommandLineArgs(new String[]{"-inMemory", "-port", "8000"});
14
server.start();
15
}
16
18
protected void after() {
19
this.stopUnchecked(server);
20
}
21
protected void stopUnchecked(DynamoDBProxyServer dynamoDbServer) {
23
try {
24
dynamoDbServer.stop();
25
} catch (Exception e) {
26
throw new RuntimeException(e);
27
}
28
}
29
}
Бегущий тест
Теперь мы можем создать интеграционный тест и протестировать событие get по идентификатору и сохранить событие.
Джава
xxxxxxxxxx
1
(SpringRunner.class)
2
(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
3
public class IntegrationTest {
4
6
public static LocalDynamoDbRule dynamoDbRule = new LocalDynamoDbRule();
7
9
private WebTestClient webTestClient;
10
12
public void getEvent() {
13
// Create a GET request to test an endpoint
14
webTestClient
15
.get().uri("/event/1")
16
.accept(MediaType.APPLICATION_JSON)
17
.exchange()
18
// and use the dedicated DSL to test assertions against the response
19
.expectStatus().isOk()
20
.expectBody(String.class).isEqualTo(null);
21
}
22
24
public void saveEvent() throws InterruptedException {
25
Event event = new Event("10", "event");
26
webTestClient
27
.post().uri("/event/")
28
.body(BodyInserters.fromValue(event))
29
.exchange()
30
.expectStatus().isOk();
31
Thread.sleep(1500);
32
webTestClient
33
.get().uri("/event/10")
34
.accept(MediaType.APPLICATION_JSON)
35
.exchange()
36
.expectStatus().isOk()
37
.expectBody(Event.class).isEqualTo(event);
38
}
39
}
докер
Здесь мы собираемся подготовить наше приложение для запуска в Docker, поэтому оно будет готово для развертывания в AWS.
СОВЕТ: Начиная с Java 10, вы можете указать, сколько памяти будет использовать JVM в зависимости от памяти контейнера. -XX:MaxRAMPercentage=75.0означает, что JVM не будет использовать более 75% памяти контейнера.
Dockerfile
Dockerfile
xxxxxxxxxx
1
# Use our standard java12 baseimage
2
FROM openjdk:12-alpine
3
# Copy the artifact into the container
5
COPY target/dynamodb-spring-*-exec.jar /srv/service.jar
6
# Run the artifact and expose the default port
8
WORKDIR /srv
9
ENTRYPOINT [ "java", \
11
"-XX:+UnlockExperimentalVMOptions", \
12
"-XX:+ExitOnOutOfMemoryError", \
13
"-XX:MaxRAMPercentage=75.0", \
14
"-Djava.security.egd=file:/dev/./urandom", \
15
"-jar", "service.jar", \
16
"--spring.profiles.active=prod" ]
17
EXPOSE 8080
Сборка самого док-контейнера docker build -t spring-dynamo.
Кроме того, давайте посмотрим, что было сгенерировано docker image ls.
Простой текст
xxxxxxxxxx
1
REPOSITORY TAG IMAGE ID CREATED SIZE
2
spring-dynamo latest a974d880400e About a minute ago 364MB
3
openjdk 12-alpine 0c68e7c5b7a0 12 months ago 339MB
Наконец-то наш POC готов!
Удачного кодирования 🙂