Concurrency & Locking

Distributed concurrency patterns and lock management in PULSE

PULSE uses distributed locking to prevent concurrent execution of critical operations.

Problem: Concurrent Snapshot Generation

Scenario: The scheduled snapshot job can fire on multiple Worker instances at once. Without coordination, every instance starts its own generation run, duplicating work and database writes.

Solution: Each instance tries to take a short-lived distributed lock in KV before generating. If the lock is already held, the instance skips the run.

Distributed Lock Implementation

Acquire Lock

const SNAPSHOT_LOCK_KEY = 'snapshot:generation:lock'
const SNAPSHOT_LOCK_TTL = 600 // 10 minutes

// Check if lock exists (note: this get + put pair is not atomic)
const lockValue = await cache.get(SNAPSHOT_LOCK_KEY)
if (lockValue) {
  console.log('Lock held by another instance, skipping')
  return { generated: 0, failed: 0 }
}

// Acquire lock with TTL
await cache.put(SNAPSHOT_LOCK_KEY, Date.now().toString(), {
  expirationTtl: SNAPSHOT_LOCK_TTL
})

Release Lock

try {
  // ... generate snapshots while holding the lock ...
} finally {
  try {
    await cache.delete(SNAPSHOT_LOCK_KEY)
  } catch (e) {
    console.warn('Failed to release lock:', e)
  }
}
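The acquire and release steps above can be combined into a reusable wrapper. This is a sketch under assumptions, not PULSE's actual code: `withLock` is a hypothetical helper, and `cache` is assumed to expose KV-style `get`/`put`/`delete`.

```javascript
// Hypothetical helper (not PULSE's actual API): run `fn` only if the
// lock is free, and always release the lock afterwards.
async function withLock(cache, key, ttlSeconds, fn) {
  const existing = await cache.get(key)
  if (existing) return null // lock held by another instance, skip

  await cache.put(key, Date.now().toString(), { expirationTtl: ttlSeconds })
  try {
    return await fn()
  } finally {
    try {
      await cache.delete(key)
    } catch (e) {
      console.warn('Failed to release lock:', e)
    }
  }
}
```

The finally block mirrors the release code above: the lock is deleted even when `fn` throws, and a failed delete is only logged because the TTL will clear the lock anyway.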

Lock Guarantees

Not guaranteed:

- Mutual exclusion under a true race: the get and the put are separate KV operations, so two instances that check at the same instant can both acquire the lock
- Immediate global visibility: KV is eventually consistent, so a freshly written lock may take time to propagate

Guaranteed:

- The lock always expires: the TTL ensures a crashed instance cannot hold it forever
- Low contention in practice: runs are cron-scheduled, so simultaneous acquisition attempts are rare

Why it works for snapshots:

- Snapshot generation is idempotent, so an occasional duplicate run wastes work but does not corrupt data
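The get-then-put acquire sequence is not atomic, so two instances can interleave between the check and the write and both take the lock. A minimal simulation of that window, using a fake in-memory KV (all names here are hypothetical):

```javascript
// Fake in-memory KV standing in for the real cache.
const store = new Map()
const kv = {
  async get(k) { return store.get(k) ?? null },
  async put(k, v) { store.set(k, v) }
}

// Same check-then-put shape as the acquire code above.
async function tryAcquire(id) {
  const held = await kv.get('lock')
  if (held) return false
  // Another instance can run its own get in this gap.
  await kv.put('lock', id)
  return true
}

// Start both before either finishes: each sees an empty lock.
const [a, b] = await Promise.all([tryAcquire('A'), tryAcquire('B')])
console.log(a, b) // both acquire: mutual exclusion is not guaranteed
```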

Other Concurrency Patterns

Rate Limiting

Per-IP rate limiting:

const limitResult = await env.RATE_LIMITER.limit({
  key: `${clientIp}/auth`
})

if (!limitResult.success) {
  return new Response(
    JSON.stringify({ success: false, error: 'Rate limit exceeded' }),
    { status: 429, headers: { 'Content-Type': 'application/json' } }
  )
}
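The `RATE_LIMITER` binding is an external service, but its observable behavior can be approximated by a fixed-window counter. A sketch (hypothetical `createRateLimiter`, not the binding's real implementation):

```javascript
// Fixed-window limiter sketch: allow `limit` calls per `windowMs` per key.
function createRateLimiter(limit, windowMs) {
  const windows = new Map() // key -> { start, count }
  return {
    limit({ key }) {
      const now = Date.now()
      let w = windows.get(key)
      if (!w || now - w.start >= windowMs) {
        w = { start: now, count: 0 } // new window for this key
        windows.set(key, w)
      }
      w.count += 1
      return { success: w.count <= limit }
    }
  }
}
```

Keying on `${clientIp}/auth`, as above, gives each IP an independent window per endpoint.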

Database Transactions

SQLite transactions for multi-step operations:

await db.prepare('BEGIN TRANSACTION').run()
try {
  await db.prepare('INSERT INTO events ...').bind(...).run()
  await db.prepare('UPDATE sessions ...').bind(...).run()
  await db.prepare('COMMIT').run()
} catch (error) {
  await db.prepare('ROLLBACK').run()
  throw error
}

Batch Processing

Combine multiple operations into single database call:

// Before: N database round-trips
for (const record of records) {
  await db.prepare('INSERT INTO events ...').bind(...).run()
}

// After: 1 database round-trip
const statements = records.map(r =>
  db.prepare('INSERT INTO events ...').bind(...)
)
await db.batch(statements)

Concurrency Challenges

Race Condition: API Key Validation

Problem:

// Old API key check
const key = await db.prepare(
  'SELECT * FROM api_keys WHERE key = ?'
).bind(apiKey).first()
// 100+ events/sec = 100+ queries/sec

Solution:

// Cache in KV with 60s TTL
const cached = await env.CACHE.get(`key:${apiKey}`)
if (cached) return JSON.parse(cached)

const key = await db.prepare(...).first()
if (key) {
  await env.CACHE.put(
    `key:${apiKey}`,
    JSON.stringify(key),
    { expirationTtl: 60 } // revoked keys stay valid for up to 60s
  )
}
return key

Result: roughly 100x faster validation on cache hits, since a KV read replaces a database query on the hot path
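The pattern above is cache-aside: read the cache first, fall back to the database on a miss, then populate the cache with a TTL. A generic sketch (hypothetical `cacheAside` helper; `cache` is assumed to be KV-style):

```javascript
// Cache-aside sketch: the loader runs only on a miss; hits skip the database.
async function cacheAside(cache, key, ttlSeconds, loader) {
  const cached = await cache.get(key)
  if (cached) return JSON.parse(cached)

  const value = await loader()
  if (value != null) {
    await cache.put(key, JSON.stringify(value), { expirationTtl: ttlSeconds })
  }
  return value
}
```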

Race Condition: Cache Invalidation

Problem:

// Order of operations matters
await db.prepare('UPDATE cohorts ...').run()
// If we crash here, cache stays stale

await cache.delete(`cohort:${cohortId}`)
// Too late, cache miss for reads

Solution:

// Invalidate cache first
await cache.delete(`cohort:${cohortId}`)

// Then update database
await db.prepare('UPDATE cohorts ...').run()

// If we crash between the steps: cache is empty, reads fall through to a live query (safe)
// If both succeed: cache and database are consistent on the next read

Race Condition: Job Status Tracking

Problem:

// Job could be cancelled while processing
const job = await repo.findById(jobId)
if (job.status === 'cancelled') return

// Process...
await repo.update(jobId, { status: 'completed' })
// Could overwrite 'cancelled' status

Solution:

// Use conditional update
const result = await db.prepare(
  'UPDATE import_jobs SET status = ? WHERE id = ? AND status NOT IN (?)'
).bind('completed', jobId, 'cancelled').run()

if (result.meta.changes === 0) {
  // Job was already cancelled or deleted
  return
}

Best Practices

1. Idempotency

Make operations safe to run multiple times:

-- Good: idempotent (replaying the same event updates in place)
INSERT INTO events (...) ON CONFLICT(event_id) DO UPDATE SET ...

-- Bad: not idempotent (fails if the event already exists)
INSERT INTO events (...)
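The effect of the upsert can be shown with an in-memory sketch (hypothetical `insertEvent`): replaying the same event leaves exactly one record.

```javascript
// Idempotent insert sketch: events are keyed by event_id, so a replayed
// event overwrites itself instead of duplicating or failing.
function insertEvent(events, event) {
  const existing = events.get(event.event_id)
  events.set(event.event_id, { ...existing, ...event })
}

const events = new Map()
insertEvent(events, { event_id: 'e1', type: 'click' })
insertEvent(events, { event_id: 'e1', type: 'click' }) // retry is a no-op
console.log(events.size) // 1
```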

2. Graceful Degradation

Fail safely, not catastrophically:

// Snapshot generation with graceful failure
const generationResult = await generateRetentionSnapshots(db, cache)
// Returns { generated: 10, failed: 2 }
// Service continues even if some snapshots fail
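One way to get that `{ generated, failed }` shape is `Promise.allSettled`, which records failures instead of propagating them. A sketch (hypothetical `generateAll` and `generateSnapshot` names):

```javascript
// Count successes and failures instead of letting one rejection abort the run.
async function generateAll(cohortIds, generateSnapshot) {
  const results = await Promise.allSettled(
    cohortIds.map(id => generateSnapshot(id))
  )
  return {
    generated: results.filter(r => r.status === 'fulfilled').length,
    failed: results.filter(r => r.status === 'rejected').length
  }
}
```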

3. Timeout Protection

Prevent long-running operations from blocking:

// Snapshot generation
if (hour === 2 && minute === 0) {
  // Will complete within 5 minutes
  // Workers timeout: 30 seconds for free tier
}
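A common way to enforce such a bound in code is to race the work against a timer, so a stuck operation fails fast instead of holding the lock until its TTL expires. A sketch (hypothetical `withTimeout` helper):

```javascript
// Reject if `promise` does not settle within `ms`; always clear the timer.
function withTimeout(promise, ms) {
  let timer
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error('operation timed out')), ms)
  })
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer))
}
```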

4. Atomic Updates

Use database-level atomicity:

// Atomic: All or nothing
await db.prepare(`
  UPDATE import_jobs
  SET processed_records = processed_records + ?,
      status = CASE WHEN processed_records >= total_records THEN 'completed' ELSE 'processing' END
  WHERE id = ?
`).bind(processedCount, jobId).run()

Distributed Lock Alternatives

KV Lease Pattern

// Acquire "lease" with short expiration
// (KV put returns no value, so ownership must be verified by reading back)
const leaseId = crypto.randomUUID()
await cache.put(
  `lock:${resource}`,
  leaseId,
  { expirationTtl: 5 }
)

// Check if we still hold lease
const current = await cache.get(`lock:${resource}`)
if (current !== leaseId) {
  // Lost lease (timeout)
  return
}

Durable Object Serialization

// Durable Objects serialize requests
// Ensures sequential processing

export default class Lock {
  async lock(resource) {
    // Only one lock can be acquired at a time
    // Sequential, not concurrent
  }
}
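The one-at-a-time behavior a Durable Object provides can be mimicked in plain JavaScript with a promise chain. A sketch (hypothetical `createSerializer`, not the Durable Objects API):

```javascript
// Chain each task onto the previous one so tasks run strictly in order,
// even when they are submitted concurrently.
function createSerializer() {
  let tail = Promise.resolve()
  return function run(task) {
    const next = tail.then(task, task) // start only after the previous task settles
    tail = next.catch(() => {})        // keep the chain alive after a failure
    return next
  }
}
```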

Performance Impact

| Operation | Without Lock | With Lock | Overhead |
|---|---|---|---|
| Snapshot Gen | 100 parallel runs | 1 run | -99% |
| Cache Hit | <10ms | <10ms | 0% |
| Cache Miss | <100ms | <100ms | 0% |
| Auth (cached) | <1ms | <1ms | 0% |

Monitoring Concurrency Issues

Watch for:

- Frequent lock contention (runs skipped because the lock is already held)
- Operations that outlive the lock TTL
- Duplicate snapshot rows, a sign that two instances acquired the lock simultaneously

// Log when lock is contested
const lockValue = await cache.get(SNAPSHOT_LOCK_KEY)
if (lockValue) {
  console.warn('[Concurrency] Lock contested at ' + new Date())
}

// Alert if lock TTL expires mid-operation
const ttlTimer = setTimeout(() => {
  console.error('[Concurrency] Lock TTL expired mid-operation')
}, SNAPSHOT_LOCK_TTL * 1000)

// ... do the work ...

clearTimeout(ttlTimer) // cancel the alert once the operation finishes

Last updated: April 3, 2026