2015-03-03 15 views
6

Sto tentando di utilizzare Sailfish, che accetta più file fastq come argomenti, in una pipeline ruffus. Eseguo Sailfish utilizzando il modulo subprocess in python, ma <() nella chiamata di sottoprocesso non funziona anche quando ho impostato shell=True.Tubi multipli nel sottoprocesso

Questo è il comando che voglio da eseguire utilizzando python:

sailfish quant [options] -1 <(cat sample1a.fastq sample1b.fastq) -2 <(cat sample2a.fastq sample2b.fastq) -o [output_file] 

o (preferibilmente):

sailfish quant [options] -1 <(gunzip sample1a.fastq.gz sample1b.fastq.gz) -2 <(gunzip sample2a.fastq.gz sample2b.fastq.gz) -o [output_file] 

Una generalizzazione:

someprogram <(someprocess) <(someprocess) 

Come potrei fare per fare questo in python? Il sottoprocesso è il giusto approccio?

+0

correlato: [Bash processo di sostituzione con stile Popen di Python] (http://stackoverflow.com/q/15343447/4279) – jfs

+0

relativi al titolo: [Come si usa subprocess.Popen per collegare più processi da tubi?] (http://stackoverflow.com/q/295459/4279) – jfs

risposta

8

di emulare il bash process substitution:

tubi
#!/usr/bin/env python 
from subprocess import check_call 

check_call('someprogram <(someprocess) <(anotherprocess)', 
      shell=True, executable='/bin/bash') 

In Python, è possibile utilizzare nome:

#!/usr/bin/env python 
from subprocess import Popen 

with named_pipes(n=2) as paths: 
    someprogram = Popen(['someprogram'] + paths) 
    processes = [] 
    for path, command in zip(paths, ['someprocess', 'anotherprocess']): 
     with open(path, 'wb', 0) as pipe: 
      processes.append(Popen(command, stdout=pipe, close_fds=True)) 
    for p in [someprogram] + processes: 
     p.wait() 

dove named_pipes(n) è:

import os 
import shutil 
import tempfile 
from contextlib import contextmanager 

@contextmanager 
def named_pipes(n=1): 
    dirname = tempfile.mkdtemp() 
    try: 
     paths = [os.path.join(dirname, 'named_pipe' + str(i)) for i in range(n)] 
     for path in paths: 
      os.mkfifo(path) 
     yield paths 
    finally: 
     shutil.rmtree(dirname) 

Un altro e più pre modo agevole (non è necessario creare una voce con nome sul disco) per implementare la sostituzione del processo di bash è utilizzare i nomi di file /dev/fd/N (se disponibili) come suggested by @Dunes. Su FreeBSD, fdescfs(5) (/dev/fd/#) creates entries for all file descriptors opened by the process. Per verificare la disponibilità, eseguire:

$ test -r /dev/fd/3 3</dev/null && echo /dev/fd is available 

Se fallisce; cercare di link simbolico /dev/fd a proc(5) come si fa su alcuni sistemi Linux:

$ ln -s /proc/self/fd /dev/fd 

Ecco /dev/fd based realizzazione di comando someprogram <(someprocess) <(anotherprocess) bash:

#!/usr/bin/env python3 
from contextlib import ExitStack 
from subprocess import CalledProcessError, Popen, PIPE 

def kill(process): 
    if process.poll() is None: # still running 
     process.kill() 

with ExitStack() as stack: # for proper cleanup 
    processes = [] 
    for command in [['someprocess'], ['anotherprocess']]: # start child processes 
     processes.append(stack.enter_context(Popen(command, stdout=PIPE))) 
     stack.callback(kill, processes[-1]) # kill on someprogram exit 

    fds = [p.stdout.fileno() for p in processes] 
    someprogram = stack.enter_context(
     Popen(['someprogram'] + ['/dev/fd/%d' % fd for fd in fds], pass_fds=fds)) 
    for p in processes: # close pipes in the parent 
     p.stdout.close() 
# exit stack: wait for processes 
if someprogram.returncode != 0: # errors shouldn't go unnoticed 
    raise CalledProcessError(someprogram.returncode, someprogram.args) 

Nota: sulla mia macchina Ubuntu, il codice subprocess funziona solo in Python 3.4+, nonostante pass_fds sia disponibile da Python 3.2.

+0

Grazie JF Sebastian! In realtà ha funzionato con il semplice argomento di sottoprocesso 'executable = '/ bin/bash'' che prima mi mancava. Funziona ora con questa chiamata: 'check_call ('sailfish quant [opzioni] <(gunzip -c file1 file2) <(gunzip -c file3 file4)', shell = True, eseguibile = '/ bin/bash')'. Grazie mille per il vostro aiuto! Sei davvero andato oltre nella tua risposta - non solo mi hai aiutato a risolvere il mio problema, ma mi hai anche aiutato a capire meglio il piping in python. – Michelle

2

Mentre J.F. Sebastian ha fornito una risposta utilizzando pipe denominate, è possibile farlo con pipe anonime.

import shlex 
from subprocess import Popen, PIPE 

inputcmd0 = "zcat hello.gz" # gzipped file containing "hello" 
inputcmd1 = "zcat world.gz" # gzipped file containing "world" 

def get_filename(file_): 
    return "/dev/fd/{}".format(file_.fileno()) 

def get_stdout_fds(*processes): 
    return tuple(p.stdout.fileno() for p in processes) 

# setup producer processes 
inputproc0 = Popen(shlex.split(inputcmd0), stdout=PIPE) 
inputproc1 = Popen(shlex.split(inputcmd1), stdout=PIPE) 

# setup consumer process 
# pass input processes pipes by "filename" eg. /dev/fd/5 
cmd = "cat {file0} {file1}".format(file0=get_filename(inputproc0.stdout), 
    file1=get_filename(inputproc1.stdout)) 
print("command is:", cmd) 
# pass_fds argument tells Popen to let the child process inherit the pipe's fds 
someprogram = Popen(shlex.split(cmd), stdout=PIPE, 
    pass_fds=get_stdout_fds(inputproc0, inputproc1)) 

output, error = someprogram.communicate() 

for p in [inputproc0, inputproc1, someprogram]: 
    p.wait() 

assert output == b"hello\nworld\n" 
+0

il tuo codice: 'inputcmd | someproc' - è diverso da 'someproc <(inputcmd)'. btw, dovresti chiamare 'inputproc.communicate()' invece di 'inputproc.wait() ', per chiudere' inputproc.stdout.close() 'nel genitore in modo che' inputproc' non si blocchi se 'someproc' esce prematuramente. Non è chiaro cosa stai cercando di ottenere con 'StreamConnector' ma sembra gonfiato. – jfs

+0

Il mio errore. Ho pensato che '<(cmdlist)' ha collegato una serie di comandi stdout allo stdin del processo consumer. La classe era pensata per un'utilità a forma di gatto per i flussi anziché i file. La risposta è molto più semplice ora. – Dunes

+0

'/ dev/fd/#' o named pipe se il primo non è disponibile è esattamente come bash implementa la sostituzione del processo. Dovresti chiudere i pipe nel genitore in modo tale che 'inputproc1' o' inputproc2' muoiano prematuramente; 'someprogram' potrebbe uscire prima. Altrimenti * la soluzione dovrebbe funzionare su Python 3.4 + *. Ho aggiunto una versione del codice eccezionalmente sicura a [mia risposta] (http://stackoverflow.com/a/28840955/4279) (proprio come un esercizio). – jfs