
In the prior post, we learned what happens when a function decorated with transaction.atomic is called, and what happens inside a with transaction.atomic() block. In summary: BEGIN; is sent to the DB (if the DB is PSQL or another SQL database).
Now, we will discuss what to put in a transaction and what to avoid. Because of the transaction behaviors described below, certain operations are dangerous when placed in a transaction block.
Keep reading for alternatives and code examples.
The primary risk is that a transaction holds its locks until it completes, both to prevent conflicting operations on the tables and rows it touches and to keep the transaction reversible - this is what makes the DB operations in the transaction atomic. As a result, a long-running transaction that operates on multiple tables, or on a few critical ones, can cause an outage by hogging locks and blocking reads/writes on those tables/rows.
In essence, if we put the wrong code in a transaction block, we can effectively take down the DB by blocking every other connection from operating on it.
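To make that concrete, here is a minimal sketch (the Order model, the field names, and the sleep standing in for slow work are just for illustration): while the first block is open, any other transaction that needs the locked row simply waits.
import time
from django.db import transaction

def slow_checkout(order_id):
    with transaction.atomic():
        # The row lock is acquired here and held until the block exits
        order = Order.objects.select_for_update().get(id=order_id)
        time.sleep(30)  # stand-in for slow work: a big query, an API call, ...
        order.status = 'completed'
        order.save()
    # Only now is the lock released; every other transaction that tried to
    # touch this row has been waiting the whole time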
The secondary risk is that everything in a transaction is EXPECTED to be reversible: if an error occurs, the DB automatically rolls back every operation in the transaction. The DB operations themselves are reversible - for the most part, we don't need to worry about that with PSQL. But what about the rest of our code?
Oftentimes, when we change our data, we need to do follow-up tasks like firing events, updating other services, or sending push notifications. These tasks are NOT reversible - we can't unsend an event, a request, or a notification. If an error occurs, the data changes are rolled back, but we've already sent the push notification saying, "Your report has been generated; click here to view it." What happens when the user or other services act on this false information? A cascade of failures. Therefore, any code that can't be reversed should not be in a transaction, or we risk leaving our system in a bad state when an error occurs in the transaction.
The following are operations that, depending on how much data is being processed and how much traffic the DB gets, can cause outages by holding locks for too long. All of them are fine as long as they don't take too long.
Slow queries — Fetching data is usually fast, except in a few scenarios, like the complex reporting query below. A slow query inside a transaction extends the time the transaction holds its locks, which has adverse effects on other users.
@transaction.atomic
def process_large_order_report(start_date, end_date, min_order_value=1000):
    # Complex query with multiple joins and aggregations
    large_orders = Order.objects.filter(
        created_at__range=(start_date, end_date),
        total_amount__gte=min_order_value,
        status='completed'
    ).select_related(
        'customer',
        'shipping_address',
        'billing_address'
    ).prefetch_related(
        'items__product__category',
        'items__product__supplier'
    ).annotate(
        item_count=Count('items'),
        total_weight=Sum('items__product__weight'),
        discount_percentage=F('discount_amount') * 100 / F('total_amount')
    ).filter(
        # Additional complex filtering
        Q(customer__user__is_active=True) &
        (Q(items__product__category__name='Electronics') |
         Q(items__product__category__name='Furniture')) &
        ~Q(shipping_address__country='US')
    ).order_by('-total_amount')

    # do the transactional work with the large_orders queryset
# fixed
def process_large_order_report(start_date, end_date, min_order_value=1000):
    # Complex query with multiple joins and aggregations.
    # Evaluate it here with list() - QuerySets are lazy, so without this the
    # slow query would still run inside the transaction block below.
    large_orders = list(Order.objects.filter(
        created_at__range=(start_date, end_date),
        total_amount__gte=min_order_value,
        status='completed'
    ).select_related(
        'customer',
        'shipping_address',
        'billing_address'
    ).prefetch_related(
        'items__product__category',
        'items__product__supplier'
    ).annotate(
        item_count=Count('items'),
        total_weight=Sum('items__product__weight'),
        discount_percentage=F('discount_amount') * 100 / F('total_amount')
    ).filter(
        # Additional complex filtering
        Q(customer__user__is_active=True) &
        (Q(items__product__category__name='Electronics') |
         Q(items__product__category__name='Furniture')) &
        ~Q(shipping_address__country='US')
    ).order_by('-total_amount'))

    # Start the transaction block
    with transaction.atomic():
        # do the transactional work with the already-fetched large_orders
        ...
Operations on multiple tables — a transaction with operations on multiple tables can lock each of those tables until the transaction completes. This is especially prevalent in Django migrations — another reason to keep migrations small and focused on one or a few tables at a time.
class Migration(migrations.Migration):
    dependencies = [("migrations", "0001_initial")]

    # too many operations
    operations = [
        migrations.RemoveField("Author", "age"),
        migrations.AddField("Author", "rating", models.IntegerField(default=0)),
        migrations.AlterField("Book", "price", models.DecimalField(max_digits=5, decimal_places=2)),
    ]

# fixed
# 1st migration
class Migration(migrations.Migration):
    dependencies = [("migrations", "0001_initial")]

    operations = [
        migrations.RemoveField("Author", "age"),
    ]

# 2nd migration
class Migration(migrations.Migration):
    dependencies = [("migrations", "0002_initial")]

    operations = [
        migrations.AddField("Author", "rating", models.IntegerField(default=0)),
    ]

# 3rd migration
class Migration(migrations.Migration):
    dependencies = [("migrations", "0003_initial")]

    operations = [
        migrations.AlterField("Book", "price", models.DecimalField(max_digits=5, decimal_places=2)),
    ]
Data migrations — Since a transaction holds a lock from the moment a query executes until the transaction ends or fails, a migration that operates on every row in a table will end up locking the entire table, preventing reads and/or writes on each row.
def migrate_user_profiles():
    # Get all users with legacy profiles
    users_with_profiles = User.objects.filter(
        legacy_profile__isnull=False
    ).select_related('legacy_profile')

    # Process all users in a single transaction
    with transaction.atomic():
        # Track progress
        total = users_with_profiles.count()
        print(f"Migrating {total} user profiles...")

        # Process each user
        for i, user in enumerate(users_with_profiles):
            if i % 100 == 0:
                print(f"Processed {i}/{total} profiles")
            legacy = user.legacy_profile
            legacy.update_new_user_profile()
# fixed
def migrate_user_profiles():
    # Get all users with legacy profiles
    users_with_profiles = User.objects.filter(
        legacy_profile__isnull=False
    ).select_related('legacy_profile')

    # Track progress
    total = users_with_profiles.count()
    print(f"Migrating {total} user profiles...")

    # Process each user in its own short transaction
    for i, user in enumerate(users_with_profiles):
        if i % 100 == 0:
            print(f"Processed {i}/{total} profiles")
        with transaction.atomic():
            legacy = user.legacy_profile
            legacy.update_new_user_profile()
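If one transaction per row means too many round trips, a middle ground is to process the rows in fixed-size batches, with each batch in its own short transaction. A rough sketch - the batch size is arbitrary, and update_new_user_profile() is the same method as above:
from django.db import transaction

BATCH_SIZE = 500  # tune so each transaction stays short

def migrate_user_profiles_in_batches():
    user_ids = list(
        User.objects.filter(legacy_profile__isnull=False).values_list('id', flat=True)
    )
    total = len(user_ids)
    for start in range(0, total, BATCH_SIZE):
        batch_ids = user_ids[start:start + BATCH_SIZE]
        # Each batch commits on its own, so locks are held only briefly
        with transaction.atomic():
            users = User.objects.filter(id__in=batch_ids).select_related('legacy_profile')
            for user in users:
                user.legacy_profile.update_new_user_profile()
        print(f"Processed {min(start + BATCH_SIZE, total)}/{total} profiles")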
Run them later. These queries are necessary, BUT we don’t have to run them during business hours. The best policy here is to reduce the risk of an outage by determining how crucial the table is, estimating how long the migration may take, executing the migration when the DB has the least traffic, and preparing a rollback plan.
Shrink the amount of time the transaction takes. This can be done by partitioning the table and running the migration on individual partitions (see PSQL Partitions & Django).
*Django only wraps transactions around migrations for PSQL and SQLite.
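For long data migrations, Django also lets us opt out of that wrapping transaction by setting atomic = False on the migration, so the RunPython function can manage its own smaller transactions. A sketch - the app label, migration name, and full_name backfill are made up for illustration:
from django.db import migrations, transaction

def backfill_full_name(apps, schema_editor):
    User = apps.get_model('accounts', 'User')
    user_ids = list(User.objects.filter(full_name='').values_list('id', flat=True))
    for user_id in user_ids:
        # Commit row by row instead of holding one giant transaction
        with transaction.atomic():
            user = User.objects.get(id=user_id)
            user.full_name = f"{user.first_name} {user.last_name}".strip()
            user.save(update_fields=['full_name'])

class Migration(migrations.Migration):
    atomic = False  # don't wrap this migration in a single transaction
    dependencies = [('accounts', '0004_user_full_name')]
    operations = [
        migrations.RunPython(backfill_full_name, migrations.RunPython.noop),
    ]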
class Migration(migrations.Migration):
    dependencies = [("migrations", "0001_initial")]

    # this migration, if on a large table, can slow down and block other operations
    # do it later
    operations = [
        migrations.RemoveField("Users", "middle_name"),
    ]
Irreversible operations — Everything in the transaction should be reversible in case the transaction is rolled back; an API call placed inside the transaction can't be undone.
def create_user(user_data, user_files):
    with transaction.atomic():
        user = User.objects.create(**user_data)
        async_notification_service.send_email(user.email, "You can login now!")
        Account.objects.create(user=user, balance=0)
        # rest of user creation process
def create_user(user_data, user_files):
    with transaction.atomic():
        user = User.objects.create(**user_data)
        Account.objects.create(user=user, balance=0)
        # rest of user creation process

        # The transaction is still in progress here and can still be rolled back -
        # nothing is committed until the atomic block exits. So sending the
        # notification here is still not a good idea, especially if the receiving
        # job immediately tries to read data that hasn't been committed yet:
        # that's a race condition.
        async_notification_service.send_email(user.email, "You can login now!")
from functools import partial

def create_user(user_data, user_files):
    with transaction.atomic():
        user = User.objects.create(**user_data)
        Account.objects.create(user=user, balance=0)
        # rest of user creation process

        # Runs only after the transaction commits successfully
        transaction.on_commit(partial(async_notification_service.send_email, user.email, "You can login now!"))
Blocking calls — since a transaction blocks all other queries from operating on the tables/rows it is changing, any code that increases the duration of the transaction extends how long those locks are held, causing timeouts and unresponsiveness in the apps that depend on the DB.
def create_user(user_data, user_files):
    with transaction.atomic():
        user = User.objects.create(**user_data)
        for file_data in user_files:
            # The transaction waits for this upload, and so does every other
            # connection that needs the tables/rows this transaction uses
            url = Cloudinary.upload_file(file_data['data'])
            Files.objects.create(**file_data['meta_data'], user=user, url=url)
        Account.objects.create(user=user, balance=0)
        # rest of user creation process
# not bad
def create_user(user_data, user_files):
    user = None
    with transaction.atomic():
        user = User.objects.create(**user_data)
        Account.objects.create(user=user, balance=0)
        # rest of user creation process

    # The uploads happen after the transaction has committed
    for file_data in user_files:
        url = Cloudinary.upload_file(file_data['data'])
        Files.objects.create(**file_data['meta_data'], user=user, url=url)
# best fix
from functools import partial

def create_user(user_data, user_files):
    with transaction.atomic():
        user = User.objects.create(**user_data)
        Account.objects.create(user=user, balance=0)
        # rest of user creation process

        # partial binds the function to its arguments now and returns a callable
        # that on_commit invokes once the transaction commits.
        # (A lambda would also work; the difference is that partial captures the
        # argument values immediately, while a lambda looks the variables up when
        # it runs - which matters if they can change, e.g. inside a loop.)
        transaction.on_commit(partial(create_user_files, user_files, user))

def create_user_files(user_files, user):
    for file_data in user_files:
        url = Cloudinary.upload_file(file_data['data'])
        Files.objects.create(**file_data['meta_data'], user=user, url=url)
In the next post, we will dive into PSQL and find out: