Sto provando a inviare un grande CSV a kafka. La struttura di base è leggere una riga del CSV e comprimerla con l'intestazione.Invio di CSV grandi a Kafka usando python Spark
a = dict(zip(header, line.split(",")
questo viene poi convertito in un JSON con:
message = json.dumps(a)
allora io uso biblioteca Kafka-python per inviare il messaggio
from kafka import SimpleProducer, KafkaClient
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)
producer.send_messages("topic", message)
Utilizzando PYSPARK ho facilmente creato un RDD di messaggi dal file CSV
sc = SparkContext()
text = sc.textFile("file.csv")
header = text.first().split(',')
def remove_header(itr_index, itr):
return iter(list(itr)[1:]) if itr_index == 0 else itr
noHeader = text.mapPartitionsWithIndex(remove_header)
messageRDD = noHeader.map(lambda x: json.dumps(dict(zip(header, x.split(","))
Ora voglio inviare questi messaggi: mi definisco una funzione
def sendkafka(message):
kafka = KafkaClient("localhost:9092")
producer = SimpleProducer(kafka)
return producer.send_messages('topic',message)
Poi creo un nuovo RDD per inviare i messaggi
sentRDD = messageRDD.map(lambda x: kafkasend(x))
Ho poi chiamo sentRDD.count()
Che inizia a sfornare e inviare messaggi
Sfortunatamente questo è molto lento. Invia 1000 messaggi al secondo. Questo è su un cluster a 10 nodi di 4 cpus ciascuno e 8 GB di memoria.
In confronto, la creazione dei messaggi richiede circa 7 secondi su un csv di 10 milioni di righe. ~ circa 2 gb
Penso che il problema è che sto creando un'istanza di un produttore di kafka all'interno della funzione. Tuttavia, se non lo faccio, allora si lamenta che il produttore non esiste anche se ho provato a definirlo a livello globale.
Forse qualcuno può far luce su come questo problema può essere affrontato.
Grazie,
Grazie zero323. con un singolo produttore al di fuori della scintilla usando un async potrei ottenere 8000 al secondo. Quindi ho apportato qualche ritocco. Ho scoperto che avevo 15 partizioni per questo CSV quindi ho dato il lavoro a 15 core.Ho quindi giocato con le opzioni asincrone fino a quando la dimensione del batch era 20000. Ciò mi ha dato un throughput massimo di 225 mila al secondo. Quindi con un po 'di sintonizzazione ho effettivamente ottenuto un prezzo ragionevole. Questo è 45 secondi per lo streaming di un CSV di 10 milioni di righe. –
@PhineasDashevsky, sarebbe molto utile se fosse possibile condividere il codice per la soluzione finale. – Picarus
https://iabdb.me/2015/09/09/kafka-on-the-shore-my-experiences-benchmarking-apache-kafka-part-i/ In questo arcticolo ho il codice e una descrizione più lunga di come farlo –