添加pom.xml依赖
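<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.14.6</version>
</dependency>
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <version>2.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.14.6</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.14.6</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_2.12</artifactId>
    <version>1.14.6</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime_2.12</artifactId>
    <version>1.14.6</version>
</dependency>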
自定义Sink
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Flink sink that writes every record it receives to the application log.
 *
 * <p>The element type is {@code String}; the type argument on
 * {@link RichSinkFunction} is required so that {@link #invoke(String, Context)}
 * actually overrides the inherited method instead of clashing with the raw
 * {@code invoke(Object, Context)} signature.
 */
public class CustomSink extends RichSinkFunction<String> {
    private static final Logger logger = LoggerFactory.getLogger(CustomSink.class);

    /**
     * Logs a single record at INFO level, prefixed with {@code ">>> "}.
     *
     * @param value   the record handed to the sink
     * @param context sink context supplied by the runtime; not used here
     * @throws Exception declared to match the overridden signature; this
     *                   implementation does not throw on its own
     */
    @Override
    public void invoke(String value, Context context) throws Exception {
        // Parameterized form produces the same text as ">>> " + value.
        logger.info(">>> {}", value);
    }

    /**
     * Lifecycle hook; intentionally empty because the sink holds no resources.
     *
     * @param parameters configuration passed by the runtime; not used here
     */
    @Override
    public void open(Configuration parameters) {
    }

    /** Lifecycle hook; intentionally empty because there is nothing to release. */
    @Override
    public void close() {
    }
}
import com.imddysc.redis_example.sink.CustomSink;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point that, once the Spring context is up, builds and runs
 * a Flink job streaming change events from the MySQL table
 * {@code bigscreen.sys_user} into {@link CustomSink}.
 */
@SpringBootApplication
public class Application {
    private static final Logger logger = LoggerFactory.getLogger(Application.class);

    /**
     * Starts Spring, then assembles the MySQL CDC source and executes the Flink job.
     *
     * @param args command-line arguments, forwarded to Spring Boot
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        SpringApplication.run(Application.class, args);
        logger.info("redis_example 启动完成...");

        // NOTE(review): host and credentials are hard-coded for the demo;
        // move them to external configuration before using this anywhere real.
        MySqlSource<String> source = MySqlSource.<String>builder()
                .hostname("127.0.0.1")
                .port(3306)
                .databaseList("bigscreen")
                .tableList("bigscreen.sys_user")
                .username("root")
                .password("123456")
                // Each change event is delivered as a JSON string.
                .deserializer(new JsonDebeziumDeserializationSchema())
                .includeSchemaChanges(true)
                .build();

        // Pin the REST port to 8081 for the environment created below.
        Configuration configuration = new Configuration();
        configuration.setInteger(RestOptions.PORT, 8081);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        env.enableCheckpointing(5000);

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .addSink(new CustomSink());

        env.execute();
    }
}
程序输出
修改数据后,程序输出如下结果(日志原文之后附有格式化后的 JSON):
2023-09-25 15:39:40.143 INFO [Source: MySQL Source -> Sink: Unnamed (1/8)#0]com.imddysc.redis_example.sink.CustomSink.invoke:15 ->>> {"before":{"id":100,"username":"scbtest001","password":null,"truename":"市场部测试员001","email":"18612345678@qq.com","phone":"18612345678","organization_id":4,"create_time":1685084734000,"create_by":null,"update_time":1685084764000,"update_by":null},"after":{"id":100,"username":"scbtest001","password":"4280d89a5a03f812751f504cc10ee8a5","truename":"市场部测试员001","email":"18612345678@qq.com","phone":"18612345678","organization_id":4,"create_time":1685084734000,"create_by":null,"update_time":1685084764000,"update_by":null},"source":{"version":"1.6.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1695627579000,"snapshot":"false","db":"bigscreen","sequence":null,"table":"sys_user","server_id":1,"gtid":null,"file":"binlog.000063","pos":527936292,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1695627579793,"transaction":null}
{
"before": {
"id": 100,
"username": "scbtest001",
"password": null,
"truename": "市场部测试员001",
"email": "18612345678@qq.com",
"phone": "18612345678",
"organization_id": 4,
"create_time": 1685084734000,
"create_by": null,
"update_time": 1685084764000,
"update_by": null
},
"after": {
"id": 100,
"username": "scbtest001",
"password": "4280d89a5a03f812751f504cc10ee8a5",
"truename": "市场部测试员001",
"email": "18612345678@qq.com",
"phone": "18612345678",
"organization_id": 4,
"create_time": 1685084734000,
"create_by": null,
"update_time": 1685084764000,
"update_by": null
},
"source": {
"version": "1.6.4.Final",
"connector": "mysql",
"name": "mysql_binlog_source",
"ts_ms": 1695627579000,
"snapshot": "false",
"db": "bigscreen",
"sequence": null,
"table": "sys_user",
"server_id": 1,
"gtid": null,
"file": "binlog.000063",
"pos": 527936292,
"row": 0,
"thread": null,
"query": null
},
"op": "u",
"ts_ms": 1695627579793,
"transaction": null
}
服务器托管,北京服务器托管,服务器租用 http://www.fwqtg.net
机房租用,北京机房租用,IDC机房托管, http://www.fwqtg.net
网络安全工具 抓包工具 抓包工具是网络安全领域中常用的一种工具,用于捕获和分析网络数据包,帮助用户了解网络流量、发现网络攻击和漏洞等问题。以下是几个常用的抓包工具: Wireshark:Wireshark是一种开放源代码的网络协议分析工具,支持多种操作系统,…