Cancellation

Go concurrency cancellation patterns #

Channel based cancellation #

This pattern is commonly used in the stdlib, e.g. the HTTP server. It relies on closing a channel to signal the workers to stop. A sync.WaitGroup is used to wait for the workers to complete before exiting.

package main

import (
	"fmt"
	"sync"
	"time"
)

type Worker struct {
	mu   sync.Mutex
	wg   sync.WaitGroup
	done chan struct{}
}

func (w *Worker) Work(id int) {
	w.mu.Lock()
	// wg.Add is not safe for concurrent use so the lock needs to be held
	// when it's called.
	w.wg.Add(1)
	w.mu.Unlock()
	defer w.wg.Done()
	done := w.getDoneChan()
	for {
		select {
		case <-done:
			fmt.Printf("%d is done\n", id)
			return
		case <-time.After(1 * time.Second):
			fmt.Printf("%d is working\n", id)
		}
	}
}

func (w *Worker) getDoneChan() chan struct{} {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.getDoneChanLocked()
}

// getDoneChanLocked requires the lock to be held when called.
func (w *Worker) getDoneChanLocked() chan struct{} {
	if w.done == nil {
		w.done = make(chan struct{})
	}
	return w.done
}

func (w *Worker) Shutdown() {
	w.mu.Lock()
	defer w.mu.Unlock()
	done := w.getDoneChanLocked()
	select {
	case <-done:
		// already closed
	default:
		// we're the only closers since we have the lock
		close(done)
	}
	w.wg.Wait()
}

func main() {
	w := &Worker{}
	go w.Work(1)
	go w.Work(2)
	go w.Work(3)
	<-time.After(time.Second * 5)
	w.Shutdown()
	// we may call shutdown as many times as we'd like
	w.Shutdown()
}