Qualcuno sa di un equivalente parallelo di File.walkFileTree java o qualcosa di simile? Può essere una libreria Java o Scala.Versione parallela di Files.walkFileTree (java o scala)
risposta
Supponiamo che l'esecuzione di una richiamata su ciascun file sia sufficiente.
Questo codice non gestirà i loop nel file system. Avrai bisogno di un registro di dove sei stato (ad esempio java.util.concurrent.ConcurrentHashMap
). Ci sono tutti i tipi di miglioramenti che è possibile aggiungere, come la segnalazione di eccezioni invece di ignorarli silenziosamente.
import java.io.File
import scala.util._
def walk(f: File, callback: File => Unit, pick: File => Boolean = _ => true) {
Try {
val (dirs, fs) = f.listFiles.partition(_.isDirectory)
fs.filter(pick).foreach(callback)
dirs.par.foreach(f => walk(f, callback, pick))
}
}
Raccolta i file utilizzando una piega al posto di un foreach
non è drasticamente più difficile, ma lascio che come esercizio per il lettore. (A ConcurrentLinkedQueue
è probabilmente abbastanza veloce per accettarle tutte in una richiamata in ogni caso se non hai le discussioni veramente lenti e un filesystem impressionante.)
In realtà speravo di ottenere il link alla libreria 'mature-ish' che lo fa e ha alcuni futuri aggiuntivi, ma il tuo esempio è sufficiente per le mie esigenze attuali. Grazie! – matt
Come altri hanno sottolineato, camminare su un albero di file è quasi certamente vincolato all'IO anziché al limite della CPU, quindi i vantaggi di eseguire una passeggiata su file con file multithread sono discutibili. Ma se lo volessi, probabilmente potresti fare il tuo da solo con uno ForkJoinPool
o simile.
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
public class MultiThreadedFileTreeWalk {
private static class RecursiveWalk extends RecursiveAction {
private static final long serialVersionUID = 6913234076030245489L;
private final Path dir;
public RecursiveWalk(Path dir) {
this.dir = dir;
}
@Override
protected void compute() {
final List<RecursiveWalk> walks = new ArrayList<>();
try {
Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
if (!dir.equals(RecursiveWalk.this.dir)) {
RecursiveWalk w = new RecursiveWalk(dir);
w.fork();
walks.add(w);
return FileVisitResult.SKIP_SUBTREE;
} else {
return FileVisitResult.CONTINUE;
}
}
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
System.out.println(file + "\t" + Thread.currentThread());
return FileVisitResult.CONTINUE;
}
});
} catch (IOException e) {
e.printStackTrace();
}
for (RecursiveWalk w : walks) {
w.join();
}
}
}
public static void main(String[] args) throws IOException {
RecursiveWalk w = new RecursiveWalk(Paths.get(".").toRealPath());
ForkJoinPool p = new ForkJoinPool();
p.invoke(w);
}
}
Questo esempio percorre ciascuna directory su un thread separato. Ecco il tutorial per la libreria fork/join di Java 7.
Se c'è qualche funzionalità da eseguire su ciascun elemento, dall'esperienza passata si può ottenere una prestazione significativa camminando su un albero di file ed eseguendo l'operazione su ciascun nodo simultaneamente vs in serie. – Hazok
@Hazok Dipende dalla funzionalità. Se la funzionalità è molto intensiva della CPU, allora potrebbe superare il limite di IO di camminare su un albero di file. In tal caso, potrebbe essere utile rendere il tuo codice concorrente. Tuttavia, questo non sarà sempre il caso. – Jeffrey
D'accordo, ecco perché ho qualificato la dichiarazione. Volevo solo sottolineare che ci sono casi in cui è possibile ottenere guadagni in termini di prestazioni poiché è stato dichiarato "discutibile" nella risposta. – Hazok
Questo esercizio non è né il più breve la risposta Scala né come Java-like come la risposta Java .
L'idea era di iniziare passeggiate parallele con qualcosa come un filo per dispositivo.
I girelli sono sui thread ForkJoinPool, quindi quando danno il via a un futuro per ogni test di percorso, quelli sono attività biforcute sul pool. Il test della directory utilizza il blocco gestito quando legge la directory, cercando i file.
Il risultato viene restituito completando una promessa a seconda del test del percorso futuro. (Nessun meccanismo qui per rilevare il completamento a mani vuote.)
Un test più interessante dovrebbe includere la lettura di file zip, poiché la decompressione mangerà della CPU.
Mi chiedo se paulp will do something clever with deep listing.
import util._
import collection.JavaConverters._
import concurrent.{ TimeoutException => Timeout, _ }
import concurrent.duration._
import ExecutionContext.Implicits._
import java.io.IOException
import java.nio.file.{ FileVisitResult => Result, _ }
import Result.{ CONTINUE => Go, SKIP_SUBTREE => Prune, TERMINATE => Stop }
import java.nio.file.attribute.{ BasicFileAttributes => BFA }
object Test extends App {
val fileSystem = FileSystems.getDefault
val starts = (if (args.nonEmpty) args.toList else mounts) map (s => (fileSystem getPath s))
val p = Promise[(Path, BFA)]
def pathTest(path: Path, attrs: BFA) =
if (attrs.isDirectory) {
val entries = blocking {
val res = Files newDirectoryStream path
try res.asScala.toList finally res.close()
}
List("hello","world") forall (n => entries exists (_.getFileName.toString == n))
} else {
path.getFileName.toString == "enough"
}
def visitor(root: Path) = new SimpleFileVisitor[Path] {
def stopOrGo = if (p.isCompleted) Stop else Go
def visiting(path: Path, attrs: BFA) = {
future { pathTest(path, attrs) } onComplete {
case Success(true) => p trySuccess (path, attrs)
case Failure(e) => p tryFailure e
case _ =>
}
stopOrGo
}
override def preVisitDirectory(dir: Path, attrs: BFA) = (
if ((starts contains dir) && dir != root) Prune
else visiting(dir, attrs)
)
override def postVisitDirectory(dir: Path, e: IOException) = {
if (e != null) p tryFailure e
stopOrGo
}
override def visitFile(file: Path, attrs: BFA) = visiting(file, attrs)
}
//def walk(p: Path): Path = Files walkFileTree (p, Set().asJava, 10, visitor(p))
def walk(p: Path): Path = Files walkFileTree (p, visitor(p))
def show(store: FileStore) = {
val ttl = store.getTotalSpace/1024
val used = (store.getTotalSpace - store.getUnallocatedSpace)/1024
val avail = store.getUsableSpace/1024
Console println f"$store%-40s $ttl%12d $used%12d $avail%12d"
store
}
def mounts = {
val devs = for {
store <- fileSystem.getFileStores.asScala
if store.name startsWith "/dev/"
if List("ext4","fuseblk") contains store.`type`
} yield show(store)
val devstr = """(\S+) \((.*)\)""".r
(devs.toList map (_.toString match {
case devstr(name, dev) if devs.toList exists (_.name == dev) => Some(name)
case s => Console println s"Bad dev str '$s', skipping" ; None
})).flatten
}
starts foreach (f => future (walk(f)))
Try (Await result (p.future, 20.seconds)) match {
case Success((name, attrs)) => Console println s"Result: ${if (attrs.isDirectory) "dir" else "file"} $name"
case Failure(e: Timeout) => Console println s"No result: timed out."
case Failure(t) => Console println s"No result: $t."
}
}
Grazie per aver dedicato così tanto tempo a scrivere questo codice. Ho deciso di accettare la soluzione Rex Kerr perché è così breve e facile da eseguire il debug. – matt
@lucek Rex è il migliore. Grazie per la domanda, è stato divertente esplorare l'API. Ho anche alzato le altre risposte. –
Non penso che abbia senso perché tutti i thread paralleli avranno lo stesso bottleneck - HDD. E non può essere parallelo alle operazioni di rete io. – aim
Perché camminare con l'albero dei file in parallelo è una buona idea? Questo di solito è legato all'IO, non alla CPU. –
Nel mio caso l'elaborazione dei file è limitata dalla CPU e l'utilizzo degli I/O è compreso tra il 10% e il 20%. – matt