HDFS核心API编程案例

  • 删除HDFS集群中所有的空文件和空目录
  • 使用流的方式上传下载文件
  • 统计HDFS文件系统中文件大小小于HDFS集群中默认块大小的文件占比
  • 统计出HDFS文件系统中平均副本数

准备工作

Hadoop集群环境搭建–>将”windows平台编译hadoop安装包”解压,并配置环境变量–>准备hadoop-eclipse-plugin.jar插件,配置到eclipse–>eclipse中进入Map/Reduce Locations配置集群信息–>Add User Library–>添加common、hdfs、mapreduce、yarn相关依赖库–>新建Java项目开始编写代码

做那么多操作,无非是要做到在本地eclipse中编写的程序能够操作HDFS集群中的文件

公共工具类

HDFSUtils.java:初始化FileSystem对象和关闭FileSystem

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

/**
* @author pross shawn
*
* create time:2018年3月14日
*
* content:初始化FileSystem对象和关闭FileSystem
*/
public class HDFSUtils {
public static FileSystem fs=null;

/**
* 初始化FileSystem对象
* @throws Exception
*/
public static void initFileSystem() throws Exception{
Configuration conf=new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
System.setProperty("HADOOP_USER_NAME", "hadoop");
fs=FileSystem.get(conf);
}

/**
* 关闭FileSystem的连接
* @throws Exception
*/
public static void closeFileSystem() throws Exception{
fs.close();
}
}

所需要的依赖包:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;

删除HDFS集群中所有空文件和空目录

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
/**
* 删除HDFS集群中的所有空文件和空目录
*
* @throws Exception
*/
public static void deleteEmptyDir(Path path) throws Exception {
HDFSUtils.initFileSystem();
// 当前路径就是空目录时
FileStatus[] listFile = HDFSUtils.fs.listStatus(path);
if (listFile.length == 0) {
//删除空目录
HDFSUtils.fs.delete(path, true);
return;
}

//如果不是空文件,先获取指定目录下的文件和子目录
RemoteIterator<LocatedFileStatus> listLocatedStatus = HDFSUtils.fs.listLocatedStatus(path);

while (listLocatedStatus.hasNext()) {
LocatedFileStatus next = listLocatedStatus.next();
//获取当前目录和其父目录
Path currentPath = next.getPath();
Path parentPath=next.getPath().getParent();

// 如果是文件夹,继续往下遍历
if (next.isDirectory()) {

// 如果是空目录,删除
if (HDFSUtils.fs.listStatus(currentPath).length == 0) {
HDFSUtils.fs.delete(currentPath, true);
if(HDFSUtils.fs.listStatus(parentPath).length==0){
HDFSUtils.fs.delete(parentPath, true);
}
} else {
// 不是空目录,那么则重新遍历
if (HDFSUtils.fs.exists(currentPath)) {
AchieveClass.deleteEmptyDir(currentPath);
}
}

// 如果是文件
} else {
// 获取文件的长度
long fileLength = next.getLen();
// 当文件是空文件时, 删除
if (fileLength == 0) {
HDFSUtils.fs.delete(currentPath, true);
}
}
// 当空文件夹或者空文件删除时,有可能导致父文件夹为空文件夹,这里需要判断一下
int length = HDFSUtils.fs.listStatus(parentPath).length;
if(length == 0){
HDFSUtils.fs.delete(parentPath, true);
}
}
HDFSUtils.closeFileSystem();
}

使用流的方式上传下载文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 使用流的方式上传文件
* @param srcPath 上传的本地路径
* @param desPath 上传到HDFS上后的文件名称路径
* @throws Exception
*/
public static void putFileByStream(String srcPath,String desPath) throws Exception{
HDFSUtils.initFileSystem();
InputStream in = new FileInputStream(new File(srcPath));
//Path是HDFS上的文件路径
FSDataOutputStream out = HDFSUtils.fs.create(new Path(desPath));
IOUtils.copyBytes(in, out,4096,true);
System.out.println("put successfully");
HDFSUtils.closeFileSystem();
}

/**
* 使用流的方式下载文件
* @param srcPath HDFS上的下载文件的路径
* @param desPath 下载到本地的文件路径
* @throws Exception
*/
public static void getFileByStream(Path srcPath,File desPath) throws Exception{
HDFSUtils.initFileSystem();
FSDataInputStream in=HDFSUtils.fs.open(srcPath);
OutputStream out=new FileOutputStream(desPath);
IOUtils.copyBytes(in, out,4096,true);
HDFSUtils.closeFileSystem();
}

统计HDFS文件系统中文件大小小于HDFS集群中默认块大小的文件占比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 统计出 HDFS文件系统中文件大小小于 HDFS集群中的默认块大小的文件占比
* 默认块大小为128MB
* @throws Exception
*/
public static void lessBlockSizeOfFile() throws Exception {
HDFSUtils.initFileSystem();
FileStatus[] listStatus = HDFSUtils.fs.listStatus(new Path("/"));
// 文件总数
int count = listStatus.length;
// 小于block大小的文件数个数
int lessBlock = 0;
// 遍历
for (int i = 0; i < count; i++) {
//如果文件大小小于128M,这lessBlock+1
if (listStatus[i].getLen() <= 134217728) {
lessBlock += 1;
}
}
System.out.println("文件总数量为:" + count + "个\n小于默认block的文件数量为:" + lessBlock + "个" + "\n文件大小小于默认块大小的文件占比:"
+ (lessBlock*1D / count) * 100 + "%");
HDFSUtils.closeFileSystem();
}

统计出HDFS文件系统中平均副本数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* HDFS文件系统中的平均副本数(副本总数/总数据块数)
* @throws Exception
*/
public static void avgRepofBlock() throws Exception {
HDFSUtils.initFileSystem();
// 副本总数
int repCount = 0;
// 数据块总数
int blockCount = 0;

RemoteIterator<LocatedFileStatus> listFiles = HDFSUtils.fs.listFiles(new Path("/"), true);
while (listFiles.hasNext()) {
LocatedFileStatus next = listFiles.next();
int BlockNum = next.getBlockLocations().length;
if (BlockNum != 0) {
int repNum = next.getReplication();
int oneRepCount = BlockNum * repNum;
repCount += oneRepCount;
blockCount += BlockNum;
}
}
System.out.println("副本总数:" + repCount + "\n数据块总数:" + blockCount + "\n平均副本数:" + repCount*1D / blockCount);
HDFSUtils.closeFileSystem();
}