Celery uses spin-loops. Gah!


Here’s another cautionary performance tale.

If you use Celery subtasks to manage parallel work, know going in that it uses spin-loops to monitor subtask progress. Specifically, if you get a TaskSetResult from a TaskSet and then use iterate() or join(), the underlying code will eat your CPU alive. Here’s the code in celery.result.TaskSetResult:

class TaskSetResult(object):
    # [snip]
    def iterate(self):
        """Iterate over the return values of the tasks as they finish
        one by one.

        :raises: The exception if any of the tasks raised an exception.

        """
        pending = list(self.subtasks)
        results = dict((subtask.task_id, copy(subtask))
                            for subtask in self.subtasks)
        while pending:
            for task_id in pending:
                result = results[task_id]
                if result.status == states.SUCCESS:
                    try:
                        pending.remove(task_id)
                    except ValueError:
                        pass
                    yield result.result
                elif result.status in states.PROPAGATE_STATES:
                    raise result.result

    def join(self, timeout=None, propagate=True):
        """Gather the results of all tasks in the taskset,
        and returns a list ordered by the order of the set.

        :keyword timeout: The number of seconds to wait for results before
                          the operation times out.

        :keyword propagate: If any of the subtasks raises an exception, the
                            exception will be reraised.

        :raises celery.exceptions.TimeoutError: if `timeout` is not
            :const:`None` and the operation takes longer than `timeout`
            seconds.

        """

        time_start = time.time()
        results = PositionQueue(length=self.total)

        while True:
            for position, pending_result in enumerate(self.subtasks):
                state = pending_result.state
                if state in states.READY_STATES:
                    if propagate and state in states.PROPAGATE_STATES:
                        raise pending_result.result
                    results[position] = pending_result.result
            if results.full():
                # Make list copy, so the returned type is not a position
                # queue.
                return list(results)
            else:
                if (timeout is not None and
                        time.time() >= time_start + timeout):
                    raise TimeoutError("join operation timed out.")

The symptom

After helping us fix a couple of humongous performance bugs, the PostgreSQL Experts dudes analyzed a new system instance to isolate the next largest problem. We soon heard from them.

Celery is still polling the DB about 600 times/sec. In fact, out of the 683 queries in a second, 597 of them are:

SELECT “”celery_taskmeta””.””id””, “”celery_taskmeta””.””task_id””, “”celery_taskmeta””.””status””, “”celery_taskmeta””.””result””, “”celery_taskmeta””.””date_done””, “”celery_taskmeta””.””traceback””
FROM “”celery_taskmeta”” WHERE “”celery_taskmeta””.””task_id”” = E’b5517ae9-59ef-46e7-808b-1da17dab9721′

In fact, it’s only these 10 queries:

SELECT “”celery_taskmeta””.””id””, “”celery_taskmeta””.””task_id””, “”celery_taskmeta””.””status””, “”celery_taskmeta””.””result””, “”celery_taskmeta””.””date_done””, “”celery_taskmeta””.””traceback””
FROM “”celery_taskmeta”” WHERE “”celery_taskmeta””.””task_id”” = E’1f6a65f7-48da-44b8-8b4b-70fd0ee8d800′
SELECT “”celery_taskmeta””.””id””, “”celery_taskmeta””.””task_id””, “”celery_taskmeta””.””status””, “”celery_taskmeta””.””result””, “”celery_taskmeta””.””date_done””, “”celery_taskmeta””.””traceback””
FROM “”celery_taskmeta”” WHERE “”celery_taskmeta””.””task_id”” = E’3b6f1504-326a-4133-a08d-daf67170a1d1′
[…snip…]

I examined their sandbox, saw the 10 subtasks for an indexing operation, knew that a master task was waiting for them to complete, and thought: Oh, no. It couldn’t be.

Sure enough, the .iterator() code was spin-waiting for the subtasks to terminate. The spin-loop generated most of the Postgres operations during the time interval in question.

What we expected

The Celery documentation doesn’t warn you that iterating on, or waiting for, subtask results will spin-loop. I like Celery and have sung its praises, but I was surprised. I expected this about as much as I expected it to call a Fortran subroutine.

We assumed that waiting on a subtask result would, you know, wait. As in, blocking-wait. Performance and Strategies advises that “it is better to split the problem up into many small tasks, than have a few long running tasks.” But this is referring to general granularity benefits, and not to a spin-loop you should avoid.

The fix

This problem took some work to uncover: Find and fix larger performance problems, reanalyze the system, find the next largest problem, dig into the code, drink a shot of single-malt Scotch Whiskey. But once identified, the fix was easy.

Ryan coded a version of .iterator() with a time.sleep(5.0) within the loop. The subtasks run for multiple minutes, so this won’t materially affect the next subtask set’s queuing. The full results aren’t in yet, but the system behavior as of when I left the office today was very encouraging. We’ll try increasing the sleep interval to 60 seconds if nothing odd happens over the weekend.

