// https://www.cplusplus.com/reference/algorithm/lower_bound/
// copied from C++ (see link above); returns the index of the first element
// in the sorted sequence `a` that is >= x.
def lower_bound(a: Seq[String], x: String): Int = {
  var l = 0
  var h = a.length
  while (l < h) {
    val m = (l + h) / 2
    if (x <= a(m)) h = m else l = m + 1
  }
  l
}

// example on a df of "customer" records.
val df = ...customer records...

// grab 1% of the data.
val samp = df.select($"lastname").sample(0.01).cache

// keep every stride-th sampled row as a bucket boundary
// (todo: compute stride from the sample size and the desired bucket count,
//  as create_quantizer does below).
val prcnts = samp.
  withColumn("rn", row_number().over(Window.orderBy($"lastname"))).
  filter(($"rn" % stride) === 0)

// grab a sorted sequence of last-names that represent bucket boundaries.
val quantizer = prcnts.select($"lastname").collect.map(_.getString(0)).sortWith(_ < _)

// define a "quantization" udf that returns a bucket id for a given lastname.
val quant_udf = udf( (x: String) => lower_bound(quantizer, x) )

// append a "bucket id" to each record.
val dfq = df.withColumn("q", quant_udf($"lastname"))

dfq.
  repartition($"q").                            // split data on buckets
  sortWithinPartitions($"lastname", $"custid"). // sort inside each bucket
  write.                                        // write out
  partitionBy("q").
  format("parquet").
  mode(SaveMode.Overwrite).save("/as/whatever")

---quantization

import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// copied from C++ (same lower_bound as above).
def lower_bound(a: Seq[String], x: String): Int = {
  var l = 0
  var h = a.length
  while (l < h) {
    val m = (l + h) / 2
    if (x <= a(m)) h = m else l = m + 1
  }
  l
}

// sample the input column and pick ~`range` evenly spaced values
// to serve as bucket boundaries.
def create_quantizer(df: DataFrame, in: String, range: Int = 200): Seq[String] = {
  val reccnt = df.count
  // sample roughly 1000 records per requested boundary.
  val sampratio = (range * 1000.0) / reccnt
  val samp = df.select(col(in)).sample(sampratio).cache
  val stride = samp.count / (range - 1)
  val prcnts = samp.
    withColumn("rn", row_number().over(Window.orderBy(col(in)))).
    filter(($"rn" % stride) === 0)
  prcnts.select(col(in)).collect.map(_.getString(0)).sortWith(_ < _)
}

// append a bucket-id column `out` computed from column `in`.
def quantize(df: DataFrame, in: String, out: String, quantizer: Seq[String]): DataFrame = {
  val quant_udf = udf( (x: String) => lower_bound(quantizer, x) )
  df.withColumn(out, quant_udf(col(in)))
}

val cts = spark.read.format("parquet").load("/home/alex/datasets/ctsparquet")
val q = create_quantizer(cts.select(concat($"symbol", $"tdate").as("c")), "c", 100)
val ctsq = quantize(cts.withColumn("tmp", concat($"symbol", $"tdate")), "tmp", "q", q).drop("tmp")

ctsq.
  repartition($"q").                          // split data on bucket id
  sortWithinPartitions($"symbol", $"tdate").  // sort inside each bucket
  write.                                      // write out
  partitionBy("q").
  format("parquet").
  mode(SaveMode.Overwrite).save("/home/alex/datasets/ctsq")

// simple usage scenario: read only the bucket that contains "IBM".
val cts_ibm = spark.read.format("parquet").load("/home/alex/datasets/ctsq/q=" + lower_bound(q, "IBM"))
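
A possible follow-on, sketched here as an assumption rather than part of the original notes: because the bucket key is concat(symbol, tdate) and the boundaries are sorted, one symbol's rows can spill into an adjacent bucket, and the same lower_bound call can be applied to both ends of a key range to read only the contiguous set of bucket directories that could contain it. The endpoints `lo` and `hi` below are made-up illustrations; `q`, `lower_bound`, and the /home/alex/datasets/ctsq path are reused from above.

// sketch (assumed, not from the original notes): prune a key-range read to
// the buckets [lower_bound(lo), lower_bound(hi)].
val lo = "IBM"    // illustrative range start
val hi = "MSFT"   // illustrative range end
val firstBucket = lower_bound(q, lo)
val lastBucket  = lower_bound(q, hi)
// build one partition-directory path per candidate bucket and load them all.
val paths = (firstBucket to lastBucket).map(b => s"/home/alex/datasets/ctsq/q=$b")
val cts_range = spark.read.format("parquet").load(paths: _*)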