2016-03-20 27 views
5

miei dataframes contiene un campo che è una data e appare nel formato stringa, come esempioPySpark: il filtraggio di un dataframe dal campo data nell'intervallo dove data è stringa

'2015-07-02T11:22:21.050Z' 

devo filtrata del dataframe sulla la data per ottenere solo i record nell'ultima settimana. Così, ho cercato un approccio mappa dove ho trasformato le date delle stringhe agli oggetti datetime con strptime:

def map_to_datetime(row): 
    format_string = '%Y-%m-%dT%H:%M:%S.%fZ' 
    row.date = datetime.strptime(row.date, format_string) 

df = df.map(map_to_datetime) 

e poi vorrei applicare un filtro come

df.filter(lambda row: 
    row.date >= (datetime.today() - timedelta(days=7))) 

riesco a ottenere il lavoro di mappatura ma il filtro non riesce con

TypeError: condition should be string or Column

c'è un modo per utilizzare un filtraggio in un modo che funziona o dovrei cambiare l'approccio e come?

risposta

5

È possibile risolvere questo senza l'utilizzo di codice Python lato dei lavoratori e il passaggio a RDDs. Prima di tutto, dal momento che si utilizza ISO 8601 stringa, i dati possono essere direttamente colate di data o timestamp:

from pyspark.sql.functions import col 

df = sc.parallelize([ 
    ('2015-07-02T11:22:21.050Z',), 
    ('2016-03-20T21:00:00.000Z',) 
]).toDF(("d_str",)) 

df_casted = df.select("*", 
    col("d_str").cast("date").alias("dt"), 
    col("d_str").cast("timestamp").alias("ts")) 

Questo vi farà risparmiare un andata e ritorno tra la JVM e Python. Ci sono anche alcuni modi per avvicinarti alla seconda parte. Solo data:

from pyspark.sql.functions import current_date, datediff, unix_timestamp 

df_casted.where(datediff(current_date(), col("dt")) < 7) 

timestamp:

def days(i: int) -> int: 
    return 60 * 60 * 24 * i 

df_casted.where(unix_timestamp() - col("ts").cast("long") < days(7)) 

Si può anche dare un'occhiata a current_timestamp e date_sub

Nota: vorrei evitare di usare DataFrame.map. È preferibile utilizzare DataFrame.rdd.map. Ti farà risparmiare un po 'di lavoro quando passerai a 2.0+

5

ho trovato un modo per risolvere il mio problema utilizzando l'API SparkSQL con date tenuti come stringhe e fare questo:

last_week = (datetime.today() - timedelta(days=7)).strftime(format='%Y-%m-%d') 

new_df = df.where(df.date >= last_week)