2014-07-08 11 views
6

Ho una cartella di input che contiene +100.000 file.Come rinominare una quantità enorme di file in Hadoop/Spark?

Vorrei eseguire un'operazione batch, ad esempio rinominarli tutti in un determinato modo o spostarli in un nuovo percorso in base alle informazioni nel nome di ogni file.

Vorrei utilizzare Spark per farlo, ma purtroppo quando ho provato il seguente pezzo di codice:

final org.apache.hadoop.fs.FileSystem ghfs = org.apache.hadoop.fs.FileSystem.get(new java.net.URI(args[0]), new org.apache.hadoop.conf.Configuration()); 
     org.apache.hadoop.fs.FileStatus[] paths = ghfs.listStatus(new org.apache.hadoop.fs.Path(args[0])); 
     List<String> pathsList = new ArrayList<>(); 
     for (FileStatus path : paths) { 
      pathsList.add(path.getPath().toString()); 
     } 
     JavaRDD<String> rddPaths = sc.parallelize(pathsList); 

     rddPaths.foreach(new VoidFunction<String>() { 
      @Override 
      public void call(String path) throws Exception { 
       Path origPath = new Path(path); 
       Path newPath = new Path(path.replace("taboola","customer")); 
       ghfs.rename(origPath,newPath); 
      } 
     }); 

ottengo un errore che hadoop.fs.FileSystem non è Serializable (e quindi probabilmente non ci riesco essere utilizzato in operazioni parallele)

Qualche idea su come posso risolvere il problema o farlo in un altro modo?

risposta

4

Il problema è che si sta tentando di serializzare l'oggetto ghfs. Se si utilizza mapPartitions e si ricrea l'oggetto ghfs in ogni partizione, sarà possibile eseguire il codice con un paio di modifiche minori.

+0

Grazie! questa è esattamente la direzione che prenderò! –

3

Suggerirei di rinominarli come se fossi con la classe del file system in un contesto di riduzione della mappa non solo (solo nel driver), non è un grosso problema rinominare i file 100k, è troppo lento, quindi puoi provare a multithread. Basta fare una cosa del genere

FileSystem fileSystem = new Path("").getFileSystem(new Configuration()); 
File [] files = FileUtil.listFiles(directory) 
for (File file : files) { 
    fileSystem.rename(new Path(file.getAbsolutePath()),new Path("renamed")); 
} 

Btw l'errore che stai ricevendo in scintilla è perché scintilla richiede oggetti che utilizza per implementare Serializable, che FileSystem non lo fa.


Non posso confermare questo, ma sembrerebbe che ogni rinominare in HDFS comporterebbe l'NameNode poiché tiene traccia l'intera struttura di directory e la posizione del nodo di file (confirmation link), il che significa che non può essere fatto in modo efficiente in parallelo. Come da this answer la ridenominazione è un'operazione di soli metadati quindi dovrebbe essere eseguita in modo molto veloce in serie.

+0

La domanda non riguarda la modifica di qualsiasi cosa all'interno dell'RDD. Si tratta di eseguire alcune operazioni per ogni riga dell'RDD in modo distribuito. Forse hai la pazienza di eseguire il cambio di nome da una macchina, ma è un uso assolutamente valido di Spark per distribuire questo lavoro su molte macchine. –

+0

@DanielDarabos ok hai ragione che l'immutabilità di RDD non ha nulla a che fare con questo problema cancellato dalla risposta. Ma continuo a pensare che la scintilla sia eccessiva per la ridenominazione di alcuni file – aaronman

+0

@DanielDarabos, inoltre, non mi citare su questo, ma è certamente possibile che rinominare i file nella stessa directory non possa essere fatto in parallelo su HDFS e quindi la tua soluzione avrà problemi con mapper bloccati – aaronman

4

È necessario eseguire anche FileSystem.get all'interno dello VoidFunction.

Il driver richiede un FileSystem per ottenere l'elenco dei file, ma anche ogni lavoratore ha bisogno di un FileSystem per la ridenominazione. Il driver non può passare il suo FileSystem ai lavoratori, perché non è serializzabile. Ma i lavoratori possono ottenere il proprio FileSystem semplicemente bene.

Nella API Scala si potrebbe usare RDD.foreachPartition di scrivere il codice in un modo che si fa solo una volta FileSystem.get per partizione, invece di una volta per linea. È probabilmente disponibile anche nell'API Java.

+0

Grazie! questa è esattamente la direzione che prenderò (solo io sono con Java al momento ...) –

+0

Dopo tutto ciò che discute l'OP va e accetta una risposta tardiva che è fondamentalmente un sottoinsieme del tuo karma cattivo – aaronman

+0

per argomentare troppo :). –

0

Ho affrontato problema simile quando i miei HDFS directory di archiviazione raggiunto limite di elementi max

Request error: org.apache.hadoop.hdfs.protocol.FSLimitException$MaxDirectoryItemsExceededException 
The directory item limit of /my/archive is exceeded: limit=1048576 items=1048576 

ho deciso di spostare tutti gli elementi da anno prec (2015) per sottocartella separata. Ecco soluzione pura guscio

export HADOOP_CLIENT_OPTS="-XX:-UseGCOverheadLimit -Xmx4096m" 
hdfs dfs -ls /my/archive \ 
    | grep 2015- \ 
    | awk '{print $8}' \ 
    | gnu-parallel -X -s 131000 hdfs dfs -mv {} /my/archive/2015 

Osservazione:

  1. cliente opta sostituzione è necessario per hdfs dfs -ls a causa della grande quantità di file. Vedi here per maggiori dettagli.
  2. hdfs dfs il client ha un limite per la durata dell'elenco argomenti: circa 131000 (2^17) caratteri.
  3. Sono stati necessari alcuni minuti per spostare i file 420k.