Golang高并发学习【通过Go来处理每分钟达百万的数据请求】

·

2 min read

golang 的主要场景之一就是用来做高并发的处理,之前的公司有个 golang 写的服务用来生成二维码,并上传到 upyun 上。

并发很好,设计良好,当时还看了下源码,然后今天继续看了一下相关的文章。

然后找到这篇译文:

【译文】通过 Go 来处理每分钟达百万的数据请求 blog.csdn.net/tybaoerge/article/details/503..

这篇文章非常好的描述了什么是高并发,怎么实现高并发:在有限的资源下,通过合理的使用 goroutine,channel,来完成业务操作。

只要是 2 部分,调度器(dispatcher),执行者(worker)

源码分析

调度器 dispatcher

调度器的作用是从 JobQueue 中获取新的 job,并将 job 交给闲置的 worker。

package worker

import ( "fmt" )

type Dispatcher struct { // A pool of workers channels that are registered with the dispatcher // 执行池, WorkerPool chan chan Job maxWorkers int }

func NewDispatcher(maxWorkers int) Dispatcher { pool := make(chan chan Job, maxWorkers) return &Dispatcher{WorkerPool: pool, maxWorkers: maxWorkers} } // 初始化worker池,并启动woker池,并开始接受新的job // func (d Dispatcher) Run() { // starting n number of workers for i := 0; i < d.maxWorkers; i++ { worker := NewWorker(d.WorkerPool) worker.Start() }

go d.dispatch() }

// 开始调度,接收新的job func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: // a job request has been received go func(job Job) { fmt.Println("[UPYUN] Dispatcher get JOB") // try to obtain a worker job channel that is available. // this will block until a worker is idle // 从pool中获取空闲的job channel jobChannel := <-d.WorkerPool

// dispatch the job to the worker job channel // 将job塞入 job channel中 jobChannel <- job }(job) } } }

工人执行者 Worker

package worker

import ( "fmt" "os" )

var ( MaxWorker = os.Getenv("MAX_WORKERS") MaxQueue = os.Getenv("MAX_QUEUE") )

type Payload struct { FilePath string }

// Job represents the job to be run type Job struct { Payload Payload }

// A buffered channel that we can send work requests on. var JobQueue chan Job

// Worker represents the worker that executes the job type Worker struct { WorkerPool chan (chan Job) JobChannel chan Job quit chan bool }

func NewWorker(workerPool chan chan Job) Worker { return Worker{ WorkerPool: workerPool, JobChannel: make(chan Job), // 分配器中,会将任务交给jobChannel,下面会从这里读取到job quit: make(chan bool)} }

// Start method starts the run loop for the worker, listening for a quit channel in // case we need to stop it func (w Worker) Start() { go func() { for { // register the current worker into the worker queue. // 将限制的JobChannel(chan) 丢入WorkerPool w.WorkerPool <- w.JobChannel

select { case job := <-w.JobChannel: // 当闲置的jobChannel中有job时, job.Payload // job开始工作 case <-w.quit: return } } }() }

// Stop signals the worker to stop listening for work requests. func (w Worker) Stop() { go func() { w.quit <- true }() }

感觉整个代码最抽象的其实是

chan chan Job

大家知道它是“管道中的管道” 的意思。

用图来模拟就是这样一种感觉

WorkerPool chan


| chan Job |


| chan Job |

- + - + - + -

但是实际用起来,还是蛮抽象的。

我想着,这样写似乎跟下面的代码没有太大区别啊:

伪代码:

maxPool = 100 workerPool = make(chan Job, maxPool)

// 塞入队列 for { workerPool <- Job }

// 取出队列 for i=0; i<maxPool; i++ { go func() { for { select { case job := <- workerPool: job.Do() } } } }

Did you find this article valuable?

Support 粒 莫 by becoming a sponsor. Any amount is appreciated!