0%

HttpFS高并发写HDFS

本文主要包括:

  • HttpFS高并发写HDFS

HttpFS配置与启动

这个网上很多,这里就不赘述了

HttpFS与WebHDFS的区别

WebHDFS概述

WebHDFS 提供了访问HDFS的RESTful接口,内置组件,默认开启。 WebHDFS 使得集群外的客户端可以不用安装HADOOP和JAVA环境就可以对HDFS进行访问,且客户端不受语言限制。 当客户端请求某文件时,WebHDFS会将其重定向到该资源所在的datanode。端口默认是9870

说到底:WebHDFS是为了让用户可以使用Restful风格访问HDFS,但是,它还是通过直接与datanode通信写数据,写数据的协议还是用的hdfs协议。性能比较高
但是,使用WebHDFS还是需要client与hadoop集群在一个内网内,如果client在外网,就访问不到datanode

其他的概述可以在网上看,可以参考HDFS API的RESTful风格–WebHDFS

HttpFS概述

HttpFS 是一个提供 REST HTTP 网关的服务器,支持所有 HDFS 文件系统操作(读取和写入)。它可以与 webhdfs REST HTTP API 互操作。

  • HttpFS 是一个提供RESTful 接口的网关的服务器,该网关支持所有HDFS文件系统操作
  • 对于文件CURD的操作全部提交给HttpFS服务进行中转,然后由HttpFS去跟HDFS集群交互
  • HttpFS是一个独立于HDFS的服务,若使用需要手动安装。本质上是一个代理服务
  • HttpFS本身是Java Web应用程序。使用内置的Jetty服务器对外提供服务
  • HttpFS默认端口号为14000

WebHDFS 与 HttpFs 之间的主要区别

WebHDFS 需要访问集群的所有节点,并且当读取某些数据时,它会直接从该节点传输,而在 HttpFs 中,单个节点的作用类似于“网关”,并且将 是到客户端节点的单点数据传输。 因此,HttpFs 在大文件传输期间可能会被阻塞,但好处是我们最大限度地减少了访问 HDFS 所需的占用空间。

Java通过HttpFS写数据到HDFS

写Partquet/Textfile文件

写parquet文件网上资料很多,我这里主要是借助hive-exec包来对数据类型转换,否则自己转换,怕出现数据准确性问题,特别是decimal/timestamp

package com.digiwin.dmp.utils;


import com.digiwin.dmp.constant.Constants;
import com.digiwin.dmp.model.HiveTableMeta;
import com.digiwin.dmp.model.TbbSourceBean;
import com.digiwin.dmp.parquet.ParquetDataSchema;
import com.digiwin.dmp.parquet.ParquetDataWrite;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.Timestamp;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;

import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;


public class HiveTableWriteUtil {
    private static final Logger logger = LogManager.getLogger(ImpalaJdbcUtil.class);
    private static final String PARQUET = "parquet";
    private static final String TEXTFILE = "textfile";

    public static TbbSourceBean getSourceData() throws IOException {
        InputStream inputStream = HiveTableWriteUtil.class.getResourceAsStream("/tbb_table_parquet_format.json");
        BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
        String line = reader.readLine();
        TbbSourceBean tbbSourceBean = GsonUtil.fromJson(line, TbbSourceBean.class);
        return tbbSourceBean;
    }

    public static void main(String[] args) throws Exception {
        TbbSourceBean tbbSourceBean = getSourceData();
        ArrayList<Map<String,Object>> partition = new ArrayList<>();

        doFileLoad("test_dev", "test", "tbb_table_parquet_format",partition,tbbSourceBean.getData(),tbbSourceBean.getSchema(),TEXTFILE,"\u0001",true);
    }



