Building a Work Queue With Redis and Lua

I recently encountered a situation that called for a work queue. The system seemed simple enough at the outset – only producers and consumers, with jobs prioritized by when they are enqueued. There is, however, a curious twist – the consumers are people doing work in a single page app. There are enough concurrent users to warrant a datastore with atomic operations.

Jobs cannot simply be dequeued when people start them because many jobs go unfinished for one reason or another. The behavior we want is a timeout – the person has some amount of time to complete the job after it is dequeued; otherwise the system considers the job abandoned and must put it back in the inbound queue.

Redis does not have primitives for this specific use case, but it turns out we don’t need them – we can run Lua scripts on our Redis server to get the exact behavior we need.

First, a script to reserve jobs:

-- arguments are passed in as strings; we need our score to be a number
local time = tonumber(ARGV[1])
local val  = redis.call('zrange', KEYS[1], 0, 0)[1]
if val then
  redis.call('zadd', KEYS[2], time, val)
  redis.call('zremrangebyrank', KEYS[1], 0, 0)
end
return val

This takes the oldest job from our inbound queue and moves it to a reserved queue. Both queues are sorted sets – the time is passed in as a string (ARGV[1]), converted to a number and used as the job's score in the reserved queue. Using a sorted set keeps the inbound queue ordered by the time when jobs were queued up and prevents duplicate jobs. We can also use the timestamp/score to determine which jobs in the reserved queue have timed out and move them back to the inbound queue:

local time = tonumber(ARGV[1])
local vals = redis.call('zrangebyscore', KEYS[1], 0, time)
redis.call('zremrangebyscore', KEYS[1], 0, time)
for i, val in ipairs(vals) do
  redis.call('zadd', KEYS[2], time+i, (val))
end
return #vals

Redis scripts run as atomic operations and, as Antirez notes, are much more performant than using WATCH.
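
To make the interplay between the two scripts easier to follow, here is a rough plain-Ruby model of what they do. It is only a sketch: ToyQueue and its method names are made up for illustration, each sorted set is approximated by a Hash of member => score, and nothing here talks to Redis or says anything about atomicity.

```ruby
# Hypothetical in-memory model of the two Lua scripts. Each Hash maps
# member => score and stands in for a Redis sorted set.
class ToyQueue
  attr_reader :inbound, :reserved

  def initialize
    @inbound  = {}
    @reserved = {}
  end

  # Like ZADD: adding a member that already exists only updates its score,
  # which is why duplicate jobs cannot pile up.
  def enqueue(job, now)
    @inbound[job] = now
  end

  # Mirrors the reserve script: move the lowest-scored inbound job to the
  # reserved set, scored with the time of the reservation.
  def reserve(now)
    job, _score = @inbound.min_by { |member, score| [score, member] }
    return nil unless job

    @reserved[job] = now
    @inbound.delete(job)
    job
  end

  # Mirrors the clear script: every reserved job scored at or below the
  # cutoff goes back to inbound with score cutoff + i (i starts at 1).
  def clear(cutoff)
    expired = @reserved.select { |_member, score| score <= cutoff }
                       .sort_by { |member, score| [score, member] }
                       .map(&:first)
    expired.each_with_index do |job, index|
      @reserved.delete(job)
      @inbound[job] = cutoff + index + 1
    end
    expired.size
  end
end

queue = ToyQueue.new
queue.enqueue("job-a", 100)
queue.enqueue("job-b", 101)
queue.reserve(1000)      # "job-a" moves to reserved with score 1000
queue.clear(1200 - 300)  # cutoff 900: nothing has timed out yet
queue.clear(1300 - 300)  # cutoff 1000: "job-a" returns to inbound
```

One thing the model makes visible is that a requeued job comes back with a score based on the cutoff rather than its original enqueue time, so its position relative to other waiting jobs can change.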

The server used as a backend for our single page app would interact with Redis like this:

class MyQueue

  INBOUND_QUEUE =  "#{self}"
  RESERVED_QUEUE = "#{self}:reserved"

  def enqueue(data)
    $redis.zadd(INBOUND_QUEUE, timestamp, data)
  end

  def reserve
    $redis.eval zreserve, keys: [INBOUND_QUEUE, RESERVED_QUEUE], argv: [timestamp]
  end

  JOB_EXPIRES_IN = 300 # seconds

  def clear
    $redis.eval zclear_queue, keys: [RESERVED_QUEUE, INBOUND_QUEUE], argv: [timestamp - JOB_EXPIRES_IN]
  end

  def delete(key, queue=RESERVED_QUEUE)
    $redis.zrem(queue, key)
  end

  private

  # Read each Lua script verbatim. Newlines must be preserved: joining the
  # lines with spaces would let a leading "--" comment swallow the whole script.
  def zreserve
    File.read("lib/zreserve.lua")
  end

  def zclear_queue
    File.read("lib/zclear_queue.lua")
  end

  def timestamp
    Time.now.to_i
  end
end

This server needs a cron job or some other way to periodically call #clear to clear the reserved queue of timed-out jobs. A more fully baked implementation might also use SCRIPT LOAD to upload the script to our Redis server and then call it via EVALSHA instead of sending the entire script every time.
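
The SCRIPT LOAD and EVALSHA idea could look roughly like the sketch below. CachedScript is a hypothetical helper, not part of any library. It assumes a redis-rb style client that responds to script(:load, source) and evalsha(sha, keys:, argv:), and it assumes a missing script surfaces as an error whose message starts with "NOSCRIPT".

```ruby
# Hypothetical wrapper: uploads a Lua script once with SCRIPT LOAD and
# calls it by SHA afterwards. `client` is assumed to behave like a
# redis-rb connection (responds to #script and #evalsha).
class CachedScript
  def initialize(client, source)
    @client = client
    @source = source
    @sha    = nil
  end

  def call(keys: [], argv: [])
    @sha ||= load_script
    @client.evalsha(@sha, keys: keys, argv: argv)
  rescue StandardError => e
    # Assumption: the server reports an unknown SHA with a NOSCRIPT error,
    # for example after a restart. Anything else is re-raised untouched.
    raise unless e.message.start_with?("NOSCRIPT")

    @sha = load_script
    @client.evalsha(@sha, keys: keys, argv: argv)
  end

  private

  # SCRIPT LOAD returns the SHA1 digest of the script it stored.
  def load_script
    @client.script(:load, @source)
  end
end
```

In MyQueue, #reserve could then hold on to something like CachedScript.new($redis, File.read("lib/zreserve.lua")) and invoke it with call(keys: [INBOUND_QUEUE, RESERVED_QUEUE], argv: [timestamp]) instead of sending the script body on every request.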

The tour of Lua on Learn X in Y Minutes and the examples on the Redis homepage are all you need to get started and extend the usefulness of Redis in a meaningful way within a couple of hours. I recommend giving this a shot the next time you encounter a unique problem where Redis seems close but not quite right for your specific use case.