Design and implementation of Cosmos DB Change Feed-centric architecture

Session material for Cosmos DB Conf

miyake

April 21, 2021

Transcript

  1. Agenda

    1. Change Feed-centric architecture: Design & Strategy. Kazuyuki Miyake (Microsoft MVP for Azure), @kazuyukimiyake
    2. Change Feed-centric architecture: Deep Dive. Tatsuro Shibamura (Microsoft MVP for Azure), @shibayan
  2. Massive data processing: Needs and Challenges

    Balancing massive data writes and complex queries.
    No performance degradation under massive writes.
    Can handle different types of queries.
    Pay-as-you-go cost model.
  3. Limitations of traditional architectures

    Trying to handle everything in one big datastore...
    Write-optimized datastores are weak on complex queries.
    Query-optimized datastores are weak under massively concurrent writes.
    -> As a result, teams end up relying on over-specified datastores.
  4. CQRS + Materialized-Views

    1. Separate write and read to absorb differences
    2. Deploy a query-optimized Materialized-View
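
The write/read separation in slide 4 can be sketched without any Azure dependency. This is a minimal illustration, not the speakers' implementation: the `TodoItem` shape, the `Category` property, and the in-memory dictionaries are all hypothetical stand-ins for a write-side container and a read-side view partitioned by a different key. It does not handle an item whose category changes (the stale copy would remain in the old partition).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical item shape; Category stands in for the read-side partition key.
public record TodoItem(string Id, string Category, string Title);

public static class MaterializedViewSketch
{
    // Applies a batch of changes to a view grouped by Category.
    // Writing by Id inside each partition makes re-running a batch harmless.
    public static void Project(
        IEnumerable<TodoItem> changes,
        Dictionary<string, Dictionary<string, TodoItem>> view)
    {
        foreach (var item in changes)
        {
            if (!view.TryGetValue(item.Category, out var partition))
            {
                partition = new Dictionary<string, TodoItem>();
                view[item.Category] = partition;
            }
            partition[item.Id] = item;
        }
    }

    public static void Main()
    {
        var view = new Dictionary<string, Dictionary<string, TodoItem>>();
        Project(new[]
        {
            new TodoItem("1", "home", "Buy milk"),
            new TodoItem("2", "work", "Write report"),
            new TodoItem("3", "home", "Fix door"),
        }, view);

        // The read side answers "all items in a category" from a single partition.
        Console.WriteLine(string.Join(", ", view["home"].Values.Select(t => t.Title)));
    }
}
```

The write side keeps items keyed for fast ingestion, while the view is shaped for the query it has to serve.
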
  5. Cosmos DB ChangeFeed + Azure Functions

    No need to implement mechanisms for CQRS.
    Synchronized in near real-time.
  6. Two Change Feed usage patterns

    1. Push model -> Data Transformation, Stream Processing
    2. Pull model -> Batch Processing
  7. Data Transformation, Stream Processing

    Used to process streaming data with low latency.
    The best solution is to use CosmosDBTrigger in Azure Functions.
    Suited to storage with fast writes, such as SQL Database and Redis Cache.
    Also used when writing back to Cosmos DB (creating a materialized view).
  8. Sample code - Push model

        public class Function1
        {
            public Function1(CosmosClient cosmosClient)
            {
                _container = cosmosClient.GetContainer("SampleDB", "MaterializedView");
            }

            private readonly Container _container;

            [FunctionName("Function1")]
            public async Task Run([CosmosDBTrigger(
                databaseName: "SampleDB",
                collectionName: "TodoItems",
                LeaseCollectionName = "leases")] IReadOnlyList<Document> input, ILogger log)
            {
                var tasks = new Task[input.Count];

                for (int i = 0; i < input.Count; i++)
                {
                    // Change the partition key and write it back (actually, do advanced conversion)
                    var partitionKey = new PartitionKey(input[i].GetPropertyValue<string>("anotherKey"));

                    tasks[i] = _container.UpsertItemStreamAsync(new MemoryStream(input[i].ToByteArray()), partitionKey);
                }

                await Task.WhenAll(tasks);
            }
        }
  9. Batch Processing

    Use when you need to process a large amount of data at one time.
    It is practical to implement this with TimerTrigger in Azure Functions.
    Used for archiving to Blob Storage / Data Lake Storage Gen 2.
    Storage GPv2 and Data Lake Storage Gen 2 are charged by the number of write transactions, so writing each streamed change individually increases costs.
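
The cost argument in slide 9 comes down to counting write calls. The sketch below compares the two approaches under a simplifying assumption that each write call counts as one billable transaction (real billing for large objects may count more operations per write). The change counts are made-up illustration values, not measurements.

```csharp
using System;
using System.Linq;

public static class ArchiveWriteSketch
{
    // Streaming: every change is written on its own, so writes equal changes.
    public static int WritesWhenStreaming(int[] changesPerRun) => changesPerRun.Sum();

    // Batching: each timer run that saw changes writes them as one object.
    public static int WritesWhenBatching(int[] changesPerRun) => changesPerRun.Count(c => c > 0);

    public static void Main()
    {
        // Hypothetical number of changes observed in three timer runs.
        var changesPerRun = new[] { 1200, 0, 800 };

        // prints "2000 vs 2"
        Console.WriteLine($"{WritesWhenStreaming(changesPerRun)} vs {WritesWhenBatching(changesPerRun)}");
    }
}
```
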
  10. Sample code - Pull model

        public class Function2
        {
            public Function2(CosmosClient cosmosClient)
            {
                _container = cosmosClient.GetContainer("SampleDB", "TodoItems");
            }

            private readonly Container _container;

            [FunctionName("Function2")]
            public async Task Run([TimerTrigger("0 */5 * * * *")] TimerInfo myTimer, ILogger log)
            {
                // Resume from the position saved by the previous run, if any
                var continuationToken = await LoadContinuationTokenAsync();

                var changeFeedStartFrom = continuationToken != null
                    ? ChangeFeedStartFrom.ContinuationToken(continuationToken)
                    : ChangeFeedStartFrom.Now();

                var changeFeedIterator = _container.GetChangeFeedIterator<TodoItem>(changeFeedStartFrom, ChangeFeedMode.Incremental);

                while (changeFeedIterator.HasMoreResults)
                {
                    try
                    {
                        var items = await changeFeedIterator.ReadNextAsync();

                        // TODO: Implementation
                    }
                    catch (CosmosException ex) when (ex.StatusCode == HttpStatusCode.NotModified)
                    {
                        // No more changes: always record the latest position.
                        // (The slide used ??=, which never advances a token that was already loaded.)
                        continuationToken = ex.Headers.ContinuationToken;
                        break;
                    }
                }

                await SaveContinuationTokenAsync(continuationToken);
            }
        }
  11. Improving resiliency - Retry policy

    CosmosDBTrigger moves on to the next batch of changes even when an execution error occurs.
    Without a retry policy, the data in a failed execution is lost because it is never processed again.
    Use FixedDelayRetry or ExponentialBackoffRetry with an unlimited (-1) maximum number of retries.
    The Change Feed then does not advance until the execution succeeds, so no data is lost.
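
The difference slide 11 describes can be simulated in plain C#. This is a toy model under stated assumptions, not the Functions runtime: batches are integers, the `handler` delegate is a hypothetical stand-in for the function body, and "retry" means re-running the same batch until it succeeds (so a handler that never succeeds would loop forever, just like an unlimited retry count).

```csharp
using System;
using System.Collections.Generic;

public static class CheckpointSketch
{
    // Feeds each batch to the handler. With retry enabled, a failing batch is
    // re-run until it succeeds; without it, the batch is skipped and lost.
    public static List<int> Run(IReadOnlyList<int> batches, Func<int, int, bool> handler, bool retry)
    {
        var processed = new List<int>();
        foreach (var batch in batches)
        {
            var attempt = 1;
            var ok = handler(batch, attempt);
            while (!ok && retry)
            {
                attempt++;
                ok = handler(batch, attempt);
            }
            if (ok) processed.Add(batch);
        }
        return processed;
    }

    public static void Main()
    {
        // Hypothetical handler: batch 2 fails on its first attempt only.
        Func<int, int, bool> handler = (batch, attempt) => !(batch == 2 && attempt == 1);
        var batches = new[] { 1, 2, 3 };

        Console.WriteLine(string.Join(",", Run(batches, handler, retry: false))); // prints "1,3"
        Console.WriteLine(string.Join(",", Run(batches, handler, retry: true)));  // prints "1,2,3"
    }
}
```
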
  12. Sample code - Retry policy

        public class Function1
        {
            // Retry indefinitely at 10-second intervals
            [FixedDelayRetry(-1, "00:00:10")]
            [FunctionName("Function1")]
            public async Task Run([CosmosDBTrigger(
                databaseName: "SampleDB",
                collectionName: "TodoItems",
                LeaseCollectionName = "leases")] IReadOnlyList<Document> input)
            {
                // TODO: Implementation
            }
        }
  13. Focus on idempotency and eventual consistency

    Code for idempotency whenever possible.
    This applies to storage that supports overwrite or delete (Cosmos DB, SQL Database, etc.).
    When idempotency is difficult to ensure, focus on eventual consistency.
    Focus on "at least once" delivery.
    This applies to append-only storage (Blob Storage, Data Lake Storage Gen 2).
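
The two cases in slide 13 behave differently when a batch is delivered twice, which is what "at least once" allows. The sketch below is an assumption-laden illustration: the `Change` record and both in-memory sinks are hypothetical, with a dictionary standing in for an overwritable store and a list standing in for an append-only one.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public record Change(string Id, string Payload);

public static class DeliverySketch
{
    // Overwritable sink: writes are keyed by Id, so replaying a batch changes nothing.
    public static void ApplyToUpsertSink(IEnumerable<Change> batch, Dictionary<string, string> sink)
    {
        foreach (var c in batch) sink[c.Id] = c.Payload;
    }

    // Append-only sink: a replayed batch leaves duplicate records behind.
    public static void ApplyToAppendSink(IEnumerable<Change> batch, List<Change> sink)
    {
        sink.AddRange(batch);
    }

    // Reader-side cleanup for the append-only sink: keep the last record per Id.
    public static List<Change> Deduplicate(IEnumerable<Change> records)
    {
        return records.GroupBy(r => r.Id).Select(g => g.Last()).ToList();
    }

    public static void Main()
    {
        var batch = new[] { new Change("1", "a"), new Change("2", "b") };
        var upsertSink = new Dictionary<string, string>();
        var appendSink = new List<Change>();

        // Deliver the same batch twice, as a retry would.
        for (var i = 0; i < 2; i++)
        {
            ApplyToUpsertSink(batch, upsertSink);
            ApplyToAppendSink(batch, appendSink);
        }

        // prints "2 4 2"
        Console.WriteLine($"{upsertSink.Count} {appendSink.Count} {Deduplicate(appendSink).Count}");
    }
}
```

The upsert sink is idempotent by construction. The append-only sink accepts duplicates and relies on readers to reconcile them, which is the eventual-consistency trade-off the slide points at.
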
  14. Avoid inconsistent states - Graceful shutdown

    Azure Functions is restarted when a new version is deployed or the platform is updated.
    If the host is restarted while a Function is executing, state may become inconsistent.
    Implement graceful shutdown to avoid inconsistent states.
    Increase resiliency by combining it with a retry policy.
  15. Sample code - Graceful shutdown

        public class Function1
        {
            // Retry indefinitely at 10-second intervals
            [FixedDelayRetry(-1, "00:00:10")]
            [FunctionName("Function1")]
            public async Task Run([CosmosDBTrigger(
                databaseName: "SampleDB",
                collectionName: "TodoItems",
                LeaseCollectionName = "leases")] IReadOnlyList<Document> input, CancellationToken cancellationToken)
            {
                try
                {
                    // Pass cancellation token
                    await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
                }
                catch (OperationCanceledException)
                {
                    // TODO: Implement rollback
                    throw;
                }
            }
        }
  16. References

    Azure Cosmos DB trigger for Functions 2.x and higher | Microsoft Docs
    Azure/azure-cosmos-dotnet-v3: .NET SDK for Azure Cosmos DB for the core SQL API
    Change feed pull model | Microsoft Docs
    Azure Functions error handling and retry guidance | Microsoft Docs
    Cancellation tokens - Develop C# class library functions using Azure Functions | Microsoft Docs