【ODPS】TableTunnel单线程简单下载事例

ODPS Tunnel是ODPS的数据通道,用户可以通过Tunnel向ODPS中上传或者下载数据。目前Tunnel仅支持表(不包括视图View)数据的上传下载。

主要接口描述

TableTunnel访问ODPS Tunnel服务的入口类。用户可以通过公网或者阿里云内网环境对ODPS 及其Tunnel进行访问。当用户在阿里云内网环境中,使用Tunnel内网连接下载数据时,ODPS不会将该操作产生的流量计入计费。此外内网地址仅对杭州域的云产品有效。

TableTunnel.UploadSession表示一个向ODPS表中上传数据的会话。

TableTunnel.DownloadSession表示一个向ODPS表中下载数据的会话。

接口定义:

{DownloadSession(Configuration conf, String projectName, String tableName,String partitionSpec) TunnelExceptionstart, long count, boolean compress) }

Download对象:

生命周期:从创建Download实例到下载结束创建Download实例,可以通过调用构造方法创建,也可以通过TableTunnel创建;请求方式:同步Server端会为该Download创建一个session,生成唯一downloadId标识该Download,客户端可以通过getId获取该操作开销较大,server端会对数据文件创建索引,当文件数很多时,该时间会比较长;同时server端会返回总Record数,,可以根据总Record数启动多个并发同时下载下载数据:请求方式:异步调用openRecordReader方法,生成RecordReader实例,其中参数start标识本次下载的record的起始位置,从0开始,取值范围是 >= 0, count标识本次下载的记录数,取值范围是>0。查看下载:请求方式:同步调用getStatus可以获取当前Download状态4种状态说明:UNKNOWN, server端刚创建一个session时设置的初始值NORMAL, 创建Download对象成功CLOSED, 下载结束后EXPIRED, 下载超时

说明:分区表下载必须指定分区

事例源码:

package bysql;import java.io.BufferedWriter;import java.io.File;import java.io.FileOutputStream;import java.io.IOException;import java.io.OutputStreamWriter;import java.util.Date;import com.aliyun.odps.Column;import com.aliyun.odps.Odps;import com.aliyun.odps.PartitionSpec;import com.aliyun.odps.TableSchema;import com.aliyun.odps.account.Account;import com.aliyun.odps.account.AliyunAccount;import com.aliyun.odps.data.Record;import com.aliyun.odps.data.RecordReader;import com.aliyun.odps.tunnel.TableTunnel;import com.aliyun.odps.tunnel.TunnelException;public class DownloadSample {private static final String ACCESS_ID = "<your access id>";private static final String ACCESS_KEY = "<your access Key>";private static final String PROJECT_NAME = "<your project>";private static final String TUNNEL_URL = "<your tunnel endpoint>";private static final String ODPS_URL = "<your odps endpoint>";public static void main(String args[]) throws Exception {String tableName = "point_z";//表名/* 先构建阿里云帐号 */Account account = new AliyunAccount(ACCESS_ID, ACCESS_KEY);/* Odps类是ODPS SDK的入口 */Odps odps = new Odps(account);odps.setDefaultProject(PROJECT_NAME);// 指定默认使用的Project名称odps.setEndpoint(ODPS_URL);// 设置ODPS服务的地址/*访问ODPS Tunnel服务的入口类*/TableTunnel tunnel = new TableTunnel(odps);tunnel.setEndpoint(TUNNEL_URL);//设置TunnelServer地址try {/*此处表point_z为分区表,下载时必须指定分区* 指定下载分区* */PartitionSpec partitionSpec = new PartitionSpec();partitionSpec.set("z", "1");System.out.println("开始下载数据………");File file = new File("G:\\"+tableName+"(单线程).txt");//下载文件if (file.exists()){file.delete();}file.createNewFile();long startTime = System.currentTimeMillis();/*在分区表上创建下载会话(分区表必须指定分区,非分区表不用)*/TableTunnel.DownloadSession downloadSession = tunnel.createDownloadSession(PROJECT_NAME, tableName,partitionSpec);long count = downloadSession.getRecordCount();System.out.println("RecordCount is: " + count);RecordReader recordReader = downloadSession.openRecordReader(0,count);Record record;while ((record = recordReader.read()) != null) {consumeRecord(record, downloadSession.getSchema(),file);}recordReader.close();long endTime = System.currentTimeMillis();System.out.println("总共耗时:" + (endTime – startTime) + " ms");System.out.println("————————————————-");} catch (TunnelException e) {e.printStackTrace();} catch (IOException e1) {e1.printStackTrace();}}private static void consumeRecord(Record record, TableSchema schema,File file)throws IOException {BufferedWriter out = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file, true), "utf-8"));String writeStr="";String str = null;for (int i = 0; i < schema.getColumns().size(); i++) {Column column = schema.getColumn(i);String colValue = null;switch (column.getType()) {case BIGINT: {Long v = record.getBigint(i);colValue = v == null ? null : v.toString();break;}case BOOLEAN: {Boolean v = record.getBoolean(i);colValue = v == null ? null : v.toString();break;}case DATETIME: {Date v = record.getDatetime(i);colValue = v == null ? null : v.toString();break;}case DOUBLE: {Double v = record.getDouble(i);colValue = v == null ? null : v.toString();break;}case STRING: {String v = record.getString(i);colValue = v == null ? null : v.toString();break;}default:throw new RuntimeException("Unknown column type: "+ column.getType());}str = colValue == null ? "null" : colValue;if (i != schema.getColumns().size()-1){str = schema.getColumn(i).getName()+":"+str+",";}else{str = schema.getColumn(i).getName()+":"+str;}writeStr =writeStr+str; }str =str +System.getProperty("line.separator");writeStr =writeStr +System.getProperty("line.separator");out.write(writeStr);out.close();}}

版权声明:本文为博主原创文章,未经博主允许不得转载。

就得加倍付出汗水,赢得场场精彩

【ODPS】TableTunnel单线程简单下载事例

相关文章:

  • 【算法】直接插入排序C语言实现
  • 你感兴趣的文章:

    标签云:

    亚洲高清电影在线, 免费高清电影, 八戒影院夜间, 八戒电影最新大片, 出轨在线电影, 午夜电影院, 在线影院a1166, 在线电影院, 在线观看美剧下载, 日本爱情电影, 日韩高清电影在线, 电影天堂网, 直播盒子app, 聚合直播, 高清美剧, 高清美剧在线观看 EhViewer-E站, E站, E站绿色版, qqmulu.com, qq目录网, qq网站目录,