1. HBase-client implementation
1.1 Dependencies
<!-- HBase dependency coordinates -->
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.6</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.2.6</version>
    <!-- Exclude transitive dependencies: without this exclusion an error occurs -->
    <exclusions>
        <exclusion>
            <groupId>*</groupId>
            <artifactId>*</artifactId>
        </exclusion>
    </exclusions>
</dependency>
1.2 Configuration and code
1.2.1 Get-based approach
public class HBaseService {
private static final Logger logger = LoggerFactory.getLogger(HBaseService.class);
/**
 * Configuration loaded from the configuration files on the classpath
 */
static Configuration configuration = HBaseConfiguration.create();
/**
 * Connection to the cluster
 */
private static Connection conn = null;
static {
try {
conn = ConnectionFactory.createConnection(configuration);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
 * Query data via get requests and write the result to a file
 * @param rowKey row key to fetch
 * @param tableName table name
 * @param dirName output directory
 * @param fileExist flag indicating whether the file already exists
 */
public static void addInfoToFile(String rowKey, String tableName, String dirName, boolean fileExist){
Table table = null;
ResultScanner result = null;
try {
Connection connection = ConnectionFactory.createConnection(configuration);
table = connection.getTable(TableName.valueOf(tableName));
List<Get> gets = new ArrayList<>();
Get get = new Get(Bytes.toBytes(rowKey));
gets.add(get);
// array of results, one per Get
Result[] resultArr = table.get(gets);
Map<String, Map<String, String>> dataMap = new HashMap<>();
for (Result r : resultArr) {
String rowKey1 = Bytes.toString(r.getRow());
Map<String, String> columnDataMap;
if (dataMap.containsKey(rowKey1)){
columnDataMap = dataMap.get(rowKey1);
}else {
columnDataMap = new HashMap<>();
}
for (Cell kv : r.rawCells()) {
String qualifier = Bytes.toString(CellUtil.cloneQualifier(kv));
String value = Base64Encoder.encode(CellUtil.cloneValue(kv));
columnDataMap.put(qualifier, value);
dataMap.put(rowKey1, columnDataMap);
}
}
if (MapUtil.isNotEmpty(dataMap)){
for (String r : dataMap.keySet()) {
Map<String, String> columnMap = dataMap.get(r);
StrBuilder lineStr = new StrBuilder();
lineStr.append(r + "||");
for (String s : columnMap.keySet()) {
lineStr.append(s + ":" + columnMap.get(s) + "\t");
}
String fileName = dirName + File.separator + "data.txt";
File f = new File(fileName);
if (!f.exists()){
try {
f.createNewFile();
}catch (IOException e){
logger.error("创建文件失败,异常信息:{}", e.getMessage());
}
}
BufferedWriter writer = new BufferedWriter(
new FileWriter(fileName, true));
writer.write(lineStr.toString() + "\n");
logger.info("Wrote waveform data for rowkey: {} to: {}", r, fileName);
writer.close();
}
}
}catch (Exception e){
logger.error("写入rowkey:{}的波形数据到:{}失败,错误的信息:{}", rowKey, dirName, e.getMessage());
}
}
}
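A minimal usage sketch for the get-based export above (not part of the original post): the row key, table name, and output directory are placeholder values, and hbase-site.xml is assumed to be on the classpath so that HBaseConfiguration.create() can resolve the ZooKeeper quorum.

public class GetExportDemo {
    public static void main(String[] args) {
        // Placeholder values: replace with a real row key, table, and directory.
        String rowKey = "device001_20230101120000";
        String tableName = "Vibration_WaveData";
        String dirName = "/tmp/hbase-export";

        // The fileExist flag is not used inside addInfoToFile as shown; pass false.
        HBaseService.addInfoToFile(rowKey, tableName, dirName, false);
    }
}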
1.2.2 Scan-based approach
/**
 * Fetch data via a scan
 * @param rowKey row key (not used in the method body)
 * @param startKey start row key
 * @param stopKey stop row key
 * @param regexStr regular expression the row key must match
 */
public static void findRowKey(String rowKey, String startKey, String stopKey, String regexStr){
Table table = null;
ResultScanner result = null;
try {
TableName[] tbs = conn.getAdmin().listTableNames();
FilterList filters = new FilterList();
table = conn.getTable(TableName.valueOf("Vibration_WaveData"));
Scan scan = new Scan();
// Filter rows by matching the row key against a regular expression
RegexStringComparator regexComparator = new RegexStringComparator(regexStr);
RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, regexComparator);
// Setting start and stop row keys narrows the scan range and improves retrieval efficiency
scan.setStartRow(startKey.getBytes());
scan.setStopRow(stopKey.getBytes());
scan.setFilter(rowFilter);
// Number of rows fetched from the server per RPC
scan.setCaching(100000);
ResultScanner result1 = table.getScanner(scan);
for (Result r : result1) {
for (KeyValue kv : r.raw()) {
System.out.println(String.format("row:%s, family:%s, qualifier:%s, qualifiervalue:%s, timestamp:%s.",
Bytes.toString(kv.getRow()),
Bytes.toString(kv.getFamily()),
Bytes.toString(kv.getQualifier()),
Bytes.toString(kv.getValue()),
kv.getTimestamp()));
}
}
result1.close();
conn.close();
}catch (Exception e){
System.out.println(e.getMessage());
}
}
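For reference, a hedged example of calling the scan-based method, assuming findRowKey lives in the HBaseService class shown earlier; the start/stop keys and regular expression are made-up values that assume row keys of the form deviceId_timestamp.

public class ScanExportDemo {
    public static void main(String[] args) {
        // Illustrative values only; adjust to your own row key design.
        HBaseService.findRowKey(
                "",                       // rowKey argument is not used inside findRowKey
                "device001_20230101",     // startKey (inclusive)
                "device001_20230201",     // stopKey (exclusive)
                "^device001_202301.*");   // regexStr applied by the RowFilter
    }
}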
2. MapReduce implementation
2.1 Dependencies
<!-- Hadoop dependency coordinates -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.7.6</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
    <version>2.7.6</version>
</dependency>
<dependency>
    <groupId>commons-cli</groupId>
    <artifactId>commons-cli</artifactId>
    <version>1.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.6</version>
</dependency>
2.2 Configuration file
hbase-site.xml:
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <!-- HBase runs in distributed mode -->
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <property>
        <!-- ZooKeeper quorum addresses, separated by "," -->
        <name>hbase.zookeeper.quorum</name>
        <value>192.168.1.100:2181,192.168.1.102:2181</value>
    </property>
    <!-- Enable uber mode (disabled by default) -->
    <property>
        <name>mapreduce.job.ubertask.enable</name>
        <value>true</value>
    </property>
    <!-- Maximum number of map tasks in uber mode; may be lowered -->
    <property>
        <name>mapreduce.job.ubertask.maxmaps</name>
        <value>9</value>
    </property>
    <!-- Maximum number of reduce tasks in uber mode; may be lowered -->
    <property>
        <name>mapreduce.job.ubertask.maxreduces</name>
        <value>1</value>
    </property>
    <!-- Maximum input size in uber mode; defaults to dfs.blocksize; may be lowered -->
    <property>
        <name>mapreduce.job.ubertask.maxbytes</name>
        <value></value>
    </property>
</configuration>
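As a quick sanity check (not part of the original post), the sketch below assumes hbase-site.xml sits on the classpath and prints the ZooKeeper quorum that HBaseConfiguration.create() resolved, which is an easy way to confirm the job will talk to the intended cluster.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class ConfigCheck {
    public static void main(String[] args) {
        // HBaseConfiguration.create() loads hbase-default.xml and hbase-site.xml
        // from the classpath, so the values below should match the file in 2.2.
        Configuration conf = HBaseConfiguration.create();
        System.out.println("hbase.zookeeper.quorum = " + conf.get("hbase.zookeeper.quorum"));
        System.out.println("uber mode enabled      = " + conf.get("mapreduce.job.ubertask.enable"));
    }
}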
2.3 Export code
public class ReadHbaseDataByMRToHDFS {
private static final Logger logger = LoggerFactory.getLogger(ReadHbaseDataByMRToHDFS.class);
static Configuration configuration = HBaseConfiguration.create();
/**
 * Export data from HBase to HDFS
 * @param tableName table name
 * @param dirName output directory name
 * @param startRow start row key
 * @param stopRow stop row key
 * @param regexStr regular expression the row key must match
 */
public void exportHbaseData(String tableName, String dirName, String startRow, String stopRow, String regexStr) {
logger.info("开始进行HBase数据导出,tableName:{}, dirName:{}, startRow:{}, stopRow:{}, regexStr:{}", tableName, dirName, startRow, stopRow, regexStr);
System.setProperty("HADOOP_USER_NAME", "root");
// 一次rpc请求的超时时间,如果某次RPC请求超过该值,客户端就会主动管理Socket
configuration.set("hbase.rpc.timeout", "600000");
// ,该参数是表示HBase客户端发起一次scan操作的rpc调用至得到响应之间总的超时时间
configuration.set("hbase.client.scanner.timeout.period", "600000");
configuration.set("mapreduce.job.ubertask.maxmaps", "10");
configuration.set("mapreduce.job.ubertask.maxreduces", "1");
configuration.set("mapreduce.task.io.sort.mb", "1024");
configuration.set("mapred.map.tasks", "10");
try {
Job job = Job.getInstance(configuration);
job.setJarByClass(ReadHbaseDataByMRToHDFS.class);
// Number of reduce tasks: 0 makes this a map-only job
job.setNumReduceTasks(0);
// Set up the map side
Scan scan = new Scan();
// Start/stop row keys plus a regex filter improve retrieval efficiency
RegexStringComparator regexComparator = new RegexStringComparator(regexStr);
scan.setStartRow(startRow.getBytes()).setStopRow(stopRow.getBytes());
RowFilter rowFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, regexComparator);
scan.setFilter(rowFilter);
// Number of rows fetched from the server per RPC
scan.setCaching(900000);
// The final "false" argument (addDependencyJars) controls whether dependency jars are shipped with the job
TableMapReduceUtil.initTableMapperJob(tableName,
scan,
ReadHBaseDataByMRToHDFSMapper.class,
Text.class,
NullWritable.class,
job,
false);
// Output directory
FileOutputFormat.setOutputPath(job, new Path(dirName));
// Submit the job and wait for completion
boolean isDone = job.waitForCompletion(true);
if (isDone){
Thread.sleep(3000);
logger.info("HBase data export succeeded, tableName: {}, dirName: {}, startRow: {}, stopRow: {}, regexStr: {}, status: {}", tableName, dirName, startRow, stopRow, regexStr, isDone);
}
} catch (Exception e) {
logger.error("Exception while exporting HBase data, tableName: {}, dirName: {}, startRow: {}, stopRow: {}, regexStr: {}, exception: {}",
tableName, dirName, startRow, stopRow, regexStr, e.getMessage());
}
}
/**
 * Mapper input/output types:
 * ImmutableBytesWritable: the row key of the current row
 * Result: each row read from HBase arrives as one Result (one Result per row key)
 * <p>
 * keyOut: Text line to write
 * valueOut: NullWritable
 */
static class ReadHBaseDataByMRToHDFSMapper extends TableMapper<Text, NullWritable> {
Text outKey = new Text();
@Override
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
List<Cell> cells = value.listCells();
Map<String, Map<String, String>> cellMap = new HashMap<>();
// Each cell holds one value for a single column
for (Cell cell : cells) {
String rowkey = Bytes.toString(CellUtil.cloneRow(cell));
Map<String, String> columnMap = new HashMap<>();
if (cellMap.containsKey(rowkey)){
columnMap = cellMap.get(rowkey);
}
// String family = Bytes.toString(CellUtil.cloneFamily(cell));
String column = Bytes.toString(CellUtil.cloneQualifier(cell));
String columnValue = Base64Encoder.encode(CellUtil.cloneValue(cell));
columnMap.put(column, columnValue);
cellMap.put(rowkey, columnMap);
// long timeStamp = cell.getTimestamp();
// outKey.set(rowkey + "\t\t" + column + "\t\t" + columnValue + "\n");
}
if (CollUtil.isNotEmpty(cellMap)){
String lineStr = "";
for (String s : cellMap.keySet()) {
Map<String, String> columnMap = cellMap.get(s);
lineStr = s + "||";
for (String c : columnMap.keySet()) {
lineStr += c + ":" + columnMap.get(c) + "\t";
}
}
outKey.set(lineStr);
context.write(outKey, NullWritable.get());
outKey.clear();
}
}
}
}
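A minimal driver sketch for the MapReduce exporter above (not part of the original post); the table name, HDFS output path, key range, and regex are placeholder values, and the output directory must not already exist on HDFS.

public class ExportDriver {
    public static void main(String[] args) {
        // Illustrative arguments only; adjust to your table and row key design.
        new ReadHbaseDataByMRToHDFS().exportHbaseData(
                "Vibration_WaveData",          // HBase table to export
                "/user/root/waveform_export",  // HDFS output directory (must not exist yet)
                "device001_20230101",          // startRow
                "device001_20230201",          // stopRow
                "^device001_202301.*");        // regexStr for the RowFilter
    }
}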