Kalan's Blog

Software Engineer / Taiwanese / Life in Fukuoka

Current Theme light

Recently, I have been gradually moving my project from Ruby On Rails to golang. One reason is to practice and get a feel for the syntax of golang, and the other reason is to experience the power of golang and how to organize my code without an existing framework.

Today, I want to introduce goworker. The reason I chose it is because it was the first result when searching for "go worker," and it has full support for the shape of resque (a Ruby worker package). This means I can enqueue tasks in the Ruby On Rails application and execute them using golang.

Nodes@1.100000023841858x

Usage

During initialization, you need to register all the workers that will be executed with goworker. This lets the worker know which corresponding task to execute when it fetches a queue:

func init() {
  settings := goworker.WorkerSettings{
    URI:            os.Getenv("REDIS_URL"),
    Queues:         []string{"worker", "queues"},
    UseNumber:      true,
    ExitOnComplete: false, // don't complete even though the queue is empty.
    Concurrency:    concurrency,
    Connections:    connections,
    Namespace:      "myapp" + ":",
    Interval:       10.0,
  }
  goworker.SetSettings(settings)  
}

You can define concurrency and connections as needed. The Namespace below is used to provide a scope for the queue names. The interval indicates how often to check for new tasks if there are no tasks in the queue.

To implement a worker, you need to define a function and register it with goworker:

func workerFn(queue string, args ...interface{}) error {
  // your job.
  fmt.Println(queue, args)
}

goworker.Register("MyWorker", workerFn)

After setting everything up, you can use goworker.Start() to start listening to Redis. This operation will block, so it's usually done in a separate server or process (you can also start a goroutine for it when your server starts).

func main() {
	err := goworker.Work()
  if err != nil {
    // log your error
  }  
}

Next, let's enqueue a task in Redis:

RPUSH myapp:queue:worker '{"class": "MyWorker", "args": [1,2,3]}

You will see worker and [1 2 3] printed on the stdout.

You can also directly enqueue your task into Redis using goworker.Enqueue(&goworker.Job{}).

How It Works

When a new task is added, goworker uses RPUSH to push the task into namespace:queue:job.Queue. To allow parameter serialization, the content is stored as a JSON string in the queue.

// workers.go L43~47
buffer, err := json.Marshal(job.Payload)
	if err != nil {
		logger.Criticalf("Cant marshal payload on enqueue")
		return err
	}

	err = conn.Send("RPUSH", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buffer)
	if err != nil {
		logger.Criticalf("Cant push to queue")
		return err
	}

In addition, a set is used to keep track of the currently used queues.

// workers.go L49~53
err = conn.Send("SADD", fmt.Sprintf("%squeues", workerSettings.Namespace), job.Queue)
if err != nil {
  logger.Criticalf("Cant register queue to the list of used queues")
  return err
}

When calling goworker.Work(), it internally calls a poller which does the following:

  1. Uses poller.getJob() to fetch the task by calling LPOP.
reply, err := conn.Do("LPOP", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, queue))
  1. Uses poller.poll(duration, quit) to periodically retrieve jobs from the jobs channel and uses quit as the signal to exit.
jobs := make(chan *Job)
//......
return jobs
  1. Finally, it executes worker.work(jobs, &monitor). When a job is received, it executes w.run(job, workerFunc), which is the function we defined earlier. If no corresponding class is found, an error "No Worker for ... queue with args ..." is printed.

Issues

  • Using goworker does not guarantee that pending jobs will be executed if an unexpected shutdown occurs.
  • As far as I understand, Redis does not have a mechanism to ensure that the recipient will definitely receive a message. If a more reliable mechanism is needed, protocols like AMQP may be required. However, for a small project, Redis is sufficient.

Conclusion

For general applications, Redis is already quite sufficient, unless there are high traffic and high concurrency scenarios where a more advanced queueing system may be needed. In this article, I introduced the goworker library for development with Redis, which should be able to handle time-consuming or delayed tasks. For larger applications, more reliable protocols like AMQP can be used. Additionally, when running goworker, my fan keeps spinning constantly. I'm not sure if this is normal, but the CPU doesn't show any significant increase...

Prev

RequestidLecallBack - Make good use of free time

Next

Golang Notes — Type Assertion

If you found this article helpful, please consider buy me a drink ☕️ It'll make my ordinary day shine✨

Buy me a coffee

作者

Kalan 頭像照片,在淡水拍攝,淺藍背景

愷開 | Kalan

Hi, I'm Kai. I'm Taiwanese and moved to Japan in 2019 for work. Currently settled in Fukuoka. In addition to being familiar with frontend development, I also have experience in IoT, app development, backend, and electronics. Recently, I started playing electric guitar! Feel free to contact me via email for consultations or collaborations or music! I hope to connect with more people through this blog.