OpenFire Source Code Study, Part 25: Message Receipts and Offline Messages (Part 2)

This post picks up right where the previous one left off.

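Option 2

Message receipts based on Redis. The main flow breaks down into the following steps:

1) Stage the message temporarily in Redis and set an expiry time on it.

2) The client acknowledges the message ID, which clears the staged message.

3) Start a dedicated thread that polls the messages staged in step 1) and resends them according to their timestamps. If the time since a message was first stored exceeds the validity period (a custom value of 10 seconds), parse the `to` attribute of the message and check whether that user is still online. If the user is online, kick them off (they have been ignoring an important command from the server for too long); if the user is offline, move the message to the offline table.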
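The decision made by the polling thread in step 3) can be sketched as a small pure function. This is only an illustration under assumptions: the class name `ReceiptSweep`, the `Action` enum, and the method `decide` are hypothetical names that do not appear in the original code, and the article does not say what happens to the message itself after an online user is kicked, so the sketch returns only the decision.

```java
import java.time.Duration;

/** Hypothetical sketch of the step 3) decision; none of these names come from OpenFire. */
final class ReceiptSweep {
    enum Action { RESEND, KICK_USER, MOVE_TO_OFFLINE }

    // The article uses a custom validity period of 10 seconds.
    static final Duration VALIDITY = Duration.ofSeconds(10);

    /**
     * @param createdAtMillis when the message was first staged (the CREATIONDATE field)
     * @param nowMillis       the current time
     * @param recipientOnline whether the user named in the stanza's "to" still has a session
     */
    static Action decide(long createdAtMillis, long nowMillis, boolean recipientOnline) {
        long ageMillis = nowMillis - createdAtMillis;
        if (ageMillis <= VALIDITY.toMillis()) {
            // Still inside the validity period and not yet acknowledged: send it again.
            return Action.RESEND;
        }
        // Past the validity period: an online user who never acknowledged is kicked,
        // an offline user gets the message moved to the offline table.
        return recipientOnline ? Action.KICK_USER : Action.MOVE_TO_OFFLINE;
    }
}
```

Keeping the decision separate from the Redis and session lookups makes the 10-second rule easy to test on its own; the polling thread would only need to supply the three inputs.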

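OK, let's first look at how the messages are stored.

1. MESSAGE stanzas: per-user set of message IDs

SADD SOGU:[username] [VALUE(messageID)] [VALUE(messageID)] …

2. Per-device set of messages that have been read

SADD RT:[terminalid] [VALUE(messageID)] [VALUE(messageID)] …

3. Message content

HMSET OGM:[messageID] CREATIONDATE [VALUE] UPDATEDATE [VALUE] STANZA [VALUE]

4. User-to-device association

SADD URT:[USERNAME] [VALUE(terminalid)] …

(First look up the time by message ID, sort in Java, and then look up the stanza.)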

MESSAGE: offline table

ZADD OFOFFLINE:[username] [INDEX(timestamp)] [VALUE(messageID)], [VALUE], [VALUE] … [VALUE]

HMSET OFOFFLINE:[messageID] STANZA [VALUE] CREATIONDATE [VALUE] MESSAGESIZ [VALUE]
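To make the key layout and the "look up the time, sort in Java, then fetch the stanza" note concrete, here is a minimal sketch. The class `ReceiptKeys` and its methods are hypothetical helpers, not part of the original code; the in-memory `Map` stands in for the Redis hashes so the example runs without a server, and it assumes CREATIONDATE holds a millisecond timestamp as a decimal string.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical key helpers mirroring the prefixes listed above. */
final class ReceiptKeys {
    static String userMessages(String username)    { return "SOGU:" + username; }
    static String terminalReads(String terminalId) { return "RT:" + terminalId; }
    static String messageBody(String messageId)    { return "OGM:" + messageId; }
    static String userTerminals(String username)   { return "URT:" + username; }
    static String offline(String usernameOrId)     { return "OFOFFLINE:" + usernameOrId; }

    /**
     * Orders message IDs by the CREATIONDATE stored in their OGM hash, oldest first.
     * The map is keyed by the full Redis key ("OGM:<id>"); IDs without a hash are skipped.
     */
    static List<String> orderByCreation(Set<String> messageIds,
                                        Map<String, Map<String, String>> hashes) {
        List<String> ordered = new ArrayList<>();
        for (String id : messageIds) {
            if (hashes.containsKey(messageBody(id))) {
                ordered.add(id);
            }
        }
        ordered.sort(Comparator.comparingLong(
                (String id) -> Long.parseLong(hashes.get(messageBody(id)).get("CREATIONDATE"))));
        return ordered;
    }
}
```

With the IDs in order, the STANZA field of each hash can then be read one by one. Note that the offline sorted set and the offline hash share the same OFOFFLINE: prefix in the layout above, so a single helper covers both.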

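Staging a message temporarily:

    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    import redis.clients.jedis.Jedis;

    public void storeMessage(String username, Packet packet) {
        // Only Message and IQ stanzas are staged; anything else is ignored.
        String packetID = "";
        if (packet instanceof Message)
            packetID = ((Message) packet).getID();
        else if (packet instanceof IQ)
            packetID = ((IQ) packet).getID();
        else
            return;
        // Borrow the connection only after the type check, so the early return
        // above cannot leak it.
        Jedis jedis = XMPPServer.getInstance().getGroupRedisManager().getJedis();
        try {
            // Per-user set of pending message IDs.
            jedis.sadd("SOGU:" + username, packetID);
            // Message body and creation time. Note that no expiry is set in this snippet.
            Map<String, String> hash = new HashMap<String, String>();
            hash.put("STANZA", packet.toXML());
            hash.put("CREATIONDATE", StringUtils.dateToMillis(new Date()));
            jedis.hmset("OGM:" + packetID, hash);
        } finally {
            XMPPServer.getInstance().getGroupRedisManager().returnRes(jedis);
        }
        // htp is a thread pool defined elsewhere in the class (not shown here).
        htp.execute(addMessagesToDB(packet));
    }

    private Runnable addMessagesToDB(final Packet packet) {
        return new Runnable() {
            @Override
            public void run() {
                MyDBopt.insertMessage(packet);
            }
        };
    }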
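To see the store-then-acknowledge life cycle of steps 1) and 2) without a Redis server, here is a rough in-memory model. Everything in it is an assumption for illustration: `StagedMessages`, `stage`, `acknowledge`, and `isPending` are made-up names, and the article does not show the cleanup task that runs after an acknowledgement, so removing the ID and the body together is only one plausible reading of "the receipt clears the staged message".

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical in-memory stand-in for the SOGU:[username] sets and OGM:[messageID] hashes. */
final class StagedMessages {
    private final Map<String, Set<String>> idsByUser = new HashMap<>();         // SOGU:[username]
    private final Map<String, Map<String, String>> bodyById = new HashMap<>();  // OGM:[messageID]

    /** Step 1): remember the pending ID for the user and keep the stanza with its creation time. */
    void stage(String username, String messageId, String stanzaXml, long nowMillis) {
        idsByUser.computeIfAbsent(username, k -> new HashSet<>()).add(messageId);
        Map<String, String> hash = new HashMap<>();
        hash.put("STANZA", stanzaXml);
        hash.put("CREATIONDATE", Long.toString(nowMillis));
        bodyById.put(messageId, hash);
    }

    /** Step 2): a receipt removes the pending ID and its body; returns false if nothing was pending. */
    boolean acknowledge(String username, String messageId) {
        Set<String> ids = idsByUser.get(username);
        boolean wasPending = ids != null && ids.remove(messageId);
        if (wasPending) {
            bodyById.remove(messageId);
        }
        return wasPending;
    }

    boolean isPending(String username, String messageId) {
        Set<String> ids = idsByUser.get(username);
        return ids != null && ids.contains(messageId);
    }
}
```

Against a real Redis instance the same two removals would presumably map to an SREM on the user's set and a DEL on the message hash, but that mapping is a guess rather than something the original code shows.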

What happens on the server when the client acknowledges a message it has received:

    private void handle(IQ packet) {
        JID recipientJID = packet.getTo();
        if (IQ.Type.crs != packet.getType()) {
            // Check if the packet was sent to the server hostname
            if (recipientJID != null && recipientJID.getNode() == null &&
                    recipientJID.getResource() == null && serverName.equals(recipientJID.getDomain())) {
                Element childElement = packet.getChildElement();
                if (childElement != null && childElement.element("addresses") != null) {
                    // to route this packet
                    multicastRouter.route(packet);
                    return;
                }
            }
        }
        if (IQ.Type.crs == packet.getType()) {
            // Custom receipt type: record which terminal has acknowledged which message.
            String username = packet.getFrom().getNode();
            String terminal = packet.getFrom().getTerminal();
            String msgId = packet.getID();
            if (username == null || msgId == null || "".equals(msgId)) {
                return;
            }
            if (terminal == null) {
                terminal = username + "_" + System.currentTimeMillis() % 1000000;
            }
            Jedis jedis = XMPPServer.getInstance().getGroupRedisManager().getJedis();
            try {
                jedis.sadd("URT:" + username, terminal);
                jedis.sadd("RT:" + terminal, packet.getID());
            } finally {
                XMPPServer.getInstance().getGroupRedisManager().returnRes(jedis);
            }
            threadPool.execute(createTask(msgId, username, terminal));
            return;
        }
        if (packet.getID() != null && (IQ.Type.result == packet.getType() || IQ.Type.error == packet.getType())) {
            // The server got an answer to an IQ packet that was sent from the server
            IQResultListener iqResultListener = resultListeners.remove(packet.getID());
            if (iqResultListener != null) {
                resultTimeout.remove(packet.getID());
                if (iqResultListener != null) {
                    try {
                        iqResultListener.receivedAnswer(packet);
                    }
                    catch (Exception e) {
                        Log.error("Error processing answer of remote entity. Answer: " + packet.toXML(), e);
                    }
                    return;
                }
            }
        }
        try {
            // Check for registered components, services or remote servers
            if (recipientJID != null &&
                    (routingTable.hasComponentRoute(recipientJID) || routingTable.hasServerRoute(recipientJID))) {
                // A component/service/remote server was found that can handle the Packet
                routingTable.routePacket(recipientJID, packet, false);
                return;
            }
            if (isLocalServer(recipientJID)) {
                // Let the server handle the Packet
                Element childElement = packet.getChildElement();
                String namespace = null;
                if (childElement != null) {
                    namespace = childElement.getNamespaceURI();
                }
                if (namespace == null) {
                    if (packet.getType() != IQ.Type.result && packet.getType() != IQ.Type.error) {
                        // Do nothing. We can't handle queries outside of a valid namespace
                        Log.warn("Unknown packet " + packet.toXML());
                    }
                }
                else {
                    // Check if communication to local users is allowed
                    if (recipientJID != null && userManager.isRegisteredUser(recipientJID.getNode())) {
                        PrivacyList list =
                                PrivacyListManager.getInstance().getDefaultPrivacyList(recipientJID.getNode());
                        if (list != null && list.shouldBlockPacket(packet)) {
                            // Communication is blocked
                            if (IQ.Type.set == packet.getType() || IQ.Type.get == packet.getType()) {
                                // Answer that the service is unavailable
                                sendErrorPacket(packet, PacketError.Condition.service_unavailable);
                            }
                            return;
                        }
                    }
                    IQHandler handler = getHandler(namespace);
                    if (handler == null) {
                        if (recipientJID == null) {
                            // Answer an error since the server can't handle the requested namespace
                            sendErrorPacket(packet, PacketError.Condition.service_unavailable);
                        }
                        else if (recipientJID.getNode() == null || "".equals(recipientJID.getNode())) {
                            // Answer an error if JID is of the form <domain>
                            sendErrorPacket(packet, PacketError.Condition.feature_not_implemented);
                        }
                        else {
                            // JID is of the form <node@domain>
                            // Answer an error since the server can't handle packets sent to a node
                            sendErrorPacket(packet, PacketError.Condition.service_unavailable);
                        }
                    }
                    else {
                        handler.process(packet);
                    }
                }
            }
            else {
                // JID is of the form <node@domain/resource> or belongs to a remote server
                // or to an uninstalled component
                routingTable.routePacket(recipientJID, packet, false);
            }
        }
        catch (Exception e) {
            ……
        }
    }

Offline messages

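Optimizing offline messages.

XMPP can be extended here in the same way. For example,

when a client fetches its offline messages, the exchange could look like this.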
