One day I was given the task of writing a word counter algorithm on a whiteboard. The requirements were:

  • Print to the console every word and the number of times it appears in a 40,000-line input.
  • Make it as fast as possible.

To start, one can write it as simply as possible and then see whether it can be made faster:

func wordCounter() (words map[string]int) {
	b, err := ioutil.ReadFile("input.txt")
	if err != nil {
		log.Fatal(err)
	}
	removeLineBreaks := regexp.MustCompile(`\r?\n`)
	inputText := removeLineBreaks.ReplaceAllString(string(b), " ")
	words = make(map[string]int)
	removeSpecial := regexp.MustCompile(`(?m)[^a-z]`)

	for _, w := range strings.Split(inputText, " ") {
		w = strings.ToLower(w)
		w = removeSpecial.ReplaceAllString(w, "")
		if w != "" {
			words[w]++
		}
	}

	return
}

What the code does is the following:

  1. Read the file, replace the line breaks with spaces, and put the result in a string variable.
  2. Create the map that will hold a key (word) and a value (count).
  3. Create a regular expression that removes every character that is not a lowercase letter (punctuation like , ! ?, digits, etc.).
  4. For each word in the text (splitting by spaces):
  5. Lowercase it, run the regex over it, and add 1 to the map count for that word.
  6. When we finish, print the result to the console (a minimal main is sketched below).
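
For completeness, the snippet above assumes imports and a main along these lines (a minimal sketch, inferred from the calls in wordCounter):

package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"regexp"
	"strings"
)

func main() {
	// Print every word and its count to the console (step 6).
	fmt.Println(wordCounter())
}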

This works fine, but we are using Go, and every post describing Go talks about goroutines and how easy it is to achieve concurrency. So what if the code is changed to the following?

func main() {
	fmt.Println(wordCounterConcurrent())
}

func wordCounterConcurrent() map[string]int {
	b, err := ioutil.ReadFile("input.txt")
	if err != nil {
		log.Fatal(err)
	}
	removeLineBreaks := regexp.MustCompile(`\r?\n`)
	inputText := removeLineBreaks.ReplaceAllString(string(b), " ")
	words := make(map[string]int)
	removeSpecial := regexp.MustCompile(`(?m)[^a-z]`)

	wg := sync.WaitGroup{}
	for _, w := range strings.Split(inputText, " ") {
		wg.Add(1)
		go func(w2 string) {
			defer wg.Done()
			w2 = strings.ToLower(w2)
			w2 = removeSpecial.ReplaceAllString(w2, "")
			if w2 != "" {
				words[w2]++
			}
		}(w)
	}

	wg.Wait()
	return words
}

The differences between this snippet and the first one are the following:

  1. We defined a WaitGroup that allows us to:
    • Add one to a counter every time a goroutine is fired.
    • Block the main goroutine with wg.Wait() until all the other goroutines have finished.
    • Decrease the counter by executing wg.Done() every time a goroutine finishes.
  2. We moved the per-word functionality into an anonymous function, so we can use the go keyword to execute that function in a separate goroutine.

Does this work? Unfortunately, no. If we execute the following:

> go run -race main.go
fatal error: concurrent map read and map write

Basically, the runtime (and the race detector) is telling us that some of the goroutines can execute this line at the same moment:

words[w2]++
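
The increment is really a read followed by a write, which is why the error mentions both. Expanded, it is equivalent to:

tmp := words[w2]    // read: can race with another goroutine's write
words[w2] = tmp + 1 // write: can race with another goroutine's read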

Since the map must not be written by two different goroutines at the same time, we should use some sort of queue and consume its values in a single goroutine. Fortunately, Go has us covered with channels.

With the following modifications, it should work:

func main() {
	fmt.Println(wordCounterConcurrent())
}

func wordCounterConcurrent() (words map[string]int) {
	b, err := ioutil.ReadFile("input.txt")
	if err != nil {
		log.Fatal(err)
	}
	removeLineBreaks := regexp.MustCompile(`\r?\n`)
	inputText := removeLineBreaks.ReplaceAllString(string(b), " ")
	words = make(map[string]int)
	removeSpecial := regexp.MustCompile(`(?m)[^a-z]`)

	doneChan := make(chan bool)
	wordsChan := make(chan string)

	go func() {
		for {
			select {
			case w := <-wordsChan:
				words[w]++
			case <-doneChan:
				return
			}
		}
	}()

	wg := sync.WaitGroup{}

	for _, w := range strings.Split(inputText, " ") {
		wg.Add(1)
		go func(w1 string) {
			defer wg.Done()
			w1 = strings.ToLower(w1)
			w1 = removeSpecial.ReplaceAllString(w1, "")
			if w1 != "" {
				wordsChan <- w1
			}
		}(w)
	}

	wg.Wait()
	doneChan <- true
	return
}

But now the code has grown a lot, so let's look at it in parts:

To create the channels, the following syntax is used:

doneChan := make(chan bool)
wordsChan := make(chan string)
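
Both channels are unbuffered, so every send blocks until a receiver is ready. As an aside (not part of the original code), a buffered variant would let senders queue values up to a capacity:

wordsChan := make(chan string, 1024) // hypothetical: senders block only once 1024 values are queued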

Now, instead of operating on the map, each word goroutine just pushes a value into a channel:

wordsChan <- w1

The map operations are then performed when a value is read from the channel (a blocking action, since it is not a buffered channel):

go func() {
	for {
		select {
		case w := <-wordsChan:
			words[w]++
		case <-doneChan:
			return
		}
	}
}()

This piece of code reads from wordsChan and performs the map operation; it also reads from doneChan to know when to stop the goroutine.

The message to doneChan is only submitted after all the word goroutines have finished:

wg.Wait()
doneChan <- true
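
As an aside, an equivalent pattern (a sketch, not how the code above works) is to close the channel once all producers are done and range over it in the consumer, removing the need for doneChan:

go func() {
	wg.Wait()        // wait for every word goroutine to finish
	close(wordsChan) // then signal the consumer by closing the channel
}()
for w := range wordsChan { // the loop ends when wordsChan is closed and drained
	words[w]++
}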

Now if we run this algorithm we will see that it is not failing anymore. Let's then move on to benchmarking.

Benchmark

We are going to do this in two different ways: executing the idiomatic Go benchmark and using the command-line utility time.
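
The Go benchmarks are assumed to live in a _test.go file along these lines (a minimal sketch; the exact file is in the repository linked at the end):

package main

import "testing"

func BenchmarkWordCounter(b *testing.B) {
	for i := 0; i < b.N; i++ {
		wordCounter() // run the sync version b.N times
	}
}

func BenchmarkWordCounterConcurrent(b *testing.B) {
	for i := 0; i < b.N; i++ {
		wordCounterConcurrent() // run the concurrent version b.N times
	}
}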

> go test -run=XXX -bench=.
BenchmarkWordCounter-12               45   25.88 ms/op   4354027 B/op   124918 allocs/op
BenchmarkWordCounterConcurrent-12     19   59.70 ms/op   5711640 B/op   138290 allocs/op

This is telling us that the sync version ran 45 iterations, each taking approx 25.88 milliseconds, while the concurrent version only ran 19 iterations because each one took approx 59.70 milliseconds.

Wait, What?

Let's try the following:

> go build
> time ./go-word-counter
(SYNC)
./go-word-counter  0.04s user 0.00s system 89% cpu 0.050 total
(CONCURRENT)
./go-word-counter  0.46s user 0.13s system 486% cpu 0.120 total

So the sync version takes approx 0.050 s at 89% CPU, and the concurrent one approx 0.120 s at 486% CPU.

We are sure that the function is running concurrently (hence the CPU usage above 100%), but shouldn't that mean it runs faster? Well, no.

Looking at the code, one can see that the work executed inside each word goroutine takes less time than all the orchestration needed to push a word through a channel.
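
To get a feel for the numbers, here is a rough micro-benchmark sketch (not from the original post; it assumes the regexp, strings and testing imports) comparing the per-word work with a bare channel round trip:

func BenchmarkPerWordWork(b *testing.B) {
	removeSpecial := regexp.MustCompile(`(?m)[^a-z]`)
	for i := 0; i < b.N; i++ {
		w := strings.ToLower("Hello,")
		_ = removeSpecial.ReplaceAllString(w, "")
	}
}

func BenchmarkChannelRoundTrip(b *testing.B) {
	ch := make(chan string)
	go func() {
		for range ch { // drain everything the benchmark sends
		}
	}()
	for i := 0; i < b.N; i++ {
		ch <- "hello"
	}
	close(ch)
}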

Things would be different if a sleep was added:

w1 = strings.ToLower(w1)
w1 = removeSpecial.ReplaceAllString(w1, "")
time.Sleep(200) // 200 nanoseconds: a bare integer time.Duration is in ns

And then the benchmark:

BenchmarkWordCounter-12                3   425.23 ms/op   4436128 B/op   124934 allocs/op
BenchmarkWordCounterConcurrent-12     19    64.75 ms/op   8812324 B/op   179560 allocs/op

In this scenario it is visible how the concurrent version outperforms the sync one.

So we started with a single-goroutine algorithm, evolved it to run using more than one goroutine with the orchestration needed for it to work, and at the end we benchmarked and compared the results.

The value we can get out of this: avoid premature optimization, keep it simple (KISS), and, most importantly, concurrent flows are beneficial when the code inside each goroutine is CPU/time intensive.

2020 Edit

One alternative suggested by David Pennington was to read the file as a stream instead of loading the whole file into memory. In this case the code looks like:

func wordCounterStream() (words map[string]int) {
	file, err := os.Open("input.txt")
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	words = make(map[string]int)
	scanner := bufio.NewScanner(file)

	// bufio.ScanWords includes punctuation that we want to remove,
	// so we use a reimplementation that also checks unicode.IsPunct.
	scanner.Split(ScanWords)

	for scanner.Scan() {
		w := scanner.Text()
		if w != "" {
			words[strings.ToLower(w)]++
		}
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
	return
}

  • Open the file that contains the words.
  • Create a new scanner for the stream.
  • Make the scanner split the stream on the selected criteria.
  • Each token obtained represents a word.

Instead of bufio.ScanWords, scanner.Split(...) uses an implementation that takes care of splitting words when punctuation appears, since the default implementation does not consider those cases. It can be found in the repository linked below.
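
As a rough idea, such a split function can be bufio.ScanWords with the separator check widened to include punctuation. A sketch, assuming unicode and unicode/utf8 are imported (the repository version may differ):

func ScanWords(data []byte, atEOF bool) (advance int, token []byte, err error) {
	isSep := func(r rune) bool { return unicode.IsSpace(r) || unicode.IsPunct(r) }
	// Skip leading separators.
	start := 0
	for width := 0; start < len(data); start += width {
		var r rune
		r, width = utf8.DecodeRune(data[start:])
		if !isSep(r) {
			break
		}
	}
	// Scan until the next separator, which marks the end of a word.
	for width, i := 0, start; i < len(data); i += width {
		var r rune
		r, width = utf8.DecodeRune(data[i:])
		if isSep(r) {
			return i + width, data[start:i], nil
		}
	}
	// At EOF with a final, non-terminated word: return it.
	if atEOF && len(data) > start {
		return len(data), data[start:], nil
	}
	// Request more data.
	return start, nil, nil
}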

Now the benchmarks show which solution is better for this problem:

benchmark                           iter     time/iter    bytes alloc             allocs
---------                           ----     ---------    -----------             ------
BenchmarkWordCounter-12               45   25.88 ms/op   4354027 B/op   124918 allocs/op
BenchmarkWordCounterConcurrent-12     19   59.70 ms/op   5711640 B/op   138290 allocs/op
BenchmarkWordCounterStreams-12       198    5.93 ms/op    330114 B/op    44844 allocs/op

👉 Repository with source files used 👈