Using AsyncLocalStorage to Propagate Transaction Context

I'm trying to implement a pattern where I can use AsyncLocalStorage to automatically propagate a transaction context without having to explicitly pass a transaction object throughout my codebase. The goal is to be able to write code like this:
const updateUsername = async (username: string) => {
await withTransaction(async () => {
const user = await db.selectFrom('users')
.where('id', '=', userId)
.select(['id', 'isVerified'])
.executeTakeFirstOrThrow();

if (user.isVerified) {
await db.updateTable('users')
.where('id', '=', userId)
.set({ username })
.execute();

await logUserActivity('username_changed');
}
});
}

const logUserActivity = async (activity: string) => {
await db.insertInto('activityLog')
.values({
activityType: activity,
timestamp: new Date(),
})
.execute();

await db.updateTable('users')
.set({ lastActivityAt: new Date() })
.where('id', '=', currentUserId())
.execute();
}
const updateUsername = async (username: string) => {
await withTransaction(async () => {
const user = await db.selectFrom('users')
.where('id', '=', userId)
.select(['id', 'isVerified'])
.executeTakeFirstOrThrow();

if (user.isVerified) {
await db.updateTable('users')
.where('id', '=', userId)
.set({ username })
.execute();

await logUserActivity('username_changed');
}
});
}

const logUserActivity = async (activity: string) => {
await db.insertInto('activityLog')
.values({
activityType: activity,
timestamp: new Date(),
})
.execute();

await db.updateTable('users')
.set({ lastActivityAt: new Date() })
.where('id', '=', currentUserId())
.execute();
}
Where any query made using db inside the withTransaction callback would automatically use the transaction created by withTransaction, even across function boundaries and imported modules. I've attempted to implement this by creating a custom dialect wrapper along these lines, but I'm running into initialization issues and type problems. Does anyone know of an example implementation/recipe for this I can use, or have any pointers? (I know this was intentionally dropped from Kysely as a native feature, and I'm aware of the risks) Any guidance would be greatly appreciated!
Solution:
For option 2, something like: with-transaction.ts: ```ts...
Jump to solution
5 Replies
Igal
Igal2w ago
Hey 👋 There are other downsides, e.g. testability - it's less straight forward then IoC.
lorentz
lorentzOP2w ago
yeah, alright 😦 thanks for responding, and thank you for an amazing library. our broader problem is that we accumulated a substantial collection of queries that we combine in various ways as subqueries, which works really really well so far. it's working better than any other db lib ive worked with ⭐ the only issue now is that we have an increasing need to wrap queries in transactions, and the code change to transform all queries to functions taking an optional transaction would be significant. so far those are the only two options i've thought of: 1. wrapping in functions with optional transaction argument 2. use AsyncLocalStorage are there any other approaches to this problem im not thinking of?
Igal
Igal2w ago
It shouldn't be optional.. it should accept a Kysely instance, which then also accepts Transaction and ControlledTransaction since they're extending Kysely. No branching points. Unware of transaction or not. Easier to test in isolation. "Increasing" sounds like an incremental change. I would slowly convert what's needed. If something is affecting too many irrelevant things for the current needs, I'd duplicate it and tag the old version as tech debt. Write new things with the new style only. Don't unit test these parts, prefer integration tests - if it's PostgreSQL, maybe PGLite could speed things up. That's for option 1.
Solution
Igal
Igal2w ago
For option 2, something like: with-transaction.ts:
import type { Kysely } from 'kysely'
import { AsyncLocalStorage } from 'node:async_hooks'

const asyncLocalStorage = new AsyncLocalStorage()

async function withTransaction<T>(db: Kysely<DB>, cb: (trx: Transaction<DB>) => Promise<T>): Promise<T> {
return await db.transaction().execute((trx) => asyncLocalStorage.run(trx, () => cb(trx)))
}

function useTransaction(): Transaction<DB> {
return asyncLocalStorage.getStore()
}
import type { Kysely } from 'kysely'
import { AsyncLocalStorage } from 'node:async_hooks'

const asyncLocalStorage = new AsyncLocalStorage()

async function withTransaction<T>(db: Kysely<DB>, cb: (trx: Transaction<DB>) => Promise<T>): Promise<T> {
return await db.transaction().execute((trx) => asyncLocalStorage.run(trx, () => cb(trx)))
}

function useTransaction(): Transaction<DB> {
return asyncLocalStorage.getStore()
}
update-username.ts:
import { getDB } from 'path/to/get-db.ts'
import { useTransaction, withTransaction } from 'path/to/with-transaction.ts'

const updateUsername = async (username: string) => {
await withTransaction(getDB(), async (trx) => {
const user = await trx.selectFrom('users')
.where('id', '=', userId)
.select(['id', 'isVerified'])
.executeTakeFirstOrThrow();

if (user.isVerified) {
await trx.updateTable('users')
.where('id', '=', userId)
.set({ username })
.execute();

await logUserActivity('username_changed');
}
});
}

const logUserActivity = async (activity: string) => {
const trx = useTransaction()

await trx.insertInto('activityLog')
.values({
activityType: activity,
timestamp: new Date(),
})
.execute();

await trx.updateTable('users')
.set({ lastActivityAt: new Date() })
.where('id', '=', currentUserId())
.execute();
}
import { getDB } from 'path/to/get-db.ts'
import { useTransaction, withTransaction } from 'path/to/with-transaction.ts'

const updateUsername = async (username: string) => {
await withTransaction(getDB(), async (trx) => {
const user = await trx.selectFrom('users')
.where('id', '=', userId)
.select(['id', 'isVerified'])
.executeTakeFirstOrThrow();

if (user.isVerified) {
await trx.updateTable('users')
.where('id', '=', userId)
.set({ username })
.execute();

await logUserActivity('username_changed');
}
});
}

const logUserActivity = async (activity: string) => {
const trx = useTransaction()

await trx.insertInto('activityLog')
.values({
activityType: activity,
timestamp: new Date(),
})
.execute();

await trx.updateTable('users')
.set({ lastActivityAt: new Date() })
.where('id', '=', currentUserId())
.execute();
}
lorentz
lorentzOP7d ago
thank you! i will explore these

Did you find this page helpful?