1 import org.apache.hadoop.conf.Configuration; 2 3 import java.util.HashMap; 4 import java.util.Properties; 5 6 public class Propss { 7 static Properties producer_Props = new Properties(); 8 static Properties consumer_Props = new Properties(); 9 static HashMap<String, Object> kafka_Consumer = new HashMap<>(); 10 public static Configuration setConf(Configuration conf){ 11 conf.set(\"hbase.zookeeper.quorum\",\"hadoop106,hadoop107,hadoop108\"); 12 conf.set(\"hbae.zookeeper.property.client\",\"2181\"); 13 return conf; 14 } 15 16 static{ 17 kafka_Consumer.put(\"bootstrap.servers\",\"hadoop106:9092,hadoop107:9092,hadoop108:9092\"); 18 kafka_Consumer.put(\"group.id\", \"test\"); 19 //from beginning 20 kafka_Consumer.put(\"auto.offset.reset\",\"earliest\"); 21 kafka_Consumer.put(\"key.deserializer\",\"org.apache.kafka.common.serialization.StringDeserializer\"); 22 kafka_Consumer.put(\"value.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\"); 23 24 producer_Props.setProperty(\"bootstrap.servers\",\"hadoop106:9092,hadoop107:9092,hadoop108:9092\"); 25 producer_Props.setProperty(\"ack\",\"all\"); 26 producer_Props.put(\"key.serializer\",\"org.apache.kafka.common.serialization.StringSerializer\"); 27 producer_Props.put(\"value.serializer\", \"org.apache.kafka.common.serialization.StringSerializer\"); 28 producer_Props.put(\"auto.offset.reset\",\"earliest\"); 29 30 consumer_Props.setProperty(\"bootstrap.servers\",\"hadoop106:9092,hadoop107:9092,hadoop108:9092\"); 31 consumer_Props.setProperty(\"group.id\",\"test\"); 32 consumer_Props.put(\"key.deserializer\",\"org.apache.kafka.common.serialization.StringDeserializer\"); 33 consumer_Props.put(\"value.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\"); 34 consumer_Props.put(\"auto.offset.reset\",\"earliest\"); 35 } 36 }
1 import org.apache.hadoop.conf.Configuration; 2 import org.apache.hadoop.hbase.*; 3 import org.apache.hadoop.hbase.client.*; 4 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 5 import org.apache.hadoop.hbase.util.Bytes; 6 import java.io.IOException; 7 import java.util.ArrayList; 8 import java.util.Iterator; 9 import java.util.List; 10 import java.util.UUID; 11 12 public class HBase_Util { 13 static Configuration con = HBaseConfiguration.create(); 14 static Configuration conf = Propss.setConf(con); 15 static Connection connection; 16 static HBaseAdmin admin; 17 static Table t; 18 19 static { 20 try { 21 connection = ConnectionFactory.createConnection(conf); 22 admin = (HBaseAdmin)connection.getAdmin(); 23 } catch (IOException e) { 24 e.printStackTrace(); 25 } 26 } 27 28 //建表 29 public static void build_Table(String tableName,List<String> FamilyNames) throws Exception { 30 TableDescriptorBuilder buider = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)); 31 for (String columnName : FamilyNames) { 32 ColumnFamilyDescriptor info = ColumnFamilyDescriptorBuilder.of(Bytes.toBytes(columnName)); 33 buider.setColumnFamily(info); 34 } 35 TableDescriptor build = buider.build(); 36 admin.createTable(build); 37 System.out.println(\"____build_done____\"); 38 } 39 40 //插入一条数据 41 public static void insert_Row(String tableName,String row,String Family,String qualifier,String value) throws Exception { 42 t = connection.getTable(TableName.valueOf(tableName)); 43 Put put = new Put(Bytes.toBytes(row)); 44 Put put1 = put.addColumn(Bytes.toBytes(Family), Bytes.toBytes(qualifier), Bytes.toBytes(value)); 45 t.put(put1); 46 System.out.println(\"____insert_Row_done____\"); 47 } 48 49 //插入num条数据 50 public static void insert_Batch(String tableName,String row,String Family,String qualifier,String value,Integer num) throws Exception { 51 t = connection.getTable(TableName.valueOf(tableName)); 52 List<Put> list=new ArrayList<>(); 53 for (int i = 0; i < num; i++) { 54 String s = UUID.randomUUID().toString().replaceAll(\"-\", \"\"); 55 Put puts = new Put(Bytes.toBytes(s)); 56 Put putss = puts.addColumn(Bytes.toBytes(Family), Bytes.toBytes(qualifier), Bytes.toBytes(value+i)); 57 list.add(putss); 58 } 59 t.put(list); 60 System.out.println(\"____insert_Batch_done____\"); 61 } 62 63 //删除表 64 public static void drop_Table(String tableName) throws Exception { 65 if (admin.tableExists(TableName.valueOf(tableName))){ 66 admin.disableTable(TableName.valueOf(tableName)); 67 admin.deleteTable(TableName.valueOf(tableName)); 68 System.out.println(\"____drop_Table_done____\"); 69 }else { 70 System.out.println(\"____no_such_Table_found____\"); 71 } 72 73 } 74 75 //删除一条数据 76 public static void delete_Row(String tableName,String row) throws Exception { 77 t = connection.getTable(TableName.valueOf(tableName)); 78 Delete delete = new Delete(Bytes.toBytes(row)); 79 t.delete(delete); 80 System.out.println(\"____delete_Row_done____\"); 81 } 82 83 //特定列过滤查询 84 public static void scan_Filter(String tableName,String Family,String qualifier,CompareOperator compare,byte[] value) throws Exception { 85 t = connection.getTable(TableName.valueOf(tableName)); 86 SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes(Family), Bytes.toBytes(qualifier), compare, value); 87 Scan scan = new Scan(); 88 Scan scan1 = scan.setFilter(filter); 89 ResultScanner scanner = t.getScanner(scan1); 90 Iterator<Result> iterator = scanner.iterator(); 91 while (iterator.hasNext()){ 92 Cell[] cells = iterator.next().rawCells(); 93 System.out.println(\"____\"+new String(CellUtil.cloneRow(cells[0]))+\"____\"); 94 for (Cell cell : cells) { 95 System.out.print(new String(CellUtil.cloneRow(cell))); 96 System.out.print(\" - \"); 97 System.out.print(new String(CellUtil.cloneFamily(cell))); 98 System.out.print(\" - \"); 99 System.out.print(new String(CellUtil.cloneQualifier(cell))); 100 System.out.print(\" - \"); 101 System.out.println(new String(CellUtil.cloneValue(cell))); 102 } 103 104 } 105 System.out.println(\"____scan_Filter_done____\"); 106 } 107 //查询一条数据 108 public static void scan_Row(String tableName,String row)throws Exception{ 109 t = connection.getTable(TableName.valueOf(tableName)); 110 Get get = new Get(Bytes.toBytes(row)); 111 Result result = t.get(get); 112 Cell[] cells = result.rawCells(); 113 for (Cell cell : cells) { 114 System.out.print(new String(CellUtil.cloneRow(cell))); 115 System.out.print(\" - \"); 116 System.out.print(new String(CellUtil.cloneFamily(cell))); 117 System.out.print(\" - \"); 118 System.out.print(new String(CellUtil.cloneQualifier(cell))); 119 System.out.print(\" - \"); 120 System.out.println(new String(CellUtil.cloneValue(cell))); 121 } 122 System.out.println(\"____scan_Row_done____\"); 123 } 124 //区间查询数据 125 public static void scan_Rows(String tableName,String row1,String row2)throws Exception{ 126 t = connection.getTable(TableName.valueOf(tableName)); 127 Scan sc=new Scan(Bytes.toBytes(row1),Bytes.toBytes(row2)); 128 ResultScanner scanner = t.getScanner(sc); 129 Iterator<Result> iterator = scanner.iterator(); 130 System.out.println(\"____前闭后开____\"); 131 while (iterator.hasNext()){ 132 Result next = iterator.next(); 133 Cell[] cells = next.rawCells(); 134 System.out.println(\"____\"+new String(CellUtil.cloneRow(cells[0]))+\"____\"); 135 for (Cell cell : cells) { 136 System.out.print(new String(CellUtil.cloneRow(cell))); 137 System.out.print(\" - \"); 138 System.out.print(new String(CellUtil.cloneFamily(cell))); 139 System.out.print(\" - \"); 140 System.out.print(new String(CellUtil.cloneQualifier(cell))); 141 System.out.print(\" - \"); 142 System.out.println(new String(CellUtil.cloneValue(cell))); 143 } 144 } 145 System.out.println(\"____scan_Rows_done____\"); 146 } 147 //查询一条特定列族数据 148 public static void get_value_by_family(String tableName,String row,String family)throws Exception{ 149 t = connection.getTable(TableName.valueOf(tableName)); 150 Get get = new Get(Bytes.toBytes(row)); 151 get.addFamily(Bytes.toBytes(family)); 152 Result result = t.get(get); 153 Cell[] cells = result.rawCells(); 154 for (Cell cell : cells) { 155 System.out.print(new String(CellUtil.cloneRow(cell))); 156 System.out.print(\" - \"); 157 System.out.print(new String(CellUtil.cloneFamily(cell))); 158 System.out.print(\" - \"); 159 System.out.print(new String(CellUtil.cloneQualifier(cell))); 160 System.out.print(\" - \"); 161 System.out.println(new String(CellUtil.cloneValue(cell))); 162 } 163 System.out.println(\"____get_value_by_family_done____\"); 164 } 165 166 //查询一条特定列数据 167 public static void get_value_by_qualifier(String tableName,String row,String family,String qualifier)throws Exception{ 168 t = connection.getTable(TableName.valueOf(tableName)); 169 Get get = new Get(Bytes.toBytes(row)); 170 get.addColumn(Bytes.toBytes(family),Bytes.toBytes(qualifier)); 171 Result result = t.get(get); 172 Cell[] cells = result.rawCells(); 173 for (Cell cell : cells) { 174 System.out.print(new String(CellUtil.cloneRow(cell))); 175 System.out.print(\" - \"); 176 System.out.print(new String(CellUtil.cloneFamily(cell))); 177 System.out.print(\" - \"); 178 System.out.print(new String(CellUtil.cloneQualifier(cell))); 179 System.out.print(\" - \"); 180 System.out.println(new String(CellUtil.cloneValue(cell))); 181 } 182 System.out.println(\"____get_value_by_qualifier_done____\"); 183 } 184 185 //全查某表 186 public static void scan_All(String tableName) throws Exception { 187 t = connection.getTable(TableName.valueOf(tableName)); 188 Scan sc=new Scan(); 189 ResultScanner scanner = t.getScanner(sc); 190 Iterator<Result> iterator = scanner.iterator(); 191 while (iterator.hasNext()){ 192 Result next = iterator.next(); 193 Cell[] cells = next.rawCells(); 194 System.out.println(\"____\"+new String(CellUtil.cloneRow(cells[0]))+\"____\"); 195 for (Cell cell : cells) { 196 System.out.print(new String(CellUtil.cloneRow(cell))); 197 System.out.print(\" - \"); 198 System.out.print(new String(CellUtil.cloneFamily(cell))); 199 System.out.print(\" - \"); 200 System.out.print(new String(CellUtil.cloneQualifier(cell))); 201 System.out.print(\" - \"); 202 System.out.println(new String(CellUtil.cloneValue(cell))); 203 } 204 } 205 System.out.println(\"____scan_All_done____\"); 206 } 207 //查看所有表 208 public static void list() throws Exception { 209 TableName[] tableNames = admin.listTableNames(); 210 for (TableName tableName : tableNames) { 211 System.out.println(tableName.toString()); 212 } 213 214 } 215 //查看某表结构 216 public static void desc_Table(String tableName) throws Exception { 217 List<TableDescriptor> tableDescriptors = admin.listTableDescriptors(); 218 Iterator<TableDescriptor> iterator = tableDescriptors.iterator(); 219 while (iterator.hasNext()){ 220 TableDescriptor next = iterator.next(); 221 if (next.getTableName().toString().equals(tableName)){ 222 System.out.println(next); 223 } 224 } 225 System.out.println(\"____list_done____\"); 226 227 } 228 229 230 //关流 231 public static void stop() throws Exception { 232 connection.close(); 233 System.out.println(\"____connection_close_done____\"); 234 } 235 }
1 import org.apache.hadoop.hbase.CompareOperator; 2 import org.apache.hadoop.hbase.util.Bytes; 3 4 import java.util.Arrays; 5 import java.util.List; 6 7 public class Test { 8 public static void main(String[] args) throws Exception { 9 //查询所有表 10 HBase_Util.list(); 11 //创建表 12 List<String> list = Arrays.asList(\"info\",\"data\"); 13 HBase_Util.build_Table(\"user_info2\",list); 14 //删除某表 15 HBase_Util.drop_Table(\"user_info2\"); 16 //查询某表结构 17 HBase_Util.desc_Table(\"user_info2\"); 18 //插入一条数据 19 HBase_Util.insert_Row(\"user_info2\",\"001\",\"info\",\"name\",\"zhangsan\"); 20 HBase_Util.insert_Row(\"user_info2\",\"001\",\"info\",\"age\",\"18\"); 21 HBase_Util.insert_Row(\"user_info2\",\"001\",\"info\",\"sex\",\"male\"); 22 HBase_Util.insert_Row(\"user_info2\",\"001\",\"data\",\"Math\",\"100\"); 23 HBase_Util.insert_Row(\"user_info2\",\"001\",\"data\",\"Chinese\",\"97\"); 24 HBase_Util.insert_Row(\"user_info2\",\"001\",\"data\",\"English\",\"95\"); 25 26 HBase_Util.insert_Row(\"user_info2\",\"002\",\"info\",\"name\",\"lisi\"); 27 HBase_Util.insert_Row(\"user_info2\",\"002\",\"info\",\"age\",\"19\"); 28 HBase_Util.insert_Row(\"user_info2\",\"002\",\"info\",\"sex\",\"famale\"); 29 HBase_Util.insert_Row(\"user_info2\",\"002\",\"data\",\"Math\",\"70\"); 30 HBase_Util.insert_Row(\"user_info2\",\"002\",\"data\",\"Chinese\",\"100\"); 31 HBase_Util.insert_Row(\"user_info2\",\"002\",\"data\",\"English\",\"99\"); 32 33 HBase_Util.insert_Row(\"user_info2\",\"003\",\"info\",\"name\",\"wangwu\"); 34 HBase_Util.insert_Row(\"user_info2\",\"003\",\"info\",\"age\",\"20\"); 35 HBase_Util.insert_Row(\"user_info2\",\"003\",\"info\",\"sex\",\"male\"); 36 HBase_Util.insert_Row(\"user_info2\",\"003\",\"data\",\"Math\",\"65\"); 37 HBase_Util.insert_Row(\"user_info2\",\"003\",\"data\",\"Chinese\",\"50\"); 38 HBase_Util.insert_Row(\"user_info2\",\"003\",\"data\",\"English\",\"49\"); 39 //全查某表 40 HBase_Util.scan_All(\"user_info2\"); 41 //查询一条记录 42 HBase_Util.scan_Row(\"user_info2\",\"001\"); 43 //查询一条记录的 特定列族的 数据 44 HBase_Util.get_value_by_family(\"user_info2\",\"001\",\"info\"); 45 //查询一条记录的 特定列的 数据 46 HBase_Util.get_value_by_qualifier(\"user_info2\",\"001\",\"data\",\"Math\"); 47 //查询区间数据 [前闭后开) 48 HBase_Util.scan_Rows(\"user_info2\",\"001\",\"003\"); 49 //特定列过滤查询 50 HBase_Util.scan_Filter(\"user_info2\",\"data\",\"Math\", CompareOperator.GREATER, Bytes.toBytes(60)); 51 //删除某一条记录 52 HBase_Util.delete_Row(\"user_info2\",\"003\"); 53 54 HBase_Util.stop(); 55 } 56 }
来源:https://www.cnblogs.com/chang09/p/15986859.html
本站部分图文来源于网络,如有侵权请联系删除。