2015-12-21 2 views
7

Mi sono imbattuto in una situazione in cui voglio tracciare qualche goroutine da sincronizzare su un punto specifico, ad esempio quando vengono caricati tutti gli url. Quindi, possiamo metterli tutti e mostrarli in ordine specifico.Come implementare un timeout quando si utilizza sync.WaitGroup.wait?

Penso che questa sia la barriera entrata. È in go con sync.WaitGroup. Tuttavia, in una situazione reale, non possiamo assicurarci che tutte le operazioni di recupero riusciranno in breve tempo. Quindi, voglio introdurre un timeout quando wait per le operazioni di recupero.

Sono un novellino a Golang, quindi qualcuno può darmi qualche consiglio?


Quello che sto cercando è come questo:

wg := &sync.WaigGroup{} 
    select { 
    case <-wg.Wait(): 
    // All done! 
    case <-time.After(500 * time.Millisecond): 
    // Hit timeout. 
    } 

So Wait non supportano Channel.

+0

potete inserire come tu sei l'aggiunta di gruppi attesa e roba come stai facendo attraverso un loop o ?? – Datsik

+0

Possibile duplicato di [Timeout per WaitGroup.Wait()] (http://stackoverflow.com/questions/32840687/timeout-for-waitgroup-wait) – jab

risposta

8

Se tutto ciò che si desidera è una selezione accurata, è possibile convertire facilmente la funzione di blocco in un canale generando una routine che chiama un metodo e si chiude/invia sul canale una volta eseguito.

done := make(chan struct{}) 
go func() { 
    wg.Wait() 
    close(done) 
}() 

select { 
case <-done: 
// All done! 
case <-time.After(500 * time.Millisecond): 
// Hit timeout. 
} 
+0

Mi sembra buono. – andy

0

Un altro modo per farlo sarebbe monitorarlo internamente, la tua domanda è limitata ma assumerò che tu stia iniziando le tue goroutine attraverso un ciclo anche se non lo sei puoi refactoring per lavorare per te ma si potrebbe fare uno di questi 2 esempi, il primo sarà timeout ogni richiesta di timeout individualmente e il secondo sarà il timeout l'intero lotto di richieste e andare avanti se troppo tempo è passato

var wg sync.WaitGroup 
wg.Add(1) 
go func() { 
    success := make(chan struct{}, 1) 
    go func() { 
     // send your request and wait for a response 
     // pretend response was received 
     time.Sleep(5 * time.Second) 
     success <- struct{}{} 
     // goroutine will close gracefully after return  
     fmt.Println("Returned Gracefully") 
    }() 

    select { 
    case <-success: 
     break 
    case <-time.After(1 * time.Second): 
     break 
    } 

    wg.Done() 
    // everything should be garbage collected and no longer take up space 
}() 

wg.Wait() 

// do whatever with what you got  
fmt.Println("Done") 
time.Sleep(10 * time.Second) 
fmt.Println("Checking to make sure nothing throws errors after limbo goroutine is done") 

O se si Voglio solo un modo semplice e generale per il timeout di TUTTE le richieste che potresti fare qualcosa come

var wg sync.WaitGroup 
waiter := make(chan int) 
wg.Add(1) 
go func() { 
    success := make(chan struct{}, 1) 
    go func() { 
     // send your request and wait for a response 
     // pretend response was received 
     time.Sleep(5 * time.Second) 
     success <- struct{}{} 
     // goroutine will close gracefully after return  
     fmt.Println("Returned Gracefully") 
    }() 

    select { 
    case <-success: 
     break 
    case <-time.After(1 * time.Second): 
     // control the timeouts for each request individually to make sure that wg.Done gets called and will let the goroutine holding the .Wait close 
     break 
    } 
    wg.Done() 
    // everything should be garbage collected and no longer take up space 
}() 

completed := false 
go func(completed *bool) { 
    // Unblock with either wait 
    wg.Wait() 
    if !*completed { 
     waiter <- 1   
     *completed = true 
    }  
    fmt.Println("Returned Two") 
}(&completed) 

go func(completed *bool) { 
    // wait however long 
    time.Sleep(time.Second * 5) 
    if !*completed { 
     waiter <- 1   
     *completed = true 
    }  
    fmt.Println("Returned One") 
}(&completed) 


// block until it either times out or .Wait stops blocking 
<-waiter 

// do whatever with what you got  
fmt.Println("Done") 
time.Sleep(10 * time.Second) 
fmt.Println("Checking to make sure nothing throws errors after limbo goroutine is done") 

questo modo il vostro WaitGroup rimarrà in sincronia e non avrà alcun goroutines lasciati nel limbo

http://play.golang.org/p/g0J_qJ1BUT provare qui è possibile modificare le variabili in giro per vedere il lavoro in modo diverso

Edit: Sono su mobile Se qualcuno potesse risolvere la formattazione sarebbe fantastico grazie.

0

Invia i risultati ad un canale tamponata sufficiente per prendere tutti i risultati, senza bloccare, e leggerli in per-select ciclo nel thread principale:

func work(msg string, d time.Duration, ret chan<- string) { 
    time.Sleep(d) // Work emulation. 
    select { 
    case ret <- msg: 
    default: 
    } 
} 

// ... 

const N = 2 
ch := make(chan string, N) 

go work("printed", 100*time.Millisecond, ch) 
go work("not printed", 1000*time.Millisecond, ch) 

timeout := time.After(500 * time.Millisecond) 
loop: 
for received := 0; received < N; received++ { 
    select { 
    case msg := <-ch: 
     fmt.Println(msg) 
    case <-timeout: 
     fmt.Println("timeout!") 
     break loop 
    } 
} 

Playground: http://play.golang.org/p/PxeEEJo2dz.

Vedere anche: Go Concurrency Patterns: Timing out, moving on.

+0

Non sono sicuro del motivo per cui è necessaria una trasmissione non bloccante qui, a meno che non sia solo una misura di sicurezza nel caso in cui la dimensione del buffer non corrisponda al numero di lavoratori. Altrimenti il ​​conteggio (proposto per il ciclo) è probabilmente una soluzione migliore di "WaitGroup" quando i lavoratori producono qualcosa su un canale. – tomasz

0

Se volete evitare di mescolare la logica di concorrenza con la logica di business, ho scritto questa libreria https://github.com/shomali11/parallelizer per aiutarvi in ​​questo. Incapsula la logica della concorrenza in modo da non doversene preoccupare.

Quindi nel tuo esempio:

package main 

import (
    "github.com/shomali11/parallelizer" 
    "fmt" 
) 

func main() { 
    urls := []string{ ... } 
    results = make([]*HttpResponse, len(urls) 

    options := &Options{ Timeout: time.Second } 
    group := parallelizer.NewGroup(options) 
    for index, url := range urls { 
     group.Add(func(index int, url string, results *[]*HttpResponse) { 
      return func() { 
       ... 

       results[index] = &HttpResponse{url, response, err} 
      } 
     }(index, url, &results)) 
    } 

    err := group.Run() 

    fmt.Println("Done") 
    fmt.Println(fmt.Sprintf("Results: %v", results)) 
    fmt.Printf("Error: %v", err) // nil if it completed, err if timed out 
}