# This live demo was first used at the Berkeley MSRI workshop on Interactive Parallel Computing
# on January 29th.  It requires IPython/IPython1 and its dependencies.  Information about installing
# these things can be found at this URL:
#
# http://ipython.scipy.org/moin/Parallel_Computing
#
# Once installation is done:
# 
# 1.  Do ipcluster -n 2 to start 2 engines and a controller
# 2.  Type the following commands into an interactive IPython session
# 3.  You can also run the entire script by typing:  irunner MSRIdemo.ipy
#
# please email question/comments to Brian Granger <ellisonbgATgmail.com>
#
# The <demo> tags are meant to be used with IPython's demo capabilities.

# <demo> silent


import numpy
%autoindent off

# <demo> --- stop ---
# Creating a RemoteController instance.

import ipython1.kernel.api as kernel
rc = kernel.RemoteController(('127.0.0.1',10105))

# <demo> --- stop ---
# Get the IDs of the connected engines

print rc.getIDs()

# <demo> --- stop ---
# Running python commands:  blocking mode

rc.executeAll('a=5')                                         # On all engines
rc.executeAll('import math')
rc.execute([0, 1], 'c=a*math.pi; print c')                   # Only on engines [0, 1]

# <demo> --- stop ---
# Running python commands:  non-blocking mode

rc.executeAll('import time')
rc.executeAll('time.sleep(10)',block=False)                           # Submit and return immediately

# <demo> --- stop ---
# Another non-blocking example

# Don't block
rc.execute(0, 'y = sum(x**100 for x in xrange(100000))',block=False)  # Submit this
print "Execute has been called in non-blocking mode."                 # Print this
rc.execute(0, 'print y', block=True)                                  # Then block

# <demo> --- stop ---
# Parallel Magic Commands

# First make everything non-blocking
rc.block = False

# %px foo ~ rc.executeAll('foo')
%px import numpy
%px a = numpy.random.rand(4,4)
%px print numpy.linalg.eigvals(a)

# <demo> --- stop ---
# Get the last result

%result

# <demo> --- stop ---
# More magic

# %pn 0 foo ~ rc.execute(0, 'foo')
%pn 0 print numpy.linspace(0,1.0,10)
%pn 1 print numpy.random.rand(2,2)

# <demo> --- stop ---
# Get the last result

%result

# <demo> --- stop ---
# Auto Magic

# %autopx makes everything parallel
%autopx

max_evals = []

for i in range(100):
    a = numpy.random.rand(10,10)
    a = a + a.transpose()
    evals = numpy.linalg.eigvals(a)
    max_evals.append(evals[0].real)


%autopx

# <demo> --- stop ---
# See what the result was

rc.block=True
%px print "Average max eigenvalue is: ", sum(max_evals)/len(max_evals)

# <demo> --- stop ---
# Moving Python objects around

a = 1.035345345                             # Local a
rc.pushAll(a=a)                             # Send a as a to all engines
%px print a
local_a = rc.pullAll('a')                   # Pull a on all engines back
print local_a                               # Print local copy

# <demo> --- stop ---
# Push and pull can also take engine ids

rc.push(0, c=numpy.random.rand(2,2))        # Push 2x2 random matrix to 0 as c
# Take transpose
%pn 0 c = c.transpose()
local_c = rc.pull(0, 'c')                   # Pull back
print local_c                               # Print local copy

# <demo> --- stop ---
# Dictionary Interface

rc['d'] = 3564567                           # Like rc.pushAll(d=3564567)
%px print d
print rc['d']                               # Like rc.pullAll('d')

# <demo> --- stop ---
# Array/Dictionary Interface

# Remote controller objects have both a dictionary
# and array interface that can be combined:
for i in range(2):
    rc[i]['id'] = i


%px print id

# <demo> --- stop ---
# Scatter/Gather

rc.scatterAll('a', range(8))                # Partition and distribute [0,...,8]
%px print a 
local_a = rc.gatherAll('a')                 # Gather partitions back
print local_a

# <demo> --- stop ---
# Parallelized map

#  Apply x**10 element by element
%time r1 = map(lambda x: x**10, range(100000))
# Do it in parallel (scatter/map/gather)
%time r2 = rc.mapAll('lambda x: x**10', range(100000))    
print sum(r1), sum(r2)

# <demo> --- stop ---
# Let's compute pi

from __future__ import division

def pi(n):
    """Compute pi using n terms of Wallis' product.

    pi(n) = 2 \prod_{i=1}^{n}\frac{4i^2}{4i^2-1}."""
    num = 1
    den = 1
    for i in xrange(1,n+1):
	tmp = 4*i*i
	num *= tmp
	den *= tmp-1
    return 2.0*(num/den)


%time pi(5000)

# <demo> --- stop ---
# <demo> silent

rc.block = False

%autopx
    
def wpi_nd(range_spec):

    n1,n2 = range_spec
    
    num = 1
    den = 1
    for i in xrange(n1,n2):
	tmp = 4*i*i
	num *= tmp
	den *= tmp-1
    return num,den


%autopx

# Local functions

def wpi_nd(range_spec):

    n1,n2 = range_spec
    
    num = 1
    den = 1
    for i in xrange(n1,n2):
	tmp = 4*i*i
	num *= tmp
	den *= tmp-1
    return num,den


def part_range(n1,n2,nchunks):
    """Partition a range specification in nchunks"""
    size,rem = divmod(n2-n1,nchunks)
    sizes = [size]*nchunks
    while rem > 0:
        for i in range(nchunks):
            sizes[i] += 1
            rem -= 1
            if rem == 0:
                break
    ranges = []
    start=n1
    for size in sizes:
        ranges.append((start,start+size))
        start += size
    return ranges


rc.block = True

# <demo> --- stop ---
# Parallelized calculation of pi

def par_pi(n, rc):
    """Compute pi using n terms of Wallis' product.

    pi(n) = 2 \prod_{i=1}^{n}\frac{4i^2}{4i^2-1}.

    Parallel version."""
    
    # How many partitions should we use
    num_engines = len(rc.getIDs())
    # Compute partial product on each engine
    m = rc.mapAll('wpi_nd',part_range(1,n+1,num_engines))
    # Compute total product of num, den
    num,den = reduce(lambda x,y:(x[0]*y[0],x[1]*y[1]),m)
    return 2.0*(num/den)


%time pi(5000)
%time par_pi(5000, rc)

# <demo> --- stop ---
# A larger calculation

%time pi(10000)
%time par_pi(10000, rc)

# <demo> --- stop ---
# Parallel list comprehensions

rc.scatterAll('a', range(100000))               # Scatter
%px result = [2*x for x in a]
local_result = rc.gatherAll('a')                # Gather to assemble result
print local_result[0:100]                              

