Статьи

Построение Apache Kafka Messaging Producer в Bluemix

Преимущество систем обмена сообщениями заключается в том, что они не связывают обработку данных (потребителей) с производителями данных, и с помощью Kafka вы можете легко и раздельно масштабировать потребителей. Кроме того, с помощью асинхронной обработки вы можете разгрузить работу и улучшить взаимодействие с пользователем приложения. Эти возможности особенно важны для нативных облачных приложений с микросервисами.

Недавно был выпущен Kafka 0.9, который теперь  доступен  как   служба Message Hub (бета) в  Bluemix . Для разработчиков доступны разные API, и мой коллега Найл Уидон предоставил  примеры  того, как их использовать. Существует Java API, REST API и Node.js API, который оборачивает REST API и добавляет специфические функции IBM, такие как администрирование и более простая аутентификация.

Образец Node.js  представляет собой простое приложение чата. Он использует модуль Node  cfenv  для доступа к переменным среды Bluemix и модуль -узел Node-модуля Node   для доступа к Kafka. Я немного изменил образец, чтобы 1. отделить потребителя от производителя и 2. удалить приложение-образец чата. Ниже приведен минимальный код для производителя. Завтра я напишу больше о потребителе.

Вы можете запустить производителя либо локально, либо в Bluemix. Чтобы запустить его в Bluemix, создайте приложение Bluemix Node.js, добавьте службу Message Hub и выполните эти команды из корневого каталога проекта.

cf login
cf push <mykafkaproducer>
curl http://mykafkaproducer.mybluemix.net

Чтобы запустить производителя локально, выполните эти команды.

npm install
node app.js <message_hub_rest_endpoint> <message_hub_api_key>
curl http://localhost:6003

Чтобы проверить, работает ли он, разверните пример приложения чата и подключитесь, чтобы просмотреть сообщения от вашего производителя. И приложение чата, и приложение-производитель должны использовать одну и ту же службу Message Hub.

Название изображения

package.json

{
  "name": "node-kafka-producer",
  "version": "1.0.0",
  "description": "",
  "scripts": {
    "start": "node app.js"
  },
  "dependencies": {
    "express": "4.12.x",
    "cfenv": "1.0.x",
    "message-hub-rest": "^1.0.1"
  },
  "repository": {},
  "engines": {
    "node": "0.12.x"
  } 
}

app.js

/**
 * Copyright 2015 IBM
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
*/
var express = require('express');
var app = express();

var Cfenv = require('cfenv');
var MessageHub = require('message-hub-rest');
var appEnv = Cfenv.getAppEnv();
var instance;
var cleanedUp = false;
var topic = 'livechat';

app.listen(appEnv.port, '0.0.0.0', function() {
  console.log("server starting on " + appEnv.url);
});

app.get('/', function (req, res) {
  pushMessage("Hello World!");
  res.send('Hello World!');
});

var start = function(restEndpoint, apiKey, callback) {
  if(!appEnv.services || (appEnv.services && Object.keys(appEnv.services).length === 0)) {
    if(restEndpoint && apiKey) {
      appEnv.services = {
        "messagehub": [
           {
              "label": "messagehub",
              "credentials": {
                 "api_key": apiKey,
                 "kafka_rest_url": restEndpoint,
              }
           }
        ]
      };
    } else {
      console.error('A REST Endpoint and API Key must be provided.');
      process.exit(1);
    }
  } else {
    console.log('Endpoint and API Key provided have been ignored, as there is a valid VCAP_SERVICES.');
  }

  instance = new MessageHub(appEnv.services);

  instance.topics.create(topic)
      .then(function(response) {
        console.log('topic created');
      })
      .fail(function(error) {
        console.log(error);
        stop(1);
      });
};

var pushMessage = function(message) {
    var list = new MessageHub.MessageList();
    var message = {
      user: "Niklas",
      message: message,
    }

    list.push(JSON.stringify(message));

    instance.produce(topic, list.messages)
      .fail(function(error) {
        throw new Error(error);
      });
};

var registerExitHandler = function(callback) {
  if(callback) {
    var events = ['exit', 'SIGINT', 'uncaughtException'];

    for(var index in events) {
      process.on(events[index], callback);
    }
  } else if(!callback) {
    throw new ReferenceException('Provided callback parameter is undefined.');
  }
};

// Register a callback function to run when
// the process exits.
registerExitHandler(function() {
  stop();
});

var stop = function(exitCode) {
  exitCode = exitCode || 0;

  if(!cleanedUp) {
    console.log('Running exit handler.');
    cleanedUp = true;
    process.exit(exitCode);
  }
};

// If this module has been loaded by another module, don't start
// the service automatically. If it's being started from the command license
// (i.e. node app.js), start the service automatically.
if(!module.parent) {
  if(process.argv.length >= 4) {
    start(process.argv[process.argv.length - 2], process.argv[process.argv.length - 1]);
  } else {
    start();
  }
}

module.exports = {
  start: start,
  stop: stop,
  appEnv: appEnv
}