Статьи

Multipart Upload на S3 с jclouds

1. Цель

В предыдущей статье мы рассмотрели, как мы можем использовать универсальные API-интерфейсы BLOB-объектов из jclouds для загрузки контента в S3. В этой статье мы будем использовать специальный асинхронный API-интерфейс S3 из jclouds для загрузки контента и использования многоэтапной загрузки, предоставляемой S3 .

2. Подготовка

2.1. Настройте пользовательский API

Первая часть процесса загрузки — это создание jclouds API — это специальный API для Amazon S3:

public AWSS3AsyncClient s3AsyncClient() {
   String identity = ...
   String credentials = ...
   BlobStoreContext context = ContextBuilder.newBuilder("aws-s3").
      credentials(identity, credentials).buildView(BlobStoreContext.class);
   RestContext<AWSS3Client, AWSS3AsyncClient> providerContext = context.unwrap();
   return providerContext.getAsyncApi();
}

2.2. Определение количества частей для контента

Amazon S3 имеет ограничение в 5 МБ для каждой загружаемой части. Таким образом, первое, что нам нужно сделать, это определить правильное количество частей, на которые мы можем разбить наш контент, чтобы у нас не было частей ниже этого предела 5 МБ:

public static int getMaximumNumberOfParts(byte[] byteArray) {
   int numberOfParts= byteArray.length / fiveMB; // 5*1024*1024
   if (numberOfParts== 0) {
      return 1;
   }
   return numberOfParts;
}

2,3. Разбиение контента на части

Собирались разбить массив байтов на заданное количество частей:

public static List<byte[]> breakByteArrayIntoParts(byte[] byteArray, int maxNumberOfParts) {
   List<byte[]> parts = Lists.<byte[]> newArrayListWithCapacity(maxNumberOfParts);
   int fullSize = byteArray.length;
   long dimensionOfPart = fullSize / maxNumberOfParts;
   for (int i = 0; i < maxNumberOfParts; i++) {
      int previousSplitPoint = (int) (dimensionOfPart * i);
      int splitPoint = (int) (dimensionOfPart * (i + 1));
      if (i == (maxNumberOfParts - 1)) {
         splitPoint = fullSize;
      }
      byte[] partBytes = Arrays.copyOfRange(byteArray, previousSplitPoint, splitPoint);
      parts.add(partBytes);
   }
   return parts;
}

Мы собираемся проверить логику разбиения байтового массива на части — мы собираемся сгенерировать несколько байтов, разделить байтовый массив, собрать его обратно вместе, используя Guava, и убедиться, что мы вернули оригинал:

@Test
public void given16MByteArray_whenFileBytesAreSplitInto3_thenTheSplitIsCorrect() {
   byte[] byteArray = randomByteData(16);
   int maximumNumberOfParts = S3Util.getMaximumNumberOfParts(byteArray);
   List<byte[]> fileParts = S3Util.breakByteArrayIntoParts(byteArray, maximumNumberOfParts);
   assertThat(fileParts.get(0).length + fileParts.get(1).length + fileParts.get(2).length,
      equalTo(byteArray.length));
   byte[] unmultiplexed = Bytes.concat(fileParts.get(0), fileParts.get(1), fileParts.get(2));
   assertThat(byteArray, equalTo(unmultiplexed));
}

Для генерации данных мы просто используем поддержку Random :

byte[] randomByteData(int mb) {
   byte[] randomBytes = new byte[mb * 1024 * 1024];
   new Random().nextBytes(randomBytes);
   return randomBytes;
}

2,4. Создание полезных нагрузок

Теперь, когда мы определили правильное количество частей для нашего контента и нам удалось разбить контент на части, нам нужно сгенерировать объекты Payload для API jclouds:

public static List<Payload> createPayloadsOutOfParts(Iterable<byte[]> fileParts) {
   List<Payload> payloads = Lists.newArrayList();
   for (byte[] filePart : fileParts) {
      byte[] partMd5Bytes = Hashing.md5().hashBytes(filePart).asBytes();
      Payload partPayload = Payloads.newByteArrayPayload(filePart);
      partPayload.getContentMetadata().setContentLength((long) filePart.length);
      partPayload.getContentMetadata().setContentMD5(partMd5Bytes);
      payloads.add(partPayload);
   }
   return payloads;
}

3. Загрузить

Процесс загрузки является гибким многоэтапным процессом — это означает:

  • загрузка может быть начата до получения всех данных — данные могут быть загружены по мере поступления
  • данные загружаются порциями — если одна из этих операций не выполняется, ее можно просто получить
  • блоки могут загружаться параллельно — это может значительно увеличить скорость загрузки, особенно в случае больших файлов

3.1. Инициирование операции загрузки

Первым шагом в операции загрузки является запуск процесса . Этот запрос к S3 должен содержать стандартные заголовки HTTP — в частности, необходимо вычислить заголовок ContentMD5 . Мы собирались использовать поддержку хеш-функции Guava здесь:

Hashing.md5().hashBytes(byteArray).asBytes();

Это хэш md5 всего байтового массива, а не его частей.

Для начала загрузки и для всех дальнейших взаимодействий с S3 мы будем использовать AWSS3AsyncClient — асинхронный API, который мы создали ранее:

ObjectMetadata metadata = ObjectMetadataBuilder.create().key(key).contentMD5(md5Bytes).build();
String uploadId = s3AsyncApi.initiateMultipartUpload(container, metadata).get();

Ключом является ручкой , присвоенный объект — это должно быть уникальным идентификатором , указанный клиентом.

Также обратите внимание, что, хотя мы используем асинхронную версию API, мы блокируем результат этой операции — это потому, что нам понадобится результат инициализации, чтобы иметь возможность двигаться вперед.

Результатом операции является идентификатор загрузки, возвращаемый S3 — он будет идентифицировать загрузку на протяжении всего ее жизненного цикла и будет присутствовать во всех последующих операциях загрузки.

3.2. Загрузка частей

Следующим шагом является загрузка частей . Наша цель здесь — отправить эти запросы параллельно , так как операция загрузки частей представляет большую часть процесса загрузки:

List<ListenableFuture<String>> ongoingOperations = Lists.newArrayList();
for (int partNumber = 0; partNumber < filePartsAsByteArrays.size(); partNumber++) {
   ListenableFuture<String> future = s3AsyncApi.uploadPart(
      container, key, partNumber + 1, uploadId, payloads.get(partNumber));
   ongoingOperations.add(future);
}

Номера деталей должны быть непрерывными, но порядок отправки запросов не имеет значения.

После отправки всех запросов на загрузку части нам нужно дождаться их ответов, чтобы мы могли собрать индивидуальное значение ETag для каждой части:

Function<ListenableFuture<String>, String> getEtagFromOp =
  new Function<ListenableFuture<String>, String>() {
   public String apply(ListenableFuture<String> ongoingOperation) {
      try {
         return ongoingOperation.get();
      } catch (InterruptedException | ExecutionException e) {
         throw new IllegalStateException(e);
      }
   }
};
List<String> etagsOfParts = Lists.transform(ongoingOperations, getEtagFromOp);

Если по какой-либо причине одна из операций загрузки части завершается неудачно, операцию можно повторить, пока она не завершится успешно. Приведенная выше логика не содержит механизма повторных попыток, но его создание должно быть достаточно простым.

3.3. Завершение операции загрузки

Последний шаг процесса загрузки — завершение многоэтапной операции . API S3 требует ответов от предыдущих частей, загруженных в виде карты , которую мы теперь можем легко создать из списка ETag, который мы получили выше:

Map<Integer, String> parts = Maps.newHashMap();
for (int i = 0; i < etagsOfParts.size(); i++) {
   parts.put(i + 1, etagsOfParts.get(i));
}

И наконец, отправьте полный запрос:

s3AsyncApi.completeMultipartUpload(container, key, uploadId, parts).get();

Это вернет итоговый ETag готового объекта и завершит весь процесс загрузки.

4. Вывод

В этой статье мы создали многочастную полностью параллельную операцию загрузки в S3, используя пользовательский API-интерфейс Scl jclouds. Эта операция готова к использованию как есть, но ее можно улучшить несколькими способами.

Во-первых, необходимо добавить логику повторения вокруг операций загрузки, чтобы лучше справляться с ошибками.

Далее, для действительно больших файлов, даже несмотря на то, что механизм отправляет все параллельные запросы на загрузку, механизм регулирования должен ограничивать количество отправляемых параллельных запросов. Это сделано для того, чтобы пропускная способность не стала узким местом, а также для того, чтобы убедиться, что сам Amazon не помечает процесс загрузки как превышающий допустимый лимит запросов в секунду — Guava RateLimiter потенциально может быть очень подходящим для этого.

PS Вы можете копаться в Twitter за мной
.