7 comments
  1. You’re right. .join() is a very naive implementation, but the reason it hasn’t been the focus of optimization is because it is not often used.

    You shouldn’t wait for the results of subtasks, as that can lead to a deadlock; Imagine you have 10 worker processes, and 10 tasks depending on other tasks, then there won’t be any workers to fulfill the dependencies. You should be doing that asynchronously, by using callbacks: The last task in the set applies the callback. Sadly this isn’t built-in yet. what people have been doing is implementing this using atomic increments in memcached/Redis (database may also be possible, but probably tricky). I will make this clear in the documentation.

    I’m working on a way to prevent deadlocks for such tasks, and the .join operation is also being optimized for the different result backends in 2.2, we will also hopefully have built-in support for “taskset callbacks” using redis/memcached at some point.

    But the best solution will always be to not use join() or wait for results at all (mantra: avoid synchronization).

    By the way, If you need help removing the synchronization step from your tasks, you can always send an e-mail to the celery-users mailing-list!

    • Hi Ask,

      Great comments and examples (here and below). And, we do actually use Redis in conjunction for some our tasks (mostly as a distributed lock). Our original problem was something like a parent task A that spawns task B, and B spawns C, D, and E. We found that A spun an enormous backlog of tasks B (several thousand), without any non-B task making progress. Unfortunately, looking at the performance stats, tasks B, C, D, and E each have different stress points (e.g., disk I/O, network, CPU), and the overall task group ran best when all the tasks roughly made progress at the same time.

      We originally tried to use Celery queues to separate out what could run on a worker to give non-B tasks the ability to make progress, but couldn’t get this up and running satisfactorily (probably user error on my side). So, we went with a manual “batch” approach of A launches “n” B tasks, waits for them to complete, then launches another “n” B tasks. You’re absolutely right — we have the potential for deadlock if parent tasks with this “wait and iterate” approach take up all the active queues. For us, we enforce one parent task running at any given time and have it set up so there are always more workers than parent tasks that would wait in a loop. So, we’re safe, but probably not the best design.

      The retryable slow-polling task example you provided is interesting. We’ll look into it in more detail once we get back to our task wrangling. Thanks!

    • John said:

      I appreciate your thoughtful replies!

      I would like to (politely 🙂 ) suggest that your answers conflate separate issues. There’s the number of task execution slots, the way that tasks and sub-tasks message each other, and the communications’ synchronicity. Unless we’re discussing an unusual system, those can all be considered orthogonal.

      If capital-T Task Z is divided into Celery (lowercase t) tasks a, b, and c, and a, b, and c are required to complete Z, then it doesn’t matter what the communication mode is. Z requires a, b, and c, and the communications synchronicity is irrelevant.

      Similarly, the worker slot count is a canard. If I’m developing a system that requires a maximum of 25 simultaneous tasks and I don’t provide for at least 25 execution slots, that’s no different than a system requiring 10 simultaneous tasks without at least 10 execution slots. Either way I’ve screwed up. If a task is block-waiting or sleeping in a spin loop :-), it uses a trivial amount of system resources hanging around waiting. And if a subtask throws an exception, my system better have a way of recovering regardless of the inter-task messaging.

      In our case, we want N subtasks to update a large region of a table, where each one works on region/N. They don’t feed results to each other.

      For me, more important considerations are using the simplest programmatic interface, minimizing the number of lines of code, what’s simplest and easiest to maintain and extend in the future, etc.

  2. Apparently .join haven’t been changed since v0.3.0, and it can actually be simplified now, so I improved it and added an argument for the polling interval used.

    Btw, if you are really into polling the results from the database maybe you would like to contribute an optimized version that uses a single SELECT for all the ids?

  3. Suddenly realized something else about your use case. If you really need to poll at slow intervals, you could use task retries. That way you don’t occupy a worker process:

    from celery.decorators import task
    from celery.task.sets import TaskSet, subtask
    
    @task
    def join_taskset(result, callback=None, interval=60):
        if result.ready():
            return subtask(callback).apply_async(result.join())
        join_taskset.retry((result, callback), countdown=interval)
    
    
    @task
    def foo():
        result = TaskSet(add.subtask((i, i)) for i in xrange(100)).apply_async()
    
        # Becomes result_of_taskset + 100  
        return join_taskset.apply_async(result, add.subtask((100, )))
    
    @task
    def add(x, y):
        return x + y
    
  4. kmon said:

    Hi there,

    I think that the last solution with retries is great! It perfectly suits my need. I’m new to python and celery, and I hope I won’t ask dumb questions but questions that are easy to answer, hopefully.

    I think I can see a problem with the “forwarding” of result.join() to the callback (line 7). In line 7, the argument “args” you give to the apply_async method is a list [0, 2, 4, etc] and the add subtask will not understand what to do with all those arguments since it gets called with arguments (0, 2, 4, etc, 100) on line 15 (the final 100 is the one given in line 15). It will simply fail saying it expects 2 arguments.

    There must be a sum(list) missing at some point, I guess (but where to put it?)

    On the other hand, being able to work with the list [0, 2, 4, etc] (as a single argument) might be good in the callback, for “error detection”, assuming the individual subtasks have return values (e.g., 0=ok, 1=nok): e.g., sum(result.join())==0 everything is alright. But maybe I’m totally wrong and I should think more “celery” and play with the “successful” attribute of each task in the TaskSet to see which one failed?

    Anyway, coming back to the sum subtask, if it received as args ([0, 2, 4, etc], 100), it could be smart and sum the list before summing it with 100, I’m sure.

    Finally, how would join_native help in that situation? Would it make any sense to use join_native in the join_taskset() method?

    Thanks for your help, comments and solutions!

    kmon

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: