任务列表分派给多个线程的策略及方法

多线程下载由来已久,如 FlashGet、NetAnts 等工具,它们都是依懒于 HTTP 协议的支持(Range 字段指定请求内容范围),首先能读取出请求内容 (即欲下载的文件) 的大小,划分出若干区块,把区块分段分发给每个线程去下载,线程从本段起始处下载数据及至段尾,多个线程下载的内容最终会写入到同一个文件中。

只研究有用的,工作中的需求:要把多个任务分派给多个线程去执行,这其中就会有一个任务列表指派到线程的策略思考:已知:1. 一个待执行的任务列表,2. 指定要启动的线程数;问题是:每个线程实际要执行哪些任务。

策略是:任务列表连续按线程数分段,先保证每线程平均能分配到的任务数,余下的任务从前至后依次附加到线程中–只是数量上,实际每个线程执行的任务都还是连续的。如果出现那种僧多(线程) 粥(任务) 少的情况,实际启动的线程数就等于任务数,一挑一。这里只实现了每个线程各扫自家门前雪,动作快的完成后眼见别的线程再累都是爱莫能助。

实现及演示代码如下:由三个类实现,写在了一个 java 文件中:TaskDistribuTor 为任务分发器,Task 为待执行的任务,WorkThread 为自定的工作线程。代码中运用了命令模式,如若能配以监听器,用上观察者模式来控制 UI 显示就更绝妙不过了,就能实现像下载中的区块着色跳跃的动感了,在此定义下一步的着眼点了。

代码中有较为详细的注释,看这些注释和执行结果就很容易理解的。main() 是测试方法

