Sembra un problema di disallineamento dello schema qui. Se si imposta lo schema come non annullabile e si crea il proprio dataframe con il valore Nessuno, Spark ti getterebbe ValueError: questo campo non è annullabile, ma ha ottenuto l'errore Nessuno.
[Pyspark]
from pyspark.sql.functions import * #udf, concat, col, lit, ltrim, rtrim
from pyspark.sql.types import *
schema = ArrayType(StructType([StructField('A', IntegerType(), nullable=False)]))
# It will throw "ValueError".
df = spark.createDataFrame([[[None]],[[2]]],schema=schema)
df.show()
Ma non è il caso se si utilizza UDF.
Utilizzando lo stesso schema, se si utilizza udf per la trasformazione, non si genera ValueError anche se il proprio udf restituisce un None. Ed è il luogo in cui avviene la mancata corrispondenza dello schema di dati.
Ad esempio:
df = spark.createDataFrame([[[1]],[[2]]], schema=schema)
def throw_none():
def _throw_none(x):
if x[0][0] == 1:
return [['I AM ONE']]
else:
return x
return udf(_throw_none, schema)
# since value col only accept intergerType, it will throw null for
# string "I AM ONE" in the first row. But spark did not throw ValueError
# error this time ! This is where data schema type mismatch happen !
df = df.select(throw_none()(col("value")).name('value'))
df.show()
Poi, la seguente scrittura parquet e leggere vi proietterà l'errore parquet.io.ParquetDecodingException.
df.write.parquet("tmp")
spark.read.parquet("tmp").collect()
Quindi state molto attenti al valore nullo se si utilizza UDF, restituire il tipo di dati a destra nella vostra UDF. E a meno che non sia necessario, non impostare nullable = False nel tuo StructField.Set nullable = True risolverà tutto il problema.
È possibile stampare lo schema dati per tale tabella? –