2015-05-22 4 views
21

Ho una classe Record:Incontro ordine errato durante l'ordinamento di un flusso parallelo

public class Record implements Comparable<Record> 
{ 
    private String myCategory1; 
    private int myCategory2; 
    private String myCategory3; 
    private String myCategory4; 
    private int myValue1; 
    private double myValue2; 

    public Record(String category1, int category2, String category3, String category4, 
     int value1, double value2) 
    { 
     myCategory1 = category1; 
     myCategory2 = category2; 
     myCategory3 = category3; 
     myCategory4 = category4; 
     myValue1 = value1; 
     myValue2 = value2; 
    } 

    // Getters here 
} 

creo un grande elenco di un sacco di dischi. Solo il secondo e il quinto valore, i/10000 e i, vengono utilizzati successivamente, dai getter getCategory2() e getValue1() rispettivamente.

List<Record> list = new ArrayList<>(); 
for (int i = 0; i < 115000; i++) 
{ 
    list.add(new Record("A", i/10000, "B", "C", i, (double) i/100 + 1)); 
} 

noti che primi 10.000 record avere un category2 di 0, quindi prossimo 10.000 hanno 1, etc., mentre i valori value1 sono 0-114.999 sequenzialmente.

Creo un Stream che sia sia parallel sia sorted.

Stream<Record> stream = list.stream() 
    .parallel() 
    .sorted(
     //(r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2()) 
    ) 
    //.parallel() 
; 

Ho un ForkJoinPool che mantiene 8 discussioni, che è il numero di core che ho sul mio PC.

ForkJoinPool pool = new ForkJoinPool(8); 

Io uso il trucco described here to submit a stream processing task to my own ForkJoinPool instead of the common ForkJoinPool.

List<Record> output = pool.submit(() -> 
    stream.collect(Collectors.toList() 
)).get(); 

ho previsto che il funzionamento in parallelo sorted sarebbe rispettare l'ordine incontro del flusso, e che sarebbe un stabile ordinamento, perché il Spliterator restituito da ArrayList è ORDERED.

Tuttavia, codice semplice che stampa gli elementi del risultante Listoutput in ordine indica che non è il caso.

for (Record record : output) 
{ 
    System.out.println(record.getValue1()); 
} 

uscita, condensato:

0 
1 
2 
3 
... 
69996 
69997 
69998 
69999 
71875 // discontinuity! 
71876 
71877 
71878 
... 
79058 
79059 
79060 
79061 
70000 // discontinuity! 
70001 
70002 
70003 
... 
71871 
71872 
71873 
71874 
79062 // discontinuity! 
79063 
79064 
79065 
79066 
... 
114996 
114997 
114998 
114999 

La size() di output è 115000, e tutti gli elementi sembrano essere lì, proprio in un ordine leggermente diverso.

Quindi ho scritto un codice di controllo per vedere se il sort era stabile. Se è stabile, tutti i valori value1 devono rimanere in ordine. Questo codice verifica l'ordine, stampando eventuali discrepanze.

int prev = -1; 
boolean verified = true; 
for (Record record : output) 
{ 
    int curr = record.getValue1(); 
    if (prev != -1) 
    { 
     if (prev + 1 != curr) 
     { 
      System.out.println("Warning: " + prev + " followed by " + curr + "!"); 
      verified = false; 
     } 
    } 
    prev = curr; 
} 
System.out.println("Verified: " + verified); 

uscita:

Warning: 69999 followed by 71875! 
Warning: 79061 followed by 70000! 
Warning: 71874 followed by 79062! 
Warning: 99999 followed by 100625! 
Warning: 107811 followed by 100000! 
Warning: 100624 followed by 107812! 
Verified: false 

Questa condizione persiste se faccio uno dei seguenti:

  • Sostituire la ForkJoinPool con un ThreadPoolExecutor.

    ThreadPoolExecutor pool = new ThreadPoolExecutor(8, 8, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10)); 
    
  • Utilizzare comune ForkJoinPool elaborando il Stream direttamente.

    List<Record> output = stream.collect(Collectors.toList()); 
    
  • chiamata parallel()dopo io chiamo sorted.

    Stream<Record> stream = list.stream().sorted().parallel(); 
    
  • chiamata parallelStream() invece di stream().parallel().

    Stream<Record> stream = list.parallelStream().sorted(); 
    
  • Ordina un Comparator. Si noti che questo criterio di ordinamento è diverso dall'ordine "naturale" che ho definito per l'interfaccia Comparable, sebbene inizi con i risultati già in ordine dall'inizio, il risultato dovrebbe essere lo stesso.

    Stream<Record> stream = list.stream().parallel().sorted(
        (r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2()) 
    ); 
    

posso solo ottenere questo per preservare l'ordine incontro se non faccio una delle seguenti opzioni sulla Stream:

  • non chiamare parallel().
  • Non chiamare alcun sovraccarico di sorted.

È interessante notare che lo parallel() senza un tipo ha mantenuto l'ordine.

In entrambi i casi di cui sopra, l'output è:

Verified: true 

versione mio Java è 1.8.0_05. Questa anomalia è anche occurs on Ideone, che sembra essere in esecuzione Java 8u25.

Aggiornamento

ho aggiornato il mio JDK alla versione più recente stesura di questo documento, 1.8.0_45, e il problema è invariato.

Domanda

è l'ordine record nel risultante List (output) fuori uso a causa del genere è in qualche modo non è stabile, perché l'ordine incontro non è conservato, o qualche altro motivo?

Come posso garantire che l'ordine di incontro venga mantenuto durante la creazione di un flusso parallelo e l'ordinamento?

+6

Proverei a creare il programma più semplice riproducendo il problema, eseguendolo sull'ultima versione di JDK e presentando un bug se è riprodotto: l'ordinamento dovrebbe essere stabile: è documentato come tale. –

risposta

11

Sembra che Arrays.parallelSort non sia stabile in alcune circostanze. Ben individuato. L'ordinamento parallelo dei flussi viene implementato in termini di Arrays.parallelSort, pertanto influisce anche sugli stream. Ecco un esempio semplificato:

public class StableSortBug { 
    static final int SIZE = 50_000; 

    static class Record implements Comparable<Record> { 
     final int sortVal; 
     final int seqNum; 

     Record(int i1, int i2) { sortVal = i1; seqNum = i2; } 

     @Override 
     public int compareTo(Record other) { 
      return Integer.compare(this.sortVal, other.sortVal); 
     } 
    } 

    static Record[] genArray() { 
     Record[] array = new Record[SIZE]; 
     Arrays.setAll(array, i -> new Record(i/10_000, i)); 
     return array; 
    } 

    static boolean verify(Record[] array) { 
     return IntStream.range(1, array.length) 
         .allMatch(i -> array[i-1].seqNum + 1 == array[i].seqNum); 
    } 

    public static void main(String[] args) { 
     Record[] array = genArray(); 
     System.out.println(verify(array)); 
     Arrays.sort(array); 
     System.out.println(verify(array)); 
     Arrays.parallelSort(array); 
     System.out.println(verify(array)); 
    } 
} 

Sulla mia macchina (2 x 2 di base le discussioni) Stampa la seguente:

true 
true 
false 

Naturalmente, si suppone di stampare true tre volte. Questo è nelle attuali versioni di sviluppo di JDK 9.Non sarei sorpreso se si verificasse in tutte le versioni JDK 8 fino ad ora, dato ciò che hai provato. Curiosamente, la riduzione della dimensione o del divisore cambierà il comportamento. Una dimensione di 20.000 e un divisore di 10.000 sono stabili e anche una dimensione di 50.000 e un divisore di 1.000 sono stabili. Sembra che il problema abbia a che fare con una serie di valori sufficientemente ampia che confronta la parità rispetto alla dimensione split parallela.

Il problema OpenJDK JDK-8076446 copre questo errore.

+4

C'è anche https://bugs.openjdk.java.net/browse/JDK-8076446 –

+0

(true, true, false) anche su Windows7 (64), 8u40. – edharned

+2

@StefanZobel Oh sì, grazie, ho chiuso il nuovo bug come duplicato di quello vecchio. –