2009-06-16 6 views
8

Background:Classe Python per unire file ordinati, come può essere migliorato?

sto pulendo grande (non può essere tenuto in memoria) file delimitati da tabulazioni. Mentre pulisco il file di input, creo un elenco in memoria; quando arriva a 1.000.000 di voci (circa 1 GB in memoria) lo ordino (usando la chiave predefinita di seguito) e scrivo l'elenco in un file. Questa classe serve a rimettere insieme i file ordinati. Funziona sui file che ho incontrato finora. Il mio caso più grande, finora, è l'unione di 66 file ordinati.

Domande:

  1. ci sono buchi nella mia logica (dove è fragile)?
  2. Ho implementato correttamente l'algoritmo di merge-sort ?
  3. Esistono miglioramenti evidenti che potrebbero essere apportati?

Esempio Dati:

Questa è un'astrazione di una linea in uno di questi file:

'hash_of_SomeStringId\tSome String Id\t\t\twww.somelink.com\t\tOtherData\t\n'

L'asporto è che io uso 'SomeStringId'.lower().replace(' ', '') come la mia chiave di ordinamento.

Codice originale:

class SortedFileMerger(): 
    """ A one-time use object that merges any number of smaller sorted 
     files into one large sorted file. 

     ARGS: 
      paths - list of paths to sorted files 
      output_path - string path to desired output file 
      dedup - (boolean) remove lines with duplicate keys, default = True 
      key - use to override sort key, default = "line.split('\t')[1].lower().replace(' ', '')" 
        will be prepended by "lambda line: ". This should be the same 
        key that was used to sort the files being merged! 
    """ 
    def __init__(self, paths, output_path, dedup=True, key="line.split('\t')[1].lower().replace(' ', '')"): 
     self.key = eval("lambda line: %s" % key) 
     self.dedup = dedup 
     self.handles = [open(path, 'r') for path in paths] 
     # holds one line from each file 
     self.lines = [file_handle.readline() for file_handle in self.handles] 
     self.output_file = open(output_path, 'w') 
     self.lines_written = 0 
     self._mergeSortedFiles() #call the main method 

    def __del__(self): 
     """ Clean-up file handles. 
     """ 
     for handle in self.handles: 
      if not handle.closed: 
       handle.close() 
     if self.output_file and (not self.output_file.closed): 
      self.output_file.close() 

    def _mergeSortedFiles(self): 
     """ Merge the small sorted files to 'self.output_file'. This can 
      and should only be called once. 
      Called from __init__(). 
     """ 
     previous_comparable = '' 
     min_line = self._getNextMin() 
     while min_line: 
      index = self.lines.index(min_line) 
      comparable = self.key(min_line) 
      if not self.dedup:      
       #not removing duplicates 
       self._writeLine(index) 
      elif comparable != previous_comparable: 
       #removing duplicates and this isn't one 
       self._writeLine(index) 
      else:         
       #removing duplicates and this is one 
       self._readNextLine(index) 
      previous_comparable = comparable 
      min_line = self._getNextMin() 
     #finished merging 
     self.output_file.close() 

    def _getNextMin(self): 
     """ Returns the next "smallest" line in sorted order. 
      Returns None when there are no more values to get. 
     """ 
     while '' in self.lines: 
      index = self.lines.index('') 
      if self._isLastLine(index): 
       # file.readline() is returning '' because 
       # it has reached the end of a file. 
       self._closeFile(index) 
      else: 
       # an empty line got mixed in 
       self._readNextLine(index) 
     if len(self.lines) == 0: 
      return None 
     return min(self.lines, key=self.key) 

    def _writeLine(self, index): 
     """ Write line to output file and update self.lines 
     """ 
     self.output_file.write(self.lines[index]) 
     self.lines_written += 1 
     self._readNextLine(index) 

    def _readNextLine(self, index): 
     """ Read the next line from handles[index] into lines[index] 
     """ 
     self.lines[index] = self.handles[index].readline() 

    def _closeFile(self, index): 
     """ If there are no more lines to get in a file, it 
      needs to be closed and removed from 'self.handles'. 
      It's entry in 'self.lines' also need to be removed. 
     """ 
     handle = self.handles.pop(index) 
     if not handle.closed: 
      handle.close() 
     # remove entry from self.lines to preserve order 
     _ = self.lines.pop(index) 

    def _isLastLine(self, index): 
     """ Check that handles[index] is at the eof. 
     """ 
     handle = self.handles[index]    
     if handle.tell() == os.path.getsize(handle.name): 
      return True 
     return False 

