Fun with Big Data Series: Integrating Apache Pig with MySQL (Part 3)

The previous post showed how to store Pig results into Solr, so some readers may ask: why not store them in a database instead? In fact, we can store the result set anywhere we like; all it takes is writing our own StoreFunc implementation. For storing Pig output into a database, the Pig piggybank contribution already provides a corresponding UDF. piggybank is a collection of utility functions that is not an official part of Apache Pig; most of its UDFs were contributed later by companies or individuals who used Pig. Although these utilities are not folded into the Pig source tree proper, every Pig release ships them as an extension library, and after compilation they live in a directory named contrib under the Pig root. The piggybank page is https://cwiki.apache.org/confluence/display/PIG/PiggyBank if you want to take a look.

Storing Pig results into a database is also very simple. You need:
(1) the piggybank.jar package
(2) the JDBC driver jar for your target database

One thing to watch out for: before storing to the database, make sure you have permission to access and write to it, otherwise the job will fail! When I wrote to a remote MySQL instance, the write failed precisely because of a permission problem; the exception reads as follows:



Access denied for user 'root'@'localhost'

When the exception above shows up, it means the account lacks write permission. Use one of the following grants to authorize the target machine:

(1) Allow access from any host IP:
GRANT ALL PRIVILEGES ON *.* TO 'myuser'@'%' IDENTIFIED BY 'mypassword' WITH GRANT OPTION;

(2) Allow access from a specific host IP only:
GRANT ALL PRIVILEGES ON *.* TO 'myuser'@'192.168.1.3' IDENTIFIED BY 'mypassword' WITH GRANT OPTION;
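Before launching the job, it is worth double-checking that the grant actually took effect. The statements below are only a sketch using the placeholder account from the GRANT examples above; substitute your real user and host:

-- On the MySQL server: list the privileges granted to the account the job will use
SHOW GRANTS FOR 'myuser'@'%';

-- Or connect from the Hadoop node with the same credentials and confirm who you are
SELECT CURRENT_USER();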


Once the permissions are sorted out, we can put together a small data set and test whether data on HDFS can be written into the database. The test data is as follows:

1,2,3
1,2,4
2,2,4
3,4,2
8,2,4

Create the database, the table, and its columns on the target MySQL instance ahead of time. The structure of my test table was shown as a screenshot in the original post.
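Since that screenshot is not reproduced here, the DDL below is only a sketch inferred from the INSERT statement used by the Pig script further down (columns id, name, count); the column types in my actual table may have differed:

-- Hypothetical DDL matching INSERT into pig(id,name,count) values(?,?,?)
CREATE DATABASE IF NOT EXISTS `user`;
USE `user`;
CREATE TABLE pig (
  id    INT,
  name  VARCHAR(64),
  count INT
);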

Finally, let's look at how our Pig script is defined and used:



-- Register the JDBC driver jar and the piggybank jar
register ./dependfiles/mysql-connector-java-5.1.23-bin.jar;
register ./dependfiles/piggybank.jar;

-- To keep the schema lined up with the database columns, it is best to name the fields here
a = load '/tmp/dongliang/g.txt' using PigStorage(',') as (id:int, name:chararray, count:int);

-- Keep only the rows whose id is greater than 2
a = filter a by id > 2;

-- Store the result into the database
STORE a INTO '/tmp/dbtest' using org.apache.pig.piggybank.storage.DBStorage('com.mysql.jdbc.Driver', 'jdbc:mysql://192.168.146.63/user', 'root', 'pwd', 'INSERT into pig(id,name,count) values (?,?,?)');

After the script runs successfully, querying the database confirms that the data processed by Pig has been written correctly:
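The original post showed a screenshot of the query result at this point. An equivalent check from the mysql client would be something like the sketch below, using the table from the script above; with the sample data and the filter id > 2, only two rows should come back:

-- Verify what the job wrote; only the tuples (3,4,2) and (8,2,4) pass the id > 2 filter
SELECT * FROM pig;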

Finally, here is the source code of the DBStorage class:



/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.pig.piggybank.storage;

import org.joda.time.DateTime;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.pig.StoreFunc;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;

import java.io.IOException;
import java.sql.*;

public class DBStorage extends StoreFunc {
  private final Log log = LogFactory.getLog(getClass());

  private PreparedStatement ps;
  private Connection con;
  private String jdbcURL;
  private String user;
  private String pass;
  private int batchSize;
  private int count = 0;
  private String insertQuery;

  public DBStorage(String driver, String jdbcURL, String insertQuery) {
    this(driver, jdbcURL, null, null, insertQuery, "100");
  }

  public DBStorage(String driver, String jdbcURL, String user, String pass,
      String insertQuery) throws SQLException {
    this(driver, jdbcURL, user, pass, insertQuery, "100");
  }

  public DBStorage(String driver, String jdbcURL, String user, String pass,
      String insertQuery, String batchSize) throws RuntimeException {
    log.debug("DBStorage(" + driver + "," + jdbcURL + "," + user + ",XXXX,"
        + insertQuery + ")");
    try {
      Class.forName(driver);
    } catch (ClassNotFoundException e) {
      log.error("can't load DB driver:" + driver, e);
      throw new RuntimeException("Can't load DB Driver", e);
    }
    this.jdbcURL = jdbcURL;
    this.user = user;
    this.pass = pass;
    this.insertQuery = insertQuery;
    this.batchSize = Integer.parseInt(batchSize);
  }

  /**
   * Write the tuple to Database directly here.
   */
  public void putNext(Tuple tuple) throws IOException {
    int sqlPos = 1;
    try {
      int size = tuple.size();
      for (int i = 0; i < size; i++) {
        try {
          Object field = tuple.get(i);

          switch (DataType.findType(field)) {
          case DataType.NULL:
            ps.setNull(sqlPos, java.sql.Types.VARCHAR);
            sqlPos++;
            break;

          case DataType.BOOLEAN:
            ps.setBoolean(sqlPos, (Boolean) field);
            sqlPos++;
            break;

          case DataType.INTEGER:
            ps.setInt(sqlPos, (Integer) field);
            sqlPos++;
            break;

          case DataType.LONG:
            ps.setLong(sqlPos, (Long) field);
            sqlPos++;
            break;

          case DataType.FLOAT:
            ps.setFloat(sqlPos, (Float) field);
            sqlPos++;
            break;

          case DataType.DOUBLE:
            ps.setDouble(sqlPos, (Double) field);
            sqlPos++;
            break;

          case DataType.DATETIME:
            ps.setDate(sqlPos, new Date(((DateTime) field).getMillis()));
            sqlPos++;
            break;

          case DataType.BYTEARRAY:
            byte[] b = ((DataByteArray) field).get();
            ps.setBytes(sqlPos, b);
            sqlPos++;
            break;

          case DataType.CHARARRAY:
            ps.setString(sqlPos, (String) field);
            sqlPos++;
            break;

          case DataType.BYTE:
            ps.setByte(sqlPos, (Byte) field);
            sqlPos++;
            break;

          case DataType.MAP:
          case DataType.TUPLE:
          case DataType.BAG:
            throw new RuntimeException("Cannot store a non-flat tuple "
                + "using DbStorage");

          default:
            throw new RuntimeException("Unknown datatype "
                + DataType.findType(field));
          }
        } catch (ExecException ee) {
          throw new RuntimeException(ee);
        }
      }

      ps.addBatch();
      count++;
      if (count > batchSize) {
        count = 0;
        ps.executeBatch();
        ps.clearBatch();
        ps.clearParameters();
      }
    } catch (SQLException e) {
      try {
        log.error("Unable to insert record:" + tuple.toDelimitedString("\t"), e);
      } catch (ExecException ee) {
        // do nothing
      }
      if (e.getErrorCode() == 1366) {
        // errors that come due to utf-8 character encoding
        // ignore these kind of errors TODO: Temporary fix - need to find a
        // better way of handling them in the argument statement itself
      } else {
        throw new RuntimeException("JDBC error", e);
      }
    }
  }

  class MyDBOutputFormat extends OutputFormat<NullWritable, NullWritable> {

    @Override
    public void checkOutputSpecs(JobContext context) throws IOException,
        InterruptedException {
      // IGNORE
    }

    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
        throws IOException, InterruptedException {
      return new OutputCommitter() {

        @Override
        public void abortTask(TaskAttemptContext context) throws IOException {
          try {
            if (ps != null) {
              ps.close();
            }
            if (con != null) {
              con.rollback();
              con.close();
            }
          } catch (SQLException sqe) {
            throw new IOException(sqe);
          }
        }

        @Override
        public void commitTask(TaskAttemptContext context) throws IOException {
          if (ps != null) {
            try {
              ps.executeBatch();
              con.commit();
              ps.close();
              con.close();
              ps = null;
              con = null;
            } catch (SQLException e) {
              log.error("ps.close", e);
              throw new IOException("JDBC Error", e);
            }
          }
        }

        @Override
        public boolean needsTaskCommit(TaskAttemptContext context)
            throws IOException {
          return true;
        }

        @Override
        public void cleanupJob(JobContext context) throws IOException {
          // IGNORE
        }

        @Override
        public void setupJob(JobContext context) throws IOException {
          // IGNORE
        }

        @Override
        public void setupTask(TaskAttemptContext context) throws IOException {
          // IGNORE
        }
      };
    }

    @Override
    public RecordWriter<NullWritable, NullWritable> getRecordWriter(
        TaskAttemptContext context) throws IOException, InterruptedException {
      // We don't use a record writer to write to database
      return new RecordWriter<NullWritable, NullWritable>() {
        @Override
        public void close(TaskAttemptContext context) {
          // Noop
        }

        @Override
        public void write(NullWritable k, NullWritable v) {
          // Noop
        }
      };
    }
  }

  @SuppressWarnings("unchecked")
  @Override
  public OutputFormat getOutputFormat() throws IOException {
    return new MyDBOutputFormat();
  }

  /**
   * Initialise the database connection and prepared statement here.
   */
  @SuppressWarnings("unchecked")
  @Override
  public void prepareToWrite(RecordWriter writer) throws IOException {
    ps = null;
    con = null;
    if (insertQuery == null) {
      throw new IOException("SQL Insert command not specified");
    }
    try {
      if (user == null || pass == null) {
        con = DriverManager.getConnection(jdbcURL);
      } else {
        con = DriverManager.getConnection(jdbcURL, user, pass);
      }
      con.setAutoCommit(false);
      ps = con.prepareStatement(insertQuery);
    } catch (SQLException e) {
      log.error("Unable to connect to JDBC @" + jdbcURL);
      throw new IOException("JDBC Error", e);
    }
    count = 0;
  }

  @Override
  public void setStoreLocation(String location, Job job) throws IOException {
    // IGNORE since we are writing records to DB.
  }
}

