关于JMSMessagePending的问题

前段时间,有同事跟我说客户那边有很多状态为receive的message,这些message只有在JMS Server或 weblogic Server充启之后才能被消费。经过调查后,这个问题可能是weblogic的一个bug,当然也不排除 跟具体环境有关的可能。下面我们来看看问题的根本原因是什么,这种分析有助我们更进一步理解 weblogic JMS的实现。

首先我们看一下什么是receive,receive表示一个message已经被consumer消费,但服务端还没有关于 这个message的ack,所以消息不能从queue中删除, 由于queue中的消息是point-2-point的,所以某个消 息被标为receive后,这个消息自然不能被其他consumer消费。那么这个ack由谁负责发送给Server呢,什 么时候发送呢?这些都由我们创建JMS Session时使用的Ack_mode决定,典型的ack-mode有如下两种:

auto-ack: 自动响应模式,consumer.receive()调用后,如果服务器端发现有可用的message,消息返 回到客户端JMS实现层,在消息返回给客户前,由weblogic client (JMSSession.getAsyncMessageForConsumer(),异步接受,比如MessageListener,或 JMSSession.receiveMessage(),同步接受)层实现直接调用acknowledge()通知服务器端,服务器端收到 ack后,它会负责负责将处于receive的message从物理queue中删除。

client-ack: 客户响应模式,consumer.receive()调用后,客户端收到消息后,客户端程序决定什么 时候发送ack,可以在消息后立即发送,也可以在消息处理成功后发送,ack的发送通过 message.acknowledge()实现。后面的过程和auto-ack相同。

初看这个问题,感觉是ack没有收到,那么什么情况下会出现ack丢失呢?网络问题? 那么客户端或服 务器端的server log应该能够看到异常,客户坚持说没有任何异常。有点不可思议,要了客户的代码,他 们没有代码,实际上他们的应用是基于Spring Framework的,通过简单的配置来实现他们的业务需要,看 了下Spring的相关代码,客户之所以说没有异常,因为Spring catch了服务器端返回的JMSException,并 吃掉了这个异常(即异常没有打印出来),这个异常输出是可以通过Spring的配置来实现。客户配置后, 给了我具体的异常,如下:

java.lang.IllegalArgumentException: Delay is negative.        at weblogic.timers.internal.TimerManagerImpl.schedule(TimerManagerImpl.java:388)         at weblogic.timers.internal.TimerManagerImpl.schedule(TimerManagerImpl.java:340)         at weblogic.messaging.kernel.internal.ReceiveRequestImpl. (ReceiveRequestImp l.java:98)        at weblogic.messaging.kernel.internal.QueueImpl.receive(QueueImpl.java:820)        at weblogic.jms.backend.BEConsumerImpl.blockingReceiveStart(BEConsumerImpl.java:1 172)         at weblogic.jms.backend.BEConsumerImpl.receive(BEConsumerImpl.java:1383)         at weblogic.jms.backend.BEConsumerImpl.invoke(BEConsumerImpl.java:1088)         at weblogic.messaging.dispatcher.Request.wrappedFiniteStateMachine(Request.java:7 59)        at weblogic.messaging.dispatcher.DispatcherImpl.dispatchAsyncInternal (DispatcherI mpl.java:129)        at weblogic.messaging.dispatcher.DispatcherImpl.dispatchAsync(DispatcherImpl.java :112)         at weblogic.messaging.dispatcher.Request.dispatchAsync(Request.java:1046)         at weblogic.jms.dispatcher.Request.dispatchAsync(Request.java:72)         at weblogic.jms.frontend.FEConsumer.receive(FEConsumer.java:557)        at weblogic.jms.frontend.FEConsumer.invoke(FEConsumer.java:806)        at weblogic.messaging.dispatcher.Request.wrappedFiniteStateMachine(Request.java:7 59)         at weblogic.messaging.dispatcher.DispatcherServerRef.invoke(DispatcherServerRef.j ava:276)        at weblogic.messaging.dispatcher.DispatcherServerRef.handleRequest(DispatcherServ erRef.java:141)        at weblogic.messaging.dispatcher.DispatcherServerRef.Access$000(DispatcherServerR ef.java:36)         at weblogic.messaging.dispatcher.DispatcherServerRef$2.run (DispatcherServerRef.ja va:112)        at weblogic.work.ExecuteThread.execute (ExecuteThread.java:209)        at weblogic.work.ExecuteThread.run (ExecuteThread.java:181)

现在我们看一下Weblogic JMS的receive的基本流程,看看这个exception为什么会被抛出来。

JMSConsumer.receive(long timewait),客户端发起receive请求,其中timewait可有可无,不做指定 的话,说明没有可用消息到达的话,我们会一直等下去。如要不作等待的话,可以使用receiveNoWait() 。receive()中会检查timeout值,如果没有指定timeout,那么Long.maxValue会被设定成这个timeout, 如果timeout小于0,客户端将会收到Invalid Timeout异常,接下来请求会被delegate到JMSSession。

|

JMSSession.receiveMessage(consumer,timeout),这里timeout会被重新计算,然后我们会创建一个 FEConsumerReceiveRequest对象。这个对象中包含计算后的timeout,计算后的timeout应该是个非负值( 上面的异常就是这里的计算导致的,至于为什么客户指定的timeout为1,计算后的timeout变成了负数, 从而导致上面的异常,从代码层面,看不出有什么问题)。FEConsumerReceiveRequest对象创建后,由JMS FrontEnd Dispatcher负责把请求交给后端的JMS Server,Dispatcher是Weblogic JMS中用于负责请求传 输的,它依赖于RJVM layer,这里不做赘述。

