Now we know what is a goroutine. Here are some fairly advanced use cases. Majority of the examples come from Rob Pike's talk. I made couple enhancements here and there. Try your best to understand them, but know that concurrency is difficult and it takes time to have a firm grasp.
Channels
Two goroutines can communicate and synchronize through channel.
func ping(msg string, c chan string) {
for i := 0; ; i++ {
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}
func main() {
ch := make(chan string)
go ping("ping", ch)
for i := 0; i < 5; i++ {
fmt.Println(<-ch)
// The receiver end is blocking because it's waiting for a message to come through the channel.
}
fmt.Println("Enough pings, done!")
}
Generator
What if we want to listen to multiple pings? We can use a ping generator.
func pingGen(msg string) <-chan string {
ch := make(chan string)
go func() {
for i := 0; ; i++ {
ch <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}()
return ch
}
func main() {
ch1 := pingGen("ping")
ch2 := pingGen("bing")
for i := 0; i < 5; i++ {
fmt.Println(<-ch1)
fmt.Println(<-ch2)
}
fmt.Println("Enough pings, done!")
}
Here's a problem though, ch1 and ch2 are blocking each other. We are not really getting the live updates of ping.
Multiplexing
We can address the problem by multiplexing multiple channels into one.
func fanIn(inputs ...<-chan string) <-chan string {
out := make(chan string)
for _, in := range inputs {
go func(ch <-chan string) {
for {
out <- <-ch
}
}(in)
}
return out
}
func pingGen(msg string) <-chan string {
ch := make(chan string)
go func() {
for i := 0; ; i++ {
ch <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
}
}()
return ch
}
func main() {
out := fanIn(pingGen("ping"), pingGen("bing"), pingGen("sing"))
for i := 0; i < 10; i++ {
fmt.Println(<-out)
}
fmt.Println("Enough pings, done!")
}
However, we can easily spot that the messages are coming in out of order. What if I want the batch of messages to come in order, e.g.
sing 0
bing 0
ping 0
sing 1
ping 1
bing 1
sing 2
ping 2
bing 2
sing 3
ping 3
bing 3
Fan-in Sequencing
We will use a signal channel to achieve the wait. We wait for three pings and then tell them ready for the next batch of 3 pings. First, let's define a message struct.
type message struct {
content string
ready chan bool
}
Each message will carry a reference to a signal channel, called ready. When the channel receives a signal, it indicates that it's ready to process next message.
func fanIn(inputs ...<-chan message) <-chan message {
out := make(chan message)
for _, in := range inputs {
go func(ch <-chan message) {
for {
out <- <-ch
}
}(in)
}
return out
}
func pingGen(msg string) <-chan message {
ch := make(chan message)
rdy := make(chan bool)
go func(ready chan bool) {
for i := 0; ; i++ {
ch <- message{fmt.Sprintf("%s %d", msg, i), ready}
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
<-ready // Wait for ready
}
}(rdy)
return ch
}
func main() {
out := fanIn(pingGen("ping"), pingGen("bing"), pingGen("sing"))
for i := 0; i < 10; i++ {
msgs := []message{}
for j := 0; j < 3; j++ {
msgs = append(msgs, <-out)
}
// Grab three messages and then tell each generator that it's ready for next message.
for _, msg := range msgs {
fmt.Println(msg.content)
msg.ready <- true
}
}
fmt.Println("Enough pings, done!")
}
Select
If you already know how many channels you want to listen to, it's better to use select instead of fan-in. However, it's also reasonable to combine two techniques together. Let's say I want a timeout on my fan-in.
func main() {
out := fanIn(pingGen("ping"), pingGen("ding"), pingGen("sing"))
timeout := time.After(5 * time.Second)
for {
select {
case msg := <-out:
fmt.Println(msg.content)
msg.ready <- true
case <-timeout:
fmt.Println("Too late")
return
}
}
}
Quit Channel
We know how to terminate the reader, but how about the writer? Notice that despite the for-loop has terminated, the goroutine in each generator is not terminated. Let's do something clever, i.e. use a channel to signal quit. We need to stop using fanIn for a moment because that will cause a deadlock if we terminate the writer without terminating the goroutines from fanIn.
The problem is that how do we know the quit signal has been received and processed? We can wait for the quit channel to tell us that it is done! We make a channel that passes channel which passes string.
Let's try to pass data from one channel to another, and repeat this process many times. We are daisy chaining go routines.
For each pass, we increment the value of an integer by one.
func pass(left, right chan int) {
value := <-left
right <- value + 1
fmt.Printf("Left[%d] -> Right[%d]\n", value, value + 1)
}
Now create the goroutines and chain them up.
func main() {
end := make(chan int)
var left chan int
var right chan int
right = end
for i := 0; i < 6; i++ {
left = make(chan int)
go pass(left, right)
right = left
}
fmt.Println("All goroutines are waiting.")
// Send the initial value to first channel.
initVal := 1
go func(ch chan int, val int) {
fmt.Println("Give left most channel the initial value")
ch <- val
}(left, initVal)
fmt.Printf("Final value is %d\n", <-end)
}