【ODPS】本地数据库数据批量上传到ODPS中

利用阿里云ODPS的Tunnel服务，可以将本地数据库中的数据批量上传到ODPS数据表中，然后再进行大数据处理。

本示例使用的是MySQL数据库。

1、表说明

MySQL表结构:

odps中数据表结构:

2、MySQL数据库连接

package datatrans;import java.sql.Connection;import java.sql.DriverManager;import java.sql.SQLException;import java.sql.Statement;public class CrateStatement {private static String driver = "com.mysql.jdbc.Driver"; // 驱动private static String url = "jdbc:mysql://ip/aliyun_odps_sql"; // 数据库连接地址private static String user = "***"; // 数据库账户private static String password = "******"; // 数据库密码public static Statement createStatement() throws SQLException,ClassNotFoundException {Class.forName(driver);Connection conn = null;try {conn = (Connection) DriverManager.getConnection(url, user, password);if (conn != null) {System.out.println("数据库连接成功!");} else {System.out.println("数据库连接失败!");}} catch (SQLException e) {e.printStackTrace();}Statement st = (Statement) conn.createStatement();return st;}}3、多线程上传到ODPS数据表

package datatrans;import com.aliyun.odps.Column;import com.aliyun.odps.TableSchema;import com.aliyun.odps.data.Record;import com.aliyun.odps.data.RecordWriter;import java.io.IOException;import java.sql.ResultSet;import java.sql.Statement;import java.util.concurrent.Callable;/** * Created by Administrator on 2015/8/13. */public class UploadDataThread implements Callable<Boolean> {private long id;private RecordWriter recordWriter;private Record record;private TableSchema tableSchema;private String sql = "";public UploadDataThread(long id, int threadCount,RecordWriter recordWriter, Record record, TableSchema tableSchema,String sql) {this.id = id;this.recordWriter = recordWriter;this.record = record;this.tableSchema = tableSchema;this.sql = sql;}@Overridepublic Boolean call() throws Exception {System.out.println("Thread " +id+ " SQL:"+sql);Statement stmt = CrateStatement.createStatement();ResultSet rset = null;rset = stmt.executeQuery(sql);//查询数据while (rset.next()) {for (int i = 0; i < tableSchema.getColumns().size(); i++) {Column column = tableSchema.getColumn(i);switch (column.getType()) {case BIGINT:record.setBigint(i, rset.getLong(column.getName()));break;case BOOLEAN:record.setBoolean(i, rset.getBoolean(column.getName()));break;case DATETIME:record.setDatetime(i, rset.getDate(column.getName()));break;case DOUBLE:record.setDouble(i, rset.getDouble(column.getName()));break;case STRING:record.setString(i, rset.getString(column.getName()));break;default:throw new RuntimeException("Unknown column type: "+ column.getType());}}try {recordWriter.write(record);} catch (IOException e) {recordWriter.close();e.printStackTrace();return false;}}recordWriter.close();return true;}}4、测试类

package datatrans;import java.io.IOException;import java.sql.ResultSet;import java.sql.SQLException;import java.sql.Statement;import java.util.ArrayList;import java.util.concurrent.Callable;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import com.aliyun.odps.Odps;import com.aliyun.odps.account.Account;import com.aliyun.odps.account.AliyunAccount;import com.aliyun.odps.data.Record;import com.aliyun.odps.data.RecordWriter;import com.aliyun.odps.tunnel.TableTunnel;import com.aliyun.odps.tunnel.TunnelException;public class DataTranslationDemo {private static final String ACCESS_ID = "<your access id>";private static final String ACCESS_KEY = "<your access Key>";private static final String PROJECT_NAME = "<your project>";private static final String ODPS_URL = "<your tunnel endpoint>";private static final String TUNNEL_URL = "<your odps endpoint>";public static void main(String[] args) throws TunnelException, IOException, InterruptedException, SQLException, ClassNotFoundException {String tableName = "users";String selectSql="select * from users";Account account = new AliyunAccount(ACCESS_ID, ACCESS_KEY);Odps odps = new Odps(account);odps.setDefaultProject(PROJECT_NAME);// 指定默认使用的Project名称odps.setEndpoint(ODPS_URL);// 设置ODPS服务的地址TableTunnel tunnel = new TableTunnel(odps);tunnel.setEndpoint(TUNNEL_URL);// 设置TunnelServer地址,没有设置TunnelServer地址的情况下自动选择TableTunnel.UploadSession uploadSession = tunnel.createUploadSession(PROJECT_NAME, tableName);Statement stmt = CrateStatement.createStatement();ResultSet rset = stmt.executeQuery("SELECT count(1) datanum from users");int totalNum = 0;while (rset.next()) {/*先计算出表中数据总数*/totalNum=(int) rset.getLong(1);}System.out.println("表中数据总数:"+totalNum);/*定义线程数*/int threadNum = 10;/*计算每个线程需要处理的数据量*/int n = (int) Math.ceil(totalNum/threadNum);long startTime = System.currentTimeMillis();System.out.println("正在上传数据………….");ExecutorService pool = Executors.newFixedThreadPool(threadNum);ArrayList<Callable<Boolean>> 
callers = new ArrayList<Callable<Boolean>>();for (int i = 0; i < threadNum; i++) {/*根据单线程处理数量精确查询SQL*/String sql = selectSql + " limit " +(i*n)+" , "+((i+1)*n-1);RecordWriter recordWriter = uploadSession.openRecordWriter(i);Record record = uploadSession.newRecord();/*上传数据*/callers.add(new UploadDataThread(i,threadNum, recordWriter, record,uploadSession.getSchema(),sql));}pool.invokeAll(callers);pool.shutdown();Long[] blockList = new Long[threadNum];for (int i = 0; i < threadNum; i++)blockList[i] = Long.valueOf(i);uploadSession.commit(blockList);//提交long endTime = System.currentTimeMillis();System.out.println("总共耗时:" + (endTime – startTime) + " ms");System.out.println("————————————————-");System.out.println("upload success!");}}



版权声明:本文为博主原创文章,未经博主允许不得转载。

,总结成功的经验能够让人越来越聪明,

【ODPS】本地数据库数据批量上传到ODPS中

相关文章:

你感兴趣的文章:

标签云: