以下是用Scala连接分布式Redis集群、获取key并写入HDFS的完整示例:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import redis.clients.jedis.{HostAndPort, JedisCluster, JedisPoolConfig}
import org.apache.hadoop.fs.{FileSystem, Path}
import java.io.{BufferedWriter, OutputStreamWriter}
import java.util
object RedisToHDFS {
def main(args: Array[String]): Unit = {
// 1. 配置Redis集群节点
val redisNodes = new util.HashSet[HostAndPort]()
redisNodes.add(new HostAndPort("redis-node1", 6379))
redisNodes.add(new HostAndPort("redis-node2", 6379))
redisNodes.add(new HostAndPort("redis-node3", 6379))
// 2. 配置连接池
val poolConfig = new JedisPoolConfig()
poolConfig.setMaxTotal(100)
poolConfig.setMaxIdle(50)
poolConfig.setMinIdle(10)
// 3. 创建Redis集群客户端
var jedisCluster: JedisCluster = null
try {
jedisCluster = new JedisCluster(redisNodes, poolConfig)
// 4. 获取所有key(注意:生产环境慎用keys *,建议使用scan迭代)
val keys = jedisCluster.keys("*")
// 5. 创建SparkSession(用于HDFS操作)
val spark = SparkSession.builder()
.appName("RedisToHDFS")
.config(new SparkConf())
.getOrCreate()
import spark.implicits._
val sc = spark.sparkContext
// 6. 将keys转换为RDD并写入HDFS
val rdd = sc.parallelize(keys.toArray().map(_.toString).toSeq)
// 7. 定义HDFS输出路径
val outputPath = "hdfs://namenode:9000/output/redis_keys"
// 8. 保存到HDFS(文本格式)
rdd.saveAsTextFile(outputPath)
println(s"成功将${rdd.count()}个key写入HDFS路径: $outputPath")
} catch {
case e: Exception =>
e.printStackTrace()
println(s"操作失败: ${e.getMessage}")
} finally {
if (jedisCluster != null) {
try {
jedisCluster.close()
} catch {
case e: Exception =>
println("关闭Redis连接时发生错误: " + e.getMessage)
}
}
}
}
}
如果需要更高级的功能,这里是一个增强版本(使用SCAN迭代避免阻塞):
// ...(保持上述导入和初始配置不变)
// 使用SCAN命令安全迭代所有key
def scanAllKeys(jedisCluster: JedisCluster): Set[String] = {
var cursor = "0"
val result = new util.HashSet[String]()
do {
val scanResult = jedisCluster.scan(cursor)
cursor = scanResult.getCursor
result.addAll(scanResult.getResult)
} while (cursor != "0")
import scala.collection.JavaConverters._
result.asScala.toSet
}
// 在主函数中使用:
val allKeys = scanAllKeys(jedisCluster)
val rdd = sc.parallelize(allKeys.toSeq)
// (后续写入操作相同)
重要注意事项:
- 依赖配置(build.sbt):
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "3.3.0",
"org.apache.spark" %% "spark-sql" % "3.3.0",
"redis.clients" % "jedis" % "4.3.0"
)
- 生产环境建议:
- Redis密码认证:在JedisCluster构造函数中添加
DefaultJedisClientConfig.builder().password("your_password").build()
- HDFS权限:确保应用程序有写入目标路径的权限
- Keys操作:生产环境避免使用
KEYS *
命令,推荐使用SCAN迭代 - 错误处理:添加重试机制和更完善的异常处理
- 运行方式:
spark-submit --class RedisToHDFS \
--master yarn \
your-jar-file.jar
需要根据您的具体Redis集群配置和Hadoop环境调整连接参数和路径。
内容由零声教学AI助手提供,问题来源于学员提问