    /**
     * 入口方法
     * @param env                   Constants.TEST_DEV/PROD_HW/TEST_HW/TEST_HW/PROD_AZURE
     * @param database              数据库
     * @param tableName             表名
     * @param partitionList         分区列表,如果不是分区表,new JSONArray()即可
     * @param hiveData              hive表的数据
     * @param hiveTableMeta         列信息
     * @param fileType              parquet/textfile
     * @param delimiter             默认:\u0001
     * @param isTruncate            true/false
     * @throws Exception
     */
    public static void doFileLoad(String env, String database, String tableName, ArrayList<Map<String,Object>> partitionList, List<List<Object>> hiveData,List<HiveTableMeta> hiveTableMeta ,String fileType,String delimiter, boolean isTruncate) throws Exception {
        int maxRetryCount = 5;
        int currentRetryCount = 0;

        while (currentRetryCount < maxRetryCount) {
            try {
                if(PARQUET.equals(fileType)){
                    writeJSONDataWithParquet(env,database,tableName,partitionList,hiveData,hiveTableMeta,isTruncate);
                } else if (TEXTFILE.equals(fileType)) {
                    writeJSONDataWithText(env,database,tableName,partitionList,hiveData,hiveTableMeta,delimiter,isTruncate);
                } else {
                    throw new Exception("当前仅支持parquet、textfile");
                }
                break;
            } catch (Exception e) {
                e.printStackTrace();
            }
            currentRetryCount++;
        }

        if (currentRetryCount == maxRetryCount) {
            throw new Exception("<============= 数据写入异常,已经重试5次,本次写入失败,请联系中台人员 =============>");
        }

    }

    /**
     * 这里的hiveData就是tbb从地端接受过来的jsonarray,这种数据只会遍历一次,时间复杂度低
     * @param env
     * @param database
     * @param tableName
     * @param partitionList
     * @param hiveData
     *          {"schema":[{"column_name":"TB003","column_type":"string"},{"column_name":"MB002","column_type":"string"},{"column_name":"TA001_2","column_type":"string"}],
     *          "data":[["004BYS015C30102-075","Y閥鑄件本體 WCB","511-1110707017"]]}
     * @param hiveTableMeta
     * @param isTruncate
     * @throws Exception
     */
    public static void writeJSONDataWithParquet(String env, String database, String tableName, ArrayList<Map<String,Object>> partitionList, List<List<Object>> hiveData,List<HiveTableMeta> hiveTableMeta, boolean isTruncate) throws Exception {
        Configuration conf = HadoopUtil.getHdfsConfiguration(env);
        FileSystem fileSystem = FileSystem.get(conf);
        String defaultPath = getHiveTableDefaultPath(fileSystem,database, tableName, partitionList,false);
        String defaultPathTemp = getHiveTableDefaultPath(fileSystem,database, tableName, partitionList,true);
        Path hdfsFilePath = getTableHdfsFilePath(defaultPath,PARQUET);
        Path hdfsFilePathTemp = getTableHdfsFilePath(defaultPathTemp,PARQUET);

        try{
            MessageType messageType = getParquetSchema(hiveTableMeta);
            ExampleParquetWriter.Builder builder = ExampleParquetWriter
                    .builder(hdfsFilePathTemp)
                    .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
                    .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
                    .withCompressionCodec(CompressionCodecName.SNAPPY)
                    .withConf(conf)
                    .withRowGroupSize(1024*1024*1)   //1M数据flush一次,防止webhdfs性能问题导致Error Writing request body to server 问题
                    .withType(messageType);

            ParquetWriter<Group> writer = builder.build();
            SimpleGroupFactory groupFactory = new SimpleGroupFactory(messageType);
            for (int i = 0; i < hiveData.size(); i++) {
                Group group = groupFactory.newGroup();
                writer.write(jsonArrayConvernParquetGroup(group, hiveData.get(i), hiveTableMeta));
            }
            writer.close();
        } catch (Exception e){
            throw new Exception("写入parquet文件到" + hdfsFilePathTemp.getName() + "失败,请检查" + e.getMessage());
        }
        // 移动数据文件到目标文件夹
        moveFileToTargetPath(fileSystem,isTruncate,hdfsFilePathTemp,hdfsFilePath,defaultPath);
    }

