## Collecting Max Items in Python

Lately, I’ve needed a way to collect a running list of the top X values and their associated items in Python. This is useful if you’d like to know such things as:

• Top 100 price gainers in your price series database;
• Top 10 most volatile stocks in your price series database;
• Top 5 longest running batch jobs in your operations arena;
• Any many more…

Here’s the MaxItems code to do the job:

import heapq

class MaxItems(object):
"""
Caches the max X items of whatever you're analyzing and
returns a list containing only those max X values and
associated items.
"""
def __init__(self, size):
self.size = size
self._recs = []

def push(self, value, items):
if len(self._recs) < self.size:
heapq.heappush(self._recs, (value, items))

else:
minitem = heapq.heappushpop(self._recs, (value, items))

def items(self):
return heapq.nlargest(self.size, self._recs)



Example call and results:

pricegains = []
pricegains.append({'symbol':'GOOG', 'gain':234.0})
pricegains.append({'symbol':'YHOO', 'gain':124.0})
pricegains.append({'symbol':'IBM', 'gain':1242.0})
pricegains.append({'symbol':'GE', 'gain':1800.0})
pricegains.append({'symbol':'ENE', 'gain':0.0})
pricegains.append({'symbol':'KREM', 'gain':12.0})
maxitems = MaxItems(3)

for row in pricegains:
maxitems.push(row['gain'], row)

print maxitems.items()

----------------------------------------------------------
Results of call:
(1800.0, {'symbol': 'GE', 'gain': 1800.0})
(1242.0, {'symbol': 'IBM', 'gain': 1242.0})
(234.0, {'symbol': 'GOOG', 'gain': 234.0})


The heapq module works nicely in accomplishing the task. What’s ironic is Python’s heapq module implements the min-heap algorithm which works out nicely and efficiently in determining the maximum values over a list. But, does not work out so efficiently for determining the minimum values over a list. I’ll cover the MinItems class in another post. But, to give you a hint of what does work in collecting the minimum values over a list is one of the alternatives I explored in building the MaxItems class…

Alternative yet Inefficient version of MaxItems:

import bisect

class MaxItems2(object):
"""
Caches the max X items of whatever you're analyzing and
returns a list containing only those max X values and
associated items.
"""
def __init__(self, size):
self.size = size
self._recs = []

def push(self, value, items):
if len(self._recs) < self.size:
bisect.insort(self._recs, (value, items))

elif bisect.bisect(self._recs, (value, items)) > self.size:
bisect.insort(self._recs, (value, items))
minitem = self._recs.pop(0)

def items(self):
return sorted(self._recs, reverse=True)


MaxItems2 takes advantage of the bisect module and while it works great; performance is at a minimum 2x worse on average than using the heapq method.

Test Code:

import random

pricegains = []
maxitems = MaxItems(100)
for x in xrange(500000):
gain = random.uniform(1.0,500.0)
maxitems.push(gain, ('This', 'is', 'Record'))

rows = maxitems.items()


Calling the above code with the wonderful timeit module produced the following results:

• heapq method: Ten iterations finished in 1.90 seconds.
• bisect method: Ten iterations finished in 3.80 seconds.

If you know of a faster way to collect the top x of a group of items…please share.

MT

## Running Variance

Variance – kinda the bread and butter for analysis work on a time series. Doesn’t get much respect though. But, take the square root of the variance and you get the almighty standard deviation. Today, though, let’s give variance its due…

For an intro into variance…check out these posts:

Problem with variance is calculating it in the traditional sense. Its costly to compute across a time series. It can be quite a drag on your simulation engine’s performance. The way to reduce the cost is to calculate the running variance. And that’s when you get into quite a briar patch – loss of precision and overflow issues. See John D. Cook’s post covering the variance briar patch:

And a few more posts by John covering different variance formulas and their outcomes:

John does great work and I learn a lot from his posts. But, I was still having problems finding a variance formula that fit my needs:

• Reduced the precision loss issue as much as possible;
• Allowed an easy way to window the running variance;
• Allowed an easy way to memoize the call.

Thankfully, I found a post by Subluminal Messages covering his very cool Running Standard Deviations formula. The code doesn’t work as is – needs correcting on a few items – but you can get the gist of the formula just fine. The formula uses the power sum of the squared differences of the values versus Welford’s approach of using the sum of the squared differences of the mean. Which makes it a bit easier to memoize. Not sure if its as good in solving the precision loss and overflow issues as Welford’s does….but so far I haven’t found any issues with it.

