Homework for Lesson 92: store and query the following data in Hive via the SerDe mechanism:
0^^Hadoop^^America^^5000|8000|12000|level8^^male
1^^Spark^^America^^8000|10000|15000|level9^^famale
2^^Flink^^America^^7000|8000|13000|level10^^male
3^^Hadoop^^America^^9000|11000|12000|level10^^famale
4^^Spark^^America^^10000|11000|12000|level12^^male
5^^Flink^^America^^11000|12000|18000|level18^^famale
6^^Hadoop^^America^^15000|16000|19000|level16^^male
7^^Spark^^America^^18000|19000|20000|level20^^male
8^^Flink^^America^^15000|16000|19000|level19^^male
Goal: implement a custom InputFormat that decodes and parses the raw records, so the Hive source data can be cleaned flexibly:
1. Split each record on ^^
2. Additionally split the salary field on | (see the sketch after this list for how one record maps to the eight table columns)
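To make the target layout concrete, here is a minimal standalone sketch (the class name SplitDemo and the hard-coded sample line are illustrative, not part of the assignment code) that flattens one raw record into the eight columns of the Hive table defined later:

// Illustrative sketch (hypothetical class): flatten one raw record into the
// eight columns of the target Hive table.
public class SplitDemo {
    public static void main(String[] args) {
        String line = "0^^Hadoop^^America^^5000|8000|12000|level8^^male";

        // First split on ^^ (escaped because ^ is a regex metacharacter):
        // userid, name, address, salary block, gendre
        String[] fields = line.split("\\^\\^");
        // Then split the salary block on | (also a metacharacter):
        // salarys1, salarys2, salarys3, salarys4
        String[] salarys = fields[3].split("\\|");

        System.out.println("userid=" + fields[0] + ", name=" + fields[1]
                + ", address=" + fields[2]
                + ", salarys1=" + salarys[0] + ", salarys2=" + salarys[1]
                + ", salarys3=" + salarys[2] + ", salarys4=" + salarys[3]
                + ", gendre=" + fields[4]);
    }
}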
Implementation steps:
1. Source data location:
root@master:/usr/local/IMF_testdata/hivestudy# ls
employeesinputformat.txt IMFInputFormat2.jar
2. View the file contents:
root@master:/usr/local/IMF_testdata/hivestudy# cat employeesinputformat.txt
0^^Hadoop^^America^^5000|8000|12000|level8^^male
1^^Spark^^America^^8000|10000|15000|level9^^famale
2^^Flink^^America^^7000|8000|13000|level10^^male
3^^Hadoop^^America^^9000|11000|12000|level10^^famale
4^^Spark^^America^^10000|11000|12000|level12^^male
5^^Flink^^America^^11000|12000|18000|level18^^famale
6^^Hadoop^^America^^15000|16000|19000|level16^^male
7^^Spark^^America^^18000|19000|20000|level20^^male
8^^Flink^^America^^15000|16000|19000|level19^^male
3. Develop the InputFormat code (full source attached below) and export it as IMFInputFormat2.jar.
The code parses each line of text with a regular expression (the backslash escapes are required because ^ and | are regex metacharacters):
String patternhive = "^(.*)\\^\\^(.*)\\^\\^(.*)\\^\\^(.*)\\|(.*)\\|(.*)\\|(.*)\\^\\^(.*)";
The line is parsed on ^^ and |, the eight captured groups are read in turn, and they are joined back into a single string with "\u0001".
Problem: joining the fields with "\t" made every column load as NULL in Hive.
Solution: joining with "\u0001" (Hive's default field delimiter) loads the data correctly.
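As a sanity check, the following standalone sketch (the class name RegexJoinDemo and the inline sample line are illustrative, not part of the project) applies the same pattern to one record and joins the captured groups with \u0001:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative sketch (hypothetical class): parse one sample record with the
// pattern above and join the eight captured groups with \u0001.
public class RegexJoinDemo {
    public static void main(String[] args) {
        String line = "0^^Hadoop^^America^^5000|8000|12000|level8^^male";
        String patternhive = "^(.*)\\^\\^(.*)\\^\\^(.*)\\^\\^(.*)\\|(.*)\\|(.*)\\|(.*)\\^\\^(.*)";

        Matcher m = Pattern.compile(patternhive).matcher(line);
        if (m.find()) {
            StringBuilder sb = new StringBuilder();
            for (int i = 1; i <= m.groupCount(); i++) {
                if (i > 1) {
                    // "\u0001" is Hive's default field delimiter; joining with "\t"
                    // instead is what produced the NULL columns described above.
                    sb.append('\u0001');
                }
                sb.append(m.group(i));
            }
            // Print with visible tabs so the eight fields can be checked by eye
            System.out.println(sb.toString().replace('\u0001', '\t'));
        }
    }
}

Printed with the \u0001 characters shown as tabs, the output contains exactly the eight fields expected by the employee_InputFormat table.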
4. Operations in Hive:
Drop the table:
drop table employee_inputformat;
Add the jar:
add jar /usr/local/IMF_testdata/hivestudy/IMFInputFormat2.jar;
Create the table:
CREATE TABLE employee_InputFormat(userid INT, name String, address String, salarys1 int, salarys2 int, salarys3 int, salarys4 string, gendre string) STORED AS INPUTFORMAT 'com.dt.spark.hive.IMFInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
Load the data:
LOAD DATA LOCAL INPATH '/usr/local/IMF_testdata/hivestudy/employeesinputformat.txt' INTO TABLE employee_InputFormat;
Query the data:
select * from employee_InputFormat;
5. The results are as follows:
hive> desc formatted employee_inputformat;
OK
# col_name data_type comment
userid int
name string
address string
salarys1 int
salarys2 int
salarys3 int
salarys4 string
gendre string
# Detailed Table Information
Database: default
Owner: root
CreateTime: Sun Dec 11 20:47:21 CST 2016
LastAccessTime: UNKNOWN
Protect Mode: None
Retention: 0
Location: hdfs://master:9000/user/hive/warehouse/employee_inputformat
Table Type: MANAGED_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE true
numFiles 1
totalSize 467
transient_lastDdlTime 1481460441
# Storage Information
SerDe Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
InputFormat: com.dt.spark.hive.IMFInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
Sort Columns: []
Storage Desc Params:
serialization.format 1
Time taken: 0.111 seconds, Fetched: 36 row(s)
hive>
Attached source code (IMFInputFormat):
package com.dt.spark.hive;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
public class IMFInputFormat extends TextInputFormat implements JobConfigurable {

    // Hand each split to the custom IMFRecordReader, which rewrites every raw
    // line into \u0001-delimited fields before Hive's default SerDe sees it.
    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit,
            JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(genericSplit.toString());
        return new IMFRecordReader((FileSplit) genericSplit, job);
    }
}
Source code (IMFRecordReader):
package com.dt.spark.hive;
import java.io.IOException;
import java.io.InputStream;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.util.LineReader;
import org.apache.hadoop.mapred.RecordReader;
public class IMFRecordReader implements RecordReader<LongWritable, Text> {

    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader lineReader;
    int maxLineLength;
    public IMFRecordReader(FileSplit inputSplit, Configuration job) throws IOException {
        maxLineLength = job.getInt("mapred.IMFRecordReader.maxlength", Integer.MAX_VALUE);
        start = inputSplit.getStart();
        end = start + inputSplit.getLength();
        final Path file = inputSplit.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);

        // Open the file and seek to the start of the split
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);
        boolean skipFirstLine = false;
        if (codec != null) {
            lineReader = new LineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                // This split starts mid-line: back up one byte and skip the
                // partial first line, which belongs to the previous split.
                skipFirstLine = true;
                --start;
                fileIn.seek(start);
            }
            lineReader = new LineReader(fileIn, job);
        }
        if (skipFirstLine) {
            start += lineReader.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }
    public IMFRecordReader(InputStream in, long offset, long endOffset, int maxLineLength) {
        this.maxLineLength = maxLineLength;
        this.lineReader = new LineReader(in);
        this.start = offset;
        this.pos = offset;
        this.end = endOffset;
    }

    public IMFRecordReader(InputStream in, long offset, long endOffset, Configuration job) throws IOException {
        this.maxLineLength = job.getInt("mapred.IMFRecordReader.maxlength", Integer.MAX_VALUE);
        this.lineReader = new LineReader(in, job);
        this.start = offset;
        this.pos = offset;
        this.end = endOffset;
    }
    public LongWritable createKey() {
        return new LongWritable();
    }

    public Text createValue() {
        return new Text();
    }
    /**
     * Reads the next record in the split and extracts the useful fields
     * from the raw line.
     *
     * @param key
     *            key of the record, which will map to the byte offset of the
     *            record's line
     * @param value
     *            the record in text format
     * @return true if a record existed, false otherwise
     * @throws IOException
     */
    public synchronized boolean next(LongWritable key, Text value) throws IOException {
        // Stay within the split
        while (pos < end) {
            key.set(pos);
            int newSize = lineReader.readLine(value, maxLineLength,
                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
            if (newSize == 0) {
                return false;
            }
            // NOTE: the parsing below is a reconstruction of the truncated original,
            // following the description above: apply the regex, then rejoin the
            // eight captured groups with \u0001 (Hive's default field delimiter).
            String patternhive = "^(.*)\\^\\^(.*)\\^\\^(.*)\\^\\^(.*)\\|(.*)\\|(.*)\\|(.*)\\^\\^(.*)";
            Matcher m = Pattern.compile(patternhive).matcher(value.toString());
            if (m.find()) {
                String result = m.group(1) + "\u0001" + m.group(2) + "\u0001" + m.group(3)
                        + "\u0001" + m.group(4) + "\u0001" + m.group(5) + "\u0001" + m.group(6)
                        + "\u0001" + m.group(7) + "\u0001" + m.group(8);
                value.set(result);
            }
            pos += newSize;
            if (newSize < maxLineLength) {
                return true;
            }
            // Line longer than maxLineLength: skip it and read the next one
        }
        return false;
    }

    public float getProgress() {
        if (start == end) {
            return 0.0f;
        }
        return Math.min(1.0f, (pos - start) / (float) (end - start));
    }

    public synchronized long getPos() throws IOException {
        return pos;
    }

    public synchronized void close() throws IOException {
        if (lineReader != null) {
            lineReader.close();
        }
    }
}
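For a quick check of the record reader outside Hive, the following sketch (the class name IMFRecordReaderLocalTest and the inline sample data are illustrative, not part of the original project) feeds one raw line through the class's InputStream constructor and prints the rewritten record:

import java.io.ByteArrayInputStream;
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import com.dt.spark.hive.IMFRecordReader;

// Illustrative local smoke test (hypothetical class, not part of the original project).
public class IMFRecordReaderLocalTest {
    public static void main(String[] args) throws IOException {
        byte[] data = "0^^Hadoop^^America^^5000|8000|12000|level8^^male\n".getBytes("UTF-8");
        IMFRecordReader reader = new IMFRecordReader(
                new ByteArrayInputStream(data), 0, data.length, Integer.MAX_VALUE);

        LongWritable key = reader.createKey();
        Text value = reader.createValue();
        while (reader.next(key, value)) {
            // Show \u0001 as tabs so the eight fields are easy to inspect
            System.out.println(value.toString().replace('\u0001', '\t'));
        }
        reader.close();
    }
}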