Kalan's Blog

Kalan 頭像照片,在淡水拍攝,淺藍背景

四零二曜日電子報上線啦!訂閱訂起來

本部落主要是關於前端、軟體開發以及我在日本的生活,也會寫一些對時事的觀察和雜感
本部落格支援 RSS feed(全文章內容),可點擊下方 RSS 連結或透過第三方服務設定。若技術文章裡有程式碼語法等特殊樣式,仍建議至原網站瀏覽以獲得最佳體驗。

目前主題 亮色

我會把一些不成文的筆記或是最近的生活雜感放在短筆記,如果有興趣的話可以來看看唷!

Goworker 簡介 — 搭配 Redis 實作 worker

最近正在將專案從 Ruby On Rails 當中逐漸搬移到 golang,除了一方面是想要練習寫寫看醜醜的語法感覺怎麼樣,一方面是想感受一下 golang 的威力還有在沒有一個既有的框架下,要怎麼組織自己的程式碼。

今天要來介紹的是 goworker。會看上他的原因是搜尋 go worker 第一個跳出來的就是它,而且他實作的方式完全支援 resque(一個 Ruby 的 worker 套件)的形狀,所以我可以在 Ruby On Rails 的應用中將任務放到 queue 裡頭,再透過 golang 執行任務。

Nodes@1.100000023841858x

使用方法

在初始化的時候,你需要將所有會執行到 worker 先註冊到 goworker 當中,讓 worker 知道取得這個佇列的時候,要執行哪個對應的任務:

func init() {
  settings := goworker.WorkerSettings{
    URI:            os.Getenv("REDIS_URL"),
    Queues:         []string{"worker", "queues"},
    UseNumber:      true,
    ExitOnComplete: false, // don't complete even though queue is empty.
    Concurrency:    concurrency,
    Connections:    connections,
    Namespace:      "myapp" + ":",
    Interval:       10.0,
  }
  goworker.SetSettings(settings)  
}

你可以根據需要來定義 concurrencyconnections,至於下面的 Namespace 則是讓 queue 的名稱有一個作用域。interval 是指如果在 queue 沒有任務的情況下,過多久會在檢查一次是否有新任務。

要如何實作一個 worker 呢?你需要實作一個函數,以及將函數註冊到 goworker 當中。

func workerFn(queue string, args ...interface{}) error {
  // your job.
  fmt.Println(queue, args)
}

goworker.Register("MyWorker", workerFn)

設定完之後就可以用 goworker.Start() 來監聽 Redis 了。這個操作是會 block 的,所以通常都是另外開一個 server 或是進程(你要在伺服器開啟的時候開一個 goroutine 給他跑也可以)。

func main() {
	err := goworker.Work()
  if err != nil {
    // log your error
  }  
}

接下來我們在 redis 塞一個任務瞧瞧:

RPUSH myapp:queue:worker '{"class": "MyWorker", "args": [1,2,3]}

會發現在 stdout 上出現了 worker[1 2 3]

你也可以直接用 goworker.Enqueue(&goworker.Job{}) 來將你的任務推入 redis 當中。

運作方式

當有新的任務加入的時候,goworker 會用 RPUSH 的方式將任務塞到 namespace:queue:job.Queue 當中。為了要讓參數可以序列化,使用 JSON 字串當作內容塞入佇列中。

// workers.go L43~47
buffer, err := json.Marshal(job.Payload)
	if err != nil {
		logger.Criticalf("Cant marshal payload on enqueue")
		return err
	}

	err = conn.Send("RPUSH", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buffer)
	if err != nil {
		logger.Criticalf("Cant push to queue")
		return err
	}

除此之外也用一個集合紀錄了目前使用中的 queue 有哪些。

// workers.go L49~53
err = conn.Send("SADD", fmt.Sprintf("%squeues", workerSettings.Namespace), job.Queue)
if err != nil {
  logger.Criticalf("Cant register queue to list of use queues")
  return err
}

在呼叫 goworker.Work() 的時候,內部會先呼叫一個 poller,這個 poller 主要做了幾件事情:

  1. 透過 poller.getJob() 呼叫 LPOP 將任務取出
reply, err := conn.Do("LPOP", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, queue))
  1. 透過 poller.poll(duration, quit) 會定期從 jobs channel 取出要執行的 jobs,並透過 quit 當作離開的 signal。
jobs := make(chan *Job)
//......
return jobs
  1. 最後執行 worker.work(jobs, &monitor),接收到 jobs 的時候會執行 w.run(job, workerFunc) 就是我們當初定義的函數。如果找不到對應的 class,會印出錯誤 No Worker for ... queue with args ...

問題

  • 使用 goworker 並沒有辦法保證如果意外 shutdown,這些 pending 的 job 能夠順利執行。
  • 就我的理解,redis 並沒有一個機制可以保證接收端一定會收到訊息,如果需要更可靠的機制,恐怕需要像是 AMQP 的協定。不過對一個小專案來說,redis 已經相當夠用了。

小結

對於一般的應用來說,Redis 已經相當夠用了,除非真的是大流量跟高並發的場景,不然 redis 的 queue 也沒有那麼不堪用。這次介紹了 goworker 這個函式庫搭配 redis 做開發,應該可以完成耗時任務(或延時任務)的處理,對於比較大型的應用來說,可以用像是 AMQP 等更可靠的協議來完成。 另外就是跑 goworker 的時候我的風扇都會一直轉,不知道是不是正常的情況,但 CPU 並沒有明顯的飆升......。

上一篇

requestIdleCallback - 善用空閑時間

下一篇

Golang 筆記 — Type Assertion

如果覺得這篇文章對你有幫助的話,可以考慮到下面的連結請我喝一杯 ☕️ 可以讓我平凡的一天變得閃閃發光 ✨

Buy me a coffee