【ODPS】TableTunnel多线程下载示例

上一篇介绍了使用ODPS TableTunnel进行单线程简单下载的方法。

本篇介绍多线程下载:将待下载的记录按区间切分,每个线程负责读取一个区间并写入同一个输出文件。

1. 多线程下载任务类 DownloadThread:

package bysql;import java.io.BufferedWriter;import java.io.IOException;import java.util.Date;import java.util.concurrent.Callable;import com.aliyun.odps.Column;import com.aliyun.odps.TableSchema;import com.aliyun.odps.data.Record;import com.aliyun.odps.data.RecordReader;public class DownloadThread implements Callable<Long> {private long id;private RecordReader recordReader;private TableSchema tableSchema;private BufferedWriter out;public DownloadThread(int id, RecordReader recordReader,TableSchema tableSchema, BufferedWriter out) {this.id = id;this.recordReader = recordReader;this.tableSchema = tableSchema;this.out = out;}@Overridepublic Long call() throws Exception {Long recordNum = 0L;try {Record record;while ((record = recordReader.read()) != null) {recordNum++;consumeRecord(record, tableSchema, out, id);}recordReader.close();} catch (IOException e) {e.printStackTrace();}return recordNum;}private static void consumeRecord(Record record, TableSchema schema,BufferedWriter out, long id) throws IOException {String writeStr = "";String str;for (int i = 0; i < schema.getColumns().size(); i++) {Column column = schema.getColumn(i);String colValue = null;switch (column.getType()) {case BIGINT: {Long v = record.getBigint(i);colValue = v == null ? null : v.toString();break;}case BOOLEAN: {Boolean v = record.getBoolean(i);colValue = v == null ? null : v.toString();break;}case DATETIME: {Date v = record.getDatetime(i);colValue = v == null ? null : v.toString();break;}case DOUBLE: {Double v = record.getDouble(i);colValue = v == null ? null : v.toString();break;}case STRING: {String v = record.getString(i);colValue = v == null ? null : v.toString();break;}default:throw new RuntimeException("Unknown column type: "+ column.getType());}str = colValue == null ? 
"null" : colValue;if (i != schema.getColumns().size() – 1) {str = schema.getColumn(i).getName() + ":" + str + ",";} else {str = schema.getColumn(i).getName() + ":" + str;}writeStr = writeStr + str;}writeStr = "【Thread " + id + "】" + writeStr+ System.getProperty("line.separator");out.write(writeStr);}}

2. 多线程下载示例 DownloadThreadSample:

package bysql;import java.io.BufferedWriter;import java.io.File;import java.io.FileOutputStream;import java.io.IOException;import java.io.OutputStreamWriter;import java.util.ArrayList;import java.util.List;import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;import com.aliyun.odps.Odps;import com.aliyun.odps.PartitionSpec;import com.aliyun.odps.account.Account;import com.aliyun.odps.account.AliyunAccount;import com.aliyun.odps.data.RecordReader;import com.aliyun.odps.tunnel.TableTunnel;import com.aliyun.odps.tunnel.TunnelException;public class DownloadThreadSample {private static final String ACCESS_ID = "<your access id>";private static final String ACCESS_KEY = "<your access Key>";private static final String PROJECT_NAME = "<your project>";private static final String TUNNEL_URL = "<your tunnel endpoint>";private static final String ODPS_URL = "<your odps endpoint>";public static void main(String[] args) {String tableName = "point_z";//表名/* 先构建阿里云帐号 */Account account = new AliyunAccount(ACCESS_ID, ACCESS_KEY);/* Odps类是ODPS SDK的入口 */Odps odps = new Odps(account);odps.setDefaultProject(PROJECT_NAME);// 指定默认使用的Project名称odps.setEndpoint(ODPS_URL);// 设置ODPS服务的地址/*访问ODPS Tunnel服务的入口类*/TableTunnel tunnel = new TableTunnel(odps);tunnel.setEndpoint(TUNNEL_URL);//设置TunnelServer地址try {/*此处表point_z为分区表,下载时必须指定分区* 指定下载分区* */PartitionSpec partitionSpec = new PartitionSpec();partitionSpec.set("z", "1");System.out.println("开始下载数据………");File file = new File("G:\\"+tableName+"(多线程).txt");if (file.exists()){file.delete();}file.createNewFile();BufferedWriter out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, true), "utf-8"));long startTime = System.currentTimeMillis();TableTunnel.DownloadSession downloadSession = tunnel.createDownloadSession(PROJECT_NAME, tableName,partitionSpec);long count = 
downloadSession.getRecordCount();System.out.println("RecordCount is: " + count);int threadNum=6;ExecutorService pool = Executors.newFixedThreadPool(threadNum);ArrayList<Callable<Long>> callers = new ArrayList<Callable<Long>>();long start = 0;long step = count / threadNum;for (int i = 0; i < threadNum – 1; i++) {RecordReader recordReader = downloadSession.openRecordReader(step * i, step);callers.add(new DownloadThread(i, recordReader, downloadSession.getSchema(),out));}RecordReader recordReader = downloadSession.openRecordReader(step* (threadNum – 1), count – ((threadNum – 1) * step));callers.add(new DownloadThread(threadNum – 1, recordReader,downloadSession.getSchema(),out));Long downloadNum = 0L;List<Future<Long>> recordNum = pool.invokeAll(callers);for (Future<Long> num : recordNum)downloadNum += num.get();System.out.println("DownLoad Count is: " + downloadNum);pool.shutdown();out.close();long endTime = System.currentTimeMillis();System.out.println("总共耗时:" + (endTime – startTime) + " ms");System.out.println("————————————————-");} catch (TunnelException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}}}

版权声明:本文为博主原创文章,未经博主允许不得转载。

不会因为别人显赫的成功而促使自己有卓越的进步。

【ODPS】TableTunnel多线程下载示例

相关文章:

你感兴趣的文章:

标签云: