Swiss Army Django: Small Footprint ETL - DjangoCon US

Noah Kantrowitz

October 17, 2023

Transcript

  1. Noah Kantrowitz
    - He/him
    - coderanger.net | cloudisland.nz/@coderanger
    - Kubernetes (ContribEx) and Python (webmaster@)
    - SRE/Platform for Geomagical Labs, part of IKEA
    - We do CV/AR for the home

    DjangoCon US 2023
  2. The shape of things
    - Extractors: scraper/importer cron jobs
    - Transforms: parse HTML/JSON/etc., then munge
    - Loaders: Django ORM, maybe DRF or Pydantic
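As a rough sketch of that shape, the three stages can be written as plain async functions. Everything here is hypothetical: `extract()` returns a canned JSON payload instead of calling a real API, and `load()` writes into a dict that stands in for the Django ORM.

```python
import asyncio
import json

# Hypothetical extractor: returns a canned JSON payload instead of
# making a real HTTP request.
async def extract() -> str:
    return '[{"key": 1, "fullName": "Widget"}, {"key": 2, "fullName": "Gadget"}]'

# Transform: parse the raw payload and munge it into the shape we store.
def transform(raw: str) -> list[dict]:
    return [{"id": row["key"], "name": row["fullName"]} for row in json.loads(raw)]

# Hypothetical loader: a dict keyed by id plays the role of the ORM table,
# with "update or create" semantics so re-running the job converges.
async def load(rows: list[dict], table: dict[int, dict]) -> None:
    for row in rows:
        table[row["id"]] = row

async def run_job(table: dict[int, dict]) -> None:
    raw = await extract()
    await load(transform(raw), table)

table: dict[int, dict] = {}
asyncio.run(run_job(table))
print(table)
```

Keying the loader on the upstream id is what makes the job safe to re-run: a second run overwrites the same rows instead of duplicating them.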
  3. Robots Data in disguise
    - Parsed to structured data
      - json.loads()
      - BeautifulSoup
      - struct.unpack()
    - Reshape the data to make it easy to query
    - Possibly import-time aggregations like averages
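A small sketch of that transform step: parse fixed-size binary records with the `struct` module (here `Struct.iter_unpack()`, the looping form of `struct.unpack()`), reshape each record into a flat dict that is easy to query, and compute an import-time average. The record layout `<IHf` (uint32 id, uint16 quantity, float32 price) is purely hypothetical.

```python
import struct
from statistics import mean

# Hypothetical fixed-size record layout: little-endian uint32 id,
# uint16 quantity, float32 price ("<" means no padding: 4 + 2 + 4 = 10 bytes).
RECORD = struct.Struct("<IHf")

def parse_records(blob: bytes) -> list[dict]:
    # iter_unpack walks the buffer one record at a time; the buffer
    # length must be a multiple of RECORD.size.
    return [
        {"id": rid, "quantity": qty, "price": price}
        for rid, qty, price in RECORD.iter_unpack(blob)
    ]

def summarize(rows: list[dict]) -> dict:
    # Import-time aggregation: computed once per load, not on every query.
    return {"count": len(rows), "avg_price": mean(r["price"] for r in rows)}

blob = RECORD.pack(1, 3, 2.5) + RECORD.pack(2, 1, 4.5)
rows = parse_records(blob)
print(summarize(rows))  # {'count': 2, 'avg_price': 3.5}
```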
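  4. Aggregations

        from django.http import JsonResponse

        # Query time: aggregate on every request
        async def myview(request):
            stats = await Item.objects.aaggregate(Avg("value"))
            return JsonResponse(stats)  # {"value__avg": ...}

        # Ingest time: aggregate once per load and store the number
        stats = await Item.objects.aaggregate(Avg("value"))
        await ItemStats.objects.aupdate_or_create(
            defaults={"avg": stats["value__avg"]})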
  5. Async & Django
    Yes, you can really use it! More on the ORM later.
  6. Why async?
    - Everything in one box
    - Fewer services, fewer problems
    - Portability and development env
  7. Running an async app
    - Async server not included

        python -m uvicorn etl.asgi:application

    - --reload for development
    - --access-log --no-server-header
    - --ssl-keyfile=... --ssl-certfile=...
  8. Task factories

        import asyncio

        async def do_every(fn, n):
            while True:
                await fn()
                await asyncio.sleep(n)
  9. The other shoe
    - Crashes can happen
    - Plan for convergence
    - Think about failure modes
    - Make task status models if needed
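One way to act on this, sketched with only `asyncio`: wrap the periodic task so a crash in one run is logged and the loop keeps going, and keep a small status record. `TaskStatus` is a hypothetical in-memory stand-in for the task status model the slide suggests; in a real app it could be a Django model with the same fields.

```python
import asyncio
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional

log = logging.getLogger("etl")

@dataclass
class TaskStatus:
    # Hypothetical status record; a Django model could hold the same fields.
    runs: int = 0
    failures: int = 0
    last_error: Optional[str] = None
    last_finished_at: Optional[datetime] = None

async def do_every(fn, n: float, status: TaskStatus) -> None:
    while True:
        status.runs += 1
        try:
            await fn()
        except Exception as exc:  # CancelledError is a BaseException, so it still propagates
            status.failures += 1
            status.last_error = repr(exc)
            log.exception("periodic task failed")
        else:
            status.last_finished_at = datetime.now(timezone.utc)
        await asyncio.sleep(n)

async def demo() -> TaskStatus:
    calls = 0
    done = asyncio.Event()

    async def flaky() -> None:
        # Fails on the second call only, then recovers.
        nonlocal calls
        calls += 1
        if calls >= 3:
            done.set()
        if calls == 2:
            raise RuntimeError("upstream timeout")

    status = TaskStatus()
    task = asyncio.create_task(do_every(flaky, 0.01, status))
    await done.wait()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return status

status = asyncio.run(demo())
print(status.runs, status.failures)
```

Because each run is expected to converge on the right state, skipping one failed run is usually acceptable; the status record is what tells you it happened.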
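  10. Async ORM
    - Mind your a*s (the a-prefixed methods such as aget() and acount())
    - Transactions and sync_to_async (transactions don't yet work in async mode, so put transactional code in a sync function and call it through sync_to_async)
    - Don't worry about concurrency
    - Still improving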
  11. The simple case

        async def scrape_things():
            resp = await client.get("/api/things/")
            resp.raise_for_status()
            for row in resp.json():
                await Thing.objects.aupdate_or_create(
                    id=row["key"],
                    defaults={"name": row["fullName"]},
                )
  12. Foreign keys

        # {"user": 25}
        defaults={"user_id": row["user"]}

        # {"user": "foo"}
        user = await User.objects.only("id")\
            .aget(email=row["user"])
        defaults={"user": user}
  13. DRF serializers

        item = await Item.objects.filter(id=data["id"])\
            .afirst()
        ser = ItemAPISerializer(instance=item, data=data)
        await sync_to_async(ser.is_valid)(raise_exception=True)
        await sync_to_async(ser.save)()
  14.

        while True:
            now = timezone.now()  # assumed: django.utils.timezone; not shown on the slide
            for cron in decorators._registry.values():
                if (cron.next_run_at is None or cron.next_run_at <= now) and (
                    cron.previous_started_at is None
                    or (
                        cron.previous_finished_at is not None
                        and cron.previous_started_at <= cron.previous_finished_at
                    )
                ):
                    cron.previous_started_at = now
                    # the event loop holds only weak refs to tasks; keep one in real code
                    asyncio.create_task(_do_cron(cron))
            await asyncio.sleep(1)
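  15.

        async def _do_cron(cron):
            # if fn() raises, previous_finished_at is never set and the
            # scheduler loop will treat this cron as still running
            await cron.fn()
            now = timezone.now()  # assumed: finish time; not shown on the slide
            cron.previous_finished_at = now
            cron.next_run_at = (
                croniter(cron.cronspec, now, hash_id=cron.name)
                .get_next(ret_type=datetime)
                .replace(tzinfo=dtimezone.utc)
            )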
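The start condition in the scheduler loop packs two rules together: the job is due, and the previous run (if any) has finished. Pulled out as a pure function it is easy to test on its own. `CronState` and `is_due()` are hypothetical names; the dataclass only mirrors the attributes the slides read from each registered cron.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional

@dataclass
class CronState:
    # Hypothetical mirror of the per-cron attributes used in the scheduler loop.
    next_run_at: Optional[datetime] = None
    previous_started_at: Optional[datetime] = None
    previous_finished_at: Optional[datetime] = None

def is_due(cron: CronState, now: datetime) -> bool:
    # Rule 1: never scheduled yet, or the scheduled time has passed.
    due = cron.next_run_at is None or cron.next_run_at <= now
    # Rule 2: never started, or the last start has a matching finish,
    # so two copies of the same job never overlap.
    idle = cron.previous_started_at is None or (
        cron.previous_finished_at is not None
        and cron.previous_started_at <= cron.previous_finished_at
    )
    return due and idle

now = datetime(2023, 10, 17, 12, 0, tzinfo=timezone.utc)
running = CronState(
    next_run_at=now - timedelta(minutes=1),
    previous_started_at=now - timedelta(minutes=2),
)
print(is_due(CronState(), now))  # True: never run before
print(is_due(running, now))      # False: started but not finished
```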
  16. Incremental load

        max = await Trades.objects.aaggregate(Max("id"))
        resp = await client.get(...,
            query={"since": max.get("id__max") or 0})
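The same high-water-mark idea, sketched without Django or HTTP: find the largest id already loaded, ask the source only for newer rows, and append them. `fetch_since()` is a hypothetical stand-in for the API call with a `since` parameter, and a plain list plays the role of the `Trades` table.

```python
import asyncio

# Hypothetical upstream feed: trades with monotonically increasing ids.
UPSTREAM = [{"id": i, "price": 100 + i} for i in range(1, 8)]

async def fetch_since(since: int) -> list[dict]:
    # Stand-in for the API call: only rows newer than `since`.
    return [row for row in UPSTREAM if row["id"] > since]

async def incremental_load(table: list[dict]) -> int:
    # Equivalent of aaggregate(Max("id")), falling back to 0 when empty.
    high_water = max((row["id"] for row in table), default=0)
    new_rows = await fetch_since(high_water)
    table.extend(new_rows)
    return len(new_rows)

table = [{"id": 1, "price": 101}, {"id": 2, "price": 102}]
print(asyncio.run(incremental_load(table)))  # 5
print(asyncio.run(incremental_load(table)))  # 0
```

This only works if upstream ids increase and are never reused; a row that shows up late with a lower id would be skipped.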
  17. Multi-stage transforms

        one = await step_one()
        two = await step_two(one)
        three, four = await gather(
            step_three(two),
            step_four(two),
        )
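A runnable version of the same fan-out, with hypothetical placeholder steps: `step_three` and `step_four` both depend only on `step_two`, so `asyncio.gather()` runs them concurrently and returns their results in argument order.

```python
import asyncio

# Hypothetical stages; each sleep stands in for I/O such as an API call.
async def step_one() -> list[int]:
    await asyncio.sleep(0.01)
    return [3, 1, 2]

async def step_two(rows: list[int]) -> list[int]:
    await asyncio.sleep(0.01)
    return sorted(rows)

async def step_three(rows: list[int]) -> int:
    await asyncio.sleep(0.01)
    return sum(rows)

async def step_four(rows: list[int]) -> int:
    await asyncio.sleep(0.01)
    return max(rows)

async def pipeline() -> tuple[int, int]:
    one = await step_one()
    two = await step_two(one)  # sequential: needs step_one's output
    # step_three and step_four are independent, so run them concurrently.
    three, four = await asyncio.gather(step_three(two), step_four(two))
    return three, four

print(asyncio.run(pipeline()))  # (6, 3)
```

If either concurrent step raises, `gather()` propagates the first exception by default; passing `return_exceptions=True` collects exceptions as results instead.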
  18. Why GraphQL?

        query {
          items {
            name, image, value
            requiredFor {
              quantity,
              quest { title, image, text }
            }
          }
        }
  19. Model types

        @gql.django.type(models.Item)
        class Item:
            id: int
            name: auto
            image: auto
            required_for: list[QuestRequired]
            reward_for: list[QuestReward]
  20.

        intents = discord.Intents.default()
        intents.message_content = True
        client = discord.Client(intents=intents)

        class BotConfig(AppConfig):
            def ready(self):
                create_task(client.start(token))
  21. Chat bot

        @client.event
        async def on_message(message):
            if message.author == client.user:
                return
            if message.content == "!count":
                n = await Thing.objects.acount()
                await message.channel.send(
                    f"There are {n} things")
  22. Notifications

        async def scrape_things():
            # Do the ETL
            ...
            channel = client.get_channel(id)
            await channel.send("Batch complete")
  23. Email

        msg = EmailMessage()
        msg["From"] = "etl@server"
        msg["To"] = "[email protected]"
        msg["Subject"] = "Batch complete"
        msg.set_content(log)
        await aiosmtplib.send(msg)
  24. Let's get weirder: SSH

        async def handle_ssh(proc):
            proc.stdout.write('Welcome!\n> ')
            cmd = await proc.stdin.readline()
            if cmd.strip() == 'count':  # readline() keeps the trailing newline
                n = await Thing.objects.acount()
                proc.stdout.write(
                    f"There are {n} things\n")

        def ready():
            create_task(
                asyncssh.listen(process_factory=handle_ssh))
  25. Ingest sharding
    - Too much data to load?
    - if hash(url) % 2 == server_id
    - Adjust your aggregations
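One caveat with the built-in `hash(url)`: since Python 3.3, `str` hashes are salted per process unless `PYTHONHASHSEED` is fixed, so two servers can disagree about which one owns a URL. A sketch of a stable alternative derives the shard from a SHA-256 digest instead; `NUM_SERVERS`, `shard_for()`, and `owns()` are hypothetical names, and `server_id` is assumed to be configured per server.

```python
import hashlib

NUM_SERVERS = 2  # hypothetical: total number of ingest servers

def shard_for(url: str, num_servers: int = NUM_SERVERS) -> int:
    # SHA-256 yields the same digest in every process, unlike the
    # salted built-in hash() for str.
    digest = hashlib.sha256(url.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_servers

def owns(url: str, server_id: int) -> bool:
    return shard_for(url) == server_id

urls = [f"https://example.com/item/{i}" for i in range(10)]
mine = [u for u in urls if owns(u, server_id=0)]
print(len(mine), "of", len(urls), "URLs belong to server 0")
```

Each server then loads only its share, so any ingest-time aggregate covers only that shard, which is one reason the aggregations need adjusting.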
  26. CPU bound?
    - Does the library drop the GIL?
    - sync_to_async(…, thread_sensitive=False)
    - ProcessPoolExecutor or aiomultiprocess
    - (PEP 703 in the future?)
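When the heavy lifting happens in code that releases the GIL, a worker thread is enough. This sketch uses the standard library's `asyncio.to_thread()` (Python 3.9+) as a stand-in for asgiref's `sync_to_async(..., thread_sensitive=False)`; CPython's `hashlib` serves as the example library because it releases the GIL while hashing large buffers. `checksum()` and `checksum_batches()` are hypothetical helpers.

```python
import asyncio
import hashlib

def checksum(blob: bytes) -> str:
    # Runs in a worker thread; hashlib releases the GIL for large inputs,
    # so several of these calls can make progress in parallel.
    return hashlib.sha256(blob).hexdigest()

async def checksum_batches(batches: list[bytes]) -> list[str]:
    # Each call is handed to the default thread pool, so the event loop
    # stays free to serve requests while the hashing runs.
    return await asyncio.gather(*(asyncio.to_thread(checksum, b) for b in batches))

batches = [bytes([i]) * 1_000_000 for i in range(4)]
digests = asyncio.run(checksum_batches(batches))
print(len(digests))
```

For pure-Python work that holds the GIL, threads will not add parallelism; that is where a `ProcessPoolExecutor` (for example via `loop.run_in_executor()`) or aiomultiprocess comes in.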
  27. In review
    - ETL systems are useful for massaging data
    - Async Django is great for building ETLs
    - GraphQL is an excellent way to query
    - There are many cool async libraries
    - Our tools can grow as our needs grow