Goworker 入門 — Redis によるワーカーの実装

作成者:カランカラン
💡

質問やフィードバックがありましたら、フォームからお願いします

本文は台湾華語で、ChatGPT で翻訳している記事なので、不確かな部分や間違いがあるかもしれません。ご了承ください

最近、プロジェクトを Ruby on Rails から Golang に徐々に移行しています。一方では、醜い構文を書いてみたいという練習のため、もう一方では Golang の力を感じたり、既存のフレームワークなしで自分のコードをどのように整理するかを体験したいと思っています。

今日は goworker を紹介します。これを選んだ理由は、検索で最初に出てきたのがこれだったからです。また、resque(Ruby のワーカーライブラリ)に完全に対応した実装方法を持っているので、Ruby on Rails のアプリケーションでタスクをキューに追加し、Golang でそのタスクを実行できるからです。

Nodes@1.100000023841858x

使用方法

初期化の際には、実行されるすべてのワーカーを goworker に登録する必要があります。これにより、ワーカーはキューを取得する際にどのタスクを実行すべきかを知ることができます。

func init() {
  settings := goworker.WorkerSettings{
    URI:            os.Getenv("REDIS_URL"),
    Queues:         []string{"worker", "queues"},
    UseNumber:      true,
    ExitOnComplete: false, // キューが空でも完了しない。
    Concurrency:    concurrency,
    Connections:    connections,
    Namespace:      "myapp" + ":",
    Interval:       10.0,
  }
  goworker.SetSettings(settings)  
}

必要に応じて concurrencyconnections を定義できます。下の Namespace はキューの名前にスコープを持たせるためのものです。interval は、キューにタスクがない場合に新しいタスクがあるかどうかを再チェックする間隔を示します。

ワーカーを実装するには、関数を実装し、その関数を goworker に登録する必要があります。

func workerFn(queue string, args ...interface{}) error {
  // あなたのタスク。
  fmt.Println(queue, args)
}

goworker.Register("MyWorker", workerFn)

設定が完了したら、goworker.Start() を使用して Redis を監視できます。この操作はブロックされるため、通常は別のサーバーやプロセスを開くか、サーバーを起動する際に goroutine を走らせることができます。

func main() {
	err := goworker.Work()
  if err != nil {
    // エラーをログに記録
  }  
}

次に、Redis にタスクを追加してみましょう:

RPUSH myapp:queue:worker '{"class": "MyWorker", "args": [1,2,3]}

stdout には worker[1 2 3] が表示されることが分かります。

また、goworker.Enqueue(&goworker.Job{}) を使って、直接 Redis にタスクを追加することもできます。

運作方式

新しいタスクが追加されると、goworker は RPUSH を使用してタスクを namespace:queue:job.Queue に追加します。パラメータをシリアライズできるように、JSON 文字列を内容としてキューに入れます。

// workers.go L43~47
buffer, err := json.Marshal(job.Payload)
	if err != nil {
		logger.Criticalf("ペイロードのマーシャリングに失敗しました")
		return err
	}

	err = conn.Send("RPUSH", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buffer)
	if err != nil {
		logger.Criticalf("キューに追加できませんでした")
		return err
	}

さらに、現在使用中のキューを記録するための集合も用意されています。

// workers.go L49~53
err = conn.Send("SADD", fmt.Sprintf("%squeues", workerSettings.Namespace), job.Queue)
if err != nil {
  logger.Criticalf("使用中のキューのリストに登録できませんでした")
  return err
}

goworker.Work() を呼び出す際、内部で poller が呼ばれます。この poller は主に以下のことを行います:

  1. poller.getJob() を通じて LPOP を使用してタスクを取り出します。
reply, err := conn.Do("LPOP", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, queue))
  1. poller.poll(duration, quit) を通じて、定期的に jobs チャンネルから実行する jobs を取り出し、quit を終了のシグナルとして使用します。
jobs := make(chan *Job)
//......
return jobs
  1. 最後に worker.work(jobs, &monitor) を実行し、jobs を受け取った際には w.run(job, workerFunc) を実行します。これは当初定義した関数です。対応するクラスが見つからない場合は、エラー No Worker for ... queue with args ... が表示されます。

問題

  • goworker を使用している場合、予期しないシャットダウンが発生した際に、保留中のジョブが正常に実行される保証はありません。
  • 私の理解では、Redis には受信側が必ずメッセージを受け取ることを保証するメカニズムはありません。より信頼性の高いメカニズムが必要な場合は、AMQP のようなプロトコルが必要になるかもしれません。しかし、小規模なプロジェクトにおいては、Redis は十分に機能します。

小結

一般的なアプリケーションにとって、Redis は十分に使えるものであり、本当に大流量や高い同時接続が求められる場面でない限り、Redis のキューもそれほど使い物にならないというわけではありません。今回は、goworker というライブラリを Redis と組み合わせて開発し、時間のかかるタスクや遅延タスクの処理を行う方法を紹介しました。より大規模なアプリケーションに対しては、AMQP などのより信頼性の高いプロトコルを使用することができます。また、goworker を実行しているときは、私のファンが常に回っているのですが、これが正常な状況なのかはわかりませんが、CPU の使用率は特に上昇していません……。

この記事が役に立ったと思ったら、下のリンクからコーヒーを奢ってくれると嬉しいです ☕ 私の普通の一日が輝かしいものになります ✨

Buy me a coffee