最近正在將專案從 Ruby On Rails 當中逐漸搬移到 golang,除了一方面是想要練習寫寫看醜醜的語法感覺怎麼樣,一方面是想感受一下 golang 的威力還有在沒有一個既有的框架下,要怎麼組織自己的程式碼。
今天要來介紹的是 goworker。會看上他的原因是搜尋 go worker 第一個跳出來的就是它,而且他實作的方式完全支援 resque
(一個 Ruby 的 worker 套件)的形狀,所以我可以在 Ruby On Rails 的應用中將任務放到 queue 裡頭,再透過 golang 執行任務。
使用方法
在初始化的時候,你需要將所有會執行到 worker 先註冊到 goworker 當中,讓 worker 知道取得這個佇列的時候,要執行哪個對應的任務:
func init() {
settings := goworker.WorkerSettings{
URI: os.Getenv("REDIS_URL"),
Queues: []string{"worker", "queues"},
UseNumber: true,
ExitOnComplete: false, // don't complete even though queue is empty.
Concurrency: concurrency,
Connections: connections,
Namespace: "myapp" + ":",
Interval: 10.0,
}
goworker.SetSettings(settings)
}
你可以根據需要來定義 concurrency
跟 connections
,至於下面的 Namespace 則是讓 queue 的名稱有一個作用域。interval
是指如果在 queue 沒有任務的情況下,過多久會在檢查一次是否有新任務。
要如何實作一個 worker 呢?你需要實作一個函數,以及將函數註冊到 goworker 當中。
func workerFn(queue string, args ...interface{}) error {
// your job.
fmt.Println(queue, args)
}
goworker.Register("MyWorker", workerFn)
設定完之後就可以用 goworker.Start()
來監聽 Redis 了。這個操作是會 block 的,所以通常都是另外開一個 server 或是進程(你要在伺服器開啟的時候開一個 goroutine 給他跑也可以)。
func main() {
err := goworker.Work()
if err != nil {
// log your error
}
}
接下來我們在 redis 塞一個任務瞧瞧:
RPUSH myapp:queue:worker '{"class": "MyWorker", "args": [1,2,3]}
會發現在 stdout
上出現了 worker
跟 [1 2 3]
。
你也可以直接用 goworker.Enqueue(&goworker.Job{})
來將你的任務推入 redis 當中。
運作方式
當有新的任務加入的時候,goworker 會用 RPUSH
的方式將任務塞到 namespace:queue:job.Queue
當中。為了要讓參數可以序列化,使用 JSON
字串當作內容塞入佇列中。
// workers.go L43~47
buffer, err := json.Marshal(job.Payload)
if err != nil {
logger.Criticalf("Cant marshal payload on enqueue")
return err
}
err = conn.Send("RPUSH", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buffer)
if err != nil {
logger.Criticalf("Cant push to queue")
return err
}
除此之外也用一個集合紀錄了目前使用中的 queue 有哪些。
// workers.go L49~53
err = conn.Send("SADD", fmt.Sprintf("%squeues", workerSettings.Namespace), job.Queue)
if err != nil {
logger.Criticalf("Cant register queue to list of use queues")
return err
}
在呼叫 goworker.Work()
的時候,內部會先呼叫一個 poller
,這個 poller
主要做了幾件事情:
- 透過
poller.getJob()
呼叫LPOP
將任務取出
reply, err := conn.Do("LPOP", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, queue))
- 透過
poller.poll(duration, quit)
會定期從 jobs channel 取出要執行的jobs
,並透過quit
當作離開的 signal。
jobs := make(chan *Job)
//......
return jobs
- 最後執行
worker.work(jobs, &monitor)
,接收到 jobs 的時候會執行w.run(job, workerFunc)
就是我們當初定義的函數。如果找不到對應的 class,會印出錯誤No Worker for ... queue with args ...
問題
- 使用 goworker 並沒有辦法保證如果意外 shutdown,這些 pending 的 job 能夠順利執行。
- 就我的理解,redis 並沒有一個機制可以保證接收端一定會收到訊息,如果需要更可靠的機制,恐怕需要像是 AMQP 的協定。不過對一個小專案來說,redis 已經相當夠用了。
小結
對於一般的應用來說,Redis 已經相當夠用了,除非真的是大流量跟高並發的場景,不然 redis 的 queue 也沒有那麼不堪用。這次介紹了 goworker 這個函式庫搭配 redis 做開發,應該可以完成耗時任務(或延時任務)的處理,對於比較大型的應用來說,可以用像是 AMQP 等更可靠的協議來完成。 另外就是跑 goworker 的時候我的風扇都會一直轉,不知道是不是正常的情況,但 CPU 並沒有明顯的飆升......。