2016-06-30 50 views
11

Sto cercando di ordinare un RDD in base al valore e, se più valori sono uguali, ho bisogno di questi valori per chiave lessicograficamente.Ordinamento di JavaPairRDD prima per valore e quindi con il tasto

codice:

JavaPairRDD <String,Long> rddToSort = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , String, Long >() { 

    @Override 
    public Tuple2 < String, Long > call(Tuple2 < String, MovieReview > t) throws Exception { 
     return new Tuple2 < String, Long > (t._1, t._2.count); 
    } 
}); 

Quello che ho fatto finora è questo, utilizzando takeOrdered e fornendo un CustomComperator, ma dal momento che takeOrdered non può gestire una grande quantità di dati, quando si esegue il codice si mantiene in uscita (si mangia un sacco di memoria che il sistema operativo non è in grado di gestire):

List < Tuple2 < String, Long >> rddSorted = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , String, Long >() { 

    @Override 
    public Tuple2 < String, Long > call(Tuple2 < String, MovieReview > t) throws Exception { 
     return new Tuple2 < String, Long > (t._1, t._2.count); 
    } 
}).takeOrdered(newTopMovies, MapLongValueComparator.VALUE_COMP); 

comperator:

static class MapLongValueComparator implements Comparator < Tuple2 < String, Long >> , Serializable { 
     private static final long serialVersionUID = 1L; 

     private static final MapLongValueComparator VALUE_COMP = new MapLongValueComparator(); 

     @Override 
     public int compare(Tuple2 < String, Long > o1, Tuple2 < String, Long > o2) { 
      if (o1._2.compareTo(o2._2) == 0) { 
       return o1._1.compareTo(o2._1); 
      } 
      return -o1._2.compareTo(o2._2); 
     } 
} 

ERRORE:

16/06/30 21:09:23 INFO scheduler.DAGScheduler: Job 18 failed: takeOrdered at MovieAnalyzer.java:708, took 418.149182 s 

Come vuoi ordinare questo RDD? Come prenderesti il ​​valore di considerazione TopKMovies e, in caso di chiavi di uguaglianza, lessicograficamente.

Grazie.

+0

può fornire la traccia dello stack (se ce n'è?). Perché hai detto che potrebbe essere il problema della memoria, ma il messaggio di errore non consente di vedere cosa è successo esattamente. – Serhiy

+0

@Serhiy Suppongo che si tratti di un problema di memoria poiché l'operazione takeOrdered richiede molto tempo, poiché gestisce una grande quantità di dati in modalità Distributed, ho ottenuto Exit code: 137 e Exit code: 1. avvicinarsi al genere in altro modo risolverà definitivamente il problema. –

+0

Hai provato a ripartizionare i dati? Quando esegui il mapping per accoppiarlo, puoi ridistribuirlo subito dopo. – Serhiy

risposta

3

risolto il problema utilizzando sortByKey con un comparatore & partizioni, dopo Maping il <String, Long> PairRDD a < Tuple2<String,Long> , Long> PairRDD

JavaPairRDD <Tuple2<String,Long>, Long> sortedRdd = rddMovieReviewReducedByKey.mapToPair(new PairFunction < Tuple2 < String, MovieReview > , Tuple2<String,Long>, Long >() { 

    @Override 
    public Tuple2 < Tuple2<String,Long>, Long > call(Tuple2 < String, MovieReview > t) throws Exception { 
     return new Tuple2 < Tuple2<String,Long>, Long > (new Tuple2<String,Long>(t._1,t._2.count), t._2.count); 
    } 
}).sortByKey(new TupleMapLongComparator(), true, 100); 


JavaPairRDD <String,Long> sortedRddToPairs = sortedRdd.mapToPair(new PairFunction<Tuple2<Tuple2<String,Long>,Long>, String, Long>() { 

    @Override 
    public Tuple2<String, Long> call(
      Tuple2<Tuple2<String, Long>, Long> t) throws Exception { 
     return new Tuple2 < String, Long > (t._1._1, t._1._2); 
    } 

}); 

Comparatore:

private class TupleMapLongComparator implements Comparator<Tuple2<String,Long>>, Serializable { 
    @Override 
    public int compare(Tuple2<String,Long> tuple1, Tuple2<String,Long> tuple2) { 

     if (tuple1._2.compareTo(tuple2._2) == 0) { 
      return tuple1._1.compareTo(tuple2._1); 
     } 
     return -tuple1._2.compareTo(tuple2._2); 
    } 
}