计算机系统应用教程网站

网站首页 > 技术文章 正文

hive 多字段分隔符使用 hive分隔符\001

btikc 2024-10-17 08:48:03 技术文章 13 ℃ 0 评论

分隔符 是hive在建表的时候要考虑的一个重要因素,根据要加载的原始数据的格式不同,通常数据文件中的分隔符也有差异,因此可以在建表的时候指定分隔符,从而映射到hive的数据表。

hive默认分隔符规则以及限制

Hive 默认序列化类是 LazySimpleSerDe,其只支持使用单字节分隔符(char)来加载文本数据,例如逗号、制表符、空格等等,默认的分隔符为”\001”。

根据不同文件的不同分隔符,我们可以通过在创建表时使用 row format delimited 来指定文件中的分割符,确保正确将表中的每一列与文件中的每一列实现一一对应的关系。

突破默认限制规则约束

  1. 解决方案一:替换分隔符
   对原始数据进行预处理,将双分隔符转换为单个分隔符后再导入;
   转换的过程,可以人工处理,也可以使用MR程序处理;

Linux 命令 : sed 's/#//g' data.txt > new_data.txt

MR程序处理伪代码 :
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
 
import java.io.IOException;
 
/**
 * @ClassName ChangeSplitCharMR
 * @Description TODO MapReduce实现将多字节分隔符转换为单字节符
 * @Create By  itcast
 */
public class ChangeSplitCharMR extends Configured implements Tool {
    public int run(String[] arg) throws Exception {
        /**
         * 构建Job
         */
        Job job = Job.getInstance(this.getConf(),"changeSplit");
        job.setJarByClass(ChangeSplitCharMR.class);
 
        /**
         * 配置Job
         */
        //input:读取需要转换的文件
        job.setInputFormatClass(TextInputFormat.class);
        Path inputPath = new Path("datas/split/test01.txt");
        FileInputFormat.setInputPaths(job,inputPath);
 
        //map:调用Mapper
        job.setMapperClass(ChangeSplitMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
 
        //reduce:不需要Reduce过程
        job.setNumReduceTasks(0);
 
        //output
        job.setOutputFormatClass(TextOutputFormat.class);
        Path outputPath = new Path("datas/output/changeSplit");
        TextOutputFormat.setOutputPath(job,outputPath);
 
        /**
         * 提交Job
         */
        return job.waitForCompletion(true) ? 0 : -1;
    }
 
    //程序入口
    public static void main(String[] args) throws Exception {
        //调用run
        Configuration conf = new Configuration();
        int status = ToolRunner.run(conf, new ChangeSplitCharMR(), args);
        System.exit(status);
    }
 
 
    public static class ChangeSplitMapper extends Mapper<LongWritable,Text,Text,NullWritable>{
        //定义输出的Key
        private Text outputKey = new Text();
        //定义输出的Value
        private NullWritable outputValue = NullWritable.get();
 
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //获取每条数据
            String line = value.toString();
            //将里面的||转换为|
            String newLine = line.replaceAll("\\|\\|", "|");
            //替换后的内容作为Key
            this.outputKey.set(newLine);
            //输出结果
            context.write(this.outputKey,this.outputValue);
        }
    }
}
  1. 解决方案二:RegexSerDe正则加载
使用hive提供的相关正则的语法来处理;

Hive内置的SerDe : 
      除了使用最多的LazySimpleSerDe,Hive该内置了很多SerDe类;
      官网地址:https://cwiki.apache.org/confluence/display/Hive/SerDe;
      多种SerDe用于解析和加载不同类型的数据文件,常用的有ORCSerDe 、RegexSerDe、JsonSerDe等;
      
1、RegexSerDe用来加载特殊数据的问题,使用正则匹配来加载数据;
2、根据正则表达式匹配每一列数据;
示例 :

-- 如果表已存在就删除表
drop table if exists singer;
-- 创建表 : 方法一  使用 hive-serde.jar
create table singer(
id int,
name string,
age string
);
-- 指定使用RegexSerde加载数据,字段类型不限制只能使用 string
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
-- 一个() 代表一个字段,
-- org.apache.hadoop.hive.serde2.RegexSerDe; 不支持output.format.string设定
WITH SERDEPROPERTIES ("input.regex" = "([0-9]*)\\|\\|(.*)\\|\\|(.*)")
STORED  as textfile;
 
--创建表 : 方法二 使用 hive-contrib.jar
create table singer(
id int,
name string,
age string
)
-- 指定使用RegexSerde加载数据,字段类型只能是 string
--  org.apache.hadoop.hive.contrib.serde2.RegexSerDe only accepts string columns
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES ("input.regex" = "([0-9]*)\\|\\|(.*)\\|\\|(.*)","output.format.string"="%1$s %2$s %3$s")
STORED  as textfile;

