# Go concurrency cancellation patterns
## Channel-based cancellation
This pattern is commonly used in the standard library, e.g. by the HTTP
server. It relies on closing a channel to signal the workers to stop. A
sync.WaitGroup is used to wait for the workers to complete before exiting.
package main
import (
"fmt"
"sync"
"time"
)
// Worker coordinates a group of goroutines that keep running until Shutdown
// is called. The zero value is ready to use. A Worker must not be copied
// after first use because it embeds a sync.Mutex and a sync.WaitGroup.
type Worker struct {
	// mu guards done and orders every wg.Add before the close of done.
	mu sync.Mutex
	// wg counts the Work calls that are currently running.
	wg sync.WaitGroup
	// done is created lazily and closed by Shutdown to tell workers to stop.
	done chan struct{}
}

// Work runs the worker loop identified by id, printing a progress line once
// per second until Shutdown is called. It blocks for that whole time, so
// callers normally run it in its own goroutine. If Shutdown has already been
// called, Work reports that it is done and returns immediately.
func (w *Worker) Work(id int) {
	// Fetch the done channel and register with the WaitGroup in ONE critical
	// section. Taking the lock a second time after wg.Add (as an earlier
	// version did) can deadlock against a Shutdown that is waiting on wg.
	//
	// WaitGroup.Add itself is safe for concurrent use; the lock is needed
	// because an Add that moves the counter up from zero must happen before
	// Wait. Registering only while done is still open guarantees that.
	w.mu.Lock()
	done := w.getDoneChanLocked()
	select {
	case <-done:
		// Shutdown already ran: never register, so Wait cannot race with Add.
		w.mu.Unlock()
		fmt.Printf("%d is done\n", id)
		return
	default:
	}
	w.wg.Add(1)
	w.mu.Unlock()
	defer w.wg.Done()

	// A single ticker replaces a fresh time.After timer on every iteration.
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-done:
			fmt.Printf("%d is done\n", id)
			return
		case <-ticker.C:
			fmt.Printf("%d is working\n", id)
		}
	}
}

// getDoneChan returns the channel that is closed on shutdown, creating it on
// first use. It acquires the lock itself and must not be called with the
// lock already held.
func (w *Worker) getDoneChan() chan struct{} {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.getDoneChanLocked()
}

// getDoneChanLocked returns the channel that is closed on shutdown, creating
// it on first use. It requires the lock to be held when called.
func (w *Worker) getDoneChanLocked() chan struct{} {
	if w.done == nil {
		w.done = make(chan struct{})
	}
	return w.done
}

// Shutdown signals every worker to stop and blocks until all registered
// workers have returned. It is safe to call multiple times and from multiple
// goroutines.
func (w *Worker) Shutdown() {
	w.mu.Lock()
	done := w.getDoneChanLocked()
	select {
	case <-done:
		// already closed
	default:
		// we're the only closer since we hold the lock
		close(done)
	}
	// Release the lock BEFORE waiting so that nothing which needs the lock
	// can be blocked behind a Shutdown that is itself waiting on workers.
	w.mu.Unlock()

	w.wg.Wait()
}
// main launches three workers, lets them run for five seconds and then shuts
// them down, calling Shutdown twice to show that repeated calls are harmless.
func main() {
	worker := new(Worker)

	// Arguments of a go statement are evaluated immediately, so each
	// goroutine receives its own id.
	for _, id := range []int{1, 2, 3} {
		go worker.Work(id)
	}

	time.Sleep(5 * time.Second)

	worker.Shutdown()
	// Shutdown may be called as many times as we'd like.
	worker.Shutdown()
}