Concurrency & Locking
Distributed concurrency patterns and lock management in PULSE
PULSE uses distributed locking to prevent concurrent execution of critical operations.
Problem: Concurrent Snapshot Generation
Scenario:
- Nightly snapshot generation scheduled at 2 AM UTC
- Multiple Worker instances worldwide receive cron trigger
- All 10+ instances start computing snapshots simultaneously
- Database overloaded with duplicate queries
Solution:
- Use KV distributed lock
- Only first instance acquires lock
- Other instances skip execution
- Lock auto-expires after 10 minutes
Distributed Lock Implementation
Acquire Lock
const SNAPSHOT_LOCK_KEY = 'snapshot:generation:lock'
const SNAPSHOT_LOCK_TTL = 600 // 10 minutes, in seconds
// Check if lock exists
const lockValue = await cache.get(SNAPSHOT_LOCK_KEY)
if (lockValue) {
  console.log('Lock held by another instance, skipping')
  return { generated: 0, failed: 0 }
}
// Acquire lock with TTL
// Not atomic: another instance can pass the check above before this put is visible
await cache.put(SNAPSHOT_LOCK_KEY, Date.now().toString(), {
  expirationTtl: SNAPSHOT_LOCK_TTL
})
Release Lock
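try {
  // ... generate snapshots while holding the lock ...
} finally {
  try {
    await cache.delete(SNAPSHOT_LOCK_KEY)
  } catch (e) {
    // Best effort: if the delete fails, the TTL still clears the lock
    console.warn('Failed to release lock:', e)
  }
}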
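The acquire and release fragments above can be combined into a single helper. The following is a minimal sketch, not PULSE's actual code: `withKvLock` and the in-memory `MemoryKV` stand-in are hypothetical names, and the stand-in only mimics the `get`/`put`/`delete` calls used above so the flow can be exercised locally.

```javascript
// Hypothetical in-memory stand-in for the KV binding (get/put/delete only).
class MemoryKV {
  constructor() {
    this.store = new Map()
  }
  async get(key) {
    const entry = this.store.get(key)
    if (!entry || entry.expiresAt <= Date.now()) return null
    return entry.value
  }
  async put(key, value, { expirationTtl = Infinity } = {}) {
    this.store.set(key, { value, expiresAt: Date.now() + expirationTtl * 1000 })
  }
  async delete(key) {
    this.store.delete(key)
  }
}

// Runs `work` only if the lock key is absent, and always attempts cleanup.
// Best effort only: the get-then-put check is not atomic.
async function withKvLock(cache, key, ttlSeconds, work) {
  if (await cache.get(key)) {
    return { ran: false, result: null }
  }
  await cache.put(key, Date.now().toString(), { expirationTtl: ttlSeconds })
  try {
    return { ran: true, result: await work() }
  } finally {
    try {
      await cache.delete(key)
    } catch (e) {
      console.warn('Failed to release lock:', e)
    }
  }
}
```

A caller would check `ran` to decide whether to report a skipped run, in the same way the snippet above returns `{ generated: 0, failed: 0 }`.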
Lock Guarantees
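Not guaranteed:
- Mutual exclusion (the get-then-put check is not atomic and KV reads are eventually consistent, so two instances can both acquire the lock)
- Atomicity (lock acquisition and the work itself are separate steps)
- Re-entrance protection
Guaranteed:
- An instance that sees the lock key skips execution until the key is deleted or expires
- Lock auto-expires after the TTL, even if the holder crashes
- Best-effort cleanup when the work finishes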
Why it works for snapshots:
- Idempotent operation (safe to run twice)
- Graceful degradation if multiple instances run
- Non-critical if snapshot generation fails
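To make the "race conditions possible" caveat concrete, here is a small simulation, assuming a hypothetical in-memory `MemoryKV` stand-in and a hypothetical `tryAcquire` helper that follows the same check-then-put order as the acquire step. Two callers start at the same moment and both read an empty key before either write lands.

```javascript
// Hypothetical in-memory stand-in for the KV binding.
class MemoryKV {
  constructor() {
    this.store = new Map()
  }
  async get(key) {
    return this.store.has(key) ? this.store.get(key) : null
  }
  async put(key, value) {
    this.store.set(key, value)
  }
}

// Same order as the acquire step: check first, then write.
async function tryAcquire(cache, key, owner) {
  const existing = await cache.get(key) // step 1: check
  if (existing) return false
  await cache.put(key, owner) // step 2: write, separate from the check
  return true
}

// Both callers finish step 1 before either reaches step 2.
async function demoRace() {
  const kv = new MemoryKV()
  return Promise.all([
    tryAcquire(kv, 'lock', 'instance-A'),
    tryAcquire(kv, 'lock', 'instance-B')
  ])
}
```

In this simulation both callers report success, which is why the snapshot job has to stay idempotent rather than rely on the lock alone.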
Other Concurrency Patterns
Rate Limiting
Per-IP rate limiting:
const limitResult = await env.RATE_LIMITER.limit({
  key: `${clientIp}/auth`
})
if (!limitResult.success) {
  return new Response(
    JSON.stringify({ success: false, error: 'Rate limit exceeded' }),
    { status: 429 }
  )
}
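For local tests it can help to have a stand-in that exposes the same `limit({ key })` call and `{ success }` result used above. The sketch below is an assumption-laden illustration: `FixedWindowLimiter` is a hypothetical class using a simple fixed-window counter, which may not match how the real binding counts requests.

```javascript
// Hypothetical fixed-window limiter with the same call shape as the snippet above.
class FixedWindowLimiter {
  constructor(maxRequests, periodSeconds, now = () => Date.now()) {
    this.maxRequests = maxRequests
    this.periodMs = periodSeconds * 1000
    this.now = now // injectable clock, so tests can move time forward
    this.windows = new Map()
  }

  async limit({ key }) {
    // Align the window to a multiple of the period.
    const windowStart = Math.floor(this.now() / this.periodMs) * this.periodMs
    const entry = this.windows.get(key)
    if (!entry || entry.windowStart !== windowStart) {
      this.windows.set(key, { windowStart, count: 1 })
      return { success: true }
    }
    entry.count += 1
    return { success: entry.count <= this.maxRequests }
  }
}
```

Because the key includes the route suffix (`/auth` above), each client IP gets a separate counter per route.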
Database Transactions
SQLite transactions for multi-step operations:
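// Note: if db is a Cloudflare D1 binding, explicit BEGIN/COMMIT statements are rejected;
// use db.batch() instead, which runs its statements as a single transaction.
await db.prepare('BEGIN TRANSACTION').run()
try {
  await db.prepare('INSERT INTO events ...').bind(...).run()
  await db.prepare('UPDATE sessions ...').bind(...).run()
  await db.prepare('COMMIT').run()
} catch (error) {
  await db.prepare('ROLLBACK').run()
  throw error
}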
Batch Processing
Combine multiple operations into single database call:
// Before: N database round-trips
for (const record of records) {
  await db.prepare('INSERT INTO events ...').bind(...).run()
}
// After: 1 database round-trip
const statements = records.map(r => ({
  sql: 'INSERT INTO events ...',
  params: [...]
}))
await db.batch(statements)
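When the record set is large, a single batch call may become unwieldy, so one option is to split it into bounded groups. This is a sketch under assumptions: `chunk` and `insertInChunks` are hypothetical helpers, the default group size of 100 is arbitrary, and `toStatement` stands for whatever builds the statement shape your `db.batch` expects.

```javascript
// Split an array into consecutive groups of at most `size` items.
function chunk(items, size) {
  if (!Number.isInteger(size) || size < 1) {
    throw new RangeError('size must be a positive integer')
  }
  const chunks = []
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size))
  }
  return chunks
}

// One db.batch call per group; returns the number of round-trips made.
async function insertInChunks(db, records, toStatement, size = 100) {
  let calls = 0
  for (const group of chunk(records, size)) {
    await db.batch(group.map(toStatement))
    calls += 1
  }
  return calls
}
```

With 250 records and a group size of 100 this makes three round-trips instead of 250.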
Concurrency Challenges
Race Condition: API Key Validation
Problem:
// Old API key check
const key = await db.prepare(
'SELECT * FROM api_keys WHERE key = ?'
).bind(apiKey).first()
// 100+ events/sec = 100+ queries/sec
Solution:
// Cache in KV with 60s TTL
const cached = await env.CACHE.get(`key:${apiKey}`)
if (cached) return JSON.parse(cached)
const key = await db.prepare(...).first()
if (key) {
  await env.CACHE.put(
    `key:${apiKey}`,
    JSON.stringify(key),
    { expirationTtl: 60 }
  )
}
return key
Result: 100x faster validation
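The caching step above is a read-through pattern and can be factored into a reusable helper. The following is a minimal sketch: `getOrLoad` and the in-memory `MemoryKV` stand-in are hypothetical names, and missing rows are deliberately not cached, matching the `if (key)` guard above.

```javascript
// Hypothetical in-memory stand-in for the KV binding.
class MemoryKV {
  constructor() {
    this.store = new Map()
  }
  async get(key) {
    const entry = this.store.get(key)
    if (!entry || entry.expiresAt <= Date.now()) return null
    return entry.value
  }
  async put(key, value, { expirationTtl = Infinity } = {}) {
    this.store.set(key, { value, expiresAt: Date.now() + expirationTtl * 1000 })
  }
}

// Return the cached value if present; otherwise load it and cache it for ttlSeconds.
async function getOrLoad(cache, cacheKey, ttlSeconds, load) {
  const cached = await cache.get(cacheKey)
  if (cached) return JSON.parse(cached)
  const value = await load()
  if (value) {
    await cache.put(cacheKey, JSON.stringify(value), { expirationTtl: ttlSeconds })
  }
  return value ?? null
}
```

A revoked key can keep validating from the cache until the TTL runs out, which is the trade-off accepted by choosing a 60 second TTL.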
Race Condition: Cache Invalidation
Problem:
// Order of operations matters
await db.prepare('UPDATE cohorts ...').run()
// If we crash here, the cache keeps serving the old value until it expires
await cache.delete(`cohort:${cohortId}`)
// Until this delete lands, reads still hit the stale entry
Solution:
// Invalidate cache first
await cache.delete(`cohort:${cohortId}`)
// Then update database
await db.prepare('UPDATE cohorts ...').run()
// If crash: cache empty (safe - live query)
// If success: both consistent, unless a concurrent read re-cached the old row between the two steps
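One interleaving is worth keeping in mind with the invalidate-first order: a reader that runs between the delete and the update can repopulate the cache with the old row. The simulation below is a sketch using plain `Map` objects as hypothetical stand-ins for the database and the cache; it only demonstrates the ordering, not PULSE's actual read path.

```javascript
// Deterministic replay of: writer deletes, reader re-caches, writer updates.
function simulateInterleaving() {
  const db = new Map([['cohort:1', 'old']])
  const cache = new Map([['cohort:1', 'old']])

  // Read-through: on a miss, copy the current database value into the cache.
  const readThrough = (key) => {
    if (!cache.has(key)) cache.set(key, db.get(key))
    return cache.get(key)
  }

  cache.delete('cohort:1') // writer: invalidate first
  const seen = readThrough('cohort:1') // reader runs in the gap and re-caches 'old'
  db.set('cohort:1', 'new') // writer: update lands afterwards

  return { seen, cached: cache.get('cohort:1'), stored: db.get('cohort:1') }
}
```

In this replay the cache ends up holding the old value while the database holds the new one, so a TTL on the cache entry is still needed to bound how long the stale value survives.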
Race Condition: Job Status Tracking
Problem:
// Job could be cancelled while processing
const job = await repo.findById(jobId)
if (job.status === 'cancelled') return
// Process...
await repo.update(jobId, { status: 'completed' })
// Could overwrite 'cancelled' status
Solution:
// Use conditional update
const result = await db.prepare(
  'UPDATE import_jobs SET status = ? WHERE id = ? AND status NOT IN (?)'
).bind('completed', jobId, 'cancelled').run()
if (result.changes === 0) {
  // Job was already cancelled or deleted
  return
}
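The effect of the conditional update can be shown without a database. The sketch below uses a hypothetical in-memory `JobStore` whose method mirrors the `WHERE id = ? AND status NOT IN (?)` guard and returns a `changes` count in the same way.

```javascript
// Hypothetical in-memory model of the import_jobs table.
class JobStore {
  constructor() {
    this.jobs = new Map()
  }

  // Mirrors: UPDATE import_jobs SET status = ? WHERE id = ? AND status NOT IN ('cancelled')
  updateStatusUnlessCancelled(id, status) {
    const job = this.jobs.get(id)
    if (!job || job.status === 'cancelled') {
      return { changes: 0 } // no row matched the guard
    }
    job.status = status
    return { changes: 1 }
  }
}
```

The check and the write happen in one step, so a cancellation that arrives during processing is never overwritten by a late "completed" update.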
Best Practices
1. Idempotency
Make operations safe to run multiple times:
// Good: Idempotent
INSERT INTO events (...) ON CONFLICT(event_id) DO UPDATE SET ...
// Bad: Not idempotent
INSERT INTO events (...) -- Fails if event already exists
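The difference between the two statements can be illustrated with an in-memory table. This is a sketch only: `upsertEvent` and `insertEvent` are hypothetical functions operating on a `Map` keyed by `event_id`, standing in for the `ON CONFLICT ... DO UPDATE` and plain `INSERT` behavior.

```javascript
// Idempotent: applying the same event twice leaves one row with the same contents.
function upsertEvent(table, event) {
  const existing = table.get(event.event_id) ?? {}
  table.set(event.event_id, { ...existing, ...event })
  return table.size
}

// Not idempotent: the second attempt fails, as a UNIQUE constraint would.
function insertEvent(table, event) {
  if (table.has(event.event_id)) {
    throw new Error('UNIQUE constraint failed: events.event_id')
  }
  table.set(event.event_id, event)
  return table.size
}
```

This matters for the snapshot lock in particular: if two instances both run, replayed writes converge on the same rows instead of failing or duplicating.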
2. Graceful Degradation
Fail safely, not catastrophically:
// Snapshot generation with graceful failure
const generationResult = await generateRetentionSnapshots(db, cache)
// Returns { generated: 10, failed: 2 }
// Service continues even if some snapshots fail
3. Timeout Protection
Prevent long-running operations from blocking:
// Snapshot generation
if (hour === 2 && minute === 0) {
// Will complete within 5 minutes
// Workers timeout: 30 seconds for free tier
}
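One way to enforce an upper bound in code, rather than relying on the platform limit alone, is to race the work against a timer. The following is a minimal sketch: `withTimeout` is a hypothetical helper, and it only stops waiting for the work; it does not cancel it.

```javascript
// Reject if `promise` has not settled within `ms` milliseconds.
function withTimeout(promise, ms, label = 'operation') {
  let timer
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => {
      reject(new Error(`${label} timed out after ${ms}ms`))
    }, ms)
  })
  // Clear the timer either way so it does not outlive the operation.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer))
}
```

Choosing a bound shorter than the lock TTL means a slow run gives up before the lock expires underneath it.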
4. Atomic Updates
Use database-level atomicity:
// Atomic: All or nothing
// SET expressions read the pre-update row, so the CASE must add the increment itself
await db.prepare(`
  UPDATE import_jobs
  SET processed_records = processed_records + ?,
      status = CASE WHEN processed_records + ? >= total_records THEN 'completed' ELSE 'processing' END
  WHERE id = ?
`).bind(processedCount, processedCount, jobId).run()
Distributed Lock Alternatives
KV Lease Pattern
// Acquire "lease" with short expiration
const leaseId = crypto.randomUUID()
// put() resolves with no value, so it cannot report whether the lease was acquired
await cache.put(
  `lock:${resource}`,
  leaseId,
  { expirationTtl: 60 } // KV rejects TTLs below 60 seconds
)
// Check if we still hold lease
const current = await cache.get(`lock:${resource}`)
if (current !== leaseId) {
  // Lost lease (expired or overwritten by another instance)
  return
}
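The lease ID also makes release safer: an instance can avoid deleting a lease that another instance has since taken over. The sketch below assumes a hypothetical `releaseIfOwner` helper and an in-memory `MemoryKV` stand-in; note that the check and the delete are still two separate steps, so this narrows the window rather than closing it.

```javascript
// Hypothetical in-memory stand-in for the KV binding.
class MemoryKV {
  constructor() {
    this.store = new Map()
  }
  async get(key) {
    return this.store.has(key) ? this.store.get(key) : null
  }
  async put(key, value) {
    this.store.set(key, value)
  }
  async delete(key) {
    this.store.delete(key)
  }
}

// Delete the lease only if it still carries our ID.
async function releaseIfOwner(cache, key, leaseId) {
  const current = await cache.get(key)
  if (current !== leaseId) {
    return false // expired or taken over; leave it alone
  }
  await cache.delete(key)
  return true
}
```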
Durable Object Serialization
// Durable Objects serialize requests
// Ensures sequential processing
export default class Lock {
  async lock(resource) {
    // Only one lock can be acquired at a time
    // Sequential, not concurrent
  }
}
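To illustrate what "sequential, not concurrent" means in practice, here is a small promise-chain queue. This is not the Durable Objects API; `SerialQueue` is a hypothetical class that only models the idea of one task finishing before the next one starts within a single process.

```javascript
// Runs async tasks one at a time, in the order they were submitted.
class SerialQueue {
  constructor() {
    this.tail = Promise.resolve()
  }

  run(task) {
    // Start this task only after the previous one has settled.
    const result = this.tail.then(() => task())
    // Keep the chain alive even if a task rejects.
    this.tail = result.catch(() => {})
    return result
  }
}
```

Callers still get each task's own result or error from `run`, while a failure in one task does not block the tasks queued behind it.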
Performance Impact
| Operation | Without Lock | With Lock | Overhead |
|---|---|---|---|
| Snapshot Gen | 100 parallel runs | 1 run | -99% |
| Cache Hit | <10ms | <10ms | 0% |
| Cache Miss | <100ms | <100ms | 0% |
| Auth (cached) | <1ms | <1ms | 0% |
Monitoring Concurrency Issues
Watch for:
// Log when lock is contested
const lockValue = await cache.get(SNAPSHOT_LOCK_KEY)
if (lockValue) {
  console.warn('[Concurrency] Lock contested at ' + new Date().toISOString())
}
// Alert if lock TTL expires mid-operation
const ttlTimer = setTimeout(() => {
  console.error('[Concurrency] Lock timeout detected')
}, SNAPSHOT_LOCK_TTL * 1000)
// Call clearTimeout(ttlTimer) once the operation finishes, so the alert only fires when the work outlives the lock
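Because the acquire step stores `Date.now().toString()` as the lock value, a contested-lock log line can also report how old the lock is. The sketch below assumes a hypothetical `describeLock` helper; it relies only on that timestamp and on the TTL in seconds.

```javascript
// Derive the lock's age and remaining lifetime from its stored timestamp.
function describeLock(lockValue, ttlSeconds, now = Date.now()) {
  if (!lockValue) return null // no lock present
  const acquiredAt = Number(lockValue)
  if (!Number.isFinite(acquiredAt)) return null // value is not a timestamp
  const ageMs = now - acquiredAt
  return {
    ageMs,
    remainingMs: Math.max(0, ttlSeconds * 1000 - ageMs)
  }
}
```

A lock whose age is close to the TTL suggests the holder is running long or crashed without releasing, which is a more useful signal than contention alone.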
Next Steps
- Performance — Optimization techniques
- Security — Security architecture
- Components — Component details
Last updated: April 3, 2026