2014-04-30 2 views
22

Ho bisogno di elaborare più file sparsi su varie directory. Vorrei caricare tutto questo in un unico RDD e quindi eseguire la mappa/ridurre su di esso. Vedo che SparkContext è in grado di caricare più file da una singola directory utilizzando i caratteri jolly. Non sono sicuro di come caricare file da più cartelle.Spark Context Textfile: carica più file

Il seguente frammento di codice non riesce:

for fileEntry in files: 
    fileName = basePath + "/" + fileEntry 
    lines = sc.textFile(fileName) 
    if retval == None: 
     retval = lines 
    else: 
     retval = sc.union(retval, lines) 

Questo non riesce al terzo ciclo con il seguente messaggio di errore:

retval = sc.union(retval, lines) 
TypeError: union() takes exactly 2 arguments (3 given) 

Che è strano dato sto fornendo solo 2 argomenti. Qualsiasi suggerimento apprezzato.

+2

..ma il primo argomento è 'self'. Da [docs] (http://spark.apache.org/docs/latest/api/pyspark/pyspark.context.SparkContext-class.html#union), è necessario 'sc.union ([retval, lines]) ' –

+0

Fammi provare. Sono sorpreso del motivo per cui questo avrebbe funzionato per 2 loop e fallire il terzo ... – Raj

+0

Questo ha fatto il trucco. Grazie Jonathan! – Raj

risposta

39

Come su questo fraseggio invece?

sc.union([sc.textFile(basepath + "/" + f) for f in files]) 

In Scala SparkContext.union() ha due varianti, uno che prende argomenti vararg, e uno che prende una lista. Solo il secondo esiste in Python (poiché Python non ha il polimorfismo).

UPDATE

È possibile utilizzare un unico textFile chiamata per leggere più file.

sc.textFile(','.join(files)) 
+0

Grazie Daniel. Il mio problema potrebbe essere incentrato su Python. Il tuo frammento sembra Scala, – Raj

+0

Ah, perché non me ne sono reso conto ?! Non esiste alcun polimorfismo di funzione in Python, quindi è possibile visualizzare solo una forma di SparkContext.union(). Hanno scelto di esporre quello che prende una lista, non quella che prende un vararg. (Come dice Jonathan.) –

+0

Ho risolto la risposta per avere Python al posto di Scala. –

1

È possibile utilizzare questo

Prima È possibile ottenere un buffer/Elenco dei S3 percorsi:

import scala.collection.JavaConverters._ 
import java.util.ArrayList 
import com.amazonaws.services.s3.AmazonS3Client 
import com.amazonaws.services.s3.model.ObjectListing 
import com.amazonaws.services.s3.model.S3ObjectSummary 
import com.amazonaws.services.s3.model.ListObjectsRequest 

def listFiles(s3_bucket:String, base_prefix : String) = { 
    var files = new ArrayList[String] 

    //S3 Client and List Object Request 
    var s3Client = new AmazonS3Client(); 
    var objectListing: ObjectListing = null; 
    var listObjectsRequest = new ListObjectsRequest(); 

    //Your S3 Bucket 
    listObjectsRequest.setBucketName(s3_bucket) 

    //Your Folder path or Prefix 
    listObjectsRequest.setPrefix(base_prefix) 

    //Adding s3:// to the paths and adding to a list 
    do { 
     objectListing = s3Client.listObjects(listObjectsRequest); 
     for (objectSummary <- objectListing.getObjectSummaries().asScala) { 
     files.add("s3://" + s3_bucket + "/" + objectSummary.getKey()); 
     } 
     listObjectsRequest.setMarker(objectListing.getNextMarker()); 
    } while (objectListing.isTruncated()); 

    //Removing Base Directory Name 
    files.remove(0) 

    //Creating a Scala List for same 
    files.asScala 
    } 

Ora passare questo oggetto List per il seguente pezzo di codice, nota: sc è un oggetto di SqlContext

var df: DataFrame = null; 
    for (file <- files) { 
    val fileDf= sc.textFile(file) 
    if (df!= null) { 
     df= df.unionAll(fileDf) 
    } else { 
     df= fileDf 
    } 
    } 

Ora hai un finale Unified RDD cioè df

opzionale e si può anche partizionare in un unico BigRDD

val files = sc.textFile(filename, 1).repartition(1) 

Ripartizionamento sempre funziona: D

13

risolvo problemi simili utilizzando jolly.

ad es. Ho trovato alcuni tratti nei file che voglio caricare scintilla,

dir

subdir1/cartella1/x.txt

subdir2/cartella2/y.txt

è possibile utilizzare la seguente frase

sc.textFile("dir/*/*/*.txt") 

per caricare tutti i file relativi.

Il carattere jolly "*" funziona solo nella directory a livello singolo, che non è ricorsivo.

2

È possibile utilizzare la seguente funzione di SparkContext:

wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions): RDD[(String, String)]

Leggi una directory di file di testo da HDFS, un file system locale (disponibile in tutti i nodi), o qualsiasi file system Hadoop supportato URI . Ogni file viene letto come un singolo record e restituito in una coppia chiave-valore, dove la chiave è il percorso di ciascun file, il valore è il contenuto di ciascun file.

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext

+0

Questo funziona bene nella maggior parte dei casi, ma nella mia esperienza, questo non funziona quando la dimensione dei file è grande. – KartikKannapur