
Goroutines don’t panic if the channel they write to is gone

A couple of months ago I wrote an article about error handling in concurrent Go programs. One concern I had was whether the other goroutines would panic if one goroutine produced an error that made the receiving function return early while it was ranging over the channel.

So I wrote another program that mimics this scenario. It spawns a number of goroutines, each taking one second longer to complete than the previous one (to make the test easier to observe). When done, each one writes to the channel. The goroutine with index 5 (the sixth one) sends an error after 5 seconds on the channel, which is iterated using range. When an error arrives, the function returns.

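package main

import (
    "errors"
    "sync"
    "time"
)

// data carries either a result or the error that occurred while producing it.
type data struct {
    err  error
    data int
}

func someWork(dataChan chan<- *data, collection []int) {
    var wg sync.WaitGroup
    wg.Add(len(collection))
    for i, e := range collection {
        go func(i, e int) {
            // Registered first, but it only runs when this goroutine returns,
            // i.e. after its send below has been received.
            defer wg.Done()
            // Goroutine i finishes after i seconds.
            time.Sleep(time.Duration(i) * time.Second)
            var err error
            if i == 5 {
                err = errors.New("error") // goroutine 5 reports an error after 5 seconds
            }
            dataChan <- &data{err, e}
        }(i, e)
    }
    wg.Wait()
    close(dataChan)
}

func someFunc() {
    // 21 elements, so goroutines 0..20 are started.
    collection := []int{1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1}
    dataChan := make(chan *data) // unbuffered: each send waits for a receiver

    go someWork(dataChan, collection)

    for c := range dataChan {
        if c.err != nil {
            return // stop receiving on the first error
        }
    }

    time.Sleep(2 * time.Second)
}

func main() {
    someFunc()
    // Keep the program alive so the remaining goroutines get to run.
    time.Sleep(20 * time.Second)
}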

It surprised me a little that the goroutines which hadn't finished yet didn't panic, even though nothing was reading from the channel anymore. For a more realistic test I've also added some work (the sleep in main) after someFunc returns.

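At first I suspected this had to do with garbage collection, but the reason is simpler. Only sending on a closed channel panics, and here dataChan is never closed: each remaining goroutine blocks forever on its send, because the channel is unbuffered and nobody receives from it anymore. Those goroutines never call wg.Done, so wg.Wait in someWork never returns and close(dataChan) is never reached. The blocked goroutines are leaked (the garbage collector does not reclaim them) and only disappear when main returns and the program exits. So nothing panics, but the leaked goroutines keep holding memory until then.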
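For contrast, a send only panics once the channel has been closed. The minimal sketch below shows that; trySend is a hypothetical helper that uses recover so the panic can be observed instead of crashing the program.

```go
package main

import "fmt"

// trySend sends v on ch and reports whether the send panicked.
// Sending on a closed channel always panics; recover turns that
// panic into an error so the behaviour can be observed.
func trySend(ch chan int, v int) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("recovered: %v", r)
		}
	}()
	ch <- v
	return nil
}

func main() {
	ch := make(chan int, 1)     // buffered, so the first send does not block
	fmt.Println(trySend(ch, 1)) // <nil>
	close(ch)
	fmt.Println(trySend(ch, 2)) // recovered: send on closed channel
}
```

A goroutine blocked on a channel that is never closed, like the ones in the program above, never reaches this panic; it simply waits forever.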

One thing I didn't do in the example is cancel the work the remaining goroutines are still doing, so they stop wasting memory. One idiomatic way to do that is the context package from the standard library, which provides cancellation and timeout mechanisms.

Error handling in concurrent programs in Golang

Error handling in concurrent Go programs takes a little more work than

if err != nil {
   return err
}

because the return value of a function started with the go keyword never reaches its intended receiver (for example, the parent function that started it). Just as we use a channel to send the resulting data, we must also use a channel to send errors. We can do both with a single channel of type Data, a struct that holds the data and the error for that data. Then, when we range over the channel, we first check whether there’s an error. When all tasks have completed, the channel is closed, which ends the range loop.

Using the same channel for data and errors simplifies the concurrent code: we don’t have to handle completed tasks and errors separately.

So here’s a simple example of what I have in mind, with sending an HTTP request as the concurrent task.

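import (
    "fmt"
    "net/http"
    "sync"
)

// Data holds the result for one URL, or the error that occurred while checking it.
type Data struct {
    URL         string
    IsAvailable bool
    Err         error
}

func getAvailableSites(urls []string) ([]string, error) {
    var wg sync.WaitGroup
    wg.Add(len(urls)) // one goroutine per URL; must happen before wg.Wait can run
    // Buffered, so the remaining goroutines can still send (and exit)
    // if we return early on the first error.
    urlChan := make(chan *Data, len(urls))

    // Close the channel once all goroutines are done; this ends the range loop below.
    go func() {
        wg.Wait()
        close(urlChan)
    }()

    for _, url := range urls {
        go checkIfSiteAvailable(&wg, urlChan, url)
    }

    availableSites := []string{}
    for data := range urlChan {
        if data.Err != nil {
            return nil, fmt.Errorf("problem getting url: %v", data.Err)
        }
        if data.IsAvailable {
            availableSites = append(availableSites, data.URL)
        }
    }
    return availableSites, nil
}

func checkIfSiteAvailable(wg *sync.WaitGroup, urlChan chan<- *Data, url string) {
    defer wg.Done() // signal that this task is finished, however we return

    resp, err := http.Head(url)
    if err != nil {
        urlChan <- &Data{Err: err}
        return
    }
    defer resp.Body.Close() // always release the response

    if resp.StatusCode != http.StatusOK {
        urlChan <- &Data{Err: fmt.Errorf("status code not OK for %s", url)}
        return
    }

    urlChan <- &Data{URL: url, IsAvailable: true}
}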

I won’t describe the logic here, just the steps related to concurrency and error handling. In getAvailableSites we

  • set up a channel we’ll use to send the data for each URL (with the error as part of the data),
  • create a WaitGroup and add the number of goroutines we’ll start, so we know when all of them have finished and the channel can be closed,
  • define and start a goroutine which waits for all the goroutines to finish and then closes the channel,
  • for each URL, call checkIfSiteAvailable as a goroutine (alongside the URL we pass the channel, so it can send the resulting data, and the WaitGroup, so it can signal that its task is finished),
  • iterate over the channel; the loop ends after the channel is closed (see the 3rd point). Receiving from a channel blocks until a value is available, so no data from the channel gets lost. In the for loop we check for an error and return it as if no concurrency were involved.

In checkIfSiteAvailable we defer the call to wg.Done to signal that the task is finished. Whenever there’s an error, we send a Data instance with only the Err field set, because in this simple example we don’t need the other fields. Lastly, we send the actual data (wrapped in the Data struct).

Testing concurrent code in Golang

I wrote a package that tracks certain hashtags on Twitter, and part of the package is adding hashtags to and removing them from a list. The plan is that this can be done from different goroutines, so tracking and untracking is done by sending hashtags on channels. It looks like this:

var (
    trackChan   = make(chan string)
    untrackChan = make(chan string)
)

func Track(hs ...string) {
    for _, h := range hs {
        trackChan <- h
    }
}

func Untrack(hs ...string) {
    for _, h := range hs {
        untrackChan <- h
    }
}

var hashtags []string

func init() {
    go func() {
        for {
            select {
            case h := <-trackChan:
                if !sliceContains(h, hashtags) {
                    hashtags = append(hashtags, h)
                }
            case h := <-untrackChan:
                if sliceContains(h, hashtags) {
                    hashtags = sliceRemove(h, hashtags)
                }
            }
        }
    }()
}

sliceContains and sliceRemove are (probably) self-explanatory helper functions.
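For completeness, here is one possible implementation of the two helpers. The signatures are inferred from how they are called above (hashtag first, slice second); the bodies are an assumption, not the package's actual code. On Go 1.21+ slices.Contains from the standard library could replace sliceContains.

```go
package main

import "fmt"

// sliceContains reports whether s contains h.
func sliceContains(h string, s []string) bool {
	for _, v := range s {
		if v == h {
			return true
		}
	}
	return false
}

// sliceRemove returns a new slice with every occurrence of h removed.
// It allocates a fresh slice, so the caller's slice is left unchanged.
func sliceRemove(h string, s []string) []string {
	out := make([]string, 0, len(s))
	for _, v := range s {
		if v != h {
			out = append(out, v)
		}
	}
	return out
}

func main() {
	tags := []string{"go", "rust", "go"}
	fmt.Println(sliceContains("rust", tags)) // true
	fmt.Println(sliceRemove("go", tags))     // [rust]
}
```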

Since I write tests for my code, I wrote three tests for this part: for tracking, for untracking, and for a combined scenario. At first you might write them like this:

func TestTrack(t *testing.T) {
    h := "name"
    Track(h)
    assert.Contains(t, hashtags, h)
}

func TestUntrack(t *testing.T) {
    name := "name"
    hashtags = []string{name, "surname"}
    Untrack(name)
    assert.NotContains(t, hashtags, name)
}

func TestTrackUntrack(t *testing.T) {
    h := "name"

    Track(h)
    Untrack(h)

    assert.NotContains(t, hashtags, h)
}

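But the tests don't work! The problem is that we are dealing with concurrent code, so we don't know when the commands will finish executing (concretely, when the hashtags slice will be updated). Track and Untrack return as soon as the background goroutine has received the hashtag, not after it has updated the slice, so the assertion can run before the update happens.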

Testing concurrent code in Go turned out to be easier after I watched the talk Testing Techniques by Andrew Gerrand.

The main idea is to insert a call to an empty (no-op) function right after the interesting code, acting as a hook for testing. In the test, this function is replaced by one that sends a value on a channel created by the test. The test then receives from that channel wherever it needs to be sure some logic has executed, which blocks the test until it has.

So the solution to the above problem was to define an empty function (e.g. doneFunc) and call it after each update to the hashtags slice. Then, in the test, we create a channel (e.g. doneChan) and redefine doneFunc so that it sends a value on doneChan. (It doesn't matter which value. A boolean is perfectly fine, but an empty struct is even better since it takes 0 bytes, while a boolean takes 1 byte.) As good practice, at the end of the test we restore doneFunc to the empty function.

Updated code:

var (
    trackChan   = make(chan string)
    untrackChan = make(chan string)
    doneFunc    = func() {}
)

func Track(hs ...string) {
    for _, h := range hs {
        trackChan <- h
    }
}

func Untrack(hs ...string) {
    for _, h := range hs {
        untrackChan <- h
    }
}

var hashtags []string

func init() {
    go func() {
        for {
            select {
            case h := <-trackChan:
                if !sliceContains(h, hashtags) {
                    hashtags = append(hashtags, h)
                    doneFunc()
                }
            case h := <-untrackChan:
                if sliceContains(h, hashtags) {
                    hashtags = sliceRemove(h, hashtags)
                    doneFunc()
                }
            }
        }
    }()
}

Updated tests:

import (
    "testing"

    "github.com/stretchr/testify/assert"
)

func TestTrack(t *testing.T) {
    doneChan := make(chan struct{}, 1)
    doneFunc = func() { doneChan <- struct{}{} }
    defer func() {
        doneFunc = func() {}
    }()

    h := "name"

    Track(h)
    <-doneChan

    assert.Contains(t, hashtags, h)
}

func TestUntrack(t *testing.T) {
    doneChan := make(chan struct{}, 1)
    doneFunc = func() { doneChan <- struct{}{} }
    defer func() {
        doneFunc = func() {}
    }()

    name := "name"
    hashtags = []string{name, "surname"}

    Untrack(name)
    <-doneChan

    assert.NotContains(t, hashtags, name)
}

func TestTrackUntrack(t *testing.T) {
    doneChan := make(chan struct{}, 1)
    doneFunc = func() {
        doneChan <- struct{}{}
    }
    defer func() {
        doneFunc = func() {}
    }()
    h := "name"

    Track(h)
    <-doneChan
    Untrack(h)
    <-doneChan

    assert.NotContains(t, hashtags, h)
}

Channels in Go

Do not communicate by sharing memory; instead, share memory by communicating.

Channels are for orchestration, mutexes are for serialization.

Go Proverbs – Rob Pike

Yesterday I wrote my first Go project (a URL shortener, a project I write in every new programming language I learn) that uses channels instead of traditional locks for shared data in concurrent code. I admit it's a little more complex than using mutexes, but the result is more readable and more decoupled code. It also made me agree with the second quote above.

Some of the channel use cases I used:

  • blocking execution until some operation is completed. With <-done, a goroutine waits until another goroutine sends a value on the done channel (or closes it). One neat use of this is replacing time.Sleep in tests.
  • time.After(5*time.Second) returns a channel that receives a single value after 5 seconds, which is handy as a timeout in a select. To execute some logic every 5 seconds, use time.NewTicker(5*time.Second) (or time.Tick), whose channel receives a value every 5 seconds.
  • catching signals and handling them (this needs to be done in a separate goroutine, otherwise the whole program would be blocked).
    go func() {
      // Buffered: signal.Notify does not block when sending, so an unbuffered channel could miss the signal.
      signalChan := make(chan os.Signal, 1)
      signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
      <-signalChan // blocks this goroutine until SIGINT or SIGTERM arrives
      fmt.Println("Exiting the program and doing some cleanup...")
      // ...
      os.Exit(0)
    }()