面向Java开发人员的Scala指南: 深入了解Scala并发性 了解 Scala

欢迎进入Java社区论坛,与200万技术人员互动交流 >>进入

Scala 并发性 v2

Scala Library Reference 中有一个有趣的包:scala.concurrency.这个包包含许多不同的并发性结构,包括我们即将利用的 MailBox 类。

顾名思义,MailBox 从本质上说就是 Drop,用于在检测之前保存数据块的单槽缓冲区。但是,MailBox 最大的优势在于它将发送和接收数据的细节完全封装到模式匹配和 case 类中,这使它比简单的 Drop(或 Drop 的多槽数据保存类 java.util.concurrent.BoundedBuffer)更加灵活。

清单 4. ProdConSample, v2 (Scala)

package com.tedneward.scalaexamples.scala.V2{ import concurrent.{MailBox, ops} object ProdConSample { class Producer(drop : Drop) extends Runnable { val importantInfo : Array[String] = Array( “Mares eat oats”, “Does eat oats”, “Little lambs eat ivy”, “A kid will eat ivy too” ); override def run() : Unit = { importantInfo.foreach((msg) => drop.put(msg)) drop.put(“DONE”) } } class Consumer(drop : Drop) extends Runnable { override def run() : Unit = { var message = drop.take() while (message != “DONE”) { System.out.format(“MESSAGE RECEIVED: %s%n”, message) message = drop.take() } } } class Drop { private val m = new MailBox() private case class Empty() private case class Full(x : String) m send Empty() // initialization def put(msg : String) : Unit = { m receive { case Empty() => m send Full(msg) } } def take() : String = { m receive { case Full(msg) => m send Empty(); msg } } } def main(args : Array[String]) : Unit = { // Create Drop val drop = new Drop() // Spawn Producer new Thread(new Producer(drop)).start(); // Spawn Consumer new Thread(new Consumer(drop)).start(); } }}

此处,v2 和 v1 之间的惟一区别在于 Drop 的实现,它现在利用 MailBox 类处理传入以及从 Drop 中删除的消息的阻塞和信号事务。(我们可以重写 Producer 和 Consumer,让它们直接使用 MailBox,但考虑到简单性,我们假定希望保持所有示例中的 Drop API 相一致。)使用 MailBox 与使用典型的 BoundedBuffer(Drop)稍有不同,因此我们来仔细看看其代码。

MailBox 有两个基本操作:send 和 receive.receiveWithin 方法仅仅是基于超时的 receive.MailBox 接收任何类型的消息。send() 方法将消息放置到邮箱中,并立即通知任何关心该类型消息的等待接收者,并将它附加到一个消息链表中以便稍后检索。receive() 方法将阻塞,直到接收到对于功能块合适的消息。

因此,在这种情况下,我们将创建两个 case 类,一个不包含任何内容(Empty),这表示 MailBox 为空,另一个包含消息数据(Full.

put 方法,由于它会将数据放置在 Drop 中,对 MailBox 调用 receive() 以查找 Empty 实例,因此会阻塞直到发送 Empty.此时,它发送一个 Full 实例给包含新数据的 MailBox.

take 方法,由于它会从 Drop 中删除数据,对 MailBox 调用 receive() 以查找 Full 实例,提取消息(再次得益于模式匹配从 case 类内部提取值并将它们绑到本地变量的能力)并发送一个 Empty 实例给 MailBox.不需要明确的锁定,并且不需要考虑监控程序。

Scala 并发性 v3

事实上,我们可以显著缩短代码,只要 Producer 和 Consumer 不需要功能全面的类(此处便是如此) ― 两者从本质上说都是 Runnable.run() 方法的瘦包装器,Scala 可以使用 scala.concurrent.ops 对象的 spawn 方法来实现,如清单 5 所示:

清单 5. ProdConSample, v3 (Scala)

package com.tedneward.scalaexamples.scala.V3{ import concurrent.MailBox import concurrent.ops._ object ProdConSample { class Drop { private val m = new MailBox() private case class Empty() private case class Full(x : String) m send Empty() // initialization def put(msg : String) : Unit = { m receive { case Empty() => m send Full(msg) } } def take() : String = { m receive { case Full(msg) => m send Empty(); msg } } } def main(args : Array[String]) : Unit = { // Create Drop val drop = new Drop() // Spawn Producer spawn { val importantInfo : Array[String] = Array( “Mares eat oats”, “Does eat oats”, “Little lambs eat ivy”, “A kid will eat ivy too” ); importantInfo.foreach((msg) => drop.put(msg)) drop.put(“DONE”) } // Spawn Consumer spawn { var message = drop.take() while (message != “DONE”) { System.out.format(“MESSAGE RECEIVED: %s%n”, message) message = drop.take() } } } }}

spawn 方法(通过包块顶部的 ops 对象导入)接收一个代码块(另一个 by-name 参数示例)并将它包装在匿名构造的线程对象的 run() 方法内部。事实上,并不难理解 spawn 的定义在 ops 类的内部是什么样的:

清单 6. scala.concurrent.ops.spawn()

def spawn(p: => Unit) = { val t = new Thread() { override def run() = p } t.start() }

……这再一次强调了 by-name 参数的强大之处。

ops.spawn 方法的一个缺点在于,它是在 2003 年 Java 5 concurrency 类还不可用的时候编写的。特别是,java.util.concurrent.Executor 及其同类的作用是让开发人员更加轻松地生成线程,而不需要实际处理直接创建线程对象的细节。幸运的是,在您自己的自定义库中重新创建 spawn 的定义是相当简单的,这需要利用 Executor(或 ExecutorService 或 ScheduledExecutorService)来执行线程的实际启动任务。

事实上,Scala 的并发性支持超越了 MailBox 和 ops 类;Scala 还支持一个类似的 “Actors” 概念,它使用了与 MailBox 所采用的方法相类似的消息传递方法,但应用更加全面并且灵活性也更好。但是,这部分内容将在下期讨论。

结束语

Scala 为并发性提供了两种级别的支持,这与其他与 Java 相关的主题极为类似:

首先,对底层库的完全访问(比如说 java.util.concurrent)以及对 “传统” Java 并发性语义的支持(比如说监控程序和 wait()/notifyAll())。

两个例子中的目标是相同的:让开发人员能够更加轻松地专注于问题的实质,而不用考虑并发编程的低级细节(显然,第二种方法更好地实现了这一目标,至少对于没有过多考虑低级细节的人来说是这样的。)

但是,当前 Scala 库的一个明显的缺陷就是缺乏 Java 5 支持;scala.concurrent.ops 类应该具有 spawn 这样的利用新的 Executor 接口的方法。它还应该支持利用新的 Lock 接口的各种版本的 synchronized.幸运的是,这些都是可以在 Scala 生命周期中实现的库增强,而不会破坏已有代码;它们甚至可以由 Scala 开发人员自己完成,而不需要等待 Scala 的核心开发团队提供给他们(只需要花费少量时间)。

描述名字大小下载方法

本文的示例 Scala 代码j-scala02049.zip10KBHTTP

[1][2][3]

人只要不失去方向,就不会失去自己

面向Java开发人员的Scala指南: 深入了解Scala并发性 了解 Scala

相关文章:

你感兴趣的文章:

标签云: