2015-08-31 29 views
6

Sto provando a inviare un grande CSV a kafka. La struttura di base è leggere una riga del CSV e comprimerla con l'intestazione.Invio di CSV grandi a Kafka usando python Spark

a = dict(zip(header, line.split(",") 

questo viene poi convertito in un JSON con:

message = json.dumps(a) 

allora io uso biblioteca Kafka-python per inviare il messaggio

from kafka import SimpleProducer, KafkaClient 
kafka = KafkaClient("localhost:9092") 
producer = SimpleProducer(kafka) 
producer.send_messages("topic", message) 

Utilizzando PYSPARK ho facilmente creato un RDD di messaggi dal file CSV

sc = SparkContext() 
text = sc.textFile("file.csv") 
header = text.first().split(',') 
def remove_header(itr_index, itr): 
    return iter(list(itr)[1:]) if itr_index == 0 else itr 
noHeader = text.mapPartitionsWithIndex(remove_header) 

messageRDD = noHeader.map(lambda x: json.dumps(dict(zip(header, x.split(",")) 

Ora voglio inviare questi messaggi: mi definisco una funzione

def sendkafka(message): 
    kafka = KafkaClient("localhost:9092") 
    producer = SimpleProducer(kafka) 
    return producer.send_messages('topic',message) 

Poi creo un nuovo RDD per inviare i messaggi

sentRDD = messageRDD.map(lambda x: kafkasend(x)) 

Ho poi chiamo sentRDD.count()

Che inizia a sfornare e inviare messaggi

Sfortunatamente questo è molto lento. Invia 1000 messaggi al secondo. Questo è su un cluster a 10 nodi di 4 cpus ciascuno e 8 GB di memoria.

In confronto, la creazione dei messaggi richiede circa 7 secondi su un csv di 10 milioni di righe. ~ circa 2 gb

Penso che il problema è che sto creando un'istanza di un produttore di kafka all'interno della funzione. Tuttavia, se non lo faccio, allora si lamenta che il produttore non esiste anche se ho provato a definirlo a livello globale.

Forse qualcuno può far luce su come questo problema può essere affrontato.

Grazie,

risposta

6

È possibile creare un unico produttore per partizione e utilizzare mapPartitions o foreachPartition:

def sendkafka(messages): 
    kafka = KafkaClient("localhost:9092") 
    producer = SimpleProducer(kafka) 
    for message in messages: 
     yield producer.send_messages('topic', message) 

sentRDD = messageRDD.mapPartitions(sendkafka) 

Se sopra da solo non vi aiuterà si può cercare di estenderlo utilizzando un asynchronous producer.

In Spark 2.x è anche possibile utilizzare l'origine dati Kafka. Dovrete includere spark-sql-kafka vaso, Spark corrispondenza e la versione Scala (qui 2.2.0 e 2.11, rispettivamente):

dati
spark.jars.packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 

convertire in un DataFrame (se non è già DataFrame):

messageDF = spark.createDataFrame(messageRDD, "string") 

e scrivere usando DataFrameWriter:

(messageDF.write 
    .format("kafka") 
    .option("topic", topic_name) 
    .option("kafka.bootstrap.servers", bootstrap_servers) 
    .save()) 
+0

Grazie zero323. con un singolo produttore al di fuori della scintilla usando un async potrei ottenere 8000 al secondo. Quindi ho apportato qualche ritocco. Ho scoperto che avevo 15 partizioni per questo CSV quindi ho dato il lavoro a 15 core.Ho quindi giocato con le opzioni asincrone fino a quando la dimensione del batch era 20000. Ciò mi ha dato un throughput massimo di 225 mila al secondo. Quindi con un po 'di sintonizzazione ho effettivamente ottenuto un prezzo ragionevole. Questo è 45 secondi per lo streaming di un CSV di 10 milioni di righe. –

+1

@PhineasDashevsky, sarebbe molto utile se fosse possibile condividere il codice per la soluzione finale. – Picarus

+0

https://iabdb.me/2015/09/09/kafka-on-the-shore-my-experiences-benchmarking-apache-kafka-part-i/ In questo arcticolo ho il codice e una descrizione più lunga di come farlo –