Sto leggendo i dati da file CSV di grandi dimensioni, elaborandoli e caricandoli in un database SQLite. La creazione di profili suggerisce che l'80% del mio tempo è dedicato all'I/O e il 20% sta elaborando l'input per prepararlo all'inserimento del DB. Ho accelerato la fase di elaborazione con multiprocessing.Pool
in modo che il codice I/O non stia aspettando il prossimo record. Ma questo ha causato seri problemi di memoria perché il passaggio I/O non riusciva a stare al passo con i lavoratori.multiprocessing.Pool.imap_unordered con dimensione di coda fissa o buffer?
L'esempio seguente giocattolo illustra il mio problema:
#!/usr/bin/env python # 3.4.3
import time
from multiprocessing import Pool
def records(num=100):
"""Simulate generator getting data from large CSV files."""
for i in range(num):
print('Reading record {0}'.format(i))
time.sleep(0.05) # getting raw data is fast
yield i
def process(rec):
"""Simulate processing of raw text into dicts."""
print('Processing {0}'.format(rec))
time.sleep(0.1) # processing takes a little time
return rec
def writer(records):
"""Simulate saving data to SQLite database."""
for r in records:
time.sleep(0.3) # writing takes the longest
print('Wrote {0}'.format(r))
if __name__ == "__main__":
data = records(100)
with Pool(2) as pool:
writer(pool.imap_unordered(process, data, chunksize=5))
risultati questo codice in un backlog di record che alla fine consuma tutta la memoria perché non riesco a persistere i dati su disco abbastanza veloce. Esegui il codice e noterai che Pool.imap_unordered
consumerà tutti i dati quando writer
è al 15 ° record o così. Ora immagina che la fase di elaborazione stia producendo dizionari da centinaia di milioni di righe e puoi capire perché ho esaurito la memoria. Amdahl's Law forse in azione.
Qual è la correzione per questo? Penso che ho bisogno di una sorta di buffer per Pool.imap_unordered
che dice "una volta che ci sono x record che devono essere inseriti, fermati e aspetta fino a quando non ci sono meno di x prima di fare di più". Dovrei essere in grado di ottenere un miglioramento della velocità dalla preparazione del prossimo record mentre l'ultimo viene salvato.
Ho provato a utilizzare NuMap
dal modulo papy
(che ho modificato per funzionare con Python 3) per fare esattamente questo, ma non era più veloce. In effetti, era peggio che eseguire il programma in sequenza; NuMap
utilizza due thread più processi multipli.
Le funzionalità di importazione di massa di SQLite probabilmente non sono adatte alla mia attività poiché i dati richiedono un'elaborazione e una normalizzazione sostanziali.
Ho circa 85G di testo compresso da elaborare. Sono aperto ad altre tecnologie di database, ma ho scelto SQLite per facilità d'uso e perché si tratta di un lavoro write-once read-many in cui solo 3 o 4 persone useranno il database risultante dopo aver caricato tutto.
Questa sembra essere la soluzione migliore. È un compromesso tra lasciare che i processi vadano fuori sincrono e ottenere un aumento di velocità durante la fase di elaborazione. Sarebbe bello avere una funzione 'multiprocessing' che esegua' imap' con una sorta di parametro buffer. – ChrisP