Upgrade to Pro — share decks privately, control downloads, hide ads and more …

Cache Strategies with Redisson & Exposed

Cache Strategies with Redisson & Exposed

Cache Strategies with Redisson

* Read Through
* Write Through
* Write Behind

Connect to Database with Kotlin Exposed

Avatar for Sunghyouk Bae (Debop)

Sunghyouk Bae (Debop)

May 12, 2025
Tweet

More Decks by Sunghyouk Bae (Debop)

Other Decks in Programming

Transcript

  1. Agenda • 다양한 캐시 전략 Cache Strategies • Cache Aside,

    Read Through, Write Through, Write Behind • Cache with Redisson • RMapCache, RLocalCachedMap • Interactive with DB by Exposed
  2. Cache Aside 패턴 애플리케이션이 먼저 캐시를 조회하고, 캐시에 데이터가 없으면

    DB에서 읽어온 뒤 캐시에 저장한다. 반복적인 읽기가 많은 데이터에 적합 1. 캐시 읽기 2. DB 읽기 3. 캐시 쓰기
  3. Read Through 패턴 캐시에 데이터가 없을 때 캐시가 직접 DB에서

    데이터를 조회해 캐시에 저장한다. 애플리케이션은 항상 캐시만 접근한다. 1. 캐시 읽기 2. DB 읽기 3. 캐시 읽기
  4. Write Through 패턴 데이터를 캐시와 DB에 동시에 저장한다. 데이터 일관성이

    보장되지만, 쓰기 성능이 저하될 수 있다 1. 캐시 쓰기 2. DB 쓰기
  5. Write Behind 데이터를 우선 캐시에 저장하고, 일정 주기로 캐시에서 DB로

    일괄 저장합니다. 쓰기 성능이 좋지만, 캐시 장애 시 데이터 유실 위험이 있다 1. 캐시 쓰기 2. 지연된 DB 쓰기
  6. Write Around 데이터를 DB에만 저장하고, 캐시는 갱신하지 않는다. 이후 조회

    시 캐시에 적재됩니다. 쓰기가 빈번하지만 읽기는 드문 경우에 적합 4. 캐시 읽기 2. DB 읽기 3. 캐시 쓰기 1. DB 쓰기
  7. 캐시 전략별 적용 방안 전략명 효과적인 상황 장점 단점 및

    주의점 Cache-Aside 읽기 작업이 많고, 데이터 변경이 적은 경우 범용적, 캐시 장애에 탄력적, 구현이 간단 데이터 최신성 관리 필요 Read-Through 동일 데이터가 반복적으로 읽히는 경우 읽기 성능 극대화, 일관성 유지 용이 첫 요청 시 지연, 워밍업 필요 Write-Through 데이터 일관성이 매우 중요한 경우 데이터 일관성 보장 쓰기 지연 발생 Write-Back 쓰기 작업이 많고, 일시적 데이터 유실을 감수할 수 있는 경우 쓰기 성능 극대화, DB 부하 감소 캐시 장애 시 데이터 유실 위험 Write-Around 데이터가 자주 쓰이지만, 자주 읽히지 않는 경우 불필요한 캐시 오염 방지 읽기 성능은 Cache-Aside보다 낮을 수 있음
  8. 캐시 저장소 유형 Local Cache vs Global Cache vs Near

    Cache 구분 Local Cache (로컬 캐시) Global Cache (글로벌 캐시) Near Cache (니어 캐시) 구성 각 서버(인스턴스) 내부에 캐시를 둠 별도의 중앙 캐시 서버(예: Redis, Memcached) 사용 클라이언트 또는 WAS 앞단에 로컬 캐시 추가 속도 매우 빠름 (네트워크 불필요, 메모리 접근) 네트워크 경유로 Local Cache보다 느림 Local Cache 수준의 빠른 속도 일관성 서버 간 데이터 불일치 가능, 동기화 어려움 서버 간 데이터 일관성 보장 글로벌 캐시와 동기화 필요 확장성 서버 수 늘릴수록 캐시 중복, 메모리 비효율 서버 수 늘려도 데이터 중복 적음, 확장성 우수 서버 수 증가 시 글로벌 캐시 부하 분산 장점 - 초고속 접근 - 네트워크 장애 영향 없음 - 데이터 공유 용이 - 일관성 유지 - 확장성 좋음 - 빠른 접근 - 글로벌 캐시 부하 감소 단점 - 데이터 일관성 관리 어려움 - 메모리 낭비 - 네트워크 지연 - 단일 장애점 가능성 - 동기화 복잡 - 추가 메모리 필요 적합 사례 변경 적고 서버별 독립적 데이터 여러 서버가 동일 데이터 공유, 일관성 중요한 경우 대규모 분산 환경에서 캐시 효율 극대화
  9. MapLoader open class EntityMapLoader<ID: Any, E: HasIdentifier<ID>>( private val loadByIdFromDB:

    (ID) -> E?, private val loadAllIdsFromDB: () -> Collection<ID>, ): MapLoader<ID, E> { companion object: KLogging() { private const val DEFAULT_QUERY_TIMEOUT = 30_000 // 30 seconds } override fun load(id: ID): E? = transaction { loadByIdFromDB(id) } override fun loadAllKeys(): Iterable<ID>? = transaction { queryTimeout = DEFAULT_QUERY_TIMEOUT loadAllIdsFromDB() } } EntityMapLoader with Exposed ExposedMapLoaders.kt
  10. MapWriter EntityMapWriter with Exposed open class EntityMapWriter<ID: Any, E: HasIdentifier<ID>>(

    private val writeToDB: (map: Map<ID, E>) -> Unit, private val deleteFromDB: (ids: Collection<ID>) -> Unit, ): MapWriter<ID, E> { override fun write(map: Map<ID, E>) = transaction { writeToDB(map) } override fun delete(keys: Collection<ID>) = transaction { deleteFromDB(keys) } } ExposedMapWriters.kt
  11. MapLoaderAsync public interface MapLoaderAsync<K, V> { CompletionStage<V> load(K key); AsyncIterator<K>

    loadAllKeys(); } public interface AsyncIterator<V> { CompletionStage<Boolean> hasNext(); CompletionStage<V> next(); } SuspendedExposedMapLoaders.kt
  12. ExposedCachedRepository interface ExposedCacheRepository<T: HasIdentifier<ID>, ID: Any> { val cacheName: String

    val entityTable: IdTable<ID> fun ResultRow.toEntity(): T val cache: RMap<ID, T?> fun exists(id: ID): Boolean = cache.containsKey(id) fun findFreshById(id: ID): T? = entityTable.selectAll().where { entityTable.id eq id }.singleOrNull()?.toEntity() fun findFreshAll(vararg ids: ID): List<T> = entityTable.selectAll().where { entityTable.id inList ids.toList() }.map { it.toEntity() } fun findFreshAll(ids: Collection<ID>): List<T> = entityTable.selectAll().where { entityTable.id inList ids }.map { it.toEntity() } fun get(id: ID): T? = cache[id] fun findAll( limit: Int? = null, offset: Long? = null, sortBy: Expression<*> = entityTable.id, sortOrder: SortOrder = SortOrder.ASC, where: SqlExpressionBuilder.() -> Op<Boolean> = { Op.TRUE }, ): List<T> fun getAll(ids: Collection<ID>, batchSize: Int = 100): List<T> fun put(entity: T) = cache.fastPut(entity.id, entity) fun putAll(entities: Collection<T>, batchSize: Int = 100) { cache.putAll(entities.associateBy { it.id }, batchSize) } fun invalidate(vararg ids: ID): Long = cache.fastRemove(*ids) fun invalidateAll() = cache.clear() fun invalidateByPattern(patterns: String, count: Int = 100): Long { val keys = cache.keySet(patterns, count) return cache.fastRemove(*keys.toVarargArray()) } }
  13. Create Cache<K,V> for Synchronous mapCache(cacheName, redissonClient) { log.info { "RMapCache

    를 생성합니다. config=$config" } if (config.isReadOnly) { loader(mapLoader) } else { loader(mapLoader) mapWriter.requireNotNull("mapWriter") writer(mapWriter) writeMode(config.writeMode) } codec(config.codec) writeRetryAttempts(config.writeRetryAttempts) writeRetryInterval(config.writeRetryInterval) } localCachedMap(cacheName, redissonClient) { log.info { "RLocalCacheMap 를 생성합니다. config=$config" } if (config.isReadOnly) { loader(mapLoader) } else { loader(mapLoader) mapWriter.requireNotNull("mapWriter") writer(mapWriter) writeMode(config.writeMode) } codec(config.codec) syncStrategy(config.nearCacheSyncStrategy) writeRetryAttempts(config.writeRetryAttempts) writeRetryInterval(config.writeRetryInterval) timeToLive(config.ttl) if (config.nearCacheMaxIdleTime > Duration.ZERO) { maxIdle(config.nearCacheMaxIdleTime) } }
  14. SuspendedExposedCacheRepository interface SuspendedExposedCacheRepository<T: HasIdentifier<ID>, ID: Any> { val cacheName: String

    val entityTable: IdTable<ID> fun ResultRow.toEntity(): T val cache: RMap<ID, T?> suspend fun exists(id: ID): Boolean = cache.containsKeyAsync(id).coAwait() suspend fun findFreshById(id: ID): T? = entityTable.selectAll().where { entityTable.id eq id }.singleOrNull()?.toEntity() suspend fun findFreshAll(vararg ids: ID): List<T> = entityTable.selectAll().where { entityTable.id inList ids.toList() }.map { it.toEntity() } suspend fun findFreshAll(ids: Collection<ID>): List<T> = entityTable.selectAll().where { entityTable.id inList ids }.map { it.toEntity() } suspend fun get(id: ID): T? = cache.getAsync(id).coAwait() suspend fun findAll( limit: Int? = null, offset: Long? = null, sortBy: Expression<*> = entityTable.id, sortOrder: SortOrder = SortOrder.ASC, where: SqlExpressionBuilder.() -> Op<Boolean> = { Op.TRUE }, ): List<T> suspend fun getAll(ids: Collection<ID>, batchSize: Int = DefaultBatchSize): List<T> suspend fun put(entity: T) = cache.fastPutAsync(entity.id, entity).coAwait() suspend fun putAll(entities: Collection<T>, batchSize: Int = DefaultBatchSize) { cache.putAllAsync(entities.associateBy { it.id }, batchSize).coAwait() } suspend fun invalidate(vararg ids: ID): Long = cache.fastRemoveAsync(*ids).coAwait() suspend fun invalidateAll(): Boolean = cache.clearAsync().coAwait() suspend fun invalidateByPattern(patterns: String, count: Int = DefaultBatchSize): Long { val keys = cache.keySet(patterns, count) return cache.fastRemoveAsync(*keys.toTypedArray()).coAwait() } }
  15. Create Cache<K,V> for Asynchronous localCachedMap(cacheName, redissonClient) { log.info { "RLocalCAcheMap

    를 생성합니다. config=$config" } if (config.isReadOnly) { loaderAsync(suspendedMapLoader) } else { loaderAsync(suspendedMapLoader) suspendedMapWriter.requireNotNull("mapWriter") writerAsync(suspendedMapWriter) writeMode(config.writeMode) } codec(config.codec) syncStrategy(config.nearCacheSyncStrategy) writeRetryAttempts(config.writeRetryAttempts) writeRetryInterval(config.writeRetryInterval) timeToLive(config.ttl) if (config.nearCacheMaxIdleTime > Duration.ZERO) { maxIdle(config.nearCacheMaxIdleTime) } } mapCache(cacheName, redissonClient) { log.info { "RMapCache 를 생성합니다. config=$config" } if (config.isReadOnly) { loaderAsync(suspendedMapLoader) } else { loaderAsync(suspendedMapLoader) suspendedMapWriter.requireNotNull("suspendedMapWriter") writerAsync(suspendedMapWriter) writeMode(config.writeMode) } codec(config.codec) writeRetryAttempts(config.writeRetryAttempts) writeRetryInterval(config.writeRetryInterval) }
  16. Read Through withEntityTable(testDB) { val id = getExistingId() // DB에서

    조회한 값 val entityFromDB = repository.findFreshById(id) entityFromDB.shouldNotBeNull() // 캐시에서 조회한 값 val entityFromCache = repository.get(id) entityFromCache.shouldNotBeNull() entityFromCache shouldBeEqualTo entityFromDB repository.exists(id).shouldBeTrue() } withEntityTable(testDB) { // DB에서 모든 엔티티를 캐시에 로드한 후 반환한다 val entities = repository.findAll() entities.shouldNotBeEmpty() entities.size shouldBeEqualTo repository.entityTable.selectAll().count().toInt() } ReadThroughScenario.kt
  17. Write Through withEntityTable(testDB) { val id = getExistingId() // 캐시에서

    조회한 값 val entity = repository.get(id) entity.shouldNotBeNull() // 캐시에 갱신된 값 저장 -> DB에도 저장 val updatedEntity = updateEntityEmail(entity) repository.put(updatedEntity) // 캐시에서 조회한 값 val entityFromCache = repository.get(id) entityFromCache.shouldNotBeNull() assertSameEntityWithoutUpdatedAt(entityFromCache, updatedEntity) // DB에서 조회한 값 val entityFromDB = repository.findFreshById(id) entityFromDB.shouldNotBeNull() assertSameEntityWithoutUpdatedAt(entityFromDB, entityFromCache) } WriteThroughScenario.kt
  18. Write Behind withEntityTable(testDB) { val entities = createNewEntities(1000) repository.putAll(entities) await

    .atMost(Duration.ofSeconds(10)) .withPollInterval(Duration.ofMillis(1000)) .until { getAllCountFromDB() >= entities.size.toLong() } // DB에서 조회한 값 val dbCount = getAllCountFromDB() dbCount shouldBeGreaterThan entities.size.toLong() } WriteBehindScenario.kt