import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.{col, concat, lit, split, udf}
import spark.implicits._

// One record per line of text: a Dataset[String] whose single column is "value".
val quotes = spark.read.textFile("/user/alex/hwquotes/")

/** Returns the value of the first "tag=value" field in `arr` whose tag equals `x`.
  *
  * Each field is split on '='; fields that do not split into exactly two parts
  * are skipped. Yields None (SQL NULL) when no field matches or `arr` is null.
  *
  * `arr` is declared as `scala.collection.Seq[String]` because Spark hands
  * ArrayType columns to Scala UDFs as a Seq; declaring it as `Array[String]`
  * makes the UDF fail at runtime with a ClassCastException.
  */
val findvaludf = udf { (x: String, arr: scala.collection.Seq[String]) =>
  Option(arr).flatMap { fields =>
    fields.iterator.map(_.split("=")).
      collectFirst { case Array(tag, value) if tag == x => value }
  }
}

// Split each line on '|' and extract the quote attributes from the fields.
// "symbol" is read by position (element 8); every other column is looked up by tag.
// NOTE(review): tdate is NULL unless tag 11 holds a string Spark can cast to a
// date (e.g. yyyy-MM-dd) -- confirm against the actual record format.
val q1 = quotes.
  select(split($"value", "\\|").as("arr")).
  withColumn("symbol", $"arr"(8)).
  withColumn("tdate", findvaludf(lit("11"), $"arr").cast(DateType)).
  withColumn("bid", findvaludf(lit("0"), $"arr")).
  withColumn("bidsiz", findvaludf(lit("1"), $"arr")).
  withColumn("bidtim", findvaludf(lit("3"), $"arr")).
  withColumn("ask", findvaludf(lit("5"), $"arr")).
  withColumn("asksiz", findvaludf(lit("6"), $"arr")).
  withColumn("asktim", findvaludf(lit("8"), $"arr")).
  drop("arr")

// Cast bid/ask to DoubleType.
val q2 = Seq("bid", "ask").foldLeft(q1) { (df, c) =>
  df.withColumn(c, col(c).cast(DoubleType))
}

// Cast bidsiz/asksiz to IntegerType.
val q3 = Seq("bidsiz", "asksiz").foldLeft(q2) { (df, c) =>
  df.withColumn(c, col(c).cast(IntegerType))
}

// Prefix bidtim/asktim with tdate and cast "<tdate> <time>" to TimestampType;
// a NULL tdate or time yields a NULL timestamp (concat propagates NULL).
val q4 = Seq("bidtim", "asktim").foldLeft(q3) { (df, c) =>
  df.withColumn(c, concat($"tdate", lit(" "), col(c)).cast(TimestampType))
}