yield* db.transaction(E.fn(function* (tx) {
// Insert into main DB and Cache
yield* tx((client) => client.insert(DbSchema.MarketListing).values(insertEntries)).pipe(
E.when(() => insertEntries.length !== 0),
E.tap(() => E.logInfo(`Inserted ${insertEntries.length} entries into main db`)),
E.catchAll((e) => E.logError(`Error inserting new Listings into DB ${e}`))); // Error Thrown and propagates even though catched
const cacheInsertEntries = yield* decoder(insertEntries.map((entry) => ({ ...entry, itemType, itemId })));
yield* cacheDb.execute((client) => client.insert(CacheDbSchema.HistoryCache).values(cacheInsertEntries)).pipe(
E.when(() => cacheInsertEntries.length !== 0),
E.tap(() => E.logInfo( `Inserted ${cacheInsertEntries.length} entries into cache`)));
// Update main DB and Cache
yield* tx((client) => client.update(DbSchema.MarketListing).set(updateEntries))
.pipe(E.when(() => updateEntries.length !== 0),
E.tap(() =>E.logInfo(`Updated ${updateEntries.length} entries in main db`)));
yield* cacheDb.transaction(E.fn(function* (tx) {
const cacheUpdateEntries = yield* decoder(updateEntries.map((entry) => ({ ...entry, itemType, itemId })));
yield* E.all(cacheUpdateEntries.map((cacheEntry) =>
tx((client) => client.update(CacheDbSchema.HistoryCache).set(cacheEntry).where(
eq(CacheDbSchema.HistoryCache.listingId, cacheEntry.listingId)))),
).pipe(E.when(() => cacheUpdateEntries.length !== 0),
E.tap(() => E.logInfo(`Updated ${cacheUpdateEntries.length} entries in cache`)))
}))
}))
})
yield* db.transaction(E.fn(function* (tx) {
// Insert into main DB and Cache
yield* tx((client) => client.insert(DbSchema.MarketListing).values(insertEntries)).pipe(
E.when(() => insertEntries.length !== 0),
E.tap(() => E.logInfo(`Inserted ${insertEntries.length} entries into main db`)),
E.catchAll((e) => E.logError(`Error inserting new Listings into DB ${e}`))); // Error Thrown and propagates even though catched
const cacheInsertEntries = yield* decoder(insertEntries.map((entry) => ({ ...entry, itemType, itemId })));
yield* cacheDb.execute((client) => client.insert(CacheDbSchema.HistoryCache).values(cacheInsertEntries)).pipe(
E.when(() => cacheInsertEntries.length !== 0),
E.tap(() => E.logInfo( `Inserted ${cacheInsertEntries.length} entries into cache`)));
// Update main DB and Cache
yield* tx((client) => client.update(DbSchema.MarketListing).set(updateEntries))
.pipe(E.when(() => updateEntries.length !== 0),
E.tap(() =>E.logInfo(`Updated ${updateEntries.length} entries in main db`)));
yield* cacheDb.transaction(E.fn(function* (tx) {
const cacheUpdateEntries = yield* decoder(updateEntries.map((entry) => ({ ...entry, itemType, itemId })));
yield* E.all(cacheUpdateEntries.map((cacheEntry) =>
tx((client) => client.update(CacheDbSchema.HistoryCache).set(cacheEntry).where(
eq(CacheDbSchema.HistoryCache.listingId, cacheEntry.listingId)))),
).pipe(E.when(() => cacheUpdateEntries.length !== 0),
E.tap(() => E.logInfo(`Updated ${cacheUpdateEntries.length} entries in cache`)))
}))
}))
})