Ho una classe Record
:Incontro ordine errato durante l'ordinamento di un flusso parallelo
public class Record implements Comparable<Record>
{
private String myCategory1;
private int myCategory2;
private String myCategory3;
private String myCategory4;
private int myValue1;
private double myValue2;
public Record(String category1, int category2, String category3, String category4,
int value1, double value2)
{
myCategory1 = category1;
myCategory2 = category2;
myCategory3 = category3;
myCategory4 = category4;
myValue1 = value1;
myValue2 = value2;
}
// Getters here
}
creo un grande elenco di un sacco di dischi. Solo il secondo e il quinto valore, i/10000
e i
, vengono utilizzati successivamente, dai getter getCategory2()
e getValue1()
rispettivamente.
List<Record> list = new ArrayList<>();
for (int i = 0; i < 115000; i++)
{
list.add(new Record("A", i/10000, "B", "C", i, (double) i/100 + 1));
}
noti che primi 10.000 record avere un category2
di 0
, quindi prossimo 10.000 hanno 1
, etc., mentre i valori value1
sono 0-114.999 sequenzialmente.
Creo un Stream
che sia sia parallel
sia sorted
.
Stream<Record> stream = list.stream()
.parallel()
.sorted(
//(r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2())
)
//.parallel()
;
Ho un ForkJoinPool
che mantiene 8
discussioni, che è il numero di core che ho sul mio PC.
ForkJoinPool pool = new ForkJoinPool(8);
Io uso il trucco described here to submit a stream processing task to my own ForkJoinPool
instead of the common ForkJoinPool
.
List<Record> output = pool.submit(() ->
stream.collect(Collectors.toList()
)).get();
ho previsto che il funzionamento in parallelo sorted
sarebbe rispettare l'ordine incontro del flusso, e che sarebbe un stabile ordinamento, perché il Spliterator
restituito da ArrayList
è ORDERED
.
Tuttavia, codice semplice che stampa gli elementi del risultante List
output
in ordine indica che non è il caso.
for (Record record : output)
{
System.out.println(record.getValue1());
}
uscita, condensato:
0
1
2
3
...
69996
69997
69998
69999
71875 // discontinuity!
71876
71877
71878
...
79058
79059
79060
79061
70000 // discontinuity!
70001
70002
70003
...
71871
71872
71873
71874
79062 // discontinuity!
79063
79064
79065
79066
...
114996
114997
114998
114999
La size()
di output
è 115000
, e tutti gli elementi sembrano essere lì, proprio in un ordine leggermente diverso.
Quindi ho scritto un codice di controllo per vedere se il sort
era stabile. Se è stabile, tutti i valori value1
devono rimanere in ordine. Questo codice verifica l'ordine, stampando eventuali discrepanze.
int prev = -1;
boolean verified = true;
for (Record record : output)
{
int curr = record.getValue1();
if (prev != -1)
{
if (prev + 1 != curr)
{
System.out.println("Warning: " + prev + " followed by " + curr + "!");
verified = false;
}
}
prev = curr;
}
System.out.println("Verified: " + verified);
uscita:
Warning: 69999 followed by 71875!
Warning: 79061 followed by 70000!
Warning: 71874 followed by 79062!
Warning: 99999 followed by 100625!
Warning: 107811 followed by 100000!
Warning: 100624 followed by 107812!
Verified: false
Questa condizione persiste se faccio uno dei seguenti:
Sostituire la
ForkJoinPool
con unThreadPoolExecutor
.ThreadPoolExecutor pool = new ThreadPoolExecutor(8, 8, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
Utilizzare comune
ForkJoinPool
elaborando ilStream
direttamente.List<Record> output = stream.collect(Collectors.toList());
chiamata
parallel()
dopo io chiamosorted
.Stream<Record> stream = list.stream().sorted().parallel();
chiamata
parallelStream()
invece distream().parallel()
.Stream<Record> stream = list.parallelStream().sorted();
Ordina un
Comparator
. Si noti che questo criterio di ordinamento è diverso dall'ordine "naturale" che ho definito per l'interfacciaComparable
, sebbene inizi con i risultati già in ordine dall'inizio, il risultato dovrebbe essere lo stesso.Stream<Record> stream = list.stream().parallel().sorted( (r1, r2) -> Integer.compare(r1.getCategory2(), r2.getCategory2()) );
posso solo ottenere questo per preservare l'ordine incontro se non faccio una delle seguenti opzioni sulla Stream
:
- non chiamare
parallel()
. - Non chiamare alcun sovraccarico di
sorted
.
È interessante notare che lo parallel()
senza un tipo ha mantenuto l'ordine.
In entrambi i casi di cui sopra, l'output è:
Verified: true
versione mio Java è 1.8.0_05. Questa anomalia è anche occurs on Ideone, che sembra essere in esecuzione Java 8u25.
Aggiornamento
ho aggiornato il mio JDK alla versione più recente stesura di questo documento, 1.8.0_45, e il problema è invariato.
Domanda
è l'ordine record nel risultante List
(output
) fuori uso a causa del genere è in qualche modo non è stabile, perché l'ordine incontro non è conservato, o qualche altro motivo?
Come posso garantire che l'ordine di incontro venga mantenuto durante la creazione di un flusso parallelo e l'ordinamento?
Proverei a creare il programma più semplice riproducendo il problema, eseguendolo sull'ultima versione di JDK e presentando un bug se è riprodotto: l'ordinamento dovrebbe essere stabile: è documentato come tale. –