ZBLOG

将key 与value 当作组合到一起 使用| 一起保存

以下是修改后的Scala代码,将key和value用”|“组合后保存到HDFS:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import redis.clients.jedis.{HostAndPort, JedisCluster, JedisPoolConfig}
import org.apache.hadoop.fs.{FileSystem, Path}
import java.util
import scala.collection.JavaConverters._

object RedisToHDFSWithValues {
  def main(args: Array[String]): Unit = {
    // 1. 配置Redis集群节点
    val redisNodes = new util.HashSet[HostAndPort]()
    redisNodes.add(new HostAndPort("redis-node1", 6379))
    redisNodes.add(new HostAndPort("redis-node2", 6379))
    redisNodes.add(new HostAndPort("redis-node3", 6379))

    // 2. 配置连接池
    val poolConfig = new JedisPoolConfig()
    poolConfig.setMaxTotal(100)
    poolConfig.setMaxIdle(50)
    poolConfig.setMinIdle(10)

    // 3. 创建Redis集群客户端
    var jedisCluster: JedisCluster = null
    try {
      jedisCluster = new JedisCluster(redisNodes, poolConfig)
      
      // 4. 使用SCAN安全获取所有key
      val allKeys = scanAllKeys(jedisCluster)
      
      // 5. 获取每个key对应的value,并组合成 "key|value"格式
      val keyValuePairs = allKeys.map { key =>
        try {
          val value = jedisCluster.get(key)
          s"$key|${Option(value).getOrElse("")}"
        } catch {
          case e: Exception =>
            println(s"获取key '$key'的值时出错: ${e.getMessage}")
            s"$key|ERROR"
        }
      }.toSeq
      
      // 6. 创建SparkSession(用于HDFS操作)
      val spark = SparkSession.builder()
        .appName("RedisToHDFSWithValues")
        .config(new SparkConf())
        .getOrCreate()
      
      import spark.implicits._
      val sc = spark.sparkContext
      
      // 7. 将key-value对转换为RDD并写入HDFS
      val rdd = sc.parallelize(keyValuePairs)
      
      // 8. 定义HDFS输出路径
      val outputPath = "hdfs://namenode:9000/output/redis_key_value_pairs"
      
      // 9. 保存到HDFS(文本格式)
      rdd.saveAsTextFile(outputPath)
      
      println(s"成功将${rdd.count()}个key-value对写入HDFS路径: $outputPath")
      
    } catch {
      case e: Exception =>
        e.printStackTrace()
        println(s"操作失败: ${e.getMessage}")
    } finally {
      if (jedisCluster != null) {
        try {
          jedisCluster.close()
        } catch {
          case e: Exception => 
            println("关闭Redis连接时发生错误: " + e.getMessage)
        }
      }
    }
  }

  // SCAN迭代安全获取所有key的方法
  def scanAllKeys(jedisCluster: JedisCluster): Set[String] = {
    var cursor = "0"
    val result = new util.HashSet[String]()
    
    do {
      val scanResult = jedisCluster.scan(cursor)
      cursor = scanResult.getCursor
      result.addAll(scanResult.getResult)
    } while (cursor != "0")
    
    result.asScala.toSet
  }
}

增强版本 -处理不同类型的数据:

// ...(保持上述导入和初始配置不变)

def getKeyValuePair(jedisCluster: JedisCluster, key: String): String = {
  try {
    // 先尝试获取字符串类型的值
    val valueType = jedisCluster.`type`(key)
    
    valueType match {
      case "string" =>
        val value = jedisCluster.get(key)
        s"$key|string|${Option(value).getOrElse("")}"
        
      case "hash" =>
        val hashData = jedisCluster.hgetAll(key)
        s"$key|hash|${hashData.asScala.map{case (k,v) => s"$k=$v"}.mkString(",")}"
        
      case "list" =>
        val listData = jedisCluster.lrange(key,15,-1) //获取全部列表元素15,-1表示从第0个到最后一个-1表示最后一个元素,包括最后一个元素。如果结束索引超出范围,Redis会将其调整为列表的最后一个元素的索引。
         s"""$key|list|[${listData.asScala.mkString(",")}]"""
        
       case "set"=>  
         val setData=jedisscluster.smembers(key)//smembers命令返回集合中的所有成员。
         s"""$keyset|[${setdata.asScala.mkstring(",")}]"""
         
       case _=>
         s"""$keysunknown_type|null"""
     }
   }catch{
     case e:Exception=>
       println(s“处理键'$ key'时出错:${e.getMessage}”)
       s“ $ key | ERROR | $ {e.getMessage.take(100)}”//限制错误消息长度

   }

}

//在主函数中使用:
val keyValuePairs=allKeys.map(getKeyValuePair(jedisscluster,_)).toseq

val rdd=sc.parallelize(KeyValueParis)

//后续写入操作相同...

输出文件示例:

user:123|string|john_doe
product:456|hash|name=iPhone,price=999
cart:789|list|[item1,item2,item3]

主要改进:

  1. 数据格式:使用 key|valuekey|type|value格式存储数据

  2. 错误处理

    • Optional包装防止null值问题15,-1表示从第0个到最后一个-1表示最后一个元素,包括最后一个元素。如果结束索引超出范围,Redis会将其调整为列表的最后一个元素的索引。 -异常捕获确保单个键的错误不会中断整个作业

3.类型支持:可以处理字符串、哈希、列表等不同Redis数据类型

4.安全性:使用SCAN迭代避免阻塞Redis服务器

运行前请确保添加相应的依赖项并配置正确的Redis集群地址和认证信息。


内容由零声教学AI助手提供,问题来源于学员提问

本站部分文章来源于网络,版权归原作者所有,如有侵权请联系站长删除。
转载请注明出处:https://golang.0voice.com/?id=24214

分享:
扫描分享到社交APP
上一篇
下一篇
发表列表
游客 游客
此处应有掌声~
评论列表

还没有评论,快来说点什么吧~

联系我们

在线咨询: 点击这里给我发消息

微信号:3007537140

上班时间: 10:30-22:30

关注我们
x

注册

已经有帐号?