以下是修改后的Scala代码,将key和value用”|“组合后保存到HDFS:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import redis.clients.jedis.{HostAndPort, JedisCluster, JedisPoolConfig}
import org.apache.hadoop.fs.{FileSystem, Path}
import java.util
import scala.collection.JavaConverters._
object RedisToHDFSWithValues {
def main(args: Array[String]): Unit = {
// 1. 配置Redis集群节点
val redisNodes = new util.HashSet[HostAndPort]()
redisNodes.add(new HostAndPort("redis-node1", 6379))
redisNodes.add(new HostAndPort("redis-node2", 6379))
redisNodes.add(new HostAndPort("redis-node3", 6379))
// 2. 配置连接池
val poolConfig = new JedisPoolConfig()
poolConfig.setMaxTotal(100)
poolConfig.setMaxIdle(50)
poolConfig.setMinIdle(10)
// 3. 创建Redis集群客户端
var jedisCluster: JedisCluster = null
try {
jedisCluster = new JedisCluster(redisNodes, poolConfig)
// 4. 使用SCAN安全获取所有key
val allKeys = scanAllKeys(jedisCluster)
// 5. 获取每个key对应的value,并组合成 "key|value"格式
val keyValuePairs = allKeys.map { key =>
try {
val value = jedisCluster.get(key)
s"$key|${Option(value).getOrElse("")}"
} catch {
case e: Exception =>
println(s"获取key '$key'的值时出错: ${e.getMessage}")
s"$key|ERROR"
}
}.toSeq
// 6. 创建SparkSession(用于HDFS操作)
val spark = SparkSession.builder()
.appName("RedisToHDFSWithValues")
.config(new SparkConf())
.getOrCreate()
import spark.implicits._
val sc = spark.sparkContext
// 7. 将key-value对转换为RDD并写入HDFS
val rdd = sc.parallelize(keyValuePairs)
// 8. 定义HDFS输出路径
val outputPath = "hdfs://namenode:9000/output/redis_key_value_pairs"
// 9. 保存到HDFS(文本格式)
rdd.saveAsTextFile(outputPath)
println(s"成功将${rdd.count()}个key-value对写入HDFS路径: $outputPath")
} catch {
case e: Exception =>
e.printStackTrace()
println(s"操作失败: ${e.getMessage}")
} finally {
if (jedisCluster != null) {
try {
jedisCluster.close()
} catch {
case e: Exception =>
println("关闭Redis连接时发生错误: " + e.getMessage)
}
}
}
}
// SCAN迭代安全获取所有key的方法
def scanAllKeys(jedisCluster: JedisCluster): Set[String] = {
var cursor = "0"
val result = new util.HashSet[String]()
do {
val scanResult = jedisCluster.scan(cursor)
cursor = scanResult.getCursor
result.addAll(scanResult.getResult)
} while (cursor != "0")
result.asScala.toSet
}
}
增强版本 -处理不同类型的数据:
// ...(保持上述导入和初始配置不变)
def getKeyValuePair(jedisCluster: JedisCluster, key: String): String = {
try {
// 先尝试获取字符串类型的值
val valueType = jedisCluster.`type`(key)
valueType match {
case "string" =>
val value = jedisCluster.get(key)
s"$key|string|${Option(value).getOrElse("")}"
case "hash" =>
val hashData = jedisCluster.hgetAll(key)
s"$key|hash|${hashData.asScala.map{case (k,v) => s"$k=$v"}.mkString(",")}"
case "list" =>
val listData = jedisCluster.lrange(key,15,-1) //获取全部列表元素15,-1表示从第0个到最后一个-1表示最后一个元素,包括最后一个元素。如果结束索引超出范围,Redis会将其调整为列表的最后一个元素的索引。
s"""$key|list|[${listData.asScala.mkString(",")}]"""
case "set"=>
val setData=jedisscluster.smembers(key)//smembers命令返回集合中的所有成员。
s"""$keyset|[${setdata.asScala.mkstring(",")}]"""
case _=>
s"""$keysunknown_type|null"""
}
}catch{
case e:Exception=>
println(s“处理键'$ key'时出错:${e.getMessage}”)
s“ $ key | ERROR | $ {e.getMessage.take(100)}”//限制错误消息长度
}
}
//在主函数中使用:
val keyValuePairs=allKeys.map(getKeyValuePair(jedisscluster,_)).toseq
val rdd=sc.parallelize(KeyValueParis)
//后续写入操作相同...
输出文件示例:
user:123|string|john_doe
product:456|hash|name=iPhone,price=999
cart:789|list|[item1,item2,item3]
主要改进:
数据格式:使用
key|value
或key|type|value
格式存储数据错误处理:
- Optional包装防止null值问题15,-1表示从第0个到最后一个-1表示最后一个元素,包括最后一个元素。如果结束索引超出范围,Redis会将其调整为列表的最后一个元素的索引。 -异常捕获确保单个键的错误不会中断整个作业
3.类型支持:可以处理字符串、哈希、列表等不同Redis数据类型
4.安全性:使用SCAN迭代避免阻塞Redis服务器
运行前请确保添加相应的依赖项并配置正确的Redis集群地址和认证信息。
内容由零声教学AI助手提供,问题来源于学员提问