2015-11-11 2 views
5

Sto usando Apache Spark nella mia applicazione Java. Ho due DataFrame s: df1 e df2. Lo df1 contiene s con email, firstName e lastName. df2 contiene s con email.Come implementare NOT IN per due DataFrames con struttura diversa in Apache Spark

Voglio creare un DataFrame: df3 che contiene tutte le righe df1, che e-mail non è presente in df2.

C'è un modo per farlo con Apache Spark? Ho cercato di creare JavaRDD<String> da df1 e df2 da loro toJavaRDD() colata e filtrando df1 a contenere tutte le email e dopo che l'utilizzo di subtract, ma non so come mappare la nuova JavaRDD per ds1 e ottenere un DataFrame.

Fondamentalmente ho bisogno di tutte le righe che sono in df1 la cui e-mail non è in df2.

DataFrame customers = sqlContext.cassandraSql("SELECT email, first_name, last_name FROM customer "); 

DataFrame customersWhoOrderedTheProduct = sqlContext.cassandraSql("SELECT email FROM customer_bought_product " + 
          "WHERE product_id = '" + productId + "'"); 

JavaRDD<String> customersBoughtEmail = customersWhoOrderedTheProduct.toJavaRDD().map(row -> row.getString(0)); 

List<String> notBoughtEmails = customers.javaRDD() 
         .map(row -> row.getString(0)) 
         .subtract(customersBoughtEmail).collect(); 

risposta

4

Spark 2.0.0+

È possibile utilizzare NOT IN direttamente.

Spark < 2.0.0

Può essere espresso utilizzando outer join e filtro.

val customers = sc.parallelize(Seq(
    ("[email protected]", "John", "Doe"), 
    ("[email protected]", "Jane", "Doe") 
)).toDF("email", "first_name", "last_name") 

val customersWhoOrderedTheProduct = sc.parallelize(Seq(
    Tuple1("[email protected]") 
)).toDF("email") 

val customersWhoHaventOrderedTheProduct = customers.join(
    customersWhoOrderedTheProduct.select($"email".alias("email_")), 
    $"email" === $"email_", "leftouter") 
.where($"email_".isNull).drop("email_") 

customersWhoHaventOrderedTheProduct.show 

// +----------------+----------+---------+ 
// |   email|first_name|last_name| 
// +----------------+----------+---------+ 
// |[email protected]|  John|  Doe| 
// +----------------+----------+---------+ 

Raw SQL equivalente:

customers.registerTempTable("customers") 
customersWhoOrderedTheProduct.registerTempTable(
    "customersWhoOrderedTheProduct") 

val query = """SELECT c.* FROM customers c LEFT OUTER JOIN 
       customersWhoOrderedTheProduct o 
       ON c.email = o.email 
       WHERE o.email IS NULL""" 

sqlContext.sql(query).show 

// +----------------+----------+---------+ 
// |   email|first_name|last_name| 
// +----------------+----------+---------+ 
// |[email protected]|  John|  Doe| 
// +----------------+----------+---------+ 
+2

Grazie. Il primo esempio ha funzionato per me. Questa è la versione Java 'DataFrame customersWhoHaventOrderedTheProduct = clienti .join (customersWhoOrderedTheProduct.select (customersWhoOrderedTheProduct.col (" email ")), customers.col (" email "). EqualTo (customersWhoOrderedTheProduct.col (" email "))," leftouter ") . where (customersWhoOrderedTheProduct.col (" email "). isNull()). drop (customersWhoOrderedTheProduct.col (" email ")); Ho provato l'equivalente SQL ma questo è accaduto' scala.MatchError: UUIDType (di classe org.apache.spark.sql.cassandra.types.UUIDType $) ' –

+0

Sono felice di poterti aiutare. – zero323

+0

Sto usando 'Cassandra' e ho un' UUID' come chiave primaria. Forse Scala non è in grado di eguagliare il tipo. –

2

l'ho fatto in python, oltre vi suggerisco di utilizzare interi come non chiavi stringhe.

from pyspark.sql.types import * 

samples = sc.parallelize([ 
    ("[email protected]", "Alberto", "Bonsanto"), ("[email protected]", "Miguel", "Bonsanto"), 
    ("[email protected]", "Stranger", "Weirdo"), ("[email protected]", "Dakota", "Bonsanto") 
]) 

keys = sc.parallelize(
    [("[email protected]",), ("[email protected]",), ("[email protected]",)] 
) 

complex_schema = StructType([ 
    StructField("email", StringType(), True), 
    StructField("first_name", StringType(), True), 
    StructField("last_name", StringType(), True) 
]) 

simple_schema = StructType([ 
    StructField("email", StringType(), True) 
]) 

df1 = sqlContext.createDataFrame(samples, complex_schema) 
df2 = sqlContext.createDataFrame(keys, simple_schema) 

df1.show() 
df2.show() 

df3 = df1.join(df2, df1.email == df2.email, "left_outer").where(df2.email.isNull()).show() 
+0

Grazie. Sto usando 'Cassandra' quindi molte mie chiavi primarie contengono un' UUID'. –