FileQueue Design
University clusters and supercomputers often cannot access SQS easily but have access to a common file system. It would be a pain to have to set up a RabbitMQ instance or similar process on each cluster we get access to, so it would be ideal to have a queue system that just runs off the filesystem (Sergiy's idea).
We need the following properties from our queue:
- Serverless
- Durable - No losses from power outages and process crashes.
- Supports Verbs - queue create, queue delete, task create, time-limited task lease, task delete, task lease extend, and reset task leases.
- Parallel Safe
- Recirculating Tasks - If a process fails, eventually the task will be picked up by another one.
- Supports millions of tasks.
- Can be operated by a pipeline technician without help from a cluster administrator (or the need for onerous approvals).
File Queues in principle fulfill the first two properties as the server is the filesystem and files do not disappear on power loss or process crash. On journaling filesystems, the files do not even become corrupted on power loss in the middle of writing. Filesystems support millions of files in a single directory, but certain operations like listing become unusable. Properties 3 through 6 will require careful design. We anticipate that these queues can be run from userland and require no special approvals to be used unless the queues are very large, in which case the entire job will likely need special approval anyway.
With respect to the verbs specified, all should be familiar from SQS with one exception: reset task leases is new. It is extremely useful for restarting a job that partially ran but crashed while its tasks still held very long leases.
The queue will consist of the following directory structure where the leaves are files.
queue-directory
|- inserted
|- completions
|- queue
|  |- $unix_timestamp-$uuid1.json
|  |- $unix_timestamp-$uuid2.json
|- movements
   |- $uuid1
   |- $uuid2
Queue Creation
Create the above directory structure in an arbitrary location on the filesystem.
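In Python, queue creation might look like the following minimal sketch. The function name and the choice to pre-create the two counter files as empty files are ours, not part of any existing implementation:

```python
import os

def create_queue(queue_dir):
    """Create the FileQueue directory layout at an arbitrary path."""
    for subdir in ("queue", "movements"):
        os.makedirs(os.path.join(queue_dir, subdir), exist_ok=True)
    # 'inserted' and 'completions' are flat counter files (see the
    # completions counter discussion below); they start out empty.
    for counter in ("inserted", "completions"):
        open(os.path.join(queue_dir, counter), "a").close()
```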
Queue Deletion
rm -r queue-directory
Task Creation
- Assign $uuid and set the timestamp to 0 to indicate the task was never leased and is available.
- Create a file queue-directory/queue/0-$uuid.json which contains a JSON task description.
- Create an empty file queue-directory/movements/$uuid
This process is parallel safe because UUID collisions are astronomically unlikely.
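A minimal Python sketch of these steps (the function name is illustrative):

```python
import json
import os
import uuid

def create_task(queue_dir, task):
    """Insert a task: a timestamp-0 JSON file plus an empty movements log."""
    task_id = str(uuid.uuid4())
    # Timestamp 0 means "never leased, available immediately".
    path = os.path.join(queue_dir, "queue", "0-%s.json" % task_id)
    with open(path, "w") as f:
        json.dump(task, f)
    # The movements file will record every rename this task undergoes.
    open(os.path.join(queue_dir, "movements", task_id), "a").close()
    return task_id
```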
Task Leasing
- Sort the queue directory and pick the file with the earliest timestamp that is less than or equal to the current UNIX time and that does not have an active lock. Let that file be named $timestamp-$uuid.json. Sorting is very slow when there are many files, so additional tricks will be necessary, such as limiting the number of enqueued tasks or using a tree.
- Open queue-directory/movements/$uuid in append mode with an fcntl process-based file write lock and append the new filename on a new line, then release the lock. Note that if another process were to immediately pick up the task and write its new filename first, this would result in a non-linear history, but for our purposes it shouldn't matter. This should only happen if the lease time is extremely short (0-1 sec) or the filesystem is severely overloaded.
- mv $timestamp-$uuid.json ($now+$leasesec)-$uuid.json
- Release the lock on the movements file.
- Begin processing the task.
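The leasing steps might be sketched in Python as below. This is a single-machine illustration using POSIX flock; the function name is ours, and the in-memory sort deliberately ignores the scalability caveat above:

```python
import fcntl
import os
import time

def lease_task(queue_dir, lease_sec):
    """Lease the earliest available task; return its new filename or None."""
    qdir = os.path.join(queue_dir, "queue")
    now = int(time.time())
    # Numeric sort by the leading timestamp; too slow for huge queues.
    for fname in sorted(os.listdir(qdir), key=lambda f: int(f.split("-", 1)[0])):
        timestamp, _, rest = fname.partition("-")
        if int(timestamp) > now:
            continue  # lease still active; task not yet available
        task_id = rest[:-len(".json")]
        moves = open(os.path.join(queue_dir, "movements", task_id), "a")
        try:
            # Non-blocking lock: skip tasks another worker is touching.
            fcntl.flock(moves, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            moves.close()
            continue
        try:
            new_name = "%d-%s.json" % (now + lease_sec, task_id)
            moves.write(new_name + "\n")  # record the move first
            moves.flush()
            os.rename(os.path.join(qdir, fname), os.path.join(qdir, new_name))
            return new_name
        finally:
            fcntl.flock(moves, fcntl.LOCK_UN)
            moves.close()
    return None
```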
Task Deletion
If any of the following steps fails due to a missing file, that's okay, and as many files as possible should still be deleted.
- Acquire a write lock on queue-directory/movements/$uuid
- Read queue-directory/movements/$uuid
- Delete all of the files listed in that file, including the initial timestamp-0 file if it is not listed. This step ensures the queue remains clean even if weird things happen.
- Delete queue-directory/movements/$uuid
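Under the same single-machine fcntl assumptions, deletion might look like:

```python
import fcntl
import os

def delete_task(queue_dir, task_id):
    """Delete a task and every historical filename recorded for it."""
    moves_path = os.path.join(queue_dir, "movements", task_id)
    with open(moves_path, "r+") as moves:
        fcntl.flock(moves, fcntl.LOCK_EX)
        names = [line.strip() for line in moves if line.strip()]
        # Also try the initial timestamp-0 name in case it was never logged.
        names.append("0-%s.json" % task_id)
        for name in names:
            try:
                os.remove(os.path.join(queue_dir, "queue", name))
            except FileNotFoundError:
                pass  # already gone, which is fine
        fcntl.flock(moves, fcntl.LOCK_UN)
    os.remove(moves_path)
```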
Task Lease Extend
- Acquire a read-write lock on queue-directory/movements/$uuid. The write lock blocks the task from being accessed by other verbs; the read part is so that we can read the file.
- Starting from the last item in the file, check whether that filename still exists; if not, work your way up the list in reverse order until the current file is located. It will be the last item nearly 100% of the time; it will only be out of order in the rare case mentioned in task leasing.
- Move the file to its new location.
- Append the new location to the movement file.
- Release the lock.
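A sketch of the lease-extend steps, again assuming single-machine flock and illustrative names:

```python
import fcntl
import os

def extend_lease(queue_dir, task_id, new_timestamp):
    """Rename a task file to a new timestamp and log the move."""
    moves_path = os.path.join(queue_dir, "movements", task_id)
    with open(moves_path, "r+") as moves:
        fcntl.flock(moves, fcntl.LOCK_EX)
        names = ["0-%s.json" % task_id]  # initial name as a fallback
        names += [line.strip() for line in moves if line.strip()]
        # The current file is almost always the last name appended.
        for name in reversed(names):
            src = os.path.join(queue_dir, "queue", name)
            if os.path.exists(src):
                new_name = "%d-%s.json" % (new_timestamp, task_id)
                os.rename(src, os.path.join(queue_dir, "queue", new_name))
                moves.seek(0, os.SEEK_END)
                moves.write(new_name + "\n")
                break
        fcntl.flock(moves, fcntl.LOCK_UN)
```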
Completions Counter
Iterating over the queue in order to count the number of enqueued tasks is very slow, so we provide a completions counter. While for the rest of the queue the workers are mostly content to access different files from each other, a counter file would be highly contentious, as all of them would need to touch it. Therefore, we designed the counter system to be lock-free and append-only. However, this means we are not able to simply increment a number (counter = counter + 1), as this would create serious race conditions.
Therefore, we decided to count in unary rather than binary or decimal. Each task submits one byte to be appended to the file. Since it doesn't matter in what order the bytes are appended, only their quantity, the counter will be accurate at the cost of one byte per task. The size of the file in bytes then reveals the number of tasks completed. This means one billion tasks would require an additional gigabyte of space for the counter, but the tasks themselves should take far more space. More likely, the counter will run into the millions of tasks, which is a very manageable few megabytes.
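The unary counter might be sketched as follows. One caveat we add: O_APPEND writes are atomic on local POSIX filesystems but are not guaranteed on NFS, so this detail depends on the target filesystem:

```python
import os

def record_completion(counter_path):
    """Append one byte to the counter; no read, lock, or seek is needed."""
    # O_APPEND makes each write land at the end of the file atomically
    # (on local POSIX filesystems; NFS does not guarantee this).
    fd = os.open(counter_path, os.O_WRONLY | os.O_APPEND | os.O_CREAT)
    try:
        os.write(fd, b"\x01")
    finally:
        os.close(fd)

def completed_count(counter_path):
    """The file's size in bytes is the number of completed tasks."""
    return os.path.getsize(counter_path)
```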
Reset Task Leases
This is very easy to do in an offline system and trickier in an online one. For an offline system, simply delete all the movement files and rename all the queue files to have 0 timestamps. For an online system, we need to be cognizant of other running processes, so:
- Iterate over all tasks and run lease extend so that the queue files are all renamed to UNIX timestamp 0.
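The offline variant might be sketched as below (the function name is ours). Since leasing opens movement files in append mode, the logs are recreated automatically on the next lease:

```python
import os

def reset_leases_offline(queue_dir):
    """Offline reset: wipe movement logs, zero every queue timestamp."""
    qdir = os.path.join(queue_dir, "queue")
    mdir = os.path.join(queue_dir, "movements")
    for fname in os.listdir(mdir):
        os.remove(os.path.join(mdir, fname))
    for fname in os.listdir(qdir):
        _, _, rest = fname.partition("-")  # rest is "$uuid.json"
        os.rename(os.path.join(qdir, fname),
                  os.path.join(qdir, "0-" + rest))
```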
Recirculating Tasks
All tasks have a UNIX timestamp that will eventually become valid because time always moves forward. After leasing, the timestamp is set to now + lease seconds, which will eventually become valid. If there is a small integer overflow, the task will become valid immediately.
However, process locks can prevent a task from being released. How will we deal with zombie processes that never release their locks?
Synchronization is provided by the write lock on the movements/$uuid file. Lock contention can build up on this file if worker requests are voluminous (which is probable). If we abandon strict FIFO semantics, workers can search for the first lock-free movement file and proceed.
Scaling to Millions of Tasks
The current version of this design supports at least hundreds of thousands of tasks on a single machine. If extending to millions of tasks becomes a problem, there are a few potential solutions:
- Structure the queue as a timestamp tree so that searching and sorting take considerably less time. Queue creation would have to generate random valid times in order for the tree to not degenerate into a list.
- Retain a flat queue and only list a few thousand tasks at a time. Within each queue include a special task that will generate the next set of tasks when the size of the queue shrinks below a threshold.