|

RJVM layer, 负责RMI socket层的数据发送

|

FEConsumer.receive(invocableRequest),RJVM层处理完socket数据后,请求会被转给JMSConsumer, JMSConsumer通过状态机(state machine)来控制请求处理,没有过多的逻辑,它会基于收到的receive request创建一个BEConsumerReceiveRequest对象,然后把这个请求通过JMS BackEnd Dispatcher转发给 BEConsumerImpl。之所以存在FrontEnd /BackEnd Dispatcher,主要考虑到处理请求的server和queue所 在的不是同一server。

|

BEConsumerImpl.receive(request),request进入BEConsumerImpl后,它也通过state machine来控制 请求处理,下面两个方法在调用过程中被顺序调用,

BEConsumerImpl.blockingReceiveStart(request),这里首先检查timeout值,然后调用 QueueImpl.receive(…)从queue中获取message,receive()的具体参数如下,包括timeout, expression(即检查条件,我们定义的message selecTor就在其中)。

BEConsumerImpl.blockingReceiveProcessMessage(request)

BEConsumerImpl.blockingReceiveComplete(request)

|

QueueImpl.receive(expression,count,acknowledge,owner,timeout,started,userBlob),这里除了 状态检查,没有其他逻辑,它会根据传进来的参数,初始化一个ReceiveRequestImpl对象。

|

ReceiveRequestImpl.new(),这个new代表ReceiveRequestImpl的构造函数。

|

QueueImpl.get(…),如果timeout = 0,即如果客户调用的是receiveNoWait的话,我们直接去通过 QueueImpl.get(…),如果没有match的message,那么直接将新建request的result设定为no result,否 则将match的message设定为result。

QueueImpl.addReader(receiveRequestImpl),如果timeout != 0,我们会在 ReceiveRequestImpl.start()中调用QueueImpl.addReader(),addReader()中同样会通过QueueImpl.get ()检查是否有match的message,如果找到相应的message,我们会把message reference状态改为receive。

TimerManagerImpl.schedule(timeout),如果QueueImpl.addReader()中的QueueImpl.get()没有找到 相应的message,我们需要等待(依据客户指定的timeout),这个等待通过timer去实现,如下:

timer = timerManager.schedule(this, timeout);

指定的timeout到达后,如果和没有可用的message,no result将被返回。从上面的异常堆栈来看,问 题就出在这里,如果timeout为负数,timerMangerImpl在启动trigger的时候,会抛出如下的 runtimeException,

java.lang.IllegalArgumentException: Delay is negative.

也许你会疑问,这没什么问题吧,timerTrigger只有在没有message的时候才会被schedule,既然没有 message,那有谈何状态receive message?没错,起timerTrigger之前我们的确没有修改message状态, 但你注意到没有,我们在起timerTrigger前,把receiveRequestImpl加入到QueueImpl去了,但我们在碰 到IllegalArgumentException时并没有把这个receiveRequestImpl从QueueImpl中删除,问题就在这里。

 1   synchronized void addReader(Reader reader) throws KernelException {  2 3     List list = get(..); 4     int newCount; 5     if (list != null) { 6 7     } else { 8       reader.incrementReserveCount(- reservedCount); 9       newCount = reader.getCount();10     }11      if (newCount > 0) {12       logger.debug("Adding consumer to reader list");13       readerList.add(reader);14     }15   }

如果我们不把receiveRequestImpl从QueueImpl的readerList中删除,那么如果过一会有message sender发送一条和我们上述请求match的message到这个queue。weblogic收到这个message后,它会检查 readerList,如果这个message match某个reader,我们会把message状态改成receive,当由于 IllegalArgumentException,客户端收到它的时候,客户端会close JMSSession,也就是说这个消息虽然 有reader,但无法deliver到客户端。

我们再来看看Weblogic JMS sender的相关流程,

QueueImpl.messageSendComplete(),消息发送过程结束后(比如涉及sTore的话,消息此时已经被存储 ),到这一步的话,我们会调整系统接受的消息数,然后通过makeMessageAvailable()把消息标成 visiable或deliver给正在等待的reader。

QueueImpl.makeMessageAvailable(),它会直接调用match()去检查readerList中是否存在正在等待它 的reader。

QueueImpl.match(),它通过finderReader()从readerList中检查reader,如果有符合条件的reader, 它会把这个message标志为receive,同时把这个message挪到pending list中去。

前面我们说了,虽然reader还在,但与之对应的JMSConsumer已经被close,所以这个消息根本就无法 deliver出去,自然就不会有ack从客户端返回了,这个消息也只能一直pending了。

这个问题可能是个bug,目前还在确认之中,但我同时也在和客户沟通,可能跟他的环境有一定关系( 比如NTP时间同步问题)。

虽然这个问题能引发message pending,但并不是所有的message pending问题都是由它应起的。网络 问题也能引发类似问题,具体问题具体分析,主要的参考客户端的JMSException。位于receive后的状态 是transation,也就是如果发现状态为transaction的message的话,一般而言,是这个消息要么就是发送 还没结束,要么就是消息正处于一个delete的事务单元中,这里就不再一一罗列了。

昨晚多几分钟的准备,今天少几小时的麻烦。

关于JMSMessagePending的问题

相关文章:

你感兴趣的文章:

标签云: