Below is a Java example that connects to MySQL through a Flink 1.13.6 TableEnvironment:
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class Main {
    public static void main(String[] args) throws Exception {
        // Create the StreamExecutionEnvironment and the StreamTableEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // Build the JdbcConnectionOptions used by JdbcSink.
        // In Flink 1.13 the builder is created with its constructor.
        String url = "jdbc:mysql://localhost:3306/test";
        String username = "root";
        String password = "123456";
        JdbcConnectionOptions jdbcConnectionOptions =
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl(url)
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername(username)
                        .withPassword(password)
                        .build();
        // Register the MySQL table in Flink's catalog (metadata only;
        // this does not create the table in MySQL)
        String tableName = "mysql_table";
        tableEnv.executeSql(String.format("CREATE TABLE %s (\n" +
                " id BIGINT,\n" +
                " name STRING,\n" +
                " PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                " 'connector' = 'jdbc',\n" +
                " 'url' = '%s',\n" +
                " 'table-name' = '%s',\n" +
                " 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                " 'username' = '%s',\n" +
                " 'password' = '%s'\n" +
                ")", tableName, url, tableName, username, password));
        // Convert an in-memory Table to a DataStream<Row> and write it to MySQL via JdbcSink
        tableEnv.toAppendStream(tableEnv.fromValues(
                        Row.of(1L, "Alice"),
                        Row.of(2L, "Bob")
                ), Row.class)
                .addSink(JdbcSink.sink(
                        "INSERT INTO " + tableName + " (id, name) VALUES (?, ?)",
                        (ps, r) -> {
                            // Bind the two Row fields to the statement parameters
                            ps.setLong(1, (Long) r.getField(0));
                            ps.setString(2, (String) r.getField(1));
                        },
                        jdbcConnectionOptions))
                .name("JDBC Sink");
        // Submit the job
        env.execute();
    }
}
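The example above uses the Flink JDBC connector to write the rows of a Table into a MySQL table. It first builds a JdbcConnectionOptions object that holds the MySQL connection parameters. It then runs a CREATE TABLE statement to register a table named mysql_table in the TableEnvironment, specifying the connector type, URL, username, password and so on. Finally, it converts the Table to a DataStream with toAppendStream and writes that stream to MySQL with JdbcSink. Note that JdbcSink sends its INSERT statement to MySQL directly over JDBC, so a physical table mysql_table with columns id and name must already exist in the test database. The CREATE TABLE statement only registers metadata inside Flink, and the JdbcSink path does not read that registration.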
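Because the table is registered with the 'jdbc' connector, the rows could also be written through SQL alone, by passing an INSERT INTO statement to tableEnv.executeSql instead of attaching a JdbcSink. The following is a minimal sketch of how the two statements might be assembled. The class JdbcTableSql and its two helper methods are hypothetical names introduced for illustration, and the block only builds and prints the SQL strings, so it runs without Flink on the classpath.

```java
public class JdbcTableSql {

    // Builds the CREATE TABLE statement that registers a MySQL-backed table
    // with the Flink 'jdbc' connector. Hypothetical helper; it assumes the
    // arguments contain no single quotes.
    public static String createTableDdl(String tableName, String url,
                                        String username, String password) {
        return "CREATE TABLE " + tableName + " (\n"
                + " id BIGINT,\n"
                + " name STRING,\n"
                + " PRIMARY KEY (id) NOT ENFORCED\n"
                + ") WITH (\n"
                + " 'connector' = 'jdbc',\n"
                + " 'url' = '" + url + "',\n"
                + " 'table-name' = '" + tableName + "',\n"
                + " 'driver' = 'com.mysql.cj.jdbc.Driver',\n"
                + " 'username' = '" + username + "',\n"
                + " 'password' = '" + password + "'\n"
                + ")";
    }

    // Builds an INSERT statement for one (id, name) row. Single quotes in the
    // name are doubled, and the id is cast explicitly to match the BIGINT column.
    public static String insertRow(String tableName, long id, String name) {
        String escaped = name.replace("'", "''");
        return "INSERT INTO " + tableName
                + " VALUES (CAST(" + id + " AS BIGINT), '" + escaped + "')";
    }

    public static void main(String[] args) {
        String ddl = createTableDdl(
                "mysql_table", "jdbc:mysql://localhost:3306/test", "root", "123456");
        String insert = insertRow("mysql_table", 1L, "Alice");
        // In a Flink job these strings would be passed to
        // tableEnv.executeSql(ddl) and tableEnv.executeSql(insert).
        System.out.println(ddl);
        System.out.println(insert);
    }
}
```

With this approach the INSERT statement itself submits the job, so env.execute() would not be needed for the write. The physical table must still exist in MySQL.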
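Before running the code, add the Flink JDBC connector (flink-connector-jdbc) and the MySQL JDBC driver to the project, alongside the usual Flink streaming and Table API modules (flink-streaming-java, flink-clients, flink-table-api-java-bridge and flink-table-planner-blink). The driver version below is only an example; any 8.x release that provides com.mysql.cj.jdbc.Driver should work.
Maven:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.13.6</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
</dependency>
Gradle:
dependencies {
implementation 'org.apache.flink:flink-connector-jdbc_2.12:1.13.6'
implementation 'mysql:mysql-connector-java:8.0.28'
}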
If you use a different Flink version, replace the version numbers in the dependencies above accordingly.




