- 准备工作
首先需要在本地安装好Hadoop和Hive,确保可以连接到源数据库和目标数据库。
- 配置文件
创建一个配置文件,存储源数据库和目标数据库的连接信息、表名等信息。这个配置文件可以使用properties或者xml格式。
- 读取源数据库数据
使用Hive JDBC驱动获取源数据库中的数据,并将其保存到Java对象中。这里可以使用ResultSet来读取查询结果。
- 写入目标数据库
将读取到的数据写入到目标数据库中。同样使用Hive JDBC驱动,执行插入操作。
- 定时任务实现自动同步
使用定时任务框架(比如Quartz)实现周期性地从源数据库同步数据到目标数据库。
代码示例:
- 配置文件示例(database.properties)
# 源数据库配置
source.driverClassName=org.apache.hive.jdbc.HiveDriver
source.url=jdbc:hive2://localhost:10000/source_database
source.username=hive
source.password=hive123
# 目标数据库配置
target.driverClassName=org.apache.hive.jdbc.HiveDriver
target.url=jdbc:hive2://localhost:10000/target_database
target.username=hive
target.password=hive123
# 数据表名
table.name=user_info
- Java代码示例
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;
public class SyncData {
private Properties props;
public SyncData(Properties props) {
this.props = props;
}
public void sync() throws SQLException {
// 获取源数据库连接
Connection sourceConn = DriverManager.getConnection(props.getProperty("source.url"), props.getProperty("source.username"), props.getProperty("source.password"));
// 查询源数据库数据
String sql = "select * from " + props.getProperty("table.name");
PreparedStatement pstmt = sourceConn.prepareStatement(sql);
ResultSet rs = pstmt.executeQuery();
// 获取目标数据库连接
Connection targetConn = DriverManager.getConnection(props.getProperty("target.url"), props.getProperty("target.username"), props.getProperty("target.password"));
// 将查询结果写入到目标数据库中
while (rs.next()) {
String insertSql = "insert into " + props.getProperty("table.name") + " values (?, ?, ?)";
PreparedStatement insertPstmt = targetConn.prepareStatement(insertSql);
insertPstmt.setInt(1, rs.getInt(1));
insertPstmt.setString(2, rs.getString(2));
insertPstmt.setString(3, rs.getString(3));
insertPstmt.executeUpdate();
insertPstmt.close();
}
// 关闭资源
rs.close();
pstmt.close();
sourceConn.close();
}
}
- 定时任务示例(使用Quartz实现)
import java.io.IOException;
import java.util.Properties;
import org.quartz.CronScheduleBuilder;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;
public class SyncJob implements Job {
private Properties props;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
try {
SyncData syncData = new SyncData(props);
syncData.sync();
} catch (SQLException e) {
e.printStackTrace();
}
}
public void setProps(Properties props) {
this.props = props;
}
public static void main(String[] args) throws SchedulerException, IOException {
// 读取配置文件
Properties props = new Properties();
props.load(SyncJob.class.getResourceAsStream("database.properties"));
// 创建任务
JobDetail jobDetail = JobBuilder.newJob(SyncJob.class).withIdentity("syncJob", "group1").build();
((SyncJob)jobDetail.getJobDataMap().get("job")).setProps(props);
// 创建触发器
Trigger trigger = TriggerBuilder.newTrigger().withIdentity("trigger1", "group1")
.withSchedule(CronScheduleBuilder.cronSchedule("0/5 * * * * ?")) // 每隔5秒执行一次
.build();
// 启动定时任务调度器
Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
scheduler.scheduleJob(jobDetail, trigger);
scheduler.start();
}
}