Introduction to Goworker — Implementing Workers with Redis

Written by Kalan

This post is translated by ChatGPT and originally written in Mandarin, so there may be some inaccuracies or mistakes.

Recently, I have been gradually migrating a project from Ruby on Rails to Golang. One reason for this shift is to practice and experience the unique syntax of Golang, while the other is to explore the power of Golang and learn how to organize my code without relying on an existing framework.

Today, I want to introduce goworker. I chose it because it was the first result that appeared when searching for "go worker," and because it is compatible with the job format used by resque (a Ruby worker library). This allows me to enqueue tasks in my Ruby on Rails application and then execute them using Golang.

How to Use

During initialization, you configure goworker and register every worker that will be executed, so that goworker knows which function to run when it retrieves a job from the queue. The settings come first:

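func init() {
  settings := goworker.WorkerSettings{
    URI:            os.Getenv("REDIS_URL"),
    Queues:         []string{"worker", "queues"},
    UseNumber:      true,        // decode numeric args as json.Number
    ExitOnComplete: false,       // keep polling even when the queues are empty
    Concurrency:    concurrency, // number of concurrent workers (defined elsewhere)
    Connections:    connections, // Redis connection pool size (defined elsewhere)
    Namespace:      "myapp:",
    Interval:       10.0,        // how often to poll when the queues are empty
  }
  goworker.SetSettings(settings)
}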

Set Concurrency and Connections to suit your workload. Namespace is the prefix added to the Redis keys that goworker uses, which keeps the queues scoped to your application. Interval specifies how often to check for new tasks when there are none in the queue.

How do you implement a worker? You need to create a function and register it with goworker.

func workerFn(queue string, args ...interface{}) error {
  // your job.
  fmt.Println(queue, args)
  return nil // a non-nil error marks the job as failed
}

goworker.Register("MyWorker", workerFn)
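
Because UseNumber is true in the settings above, numeric arguments should reach the worker as json.Number values rather than float64. The following is a minimal sketch, using only the standard library, of how a worker might convert them. The names sumWorker, sumArgs, and decodeArgs are hypothetical and not part of goworker; decodeArgs only imitates decoding a job's args with UseNumber enabled so that the example runs without Redis.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// sumWorker has the same signature as workerFn above.
// It is a hypothetical worker that adds up its numeric arguments.
func sumWorker(queue string, args ...interface{}) error {
	total, err := sumArgs(args)
	if err != nil {
		return fmt.Errorf("queue %s: %w", queue, err)
	}
	fmt.Println(queue, total)
	return nil
}

// sumArgs converts each argument from json.Number to int64 and sums them.
func sumArgs(args []interface{}) (int64, error) {
	var total int64
	for i, a := range args {
		n, ok := a.(json.Number)
		if !ok {
			return 0, fmt.Errorf("arg %d: expected json.Number, got %T", i, a)
		}
		v, err := n.Int64()
		if err != nil {
			return 0, fmt.Errorf("arg %d: %w", i, err)
		}
		total += v
	}
	return total, nil
}

// decodeArgs stands in for the library: it decodes a JSON array
// with UseNumber enabled, so numbers become json.Number.
func decodeArgs(raw string) ([]interface{}, error) {
	dec := json.NewDecoder(strings.NewReader(raw))
	dec.UseNumber()
	var args []interface{}
	if err := dec.Decode(&args); err != nil {
		return nil, err
	}
	return args, nil
}

func main() {
	args, err := decodeArgs(`[1, 2, 3]`)
	if err != nil {
		panic(err)
	}
	if err := sumWorker("worker", args...); err != nil {
		panic(err)
	}
}
```

Returning an error for an unexpected argument type, instead of panicking, lets the caller decide how to report the failed job.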

Once configured, you can call goworker.Work() to listen for jobs in Redis. This call blocks, so it is common to run it on a separate server or in a separate process (you can also start a goroutine to run it when your server is up).

func main() {
  err := goworker.Work()
  if err != nil {
    // log your error
  }
}

Next, let's enqueue a task in Redis and see what happens:

RPUSH myapp:queue:worker '{"class": "MyWorker", "args": [1,2,3]}'

You will notice that worker and [1 2 3] appear in stdout.

You can also directly use goworker.Enqueue(&goworker.Job{}) to push your job into Redis.

How It Works

When a new job is added, goworker uses RPUSH to insert the task into namespace:queue:job.Queue. To ensure that parameters can be serialized, it uses a JSON string as the content of the queue.

// workers.go L43~47
buffer, err := json.Marshal(job.Payload)
if err != nil {
  logger.Criticalf("Cant marshal payload on enqueue")
  return err
}

err = conn.Send("RPUSH", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buffer)
if err != nil {
  logger.Criticalf("Cant push to queue")
  return err
}

Additionally, it maintains a set to keep track of which queues are currently in use.

// workers.go L49~53
err = conn.Send("SADD", fmt.Sprintf("%squeues", workerSettings.Namespace), job.Queue)
if err != nil {
  logger.Criticalf("Cant register queue to list of use queues")
  return err
}
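
To make the key layout and payload shape concrete, here is a small sketch that builds the same Redis keys and JSON body using only the standard library. It does not talk to Redis. The payload type and the helpers queueKey, queuesKey, and encodeJob are hypothetical names chosen for illustration; they mirror the format strings in the excerpts above and the JSON shape used in the RPUSH example earlier.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// payload mirrors the resque-style job body: a class name plus positional args.
type payload struct {
	Class string        `json:"class"`
	Args  []interface{} `json:"args"`
}

// queueKey builds the key of the list that holds a queue's jobs.
// The namespace is expected to end with ":" (for example "myapp:").
func queueKey(namespace, queue string) string {
	return fmt.Sprintf("%squeue:%s", namespace, queue)
}

// queuesKey builds the key of the set that records which queues exist.
func queuesKey(namespace string) string {
	return fmt.Sprintf("%squeues", namespace)
}

// encodeJob serializes a job to the JSON string stored in the list.
func encodeJob(class string, args ...interface{}) (string, error) {
	buf, err := json.Marshal(payload{Class: class, Args: args})
	if err != nil {
		return "", err
	}
	return string(buf), nil
}

func main() {
	body, err := encodeJob("MyWorker", 1, 2, 3)
	if err != nil {
		panic(err)
	}
	// Print the two commands an enqueue would correspond to.
	fmt.Println("RPUSH", queueKey("myapp:", "worker"), body)
	fmt.Println("SADD", queuesKey("myapp:"), "worker")
}
```

Since the body is plain JSON with class and args fields, any client that can write this string to the list, including a Rails application using resque, can hand work to a Go worker.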

When goworker.Work() is called, an internal poller is started first. The flow is as follows:

  1. The poller retrieves a job using poller.getJob(), which calls LPOP:
     reply, err := conn.Do("LPOP", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, queue))
  2. The poller.poll(duration, quit) method fetches jobs at regular intervals, sends each one to the jobs channel, and uses quit as an exit signal. It returns the channel to the caller:
     jobs := make(chan *Job)
     //......
     return jobs
  3. Finally, worker.work(jobs, &monitor) runs w.run(job, workerFunc) for each job received. If no corresponding class is found, it prints an error: No Worker for ... queue with args ...
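
The flow described above can be sketched with plain channels. This is a simplified illustration, not goworker's actual implementation: memQueue is a hypothetical in-memory stand-in for the Redis list (its pop method plays the role of LPOP), and the names job, workerFunc, poll, and run are chosen for this example only.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

type job struct {
	Queue string
	Class string
	Args  []interface{}
}

type workerFunc func(queue string, args ...interface{}) error

// memQueue is an in-memory stand-in for the Redis list.
type memQueue struct {
	mu    sync.Mutex
	items []*job
}

func (q *memQueue) push(j *job) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, j)
}

// pop imitates LPOP: it removes and returns the head, or nil when empty.
func (q *memQueue) pop() *job {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return nil
	}
	j := q.items[0]
	q.items = q.items[1:]
	return j
}

// poll pops jobs in a goroutine and sends them on the returned channel.
// When the queue is empty it waits for interval before trying again.
// Closing quit stops the goroutine, which then closes the channel.
func poll(q *memQueue, interval time.Duration, quit <-chan struct{}) <-chan *job {
	jobs := make(chan *job)
	go func() {
		defer close(jobs)
		for {
			j := q.pop()
			if j == nil {
				select {
				case <-quit:
					return
				case <-time.After(interval):
					continue
				}
			}
			select {
			case jobs <- j:
			case <-quit:
				return // in this sketch, a popped but undelivered job is dropped
			}
		}
	}()
	return jobs
}

// run looks up the function registered for the job's class and calls it.
func run(j *job, workers map[string]workerFunc) string {
	fn, ok := workers[j.Class]
	if !ok {
		return fmt.Sprintf("no worker for %s in queue %s with args %v", j.Class, j.Queue, j.Args)
	}
	if err := fn(j.Queue, j.Args...); err != nil {
		return fmt.Sprintf("%s failed: %v", j.Class, err)
	}
	return j.Class + " done"
}

func main() {
	q := &memQueue{}
	q.push(&job{Queue: "worker", Class: "MyWorker", Args: []interface{}{1, 2, 3}})
	q.push(&job{Queue: "worker", Class: "Unknown"})

	workers := map[string]workerFunc{
		"MyWorker": func(queue string, args ...interface{}) error {
			fmt.Println(queue, args)
			return nil
		},
	}

	quit := make(chan struct{})
	jobs := poll(q, 10*time.Millisecond, quit)
	for i := 0; i < 2; i++ {
		fmt.Println(run(<-jobs, workers))
	}
	close(quit)
}
```

The unbuffered channel means the poller only pops the next job once a worker is ready to receive the previous one, which keeps at most one job in flight per poller in this sketch.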

Issues

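  • goworker does not guarantee that pending jobs will run if the worker shuts down unexpectedly. A job is removed from the Redis list as soon as LPOP returns it, so a job that has been popped but not yet finished is lost if the process dies.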
  • As I understand it, Redis does not have a mechanism to ensure that the receiving end will always receive messages. If a more reliable mechanism is needed, a protocol like AMQP may be necessary. However, for a small project, Redis is already quite sufficient.

Conclusion

For general applications, Redis queues are more than adequate unless you are dealing with high traffic and heavy concurrency. This article introduced goworker, a library for running background jobs on top of Redis, which should effectively handle time-consuming (or delayed) tasks. For larger applications, a more reliable protocol such as AMQP can be used. One more observation: my fan runs continuously while goworker is running. I'm not sure whether this is normal, as my CPU usage hasn't spiked noticeably...