package com.unmi.common;<br />import java.util.ArrayList;<br />import java.util.List;<br />/** <br />* 指派任务列表给线程的分发器 <br />* @author Unmi<br />* QQ: 1125535 Email: fantasia@sina.com <br />* MSN: kypfos@MSN.com 2008-03-25 <br />*/<br />public class TaskDistribuTor {<br />  /** <br />   * 测试方法 <br />   * @param args <br />   */<br />  public static void main(String[] args) {<br />    //初始化要执行的任务列表<br />    List<Task> taskList = new ArrayList<Task>();<br />    for (int i = 0; i < 108; i++) {<br />      taskList.add(new Task(i));<br />    } <br />    //设定要启动的工作线程数为 5 个 <br />    int threadCount = 5;<br />    List<Task>[] taskListPerThread = distributeTasks(taskList, threadCount);<br />    System.out.println("实际要启动的工作线程数:"+taskListPerThread.length);<br />    for (int i = 0; i < taskListPerThread.length; i++) {<br />      Thread workThread = new WorkThread(taskListPerThread[i],i);<br />      workThread.start();<br />    } <br />  } <br />  /** <br />   * 把 List 中的任务分配给每个线程,先平均分配,剩于的依次附加给前面的线程<br />   * 返回的数组有多少个元素 (List<Task>) 就表明将启动多少个工作线程<br />   * @param taskList 待分派的任务列表<br />   * @param threadCount  线程数<br />   * @return 列表的数组,每个数组中存有该线程要执行的任务列表<br />   */<br />  public static List<Task>[] distributeTasks(List<Task> taskList, int threadCount) {<br />    // 每个线程至少要执行的任务数,假如不为零则表示每个线程都会分配到任务<br />    int minTaskCount = taskList.size() / threadCount;<br />    // 平均分配后还剩下的任务数,不为零则还有任务依个附加到前面的线程中 <br />    int remainTaskCount = taskList.size() % threadCount;<br />    // 实际要启动的线程数,如果工作线程比任务还多 <br />    // 自然只需要启动与任务相同个数的工作线程,一对一的执行 <br />    // 毕竟不打算实现了线程池,所以用不着预先初始化好休眠的线程<br />    int actualThreadCount = minTaskCount > 0 ? threadCount : remainTaskCount;<br />    // 要启动的线程数组,以及每个线程要执行的任务列表<br />    List<Task>[] taskListPerThread = new List[actualThreadCount];<br />    int taskIndex = 0;<br />    //平均分配后多余任务,每附加给一个线程后的剩余数,重新声明与 remainTaskCount <br />    //相同的变量,不然会在执行中改变 remainTaskCount 原有值,产生麻烦 <br />    int remainIndces = remainTaskCount;<br />    for (int i = 0; i < taskListPerThread.length; i++) {<br />      taskListPerThread[i] = new ArrayList<Task>();<br />      // 如果大于零,线程要分配到基本的任务<br />      if (minTaskCount > 0) {<br />        for (int j = taskIndex; j < minTaskCount + taskIndex; j++) {<br />          taskListPerThread[i].add(taskList.get(j));<br />        } <br />        taskIndex += minTaskCount;<br />      } <br />      // 假如还有剩下的,则补一个到这个线程中 <br />      if (remainIndces > 0) {<br />        taskListPerThread[i].add(taskList.get(taskIndex++));<br />        remainIndces--;<br />      } <br />    } <br />    // 打印任务的分配情况 <br />    for (int i = 0; i < taskListPerThread.length; i++) {<br />      System.out.println("线程 " + i + " 的任务数:" + taskListPerThread[i].size() + " 区间[" <br />          + taskListPerThread[i].get(0).getTaskId() + "," <br />          + taskListPerThread[i].get(taskListPerThread[i].size() - 1).getTaskId() + "]");<br />    } <br />    return taskListPerThread;<br />  } <br />} <br />/** <br />* 要执行的任务,可在执行时改变它的某个状态或调用它的某个操作 <br />* 例如任务有三个状态,就绪,运行,完成,默认为就绪态 <br />* 要进一步完善,可为 Task 加上状态变迁的监听器,因之决定UI的显示 <br />*/<br />class Task {<br />  public static final int READY = 0;<br />  public static final int RUNNING = 1;<br />  public static final int FINISHED = 2;<br />  private int status;<br />  //声明一个任务的自有业务含义的变量,用于标识任务<br />  private int taskId;<br />  //任务的初始化方法 <br />  public Task(int taskId){<br />    this.status = READY;<br />    this.taskId = taskId;<br />  } <br />  /** <br />   * 执行任务<br />   */<br />  public void execute() {<br />    // 设置状态为运行中 <br />    setStatus(Task.RUNNING);<br />    System.out.println("当前线程 ID 是:" + Thread.currentThread().getName() <br />        +" | 任务 ID 是:"+this.taskId);<br />    // 附加一个延时 <br />    try {<br />      Thread.sleep(1000);<br />    } catch (InterruptedException e) {<br />      e.printStackTrace();<br />    } <br />    // 执行完成,改状态为完成 <br />    setStatus(FINISHED);<br />  } <br />  public void setStatus(int status) {<br />    this.status = status;<br />  } <br />  public int getTaskId() {<br />    return taskId;<br />  } <br />} <br />/** <br />* 自定义的工作线程,持有分派给它执行的任务列表<br />*/<br />class WorkThread extends Thread {<br />  //本线程待执行的任务列表,你也可以指为任务索引的起始值 <br />  private List<Task> taskList = null;<br />  private int threadId;<br />  /** <br />   * 构造工作线程,为其指派任务列表,及命名线程 ID<br />   * @param taskList 欲执行的任务列表<br />   * @param threadId 线程 ID<br />   */<br />  public WorkThread(List<Task> taskList,int threadId) {<br />    this.taskList = taskList;<br />    this.threadId = threadId;<br />  } <br />  /** <br />   * 执行被指派的所有任务<br />   */<br />  public void run() {<br />    for (Task task : taskList) {<br />      task.execute();<br />    } <br />  } <br />} <br />package com.unmi.common;<br />import java.util.ArrayList;<br />import java.util.List;<br />/**<br />* 指派任务列表给线程的分发器<br />* @author Unmi<br />* QQ: 1125535 Email: fantasia@sina.com<br />* MSN: kypfos@MSN.com 2008-03-25<br />*/<br />public class TaskDistribuTor {<br />/**<br /> * 测试方法<br /> * @param args<br /> */<br />public static void main(String[] args) {<br /> //初始化要执行的任务列表<br /> List<Task> taskList = new ArrayList<Task>();<br /> for (int i = 0; i < 108; i++) {<br /> taskList.add(new Task(i));<br /> }<br /> //设定要启动的工作线程数为 5 个<br /> int threadCount = 5;<br /> List<Task>[] taskListPerThread = distributeTasks(taskList, threadCount);<br /> System.out.println("实际要启动的工作线程数:"+taskListPerThread.length);<br /> for (int i = 0; i < taskListPerThread.length; i++) {<br /> Thread workThread = new WorkThread(taskListPerThread[i],i);<br /> workThread.start();<br /> }<br />}<br />/**<br /> * 把 List 中的任务分配给每个线程,先平均分配,剩于的依次附加给前面的线程<br /> * 返回的数组有多少个元素 (List<Task>) 就表明将启动多少个工作线程<br /> * @param taskList 待分派的任务列表<br /> * @param threadCount  线程数<br /> * @return 列表的数组,每个数组中存有该线程要执行的任务列表<br /> */<br />public static List<Task>[] distributeTasks(List<Task> taskList, int threadCount) {<br /> // 每个线程至少要执行的任务数,假如不为零则表示每个线程都会分配到任务<br /> int minTaskCount = taskList.size() / threadCount;<br /> // 平均分配后还剩下的任务数,不为零则还有任务依个附加到前面的线程中<br /> int remainTaskCount = taskList.size() % threadCount;<br /> // 实际要启动的线程数,如果工作线程比任务还多<br /> // 自然只需要启动与任务相同个数的工作线程,一对一的执行<br /> // 毕竟不打算实现了线程池,所以用不着预先初始化好休眠的线程<br /> int actualThreadCount = minTaskCount > 0 ? threadCount : remainTaskCount;<br /> // 要启动的线程数组,以及每个线程要执行的任务列表<br /> List<Task>[] taskListPerThread = new List[actualThreadCount];<br /> int taskIndex = 0;<br /> //平均分配后多余任务,每附加给一个线程后的剩余数,重新声明与 remainTaskCount<br /> //相同的变量,不然会在执行中改变 remainTaskCount 原有值,产生麻烦<br /> int remainIndces = remainTaskCount;<br /> for (int i = 0; i < taskListPerThread.length; i++) {<br /> taskListPerThread[i] = new ArrayList<Task>();<br /> // 如果大于零,线程要分配到基本的任务<br /> if (minTaskCount > 0) {<br />  for (int j = taskIndex; j < minTaskCount + taskIndex; j++) {<br />  taskListPerThread[i].add(taskList.get(j));<br />  }<br />  taskIndex += minTaskCount;<br /> }<br /> // 假如还有剩下的,则补一个到这个线程中<br /> if (remainIndces > 0) {<br />  taskListPerThread[i].add(taskList.get(taskIndex++));<br />  remainIndces--;<br /> }<br /> }<br /> // 打印任务的分配情况<br /> for (int i = 0; i < taskListPerThread.length; i++) {<br /> System.out.println("线程 " + i + " 的任务数:" + taskListPerThread[i].size() + " 区间["<br />  + taskListPerThread[i].get(0).getTaskId() + ","<br />  + taskListPerThread[i].get(taskListPerThread[i].size() - 1).getTaskId() + "]");<br /> }<br /> return taskListPerThread;<br />}<br />}<br />/**<br />* 要执行的任务,可在执行时改变它的某个状态或调用它的某个操作<br />* 例如任务有三个状态,就绪,运行,完成,默认为就绪态<br />* 要进一步完善,可为 Task 加上状态变迁的监听器,因之决定UI的显示<br />*/<br />class Task {<br />public static final int READY = 0;<br />public static final int RUNNING = 1;<br />public static final int FINISHED = 2;<br />private int status;<br />//声明一个任务的自有业务含义的变量,用于标识任务<br />private int taskId;<br />//任务的初始化方法<br />public Task(int taskId){<br /> this.status = READY;<br /> this.taskId = taskId;<br />}<br />/**<br /> * 执行任务<br /> */<br />public void execute() {<br /> // 设置状态为运行中<br /> setStatus(Task.RUNNING);<br /> System.out.println("当前线程 ID 是:" + Thread.currentThread().getName()<br />  +" | 任务 ID 是:"+this.taskId);<br /> // 附加一个延时<br /> try {<br /> Thread.sleep(1000);<br /> } catch (InterruptedException e) {<br /> e.printStackTrace();<br /> }<br /> // 执行完成,改状态为完成<br /> setStatus(FINISHED);<br />}<br />public void setStatus(int status) {<br /> this.status = status;<br />}<br />public int getTaskId() {<br /> return taskId;<br />}<br />}<br />/**<br />* 自定义的工作线程,持有分派给它执行的任务列表<br />*/<br />class WorkThread extends Thread {<br />//本线程待执行的任务列表,你也可以指为任务索引的起始值<br />private List<Task> taskList = null;<br />private int threadId;<br />/**<br /> * 构造工作线程,为其指派任务列表,及命名线程 ID<br /> * @param taskList 欲执行的任务列表<br /> * @param threadId 线程 ID<br /> */<br />public WorkThread(List<Task> taskList,int threadId) {<br /> this.taskList = taskList;<br /> this.threadId = threadId;<br />}<br />/**<br /> * 执行被指派的所有任务<br /> */<br />public void run() {<br /> for (Task task : taskList) {<br /> task.execute();<br /> }<br />}<br />}

执行结果如下,注意观察每个线程分配到的任务数量及区间。直到所有的线程完成了所分配到的任务后程序结束:

线程 0 的任务数:22 区间[0,21]

线程 1 的任务数:22 区间[22,43]

线程 2 的任务数:22 区间[44,65]

线程 3 的任务数:21 区间[66,86]

线程 4 的任务数:21 区间[87,107]

实际要启动的工作线程数:5

当前线程 ID 是:Thread-0 | 任务 ID 是:0

当前线程 ID 是:Thread-1 | 任务 ID 是:22

当前线程 ID 是:Thread-2 | 任务 ID 是:44

当前线程 ID 是:Thread-3 | 任务 ID 是:66

当前线程 ID 是:Thread-4 | 任务 ID 是:87

当前线程 ID 是:Thread-0 | 任务 ID 是:1

当前线程 ID 是:Thread-1 | 任务 ID 是:23

当前线程 ID 是:Thread-2 | 任务 ID 是:45

…………………………………………………………………

上面坦白来只算是基本功夫,贴出来还真见笑了。还有更为复杂的功能:

像多线程的下载工具的确更充分利用了网络资源,而且像 FlashGet、NetAnts 都实现了:假如某个线程下载完了欲先所分配段的内容之后,会帮其他线程下载未完成数据,直到任务完成;或某一下载线程的未完成段区间已经很小了,用不着别人来帮忙时,这就涉及到任务的进一步分配。再如,以上两个工具都能动态增加、减小或中止线程,越说越复杂了,它们原本比这复杂多了,这些实现可能定义各种队列来实现,如未完成任务队列、下载中任务队列和已完成队列。难以细究了。

即使爬到最高的山上,一次也只能脚踏实地地迈一步。

任务列表分派给多个线程的策略及方法

相关文章:

你感兴趣的文章:

标签云: