Статьи

ОО, функциональный, императивный и реактивный: все сплетены вместе

Это первая из двух частей серии, в которой обсуждается, как различные парадигмы в программировании можно плавно объединить с помощью «первоклассной процедуры», термина, который я использую для наилучшего описания концепции.

Рабочий код в этой статье демонстрирует, как можно беспрепятственно объединить следующее для обслуживания запроса:

  1. Проверьте запрос (в потоке цикла событий сокета).
  2. Начните транзакцию и зарегистрируйте запрос в базе данных. Это будет в другом потоке, чтобы избежать остановки потока цикла событий сокета.
  3. Делайте ответные звонки, чтобы получить данные из других служб.
  4. Запустите некоторый функциональный код, чтобы определить стандартное отклонение времени обслуживания.
  5. Предпринимать альтернативные потоки для обработки особых случаев (включая обработку исключений). Затем, если нет исключений, вызывающих откат, сохраните результаты в базе данных, это опять-таки находится в другом потоке, чтобы не связать поток цикла реактивных событий.
  6. Отправьте ответ после совершения транзакции

Это позволяет использовать парадигму программирования, наиболее подходящую для решения различных проблем. Обратите внимание, что обслуживание запроса в демонстрации является произвольным. Основное внимание уделяется демонстрации того, как различные парадигмы программирования могут быть сплетены вместе.

Теперь написать полное описание процедуры первого класса выходит за рамки одной статьи. Есть много шаблонов, используемых вместе, чтобы включить композицию с помощью первоклассных процедур. Поэтому я собираюсь представить введение в первоклассные процедуры в двух частях:

  • Эта статья демонстрирует, с помощью рабочего кода, насколько гибкая и простая композиция с первоклассными процедурами
  • Следующая статья предоставит объяснение, более тесно связанное с теорией о том, как первоклассная процедура эволюционировала к ее нынешнему пониманию.

Мы начнем с нескольких простых примеров, а затем перейдем к более интересному переплетению первоклассных процедур.

Первоклассная процедура

Простой цикл событий

Следующая процедура первого класса обслуживает запрос REST. Это будет выполнено в потоке цикла событий сокета HTTP.

public void service(ObjectResponse<ServicedThreadResponse> response) {

    response.send(new ServicedThreadResponse(
      Thread.currentThread().getName(), "Event", System.currentTimeMillis()));
}

Простой поток на запрос

Следующая процедура первого класса обслуживает запрос REST, извлекая значение из базы данных и отправляя его в ответе. Это будет выполняться отдельным пулом потоков.

public void service(
        ServicedThreadRequest request, 
        ThreadPerRequestRepository repository, 
        ObjectResponse<ServicedThreadResponse> response) {

    int identifier = request.getIdentifier() % 10;
    ThreadPerRequest entity = repository.findById(identifier).get();
    response.send(new ServicedThreadResponse(
      Thread.currentThread().getName(), entity.getName(), System.currentTimeMillis()));
}

Различие нити будет обсуждаться позже. Однако на данный момент обратите внимание, что репозиторий Spring используется только процедурой первого класса потока на запрос.

Первоклассные процедуры, сплетенные вместе

Хорошо, выше немного скучно. Мы видели это на серверах веб-приложений раньше, теперь покажите нам что-нибудь интересное!

Чтобы показать что-то более интересное, мы собираемся сплести первоклассные процедуры вместе, чтобы получить пример, подробно описанный в начале этой статьи.

Каждый шаг в обслуживании запросов реализован как первоклассная процедура. Мы рассмотрим каждую первоклассную процедуру в указанном порядке.

Запрос на проверку (в цикле событий гнезда)

Это простая проверка правильности запроса. Поскольку это простая логика, мы используем поток цикла событий сокета. Таким образом, нам не нужно оплачивать накладные расходы на переключение контекста потока и накладные расходы на потоки, чтобы отклонить недопустимые запросы. Код выглядит следующим образом:

const HttpException = Java.type("net.officefloor.server.http.HttpException");
const Integer = Java.type("java.lang.Integer")

function validate(identifier, requestIdentifier) {
    if (Number(identifier) <= 0) {
        throw new HttpException(422, "Invalid identifier");
    }
    requestIdentifier.set(Integer.valueOf(identifier))
}
validate.officefloor = [ 
    { httpPathParameter: "identifier" },
    { out: Integer },
    { next : "valid" }
];

Обратите внимание, что проверка написана на JavaScript. Это сделано для того, чтобы клиентские правила проверки JavaScript можно было повторно использовать для проверки запросов для обеспечения согласованности между клиентом и сервером.

officefloorАтрибут , добавленный к функции обеспечивает мета-данные. Это необходимо, поскольку JavaScript не предоставляет строго типизированную информацию, необходимую для процедур первого класса.

Обязательно зарегистрировать запрос в базе данных

После проверки идентификатор запроса регистрируется в базе данных. Это также создает уникальный номер для запроса на основе столбца IDENTITY в базе данных.

@Next("registered")
public static void registerRequest(
      @Val int requestIdentifier, 
      WeavedRequestRepository repository, 
      Out<WeavedRequest> weavedRequest) {

    WeavedRequest entity = new WeavedRequest(requestIdentifier);
    repository.save(entity);
    weavedRequest.set(entity);
}

реагирующий

Следующим является некоторый Reactive код для одновременного вызова двух конечных точек REST, подробно описанных в начале этой статьи (простой цикл обработки событий и простой поток-на-запрос). Поскольку мы используем Reactive, мы можем вызывать их одновременно для повышения производительности.

Обратите внимание, что при ожидании ответов поток фактически простаивает с потоками, обслуживающими другие функции. Это асинхронная обработка, так что потоки не связаны ожиданием. Как только оба набора результатов возвращаются, они уведомляют соответствующий асинхронный поток для продолжения обработки.

К настоящему времени вы можете заметить Out/ @Valкомбинации. Вот как значения могут быть переданы из одной процедуры первого класса в другую процедуру первого класса. Обратите внимание, что если тип для разных значений один и тот же, для их различения можно использовать спецификатор. Остальные аргументы предоставляются из внедрения зависимости (в данном случае Spring).

private final static String URL = "http://localhost:7878/{path}";

@Next("useData")
public static void retrieveData(
     WebClient client,
     AsynchronousFlow eventLoopFlow, 
     @EventLoopResponse Out<ServicedThreadResponse> eventLoopResponse,
     @Val WeavedRequest request, 
     AsynchronousFlow threadPerRequestFlow, 
     @ThreadPerRequestResponse Out<ServicedThreadResponse> threadPerRequestResponse) {

    Flux.range(1, 10)
        .map((index) -> 
             client.get().uri(URL, "event-loop")
             .retrieve().bodyToMono(ServicedThreadResponse.class))
        .flatMap((response) -> response)
        .collectList()
        .subscribe((responses) -> eventLoopFlow.complete(
            () -> eventLoopResponse.set(
              responses.stream().toArray(ServicedThreadResponse[]::new))));

    Flux.range(1, 10)
        .map((index) -> 
             client.post().uri(URL, "thread-per-request")
             .contentType(MediaType.APPLICATION_JSON)
             .syncBody(new ServicedThreadRequest(request.getId()))
             .retrieve().bodyToMono(ServicedThreadResponse.class))
        .flatMap((response) -> response)
        .collectList()
        .subscribe((responses) -> threadPerRequestFlow.complete(
            () -> threadPerRequestResponse.set(
              responses.stream().toArray(ServicedThreadResponse[]::new))));
}

функциональная

Затем реактивные ответы предоставляются функциональному коду Scala для определения стандартного отклонения времени обслуживания.

def mean(timestamps: Iterable[Long]): Double = 
    timestamps.sum.toDouble / timestamps.size

def variance(timestamps: Iterable[Long]): Double = {
    val avg = mean(timestamps)
    timestamps.map(timestamp => math.pow(timestamp.toDouble - avg, 2)).sum / timestamps.size
}

def stdDev(timestamps: Iterable[Long]): Double = math.sqrt(variance(timestamps))

@Next("use")
def standardDeviation(
      @EventLoopResponse @Val eventLoopResponses: Array[ServicedThreadResponse], 
      @ThreadPerRequestResponse @Val threadPerRequestResponses: Array[ServicedThreadResponse]
      ): Double =
    stdDev(
      (eventLoopResponses ++ threadPerRequestResponses)
      .map(response => response.getTimestamp))

Обратите внимание, что библиотека может быть использована для сокращения этого кода. Однако мы сделали это, чтобы продемонстрировать, как функциональный код может быть интегрирован в первоклассные процедуры.

Управление потоком

Следующая процедура первого класса запускает поток для обработки особых случаев. Если не возникает проблем с особыми случаями, то стандартное отклонение сохраняется в базе данных.

@FlowInterface
public static interface Flows {
    void handleSpecialCases(FlowSuccessful callback);
    void stored();
}

public static void store(
      @Parameter double standardDeviation, 
      Flows flows, 
      @Val WeavedRequest request, 
      WeavedRequestRepository repository, 
      Out<RequestStandardDeviation> stDevOut) {

    flows.handleSpecialCases(() -> {
        request.setRequestStandardDeviation(
          new RequestStandardDeviation(standardDeviation, request));
        repository.save(request);
        stDevOut.set(request.getRequestStandardDeviation());
        flows.stored();
    });
}

Обработка особых случаев осуществляется по следующей первоклассной процедуре.

public static void handleSpecialCase(
      @Val WeavedRequest request
      ) throws WeavedRollbackException, WeavedCommitException {

    switch (request.getRequestIdentifier()) {
        case 3:
            throw new WeavedRollbackException(request);
        case 4:
            throw new WeavedCommitException(request);
    }
}

Прикосновение к обработке исключений

Две процедуры обработки исключений первого класса следующие:

public static void handle(
      @Parameter WeavedRollbackException exception, 
      ObjectResponse<WeavedErrorResponse> response) {

    WeavedRequest request = exception.getWeavedRequest();
    response.send(new WeavedErrorResponse(request.getRequestIdentifier(), request.getId()));
}


public static void handle(
      @Parameter WeavedCommitException exception, 
      WeavedRequestRepository repository, 
      ObjectResponse<WeavedErrorResponse> response) {

    WeavedRequest request = exception.getWeavedRequest();
    request.setWeavedError(new WeavedError(
      "Request Identifier (" + request.getRequestIdentifier() + ") is special case", 
      request));
    repository.save(request);
    response.send(
      new WeavedErrorResponse(request.getRequestIdentifier(), request.getId()));
}

Второй обработчик работает внутри транзакции, поэтому он включает в себя дополнительные данные, хранящиеся в базе данных.

Обратите внимание, что из-за того, что первоклассная композиция процедур не требует от вызывающей стороны перехватывать исключения, проверенные исключения обнимаются. Мы считаем проверенные исключения очень полезной информацией в составе потока. Однако различие состоит в том, что это должно быть не заботой вызывающего абонента, а скорее заботой потока. Для меня это большая разница и останавливает проблему обработки исключений catch и log. Обработка исключений теперь является отдельной задачей, которая может быть закодирована впоследствии.

Успешный ответ

При успешном хранении деталей запроса в базе данных следующая процедура первого класса отправляет ответ.

public void send(
      @Val WeavedRequest request, 
      @Val RequestStandardDeviation standardDeviation, 
      @EventLoopResponse @Val ServicedThreadResponse[] eventLoopResponse,
      @ThreadPerRequestResponse @Val ServicedThreadResponse[] threadPerRequestResponse, 
      ObjectResponse<WeavedResponse> response) {

    response.send(new WeavedResponse(
      request.getRequestIdentifier(), 
      request.getId(), 
      eventLoopResponse, 
      threadPerRequestResponse, 
      standardDeviation.getStandardDeviation()));
}

Котлин для некоторых OO

Да, и просто для большего удовольствия от полиглотов, объекты OO, используемые для представления запросов / ответов JSON, следующие.

@HttpObject
data class ServicedThreadRequest(
      val identifier: Int)

data class ServicedThreadResponse(
      val threadName: String, 
      val lookupName: String, 
      val timestamp: Long)

data class WeavedErrorResponse(
      val requestIdentifier: Int, 
      val requestNumber: Int)

data class WeavedResponse(
      val requestIdentifier: Int,
      val requestNumber: Int,
      val eventLoopResponses: Array,
      val threadPerRequestResponses: Array,
      val standardDeviation: Double)

Доказательство работает

Ниже приведен тест для подтверждения последовательности первоклассных процедур, обслуживающих запрос.

public static final SpringRule spring = new SpringRule();

public static final OfficeFloorRule officeFloor = new OfficeFloorRule();

@ClassRule
public static final RuleChain ordered = RuleChain.outerRule(spring).around(officeFloor);

@Rule
public final HttpClientRule client = new HttpClientRule();

private static final ObjectMapper mapper = new ObjectMapper();
static {
    mapper.registerModule(new KotlinModule());
}

@Test
public void confirmWeavedTogether() throws Exception {
    HttpResponse response = this.client.execute(
      new HttpPost(this.client.url("/weave/1")));
    assertEquals("Should be successful", 200, response.getStatusLine().getStatusCode());
    WeavedResponse body = mapper.readValue(
      EntityUtils.toString(response.getEntity()), WeavedResponse.class);
    WeavedRequest entity = spring.getBean(WeavedRequestRepository.class)
      .findById(body.getRequestNumber()).get();
    assertNotNull("Should have standard deviation stored", entity.getRequestStandardDeviation());
}

Ткачество все вместе

На следующей диаграмме показана конфигурация для объединения вышеуказанных первоклассных процедур.

Это единственная конфигурация / код, необходимый для составления первоклассных процедур. Обратите внимание на имена, которые представляют имена процедур первого класса и соответствующие метаданные.

Это означает, что нам нужно проверять порт на все вызовы и тесты. Да, все, что вы видите выше, работает с одного порта. Вам не нужно выбирать между платформой, которая предоставляет только потоки на запрос или однопоточные циклы событий. Это из-за стратегии выполнения, обеспечиваемой внедрением потока первоклассных процедур.

Впрыск резьбы

Конфигурация потоков на самом деле следующая:

<teams>
    <team source="net.officefloor.frame.impl.spi.team.ExecutorCachedTeamSource" 
          type="org.springframework.data.repository.CrudRepository" />
</teams>

Здесь мы помечаем все процедуры, требующие, чтобы Spring Repository выполнялся пулом потоков. Помните, я сказал помнить об использовании Spring Repository. Итак, в приведенной выше конфигурации есть любая первоклассная процедура, требующая репозитория Spring, выполняемого настроенным пулом потоков. Обратите внимание, что пулы потоков называются командами, поскольку моделирование происходит из первоклассных процедур, поступающих из ведомств.

Поэтому, снова глядя на поток, выполнение потока выглядит следующим образом:

  1. Validate использует поток цикла события прослушивателя сокета
  2. Запрос на регистрацию использует репозиторий Spring, поэтому выполнение переключается на поток из настроенного пула потоков
  3. Этот поток переносит на триггер асинхронные реактивные вызовы
  4. Поток цикла реактивных событий затем вызывает обратные вызовы. Поскольку код Scala выполняется быстро, поток цикла реактивных событий продолжает выполнять чистую функцию Scala. Здесь считается, что переключение контекста потока слишком много, и более эффективно просто вызывать высокооптимизированную чистую функцию Scala. Однако, если мы хотим разделить функцию Scala на другой пул потоков, мы можем настроить другой пул потоков (обычно через зависимость маркера от процедуры первого класса).
  5. Оставшийся императивный код имеет переключение обратно к потоку из настроенного пула потоков, что зависит от репозитория Spring. Кроме того, локальные потоки между потоками распространяются на каждый используемый поток, поэтому транзакция репозитория Spring не теряется (т.е. транзакция активна для всех процедур первого класса в пределах границ транзакции).
  6. Ответ отправляется.

Теперь все вышеперечисленное можно настроить с помощью Thread Injection . Если у нас есть, например, более одного синхронного хранилища данных, мы можем создать пул потоков для взаимодействия с каждым хранилищем данных, чтобы избежать одного медленного хранилища данных, связывающего все потоки приложения.

Это также означает, что вы можете настроить различные потоки для разных сред без необходимости изменения какого-либо кода.

отказ

В реальных приложениях я бы старался избегать многих из вышеперечисленных языков программирования вместе. Я бы постарался упростить их до пары, чтобы избежать слишком большого набора навыков, связанных с повышением затрат на обслуживание вашего приложения (а также уменьшением проблем при смешанной компиляции). Это всего лишь демонстрация того, как ОО, функциональный, императивный и реактивный код могут быть сплетены вместе с первоклассными процедурами. Кроме того, он демонстрирует, как вы можете написать конкретные решения, прежде чем абстрагироваться .

Кроме того, как вы можете видеть, нам пришлось охватить большую широту в каждой парадигме программирования. Если код не является хорошим представлением парадигмы, мы очень рады получить отзывы об улучшениях от тех, кто более знаком с конкретной парадигмой.

И если мы пропустили важную парадигму, пожалуйста, дайте мне знать, чтобы мы могли рассмотреть возможность ее включения. Когда дело доходит до кодирования, мы ценим разнообразие, чтобы предоставить разработчикам выбор. Мы пытаемся разрушить заборы между парадигмами, чтобы создать одну большую счастливую кодирующую семью.

Резюме

Мы продемонстрировали, как первоклассная процедура может объединять код полиглота, написанный в разных парадигмах, для обслуживания запроса. Код, приведенный выше в статье, — это весь код, необходимый для приложения. Больше нет необходимости в коде плетения.

Кроме того, чтобы избежать проблем, это работает только на моем компьютере (в этой статье), код для вышеупомянутого доступен здесь . Смотрите прочитать мне о том , как запустить его.

Для большего понимания того, что происходит, см. Учебники , мои другие статьи .

И следите за моей следующей статьей об использовании первоклассной процедуры.