Go Concurrency Patterns

Golang

  • Simplicity

    Simplicity is a prerequisite for reliability.

  • Readability

    Readability is essential for maintainability.
    Programs must be written for people to read, and only incidentally for machines to execute.

  • Productivity

    Design is the art of arranging code to work today, and be changeable forever.

A First Look - Go Programming Language

  • Hello World

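    package main // declares the package this file belongs to

    // In legacy GOPATH mode:
    // 1. a relative path (./ or ../) can be used in import to pull in a package;
    // 2. without a relative path, Go looks under $GOPATH/src/.
    import (
        "fmt"  // formatted I/O, used here for output
        "math" // provides math.Pi
    )

    func main() {
        fmt.Println("hello world")

        // fmt formatting verbs
        fmt.Printf("%t\n", 1 == 2)
        fmt.Printf("binary: %b\n", 255)
        fmt.Printf("octal: %o\n", 255)
        fmt.Printf("hexadecimal: %x\n", 255)
        fmt.Printf("float: %f\n", math.Pi)
        fmt.Printf("string: %s\n", "hello world")
    }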
  • var & const

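    // Declare and initialize a single variable
    var x int = 100
    var str string = "hello world"
    // Declare and initialize several variables at once
    var i, j, k int = 1, 2, 3
    // Omit the type and let it be inferred from the initializer
    var b = true // bool
    // Short variable declaration (only valid inside a function body)
    y := 100

    // Constants
    const p string = "hello world"
    const pi float32 = 3.1415926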
  • Array & Slice

    // Slices in the runtime are represented by three components:
    //
    // type slice struct {
    //     ptr unsafe.Pointer // pointer to the backing array that holds the data
    //     len int            // number of elements in the slice
    //     cap int            // capacity available from ptr onward
    // }

    // Use reflect.DeepEqual() for deep comparison

    package main

    import (
        "fmt"
        "reflect"
    )

    type data struct {
        len int
        p   *int
    }

    // Compare whether two values hold the same data
    func main() {
        v1 := data{}
        v2 := data{}
        fmt.Println("v1 == v2:", reflect.DeepEqual(v1, v2))
        // prints: v1 == v2: true

        m1 := map[string]string{"one": "a", "two": "b"}
        m2 := map[string]string{"two": "b", "one": "a"}
        fmt.Println("m1 == m2:", reflect.DeepEqual(m1, m2))
        // prints: m1 == m2: true

        s1 := []int{1, 2, 3}
        s2 := []int{1, 2, 3}
        fmt.Println("s1 == s2:", reflect.DeepEqual(s1, s2))
        // prints: s1 == s2: true
    }
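
To make the three-field slice header more concrete, here is a minimal sketch of how reslicing changes len and cap while both slices keep pointing into one backing array. The helper name describe is hypothetical and chosen only for illustration.

```go
package main

import "fmt"

// describe is a hypothetical helper that reports a slice's length and capacity.
func describe(s []int) (length, capacity int) {
	return len(s), cap(s)
}

func main() {
	base := []int{1, 2, 3, 4, 5}
	window := base[1:3] // shares base's backing array

	l, c := describe(window)
	fmt.Println(l, c) // 2 4

	// A write through window is visible through base, because both
	// slice headers point into the same array.
	window[0] = 20
	fmt.Println(base[1]) // 20

	// Appending within capacity reuses the shared array as well.
	window = append(window, 40)
	fmt.Println(base[3]) // 40
}
```

Only an append that exceeds the capacity allocates a new array; after that point the two slices no longer alias each other.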

Go Model: CSP

  • CSP:

    Communicating Sequential Processes

Concurrency & Parallelism

  • Concurrency

    Programming as the composition of independently executing processes.
    Concurrency is about dealing with lots of things at once.
    Concurrency is about structure.

  • Parallelism

    Programming as the simultaneous execution of computations.
    Parallelism is about doing lots of things at once.
    Parallelism is about execution.

A well-written concurrent program might run efficiently in parallel on a multiprocessor.
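
As a rough sketch of that last point, the program below is structured concurrently (one goroutine per chunk), and the Go runtime may run those goroutines in parallel when more than one CPU is available. The name parallelSum and the chunk count are assumptions made for this example.

```go
package main

import (
	"fmt"
	"sync"
)

// parallelSum is a hypothetical helper: it splits nums into parts chunks,
// sums each chunk in its own goroutine, and combines the partial sums.
func parallelSum(nums []int, parts int) int {
	if parts < 1 {
		parts = 1
	}
	partial := make([]int, parts) // each goroutine writes only its own slot
	chunk := (len(nums) + parts - 1) / parts

	var wg sync.WaitGroup
	for p := 0; p < parts; p++ {
		lo := p * chunk
		if lo > len(nums) {
			lo = len(nums)
		}
		hi := lo + chunk
		if hi > len(nums) {
			hi = len(nums)
		}
		wg.Add(1)
		go func(slot int, part []int) {
			defer wg.Done()
			for _, n := range part {
				partial[slot] += n
			}
		}(p, nums[lo:hi])
	}
	wg.Wait()

	total := 0
	for _, s := range partial {
		total += s
	}
	return total
}

func main() {
	nums := make([]int, 1000)
	for i := range nums {
		nums[i] = i + 1
	}
	fmt.Println(parallelSum(nums, 4)) // 500500
}
```

The structure is the same whether the chunks run one after another on a single core or simultaneously on several; only the execution differs.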

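Processes & Threads

Process: generally the unit of resource allocation. A process owns a heap, stacks, a virtual address space (page tables), and file descriptors.
Thread: the unit that is scheduled onto and executed by the CPU.
Main process / main thread: if a process creates no additional threads after it starts, it is generally referred to as the main process or main thread.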

Goroutine

Concurrent execution
A goroutine is a function running independently in the same address space as other goroutines.
It has its own call stack, which grows and shrinks as required.
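
A small sketch of what this looks like in code: the go statement starts a function as a goroutine, and the caller has to wait explicitly if it needs the result. The name squareAll is hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// squareAll is a hypothetical helper that squares each input in its own goroutine.
func squareAll(in []int) []int {
	out := make([]int, len(in))
	var wg sync.WaitGroup
	for i, v := range in {
		wg.Add(1)
		go func(i, v int) { // the go statement starts a new goroutine
			defer wg.Done()
			out[i] = v * v // each goroutine writes a distinct index
		}(i, v)
	}
	wg.Wait() // without this, squareAll could return before the goroutines run
	return out
}

func main() {
	fmt.Println(squareAll([]int{1, 2, 3})) // [1 4 9]
}
```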

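  • Goroutine lifecycle management
    • Where possible, avoid starting a goroutine directly inside each request to do the work. Start workers that consume the work instead, so that a high request volume cannot create a huge number of goroutines and cause an OOM.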
      package main

      import (
          "context"
          "fmt"
          "log"
          "net/http"
          _ "net/http/pprof" // registers the pprof handlers on http.DefaultServeMux
          "time"
      )

      func setup() {
          // initialization work
      }

      func main() {
          setup()

          // receives the exit error of each server
          done := make(chan error, 2)
          // closed to tell every server to shut down
          stop := make(chan struct{})

          // for debug
          go func() {
              done <- pprof(stop)
          }()

          // main application
          go func() {
              done <- app(stop)
          }()

          var stopped bool

          for i := 0; i < cap(done); i++ {
              if err := <-done; err != nil {
                  log.Printf("server exit err: %+v", err)
              }

              // the first server to exit triggers shutdown of the others
              if !stopped {
                  stopped = true
                  close(stop)
              }
          }
      }

      func app(stop <-chan struct{}) error {
          mux := http.NewServeMux()
          mux.HandleFunc("/ping", func(w http.ResponseWriter, r *http.Request) {
              w.Write([]byte("pong"))
          })

          return server(mux, ":8080", stop)
      }

      func pprof(stop <-chan struct{}) error {
          go func() {
              server(http.DefaultServeMux, ":8081", stop)
          }()

          // simulate the pprof server failing after 5 seconds
          time.Sleep(5 * time.Second)
          return fmt.Errorf("mock pprof exit")
      }

      // server starts an HTTP server and shuts it down once stop is closed
      func server(handler http.Handler, addr string, stop <-chan struct{}) error {
          s := http.Server{
              Handler: handler,
              Addr:    addr,
          }

          go func() {
              <-stop
              log.Printf("server is exiting, addr: %s", addr)
              s.Shutdown(context.Background())
          }()

          return s.ListenAndServe()
      }
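
The advice about consuming work with a fixed set of workers, rather than starting one goroutine per request, could be sketched roughly as follows. The name processAll, the integer job type, and the worker count are all assumptions made for illustration; a real service would substitute its own request type.

```go
package main

import (
	"fmt"
	"sync"
)

// processAll is a hypothetical helper: it runs at most workers goroutines,
// no matter how many jobs arrive, and returns the sum of the squared jobs.
func processAll(jobs []int, workers int) int {
	if workers < 1 {
		workers = 1
	}
	queue := make(chan int)
	results := make(chan int)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range queue { // each worker consumes until queue is closed
				results <- j * j
			}
		}()
	}

	// Producer: feed the queue, then close it so the workers exit.
	go func() {
		for _, j := range jobs {
			queue <- j
		}
		close(queue)
	}()

	// Close results once every worker has finished.
	go func() {
		wg.Wait()
		close(results)
	}()

	total := 0
	for r := range results {
		total += r
	}
	return total
}

func main() {
	fmt.Println(processAll([]int{1, 2, 3, 4}, 2)) // 30
}
```

The number of goroutines is bounded by workers plus two helpers, so memory use does not grow with the number of jobs.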

Channels

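Synchronization and messaging
Channels are typed values that allow goroutines to synchronize and exchange information.
Channels are first-class values.
A channel in Go provides a connection between two goroutines, allowing them to communicate.

  • Unbuffered channel

    The sender and the receiver must both be ready at the same time before either can proceed.

  • Buffered channel

    A buffered channel relaxes this synchronous wait: a send blocks only when the buffer is full, and a receive blocks only when the buffer is empty.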

    // Declaring and initializing
    c := make(chan int)

    // Sending on a channel.
    c <- 1

    // Receiving from a channel
    // The "arrow" indicates the direction of data flow.
    value := <-c
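
The difference between the two kinds of channel can be sketched as below. Both helper names, bufferedRoundTrip and unbufferedHandoff, are hypothetical and exist only for this example.

```go
package main

import "fmt"

// bufferedRoundTrip is a hypothetical helper: with a buffer of len(vals),
// every send completes without a receiver being ready.
func bufferedRoundTrip(vals []string) []string {
	c := make(chan string, len(vals))
	for _, v := range vals {
		c <- v // does not block while the buffer has room
	}
	close(c)

	var got []string
	for v := range c { // drains the buffered values in FIFO order, then stops
		got = append(got, v)
	}
	return got
}

// unbufferedHandoff shows the synchronous case: the send in the goroutine
// completes only when the receive below is ready to take the value.
func unbufferedHandoff(v string) string {
	c := make(chan string)
	go func() { c <- v }()
	return <-c
}

func main() {
	fmt.Println(bufferedRoundTrip([]string{"a", "b"})) // [a b]
	fmt.Println(unbufferedHandoff("ping"))             // ping
}
```

Running the sends of bufferedRoundTrip on an unbuffered channel in the same goroutine would deadlock, because no receiver would ever become ready.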

Select

Multi-way concurrent control
The select statement is like a switch, but the decision is based on ability to communicate rather than equal values

// Fan-in using select
func fanIn(input1, input2 <-chan string) <-chan string {
    c := make(chan string)
    go func() {
        for {
            select {
            case s := <-input1:
                c <- s
            case s := <-input2:
                c <- s
            }
        }
    }()
    return c
}
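
The fanIn goroutine above loops forever. One possible variant, sketched here with the hypothetical names merge and emit, stops once both inputs are closed by setting a closed input to nil so that select no longer considers it.

```go
package main

import "fmt"

// merge is a hypothetical variant of fanIn that stops once both inputs are closed.
func merge(input1, input2 <-chan string) <-chan string {
	c := make(chan string)
	go func() {
		defer close(c)
		for input1 != nil || input2 != nil {
			select {
			case s, ok := <-input1:
				if !ok {
					input1 = nil // a nil channel is never ready, so this case is disabled
					continue
				}
				c <- s
			case s, ok := <-input2:
				if !ok {
					input2 = nil
					continue
				}
				c <- s
			}
		}
	}()
	return c
}

// emit is a hypothetical helper that sends the given messages and then closes its channel.
func emit(msgs ...string) <-chan string {
	c := make(chan string)
	go func() {
		defer close(c)
		for _, m := range msgs {
			c <- m
		}
	}()
	return c
}

func main() {
	count := 0
	for range merge(emit("a", "b"), emit("c")) {
		count++
	}
	fmt.Println(count) // 3
}
```

The order in which messages arrive is not deterministic, which is why the example counts them instead of printing them.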

Closures

Demo Project

Always use the right tool for the job

  • Google Search: A fake framework
    package google_search_test

    import (
        "fmt"
        "math/rand"
        "testing"
        "time"
    )

    type Result string

    var (
        Web    = fakeSearch("web")
        Web1   = fakeSearch("web1")
        Web2   = fakeSearch("web2")
        Image  = fakeSearch("image")
        Image1 = fakeSearch("Image1")
        Image2 = fakeSearch("Image2")
        Video  = fakeSearch("video")
        Video1 = fakeSearch("Video1")
        Video2 = fakeSearch("Video2")
    )

    type Search func(query string) Result

    // fakeSearch returns a Search that sleeps for up to 100ms and then
    // returns a canned result for the given kind.
    func fakeSearch(kind string) Search {
        return func(query string) Result {
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
            return Result(fmt.Sprintf("%s result for %q\n", kind, query))
        }
    }

    // Google runs the three searches one after another.
    func Google(query string) (results []Result) {
        results = append(results, Web(query))
        results = append(results, Image(query))
        results = append(results, Video(query))
        return
    }

    func GoogleV2(query string) (results []Result) {
        // Run the Web, Image, and Video searches concurrently
        c := make(chan Result)
        go func() { c <- Web(query) }()
        go func() { c <- Image(query) }()
        go func() { c <- Video(query) }()

        // wait for all results
        for i := 0; i < 3; i++ {
            result := <-c
            results = append(results, result)
        }
        return
    }

    func GoogleV21(query string) (results []Result) {
        // Don't wait for slow servers. No locks. No condition variables.
        c := make(chan Result)
        go func() { c <- Web(query) }()
        go func() { c <- Image(query) }()
        go func() { c <- Video(query) }()

        timeout := time.After(80 * time.Millisecond)
        for i := 0; i < 3; i++ {
            select {
            case result := <-c:
                results = append(results, result)
            case <-timeout:
                fmt.Println("timed out")
                return
            }
        }
        return
    }

    func GoogleV3(query string) (results []Result) {
        // Query replicated servers and use whichever replica answers first.
        c := make(chan Result)
        go func() { c <- First(query, Web1, Web2) }()
        go func() { c <- First(query, Image1, Image2) }()
        go func() { c <- First(query, Video1, Video2) }()
        timeout := time.After(80 * time.Millisecond)
        for i := 0; i < 3; i++ {
            select {
            case result := <-c:
                results = append(results, result)
            case <-timeout:
                fmt.Println("timed out")
                return
            }
        }
        return
    }

    // First sends the query to every replica and returns the first reply.
    func First(query string, replicas ...Search) Result {
        c := make(chan Result)
        searchReplica := func(i int) { c <- replicas[i](query) }
        for i := range replicas {
            go searchReplica(i)
        }
        return <-c
    }

    func TestGoogle(t *testing.T) {
        rand.Seed(time.Now().UnixNano())
        start := time.Now()
        results := Google("golang")
        elapsed := time.Since(start)
        fmt.Println(results)
        fmt.Println(elapsed)
    }

    func TestGoogleV2(t *testing.T) {
        rand.Seed(time.Now().UnixNano())
        start := time.Now()
        results := GoogleV2("golang")
        elapsed := time.Since(start)
        fmt.Println(results)
        fmt.Println(elapsed)
    }

    func TestGoogleV21(t *testing.T) {
        rand.Seed(time.Now().UnixNano())
        start := time.Now()
        results := GoogleV21("golang")
        elapsed := time.Since(start)
        fmt.Println(results)
        fmt.Println(elapsed)
    }

    func TestGoogleV3(t *testing.T) {
        rand.Seed(time.Now().UnixNano())
        start := time.Now()
        results := GoogleV3("golang")
        elapsed := time.Since(start)
        fmt.Println(results)
        fmt.Println(elapsed)
    }

    // ➜ go test -v google_search_test.go
    // === RUN TestGoogle
    // [web result for "golang"
    // image result for "golang"
    // video result for "golang"
    // ]
    // 31.882269ms
    // --- PASS: TestGoogle (0.03s)
    // === RUN TestGoogleV2
    // [web result for "golang"
    // image result for "golang"
    // video result for "golang"
    // ]
    // 49.036197ms
    // --- PASS: TestGoogleV2 (0.05s)
    // === RUN TestGoogleV21
    // [web result for "golang"
    // video result for "golang"
    // image result for "golang"
    // ]
    // 53.507162ms
    // --- PASS: TestGoogleV21 (0.05s)
    // === RUN TestGoogleV3
    // [Video2 result for "golang"
    // web1 result for "golang"
    // Image1 result for "golang"
    // ]
    // 41.612001ms
    // --- PASS: TestGoogleV3 (0.04s)
    // PASS
    // ok command-line-arguments 0.301s
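
One caveat with First as written: its channel is unbuffered, so once the fastest reply has been received, the slower replicas block forever on their send. A possible variant, sketched here on the assumption that a buffer with one slot per replica is acceptable, lets every replica finish. The names firstBuffered and fixedSearch are hypothetical, and fixedSearch stands in for fakeSearch with a fixed delay so the outcome is predictable.

```go
package main

import (
	"fmt"
	"time"
)

type Result string

type Search func(query string) Result

// fixedSearch is a hypothetical stand-in for fakeSearch with a fixed delay.
func fixedSearch(kind string, delay time.Duration) Search {
	return func(query string) Result {
		time.Sleep(delay)
		return Result(fmt.Sprintf("%s result for %q", kind, query))
	}
}

// firstBuffered returns the first reply. The buffer has one slot per replica,
// so slower replicas can still complete their send and exit.
// It expects at least one replica; with none it would block forever.
func firstBuffered(query string, replicas ...Search) Result {
	c := make(chan Result, len(replicas))
	for _, r := range replicas {
		go func(s Search) { c <- s(query) }(r)
	}
	return <-c
}

func main() {
	fast := fixedSearch("fast", 5*time.Millisecond)
	slow := fixedSearch("slow", 200*time.Millisecond)
	fmt.Println(firstBuffered("golang", slow, fast)) // fast result for "golang"
}
```

The slower results are simply discarded with the channel once the last goroutine has exited. Cancelling the slower requests outright would need something like context.Context, which is beyond this sketch.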

Best Practices for Writing Go Code

  • software programming
  • software engineering

References