mirror of https://github.com/kortix-ai/suna.git
chore: add idempotency to webhook and credit systems
This commit is contained in:
parent
8de9c03471
commit
a9779dd034
|
@ -20,6 +20,25 @@ class CreditManager:
|
|||
type: Optional[str] = None
|
||||
) -> Dict:
|
||||
client = await self.db.client
|
||||
amount = Decimal(str(amount))
|
||||
|
||||
recent_window = datetime.now(timezone.utc) - timedelta(seconds=20)
|
||||
recent_entries = await client.from_('credit_ledger').select(
|
||||
'id, created_at, amount, description'
|
||||
).eq('account_id', account_id).eq('amount', float(amount)).eq(
|
||||
'description', description
|
||||
).gte('created_at', recent_window.isoformat()).execute()
|
||||
|
||||
if recent_entries.data:
|
||||
logger.warning(f"[IDEMPOTENCY] Potential duplicate credit add detected for {account_id}: "
|
||||
f"amount={amount}, description='{description}', "
|
||||
f"found {len(recent_entries.data)} similar entries in last 20 seconds")
|
||||
return {
|
||||
'success': True,
|
||||
'message': 'Credit already added (duplicate prevented)',
|
||||
'amount': float(amount),
|
||||
'duplicate_prevented': True
|
||||
}
|
||||
|
||||
result = await client.from_('credit_accounts').select(
|
||||
'expiring_credits, non_expiring_credits, balance, tier'
|
||||
|
|
|
@ -12,6 +12,7 @@ from .config import (
|
|||
TIERS,
|
||||
TRIAL_DURATION_DAYS,
|
||||
TRIAL_CREDITS,
|
||||
get_tier_by_name
|
||||
)
|
||||
from .credit_manager import credit_manager
|
||||
|
||||
|
@ -425,50 +426,54 @@ class SubscriptionService:
|
|||
'credits': float(new_tier_info.monthly_credits)
|
||||
}
|
||||
|
||||
if subscription.status == 'trialing' and subscription.get('trial_end'):
|
||||
await self._handle_trial_subscription(subscription, account_id, new_tier, client)
|
||||
return
|
||||
|
||||
billing_anchor = datetime.fromtimestamp(subscription['current_period_start'], tz=timezone.utc)
|
||||
next_grant_date = datetime.fromtimestamp(subscription['current_period_end'], tz=timezone.utc)
|
||||
|
||||
account_result = await client.from_('credit_accounts').select('tier, billing_cycle_anchor, stripe_subscription_id, trial_status, trial_started_at').eq('account_id', account_id).execute()
|
||||
current_account = await client.from_('credit_accounts').select(
|
||||
'tier, stripe_subscription_id, last_grant_date, billing_cycle_anchor'
|
||||
).eq('account_id', account_id).execute()
|
||||
|
||||
if not account_result.data or len(account_result.data) == 0:
|
||||
logger.info(f"User {account_id} has no credit account, creating none tier account first")
|
||||
await client.from_('credit_accounts').insert({
|
||||
'account_id': account_id,
|
||||
'balance': 0,
|
||||
'tier': 'none'
|
||||
}).execute()
|
||||
account_result = await client.from_('credit_accounts').select('tier, billing_cycle_anchor, stripe_subscription_id, trial_status, trial_started_at').eq('account_id', account_id).execute()
|
||||
|
||||
if account_result.data and len(account_result.data) > 0:
|
||||
account_data = account_result.data[0]
|
||||
current_tier_name = account_data.get('tier')
|
||||
existing_anchor = account_data.get('billing_cycle_anchor')
|
||||
old_subscription_id = account_data.get('stripe_subscription_id')
|
||||
trial_status = account_data.get('trial_status')
|
||||
trial_started_at = account_data.get('trial_started_at')
|
||||
|
||||
if subscription.status == 'trialing':
|
||||
await self._handle_trial_subscription(subscription, account_id, new_tier, client)
|
||||
return
|
||||
elif subscription.status == 'active' and trial_status == 'active':
|
||||
await client.rpc('handle_trial_end', {
|
||||
'p_account_id': account_id,
|
||||
'p_converted': True,
|
||||
'p_new_tier': new_tier['name']
|
||||
}).execute()
|
||||
logger.info(f"Converted trial to paid subscription for user {account_id}")
|
||||
|
||||
current_tier_info = TIERS.get(current_tier_name)
|
||||
current_tier = None
|
||||
if current_tier_info:
|
||||
current_tier = {
|
||||
'name': current_tier_info.name,
|
||||
'credits': float(current_tier_info.monthly_credits)
|
||||
}
|
||||
if current_account.data:
|
||||
existing_data = current_account.data[0]
|
||||
current_tier_name = existing_data.get('tier')
|
||||
old_subscription_id = existing_data.get('stripe_subscription_id')
|
||||
last_grant_date = existing_data.get('last_grant_date')
|
||||
existing_anchor = existing_data.get('billing_cycle_anchor')
|
||||
if last_grant_date and existing_anchor:
|
||||
try:
|
||||
last_grant_dt = datetime.fromisoformat(last_grant_date.replace('Z', '+00:00'))
|
||||
existing_anchor_dt = datetime.fromisoformat(existing_anchor.replace('Z', '+00:00'))
|
||||
|
||||
if (abs((billing_anchor - last_grant_dt).total_seconds()) < 60 and
|
||||
current_tier_name == new_tier['name'] and
|
||||
old_subscription_id == subscription['id']):
|
||||
logger.info(f"[IDEMPOTENCY] Skipping duplicate credit grant for {account_id} - "
|
||||
f"already processed at {last_grant_date}")
|
||||
return
|
||||
except Exception as e:
|
||||
logger.warning(f"Error parsing dates for idempotency check: {e}")
|
||||
|
||||
current_tier = get_tier_by_name(current_tier_name) if current_tier_name else None
|
||||
|
||||
current_tier = {
|
||||
'name': current_tier.name,
|
||||
'credits': float(current_tier.monthly_credits)
|
||||
} if current_tier else {
|
||||
'name': 'none',
|
||||
'credits': 0
|
||||
}
|
||||
|
||||
should_grant_credits = self._should_grant_credits(current_tier_name, current_tier, new_tier, subscription, old_subscription_id)
|
||||
|
||||
if should_grant_credits:
|
||||
await client.from_('credit_accounts').update({
|
||||
'last_grant_date': billing_anchor.isoformat()
|
||||
}).eq('account_id', account_id).execute()
|
||||
|
||||
await self._grant_subscription_credits(account_id, new_tier, billing_anchor)
|
||||
else:
|
||||
logger.info(f"No credits granted - not an upgrade scenario")
|
||||
|
|
|
@ -634,14 +634,17 @@ class WebhookService:
|
|||
client = await db.client
|
||||
|
||||
subscription_id = invoice.get('subscription')
|
||||
if not subscription_id:
|
||||
invoice_id = invoice.get('id')
|
||||
|
||||
if not subscription_id or not invoice_id:
|
||||
logger.warning(f"Invoice missing subscription or ID: {invoice}")
|
||||
return
|
||||
|
||||
period_start = invoice.get('period_start')
|
||||
period_end = invoice.get('period_end')
|
||||
|
||||
if not period_start or not period_end:
|
||||
logger.warning(f"Invoice missing period information: {invoice.get('id')}")
|
||||
logger.warning(f"Invoice missing period information: {invoice_id}")
|
||||
return
|
||||
|
||||
customer_result = await client.schema('basejump').from_('billing_customers')\
|
||||
|
@ -655,7 +658,7 @@ class WebhookService:
|
|||
account_id = customer_result.data[0]['account_id']
|
||||
|
||||
account_result = await client.from_('credit_accounts')\
|
||||
.select('tier, last_grant_date, next_credit_grant, billing_cycle_anchor')\
|
||||
.select('tier, last_grant_date, next_credit_grant, billing_cycle_anchor, last_processed_invoice_id')\
|
||||
.eq('account_id', account_id)\
|
||||
.execute()
|
||||
|
||||
|
@ -666,11 +669,21 @@ class WebhookService:
|
|||
tier = account['tier']
|
||||
period_start_dt = datetime.fromtimestamp(period_start, tz=timezone.utc)
|
||||
|
||||
# Primary idempotency check: Have we already processed this invoice?
|
||||
if account.get('last_processed_invoice_id') == invoice_id:
|
||||
logger.info(f"[IDEMPOTENCY] Invoice {invoice_id} already processed for account {account_id}")
|
||||
return
|
||||
|
||||
# Secondary idempotency check: Skip if we already processed this renewal period
|
||||
if account.get('last_grant_date'):
|
||||
last_grant = datetime.fromisoformat(account['last_grant_date'].replace('Z', '+00:00'))
|
||||
|
||||
if period_start_dt <= last_grant:
|
||||
logger.info(f"Skipping renewal for user {account_id} - already processed")
|
||||
# Check if this renewal was already processed (within 24 hours window)
|
||||
time_diff = abs((period_start_dt - last_grant).total_seconds())
|
||||
if time_diff < 86400: # 24 hours in seconds
|
||||
logger.info(f"[IDEMPOTENCY] Skipping renewal for user {account_id} - "
|
||||
f"already processed at {account['last_grant_date']} "
|
||||
f"(current period_start: {period_start_dt.isoformat()}, diff: {time_diff}s)")
|
||||
return
|
||||
|
||||
monthly_credits = get_monthly_credits(tier)
|
||||
|
@ -698,7 +711,8 @@ class WebhookService:
|
|||
|
||||
await client.from_('credit_accounts').update({
|
||||
'last_grant_date': period_start_dt.isoformat(),
|
||||
'next_credit_grant': next_grant.isoformat()
|
||||
'next_credit_grant': next_grant.isoformat(),
|
||||
'last_processed_invoice_id': invoice_id
|
||||
}).eq('account_id', account_id).execute()
|
||||
|
||||
final_state = await client.from_('credit_accounts').select(
|
||||
|
|
Loading…
Reference in New Issue