Edit: Attuare i suggerimenti da Brian mi si avvicinò con la seguente soluzione:

Secondo Edit: Aggiornato il codice per John Machin 's suggerimento :

def decorated_file(f, key): 
    """ Yields an easily sortable tuple. 
    """ 
    for line in f: 
     yield (key(line), line) 

def standard_keyfunc(line): 
    """ The standard key function in my application. 
    """ 
    return line.split('\t', 2)[1].replace(' ', '').lower() 

def mergeSortedFiles(paths, output_path, dedup=True, keyfunc=standard_keyfunc): 
    """ Does the same thing SortedFileMerger class does. 
    """ 
    files = map(open, paths) #open defaults to mode='r' 
    output_file = open(output_path, 'w') 
    lines_written = 0 
    previous_comparable = '' 
    for line in heapq26.merge(*[decorated_file(f, keyfunc) for f in files]): 
     comparable = line[0] 
     if previous_comparable != comparable: 
      output_file.write(line[1]) 
      lines_written += 1 
     previous_comparable = comparable 
    return lines_written 

ruvida prova

Utilizzando gli stessi file di input (2.2 GB di dati):

  • classe SortedFileMerger voluti 51 minuti (3068.4 secondi)
  • soluzione
  • s' Brian sono voluti 40 minuti (2408.5 secondi)
  • Dopo aver aggiunto John Machin suggerimenti, il codice soluzione ha impiegato 36 minuti (2214.0 secondi)
+0

decorated_file è equivalente a ((chiave (linea), line) per la linea in f) –

+0

@gnibbler, Volontà che accelerare il processo o semplicemente sbarazzarsi della funzione? – tgray

risposta

16

Nota che in python2.6, heapq ha una nuova funzione merge che lo farà per te.

Per gestire la funzione del tasto personalizzato, si può semplicemente avvolgere l'iteratore file con qualcosa che decora in modo che mette a confronto basato sulla chiave, e la striscia fuori dopo:

def decorated_file(f, key): 
    for line in f: 
     yield (key(line), line) 

filenames = ['file1.txt','file2.txt','file3.txt'] 
files = map(open, filenames) 
outfile = open('merged.txt') 

for line in heapq.merge(*[decorated_file(f, keyfunc) for f in files]): 
    outfile.write(line[1]) 

[Edit] Anche nelle versioni precedenti di python, è probabilmente utile semplicemente prendere l'implementazione di unire dal successivo modulo heapq. È puro python e viene eseguito senza modifiche in python2.5 e poiché utilizza un heap per ottenere il minimo successivo dovrebbe essere molto efficiente quando si uniscono grandi numeri di file.

Si dovrebbe essere in grado di copiare semplicemente l'heapq.py da un'installazione di python2.6, copiarlo all'origine come "heapq26.py" e utilizzare "from heapq26 import merge" - non ci sono funzioni specifiche 2.6 in esso utilizzate. In alternativa, è sufficiente copiare la funzione di unione (riscrivere le chiamate heaopop, ecc. Per fare riferimento al modulo python2.5 heapq).

+0

In realtà, sto ancora usando python 2.5. – tgray

+0

Questa è un'ottima risposta, ho cercato Google per settimane e non sono riuscito a trovarlo. – tgray

2

< < Questa "risposta" è un commento sul codice risultante del interrogante originale >>

Suggerimento: usando eval() è ummmm e cosa si sta facendo limita al chiamante di utilizzare lambda - estrazione chiave può richiedere più di un one-liner, e in ogni caso non hai bisogno della stessa funzione per il passo di classificazione preliminare?

Quindi sostituire questo:

def mergeSortedFiles(paths, output_path, dedup=True, key="line.split('\t')[1].lower().replace(' ', '')"): 
    keyfunc = eval("lambda line: %s" % key) 

con questo:

def my_keyfunc(line): 
    return line.split('\t', 2)[1].replace(' ', '').lower() 
    # minor tweaks may speed it up a little 

def mergeSortedFiles(paths, output_path, keyfunc, dedup=True):  
+0

Grazie, anche la valutazione() mi è sembrata strana, ma non sapevo l'alternativa. Avevo ottenuto il metodo da questa ricetta: http://code.activestate.com/recipes/576755/ – tgray

+0

Quella ricetta fornisce lo strumento eval() solo come funzionalità opzionale per coloro che hanno il coraggio di digitare la sorgente della loro funzione di estrazione chiave nella riga di comando quando eseguono un ordinamento multi-GB :-) Noterai che questo è stato separato in modo pulito; entrambe le funzioni di unione e ordinamento prendono una funzione per l'argomento chiave, non una stringa. –