2015-04-03 13 views
11

ho messa a punto un semplice test per lo streaming di file di testo da S3 e preso a lavorare quando ho provato qualcosa di simileSpark Streaming textFileStream che non supportano i caratteri jolly

val input = ssc.textFileStream("s3n://mybucket/2015/04/03/") 

e nel secchio avrei file di log di andare lì e tutto funzionerebbe bene.

Ma se il loro era una sottocartella, non avrebbe trovato alcun file che hanno sistemati nella sottocartella (e sì, sono consapevole che HDFS non effettivamente utilizzare una struttura di cartelle)

val input = ssc.textFileStream("s3n://mybucket/2015/04/") 

Quindi, ho cercato di fare semplicemente i caratteri jolly come ho fatto prima con un'applicazione standard di scintilla

val input = ssc.textFileStream("s3n://mybucket/2015/04/*") 

Ma quando provo questo genera un errore

java.io.FileNotFoundException: File s3n://mybucket/2015/04/* does not exist. 
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:506) 
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1483) 
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1523) 
at org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:176) 
at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:134) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) 
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) 
at scala.Option.orElse(Option.scala:257) 
..... 

So per certo che è possibile utilizzare i caratteri jolly durante la lettura di fileInput per le applicazioni a scintilla standard, ma sembra che quando si esegue lo streaming input, non lo faccia né elabora automaticamente i file nelle sottocartelle. C'è qualcosa che mi manca qui ??

In definitiva quello che mi serve è un lavoro di streaming per essere in esecuzione 24/7 che sarà il monitoraggio di un secchio S3 che ha i log in esso riposte in base alla data

Quindi qualcosa di simile

s3n://mybucket/<YEAR>/<MONTH>/<DAY>/<LogfileName> 

C'è qualche modo di consegnarlo in cima alla maggior parte delle cartelle e legge automaticamente i file che appaiono in qualsiasi cartella (perché ovviamente la data aumenterà ogni giorno)?

EDIT

Così sul scavare nella documentazione in http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources si afferma che le directory nidificate non sono supportate.

Qualcuno può far luce sul perché questo è il caso?

Inoltre, poiché i miei file saranno annidati in base alla loro data, quale sarebbe un buon modo per risolvere questo problema nella mia applicazione di streaming? È un po 'complicato dal momento che i log impiegano alcuni minuti per essere scritti su S3 e quindi l'ultimo file scritto per il giorno potrebbe essere scritto nella cartella del giorno precedente anche se siamo a pochi minuti dal nuovo giorno.

+0

In realtà non sono sicuro se s3 supportano i caratteri jolly ... – eliasah

+1

Lo fa sicuramente. I miei lavori hanno utilizzato i caratteri jolly negli ultimi 8 mesi. Inoltre, solo per un controllo di integrità ho appena eseguito un lavoro con input jolly, ha funzionato bene. Ho notato che è un po 'schizzinosi riguardo richiedono che non si fa qualcosa di simile S3N: // MyBucket/2015/04 * come dice che Exception in thread java.io.IOException "principale" : non un file: S3N: // MyBucket/2015/04/01 che ha un senso in quanto non è un file Ma se lo fai S3N: // MyBucket/2015/04/* E ' analizza correttamente tutti i file nelle sottocartelle dei giorni .... Questo tipo di mi sembra un insetto. –

+1

Vado a rispondere alla domanda. Ricordo di avere un problema simile ma non ricordo come l'ho risolto. – eliasah

risposta

0

abbiamo avuto lo stesso problema. ci siamo uniti ai nomi delle sottocartelle con la virgola.

List<String> paths = new ArrayList<>(); 
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd"); 

try {   
    Date start = sdf.parse("2015/02/01"); 
    Date end = sdf.parse("2015/04/01"); 

    Calendar calendar = Calendar.getInstance(); 
    calendar.setTime(start);   

    while (calendar.getTime().before(end)) { 
     paths.add("s3n://mybucket/" + sdf.format(calendar.getTime())); 
     calendar.add(Calendar.DATE, 1); 
    }     
} catch (ParseException e) { 
    e.printStackTrace(); 
} 

String joinedPaths = StringUtils.join(",", paths.toArray(new String[paths.size()])); 
val input = ssc.textFileStream(joinedPaths); 

Spero che in questo modo il problema venga risolto.

+0

Freddo. Come gestisci date di fine più grandi? compilando e rilanciando il programma? O mi sta sfuggendo qualcosa? –

6

È possibile creare una "soluzione brutta ma funzionante" estendendo FileInputDStream. scrittura sc.textFileStream(d) equivale a

new FileInputDStream[LongWritable, Text, TextInputFormat](streamingContext, d).map(_._2.toString) 

È possibile creare CustomFileInputDStream che si estenderà FileInputDStream. La classe personalizzata copia il metodo di calcolo dalla classe FileInputDStream e regola il metodo findNewFiles in base alle proprie esigenze.

cambiando metodo findNewFiles da:

private def findNewFiles(currentTime: Long): Array[String] = { 
    try { 
     lastNewFileFindingTime = clock.getTimeMillis() 

    // Calculate ignore threshold 
    val modTimeIgnoreThreshold = math.max(
    initialModTimeIgnoreThreshold, // initial threshold based on newFilesOnly setting 
    currentTime - durationToRemember.milliseconds // trailing end of the remember window 
) 
    logDebug(s"Getting new files for time $currentTime, " + 
    s"ignoring files older than $modTimeIgnoreThreshold") 
    val filter = new PathFilter { 
    def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold) 
    } 
    val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString) 
    val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime 
    logInfo("Finding new files took " + timeTaken + " ms") 
    logDebug("# cached file times = " + fileToModTime.size) 
    if (timeTaken > slideDuration.milliseconds) { 
    logWarning(
     "Time taken to find new files exceeds the batch size. " + 
     "Consider increasing the batch size or reducing the number of " + 
     "files in the monitored directory." 
    ) 
    } 
    newFiles 
} catch { 
    case e: Exception => 
    logWarning("Error finding new files", e) 
    reset() 
    Array.empty 
} 

}

a:

private def findNewFiles(currentTime: Long): Array[String] = { 
    try { 
     lastNewFileFindingTime = clock.getTimeMillis() 

     // Calculate ignore threshold 
     val modTimeIgnoreThreshold = math.max(
     initialModTimeIgnoreThreshold, // initial threshold based on newFilesOnly setting 
     currentTime - durationToRemember.milliseconds // trailing end of the remember window 
    ) 
     logDebug(s"Getting new files for time $currentTime, " + 
     s"ignoring files older than $modTimeIgnoreThreshold") 
     val filter = new PathFilter { 
     def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold) 
     } 
     val directories = fs.listStatus(directoryPath).filter(_.isDirectory) 
     val newFiles = ArrayBuffer[FileStatus]() 

     directories.foreach(directory => newFiles.append(fs.listStatus(directory.getPath, filter) : _*)) 

     val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime 
     logInfo("Finding new files took " + timeTaken + " ms") 
     logDebug("# cached file times = " + fileToModTime.size) 
     if (timeTaken > slideDuration.milliseconds) { 
     logWarning(
      "Time taken to find new files exceeds the batch size. " + 
      "Consider increasing the batch size or reducing the number of " + 
      "files in the monitored directory." 
     ) 
     } 
     newFiles.map(_.getPath.toString).toArray 
    } catch { 
     case e: Exception => 
     logWarning("Error finding new files", e) 
     reset() 
     Array.empty 
    } 
    } 

verifica la presenza di file in tutte le sottocartelle di primo grado, è possibile regolare in modo da utilizzare il timestamp lotto per accedere alle "sottodirectory" pertinenti.

ho creato il CustomFileInputDStream come ho già detto e attivato chiamando:

new CustomFileInputDStream[LongWritable, Text, TextInputFormat](streamingContext, d).map(_._2.toString) 

Sembra comportarsi noi aspettavamo.

Quando scrivo soluzione di questo devo aggiungere alcuni punti di riflessione:

  • si sta rompendo Spark incapsulamento e la creazione di una classe personalizzata che si dovrà sostenere solo come passare il tempo.

  • Credo che una soluzione come questa sia l'ultima risorsa. Se il tuo caso d'uso può essere implementato in modo diverso, di solito è meglio evitare una soluzione come questa.

  • Se in S3 ci saranno molte "sottodirectory" e verificheremo ciascuna di esse, vi costerà.

  • Sarà molto interessante capire se Databricks non supporta i file annidati solo a causa di possibili penalità di prestazioni o meno, forse c'è una ragione più profonda alla quale non ho pensato.

+0

Ho un caso d'uso simile e sto pensando di seguire questa strada se non trovo un sostituto. Ho una data partizione delle sottocartelle usando il formato YYYY-MM-DD-HH. Ogni ora viene creata una nuova cartella e i file vengono caricati al suo interno. Quindi non dovrò necessariamente eseguire la scansione di tutte le sottocartelle (solo le ultime tre) e non colpirò i problemi di prestazioni. Sono più preoccupato della manutenibilità di tale codice e della gestione dello stato per i riavvii (in quale cartella dell'ora + file è stata scansionata l'ultima volta, ecc.).Vedi se riesci a condividere le tue opinioni su questo o anche sul codice che ha funzionato per il tuo FileDstream personalizzato. – Cheeko

+0

Se si utilizza la directory checkpoint con il flusso, quando si riavvia l'applicazione, si pianificherà innanzitutto la pianificazione di tutti i batch che avrebbero dovuto essere eseguiti durante l'inattività dell'applicazione. Ad esempio, se l'intervallo di streaming è di 1 minuto e l'applicazione è inattiva alle 10:00 e si fa il backup alle 10:30, quando sarà attiva, l'app tenterà di eseguire il batch di 10:01, 10:02, ecc. Ora se si implementa un findNewFiles (currentTime) in modo che le cartelle scansionate derivino dal tempo corrente, sarà possibile scansionare i file "giusti" dopo i riavvii. –

+0

Si prega di notare che currentTime non è attualmente in CORRENTE, è l'ora del lotto. L'unico problema che posso pensare in questo approccio è che se i tuoi file non sono immutabili. ad esempio, scrivi alcuni dati sul file A alle 10:10 e sovrascrivi questi dati alle 10:20, quindi se la tua applicazione era in calo tra 10: 10-10: 20 perderai la prima scrittura in A. È davvero un problema, ma non ho familiarità con molte organizzazioni che lavorano con file mutabili in tali scenari. –