hadoop:
  hdfs:
    host: hdfs://192.168.0.161:8020
    path: /app-logs
    user: hdfs
    # Presumably bytes: 105267200 is about 100 MB. (1 GB = 1024*1024*1024 would be 1073741824.)
    batch-size: 105267200
    # Presumably milliseconds: 60000 = 1000*60 = 1 minute. (2 minutes would be 120000.)
    batch-rollover-interval: 60000
    kerberos:
      keytab: C:\\ProgramData\\MIT\\Kerberos5\\hdfs.headless.keytab
      user: hdfs-test@EMERGEN.COM
      kerber-conf: C:\\ProgramData\\MIT\\Kerberos5\\krb5.conf
jar包:
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>3.2.2</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>3.2.2</version> </dependency>
配置conf
public static Configuration getConf(){ System.out.println(hdfsConf.toString()); Configuration conf = new Configuration(); Kerbers kerberos = hdfsConf.getKerberos(); String user=kerberos.getUser(); String keytab=kerberos.getKeytab(); String krbConf=kerberos.getKerberConf(); conf.set(\"fs.defaultFS\", hdfsConf.getHost()); /** * hadoop使用kerberos验证 */ conf.set(\"hadoop.security.authentication\", \"kerberos\"); /** * hadoop namenode节点的principal(验证实体) */ conf.set(\"dfs.namenode.kerberos.principal\", user); /** * 访问hadoop集群的principal */ conf.set(\"kerberos.principal\", user); /** * 访问hadoop集群的principal对应的keytab文件路径 */ conf.set(\"kerberos.keytab\", keytab); /** * krb5.conf->kerberos客户端配置文件,这个Kerberos配置文件是必须配置的,不然找不到kerberos服务 */ System.setProperty(\"java.security.krb5.conf\", krbConf); UserGroupInformation.setConfiguration(conf); try { //使用待验证的实体,调用loginUserFromKeytab api向hbase进行kerberos验证 UserGroupInformation.loginUserFromKeytab(user, keytab); } catch (Exception ex){ } return conf; }
创建hdfs连接
public static FileSystem getfileSystem() { //HDFS关键类FileSystem //连接的URI URI uri = URI.create(hdfsConf.getHost()); //相关配置 // Configuration conf = new Configuration(); Configuration conf=getConf(); //可以设置副本个数如:conf.set(\"dfs.replication\",\"3\"); //客户端名称 FileSystem fileSystem = null; try { fileSystem=fileSystem.get(conf); // fileSystem = FileSystem.get(uri, conf, hdfsConf.getUser()); } catch (IOException e) { log.error(\"连接HDFS失败\" + e.getMessage()); } catch (Exception e) { log.error(\"连接HDFS失败\" + e.getMessage()); } return fileSystem; }
hdfs 帮助类:
public static FileSystem getfileSystem() { //HDFS关键类FileSystem //连接的URI URI uri = URI.create(hdfsConf.getHost()); //相关配置 // Configuration conf = new Configuration(); Configuration conf=getConf(); //可以设置副本个数如:conf.set(\"dfs.replication\",\"3\"); //客户端名称 FileSystem fileSystem = null; try { fileSystem=fileSystem.get(conf); // fileSystem = FileSystem.get(uri, conf, hdfsConf.getUser()); } catch (IOException e) { log.error(\"连接HDFS失败\" + e.getMessage()); } catch (Exception e) { log.error(\"连接HDFS失败\" + e.getMessage()); } return fileSystem; }
测试用例:
void testHdfs() throws Exception { log.info(\"===============testHdfs======================\"); Path src = new Path(\"D:\\\\eee.txt\"); Path dst = new Path(\"/app-logs/\"); FileSystem fileSystem = HdfsOperator.getfileSystem(); RemoteIterator<LocatedFileStatus> locatedFileStatusRemoteIterator = HdfsOperator.listFiles(fileSystem, new Path(\"/app-logs\"), false); System.out.println(locatedFileStatusRemoteIterator); while(locatedFileStatusRemoteIterator.hasNext()){ LocatedFileStatus next = locatedFileStatusRemoteIterator.next(); System.out.println(next.getPath().toString()); } }
输出结果:
来源:https://www.cnblogs.com/younger5/p/15959651.html
本站部分图文来源于网络,如有侵权请联系删除。