Kalan's Blog


Software Engineer / Taiwanese / Life in Fukuoka

Please note that most posts are currently translated automatically by AI and may contain confusing passages. I will gradually replace them with proper translations.

Introduction to Goworker — Implementing Workers with Redis

Recently, I have been gradually moving my project from Ruby on Rails to Go. One reason is to practice and get a feel for Go's syntax. The other is to experience what Go can do and to learn how to organize my code without an existing framework.

Today, I want to introduce goworker. I chose it because it was the first result when searching for "go worker," and because it is compatible with the job format of Resque (a Ruby background-job library). This means I can enqueue tasks in the Ruby on Rails application and execute them in Go.


Usage

During initialization, you configure goworker and register every worker it should run. Registration tells goworker which function to execute when it pulls a job off a queue:

func init() {
  // concurrency and connections are assumed to be defined elsewhere in your program.
  settings := goworker.WorkerSettings{
    URI:            os.Getenv("REDIS_URL"),
    Queues:         []string{"worker", "queues"},
    UseNumber:      true,
    ExitOnComplete: false, // keep running even when the queues are empty
    Concurrency:    concurrency,
    Connections:    connections,
    Namespace:      "myapp" + ":",
    Interval:       10.0,
  }
  goworker.SetSettings(settings)
}

Set concurrency and connections to whatever your workload needs. Namespace is a prefix that scopes the queue names in Redis. Interval controls how often the worker checks for new tasks when the queue is empty.
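To make the role of Namespace concrete, here is a minimal sketch of how the Redis key for a queue is composed. The format string is the one that appears in the goworker source quoted in the "How It Works" section; the helper name queueKey is hypothetical and not part of goworker.

```go
package main

import "fmt"

// queueKey is a hypothetical helper (not part of goworker). It mirrors the
// "%squeue:%s" format that goworker uses when it builds a Redis key.
func queueKey(namespace, queue string) string {
	return fmt.Sprintf("%squeue:%s", namespace, queue)
}

func main() {
	// With Namespace "myapp:" and Queues {"worker", "queues"}:
	for _, q := range []string{"worker", "queues"} {
		fmt.Println(queueKey("myapp:", q))
	}
	// prints:
	// myapp:queue:worker
	// myapp:queue:queues
}
```

Note that the namespace is concatenated as-is, which is why the settings above append the trailing ":" themselves.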

To implement a worker, you need to define a function and register it with goworker:

func workerFn(queue string, args ...interface{}) error {
  // your job.
  fmt.Println(queue, args)
  return nil // returning a non-nil error marks the job as failed
}

goworker.Register("MyWorker", workerFn)
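A worker usually needs to turn args back into concrete types. The sketch below assumes that, with UseNumber set to true, numeric arguments arrive as json.Number rather than float64 (that is my understanding of the setting). The names sumWorker and decodeArgs are hypothetical, and decodeArgs only simulates the decoding step with the standard library so the example runs without Redis.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// sumWorker has the same signature as a goworker worker function.
// It is a hypothetical example that adds up its numeric arguments.
func sumWorker(queue string, args ...interface{}) error {
	var total int64
	for i, arg := range args {
		n, ok := arg.(json.Number)
		if !ok {
			return fmt.Errorf("arg %d: expected a number, got %T", i, arg)
		}
		v, err := n.Int64()
		if err != nil {
			return fmt.Errorf("arg %d: %w", i, err)
		}
		total += v
	}
	fmt.Printf("queue=%s total=%d\n", queue, total)
	return nil
}

// decodeArgs simulates how a JSON args array looks after being decoded
// with UseNumber enabled. It is a stand-in, not goworker code.
func decodeArgs(raw string) ([]interface{}, error) {
	dec := json.NewDecoder(strings.NewReader(raw))
	dec.UseNumber()
	var args []interface{}
	if err := dec.Decode(&args); err != nil {
		return nil, err
	}
	return args, nil
}

func main() {
	args, err := decodeArgs(`[1, 2, 3]`)
	if err != nil {
		panic(err)
	}
	if err := sumWorker("worker", args...); err != nil {
		fmt.Println("job failed:", err)
	}
}
```

Checking the type assertion instead of assuming it keeps a malformed payload from panicking the worker process.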

After setting everything up, call goworker.Work() to start listening to Redis. This call blocks, so it usually runs in a separate server or process (you can also start it in a goroutine when your server starts).

func main() {
  err := goworker.Work()
  if err != nil {
    // log your error
  }
}

Next, let's enqueue a task in Redis:

RPUSH myapp:queue:worker '{"class": "MyWorker", "args": [1,2,3]}'

You should see worker [1 2 3] printed to stdout.

You can also directly enqueue your task into Redis using goworker.Enqueue(&goworker.Job{}).
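If you enqueue from another Go program without goworker, the payload is just a JSON object with class and args keys, as in the RPUSH example above. Here is a minimal sketch that builds that string with the standard library; the payload struct and encodeJob are hypothetical names chosen for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// payload is a hypothetical struct that mirrors the JSON shape pushed
// to Redis in the RPUSH example: a class name plus a list of arguments.
type payload struct {
	Class string        `json:"class"`
	Args  []interface{} `json:"args"`
}

// encodeJob returns the JSON string to push onto the queue.
func encodeJob(class string, args ...interface{}) (string, error) {
	b, err := json.Marshal(payload{Class: class, Args: args})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	s, err := encodeJob("MyWorker", 1, 2, 3)
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // prints {"class":"MyWorker","args":[1,2,3]}
}
```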

How It Works

When a new task is added, goworker uses RPUSH to push the task into namespace:queue:job.Queue. So that the arguments can be serialized, the content is stored in the queue as a JSON string.

// workers.go L43~47
buffer, err := json.Marshal(job.Payload)
if err != nil {
  logger.Criticalf("Cant marshal payload on enqueue")
  return err
}

err = conn.Send("RPUSH", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, job.Queue), buffer)
if err != nil {
  logger.Criticalf("Cant push to queue")
  return err
}

In addition, a set is used to keep track of the currently used queues.

// workers.go L49~53
err = conn.Send("SADD", fmt.Sprintf("%squeues", workerSettings.Namespace), job.Queue)
if err != nil {
  logger.Criticalf("Cant register queue to the list of used queues")
  return err
}

When calling goworker.Work(), it internally calls a poller which does the following:

  1. Uses poller.getJob() to fetch a task by calling LPOP.
reply, err := conn.Do("LPOP", fmt.Sprintf("%squeue:%s", workerSettings.Namespace, queue))
  2. Uses poller.poll(duration, quit) to call getJob() periodically and send each job it finds to the jobs channel, which it returns. The quit channel is the signal to exit.
jobs := make(chan *Job)
//......
return jobs
  3. Finally, it executes worker.work(jobs, &monitor). When a job is received, it executes w.run(job, workerFunc), where workerFunc is the function we registered earlier for that class. If no corresponding class is found, an error "No Worker for ... queue with args ..." is printed.

Issues

  • goworker does not guarantee that pending jobs will be executed if an unexpected shutdown occurs.
  • As far as I understand, Redis does not have a mechanism to ensure that the recipient will definitely receive a message. If you need stronger delivery guarantees, a protocol like AMQP may be required. For a small project, however, Redis is sufficient.

Conclusion

For general applications, Redis is already quite sufficient. A more advanced queueing system is only needed for high-traffic, high-concurrency scenarios. In this article, I introduced goworker, a Redis-backed worker library that should be able to handle time-consuming or delayed tasks. Larger applications can use a more reliable protocol such as AMQP. One more thing: my fan keeps spinning constantly while goworker is running. I'm not sure if this is normal, since the CPU doesn't show any significant increase...
