Voglio verificare se esistono diversi file in hdf prima di caricarli da SparkContext. Io uso pyspark. Ho provato os.system("hadoop fs -test -e %s" %path)
ma come ho un sacco di percorsi da controllare, il lavoro si è schiantato. Ho provato anche a sc.wholeTextFiles(parent_path)
e poi ho filtrato per chiavi. ma si è bloccato anche perché il parent_path contiene molti sottocavi e file. Potresti aiutarmi?pyspark: come verificare se un file esiste in hdf
risposta
rigth come si dice Tristan Reid:
... (Spark) Può leggere molti formati, e supporta Hadoop espressioni glob, che sono terribilmente utile per la lettura da percorsi multipli in HDFS, ma doesn Non ho una funzione incorporata di cui sono a conoscenza per il passaggio di directory o file, né ha utilità specifiche per interagire con Hadoop o HDFS.
In ogni caso, questa è la sua risposta a una domanda relativa: Pyspark: get list of files/directories on HDFS path
Una volta che avete l'elenco dei file in una directory, è facile verificare se un particolare file esiste.
Spero possa essere utile in qualche modo.
Hai provato a utilizzare pydoop? La funzione exists
dovrebbe funzionare
Una possibilità è che è possibile utilizzare hadoop fs -lsr your_path
per ottenere tutti i percorsi e quindi verificare se i percorsi che ti interessano sono in quel set.
Per quanto riguarda l'arresto anomalo, è possibile che sia stato il risultato di tutte le chiamate a os.system
anziché essere specifico del comando hadoop. A volte chiamare un processo esterno può causare problemi relativi ai buffer che non vengono mai rilasciati, in particolare i buffer di I/O (stdin/stdout).
Una soluzione sarebbe quella di effettuare una singola chiamata a uno script di bash che scorre su tutti i percorsi. È possibile creare lo script utilizzando un modello di stringa nel codice, compilare l'array di percorsi nello script, scriverlo e quindi eseguirlo.
Potrebbe anche essere una buona idea passare al modulo subprocess
di python, che offre un controllo più granulare sulla gestione dei sottoprocessi. Ecco l'equivalente di os.system
:
process = subprocess.check_output(
args=your_script,
stdout=PIPE,
shell=True
)
Si noti che è possibile passare stdout
a qualcosa come un handle di file se questo ti aiuta con il debug o rendendo il processo più robusto. Inoltre, è possibile passare l'argomento shell=True
a False
a meno che non si stia chiamando uno script reale o si utilizzino elementi specifici della shell come pipe o reindirizzamento.