Java并行编程:从并行任务集获取反馈

  在并行任务启动后,强制性地从并行任务得到反馈。

  假想有一个程序,可以发送批邮件,还使用了多线程机制。你想知道有多少邮件成功发送吗?你想知道在实际发送过程期间,这个批处理工作的实时进展吗?

  要实现多线程的这种反馈,我们可以使用Callable接口。此接口的工作方式基本上与Runnable相同,但是执行方法(call())会返回一个值,该值反映了执行计算的结果。

  

    packagecom.ricardozuasti; importncurrent.Callable; publicclassFictionalEmailSenderimplementsCallable<Boolean>{ privateStringto; privateStringsubject; privateStringbody; publicFictionalEmailSender(Stringto,Stringsubject,Stringbody){ this.to=to; this.subject=subject; this.body=body; } @OverridepublicBooleancall()throwsInterruptedException{ //在0~0.5秒间模拟发送邮件Thread.sleep(Math.round(Math.random()*0.5*1000)); //假设我们有80%的几率成功发送邮件if(Math.random()>0.2){ returntrue; }else{ returnfalse; } } }

  注意:Callable接口可用于返回任意数据类型,因此我们的任务可以返回我们需要的任何信息。

  现在,我们使用一个线程池ExecutorService来发送邮件,由于我们的任务是以Callable接口实现的,我们提交执行的每个新任务,都会得到一个Future引用。注意我们要使用直接的构造器创建ExecutorService,而不是使用来自Executors的工具方法创建。这是因为使用指定类ThreadPoolExecutor提供了一些方法可以派上用场。

  

    packagecom.ricardozuasti; importncurrent.Future; importncurrent.LinkedBlockingQueue; importncurrent.ThreadPoolExecutor; importncurrent.TimeUnit; importjava.util.ArrayList; importjava.util.List; publicclassConcurrency2{ publicstaticvoidmain(String[]args){ try{ ThreadPoolExecutorexecutor=newThreadPoolExecutor(30,30,1, TimeUnit.SECONDS,newLinkedBlockingQueue()); List<Future<Boolean>>futures=newArrayList<Future<Boolean>>(9000); //发送垃圾邮件,用户名假设为4位数字for(inti=1000;i<10000;i++){ futures.add(executor.submit(newFictionalEmailSender(i+””, “Knock,knock,Neo”,”TheMatrixhasyou…”))); } //提交所有的任务后,关闭executorSystem.out.println(“Startingshutdown…”); executor.shutdown(); //每秒钟打印执行进度while(!executor.isTerminated()){ executor.awaitTermination(1,TimeUnit.SECONDS); intprogress=Math.round((executor.getCompletedTaskCount() *100)/executor.getTaskCount()); System.out.println(progress+”%done(“+ executor.getCompletedTaskCount()+”emailshavebeensent).”); } //现在所有邮件已发送完,检查futures,看成功发送的邮件有多少interrorCount=0; intsuccessCount=0; for(Future<Boolean>future:futures){ if(future.get()){ successCount++; }else{ errorCount++; } } System.out.println(successCount+”emailsweresuccessfullysent,but”+ errorCount+”failed.”); }catch(Exceptionex){ ex.printStackTrace(); } } }

  执行这个类,输出结果如下:

  

    Startingshutdown… 1%done(118emailshavebeensent). 2%done(232emailshavebeensent). 3%done(358emailshavebeensent). 5%done(478emailshavebeensent). 6%done(587emailshavebeensent). 7%done(718emailshavebeensent). 9%done(850emailshavebeensent). 10%done(969emailshavebeensent). ……

  所有的任务都由ExecutorService提交,我们开始它的关闭(防止提交新任务)并使用一个循环(实时场景,可能你会继续做其它的事情)来等待,直至所有任务都被执行完成、计算和打印当前每次迭代的进度。

  注意,你可以存储executor引用,也可以在任意时间从其它线程查询它的计算结果和报告进程进度。

  最后,使用Future集合引用,我们得到ExecutorService提交的每个Callable接口,通知成功发送的邮件数量和发送失败的邮件数量。

  此结构不但易于使用,还使得相关性得到清晰的隔离,在调度程序和实际任务之间提供了一个预定义的通信机制。

害怕攀登高峰的人,永远在山下徘徊。

Java并行编程:从并行任务集获取反馈

相关文章:

你感兴趣的文章:

标签云: