ZBLOG

flink 13.6 tableEnvironment .connect连接 mysql java代码

下面是使用 Flink 1.13.6 TableEnvironment 连接 MySQL 的 Java 代码示例:

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

import java.util.Properties;

public class Main {
    public static void main(String[] args) throws Exception {
        // 创建 StreamExecutionEnvironment 和 StreamTableEnvironment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // 创建 JdbcConnectionOptions
        String url = "jdbc:mysql://localhost:3306/test";
        String username = "root";
        String password = "123456";
        Properties properties = new Properties();
        properties.setProperty("user", username);
        properties.setProperty("password", password);
        JdbcConnectionOptions jdbcConnectionOptions = JdbcConnectionOptions.builder()
                .withUrl(url)
                .withDriverName("com.mysql.cj.jdbc.Driver")
                .withUsername(username)
                .withPassword(password)
                .withProperties(properties)
                .build();

        // 注册 MySQL 表
        String tableName = "mysql_table";
        tableEnv.executeSql(String.format("CREATE TABLE %s (\n" +
                "  id BIGINT,\n" +
                "  name STRING,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'jdbc',\n" +
                "  'url' = '%s',\n" +
                "  'table-name' = '%s',\n" +
                "  'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "  'username' = '%s',\n" +
                "  'password' = '%s'\n" +
                ")", tableName, url, tableName, username, password));

        // 插入数据到 MySQL 表
        tableEnv.toAppendStream(tableEnv.fromValues(
                Row.of(1L, "Alice"),
                Row.of(2L, "Bob")
        ), Row.class)
                .addSink(JdbcSink.sink(
                        "INSERT INTO " + tableName + " (id, name) VALUES (?, ?)",
                        (ps, r) -> {
                            ps.setLong(1, (Long) r.getField(0));
                            ps.setString(2, (String) r.getField(1));
                        },
                        jdbcConnectionOptions))
                .name("JDBC Sink");

        // 执行任务
        env.execute();
    }
}

在上面的代码示例中,我们使用了 Flink JDBC Connector 将 Table 中的数据插入到 MySQL 表中。首先,我们创建了一个 JdbcConnectionOptions 对象来配置连接 MySQL 数据库的参数。然后,我们使用 CREATE TABLE 命令在 TableEnvironment 中注册了一个名为 mysql_table 的 MySQL 表,并指定表的连接器类型、URL、用户名和密码等信息。最后,我们使用 toAppendStream 方法将数据插入到 MySQL 表中,并使用 JdbcSink 将数据写入到 MySQL 表中。

需要注意的是,在执行上述代码之前,需要先将 flink-sql-connector-mysql 依赖添加到项目中。可以在 Maven 或 Gradle 中添加以下依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-sql-connector-mysql</artifactId>
    <version>1.13.6</version>
</dependency>
dependencies {
    implementation 'org.apache.flink:flink-sql-connector-mysql_2.12:1.13.6'
}

如果使用其他版本的 Flink,需要将上述示例代码中的版本号相应替换。

本站部分文章来源于网络,版权归原作者所有,如有侵权请联系站长删除。
转载请注明出处:https://golang.0voice.com/?id=1103

分享:
扫描分享到社交APP
上一篇
下一篇
发表列表
游客 游客
此处应有掌声~
评论列表

还没有评论,快来说点什么吧~

联系我们

在线咨询: 点击这里给我发消息

微信号:3007537140

上班时间: 10:30-22:30

关注我们
x

注册

已经有帐号?