Преимущество систем обмена сообщениями заключается в том, что они не связывают обработку данных (потребителей) с производителями данных, и с помощью Kafka вы можете легко и раздельно масштабировать потребителей. Кроме того, с помощью асинхронной обработки вы можете разгрузить работу и улучшить взаимодействие с пользователем приложения. Эти возможности особенно важны для нативных облачных приложений с микросервисами.
Недавно был выпущен Kafka 0.9, который теперь доступен как служба Message Hub (бета) в Bluemix . Для разработчиков доступны разные API, и мой коллега Найл Уидон предоставил примеры того, как их использовать. Существует Java API, REST API и Node.js API, который оборачивает REST API и добавляет специфические функции IBM, такие как администрирование и более простая аутентификация.
Образец Node.js представляет собой простое приложение чата. Он использует модуль Node cfenv для доступа к переменным среды Bluemix и модуль -узел Node-модуля Node для доступа к Kafka. Я немного изменил образец, чтобы 1. отделить потребителя от производителя и 2. удалить приложение-образец чата. Ниже приведен минимальный код для производителя. Завтра я напишу больше о потребителе.
Вы можете запустить производителя либо локально, либо в Bluemix. Чтобы запустить его в Bluemix, создайте приложение Bluemix Node.js, добавьте службу Message Hub и выполните эти команды из корневого каталога проекта.
cf login
cf push <mykafkaproducer>
curl http://mykafkaproducer.mybluemix.net
Чтобы запустить производителя локально, выполните эти команды.
npm install
node app.js <message_hub_rest_endpoint> <message_hub_api_key>
curl http://localhost:6003
Чтобы проверить, работает ли он, разверните пример приложения чата и подключитесь, чтобы просмотреть сообщения от вашего производителя. И приложение чата, и приложение-производитель должны использовать одну и ту же службу Message Hub.
package.json
{
"name": "node-kafka-producer",
"version": "1.0.0",
"description": "",
"scripts": {
"start": "node app.js"
},
"dependencies": {
"express": "4.12.x",
"cfenv": "1.0.x",
"message-hub-rest": "^1.0.1"
},
"repository": {},
"engines": {
"node": "0.12.x"
}
}
app.js
/**
* Copyright 2015 IBM
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
var express = require('express');
var app = express();
var Cfenv = require('cfenv');
var MessageHub = require('message-hub-rest');
var appEnv = Cfenv.getAppEnv();
var instance;
var cleanedUp = false;
var topic = 'livechat';
app.listen(appEnv.port, '0.0.0.0', function() {
console.log("server starting on " + appEnv.url);
});
app.get('/', function (req, res) {
pushMessage("Hello World!");
res.send('Hello World!');
});
var start = function(restEndpoint, apiKey, callback) {
if(!appEnv.services || (appEnv.services && Object.keys(appEnv.services).length === 0)) {
if(restEndpoint && apiKey) {
appEnv.services = {
"messagehub": [
{
"label": "messagehub",
"credentials": {
"api_key": apiKey,
"kafka_rest_url": restEndpoint,
}
}
]
};
} else {
console.error('A REST Endpoint and API Key must be provided.');
process.exit(1);
}
} else {
console.log('Endpoint and API Key provided have been ignored, as there is a valid VCAP_SERVICES.');
}
instance = new MessageHub(appEnv.services);
instance.topics.create(topic)
.then(function(response) {
console.log('topic created');
})
.fail(function(error) {
console.log(error);
stop(1);
});
};
var pushMessage = function(message) {
var list = new MessageHub.MessageList();
var message = {
user: "Niklas",
message: message,
}
list.push(JSON.stringify(message));
instance.produce(topic, list.messages)
.fail(function(error) {
throw new Error(error);
});
};
var registerExitHandler = function(callback) {
if(callback) {
var events = ['exit', 'SIGINT', 'uncaughtException'];
for(var index in events) {
process.on(events[index], callback);
}
} else if(!callback) {
throw new ReferenceException('Provided callback parameter is undefined.');
}
};
// Register a callback function to run when
// the process exits.
registerExitHandler(function() {
stop();
});
var stop = function(exitCode) {
exitCode = exitCode || 0;
if(!cleanedUp) {
console.log('Running exit handler.');
cleanedUp = true;
process.exit(exitCode);
}
};
// If this module has been loaded by another module, don't start
// the service automatically. If it's being started from the command license
// (i.e. node app.js), start the service automatically.
if(!module.parent) {
if(process.argv.length >= 4) {
start(process.argv[process.argv.length - 2], process.argv[process.argv.length - 1]);
} else {
start();
}
}
module.exports = {
start: start,
stop: stop,
appEnv: appEnv
}