Peterbe.com

A blog and website by Peter Bengtsson


hashin 0.14.0 with --update-all and a bunch of other features

13 November 2018 0 comments   Linux, Python


If you don't know what it is, hashin is a Python command line tool for updating your requirements file's packages and their hashes for use with pip install. It takes the pain out of figuring out what hashes each package on PyPI has. It also takes the pain out of figuring out what version you can upgrade to.

In the 0.14.0 release (changelog) there are a bunch of new features. The most exciting one is --update-all. Let's go through some of the new features:

Update all (--update-all)

Suppose you want to bravely upgrade all the pinned packages to the latest and greatest. Before version 0.14.0 you'd have to manually open the requirements file and list every single package on the command line:

$ less requirements.txt
$ hashin Django requests Flask cryptography black nltk numpy

With --update-all it's the same thing except it does that reading and copy-n-paste for you:

$ hashin --update-all

Particularly nifty is to combine this with --dry-run if you get nervous about that many changes.
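To make the "reading and copy-n-paste" step concrete, here is a minimal sketch of pulling the pinned package names out of a requirements file. It is not hashin's actual implementation; the function name and the regular expression are assumptions made for illustration.

```python
import re

# Matches lines such as "Django==2.1.3 \" and captures the package name,
# including an optional extras suffix like "requests[security]".
PINNED = re.compile(r"^\s*([A-Za-z0-9][A-Za-z0-9._-]*(?:\[[^\]]+\])?)\s*==")


def pinned_package_names(requirements_text):
    """Return the names of all pinned packages, in file order."""
    names = []
    for line in requirements_text.splitlines():
        match = PINNED.match(line)
        if match:
            names.append(match.group(1))
    return names


example = """\
Django==2.1.2 \\
    --hash=sha256:abc
# a comment
requests==2.20.0 \\
    --hash=sha256:def
"""
print(pinned_package_names(example))  # ['Django', 'requests']
```

The resulting list is what you would otherwise have typed after hashin on the command line.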

Interactive update all (--interactive)

This new flag only makes sense when used together with --update-all. Used together, it reads all packages in the requirements file and, for each one that has a new version, asks you whether you want to update it or skip it.

It looks like this:

$ hashin --update-all --interactive
PACKAGE                        YOUR VERSION    NEW VERSION
Django                         2.1.2           2.1.3           ✓
requests                       2.20.0          2.20.1          ✘
numpy                          1.15.2          1.15.4          ?
Update? [Y/n/a/q/?]:

You can also use the aliases hashin -u -i to do the same thing.

Support for "extras"

If you want to have requests[security] or markus[datadog] in your requirements file, hashin used to not support that. This now works:

$ hashin "requests[security]"

Before, it would look for a package literally called requests[security] on PyPI, which obviously doesn't exist. Now, it parses that syntax, makes a lookup for requests and, when it's done, puts the extras syntax back into the requirements file.
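A rough sketch of that parse-then-restore idea, assuming a simple regular expression is enough for the common case. The helper names split_extras and join_extras are made up for this example and are not hashin's internals.

```python
import re

# "requests[security]" -> name "requests", extras "security"
EXTRAS = re.compile(r"^(?P<name>[^\[\]]+)(?:\[(?P<extras>[^\]]*)\])?$")


def split_extras(requirement):
    """Split 'requests[security]' into ('requests', ['security'])."""
    match = EXTRAS.match(requirement.strip())
    if not match:
        raise ValueError(f"Unrecognized requirement: {requirement!r}")
    extras = match.group("extras") or ""
    return match.group("name"), [e.strip() for e in extras.split(",") if e.strip()]


def join_extras(name, extras):
    """Put the extras syntax back, the way it is written to the file."""
    return f"{name}[{','.join(extras)}]" if extras else name


name, extras = split_extras("requests[security]")
print(name, extras, join_extras(name, extras))
```

The lookup on PyPI uses only the plain name, and the extras are carried along untouched.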

Thanks Dustin Ingram for pushing for this one!

Atomic writes

Prior to this version, if you typed hashin requests flask numpy nltkay it would go ahead and do one of those packages at a time and effectively open and edit the requirements file as many times as there are packages mentioned. The crux of that is that if you, for example, have a typo (e.g. nltkay instead of nltk) it would crash there and not roll back any of the other writes. It's not a huge harm but it certainly is counterintuitive.

Another place where this matters is with --dry-run. If you specified something like hashin --dry-run requests flask numpy you would get one diff per package and thus repeat the diff header 3 (excessive) times.

The other reason why atomic writes are important is if you use hashin --update-all --interactive and it asks you if you want to update package1, package2, package3, and then you decide "Nah. I don't want any of this. I quit!". Now it just quits without having touched the requirements file.
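The general pattern can be sketched like this: do all the edits in memory and only write the file once, at the very end. This is an illustration of the technique, not hashin's code; update_one is a hypothetical callable that returns the new file content for one package and may raise.

```python
import os
import tempfile


def write_atomically(path, new_content):
    """Write to a temporary file, then swap it into place in one step."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            f.write(new_content)
        os.replace(tmp_path, path)  # atomic when on the same file system
    except BaseException:
        os.unlink(tmp_path)
        raise


def update_requirements(path, package_names, update_one):
    """Apply every update in memory; touch the file only if all succeed."""
    with open(path) as f:
        content = f.read()
    for name in package_names:
        content = update_one(content, name)  # may raise, e.g. on a typo
    write_atomically(path, content)
```

If any single package fails (or you quit half way through), the exception propagates before write_atomically is reached, so the file on disk is left exactly as it was.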

Better not-found errors

This was never a problem if you used Python 2.7 but on Python 3.x, if you typoed a package name, you'd get a Python exception about the HTTP call and it wasn't obvious that the mistake was in your input and not the network. Now, it traps any HTTP errors and if it's a 404 it's handled gracefully.
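As an illustration of that kind of error handling (a sketch under assumptions, not hashin's source), here is how a 404 can be told apart from other HTTP failures with the standard library. The exception class and function name are invented for the example, and the urlopen parameter exists only so the function can be exercised without a network.

```python
import json
import urllib.error
import urllib.request


class PackageNotFoundError(Exception):
    """Raised when the package name does not exist on the index."""


def get_package_data(name, urlopen=urllib.request.urlopen):
    """Fetch the JSON metadata for a package from PyPI's JSON API."""
    url = f"https://pypi.org/pypi/{name}/json"
    try:
        with urlopen(url) as response:
            return json.load(response)
    except urllib.error.HTTPError as exception:
        if exception.code == 404:
            # Most likely a typo in the package name, not a network problem.
            raise PackageNotFoundError(name) from exception
        raise  # anything else is a genuine HTTP problem
```

The caller can then catch PackageNotFoundError and print a friendly message instead of a traceback.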

(Internal) Black everything and pytest everything

All source code is now formatted with Black which, albeit imperfect, kills any boring manual review of code style nits. And, it uses therapist to wrap the black checks and fixes.

And all unit tests are now written for pytest. pytest was already the tool used in TravisCI but now all of those self.assertEqual(foo, bar)s have been replaced with assert foo == bar.

How to JSON schema validate 10x (or 100x) faster in Python

04 November 2018 2 comments   Python


This is perhaps insanely obvious but it was a measurement I had to do and it might help you too if you use python-jsonschema a lot.

I have this project which has a migration script that needs to transfer about 1M records from one PostgreSQL database, transform it a bit, validate it, and store it in another PostgreSQL database. The validation step was done like this:

from jsonschema import validate

...

with open(os.path.join(settings.BASE_DIR, "schema.yaml")) as f:
    SCHEMA = yaml.load(f)["schema"]

...


class Build(models.Model):

    ...

    @classmethod
    def validate_build(cls, build):
        validate(build, SCHEMA)

That works fine when you have a slow trickle of these coming in with many seconds or minutes apart. But when you have to do about 1M of them, the speed overhead starts to really matter. Granted, in this context, it's just a migration which is hopefully only done once but it helps that it doesn't take too long since it makes it easier to not have any downtime.

What about python-fastjsonschema?

The name python-fastjsonschema just sounds very appealing but I'm just not sure how mature it is or what the subtle differences are between that and the more established python-jsonschema which I was already using.

There are two ways of using it. Either...

fastjsonschema.validate(schema, data)

...or...

validator = fastjsonschema.compile(schema)
validator(data)

That got me thinking, why don't I just do that with regular python-jsonschema?
All you need to do is crack open the validate function and you can then reuse one instance for multiple pieces of data:

from jsonschema.validators import validator_for


klass = validator_for(schema)
klass.check_schema(schema)  # optional
instance = klass(schema)
instance.validate(data)

I rewrote my project's code to this:

from jsonschema.validators import validator_for

...

with open(os.path.join(settings.BASE_DIR, "schema.yaml")) as f:
    SCHEMA = yaml.load(f)["schema"]
_validator_class = validator_for(SCHEMA)
_validator_class.check_schema(SCHEMA)
validator = _validator_class(SCHEMA)

...


class Build(models.Model):

    ...

    @classmethod
    def validate_build(cls, build):
        validator.validate(build)

How do they compare, performance-wise?

Let this simple benchmark code speak for itself:

from buildhub.main.models import Build, SCHEMA

import fastjsonschema
from jsonschema import validate, ValidationError
from jsonschema.validators import validator_for


def f1(qs):
    for build in qs:
        validate(build.build, SCHEMA)


def f2(qs):
    validator = validator_for(SCHEMA)
    for build in qs:
        validate(build.build, SCHEMA, cls=validator)


def f3(qs):
    cls = validator_for(SCHEMA)
    cls.check_schema(SCHEMA)
    instance = cls(SCHEMA)
    for build in qs:
        instance.validate(build.build)


def f4(qs):
    for build in qs:
        fastjsonschema.validate(SCHEMA, build.build)


def f5(qs):
    validator = fastjsonschema.compile(SCHEMA)
    for build in qs:
        validator(build.build)


# Reporting
import time
import statistics
import random

functions = f1, f2, f3, f4, f5
times = {f.__name__: [] for f in functions}


for _ in range(3):
    qs = list(Build.objects.all().order_by("?")[:1000])
    for func in functions:
        t0 = time.time()
        func(qs)
        t1 = time.time()
        times[func.__name__].append((t1 - t0) * 1000)


def f(ms):
    return f"{ms:.1f}ms"


for name, numbers in times.items():
    print("FUNCTION:", name, "Used", len(numbers), "times")
    print("\tBEST  ", f(min(numbers)))
    print("\tMEDIAN", f(statistics.median(numbers)))
    print("\tMEAN  ", f(statistics.mean(numbers)))
    print("\tSTDEV ", f(statistics.stdev(numbers)))

Basically, for each of the alternative implementations, 3 times over, it validates 1,000 JSON blobs (technically Python dicts) that are each around 1KB in size.

The results:

FUNCTION: f1 Used 3 times
    BEST   1247.9ms
    MEDIAN 1309.0ms
    MEAN   1330.0ms
    STDEV  94.5ms
FUNCTION: f2 Used 3 times
    BEST   1266.3ms
    MEDIAN 1267.5ms
    MEAN   1301.1ms
    STDEV  59.2ms
FUNCTION: f3 Used 3 times
    BEST   125.5ms
    MEDIAN 131.1ms
    MEAN   133.9ms
    STDEV  10.1ms
FUNCTION: f4 Used 3 times
    BEST   2032.3ms
    MEDIAN 2033.4ms
    MEAN   2143.9ms
    STDEV  192.3ms
FUNCTION: f5 Used 3 times
    BEST   16.7ms
    MEDIAN 17.1ms
    MEAN   21.0ms
    STDEV  7.1ms

Basically, if you use python-jsonschema and create a reusable instance it's about 10 times faster than the "default way". And if you do the same but with python-fastjsonschema it's close to 100 times faster (about 75 times going by the best times above).

By the way, in version f5 it validated 1,000 1KB records in 16.7ms. That's insanely fast!
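To put those numbers in the context of the roughly 1M record migration, here is the arithmetic spelled out. It uses only the best times reported above and assumes, optimistically, that validation time scales linearly with the number of records.

```python
# Best times, in milliseconds per 1,000 records, copied from the results above.
best_ms = {"f1": 1247.9, "f3": 125.5, "f5": 16.7}
total_records = 1_000_000

for name, ms in best_ms.items():
    speedup = best_ms["f1"] / ms
    # ms per 1,000 records, times 1,000 batches, converted to seconds
    full_run_seconds = ms * (total_records / 1000) / 1000
    print(f"{name}: {speedup:.1f}x faster than f1, "
          f"about {full_run_seconds / 60:.1f} minutes for 1M records")
```

That works out to roughly 10x for the reusable python-jsonschema instance and roughly 75x for the compiled python-fastjsonschema validator, or about 21 minutes, 2 minutes, and well under a minute of pure validation time respectively.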

Switching from AWS S3 (boto3) to Google Cloud Storage (google-cloud-storage) in Python

12 October 2018 0 comments   Python


I'm in the midst of rewriting a big app that currently uses AWS S3 and will soon be switched over to Google Cloud Storage. This blog post is a rough attempt to log various activities in both Python libraries:

Disclaimer: I'm manually copying these snippets from a real project and I have to manually scrub the code clean of unimportant quirks, hacks, and other unrelated things that would just add noise.

Install

boto3

$ pip install boto3
$ emacs ~/.aws/credentials

google-cloud-storage

$ pip install google-cloud-storage
$ cat ./google_service_account.json

Note: You need to create a service account and then that gives you a .json file which you download and make sure you pass its path when you create a client.

I suspect there are more/other ways to do this with environment variables alone but I haven't got there yet.

Making a "client"

boto3

Note, there are easier shortcuts for this but with this pattern you can have full control over things like read_timeout, connect_timeout, etc. with that config_params keyword.

import boto3
from botocore.config import Config


def get_s3_client(region_name=None, **config_params):
    options = {"config": Config(**config_params)}
    if region_name:
        options["region_name"] = region_name
    session = boto3.session.Session()
    return session.client("s3", **options)

google-cloud-storage

from google.cloud import storage


def get_gcs_client():
    return storage.Client.from_service_account_json(
        settings.GOOGLE_APPLICATION_CREDENTIALS_PATH
    )

Checking if a bucket exists and if you have access to it

boto3 (for s3_client here, see above)

from botocore.exceptions import ClientError, EndpointConnectionError


try:
    s3_client.head_bucket(Bucket=bucket_name)
except ClientError as exception:
    if exception.response["Error"]["Code"] in ("403", "404"):
        raise BucketHardError(
            f"Unable to connect to bucket={bucket_name!r} "
            f"ClientError ({exception.response!r})"
        )
    else:
        raise
except EndpointConnectionError:
    raise BucketSoftError(
        f"Unable to connect to bucket={bucket_name!r} "
        f"EndpointConnectionError"
    )
else:
    print("It exists and we have access to it.")

google-cloud-storage

from google.api_core.exceptions import BadRequest


try:
    gcs_client.get_bucket(bucket_name)
except BadRequest as exception:
    raise BucketHardError(
        f"Unable to connect to bucket={bucket_name!r}, "
        f"because bucket not found due to {exception}"
    )
else:
    print("It exists and we have access to it.")

Checking if an object exists

boto3

from botocore.exceptions import ClientError


def key_existing(client, bucket_name, key):
    """return a tuple of (
        key's size if it exists or 0,
        S3 key metadata
    )
    If the object doesn't exist, return None for the metadata.
    """
    try:
        response = client.head_object(Bucket=bucket_name, Key=key)
        return response["ContentLength"], response.get("Metadata")
    except ClientError as exception:
        if exception.response["Error"]["Code"] == "404":
            return 0, None
        raise

Note, if you do this a lot and often find that the object doesn't exist, then using list_objects_v2 is probably faster.
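A minimal sketch of that alternative, assuming you only need the size. The function name is made up for this example; client is expected to be a boto3 S3 client like the one from get_s3_client above. Note that a listing response does not include the user metadata, so this is not a drop-in replacement for key_existing.

```python
def key_size_via_listing(client, bucket_name, key):
    """Return the object's size, or 0 if no object has exactly this key.

    Unlike head_object, the listing response carries no user metadata.
    """
    response = client.list_objects_v2(Bucket=bucket_name, Prefix=key)
    # "Contents" is absent from the response when nothing matches the prefix.
    for obj in response.get("Contents", []):
        if obj["Key"] == key:
            return obj["Size"]
    return 0
```

The exact-match check matters because Prefix=key also matches longer keys that merely start with the same characters.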

google-cloud-storage

def key_existing(client, bucket_name, key):
    """return a tuple of (
        key's size if it exists or 0,
        GCS blob metadata
    )
    If the object doesn't exist, return None for the metadata.
    """
    bucket = client.get_bucket(bucket_name)
    blob = bucket.get_blob(key)
    if blob:
        return blob.size, blob.metadata
    return 0, None

Uploading a file with a special Content-Encoding

Note: You have to use your imagination with regards to the source. In this example, I'm assuming that the source is a file on disk and that it might have already been compressed with gzip.

boto3

def upload(file_path, bucket_name, key_name, metadata=None, compressed=False):
    content_type = get_key_content_type(key_name)

    metadata = metadata or {}

    # boto3 will raise a botocore.exceptions.ParamValidationError
    # error if you try to do something like:
    #
    #  s3.put_object(Bucket=..., Key=..., Body=..., ContentEncoding=None)
    #
    # ...because apparently 'NoneType' is not a valid type.
    # We /could/ set it to something like '' but that feels like an
    # actual value/opinion. Better just avoid if it's not something
    # really real.
    extras = {}
    if content_type:
        extras["ContentType"] = content_type
    if compressed:
        extras["ContentEncoding"] = "gzip"
    if metadata:
        extras["Metadata"] = metadata

    with open(file_path, "rb") as f:
        s3_client.put_object(Bucket=bucket_name, Key=key_name, Body=f, **extras)

google-cloud-storage

def upload(file_path, bucket_name, key_name, metadata=None, compressed=False):
    content_type = get_key_content_type(key_name)

    metadata = metadata or {}
    bucket = gcs_client.get_bucket(bucket_name)
    blob = bucket.blob(key_name)

    if content_type:
        blob.content_type = content_type
    if compressed:
        blob.content_encoding = "gzip"
    blob.metadata = metadata
    with open(file_path, "rb") as f:
        blob.upload_from_file(f)

Downloading and uncompressing a gzipped object

boto3

from io import BytesIO
from gzip import GzipFile
from botocore.exceptions import ClientError

from .utils import iter_lines


def get_stream(bucket_name, key_name):
    try:
        response = s3_client.get_object(
            Bucket=bucket_name, Key=key_name
        )
    except ClientError as exception:
        if exception.response["Error"]["Code"] == "NoSuchKey":
            raise KeyHardError("key not in bucket")
        raise

    stream = response["Body"]
    # But if the content encoding is gzip we have to re-wrap the stream.
    if response.get("ContentEncoding") == "gzip":
        body = response["Body"].read()
        bytestream = BytesIO(body)
        stream = GzipFile(None, "rb", fileobj=bytestream)

    for line in iter_lines(stream):
        yield line.decode("utf-8")

google-cloud-storage

from io import BytesIO

from .utils import iter_lines


def get_stream(bucket_name, key_name):
    bucket = gcs_client.get_bucket(bucket_name)
    blob = bucket.get_blob(key_name)
    if blob is None:
        raise KeyHardError("key not in bucket")

    bytestream = BytesIO()
    blob.download_to_file(bytestream)
    bytestream.seek(0)

    for line in iter_lines(bytestream):
        yield line.decode("utf-8")

Note that here blob.download_to_file works a bit like requests.get() in that it automatically notices the Content-Encoding metadata and does the gunzip on the fly.
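The iter_lines helper imported from .utils isn't shown in either version. As a guess at what such a helper could look like (an assumption, not the project's real code), here is a small standard-library sketch, together with the gzip re-wrapping step from the boto3 version applied to an in-memory body.

```python
from gzip import GzipFile, compress
from io import BytesIO


def iter_lines(stream, chunk_size=1024):
    """Yield lines (bytes, without the trailing newline) from a binary stream."""
    pending = b""
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        lines = (pending + chunk).split(b"\n")
        pending = lines.pop()  # the last piece may be an incomplete line
        yield from lines
    if pending:
        yield pending


# Simulate a gzipped response body and re-wrap it, as in the boto3 version.
body = compress(b"first line\nsecond line\n")
stream = GzipFile(None, "rb", fileobj=BytesIO(body))
print([line.decode("utf-8") for line in iter_lines(stream)])
```

Reading in chunks keeps memory use flat for the uncompressed stream, although both versions above still buffer the whole compressed body in a BytesIO first.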

Conclusion

It's not fair to compare them on style because I think boto3 came out of boto which probably started back in the day when Google was just web search and web emails.

I wanted to include a section about how to unit test against these. Especially how to mock them. But what I had for a draft was getting ugly. Yes, it works for the testing needs I have in my app but it's very personal taste (aka. appropriate for the context) and admittedly quite messy.

Fancy linkifying of text with Bleach and domain checks (with Python)

10 October 2018 2 comments   Web development, Python


Bleach is awesome. Thank you for it @willkg! It's a Python library for sanitizing text as well as "linkifying" text for HTML use. For example, consider this:

>>> import bleach
>>> bleach.linkify("Here is some text with a url.com.")
'Here is some text with a <a href="http://url.com" rel="nofollow">url.com</a>.'

Note that sanitizing is a separate thing, but if you're curious, consider this example:

>>> bleach.linkify(bleach.clean("Here is <script> some text with a url.com."))
'Here is &lt;script&gt; some text with a <a href="http://url.com" rel="nofollow">url.com</a>.'

With that output you can confidently template interpolate that string straight into your HTML.

Getting fancy

That's a great start but I wanted more. For one, I don't always want the rel="nofollow" attribute on all links, in particular not on links that are within the site. Secondly, a lot of things look like a domain but aren't. For example, This is a text.at the start would naively become...:

>>> bleach.linkify("This is a text.at the start")
'This is a <a href="http://text.at" rel="nofollow">text.at</a> the start'

...because text.at looks like a domain.

So here is how I use it here on www.peterbe.com to linkify blog comments:

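from urllib.parse import urlparse

import bleach
import requests
from requests.exceptions import ConnectionError


def custom_nofollow_maker(attrs, new=False):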
    href_key = (None, u"href")

    if href_key not in attrs:
        return attrs

    if attrs[href_key].startswith(u"mailto:"):
        return attrs

    p = urlparse(attrs[href_key])
    if p.netloc not in settings.NOFOLLOW_EXCEPTIONS:
        # Before we add the `rel="nofollow"` let's first check that this is a
        # valid domain at all.
        root_url = p.scheme + "://" + p.netloc
        try:
            response = requests.head(root_url)
            if response.status_code == 301:
                redirect_p = urlparse(response.headers["location"])
                # If the only difference is that it redirects to https instead
                # of http, then amend the href.
                if (
                    redirect_p.scheme == "https"
                    and p.scheme == "http"
                    and p.netloc == redirect_p.netloc
                ):
                    attrs[href_key] = attrs[href_key].replace("http://", "https://")

        except ConnectionError:
            return None

        rel_key = (None, u"rel")
        rel_values = [val for val in attrs.get(rel_key, "").split(" ") if val]
        if "nofollow" not in [rel_val.lower() for rel_val in rel_values]:
            rel_values.append("nofollow")
        attrs[rel_key] = " ".join(rel_values)

    return attrs

html = bleach.linkify(text, callbacks=[custom_nofollow_maker])

This is basically taking the default nofollow callback and extending it a bit.

By the way, here is the complete code I use for sanitizing and linkifying blog comments here on this site: render_comment_text.

Caveats

This is slow because it requires network IO every time a piece of text needs to be linkified (if it has domain looking things in it) but that's best alleviated by only doing it once and either caching it or persistently storing the cleaned and rendered output.

Also, the check uses try: requests.head() except requests.exceptions.ConnectionError: as the method to see if the domain works. I considered doing a whois lookup or something but that felt a little wrong because just because a domain exists doesn't mean there's a website there. Either way, it could be that the domain/URL is perfectly fine but in that very unlucky instant you checked, your own server's internet or some other DNS lookup thing is busted. Perhaps it would be better to wrap it in a retry and do try: requests.head() except requests.exceptions.RetryError: instead.

Lastly, the business logic I chose was to rewrite all http:// to https:// only if the URL http://domain does a 301 redirect to https://domain. So if the original link was http://bit.ly/redirect-slug it leaves it as is. Perhaps a fancier version would be to look at the domain name ending. For example HEAD http://google.com 301 redirects to https://www.google.com so you could use the fact that "www.google.com".endswith("google.com").
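That fancier version could be sketched roughly like this. The function name is invented for the example, and it deliberately checks for a leading dot rather than a bare endswith, since "notgoogle.com".endswith("google.com") is also true.

```python
from urllib.parse import urlparse


def is_https_upgrade(original_url, redirect_url):
    """True if the redirect only upgrades to https on the same or a sub-domain."""
    original = urlparse(original_url)
    redirect = urlparse(redirect_url)
    if original.scheme != "http" or redirect.scheme != "https":
        return False
    return (
        redirect.netloc == original.netloc
        or redirect.netloc.endswith("." + original.netloc)
    )


print(is_https_upgrade("http://google.com", "https://www.google.com/"))
print(is_https_upgrade("http://bit.ly/redirect-slug", "https://example.com/page"))
```

The first call is true (www.google.com ends with .google.com) and the second is false, so a link shortener's redirect would still be left as is.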

UPDATE Oct 10 2018

Moments after publishing this, I discovered a bug where it would fail badly if the text contained a URL with an ampersand in it. Turns out, it was a known bug in Bleach. It only happens when you try to pass a filter to the bleach.Cleaner() class.

So I simplified my code and now things work. Apparently, using bleach.Cleaner(filters=[...]) is faster so I'm losing that. But, for now, that's OK in my context.

Also, in another later fix, I improved the function some more by avoiding non-HTTP links (with the exception of mailto: and tel:). Otherwise it would attempt to run requests.head('ssh://server.example.com') which doesn't make sense.
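One way to express that scheme check, as a sketch only: the three labels and the function name are assumptions for illustration, and the real callback may well be structured differently.

```python
from urllib.parse import urlparse

CHECKABLE_SCHEMES = {"http", "https"}
PASS_THROUGH_SCHEMES = {"mailto", "tel"}


def classify_href(href):
    """Return 'check', 'keep' or 'skip' for a link found by the linkifier."""
    scheme = urlparse(href).scheme.lower()
    if scheme in CHECKABLE_SCHEMES:
        return "check"  # worth a requests.head() against the domain
    if scheme in PASS_THROUGH_SCHEMES:
        return "keep"  # leave the attributes untouched
    return "skip"  # e.g. ssh://, where an HTTP request makes no sense


for href in ("http://example.com", "mailto:me@example.com", "ssh://server.example.com"):
    print(href, classify_href(href))
```

Deciding on the scheme before doing any network IO also means odd links cost nothing to handle.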

The ideal number of workers in Jest

08 October 2018 0 comments   ReactJS, Python


tl;dr; Use --runInBand when running jest in CI and use --maxWorkers=3 on your laptop.

Running out of memory on CircleCI

We have a test suite that covers 236 tests across 68 suites and runs mainly a bunch of enzyme rendering of React components but also some plain old JavaScript function tests. We hit a problem where tests utterly failed in CircleCI due to running out of memory. Several individual tests, before it gave up or failed, reported taking up to 45 seconds.
Turns out, jest tried to use 36 workers because the Docker OS it was running was reporting 36 CPUs.

circleci@9e4c489cf76b:~/repo$ node
> var os = require('os')
undefined
> os.cpus().length
36

After forcibly setting --maxWorkers=2 to the jest command, the tests passed and it took 20 seconds. Yay!

But that got me thinking, what is the ideal number of workers when I'm running the suite here on my laptop? To find out, I wrote a Python script that would wrap the call CI=true yarn run test --maxWorkers=%(WORKERS) repeatedly and report which number is ideal for my laptop.

After leaving it running for a while it spits out this result:

SORTED BY BEST TIME:
3 8.47s
4 8.59s
6 9.12s
5 9.18s
2 9.51s
7 10.14s
8 10.59s
1 13.80s

The conclusion is vague. There is some benefit to using some small number greater than 1. If you attempt a bigger number it might backfire and take longer than necessary and if you do do that your laptop is likely to crawl and cough.
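The wrapper script itself isn't included here. A rough sketch of how such a script could work, with invented function names and a {workers} placeholder in place of the %(WORKERS) shown above, might look like this:

```python
import os
import shlex
import statistics
import subprocess
import time


def time_command(command, runs=3):
    """Run a command several times with CI=true; return the median seconds."""
    env = {**os.environ, "CI": "true"}
    timings = []
    for _ in range(runs):
        t0 = time.perf_counter()
        subprocess.run(shlex.split(command), env=env, check=True, capture_output=True)
        timings.append(time.perf_counter() - t0)
    return statistics.median(timings)


def rank_workers(command_template, worker_counts, runs=3):
    """Return (workers, median_seconds) pairs sorted by best time."""
    results = [
        (workers, time_command(command_template.format(workers=workers), runs))
        for workers in worker_counts
    ]
    return sorted(results, key=lambda pair: pair[1])


# Example (needs yarn and a jest project in the current directory):
# for workers, seconds in rank_workers(
#     "yarn run test --maxWorkers={workers}", range(1, 9)
# ):
#     print(workers, f"{seconds:.2f}s")
```

Using the median of a few runs dampens the noise from whatever else the laptop happens to be doing at the time.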

Notes and conclusions

django-pipeline and Zopfli

15 August 2018 0 comments   Django, Web development, Python


tl;dr; I wrote my own extension to django-pipeline that uses Zopfli to create .gz files from static assets collected in Django. Here's the code.

Nginx and Gzip

What I wanted was to continue to use django-pipeline which does a great job of reading a settings.BUNDLES setting and generating things like /static/js/myapp.min.a206ec6bd8c7.js. It has configurable options to not just make those files but also generate /static/js/myapp.min.a206ec6bd8c7.js.gz which means that with gzip_static in Nginx, Nginx doesn't have to Gzip compress static files on-the-fly but can basically just read it from disk. Nginx doesn't care how the file got there but an immediate advantage of preparing the file on disk is that the compression can be higher (smaller .gz files). That means smaller responses to be sent to the client and less CPU work needed from Nginx. Your job is to set gzip_static on; in your Nginx config (per location) and make sure every compressible file exists on disk with the same name but with the .gz suffix.

In other words, when the client does GET https://example.com/static/foo.js Nginx quickly does a read on the file system to see if there exists a ROOT/static/foo.js.gz and if so, returns that. If the file doesn't exist, and you have gzip on; in your config, Nginx will read the ROOT/static/foo.js into memory, compress it (usually with a lower compression level) and return that. Nginx takes care of figuring out whether to do this, at all, dynamically by reading the Accept-Encoding header from the request.

Zopfli

The best solution today to generate these .gz files is Zopfli. Zopfli is slower than good old regular gzip but the files get smaller. To manually compress a file you can install the zopfli executable (e.g. brew install zopfli or apt install zopfli) and then run zopfli $ROOT/static/foo.js which creates a $ROOT/static/foo.js.gz file.

So your task is to build some pipelining code that generates a .gz version of every static file your Django server creates.
At first I tried django-static-compress which has an extension to regular Django staticfiles storage. The default staticfiles storage is django.contrib.staticfiles.storage.StaticFilesStorage and that's what django-static-compress extends.

But I wanted more. I wanted all the good bits from django-pipeline (minification, hashes in filenames, concatenation, etc.). Also, in django-static-compress you can't control the parameters to zopfli such as the number of iterations. And with django-static-compress you have to install Brotli which I can't use because I don't want to compile my own Nginx.

Solution

So I wrote my own little mashup. I took some ideas from how django-pipeline does regular gzip compression as a post-process step. And in my case, I never want to bother with any of the other files that are put into the settings.STATIC_ROOT directory from the collectstatic command.

Here's my implementation: peterbecom.storage.ZopfliPipelineCachedStorage. Check it out. It's very tailored to my personal preferences and use case but it works great. To use it, I have this in my settings.py: STATICFILES_STORAGE = "peterbecom.storage.ZopfliPipelineCachedStorage"
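If you just want to see the shape of the post-process step without the storage class, here is a stand-alone sketch. It is not the storage class linked above: it swaps in the standard library's gzip module instead of Zopfli (so the files will be a bit bigger), and the function name and the list of suffixes are assumptions.

```python
import gzip
import os

COMPRESSIBLE = (".js", ".css", ".html", ".svg")


def write_gz_siblings(static_root, compresslevel=9):
    """Create foo.js.gz next to every foo.js (and similar) under static_root."""
    written = []
    for dirpath, _dirnames, filenames in os.walk(static_root):
        for filename in filenames:
            if not filename.endswith(COMPRESSIBLE):
                continue
            source = os.path.join(dirpath, filename)
            with open(source, "rb") as f:
                raw = f.read()
            # Same name plus ".gz", which is what gzip_static looks for.
            with open(source + ".gz", "wb") as f:
                f.write(gzip.compress(raw, compresslevel=compresslevel))
            written.append(source + ".gz")
    return written
```

Running something like this after collectstatic would be enough for gzip_static to start serving the precompressed files.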

I know what you're thinking

Why not try to get this into django-pipeline or into django-static-compress? The answer is frankly laziness. Hopefully someone else can pick up this task. I have fewer and fewer projects where I use Django to handle static files. These days most of my projects are single-page-apps that are 100% static and use Django only for XHR requests to get the data.

Django lock decorator with django-redis

14 August 2018 2 comments   Redis, Django, Web development, Python


Here's the code. It's quick-n-dirty but it works wonderfully:

import functools
import hashlib

from django.core.cache import cache
from django.utils.encoding import force_bytes


def lock_decorator(key_maker=None):
    """
    When you want to lock a function from more than 1 call at a time.
    """

    def decorator(func):
        @functools.wraps(func)
        def inner(*args, **kwargs):
            if key_maker:
                key = key_maker(*args, **kwargs)
            else:
                key = str(args) + str(kwargs)
            lock_key = hashlib.md5(force_bytes(key)).hexdigest()
            with cache.lock(lock_key):
                return func(*args, **kwargs)

        return inner

    return decorator

How To Use It

This has saved my bacon more than once. I use it on functions that really need to be made synchronous. For example, suppose you have a function like this:

def fetch_remote_thing(name):
    try:
        return Thing.objects.get(name=name).result
    except Thing.DoesNotExist:
        # Need to go out and fetch this
        result = some_internet_fetching(name)  # Assume this is sloooow
        Thing.objects.create(name=name, result=result)
        return result

That function is quite dangerous because if executed by two concurrent web requests, for example, they will trigger two "identical" calls to some_internet_fetching. And if the database didn't have the name already, it will most likely trigger two calls to Thing.objects.create(name=name, ...) which could lead to integrity errors or, if it doesn't, the whole function breaks down because it assumes that there is only 1 or 0 of these Thing records.

Easy to solve, just add the lock_decorator:

@lock_decorator()
def fetch_remote_thing(name):
    try:
        return Thing.objects.get(name=name).result
    except Thing.DoesNotExist:
        # Need to go out and fetch this
        result = some_internet_fetching(name)  # Assume this is sloooow
        Thing.objects.create(name=name, result=result)
        return result

Now, thanks to Redis distributed locks, the function is always allowed to finish before it starts another one. All the hairy locking (in particular, the waiting) is implemented deep down in Redis which is rock solid.
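To see the effect without a Redis server, here is an in-process stand-in that swaps cache.lock for plain threading.Lock objects. It only synchronizes threads within one process, so it is a demonstration of the idea and not a replacement for the Redis-backed decorator; all the names here are made up for the example.

```python
import functools
import threading
import time

_locks = {}
_locks_guard = threading.Lock()


def local_lock_decorator(key_maker=None):
    """Same shape as lock_decorator, but with one threading.Lock per key."""

    def decorator(func):
        @functools.wraps(func)
        def inner(*args, **kwargs):
            key = key_maker(*args, **kwargs) if key_maker else str(args) + str(kwargs)
            with _locks_guard:
                lock = _locks.setdefault(key, threading.Lock())
            with lock:
                return func(*args, **kwargs)

        return inner

    return decorator


created = []  # stand-in for the Thing table


@local_lock_decorator()
def fetch_remote_thing(name):
    if name not in created:
        time.sleep(0.05)  # pretend this is the slow internet fetch
        created.append(name)
    return name


threads = [threading.Thread(target=fetch_remote_thing, args=("foo",)) for _ in range(5)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(created)  # ['foo']
```

Without the decorator, all five threads would pass the "not in created" check during the sleep and each append its own copy.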

Bonus Usage

Another use that has also saved my bacon is functions that aren't necessarily called with the same input argument but each call is so resource intensive that you want to make sure it only does one of these at a time. Suppose you have a Django view function that does some resource intensive work and you want to stagger the calls so that it only runs it one at a time. Like this for example:

def api_stats_calculations(request, part):
    if part == 'users-per-month':
        data = _calculate_users_per_month()  # expensive
    elif part == 'pageviews-per-week':
        data = _calculate_pageviews_per_week()  # intensive
    elif part == 'downloads-per-day':
        data = _calculate_download_per_day()  # slow
    elif you == 'get' and the == 'idea':
        ...

    return http.JsonResponse({'data': data})

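If you just put @lock_decorator() on this Django view function, and you have some (almost) concurrent calls to this function, for example from a uWSGI server running with threads and multiple processes, then it will not synchronize the calls. The default lock key is built from the arguments, so calls with a different part end up with different lock keys and don't wait for each other.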

The solution to this is to write your own function for generating the lock key, like this for example:

@lock_decorator(
    key_maker=lambda request, part: 'api_stats_calculations'
)
def api_stats_calculations(request, part):
    if part == 'users-per-month':
        data = _calculate_users_per_month()  # expensive
    elif part == 'pageviews-per-week':
        data = _calculate_pageviews_per_week()  # intensive
    elif part == 'downloads-per-day':
        data = _calculate_download_per_day()  # slow
    elif you == 'get' and the == 'idea':
        ...

    return http.JsonResponse({'data': data})

Now it works.
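The difference is easiest to see by computing the lock keys by hand. This sketch mirrors the key logic from the decorator, using a plain string as a stand-in for the request object and str.encode instead of force_bytes; the helper name is made up.

```python
import hashlib


def make_lock_key(args, kwargs, key_maker=None):
    """Mirror how the decorator derives its lock key."""
    key = key_maker(*args, **kwargs) if key_maker else str(args) + str(kwargs)
    return hashlib.md5(key.encode("utf-8")).hexdigest()


def fixed(request, part):
    return "api_stats_calculations"


request = "<request>"  # stand-in for a Django request object
default_a = make_lock_key((request, "users-per-month"), {})
default_b = make_lock_key((request, "pageviews-per-week"), {})
fixed_a = make_lock_key((request, "users-per-month"), {}, fixed)
fixed_b = make_lock_key((request, "pageviews-per-week"), {}, fixed)

print(default_a == default_b)  # False: two different locks, no waiting
print(fixed_a == fixed_b)  # True: one shared lock, calls run one at a time
```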

How Time-Expensive Is It?

Perhaps you worry that 99% of your calls to the function don't have the problem of calling the function concurrently. How much is this overhead of this lock costing you? I wondered that too and set up a simple stress test where I wrote a really simple Django view function. It looked something like this:

@lock_decorator(key_maker=lambda request: 'samekey')
def sample_view_function(request):
    return http.HttpResponse('Ok\n')

I started a Django server with uWSGI with multiple processes and threads enabled. Then I bombarded this function with a simple concurrent stress test and observed the requests per minute. The cost was extremely tiny and almost negligible (compared to not using the lock decorator). Granted, in this test I used Redis on redis://localhost:6379/0 but generally the conclusion was that the call is extremely fast and not something to worry too much about. But your mileage may vary so do your own experiments for your context.

What's Needed

You need to use django-redis as your Django cache backend. I've blogged before about using django-redis, for example Fastest cache backend possible for Django and Fastest Redis configuration for Django.

django-html-validator now supports Django 2.x

13 August 2018 0 comments   Django, Web development, Python

https://pypi.org/project/django-html-validator/


django-html-validator is a Django project that can validate your generated HTML. It does so by sending the HTML to https://html5.validator.nu/ or you can start your own Java server locally with vnu.jar from here. The output is that you can have validation errors printed to stdout or you can have them put as .txt files in a temporary directory. You can also include it in your test suite and make it so that tests fail if invalid HTML is generated during rendering in Django unit tests.

The project seems to have become a lot more popular than I thought it would. It started as a one-evening-hack and because there was interest I wrapped it up in a proper project with "docs" and set up CI for future contributions.

I kind of forgot about the project since almost all my current projects generate JSON on the server and generate the DOM on-the-fly with client-side JavaScript, but apparently a lot of issues and PRs were filed related to making it work in Django 2.x. So I took the time last night to tidy up the tox.ini etc. and make the necessary compatibility fixes so that it works with everything from Django 1.8 up to Django 2.1. Pull request here.

Thank you to all who contributed! I'll try to do a better job of noticing filed issues in the future.

Quick dog-piling (aka stampeding herd) URL stresstest

10 August 2018 0 comments   Python


Whenever you want to quickly bombard a URL with some concurrent traffic, you can use this:

import random
import time
import requests
import concurrent.futures


def _get_size(url):
    sleep = random.random() / 10
    # print("sleep", sleep)
    time.sleep(sleep)
    r = requests.get(url)
    # print(r.status_code)
    assert len(r.text)
    return len(r.text)


def run(url, times=10):
    sizes = []
    futures = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for _ in range(times):
            futures.append(executor.submit(_get_size, url))
        for future in concurrent.futures.as_completed(futures):
            sizes.append(future.result())
    return sizes


if __name__ == "__main__":
    import sys

    print(run(sys.argv[1]))

It's really basic but it works wonderfully. By default it fires off 10 requests from a thread pool, all hitting the same URL at almost the same time.
I've been using this to stress test a local Django server, to test some atomic writes to the file system.
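One caveat with the script above: ThreadPoolExecutor() without arguments picks its own pool size, which can be smaller than times on a machine with few CPUs. If you want to be sure every call really runs in its own thread and starts at the same moment, a variation like this sketch can help. The name run_concurrently is made up for illustration; it takes any callable, so you could pass something like lambda: _get_size(url).

```python
import concurrent.futures
import threading


def run_concurrently(func, times=10):
    """Run func() in `times` threads that all start at the same moment."""
    barrier = threading.Barrier(times)

    def worker():
        barrier.wait()  # block until all `times` threads have arrived
        return func()

    # One worker thread per call, so none of them has to wait in a queue.
    with concurrent.futures.ThreadPoolExecutor(max_workers=times) as executor:
        futures = [executor.submit(worker) for _ in range(times)]
        return [f.result() for f in concurrent.futures.as_completed(futures)]
```

The barrier replaces the random sleep: instead of spreading the calls out a little, it lines them all up and releases them together.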

A good Django view function cache decorator for http.JsonResponse

20 June 2018 0 comments   Django, Web development, Python


I use this a lot. It has served me very well. The code:

import hashlib
import functools

import markus  # optional
from django.core.cache import cache
from django import http
from django.utils.encoding import force_bytes, iri_to_uri

metrics = markus.get_metrics(__name__)  # optional


def json_response_cache_page_decorator(seconds):
    """Cache only when there's a healthy http.JsonResponse response."""

    def decorator(func):

        @functools.wraps(func)
        def inner(request, *args, **kwargs):
            cache_key = 'json_response_cache:{}:{}'.format(
                func.__name__,
                hashlib.md5(force_bytes(iri_to_uri(
                    request.build_absolute_uri()
                ))).hexdigest()
            )
            content = cache.get(cache_key)
            if content is not None:

                # metrics is optional
                metrics.incr(
                    'json_response_cache_hit',
                    tags=['view:{}'.format(func.__name__)]
                )

                return http.HttpResponse(
                    content,
                    content_type='application/json'
                )
            response = func(request, *args, **kwargs)
            if (
                isinstance(response, http.JsonResponse) and
                response.status_code in (200, 304)
            ):
                cache.set(cache_key, response.content, seconds)
            return response

        return inner

    return decorator

To use it, simply add it to Django view functions that might return an http.JsonResponse. For example, something like this:

@json_response_cache_page_decorator(60)
def search(request):
    q = request.GET.get('q')
    if not q:
        return http.HttpResponseBadRequest('no q')
    results = search_database(q)
    return http.JsonResponse({
        'results': results,
    })
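Note that the cache key is built from the view function's name plus a hash of the full absolute URL, so /search?q=foo and /search?q=bar are cached separately. Here is a framework-free sketch of just that key construction. The helper name make_cache_key is made up for illustration, and it uses a plain .encode() where the decorator uses Django's force_bytes(iri_to_uri(...)).

```python
import hashlib


def make_cache_key(view_name, absolute_url):
    """Build a key shaped like the decorator's: prefix, view name, URL hash."""
    # Simplification: plain UTF-8 encoding instead of Django's
    # force_bytes(iri_to_uri(...)).
    url_hash = hashlib.md5(absolute_url.encode("utf-8")).hexdigest()
    return "json_response_cache:{}:{}".format(view_name, url_hash)
```

Also note that in the example view, the "no q" bad request response is never cached, because it is neither an http.JsonResponse nor a 200 or 304.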

I use this instead of django.views.decorators.cache.cache_page() for a couple of reasons.

Disclaimer: This snippet of code comes from a side-project that has a very specific set of requirements. They're rather unique to that project and I have a full picture of the needs. E.g. I know what specific headers matter and don't matter. Your project might be different. For example, perhaps you don't have markus to handle your metrics. Or perhaps you need to rewrite the query string to normalize the cache key differently. Point being, take the snippet of code as inspiration when you too find that django.views.decorators.cache.cache_page() isn't good enough for your Django view functions.
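As one example of that kind of adaptation, here is a sketch of normalizing the URL before hashing it, so that ?a=1&b=2 and ?b=2&a=1 share a cache entry. It is not part of the decorator above; the function name normalize_url and the default list of ignored parameters are assumptions for illustration.

```python
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def normalize_url(url, ignore=("utm_source",)):
    """Sort the query string parameters and drop the ones listed in `ignore`."""
    parts = urlsplit(url)
    params = sorted(
        (key, value)
        for key, value in parse_qsl(parts.query, keep_blank_values=True)
        if key not in ignore
    )
    # Rebuild the URL with the cleaned-up query string and no fragment.
    return urlunsplit(parts._replace(query=urlencode(params), fragment=""))
```

You would call it on request.build_absolute_uri() before the hashing step. Whether parameter order or tracking parameters should affect caching is a decision for your own project.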