Concurrency 102

14.11.2018

Но преди това...

Въпроси за мъфин #1

Тип в езика
Служи за комуникация между go рутини
В него може да се четат и пишат стойности от конкретен тип

Въпрос за мъфин #2

func worker(val string) {
  fmt.Prinln(val)
}

func main() {
    go worker("one")
    go worker("two")
    worker("three")
    time.Sleep(10 * time.Second)
}

Не е ясно. Нещата се изпълняват конкурентно.

Въпрос за мъфин #3

Когато главната go рутина завърши.
Независимо колко други живи има.

Въпрос за мъфин #4

Четене: блокира за винаги
Писане: блокира за винаги

Въпрос за мъфин #5

Четене: връща нулевата стойност на типа на канала
Писане: паника
Затваряне: паника

Concurrency 102

Ще си говорим за

Да си спомним скуката

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func main() {
    boring := func(msg string) {
        for i := 0; ; i++ {
            fmt.Println(msg, i)
            time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
        }
    }

    go boring("boring!")
    fmt.Println("Listening.")
    time.Sleep(2 * time.Second)
    fmt.Println("You are way too boring. I am leaving.")
}

sync

sync е пакет, който ни дава синхронизационни примитиви от сравнително ниско ниво:

един полезен интерфейс:

type Locker interface {
    Lock()
    Unlock()
}

и подпакет atomic с low-level memory примитиви

WaitGroup

Изчаква колекция от горутини да приключат и чак тогава продължава с изпълнението.
Така не правим простотии със time.Sleep, както преди.

type WaitGroup struct {}

func (*WaitGroup) Add(delta int)
func (*WaitGroup) Done()
func (*WaitGroup) Wait()

Пример

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

func main() {
    var wg sync.WaitGroup

    boring := func(msg string) {
        for i := 0; i < 8; i++ {
            fmt.Println(msg, i)
            time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
        }
        wg.Done()
    }

    wg.Add(1)
    go boring("boring!")
    fmt.Println("Waiting for the boring function to do its work")
    wg.Wait()
}

Стига вече с тая скука!

По - интересен пример

package main

import (
	"fmt"
	"net/http"
	"sync"
)

func main() {
    var wg sync.WaitGroup
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://does-not-exists-for-real.com/",
    }
    for _, url := range urls {
        wg.Add(1) // Increment the WaitGroup counter.
        // Launch a goroutine to fetch the URL.
        go func(url string) {
            content, err := http.Get(url) // Fetch the URL.
            if err == nil {
                fmt.Println(url, content.Status)
            } else {
                fmt.Println(url, "has failed")
            }
            wg.Done() // Decrement the counter when the goroutine completes.
        }(url)
    }
    // Wait for all HTTP fetches to complete.
    wg.Wait()
}

Mutex

type Mutex struct {}

func (*Mutex) Lock()
func (*Mutex) Unlock()

В код

package main

import (
	"fmt"
	"sync"
)

func main() {
    var (
        count    int
        countMtx sync.Mutex
        countWg  sync.WaitGroup
    )

    worker := func() {
        countMtx.Lock()
        count += 1
        countMtx.Unlock()
        countWg.Done()
    }

    for i := 0; i < 8; i++ {
        countWg.Add(1)
        go worker()
    }

    countWg.Wait()
    fmt.Println("counter:", count)
}

На каналски

package main

import (
	"fmt"
	"sync"
)

func main() {
    var (
        count   int
        countWg sync.WaitGroup
    )
    ch := make(chan struct{}, 1)

    worker := func() {
        ch <- struct{}{}
        count += 1
        <-ch
        countWg.Done()
    }

    for i := 0; i < 8; i++ {
        countWg.Add(1)
        go worker()
    }

    countWg.Wait()
    fmt.Println("counter:", count)
}

RWMutex

type RWMutex struct {}
func (*RWMutex) Lock()
func (*RWMutex) Unlock()
func (*RWMutex) RLock()
func (*RWMutex) RUnlock()
func (*RWMutex) RLocker() sync.Locker

Once

Обект от този тип ще изпълни точно една функция.

package main

import (
	"fmt"
	"sync"
)

func main() {
    var once sync.Once
    var wg sync.WaitGroup

    onceBody := func() {
        fmt.Println("Only once")
    }
    anotherBody := func() {
        fmt.Println("Another")
    }

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            once.Do(onceBody)
            once.Do(anotherBody)
            wg.Done()
        }()
    }
    wg.Wait()
}

Cond

type Cond struct {
    L Locker // this is held while observing or changing the condition
    // other unexported fields ...
}

func NewCond(l Locker) *Cond

func (c *Cond) Wait()
func (c *Cond) Broadcast()
func (c *Cond) Signal()

Cond Demo

Demo ./code/concurrency102/cond.go

select

select

select {
case v1 := <-c1:
    fmt.Printf("received %v from c1\n", v1)
case v2 := <-c2:
    fmt.Printf("received %v from c2\n", v2)
case c3 <- 5:
    fmt.Println("send 5 to c3")
default:
    fmt.Printf("no one was ready to communicate\n")
}

Накратко: switch за канали.
Надълго: изчаква първия case, по който е изпратена или получена стойност

Въпрос

Какво би направил следния код?

package main

import "fmt"
import "time"

func main() {
    c := make(chan int)

    go func() {
        c <- 42
    }()

    select {
    case v1 := <-c:
        fmt.Printf("received %d in v1\n", v1)
    case v2 := <-c:
        fmt.Printf("received %d in v2\n", v2)
    case <-time.After(1 * time.Nanosecond):
        fmt.Printf("timeout\n")
    default:
        fmt.Printf("nothing!\n")
    }
}

Не се знае.

Concurrency patterns

Timeout

package main

import (
	"fmt"
	"net/http"
	"time"
)

func fetch(url string) <-chan *http.Response {
    ch := make(chan *http.Response)

    go func() {
        if response, err := http.Get(url); err == nil {
            ch <- response
        }
    }()

    return ch
}

func main() {
    select {
    case resp := <-fetch("https://www.google.com/search?q=golang"):
        fmt.Printf("received %v from google\n", resp.Status)
    case resp := <-fetch("https://www.bing.com/search?q=golang"):
        fmt.Printf("received %v from bing\n", resp.Status)
    case <-time.After(500 * time.Millisecond):
        fmt.Printf("timed out\n")
    }
}

Игра на развален телефон

Игра на развален телефон

package main

import (
	"fmt"
)

func f(left, right chan int) {
    left <- 1 + <-right
}

func main() {
    const n = 100000
    leftmost := make(chan int)
    right := leftmost
    left := leftmost

    for i := 0; i < n; i++ {
        right = make(chan int)
        go f(left, right)
        left = right
    }

    go func(c chan int) { c <- 1 }(right)
    fmt.Println(<-leftmost)
}

Generators

package main

func fib() <-chan int {
    c := make(chan int)
    go func() {
        for a, b := 0, 1; ; a, b = b, a+b {
            c <- a
        }
    }()
    return c
}

func main() {
    fibonacci := fib()
    for i := 0; i < 10; i++ {
        println(<-fibonacci)
    }
}

Fan In

package main

import (
	"fmt"
	"math/rand"
	"time"
)

func talk(msg string) <-chan string {
    c := make(chan string)
    go func() {
        for i := 0; ; i++ {
            c <- fmt.Sprintf("%s: %d", msg, i)
            time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond)
        }
    }()
    return c
}

func main() {
    doycho := talk("Doycho")
    misho := talk("Misho")
    for i := 0; i < 5; i++ {
        fmt.Println(<-doycho)
        fmt.Println(<-misho)
    }
}

Fan In

Fan In <- Concurrency

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// TALK START OMIT
func talk(msg string) <-chan string { // HL
	c := make(chan string)
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	go func() {
		for i := 0; ; i++ {
			c <- fmt.Sprintf("%s: %d", msg, i)
			time.Sleep(time.Duration(r.Intn(1e3)) * time.Millisecond)
		}
	}()
	return c
}

// TALK END OMIT

// FANIN START OMIT
func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() {
        for {
            select {
            case s := <-input1:
                c <- s
            case s := <-input2:
                c <- s
            }
        }
    }()
    return c
}

func main() {
    c := fanIn(talk("Misho"), talk("Doycho"))
    for i := 0; i < 10; i++ {
        fmt.Println(<-c)
    }
}

Finish channel

func fanIn(input1, input2 <-chan string, finish chan struct{}) <-chan string {
    c := make(chan string)
    go func() {
        for {
            select {
            case s := <-input1:
                c <- s
            case s := <-input2:
                c <- s
            case <-finish:
                close(c)
                wg.Done()
                return
            }
        }
    }()
    return c
}

Finish channel

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

var wg sync.WaitGroup

// TALK START OMIT
func talk(msg string) <-chan string { // HL
	c := make(chan string)
	r := rand.New(rand.NewSource(time.Now().UnixNano()))
	go func() {
		for i := 0; ; i++ {
			c <- fmt.Sprintf("%s: %d", msg, i)
			time.Sleep(time.Duration(r.Intn(1e3)) * time.Millisecond)
		}
	}()
	return c
}

// TALK END OMIT

// FANIN START OMIT
func fanIn(input1, input2 <-chan string, finish chan struct{}) <-chan string { // HL
	c := make(chan string)
	go func() {
		for {
			select {
			case s := <-input1:
				c <- s
			case s := <-input2:
				c <- s
			case <-finish: // HL
				close(c) // HL
				wg.Done()
				return
			}
		}
	}()
	return c
}

// FANIN END OMIT

func main() {
    wg.Add(1)
    finish := make(chan struct{})
    c := fanIn(talk("Doycho"), talk("Misho"), finish)
    for value := range c {
        fmt.Println(value)
        if len(value) > 9 {
            close(finish)
        }
    }
    wg.Wait()
}

Words of wisdom

И отново:

Следващия път

Тестове и обработване на грешки

Въпроси?