网站首页 > 技术文章 正文
分隔符 是hive在建表的时候要考虑的一个重要因素,根据要加载的原始数据的格式不同,通常数据文件中的分隔符也有差异,因此可以在建表的时候指定分隔符,从而映射到hive的数据表。
hive默认分隔符规则以及限制
Hive 默认序列化类是 LazySimpleSerDe,其只支持使用单字节分隔符(char)来加载文本数据,例如逗号、制表符、空格等等,默认的分隔符为”\001”。
根据不同文件的不同分隔符,我们可以通过在创建表时使用 row format delimited 来指定文件中的分割符,确保正确将表中的每一列与文件中的每一列实现一一对应的关系。
突破默认限制规则约束
- 解决方案一:替换分隔符
对原始数据进行预处理,将双分隔符转换为单个分隔符后再导入;
转换的过程,可以人工处理,也可以使用MR程序处理;
Linux 命令 : sed 's/#//g' data.txt > new_data.txt
MR程序处理伪代码 :
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
/**
* @ClassName ChangeSplitCharMR
* @Description TODO MapReduce实现将多字节分隔符转换为单字节符
* @Create By itcast
*/
public class ChangeSplitCharMR extends Configured implements Tool {
public int run(String[] arg) throws Exception {
/**
* 构建Job
*/
Job job = Job.getInstance(this.getConf(),"changeSplit");
job.setJarByClass(ChangeSplitCharMR.class);
/**
* 配置Job
*/
//input:读取需要转换的文件
job.setInputFormatClass(TextInputFormat.class);
Path inputPath = new Path("datas/split/test01.txt");
FileInputFormat.setInputPaths(job,inputPath);
//map:调用Mapper
job.setMapperClass(ChangeSplitMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
//reduce:不需要Reduce过程
job.setNumReduceTasks(0);
//output
job.setOutputFormatClass(TextOutputFormat.class);
Path outputPath = new Path("datas/output/changeSplit");
TextOutputFormat.setOutputPath(job,outputPath);
/**
* 提交Job
*/
return job.waitForCompletion(true) ? 0 : -1;
}
//程序入口
public static void main(String[] args) throws Exception {
//调用run
Configuration conf = new Configuration();
int status = ToolRunner.run(conf, new ChangeSplitCharMR(), args);
System.exit(status);
}
public static class ChangeSplitMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
//定义输出的Key
private Text outputKey = new Text();
//定义输出的Value
private NullWritable outputValue = NullWritable.get();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//获取每条数据
String line = value.toString();
//将里面的||转换为|
String newLine = line.replaceAll("\\|\\|", "|");
//替换后的内容作为Key
this.outputKey.set(newLine);
//输出结果
context.write(this.outputKey,this.outputValue);
}
}
}
- 解决方案二:RegexSerDe正则加载
使用hive提供的相关正则的语法来处理;
Hive内置的SerDe :
除了使用最多的LazySimpleSerDe,Hive该内置了很多SerDe类;
官网地址:https://cwiki.apache.org/confluence/display/Hive/SerDe;
多种SerDe用于解析和加载不同类型的数据文件,常用的有ORCSerDe 、RegexSerDe、JsonSerDe等;
1、RegexSerDe用来加载特殊数据的问题,使用正则匹配来加载数据;
2、根据正则表达式匹配每一列数据;
示例 :
-- 如果表已存在就删除表
drop table if exists singer;
-- 创建表 : 方法一 使用 hive-serde.jar
create table singer(
id int,
name string,
age string
);
-- 指定使用RegexSerde加载数据,字段类型不限制只能使用 string
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
-- 一个() 代表一个字段,
-- org.apache.hadoop.hive.serde2.RegexSerDe; 不支持output.format.string设定
WITH SERDEPROPERTIES ("input.regex" = "([0-9]*)\\|\\|(.*)\\|\\|(.*)")
STORED as textfile;
--创建表 : 方法二 使用 hive-contrib.jar
create table singer(
id int,
name string,
age string
)
-- 指定使用RegexSerde加载数据,字段类型只能是 string
-- org.apache.hadoop.hive.contrib.serde2.RegexSerDe only accepts string columns
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES ("input.regex" = "([0-9]*)\\|\\|(.*)\\|\\|(.*)","output.format.string"="%1$s %2$s %3$s")
STORED as textfile;
SERDEPROPERTIES 解释 :
input.regex:输入的正则表达式 ;
input.regex.case.insensitive:是否忽略字母大小写,默认为false ;
output.format.string:
输出的正则表达式,按照顺序往后递增即可(https://blog.csdn.net/alenejinping/article/details/44751101);
其中%1 %2表示第1,2个占位符。$s表示用字符串形式替代占位符,是一种格式化输出;
-- 加载数据
load data local inpath '/home/singer.txt' into table singer;
-- 查询结果
select * from singer;
- 解决方案三:自定义InputFormat
Hive中也允许使用自定义InputFormat来解决以上问题,通过在自定义InputFormat,来自定义解析逻辑实现读取每一行的数据。
自定义InputFormat,与MapReudce中自定义InputFormat一致,继承TextInputFormat;
代码 :
1. 自定义 UserInputFormat 类 :
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import java.io.IOException;
/**
* @ClassName UserInputFormat
* @Description TODO 用于实现自定义InputFormat,读取每行数据
*/
public class UserInputFormat extends TextInputFormat {
@Override
public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit, JobConf job,
Reporter reporter) throws IOException {
reporter.setStatus(genericSplit.toString());
UserRecordReader reader = new UserRecordReader(job,(FileSplit)genericSplit);
return reader;
}
}
2. UserRecordReader 类 :
用于自定义读取器,在自定义InputFormat中使用,将读取到的每行数据中的||替换为|
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
import java.io.IOException;
import java.io.InputStream;
/**
* @ClassName UserRecordReader
* @Description TODO 用于自定义读取器,在自定义InputFormat中使用,将读取到的每行数据中的||替换为|
*/
public class UserRecordReader implements RecordReader<LongWritable, Text> {
private static final Log LOG = LogFactory.getLog(LineRecordReader.class.getName());
int maxLineLength;
private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader in;
private Seekable filePosition;
private CompressionCodec codec;
private Decompressor decompressor;
public UserRecordReader(Configuration job, FileSplit split) throws IOException {
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
codec = compressionCodecs.getCodec(file);
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
if (isCompressedInput()) {
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec)
.createInputStream(fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
in = new LineReader(cIn, job);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn; // take pos from compressed stream
} else {
in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
filePosition = fileIn;
}
} else {
fileIn.seek(start);
in = new LineReader(fileIn, job);
filePosition = fileIn;
}
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
}
private boolean isCompressedInput() {
return (codec != null);
}
private int maxBytesToConsume(long pos) {
return isCompressedInput() ? Integer.MAX_VALUE : (int) Math.min(Integer.MAX_VALUE, end - pos);
}
private long getFilePosition() throws IOException {
long retVal;
if (isCompressedInput() && null != filePosition) {
retVal = filePosition.getPos();
} else {
retVal = pos;
}
return retVal;
}
public LongWritable createKey() {
return new LongWritable();
}
public Text createValue() {
return new Text();
}
/**
* Read a line.
*/
public synchronized boolean next(LongWritable key, Text value) throws IOException {
while (getFilePosition() <= end) {
key.set(pos);
int newSize = in.readLine(value, maxLineLength, Math.max(maxBytesToConsume(pos), maxLineLength));
String str = value.toString().replaceAll("\\|\\|", "\\|");
value.set(str);
pos += newSize;
if (newSize == 0) {
return false;
}
if (newSize < maxLineLength) {
return true;
}
LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
}
return false;
}
public float getProgress() throws IOException {
if (start == end) {
return 0.0f;
} else {
return Math.min(1.0f, (getFilePosition() - start) / (float) (end - start));
}
}
public synchronized long getPos() throws IOException {
return pos;
}
public synchronized void close() throws IOException {
try {
if (in != null) {
in.close();
}
} finally {
if (decompressor != null) {
CodecPool.returnDecompressor(decompressor);
}
}
}
public static class LineReader extends org.apache.hadoop.util.LineReader {
LineReader(InputStream in) {
super(in);
}
LineReader(InputStream in, int bufferSize) {
super(in, bufferSize);
}
public LineReader(InputStream in, Configuration conf) throws IOException {
super(in, conf);
}
}
}
本地打成jar包并上传到服务器;
使用命令上传jar到hive的依赖包目录 : add jar xxx.jar;
重新创建表,加载数据,同时指定InputFormat为自定义的InputFormat :
--如果表已存在就删除表
drop table if exists singer;
--创建表
create table singer(
id string,
name string,
works string)
--指定使用分隔符为|
row format delimited fields terminated by '|'
--指定使用自定义的类实现解析
stored as
inputformat 'xxx.xxx.hive.mr.UserInputFormat'
outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
--加载数据
load data local inpath '/usr/local/soft/selectdata/test01.txt' into table singer;
以上三种方法 : 替换分隔符无法解决数据字段中依然存在分隔符的问题,自定义InputFormat的开发成本较高,所以整体推荐使用正则加载的方式来实现对于特殊数据的处理。
- 解决方案四:MultiDelimitSerDe
官方文档 : https://cwiki.apache.org/confluence/display/Hive/MultiDelimitSerDe
语法 :
CREATE TABLE test (
id string,
hivearray array<binary>,
hivemap map<string,int>)
-- ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.MultiDelimitSerDe' -- 官方的
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.MultiDelimitSerDe' -- 打开 hive-contrib jar 后看到的,使用这个执行不报错;
WITH SERDEPROPERTIES ("field.delim"="[,]","collection.delim"=":","mapkey.delim"="@");
局限性 :
1. 在分隔符中,field.delim 是必需的,可以是多个字符,而 collection.delim 和 mapkey.delim 是可选的,仅支持单个字符。
2. 不支持嵌套复杂类型,例如 Array<Array>。
3. 要在 Hive 版本 4.0.0 之前使用 MultiDelimitSerDe,您必须将 hive-contrib jar 添加到类路径中,例如使用 add jar 命令。
问题 :
1. Class org.apache.hadoop.hive.contrib.serde2.MultiDelimitSerDe not found
解决方法 : add jar /usr/hdp/3.1.5.0-152/hive/lib/hive-contrib.jar
示例 :
-- 如果表已存在就删除表
drop table if exists singer;
-- 创建表
create table singer(
id int,
name string,
age string
)
-- 指定使用RegexSerde加载数据
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.MultiDelimitSerDe'
WITH SERDEPROPERTIES ("field.delim"="||")
STORED as textfile;
-- 加载数据
load data local inpath '/home/visits.txt' into table singer;
select * from singer;
处理csv格式文件
需求 :
csv : 逗号分隔值文件;
需要处理的文本数据 : 只要列值里有逗号的,全都加了引号引起来了,代表是一个整体的字符串;
解决方法 : 使用 CSV Serde;
CSVSerde 已针对 Hive 0.14 及更高版本构建和测试,并使用与 Hive 发行版捆绑在一起的 Open-CSV 2.3。
需要 hive-serde.jar 和 opencsv-2.3.jar;
示例 :
测试数据 : 1 'zs ls' 34
-- 建表语句
CREATE TABLE my_csv(
id int,
name string,
age string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
"separatorChar" = "\t",
"quoteChar" = "'",
"escapeChar" = "\\"
)
STORED AS TEXTFILE;
SERDEPROPERTIES 解释 :
为行的字段指定字段分隔符、字段内容引用字符和转义字符;
默认的分隔符是 :
分隔符:DEFAULT_SEPARATOR , 逗号
引号符:DEFAULT_QUOTE_CHARACTER “引号
转义符:DEFAULT_ESCAPE_CHARACTER \转义符
-- 加载数据
load data local inpath '/home/visits.txt' into table my_csv;
-- 查询结果
select * from my_csv;
-- 参考 : https://blog.csdn.net/wypblog/article/details/106152742
使用 OpenCSVSerde 时,show 的时候字段全变成 string 类型
org.apache.hadoop.hive.ql.exec.DDLTask#showCreateTable 里面会调用 org.apache.hadoop.hive.ql.metadata.Table#getCols 方法,这里的 getDeserializer() 调用会根据我们建表时候指定的 row format serde 来创建对应的 Deserializer,因为 test_open_csv_serde 表的 serde 是 OpenCSVSerde,所以在初始化 Deserializer 的时候会调用 org.apache.hadoop.hive.serde2.OpenCSVSerde#initialize 方法,这里面会把表的所有列类型设置为 PrimitiveObjectInspectorFactory.javaStringObjectInspector,这个就直接导致后面我们 show create table 的时候显示所有字段为 string。紧接着用 columnOIs 去初始化 inspector 对象。
初始化完 OpenCSVSerde 之后,getCols 方法会调用 org.apache.hadoop.hive.metastore.MetaStoreUtils#getFieldsFromDeserializer 方法,这个方法第一行就是 ObjectInspector oi = deserializer.getObjectInspector();,也就是获取我们上面介绍的 OpenCSVSerde 的 inspector:后面用这个解析字段类型的时候就直接拿到所有字段为 string。
猜你喜欢
- 2024-10-17 hive中json字符串解析之get_json_object与json_tuple
- 2024-10-17 hive学习笔记之三:内部表和外部表
- 2024-10-17 精选Hive高频面试题11道,附答案详细解析
- 2024-10-17 分享7个Flutter开发库,让你成为高效开发者
- 2024-10-17 看完这一篇数据仓库干货,终于搞懂什么是hive了
- 2024-10-17 Hive架构及Hive SQL的执行流程解读
- 2024-10-17 hadoop上搭建hive hadoop+hive
- 2024-10-17 HIVE常用函数大全 hive用法
- 2024-10-17 求求你别再手动部署jar包了,太low了
- 2024-10-17 Hive 函数 + Shell编程的具体实践与运用
你 发表评论:
欢迎- 11-19零基础学习!数据分析分类模型「支持向量机」
- 11-19机器学习 | 算法笔记(三)- 支持向量机算法以及代码实现
- 11-19我以前一直没有真正理解支持向量机,直到我画了一张图
- 11-19研一小姑娘分享机器学习之SVM支持向量机
- 11-19[机器学习] sklearn支持向量机
- 11-19支持向量机
- 11-19初探支持向量机:用大白话解释、原理详解、Python实现
- 11-19支持向量机的核函数
- 最近发表
- 标签列表
-
- oraclesql优化 (66)
- 类的加载机制 (75)
- feignclient (62)
- 一致性hash算法 (71)
- dockfile (66)
- 锁机制 (57)
- javaresponse (60)
- 查看hive版本 (59)
- phpworkerman (57)
- spark算子 (58)
- vue双向绑定的原理 (68)
- springbootget请求 (58)
- docker网络三种模式 (67)
- spring控制反转 (71)
- data:image/jpeg (69)
- base64 (69)
- java分页 (64)
- kibanadocker (60)
- qabstracttablemodel (62)
- java生成pdf文件 (69)
- deletelater (62)
- com.aspose.words (58)
- android.mk (62)
- qopengl (73)
- epoch_millis (61)
本文暂时没有评论,来添加一个吧(●'◡'●)