Decorated Concurrency - Python multiprocessing made really really easy

13 May 2016   15 comments   Python

https://github.com/alex-sherman/deco

tl;dr There's a new, interesting wrapper around Python multiprocessing called deco, written by Alex Sherman and Peter Den Hartog, both at the University of Wisconsin-Madison. It makes Python multiprocessing really really easy.

The paper is here (PDF) and the code is here: https://github.com/alex-sherman/deco.

This library is based on something called Pydron which, if I understand it correctly, is still a piece of research with no code released. ("We currently estimate that we will be ready for the release in the first quarter of 2015.")

Apart from using simple decorators on functions, the big difference with deco is that it's really easy to get started, and that there's a hard restriction on how to gather the results of sub-process calls. In deco, you pass in a mutable object that has a keyed index (e.g. a Python dict). A Python list is also mutable, but mylist.append() isn't an index-based mutation, meaning you could get race conditions on it.

"However, DECO does impose one important restriction on the program: all mutations may only by index based."

A basic example

Just look at this example:

# before.py
import time

def slow(index):
    time.sleep(5)

def run():
    for index in list('123'):
        slow(index)

run()

And when run, you clearly expect it to take 15 seconds:

$ time python before.py

real    0m15.090s
user    0m0.057s
sys 0m0.022s

Ok, let's parallelize this with deco. First pip install deco, then:

# after.py
import time

from deco import concurrent, synchronized

@concurrent
def slow(index):
    time.sleep(5)

@synchronized
def run():
    for index in list('123'):
        slow(index)

run()

And when run, it should take about 5 seconds instead of 15:

$ time python after.py

real    0m5.145s
user    0m0.082s
sys 0m0.038s

About the order of execution

Let's put some logging into that slow() function above.

@concurrent
def slow(index):
    time.sleep(5)
    print('done with {}'.format(index))

Run the example a couple of times and note that the order is not predictable:

$ python after.py
done with 1
done with 3
done with 2
$ python after.py
done with 1
done with 2
done with 3
$ python after.py
done with 3
done with 2
done with 1

That probably doesn't come as a surprise to those familiar with async programming, but it's worth a reminder so you don't accidentally depend on the order.
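That unpredictability is also why deco's index-based restriction makes sense. Here is a small, process-free sketch of the idea (an illustration only, not how deco is implemented): it simulates every possible arrival order of three made-up results and checks that keyed assignments always produce the same dict, while appends produce a differently ordered list for each arrival order.

```python
from itertools import permutations

# Three hypothetical results, as (key, value) pairs from three sub-process calls.
results = [("a", 1), ("b", 2), ("c", 3)]

def merge_by_key(arrivals):
    """Index-based mutation: each result is stored under its own key."""
    merged = {}
    for key, value in arrivals:
        merged[key] = value
    return merged

def merge_by_append(arrivals):
    """Append-based mutation: the final order depends on arrival order."""
    merged = []
    for _, value in arrivals:
        merged.append(value)
    return merged

# Try every order in which the three results could arrive.
orders = list(permutations(results))
distinct_dicts = {tuple(sorted(merge_by_key(o).items())) for o in orders}
distinct_lists = {tuple(merge_by_append(o)) for o in orders}

print(len(distinct_dicts))  # 1: the same dict no matter the order
print(len(distinct_lists))  # 6: one list per arrival order
```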

@synchronized or .wait()

Remember the run() function in the example above? The @synchronized decorator is magic. It basically figures out that the function body makes calls to @concurrent functions, and it "pauses" until all of those have finished. An alternative approach is to call the .wait() method on the decorated concurrent function:

def run():
    for index in list('123'):
        slow(index)
    slow.wait()

That works the same way. This could potentially be useful if you, on the next line, need to depend on the results. But if that's the case you could just split up the function and slap a @synchronized decorator on the split-out function.
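For comparison, the standard library's concurrent.futures has a similar "submit now, wait later" shape. This is an analogue, not deco: it uses a ThreadPoolExecutor (time.sleep() overlaps fine in threads), slow() here is a plain function rather than a deco-decorated one, and the 0.2-second sleep stands in for real work. concurrent.futures.wait() blocks until every submitted future is done, much like slow.wait() above.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait

def slow(index):
    # Stand-in for real work; time.sleep() releases the GIL, so threads overlap.
    time.sleep(0.2)
    return index

with ThreadPoolExecutor(max_workers=3) as executor:
    t0 = time.time()
    futures = [executor.submit(slow, index) for index in list('123')]
    done, not_done = wait(futures)  # blocks until all three futures have finished
    elapsed = time.time() - t0

print(sorted(f.result() for f in done))  # ['1', '2', '3']
```

Because the three sleeps overlap, elapsed ends up close to 0.2 seconds rather than 0.6.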

No fire-and-forget

It might be tempting to neither use the @synchronized decorator nor call .wait(), hoping the work will get finished anyway somewhere in the background. The concurrent functions could, for example, generate thumbnails from a larger image, or do something else time-consuming where you don't care when it finishes, as long as it finishes.

# fireandforget.py
# THIS DOES NOT WORK
# And it's not expected to either.
import time

from deco import concurrent

@concurrent
def slow(index):
    time.sleep(5)

def run():
    for index in list('123'):
        slow(index)

run()

When you run it, you don't get an error:

$ time python fireandforget.py

real    0m0.231s
user    0m0.079s
sys 0m0.047s

But if you dig deeper, you'll find that it never actually executes those concurrent functions.

If you want fire-and-forget, you need another service/process that keeps running and waits for all the work to finish. That's how message queues work.
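To make the message-queue idea concrete, here is a minimal in-process sketch using only the standard library's queue and threading modules. It is not a real message queue (nothing survives a crash or restart), and make_thumbnail() is a hypothetical stand-in, but it shows the shape: the caller enqueues work and moves on immediately, a long-running consumer does the work, and something still has to wait for the queue to drain before the program exits.

```python
import queue
import threading

jobs = queue.Queue()
finished = []  # hypothetical record of completed jobs

def make_thumbnail(name):
    # Hypothetical stand-in for slow work such as resizing an image.
    finished.append(name)

def consumer():
    # Long-running worker: keeps pulling jobs until it sees the None sentinel.
    while True:
        name = jobs.get()
        if name is None:
            jobs.task_done()
            break
        make_thumbnail(name)
        jobs.task_done()

worker = threading.Thread(target=consumer)
worker.start()

# "Fire-and-forget" from the caller's point of view: put() returns immediately.
for name in ['a.jpg', 'b.jpg', 'c.jpg']:
    jobs.put(name)

# ...but before exiting, something must still wait for the work to drain.
jobs.put(None)  # sentinel telling the consumer to stop
jobs.join()     # blocks until task_done() has been called for every item
worker.join()

print(finished)  # ['a.jpg', 'b.jpg', 'c.jpg']
```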

Number of concurrent workers

multiprocessing.Pool, as far as I understand, automatically figures out how many concurrent jobs it can run. On my Mac, where I have 8 CPUs, the number is 8.

This is easy to demonstrate. In the example above it does exactly 3 concurrent jobs, because len(list('123')) == 3. If I make it 8 items, the whole demo run still takes 5 seconds (plus a tiny amount of overhead). If I make it 9 items, it now takes 10 seconds.
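The arithmetic behind those numbers, assuming every job takes the same time and the pool has a fixed number of workers: jobs run in "waves" of at most one job per worker, so the wall time is roughly the number of waves times the duration of one job. The helper name is made up for illustration.

```python
import math

def estimated_wall_time(jobs, workers, seconds_per_job):
    """Rough wall time for equal-length jobs on a fixed-size pool (ignores overhead)."""
    waves = math.ceil(jobs / workers)  # rounds of parallel jobs needed
    return waves * seconds_per_job

print(estimated_wall_time(3, 8, 5))  # 5
print(estimated_wall_time(8, 8, 5))  # 5
print(estimated_wall_time(9, 8, 5))  # 10
```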

How multiprocessing figures this out I don't know, but I can't imagine it being anything other than a standard library call asking the operating system how many CPUs it has.

You can actually override this with your own number. It looks like this:

from deco import concurrent

@concurrent(processes=5)
def really_slow_and_intensive_thing():
    ...

That way, the operating system doesn't get too busy. It works like a throttle.
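If you'd rather throttle relative to the machine's size than hard-code 5, one option is to derive the number from the CPU count. This is a sketch with a made-up helper name, using the standard library's multiprocessing.Pool(processes=...) to show where the number goes; the resulting integer could equally be passed to @concurrent(processes=...) as shown above. Note that os.cpu_count() can return None when the count can't be determined, so the helper falls back to 1.

```python
import os
from multiprocessing import Pool

def throttled_processes(fraction=0.25):
    """Use roughly `fraction` of the CPUs, but always at least one process."""
    cpus = os.cpu_count() or 1  # os.cpu_count() may return None
    return max(1, int(cpus * fraction))

if __name__ == '__main__':
    n = throttled_processes()
    # The __main__ guard matters on platforms that start workers by re-importing this module.
    with Pool(processes=n) as pool:
        print(n, pool.map(abs, [-3, -2, -1]))
```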

A more realistic example

Let's actually use the mutable for something, and let's do something that isn't just a time.sleep(). Also, let's do something that is CPU bound. A lot of the time, concurrency is most useful when you're network bound, because many tasks that are just waiting on the network can run at the same time without stopping the system from doing other things.

Here's the code:

import time

from PIL import Image
from deco import concurrent, synchronized


@concurrent
def slow(times, offset):
    t0 = time.time()
    path = '9745e8.jpg'
    img = Image.open(path)
    size = (100 + offset * 20, 100 + offset * 20)
    # LANCZOS is the filter that the old Image.ANTIALIAS alias pointed to
    img.thumbnail(size, Image.LANCZOS)
    # The thumbnails/ directory must already exist
    img.save('thumbnails/{}.jpg'.format(offset), 'JPEG')
    t1 = time.time()
    times[offset] = t1 - t0

@synchronized
def run(times):
    for index in range(100):
        slow(times, index)

t0 = time.time()
times = {}
run(times)
t1 = time.time()
print('TOOK {}'.format(t1 - t0))
print('WOULD HAVE TAKEN {}'.format(sum(times.values())))

It generates 100 different thumbnails from a very large original JPG. Running this on my MacBook Pro takes 8.4 seconds, but the individual times add up to a total of 65.1 seconds. The numbers make sense, because 65 seconds / 8 cores ~= 8 seconds.

But where it gets really interesting is that if you remove the deco decorators and run the 100 thumbnail creations in serial on my laptop, it takes 28.9 seconds. Now, 28.9 seconds is much more than 8.4 seconds, so multiprocessing is still a win for this kind of CPU-bound work. However, a stampeding herd of 8 CPU-intensive tasks running at the same time can put some serious strain on your system. It could also cause high spikes in memory allocation that wouldn't have happened in the serial pattern, where freed memory can be re-used.
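Putting the three reported timings side by side makes the trade-off clearer. The snippet only does arithmetic on the numbers above (8.4 s concurrent, 65.1 s summed per-task time, 28.9 s serial, 8 cores); the variable names are illustrative, and it assumes the serial wall time is roughly the sum of the serial per-task times.

```python
cores = 8
serial_wall = 28.9          # seconds, all 100 thumbnails one after another
concurrent_wall = 8.4       # seconds, with deco across all cores
concurrent_task_sum = 65.1  # seconds, sum of the individual task times

speedup = serial_wall / concurrent_wall  # how much sooner the concurrent run finished
efficiency = speedup / cores             # 1.0 would mean perfect scaling
# How much longer each task took on average when competing for the machine.
slowdown_per_task = concurrent_task_sum / serial_wall

print('speedup {:.1f}x, efficiency {:.0%}, per-task slowdown {:.1f}x'.format(
    speedup, efficiency, slowdown_per_task))
# speedup 3.4x, efficiency 43%, per-task slowdown 2.3x
```

So the work finished about 3.4 times sooner, but each individual thumbnail took roughly 2.3 times longer than it did in serial, which is the contention described above.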

Here, by the way, is what the difference looks like in Activity Monitor:

[Screenshot: fully concurrent PIL work, running PIL on all CPUs]

[Screenshot: the same work, in serial]

One more "realistic" pattern

Let's do this again with a network-bound task: downloading 100 web pages from my blog. We'll keep an index where the URL is the key and the value is the time it took to download that one individual URL. This time, let's start with the serial pattern:

(Note! I ran these two experiments a couple of times so that the server-side cache would get a chance to clear out outliers)

import time, requests

urls = """
https://www.peterbe.com/plog/blogitem-040212-1
https://www.peterbe.com/plog/geopy-distance-calculation-pitfall
https://www.peterbe.com/plog/app-for-figuring-out-the-best-car-for-you
https://www.peterbe.com/plog/Mvbackupfiles
...a bunch more...
https://www.peterbe.com/plog/swedish-holidays-explaine
https://www.peterbe.com/plog/wing-ide-versus-jed
https://www.peterbe.com/plog/worst-flash-site-of-the-year-2010
""".strip().splitlines()
assert len(urls) == 100

def download(url, data):
    t0 = time.time()
    assert requests.get(url).status_code == 200
    t1 = time.time()
    data[url] = t1-t0

def run(data):
    for url in urls:
        download(url, data)

somemute = {}
t0 = time.time()
run(somemute)
t1 = time.time()
print('TOOK {}'.format(t1 - t0))
print('WOULD HAVE TAKEN {} seconds'.format(sum(somemute.values())))

When run, the output is:

TOOK 35.3457410336
WOULD HAVE TAKEN 35.3454759121 seconds

Now, let's add the deco decorators, so basically these changes:

from deco import concurrent, synchronized

@concurrent
def download(url, data):
    t0 = time.time()
    assert requests.get(url).status_code == 200
    t1 = time.time()
    data[url] = t1-t0

@synchronized
def run(data):
    for url in urls:
        download(url, data)

And the output this time:

TOOK 5.13103795052
WOULD HAVE TAKEN 39.7795288563 seconds

So, instead of taking 39.8 seconds, it only needed 5 seconds, with extremely little modification. I call that a win!
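Because this workload spends most of its time waiting on the network, threads are another option worth knowing about. This is the concurrent.futures route Ted Mielczarek mentions in the comments, not something deco does. The sketch keeps the same keyed-dict pattern but replaces requests.get() with a hypothetical fake_download() that just sleeps, so it runs offline; the example.com URLs are placeholders.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def fake_download(url):
    # Hypothetical stand-in for requests.get(url): pretend the network takes 0.2 s.
    t0 = time.time()
    time.sleep(0.2)
    return url, time.time() - t0

urls = ['https://example.com/page/{}'.format(i) for i in range(10)]

data = {}
t0 = time.time()
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(fake_download, url) for url in urls]
    for future in as_completed(futures):  # yields futures as they finish, in any order
        url, seconds = future.result()
        data[url] = seconds  # keyed by URL, so arrival order doesn't matter
took = time.time() - t0

print('TOOK', round(took, 1))
print('WOULD HAVE TAKEN', round(sum(data.values()), 1), 'seconds')
```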

What's next

Easy: actually build something that uses this.

Comments

Shantanu Oak
Initially I wondered why does the title say "really really easy". But when I tried - I found that this is "really really really extremely easy". Thanks for this.
Larry Bugbee
I've been waiting for something like this for years! ...and this is much better than I was hoping for. Thank you Alex.
V P
On Windows, one should add to imports:

from multiprocessing import freeze_support

and before calling "run", add:

if __name__ == "__main__":
    freeze_support()
    run()
Ted Mielczarek
This looks interesting but I've really appreciated the flexibility of `concurrent.futures` for doing concurrent work in Python. Specifically, I find the `as_completed` method makes it easy to write code that handles lots of work in parallel without having to handle the results in-order or block waiting for everything to finish: http://pythonhosted.org/futures/#concurrent.futures.as_completed .
Peter Bengtsson
Uh?! Can you do fire-and-forget with concurrent.futures?!
Ted Mielczarek
Sure, you just call `submit` on the `Executor` you're using:
http://pythonhosted.org/futures/#concurrent.futures.Executor.submit

...and then you can either call `wait` on it to wait for everything to finish, or use it in a with statement, which does that for you.
Eric Carmichael
Wowza! Hot stuff, thanks for sharing :)
Martin
Interesting concept.

The GitHub repo https://github.com/alex-sherman/deco and your article do not mention which versions of Python are required.
Do you have info on that?
Peter Bengtsson
Pretty sure it's 2 and 3. At least 3.5.
AGMMGA
----
How multiprocessing figures this out I don't know but I can't imagine it being anything but a standard lib OS call to ask the operating system how many CPUs it has.
----

There exists a cpu_count() function. Useful if you want to limit yourself to, say, 25% of available cores. I guess it simply calls nproc on unix.

>>> from multiprocessing import cpu_count
>>> cpu_count()  # same as nproc
32

From the docs: (https://docs.python.org/2/library/multiprocessing.html)
multiprocessing.cpu_count()
    Return the number of CPUs in the system. May raise NotImplementedError.

so I guess a try..except is in order if you want to use this yourself. Not sure when the NotImplemented triggers, maybe it's a OS-dependent thing.
Henrique Bastos
Clap, Clap, Clap!
Tom Varga
I'm so looking forward to using this module. However, I have to use python3 and I'm pretty sure deco is not ready for python3 yet:

Traceback (most recent call last):
  File "./decotest", line 17, in <module>
    run()
  File "/projects/build/build/virtualenv/prod/lib/python3.4/site-packages/deco/conc.py", line 54, in __call__
    return self.f(*args, **kwargs)
  File "<string>", line 4, in run
  File "/projects/build/build/virtualenv/prod/lib/python3.4/site-packages/deco/conc.py", line 108, in __call__
    self.replaceWithProxies(kwargs)
  File "/projects/build/build/virtualenv/prod/lib/python3.4/site-packages/deco/conc.py", line 86, in replaceWithProxies
    args_iter = args.iteritems() if type(args) is dict else enumerate(args)
AttributeError: 'dict' object has no attribute 'iteritems'

It should be easy enough to not use iteritems.
Is there a plan to make deco compatible with both python2 and python3?
Tom Varga
In fact, changing the one use of iteritems to items seems to be all it takes to make deco run in python3
