// self-join example: alias the right-hand side's columns so they do not collide
val df = sc.parallelize( Seq( ("a","b","c"), ("a","d","e") ) ).toDF("id","x","y")
df.join(df.select($"id", $"x".as("other_x"), $"y".as("other_y")), Seq("id"), "inner")

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.util.control.Exception._
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// read a delimited text file into a DataFrame of all-string columns,
// trimming fields and turning empty or "null" strings into nulls.
def readDelim(path:String, head:Seq[String], delim:String = ",") = {
  val conf = new Configuration
  conf.set("textinputformat.record.delimiter", "\n")
  val rdd = sc.
    newAPIHadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf).
    map(_._2.toString.replaceAll("[\\x7F-\\xFF\\r\\n]","")).
    map(_.split(delim).map(_.trim()).map(x => if(x.length == 0) null else if(x.equalsIgnoreCase("null")) null else x)).
    map(_.padTo(head.length, null)).
    map(Row.fromSeq(_))
  val schema = StructType(head.map(StructField(_, DataTypes.StringType, true)))
  spark.sqlContext.createDataFrame(rdd, schema)
}

// slurp a local (non-HDFS) file into a single string.
def readLocalFile(path:String) = {
  val br = new java.io.BufferedReader(new java.io.FileReader(path))
  val sb = new StringBuffer()
  var ln = br.readLine()
  while(ln != null){ sb.append(ln); ln = br.readLine() }
  sb.toString
}

val head = readLocalFile("/home/alex/datasets/UCI_Credit_Card.head")
val arr_head = head.split(",")
val ucidata = readDelim("/home/alex/datasets/UCI_Credit_Card.data", arr_head, ",")

// build a StructType from a "~"-delimited header file (a single comma-separated line of
// name~type~length~precision~scale entries).
def readFhead(path:String) = {
  val line = new java.io.BufferedReader(new java.io.FileReader(path)).readLine()
  StructType(line.split(",").map(_.split("~")).map(x => x(1) match {
    case "DATE"   => StructField(x(0), DataTypes.DateType, true)
    case "NUMBER" =>
      if(x(4).toInt == 0 && x(3).toInt < 10) StructField(x(0), DataTypes.IntegerType, true)
      else if(x(4).toInt == 0)               StructField(x(0), DataTypes.LongType, true)
      else StructField(x(0), DataTypes.createDecimalType(x(3).toInt, x(4).toInt), true)
    case _        => StructField(x(0), DataTypes.StringType, true)
  }))
}

// same as readDelim above, but takes the "~"-delimited header string and casts
// NUMBER/DATE/TIMESTAMP columns to proper Spark types.
def readDelim(path:String, fheadstr:String, delim:String = ",",
              dateformat:String = "yyyyMMdd", timestampformat:String = "yyyyMMdd HH:mm:ss.SSS") = {
  val fhead = fheadstr.split(",").map(_.split("~")).map(x => (x(0), x(1), x(2).toInt, x(3).toInt, x(4).toInt))
  val conf = new Configuration
  conf.set("textinputformat.record.delimiter", "\n")
  val rdd = sc.
    newAPIHadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf).
    map(_._2.toString.replaceAll("[\\x7F-\\xFF\\r\\n]","")).
    map(_.split(delim).map(_.trim()).map(x => if(x.length == 0) null else if(x.equalsIgnoreCase("null")) null else x)).
    map(_.padTo(fhead.length, null)).
    map(Row.fromSeq(_))
  val schema = StructType(fhead.map(x => StructField(x._1, DataTypes.StringType, true)))
  var blah = spark.sqlContext.createDataFrame(rdd, schema)
  // NUMBER, scale 0, precision < 10  => Integer
  blah = fhead.filter(_._2.equals("NUMBER")).filter(_._5 == 0).filter(_._4 < 10).
    foldLeft(blah)((a,b) => a.withColumn(b._1, col(b._1).cast(IntegerType)))
  // NUMBER, scale 0, precision >= 10 => Long
  blah = fhead.filter(_._2.equals("NUMBER")).filter(_._5 == 0).filter(_._4 >= 10).
    foldLeft(blah)((a,b) => a.withColumn(b._1, col(b._1).cast(LongType)))
  // NUMBER with a scale => Decimal(precision, scale)
  blah = fhead.filter(_._2.equals("NUMBER")).filter(_._5 > 0).
    foldLeft(blah)((a,b) => a.withColumn(b._1, col(b._1).cast(DataTypes.createDecimalType(b._4, b._5))))
  // DATE => java.sql.Date (badly formatted values become null instead of throwing)
  val dateFormat = new java.text.SimpleDateFormat(dateformat)
  val str2date_udf = udf((field:String) => allCatch opt new java.sql.Date(dateFormat.parse(field).getTime))
  blah = fhead.filter(_._2.equals("DATE")).
    foldLeft(blah)((a,b) => a.withColumn(b._1, str2date_udf(col(b._1))))
  // TIMESTAMP => java.sql.Timestamp
  val timestampFormat = new java.text.SimpleDateFormat(timestampformat)
  val str2timestamp_udf = udf((field:String) => allCatch opt new java.sql.Timestamp(timestampFormat.parse(field).getTime))
  blah = fhead.filter(_._2.equals("TIMESTAMP")).
    foldLeft(blah)((a,b) => a.withColumn(b._1, str2timestamp_udf(col(b._1))))
  blah
}
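-- the *.fhead layout implied by readFhead/readDelim above is one comma-separated line of
-- name~type~length~precision~scale entries; a made-up example (column names and type codes are illustrative):
val fheadstr = "symbol~VARCHAR2~10~0~0," +   // anything but DATE/NUMBER  => StringType
               "tdate~DATE~8~0~0," +         // DATE                      => DateType
               "volume~NUMBER~22~12~0," +    // scale 0, precision >= 10  => LongType
               "seqno~NUMBER~22~6~0," +      // scale 0, precision < 10   => IntegerType
               "price~NUMBER~22~12~4"        // scale > 0                 => DecimalType(12,4)
val demo = readDelim("/home/alex/tmp/cts.csv.gz", fheadstr)   // instead of readLocalFile(".../cts.fhead")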
// either of these works, depending on where the dump lives:
val cts = readDelim("/home/alex/tmp/cts.csv.gz", readLocalFile("/home/alex/tmp/cts.fhead"))
val cts = readDelim("/home/alex/datasets/ctsdata/cts.dump.csv", readLocalFile("/home/alex/datasets/ctsdata/cts.fhead"))
cts.write.mode("overwrite").format("parquet").save("/home/alex/datasets/ctsparquet")

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import scala.collection.mutable.{ArrayBuffer, HashMap}

// "outer union": union two datasets that only partially share columns, null-filling
// whatever is missing on either side (column names matched case-insensitively).
class OuterUnionDataset(a:Dataset[Row]) {
  def outerUnion(b:Dataset[Row]) = {
    val ucols = (a.schema.fields.map(x => (x.name.toUpperCase, x.dataType)).toSet union
                 b.schema.fields.map(x => (x.name.toUpperCase, x.dataType)).toSet).toMap
    val (acols, bcols) = (a.columns.map(_.toUpperCase), b.columns.map(_.toUpperCase))
    val cmn = acols.toSet.intersect(bcols.toSet).toSeq
    val (onlya, onlyb) = (acols.filter(!cmn.contains(_)), bcols.filter(!cmn.contains(_)))
    var (a1, b1) = (a, b)
    for(x <- onlya){ b1 = b1.withColumn(x, lit(null).cast(ucols(x))) }
    for(x <- onlyb){ a1 = a1.withColumn(x, lit(null).cast(ucols(x))) }
    val ocols = (acols ++ onlyb).map(col)
    a1.select(ocols:_*).union(b1.select(ocols:_*))
  }
}
implicit def toOuterUnionDataset(a:Dataset[Row]) = new OuterUnionDataset(a)

val df1 = sc.parallelize( Seq( ("df1a","b","c"), ("df1a","d","e") ) ).toDF("id","x","y")
val df2 = sc.parallelize( Seq( ("df2a","b","c","df"), ("df2a","d","e","df") ) ).toDF("y","id","z","q")
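-- df1/df2 above are defined but never combined in these notes; a quick (assumed) usage of the
-- implicit outerUnion, just to show the call shape:
val both = df1.outerUnion(df2)   // implicit conversion to OuterUnionDataset kicks in here
both.show(false)                 // id/x/y from df1 plus z/q from df2; columns missing on a side come back null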
---quantization
// copied from C++'s std::lower_bound (in <algorithm>): index of the first element >= x.
def lower_bound(a:Seq[String], x:String) = {
  var l = 0
  var h = a.length
  while(l < h){
    val m = (l+h)/2
    if(x <= a(m)) h = m else l = m + 1
  }
  l
}

// sample the input column and pick ~`range` evenly spaced split points (a crude percentile table).
def create_quantizer(df:DataFrame, in:String, range:Int = 200) = {
  val reccnt = df.count
  val sampratio = (range*1000.0)/reccnt
  val samp = df.select(col(in)).sample(sampratio).cache
  val stride = samp.count / (range-1)
  val prcnts = samp.withColumn("rn", row_number().over(Window.orderBy(col(in)))).filter(($"rn" % stride) === 0)
  prcnts.select(col(in)).collect.map(_.getString(0)).sortWith(_ < _)
}

// map each value to its bucket id via lower_bound against the split points.
def quantize(df:DataFrame, in:String, out:String, quantizer:Seq[String]) = {
  val quant_udf = udf((x:String) => lower_bound(quantizer, x))
  df.withColumn(out, quant_udf(col(in)))
}

val cts  = spark.read.format("parquet").load("/home/alex/datasets/ctsparquet")
val q    = create_quantizer(cts.select(concat($"symbol",$"tdate").as("c")), "c", 100)
val ctsq = quantize(cts.withColumn("tmp", concat($"symbol",$"tdate")), "tmp", "q", q).drop("tmp")

ctsq.
  repartition($"q").                          // splits data on bucket id
  sortWithinPartitions($"symbol",$"tdate").
  write.                                      // write out
  partitionBy("q").
  format("parquet").
  mode(SaveMode.Overwrite).save("/home/alex/datasets/ctsq")

// simple usage scenario
val cts_ibm = spark.read.format("parquet").load("/home/alex/datasets/ctsq/q=" + lower_bound(q,"IBM"))
val cts_all = spark.read.format("parquet").load("/home/alex/datasets/ctsq")

--------------------------------------------------------------------------
z-order: lets the quantizer work on more than one column at once.

// spread the bits of n1 out, one every b positions, then shift left by s.
def bitpad(n1:Long, b:Int, s:Int) = {
  var (o,i,n) = (0L, 0, n1)
  while(n > 0){
    o += (n&1) << i
    n /= 2
    i += b
  }
  o << s
}

// append a z-order column: rank each input column's values on a sample, then bit-interleave the ranks.
def addzorder(df:DataFrame, in:Seq[String], out:String) = {
  val a = df.sample(0.1).agg( lit(1), in.map(x => collect_set(col(x)).as(x)):_* )
  val d = in.map(x => a.select(explode(col(x)).as(x)).filter(col(x).isNotNull).
            repartition(1).orderBy(col(x)).collect.map(_.get(0).toString)).zipWithIndex
  val h = d.map{ case (x,i) => x.zipWithIndex.map{ case (y,z) => (y, bitpad(z, in.size, i)) }.toMap }
  val u = h.map(x => udf( (q:String) => x.getOrElse(if(q == null) "" else q, 0L) ))
  val c = in.zip(u).map{ case (c,u) => u(col(c)) }.reduce( (x,y) => x.bitwiseOR(y) )
  df.withColumn(out, c)
}

-- after running addzorder, build the quantizer on the z-order column, e.g.:
val cts  = spark.read.format("parquet").load("/home/alex/datasets/ctsparquet")
val q0   = addzorder(cts, Seq("tdate","symbol"), "z")
val q    = create_quantizer(q0.select($"z".cast("string").as("z")), "z", 100)   // z is numeric, the quantizer works on strings
val ctsq = quantize(q0, "z", "q", q)
ctsq.
  repartition($"q").                          // splits data on bucket id
  sortWithinPartitions($"z",$"tdate",$"symbol").
  write.                                      // write out
  partitionBy("q").
  format("parquet").
  mode(SaveMode.Overwrite).save("/home/alex/datasets/ctsq")

---sometimes you need to join a BIG table against a "smaller" table:
tbl1.join(sml_tbl, Seq("id"), "inner")
---2 scenarios: if sml_tbl is "small" (10-100mb), Spark will broadcast sml_tbl and do a "map"-side join against tbl1 (fast, since only sml_tbl gets moved).
---otherwise, tbl1 AND sml_tbl both get redistributed (slowest).
--- let's say tbl1 has 100B (very, very big)
--- and sml_tbl has 100M (very big, but not "that" big).

Bloom Filters: https://en.wikipedia.org/wiki/Bloom_filter
// just a tiny table with a small fraction of the original symbols
val tiny_table = cts.filter($"symbol" isin ("IBM","AAPL","MSFT"))
val blm = tiny_table.stat.bloomFilter($"symbol", 10000, 0.02)
// build a "rule" that keeps only records that "might" show up in the tiny table.
val might_contain_udf = udf((x: String) => blm.mightContain(x))
cts.filter( might_contain_udf($"symbol") ).groupBy($"symbol").count.orderBy($"count".desc).show(100,false)
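-- the notes stop at counting the pre-filtered rows; below is a sketch (not from the notes) of plugging
-- a bloom filter into the tbl1/sml_tbl join above. tbl1, sml_tbl, the string "id" column and the
-- sizing numbers are all illustrative.
// build the filter from the smaller side's join keys
val blm_ids = sml_tbl.stat.bloomFilter($"id", 100000000L, 0.02)   // guessed: ~100M distinct ids, 2% false positives
val might_have_id = udf((x: String) => blm_ids.mightContain(x))
// pre-filter the big side before the shuffle join; false positives get dropped by the join itself,
// so the result is still exact. the point is only to cut the shuffle volume of tbl1.
val joined = tbl1.filter(might_have_id($"id")).join(sml_tbl, Seq("id"), "inner")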