Spark DataFrame Operations
Filter out rows where a given column holds a null-value placeholder (case-insensitive)
// df is an existing DataFrame; columnName holds the name of the column to clean
val NULL_VALUES = Set("-", "nan", "null", "none", "_", "", "na")
// Quote each placeholder for use in a SQL IN clause: '-','nan',...
val nullValues = NULL_VALUES.mkString("'", "','", "'")
val dfWithoutNull = df.filter(s"lower($columnName) not in ($nullValues)")
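The same filter can also be written against the Column API instead of a SQL string; a minimal equivalent sketch, assuming the same df, columnName, and NULL_VALUES as above:
import org.apache.spark.sql.functions.lower
// Equivalent filter using the Column API; isin takes the placeholder values as varargs
val dfWithoutNull2 = df.filter(!lower(df(columnName)).isin(NULL_VALUES.toSeq: _*))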
Convert every null-value placeholder to an empty string
val NULL_VALUES = Set("-", "nan", "null", "none", "_", "", "na")
val nullValues = NULL_VALUES.mkString("'", "','", "'")
// For every column, map placeholder values to '' and trim everything else
val selectCondition = df.columns.map(x => s"(case when lower(trim($x)) in ($nullValues) then '' else trim($x) end) as $x").mkString(",")
val tableName = "table"
df.registerTempTable(tableName)
val dfWithZeroLengthStringAsNull = sqlContext.sql(s"select $selectCondition from $tableName")
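The same cleanup can be expressed without registering a temp table, using when/otherwise from the Column API; a sketch under the same NULL_VALUES definition (the variable names are illustrative):
import org.apache.spark.sql.functions.{lower, trim, when}
// Per column: map placeholders to "" and trim everything else, keeping the column name
val cleanedColumns = df.columns.map { c =>
  val t = trim(df(c))
  when(lower(t).isin(NULL_VALUES.toSeq: _*), "").otherwise(t).as(c)
}
val dfCleaned = df.select(cleanedColumns: _*)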
Spark 1.6: recursively read all CSV files under a given directory
Initially I used Databricks' spark-csv, but it only loads the files directly under a directory; if the directory contains subdirectories, the load fails outright. What is needed is to recursively load every file under the given directory, including files in subdirectories:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import scala.collection.mutable.ListBuffer

private def createDataFrame(sqlContext: SQLContext,
                            dataPath: String,
                            delimiter: String
                           ): DataFrame = {
  val data: RDD[String] = getDataRDD(sqlContext.sparkContext, dataPath)
  val splitRDD = data.map(_.split(delimiter))
  // Infer the column count from the first record (assumes every row has the same width)
  val length = splitRDD.first().length
  generateDataFrame(sqlContext, splitRDD, length)
}
private def generateDataFrame(sqlContext: SQLContext,
                              splitRDD: RDD[Array[String]],
                              length: Int
                             ): DataFrame = {
  // Build a schema of nullable string columns named C0, C1, ...
  val fields =
    for (index <- 0 until length) yield {
      StructField("C" + index, StringType, nullable = true)
    }
  val schema = new StructType(fields.toArray)
  val rowRDD = splitRDD.map(record => Row.fromSeq(record))
  sqlContext.createDataFrame(rowRDD, schema)
}
private def getDataRDD(sparkContext: SparkContext,
                       dataPath: String
                      ): RDD[String] = {
  // Read each file into its own RDD, then union them all into one
  val allFiles = listFiles(dataPath)
  val allRDDs = for (one <- allFiles) yield sparkContext.textFile(one)
  sparkContext.union(allRDDs)
}
private def listFiles(dataPath: String): List[String] = {
  val conf = new Configuration()
  val fs = FileSystem.get(conf)
  val filePath = new Path(dataPath)
  require(fs.exists(filePath), s"Data directory [$dataPath] does not exist.")
  val allSubFiles = fs.listStatus(filePath)
  val allDataFiles = new ListBuffer[String]
  // Descend into subdirectories recursively; collect plain files
  for (subFile <- allSubFiles) {
    if (subFile.isDirectory)
      allDataFiles ++= listFiles(subFile.getPath.toString)
    else
      allDataFiles += subFile.getPath.toString
  }
  allDataFiles.toList
}
The idea is straightforward: recursively list the files, read their contents into an RDD, and then convert that RDD into a DataFrame.
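For example, a call might look like the following (a minimal sketch; the path and delimiter are illustrative). Note that splitting on the raw delimiter does not handle quoted fields, so for CSV files that use quoting a real CSV parser is still preferable:
// Hypothetical usage: load every file under /data/input, splitting each line on commas
val df = createDataFrame(sqlContext, "/data/input", ",")
df.show()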
DataFrame cast column type
Suppose a DataFrame df has a column C1 storing province names (such as 上海). Casting that column to double should, in theory, fail with an error. Using the following:
import org.apache.spark.sql.types.DoubleType
df.withColumn("C1", df("C1").cast(DoubleType))
the result is that every value in the column becomes null, and no type-conversion exception is thrown. This is Spark SQL's cast behavior: values that cannot be converted yield null instead of raising an error. Recording the issue here; it is safer to check the column's type before casting.
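A minimal sketch of such a pre-check, assuming the df from above; it only casts when C1 is declared as a string and reports how many values were silently lost to the cast:
import org.apache.spark.sql.types.{DoubleType, StringType}
// Inspect the declared type of C1 before attempting the cast
val c1Type = df.schema("C1").dataType
if (c1Type == StringType) {
  val casted = df.withColumn("C1", df("C1").cast(DoubleType))
  // Count values that became null during the cast (excluding pre-existing nulls)
  val lost = casted.filter(casted("C1").isNull).count() - df.filter(df("C1").isNull).count()
  println(s"C1 declared as $c1Type; $lost value(s) could not be parsed as double")
}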