1. Introduction
In real production environments, Flink results are often written to databases such as MySQL or Doris. Taking MySQL as an example, the following shows how to write Flink streaming data to MySQL in real time via JDBC.
2. Code Example
2.1 Version Information
<properties>
    <flink.version>1.14.6</flink.version>
    <spark.version>2.4.3</spark.version>
    <hadoop.version>2.8.5</hadoop.version>
    <hbase.version>1.4.9</hbase.version>
    <hive.version>2.3.5</hive.version>
    <java.version>1.8</java.version>
    <scala.version>2.11.8</scala.version>
    <mysql.version>8.0.22</mysql.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>
2.2 Import the Required Dependencies
<!-- Flink JDBC connector -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<!-- MySQL JDBC driver -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.22</version>
</dependency>
2.3 Connect to the Database and Create the Table
mysql> CREATE TABLE `ws` (
           `id` varchar(100) NOT NULL,
           `ts` bigint(20) DEFAULT NULL,
           `vc` int(11) DEFAULT NULL,
           PRIMARY KEY (`id`)
       ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
2.4 Create the POJO Class
package com.flink.POJOs;
import java.util.Objects;
/**
 * Requirements for a Flink POJO class:
 * - the class is public
 * - it has a public no-argument constructor
 * - all fields are public (or exposed through getters and setters)
 * - all field types are serializable
 */
public class WaterSensor {
// Public fields
public String id;
public Long ts;
public Integer vc;
// No-argument constructor
public WaterSensor() {
//System.out.println("No-argument constructor called");
}
public WaterSensor(String id, Long ts, Integer vc) {
this.id = id;
this.ts = ts;
this.vc = vc;
}
// Getters and setters
public void setId(String id) {
this.id = id;
}
public void setTs(Long ts) {
this.ts = ts;
}
public void setVc(Integer vc) {
this.vc = vc;
}
public String getId() {
return id;
}
public Long getTs() {
return ts;
}
public Integer getVc() {
return vc;
}
// Override toString
@Override
public String toString() {
return "WaterSensor{" +
"id='" + id + ''' +
", ts=" + ts +
", vc=" + vc +
'}';
}
// Override equals and hashCode
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
WaterSensor that = (WaterSensor) o;
return id.equals(that.id) && ts.equals(that.ts) && vc.equals(that.vc);
}
@Override
public int hashCode() {
return Objects.hash(id, ts, vc);
}
}
// Could a Scala case class replace this boilerplate?
2.5 Custom MapFunction
package com.flink.POJOs;
import org.apache.flink.api.common.functions.MapFunction;
public class WaterSensorMapFunction implements MapFunction<String, WaterSensor> {
@Override
public WaterSensor map(String value) throws Exception {
// Split a CSV line "id,ts,vc" into its three fields
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
}
}
2.6 Flink2MySQL
package com.flink.DataStream.Sink;
import com.flink.POJOs.WaterSensor;
import com.flink.POJOs.WaterSensorMapFunction;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/**
 * Flink sink to MySQL (JDBC)
 */
public class flinkSinkJdbc {
public static void main(String[] args) throws Exception {
// Create the Flink stream execution environment
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
streamExecutionEnvironment.setParallelism(1);
// Source: read text lines from a socket
DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);
// Transform: parse each line into a WaterSensor
SingleOutputStreamOperator<WaterSensor> waterSensorSingleOutputStreamOperator = dataStreamSource.map(new WaterSensorMapFunction());
/** Write to MySQL
 * 1. Only the legacy SinkFunction/addSink style is available for the JDBC connector here.
 * 2. JdbcSink.sink takes 4 arguments:
 *    1st: the SQL to execute, usually an INSERT statement
 *    2nd: a statement builder that fills in the prepared statement's placeholders
 *    3rd: execution options ----> batching and retries
 *    4th: connection options ----> URL, username, password
 */
SinkFunction<WaterSensor> sinkFunction = JdbcSink.sink("insert into ws values(?,?,?)",
new JdbcStatementBuilder<WaterSensor>() {
@Override
public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
preparedStatement.setString(1, waterSensor.getId());
preparedStatement.setLong(2, waterSensor.getTs());
preparedStatement.setInt(3, waterSensor.getVc());
System.out.println("Writing record: (" + waterSensor.getId() + "," + waterSensor.getTs() + "," + waterSensor.getVc() + ")");
}
}
, JdbcExecutionOptions
.builder()
.withMaxRetries(3) // maximum number of retries
.withBatchSize(100) // batch size (number of records)
.withBatchIntervalMs(3000) // batch flush interval (ms)
.build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:mysql://localhost:3306/dw?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
.withUsername("root")
.withPassword("********")
.withConnectionCheckTimeoutSeconds(60) // connection validity check timeout (seconds)
.build()
);
// Attach the JDBC sink to the stream
waterSensorSingleOutputStreamOperator.addSink(sinkFunction);
streamExecutionEnvironment.execute();
}
}
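Note that `id` is the primary key of `ws`, so the plain INSERT used above will fail if a record with an existing id arrives again. As a sketch (not part of the original code), a MySQL-specific upsert statement can be passed to JdbcSink.sink instead; the three placeholders and the statement builder stay unchanged:

INSERT INTO ws (id, ts, vc) VALUES (?, ?, ?)
ON DUPLICATE KEY UPDATE ts = VALUES(ts), vc = VALUES(vc)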
2.7 Start netcat and Flink, and Check the Database Writes
nc -lk 8888  # start netcat listening on port 8888 (the port the job reads from), then type records
Start the Flink program.
Check that the rows are written to the database correctly.
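For example (sample, hypothetical values), typing the following two lines into the netcat session:

s1,1000,10
s2,2000,20

should, once the batch is flushed (after 100 records or 3 seconds, per the execution options above), show up in MySQL:

mysql> SELECT * FROM ws;
+----+------+------+
| id | ts   | vc   |
+----+------+------+
| s1 | 1000 |   10 |
| s2 | 2000 |   20 |
+----+------+------+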