6

Sto provando a utilizzare Spark 1.4 window functions in pyspark 1.4.1Perché le funzioni Window non riescono con "La funzione Window X non accetta una specifica frame"?

ma si verificano principalmente errori o risultati imprevisti. Ecco un esempio molto semplice che penso dovrebbe funzionare:

from pyspark.sql.window import Window 
import pyspark.sql.functions as func 

l = [(1,101),(2,202),(3,303),(4,404),(5,505)] 
df = sqlContext.createDataFrame(l,["a","b"]) 

wSpec = Window.orderBy(df.a).rowsBetween(-1,1) 

df.select(df.a, func.rank().over(wSpec).alias("rank")) 
    ==> Failure org.apache.spark.sql.AnalysisException: Window function rank does not take a frame specification. 

df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b, func.lead(df.b,1).over(wSpec).alias("next")) 
    ===> org.apache.spark.sql.AnalysisException: Window function lag does not take a frame specification.; 


wSpec = Window.orderBy(df.a) 

df.select(df.a, func.rank().over(wSpec).alias("rank")) 
    ===> org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException: One or more arguments are expected. 

df.select(df.a, func.lag(df.b,1).over(wSpec).alias("prev"), df.b, func.lead(df.b,1).over(wSpec).alias("next")).collect() 

    [Row(a=1, prev=None, b=101, next=None), Row(a=2, prev=None, b=202, next=None), Row(a=3, prev=None, b=303, next=None)] 

Come si può vedere, se aggiungo rowsBetween specifica struttura, né rank()lag/lead() funzioni finestra riconoscerlo: "Funzione finestra non prende una specifica cornice ".

Se ometto le specifiche del frame rowsBetween al leas lag/lead() non generare eccezioni ma restituire un risultato imprevisto (per me): sempre None. E lo rank() continua a non funzionare con diverse eccezioni.

Qualcuno può aiutarmi a ottenere le mie funzioni finestra giusto?

UPDATE

Va bene, che inizia a guardare come un bug pyspark. ho preparato lo stesso test in puro Spark (Scala, scintilla-shell):

import sqlContext.implicits._ 
import org.apache.spark.sql._ 
import org.apache.spark.sql.types._ 

val l: List[Tuple2[Int,Int]] = List((1,101),(2,202),(3,303),(4,404),(5,505)) 
val rdd = sc.parallelize(l).map(i => Row(i._1,i._2)) 
val schemaString = "a b" 
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, IntegerType, true))) 
val df = sqlContext.createDataFrame(rdd, schema) 

import org.apache.spark.sql.expressions.Window 
import org.apache.spark.sql.functions._ 

val wSpec = Window.orderBy("a").rowsBetween(-1,1) 
df.select(df("a"), rank().over(wSpec).alias("rank")) 
    ==> org.apache.spark.sql.AnalysisException: Window function rank does not take a frame specification.; 

df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"), lead(df("b"),1).over(wSpec).alias("next")) 
    ===> org.apache.spark.sql.AnalysisException: Window function lag does not take a frame specification.; 


val wSpec = Window.orderBy("a") 
df.select(df("a"), rank().over(wSpec).alias("rank")).collect() 
    ====> res10: Array[org.apache.spark.sql.Row] = Array([1,1], [2,2], [3,3], [4,4], [5,5]) 

df.select(df("a"), lag(df("b"),1).over(wSpec).alias("prev"), df("b"), lead(df("b"),1).over(wSpec).alias("next")) 
    ====> res12: Array[org.apache.spark.sql.Row] = Array([1,null,101,202], [2,101,202,303], [3,202,303,404], [4,303,404,505], [5,404,505,null]) 

Anche se il rowsBetween non può essere applicato a Scala, sia rank() e lag()/lead() lavoro come mi aspetto quando rowsBetween è omesso.

risposta

3

Per quanto posso dire, ci sono due problemi diversi. La definizione del frame di Windows non è semplicemente supportata da Hive GenericUDAFRank, GenericUDAFLag e GenericUDAFLead, quindi gli errori visualizzati sono un comportamento previsto.

Per quanto riguarda problema con il seguente codice PySpark

wSpec = Window.orderBy(df.a) 
df.select(df.a, func.rank().over(wSpec).alias("rank")) 

sembra che è legato alla mia domanda https://stackoverflow.com/q/31948194/1560062 e deve essere affrontato da SPARK-9978. Finora ora è possibile farlo funzionare cambiando la definizione della finestra a questo:

wSpec = Window.partitionBy().orderBy(df.a)