    /**
     * 写text文件到hdfs,不传入metadata,数据类型为[[1,2,3,4],[2,3,4,5]]
     * 此方法性能比上面的高,但对用户不友好
     * @param env
     * @param database
     * @param tableName
     * @param partitionList
     * @param hiveData
     * @param delimiter
     * @throws Exception
     */
    public static void writeJSONDataWithText(String env, String database, String tableName, ArrayList<Map<String,Object>> partitionList, List<List<Object>> hiveData,List<HiveTableMeta> hiveTableMeta,String delimiter,boolean isTruncate) throws Exception {
        Configuration conf = HadoopUtil.getHdfsConfiguration(env);
        FileSystem fileSystem = FileSystem.get(conf);
        String defaultPath = getHiveTableDefaultPath(fileSystem,database, tableName, partitionList,false);
        String defaultPathTemp = getHiveTableDefaultPath(fileSystem,database, tableName, partitionList,true);
        Path hdfsFilePath = getTableHdfsFilePath(defaultPath,TEXTFILE);
        Path hdfsFilePathTemp = getTableHdfsFilePath(defaultPathTemp,TEXTFILE);
        try{
            FSDataOutputStream outputStream = fileSystem.create(hdfsFilePathTemp);
            for (int i = 0; i < hiveData.size(); i++) {
                List<String> colArray = hiveData.get(i).stream().map(Object::toString).collect(Collectors.toList());
                if (colArray.size() == hiveTableMeta.size()){
                    String colValue = String.join(delimiter, colArray) + "\n";
                    outputStream.write(colValue.getBytes());
                } else {
                    logger.warn("数据列数:" + colArray.size() + " != 表列数:" + hiveTableMeta.size());
                }

            }
            // 关闭流
            outputStream.close();

            // 移动数据文件到目标文件夹
            moveFileToTargetPath(fileSystem,isTruncate,hdfsFilePathTemp,hdfsFilePath,defaultPath);
        } catch (Exception e){
            throw new Exception("写入text文件到" + hdfsFilePathTemp.getName() + "失败,请检查" + e.getMessage());
        }

    }

    /**
     * 删除指定文件夹下所有文件
     * @param fileSystem
     * @param path
     */
    public static void truncateHdfsData(FileSystem fileSystem , Path path){
        try{
            if(fileSystem.exists(path)){
                FileStatus[] fileStatuses =  fileSystem.listStatus(path);
                for (FileStatus fileStatus : fileStatuses) {
                    fileSystem.delete(fileStatus.getPath(),true);
                }
            }
        } catch (IOException ioException){
            throw new RuntimeException("删除文件失败:"+ ioException.getMessage());
        }
    }


    private static void moveFileToTargetPath(FileSystem fileSystem ,boolean isTruncate,Path hdfsFilePathTemp,Path hdfsFilePath,String defaultPath) throws IOException {
        // 写入前清理数据
        if(isTruncate){
            Path path = new Path(defaultPath);
            logger.info("正在清理:" + path.toUri() + "下数据");
            truncateHdfsData(fileSystem,path);
        }

        if(fileSystem.rename(hdfsFilePathTemp,hdfsFilePath)){
            logger.info("数据移动到目标文件夹:" + "成功");
        } else {
            throw new IOException("数据移动到目标文件夹:失败");
        }
    }
    /**
     * 获取表在hdfs的路径
     * @param database
     * @param tableName
     * @param partitionList dt=20230111
     * @return
     */
    public static String getHiveTableDefaultPath(FileSystem fileSystem, String database, String tableName, ArrayList<Map<String,Object>> partitionList, boolean isTmp) {
        String defaultPath = "" ;
        try{
            defaultPath = isTmp ? String.format("/tmp/hive/dofileload/%s.db/%s/", database, tableName) :
                    String.format("/user/hive/warehouse/%s.db/%s/", database, tableName);
            for (int i = 0; i < partitionList.size(); i++) {
                Map<String,Object> partition = partitionList.get(i);
                for (Object key: partition.keySet()){
                    defaultPath += key + "=" + partition.get(key) + "/";
                }
            }
            if (!fileSystem.exists(new Path(defaultPath))){
                logger.info( "hdfs上路径不存在,创建: " + defaultPath);
                fileSystem.mkdirs(new Path(defaultPath));
            }
        } catch (IOException e) {
            throw new RuntimeException("创建HDFS路径:" + defaultPath + "失败,请检查:" + e.getMessage() );
        }
        return defaultPath;
    }

    /**
     * 获取表在hdfs的路径
     * @param defaultPath
     * @param fileType
     * @return
     */
    public static Path getTableHdfsFilePath(String defaultPath,String fileType){
        long timestamp = System.currentTimeMillis();
        String fileName = DigestUtils.md5Hex(defaultPath) + "_" + timestamp + (PARQUET.equals(fileType) ? ".parq" : ".text");
        defaultPath += fileName;
        return new Path(defaultPath);
    }



    /**
     * 获取parqut的MessageType
     * @param tableMetaData
     * @return
     * @throws Exception
     */
    public static MessageType getParquetSchema(List<HiveTableMeta> tableMetaData) throws Exception {
        List<String> columnNames = new ArrayList<>();
        List<TypeInfo> columnTypes = new ArrayList<>();
        for (HiveTableMeta hiveTableMeta : tableMetaData) {
            columnNames.add(hiveTableMeta.getColumn_name());
            columnTypes.add(hiveTypeConvernParquetTypeInfo(hiveTableMeta.getColumn_type()));
        }
        MessageType messageType = ParquetDataSchema.convert(columnNames, columnTypes);
        return messageType;
    }

    /**
     * 转换hive 列类型到TypeInfo类型,目前仅支持基础类型,不支持复杂类型(array,map,struct等)
     *
     * @param value
     * @return
     * @throws Exception
     */
    private static TypeInfo hiveTypeConvernParquetTypeInfo(String value) throws Exception {
        if (Constants.hivePrimitiveTypeInfoMap.containsKey(value)) {
            return Constants.hivePrimitiveTypeInfoMap.get(value);
        } else if (value.startsWith("decimal")) {
            String input = value.replaceAll(" ", "");
            // 使用正则表达式匹配并提取数字
            Pattern pattern = Pattern.compile("decimal\\((\\d+),(\\d+)\\)");
            Matcher matcher = pattern.matcher(input.trim());
            if (matcher.find()) {
                int precision = Integer.parseInt(matcher.group(1));
                int scale = Integer.parseInt(matcher.group(2));
                return new DecimalTypeInfo(precision, scale);
            } else {
                throw new Exception("获取decimal类型异常: 未找到匹配的precision、scale");
            }
        } else {
            ArrayList<String> supportType = new ArrayList<>(Constants.hivePrimitiveTypeInfoMap.keySet());
            supportType.add("decimal");
            throw new Exception("不支持的数据类型:" + value + ",目前仅支持:" + supportType);
        }
    }

    /**
     *
     * @param group
     * @param hiveData
     * @param tableMetaData
     * @return
     * @throws Exception
     */
    private static Group jsonArrayConvernParquetGroup(Group group, List<Object> hiveData, List<HiveTableMeta> tableMetaData) throws Exception {
        for (int i = 0; i < tableMetaData.size(); i++) {
            String column_name = tableMetaData.get(i).getColumn_name();
            String column_type = tableMetaData.get(i).getColumn_type();

            if(hiveData.size() == tableMetaData.size() && null != hiveData.get(i)){
                if (TypeInfoFactory.booleanTypeInfo.getTypeName().equals(column_type)) {
                    group.append(column_name, ParquetDataWrite.booleanDataWriter(Boolean.parseBoolean(hiveData.get(i).toString())));
                } else if (TypeInfoFactory.intTypeInfo.getTypeName().equals(column_type)) {
                    group.append(column_name, ParquetDataWrite.intWriter(Integer.parseInt(hiveData.get(i).toString())));
                } else if (TypeInfoFactory.longTypeInfo.getTypeName().equals(column_type)) {
                    group.append(column_name, ParquetDataWrite.longWriter(Long.parseLong(hiveData.get(i).toString())));
                } else if (TypeInfoFactory.stringTypeInfo.getTypeName().equals(column_type) || TypeInfoFactory.charTypeInfo.getTypeName().equals(column_type)
                        || TypeInfoFactory.varcharTypeInfo.getTypeName().equals(column_type)) {
                    group.append(column_name, ParquetDataWrite.stringWriter(hiveData.get(i).toString()));
                } else if (TypeInfoFactory.floatTypeInfo.getTypeName().equals(column_type)) {
                    group.append(column_name, ParquetDataWrite.floatWriter(Float.parseFloat(hiveData.get(i).toString())));
                } else if (TypeInfoFactory.doubleTypeInfo.getTypeName().equals(column_type)) {
                    group.append(column_name, ParquetDataWrite.doubleDataWriter(Double.parseDouble(hiveData.get(i).toString())));
                } else if (TypeInfoFactory.shortTypeInfo.getTypeName().equals(column_type)) {
                    group.append(column_name, ParquetDataWrite.shortDataWriter(Short.parseShort(hiveData.get(i).toString())));
                } else if (TypeInfoFactory.dateTypeInfo.getTypeName().equals(column_type)) {
                    //这里需要注意下,验证一下是否正常
                    group.append(column_name, ParquetDataWrite.dateWrite(java.sql.Date.valueOf(hiveData.get(i).toString())));
                } else if (TypeInfoFactory.timestampTypeInfo.getTypeName().equals(column_type)) {
                    group.append(column_name, ParquetDataWrite.timestampWrite(Timestamp.valueOf(hiveData.get(i).toString())));
                } else if (column_type.startsWith("decimal")) {
                    String input = column_type.replaceAll(" ", "");
                    // 使用正则表达式匹配并提取数字
                    Pattern pattern = Pattern.compile("decimal\\((\\d+),(\\d+)\\)");
                    Matcher matcher = pattern.matcher(input.trim());
                    if (matcher.find()) {
                        int precision = Integer.parseInt(matcher.group(1));
                        int scale = Integer.parseInt(matcher.group(2));
                        group.append(column_name, ParquetDataWrite.decimalWrite(hiveData.get(i).toString(), precision, scale));
                    } else {
                        throw new Exception("获取decimal类型异常: 未找到匹配的precision、scale");
                    }
                } else {
                    ArrayList<String> supportType = new ArrayList<>(Constants.hivePrimitiveTypeInfoMap.keySet());
                    supportType.add("decimal");
                    throw new Exception("不支持的数据类型:" + column_type + ",目前仅支持:" + supportType);
                }
            }

        }
        return group;
    }
    
    public static Configuration getHdfsConfiguration(String env) {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", Constants.webHDFSConf.get(env));
        //下面是为了解决打包找不到类,但是在IDEA中可以正常执行
        conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
        conf.set("fs.webhdfs.impl", org.apache.hadoop.hdfs.web.WebHdfsFileSystem.class.getName());
        conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
        //conf.addResource(HadoopUtil.class.getResourceAsStream("/" + env + "/core-site.xml"));
        //conf.addResource(HadoopUtil.class.getResourceAsStream("/" + env + "/hdfs-site.xml"));
        System.setProperty("HADOOP_USER_NAME", "hdfs");
        //配置连接
        return conf;
    }
}

遇到的问题

  1. 通过httpfs写数据,在高并发压测的时候,并发大于20就直接报java.io.IOException: Error writing request body to server
    这个问题纠结了2周,最后查到,是因为httpfs在3.1.3之前有个bug,它的报错日志是不准确/模糊的。把maven的依赖包升级以后,报错就换了
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.2.4</version>
    </dependency>
    报错换成:
    ava.io.IOException: java.util.concurrent.TimeoutException: Idle timeout expired: 1000/1000 ms
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
        at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:110)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.toIOException(WebHdfsFileSystem.java:546)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.validateResponse(WebHdfsFileSystem.java:524)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.access$200(WebHdfsFileSystem.java:134)
        at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$FsPathOutputStreamRunner$1.write(WebHdfsFileSystem.java:1006)
        at org.apache.parquet.bytes.BytesInput$ByteArrayBytesInput.writeAllTo(BytesInput.java:438)
        at org.apache.parquet.hadoop.ParquetFileWriter.writeDictionaryPage(ParquetFileWriter.java:341)
        at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:200)
        at org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:263)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:168)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:143)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:125)
    这里就很很明显看出问题了,就是因为Idle timeout,并且它的超时时间设置成1s,但是,通过查看官网上的设置,默认值是60s,所以,cdh版本的hadoop3.0配置和apache不一样,这里修改httpfs-site.xml配置如下:
    <property>
        <name>hadoop.http.idle_timeout.ms</name>
        <value>600000</value>
        <description>Httpfs Server connection timeout in milliseconds.</description>
    </property>
    修改完该配置后,压测可以达到500,但是在1000并发还是会出现问题。但是,500已经能够满足我们生产需要,这里就不再继续追下去了