Queue-Based Checkout Flow Proposal¶
Table of Contents¶
- Current Architecture Issues
- Proposed Solution
- Architecture Approaches
- Recommended Implementation
- Implementation Guide
- Benefits
- Migration Path
Current Architecture Issues¶
Based on apps/api/tickets/views/order_views.py:287-336, the current system uses:
- Redis cache locks (cache.add) with a 120-second timeout
- Database row locks (select_for_update)
- Synchronous processing in the HTTP request/response cycle
Problems with Current Approach¶
- Long wait times: Users wait during the entire checkout process (up to 120s)
- Arbitrary timeouts: 120s lock timeout is a guess, not based on actual processing time
- Resource leaks: If a user closes their browser, locks may hold unnecessarily
- Single point of failure: Cache failure creates race conditions
- Poor UX: No feedback during processing
- Scaling issues: Locks don't scale across multiple Redis instances
- Testing difficulties: Hard to test race conditions and lock scenarios
Proposed Solution¶
Move from synchronous locking to asynchronous queue-based processing with time-limited reservations.
Key principles:
1. Immediate feedback: Return a response to the user quickly
2. Asynchronous processing: Use Celery to handle order creation
3. Database-backed state: Use reservations instead of cache locks
4. Automatic cleanup: Celery Beat removes expired reservations
5. Retry logic: Built-in error handling and retry mechanisms
Architecture Approaches¶
Approach 1: Immediate Task Queue ⭐ (Recommended)¶
Flow:
User submits checkout → Validate → Enqueue task → Return "processing" response →
Task processes order → Update user via WebSocket/Polling
Benefits:
- No Redis locks needed
- Database row locks (select_for_update) serialize competing tasks per ticket; Celery itself does not order tasks by ticket
- Users get immediate feedback
- Failed tasks can retry automatically
- Better error handling and observability
Implementation:
```python
# tasks.py
import stripe
from celery import shared_task
from django.db import transaction

# Ticket, Order and check_ticket_availability come from the existing tickets app.


@shared_task(bind=True, max_retries=3)
def process_checkout_order(self, order_data):
    """
    Process a checkout order asynchronously.
    Uses database transaction and row locks only - no Redis locks needed.
    """
    try:
        with transaction.atomic():
            # Lock only the specific tickets being purchased
            tickets = Ticket.objects.select_for_update(nowait=False).filter(
                id__in=order_data['ticket_ids']
            )
            # Check availability
            for ticket in tickets:
                # Task arguments are JSON-serialized by default, so dict keys arrive as strings
                requested = order_data['quantities'][str(ticket.id)]
                if not check_ticket_availability(ticket, requested):
                    return {'status': 'error', 'message': f'{ticket.name} sold out'}
            # Create order
            order = Order.objects.create(...)
            # Create Stripe session
            session = stripe.checkout.Session.create(...)
            return {'status': 'success', 'checkout_url': session.url}
    except Exception as exc:
        # Retry with exponential backoff (1s, 2s, 4s)
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
```
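To make the retry timing concrete, the minimal sketch below computes the delays that countdown=2 ** self.request.retries would produce, assuming the retry counter starts at 0 on the first failure. The helper retry_countdowns is hypothetical and exists only for illustration; it is not part of Celery or of the codebase.

```python
from typing import List


def retry_countdowns(max_retries: int, base: int = 2) -> List[int]:
    """Return the delay in seconds before each retry attempt.

    Mirrors countdown = base ** retries, where retries is 0 for the
    first retry, 1 for the second, and so on.
    """
    return [base ** attempt for attempt in range(max_retries)]


if __name__ == "__main__":
    schedule = retry_countdowns(max_retries=3)
    # With max_retries=3 a failing task waits 1s, 2s, then 4s before giving up
    print(schedule, "total wait:", sum(schedule), "seconds")
```

With max_retries=3 the worst case adds about 7 seconds of waiting on top of processing time, which is worth keeping in mind when choosing the frontend polling timeout.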
Approach 2: Reservation System with TTL ⭐ (Recommended)¶
Concept: Create temporary "reservations" that automatically expire
Flow:
User adds to cart → Create 10-minute reservation → User checks out →
Convert reservation to order OR reservation expires → Tickets released
Benefits:
- Prevents overselling without locks
- User-friendly (cart doesn't disappear immediately)
- No cleanup code needed in error paths
- Works across server restarts
- Simple to understand and debug
Database model:
```python
class TicketReservation(models.Model):
    ticket = models.ForeignKey(Ticket, on_delete=models.CASCADE)
    quantity = models.IntegerField()
    session_id = models.CharField(max_length=255)  # Browser session ID
    email = models.EmailField()
    reserved_at = models.DateTimeField(auto_now_add=True)
    expires_at = models.DateTimeField()  # 10 minutes from reserved_at

    class Meta:
        indexes = [
            models.Index(fields=['ticket', 'expires_at']),
            models.Index(fields=['session_id']),
        ]
```
Celery task to clean expired reservations:
```python
@shared_task
def cleanup_expired_reservations():
    """Run every minute to clean up expired reservations"""
    # delete() returns (total_deleted, per_model_counts), so no separate count() query is needed
    deleted, _ = TicketReservation.objects.filter(
        expires_at__lt=timezone.now()
    ).delete()
    logger.info(f"Cleaned up {deleted} expired reservations")
```
Modified availability check:
```python
def check_ticket_availability(ticket, quantity, include_pending=False):
    """
    Checks availability considering both sold tickets and active reservations.

    Args:
        ticket: The ticket to check
        quantity: Number of tickets requested
        include_pending: Whether to include pending orders (default: False)

    Returns:
        bool: True if tickets are available
    """
    # Count sold tickets
    sold_filter = Q(ticket=ticket, orders__success=True)
    if include_pending:
        # Include pending orders to prevent race conditions during checkout
        sold_filter |= Q(ticket=ticket, orders__success=False)
    sold = TicketOrder.objects.filter(sold_filter).aggregate(
        total=Sum('quantity')
    )['total'] or 0
    # Count active reservations (not yet checked out)
    reserved = TicketReservation.objects.filter(
        ticket=ticket,
        expires_at__gt=timezone.now()
    ).aggregate(total=Sum('quantity'))['total'] or 0
    available = ticket.quantity - sold - reserved
    return available >= quantity
```
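The arithmetic behind the availability check can be tried without a database. The sketch below is an in-memory stand-in, not the Django implementation: Reservation and available_count are hypothetical names, and the capacity and sold figures are made-up sample values. It shows that an expired reservation stops counting against inventory as soon as its expiry passes, even before any cleanup task deletes it.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import List


@dataclass
class Reservation:
    quantity: int
    expires_at: datetime


def available_count(capacity: int, sold: int,
                    reservations: List[Reservation], now: datetime) -> int:
    """available = capacity - sold - active (unexpired) reservations."""
    reserved = sum(r.quantity for r in reservations if r.expires_at > now)
    return capacity - sold - reserved


if __name__ == "__main__":
    now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
    holds = [
        Reservation(quantity=3, expires_at=now + timedelta(minutes=10)),  # active
        Reservation(quantity=2, expires_at=now - timedelta(minutes=1)),   # expired
    ]
    # Only the active hold is subtracted: 10 - 4 - 3
    print(available_count(capacity=10, sold=4, reservations=holds, now=now))
    # Eleven minutes later the first hold has expired as well: 10 - 4 - 0
    print(available_count(10, 4, holds, now + timedelta(minutes=11)))
```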
Approach 3: Event Sourcing with Command Pattern¶
Most robust but complex - Recommended for future iteration
Flow:
User request → Create CheckoutCommand → Queue command →
Command handler processes → Emit events → Update read models
Models:
```python
import uuid

from django.db import models


class CheckoutCommand(models.Model):
    command_id = models.UUIDField(primary_key=True, default=uuid.uuid4)
    command_type = models.CharField(max_length=50)  # 'reserve', 'checkout', 'cancel'
    payload = models.JSONField()
    # CharField requires max_length; new commands start as 'pending'
    status = models.CharField(max_length=20, default='pending', choices=[
        ('pending', 'Pending'),
        ('processing', 'Processing'),
        ('completed', 'Completed'),
        ('failed', 'Failed'),
    ])
    created_at = models.DateTimeField(auto_now_add=True)
    processed_at = models.DateTimeField(null=True)
    error_message = models.TextField(null=True)
```
Celery tasks:
```python
@shared_task
def process_checkout_command(command_id):
    # select_for_update() must run inside a transaction; claim the command
    # in a short transaction so the row lock is not held during processing
    with transaction.atomic():
        # command_id is the primary key, so look it up with pk=
        command = CheckoutCommand.objects.select_for_update().get(pk=command_id)
        if command.status != 'pending':
            return  # Already processed
        command.status = 'processing'
        command.save()
    try:
        # Process based on command type
        if command.command_type == 'checkout':
            handle_checkout(command.payload)
        command.status = 'completed'
        command.processed_at = timezone.now()
    except Exception as e:
        command.status = 'failed'
        command.error_message = str(e)
    finally:
        command.save()
```
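The status check at the top of the handler is what makes redelivered commands harmless. The following sketch reproduces that state machine in memory, under the assumption that a command is only ever processed from the pending state. Command, process, and the handlers mapping are hypothetical illustration names, not the Django model or Celery task above.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class Command:
    command_type: str
    payload: dict
    status: str = "pending"
    error_message: Optional[str] = None


def process(command: Command, handlers: Dict[str, Callable[[dict], None]]) -> bool:
    """Run the handler once; return False if the command was already claimed."""
    if command.status != "pending":
        return False  # duplicate delivery, nothing to do
    command.status = "processing"
    try:
        handlers[command.command_type](command.payload)
        command.status = "completed"
    except Exception as exc:  # record the failure instead of losing it
        command.status = "failed"
        command.error_message = str(exc)
    return True


if __name__ == "__main__":
    calls = []
    handlers = {"checkout": lambda payload: calls.append(payload["order"])}
    cmd = Command("checkout", {"order": 42})
    print(process(cmd, handlers), cmd.status)  # first delivery runs the handler
    print(process(cmd, handlers), len(calls))  # second delivery is skipped
```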
Recommended Implementation¶
Use Approach 1 (Task Queue) + Approach 2 (Reservations)
This hybrid approach combines the best of both:
1. Task Queue handles asynchronous processing
2. Reservations prevent overselling without locks
3. Database transactions ensure atomicity
4. Celery retries handle transient failures
Implementation Guide¶
Step 1: Add Reservation Model¶
```python
# apps/api/tickets/models.py
from django.core.validators import MinValueValidator
from django.db import models
from django.utils import timezone


class TicketReservation(models.Model):
    """
    Temporary reservation of tickets during checkout process.
    Automatically expires after 10 minutes if not converted to an order.
    """
    ticket = models.ForeignKey('Ticket', on_delete=models.CASCADE, related_name='reservations')
    quantity = models.IntegerField(validators=[MinValueValidator(1)])
    session_id = models.CharField(max_length=255, db_index=True)
    email = models.EmailField()
    expires_at = models.DateTimeField(db_index=True)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        indexes = [
            models.Index(fields=['ticket', 'expires_at']),
            models.Index(fields=['session_id', 'expires_at']),
        ]
        ordering = ['-created_at']

    def __str__(self):
        return f"Reservation {self.session_id} - {self.quantity}x {self.ticket.name}"

    def is_expired(self):
        return timezone.now() > self.expires_at
```
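Migration: generate and apply a migration for the new model with Django's standard makemigrations and migrate commands.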
Step 2: Create Celery Tasks¶
```python
# apps/api/tickets/tasks.py
import logging
from decimal import Decimal

import stripe
from celery import shared_task
from django.db import transaction
from django.utils import timezone

# Assumed location of the models; adjust to the project's layout.
from tickets.models import Order, Ticket, TicketOrder, TicketReservation

# validate_promo_code is the existing helper used by CheckoutSessionView.

logger = logging.getLogger(__name__)


@shared_task
def cleanup_expired_reservations():
    """
    Runs every minute via Celery Beat.
    Deletes expired reservations to free up ticket inventory.
    """
    # delete() returns (total_deleted, per_model_counts)
    count, _ = TicketReservation.objects.filter(
        expires_at__lt=timezone.now()
    ).delete()
    logger.info(f"Cleaned up {count} expired ticket reservations")
    return count


def _process_checkout_core(reservation_data):
    """
    Core checkout processing logic (shared between async and sync execution).

    Args:
        reservation_data (dict): Contains session_id, params, ticket_orders, promo_code

    Returns:
        dict: {'status': 'success', 'checkout_url': str, 'order_id': int}
              or {'status': 'error', 'message': str}
    """
    session_id = reservation_data['session_id']
    params = reservation_data['params']
    ticket_orders = reservation_data['ticket_orders']
    promo_code_str = reservation_data.get('promo_code')
    try:
        with transaction.atomic():
            # Get all reservations for this session
            reservations = TicketReservation.objects.select_for_update().filter(
                session_id=session_id
            )
            if not reservations.exists():
                return {'status': 'error', 'message': 'Reservations not found'}
            # Check if any reservation expired
            for reservation in reservations:
                if reservation.is_expired():
                    reservations.delete()
                    return {'status': 'error', 'message': 'Reservation expired. Please try again.'}
            # Validate promo code if provided
            promo_code = None
            if promo_code_str:
                promo_code, error = validate_promo_code(promo_code_str, params['show_id'])
                if error:
                    return {'status': 'error', 'message': error}
            # Create order (reuse existing logic from CheckoutSessionView)
            order = Order.objects.create(
                first_name=params['first_name'],
                last_name=params['last_name'],
                email=params['email'],
                phone=params.get('phone', ''),
                show_id=params['show_id'],
                # ... calculate totals and fees
            )
            # Create ticket orders
            for ticket_id, data in ticket_orders.items():
                ticket = Ticket.objects.get(id=ticket_id)
                TicketOrder.objects.create(
                    ticket=ticket,
                    quantity=data['quantity'],
                    donation_amount=data.get('donation_amount', Decimal('0')),
                    # ... other fields
                )
            # Create Stripe session
            stripe_session = stripe.checkout.Session.create(
                # ... same configuration as current implementation
            )
            # Save Stripe session ID to order
            order.session_id = stripe_session.id
            order.save()
            # Delete reservations once order is created
            reservations.delete()
            logger.info(f"Order {order.id} created successfully for session {session_id}")
            return {
                'status': 'success',
                'checkout_url': stripe_session.url,
                'order_id': order.id
            }
    except stripe.error.StripeError as e:
        # The exception leaves the atomic block, so the order is rolled back
        logger.error(f"Stripe error for session {session_id}: {e}")
        return {'status': 'error', 'message': 'Payment processing error. Please try again.'}
    except Exception as exc:
        logger.error(f"Error creating checkout session {session_id}: {exc}")
        raise  # Re-raise for retry logic in async path


@shared_task(bind=True, max_retries=3)
def create_checkout_session(self, reservation_data):
    """
    Celery task wrapper for checkout processing.
    Retries unexpected failures with exponential backoff.
    The synchronous fallback does not go through this wrapper: it calls
    _process_checkout_core directly, so no retries happen in sync mode.

    Args:
        reservation_data (dict): Contains session_id, params, ticket_orders, promo_code

    Returns:
        dict: {'status': 'success', 'checkout_url': str, 'order_id': int}
              or {'status': 'error', 'message': str}
    """
    try:
        return _process_checkout_core(reservation_data)
    except Exception as exc:
        # A bound task always has self.retry, so no hasattr() check is needed
        logger.warning(f"Checkout task failed, retrying: {exc}")
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
```
Step 3: Celery Availability Check Utility¶
```python
# apps/api/tickets/utils/celery_utils.py
import logging

from celery import current_app
from celery.exceptions import OperationalError

logger = logging.getLogger(__name__)


def is_celery_available(timeout=1.0):
    """
    Check if Celery broker is available and can accept tasks.

    Args:
        timeout (float): Timeout in seconds for the connection check

    Returns:
        bool: True if Celery is available, False otherwise
    """
    try:
        # The reply timeout is an argument of inspect(), not of active()
        inspect = current_app.control.inspect(timeout=timeout)
        # Asking workers for their active tasks requires a broker connection
        active_workers = inspect.active()
        # active() returns None when no worker replies within the timeout.
        # The broker is still reachable in that case, but queued tasks will
        # wait until a worker starts.
        worker_count = len(active_workers) if active_workers else 0
        logger.debug(f"Celery broker check: available (active workers: {worker_count})")
        return True
    except OperationalError as e:
        logger.warning(f"Celery broker unavailable: {e}")
        return False
    except Exception as e:
        logger.error(f"Unexpected error checking Celery availability: {e}")
        return False


def enqueue_with_fallback(task_func, task_args, fallback_func, *fallback_args, **fallback_kwargs):
    """
    Attempt to enqueue a Celery task, falling back to synchronous execution if unavailable.

    Args:
        task_func: The Celery task function to call
        task_args: Arguments to pass to the task
        fallback_func: Function to call synchronously if Celery is unavailable
        *fallback_args: Positional arguments for fallback function
        **fallback_kwargs: Keyword arguments for fallback function

    Returns:
        tuple: (is_async: bool, result: Any)
            - is_async: True if task was enqueued, False if executed synchronously
            - result: AsyncResult (if async) or function result (if sync)
    """
    if is_celery_available():
        try:
            task = task_func.delay(*task_args)
            logger.info(f"Task {task_func.name} enqueued with ID {task.id}")
            return (True, task)
        except Exception as e:
            logger.warning(f"Failed to enqueue task {task_func.name}: {e}. Falling back to sync execution.")
            # Fall through to synchronous execution
    else:
        logger.warning(f"Celery unavailable. Executing {fallback_func.__name__} synchronously.")
    # Execute synchronously
    result = fallback_func(*fallback_args, **fallback_kwargs)
    return (False, result)
```
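The fallback decision itself can be exercised without a broker. The sketch below is a simplified variation rather than the utility above: it skips the separate health check and falls back only when the enqueue call raises, which avoids an extra round trip per checkout. It assumes the enqueue call raises when the broker is unreachable; ConnectionError stands in for the broker error, and run_with_fallback, broken_enqueue, and process_inline are hypothetical names.

```python
from typing import Any, Callable, Tuple


def run_with_fallback(enqueue: Callable[[dict], Any],
                      run_inline: Callable[[dict], Any],
                      payload: dict) -> Tuple[bool, Any]:
    """Return (is_async, result): try the queue first, run inline on failure."""
    try:
        return True, enqueue(payload)
    except ConnectionError:
        # Broker unreachable: degrade to synchronous processing
        return False, run_inline(payload)


def broken_enqueue(payload: dict) -> str:
    """Simulates a broker outage."""
    raise ConnectionError("broker unreachable")


def process_inline(payload: dict) -> dict:
    """Stands in for the core checkout logic."""
    return {"status": "success", "order": payload["order"]}


if __name__ == "__main__":
    # Healthy queue: the caller gets a task handle and should poll
    print(run_with_fallback(lambda p: "task-123", process_inline, {"order": 1}))
    # Outage: the caller gets the final result directly
    print(run_with_fallback(broken_enqueue, process_inline, {"order": 1}))
```

Whichever variant is used, the caller has to branch on is_async, because the async result is a task handle while the sync result is the finished checkout response.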
Step 4: Modified View with Fallback¶
```python
# apps/api/tickets/views/order_views.py
import logging
import uuid
from datetime import timedelta

from celery.result import AsyncResult
from django.db import transaction
from django.db.models import Sum
from django.utils import timezone
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView

from tickets.tasks import create_checkout_session
from tickets.utils.celery_utils import enqueue_with_fallback
from tickets.utils.tickets import check_ticket_availability

# Ticket, TicketOrder, TicketReservation, validate_request_params and
# validate_show are imported as in the existing module.

logger = logging.getLogger(__name__)


class CheckoutSessionView(APIView):
    """
    Handles checkout by creating reservations and queueing order processing.
    Falls back to synchronous processing if Celery is unavailable.
    """
    RESERVATION_TIMEOUT_MINUTES = 10

    def get(self, request, *args, **kwargs):
        # Validate inputs (same as before)
        params, error = validate_request_params(request)
        if error:
            return error
        # Validate show
        show_id = params["show_id"]
        show, error_response = validate_show(show_id)
        if error_response:
            return error_response
        # Parse ticket orders
        ticket_orders, error = self._parse_ticket_orders(request)
        if error:
            return error
        # Create unique session ID for this checkout attempt
        checkout_session_id = str(uuid.uuid4())
        expires_at = timezone.now() + timedelta(minutes=self.RESERVATION_TIMEOUT_MINUTES)
        # Create reservations (fast operation)
        try:
            unavailable_ticket = None
            with transaction.atomic():
                reservations = []
                for ticket_id, data in ticket_orders.items():
                    ticket = Ticket.objects.select_for_update().get(id=ticket_id)
                    # Check if enough tickets available (considering existing reservations)
                    if not check_ticket_availability(ticket, data['quantity']):
                        # Returning from inside atomic() would commit the
                        # reservations already created, so roll back explicitly
                        transaction.set_rollback(True)
                        unavailable_ticket = ticket
                        break
                    # Create reservation
                    reservation = TicketReservation.objects.create(
                        ticket=ticket,
                        quantity=data['quantity'],
                        session_id=checkout_session_id,
                        email=params['email'],
                        expires_at=expires_at
                    )
                    reservations.append(reservation)
            if unavailable_ticket is not None:
                remaining = self._get_available_count(unavailable_ticket)
                return Response({
                    'status': 'error',
                    'message': f'{unavailable_ticket.name} is not available. Only {remaining} tickets remaining.'
                }, status=status.HTTP_400_BAD_REQUEST)
            logger.info(f"Created {len(reservations)} reservations for session {checkout_session_id}")
            # Prepare task data
            reservation_data = {
                'session_id': checkout_session_id,
                'params': params,
                'ticket_orders': ticket_orders,
                'promo_code': request.GET.get('promoCode')
            }
            # Enqueue only after the reservations are committed, so the worker
            # can see them; fall back to sync if Celery is unavailable
            is_async, task_result = enqueue_with_fallback(
                task_func=create_checkout_session,
                task_args=(reservation_data,),
                fallback_func=self._process_checkout_synchronously,
                reservation_data=reservation_data
            )
            if is_async:
                # Async path: return polling response
                return Response({
                    'status': 'processing',
                    'task_id': task_result.id,
                    'session_id': checkout_session_id,
                    'expires_at': expires_at.isoformat(),
                    'expires_in_seconds': self.RESERVATION_TIMEOUT_MINUTES * 60,
                    'poll_url': f'/api/checkout/status/{task_result.id}'
                })
            # Sync path: task already completed, return result directly
            if task_result.get('status') == 'success':
                return Response({
                    'status': 'success',
                    'checkout_url': task_result['checkout_url'],
                    'order_id': task_result['order_id']
                })
            return Response({
                'status': 'error',
                'message': task_result.get('message', 'Checkout failed')
            }, status=status.HTTP_400_BAD_REQUEST)
        except Ticket.DoesNotExist:
            return Response({
                'status': 'error',
                'message': 'One or more tickets not found'
            }, status=status.HTTP_404_NOT_FOUND)
        except Exception as e:
            logger.error(f"Error creating reservation: {e}")
            return Response({
                'status': 'error',
                'message': 'Unable to reserve tickets. Please try again.'
            }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    def _process_checkout_synchronously(self, reservation_data):
        """
        Synchronous fallback for checkout processing when Celery is unavailable.
        Uses the same core logic as the async task.
        """
        # Import here to avoid circular imports
        from tickets.tasks import _process_checkout_core
        # Call the core function directly (synchronous execution)
        try:
            return _process_checkout_core(reservation_data)
        except Exception as e:
            logger.error(f"Synchronous checkout processing failed: {e}")
            return {
                'status': 'error',
                'message': 'Checkout processing failed. Please try again.'
            }

    def _get_available_count(self, ticket):
        """Helper to get current available ticket count."""
        sold = TicketOrder.objects.filter(
            ticket=ticket,
            orders__success=True
        ).aggregate(total=Sum('quantity'))['total'] or 0
        reserved = TicketReservation.objects.filter(
            ticket=ticket,
            expires_at__gt=timezone.now()
        ).aggregate(total=Sum('quantity'))['total'] or 0
        return ticket.quantity - sold - reserved


class CheckoutStatusView(APIView):
    """
    Endpoint for frontend to poll task status.
    Returns the current state of the checkout processing.
    """

    def get(self, request, task_id):
        task = AsyncResult(task_id)
        if task.ready():
            result = task.result
            # Handle task failure
            if isinstance(result, Exception):
                return Response({
                    'status': 'error',
                    'message': str(result)
                }, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
            # Handle an error result returned by the task
            if isinstance(result, dict) and result.get('status') == 'error':
                return Response(result, status=status.HTTP_400_BAD_REQUEST)
            return Response(result)
        # Task still processing
        return Response({
            'status': 'processing',
            'task_id': task_id
        })
```
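The branching in the status endpoint reduces to a small pure function, which makes it easy to unit test without Celery or Django. The sketch below is an illustration under that assumption: status_response is a hypothetical helper that takes the two facts the view reads from the task (whether it is ready, and its result) and returns a body and HTTP status code. Unlike the view, it returns a generic message on failure rather than the exception text.

```python
from typing import Any, Dict, Tuple


def status_response(ready: bool, result: Any, task_id: str) -> Tuple[Dict[str, Any], int]:
    """Map task state to (response_body, http_status)."""
    if not ready:
        return {"status": "processing", "task_id": task_id}, 200
    if isinstance(result, Exception):
        # The task raised: avoid exposing internal error details to the client
        return {"status": "error", "message": "Checkout failed. Please try again."}, 500
    if isinstance(result, dict) and result.get("status") == "error":
        # The task finished but reported a business error (e.g. sold out)
        return result, 400
    return result, 200


if __name__ == "__main__":
    print(status_response(False, None, "abc"))
    print(status_response(True, {"status": "error", "message": "Reservation expired"}, "abc"))
    print(status_response(True, {"status": "success", "checkout_url": "https://example.test/pay"}, "abc"))
```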
Step 5: Update Availability Check¶
```python
# apps/api/tickets/utils/tickets.py
import logging

from django.db.models import Sum, Q
from django.utils import timezone

# TicketOrder and TicketReservation are imported from the tickets models.

logger = logging.getLogger(__name__)


def check_ticket_availability(ticket, quantity, include_pending=False):
    """
    Checks if the requested quantity of tickets is available.
    Now accounts for both sold tickets AND active reservations.

    Args:
        ticket: Ticket instance to check
        quantity: Number of tickets requested
        include_pending: If True, counts pending orders as sold (default: False)
            Used during checkout to prevent race conditions

    Returns:
        bool: True if tickets are available, False otherwise

    Note:
        - Sold tickets = TicketOrders with success=True
        - Reserved tickets = TicketReservations that haven't expired
        - Available = ticket.quantity - sold - reserved
    """
    # Count sold tickets
    sold_filter = Q(ticket=ticket, orders__success=True)
    if include_pending:
        # During checkout, also count pending orders to prevent double-selling
        sold_filter |= Q(ticket=ticket, orders__success=False)
    sold = TicketOrder.objects.filter(sold_filter).aggregate(
        total=Sum('quantity')
    )['total'] or 0
    # Count active (not expired) reservations
    # These are tickets in someone's cart or being processed
    reserved = TicketReservation.objects.filter(
        ticket=ticket,
        expires_at__gt=timezone.now()
    ).aggregate(total=Sum('quantity'))['total'] or 0
    # Calculate available tickets
    available = ticket.quantity - sold - reserved
    logger.debug(
        f"Ticket {ticket.id} ({ticket.name}): "
        f"total={ticket.quantity}, sold={sold}, reserved={reserved}, "
        f"available={available}, requested={quantity}"
    )
    return available >= quantity
```
Step 6: Celery Beat Configuration¶
```python
# apps/api/config/celery.py or settings.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'cleanup-expired-reservations': {
        'task': 'tickets.tasks.cleanup_expired_reservations',
        'schedule': crontab(minute='*/1'),  # Run every minute
        'options': {
            'expires': 50,  # Task expires after 50 seconds
        }
    },
}
```
Start Celery Beat:
```bash
# In development
celery -A config beat --loglevel=info
```

```yaml
# In production (via Docker Compose)
# Add to docker-compose.yml:
celery-beat:
  image: your-api-image
  command: celery -A config beat --loglevel=info
  depends_on:
    - redis
    - db
```
Step 7: Frontend Changes¶
```typescript
// apps/frontend/src/components/Forms/tickets.tsx
interface CheckoutResponse {
  status: 'processing' | 'success' | 'error';
  task_id?: string;
  session_id?: string;
  checkout_url?: string;
  message?: string;
  expires_at?: string;
  expires_in_seconds?: number;
  poll_url?: string;
}

async function handleCheckout(checkoutData: CheckoutFormData): Promise<void> {
  let stopCountdown: (() => void) | undefined;
  try {
    // Step 1: Submit checkout request
    // Assumes buildQueryParams returns a URLSearchParams or an encoded query string
    const response = await fetch(`/api/checkout/?${buildQueryParams(checkoutData)}`);
    const data: CheckoutResponse = await response.json();
    if (data.status === 'error') {
      showError(data.message);
      return;
    }
    if (data.status === 'success') {
      // Sync fallback path: the order already exists, no polling needed
      window.location.href = data.checkout_url!;
      return;
    }
    if (data.status === 'processing') {
      // Show loading state with countdown timer
      stopCountdown = showProcessingState(data.expires_in_seconds!);
      // Step 2: Poll for completion
      const result = await pollCheckoutStatus(data.task_id!, data.expires_in_seconds!);
      if (result.status === 'success') {
        // Step 3: Redirect to Stripe
        window.location.href = result.checkout_url!;
      } else {
        showError(result.message || 'Checkout failed. Please try again.');
      }
    }
  } catch (error) {
    console.error('Checkout error:', error);
    showError('An unexpected error occurred. Please try again.');
  } finally {
    // Stop the countdown timer so the interval does not leak
    stopCountdown?.();
  }
}

async function pollCheckoutStatus(
  taskId: string,
  timeoutSeconds: number = 30
): Promise<CheckoutResponse> {
  const pollInterval = 1000; // 1 second
  const maxAttempts = Math.ceil((timeoutSeconds * 1000) / pollInterval);
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      const response = await fetch(`/api/checkout/status/${taskId}`);
      const data: CheckoutResponse = await response.json();
      // Task completed (success or error)
      if (data.status !== 'processing') {
        return data;
      }
      // Update progress indicator
      updateProgressBar((attempt + 1) / maxAttempts);
    } catch (error) {
      console.error('Poll error:', error);
      // Continue polling on network errors
    }
    // Wait before next poll, including after a network error
    await sleep(pollInterval);
  }
  // Timeout reached
  throw new Error('Checkout processing timed out. Please check your email for confirmation or try again.');
}

function sleep(ms: number): Promise<void> {
  return new Promise(resolve => setTimeout(resolve, ms));
}

// Returns a cleanup function that stops the countdown timer
function showProcessingState(expiresInSeconds: number): () => void {
  // Show a modal or inline message
  setCheckoutState({
    loading: true,
    message: 'Processing your order...',
    countdown: expiresInSeconds
  });
  // Optional: Start countdown timer
  const interval = setInterval(() => {
    setCheckoutState(prev => ({
      ...prev,
      countdown: Math.max(0, prev.countdown - 1)
    }));
  }, 1000);
  // Clear interval when done
  return () => clearInterval(interval);
}

function updateProgressBar(progress: number): void {
  // Update UI progress indicator (0 to 1)
  setProgressPercent(Math.min(progress * 100, 95)); // Cap at 95% until complete
}
```
UI Component Example:
```tsx
// ProcessingModal.tsx
// totalSeconds is the full reservation window; the default of 600 matches
// RESERVATION_TIMEOUT_MINUTES = 10 on the backend.
export function CheckoutProcessingModal({
  countdown,
  totalSeconds = 600,
}: { countdown: number; totalSeconds?: number }) {
  // Clamp so the bar width stays between 0% and 100%
  const percent = Math.min(100, Math.max(0, ((totalSeconds - countdown) / totalSeconds) * 100));
  return (
    <div className="fixed inset-0 bg-black bg-opacity-50 flex items-center justify-center">
      <div className="bg-white p-6 rounded-lg max-w-md">
        <div className="flex items-center justify-center mb-4">
          <Spinner className="h-8 w-8 animate-spin" />
        </div>
        <h3 className="text-lg font-semibold text-center mb-2">
          Processing Your Order
        </h3>
        <p className="text-gray-600 text-center mb-4">
          Please wait while we reserve your tickets and prepare your checkout.
        </p>
        <div className="w-full bg-gray-200 rounded-full h-2.5">
          <div
            className="bg-blue-600 h-2.5 rounded-full transition-all duration-300"
            style={{ width: `${percent}%` }}
          />
        </div>
        <p className="text-sm text-gray-500 text-center mt-2">
          Your reservation expires in {countdown} seconds
        </p>
      </div>
    </div>
  );
}
```
Benefits¶
1. Resilient to Celery Outages¶
- Automatic fallback to synchronous processing when Celery is unavailable
- No checkout failures during broker outages
- Graceful degradation maintains service availability
- Monitoring alerts when fallback is triggered
2. No Redis Locks¶
- Reservations are database-backed and persistent
- Works across server restarts
- No need for Redis clustering for lock coordination
3. Automatic Cleanup¶
- Celery Beat automatically removes expired reservations
- No manual cleanup code in error paths
- Prevents inventory leaks
4. Retry Logic¶
- Celery handles transient failures automatically
- Exponential backoff prevents thundering herd
- Better error messages for users
5. Better UX¶
- Users get immediate feedback
- Progress indicators show processing status
- Clear error messages when things go wrong
- Countdown timer shows reservation expiry
6. Scalable¶
- Celery workers can be scaled independently
- Database handles concurrency better than Redis locks
- No lock contention across processes
7. Testable¶
- Tasks can be tested in isolation
- Easier to write unit tests
- Can test race conditions reliably
8. Observable¶
- Celery Flower provides monitoring dashboard
- Task history and retry attempts visible
- Easier to debug production issues
9. Maintainable¶
- Clear separation of concerns
- Async logic separate from HTTP handlers
- Easier to add features (e.g., cart abandonment emails)
Migration Path¶
Phase 1: Preparation (Week 1)¶
- ✅ Add TicketReservation model and migrate
- ✅ Create Celery tasks (but don't use yet)
- ✅ Update check_ticket_availability to consider reservations
- ✅ Add feature flag in settings: USE_QUEUE_BASED_CHECKOUT = False
Phase 2: Parallel Running (Week 2)¶
- Deploy new code with feature flag OFF
- Test new endpoints in staging
- Monitor Celery workers and Beat
- Fix any bugs found
Phase 3: Gradual Rollout (Week 3)¶
- Enable for 10% of traffic (use session hash)
- Monitor error rates and performance
- Compare metrics: old vs new system
- Gradually increase to 25%, 50%, 75%
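One way to route a stable share of traffic by session hash is sketched below. It assumes the session identifier is available as a string and uses SHA-256 only as a uniform, deterministic hash; in_rollout is a hypothetical helper, not existing code. Because each session always maps to the same bucket, a user who is in the 10% cohort stays in the cohort as the percentage grows.

```python
import hashlib


def in_rollout(session_id: str, percent: int) -> bool:
    """Return True if this session falls inside the rollout percentage."""
    digest = hashlib.sha256(session_id.encode("utf-8")).digest()
    # Map the first 4 bytes of the hash to a bucket in [0, 100)
    bucket = int.from_bytes(digest[:4], "big") % 100
    return bucket < percent


if __name__ == "__main__":
    sessions = [f"session-{i}" for i in range(10_000)]
    for percent in (10, 25, 50, 75, 100):
        enabled = sum(in_rollout(s, percent) for s in sessions)
        print(f"{percent}% target -> {enabled / len(sessions):.1%} of sessions enabled")
```

In the view, the result of this check would be combined with the USE_QUEUE_BASED_CHECKOUT flag, so that setting the flag to False still disables the new path for everyone.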
Phase 4: Full Migration (Week 4)¶
- Enable for 100% of traffic
- Monitor for 1 week
- Remove old Redis lock code
- Remove feature flag
Phase 5: Cleanup (Week 5)¶
- Archive old checkout code
- Update documentation
- Train team on new system
- Set up monitoring alerts
Rollback Plan¶
If issues arise:
- Immediate rollback: Set feature flag to False
- Database rollback: Old code ignores reservations table
- Clean reservations: Delete any remaining reservations (the scheduled cleanup task only removes expired ones)
- Monitor: Ensure old system working correctly
Testing Strategy¶
Unit Tests¶
```python
# tests/test_checkout_queue.py
from datetime import timedelta

from django.utils import timezone

# TicketFactory, TicketReservation, check_ticket_availability and the tasks
# are imported from the project as usual.


def test_reservation_creation():
    """Test that reservations are created correctly"""
    ticket = TicketFactory(quantity=10)
    reservation = TicketReservation.objects.create(
        ticket=ticket,
        quantity=5,
        session_id='test-session',
        email='test@example.com',
        expires_at=timezone.now() + timedelta(minutes=10)
    )
    # 10 total - 5 reserved leaves exactly 5 available
    assert check_ticket_availability(ticket, 5)
    assert not check_ticket_availability(ticket, 6)


def test_expired_reservation_cleanup():
    """Test that expired reservations are cleaned up"""
    ticket = TicketFactory(quantity=10)
    # Create expired reservation
    reservation = TicketReservation.objects.create(
        ticket=ticket,
        quantity=5,
        session_id='test-session',
        email='test@example.com',
        expires_at=timezone.now() - timedelta(minutes=1)
    )
    # Run cleanup
    cleanup_expired_reservations()
    # Reservation should be deleted
    assert TicketReservation.objects.count() == 0
    assert check_ticket_availability(ticket, 10)


def test_checkout_task_success():
    """Test successful checkout task"""
    ticket = TicketFactory(quantity=10)
    # apply() runs the task locally, so the test does not need a worker
    result = create_checkout_session.apply(args=[{
        'session_id': 'test-session',
        'params': {...},
        'ticket_orders': {...}
    }]).get()
    # The task returns a dict; AsyncResult.status would be the Celery state instead
    assert result['status'] == 'success'
    assert 'checkout_url' in result
```
Integration Tests¶
```python
def test_concurrent_checkout():
    """Test that concurrent checkouts don't oversell"""
    ticket = TicketFactory(quantity=5)
    # Simulate 10 concurrent requests for 3 tickets each
    tasks = []
    for i in range(10):
        task = create_checkout_session.delay({...})
        tasks.append(task)
    # Wait for all to complete
    results = [task.get() for task in tasks]
    # At most one can succeed: two orders of 3 would need 6 tickets, but only 5 exist
    successes = [r for r in results if r['status'] == 'success']
    assert len(successes) <= 1
```
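The expected outcome of the concurrency test can be checked with a small in-memory simulation. This is a sketch under stated assumptions, not the project's test: a threading.Lock stands in for the database row lock taken by select_for_update, and Inventory and simulate are hypothetical names. With 5 tickets and 10 concurrent requests for 3 each, exactly one request can succeed.

```python
import threading
from typing import List, Tuple


class Inventory:
    """In-memory stand-in for a ticket row guarded by a row lock."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.reserved = 0
        self._lock = threading.Lock()

    def try_reserve(self, quantity: int) -> bool:
        # Check and update happen under one lock, like a locked transaction
        with self._lock:
            if self.capacity - self.reserved >= quantity:
                self.reserved += quantity
                return True
            return False


def simulate(capacity: int, requests: int, quantity: int) -> Tuple[int, int]:
    """Run concurrent reservation attempts; return (successes, total reserved)."""
    inventory = Inventory(capacity)
    outcomes: List[bool] = []
    outcomes_lock = threading.Lock()

    def worker() -> None:
        ok = inventory.try_reserve(quantity)
        with outcomes_lock:
            outcomes.append(ok)

    threads = [threading.Thread(target=worker) for _ in range(requests)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sum(outcomes), inventory.reserved


if __name__ == "__main__":
    successes, reserved = simulate(capacity=5, requests=10, quantity=3)
    print(f"successes={successes}, reserved={reserved}")  # successes=1, reserved=3
```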
Monitoring & Alerts¶
Key Metrics to Track¶
- Reservation metrics
  - Active reservations count
  - Average reservation duration
  - Expiry rate (% that expire vs convert)
- Task metrics
  - Task success rate
  - Task retry rate
  - Average processing time
  - Queue length
- Business metrics
  - Checkout completion rate
  - Cart abandonment rate
  - Overselling incidents (should be 0)
- Resilience metrics
  - Celery availability rate
  - Fallback activation frequency
  - Sync vs async checkout ratio
  - Average response time (sync vs async)
Recommended Alerts¶
```python
# Set up in monitoring tool (Datadog, Prometheus, etc.)
# Alert if too many reservations expiring
if (expired_reservations / total_reservations) > 0.5:
    alert("High reservation expiry rate - UX issue?")
# Alert if tasks failing
if (failed_tasks / total_tasks) > 0.05:
    alert("High task failure rate")
# Alert if checkout queue backing up
if celery_queue_length > 100:
    alert("Checkout queue backing up - scale workers?")
# Alert if any overselling detected
if check_overselling():
    alert("CRITICAL: Overselling detected!")
# Alert if Celery fallback is being used frequently
if sync_checkout_count / total_checkout_count > 0.1:
    alert("High Celery fallback usage - check broker/worker health")
```
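The alert rules above are pseudocode for a monitoring tool. As a runnable illustration, the sketch below evaluates the same thresholds against a plain dictionary of counters. The metric key names and evaluate_alerts are assumptions made for this example; a real setup would express these rules in the monitoring tool's own query language. Ratios with a zero denominator are treated as 0 so that quiet periods do not raise errors.

```python
from typing import Dict, List


def _ratio(numerator: float, denominator: float) -> float:
    """Safe division: no traffic means no alert."""
    return numerator / denominator if denominator else 0.0


def evaluate_alerts(metrics: Dict[str, float]) -> List[str]:
    """Return the alert messages triggered by the given counters."""
    alerts: List[str] = []
    if _ratio(metrics["expired_reservations"], metrics["total_reservations"]) > 0.5:
        alerts.append("High reservation expiry rate")
    if _ratio(metrics["failed_tasks"], metrics["total_tasks"]) > 0.05:
        alerts.append("High task failure rate")
    if metrics["celery_queue_length"] > 100:
        alerts.append("Checkout queue backing up")
    if metrics["oversold_tickets"] > 0:
        alerts.append("CRITICAL: Overselling detected")
    if _ratio(metrics["sync_checkout_count"], metrics["total_checkout_count"]) > 0.1:
        alerts.append("High Celery fallback usage")
    return alerts


if __name__ == "__main__":
    sample = {
        "expired_reservations": 10, "total_reservations": 100,
        "failed_tasks": 8, "total_tasks": 100,
        "celery_queue_length": 20,
        "oversold_tickets": 0,
        "sync_checkout_count": 0, "total_checkout_count": 0,
    }
    print(evaluate_alerts(sample))  # ['High task failure rate']
```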
Future Enhancements¶
Once the queue system is stable, consider:
- Cart abandonment emails
  - Send email when reservation expires
  - "Complete your purchase" reminders
- WebSocket updates
  - Real-time status updates instead of polling
  - Better UX for users
- Priority queues
  - VIP customers get higher priority
  - Early bird sales get separate queue
- Analytics
  - Track checkout funnel drop-off
  - A/B test reservation timeouts
  - Optimize for conversion
- Inventory forecasting
  - Predict when tickets will sell out
  - Recommend ticket quantity to users
Questions & Answers¶
Q: What happens if Celery is down?¶
A: The system includes an automatic fallback mechanism. When Celery is unavailable:
1. The system detects Celery unavailability via a broker connection check
2. Checkout processing runs synchronously in the request thread instead
3. The user still gets a direct response (no polling needed in sync mode)
4. Reservations are still created; expired ones are ignored by the availability check even if the Beat cleanup task is not running
5. Error handling still applies, but Celery's automatic retries do not: a failed synchronous checkout returns an error and the user must try again
This ensures checkout remains functional even during Celery outages, though response times may be slightly longer (typically 1-3 seconds instead of <200ms for async path).
Q: Can users have multiple reservations?¶
A: Yes, by session_id. Each checkout attempt gets a unique session. Old reservations expire automatically.
Q: What if user's browser crashes during checkout?¶
A: Reservation expires after 10 minutes. Tickets return to inventory. User can try again.
Q: How do we handle refunds?¶
A: Refunds still go through SuccessSessionView. No changes needed to refund flow.
Q: Can we extend reservation timeout?¶
A: Yes, but 10 minutes is recommended. Longer = more inventory locked. Shorter = more abandoned carts.
Q: What are the trade-offs of the synchronous fallback?¶
A: When fallback is active:
- Response time: Increases from ~200ms to 1-3 seconds (still acceptable for users)
- Server load: Request threads are blocked longer, may need more workers during outages
- Scalability: Less efficient than async, but maintains functionality
- Monitoring: Important to alert when fallback is used frequently (indicates infrastructure issues)
The fallback is designed to be a safety net, not the primary path. If fallback usage exceeds 5-10% of checkouts, investigate Celery infrastructure.
Conclusion¶
The queue-based checkout system with reservations provides:
- ✅ Better reliability (no lock failures)
- ✅ Better UX (immediate feedback, progress indicators)
- ✅ Better scalability (independent worker scaling)
- ✅ Better observability (Celery Flower monitoring)
- ✅ Better testability (isolated task testing)
Recommended next steps:
1. Review this proposal with team
2. Create implementation plan
3. Set up development environment
4. Start with Phase 1 (model creation)
5. Iterate based on testing results