GVKun编程网logo

使用通道同步多个goroutine(多通道数据)

20

此处将为大家介绍关于使用通道同步多个goroutine的详细内容,并且为您解答有关多通道数据的相关问题,此外,我们还将为您介绍关于GolangGoroutine不在通道内部运行、Golang|同时结束

此处将为大家介绍关于使用通道同步多个goroutine的详细内容,并且为您解答有关多通道数据的相关问题,此外,我们还将为您介绍关于Golang Goroutine不在通道内部运行、Golang | 同时结束多个goroutines、golang-- 使用 channel 来同步 goroutine、golang--使用channel来同步goroutine的有用信息。

本文目录一览:

使用通道同步多个goroutine(多通道数据)

使用通道同步多个goroutine(多通道数据)

我需要使用单个任务队列和单个结果队列来启动许多工作程序。每个工人都应该以不同的goroutine开始。我需要等到所有工作人员都将完成并且任务队列将为空后再退出程序。我已经准备了goroutine同步的小例子。主要思想是我们将排队的任务计数,并等待所有工人完成工作。但是当前的实现有时会遗漏值。为什么会发生这种情况以及如何解决问题?示例代码:

import (    "fmt"    "os"    "os/signal"    "strconv")const num_workers = 5type workerChannel chan uint64// Make channel for tasksvar workCh workerChannel// Make channel for task countervar cntChannel chan int// Task countervar tskCnt int64// Worker functionfunc InitWorker(input workerChannel, result chan string, num int) {    for {        select {        case inp := <-input:            getTask()            result <- ("Worker " + strconv.Itoa(num) + ":" + strconv.FormatUint(inp, 10))        }    }}// Function to manage task counter// should be in uniq goroutinefunc taskCounter(inp chan int) {    for {        val := <-inp        tskCnt += int64(val)    }}// Put pask to the queuefunc putTask(val uint64) {    func() {        fmt.Println("Put ", val)        cntChannel <- int(1)        workCh <- val    }()}// Get task from queuefunc getTask() {    func() {        cntChannel <- int(-1)    }()}func main() {// Init service channels    abort := make(chan os.Signal)    done := make(chan bool)// init queue for results    result := make(chan string)// init task queue    workCh = make(workerChannel)// start some workers    for i := uint(0); i < num_workers; i++ {        go InitWorker(workCh, result, int(i))    }// init counter for synchro    cntChannel = make(chan int)    go taskCounter(cntChannel)// goroutine that put some tasks into queue    go func() {        for i := uint(0); i < 21; i++ {            putTask(uint64(i))        }        // wait for processing all tasks and close application        for len(cntChannel) != 0 {}        for tskCnt != 0 {}        for len(workCh) != 0 {}        for len(result) != 0 {}        // send signal for close        done <- true    }()    signal.Notify(abort, os.Interrupt)    for {        select {        case <-abort:            fmt.Println("Aborted.")            os.Exit(0)        // print results        case res := <-result:            fmt.Println(res)        case <-done:            fmt.Println("Done")            os.Exit(0)        }    }}

答案1

小编典典

使用sync.WaitGroup等待goroutine完成。关闭通道以使通道上的循环读取退出。

package mainimport (    "fmt"    "sync")type workerChannel chan uint64const num_workers = 5func main() {    results := make(chan string)    workCh := make(workerChannel)    // Start workers    var wg sync.WaitGroup    wg.Add(num_workers)    for i := 0; i < num_workers; i++ {        go func(num int) {            defer wg.Done()            // Loop processing work until workCh is closed            for w := range workCh {                results <- fmt.Sprintf("worker %d, task %d", num, w)            }        }(i)    }    // Close result channel when workers are done    go func() {        wg.Wait()        close(results)    }()    // Send work to be done    go func() {        for i := 0; i < 21; i++ {            workCh <- uint64(i)        }        // Closing the channel causes workers to break out of loop        close(workCh)    }()    // Process results. Loop exits when result channel is closed.    for r := range results {        fmt.Println(r)    }}

https://play.golang.org/p/ZifpzsP6fNv

Golang Goroutine不在通道内部运行

Golang Goroutine不在通道内部运行

我正在尝试实现字数统计程序,但是第一步我遇到了一些问题。

这是我的代码:

package mainimport (    "fmt"    "os"    "bufio"    "sync")// Load data into channelfunc laodData(arr []string,channel chan string,wg sync.WaitGroup) {    for _,path := range arr {        file,err := os.Open(path)        fmt.Println("begin to laodData ", path)        if err != nil {            fmt.Println(err)            os.Exit(-1)        }        defer file.Close()        reader := bufio.NewReaderSize(file, 32*10*1024)        i := 0        for {            line,err := reader.ReadString(''\n'')            channel <- line            if err != nil {                break            }            i++            if i%200 == 0 {                fmt.Println(i," lines parsed")            }        }        fmt.Println("finish laodData ", path)    }    wg.Done()}// dispatch data lines into different mappersfunc dispatcher(channel chan string,wg sync.WaitGroup){    fmt.Println("pull data 11")    line,ok := <- channel    fmt.Println(ok)    for ok {        fmt.Println(line)        line,ok = <- channel    }    fmt.Println("pull data 22")    wg.Done()}func main() {    path := os.Args    if len(path) < 2 {        fmt.Println("Need Input Files")        os.Exit(0)    }    var wg sync.WaitGroup    wg.Add(2)    channel := make(chan string)    defer close(channel)    fmt.Println("before dispatcher")    go laodData(path[1:],channel,wg)    go dispatcher(channel,wg)    wg.Wait()    fmt.Println("after dispatcher")}

这是我的输出:

...finish laodData  result.txtthrow: all goroutines are asleep - deadlock!goroutine 1 [semacquire]:sync.runtime_Semacquire(0x42154100, 0x42154100)    /usr/local/go/src/pkg/runtime/zsema_amd64.c:146 +0x25sync.(*WaitGroup).Wait(0x4213b440, 0x0)    /usr/local/go/src/pkg/sync/waitgroup.go:79 +0xf2main.main()    /Users/kuankuan/go/src/mreasy/main.go:66 +0x238goroutine 2 [syscall]:created by runtime.main    /usr/local/go/src/pkg/runtime/proc.c:221goroutine 4 [chan receive]:main.dispatcher(0x42115a50, 0x0, 0x2, 0x0)    /Users/kuankuan/go/src/mreasy/main.go:45 +0x223created by main.main    /Users/kuankuan/go/src/mreasy/main.go:65 +0x228exit status 2

谢谢 !

答案1

小编典典

当主goroutine退出时,程序终止,因此dispatcher()没有时间做任何事情。您需要封锁main()直到dispatcher()完成。通道可用于此目的:

package mainimport (    "fmt"    "os"    "bufio")var done = make(chan bool)             // create channel// Load files and send them into a channel for mappers reading.func dispatcher(arr []string,channel chan string) {    for _,path := range arr {        file,err := os.Open(path)        fmt.Println("begin to dispatch ", path)        if err != nil {            fmt.Println(err)            os.Exit(-1)        }        defer file.Close()        reader := bufio.NewReaderSize(file, 32*10*1024)        i := 0        for {            line,_ := reader.ReadString(''\n'')            channel <- line            i++            if i%200 == 0 {                fmt.Println(i," lines parsed")            }        }        fmt.Println("finish dispatch ", path)    }    done <- true                 // notify main() of completion}func main() {    path := os.Args    if len(path) < 2 {        fmt.Println("Need Input Files")        os.Exit(0)    }    channel := make(chan string)    fmt.Println("before dispatcher")    go dispatcher(path[1:],channel)    <-done                 // wait for dispatcher()    fmt.Println("after dispatcher")}

Golang | 同时结束多个goroutines

Golang | 同时结束多个goroutines

https://chilts.org/2017/06/12/cancelling-multiple-goroutines

golang-- 使用 channel 来同步 goroutine

golang-- 使用 channel 来同步 goroutine

在 golang 中同步 goroutine 有 2 种方法,要么使用 channel,要么使用 sync.WaitGroup,本文就是介绍如何通过 channel 来同步 goroutine。先看代码。

 1 package main
 2 
 3 import (
 4     "os"
 5     "os/signal"
 6     "runtime"
 7     "log"
 8     "syscall"
 9 )
10 
11 const NUM_OF_QUIT int = 100
12 
13 func main() {
14     runtime.GOMAXPROCS(runtime.NumCPU())
15     done := make(chan bool)
16     receive_channel := make(chan chan bool)
17     finish := make(chan bool)
18 
19 
20     for i := 0; i < NUM_OF_QUIT; i++ {
21         go do_while_select(i, receive_channel, finish)
22     }
23 
24     go handle_exit(done, receive_channel, finish)
25 
26     <-done
27     os.Exit(0)
28 
29 }
30 func handle_exit(done chan bool, receive_channel chan chan bool, finish chan bool) {
31     sigs := make(chan os.Signal, 1)
32     signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
33     chan_slice := make([]chan bool, 0)
34     for {
35         select {
36         case  <-sigs:
37             for _, v := range chan_slice {
38                 v <- true
39             }
40             for i := 0; i < len(chan_slice); i++ {
41                 <-finish
42             }
43             done <- true
44             runtime.Goexit()
45         case single_chan := <-receive_channel:
46             log.Println("the single_chan is ", single_chan)
47             chan_slice = append(chan_slice, single_chan)
48         }
49     }
50 }
51 func do_while_select(num int, rece chan chan bool, done chan bool) {
52     quit := make(chan bool)
53     rece <- quit
54     for {
55         select {
56         case <-quit:
57             done <- true
58             runtime.Goexit()
59         default:
60             //简单输出
61             log.Println("the ", num, "is running")
62         }
63     }
64 }

上面的代码保存为 example.go,通过 gotool 编译代码:

go build example.go

在当前目录下有 example 文件,在终端运行这个文件

2013/03/19 21:17:14 the  0 is running
2013/03/19 21:17:14 the  0 is running
2013/03/19 21:17:14 the  0 is running
2013/03/19 21:17:14 the  0 is running
2013/03/19 21:17:14 the  0 is running
2013/03/19 21:17:14 the  0 is running
2013/03/19 21:17:14 the  0 is running
2013/03/19 21:17:14 the  0 is running
2013/03/19 21:17:14 the  0 is running
2013/03/19 21:17:14 the  0 is running
2013/03/19 21:17:14 the  0 is running
2013/03/19 21:17:14 the  0 is running
2013/03/19 21:17:14 the  0 is running
2013/03/19 21:17:14 the  0 is running
......

上面不断输出 goroutine 中的数字,等待退出信号。

新打开一个终端,通过 ps 找到这个进程名,通过 kill 工具干掉这个进程:

$ps aux | grep example
user  4026 77.9  0.0  39436  1716 pts/1    Sl+  21:19   0:17 ./example
$kill 4026

不久就可以看到在第一个终端里面不再打印,至此演示完毕。

代码思想:

新建 NUM_OF_QUIT 个 goroutine,这些个 goroutine 里面新建 1 个 chan bool,通过这个 channel 来接受退出的信号,这些 channel 在新建的时候,已经发给了 handle_exit。在 handle_exit 这个 goroutine 里面,1 方面监控由系统发过来的退出信号,然后再通知其他的 goroutin 优雅地退出;另一方面通过 slice 收集其他 goroutine 发过来的 channel。handle_exit 通知其他的 goroutine 优雅退出后,再发信号给 main 进程主动退出。

可以修改 NUM_OF_QUIT 值,例如改为 10000,这个时候,kill 命令发出去后,要等待相当长的一段时间才能看到第一个终端停止打印。

参考:

Go by Example: Signals

https://gobyexample.com/signals

 转贴请注明来自:格通

 

golang--使用channel来同步goroutine

golang--使用channel来同步goroutine

在golang中同步goroutine有2种方法,要么使用channel,要么使用sync.WaitGroup,本文就是介绍如何通过channel来同步goroutine。先看代码。

 1 package main  2 
 3 import (  4     "os"
 5     "os/signal"
 6     "runtime"
 7     "log"
 8     "syscall"
 9 ) 10 
11 const NUM_OF_QUIT int = 100
12 
13 func main() { 14  runtime.GOMAXPROCS(runtime.Numcpu()) 15     done := make(chan bool) 16     receive_channel := make(chan chan bool) 17     finish := make(chan bool) 18 
19 
20     for i := 0; i < NUM_OF_QUIT; i++ { 21  go do_while_select(i,receive_channel,finish) 22  } 23 
24  go handle_exit(done,finish) 25 
26     <-done 27     os.Exit(0) 28 
29 } 30 func handle_exit(done chan bool,receive_channel chan chan bool,finish chan bool) { 31     sigs := make(chan os.Signal,1) 32  signal.Notify(sigs,syscall.SIGINT,syscall.SIGTERM) 33     chan_slice := make([]chan bool,0) 34     for { 35         select { 36         case  <-sigs: 37             for _,v := range chan_slice { 38                 v <- true
39  } 40             for i := 0; i < len(chan_slice); i++ { 41                 <-finish 42  } 43             done <- true
44  runtime.Goexit() 45         case single_chan := <-receive_channel: 46             log.Println("the single_chan is ",single_chan) 47             chan_slice = append(chan_slice,single_chan) 48  } 49  } 50 } 51 func do_while_select(num int,rece chan chan bool,done chan bool) { 52     quit := make(chan bool) 53     rece <- quit 54     for { 55         select { 56         case <-quit: 57             done <- true
58  runtime.Goexit() 59         default: 60             //简单输出
61             log.Println("the ",num,"is running") 62  } 63  } 64 }

上面的代码保存为example.go,通过gotool编译代码:

go build example.go

在当前目录下有example文件,在终端运行这个文件

2013/03/19 21:17:14 the  0 is running 2013/03/19 21:17:14 the  0 is running 2013/03/19 21:17:14 the  0 is running 2013/03/19 21:17:14 the  0 is running 2013/03/19 21:17:14 the  0 is running 2013/03/19 21:17:14 the  0 is running 2013/03/19 21:17:14 the  0 is running 2013/03/19 21:17:14 the  0 is running 2013/03/19 21:17:14 the  0 is running 2013/03/19 21:17:14 the  0 is running 2013/03/19 21:17:14 the  0 is running 2013/03/19 21:17:14 the  0 is running 2013/03/19 21:17:14 the  0 is running 2013/03/19 21:17:14 the  0 is running
......

上面不断输出goroutine中的数字,等待退出信号。

新打开一个终端,通过ps找到这个进程名,通过kill工具干掉这个进程:

$ps aux | grep example user 4026 77.9  0.0  39436  1716 pts/1    Sl+  21:19   0:17 ./example $kill 4026

不久就可以看到在第一个终端里面不再打印,至此演示完毕。

代码思想:

新建NUM_OF_QUIT个goroutine,这些个goroutine里面新建1个chan bool,通过这个channel来接受退出的信号,这些channel在新建的时候,已经发给了handle_exit。在handle_exit这个goroutine里面,1方面监控由系统发过来的退出信号,然后再通知其他的goroutin优雅地退出;另一方面通过slice收集其他goroutine发过来的channel。handle_exit通知其他的goroutine优雅退出后,再发信号给main进程主动退出。

可以修改NUM_OF_QUIT值,例如改为10000,这个时候,kill命令发出去后,要等待相当长的一段时间才能看到第一个终端停止打印。

参考:

Go by Example: Signals

https://gobyexample.com/signals

转贴请注明来自:格通

我们今天的关于使用通道同步多个goroutine多通道数据的分享已经告一段落,感谢您的关注,如果您想了解更多关于Golang Goroutine不在通道内部运行、Golang | 同时结束多个goroutines、golang-- 使用 channel 来同步 goroutine、golang--使用channel来同步goroutine的相关信息,请在本站查询。

本文标签: