Using Functional Programming for efficient Data...

Using Functional Programming for efficient Data Processing and Analysis

A PyCon workshop on Functional Programming
Video: https://www.youtube.com/watch?v=9kDUTJahXBM
Code: https://github.com/reubano/pycon17-tute

Reuben Cummings

May 17, 2017

  Using Functional Programming for efficient Data Processing and Analysis PyCon

    — Portland, Oregon — May 17, 2017 by Reuben Cummings @reubano
  • Managing Director, Nerevu Development • Founder of Arusha Coders

    • Author of several popular Python packages Who am I?
  organization room presenter 1 matt 3 james 6 reuben You

    can't afford to have security be an optional or "nice-to-have"... structured unstructured
  Storage flat binary type,day tutorial,wed talk,fri poster,sun keynote,fri 00103e0 b0e6

    04... 00105f0 e4e7 03... 0010600 0be8 04... 00105b0 c4e4 02... 00106e0 b0e9 04...
  Rectangle (imperative) class Rectangle(object): def __init__(self, length, width): self.length =

    length self.width = width @property def area(self): return self.length * self.width def grow(self, amount): self.length *= amount
  Rectangle (imperative) >>> r = Rectangle(2, 3) >>> r.length 2

    >>> r.area 6 >>> r.grow(2) >>> r.length 4 >>> r.area 12
  Infinite Squares (imperative) >>> from itertools import count >>> >>>

    squares = ( ... Rectangle(x, x) for x in count(1)) >>> squares <generator object <genexpr> at 0x11233ca40> >>> next(squares) <__main__.Rectangle at 0x1123a8400>
  Infinite Squares (imperative) >>> sum_area(squares) KeyboardInterrupt Traceback (most recent call

    last) <ipython-input-196-6a83df34d1b4> in <module>() ----> 1 sum_area(squares) <ipython-input-193-3d117e0b93c3> in sum_area(rects) 3 4 for r in rects: ----> 5 area += r.area
  Rectangle (functional) def make_rect(length, width): return (length, width) def grow_rect(rect,

    amount): return (rect[0] * amount, rect[1]) def get_length (rect): return rect[0] def get_area (rect): return rect[0] * rect[1]
  >>> grow_rect(r, 2) (4, 3) >>> get_length(r) 2 >>> get_area(r)

    6 Rectangle (functional) >>> r = make_rect(2, 3) >>> get_length(r) 2 >>> get_area(r) 6
  Infinite Squares (functional) >>> from itertools import islice >>> >>>

    squares = ( ... make_rect(x, x) for x in count(1)) >>> >>> area = accumulate_area(squares) >>> next(islice(area, 6, 7)) 140 >>> next(area) 204
  Infinite Squares (functional) >>> from itertools import accumulate >>> >>>

    squares = ( ... make_rect(x, x) for x in count(1)) >>> >>> area = accumulate(map(get_area, squares)) >>> next(islice(area, 6, 7)) 140 >>> next(area) 204
  Exercise #1 (Problem) $z = \sqrt{x^2 + y^2}$ ratio

    = function1(x, y, factor) hyp = function2(rectangle)
  Exercise #1 (Problem) $z = \sqrt{x^2 + y^2}$ x

    y z $x \cdot \text{factor}$ y h ratio = function1(x, y, factor) hyp = function2(rectangle) >>> get_ratio(1, 2, 2) 0.7905694150420948
  Exercise #1 (Solution) from math import sqrt, pow def get_hyp(rect):

    sum_s = sum(pow(r, 2) for r in rect) return sqrt(sum_s) def get_ratio(length, width, factor=1): rect = make_rect(length, width) big_rect = grow_rect(rect, factor) return get_hyp(rect) / get_hyp(big_rect)
  Exercise #1 (Solution) >>> get_ratio(1, 2, 2) 0.7905694150420948 >>> get_ratio(1,

    2, 3) 0.6201736729460423 >>> get_ratio(3, 4, 2) 0.6933752452815365 >>> get_ratio(3, 4, 3) 0.5076730825668095
  csv data >>> from csv import DictReader >>> from io

    import StringIO >>> >>> csv_str = 'Type,Day\ntutorial,wed\ntalk,fri' >>> csv_str += '\nposter,sun' >>> f = StringIO(csv_str) >>> data = DictReader(f) >>> dict(next(data)) {'Day': 'wed', 'Type': 'tutorial'}
  JSON data >>> from urllib.request import urlopen >>> from ijson

    import items >>> >>> json_url = 'https://api.github.com/users' >>> f = urlopen(json_url) >>> data = items(f, 'item') >>> next(data) {'avatar_url': 'https://avatars3.githubuserco…', 'events_url': 'https://api.github.com/users/…', 'followers_url': 'https://api.github.com/use…', 'following_url': 'https://api.github.com/use…',
  xls(x) data >>> from urllib.request import urlretrieve >>> from xlrd

    import open_workbook >>> >>> xl_url = 'https://github.com/reubano/meza' >>> xl_url += '/blob/master/data/test/test.xlsx' >>> xl_url += '?raw=true' >>> xl_path = urlretrieve(xl_url)[0] >>> book = open_workbook(xl_path) >>> sheet = book.sheet_by_index(0) >>> header = sheet.row_values(0)
  xls(x) data >>> nrows = range(1, sheet.nrows) >>> rows =

    (sheet.row_values(x) for x in nrows) >>> data = ( ... dict(zip(header, row)) for row in rows) >>> >>> next(data) {' ': ' ', 'Some Date': 30075.0, 'Some Value': 234.0, 'Sparse Data': 'Iñtërnâtiônàližætiøn', 'Unicode Test': 'Ādam'}
  grouping data >>> import itertools as it >>> from operator

    import itemgetter >>> >>> records = [ ... {'item': 'a', 'amount': 200}, ... {'item': 'b', 'amount': 200}, ... {'item': 'c', 'amount': 400}] >>> >>> keyfunc = itemgetter('amount') >>> _sorted = sorted(records, key=keyfunc) >>> groups = it.groupby(_sorted, keyfunc)
  grouping data >>> data = ((key, list(g)) for key, g

    in groups) >>> next(data) (200, [{'amount': 200, 'item': 'a'}, {'amount': 200, 'item': 'b'}])
  aggregating data >>> key = 'amount' >>> value = sum(r.get(key,

    0) for r in records) >>> {**records[0], key: value} {'a': 'item', 'amount': 800}
  csv files >>> from csv import DictWriter >>> >>> records

    = [ ... {'item': 'a', 'amount': 200}, ... {'item': 'b', 'amount': 400}] >>> >>> header = list(records[0].keys()) >>> with open('output.csv', 'w') as f: ... w = DictWriter(f, header) ... w.writeheader() ... w.writerows(records)
  csv data >>> from meza.io import read >>> >>> records

    = read('output.csv') >>> next(records) {'amount': '200', 'item': 'a'}
  JSON data >>> from meza.io import read_json >>> >>> f

    = urlopen(json_url) >>> records = read_json(f, path='item') >>> next(records) {'avatar_url': 'https://avatars3.githubuserco…', 'events_url': 'https://api.github.com/users/…', 'followers_url': 'https://api.github.com/use…', 'following_url': 'https://api.github.com/use…', … }
  xlsx data >>> from meza.io import read_xls >>> >>> records

    = read_xls(xl_path) >>> next(records) {'Some Date': '1982-05-04', 'Some Value': '234.0', 'Sparse Data': 'Iñtërnâtiônàližætiøn', 'Unicode Test': 'Ādam'}
  aggregation >>> from meza.process import aggregate >>> >>> records =

    [ ... {'a': 'item', 'amount': 200}, ... {'a': 'item', 'amount': 300}, ... {'a': 'item', 'amount': 400}] ... >>> aggregate(records, 'amount', sum) {'a': 'item', 'amount': 900}
  merging >>> from meza.process import merge >>> >>> records =

    [ ... {'a': 200}, {'b': 300}, {'c': 400}] >>> >>> merge(records) {'a': 200, 'b': 300, 'c': 400}
  grouping >>> from meza.process import group >>> >>> records =

    [ ... {'item': 'a', 'amount': 200}, ... {'item': 'a', 'amount': 200}, ... {'item': 'b', 'amount': 400}] >>> >>> groups = group(records, 'item') >>> next(groups)
  normalization >>> from meza.process import normalize >>> >>> records =

    [ ... { ... 'color': 'blue', 'setosa': 5, ... 'versi': 6 ... }, { ... 'color': 'red', 'setosa': 3, ... 'versi': 5 ... }]
  normalization >>> kwargs = { ... 'data': 'length', 'column':'species', ...

    'rows': ['setosa', 'versi']} >>> >>> data = normalize(records, **kwargs) >>> next(data) {'color': 'blue', 'length': 5, 'species': 'setosa'}
  normalization before after color setosa versi blue 5 6 red

    3 5 color length species blue 5 setosa blue 6 versi red 3 setosa red 5 versi
  csv files >>> from meza import convert as cv >>>

    from meza.io import write >>> >>> records = [ ... {'item': 'a', 'amount': 200}, ... {'item': 'b', 'amount': 400}] >>> >>> csv = cv.records2csv(records) >>> write('output.csv', csv)
  Exercise #2 (Problem) • create a list of dicts with

    keys "factor", "length", "width", and "ratio" (for factors 1 - 20)
  Exercise #2 (Problem) records = [ { 'factor': 1, 'length':

    2, 'width': 2, 'ratio': 1.0 }, { 'factor': 2, 'length': 2, 'width': 2, 'ratio': 0.6324… }, { 'factor': 3, 'length': 2, 'width': 2, 'ratio': 0.4472…} ]
  Exercise #2 (Problem) • create a list of dicts with

    keys "factor", "length", "width", and "ratio" (for factors 1 - 20) • group the records by quartiles of the "ratio" value, and aggregate each group by the median "ratio"
  Exercise #2 (Problem) • create a list of dicts with

    keys "factor", "length", "width", and "ratio" (for factors 1 - 20) • group the records by quartiles of the "ratio" value, and aggregate each group by the median "ratio" • write the records out to a csv file (1 row per group)
  Exercise #2 (Solution) >>> length = width = 2 >>>

    records = [ ... { ... 'length': length, ... 'width': width, ... 'factor': f, ... 'ratio': get_ratio(length, width, f) ... } ... ... for f in range(1, 21)]
  Exercise #2 (Solution) >>> from statistics import median >>> from

    meza import process as pr >>> >>> def aggregator(group):
  41. Exercise #2 (Solution) >>> from meza import convert as cv

    >>> from meza.io import write >>> >>> results = [ ... {'key': k, 'median': g} ... for k, g in groups] >>> >>> csv = cv.records2csv(results) >>> write('results.csv', csv)
  42. Exercise #2 (Solution) $ csvlook results.csv | key | median

    | | --- | ------ | | 0 | 0.108… | | 1 | 0.343… | | 2 | 0.632… | | 4 | 1.000… |
  43. Python Events Calendar >>> from riko.collections import SyncPipe >>> >>>

    url = 'www.python.org/events/python-events/' >>> _xpath = '/html/body/div/div[3]/div/section' >>> xpath = '{}/div/div/ul/li'.format(_xpath) >>> xconf = {'url': url, 'xpath': xpath} >>> kwargs = {'emit': False, 'token_key': None} >>> epath = 'h3.a.content' >>> lpath = 'p.span.content' >>> rrule = [{'field': 'h3'}, {'field': 'p'}]
  44. Python Events Calendar >>> flow = ( ... SyncPipe('xpathfetchpage', conf=xconf)

    ... .subelement( ... conf={'path': epath}, ... assign='event', **kwargs) ... .subelement( ... conf={'path': lpath}, ... assign='location', **kwargs) ... .rename(conf={'rule': rrule}))
  45. Python Events Calendar >>> stream = flow.output >>> next(stream) {'event':

    'PyDataBCN 2017', 'location': 'Barcelona, Spain'} >>> next(stream) {'event': 'PyConWEB 2017', 'location': 'Munich, Germany'}
  46. Python Events Calendar >>> dpath = 'p.time.datetime' >>> frule =

    { ... 'field': 'date', 'op': 'after', ... 'value':'2017-06-01'} >>> >>> flow = ( ... SyncPipe('xpathfetchpage', conf=xconf) ... .subelement( ... conf={'path': epath}, ... assign='event', **kwargs)
  47. Python Events Calendar ... .subelement( ... conf={'path': lpath}, ... assign='location',

    **kwargs) ... .subelement( ... conf={'path': dpath}, ... assign='date', **kwargs) ... .rename(conf={'rule': rrule}) ... .filter(conf={'rule': frule}))
  48. Python Events Calendar >>> stream = flow.output >>> next(stream) {'date':

    '2017-06-06T00:00:00+00:00', 'event': 'PyCon Taiwan 2017', 'location': 'Academia Sinica, 128 Academia Road, Section 2, Nankang, Taipei 11529, Taiwan'}
  49. Python Events Calendar >>> from meza.process import merge >>> from

    riko.collections import SyncCollection >>> >>> _type = 'xpathfetchpage' >>> source = {'url': url, 'type': _type} >>> xpath2 = '{}/div/ul/li'.format(_xpath) >>> sources = [ ... merge([source, {'xpath': xpath}]), ... merge([source, {'xpath': xpath2}])]
  50. Python Events Calendar >>> sc = SyncCollection(sources, parallel=True) >>> flow

    = (sc.pipe() ... .subelement( ... conf={'path': epath}, ... assign='event', **kwargs) ... .rename(conf={'rule': rrule})) >>> >>> stream = flow.list >>> stream[0] {'event': 'PyDataBCN 2017'}
  51. Exercise #3 (Problem) • fetch the Python jobs rss feed

    • tokenize the "summary" field by newlines ("\n") • use "subelement" to extract the location (the first "token") • filter for jobs located in the U.S.
  52. Exercise #3 (Solution) >>> from riko.collections import SyncPipe >>> >>>

    url = 'https://www.python.org/jobs/feed/rss' >>> fetch_conf = {'url': url} >>> tconf = {'delimiter': '\n'} >>> rule = { ... 'field': 'location', 'op': 'contains'} >>> vals = ['usa', 'united states'] >>> frule = [ ... merge([rule, {'value': v}]) ... for v in vals]
  53. Exercise #3 (Solution) >>> fconf = {'rule': frule, 'combine': 'or'}

    >>> kwargs = {'emit': False, 'token_key': None} >>> path = 'location.content.0' >>> rrule = [ ... {'field': 'summary'}, ... {'field': 'summary_detail'}, ... {'field': 'author'}, ... {'field': 'links'}]
  54. Exercise #3 (Solution) >>> flow = (SyncPipe('fetch', conf=fetch_conf) ... .tokenizer(

    ... conf=tconf, field='summary', ... assign='location') ... .subelement( ... conf={'path': path}, ... assign='location', **kwargs) ... .filter(conf=fconf) ... .rename(conf={'rule': rrule}))
  55. Exercise #3 (Solution) >>> stream = flow.list >>> stream[0] {'dc:creator':

    None, 'id': 'https://python.org/jobs/2570/', 'link': 'https://python.org/jobs/2570/', 'location': 'College Park,MD,USA', 'title': 'Python Developer - MarketSmart', 'title_detail': 'Python Developer - MarketSmart', 'y:published': None, 'y:title': 'Python Developer - MarketSmart'}
  56. Exercise #3 (Solution) >>> from meza import convert as cv

    >>> from meza.fntools import dfilter >>> from meza.io import write >>> >>> fields = ['link', 'location', 'title'] >>> records = [ ... dfilter( ... item, blacklist=fields, ... inverse=True) ... for item in stream]
  57. Exercise #3 (Solution) >>> json = cv.records2json(records) >>> write('pyjobs.json', json)

    $ head -n7 pyjobs.json [ { "link": "https://python.org/jobs/2570/", "location": "College Park,MD,USA", "title": "Python Developer - MarketSmart" }, {
  58. Infinite Squares (functional) def accumulate_area2(rects, accum=0): it = iter(rects) try:

    area = get_area(next(it)) except StopIteration: return accum += area yield accum yield from accumulate_area2(it, accum)