I have a Django ORM model called Category. It's simple and it looks like this:
from django.db import models


class Category(models.Model):
    name = models.CharField(max_length=100)
What I occasionally need is all of those collected up in one big dict. Like this:
mapping = {}
for id, name in Category.objects.values_list("id", "name"):
    mapping[id] = name
If you want to be fancy, you can use the dict constructor directly:
mapping = dict(Category.objects.values_list("id", "name"))
...which does the same thing.
Even though it's not strictly necessary, I put a classmethod on the model so it's all neatly together:
class Category(models.Model):
    name = models.CharField(max_length=100)

    @classmethod
    def get_category_id_name_map(cls):
        return dict(cls.objects.values_list("id", "name"))
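For illustration, with a couple of made-up rows in the table (the names and the auto-assigned ids here are just assumptions), calling it looks something like this:

>>> Category.objects.create(name="Hardware")
<Category: Category object (1)>
>>> Category.objects.create(name="Software")
<Category: Category object (2)>
>>> Category.get_category_id_name_map()
{1: 'Hardware', 2: 'Software'}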
The Category model doesn't change very often, so it's ripe for caching to avoid the SQL query. You can use functools.lru_cache for memoization.
from functools import lru_cache


class Category(models.Model):
    name = models.CharField(max_length=100)

    @classmethod
    @lru_cache(maxsize=1)
    def get_category_id_name_map(cls):
        return dict(cls.objects.values_list("id", "name"))
Now, within each Python process, calling Category.get_category_id_name_map() caches the result indefinitely, and consecutive calls are pretty much instant. Obviously, this assumes the number of Category objects is reasonably bounded.
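To make the memoization concrete, here's a minimal sketch of what that means in practice: only the first call runs the SQL query, and every call after that returns the very same cached dict object.

mapping = Category.get_category_id_name_map()  # first call, hits the database
mapping_again = Category.get_category_id_name_map()  # served from the cache
assert mapping is mapping_again  # lru_cache hands back the same object

One caveat that follows from this: since every caller shares that one dict, treat it as read-only and don't mutate it.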
Next, the Category table does sometimes change. To be made aware when it changes, you can use Django signals, and to purge that memoization cache you do this:
from django.db.models.signals import post_delete, post_save
from django.dispatch import receiver


@receiver(post_save, sender=Category)
@receiver(post_delete, sender=Category)
def purge_get_category_id_name_map(sender, instance, **kwargs):
    Category.get_category_id_name_map.cache_clear()
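One gotcha: the module containing these @receiver functions has to be imported at startup, or the signals never get connected. A common place to do that is the app's AppConfig.ready(); in this sketch the app name "catalog" and the signals.py module are made-up assumptions, so adjust to your project:

# apps.py
from django.apps import AppConfig


class CatalogConfig(AppConfig):
    name = "catalog"

    def ready(self):
        # Importing the module is what registers the @receiver handlers.
        from . import signals  # noqa: F401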
Simple benchmark
def f1():
    return dict(Category.objects.values_list("id", "name"))


def f2():
    return Category.get_category_id_name_map()


assert f1() == f2()

# Reporting
import time
import random
import statistics

functions = f1, f2
times = {f.__name__: [] for f in functions}

for i in range(10000):  # adjust accordingly so whole thing takes a few sec
    func = random.choice(functions)
    t0 = time.time()
    func()
    t1 = time.time()
    times[func.__name__].append(t1 - t0)  # store raw seconds


def us(seconds):
    # convert raw seconds to microseconds for display
    return f"{seconds * 1e6:.3f}µs"


for name, numbers in times.items():
    print("FUNCTION:", name, "Used", len(numbers), "times")
    print("\tMEDIAN", us(statistics.median(numbers)))
    print("\tMEAN  ", us(statistics.mean(numbers)))
    print("\tSTDEV ", us(statistics.stdev(numbers)))
The output when you run it:
FUNCTION: f1 Used 5016 times
    MEDIAN 92.983µs
    MEAN   97.828µs
    STDEV  19.589µs
FUNCTION: f2 Used 4984 times
    MEDIAN 0.000µs
    MEAN   0.189µs
    STDEV  0.396µs
On average, it's about 500x faster to retrieve from the memoization cache than to run the SQL query every time. Cache it once, and the other 999 out of 1,000 calls are practically free.
Comments
This looks like it won’t work in multi-thread/process servers as the signal is only sent/received in one thread.
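That's a fair point: the lru_cache lives inside each process, and the signal only clears it in the process where the write happened, so other processes can keep serving a stale map. A hedged sketch of one way around that is to back the map with Django's shared cache framework (Memcached, Redis, etc.) instead of lru_cache, so a purge in one process is seen by all of them. The cache key name here is made up:

from django.core.cache import cache
from django.db import models

CACHE_KEY = "category_id_name_map"  # arbitrary, made-up key name


class Category(models.Model):
    name = models.CharField(max_length=100)

    @classmethod
    def get_category_id_name_map(cls):
        mapping = cache.get(CACHE_KEY)
        if mapping is None:
            mapping = dict(cls.objects.values_list("id", "name"))
            cache.set(CACHE_KEY, mapping, timeout=None)  # timeout=None means no expiry
        return mapping

The signal receivers would then call cache.delete(CACHE_KEY) instead of cache_clear().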