So, let’s start with the formula for the Power Sum Average (PSA):
$PSA = PSA_{yesterday} + ( ( (x_{today} * x_{today}) - x_{yesterday} ) ) / n)$
Where:

• x = value in your time series
• n = number of values you’ve analyzed so far

You also need the Simple Moving Average, which you can find in one of my previous posts here.

Once you have the PSA and SMA; you can tackle the Running Population Variance (Var):
$Population Var = (PSA_{today} * n - n * SMA_{today} * SMA_{today}) / n$

Now, one problem with all these formulas – they don’t cover how to window the running variance. Windowing the variance gives you the ability to view the 20 period running variance at bar 150. All the formulas I’ve mentioned above only give you the running cumulative variance. Deriving the running windowed variance is just a matter of using the same SMA I’ve posted about before and adjusting the Power Sum Average to the following:
$PSA = PSA_{yesterday} + (((x_{today} * x_{today}) - (x_{yesterday} * x_{yesterday}) / n)$
Where:

• x = value in your time series
• n = the period

[Update] If you want the sample Variance you just need to adjust the Var formula to the following:

$Sample Var = (PSA_{today} * n - n * SMA_{today} * SMA_{today}) / (n - 1)$

Okay, on to the code.

Code for the Power Sum Average:

def powersumavg(bar, series, period, pval=None):
"""
Returns the power sum average based on the blog post from
Subliminal Messages.  Use the power sum average to help derive the running
variance.
sources: http://subluminal.wordpress.com/2008/07/31/running-standard-deviations/

Keyword arguments:
bar     --  current index or location of the value in the series
series  --  list or tuple of data to average
period  -- number of values to include in average
pval    --  previous powersumavg (n - 1) of the series.
"""
if period < 1:
raise ValueError("period must be 1 or greater")

if bar < 0:
bar = 0

if pval == None:
if bar > 0:
raise ValueError("pval of None invalid when bar > 0")

pval = 0.0

newamt = float(series[bar])

if bar < period:
result = pval + (newamt * newamt - pval) / (bar + 1.0)

else:
oldamt = float(series[bar - period])
result = pval + (((newamt * newamt) - (oldamt * oldamt)) / period)

return result


Code for the Running Windowed Variance:

def running_var(bar, series, period, asma, apowsumavg):
"""
Returns the running variance based on a given time period.
sources: http://subluminal.wordpress.com/2008/07/31/running-standard-deviations/

Keyword arguments:
bar     --  current index or location of the value in the series
series  --  list or tuple of data to average
asma    --  current average of the given period
apowsumavg -- current powersumavg of the given period
"""
if period < 1:
raise ValueError("period must be 1 or greater")

if bar <= 0:
return 0.0

if asma == None:
raise ValueError("asma of None invalid when bar > 0")

if apowsumavg == None:
raise ValueError("powsumavg of None invalid when bar > 0")

windowsize = bar + 1.0
if windowsize >= period:
windowsize = period

return (apowsumavg * windowsize - windowsize * asma * asma) / windowsize


Example call and results:

list_of_values = [3, 5, 8, 10, 4, 8, 12, 15, 11, 9]
prev_powersumavg = None
prev_sma = None
prev_sma = None
period = 3
for bar, price in enumerate(list_of_values):
new_sma = running_sma(bar, list_of_values, period, prev_sma)
new_powersumavg = powersumavg(bar,
list_of_values,
period,
prev_powersumavg)
new_var = running_var(bar,
list_of_values,
period,
new_sma,
new_powersumavg)

msg = "SMA=%.4f, PSA=%.4f, Var=%.4f" % (new_sma, new_powersumavg, new_var)
print "bar %i: %s" % (bar, msg)

prev_sma = new_sma
prev_powersumavg = new_powersumavg

----------------------------------------------------------
Results of call:
bar 0: SMA=3.0000, PSA=9.0000, Var=0.0000
bar 1: SMA=4.0000, PSA=17.0000, Var=1.0000
bar 2: SMA=5.3333, PSA=32.6667, Var=4.2222
bar 3: SMA=7.6667, PSA=63.0000, Var=4.2222
bar 4: SMA=7.3333, PSA=60.0000, Var=6.2222
bar 5: SMA=7.3333, PSA=60.0000, Var=6.2222
bar 6: SMA=8.0000, PSA=74.6667, Var=10.6667
bar 7: SMA=11.6667, PSA=144.3333, Var=8.2222
bar 8: SMA=12.6667, PSA=163.3333, Var=2.8889
bar 9: SMA=11.6667, PSA=142.3333, Var=6.2222


Of course, as I said in the beginning of this post, just take the square root of this Running Windowed Variance to obtain the Standard Deviation.

Later Trades,

MT

## Running Sum

We’ve covered Running SMAs and EMAs…let’s dig into Running Sums or often called Running Totals. Formula as follows:
$Sum_{today} = Sum_{yesterday} + (price_{today} - price_{today - period})$
Where $price_{today - period}$ represents the price that is dropping off the slice you are summing. For example:

Take a list of numbers = 20, 40, 60, 80, 100, 120.
The formula for the 3-bar running sum would be:
bar 1: 20
bar 2: 20 + 40 = 60
bar 3: 20 + 40 + 60 = 120

bar 4: 40 + 60 + 80 = 180
Or we can apply our formula from above as $Sum_{today} = 120 + (80 - 20)$

bar 5: 60 + 80 + 100 = 240
Or use formula of $Sum_{today} = 180 + (100 - 40)$

bar 6: 80 + 100 + 120 = 300
Or use formula of $Sum_{today} = 240 + (120 - 60)$

Coding in Python we get:

def running_sum(bar, series, period, pval=None):
"""
Returns the running sum of values in a list of tuple - avoids summing entire series on each call.

Keyword arguments:
bar     --  current index or location of the value in the series
series  --  list or tuple of data to sum
period  -- number of values to include in sum
pval    --  previous sum (n - 1) of the series.
"""
if period < 1:
raise ValueError("period must be 1 or greater")

if bar <= 0:
return series[0]

if bar < period:
return pval + series[bar]

return pval + (series[bar] - series[bar - period])


Example call and results:

list_of_values = [20, 40, 60, 80, 100, 120]
prevsum = list_of_values[0]   #first sum is the first value in the series.
for bar, price in enumerate(list_of_values):
newsum = running_sum(bar, list_of_values, 3, pval=prevsum)
print "bar %i: %i" % (bar, newsum)
prevsum = newsum
----------------------------------------------------------
bar 0: 20
bar 1: 60
bar 2: 120
bar 3: 180
bar 4: 240
bar 5: 300


## Exponential Moving Average (EMA)

Now that we’ve tackled Running Simple Moving Averages (SMA)…let’s move on to Exponential Moving Averages (EMA). You may wonder why we’re not covering Running Exponential Moving Averages? The default formula for EMA is the running method – so we’re already covered.

Check out the posts below to understand the background on Exponential Moving Averages (EMA) and their calculation.

Be careful with using EMAs in your backtesting. Or any of these running type of indicators. Since all of them require a starting value. If that starting value changes – your signals change. Which can happen if you switch price quote providers that have different history requirements. Should not be a big deal but something to be aware of.

Let’s begin. We need to calculate our smoothing factor for the time series. Typical use in technical analysis is:
$\alpha = 2.0 / (periods + 1.0)$

We can use any value between 0 & 1 for the smoothing factor. Closer to one is less smooth and places greater weight on the more recent values. Use a value of 1 and you get the most recent value back. Closer to zero is more smooth and places greater weight on the older values.

Now, the formula for an EMA given our smoothing factor:
$EMA_{today} = EMA_{yesterday} + \alpha(price_{today} - EMA_{yesterday})$

Coding in Python we get:

def ema(bar, series, period, prevma, smoothing=None):
'''Returns the Exponential Moving Average of a series.

Keyword arguments:
bar         -- currrent index or location of the series
series      -- series of values to be averaged
period      -- number of values in the series to average
prevma      -- previous exponential moving average
smoothing   -- smoothing factor to use in the series.
valid values: between 0 & 1.
default: None - which then uses formula = 2.0 / (period + 1.0)
closer to 1 to gives greater weight to recent values - less smooth
closer to 0 gives greater weight to older values -- more smooth
'''
if period < 1:
raise ValueError("period must be 1 or greater")

if smoothing:
if (smoothing < 0) or (smoothing > 1.0):
raise ValueError("smoothing must be between 0 and 1")

else:
smoothing = 2.0 / (period + 1.0)

if bar <= 0:
return series[0]

elif bar < period:
return cumulative_sma(bar, series, prevma)

return prevma + smoothing * (series[bar] - prevma)

def cumulative_sma(bar, series, prevma):
"""
Returns the cumulative or unweighted simple moving average.
Avoids averaging the entire series on each call.

Keyword arguments:
bar     --  current index or location of the value in the series
series  --  list or tuple of data to average
prevma  --  previous average (n - 1) of the series.
"""

if bar <= 0:
return series[0]

else:
return prevma + ((series[bar] - prevma) / (bar + 1.0))



Example call and results using the typical smoothing factor of 2 / (period + 1):

prices = [32.47, 32.70, 32.77, 33.11, 33.25, 33.23, 33.23, 33.0, 33.04, 33.21]
period = 5   #number of bars to average
prevsma = prevema = prices[0]   #1st day nothing to average
for bar, close in enumerate(prices):
currentema = ema(bar, prices, period, prevema, smoothing=None)

#running_sma defined in simple moving average blog post
currentsma = running_sma(bar, prices, period, prevsma)

print "Day %02d Value=%.2f %i-bar SMA=%f and EMA=%f" % (bar + 1, close, period, currentsma, currentema)
prevema = currentema
prevsma = currentsma

----------------------------------------------------------
Results of call:
Day 01 Value=32.47 5-day SMA=32.470000 and EMA=32.470000
Day 02 Value=32.70 5-day SMA=32.585000 and EMA=32.585000
Day 03 Value=32.77 5-day SMA=32.646667 and EMA=32.646667
Day 04 Value=33.11 5-day SMA=32.762500 and EMA=32.762500
Day 05 Value=33.25 5-day SMA=32.860000 and EMA=32.860000
Day 06 Value=33.23 5-day SMA=33.012000 and EMA=32.983333
Day 07 Value=33.23 5-day SMA=33.118000 and EMA=33.065556
Day 08 Value=33.00 5-day SMA=33.164000 and EMA=33.043704
Day 09 Value=33.04 5-day SMA=33.150000 and EMA=33.042469
Day 10 Value=33.21 5-day SMA=33.142000 and EMA=33.098313



## Running Simple Moving Average (SMA)

When building a platform to test trading ideas…one of the big issues to deal with is all the indicators that require a spin through the price series in order to calculate. For example, in order to calculate the 200 day simple moving average (SMA) of closing prices for Google today you would have to loop back 200 – 1 days ago and sum the closing prices and divide by 200.

When you are backtesting an idea you often need to start from day 1 of a stock’s trading history and loop forward to the most current day. In essence, pretending each day is the current day at that point in time. Thus, you are looping back 200 – 1 data points for each day in the series. This isn’t such a big deal with a stock such as Google whose trading history is rather limited (2004). But, take a stock like IBM with a more extensive trading history and your code is going to bog down with each call to the SMA indicator. Throw 20,000 securities into your backtest and the looping adds up.

Therefore, running calculations are the preferred method in order to spin just once through the data points. So, in order to calculate the running simple moving average for closing prices you apply the following formula:
$SMA_{today} = SMA_{yesterday} + ((Price_{today} - Price_{today - n}) /n)$

Where

• n = number of values included in your rolling computational window.

Straight-forward and avoids the loop. Here’s the sample Python code for the Running SMA:

def running_sma(bar, series, period, prevma):
"""
Returns the running simple moving average - avoids sum of series per call.

Keyword arguments:
bar     --  current index or location of the value in the series
series  --  list or tuple of data to average
period  --  number of values to include in average
prevma  --  previous simple moving average (n - 1) of the series
"""
if period < 1:
raise ValueError("period must be 1 or greater")

if bar <= 0:
return series[0]

elif bar < period:
return cumulative_sma(bar, series, prevma)

return prevma + ((series[bar] - series[bar - period]) / float(period))

def cumulative_sma(bar, series, prevma):
"""
Returns the cumulative or unweighted simple moving average.
Avoids sum of series per call.

Keyword arguments:
bar     --  current index or location of the value in the series
series  --  list or tuple of data to average
prevma  --  previous average (n - 1) of the series.
"""

if bar <= 0:
return series[0]

else:
return prevma + ((series[bar] - prevma) / (bar + 1.0))



And the example call and results:

prices = [10, 15, 25, 18, 13, 16]
prevsma = prices[0]   #1st day nothing to average so return itself.
for bar, close in enumerate(prices):
currentsma = running_sma(bar, prices, 3, prevsma)
print "Today's 3-day SMA = %.4f" % currentsma
prevsma = currentsma

------- Results ----------------
Today's 3-day SMA = 10.0000
Today's 3-day SMA = 12.5000
Today's 3-day SMA = 16.6667
Today's 3-day SMA = 19.3333
Today's 3-day SMA = 18.6667
Today's 3-day SMA = 15.6667

Go to top