最近、Ruby on Railsからプロジェクトを徐々にGolangに移行しています。一方では、構文を書いてみてどのような感じかを練習したいという理由があり、もう一方では、Golangのパワーを感じたいと思っています。また、既存のフレームワークがない状況で、自分のコードをどのように組織するかを体験したいと思っています。
今回紹介するのは、goworkerです。このライブラリを選んだ理由は、"go worker"と検索したときに最初に出てきたからです。また、このライブラリはresque
(Rubyのワーカーパッケージ)と完全に互換性があり、Ruby on Railsのアプリケーションでタスクをキューに入れて、Golangで実行することができます。
使用方法
初期化時に、workerがキューから取得するときに実行する対応するタスクを、すべてgoworkerに登録する必要があります。
func init() {
settings := goworker.WorkerSettings{
URI: os.Getenv("REDIS_URL"),
Queues: []string{"worker", "queues"},
UseNumber: true,
ExitOnComplete: false, // キューが空でも完了フラグを立てない
Concurrency: concurrency,
Connections: connections,
Namespace: "myapp" + ":",
Interval: 10.0,
}
goworker.SetSettings(settings)
}
必要に応じてconcurrency
とconnections
を定義できます。また、Namespaceはキューの名前にスコープを持たせるためのものです。interval
は、キューにタスクがない場合に新しいタスクをチェックするまでの待機時間を指定します。
ワーカーを実装するには、関数を実装し、関数をgoworkerに登録する必要があります。
func workerFn(queue string, args ...interface{}) error {
// ここに処理を記述
fmt.Println(queue, args)
}
goworker.Register("MyWorker", workerFn)
設定が完了したら、goworker.Start()
を使用してRedisを監視できます。この操作はブロックされるため、通常は別のサーバーまたはプロセスを起動します(サーバーの起動時にgoroutineを開始することもできます)。
func main() {
err := goworker.Work()
if err != nil {
// エラーログを出力
}
}
次に、Redisにタスクを追加してみましょう:
RPUSH myapp:queue:worker '{"class": "MyWorker", "args": [1,2,3]}
すると、stdout
にworker
と[1 2 3]
が表示されることに気づくでしょう。
また、goworker.Enqueue(&goworker.Job{})
を使用してタスクを直接Redisに追加することもできます。
動作原理
新しいタスクが追加されると、goworkerはRPUSH
を使用してタスクをnamespace:queue:job.Queue
に追加します。パラメータをシリアライズするために、JSON文字列をキューに格納します。
// workers.go L43~47
buffer, err := json.Marshal(job.Payload)
if err != nil {
logger.Criticalf("Cant marshal payload on enqueue")
return err
}
err = conn.Send("RPUSH", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buffer)
if err != nil {
logger.Criticalf("Cant push to queue")
return err
}
また、現在使用中のキューを記録するためにセットも使用されます。
// workers.go L49~53
err = conn.Send("SADD", fmt.Sprintf("%squeues", workerSettings.Namespace), job.Queue)
if err != nil {
logger.Criticalf("Cant register queue to list of use queues")
return err
}
goworker.Work()
を呼び出すと、内部でpoller
が最初に呼び出されます。poller
は次のようなことを行います。
poller.getJob()
を使用して、LPOP
を呼び出してタスクを取得します。
reply, err := conn.Do("LPOP", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, queue))
poller.poll(duration, quit)
を使用して、定期的にjobsチャネルから実行するジョブを取得し、quit
を終了のシグナルとして使用します。
jobs := make(chan *Job)
//......
return jobs
- 最後に
worker.work(jobs, &monitor)
を実行し、ジョブを受け取った場合にはw.run(job, workerFunc)
が実行されます。対応するクラスが見つからない場合は、エラーメッセージ「No Worker for ... queue with args ...」が表示されます。
問題点
- goworkerを使用しても、予期しないシャットダウンの場合、保留中のジョブが正常に実行されることは保証されません。
- 私の理解では、Redisにはメッセージを確実に受信するメカニズムがないため、より信頼性の高いメカニズムが必要な場合は、AMQPのようなプロトコルが必要になるかもしれません。ただし、小規模なプロジェクトの場合、Redisは十分に使用できます。
まとめ
一般的なアプリケーションにとっては、Redisは十分に使用できます。大規模なトラフィックと高並行性の場合を除いては、Redisのキューも十分に使えます。今回は、goworkerとRedisを組み合わせて開発する方法を紹介しました。これにより、時間のかかるタスク(または遅延タスク)を処理できるようになります。より大規模なアプリケーションの場合は、より信頼性の高いプロトコル(例:AMQP)を使用することができます。 また、goworkerを実行すると、ファンが常に回転することがありますが、CPUは明らかに高負荷になりません......。