Springboot整合HBase数据库
1、添加依赖
dependency>
groupId>com.spring4allgroupId>
artifactId>spring-boot-starter-hbaseartifactId>
dependency>
dependency>
groupId>org.springframework.datagroupId>
artifactId>spring-data-hadoop-hbaseartifactId>
version>2.5.0.RELEASEversion>
dependency>
dependency>
groupId>org.springframework.datagroupId>
artifactId>spring-data-hadoopartifactId>
version>2.5.0.RELEASEversion>
dependency>
2、添加配置
通过Yaml方式配置
spring:
hbase:
zookeeper:
quorum: hbase1.xxx.org,hbase2.xxx.org,hbase3.xxx.org
property:
clientPort: 2181
data:
hbase:
quorum: XXX
rootDir: XXX
nodeParent: XXX
zookeeper:
znode:
parent: /hbase
3、添加配置类
@Configuration
public class HBaseConfig {
@Bean
public HBaseService getHbaseService() {
//设置临时的hadoop环境变量,之后程序会去这个目录下的bin目录下找winutils.exe工具,windows连接hadoop时会用到
//System.setProperty("hadoop.home.dir", "D:Program FilesHadoop");
//执行此步时,会去resources目录下找相应的配置文件,例如hbase-site.xml
org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
return new HBaseService(conf);
}
}
4、工具类的方式实现HBASE操作
@Service public class HBaseService { private Admin admin = null; private Connection connection = null; public HBaseService(Configuration conf) { connection = ConnectionFactory.createConnection(conf); admin = connection.getAdmin(); } //创建表 create
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net, {NAME => , VERSIONS => } public boolean creatTable(String tableName, ListString> columnFamily) { //列族column family ListColumnFamilyDescriptor> cfDesc = new ArrayList>(columnFamily.size()); columnFamily.forEach(cf -> { cfDesc.add(ColumnFamilyDescriptorBuilder.newBuilder( Bytes.toBytes(cf)).build()); }); //表 table TableDescriptor tableDesc = TableDescriptorBuilder .newBuilder(TableName.valueOf(tableName)) .setColumnFamilies(cfDesc).build(); if (admin.tableExists(TableName.valueOf(tableName))) { log.debug("table Exists!"); } else { admin.createTable(tableDesc); log.debug("create table Success!"); } close(admin, null, null); return true; } public ListString> getAllTableNames() { ListString> result = new ArrayList>(); TableName[] tableNames = admin.listTableNames(); for (TableName tableName : tableNames) { result.add(tableName.getNameAsString()); } close(admin, null, null); return result; } public MapString, MapString, String>> getResultScanner(String tableName) { Scan scan = new Scan(); return this.queryData(tableName, scan); } private MapString, MapString, String>> queryData(String tableName, Scan scan) { // MapString, MapString, String>> result = new HashMap>(); ResultScanner rs = null; //获取表 Table table = null; table = getTable(tableName); rs = table.getScanner(scan); for (Result r : rs) { // 每一行数据 MapString, String> columnMap = new HashMap>(); String rowKey = null; // 行键,列族和列限定符一起确定一个单元(Cell) for (Cell cell : r.listCells()) { if (rowKey == null) { rowKey = Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); } columnMap.put( //列限定符 Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()), //列族 Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } if (rowKey != null) { result.put(rowKey, columnMap); } } close(null, rs, table); return result; } public void putData(String tableName, String rowKey, String familyName, String[] columns, String[] values) { Table table = null; table = getTable(tableName); putData(table, rowKey, tableName, familyName, columns, values); close(null, null, table); } private void putData(Table table, String rowKey, String tableName, String familyName, String[] columns, String[] values) { //设置rowkey Put put = new Put(Bytes.toBytes(rowKey)); if (columns != null && values != null && columns.length == values.length) { for (int i = 0; i columns.length; i++) { if (columns[i] != null && values[i] != null) { put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columns[i]), Bytes.toBytes(values[i])); } else { throw new NullPointerException(MessageFormat.format( "列名和列数据都不能为空,column:{0},value:{1}", columns[i], values[i])); } } } table.put(put); log.debug("putData add or update data Success,rowKey:" + rowKey); table.close(); } private Table getTable(String tableName) throws IOException { return connection.getTable(TableName.valueOf(tableName)); } private void close(Admin admin, ResultScanner rs, Table table) { if (admin != null) { try { admin.close(); } catch (IOException e) { log.error("关闭Admin失败", e); } if (rs != null) { rs.close(); } if (table != null) { rs.close(); } if (table != null) { try { table.close(); } catch (IOException e) { log.error("关闭Table失败", e); } } } } }
测试类
@RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest class HBaseApplicationTests { @Resource private HBaseService hbaseService; //测试创建表 @Test public void testCreateTable() { hbaseService.creatTable("test_base", Arrays.asList("a", "back")); } //测试加入数据 @Test public void testPutData() { hbaseService.putData("test_base", "000001", "a", new String[]{ "project_id", "varName", "coefs", "pvalues", "tvalues", "create_time"}, new String[]{"40866", "mob_3", "0.9416", "0.0000", "12.2293", "null"}); hbaseService.putData("test_base", "000002", "a", new String[]{ "project_id", "varName", "coefs", "pvalues", "tvalues", "create_time"}, new String[]{"40866", "idno_prov", "0.9317", "0.0000", "9.8679", "null"}); hbaseService.putData("test_base", "000003", "a", new String[]{ "project_id", "varName", "coefs", "pvalues", "tvalues", "create_time"}, new String[]{"40866", "education", "0.8984", "0.0000", "25.5649", "null"}); } //测试遍历全表 @Test public void testGetResultScanner() { MapString, MapString, String>> result2 = hbaseService.getResultScanner("test_base"); System.out.println("-----遍历查询全表内容-----"); result2.forEach((k, value) -> { System.out.println(k + "--->" + value); }); } }
三、使用spring-data-hadoop-hbase
3、配置类
@Configuration public class HBaseConfiguration { @Value("${hbase.zookeeper.quorum}") private String zookeeperQuorum; @Value("${hbase.zookeeper.property.clientPort}") private String clientPort; @Value("${zookeeper.znode.parent}") private String znodeParent; @Bean public HbaseTemplate hbaseTemplate() { org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); conf.set("hbase.zookeeper.quorum", zookeeperQuorum); conf.set("hbase.zookeeper.property.clientPort", clientPort); conf.set("zookeeper.znode.parent", znodeParent); return new HbaseTemplate(conf); } }
4、业务类中使用HbaseTemplate
这个是作为工具类
@Service @Slf4j public class HBaseService { @Autowired private HbaseTemplate hbaseTemplate; //查询列簇 public ListResult> getRowKeyAndColumn(String tableName, String startRowkey, String stopRowkey, String column, String qualifier) { FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL); if (StringUtils.isNotBlank(column)) { log.debug("{}", column); filterList.addFilter(new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(column)))); } if (StringUtils.isNotBlank(qualifier)) { log.debug("{}", qualifier); filterList.addFilter(new QualifierFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(qualifier)))); } Scan scan = new Scan(); if (filterList.getFilters().size() > 0) { scan.setFilter(filterList); } scan.setStartRow(Bytes.toBytes(startRowkey)); scan.setStopRow(Bytes.toBytes(stopRowkey)); return hbaseTemplate.find(tableName, scan, (rowMapper, rowNum) -> rowMapper); } public ListResult> getListRowkeyData(String tableName, ListString> rowKeys, String familyColumn, String column) { return rowKeys.stream().map(rk -> { if (StringUtils.isNotBlank(familyColumn)) { if (StringUtils.isNotBlank(column)) { return hbaseTemplate.get(tableName, rk, familyColumn, column, (rowMapper, rowNum) -> rowMapper); } else { return hbaseTemplate.get(tableName, rk, familyColumn, (rowMapper, rowNum) -> rowMapper); } } return hbaseTemplate.get(tableName, rk, (rowMapper, rowNum) -> rowMapper); }).collect(Collectors.toList()); } }
四、使用spring-boot-starter-data-hbase
参考:https://blog.csdn.net/cpongo1/article/details/89550486
## 下载spring-boot-starter-hbase代码 git clone https://github.com/SpringForAll/spring-boot-starter-hbase.git ## 安装 cd spring-boot-starter-hbase mvn clean install
2、添加配置项
- spring.data.hbase.quorum 指定 HBase 的 zk 地址
- spring.data.hbase.rootDir 指定 HBase 在 HDFS 上存储的路径
- spring.data.hbase.nodeParent 指定 ZK 中 HBase 的根 ZNode
3、定义好DTO
@Data public class City { private Long id; private Integer age; private String cityName; }
4、创建对应rowMapper
public class CityRowMapper implements RowMapperCity> { private static byte[] COLUMN_FAMILY = "f".getBytes(); private static byte[] NAME = "name".getBytes(); private static byte[] AGE = "age".getBytes(); @Override public City mapRow(Result result, int rowNum) throws Exception { String name = Bytes.toString(result.getValue(COLUMN_FAMILY, NAME)); int age = Bytes.toInt(result.getValue(COLUMN_FAMILY, AGE)); City dto = new City(); dto.setCityName(name); dto.setAge(age); return dto; } }
5、操作实现增改查
- HbaseTemplate.find 返回 HBase 映射的 City 列表
- HbaseTemplate.get 返回 row 对应的 City 信息
- HbaseTemplate.saveOrUpdates 保存或者更新
如果 HbaseTemplate 操作不满足需求,完全可以使用 hbaseTemplate 的getConnection() 方法,获取连接。进而类似 HbaseTemplate 实现的逻辑,实现更复杂的需求查询等功能@Service public class CityServiceImpl implements CityService { @Autowired private HbaseTemplate hbaseTemplate; //查询 public ListCity> query(String startRow, String stopRow) { Scan scan = new Scan(Bytes.toBytes(startRow), Bytes.toBytes(stopRow)); scan.setCaching(5000); ListCity> dtos = this.hbaseTemplate.find("people_table", scan, new CityRowMapper()); return dtos; } //查询 public City query(String row) { City dto = this.hbaseTemplate.get("people_table", row, new CityRowMapper()); return dto; } //新增或者更新 public void saveOrUpdate() { ListMutation> saveOrUpdates = new ArrayListMutation>(); Put put = new Put(Bytes.toBytes("135xxxxxx")); put.addColumn(Bytes.toBytes("people"), Bytes.toBytes("name"), Bytes.toBytes("test")); saveOrUpdates.add(put); this.hbaseTemplate.saveOrUpdates("people_table", saveOrUpdates); } }
Springboot整合Influxdb
中文文档:https://jasper-zhang1.gitbooks.io/influxdb/content/Introduction/installation.html
注意,项目建立在spring-boot-web基础上
1、添加依赖
dependency> groupId>org.influxdbgroupId> artifactId>influxdb-javaartifactId> version>2.15version> dependency>
2、添加配置
spring: influx: database: my_sensor1 password: admin url: http://127.0.0.1:6086 user: admin
3、编写配置类
@Configuration public class InfluxdbConfig { @Value("${spring.influx.url}") private String influxDBUrl; @Value("${spring.influx.user}") private String userName; @Value("${spring.influx.password}") private String password; @Value("${spring.influx.database}") private String database; @Bean("influxDB") public InfluxDB influxdb(){ InfluxDB influxDB = InfluxDBFactory.connect(influxDBUrl, userName, password); try { /** * 异步插入: * enableBatch这里第一个是point的个数,第二个是时间,单位毫秒 * point的个数和时间是联合使用的,如果满100条或者60 * 1000毫秒 * 满足任何一个条件就会发送一次写的请求。 */ influxDB.setDatabase(database).enableBatch(100,1000 * 60, TimeUnit.MILLISECONDS); } catch (Exception e) { e.printStackTrace(); } finally { //设置默认策略 influxDB.setRetentionPolicy("sensor_retention"); } //设置日志输出级别 influxDB.setLogLevel(InfluxDB.LogLevel.BASIC); return influxDB; } }
4、InfluxDB原生API实现
@SpringBootTest(classes = {MainApplication.class}) @RunWith(SpringJUnit4ClassRunner.class) public class InfluxdbDBTest { @Autowired private InfluxDB influxDB; //measurement private final String measurement = "sensor"; @Value("${spring.influx.database}") private String database; /** * 批量插入第一种方式 */ @Test public void insert(){ ListString> lines = new ArrayListString>(); Point point = null; for(int i=0;i50;i++){ point = Point.measurement(measurement) .tag("deviceId", "sensor" + i) .addField("temp", 3) .addField("voltage", 145+i) .addField("A1", "4i") .addField("A2", "4i").build(); lines.add(point.lineProtocol()); } //写入 influxDB.write(lines); } /** * 批量插入第二种方式 */ @Test public void batchInsert(){ BatchPoints batchPoints = BatchPoints .database(database) .consistency(InfluxDB.ConsistencyLevel.ALL) .build(); //遍历sqlserver获取数据 for(int i=0;i50;i++){ //创建单条数据对象——表名 Point point = Point.measurement(measurement) //tag属性——只能存储String类型 .tag("deviceId", "sensor" + i) .addField("temp", 3) .addField("voltage", 145+i) .addField("A1", "4i") .addField("A2", "4i").build(); //将单条数据存储到集合中 batchPoints.point(point); } //批量插入 influxDB.write(batchPoints); } /** * 获取数据 */ @Test public void datas(@RequestParam Integer page){ int pageSize = 10; // InfluxDB支持分页查询,因此可以设置分页查询条件 String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize; String queryCondition = ""; //查询条件暂且为空 // 此处查询所有内容,如果 String queryCmd = "SELECT * FROM " // 查询指定设备下的日志信息 // 要指定从 RetentionPolicyName.measurement中查询指定数据,默认的策略可以不加; // + 策略name + "." + measurement + measurement // 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度) + queryCondition // 查询结果需要按照时间排序 + " ORDER BY time DESC" // 添加分页查询条件 + pageQuery; QueryResult queryResult = influxDB.query(new Query(queryCmd, database)); System.out.println("query result => "+queryResult); } }
5、采用封装工具类
1、创建实体类
@Data @Measurement(name = "sensor") public class Sensor { @Column(name="deviceId",tag=true) private String deviceId; @Column(name="temp") private float temp; @Column(name="voltage") private float voltage; @Column(name="A1") private float A1; @Column(name="A2") private float A2; @Column(name="time") private String time; }
2、创建工具类
@Component public class InfluxdbUtils { @Autowired private InfluxDB influxDB; @Value("${spring.influx.database}") private String database; /** * 新增单条记录,利用java的反射机制进行新增操作 */ @SneakyThrows public void insertOne(Object obj){ //获取度量 Class?> clasz = obj.getClass(); Measurement measurement = clasz.getAnnotation(Measurement.class); //构建 Point.Builder builder = Point.measurement(measurement.name()); // 获取对象属性 Field[] fieldArray = clasz.getDeclaredFields(); Column column = null; for(Field field : fieldArray){ column = field.getAnnotation(Column.class); //设置属性可操作 field.setAccessible(true); if(column.tag()){ //tag属性只能存储String类型 builder.tag(column.name(), field.get(obj).toString()); }else{ //设置field if(field.get(obj) != null){ builder.addField(column.name(), field.get(obj).toString()); } } } influxDB.write(builder.build()); } /** * 批量新增,方法一 */ @SneakyThrows public void insertBatchByRecords(List?> records){ ListString> lines = new ArrayListString>(); records.forEach(record->{ Class?> clasz = record.getClass(); //获取度量 Measurement measurement = clasz.getAnnotation(Measurement.class); //构建 Point.Builder builder = Point.measurement(measurement.name()); Field[] fieldArray = clasz.getDeclaredFields(); Column column = null; for(Field field : fieldArray){ column = field.getAnnotation(Column.class); //设置属性可操作 field.setAccessible(true); if(column.tag()){ //tag属性只能存储String类型 builder.tag(column.name(), field.get(record).toString()); }else{ //设置field if(field.get(record) != null){ builder.addField(column.name(), field.get(record).toString()); } } } lines.add(builder.build().lineProtocol()); }); influxDB.write(lines); } /** * 批量新增,方法二 */ @SneakyThrows public void insertBatchByPoints(List?> records){ BatchPoints batchPoints = BatchPoints.database(database) .consistency(InfluxDB.ConsistencyLevel.ALL) .build(); records.forEach(record->{ Class?> clasz = record.getClass(); //获取度量 Measurement measurement = clasz.getAnnotation(Measurement.class); //构建 Point.Builder builder = Point.measurement(measurement.name()); Field[] fieldArray = clasz.getDeclaredFields(); Column column = null; for(Field field : fieldArray){ column = field.getAnnotation(Column.class); //设置属性可操作 field.setAccessible(true); if(column.tag()){ //tag属性只能存储String类型 builder.tag(column.name(), field.get(record).toString()); }else{ //设置field if(field.get(record) != null){ builder.addField(column.name(), field.get(record).toString()); } } } batchPoints.point(builder.build()); }); influxDB.write(batchPoints); } /** * 查询,返回Map集合 * @param query 完整的查询语句 */ public ListObject> fetchRecords(String query){ ListObject> results = new ArrayListObject>(); QueryResult queryResult = influxDB.query(new Query(query, database)); queryResult.getResults().forEach(result->{ result.getSeries().forEach(serial->{ ListString> columns = serial.getColumns(); int fieldSize = columns.size(); serial.getValues().forEach(value->{ MapString,Object> obj = new HashMapString,Object>(); for(int i=0;ifieldSize;i++){ obj.put(columns.get(i), value.get(i)); } results.add(obj); }); }); }); return results; } /** * 查询,返回map集合 * @param fieldKeys 查询的字段,不可为空;不可为单独的tag * @param measurement 度量,不可为空; */ public ListObject> fetchRecords(String fieldKeys, String measurement){ StringBuilder query = new StringBuilder(); query.append("select ").append(fieldKeys).append(" from ").append(measurement); return this.fetchRecords(query.toString()); } /** * 查询,返回map集合 * @param fieldKeys 查询的字段,不可为空;不可为单独的tag * @param measurement 度量,不可为空; */ public ListObject> fetchRecords(String fieldKeys, String measurement, String order){ StringBuilder query = new StringBuilder(); query.append("select ").append(fieldKeys).append(" from ").append(measurement); query.append(" order by ").append(order); return this.fetchRecords(query.toString()); } /** * 查询,返回map集合 * @param fieldKeys 查询的字段,不可为空;不可为单独的tag * @param measurement 度量,不可为空; */ public ListObject> fetchRecords(String fieldKeys, String measurement, String order, String limit){ StringBuilder query = new StringBuilder(); query.append("select ").append(fieldKeys).append(" from ").append(measurement); query.append(" order by ").append(order); query.append(limit); return this.fetchRecords(query.toString()); } /** * 查询,返回对象的list集合 */ @SneakyThrows public T> ListT> fetchResults(String query, Class?> clasz){ List results = new ArrayList>(); QueryResult queryResult = influxDB.query(new Query(query, database)); queryResult.getResults().forEach(result->{ result.getSeries().forEach(serial->{ ListString> columns = serial.getColumns(); int fieldSize = columns.size(); serial.getValues().forEach(value->{ Object obj = null; obj = clasz.newInstance(); for(int i=0;ifieldSize;i++){ String fieldName = columns.get(i); Field field = clasz.getDeclaredField(fieldName); field.setAccessible(true); Class?> type = field.getType(); if(type == float.class){ field.set(obj, Float.valueOf(value.get(i).toString())); }else{ field.set(obj, value.get(i)); } } results.add(obj); }); }); }); return results; } /** * 查询,返回对象的list集合 */ public T> ListT> fetchResults(String fieldKeys, String measurement, Class?> clasz){ StringBuilder query = new StringBuilder(); query.append("select ").append(fieldKeys).append(" from ").append(measurement); return this.fetchResults(query.toString(), clasz); } /** * 查询,返回对象的list集合 */ public T> ListT> fetchResults(String fieldKeys, String measurement, String order, Class?> clasz){ StringBuilder query = new StringBuilder(); query.append("select ").append(fieldKeys).append(" from ").append(measurement); query.append(" order by ").append(order); return this.fetchResults(query.toString(), clasz); } /** * 查询,返回对象的list集合 */ public T> ListT> fetchResults(String fieldKeys, String measurement, String order, String limit, Class?> clasz){ StringBuilder query = new StringBuilder(); query.append("select ").append(fieldKeys).append(" from ").append(measurement); query.append(" order by ").append(order); query.append(limit); return this.fetchResults(query.toString(), clasz); } }
3、使用工具类的测试代码
@SpringBootTest(classes = {MainApplication.class}) @RunWith(SpringJUnit4ClassRunner.class) public class InfluxdbUtilTest { @Autowired private InfluxdbUtils influxdbUtils; /** * 插入单条记录 */ @Test public void insert(){ Sensor sensor = new Sensor(); sensor.setA1(10); sensor.setA2(10); sensor.setDeviceId("0002"); sensor.setTemp(10L); sensor.setTime("2021-01-19"); sensor.setVoltage(10); influxdbUtils.insertOne(sensor); } /** * 批量插入第一种方式 */ @GetMapping("/index22") public void batchInsert(){ ListSensor> sensorList = new ArrayListSensor>(); for(int i=0; i50; i++){ Sensor sensor = new Sensor(); sensor.setA1(2); sensor.setA2(12); sensor.setTemp(9); sensor.setVoltage(12); sensor.setDeviceId("sensor4545-"+i); sensorList.add(sensor); } influxdbUtils.insertBatchByRecords(sensorList); } /** * 批量插入第二种方式 */ @GetMapping("/index23") public void batchInsert1(){ ListSensor> sensorList = new ArrayListSensor>(); Sensor sensor = null; for(int i=0; i50; i++){ sensor = new Sensor(); sensor.setA1(2); sensor.setA2(12); sensor.setTemp(9); sensor.setVoltage(12); sensor.setDeviceId("sensor4545-"+i); sensorList.add(sensor); } influxdbUtils.insertBatchByPoints(sensorList); } /** * 查询数据 */ @GetMapping("/datas2") public void datas(@RequestParam Integer page){ int pageSize = 10; // InfluxDB支持分页查询,因此可以设置分页查询条件 String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize; String queryCondition = ""; //查询条件暂且为空 // 此处查询所有内容,如果 String queryCmd = "SELECT * FROM sensor" // 查询指定设备下的日志信息 // 要指定从 RetentionPolicyName.measurement中查询指定数据,默认的策略可以不加; // + 策略name + "." + measurement // 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度) + queryCondition // 查询结果需要按照时间排序 + " ORDER BY time DESC" // 添加分页查询条件 + pageQuery; ListObject> sensorList = influxdbUtils.fetchRecords(queryCmd); System.out.println("query result => {}"+sensorList ); } /** * 获取数据 */ @GetMapping("/datas21") public void datas1(@RequestParam Integer page){ int pageSize = 10; // InfluxDB支持分页查询,因此可以设置分页查询条件 String pageQuery = " LIMIT " + pageSize + " OFFSET " + (page - 1) * pageSize; String queryCondition = ""; //查询条件暂且为空 // 此处查询所有内容,如果 String queryCmd = "SELECT * FROM sensor" // 查询指定设备下的日志信息 // 要指定从 RetentionPolicyName.measurement中查询指定数据,默认的策略可以不加; // + 策略name + "." + measurement // 添加查询条件(注意查询条件选择tag值,选择field数值会严重拖慢查询速度) + queryCondition // 查询结果需要按照时间排序 + " ORDER BY time DESC" // 添加分页查询条件 + pageQuery; ListSensor> sensorList = influxdbUtils.fetchResults(queryCmd, Sensor.class); //List sensorList = influxdbUtils.fetchResults("*", "sensor", Sensor.class); sensorList.forEach(sensor->{ System.out.println("query result => {}"+sensorList ); }); } }
6、采用封装数据模型的方式
1、在Influxdb库中创建存储策略
CREATE RETENTION POLICY "rp_order_payment" ON "db_order" DURATION 30d REPLICATION 1 DEFAULT
2、创建数据模型
@Data @Measurement(name = "m_order_payment", database = "db_order", retentionPolicy = "rp_order_payment") public class OrderPayment implements Serializable { // 统计批次 @Column(name = "batch_id", tag = true) private String batchId; // 哪个BU @Column(name = "bu_id", tag = true) private String buId; // BU 名称 @Column(name = "bu_name") private String buName; // 总数 @Column(name = "total_count", tag = true) private String totalCount; // 支付量 @Column(name = "pay_count", tag = true) private String payCount; // 金额 @Column(name = "total_money", tag = true) private String totalMoney; }
3、创建Mapper
public class InfluxMapper extends InfluxDBMapper { public InfluxMapper(InfluxDB influxDB) { super(influxDB); } }
4、配置Mapper
@Log4j2 @Configuration public class InfluxAutoConfiguration { @Bean public InfluxMapper influxMapper(InfluxDB influxDB) { InfluxMapper influxMapper = new InfluxMapper(influxDB); return influxMapper; } }
5、测试CRUD
@SpringBootTest(classes = {MainApplication.class}) @RunWith(SpringJUnit4ClassRunner.class) public class InfluxdbMapperTest { @Autowired private InfluxMapper influxMapper; @Test public void save(OrderPayment product) { influxMapper.save(product); } @Test public void queryAll() { ListOrderPayment> products = influxMapper.query(OrderPayment.class); System.out.println(products); } @Test public void queryByBu(String bu) { String sql = String.format("%s'%s'", "select * from m_order_payment where bu_id = ", bu); Query query = new Query(sql, "db_order"); ListOrderPayment> products = influxMapper.query(query, OrderPayment.class); System.out.println(products); } }
参考:https://blog.csdn.net/cpongo1/article/details/89550486
https://github.com/SpringForAll/spring-boot-starter-hbase
https://github.com/JeffLi1993/springboot-learning-example
$(function() {
setTimeout(function () {
var mathcodeList = document.querySelectorAll('.htmledit_views img.mathcode');
if (mathcodeList.length > 0) {
for (let i = 0; i < mathcodeList.length; i++) {
if (mathcodeList[i].naturalWidth === 0 || mathcodeList[i].naturalHeight === 0) {
var alt = mathcodeList[i].alt;
alt = '(' + alt + ')';
var curSpan = $('');
curSpan.text(alt);
$(mathcodeList[i]).before(curSpan);
$(mathcodeList[i]).remove();
}
}
MathJax.Hub.Queue(["Typeset",MathJax.Hub]);
}
}, 1000)
});阅读终点,创作起航,您可以撰写心得或摘录文章要点写篇博文。去创作
天道酬勤的博客
关注
关注
0
点赞
踩
0
收藏
觉得还不错?
一键收藏
打赏
0
评论专栏目录springboot整合hbase资源.zip05-26解压后包含三个资源,分别是hadoop.dll、hadoop-2.7.3.zip、winutils.exe。对应springboot整合hbase文章中解决windows系统下项目启动不了的问题。springboot集成hbase09-17本示例示springboot集成hbase的一个工具,用户下载后,只需要更改HBaseUtil类中的配置为自己的hbase服务器上的相关配置及更改resources下hbase.keytab和krb5.conf文件为自己服务器上的文件即可使用参与评论
您还未登录,请先
登录
后发表或查看评论springboot集成phoenix+hbase10-26springboot集成phoenix+hbase 完整demo!!!!!!!springboot 集成 phoenix+hbase整合,完整demo03-21springboot 集成 phoenix+hbase整合,完整demo。 springboot集成phoenix+hbase 完整demo!!!!!!! springboot phoenix hbase
springboot_hbase_kafka.rar07-30大数据项目的基础数据存储项目,整合hbase存储,并通过中间件作为存储缓冲区Spring Boot存在路径遍历漏洞CVE-2021-22118冰点出品,必属精品08-28
187
背景:Spring Boot存在路径遍历漏洞。官方 issue也有对此的记录,感兴趣可以看下CVE-2021-22118 是一个在 Spring Boot 中发现的漏洞。该漏洞关系到 Spring Boot 的开发者工具(Devtools)中的远程更新(Remote Update)功能。在某些情况下,攻击者可能会利用这个功能进行目录遍历攻击,从而访问到系统中的敏感文件。远程更新功能被开启(spring.devtools.restart.enabled = true)。Java版企业工程项目管理系统源码+java版本+项目模块功能清单+spring cloud +spring bootm0_66404702的博客08-31
63
项目经理、计划开始时间、计划结束时间等信息,可以进行终止和导出操作。1、项目列表:实现对项目列表的增删改查操作,包括查看各项目的立项人、创建时间、2、项目计划管理:项目计划查看和管理模块,可执行增删改查操作,包括查看甘特图。3、收支报表:项目收支报表,包含总体收支、项目收支和收支统计模块。4、资金计划:实现各项目资金计划的详情查看及其增删改和导出操作。1、项目汇总:项目汇总信息查看,包括进度、计划时间等信息。2、进度报表:项目进度报表,包括计划时间和已用资源等信息。java 企业工程管理系统软件源码+Spring Cloud + Spring Boot +二次开发+ MybatisPlus + RedisXiaohong0716的博客08-31
50
涉及技术:Eureka、Config、Zuul、OAuth2、Security、OSS、Turbine、Zipkin、Feign、Monitor、Stream、ElasticSearch等。 工程项目管理软件(工程项目管理系统)对建设工程项目管理组织建设、项目策划决策、规划设计、施工建设到竣工交付、总结评估、运维运营,全过程、全方位的对项目进行综合管理。1、项目列表:实现对项目列表的增删改查操作,包括查看各项目的立项人、创建时间、1、项目汇总:项目汇总信息查看,包括进度、计划时间等信息。【Spring Boot】使用XML配置文件实现数据库操作(一)最新发布衍生星球的博客08-31
52
MyBatis提供了insert、update、delete和delete四个标签来定义SQL语句。接下来就从SQL语句开始介绍每个标签的用法。SSM框架和Spring Boot+Mybatis框架的性能比较?TaloyerG的博客08-27
445
综上所述,SSM框架和Spring Boot+Mybatis框架的性能比较,并没有一个明确的结论,而是需要根据具体的项目需求和场景来进行权衡和选择。一般来说,如果项目比较简单,需要更快的开发和部署,可以选择Spring Boot+Mybatis框架;如果项目比较复杂,需要更多的自定义和控制,可以选择SSM框架。SSM框架和Spring Boot+Mybatis框架的性能比较,没有一个绝对的答案,因为它们的性能受到很多因素的影响,例如项目的规模、复杂度、需求、技术栈、团队水平、测试环境、测试方法等。[Spring Boot] 开发时可以运行,但Maven打包后,无法运行jinwenshuo2007的博客08-31
57
Exception in thread "main" java.lang.UnsupportedClassVersionError:【Spring Boot】数据库持久层框架MyBatis — Spring Boot构建MyBatis应用程序衍生星球的博客08-28
852
将Spring Boot与MyBatis结合使用可以使开发人员更容易地创建和管理数据库应用程序。这些是使用Spring Boot构建MyBatis应用程序的基本步骤。使用Spring Boot和MyBatis可以轻松地创建和管理数据库应用程序。这将告诉MyBatis查找类路径中的mapper文件夹,并使用其中的XML文件。这个接口将定义一个findById方法,它将在数据库中查找具有给定ID的用户。Spring Boot 的核心注解SpringBootApplicationkeyboard专栏08-31
76
SpringBootApplication 包括的注解。基于Spring Boot的住院病人管理系统设计与实现(Java+spring boot+MySQL)zag1069464798的博客08-31
65
获取源码或者论文请私信博主
根据医院信息管理系统的业务场景,本系统分为系统的管理员身份、药品管理员、医院医生、医院财务人员四个角色,不同身份进入医院管理系统会拥有不一样的操作权限,人员信息的配置、医院日常的通知公告、医院基础科室和科室对应的医生信息维护、医院药品的日常进货后进行系统登记,保证每一款药品都能够有迹可查,有详细的登记记录、且能够登记患者在医院的所有记录,包括患者门诊住院的细节信息,患者费用结算等等。Spring Boot(Vue3+ElementPlus+Axios+MyBatisPlus+Spring Boot 前后端分离)【一】m0_73557631的博客08-24
1032
前后端分离开发, 前端主体框架Vue3 + 后端基础框架Spring-Boot前端技术栈: Vue3+Axios+ElementPlus后端技术栈: Spring Boot + MyBatis Plus数据库-MySQL项目的依赖管理-Maven分页-MyBatis Plus 的分页插件。Hbase-技术文档-java.net.UnknownHostException: 不知道这样的主机。 (e64682f1b276)weixin_72186894的博客08-27
949
在使用spring-boot操作habse的时候,在对habse进行操作的时候出现这个问题。。java打war包、jar包方式,java运行war包、jar包方式artistkeepmonkey的博客08-29
129
1打jar包,使用了内置的tomcat服务器,流程简单2打war包,可以放标准tomcat服务器中。Redis进阶 - JVM进程缓存一个编程爱好者的博客08-28
252
原文首更地址,阅读效果更佳!springboot actuator配置na_tion的专栏08-28
74
【代码】springboot actuator配置。springboot整合hbase07-29在Spring Boot中整合HBase,你可以按照以下步骤进行操作:1. 添加HBase依赖:在`pom.xml`文件中添加HBase的依赖。
```xml
org.apache.hbase
hbase-client
版本号```
2. 配置HBase连接:在`application.properties`(或`application.yml`)文件中添加HBase的连接配置。
```properties
# HBase连接配置
hbase.zookeeper.quorum=ZooKeeper地址
hbase.zookeeper.property.clientPort=ZooKeeper端口号
```3. 创建HBase连接配置类:创建一个Java类,用于配置HBase连接。
```java
@Configuration
public class HBaseConfiguration {@Value("${hbase.zookeeper.quorum}")
private String quorum;@Value("${hbase.zookeeper.property.clientPort}")
private String clientPort;@Bean
public Configuration configuration() {
Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", quorum);
config.set("hbase.zookeeper.property.clientPort", clientPort);
return config;
}@Bean
public Connection connection() throws IOException {
return ConnectionFactory.createConnection(configuration());
}
}
```4. 创建HBase操作类:创建一个Java类,用于封装HBase的操作方法。
```java
@Repository
public class HBaseRepository {@Autowired
private Connection connection;public void putData(String tableName, String rowKey, String columnFamily, String column, String value) throws IOException {
TableName tName = TableName.valueOf(tableName);
Table table = connection.getTable(tName);Put put = new Put(Bytes.toBytes(rowKey));
put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));table.put(put);
table.close();
}public String getData(String tableName, String rowKey, String columnFamily, String column) throws IOException {
TableName tName = TableName.valueOf(tableName);
Table table = connection.getTable(tName);Get get = new Get(Bytes.toBytes(rowKey));
Result result = table.get(get);byte[] valueBytes = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(column));
String value = Bytes.toString(valueBytes);table.close();
return value;
}
}
```这里只提供了示例的插入和查询操作方法,你可以根据实际需求扩展其他操作。
现在,你可以在Spring Boot应用程序中使用`HBaseRepository`类来执行HBase的操作了。
“相关推荐”对你有帮助么?
非常没帮助
没帮助
一般
有帮助
非常有帮助提交
window.csdn.csdnFooter.options = {
el: '.blog-footer-bottom',
type: 2
}
- 76
- 原创
- 4454
- 周排名
- 2万+
- 总排名
- 5万+
- 访问
- 等级
- 984
- 积分
- 1769
- 粉丝
- 17
- 获赞
- 5
- 评论
- 118
- 收藏
私信关注博客之星–博主的年度最高成就表彰活动成为博客之星不仅可获博客之星专属荣誉还可获博客之星年度大奖,一年仅有一次。去创作
(adsbygoogle = window.adsbygoogle || []).push({});
热门文章
springBoot单元测试
5984Springboot使用@WebListener 作为web监听器
4813Mysql数据库技术知识整理
3397java IO流相关的类的分类和总结
3267Hutool工具类和工具方法
3001分类专栏
Springboot基础
7篇
SpringCloud实战专栏
11篇
java知识点分类整理
2篇
SpringBoot整合整理
14篇
项目实战专栏整理
个人知识点总结整理
源码笔记整理
1篇
Springboot项目基础
9篇
Springboot实战专栏
9篇
Mybatis-plus专栏
4篇
Orcale数据库个人总结
3篇
java基础知识
8篇
最新评论
您愿意向朋友推荐“博客详情页”吗?
强烈不推荐
不推荐
一般般
推荐
强烈推荐提交
最新文章
- Springboot整合ClickHouse
- Springboot使用AOP
- SpringBoot整合JPA和Hibernate框架
2023年26篇2022年25篇2021年17篇2019年1篇2014年7篇目录
$("a.flexible-btn").click(function(){
$(this).parents('div.aside-box').removeClass('flexible-box');
$(this).parents("p.text-center").remove();
})目录
var timert = setInterval(function() {
sideToolbar = $(".csdn-side-toolbar");
if (sideToolbar.length > 0) {
sideToolbar.css('cssText', 'bottom:64px !important;')
clearInterval(timert);
}
}, 200);评论
被折叠的条评论
为什么被折叠?到【灌水乐园】发言
查看更多评论添加红包请填写红包祝福语或标题
个
红包个数最小为10个
元
红包金额最低5元
当前余额3.43元
前往充值 >需支付:10.00元
打赏作者
天道酬勤的博客
你的鼓励将是我创作的最大动力
1
2
4
6
10
20扫码支付:1
获取中
扫码支付您的余额不足,请更换扫码支付或充值
打赏作者
实付元使用余额支付
点击重新获取扫码支付钱包余额
0抵扣说明:
1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。余额充值
相关推荐: 【视频】时间序列分类方法:动态时间规整算法DTW和R语言实现|附代码数据
原文链接:http://tecdat.cn/?p=22945 最近我们被客户要求撰写关于动态时间规整算法的研究报告,包括一些图形和统计输出 动态时间扭曲算法何时、如何以及为什么可以有力地取代常见的欧几里得距离,以更好地对时间序列数据进行分类 时间序列分类的动态…
¥三石:
vip 呵呵呵了
橙以
回复
gama2015:
https://blog.csdn.net/qq_42721694
gama2015:
学习了