5

Sto leggendo i dati da file CSV di grandi dimensioni, elaborandoli e caricandoli in un database SQLite. La creazione di profili suggerisce che l'80% del mio tempo è dedicato all'I/O e il 20% sta elaborando l'input per prepararlo all'inserimento del DB. Ho accelerato la fase di elaborazione con multiprocessing.Pool in modo che il codice I/O non stia aspettando il prossimo record. Ma questo ha causato seri problemi di memoria perché il passaggio I/O non riusciva a stare al passo con i lavoratori.multiprocessing.Pool.imap_unordered con dimensione di coda fissa o buffer?

L'esempio seguente giocattolo illustra il mio problema:

#!/usr/bin/env python # 3.4.3 
import time 
from multiprocessing import Pool 

def records(num=100): 
    """Simulate generator getting data from large CSV files.""" 
    for i in range(num): 
     print('Reading record {0}'.format(i)) 
     time.sleep(0.05) # getting raw data is fast 
     yield i 

def process(rec): 
    """Simulate processing of raw text into dicts.""" 
    print('Processing {0}'.format(rec)) 
    time.sleep(0.1) # processing takes a little time 
    return rec 

def writer(records): 
    """Simulate saving data to SQLite database.""" 
    for r in records: 
     time.sleep(0.3) # writing takes the longest 
     print('Wrote {0}'.format(r)) 

if __name__ == "__main__": 
    data = records(100) 
    with Pool(2) as pool: 
     writer(pool.imap_unordered(process, data, chunksize=5)) 

risultati questo codice in un backlog di record che alla fine consuma tutta la memoria perché non riesco a persistere i dati su disco abbastanza veloce. Esegui il codice e noterai che Pool.imap_unordered consumerà tutti i dati quando writer è al 15 ° record o così. Ora immagina che la fase di elaborazione stia producendo dizionari da centinaia di milioni di righe e puoi capire perché ho esaurito la memoria. Amdahl's Law forse in azione.

Qual è la correzione per questo? Penso che ho bisogno di una sorta di buffer per Pool.imap_unordered che dice "una volta che ci sono x record che devono essere inseriti, fermati e aspetta fino a quando non ci sono meno di x prima di fare di più". Dovrei essere in grado di ottenere un miglioramento della velocità dalla preparazione del prossimo record mentre l'ultimo viene salvato.

Ho provato a utilizzare NuMap dal modulo papy (che ho modificato per funzionare con Python 3) per fare esattamente questo, ma non era più veloce. In effetti, era peggio che eseguire il programma in sequenza; NuMap utilizza due thread più processi multipli.

Le funzionalità di importazione di massa di SQLite probabilmente non sono adatte alla mia attività poiché i dati richiedono un'elaborazione e una normalizzazione sostanziali.

Ho circa 85G di testo compresso da elaborare. Sono aperto ad altre tecnologie di database, ma ho scelto SQLite per facilità d'uso e perché si tratta di un lavoro write-once read-many in cui solo 3 o 4 persone useranno il database risultante dopo aver caricato tutto.

risposta

2

Dal trattamento è veloce, ma la scrittura è lenta, sembra che il tuo problema sia legato a I/O. Pertanto potrebbe non esserci molto da guadagnare usando il multiprocessing .

Tuttavia, è possibile staccare pezzi di data, processo il pezzo, e attendere che che i dati è stato scritto prima si staccava un altro pezzo:

import itertools as IT 
if __name__ == "__main__": 
    data = records(100) 
    with Pool(2) as pool: 
     chunksize = ... 
     for chunk in iter(lambda: list(IT.islice(data, chunksize)), []): 
      writer(pool.imap_unordered(process, chunk, chunksize=5)) 
+1

Questa sembra essere la soluzione migliore. È un compromesso tra lasciare che i processi vadano fuori sincrono e ottenere un aumento di velocità durante la fase di elaborazione. Sarebbe bello avere una funzione 'multiprocessing' che esegua' imap' con una sorta di parametro buffer. – ChrisP

0

Sembra che tutto ciò di cui si ha realmente bisogno è di sostituire le code illimitate sotto lo Pool con code limitate (e bloccanti). In questo modo, se una parte ottiene il vantaggio, bloccherà fino a quando non saranno pronte.

Questo sarebbe facile da fare da capolino a the source, sottoclasse o monkeypatch Pool, qualcosa di simile:

class Pool(multiprocessing.pool.Pool): 
    def _setup_queues(self): 
     self._inqueue = self._ctx.Queue(5) 
     self._outqueue = self._ctx.Queue(5) 
     self._quick_put = self._inqueue._writer.send 
     self._quick_get = self._outqueue._reader.recv 
     self._taskqueue = queue.Queue(10) 

Ma non è, ovviamente, portatile (anche a CPython 3.3, tanto meno di una diversa implementazione Python 3).

io penso si può fare portabile in 3.4+, fornendo una personalizzata context, ma non sono stato in grado di ottenere tale diritto, quindi ...

+0

Questa è un'idea interessante che non funziona semplicemente in pratica. '_taskqueue' è assegnato in realtà dopo che viene eseguito' _setup_queues'. Anche l'inserimento di una dimensione massima nella coda non funziona a causa della struttura di "imap_unordered", che non restituirà mai il generatore perché la coda si blocca se viene impostata una dimensione massima. – ChrisP

+0

@ChrisP: Beh, la prima parte è facile da risolvere, solo un po 'più caotica. Il secondo, però, hai ragione. Ho assunto troppo sul modo in cui le funzioni della mappa sono implementate. Questo sarebbe più facile da costruire in cima a un 'concurrent.futures.ProcessPoolExecutor; in cima a 'multiprocessing.Pool', devi sostanzialmente riscrivere le funzioni di' map' da zero in cima ai singoli submission ... – abarnert

2

Mentre stavo lavorando sullo stesso problema , ho pensato che un modo efficace per prevenire la piscina da sovraccarico è quello di utilizzare un semaforo con un generatore:

from multiprocessing import Pool, Semaphore 

def produce(semaphore, from_file): 
    with open(from_file) as reader: 
     for line in reader: 
      # Reduce Semaphore by 1 or wait if 0 
      semaphore.acquire() 
      # Now deliver an item to the caller (pool) 
      yield line 

def process(item): 
    result = (first_function(item), 
       second_function(item), 
       third_function(item)) 
    return result 

def consume(semaphore, result): 
    database_con.cur.execute("INSERT INTO ResultTable VALUES (?,?,?)", result) 
    # Result is consumed, semaphore may now be increased by 1 
    semaphore.release() 

def main() 
    global database_con 
    semaphore_1 = Semaphore(1024) 
    with Pool(2) as pool: 
     for result in pool.imap_unordered(process, produce(semaphore_1, "workfile.txt"), chunksize=128): 
      consume(semaphore1, result) 

Vedi anche:

K Hong - Multithreading - Semaphore objects & thread pool

Lecture from Chris Terman - MIT 6.004 L21: Semaphores