カランのブログ

ソフトウェアエンジニア / 台湾人 / 福岡生活

今のモード ライト

最近、Ruby on Railsからプロジェクトを徐々にGolangに移行しています。一方では、構文を書いてみてどのような感じかを練習したいという理由があり、もう一方では、Golangのパワーを感じたいと思っています。また、既存のフレームワークがない状況で、自分のコードをどのように組織するかを体験したいと思っています。

今回紹介するのは、goworkerです。このライブラリを選んだ理由は、"go worker"と検索したときに最初に出てきたからです。また、このライブラリはresque(Rubyのワーカーパッケージ)と完全に互換性があり、Ruby on Railsのアプリケーションでタスクをキューに入れて、Golangで実行することができます。

Nodes@1.100000023841858x

使用方法

初期化時に、workerがキューから取得するときに実行する対応するタスクを、すべてgoworkerに登録する必要があります。

func init() {
  settings := goworker.WorkerSettings{
    URI:            os.Getenv("REDIS_URL"),
    Queues:         []string{"worker", "queues"},
    UseNumber:      true,
    ExitOnComplete: false, // キューが空でも完了フラグを立てない
    Concurrency:    concurrency,
    Connections:    connections,
    Namespace:      "myapp" + ":",
    Interval:       10.0,
  }
  goworker.SetSettings(settings)  
}

必要に応じてconcurrencyconnectionsを定義できます。また、Namespaceはキューの名前にスコープを持たせるためのものです。intervalは、キューにタスクがない場合に新しいタスクをチェックするまでの待機時間を指定します。

ワーカーを実装するには、関数を実装し、関数をgoworkerに登録する必要があります。

func workerFn(queue string, args ...interface{}) error {
  // ここに処理を記述
  fmt.Println(queue, args)
}

goworker.Register("MyWorker", workerFn)

設定が完了したら、goworker.Start()を使用してRedisを監視できます。この操作はブロックされるため、通常は別のサーバーまたはプロセスを起動します(サーバーの起動時にgoroutineを開始することもできます)。

func main() {
	err := goworker.Work()
  if err != nil {
    // エラーログを出力
  }  
}

次に、Redisにタスクを追加してみましょう:

RPUSH myapp:queue:worker '{"class": "MyWorker", "args": [1,2,3]}

すると、stdoutworker[1 2 3]が表示されることに気づくでしょう。

また、goworker.Enqueue(&goworker.Job{})を使用してタスクを直接Redisに追加することもできます。

動作原理

新しいタスクが追加されると、goworkerはRPUSHを使用してタスクをnamespace:queue:job.Queueに追加します。パラメータをシリアライズするために、JSON文字列をキューに格納します。

// workers.go L43~47
buffer, err := json.Marshal(job.Payload)
	if err != nil {
		logger.Criticalf("Cant marshal payload on enqueue")
		return err
	}

	err = conn.Send("RPUSH", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buffer)
	if err != nil {
		logger.Criticalf("Cant push to queue")
		return err
	}

また、現在使用中のキューを記録するためにセットも使用されます。

// workers.go L49~53
err = conn.Send("SADD", fmt.Sprintf("%squeues", workerSettings.Namespace), job.Queue)
if err != nil {
  logger.Criticalf("Cant register queue to list of use queues")
  return err
}

goworker.Work()を呼び出すと、内部でpollerが最初に呼び出されます。pollerは次のようなことを行います。

  1. poller.getJob()を使用して、LPOPを呼び出してタスクを取得します。
reply, err := conn.Do("LPOP", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, queue))
  1. poller.poll(duration, quit)を使用して、定期的にjobsチャネルから実行するジョブを取得し、quitを終了のシグナルとして使用します。
jobs := make(chan *Job)
//......
return jobs
  1. 最後にworker.work(jobs, &monitor)を実行し、ジョブを受け取った場合にはw.run(job, workerFunc)が実行されます。対応するクラスが見つからない場合は、エラーメッセージ「No Worker for ... queue with args ...」が表示されます。

問題点

  • goworkerを使用しても、予期しないシャットダウンの場合、保留中のジョブが正常に実行されることは保証されません。
  • 私の理解では、Redisにはメッセージを確実に受信するメカニズムがないため、より信頼性の高いメカニズムが必要な場合は、AMQPのようなプロトコルが必要になるかもしれません。ただし、小規模なプロジェクトの場合、Redisは十分に使用できます。

まとめ

一般的なアプリケーションにとっては、Redisは十分に使用できます。大規模なトラフィックと高並行性の場合を除いては、Redisのキューも十分に使えます。今回は、goworkerとRedisを組み合わせて開発する方法を紹介しました。これにより、時間のかかるタスク(または遅延タスク)を処理できるようになります。より大規模なアプリケーションの場合は、より信頼性の高いプロトコル(例:AMQP)を使用することができます。 また、goworkerを実行すると、ファンが常に回転することがありますが、CPUは明らかに高負荷になりません......。

作者

Kalan 頭像照片,在淡水拍攝,淺藍背景

愷開 | Kalan

Kalan です。台湾出身で、2019年に日本へ転職し、福岡に住んでいます。フロントエンド開発に精通しているだけでなく、IoT、アプリ開発、バックエンド、電子工作などの分野にも挑戦しています。 最近、エレキギターを始めました。ブログを通じて、より多くの人と交流できればと思っています。気軽に絡んでください