Skip to main content

Command Palette

Search for a command to run...

Golang高并发学习【通过Go来处理每分钟达百万的数据请求】

Published
2 min read

golang 的主要场景之一就是用来做高并发的处理,之前的公司有个 golang 写的服务用来生成二维码,并上传到 upyun 上。

并发很好,设计良好,当时还看了下源码,然后今天继续看了一下相关的文章。

然后找到这篇译文:

【译文】通过 Go 来处理每分钟达百万的数据请求 https://blog.csdn.net/tybaoerge/article/details/50392386

这篇文章非常好的描述了什么是高并发,怎么实现高并发:在有限的资源下,通过合理的使用 goroutine,channel,来完成业务操作。

只要是 2 部分,调度器(dispatcher),执行者(worker)

源码分析

调度器 dispatcher

调度器的作用是从 JobQueue 中获取新的 job,并将 job 交给闲置的 worker。

package worker

import ( "fmt" )

type Dispatcher struct { // A pool of workers channels that are registered with the dispatcher // 执行池, WorkerPool chan chan Job maxWorkers int }

func NewDispatcher(maxWorkers int) Dispatcher { pool := make(chan chan Job, maxWorkers) return &Dispatcher{WorkerPool: pool, maxWorkers: maxWorkers} } // 初始化worker池,并启动woker池,并开始接受新的job // func (d Dispatcher) Run() { // starting n number of workers for i := 0; i < d.maxWorkers; i++ { worker := NewWorker(d.WorkerPool) worker.Start() }

go d.dispatch() }

// 开始调度,接收新的job func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: // a job request has been received go func(job Job) { fmt.Println("[UPYUN] Dispatcher get JOB") // try to obtain a worker job channel that is available. // this will block until a worker is idle // 从pool中获取空闲的job channel jobChannel := <-d.WorkerPool

// dispatch the job to the worker job channel // 将job塞入 job channel中 jobChannel <- job }(job) } } }

工人执行者 Worker

package worker

import ( "fmt" "os" )

var ( MaxWorker = os.Getenv("MAX_WORKERS") MaxQueue = os.Getenv("MAX_QUEUE") )

type Payload struct { FilePath string }

// Job represents the job to be run type Job struct { Payload Payload }

// A buffered channel that we can send work requests on. var JobQueue chan Job

// Worker represents the worker that executes the job type Worker struct { WorkerPool chan (chan Job) JobChannel chan Job quit chan bool }

func NewWorker(workerPool chan chan Job) Worker { return Worker{ WorkerPool: workerPool, JobChannel: make(chan Job), // 分配器中,会将任务交给jobChannel,下面会从这里读取到job quit: make(chan bool)} }

// Start method starts the run loop for the worker, listening for a quit channel in // case we need to stop it func (w Worker) Start() { go func() { for { // register the current worker into the worker queue. // 将限制的JobChannel(chan) 丢入WorkerPool w.WorkerPool <- w.JobChannel

select { case job := <-w.JobChannel: // 当闲置的jobChannel中有job时, job.Payload // job开始工作 case <-w.quit: return } } }() }

// Stop signals the worker to stop listening for work requests. func (w Worker) Stop() { go func() { w.quit <- true }() }

感觉整个代码最抽象的其实是

chan chan Job

大家知道它是“管道中的管道” 的意思。

用图来模拟就是这样一种感觉

WorkerPool chan


| chan Job |


| chan Job |

- + - + - + -

但是实际用起来,还是蛮抽象的。

我想着,这样写似乎跟下面的代码没有太大区别啊:

伪代码:

maxPool = 100 workerPool = make(chan Job, maxPool)

// 塞入队列 for { workerPool <- Job }

// 取出队列 for i=0; i<maxPool; i++ { go func() { for { select { case job := <- workerPool: job.Do() } } } }

More from this blog

会有越来的多的side projects出现

什么是side project 可以理解为工作之余开发的产品,通常是收费的服务,可作为工作之外额外收入的产品。 在目前经济下行、公司开源节流(裁员)、失业率上升的大环境下,每一个程序员都应该拥有自己的side projects来对冲未来的不可靠风险。 所以side project 不仅仅是多一种「被动收入」,他也是你未来的「筹码」——工作累了,不想干了、有小孩了、买房了、家人生病了等等这些事情发生的时候,你可以「任性」一下。 而不是一些不可靠的风险出现的时候,再来提高自己的「抗风险」能力。 上面...

Jul 28, 20231 min read61

Xbox Cloud Gaming 游戏加速尝试

Xbox Cloud Gaming 游戏加速 之前有个很老的xbox游戏机,因为性能有点差劲了,所以卖了。 偶尔还是想玩玩游戏,但是老婆不让给买xbox的物理机(怀恋单身),所以含泪玩xbox cloud gaming(以下简称 xcg),xpg会员游戏还是很丰富的。 于是买了uu加速器,坦白说uu加速器不算便宜的,但是xcg在晚上高峰期,一样卡得怀疑人生,那种被马赛克糊满脸的感受,上一次这种体验还是看小姐姐的电影。 其实用uu加速器玩是ok的,就是国内的网络情况大家都知道,dddd(懂得都懂)...

Sep 5, 20221 min read258

github codespaces 在ipad上的最佳浏览器

Github Codespaces github codespaces 是github在被微软收购后,提供的一款在线web IDE,基本与vscode一致,只是运行在浏览器上而已。 而且非常土豪的提供了4核8G内存,微软就是壕。 so,通过ipad来使用codespaces就是一件比较顺其自然的事。 可是其实也没那么简单。。 IPad使用codespaces的快捷键问题 其实最大的问题就是快捷键的问题,不管你是用saferi,还是chrome,他们提供的快捷键或多或少会与你的vscode的快捷键...

Oct 8, 20211 min read59

基于binlog检查数据错误

起因 某个表的 status 「莫名其妙」变成 0 了 其实可以判断出是 status 没有被赋值,通常是结构体的 status 默认是 0 才会被插入数据库。 于是问题看起来就很简单了:只要检查相关的更新操作中的 status 字段有没有被赋值即可。 但是 这个表是用户表。 因为历史原因,源码中的更新函数很多 调用更新函数的地方也很多 无法复现该问题,测试人员也不知道做了什么操作状态变成 0 的 所以同事关注这个问题挺久了,也没看到问题原因(当然我也没看到……) 但是恰好我在做导出 bin...

Aug 5, 20212 min read61
M

Moli'blog

64 posts

曾经的少年还在吗?