2015-06-29 10 views
5

Sto provando ad accelerare un calcolo costoso su un vettore di grandi dimensioni utilizzando thread. La mia funzione consuma un vettore, calcola un vettore di nuovi valori (non aggrega, ma l'ordine di input deve essere mantenuto) e lo restituisce. Tuttavia, sto cercando di capire come generare i thread, assegnare fette vettoriali a ciascuno, quindi raccogliere e combinare i risultati.Consumare porzioni di vettore non sovrapposte e combinare i risultati

// tunable 
const NUMTHREADS: i32 = 4; 

fn f(val: i32) -> i32 { 
    // expensive computation 
    let res = val + 1; 
    res 

} 

fn main() { 
    // choose an odd number of elements 
    let orig = (1..14).collect::<Vec<i32>>(); 
    let mut result: Vec<Vec<i32>> = vec!(); 
    let mut flat: Vec<i32> = Vec::with_capacity(orig.len()); 
    // split into slices 
    for chunk in orig.chunks(orig.len()/NUMTHREADS as usize) { 
     result.push(
      chunk.iter().map(|&digit| 
       f(digit)).collect() 
      ); 
    }; 
    // flatten result vector 
    for subvec in result.iter() { 
     for elem in subvec.iter() { 
      flat.push(elem.to_owned()); 
     } 
    } 
    println!("Flattened result: {:?}", flat); 
} 

Il calcolo filettata dovrebbe prendere tra for chunk… e // flatten …, ma non può trovare molti semplici esempi di fili deposizione x, assegnando pezzi in sequenza, e restituendo il vettore appena calcolato dal filo e in un contenitore in modo che possa essere appiattito. Devo avvolgere orig.chunks() in un Arc e afferrare manualmente ogni blocco in un ciclo? Devo passare f in ogni thread? Dovrò usare un B-Tree per garantire che l'ordine di input e output corrisponda? Posso usare solo simple_parallel?

risposta

2

Beh, questa è un'applicazione ideale per l'instabile thread::scoped():

#![feature(scoped)] 
use std::thread::{self, JoinGuard}; 

// tunable 
const NUMTHREADS: i32 = 4; 

fn f(val: i32) -> i32 { 
    // expensive computation 
    let res = val + 1; 
    res 
} 

fn main() { 
    // choose an odd number of elements 
    let orig: Vec<i32> = (1..14).collect(); 

    let mut guards: Vec<JoinGuard<Vec<i32>>> = vec!(); 

    // split into slices 
    for chunk in orig.chunks(orig.len()/NUMTHREADS as usize) { 
     let g = thread::scoped(move || chunk.iter().cloned().map(f).collect()); 
     guards.push(g); 
    }; 

    // collect the results 
    let mut result: Vec<i32> = Vec::with_capacity(orig.len()); 
    for g in guards { 
     result.extend(g.join().into_iter()); 
    } 

    println!("Flattened result: {:?}", result); 
} 

è instabile e non sarà probabilmente stabilizzata in questa forma perché ha un difetto intrinseco (è possibile trovare maggiori here). Per quanto posso vedere, simple_parallel è solo un'estensione di questo approccio - nasconde il giochino con JoinGuards e può anche essere usato in Rust stabile (probabilmente con alcuni unsafe ty, credo). Non è raccomandato per l'uso generale, tuttavia, come suggeriscono i suoi documenti.

Naturalmente, è possibile utilizzare thread::spawn(), ma allora si avrà bisogno di clonare ogni pezzo in modo che possa essere spostato in ogni thread:

use std::thread::{self, JoinHandle}; 

// tunable 
const NUMTHREADS: i32 = 4; 

fn f(val: i32) -> i32 { 
    // expensive computation 
    let res = val + 1; 
    res 
} 

fn main() { 
    // choose an odd number of elements 
    let orig: Vec<i32> = (1..14).collect(); 

    let mut guards: Vec<JoinHandle<Vec<i32>>> = vec!(); 

    // split into slices 
    for chunk in orig.chunks(orig.len()/NUMTHREADS as usize) { 
     let chunk = chunk.to_owned(); 
     let g = thread::spawn(move || chunk.into_iter().map(f).collect()); 
     guards.push(g); 
    }; 

    // collect the results 
    let mut result: Vec<i32> = Vec::with_capacity(orig.len()); 
    for g in guards { 
     result.extend(g.join().unwrap().into_iter()); 
    } 

    println!("Flattened result: {:?}", result); 
}