Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Cache Strategies with Redisson & Exposed

Cache Strategies with Redisson & Exposed

Cache Strategies with Redisson

* Read Through
* Write Through
* Write Behind

Connect to Database with Kotlin Exposed

Avatar for Sunghyouk Bae

Sunghyouk Bae

May 12, 2025
Tweet

More Decks by Sunghyouk Bae

Other Decks in Programming

Transcript

  1. Agenda • ׮নೠ நद ੹ۚ Cache Strategies • Cache Aside,

    Read Through, Write Through, Write Behind • Cache with Redisson • RMapCache, RLocalCachedMap • Interactive with DB by Exposed
  2. Cache Aside ಁఢ গ೒ܻா੉࣌੉ ݢ੷ நदܳ ઑഥೞҊ, நदী ؘ੉ఠо হਵݶ

    DBীࢲ ੍যৡ ٍ நदী ੷੢ೠ׮. ߈ࠂ੸ੋ ੍ӝо ݆਷ ؘ੉ఠী ੸೤ 1. நद ੍ӝ 2. DB ੍ӝ 3. நद ॳӝ
  3. Read Through ಁఢ நदী ؘ੉ఠо হਸ ٸ நदо ૒੽ DBীࢲ

    ؘ੉ఠܳ ઑഥ೧ நदী ੷੢ೠ׮. গ೒ܻா੉࣌਷ ೦࢚ நद݅ ੽Ӕೠ׮. 1. நद ੍ӝ 2. DB ੍ӝ 3. நद ੍ӝ
  4. Write Through ಁఢ ؘ੉ఠܳ நद৬ DBী زदী ੷੢ೠ׮. ؘ੉ఠ ੌҙࢿ੉

    ࠁ੢غ૑݅, ॳӝ ࢿמ੉ ੷ೞؼ ࣻ ੓׮ 1. நद ॳӝ 2. DB ॳӝ
  5. Write Behind ؘ੉ఠܳ ਋ࢶ நदী ੷੢ೞҊ, ੌ੿ ઱ӝ۽ நदীࢲ DB۽

    ੌҚ ੷੢೤פ׮. ॳӝ ࢿמ੉ જ૑݅, நद ੢গ द ؘ੉ఠ ਬप ਤ೷੉ ੓׮ 1. நद ॳӝ 2. ૑োػ DB ॳӝ
  6. Write Around ؘ੉ఠܳ DBী݅ ੷੢ೞҊ, நदח јनೞ૑ ঋח׮. ੉റ ઑഥ

    द நदী ੸੤ؾפ׮. ॳӝо ࠼ߣೞ૑݅ ੍ӝח ٘ޙ ҃਋ী ੸೤ 4. நद ੍ӝ 2. DB ੍ӝ 3. நद ॳӝ 1. DB ॳӝ
  7. நद ੹ۚ߹ ੸ਊ ߑউ ੹ۚݺ ബҗ੸ੋ ࢚ട ੢੼ ױ੼ ߂

    ઱੄੼ Cache-Aside ੍ӝ ੘স੉ ݆Ҋ, ؘ੉ఠ ߸҃੉ ੸਷ ҃਋ ߧਊ੸, நद ੢গী ఍۱੸, ҳഅ੉ рױ ؘ੉ఠ ୭नࢿ ҙܻ ೙ਃ Read-Through زੌ ؘ੉ఠо ߈ࠂ੸ਵ۽ ੍൤ח ҃਋ ੍ӝ ࢿמ ӓ؀ച, ੌҙࢿ ਬ૑ ਊ੉ ୐ ਃ୒ द ૑ো, ਕ߁স ೙ਃ Write-Through ؘ੉ఠ ੌҙࢿ੉ ݒ਋ ઺ਃೠ ҃਋ ؘ੉ఠ ੌҙࢿ ࠁ੢ ॳӝ ૑ো ߊࢤ Write-Back ॳӝ ੘স੉ ݆Ҋ, ੌद੸ ؘ੉ఠ ਬपਸ хࣻೡ ࣻ ੓ח ҃਋ ॳӝ ࢿמ ӓ؀ച, DB ࠗೞ хࣗ நद ੢গ द ؘ੉ఠ ਬप ਤ೷ Write-Around ؘ੉ఠо ੗઱ ॳ੉૑݅, ੗઱ ੍൤૑ ঋח ҃਋ ࠛ೙ਃೠ நद য়৏ ߑ૑ ੍ӝ ࢿמ਷ Cache-Asideࠁ׮ ծਸ ࣻ ੓਺
  8. நद ੷੢ࣗ ਬഋ Local Cache vs Global Cache vs Near

    Cache ҳ࠙ Local Cache (۽ஸ நद) Global Cache (Ӗ۽ߥ நद) Near Cache (פয நद) ҳࢿ п ࢲߡ(ੋझఢझ) ղࠗী நदܳ م ߹ب੄ ઺ঔ நद ࢲߡ(৘: Redis, Memcached) ࢎਊ ௿ۄ੉঱౟ ژח WAS খױী ۽ஸ நद ୶о ࣘب ݒ਋ ࡅܴ (֎౟ਕ௼ ࠛ೙ਃ, ݫݽܻ ੽Ӕ) ֎౟ਕ௼ ҃ਬ۽ Local Cacheࠁ׮ וܿ Local Cache ࣻળ੄ ࡅܲ ࣘب ੌҙࢿ ࢲߡ р ؘ੉ఠ ࠛੌ஖ оמ, زӝച য۰਑ ࢲߡ р ؘ੉ఠ ੌҙࢿ ࠁ੢ Ӗ۽ߥ நद৬ زӝച ೙ਃ ഛ੢ࢿ ࢲߡ ࣻ טܾࣻ۾ நद ઺ࠂ, ݫݽܻ ࠺ബਯ ࢲߡ ࣻ ט۰ب ؘ੉ఠ ઺ࠂ ੸਺, ഛ੢ࢿ ਋ࣻ ࢲߡ ࣻ ૐо द Ӗ۽ߥ நद ࠗೞ ࠙࢑ ੢੼ - ୡҊࣘ ੽Ӕ - ֎౟ਕ௼ ੢গ ৔ೱ হ਺ - ؘ੉ఠ ҕਬ ਊ੉ - ੌҙࢿ ਬ૑ - ഛ੢ࢿ જ਺ - ࡅܲ ੽Ӕ - Ӗ۽ߥ நद ࠗೞ хࣗ ױ੼ - ؘ੉ఠ ੌҙࢿ ҙܻ য۰਑ - ݫݽܻ խ࠺ - ֎౟ਕ௼ ૑ো - ױੌ ੢গ੼ оמࢿ - زӝച ࠂ੟ - ୶о ݫݽܻ ೙ਃ ੸೤ ࢎ۹ ߸҃ ੸Ҋ ࢲߡ߹ ة݀੸ ؘ੉ఠ ৈ۞ ࢲߡо زੌ ؘ੉ఠ ҕਬ, ੌҙࢿ ઺ਃೠ ҃਋ ؀ӏݽ ࠙࢑ ജ҃ীࢲ நद ബਯ ӓ؀ച
  9. MapLoader open class EntityMapLoader<ID: Any, E: HasIdentifier<ID>>( private val loadByIdFromDB:

    (ID) -> E?, private val loadAllIdsFromDB: () -> Collection<ID>, ): MapLoader<ID, E> { companion object: KLogging() { private const val DEFAULT_QUERY_TIMEOUT = 30_000 // 30 seconds } override fun load(id: ID): E? = transaction { loadByIdFromDB(id) } override fun loadAllKeys(): Iterable<ID>? = transaction { queryTimeout = DEFAULT_QUERY_TIMEOUT loadAllIdsFromDB() } } EntityMapLoader with Exposed ExposedMapLoaders.kt
  10. MapWriter EntityMapWriter with Exposed open class EntityMapWriter<ID: Any, E: HasIdentifier<ID>>(

    private val writeToDB: (map: Map<ID, E>) -> Unit, private val deleteFromDB: (ids: Collection<ID>) -> Unit, ): MapWriter<ID, E> { override fun write(map: Map<ID, E>) = transaction { writeToDB(map) } override fun delete(keys: Collection<ID>) = transaction { deleteFromDB(keys) } } ExposedMapWriters.kt
  11. MapLoaderAsync public interface MapLoaderAsync<K, V> { CompletionStage<V> load(K key); AsyncIterator<K>

    loadAllKeys(); } public interface AsyncIterator<V> { CompletionStage<Boolean> hasNext(); CompletionStage<V> next(); } SuspendedExposedMapLoaders.kt
  12. ExposedCachedRepository interface ExposedCacheRepository<T: HasIdentifier<ID>, ID: Any> { val cacheName: String

    val entityTable: IdTable<ID> fun ResultRow.toEntity(): T val cache: RMap<ID, T?> fun exists(id: ID): Boolean = cache.containsKey(id) fun findFreshById(id: ID): T? = entityTable.selectAll().where { entityTable.id eq id }.singleOrNull()?.toEntity() fun findFreshAll(vararg ids: ID): List<T> = entityTable.selectAll().where { entityTable.id inList ids.toList() }.map { it.toEntity() } fun findFreshAll(ids: Collection<ID>): List<T> = entityTable.selectAll().where { entityTable.id inList ids }.map { it.toEntity() } fun get(id: ID): T? = cache[id] fun findAll( limit: Int? = null, offset: Long? = null, sortBy: Expression<*> = entityTable.id, sortOrder: SortOrder = SortOrder.ASC, where: SqlExpressionBuilder.() -> Op<Boolean> = { Op.TRUE }, ): List<T> fun getAll(ids: Collection<ID>, batchSize: Int = 100): List<T> fun put(entity: T) = cache.fastPut(entity.id, entity) fun putAll(entities: Collection<T>, batchSize: Int = 100) { cache.putAll(entities.associateBy { it.id }, batchSize) } fun invalidate(vararg ids: ID): Long = cache.fastRemove(*ids) fun invalidateAll() = cache.clear() fun invalidateByPattern(patterns: String, count: Int = 100): Long { val keys = cache.keySet(patterns, count) return cache.fastRemove(*keys.toVarargArray()) } }
  13. Create Cache<K,V> for Synchronous mapCache(cacheName, redissonClient) { log.info { "RMapCache

    ܳ ࢤࢿ೤פ׮. config=$config" } if (config.isReadOnly) { loader(mapLoader) } else { loader(mapLoader) mapWriter.requireNotNull("mapWriter") writer(mapWriter) writeMode(config.writeMode) } codec(config.codec) writeRetryAttempts(config.writeRetryAttempts) writeRetryInterval(config.writeRetryInterval) } localCachedMap(cacheName, redissonClient) { log.info { "RLocalCacheMap ܳ ࢤࢿ೤פ׮. config=$config" } if (config.isReadOnly) { loader(mapLoader) } else { loader(mapLoader) mapWriter.requireNotNull("mapWriter") writer(mapWriter) writeMode(config.writeMode) } codec(config.codec) syncStrategy(config.nearCacheSyncStrategy) writeRetryAttempts(config.writeRetryAttempts) writeRetryInterval(config.writeRetryInterval) timeToLive(config.ttl) if (config.nearCacheMaxIdleTime > Duration.ZERO) { maxIdle(config.nearCacheMaxIdleTime) } }
  14. SuspendedExposedCacheRepository interface SuspendedExposedCacheRepository<T: HasIdentifier<ID>, ID: Any> { val cacheName: String

    val entityTable: IdTable<ID> fun ResultRow.toEntity(): T val cache: RMap<ID, T?> suspend fun exists(id: ID): Boolean = cache.containsKeyAsync(id).coAwait() suspend fun findFreshById(id: ID): T? = entityTable.selectAll().where { entityTable.id eq id }.singleOrNull()?.toEntity() suspend fun findFreshAll(vararg ids: ID): List<T> = entityTable.selectAll().where { entityTable.id inList ids.toList() }.map { it.toEntity() } suspend fun findFreshAll(ids: Collection<ID>): List<T> = entityTable.selectAll().where { entityTable.id inList ids }.map { it.toEntity() } suspend fun get(id: ID): T? = cache.getAsync(id).coAwait() suspend fun findAll( limit: Int? = null, offset: Long? = null, sortBy: Expression<*> = entityTable.id, sortOrder: SortOrder = SortOrder.ASC, where: SqlExpressionBuilder.() -> Op<Boolean> = { Op.TRUE }, ): List<T> suspend fun getAll(ids: Collection<ID>, batchSize: Int = DefaultBatchSize): List<T> suspend fun put(entity: T) = cache.fastPutAsync(entity.id, entity).coAwait() suspend fun putAll(entities: Collection<T>, batchSize: Int = DefaultBatchSize) { cache.putAllAsync(entities.associateBy { it.id }, batchSize).coAwait() } suspend fun invalidate(vararg ids: ID): Long = cache.fastRemoveAsync(*ids).coAwait() suspend fun invalidateAll(): Boolean = cache.clearAsync().coAwait() suspend fun invalidateByPattern(patterns: String, count: Int = DefaultBatchSize): Long { val keys = cache.keySet(patterns, count) return cache.fastRemoveAsync(*keys.toTypedArray()).coAwait() } }
  15. Create Cache<K,V> for Asynchronous localCachedMap(cacheName, redissonClient) { log.info { "RLocalCAcheMap

    ܳ ࢤࢿ೤פ׮. config=$config" } if (config.isReadOnly) { loaderAsync(suspendedMapLoader) } else { loaderAsync(suspendedMapLoader) suspendedMapWriter.requireNotNull("mapWriter") writerAsync(suspendedMapWriter) writeMode(config.writeMode) } codec(config.codec) syncStrategy(config.nearCacheSyncStrategy) writeRetryAttempts(config.writeRetryAttempts) writeRetryInterval(config.writeRetryInterval) timeToLive(config.ttl) if (config.nearCacheMaxIdleTime > Duration.ZERO) { maxIdle(config.nearCacheMaxIdleTime) } } mapCache(cacheName, redissonClient) { log.info { "RMapCache ܳ ࢤࢿ೤פ׮. config=$config" } if (config.isReadOnly) { loaderAsync(suspendedMapLoader) } else { loaderAsync(suspendedMapLoader) suspendedMapWriter.requireNotNull("suspendedMapWriter") writerAsync(suspendedMapWriter) writeMode(config.writeMode) } codec(config.codec) writeRetryAttempts(config.writeRetryAttempts) writeRetryInterval(config.writeRetryInterval) }
  16. Read Through withEntityTable(testDB) { val id = getExistingId() // DBীࢲ

    ઑഥೠ ч val entityFromDB = repository.findFreshById(id) entityFromDB.shouldNotBeNull() // நदীࢲ ઑഥೠ ч val entityFromCache = repository.get(id) entityFromCache.shouldNotBeNull() entityFromCache shouldBeEqualTo entityFromDB repository.exists(id).shouldBeTrue() } withEntityTable(testDB) { // DBীࢲ ݽٚ ূ౭౭ܳ நदী ۽٘ೠ റ ߈ജೠ׮ val entities = repository.findAll() entities.shouldNotBeEmpty() entities.size shouldBeEqualTo repository.entityTable.selectAll().count().toInt() } ReadThroughScenario.kt
  17. Write Through withEntityTable(testDB) { val id = getExistingId() // நदীࢲ

    ઑഥೠ ч val entity = repository.get(id) entity.shouldNotBeNull() // நदী јनػ ч ੷੢ -> DBীب ੷੢ val updatedEntity = updateEntityEmail(entity) repository.put(updatedEntity) // நदীࢲ ઑഥೠ ч val entityFromCache = repository.get(id) entityFromCache.shouldNotBeNull() assertSameEntityWithoutUpdatedAt(entityFromCache, updatedEntity) // DBীࢲ ઑഥೠ ч val entityFromDB = repository.findFreshById(id) entityFromDB.shouldNotBeNull() assertSameEntityWithoutUpdatedAt(entityFromDB, entityFromCache) } WriteThroughScenario.kt
  18. Write Behind withEntityTable(testDB) { val entities = createNewEntities(1000) repository.putAll(entities) await

    .atMost(Duration.ofSeconds(10)) .withPollInterval(Duration.ofMillis(1000)) .until { getAllCountFromDB() >= entities.size.toLong() } // DBীࢲ ઑഥೠ ч val dbCount = getAllCountFromDB() dbCount shouldBeGreaterThan entities.size.toLong() } WriteBehindScenario.kt