Go concurrency patterns

June 22, 2017
Go Channels Concurrency Github API

Concurrency in Go

This post is aimed at beginner/intermediate level Go developers who want to see some cool usage of Go channels and concurrency patterns. We will start with a rather simple task:

I like to spend my time on GitHub, browsing and starring repos I like and following some really brilliant engineers. One day I realized that most of the people I follow are gophers, which is really cool. But what kind of projects do they like? Maybe all of them have starred some repository I am totally unaware of? Let's find out :P

You might already be sketching a solution in your head for this problem; maybe something like this:

func main() {
  followedUsers, err := githubCli.getFollowedUsers(myGithubHandle, myToken)
  ...
  allRepos := make([]Repo, 0)
  for _, user := range followedUsers {
    starredRepos, err := githubCli.getStarredRepos(user.Handle, myToken)
    ...
    allRepos = append(allRepos, starredRepos...)
  }
  ...
  // some boring job goes on here to merge the answer 
}

No, not this time. Instead we are going to use channels and compute the answer in a concurrent fashion. Assume we came up with an API that returns a channel from which we can consume repositories and simply count them in a map:

type Repo struct { 
  Name string 
  Link string
}
...
answer := map[string]uint16{} // repository name -> how many stars among the people I follow

repos := reposStarredBy(usersFollowedBy(githubHandle)) // reposStarredBy returns a <-chan *Repo

for repo := range repos { 
  answer[repo.Name]++
}

Notice how much cleaner and more expressive this style looks. Let's take a closer look at the usersFollowedBy and reposStarredBy functions.

These are the endpoints we are going to use to retrieve the list of people I follow and the repositories they have starred:

const(
  fmtFollowingURL = "https://api.github.com/users/%s/following?per_page=5"
  fmtStargazerURL = "https://api.github.com/users/%s/starred"
)

(Note that I use per_page=5 to force multiple requests to the endpoint, as I do not follow that many users.)
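The post skips over how getUsers discovers the next page (full source in the link below). One common approach, assumed here for illustration, is parsing the Link header that the GitHub API returns alongside each paginated response. A minimal sketch:

```go
package main

import (
	"fmt"
	"strings"
)

// nextPageURL extracts the rel="next" URL from a GitHub API Link header.
// It returns "" when there is no next page.
func nextPageURL(linkHeader string) string {
	for _, part := range strings.Split(linkHeader, ",") {
		section := strings.Split(part, ";")
		if len(section) < 2 {
			continue
		}
		if strings.TrimSpace(section[1]) == `rel="next"` {
			// the URL itself is wrapped in angle brackets: <https://...>
			return strings.Trim(strings.TrimSpace(section[0]), "<>")
		}
	}
	return ""
}

func main() {
	h := `<https://api.github.com/user/1/following?per_page=5&page=2>; rel="next", ` +
		`<https://api.github.com/user/1/following?per_page=5&page=3>; rel="last"`
	fmt.Println(nextPageURL(h)) // prints the page=2 URL
}
```

A getUsers implementation would call this on `resp.Header.Get("Link")` and loop until it returns "".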

Pipeline pattern

Function usersFollowedBy has the following interface:

func usersFollowedBy(me string) <-chan *User 

It returns a channel (almost) immediately, then proceeds to fetch the users in a separate goroutine, feeding them to us via the returned channel. Consequently:

func reposStarredBy(users <-chan *User) <-chan *Repo

This second function consumes the user channel and returns a channel providing Repo objects.

This is a typical pipeline pattern with multiple stages. Each stage (except the first) consumes an inbound channel, operates on it and returns an output channel. The following guidelines apply:

  1. stages close their outbound channels when all the send operations are done.
  2. stages keep receiving values from inbound channels until those channels are closed or the senders are unblocked.

Pipelines unblock senders either by ensuring there’s enough buffer for all the values that are sent or by explicitly signalling senders when the receiver may abandon the channel.
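To make the guidelines concrete before diving into the GitHub code, here is a tiny two-stage pipeline (adapted from the classic pipelines example; the stage names gen and sq are not part of this post's program):

```go
package main

import "fmt"

// gen is the first stage: it emits the given numbers and closes its
// outbound channel when all sends are done (guideline 1).
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// sq is a middle stage: it receives until its inbound channel is
// closed (guideline 2), then closes its own outbound channel.
func sq(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

func main() {
	for v := range sq(gen(2, 3, 4)) {
		fmt.Println(v) // prints 4, 9, 16, one per line
	}
}
```

Each stage owns its outbound channel and is the only one to close it; that is what makes `for range` loops over the channels terminate cleanly.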

For now we will ignore channel abandoning, and let’s see how we can enable channel pipelining in our program.

Getting users

func usersFollowedBy(me string) <-chan *User {
	users := make(chan *User)
	nextPage := fmt.Sprintf(fmtFollowingURL, me)
	go func() {
		wg := sync.WaitGroup{}
		var (
			result []*User
			err    error
		)
		for nextPage != "" {
			result, nextPage, err = getUsers(nextPage)
			if err != nil {
				panic(err)
			}

			wg.Add(1)
			go func(result []*User) { // pass the slice explicitly: the outer
				// loop reassigns result on its next iteration
				for _, user := range result {
					users <- user
				}
				wg.Done()
			}(result)
		}
		wg.Wait()
		close(users)
	}()

	return users
}

(The details of getUsers are skipped for brevity, but can be seen in the full source link below.)

The function does the following:

  1. Constructs the first URL from which we can fetch the list of users I follow
  2. Launches a new goroutine which stops executing when all users are fetched
  3. Returns a channel which is closed once that goroutine finishes fetching users

Note how this follows the guidelines set for the pipeline pattern.

Another interesting part is the following code snippet from above:

wg.Add(1)
go func(result []*User) {
  for _, user := range result {
    users <- user
  }
  wg.Done()
}(result)

This allows values from the second page to be sent even before the channel consumer has received everything from the first page, while the WaitGroup keeps the channel open until every user has been sent. The pattern works here because the channel consumer does not care about order and may receive users from different pages concurrently.

The starredBy(u *User) function collects all repos starred by a user and returns a channel of type <-chan *Repo. Its implementation is very similar to usersFollowedBy, so we will not walk through it in detail (full source code can be found in the link below). More interesting is how the channels produced by starredBy are merged. Here is the code:

Fan in

func reposStarredBy(in <-chan *User) <-chan *Repo { // returns a list of repositories being starred by a list of users
	repos := make(chan *Repo)

	go func() {
		wg := sync.WaitGroup{}
		for u := range in {
			wg.Add(1)
			go func(u *User) {
				defer wg.Done()
				for repo := range starredBy(u) {
					repos <- repo
				}
			}(u)
		}
		wg.Wait()
		close(repos)
	}()

	return repos
}

This is a standard fan-in approach, where multiple channels are merged into one (every channel produced by a starredBy call is merged into the repos channel). Sending to a closed channel causes a panic, so we use the WaitGroup to ensure the channel is closed only after all writes to it have finished.
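The same fan-in shape can be written generically, independent of the GitHub types. A sketch over int channels (merge and gen are illustrative names, not from the post):

```go
package main

import (
	"fmt"
	"sync"
)

// merge fans in any number of int channels into one, closing the
// output only after every input channel is drained.
func merge(chans ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(len(chans))
	for _, c := range chans {
		go func(c <-chan int) { // one forwarding goroutine per input
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(out) // safe: all sends have finished by now
	}()
	return out
}

func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

func main() {
	sum := 0
	for v := range merge(gen(1, 2), gen(3, 4)) {
		sum += v // arrival order across inputs is nondeterministic, so we sum
	}
	fmt.Println(sum) // prints 10
}
```

reposStarredBy is exactly this shape, except the input channels are created on the fly, one per user received.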

Channel Abandoning

The code works as expected, but consider the following scenario. What if we are only interested in the first few repositories returned by reposStarredBy and the rest do not matter? What if our little script is turned into a daemon running hourly, something along the lines of:

for { 
    repos := reposStarredBy(usersFollowedBy(*me))
    for repo := range repos {
      answer[repo.Name]++
      if len(answer) > 10 { 
        break
      }
    }
    time.Sleep(1 * time.Hour)
}

If you run this daemon, after a while you will find that our little program consumes gigabytes of memory. Why?

The break statement leaves us with a channel that no one reads from, so every attempt to write to it blocks. It is important to remember that:

  1. Goroutines, once launched, are not terminated when the goroutine that started them moves on
  2. Writing to a full (or unread) channel blocks the sending goroutine, so the resources it holds are never released
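This leak is easy to observe with runtime.NumGoroutine. A self-contained sketch (leakySender is a hypothetical stand-in for our pipeline stages):

```go
package main

import (
	"fmt"
	"runtime"
)

// leakySender returns an unbuffered channel fed by a goroutine that
// produces values forever; every send must find a receiver.
func leakySender() <-chan int {
	ch := make(chan int)
	go func() {
		for i := 0; ; i++ {
			ch <- i // blocks forever once the receiver walks away
		}
	}()
	return ch
}

// countLeaked abandons n channels after one read each and reports how
// many extra goroutines are left behind.
func countLeaked(n int) int {
	before := runtime.NumGoroutine()
	for i := 0; i < n; i++ {
		ch := leakySender()
		<-ch // read one value, then abandon the channel
	}
	return runtime.NumGoroutine() - before
}

func main() {
	fmt.Println(countLeaked(100)) // prints 100: one goroutine stuck per abandoned channel
}
```

In our daemon the same thing happens every hour, so blocked goroutines (and the buffers they hold) pile up without bound.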

The two most common ways to address this problem are:

  1. Give channels enough buffer
  2. Signal sender goroutines to stop producing data once the receiver stops accepting it

While (1) is generally the simpler solution, it is not always possible. For example, to buffer the repos channel produced by reposStarredBy, we would have to know the total number of repositories starred by all the people we follow, which is tricky to compute and probably not worth it. Therefore we will go with the second solution:
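For completeness, here is what option (1) looks like when the value count is known up front (firstRepo and the repo names are illustrative, not part of the post's program):

```go
package main

import "fmt"

// firstRepo reads just one value and abandons the channel; because the
// buffer is large enough for every send, the producer still finishes
// and its goroutine exits instead of leaking.
func firstRepo(repos []string) string {
	ch := make(chan string, len(repos)) // buffer absorbs all sends
	go func() {
		defer close(ch)
		for _, r := range repos {
			ch <- r // never blocks, even with no receiver
		}
	}()
	return <-ch
}

func main() {
	fmt.Println(firstRepo([]string{"go", "kubernetes", "docker"})) // prints go
}
```

This only works because len(repos) is known before the channel is created, which is exactly what we lack in reposStarredBy.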

  done := make(chan struct{})
  for repo := range repos {
    answer[repo.Name]++
    if len(answer) > 10 {
      close(done) // signal the "repos" producer to stop trying to write
      break
    }
  }
...
  func reposStarredBy(done <-chan struct{}, in <-chan *User) <-chan *Repo { // returns a list of repositories being starred by a list of users
  ...
    go func(u *User) {
      defer wg.Done()
      for repo := range starredBy(u) {
        select {
        case repos <- repo:
        case <-done: // receiver is gone; drop the value instead of blocking
        }
      }
    }(u)

The select statement lets us keep draining the producers of the User channel, preventing resource leaks in starredBy, usersFollowedBy and their inner goroutines. They will, however, keep issuing pointless HTTP requests; to prevent that they need to be made done-aware as well, but this is left as an exercise for the reader.
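A fully done-aware producer returns instead of merely skipping the send, so its goroutine exits as soon as the receiver gives up. A minimal sketch (this gen is illustrative, not from the post):

```go
package main

import (
	"fmt"
	"runtime"
	"time"
)

// gen is a done-aware producer: it stops as soon as done is closed.
func gen(done <-chan struct{}) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; ; i++ {
			select {
			case out <- i:
			case <-done:
				return // receiver gave up: exit instead of blocking on send
			}
		}
	}()
	return out
}

func main() {
	before := runtime.NumGoroutine()
	done := make(chan struct{})
	ch := gen(done)
	fmt.Println(<-ch, <-ch) // consume a couple of values
	close(done)             // then abandon the channel
	time.Sleep(50 * time.Millisecond)
	fmt.Println(runtime.NumGoroutine() - before) // prints 0: the producer exited cleanly
}
```

Applying this shape inside starredBy and usersFollowedBy (threading done through, as reposStarredBy does above) is the exercise mentioned.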

Conclusion

Pipeline pattern:

Producers create and return a channel which is closed only once all values are sent or the receiver signals an early abort.

Consumers consume all values from the input channel until it is closed, and signal senders via a done channel when they stop receiving values.

Fan in:

Merge multiple channels into one output channel

Fan out:

Run multiple goroutines consuming from a single channel
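Fan-out did not get its own example above; a minimal sketch with several workers competing for values on a single channel (fanOutSquares and its parameters are illustrative, not from the post):

```go
package main

import (
	"fmt"
	"sync"
)

// fanOutSquares squares nums using `workers` goroutines that all read
// from one jobs channel, then fans the results back into a single sum.
func fanOutSquares(nums []int, workers int) int {
	jobs := make(chan int)
	results := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs { // workers compete for values on jobs
				results <- j * j
			}
		}()
	}
	go func() {
		wg.Wait()
		close(results) // all workers finished; safe to close
	}()
	go func() {
		defer close(jobs)
		for _, n := range nums {
			jobs <- n
		}
	}()

	sum := 0
	for r := range results {
		sum += r
	}
	return sum
}

func main() {
	fmt.Println(fanOutSquares([]int{1, 2, 3, 4, 5}, 3)) // prints 55
}
```

Note that fan-out and fan-in usually travel together: the workers fan out from jobs, and the WaitGroup-guarded close fans their outputs back in.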

Abandon channel:

Prevent resource leaks either by giving the channel a large enough buffer or by passing a done channel, so goroutines are never blocked forever on a full channel. Use the select pattern.

  1. Full source code available at: https://github.com/ideahitme/go-and-learn/blob/master/golang/concurrency/main.go

  2. Article based on: https://blog.golang.org/pipelines

More to come

In the next article on this series we will perform goroutine leak analysis and analyze some other common concurrency patterns. We will also see how to gracefully handle errors.

We will also compare channel-based solutions with alternative approaches and see which performs better.