from collections import deque
from functools import partial, wraps
from heapq import merge
from itertools import chain, count, groupby, islice, repeat, takewhile, tee
from operator import itemgetter
from sys import version_info

from six import binary_type, string_types, text_type
from six.moves import filter, map, zip, zip_longest

from .recipes import flatten, take

__all__ = [
    'adjacent',
    'always_iterable',
    'bucket',
    'chunked',
    'collapse',
    'collate',
    'consumer',
    'distinct_permutations',
    'distribute',
    'divide',
    'first',
    'groupby_transform',
    'ilen',
    'interleave_longest',
    'interleave',
    'intersperse',
    'iterate',
    'one',
    'padded',
    'peekable',
    'side_effect',
    'sliced',
    'sort_together',
    'split_after',
    'split_before',
    'spy',
    'stagger',
    'unique_to_each',
    'windowed',
    'with_iter',
    'zip_offset',
]

_marker = object()


def chunked(iterable, n):
    """Break an iterable into lists of a given length::

        >>> list(chunked([1, 2, 3, 4, 5, 6, 7], 3))
        [[1, 2, 3], [4, 5, 6], [7]]

    If the length of ``iterable`` is not evenly divisible by ``n``, the last
    returned list will be shorter.

    This is useful for splitting up a computation on a large number of keys
    into batches, to be pickled and sent off to worker processes. One example
    is operations on rows in MySQL, which does not implement server-side
    cursors properly and would otherwise load the entire dataset into RAM on
    the client. (A usage sketch follows ``first()`` below.)

    """
    # Two-argument iter() keeps calling take(n, ...) until it returns the
    # sentinel value [], i.e. until the source iterator is exhausted.
    return iter(partial(take, n, iter(iterable)), [])


def first(iterable, default=_marker):
    """Return the first item of an iterable, ``default`` if there is none.

        >>> first([0, 1, 2, 3])
        0
        >>> first([], 'some default')
        'some default'

    If ``default`` is not provided and there are no items in the iterable,
    raise ``ValueError``.

    ``first()`` is useful when you have a generator of expensive-to-retrieve
    values and want any arbitrary one. It is marginally shorter than
    ``next(iter(...), default)``.

    """
    try:
        return next(iter(iterable))
    except StopIteration:
        # I'm on the edge about raising ValueError instead of StopIteration.
        # At the moment, ValueError wins, because the caller could conceivably
        # want to do something different with flow control when I raise the
        # exception, and it's weird to explicitly catch StopIteration.
        if default is _marker:
            raise ValueError('first() was called on an empty iterable, and no '
                             'default value was provided.')
        return default
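
# Editor's note: a minimal sketch (not part of the library API) of the
# batching use case described in the ``chunked()`` docstring above.
# ``_process_batch`` is a hypothetical stand-in for whatever per-batch work
# would be shipped off to a worker process.  Defined as a private function so
# nothing runs at import time.
def _chunked_batching_sketch():
    def _process_batch(batch):
        # Pretend this is expensive work done in a worker process.
        return [key * 2 for key in batch]

    keys = range(10)
    # Each batch holds at most 3 keys; the last batch may be shorter.
    return [_process_batch(batch) for batch in chunked(keys, 3)]
    # -> [[0, 2, 4], [6, 8, 10], [12, 14, 16], [18]]
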
class peekable(object):
    """Wrap an iterator to allow lookahead and prepending elements.

    Call ``peek()`` on the result to get the value that will next pop out of
    ``next()``, without advancing the iterator:

        >>> p = peekable(['a', 'b'])
        >>> p.peek()
        'a'
        >>> next(p)
        'a'

    Pass ``peek()`` a default value to return that instead of raising
    ``StopIteration`` when the iterator is exhausted.

        >>> p = peekable([])
        >>> p.peek('hi')
        'hi'

    peekables also offer a ``prepend()`` method which will insert items
    before the remaining part of the underlying source iterator.

        >>> p = peekable([1, 2, 3])
        >>> p.prepend(10, 11, 12)
        >>> next(p)
        10
        >>> p.peek()
        11
        >>> list(p)
        [11, 12, 1, 2, 3]

    Prepended items are treated by other peekable methods exactly as if they
    had come from the source iterator.

    You may index the peekable to look ahead by more than one item.
    The values up to the index you specified will be cached.
    Index 0 is the item that will be returned by ``next()``, index 1 is the
    item after that, and so on:

        >>> p = peekable(['a', 'b', 'c', 'd'])
        >>> p[0]
        'a'
        >>> p[1]
        'b'
        >>> next(p)
        'a'
        >>> p.prepend('x')
        >>> p[1]
        'b'
        >>> next(p)
        'x'
        >>> next(p)
        'b'

    Negative indexes are supported, but be aware that they will cache the
    remaining items in the source iterator, which may require significant
    storage.

    To test whether there are more items in the iterator, examine the
    peekable's truth value. If it is truthy, there are more items (which may
    have been prepended or obtained from the source iterator).

        >>> assert peekable([1])
        >>> p = peekable([])
        >>> assert not p
        >>> p.prepend(1)
        >>> assert p

    """
    def __init__(self, iterable):
        self._it = iter(iterable)
        self._cache = deque()

    def __iter__(self):
        return self

    def __bool__(self):
        try:
            self.peek()
        except StopIteration:
            return False
        return True

    def __nonzero__(self):
        # For Python 2 compatibility
        return self.__bool__()

    def peek(self, default=_marker):
        """Return the item that will be next returned from ``next()``.

        Return ``default`` if there are no items left. If ``default`` is not
        provided, raise ``StopIteration``.

        """
        if not self._cache:
            try:
                self._cache.append(next(self._it))
            except StopIteration:
                if default is _marker:
                    raise
                return default
        return self._cache[0]

    def prepend(self, *items):
        """Stack up items to be the next ones returned from ``next()`` or
        ``self.peek()``. The items will be returned in first in, first out
        order::

            >>> p = peekable([1, 2, 3])
            >>> p.prepend(10, 11, 12)
            >>> next(p)
            10
            >>> list(p)
            [11, 12, 1, 2, 3]

        It is possible, by prepending items, to "resurrect" a peekable that
        previously raised ``StopIteration``.

            >>> p = peekable([])
            >>> next(p)
            Traceback (most recent call last):
              ...
            StopIteration
            >>> p.prepend(1)
            >>> next(p)
            1
            >>> next(p)
            Traceback (most recent call last):
              ...
            StopIteration

        """
        self._cache.extendleft(reversed(items))

    def __next__(self):
        if self._cache:
            return self._cache.popleft()

        return next(self._it)

    def next(self):
        # For Python 2 compatibility
        return self.__next__()

    def _get_slice(self, index):
        start = index.start
        stop = index.stop

        # Work out how much of the source iterator needs to be cached:
        # negative bounds mean all of it; otherwise only up to the larger of
        # ``start`` and ``stop``.
        if (
            ((start is not None) and (start < 0)) or
            ((stop is not None) and (stop < 0))
        ):
            stop = None
        elif (
            (start is not None) and (stop is not None) and (start > stop)
        ):
            stop = start + 1

        cache_len = len(self._cache)
        if stop is None:
            self._cache.extend(self._it)
        elif stop >= cache_len:
            self._cache.extend(islice(self._it, stop - cache_len))

        return list(self._cache)[index]

    def __getitem__(self, index):
        if isinstance(index, slice):
            return self._get_slice(index)

        cache_len = len(self._cache)
        # As with slicing, a negative index requires caching the rest of the
        # source iterator; a non-negative index only needs the items up to
        # and including that position.
        if index < 0:
            self._cache.extend(self._it)
        elif index >= cache_len:
            self._cache.extend(islice(self._it, index + 1 - cache_len))

        return self._cache[index]
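
# Editor's note: an illustrative sketch (not part of the library) of the
# lookahead pattern the ``peekable`` docstring describes -- inspecting the
# next item with ``peek()`` before deciding whether to consume it with
# ``next()``.  The function name and behavior are hypothetical.
def _peekable_lookahead_sketch(numbers):
    """Split ``numbers`` into its leading run of negatives and the rest,
    consuming only the leading run.  Purely illustrative.

        _peekable_lookahead_sketch([-3, -1, 4, -5]) -> ([-3, -1], [4, -5])
    """
    p = peekable(numbers)
    negatives = []
    # ``p`` is falsy once exhausted, so peek() below never raises.
    while p and p.peek() < 0:
        negatives.append(next(p))
    return negatives, list(p)
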
StopIteration """ self._cache.extendleft(reversed(items)) def __next__(self): if self._cache: return self._cache.popleft() return next(self._it) def next(self): # For Python 2 compatibility return self.__next__() def _get_slice(self, index): start = index.start stop = index.stop if ( ((start is not None) and (start < 0)) or ((stop is not None) and (stop < 0)) ): stop = None elif ( (start is not None) and (stop is not None) and (start > stop) ): stop = start + 1 cache_len = len(self._cache) if stop is None: self._cache.extend(self._it) elif stop >= cache_len: self._cache.extend(islice(self._it, stop - cache_len)) return list(self._cache)[index] def __getitem__(self, index): if isinstance(index, slice): return self._get_slice(index) cache_len = len(self._cache) if index < 0: def _collate(*iterables, **kwargs): """Helper for ``collate()``, called when the user is using the ``reverse`` or ``key`` keyword arguments on Python versions below 3.5. """ key = kwargs.pop('key', lambda a: a) reverse = kwargs.pop('reverse', False) min_or_max = partial(max if reverse else min, key=lambda a_b: a_b[0]) peekables = [peekable(it) for it in iterables] peekables = [p for p in peekables if p] # Kill empties. while peekables: _, p = min_or_max((key(p.peek()), p) for p in peekables) yield next(p) peekables = [x for x in peekables if x] def collate(*iterables, **kwargs): """Return a sorted merge of the items from each of several already-sorted ``iterables``. >>> list(collate('ACDZ', 'AZ', 'JKL')) ['A', 'A', 'C', 'D', 'J', 'K', 'L', 'Z', 'Z'] Works lazily, keeping only the next value from each iterable in memory. Use ``collate()`` to, for example, perform a n-way mergesort of items that don't fit in memory. :arg key: A function that returns a comparison value for an item. Defaults to the identity function. :arg reverse: If ``reverse=True``, yield results in descending order rather than ascending. ``iterables`` must also yield their elements in descending order. If the elements of the passed-in iterables are out of order, you might get unexpected results. If neither of the keyword arguments are specified, this function delegates to ``heapq.merge()``. """ if not kwargs: return merge(*iterables) return _collate(*iterables, **kwargs) # If using Python version 3.5 or greater, heapq.merge() will be faster than # collate - use that instead. if version_info >= (3, 5, 0): collate = merge def consumer(func): """Decorator that automatically advances a PEP-342-style "reverse iterator" to its first yield point so you don't have to call ``next()`` on it manually. >>> @consumer ... def tally(): ... i = 0 ... while True: ... print('Thing number %s is %s.' % (i, (yield))) ... i += 1 ... >>> t = tally() >>> t.send('red') Thing number 0 is red. >>> t.send('fish') Thing number 1 is fish. Without the decorator, you would have to call ``next(t)`` before ``t.send()`` could be used. """ @wraps(func) def wrapper(*args, **kwargs): gen = func(*args, **kwargs) next(gen) return gen return wrapper def ilen(iterable): """Return the number of items in ``iterable``. >>> ilen(x for x in range(1000000) if x % 3 == 0) 333334 This consumes the iterable, so handle with care. """ d = deque(enumerate(iterable, 1), maxlen=1) return d[0][0] if d else 0 def iterate(func, start): """Return ``start``, ``func(start)``, ``func(func(start))``, ... >>> from itertools import islice >>> list(islice(iterate(lambda x: 2*x, 1), 10)) [1, 2, 4, 8, 16, 32, 64, 128, 256, 512] """ while True: yield start start = func(start) LGTM! :-D