カランのブログ

Kalan 頭像照片,在淡水拍攝,淺藍背景

四零二曜日電子報上線啦!訂閱訂起來

ソフトウェアエンジニア / 台湾人 / 福岡生活
このブログはRSS Feed をサポートしています。RSSリンクをクリックして設定してください。技術に関する記事はコードがあるのでブログで閲覧することをお勧めします。

今のモード ライト

我會把一些不成文的筆記或是最近的生活雜感放在短筆記,如果有興趣的話可以來看看唷!

記事のタイトルや概要は自動翻訳であるため(中身は翻訳されてない場合が多い)、変な言葉が出たり、意味伝わらない場合がございます。空いてる時間で翻訳します。

Goworker 入門 — Redis によるワーカーの実装

最近、Ruby on Railsからプロジェクトを徐々にGolangに移行しています。一方では、構文を書いてみてどのような感じかを練習したいという理由があり、もう一方では、Golangのパワーを感じたいと思っています。また、既存のフレームワークがない状況で、自分のコードをどのように組織するかを体験したいと思っています。

今回紹介するのは、goworkerです。このライブラリを選んだ理由は、"go worker"と検索したときに最初に出てきたからです。また、このライブラリはresque(Rubyのワーカーパッケージ)と完全に互換性があり、Ruby on Railsのアプリケーションでタスクをキューに入れて、Golangで実行することができます。

Nodes@1.100000023841858x

使用方法

初期化時に、workerがキューから取得するときに実行する対応するタスクを、すべてgoworkerに登録する必要があります。

func init() {
  settings := goworker.WorkerSettings{
    URI:            os.Getenv("REDIS_URL"),
    Queues:         []string{"worker", "queues"},
    UseNumber:      true,
    ExitOnComplete: false, // キューが空でも完了フラグを立てない
    Concurrency:    concurrency,
    Connections:    connections,
    Namespace:      "myapp" + ":",
    Interval:       10.0,
  }
  goworker.SetSettings(settings)  
}

必要に応じてconcurrencyconnectionsを定義できます。また、Namespaceはキューの名前にスコープを持たせるためのものです。intervalは、キューにタスクがない場合に新しいタスクをチェックするまでの待機時間を指定します。

ワーカーを実装するには、関数を実装し、関数をgoworkerに登録する必要があります。

func workerFn(queue string, args ...interface{}) error {
  // ここに処理を記述
  fmt.Println(queue, args)
}

goworker.Register("MyWorker", workerFn)

設定が完了したら、goworker.Start()を使用してRedisを監視できます。この操作はブロックされるため、通常は別のサーバーまたはプロセスを起動します(サーバーの起動時にgoroutineを開始することもできます)。

func main() {
	err := goworker.Work()
  if err != nil {
    // エラーログを出力
  }  
}

次に、Redisにタスクを追加してみましょう:

RPUSH myapp:queue:worker '{"class": "MyWorker", "args": [1,2,3]}

すると、stdoutworker[1 2 3]が表示されることに気づくでしょう。

また、goworker.Enqueue(&goworker.Job{})を使用してタスクを直接Redisに追加することもできます。

動作原理

新しいタスクが追加されると、goworkerはRPUSHを使用してタスクをnamespace:queue:job.Queueに追加します。パラメータをシリアライズするために、JSON文字列をキューに格納します。

// workers.go L43~47
buffer, err := json.Marshal(job.Payload)
	if err != nil {
		logger.Criticalf("Cant marshal payload on enqueue")
		return err
	}

	err = conn.Send("RPUSH", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buffer)
	if err != nil {
		logger.Criticalf("Cant push to queue")
		return err
	}

また、現在使用中のキューを記録するためにセットも使用されます。

// workers.go L49~53
err = conn.Send("SADD", fmt.Sprintf("%squeues", workerSettings.Namespace), job.Queue)
if err != nil {
  logger.Criticalf("Cant register queue to list of use queues")
  return err
}

goworker.Work()を呼び出すと、内部でpollerが最初に呼び出されます。pollerは次のようなことを行います。

  1. poller.getJob()を使用して、LPOPを呼び出してタスクを取得します。
reply, err := conn.Do("LPOP", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, queue))
  1. poller.poll(duration, quit)を使用して、定期的にjobsチャネルから実行するジョブを取得し、quitを終了のシグナルとして使用します。
jobs := make(chan *Job)
//......
return jobs
  1. 最後にworker.work(jobs, &monitor)を実行し、ジョブを受け取った場合にはw.run(job, workerFunc)が実行されます。対応するクラスが見つからない場合は、エラーメッセージ「No Worker for ... queue with args ...」が表示されます。

問題点

  • goworkerを使用しても、予期しないシャットダウンの場合、保留中のジョブが正常に実行されることは保証されません。
  • 私の理解では、Redisにはメッセージを確実に受信するメカニズムがないため、より信頼性の高いメカニズムが必要な場合は、AMQPのようなプロトコルが必要になるかもしれません。ただし、小規模なプロジェクトの場合、Redisは十分に使用できます。

まとめ

一般的なアプリケーションにとっては、Redisは十分に使用できます。大規模なトラフィックと高並行性の場合を除いては、Redisのキューも十分に使えます。今回は、goworkerとRedisを組み合わせて開発する方法を紹介しました。これにより、時間のかかるタスク(または遅延タスク)を処理できるようになります。より大規模なアプリケーションの場合は、より信頼性の高いプロトコル(例:AMQP)を使用することができます。 また、goworkerを実行すると、ファンが常に回転することがありますが、CPUは明らかに高負荷になりません......。

次の記事

IdleCallbackをリクエスト-空き時間を有効に活用してください

前の記事

Golang ノート — タイプアサーション

この文章が役に立つと思うなら、下のリンクで応援してくれると大変嬉しいです✨

Buy me a coffee