Merge pull request #1595 from escapade-mckv/billing-improvements-clean

chore: add idempotency to webhook and credit systems
This commit is contained in:
Bobbie 2025-09-10 00:23:29 +05:30 committed by GitHub
commit 94bf9d8988
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 80 additions and 42 deletions

View File

@ -20,6 +20,25 @@ class CreditManager:
type: Optional[str] = None
) -> Dict:
client = await self.db.client
amount = Decimal(str(amount))
recent_window = datetime.now(timezone.utc) - timedelta(seconds=20)
recent_entries = await client.from_('credit_ledger').select(
'id, created_at, amount, description'
).eq('account_id', account_id).eq('amount', float(amount)).eq(
'description', description
).gte('created_at', recent_window.isoformat()).execute()
if recent_entries.data:
logger.warning(f"[IDEMPOTENCY] Potential duplicate credit add detected for {account_id}: "
f"amount={amount}, description='{description}', "
f"found {len(recent_entries.data)} similar entries in last 20 seconds")
return {
'success': True,
'message': 'Credit already added (duplicate prevented)',
'amount': float(amount),
'duplicate_prevented': True
}
result = await client.from_('credit_accounts').select(
'expiring_credits, non_expiring_credits, balance, tier'

View File

@ -12,6 +12,7 @@ from .config import (
TIERS,
TRIAL_DURATION_DAYS,
TRIAL_CREDITS,
get_tier_by_name
)
from .credit_manager import credit_manager
@ -425,50 +426,54 @@ class SubscriptionService:
'credits': float(new_tier_info.monthly_credits)
}
if subscription.status == 'trialing' and subscription.get('trial_end'):
await self._handle_trial_subscription(subscription, account_id, new_tier, client)
return
billing_anchor = datetime.fromtimestamp(subscription['current_period_start'], tz=timezone.utc)
next_grant_date = datetime.fromtimestamp(subscription['current_period_end'], tz=timezone.utc)
account_result = await client.from_('credit_accounts').select('tier, billing_cycle_anchor, stripe_subscription_id, trial_status, trial_started_at').eq('account_id', account_id).execute()
current_account = await client.from_('credit_accounts').select(
'tier, stripe_subscription_id, last_grant_date, billing_cycle_anchor'
).eq('account_id', account_id).execute()
if not account_result.data or len(account_result.data) == 0:
logger.info(f"User {account_id} has no credit account, creating none tier account first")
await client.from_('credit_accounts').insert({
'account_id': account_id,
'balance': 0,
'tier': 'none'
}).execute()
account_result = await client.from_('credit_accounts').select('tier, billing_cycle_anchor, stripe_subscription_id, trial_status, trial_started_at').eq('account_id', account_id).execute()
if account_result.data and len(account_result.data) > 0:
account_data = account_result.data[0]
current_tier_name = account_data.get('tier')
existing_anchor = account_data.get('billing_cycle_anchor')
old_subscription_id = account_data.get('stripe_subscription_id')
trial_status = account_data.get('trial_status')
trial_started_at = account_data.get('trial_started_at')
if subscription.status == 'trialing':
await self._handle_trial_subscription(subscription, account_id, new_tier, client)
return
elif subscription.status == 'active' and trial_status == 'active':
await client.rpc('handle_trial_end', {
'p_account_id': account_id,
'p_converted': True,
'p_new_tier': new_tier['name']
}).execute()
logger.info(f"Converted trial to paid subscription for user {account_id}")
current_tier_info = TIERS.get(current_tier_name)
current_tier = None
if current_tier_info:
current_tier = {
'name': current_tier_info.name,
'credits': float(current_tier_info.monthly_credits)
}
if current_account.data:
existing_data = current_account.data[0]
current_tier_name = existing_data.get('tier')
old_subscription_id = existing_data.get('stripe_subscription_id')
last_grant_date = existing_data.get('last_grant_date')
existing_anchor = existing_data.get('billing_cycle_anchor')
if last_grant_date and existing_anchor:
try:
last_grant_dt = datetime.fromisoformat(last_grant_date.replace('Z', '+00:00'))
existing_anchor_dt = datetime.fromisoformat(existing_anchor.replace('Z', '+00:00'))
if (abs((billing_anchor - last_grant_dt).total_seconds()) < 60 and
current_tier_name == new_tier['name'] and
old_subscription_id == subscription['id']):
logger.info(f"[IDEMPOTENCY] Skipping duplicate credit grant for {account_id} - "
f"already processed at {last_grant_date}")
return
except Exception as e:
logger.warning(f"Error parsing dates for idempotency check: {e}")
current_tier = get_tier_by_name(current_tier_name) if current_tier_name else None
current_tier = {
'name': current_tier.name,
'credits': float(current_tier.monthly_credits)
} if current_tier else {
'name': 'none',
'credits': 0
}
should_grant_credits = self._should_grant_credits(current_tier_name, current_tier, new_tier, subscription, old_subscription_id)
if should_grant_credits:
await client.from_('credit_accounts').update({
'last_grant_date': billing_anchor.isoformat()
}).eq('account_id', account_id).execute()
await self._grant_subscription_credits(account_id, new_tier, billing_anchor)
else:
logger.info(f"No credits granted - not an upgrade scenario")

View File

@ -634,14 +634,17 @@ class WebhookService:
client = await db.client
subscription_id = invoice.get('subscription')
if not subscription_id:
invoice_id = invoice.get('id')
if not subscription_id or not invoice_id:
logger.warning(f"Invoice missing subscription or ID: {invoice}")
return
period_start = invoice.get('period_start')
period_end = invoice.get('period_end')
if not period_start or not period_end:
logger.warning(f"Invoice missing period information: {invoice.get('id')}")
logger.warning(f"Invoice missing period information: {invoice_id}")
return
customer_result = await client.schema('basejump').from_('billing_customers')\
@ -655,7 +658,7 @@ class WebhookService:
account_id = customer_result.data[0]['account_id']
account_result = await client.from_('credit_accounts')\
.select('tier, last_grant_date, next_credit_grant, billing_cycle_anchor')\
.select('tier, last_grant_date, next_credit_grant, billing_cycle_anchor, last_processed_invoice_id')\
.eq('account_id', account_id)\
.execute()
@ -666,11 +669,21 @@ class WebhookService:
tier = account['tier']
period_start_dt = datetime.fromtimestamp(period_start, tz=timezone.utc)
# Primary idempotency check: Have we already processed this invoice?
if account.get('last_processed_invoice_id') == invoice_id:
logger.info(f"[IDEMPOTENCY] Invoice {invoice_id} already processed for account {account_id}")
return
# Secondary idempotency check: Skip if we already processed this renewal period
if account.get('last_grant_date'):
last_grant = datetime.fromisoformat(account['last_grant_date'].replace('Z', '+00:00'))
if period_start_dt <= last_grant:
logger.info(f"Skipping renewal for user {account_id} - already processed")
# Check if this renewal was already processed (within 24 hours window)
time_diff = abs((period_start_dt - last_grant).total_seconds())
if time_diff < 86400: # 24 hours in seconds
logger.info(f"[IDEMPOTENCY] Skipping renewal for user {account_id} - "
f"already processed at {account['last_grant_date']} "
f"(current period_start: {period_start_dt.isoformat()}, diff: {time_diff}s)")
return
monthly_credits = get_monthly_credits(tier)
@ -698,7 +711,8 @@ class WebhookService:
await client.from_('credit_accounts').update({
'last_grant_date': period_start_dt.isoformat(),
'next_credit_grant': next_grant.isoformat()
'next_credit_grant': next_grant.isoformat(),
'last_processed_invoice_id': invoice_id
}).eq('account_id', account_id).execute()
final_state = await client.from_('credit_accounts').select(