用JAVA实现缓冲多线程无阻塞读取远程文件

我平时比较喜欢从网上听歌,有些链接下载速度太慢了。如果用 HttpURLConnection类的方法打开连接,然后用InputStream类获得输入流,再用 BufferedInputStream构造出带缓冲区的输入流,如果网速太慢的话,无论缓冲 区设置多大,听起来都是断断续续的,达不到真正缓冲的目的。于是尝试编写代 码实现用缓冲方式读取远程文件,以下贴出的代码是我写的MP3解码器的一部分 。我是不怎么赞同使用多线程下载的,加之有的链接下载速度本身就比较快,所 以在下载速度足够的情况下,就让下载线程退出,直到只剩下一个下载线程。当 然,多线程中令人头痛的死锁问题、HttpURLConnection的超时阻塞问题都会使 代码看起来异常复杂。

简要介绍一下实现多线程环形缓冲的方法。将缓冲区buf[]分为16块,每块 32K,下载线程负责向缓冲区写数据,每次写一块;读线程(BuffRandAcceURL类 )每次读小于32K的任意字节。同步描述:写/写互斥等待空闲块;写/写并发填 写buf[];读/写并发使用buf[]。

经过我很长一段时间使用,我认为比较满意地实现了我的目标,同其它MP3播 放器对比,我的这种方法能够比较流畅、稳定地下载并播放。我把实现多线程下 载缓冲的方法写出来,不足之处恳请批评指正。

一、HttpReader类功能:HTTP协议从指定URL读取数据

