This article mainly covers:
- Writing to HDFS through HttpFS under high concurrency
HttpFS configuration and startup
There are plenty of guides for this online, so I will not repeat them here.
HttpFS vs. WebHDFS
WebHDFS overview
WebHDFS provides a RESTful interface to HDFS. It is a built-in component and is enabled by default. With WebHDFS, clients outside the cluster can access HDFS without installing Hadoop or a Java environment, and they are not tied to any particular programming language. When a client requests a file, WebHDFS redirects the request to the DataNode that holds the data. The default port is 9870.
In short: WebHDFS gives users RESTful-style access to HDFS, but the actual data is still streamed by talking directly to the DataNodes, so its performance is relatively high.
However, this also means the client still has to sit on the same internal network as the Hadoop cluster; a client outside that network cannot reach the DataNodes (see the sketch below).
For more background, see other overviews online, for example the article "HDFS API的RESTful风格–WebHDFS".
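To make the redirect behaviour concrete, below is a minimal sketch of the WebHDFS two-step write using plain HttpURLConnection. The host name namenode, the path /tmp/demo.txt and the user hdfs are illustrative placeholders, not values from this project.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class WebHdfsCreateSketch {
    public static void main(String[] args) throws Exception {
        // Step 1: ask the NameNode where to write. It answers with a 307 redirect
        // whose Location header points at a DataNode.
        URL nameNodeUrl = new URL("http://namenode:9870/webhdfs/v1/tmp/demo.txt"
                + "?op=CREATE&overwrite=true&user.name=hdfs");
        HttpURLConnection nnConn = (HttpURLConnection) nameNodeUrl.openConnection();
        nnConn.setRequestMethod("PUT");
        nnConn.setInstanceFollowRedirects(false); // read the redirect ourselves
        String dataNodeLocation = nnConn.getHeaderField("Location");
        nnConn.disconnect();

        // Step 2: stream the file body directly to the DataNode. This is the step
        // that fails when the client cannot reach the DataNodes from outside the cluster.
        HttpURLConnection dnConn = (HttpURLConnection) new URL(dataNodeLocation).openConnection();
        dnConn.setRequestMethod("PUT");
        dnConn.setDoOutput(true);
        try (OutputStream out = dnConn.getOutputStream()) {
            out.write("hello webhdfs".getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("DataNode response: " + dnConn.getResponseCode()); // 201 Created on success
    }
}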
HttpFS overview
HttpFS is a server that provides a REST HTTP gateway supporting all HDFS file system operations (reads and writes), and it is interoperable with the webhdfs REST HTTP API.
- HttpFS is a gateway server exposing a RESTful interface that supports every HDFS file system operation
- All file CRUD operations are submitted to the HttpFS service, which then talks to the HDFS cluster on the client's behalf
- HttpFS runs as a service separate from HDFS and has to be installed manually; it is essentially a proxy service
- HttpFS itself is a Java web application that serves requests through an embedded Jetty server
- HttpFS listens on port 14000 by default (a minimal client sketch follows this list)
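For comparison with the WebHDFS sketch above, here is a minimal sketch of writing through an HttpFS gateway with the standard Hadoop client. The address webhdfs://httpfs-host:14000 and the target path are assumptions for illustration; in the real code further below the address comes from Constants.webHDFSConf.

import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HttpFsWriteSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Point the client at the HttpFS gateway rather than the NameNode;
        // the client only needs network access to this single host.
        conf.set("fs.defaultFS", "webhdfs://httpfs-host:14000");
        System.setProperty("HADOOP_USER_NAME", "hdfs");

        try (FileSystem fs = FileSystem.get(conf);
             FSDataOutputStream out = fs.create(new Path("/tmp/httpfs_demo.txt"), true)) {
            out.write("hello httpfs\n".getBytes(StandardCharsets.UTF_8));
        }
    }
}

This is the same pattern the HiveTableWriteUtil class below relies on: once fs.defaultFS points at the gateway, the ordinary FileSystem and ParquetWriter APIs go through HttpFS transparently.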
Main differences between WebHDFS and HttpFS
WebHDFS needs network access to every node in the cluster: when data is read, it is streamed directly from the node that stores it. With HttpFS, a single node acts as a "gateway" and becomes the single point through which all data flows to and from the client. As a result, HttpFS can become a bottleneck during large file transfers, but the benefit is that the network footprint required to reach HDFS is reduced to a minimum (only the gateway has to be reachable).
Writing data to HDFS through HttpFS from Java
Writing Parquet/Textfile files
There is plenty of material online about writing Parquet files. Here I mainly rely on the hive-exec package for data type conversion; converting the types by hand risks data accuracy problems, especially for decimal and timestamp values.
package com.digiwin.dmp.utils;
import com.digiwin.dmp.constant.Constants;
import com.digiwin.dmp.model.HiveTableMeta;
import com.digiwin.dmp.model.TbbSourceBean;
import com.digiwin.dmp.parquet.ParquetDataSchema;
import com.digiwin.dmp.parquet.ParquetDataWrite;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.Timestamp;
import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
public class HiveTableWriteUtil {
private static final Logger logger = LogManager.getLogger(HiveTableWriteUtil.class);
private static final String PARQUET = "parquet";
private static final String TEXTFILE = "textfile";
public static TbbSourceBean getSourceData() throws IOException {
InputStream inputStream = HiveTableWriteUtil.class.getResourceAsStream("/tbb_table_parquet_format.json");
BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
String line = reader.readLine();
TbbSourceBean tbbSourceBean = GsonUtil.fromJson(line, TbbSourceBean.class);
return tbbSourceBean;
}
public static void main(String[] args) throws Exception {
TbbSourceBean tbbSourceBean = getSourceData();
ArrayList<Map<String,Object>> partition = new ArrayList<>();
doFileLoad("test_dev", "test", "tbb_table_parquet_format",partition,tbbSourceBean.getData(),tbbSourceBean.getSchema(),TEXTFILE,"\u0001",true);
}
/**
* Entry point
* @param env Constants.TEST_DEV/PROD_HW/TEST_HW/TEST_HW/PROD_AZURE
* @param database database name
* @param tableName table name
* @param partitionList partition list; for a non-partitioned table just pass an empty list (new ArrayList<>())
* @param hiveData the rows to write into the Hive table
* @param hiveTableMeta column metadata
* @param fileType parquet/textfile
* @param delimiter default: \u0001
* @param isTruncate true/false
* @throws Exception
*/
public static void doFileLoad(String env, String database, String tableName, ArrayList<Map<String,Object>> partitionList, List<List<Object>> hiveData,List<HiveTableMeta> hiveTableMeta ,String fileType,String delimiter, boolean isTruncate) throws Exception {
int maxRetryCount = 5;
int currentRetryCount = 0;
while (currentRetryCount < maxRetryCount) {
try {
if(PARQUET.equals(fileType)){
writeJSONDataWithParquet(env,database,tableName,partitionList,hiveData,hiveTableMeta,isTruncate);
} else if (TEXTFILE.equals(fileType)) {
writeJSONDataWithText(env,database,tableName,partitionList,hiveData,hiveTableMeta,delimiter,isTruncate);
} else {
throw new Exception("当前仅支持parquet、textfile");
}
break;
} catch (Exception e) {
e.printStackTrace();
}
currentRetryCount++;
}
if (currentRetryCount == maxRetryCount) {
throw new Exception("<============= 数据写入异常,已经重试5次,本次写入失败,请联系中台人员 =============>");
}
}
/**
* hiveData here is the JSON array that tbb receives from the on-premise side; it is traversed only once, so the time complexity stays low
* @param env
* @param database
* @param tableName
* @param partitionList
* @param hiveData
* {"schema":[{"column_name":"TB003","column_type":"string"},{"column_name":"MB002","column_type":"string"},{"column_name":"TA001_2","column_type":"string"}],
* "data":[["004BYS015C30102-075","Y閥鑄件本體 WCB","511-1110707017"]]}
* @param hiveTableMeta
* @param isTruncate
* @throws Exception
*/
public static void writeJSONDataWithParquet(String env, String database, String tableName, ArrayList<Map<String,Object>> partitionList, List<List<Object>> hiveData,List<HiveTableMeta> hiveTableMeta, boolean isTruncate) throws Exception {
Configuration conf = HadoopUtil.getHdfsConfiguration(env);
FileSystem fileSystem = FileSystem.get(conf);
String defaultPath = getHiveTableDefaultPath(fileSystem,database, tableName, partitionList,false);
String defaultPathTemp = getHiveTableDefaultPath(fileSystem,database, tableName, partitionList,true);
Path hdfsFilePath = getTableHdfsFilePath(defaultPath,PARQUET);
Path hdfsFilePathTemp = getTableHdfsFilePath(defaultPathTemp,PARQUET);
try{
MessageType messageType = getParquetSchema(hiveTableMeta);
ExampleParquetWriter.Builder builder = ExampleParquetWriter
.builder(hdfsFilePathTemp)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withConf(conf)
.withRowGroupSize(1024*1024*1) // flush every 1 MB of data to avoid "Error writing request body to server" caused by webhdfs performance issues
.withType(messageType);
ParquetWriter<Group> writer = builder.build();
SimpleGroupFactory groupFactory = new SimpleGroupFactory(messageType);
for (int i = 0; i < hiveData.size(); i++) {
Group group = groupFactory.newGroup();
writer.write(jsonArrayConvernParquetGroup(group, hiveData.get(i), hiveTableMeta));
}
writer.close();
} catch (Exception e){
throw new Exception("写入parquet文件到" + hdfsFilePathTemp.getName() + "失败,请检查" + e.getMessage());
}
// move the data file to the target directory
moveFileToTargetPath(fileSystem,isTruncate,hdfsFilePathTemp,hdfsFilePath,defaultPath);
}
/**
* Write a text file to HDFS; the row data looks like [[1,2,3,4],[2,3,4,5]]
* (hiveTableMeta is only used to validate the column count).
* This method performs better than the parquet one above, but it is less user friendly.
* @param env
* @param database
* @param tableName
* @param partitionList
* @param hiveData
* @param delimiter
* @throws Exception
*/
public static void writeJSONDataWithText(String env, String database, String tableName, ArrayList<Map<String,Object>> partitionList, List<List<Object>> hiveData,List<HiveTableMeta> hiveTableMeta,String delimiter,boolean isTruncate) throws Exception {
Configuration conf = HadoopUtil.getHdfsConfiguration(env);
FileSystem fileSystem = FileSystem.get(conf);
String defaultPath = getHiveTableDefaultPath(fileSystem,database, tableName, partitionList,false);
String defaultPathTemp = getHiveTableDefaultPath(fileSystem,database, tableName, partitionList,true);
Path hdfsFilePath = getTableHdfsFilePath(defaultPath,TEXTFILE);
Path hdfsFilePathTemp = getTableHdfsFilePath(defaultPathTemp,TEXTFILE);
try{
FSDataOutputStream outputStream = fileSystem.create(hdfsFilePathTemp);
for (int i = 0; i < hiveData.size(); i++) {
List<String> colArray = hiveData.get(i).stream().map(Object::toString).collect(Collectors.toList());
if (colArray.size() == hiveTableMeta.size()){
String colValue = String.join(delimiter, colArray) + "\n";
outputStream.write(colValue.getBytes());
} else {
logger.warn("数据列数:" + colArray.size() + " != 表列数:" + hiveTableMeta.size());
}
}
// close the stream
outputStream.close();
// move the data file to the target directory
moveFileToTargetPath(fileSystem,isTruncate,hdfsFilePathTemp,hdfsFilePath,defaultPath);
} catch (Exception e){
throw new Exception("写入text文件到" + hdfsFilePathTemp.getName() + "失败,请检查" + e.getMessage());
}
}
/**
* Delete all files under the given directory
* @param fileSystem
* @param path
*/
public static void truncateHdfsData(FileSystem fileSystem , Path path){
try{
if(fileSystem.exists(path)){
FileStatus[] fileStatuses = fileSystem.listStatus(path);
for (FileStatus fileStatus : fileStatuses) {
fileSystem.delete(fileStatus.getPath(),true);
}
}
} catch (IOException ioException){
throw new RuntimeException("删除文件失败:"+ ioException.getMessage());
}
}
private static void moveFileToTargetPath(FileSystem fileSystem ,boolean isTruncate,Path hdfsFilePathTemp,Path hdfsFilePath,String defaultPath) throws IOException {
// clear out existing data before writing
if(isTruncate){
Path path = new Path(defaultPath);
logger.info("正在清理:" + path.toUri() + "下数据");
truncateHdfsData(fileSystem,path);
}
if(fileSystem.rename(hdfsFilePathTemp,hdfsFilePath)){
logger.info("数据移动到目标文件夹:" + "成功");
} else {
throw new IOException("数据移动到目标文件夹:失败");
}
}
/**
* Get the table's default path on HDFS
* @param database
* @param tableName
* @param partitionList e.g. dt=20230111
* @return
*/
public static String getHiveTableDefaultPath(FileSystem fileSystem, String database, String tableName, ArrayList<Map<String,Object>> partitionList, boolean isTmp) {
String defaultPath = "" ;
try{
defaultPath = isTmp ? String.format("/tmp/hive/dofileload/%s.db/%s/", database, tableName) :
String.format("/user/hive/warehouse/%s.db/%s/", database, tableName);
for (int i = 0; i < partitionList.size(); i++) {
Map<String,Object> partition = partitionList.get(i);
for (Object key: partition.keySet()){
defaultPath += key + "=" + partition.get(key) + "/";
}
}
if (!fileSystem.exists(new Path(defaultPath))){
logger.info( "hdfs上路径不存在,创建: " + defaultPath);
fileSystem.mkdirs(new Path(defaultPath));
}
} catch (IOException e) {
throw new RuntimeException("创建HDFS路径:" + defaultPath + "失败,请检查:" + e.getMessage() );
}
return defaultPath;
}
/**
* Build the full HDFS file path for the data file under the table path
* @param defaultPath
* @param fileType
* @return
*/
public static Path getTableHdfsFilePath(String defaultPath,String fileType){
long timestamp = System.currentTimeMillis();
String fileName = DigestUtils.md5Hex(defaultPath) + "_" + timestamp + (PARQUET.equals(fileType) ? ".parq" : ".text");
defaultPath += fileName;
return new Path(defaultPath);
}
/**
* Get the parquet MessageType (schema)
* @param tableMetaData
* @return
* @throws Exception
*/
public static MessageType getParquetSchema(List<HiveTableMeta> tableMetaData) throws Exception {
List<String> columnNames = new ArrayList<>();
List<TypeInfo> columnTypes = new ArrayList<>();
for (HiveTableMeta hiveTableMeta : tableMetaData) {
columnNames.add(hiveTableMeta.getColumn_name());
columnTypes.add(hiveTypeConvernParquetTypeInfo(hiveTableMeta.getColumn_type()));
}
MessageType messageType = ParquetDataSchema.convert(columnNames, columnTypes);
return messageType;
}
/**
* Convert a Hive column type into a TypeInfo. Only primitive types are supported;
* complex types (array, map, struct, etc.) are not.
* @param value
* @return
* @throws Exception
*/
private static TypeInfo hiveTypeConvernParquetTypeInfo(String value) throws Exception {
if (Constants.hivePrimitiveTypeInfoMap.containsKey(value)) {
return Constants.hivePrimitiveTypeInfoMap.get(value);
} else if (value.startsWith("decimal")) {
String input = value.replaceAll(" ", "");
// use a regular expression to extract the precision and scale
Pattern pattern = Pattern.compile("decimal\\((\\d+),(\\d+)\\)");
Matcher matcher = pattern.matcher(input.trim());
if (matcher.find()) {
int precision = Integer.parseInt(matcher.group(1));
int scale = Integer.parseInt(matcher.group(2));
return new DecimalTypeInfo(precision, scale);
} else {
throw new Exception("获取decimal类型异常: 未找到匹配的precision、scale");
}
} else {
ArrayList<String> supportType = new ArrayList<>(Constants.hivePrimitiveTypeInfoMap.keySet());
supportType.add("decimal");
throw new Exception("不支持的数据类型:" + value + ",目前仅支持:" + supportType);
}
}
/**
* Convert one row into a Parquet Group according to the table metadata
* @param group
* @param hiveData
* @param tableMetaData
* @return
* @throws Exception
*/
private static Group jsonArrayConvernParquetGroup(Group group, List<Object> hiveData, List<HiveTableMeta> tableMetaData) throws Exception {
for (int i = 0; i < tableMetaData.size(); i++) {
String column_name = tableMetaData.get(i).getColumn_name();
String column_type = tableMetaData.get(i).getColumn_type();
if(hiveData.size() == tableMetaData.size() && null != hiveData.get(i)){
if (TypeInfoFactory.booleanTypeInfo.getTypeName().equals(column_type)) {
group.append(column_name, ParquetDataWrite.booleanDataWriter(Boolean.parseBoolean(hiveData.get(i).toString())));
} else if (TypeInfoFactory.intTypeInfo.getTypeName().equals(column_type)) {
group.append(column_name, ParquetDataWrite.intWriter(Integer.parseInt(hiveData.get(i).toString())));
} else if (TypeInfoFactory.longTypeInfo.getTypeName().equals(column_type)) {
group.append(column_name, ParquetDataWrite.longWriter(Long.parseLong(hiveData.get(i).toString())));
} else if (TypeInfoFactory.stringTypeInfo.getTypeName().equals(column_type) || TypeInfoFactory.charTypeInfo.getTypeName().equals(column_type)
|| TypeInfoFactory.varcharTypeInfo.getTypeName().equals(column_type)) {
group.append(column_name, ParquetDataWrite.stringWriter(hiveData.get(i).toString()));
} else if (TypeInfoFactory.floatTypeInfo.getTypeName().equals(column_type)) {
group.append(column_name, ParquetDataWrite.floatWriter(Float.parseFloat(hiveData.get(i).toString())));
} else if (TypeInfoFactory.doubleTypeInfo.getTypeName().equals(column_type)) {
group.append(column_name, ParquetDataWrite.doubleDataWriter(Double.parseDouble(hiveData.get(i).toString())));
} else if (TypeInfoFactory.shortTypeInfo.getTypeName().equals(column_type)) {
group.append(column_name, ParquetDataWrite.shortDataWriter(Short.parseShort(hiveData.get(i).toString())));
} else if (TypeInfoFactory.dateTypeInfo.getTypeName().equals(column_type)) {
// note: double-check that date values are handled correctly here
group.append(column_name, ParquetDataWrite.dateWrite(java.sql.Date.valueOf(hiveData.get(i).toString())));
} else if (TypeInfoFactory.timestampTypeInfo.getTypeName().equals(column_type)) {
group.append(column_name, ParquetDataWrite.timestampWrite(Timestamp.valueOf(hiveData.get(i).toString())));
} else if (column_type.startsWith("decimal")) {
String input = column_type.replaceAll(" ", "");
// use a regular expression to extract the precision and scale
Pattern pattern = Pattern.compile("decimal\\((\\d+),(\\d+)\\)");
Matcher matcher = pattern.matcher(input.trim());
if (matcher.find()) {
int precision = Integer.parseInt(matcher.group(1));
int scale = Integer.parseInt(matcher.group(2));
group.append(column_name, ParquetDataWrite.decimalWrite(hiveData.get(i).toString(), precision, scale));
} else {
throw new Exception("获取decimal类型异常: 未找到匹配的precision、scale");
}
} else {
ArrayList<String> supportType = new ArrayList<>(Constants.hivePrimitiveTypeInfoMap.keySet());
supportType.add("decimal");
throw new Exception("不支持的数据类型:" + column_type + ",目前仅支持:" + supportType);
}
}
}
return group;
}
public static Configuration getHdfsConfiguration(String env) {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", Constants.webHDFSConf.get(env));
// the explicit impl classes below work around "class not found" errors after packaging, even though it runs fine in IDEA
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.webhdfs.impl", org.apache.hadoop.hdfs.web.WebHdfsFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
//conf.addResource(HadoopUtil.class.getResourceAsStream("/" + env + "/core-site.xml"));
//conf.addResource(HadoopUtil.class.getResourceAsStream("/" + env + "/hdfs-site.xml"));
System.setProperty("HADOOP_USER_NAME", "hdfs");
// return the configured connection
return conf;
}
}
Problems encountered
- When writing data through HttpFS, stress tests with more than 20 concurrent writers immediately failed with
java.io.IOException: Error writing request body to server
I wrestled with this problem for two weeks. It finally turned out that HttpFS had a bug before 3.1.3 that made this error message inaccurate and vague. After upgrading the Maven dependency:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>3.2.4</version>
</dependency>
the error changed to:
java.io.IOException: java.util.concurrent.TimeoutException: Idle timeout expired: 1000/1000 ms
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:121)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:110)
    at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.toIOException(WebHdfsFileSystem.java:546)
    at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.validateResponse(WebHdfsFileSystem.java:524)
    at org.apache.hadoop.hdfs.web.WebHdfsFileSystem.access$200(WebHdfsFileSystem.java:134)
    at org.apache.hadoop.hdfs.web.WebHdfsFileSystem$FsPathOutputStreamRunner$1.write(WebHdfsFileSystem.java:1006)
    at org.apache.parquet.bytes.BytesInput$ByteArrayBytesInput.writeAllTo(BytesInput.java:438)
    at org.apache.parquet.hadoop.ParquetFileWriter.writeDictionaryPage(ParquetFileWriter.java:341)
    at org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:200)
    at org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:263)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:168)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:143)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:125)
Now the problem is obvious: it is an idle timeout, and the timeout is set to 1 s (1000/1000 ms). According to the official documentation the default should be 60 s, so the CDH build of Hadoop 3.0 ships a different value than Apache Hadoop. The fix is to change it in httpfs-site.xml.
The configuration change is as follows:
<property>
    <name>hadoop.http.idle_timeout.ms</name>
    <value>600000</value>
    <description>Httpfs Server connection timeout in milliseconds.</description>
</property>
After applying this change, the stress test passed at 500 concurrent writers; problems still appear at 1000 concurrency, but 500 is already enough for our production needs, so I did not dig any further.