SERDEPROPERTIES 解释 :
        input.regex:输入的正则表达式 ; 
        input.regex.case.insensitive:是否忽略字母大小写,默认为false ;
        output.format.string:
                输出的正则表达式,按照顺序往后递增即可(https://blog.csdn.net/alenejinping/article/details/44751101);
                其中%1 %2表示第1,2个占位符。$s表示用字符串形式替代占位符,是一种格式化输出;

-- 加载数据
load data local inpath '/home/singer.txt' into table singer;

-- 查询结果
 select * from singer;
  1. 解决方案三:自定义InputFormat
Hive中也允许使用自定义InputFormat来解决以上问题,通过在自定义InputFormat,来自定义解析逻辑实现读取每一行的数据。

自定义InputFormat,与MapReudce中自定义InputFormat一致,继承TextInputFormat;

代码 : 
1. 自定义 UserInputFormat 类 :
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
 
import java.io.IOException;
 
/**
 * @ClassName UserInputFormat
 * @Description TODO 用于实现自定义InputFormat,读取每行数据
 */
 
public class UserInputFormat extends TextInputFormat {
    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit, JobConf job,
                                                            Reporter reporter) throws IOException {
        reporter.setStatus(genericSplit.toString());
        UserRecordReader reader = new UserRecordReader(job,(FileSplit)genericSplit);
        return reader;
    }
}

2. UserRecordReader 类 : 
用于自定义读取器,在自定义InputFormat中使用,将读取到的每行数据中的||替换为|
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
 
import java.io.IOException;
import java.io.InputStream;
 
/**
 * @ClassName UserRecordReader
 * @Description TODO 用于自定义读取器,在自定义InputFormat中使用,将读取到的每行数据中的||替换为|
 */
 
 
public class UserRecordReader implements RecordReader<LongWritable, Text> {
    private static final Log LOG = LogFactory.getLog(LineRecordReader.class.getName());
    int maxLineLength;
    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader in;
    private Seekable filePosition;
    private CompressionCodec codec;
    private Decompressor decompressor;
 
    public UserRecordReader(Configuration job, FileSplit split) throws IOException {
        this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength", Integer.MAX_VALUE);
        start = split.getStart();
        end = start + split.getLength();
        final Path file = split.getPath();
        compressionCodecs = new CompressionCodecFactory(job);
        codec = compressionCodecs.getCodec(file);
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(split.getPath());
        if (isCompressedInput()) {
            decompressor = CodecPool.getDecompressor(codec);
            if (codec instanceof SplittableCompressionCodec) {
                final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec)
                        .createInputStream(fileIn, decompressor, start, end,
                                SplittableCompressionCodec.READ_MODE.BYBLOCK);
                in = new LineReader(cIn, job);
                start = cIn.getAdjustedStart();
                end = cIn.getAdjustedEnd();
                filePosition = cIn; // take pos from compressed stream
            } else {
                in = new LineReader(codec.createInputStream(fileIn, decompressor), job);
                filePosition = fileIn;
            }
        } else {
            fileIn.seek(start);
            in = new LineReader(fileIn, job);
            filePosition = fileIn;
        }
        if (start != 0) {
            start += in.readLine(new Text(), 0, maxBytesToConsume(start));
        }
        this.pos = start;
    }
 
    private boolean isCompressedInput() {
        return (codec != null);
    }
 
    private int maxBytesToConsume(long pos) {
        return isCompressedInput() ? Integer.MAX_VALUE : (int) Math.min(Integer.MAX_VALUE, end - pos);
    }
 
    private long getFilePosition() throws IOException {
        long retVal;
        if (isCompressedInput() && null != filePosition) {
            retVal = filePosition.getPos();
        } else {
            retVal = pos;
        }
        return retVal;
    }
 
    public LongWritable createKey() {
        return new LongWritable();
    }
 
    public Text createValue() {
        return new Text();
    }
 
    /**
     * Read a line.
     */
    public synchronized boolean next(LongWritable key, Text value) throws IOException {
        while (getFilePosition() <= end) {
            key.set(pos);
            int newSize = in.readLine(value, maxLineLength, Math.max(maxBytesToConsume(pos), maxLineLength));
            String str = value.toString().replaceAll("\\|\\|", "\\|");
            value.set(str);
            pos += newSize;
            if (newSize == 0) {
                return false;
            }
            if (newSize < maxLineLength) {
                return true;
            }
            LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
        }
        return false;
    }
 
    public float getProgress() throws IOException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (getFilePosition() - start) / (float) (end - start));
        }
    }
 
    public synchronized long getPos() throws IOException {
        return pos;
    }
 
    public synchronized void close() throws IOException {
        try {
            if (in != null) {
                in.close();
            }
        } finally {
            if (decompressor != null) {
                CodecPool.returnDecompressor(decompressor);
            }
        }
    }
 
    public static class LineReader extends org.apache.hadoop.util.LineReader {
        LineReader(InputStream in) {
            super(in);
        }
 
        LineReader(InputStream in, int bufferSize) {
            super(in, bufferSize);
        }
 
        public LineReader(InputStream in, Configuration conf) throws IOException {
            super(in, conf);
        }
    }
}

本地打成jar包并上传到服务器;
使用命令上传jar到hive的依赖包目录 : add jar  xxx.jar;
重新创建表,加载数据,同时指定InputFormat为自定义的InputFormat :

--如果表已存在就删除表
drop table if exists singer;
 
--创建表
create table singer(
    id string,
    name string,
    works string)
--指定使用分隔符为|
row format delimited fields terminated by '|'
--指定使用自定义的类实现解析
stored as
inputformat 'xxx.xxx.hive.mr.UserInputFormat'
outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
 
--加载数据
load data local inpath '/usr/local/soft/selectdata/test01.txt' into table singer;

以上三种方法 : 替换分隔符无法解决数据字段中依然存在分隔符的问题,自定义InputFormat的开发成本较高,所以整体推荐使用正则加载的方式来实现对于特殊数据的处理。

  1. 解决方案四:MultiDelimitSerDe
官方文档 :  https://cwiki.apache.org/confluence/display/Hive/MultiDelimitSerDe

语法 :
    CREATE TABLE test (
     id string,
     hivearray array<binary>,
     hivemap map<string,int>) 
--    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.MultiDelimitSerDe'  -- 官方的
    ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.MultiDelimitSerDe'  -- 打开 hive-contrib jar 后看到的,使用这个执行不报错;
    WITH SERDEPROPERTIES ("field.delim"="[,]","collection.delim"=":","mapkey.delim"="@");

局限性 : 
        1. 在分隔符中,field.delim 是必需的,可以是多个字符,而 collection.delim 和 mapkey.delim 是可选的,仅支持单个字符。
        2. 不支持嵌套复杂类型,例如 Array<Array>。
        3. 要在 Hive 版本 4.0.0 之前使用 MultiDelimitSerDe,您必须将 hive-contrib jar 添加到类路径中,例如使用 add jar 命令。
    
问题 : 
        1. Class org.apache.hadoop.hive.contrib.serde2.MultiDelimitSerDe not found
    		解决方法 : add jar /usr/hdp/3.1.5.0-152/hive/lib/hive-contrib.jar

示例 : 
    -- 如果表已存在就删除表
    drop table if exists singer;
    -- 创建表
    create table singer(
    id int,
    name string,
    age string
    )
    -- 指定使用RegexSerde加载数据
    ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.MultiDelimitSerDe'
    WITH SERDEPROPERTIES ("field.delim"="||")
    STORED  as textfile;

    -- 加载数据
    load data local inpath '/home/visits.txt' into table singer;

     select * from singer;

处理csv格式文件

需求 :
        csv : 逗号分隔值文件;
        需要处理的文本数据 : 只要列值里有逗号的,全都加了引号引起来了,代表是一个整体的字符串;

解决方法 : 使用 CSV Serde;
        CSVSerde 已针对 Hive 0.14 及更高版本构建和测试,并使用与 Hive 发行版捆绑在一起的 Open-CSV 2.3。
        需要 hive-serde.jar 和 opencsv-2.3.jar;

示例 : 
测试数据 : 1	'zs	ls'	34

-- 建表语句
CREATE TABLE my_csv(
id int,
name string,
age string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
   "separatorChar" = "\t",
   "quoteChar"     = "'",
   "escapeChar"    = "\\"
)
STORED AS TEXTFILE;

SERDEPROPERTIES 解释 :
      为行的字段指定字段分隔符、字段内容引用字符和转义字符;
      默认的分隔符是 : 
            分隔符:DEFAULT_SEPARATOR , 逗号
            引号符:DEFAULT_QUOTE_CHARACTER “引号
            转义符:DEFAULT_ESCAPE_CHARACTER \转义符
-- 加载数据
load data local inpath '/home/visits.txt' into table my_csv;

-- 查询结果
 select * from my_csv;

-- 参考 : https://blog.csdn.net/wypblog/article/details/106152742
使用 OpenCSVSerde 时,show 的时候字段全变成 string 类型
    org.apache.hadoop.hive.ql.exec.DDLTask#showCreateTable 里面会调用 org.apache.hadoop.hive.ql.metadata.Table#getCols 方法,这里的 getDeserializer() 调用会根据我们建表时候指定的 row format serde 来创建对应的 Deserializer,因为 test_open_csv_serde 表的 serde 是 OpenCSVSerde,所以在初始化 Deserializer 的时候会调用 org.apache.hadoop.hive.serde2.OpenCSVSerde#initialize 方法,这里面会把表的所有列类型设置为 PrimitiveObjectInspectorFactory.javaStringObjectInspector,这个就直接导致后面我们 show create table 的时候显示所有字段为 string。紧接着用 columnOIs 去初始化 inspector 对象。
    初始化完 OpenCSVSerde 之后,getCols 方法会调用 org.apache.hadoop.hive.metastore.MetaStoreUtils#getFieldsFromDeserializer 方法,这个方法第一行就是 ObjectInspector oi = deserializer.getObjectInspector();,也就是获取我们上面介绍的 OpenCSVSerde 的 inspector:后面用这个解析字段类型的时候就直接拿到所有字段为 string。
    

Tags:

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表