RabbitMQ – Task Queue in Java

这次我们试着实现这样一个小程序:

嗯,就是任务队列(task queue)。

不是将任务集中在一堆并一直等到所有任务一并完成为止,而是将每一个任务封装为一个消息,并将其发送到队列,后台的workers就从队列中分担工作。

web应用尤其喜欢这种处理方式,比如面对一个请求时我们有一大堆复杂逻辑需要处理,而我们却不需要立即响应处理结果,那就放到后面慢慢弄。

(PS:另外也有直接对任务进行持久化,然后用scheduler什么的去定时处理。无论如何,没有银弹。)

对于复杂的任务,我们可以用Thread.sleep模拟一下。

比如provider每发一个”hello…”,worker读到消息后开始数点,每读到一个”.”就睡一会儿。

provider也简单模拟一下,一次塞个20个消息到队列:

publicstaticvoidmain(String[]argv)throwsjava.io.IOException{ConnectionFactoryfactory=newConnectionFactory();factory.setHost(“localhost”);Connectionconnection=factory.newConnection();Channelchannel=connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);Stringmessage=”Hello…”;for(inti=0;i<20;i++){channel.basicPublish(“”,TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.concat(i+1+””).getBytes());System.out.println(“[x]Sent'”+message+(i+1)+”‘”+(i+1)+”times”);}channel.close();connection.close();}

有一个需要注意的地方,就是consumer揽了活后没干完就死掉了。

我需要其他还活着的consumer替死者完成工作。

RabbitMQ支持消息应答,如果worder没有做出应答却死掉了,provider则会将消息重新发给其他活着的consumer。

但这个和timeout无关,只有在worker的connection断掉时才会重新发送。

如果调用了没有autoAck参数的basicConsume,消息应答默认是启用的,也就是autoAck=false。

booleanautoAck=false;channel.basicConsume(TASK_QUEUE_NAME,autoAck,consumer);

当autoAck==false时需要我们显示调用channel.basicAck方法将接收的消息ack一下。

如果接收了消息却不显示调用应答方法,就不能再接收新的消息,这就造成了浪费。

另外,如果设置了autoAck就不要显示进行应答,否则会来一个com.rabbitmq.client.ShutdownSignalException。

consumer死了有其他人处理后事,那整个server死掉了怎么办?

为了让消息不丢失,我们需要将队列和消息标记为durable。

booleandurable=true;channel.queueDeclare(“hello”,durable,false,false,null);

好了,,这样即使重启RabbitMQ服务也不会丢失队列。

但这并不保证消息不会丢失,为了保证这一点,我们在provider发布消息时加了essageProperties.PERSISTENT_TEXT_PLAIN:

channel.basicPublish(“”,TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.concat(i+1+””).getBytes());

虽然这种方式并不完美,我们还需要做其他的一些工作,但暂时先到这里。

最后一个问题是,如何做到给consumer公平分配任务。

如果没有做这个处理,会出现这样一种情况。

举个例子:provider发送了20个消息,随即启动的consumer_1把这20个消息全都独占了。

在consumer_1工作期间又有consumer_2被启动,但此时consumer_2没有任何任务。

此时provider又发送了20个消息,这时consumer_2会得到10个任务。

我们可以使用channel.basicQos(int prefetchCount)方法限制预获取的数量,比如prefetchCount==1就是返回应答后可以再获得1个消息。

好了,consumer代码如下:

publicclassWorker{privatestaticfinalStringTASK_QUEUE_NAME=”task_queue”;publicstaticvoidmain(String[]argv)throwsjava.io.IOException,java.lang.InterruptedException{ConnectionFactoryfactory=newConnectionFactory();factory.setHost(“localhost”);Connectionconnection=factory.newConnection();Channelchannel=connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);System.out.println(“[*]Waitingformessages.ToexitpressCTRL+C”);channel.basicQos(1);QueueingConsumerconsumer=newQueueingConsumer(channel);booleanautoAck=false;channel.basicConsume(TASK_QUEUE_NAME,autoAck,consumer);while(true){QueueingConsumer.Deliverydelivery=consumer.nextDelivery();Stringmessage=newString(delivery.getBody());System.out.println(“[x]Received'”+message+”‘”);doWork(message);System.out.println(“[x]Done”);channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}}privatestaticvoiddoWork(Stringtask)throwsInterruptedException{for(charch:task.toCharArray()){if(ch==’.’)Thread.sleep(1000);}}}

本文出自 “Alvez. 99.9% 0B/s” 博客,请务必保留此出处

原来和文字沾上边的孩子从来都是不快乐的,

RabbitMQ – Task Queue in Java

相关文章:

你感兴趣的文章:

标签云: