обзор
Начиная с Spring Framework 5.0 и Spring Boot 2.0, платформа поддерживает асинхронное программирование, как и AWS SDK, начиная с версии 2.0.
В этой статье я буду изучать использование асинхронного API DynamoDB и Spring Webflux путем создания простого реактивного REST-приложения. Допустим, нам нужно обработать HTTP-запросы для извлечения или сохранения какого-либо события (id: string, body: string). Событие будет сохранено в DynamoDB.
Возможно, будет проще просто посмотреть код на Github и следовать ему там.
Maven Зависимости
Начнем с зависимостей Maven для асинхронного SDK WebFlux и DynamoDB.
XML
xxxxxxxxxx
1
<dependencies>
2
<dependency>
3
<groupId>org.springframework.boot</groupId>
4
<artifactId>spring-boot-starter-webflux</artifactId>
5
<version>2.2.4.RELEASE</version>
6
</dependency>
7
<dependency>
8
<groupId>software.amazon.awssdk</groupId>
9
<artifactId>dynamodb</artifactId>
10
<version>2.10.76</version>
11
</dependency>
12
</dependencies>
DynamoDB Stuff
Конфигурация пружины
Простой конфиг, где мы установили соединение с DynamoDB. Для целей тестирования нам нужно указать, dynamoEndpoint
но для реального приложения нам нужно указать регион AWS. DynamoEndpoint
будет указывать на локальный экземпляр DynamoDB, который мы запустим во время тестов.
Джава
xxxxxxxxxx
1
2
public class AppConfig {
3
"${aws.accessKey}") (
4
String accessKey;
5
"${aws.secretKey}") (
7
String secretKey;
8
"${dynamodb.endpoint:}") (
10
String dynamoEndpoint;
11
13
AwsBasicCredentials awsBasicCredentials(){
14
return AwsBasicCredentials.create(accessKey, secretKey);
15
}
16
18
DynamoDbAsyncClient dynamoDbAsyncClient(
19
AwsBasicCredentials awsBasicCredentials
20
){
21
DynamoDbAsyncClientBuilder clientBuilder = DynamoDbAsyncClient.builder();
22
clientBuilder
23
.credentialsProvider(StaticCredentialsProvider.create(awsBasicCredentials));
24
if(!dynamoEndpoint.isEmpty()){
25
clientBuilder.endpointOverride(URI.create(dynamoEndpoint));
26
}
27
return clientBuilder.build();
28
}
29
}
Application.yaml
с подробностями подключения.
YAML
xxxxxxxxxx
1
aws
2
accessKey any
3
secretKey any
4
dynamodb
5
endpoint http //localhost 8000/
Реактивная служба DynamoDB
К сожалению, асинхронная версия AWS SDK пока не поддерживает DynamoDBMapper (вы можете отслеживать готовность картографа здесь ), поэтому создание таблиц, отправка запросов и анализ ответов должны выполняться API «низкого уровня».
Поэтому нам нужно создать DynamoDbService для обработки:
- Создание таблицы, если она не существует
- Сохранение и получение события
Джава
x
1
2
public class DynamoDbService {
3
public static final String TABLE_NAME = "events";
5
public static final String ID_COLUMN = "id";
6
public static final String BODY_COLUMN = "body";
7
final DynamoDbAsyncClient client;
9
11
public DynamoDbService(DynamoDbAsyncClient client) {
12
this.client = client;
13
}
14
//Creating table on startup if not exists
16
17
public void createTableIfNeeded() throws ExecutionException, InterruptedException {
18
ListTablesRequest request = ListTablesRequest
19
.builder()
20
.exclusiveStartTableName(TABLE_NAME)
21
.build();
22
CompletableFuture<ListTablesResponse> listTableResponse = client.listTables(request);
23
CompletableFuture<CreateTableResponse> createTableRequest = listTableResponse
25
.thenCompose(response -> {
26
boolean tableExist = response
27
.tableNames()
28
.contains(TABLE_NAME);
29
30
if (!tableExist) {
31
return createTable();
32
} else {
33
return CompletableFuture.completedFuture(null);
34
}
35
});
36
//Wait in synchronous manner for table creation
38
createTableRequest.get();
39
}
40
public CompletableFuture<PutItemResponse> saveEvent(Event event) {
42
Map<String, AttributeValue> item = new HashMap<>();
43
item.put(ID_COLUMN, AttributeValue.builder().s(event.getUuid()).build());
44
item.put(BODY_COLUMN, AttributeValue.builder().s(event.getBody()).build());
45
PutItemRequest putItemRequest = PutItemRequest.builder()
47
.tableName(TABLE_NAME)
48
.item(item)
49
.build();
50
return client.putItem(putItemRequest);
52
}
53
public CompletableFuture<Event> getEvent(String id) {
55
Map<String, AttributeValue> key = new HashMap<>();
56
key.put(ID_COLUMN, AttributeValue.builder().s(id).build());
57
GetItemRequest getRequest = GetItemRequest.builder()
59
.tableName(TABLE_NAME)
60
.key(key)
61
.attributesToGet(BODY_COLUMN)
62
.build();
63
return client.getItem(getRequest).thenApply(item -> {
65
if (!item.hasItem()) {
66
return null;
67
} else {
68
Map<String, AttributeValue> itemAttr = item.item();
69
String body = itemAttr.get(BODY_COLUMN).s();
70
return new Event(id, body);
71
}
72
});
73
}
74
private CompletableFuture<CreateTableResponse> createTable() {
76
KeySchemaElement keySchemaElement = KeySchemaElement
77
.builder()
78
.attributeName(ID_COLUMN)
79
.keyType(KeyType.HASH)
80
.build();
81
AttributeDefinition attributeDefinition = AttributeDefinition
83
.builder()
84
.attributeName(ID_COLUMN)
85
.attributeType(ScalarAttributeType.S)
86
.build();
87
88
CreateTableRequest request = CreateTableRequest.builder()
89
.tableName(TABLE_NAME)
90
.keySchema(keySchemaElement)
91
.attributeDefinitions(attributeDefinition)
92
.billingMode(BillingMode.PAY_PER_REQUEST)
93
.build();
94
return client.createTable(request);
96
}
97
}
Реактивный REST-контроллер
Простой контроллер с методом GET для извлечения событий по id и методом POST для сохранения событий в DynamoDB. Мы можем сделать это двумя способами - реализовать это с помощью аннотаций или избавиться от аннотаций и сделать это функциональным способом. Это не влияет на производительность, в большинстве случаев это абсолютно зависит от индивидуальных предпочтений, что использовать.
Аннотированные контроллеры
Джава
xxxxxxxxxx
1
2
"/event") (
3
public class AnnotatedController {
4
final DynamoDbService dynamoDbService;
6
public AnnotatedController(DynamoDbService dynamoDbService) {
8
this.dynamoDbService = dynamoDbService;
9
}
10
value = "/{eventId}", produces = MediaType.APPLICATION_JSON_VALUE) (
12
public Mono<Event> getEvent( String eventId) {
13
CompletableFuture<Event> eventFuture = dynamoDbService.getEvent(eventId);
14
return Mono.fromCompletionStage(eventFuture);
15
}
16
consumes = MediaType.APPLICATION_JSON_VALUE) (
18
public void saveEvent( Event event) {
19
dynamoDbService.saveEvent(event);
20
}
21
}
Функциональные конечные точки
Это облегченная модель функционального программирования, в которой функции используются для маршрутизации и обработки запросов. Здесь я создал EventHandler
только для простоты чтения кода.
Джава
xxxxxxxxxx
1
2
public class HttpRouter {
3
5
public RouterFunction<ServerResponse> eventRouter(DynamoDbService dynamoDbService) {
6
EventHandler eventHandler = new EventHandler(dynamoDbService);
7
return RouterFunctions
8
.route(GET("/eventfn/{id}")
9
.and(accept(APPLICATION_JSON)), eventHandler::getEvent)
10
.andRoute(POST("/eventfn")
11
.and(accept(APPLICATION_JSON))
12
.and(contentType(APPLICATION_JSON)), eventHandler::saveEvent);
13
}
14
static class EventHandler {
16
private final DynamoDbService dynamoDbService;
17
public EventHandler(DynamoDbService dynamoDbService) {
19
this.dynamoDbService = dynamoDbService;
20
}
21
Mono<ServerResponse> getEvent(ServerRequest request) {
23
String eventId = request.pathVariable("id");
24
CompletableFuture<Event> eventGetFuture = dynamoDbService.getEvent(eventId);
25
Mono<Event> eventMono = Mono.fromFuture(eventGetFuture);
26
return ServerResponse.ok().body(eventMono, Event.class);
27
}
28
Mono<ServerResponse> saveEvent(ServerRequest request) {
30
Mono<Event> eventMono = request.bodyToMono(Event.class);
31
eventMono.map(dynamoDbService::saveEvent);
32
return ServerResponse.ok().build();
33
}
34
}
35
}
Spring DynamoDB Интеграционный тест
Maven Зависимости
Для запуска интеграционного теста с DynamoDB нам нужен DynamoDBLocal, который на самом деле не DynamoDB, а SQLite с реализованными интерфейсами DynamoDB поверх него.
XML
xxxxxxxxxx
1
<dependency>
2
<groupId>com.amazonaws</groupId>
3
<artifactId>DynamoDBLocal</artifactId>
4
<version>1.12.0</version>
5
<scope>test</scope>
6
</dependency>
7
<build>
9
<plugins>
10
<plugin>
11
<groupId>org.apache.maven.plugins</groupId>
12
<artifactId>maven-dependency-plugin</artifactId>
13
<version>2.10</version>
14
<executions>
15
<execution>
16
<id>copy</id>
17
<phase>test-compile</phase>
18
<goals>
19
<goal>copy-dependencies</goal>
20
</goals>
21
<configuration>
22
<includeScope>test</includeScope>
23
<includeTypes>so,dll,dylib</includeTypes>
24
<!--Keep an eye on output directory - it will be used for starting dynamodb-->
25
<outputDirectory>${project.basedir}/target/native-libs</outputDirectory>
26
</configuration>
27
</execution>
28
</executions>
29
</plugin>
30
</plugins>
31
</build>
32
<repositories>
34
<repository>
35
<id>dynamodb-local-oregon</id>
36
<name>DynamoDB Local Release Repository</name>
37
<url>https://s3-us-west-2.amazonaws.com/dynamodb-local/release</url>
38
</repository>
39
</repositories>
Сервер DynamoDB
Теперь нам нужно запустить DynamoDB перед тестовым запуском. Я предпочитаю делать это как JUnit Class Rule, но мы также можем делать это как бин.
Джава
xxxxxxxxxx
1
public class LocalDynamoDbRule extends ExternalResource {
2
protected DynamoDBProxyServer server;
4
public LocalDynamoDbRule() {
6
//here we set the path from "outputDirectory" of maven-dependency-plugin
7
System.setProperty("sqlite4java.library.path", "target/native-libs");
8
}
9
11
protected void before() throws Exception {
12
this.server = ServerRunner
13
.createServerFromCommandLineArgs(new String[]{"-inMemory", "-port", "8000"});
14
server.start();
15
}
16
18
protected void after() {
19
this.stopUnchecked(server);
20
}
21
protected void stopUnchecked(DynamoDBProxyServer dynamoDbServer) {
23
try {
24
dynamoDbServer.stop();
25
} catch (Exception e) {
26
throw new RuntimeException(e);
27
}
28
}
29
}
Бегущий тест
Теперь мы можем создать интеграционный тест и протестировать событие get по идентификатору и сохранить событие.
Джава
xxxxxxxxxx
1
SpringRunner.class) (
2
webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) (
3
public class IntegrationTest {
4
6
public static LocalDynamoDbRule dynamoDbRule = new LocalDynamoDbRule();
7
9
private WebTestClient webTestClient;
10
12
public void getEvent() {
13
// Create a GET request to test an endpoint
14
webTestClient
15
.get().uri("/event/1")
16
.accept(MediaType.APPLICATION_JSON)
17
.exchange()
18
// and use the dedicated DSL to test assertions against the response
19
.expectStatus().isOk()
20
.expectBody(String.class).isEqualTo(null);
21
}
22
24
public void saveEvent() throws InterruptedException {
25
Event event = new Event("10", "event");
26
webTestClient
27
.post().uri("/event/")
28
.body(BodyInserters.fromValue(event))
29
.exchange()
30
.expectStatus().isOk();
31
Thread.sleep(1500);
32
webTestClient
33
.get().uri("/event/10")
34
.accept(MediaType.APPLICATION_JSON)
35
.exchange()
36
.expectStatus().isOk()
37
.expectBody(Event.class).isEqualTo(event);
38
}
39
}
докер
Здесь мы собираемся подготовить наше приложение для запуска в Docker, поэтому оно будет готово для развертывания в AWS.
СОВЕТ: Начиная с Java 10, вы можете указать, сколько памяти будет использовать JVM в зависимости от памяти контейнера. -XX:MaxRAMPercentage=75.0
означает, что JVM не будет использовать более 75% памяти контейнера.
Dockerfile
Dockerfile
xxxxxxxxxx
1
# Use our standard java12 baseimage
2
FROM openjdk:12-alpine
3
# Copy the artifact into the container
5
COPY target/dynamodb-spring-*-exec.jar /srv/service.jar
6
# Run the artifact and expose the default port
8
WORKDIR /srv
9
ENTRYPOINT [ "java", \
11
"-XX:+UnlockExperimentalVMOptions", \
12
"-XX:+ExitOnOutOfMemoryError", \
13
"-XX:MaxRAMPercentage=75.0", \
14
"-Djava.security.egd=file:/dev/./urandom", \
15
"-jar", "service.jar", \
16
"--spring.profiles.active=prod" ]
17
EXPOSE 8080
Сборка самого док-контейнера docker build -t spring-dynamo.
Кроме того, давайте посмотрим, что было сгенерировано docker image ls.
Простой текст
xxxxxxxxxx
1
REPOSITORY TAG IMAGE ID CREATED SIZE
2
spring-dynamo latest a974d880400e About a minute ago 364MB
3
openjdk 12-alpine 0c68e7c5b7a0 12 months ago 339MB
Наконец-то наш POC готов!
Удачного кодирования 🙂