/** *//*** author by http://www.bt285.cn http://www.5a520.cn*/package instream;     import java.io.IOException;   import java.io.InputStream;   import java.net.HttpURLConnection;   import java.net.URL;     public final class HttpReader {       public static final int MAX_RETRY = 10;       private static long content_length;       private URL url;       private HttpURLConnection httpConnection;       private InputStream in_stream;       private long cur_pos;           //用于决定seek方法中是否执行文件定位       private int connect_timeout;       private int read_timeout;              public HttpReader(URL u) {           this(u, 5000, 5000);       }              public HttpReader(URL u, int connect_timeout, int read_timeout) {           this.connect_timeout = connect_timeout;           this.read_timeout = read_timeout;           url = u;           if (content_length == 0) {               int retry = 0;               while (retry < HttpReader.MAX_RETRY)                   try {                       this.seek(0);                       content_length = httpConnection.getContentLength();                       break;                   } catch (Exception e) {                       retry++;                   }           }       }              public static long getContentLength() {           return content_length;       }              public int read(byte[] b, int off, int len) throws IOException {           int r = in_stream.read(b, off, len);           cur_pos += r;           return r;       }              public int getData(byte[] b, int off, int len) throws IOException {           int r, rema = len;           while (rema > 0) {               if ((r = in_stream.read(b, off, rema)) == -1) {                   return -1;               }               rema -= r;               off += r;               cur_pos += r;           }           return len;       }              public void close() {           if (httpConnection != null) {               httpConnection.disconnect();               httpConnection = null;           }           if (in_stream != null) {               try {                   in_stream.close();               } catch (IOException e) {}               in_stream = null;           }           url = null;       }              /**//*       * 抛出异常通知再试       * 响应码503可能是由某种暂时的原因引起的,例如同一IP频繁的连接请求可能遭服务器拒绝       */      public void seek(long start_pos) throws IOException {           if (start_pos == cur_pos && in_stream != null)               return;           if (httpConnection != null) {               httpConnection.disconnect();               httpConnection = null;           }           if (in_stream != null) {               in_stream.close();               in_stream = null;           }           httpConnection = (HttpURLConnection) url.openConnection();           httpConnection.setConnectTimeout(connect_timeout);           httpConnection.setReadTimeout(read_timeout);           String sProperty = "bytes=" + start_pos + "-";           httpConnection.setRequestProperty("Range", sProperty);           //httpConnection.setRequestProperty("Connection", "Keep-Alive");           int responseCode = httpConnection.getResponseCode();           if (responseCode < 200 || responseCode >= 300) {               try {                   Thread.sleep(500);               } catch (InterruptedException e) {                   e.printStackTrace();               }               throw new IOException("HTTP responseCode="+responseCode);           }             in_stream = httpConnection.getInputStream();           cur_pos = start_pos;       }     }

二、IWriterCallBack接口功能:实现读/写通信。

package instream;     public interface IWriterCallBack {       public boolean tryWriting(Writer w) throws InterruptedException;       public void updateBuffer(int i, int len);       public void updateWriterCount();       public void terminateWriters();   }

三、Writer类:下载线程,负责向buf[]写数据。

/** *//*** http://www.bt285.cn http://www.5a520.cn */package instream;   import java.io.IOException;   import java.net.URL;     public final class Writer implements Runnable {       private static boolean isalive = true;       private byte[] buf;       private IWriterCallBack icb;       protected int index;            //buf[]内"块"索引号       protected long start_pos;       //index对应的文件位置(相对于文件首的偏移量)       protected int await_count;      //用于判断:下载速度足够就退出一个"写"线程       private HttpReader hr;              public Writer(IWriterCallBack call_back, URL u, byte[] b, int i) {           hr = new HttpReader(u);           if(HttpReader.getContentLength() == 0)  //实例化HttpReader对象都不成功               return;           icb = call_back;           buf = b;           Thread t = new Thread(this,"dt_"+i);           t.setPriority(Thread.NORM_PRIORITY + 1);           t.start();       }              public void run() {           int write_bytes=0, write_pos=0, rema = 0, retry = 0;           boolean cont = true;           while (cont) {               try {                   // 1.等待空闲块                   if(retry == 0) {                       if (icb.tryWriting(this) == false)                           break;                       write_bytes = 0;                       rema = BuffRandAcceURL.UNIT_LENGTH;                       write_pos = index << BuffRandAcceURL.UNIT_LENGTH_BITS;                   }                                      // 2.定位                   hr.seek(start_pos);                     // 3.下载"一块"                   int w;                   while (rema > 0 && isalive) {                       w = (rema < 2048) ? rema : 2048; //每次读几K合适?                       if ((w = hr.read(buf, write_pos, w)) == -1) {                           cont = false;                           break;                       }                       rema -= w;                       write_pos += w;                       start_pos += w;                       write_bytes += w;                   }                                      //4.通知"读"线程                   retry = 0;                   icb.updateBuffer(index, write_bytes);               } catch (InterruptedException e) {                   isalive = false;                   icb.terminateWriters();                   break;               } catch (IOException e) {                   if(++retry == HttpReader.MAX_RETRY) {                       isalive = false;                       icb.terminateWriters();                       break;                   }               }           }           icb.updateWriterCount();           try {               hr.close();           } catch (Exception e) {}           hr = null;           buf = null;           icb = null;       }     }

四、IRandomAccess接口:随机读取文件接口,BuffRandAcceURL类和 BuffRandAcceFile类实现接口方法。BuffRandAcceFile类实现读取本地磁盘文件 ,这儿就不给出其源码了。

package instream;     public interface IRandomAccess {       public int read() throws Exception;       public int read(byte b[]) throws Exception;       public int read(byte b[], int off, int len) throws Exception;       public int dump(int src_off, byte b[], int dst_off, int len) throws Exception;       public void seek(long pos) throws Exception;       public long length();       public long getFilePointer();       public void close();   }

五、BuffRandAcceURL类功能:创建下载线程;read方法从buf[]读数据。

关键是如何简单有效防止死锁?以下只是我的一次尝试,请指正。

/** *//*** http://www.5a520.cn  http://www.bt285.cn*/ package instream;     import java.net.URL;   import java.net.URLDecoder;   import decode.Header;   import tag.MP3Tag;   import tag.TagThread;     public final class BuffRandAcceURL implements IRandomAccess, IWriterCallBack {       public static final int UNIT_LENGTH_BITS = 15;                  //32K       public static final int UNIT_LENGTH = 1 << UNIT_LENGTH_BITS;       public static final int BUF_LENGTH = UNIT_LENGTH << 4;            //16块       public static final int UNIT_COUNT = BUF_LENGTH >> UNIT_LENGTH_BITS;       public static final int BUF_LENGTH_MASK = (BUF_LENGTH - 1);       private static final int MAX_WRITER = 8;       private static long file_pointer;       private static int read_pos;       private static int fill_bytes;       private static byte[] buf;      //同时也作读写同步锁:buf.wait()/buf.notify()       private static int[] buf_bytes;       private static int buf_index;       private static int alloc_pos;       private static URL url = null;       private static boolean isalive = true;       private static int writer_count;       private static int await_count;       private long file_length;       private long frame_bytes;              public BuffRandAcceURL(String sURL) throws Exception {           this(sURL,MAX_WRITER);       }              public BuffRandAcceURL(String sURL, int download_threads) throws Exception {           buf = new byte[BUF_LENGTH];           buf_bytes = new int[UNIT_COUNT];           url = new URL(sURL);                      //创建线程以异步方式解析ID3           new TagThread(url);                      //打印当前文件名           try {               String s = URLDecoder.decode(sURL, "GBK");               System.out.println("start>> " + s.substring(s.lastIndexOf("/") + 1));               s = null;           } catch (Exception e) {               System.out.println("start>> " + sURL);           }                      //创建"写"线程           for(int i = 0; i < download_threads; i++)               new Writer(this, url, buf, i+1);           frame_bytes = file_length = HttpReader.getContentLength();           if(file_length == 0) {               Header.strLastErr = "连接URL出错,重试 " + HttpReader.MAX_RETRY + " 次后放弃。";               throw new Exception("retry " + HttpReader.MAX_RETRY);           }           writer_count = download_threads;                      //缓冲           try_cache();                      //跳过ID3 v2           MP3Tag mP3Tag = new MP3Tag();           int v2_size = mP3Tag.checkID3V2(buf,0);           if (v2_size > 0) {               frame_bytes -= v2_size;               //seek(v2_size):               fill_bytes -= v2_size;               file_pointer = v2_size;               read_pos = v2_size;               read_pos &= BUF_LENGTH_MASK;               int units = v2_size >> UNIT_LENGTH_BITS;               for(int i = 0; i < units; i++) {                   buf_bytes[i] = 0;                   this.notifyWriter();               }               buf_bytes[units] -= v2_size;               this.notifyWriter();           }           mP3Tag = null;       }              private void try_cache() throws InterruptedException {           int cache_size = BUF_LENGTH;           if(cache_size > (int)file_length - alloc_pos)               cache_size = (int)file_length - alloc_pos;           cache_size -= UNIT_LENGTH;                      //等待填写当前正在读的那"一块"缓冲区           /**//*if(fill_bytes >= cache_size && writer_count > 0) {              synchronized (buf) {                  buf.wait();              }              return;          }*/                     //等待填满缓冲区           while (fill_bytes < cache_size) {               if (writer_count == 0 || isalive == false)                   return;               if(BUF_LENGTH > (int)file_length - alloc_pos)                   cache_size = (int)file_length - alloc_pos - UNIT_LENGTH;               System.out.printf("/r[缓冲%1$6.2f%%] ",(float)fill_bytes / cache_size * 100);               synchronized (buf) {                   buf.wait();               }           }           System.out.printf("/r");       }              private int try_reading(int i, int len) throws Exception {           int n = (i == UNIT_COUNT - 1) ? 0 : (i + 1);           int r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + buf_bytes[n]);           while (r < len) {               if (writer_count == 0 || isalive == false)                   return r;               try_cache();               r = (buf_bytes[i] == 0) ? 0 : (buf_bytes[i] + buf_bytes[n]);           }                      return len;       }              /**//*       * 各个"写"线程互斥等待空闲块       */      public synchronized boolean tryWriting(Writer w) throws InterruptedException {           await_count++;           while (buf_bytes[buf_index] != 0 && isalive) {               this.wait();           }                      //下载速度足够就结束一个"写"线程           if(writer_count > 1 && w.await_count >= await_count &&                   w.await_count >= writer_count)               return false;                      if(alloc_pos >= file_length)               return false;           w.await_count = await_count;           await_count--;           w.start_pos = alloc_pos;           w.index = buf_index;           alloc_pos += UNIT_LENGTH;           buf_index = (buf_index == UNIT_COUNT - 1) ? 0 : buf_index + 1;           return isalive;       }              public void updateBuffer(int i, int len) {           synchronized (buf) {               buf_bytes[i] = len;               fill_bytes += len;               buf.notify();           }       }              public void updateWriterCount() {           synchronized (buf) {               writer_count--;               buf.notify();           }       }              public synchronized void notifyWriter() {           this.notifyAll();       }              public void terminateWriters() {           synchronized (buf) {               if (isalive) {                   isalive = false;                   Header.strLastErr = "读取文件超时。重试 " + HttpReader.MAX_RETRY                           + " 次后放弃,请您稍后再试。";               }               buf.notify();           }                      notifyWriter();            }              public int read() throws Exception {           int iret = -1;           int i = read_pos >> UNIT_LENGTH_BITS;           // 1."等待"有1字节可读           while (buf_bytes[i] < 1) {               try_cache();               if (writer_count == 0)                   return -1;           }           if(isalive == false)               return -1;             // 2.读取           iret = buf[read_pos] & 0xff;           fill_bytes--;           file_pointer++;           read_pos++;           read_pos &= BUF_LENGTH_MASK;           if (--buf_bytes[i] == 0)               notifyWriter();     // 3.通知             return iret;       }           public int read(byte b[]) throws Exception {        return read(b, 0, b.length);    }    public int read(byte[] b, int off, int len) throws Exception {        if(len > UNIT_LENGTH)            len = UNIT_LENGTH;        int i = read_pos >> UNIT_LENGTH_BITS;                      // 1."等待"有足够内容可读           if(try_reading(i, len) < len || isalive == false)             return -1;          // 2.读取        int tail_len = BUF_LENGTH - read_pos; // write_pos != BUF_LENGTH        if (tail_len < len) {            System.arraycopy(buf, read_pos, b, off, tail_len);            System.arraycopy(buf, 0, b, off + tail_len, len - tail_len);        } else            System.arraycopy(buf, read_pos, b, off, len);         fill_bytes -= len;        file_pointer += len;        read_pos += len;        read_pos &= BUF_LENGTH_MASK;        buf_bytes[i] -= len;        if (buf_bytes[i] < 0) {            int ni = read_pos >> UNIT_LENGTH_BITS;            buf_bytes[ni] += buf_bytes[i];            buf_bytes[i] = 0;            notifyWriter();        } else if (buf_bytes[i] == 0)            notifyWriter();        return len;    }           /**//*     * 从src_off位置复制,不移动文件"指针"     */    public int dump(int src_off, byte b[], int dst_off, int len) throws Exception {        int rpos = read_pos + src_off;           if(try_reading(rpos >> UNIT_LENGTH_BITS, len) < len || isalive == false)            return -1;           int tail_len = BUF_LENGTH - rpos;        if (tail_len < len) {            System.arraycopy(buf, rpos, b, dst_off, tail_len);            System.arraycopy(buf, 0, b, dst_off + tail_len, len - tail_len);        } else            System.arraycopy(buf, rpos, b, dst_off, len);           // 不发信号          return len;       }           public long length() {        return file_length;       }              public long getFilePointer() {        return file_pointer;       }         public void close() {        //...    }              //       public void seek(long pos) throws Exception {        //...    }          }

怎么能研究出炸药呢?爱迪生不经历上千次的来自失败,怎么能发明电灯呢?

用JAVA实现缓冲多线程无阻塞读取远程文件

相关文章:

你感兴趣的文章:

标签云: