基于Golang学习并发编程
Golang 实现并发编程
简介
我对于并发的认识
在我们学习并发这个概念之前,我们需要了解下什么是并发。我第一次接触这个概念的时候是在逻辑漏洞挖掘上。当时第一次接触到这个概念,我还在想这个和无限重放有什么区别呢。我觉得比较好的一个解释是 并发是多线程同时进行的,而无限重放,不论你的手速再快,他也是单线程进行的。所以 网上那些通过无限重放数据包构造并发的都是扯淡。。。。
go语言中的并发
很多语言都可以进行编程,但是民间据说go自带并发属性,Go 使用 Go 协程(Goroutine) 和信道(Channel)来处理并发。具体内容我也也不太好说,看下面的代码和注释吧。
Goroutine
func hello() {
fmt.Println("Hello Goroutine!")
}
func main() {
hello()
fmt.Println("main goroutine done!")
}
这段代码就是正常的并行输出。
package main
import "fmt"
func hello() {
fmt.Println("Hello Goroutine!")
}
func main() {
go hello() //使用了 Goroutine
fmt.Println("main goroutine done!")
}
//输出结果
/*main goroutine done!*/
没有输出 Hello 部分
网上师傅的解释
go为 main函数 自动创建了一个goroutine 当main()函数返回的时候该goroutine就结束了,所有在main()函数中启动的goroutine会一同结束,main函数所在的goroutine就像是权利的游戏中的夜王,其他的goroutine都是异鬼,夜王一死它转化的那些异鬼也就全部GG了。
//网上的解决方法 加入 time.sleep
package main
import (
"fmt"
"time"
)
func hello() {
fmt.Println("Hello Goroutine!")
}
func main() {
go hello()
fmt.Println("main goroutine done!")
time.Sleep(time.Second)
}
//输出结果
/*main goroutine done!
Hello Goroutine!
*/
//启动多个 goroutine
package main
import (
"fmt"
"sync"
)
var wg sync.WaitGroup
func hello(i int) {
defer wg.Done() // goroutine结束就登记-1
fmt.Println("Hello Goroutine!", i)
}
func main() {
for i := 0; i < 10; i++ {
wg.Add(1) // 启动一个goroutine就登记+1
go hello(i)
}
wg.Wait() // 等待所有登记的goroutine都结束
}
//执行并发输出,多次运行输出发现输出结果不一样。这就是并发的特性,如果是并行输出,输出结果就是唯一的。
runtime
package main
import (
"fmt"
"runtime"
)
func main() {
go func(s string) {
for i := 0; i < 2; i++ {
fmt.Println(s)
}
}("world")
// 主协程
for i := 0; i < 2; i++ {
// 切一下,再次分配任务
runtime.Gosched()
fmt.Println("hello")
}
}
package main
import (
"fmt"
"runtime"
"time"
)
func a() {
for i := 1; i < 10; i++ {
fmt.Println("A:", i)
}
}
func b() {
for i := 1; i < 10; i++ {
fmt.Println("B:", i)
}
}
func main() {
runtime.GOMAXPROCS(2)
go a()
go b()
time.Sleep(time.Second)
}
package main
import (
"fmt"
"time"
)
func a() {
for i := 1; i < 10; i++ {
fmt.Println("A:", i)
}
}
func b() {
for i := 1; i < 10; i++ {
fmt.Println("B:", i)
}
}
func main() {
go a()
go b()
time.Sleep(time.Second)
}
//go 1.5版本前默认单核心,go 1.5版本后默认调用全部核心 可通过 runtime.GOMAXPROCS 实现使用多少核心
Go语言中的操作系统线程和goroutine的关系:
- 1.一个操作系统线程对应用户态多个goroutine。
- 2.go程序可以同时使用多个操作系统线程。
- 3.goroutine和OS线程是多对多的关系,即m:n。
channel
如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。
// channel
package main
import "fmt"
func main() {
var demo1 chan int //声明一个传递整型的通道
fmt.Println(demo1)
}
//输出结果 <nil>
// channel
//channel的缓冲大小是可选的
package main
import "fmt"
func main() {
ch4 := make(chan bool)
ch5 := make(chan bool)
ch6 := make(chan []int)
fmt.Println(ch4)
fmt.Println(ch5)
fmt.Println(ch6)
}
//这段代码我只是知道如何用了,但是对于一些底层的知识还不太清楚
// channel操作
// channel 的操作只有三种 发送 接收 关闭
//发送和接收都使用<-符号
ch <- 10 // 把10发送到ch中
x := <- ch // 从ch中接收值并赋值给变量x
<-ch // 从ch中接收值,忽略结果
close(ch) //关闭通道
网上关于关闭通道的一些说法
关于关闭通道需要注意的事情是,只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。
关闭后的通道有以下特点:
1.对一个关闭的通道再发送值就会导致panic。
2.对一个关闭的通道进行接收会一直获取值直到通道为空。
3.对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
4.关闭一个已经关闭的通道会导致panic。
package main
import "fmt"
func main() {
demo := make(chan int)
demo <- 10
fmt.Println("发送成功")
}
// 上述代码会报错 因为我们使用demo := make(chan int)创建的是无缓冲的通道,无缓冲的通道只有在有人接收值的时候才能发送值
//解决方案1 启用一个goroutine去接收值
package main
import (
"fmt"
)
func recv(c chan int) {
ret := <-c
fmt.Println("接收成功", ret)
}
func main() {
demo := make(chan int)
go recv(demo) // 启用goroutine从通道接收值
demo <- 10
fmt.Println("发送成功")
}
//使用无缓冲通道进行通信将导致发送和接收的goroutine同步化。因此,无缓冲通道也被称为同步通道。
//解决方案 2 创建一个有缓冲的通道
func main() {
demo := make(chan int, 1) // 创建一个容量为1的有缓冲区通道
demo <- 10
fmt.Println("发送成功")
}
//关闭操作
package main
import "fmt"
func main() {
c := make(chan int)
go func() {
for i := 0; i < 5; i++ {
c <- i
}
close(c)
}()
for {
if data, ok := <-c; ok {
fmt.Println(data)
} else {
break
}
}
fmt.Println("main结束")
}
//输出0到4
//使用上面的操作打印 0 到 100 的平方
package main
import "fmt"
func main() {
ch1 := make(chan int) //声明两段通道
ch2 := make(chan int)
go func() { //调用 goroutine
for i := 0; i < 101; i++ {
ch1 <- i
}
close(ch1) //关闭通道
}()
go func() { //调用 goroutine
for {
i, ok := <-ch1 //通道关闭后再取值ok=false
if !ok { //判断
break
}
ch2 <- i * i
}
close(ch2) //关闭通道
}()
for i := range ch2 { //打印
fmt.Println(i)
}
}
//单向通道
func counter(out chan<- int) {
for i := 0; i < 100; i++ {
out <- i
}
close(out)
}
func squarer(out chan<- int, in <-chan int) {
for i := range in {
out <- i * i
}
close(out)
}
func printer(in <-chan int) {
for i := range in {
fmt.Println(i)
}
}
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
go counter(ch1)
go squarer(ch2, ch1)
printer(ch2)
}
/* 1.chan<- int是一个只能发送的通道,可以发送但是不能接收;
2.<-chan int是一个只能接收的通道,可以接收但是不能发送*/
在函数传参及任何赋值操作中将双向通道转换为单向通道是可以的,但反过来是不可以的。

//并发爬取网页图片代码 很有学习意义
package main
import (
"fmt"
"io/ioutil"
"net/http"
"regexp"
"strconv"
"strings"
"sync"
"time"
)
func HandleError(err error, why string) {
if err != nil {
fmt.Println(why, err)
}
}
// 下载图片,传入的是图片叫什么
func DownloadFile(url string, filename string) (ok bool) {
resp, err := http.Get(url)
HandleError(err, "http.get.url")
defer resp.Body.Close()
bytes, err := ioutil.ReadAll(resp.Body)
HandleError(err, "resp.body")
filename = "E:/image" + filename
// 写出数据
err = ioutil.WriteFile(filename, bytes, 0666)
if err != nil {
return false
} else {
return true
}
}
// 并发爬思路:
// 1.初始化数据管道
// 2.爬虫写出:26个协程向管道中添加图片链接
// 3.任务统计协程:检查26个任务是否都完成,完成则关闭数据管道
// 4.下载协程:从管道里读取链接并下载
var (
// 存放图片链接的数据管道
chanImageUrls chan string
waitGroup sync.WaitGroup
// 用于监控协程
chanTask chan string
reImg = `https?://[^"]+?(\.((jpg)|(png)|(jpeg)|(gif)|(bmp)))`
)
func main() {
// 1.初始化管道
chanImageUrls = make(chan string, 1000000)
chanTask = make(chan string, 26)
// 2.爬虫协程
for i := 1; i < 27; i++ {
waitGroup.Add(1)
go getImgUrls("https://bz.zzzmh.cn/index" + strconv.Itoa(i) + ".html")
}
waitGroup.Add(1)
go CheckOK()
for i := 0; i < 5; i++ {
waitGroup.Add(1)
go DownloadImg()
}
waitGroup.Wait()
}
// 下载图片
func DownloadImg() {
for url := range chanImageUrls {
filename := GetFilenameFromUrl(url)
ok := DownloadFile(url, filename)
if ok {
fmt.Printf("%s 下载成功\n", filename)
} else {
fmt.Printf("%s 下载失败\n", filename)
}
}
waitGroup.Done()
}
// 截取url名字
func GetFilenameFromUrl(url string) (filename string) {
// 返回最后一个/的位置
lastIndex := strings.LastIndex(url, "/")
// 切出来
filename = url[lastIndex+1:]
// 时间戳解决重名
timePrefix := strconv.Itoa(int(time.Now().UnixNano()))
filename = timePrefix + "_" + filename
return
}
// 任务统计协程
func CheckOK() {
var count int
for {
url := <-chanTask
fmt.Printf("%s 完成了爬取任务\n", url)
count++
if count == 26 {
close(chanImageUrls)
break
}
}
waitGroup.Done()
}
// 爬图片链接到管道
// url是传的整页链接
func getImgUrls(url string) {
urls := getImgs(url)
// 遍历切片里所有链接,存入数据管道
for _, url := range urls {
chanImageUrls <- url
}
// 标识当前协程完成
// 每完成一个任务,写一条数据
// 用于监控协程知道已经完成了几个任务
chanTask <- url
waitGroup.Done()
}
// 获取当前页图片链接
func getImgs(url string) (urls []string) {
pageStr := GetPageStr(url)
re := regexp.MustCompile(reImg)
results := re.FindAllStringSubmatch(pageStr, -1)
fmt.Printf("共找到%d条结果\n", len(results))
for _, result := range results {
url := result[0]
urls = append(urls, url)
}
return
}
// 抽取根据url获取内容
func GetPageStr(url string) (pageStr string) {
resp, err := http.Get(url)
HandleError(err, "http.Get url")
defer resp.Body.Close()
// 2.读取页面内容
pageBytes, err := ioutil.ReadAll(resp.Body)
HandleError(err, "ioutil.ReadAll")
// 字节转字符串
pageStr = string(pageBytes)
return pageStr
}