现在大多网站会员系统或云端协作平台上都有即时消息通知功能,即消息推送,这对用户来说是非常贴心的功能。要实现消息推送服务,大致可以采用以下几种方式:
EasyPM作为一个团队协作的项目管理平台,消息推送功能是少不了的。我们综合众多因素,最终采用MQTT协议方式,在众多MQTT协议实现架构中,我们选择了Apollo的实现架构。下边就介绍下基于Apollo架构如何实现消息推送和接收。
Apollo是apache旗下的基金项目,它是以Apache ActiveMQ5.x为基础,采用全新的线程和消息调度架构重新实现的消息中间件,针对多核处理器进行了优化处理,它的速度更快、更可靠、更易于维护。apollo与ActiveQQ一样支持多协议:STOMP、AMQP、MQTT、Openwire、 SSL、WebSockets,本文只介绍MQTT协议的使用。
进入Apollo下载页面 ,下载windows版本的压缩包并解压到自己的工作目录(如:
E:\apache-apollo-1.7
),并创建系统环境变量APOLLO_HOME=E:\apache-apollo-1.7
。如果操作是系统是Windows Vista或更高版本,则需要安装Microsoft Visual C++ 2010 Redistributable:
64位JVM
32位JVM
E:\apache-apollo-1.7
之下的bin目录,打开cmd窗口,执行命令:apollo create D:\apollo_broker
,命令执行成功后,在D盘下会有apollo_broker
目录,这便是apollo的服务实例,apollo之旅便从这里开始。D:\apollo_broker
下有个bin目录,其中有两个文件:D:\apollo_broker\bin
目录下打开cmd窗口,执行apollo-broker run
命令来启动apollo服务,admin/password
cmd.exe
文件,点击鼠标右键,以管理员身份运行,输入创建windows服务命令,如下图:apollo_broker
服务,设置其自动启动,启动系统时apollo就会自动启动了
MQTT协议有众多客户端实现,相关客户端请参考apollo官方文档
本文采用eclipse的paho客户端实现
var client = new Paho.MQTT.Client(location.hostname, 61623,"/", "clientId");
/* 61623是ws连接的默认端口,可以在apollo中间件中进行配置
(关于apollo的配置请参考:
http://activemq.apache.org/apollo/documentation/user-manual.html
) */
// set callback handlers
client.onConnectionLost = onConnectionLost;
client.onMessageArrived = onMessageArrived;
// connect the client
client.connect({userName:'admin',password:'password',onSuccess:onConnect});
// called when the client connects
function onConnect() { // 连接成功后的处理
// Once a connection has been made, make a subscription and send a message.
console.log("onConnect");
client.subscribe("/topic/event"); // 订阅消息的主题
var message = new Paho.MQTT.Message("Hello,this is a test");
message.destinationName = "/topic/event";
client.send(message); // 发送消息
}
// called when the client loses its connection
function onConnectionLost(responseObject) { // 连接丢失后的处理
if (responseObject.errorCode !== 0) {
console.log("onConnectionLost:"+responseObject.errorMessage);
}
}
// called when a message arrives
function onMessageArrived(message) { // 消息接收成功后的处理
console.log("onMessageArrived:"+message.payloadString);
}
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.0.1</version>
</dependency>
java实现代码:
String topic = "MQTT Examples";
String content = "Message from MqttPublishSample";
int qos = 2;
String broker = "tcp://127.0.0.1:61613";
String clientId = "JavaSample";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
System.out.println("Connecting to broker: "+broker);
sampleClient.connect(connOpts);
System.out.println("Connected");
System.out.println("Publishing message: "+content);
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
sampleClient.publish(topic, message);
System.out.println("Message published");
sampleClient.disconnect();
System.out.println("Disconnected");
System.exit(0);
} catch(MqttException me) {
System.out.println("reason "+me.getReasonCode());
System.out.println("msg "+me.getMessage());
System.out.println("loc "+me.getLocalizedMessage());
System.out.println("cause "+me.getCause());
System.out.println("excep "+me);
me.printStackTrace();
}
至此,MQTT协议已部署完毕,java端可以发布消息,而javascript端则可以订阅并接收到java端发布的信息。
本文只是依照官网手册而实现的简单应用,讲解不一定十分准确,有什么不对的地方还请多多指点,更详细的应用请参考官网文档:
apollo
eclipse paho