试试使用Apache Sqoop
试试使用Sqoop
Sqoop是一个从关系型数据库等结构化数据存储中提取数据并与Hadoop进行协作的工具。它是Apache的顶级项目。我已经亲自使用过,所以我会把我的使用体验写在博客上作为备忘录。
- http://sqoop.apache.org/
不需要自己搭建环境,这里使用了Hortonworks的Sandbox。版本如下所示。虽然出现了某个环境变量未设置的警告,但这里将其忽略。
[root@sandbox ~]# sqoop version
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
14/08/14 22:46:05 INFO sqoop.Sqoop: Running Sqoop version: 1.4.4.2.1.1.0-385
Sqoop 1.4.4.2.1.1.0-385
git commit id d3c37763356e55bbf152053f6db24b1bfe582972
Compiled by jenkins on Wed Apr 16 16:12:40 PDT 2014
- http://hortonworks.com/products/hortonworks-sandbox/
让我试试看
从MySQL导入到HDFS。
创建数据库
暂时先创建一个sqoopsample数据库,并且赋予强大的权限。
> mysql -u root -p
> CREATE DATABASE sqoopsample;
> GRANT ALL PRIVILEGES ON sqoopsample.* TO '%'@'localhost';
> GRANT ALL PRIVILEGES ON sqoopsample.* TO ''@'localhost';
> quit;
运行以下脚本以输入数据。
使用mysql sqoopsample命令将sql.cmd文件导入
CREATE TABLE employees(id INT NOT NULL PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(64) NOT NULL,
age INT,
birtday DATE,
description VARCHAR(100));
INSERT INTO employees VALUES(NULL, 'TANAKA Ichiro', 30, '1984-10-10', 'hoge');
INSERT INTO employees VALUES(NULL, 'TANAKA Jiro' , 29, '1985-12-22', 'fuge');
INSERT INTO employees VALUES(NULL, 'TANAKA Saburo', 28, '1986-08-01', NULL);
只要数据输入完整,就没有问题。
mysql> select * from employees;
+----+---------------+------+------------+-------------+
| id | name | age | birtday | description |
+----+---------------+------+------------+-------------+
| 1 | TANAKA Ichiro | 30 | 1984-10-10 | hoge |
| 2 | TANAKA Jiro | 29 | 1985-12-22 | fuge |
| 3 | TANAKA Saburo | 28 | 1986-08-01 | NULL |
+----+---------------+------+------------+-------------+
3 rows in set (0.00 sec)
执行Sqoop
使用Sqoop执行导入,将数据传输到HDFS。由于传输本身是作为MR程序运行的,因此可以使用“-m”参数来指定Mapper的数量。以下是执行结果。
[root@sandbox ~]# sqoop import --connect jdbc:mysql://localhost/sqoopsample --table employees -m 1
...
...
[root@sandbox ~]# hadoop fs -ls employees
Found 2 items
-rw-r--r-- 1 root root 0 2014-08-14 22:43 employees/_SUCCESS
-rw-r--r-- 1 root root 103 2014-08-14 22:43 employees/part-m-00000
[root@sandbox ~]# hadoop fs -cat employees/part-m-00000
1,TANAKA Ichiro,30,1984-10-10,hoge
2,TANAKA Jiro,29,1985-12-22,fuge
3,TANAKA Saburo,28,1986-08-01,null
因此,传输已经完成。
追加说明
执行命令后,当前目录下会自动生成MR用的Java程序(文件)。虽然有点长,但我将其贴在下面。
// ORM class for table 'employees'
// WARNING: This class is AUTO-GENERATED. Modify at your own risk.
//
// Debug information:
// Generated date: Thu Aug 14 22:43:02 PDT 2014
// For connector: org.apache.sqoop.manager.MySQLManager
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;
import com.cloudera.sqoop.lib.JdbcWritableBridge;
import com.cloudera.sqoop.lib.DelimiterSet;
import com.cloudera.sqoop.lib.FieldFormatter;
import com.cloudera.sqoop.lib.RecordParser;
import com.cloudera.sqoop.lib.BooleanParser;
import com.cloudera.sqoop.lib.BlobRef;
import com.cloudera.sqoop.lib.ClobRef;
import com.cloudera.sqoop.lib.LargeObjectLoader;
import com.cloudera.sqoop.lib.SqoopRecord;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
public class employees extends SqoopRecord implements DBWritable, Writable {
private final int PROTOCOL_VERSION = 3;
public int getClassFormatVersion() { return PROTOCOL_VERSION; }
protected ResultSet __cur_result_set;
private Integer id;
public Integer get_id() {
return id;
}
public void set_id(Integer id) {
this.id = id;
}
public employees with_id(Integer id) {
this.id = id;
return this;
}
private String name;
public String get_name() {
return name;
}
public void set_name(String name) {
this.name = name;
}
public employees with_name(String name) {
this.name = name;
return this;
}
private Integer age;
public Integer get_age() {
return age;
}
public void set_age(Integer age) {
this.age = age;
}
public employees with_age(Integer age) {
this.age = age;
return this;
}
private java.sql.Date birtday;
public java.sql.Date get_birtday() {
return birtday;
}
public void set_birtday(java.sql.Date birtday) {
this.birtday = birtday;
}
public employees with_birtday(java.sql.Date birtday) {
this.birtday = birtday;
return this;
}
private String description;
public String get_description() {
return description;
}
public void set_description(String description) {
this.description = description;
}
public employees with_description(String description) {
this.description = description;
return this;
}
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof employees)) {
return false;
}
employees that = (employees) o;
boolean equal = true;
equal = equal && (this.id == null ? that.id == null : this.id.equals(that.id));
equal = equal && (this.name == null ? that.name == null : this.name.equals(that.name));
equal = equal && (this.age == null ? that.age == null : this.age.equals(that.age));
equal = equal && (this.birtday == null ? that.birtday == null : this.birtday.equals(that.birtday));
equal = equal && (this.description == null ? that.description == null : this.description.equals(that.description));
return equal;
}
public void readFields(ResultSet __dbResults) throws SQLException {
this.__cur_result_set = __dbResults;
this.id = JdbcWritableBridge.readInteger(1, __dbResults);
this.name = JdbcWritableBridge.readString(2, __dbResults);
this.age = JdbcWritableBridge.readInteger(3, __dbResults);
this.birtday = JdbcWritableBridge.readDate(4, __dbResults);
this.description = JdbcWritableBridge.readString(5, __dbResults);
}
public void loadLargeObjects(LargeObjectLoader __loader)
throws SQLException, IOException, InterruptedException {
}
public void write(PreparedStatement __dbStmt) throws SQLException {
write(__dbStmt, 0);
}
public int write(PreparedStatement __dbStmt, int __off) throws SQLException {
JdbcWritableBridge.writeInteger(id, 1 + __off, 4, __dbStmt);
JdbcWritableBridge.writeString(name, 2 + __off, 12, __dbStmt);
JdbcWritableBridge.writeInteger(age, 3 + __off, 4, __dbStmt);
JdbcWritableBridge.writeDate(birtday, 4 + __off, 91, __dbStmt);
JdbcWritableBridge.writeString(description, 5 + __off, 12, __dbStmt);
return 5;
}
public void readFields(DataInput __dataIn) throws IOException {
if (__dataIn.readBoolean()) {
this.id = null;
} else {
this.id = Integer.valueOf(__dataIn.readInt());
}
if (__dataIn.readBoolean()) {
this.name = null;
} else {
this.name = Text.readString(__dataIn);
}
if (__dataIn.readBoolean()) {
this.age = null;
} else {
this.age = Integer.valueOf(__dataIn.readInt());
}
if (__dataIn.readBoolean()) {
this.birtday = null;
} else {
this.birtday = new Date(__dataIn.readLong());
}
if (__dataIn.readBoolean()) {
this.description = null;
} else {
this.description = Text.readString(__dataIn);
}
}
public void write(DataOutput __dataOut) throws IOException {
if (null == this.id) {
__dataOut.writeBoolean(true);
} else {
__dataOut.writeBoolean(false);
__dataOut.writeInt(this.id);
}
if (null == this.name) {
__dataOut.writeBoolean(true);
} else {
__dataOut.writeBoolean(false);
Text.writeString(__dataOut, name);
}
if (null == this.age) {
__dataOut.writeBoolean(true);
} else {
__dataOut.writeBoolean(false);
__dataOut.writeInt(this.age);
}
if (null == this.birtday) {
__dataOut.writeBoolean(true);
} else {
__dataOut.writeBoolean(false);
__dataOut.writeLong(this.birtday.getTime());
}
if (null == this.description) {
__dataOut.writeBoolean(true);
} else {
__dataOut.writeBoolean(false);
Text.writeString(__dataOut, description);
}
}
private static final DelimiterSet __outputDelimiters = new DelimiterSet((char) 44, (char) 10, (char) 0, (char) 0, false);
public String toString() {
return toString(__outputDelimiters, true);
}
public String toString(DelimiterSet delimiters) {
return toString(delimiters, true);
}
public String toString(boolean useRecordDelim) {
return toString(__outputDelimiters, useRecordDelim);
}
public String toString(DelimiterSet delimiters, boolean useRecordDelim) {
StringBuilder __sb = new StringBuilder();
char fieldDelim = delimiters.getFieldsTerminatedBy();
__sb.append(FieldFormatter.escapeAndEnclose(id==null?"null":"" + id, delimiters));
__sb.append(fieldDelim);
__sb.append(FieldFormatter.escapeAndEnclose(name==null?"null":name, delimiters));
__sb.append(fieldDelim);
__sb.append(FieldFormatter.escapeAndEnclose(age==null?"null":"" + age, delimiters));
__sb.append(fieldDelim);
__sb.append(FieldFormatter.escapeAndEnclose(birtday==null?"null":"" + birtday, delimiters));
__sb.append(fieldDelim);
__sb.append(FieldFormatter.escapeAndEnclose(description==null?"null":description, delimiters));
if (useRecordDelim) {
__sb.append(delimiters.getLinesTerminatedBy());
}
return __sb.toString();
}
private static final DelimiterSet __inputDelimiters = new DelimiterSet((char) 44, (char) 10, (char) 0, (char) 0, false);
private RecordParser __parser;
public void parse(Text __record) throws RecordParser.ParseError {
if (null == this.__parser) {
this.__parser = new RecordParser(__inputDelimiters);
}
List<String> __fields = this.__parser.parseRecord(__record);
__loadFromFields(__fields);
}
public void parse(CharSequence __record) throws RecordParser.ParseError {
if (null == this.__parser) {
this.__parser = new RecordParser(__inputDelimiters);
}
List<String> __fields = this.__parser.parseRecord(__record);
__loadFromFields(__fields);
}
public void parse(byte [] __record) throws RecordParser.ParseError {
if (null == this.__parser) {
this.__parser = new RecordParser(__inputDelimiters);
}
List<String> __fields = this.__parser.parseRecord(__record);
__loadFromFields(__fields);
}
public void parse(char [] __record) throws RecordParser.ParseError {
if (null == this.__parser) {
this.__parser = new RecordParser(__inputDelimiters);
}
List<String> __fields = this.__parser.parseRecord(__record);
__loadFromFields(__fields);
}
public void parse(ByteBuffer __record) throws RecordParser.ParseError {
if (null == this.__parser) {
this.__parser = new RecordParser(__inputDelimiters);
}
List<String> __fields = this.__parser.parseRecord(__record);
__loadFromFields(__fields);
}
public void parse(CharBuffer __record) throws RecordParser.ParseError {
if (null == this.__parser) {
this.__parser = new RecordParser(__inputDelimiters);
}
List<String> __fields = this.__parser.parseRecord(__record);
__loadFromFields(__fields);
}
private void __loadFromFields(List<String> fields) {
Iterator<String> __it = fields.listIterator();
String __cur_str = null;
try {
__cur_str = __it.next();
if (__cur_str.equals("null") || __cur_str.length() == 0) { this.id = null; } else {
this.id = Integer.valueOf(__cur_str);
}
__cur_str = __it.next();
if (__cur_str.equals("null")) { this.name = null; } else {
this.name = __cur_str;
}
__cur_str = __it.next();
if (__cur_str.equals("null") || __cur_str.length() == 0) { this.age = null; } else {
this.age = Integer.valueOf(__cur_str);
}
__cur_str = __it.next();
if (__cur_str.equals("null") || __cur_str.length() == 0) { this.birtday = null; } else {
this.birtday = java.sql.Date.valueOf(__cur_str);
}
__cur_str = __it.next();
if (__cur_str.equals("null")) { this.description = null; } else {
this.description = __cur_str;
}
} catch (RuntimeException e) { throw new RuntimeException("Can't parse input data: '" + __cur_str + "'", e); } }
public Object clone() throws CloneNotSupportedException {
employees o = (employees) super.clone();
o.birtday = (o.birtday != null) ? (java.sql.Date) o.birtday.clone() : null;
return o;
}
public Map<String, Object> getFieldMap() {
Map<String, Object> __sqoop$field_map = new TreeMap<String, Object>();
__sqoop$field_map.put("id", this.id);
__sqoop$field_map.put("name", this.name);
__sqoop$field_map.put("age", this.age);
__sqoop$field_map.put("birtday", this.birtday);
__sqoop$field_map.put("description", this.description);
return __sqoop$field_map;
}
public void setField(String __fieldName, Object __fieldVal) {
if ("id".equals(__fieldName)) {
this.id = (Integer) __fieldVal;
}
else if ("name".equals(__fieldName)) {
this.name = (String) __fieldVal;
}
else if ("age".equals(__fieldName)) {
this.age = (Integer) __fieldVal;
}
else if ("birtday".equals(__fieldName)) {
this.birtday = (java.sql.Date) __fieldVal;
}
else if ("description".equals(__fieldName)) {
this.description = (String) __fieldVal;
}
else {
throw new RuntimeException("No such field: " + __fieldName);
}
}
}