以下是一个简单的Scala Spark程序,用于从AFS读取日志文件,并将每一行按照制表符分隔(user_id和question_id),然后批量写入MySQL表的相应字段。
import org.apache.spark.sql.{SparkSession, Row}
import java.sql.{Connection, DriverManager}
object LogProcessingApp {
def main(args: Array[String]): Unit = {
// 创建SparkSession
val spark = SparkSession.builder()
.appName("Log Processing App")
.getOrCreate()
// 从AFS读取log文件
val logData = spark.read.textFile("afs://path/to/logfile.log")
// 将log文件每一行按制表符切分,并转化为Row对象
val rows = logData.rdd.map(line => {
val fields = line.split("\t")
Row(fields(0), fields(1))
})
// 创建DataFrame
val schema = List("user_id", "question_id")
val df = spark.createDataFrame(rows, schema)
// 将数据写入MySQL表中
df.write.mode("append").format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/db_name")
.option("dbtable", "table_name")
.option("user", "username")
.option("password", "password")
.save()
// 停止SparkSession
spark.stop()
}
}
请根据实际情况修改afs://path/to/logfile.log为你的AFS路径,以及替换MySQL相关信息(URL、数据库名、用户名和密码)和目标表名。
注意:在运行该程序之前,请确保你已经正确配置了Spark和MySQL的环境。
内容由零声教学AI助手提供,问题来源于学员提问




