Here’s another cautionary performance tale.
If you use Celery subtasks to manage parallel work, know going in that it uses spin-loops to monitor subtask progress. Specifically, if you get a TaskSetResult from a TaskSet and then use iterate() or join(), the underlying code will eat your CPU alive. Here’s the code in celery.result.TaskSetResult:
class TaskSetResult(object):
    # [snip]

    def iterate(self):
        """Iterate over the return values of the tasks as they finish
        one by one.

        :raises: The exception if any of the tasks raised an exception.

        """
        pending = list(self.subtasks)
        results = dict((subtask.task_id, copy(subtask))
                            for subtask in self.subtasks)
        while pending:
            for task_id in pending:
                result = results[task_id]
                if result.status == states.SUCCESS:
                    try:
                        pending.remove(task_id)
                    except ValueError:
                        pass
                    yield result.result
                elif result.status in states.PROPAGATE_STATES:
                    raise result.result

    def join(self, timeout=None, propagate=True):
        """Gather the results of all tasks in the taskset,
        and returns a list ordered by the order of the set.

        :keyword timeout: The number of seconds to wait for results
            before the operation times out.

        :keyword propagate: If any of the subtasks raises an exception,
            the exception will be reraised.

        :raises celery.exceptions.TimeoutError: if `timeout` is not
            :const:`None` and the operation takes longer than `timeout`
            seconds.

        """
        time_start = time.time()
        results = PositionQueue(length=self.total)

        while True:
            for position, pending_result in enumerate(self.subtasks):
                state = pending_result.state
                if state in states.READY_STATES:
                    if propagate and state in states.PROPAGATE_STATES:
                        raise pending_result.result
                    results[position] = pending_result.result
            if results.full():
                # Make list copy, so the returned type is not a position
                # queue.
                return list(results)
            else:
                if (timeout is not None and
                        time.time() >= time_start + timeout):
                    raise TimeoutError("join operation timed out.")
The symptom
After helping us fix a couple of humongous performance bugs, the PostgreSQL Experts dudes analyzed a new system instance to isolate the next largest problem. We soon heard from them.
Celery is still polling the DB about 600 times/sec. In fact, out of the 683 queries in a second, 597 of them are:
SELECT "celery_taskmeta"."id", "celery_taskmeta"."task_id", "celery_taskmeta"."status", "celery_taskmeta"."result", "celery_taskmeta"."date_done", "celery_taskmeta"."traceback"
FROM "celery_taskmeta" WHERE "celery_taskmeta"."task_id" = E'b5517ae9-59ef-46e7-808b-1da17dab9721'

In fact, it’s only these 10 queries:

SELECT "celery_taskmeta"."id", "celery_taskmeta"."task_id", "celery_taskmeta"."status", "celery_taskmeta"."result", "celery_taskmeta"."date_done", "celery_taskmeta"."traceback"
FROM "celery_taskmeta" WHERE "celery_taskmeta"."task_id" = E'1f6a65f7-48da-44b8-8b4b-70fd0ee8d800'

SELECT "celery_taskmeta"."id", "celery_taskmeta"."task_id", "celery_taskmeta"."status", "celery_taskmeta"."result", "celery_taskmeta"."date_done", "celery_taskmeta"."traceback"
FROM "celery_taskmeta" WHERE "celery_taskmeta"."task_id" = E'3b6f1504-326a-4133-a08d-daf67170a1d1'

[…snip…]
I examined their sandbox, saw the 10 subtasks for an indexing operation, knew that a master task was waiting for them to complete, and thought: Oh, no. It couldn’t be.
Sure enough, the .iterate() code was spin-waiting for the subtasks to terminate. The spin-loop generated most of the Postgres operations during the time interval in question.
What we expected
The Celery documentation doesn’t warn you that iterating on, or waiting for, subtask results will spin-loop. I like Celery and have sung its praises, but I was surprised. I expected this about as much as I expected it to call a Fortran subroutine.
We assumed that waiting on a subtask result would, you know, wait. As in, blocking-wait. Performance and Strategies advises that “it is better to split the problem up into many small tasks, than have a few long running tasks.” But this is referring to general granularity benefits, and not to a spin-loop you should avoid.
The fix
This problem took some work to uncover: Find and fix larger performance problems, reanalyze the system, find the next largest problem, dig into the code, drink a shot of single-malt Scotch whisky. But once identified, the fix was easy.
Ryan coded a version of .iterate() with a time.sleep(5.0) inside the loop. The subtasks run for multiple minutes, so this won’t materially affect the next subtask set’s queuing. The full results aren’t in yet, but the system behavior as of when I left the office today was very encouraging. We’ll try increasing the sleep interval to 60 seconds if nothing odd happens over the weekend.
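Roughly, the patched loop looks like this. This is only a minimal sketch of the idea, not Ryan’s exact code; the function name and the standalone-generator shape are my own framing of the one-line change:

import time
from copy import copy

from celery import states

def iterate_with_sleep(taskset_result, interval=5.0):
    """Like TaskSetResult.iterate(), but sleeps between polling passes
    instead of hammering the result backend in a tight loop."""
    pending = list(taskset_result.subtasks)
    results = dict((subtask.task_id, copy(subtask))
                   for subtask in taskset_result.subtasks)
    while pending:
        for task_id in list(pending):
            result = results[task_id]
            if result.status == states.SUCCESS:
                pending.remove(task_id)
                yield result.result
            elif result.status in states.PROPAGATE_STATES:
                raise result.result
        if pending:
            time.sleep(interval)  # the key change: back off between polling passes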
You’re right: .join() is a very naive implementation, but the reason it hasn’t been the focus of optimization is that it is not often used.
You shouldn’t wait for the results of subtasks, as that can lead to a deadlock: imagine you have 10 worker processes and 10 tasks that each depend on other tasks; then there won’t be any workers left to fulfill the dependencies. You should be doing that asynchronously, using callbacks: the last task in the set applies the callback. Sadly this isn’t built-in yet. What people have been doing is implementing this using atomic increments in memcached/Redis (a database may also be possible, but probably tricky). I will make this clear in the documentation.
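The counter pattern described above looks roughly like this. It is only a sketch, not anything built into Celery: the key naming, the plain redis-py client, and the stand-in work inside process_chunk are all my own assumptions:

import redis
from celery.decorators import task

redis_client = redis.Redis()  # assumed default connection settings

@task
def process_chunk(setid, total, chunk):
    value = sum(chunk)  # stand-in for the real work on this chunk
    # Atomically count finished subtasks; whichever subtask finishes last
    # fires the callback exactly once.
    if redis_client.incr("taskset-%s-done" % setid) == total:
        taskset_callback.delay(setid)
    return value

@task
def taskset_callback(setid):
    # Runs once, after every subtask in the set has completed.
    print("taskset %s finished" % setid)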
I’m working on a way to prevent deadlocks for such tasks, and the .join operation is also being optimized for the different result backends in 2.2. We will also hopefully have built-in support for “taskset callbacks” using redis/memcached at some point.
But the best solution will always be to not use join() or wait for results at all (mantra: avoid synchronization).
By the way, if you need help removing the synchronization step from your tasks, you can always send an e-mail to the celery-users mailing list!
Hi Ask,
Great comments and examples (here and below). And we do actually use Redis in conjunction with some of our tasks (mostly as a distributed lock). Our original problem was something like a parent task A that spawns task B, and B spawns C, D, and E. We found that A spawned an enormous backlog of B tasks (several thousand) without any non-B task making progress. Unfortunately, looking at the performance stats, tasks B, C, D, and E each have different stress points (e.g., disk I/O, network, CPU), and the overall task group ran best when all the task types made progress at roughly the same rate.
We originally tried to use Celery queues to separate out what could run on a worker, to give non-B tasks the ability to make progress, but couldn’t get it up and running satisfactorily (probably user error on my side). So we went with a manual “batch” approach: A launches “n” B tasks, waits for them to complete, then launches another “n” B tasks. You’re absolutely right — we have the potential for deadlock if parent tasks with this “wait and iterate” approach take up all the active worker slots. For us, we enforce one parent task running at any given time and have it set up so there are always more workers than parent tasks that would wait in a loop. So we’re safe, but it’s probably not the best design.
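In code, that batch approach is roughly the following. It is a sketch with made-up names, not our production code; make_subtask stands for whatever builds a B subtask from an item:

from celery.task.sets import TaskSet

def run_in_batches(items, make_subtask, batch_size):
    """Launch batch_size B tasks, wait for them, then launch the next batch."""
    all_results = []
    for start in range(0, len(items), batch_size):
        batch = items[start:start + batch_size]
        result = TaskSet(tasks=[make_subtask(item) for item in batch]).apply_async()
        # This join() is the "wait and iterate" step discussed above;
        # it polls until the whole batch has finished.
        all_results.extend(result.join())
    return all_results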
The retryable slow-polling task example you provided is interesting. We’ll look into it in more detail once we get back to our task wrangling. Thanks!
I appreciate your thoughtful replies!
I would like to (politely 🙂 ) suggest that your answers conflate separate issues. There’s the number of task execution slots, the way that tasks and sub-tasks message each other, and the communications’ synchronicity. Unless we’re discussing an unusual system, those can all be considered orthogonal.
If capital-T Task Z is divided into Celery (lowercase t) tasks a, b, and c, and a, b, and c are required to complete Z, then it doesn’t matter what the communication mode is. Z requires a, b, and c, and the synchronicity of their communication is irrelevant.
Similarly, the worker slot count is a canard. If I’m developing a system that requires a maximum of 25 simultaneous tasks and I don’t provide for at least 25 execution slots, that’s no different than a system requiring 10 simultaneous tasks without at least 10 execution slots. Either way I’ve screwed up. If a task is block-waiting or sleeping in a spin loop :-), it uses a trivial amount of system resources hanging around waiting. And if a subtask throws an exception, my system better have a way of recovering regardless of the inter-task messaging.
In our case, we want N subtasks to update a large region of a table, where each one works on region/N. They don’t feed results to each other.
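Concretely, the fan-out has roughly this shape. The task and function names below are made up for illustration; the real tasks do the actual table updates:

from celery.decorators import task
from celery.task.sets import TaskSet

@task
def update_region(start_row, end_row):
    pass  # placeholder for the real work on rows [start_row, end_row)

def launch_updates(total_rows, n_subtasks):
    """Split the table into N contiguous regions and update them in parallel."""
    chunk = total_rows // n_subtasks
    subtasks = []
    for i in range(n_subtasks):
        end = total_rows if i == n_subtasks - 1 else (i + 1) * chunk
        subtasks.append(update_region.subtask((i * chunk, end)))
    return TaskSet(tasks=subtasks).apply_async()  # a TaskSetResult to wait on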
For me, more important considerations are using the simplest programmatic interface, minimizing the number of lines of code, what’s simplest and easiest to maintain and extend in the future, etc.
Apparently .join hasn’t been changed since v0.3.0, and it can actually be simplified now, so I improved it and added an argument for the polling interval.
Btw, if you are really into polling the results from the database, maybe you would like to contribute an optimized version that uses a single SELECT for all the ids?
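In case it helps clarify the suggestion, the query side could look something like this. It is only a sketch, assuming the django-celery TaskMeta model (the celery_taskmeta table seen in the queries above) and a plain Django ORM lookup; it is not the actual backend code:

from djcelery.models import TaskMeta

def fetch_taskset_states(task_ids):
    """Fetch status and result for every subtask in one query, instead of
    one SELECT per task_id per polling pass."""
    rows = TaskMeta.objects.filter(task_id__in=task_ids)
    return dict((row.task_id, (row.status, row.result)) for row in rows)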
I suddenly realized something else about your use case. If you really need to poll at slow intervals, you could use task retries. That way you don’t occupy a worker process:
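The example that originally followed isn’t reproduced here; what follows is only a rough reconstruction of the retry idea, with task names and argument handling guessed from the discussion below (the line numbers mentioned in the comments refer to the original example, not to this sketch):

from celery.decorators import task
from celery.task.sets import subtask
from celery.result import TaskSetResult

@task(max_retries=None)
def join_taskset(setid, subtasks, callback, interval=15):
    """Poll a TaskSet at a slow interval without tying up a worker: if the
    set isn't ready yet, re-queue this task with a countdown and try again."""
    result = TaskSetResult(setid, subtasks)
    if result.ready():
        # Hand the joined result list to the callback subtask.
        return subtask(callback).delay(result.join())
    join_taskset.retry(countdown=interval)

# Usage (also a guess at the original shape):
#   result = TaskSet(tasks=...).apply_async()
#   join_taskset.delay(result.taskset_id, result.subtasks, my_callback.subtask((100,)))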
Hi there,
I think that the last solution with retries is great! It perfectly suits my needs. I’m new to Python and Celery, so I hope I won’t ask dumb questions, just ones that are (hopefully) easy to answer.
I think I can see a problem with the “forwarding” of result.join() to the callback (line 7). In line 7, the argument “args” you give to the apply_async method is a list [0, 2, 4, etc] and the add subtask will not understand what to do with all those arguments since it gets called with arguments (0, 2, 4, etc, 100) on line 15 (the final 100 is the one given in line 15). It will simply fail saying it expects 2 arguments.
There must be a sum(list) missing at some point, I guess (but where to put it?)
On the other hand, being able to work with the list [0, 2, 4, etc] (as a single argument) might be good in the callback for “error detection”, assuming the individual subtasks have return values (e.g., 0 = ok, 1 = not ok): if sum(result.join()) == 0, everything is all right. But maybe I’m totally wrong and I should think more “Celery” and play with the “successful” attribute of each task in the TaskSet to see which one failed?
Anyway, coming back to the sum subtask: if it received ([0, 2, 4, etc], 100) as args, it could be smart enough to sum the list before adding the 100, I’m sure.
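For what it’s worth, one way to make that work (just a sketch building on the reconstruction above, with a made-up task name, not tested against any particular Celery version) is a callback that takes the whole joined result list as its first argument:

from celery.decorators import task

@task
def sum_results(results, extra=0):
    """Callback: receives the joined result list as one argument, sums it,
    and adds an optional constant (e.g. the 100 discussed above)."""
    return sum(results) + extra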
Finally, how would join_native help in that situation? Would it make any sense to use join_native in the join_taskset() method?
Thanks for your help, comments and solutions!
kmon