
Tuesday, December 15, 2015

animating images in ipython notebook

Put this:
%matplotlib notebook

instead of
%matplotlib inline

  • The old %matplotlib inline activates the inline backend, which renders figures in the notebook as static PNGs.
  • The new %matplotlib notebook activates the nbagg backend, added in matplotlib 1.4, which includes a JavaScript interface for interacting with inline figures in the notebook. This only works in IPython 3.x; for older IPython versions, use %matplotlib nbagg.
  • nbagg differs from mpld3 in that it requires a live connection to a Python kernel. This allows it to be more feature-complete than mpld3, but any static rendering of the notebook will not include the interactivity. (A minimal animation example is sketched below.)
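Assuming matplotlib 1.4+ and IPython 3.x, the sketch below animates a shifting sine wave in a notebook cell; nbagg updates the figure live. The example itself is just an illustration, not taken from any particular project:

%matplotlib notebook
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import animation

fig, ax = plt.subplots()
x = np.linspace(0, 2 * np.pi, 200)
line, = ax.plot(x, np.sin(x))

def update(frame):
    # shift the sine wave a little on each frame
    line.set_ydata(np.sin(x + 0.1 * frame))
    return line,

# keep a reference to the animation object so it is not garbage-collected
anim = animation.FuncAnimation(fig, update, frames=100, interval=50)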

Monday, October 19, 2015

Spark/IPython on CentOS 6.5

1. Introduction
IPython is a remarkable tool for demonstrating and sharing your data analysis with collaborators. Since Spark provides Python APIs, it is really nice to use IPython to share data analysis, with explanations for each step.

However, if you are on CentOS, you are in for trouble. CentOS relies on Python 2.6 for yum, while most modern Python tools, including recent versions of IPython, depend on Python 2.7.6 or later. The solution is to maintain two versions of Python on each system: yum typically runs under root privileges, so keep Python 2.6 for root and Python 2.7 for other users.

In addition, PySpark (up to Spark 1.1.0) depends on NumPy, which in turn depends on the LAPACK and BLAS libraries. If you don't install them, you will hit seemingly random errors when executing your Python program across the cluster.

2. Procedures
2.1. Install Python 2.7, with pip2.7, on each node.

sudo yum groupinstall "Development tools"
sudo yum install zlib-devel bzip2-devel openssl-devel ncurses-devel sqlite-devel
wget --no-check-certificate https://www.python.org/ftp/python/2.7.6/Python-2.7.6.tar.xz
tar xf Python-2.7.6.tar.xz
cd Python-2.7.6
./configure --prefix=/usr/local
make 
sudo make altinstall

After running the commands above, the newly installed Python 2.7.6 interpreter will be available as /usr/local/bin/python2.7, and the system Python 2.6.6 will remain available as /usr/bin/python and /usr/bin/python2.6.

Check with:
 ls -ltr /usr/bin/python*

lrwxrwxrwx 1 root root    6 Nov 16  2002 /usr/bin/python2 -> python
-rwxr-xr-x 1 root root 1418 Jul 10  2013 /usr/bin/python2.6-config
-rwxr-xr-x 2 root root 4864 Jul 10  2013 /usr/bin/python2.6
-rwxr-xr-x 2 root root 4864 Jul 10  2013 /usr/bin/python
lrwxrwxrwx 1 root root   16 Oct 24 15:39 /usr/bin/python-config -> python2.6-config

ls -ltr /usr/local/bin/python*
-rwxr-xr-x 1 root root 6214533 Mar 19 22:46 /usr/local/bin/python2.7
-rwxr-xr-x 1 root root    1674 Mar 19 22:46 /usr/local/bin/python2.7-config
 
If things don’t look right, you might need to create a symbolic link in /usr/local/bin
cd /usr/local/bin
ls -ltr python*
WARNING: do not create this link before checking root's $PATH. If it lists /usr/local/bin before /usr/bin, root will pick up python2.7 first, e.g.:
 echo $PATH
/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/root/bin
 
If you add this link, run "which python" both as a regular user and as root. If root's python points to /usr/local/bin/python, remove the link you just added and find another approach.
sudo ln -s /usr/local/bin/python2.7 /usr/local/bin/python
Final check:

sudo which python
sudo python --version


which python
python --version
2.2. Install dependencies for IPython/PySpark
Assuming Spark is already installed across the cluster, we will install pip and the NumPy dependencies, followed by a few useful Python data analysis tools.
wget https://bitbucket.org/pypa/setuptools/raw/bootstrap/ez_setup.py
sudo /usr/local/bin/python2.7 ez_setup.py
sudo /usr/local/bin/easy_install-2.7 pip

sudo yum install lapack lapack-devel blas blas-devel libpng-devel freetype*

 sudo /usr/local/bin/pip2.7 install numpy
 sudo /usr/local/bin/pip2.7 install scipy
 sudo /usr/local/bin/pip2.7 install matplotlib
 sudo /usr/local/bin/pip2.7 install pandas
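To confirm that these packages were built against the new interpreter and import cleanly, a quick check along these lines can help; run it with /usr/local/bin/python2.7 (the script name is just an illustration):

# check_stack.py -- hypothetical sanity-check script (not part of the original steps)
import numpy, scipy, matplotlib, pandas
print "numpy", numpy.__version__
print "scipy", scipy.__version__
print "matplotlib", matplotlib.__version__
print "pandas", pandas.__version__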
 
2.3. Finally, install IPython

     sudo /usr/local/bin/pip2.7 install ipython[notebook]
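Once IPython is installed, a quick way to check that NumPy is importable on every worker (the source of the "random errors" mentioned above) is to run something like the following from a pyspark shell or notebook. This is only a sketch, assuming the SparkContext that pyspark creates as sc:

versions = sc.parallelize(range(100), 10) \
             .map(lambda _: __import__('numpy').__version__) \
             .distinct() \
             .collect()
print versions   # expect exactly one NumPy version across all workers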
 

Friday, June 12, 2015

Large Sparse Matrix-Vector dot product in Python with Key-Value Storages

SciPy and NumPy are handy Python modules for numerical operations on a single machine. For large matrices there are plenty of distributed/parallel tools, but stand-alone tools are still convenient in many situations.

I created a random sparse matrix of the same size as the LiveJournal graph in the SNAP data set. Since that network contains 4,847,571 vertices and 68,993,773 edges, I created a 4,847,571 x 4,847,571 matrix with 68,993,773 non-zero elements. I first considered the scipy.sparse module.

from scipy.sparse import dok_matrix
import numpy as np
def matrixGen():
        subscripts = np.random.randint(0,high=4847571-1, size=(68993773,2))
        matrix = dok_matrix((4847571,4847571),dtype=np.int)
        for i in range(0,68993773):
            matrix[subscripts[i,0],subscripts[i,1]] = 1
            if i%10000 == 0:
                print i
        return matrix
However, I hit a wall. In my IPython shell, the function above did not finish on my MacBook Air (8 GB RAM):

In [9]: %time matrix = matrixGen()

It stalled for more than 30 minutes after printing 35700000, so I stopped it; I could not even construct the desired matrix on my system. So I looked for an alternative: store the matrix in a key-value store, as key-value pairs of ("i,j", 1), where "i,j" is the index and 1 is the non-zero value. I first tried LMDB (http://symas.com/mdb/), which is known as one of the fastest key-value stores that can be embedded in an application.

import os
import json
import lmdb
import numpy as np
from scipy.sparse import dok_matrix

class Graph:
    def __init__ (self,name, **kwargs):
        self.name = name
        self.vname = "v"
        self.env = lmdb.open(os.path.join(os.getcwd(), 'db'), map_size = 5000000000,max_dbs=2,sync=False, writemap=True, map_async=True)
        self.db = self.env.open_db(self.name)
        self.vdb = self.env.open_db(self.vname)
   
    def Open(self):
        with self.env.begin(db = self.db, write=True) as txn:
            results = json.loads(txn.get(self.name))
            self.vertices = results['v']
            self.edges = results['e']
   
    def Generator(self, vertices, edges):
        " Generator(self,vertices, edges) generates a graph in a sparse matrix form"
        self.vertices = vertices
        self.edges = edges
      
        with self.env.begin(db = self.db, write=True) as txn:
            subscripts = np.random.randint(0,  high=self.vertices-1, size=(self.edges,2))
            for i in range(0,self.edges):
                if i % 10000 == 0:
                    self.env.sync()
                    # do not call txn.commit() here: the transaction is reused by the
                    # put() below and is committed automatically when the with-block exits
                    print 100.0*i/68993773.0, "%"
                txn.put(self.name+":"+str(subscripts[i,0])+","+str(subscripts[i,1]),"1")
            self.env.sync()
            txn.put(self.name, json.dumps({"v":self.vertices, "e":self.edges}))
            self.env.sync()

This time, of course, I had no trouble generating the matrix. It took around 10-15 minutes on my MacBook Air (8 GB RAM); I didn't record the exact time.
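The calling code is not shown in the post; a hypothetical driver for the Graph class above would look something like this:

g = Graph("lj")                   # "lj" is an illustrative database name
g.Generator(4847571, 68993773)    # LiveJournal-sized: 4,847,571 vertices, 68,993,773 edges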

After that, I tried dotProduct.

def dotProduct(self,vector,batchSize=2000):
        with self.env.begin(db=self.db) as txn:
            with self.env.begin(db=self.vdb, write=True) as vtxn:
                cursor = txn.cursor(db = self.db)
                vcursor = vtxn.cursor(db=self.vdb)
                vcursor.first()
                cursor.set_range(self.name+":"+"0,0")
                rowID = -1;
                count = 0
                rowMatrix=dok_matrix((batchSize,self.vertices), dtype= np.int)
                for k, v in cursor:
                    subscripts = k.split(":")[1].split(",")
                    if rowID != int(subscripts[0]): # entered into a new row.
                        rowID = int(subscripts[0])
                        count += 1
                        if rowID %batchSize == 0:
                            product = rowMatrix.tocsr().dot(vector)
                            print product.shape
                            for i in range(0,len(product)):
                                vcursor.put(self.vname+":"+str(rowID+i),product[i,])
                            rowMatrix=dok_matrix((batchSize,self.vertices), dtype= np.int)
                            print 100.0* float(count)/self.vertices, "%"
                    colID = int(subscripts[1])
                    rowMatrix[rowID%batchSize,colID] = int(v)
                product = rowMatrix.tocsr().dot(vector)
                for i in range(0,len(product)):
                    vcursor.put(self.vname+":"+str(rowID+i),product[i,])
                rowMatrix=dok_matrix((batchSize,self.vertices), dtype= np.int)
                print 100.0* float(count)/self.vertices, "%"
           
I expected the dot product to be more efficient done in batches rather than row by row, so for each batch I built up a dok_matrix, which can be constructed incrementally. I then tried three cases: (1) the dot product directly on the dok_matrix; (2) the dot product after converting the dok_matrix to a csr_matrix; and (3) keeping the computation in csr_matrix but storing the result vector in LMDB rather than in Python. In the first two cases, the whole result vector stays in Python.
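As a small-scale illustration of cases (1) and (2) only, the comparison looks like this; the sizes below are toy values of my own choosing, not the LiveJournal-scale numbers:

import time
import numpy as np
from scipy.sparse import dok_matrix

n, nnz = 100000, 1000000
subscripts = np.random.randint(0, high=n, size=(nnz, 2))
m = dok_matrix((n, n), dtype=int)
for i in range(nnz):
    m[subscripts[i, 0], subscripts[i, 1]] = 1

vector = np.ones(n)

t0 = time.time()
p1 = m.dot(vector)          # case (1): multiply the dok_matrix directly
t1 = time.time()
p2 = m.tocsr().dot(vector)  # case (2): convert to csr_matrix first, then multiply
t2 = time.time()

print "dok:", t1 - t0, "s   csr (incl. conversion):", t2 - t1, "s"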

The results were not dismal: I could finish the matrix-vector dot product, where the sparse matrix contains 68,993,773 non-zero terms and the dense vector has dimension 4,847,571. We observe that csr_matrix (red) is more efficient than dok_matrix (blue), and that putting the result vector in LMDB is more efficient still.

When the result dense vector is stored in an np.array, the computation becomes more efficient as the block size increases. However, when the result vector is stored in LMDB, larger block sizes have a flat or even negative impact on performance. I interpret this as the cost of overly large bulk insertions into LMDB (LMDB is B-tree based, which makes it read-optimized rather than write-optimized). That said, with block sizes of 1000 and 2000 the execution times were not very different.

After this, I thought LevelDB might be another option. LevelDB, from Google, is more write-optimized because it uses a log-structured merge tree. The relevant Python code is as follows:


import json
import leveldb
import numpy as np
from scipy.sparse import dok_matrix

class Graph:
    # The original post shows only the methods below; this constructor is a minimal
    # guess that opens a LevelDB database at a local path.
    def __init__(self, path='db_leveldb'):
        self.db = leveldb.LevelDB(path)

    def Open(self, name):
        self.name = name
        graphInfo=json.loads(self.db.Get(name))
        self.vertices = graphInfo['vertices']
        self.edges = graphInfo['edges']
        self.cursor = 0
       
    def Take(self, rows):
        results={}
        end = min(self.cursor+rows, self.vertices)
      
        for row in range(self.cursor,end):
            result = self.db.RangeIter(key_from=self.name+":"+str(row)+",0", key_to=self.name+":"+str(row)+","+str(self.vertices-1))
            results[row] = [(k.split(":")[1].split(","), v) for k,v in result]
        self.cursor = end
        return results
   
    def dotProduct(self,vector,batchSize=2000):
       
        self.cursor=-1
        batch = leveldb.WriteBatch()
        for row in range(0, self.vertices, batchSize):
            results = self.Take(batchSize)
            matrix = dok_matrix((batchSize,self.vertices), dtype=np.int)
            for key in sorted(results.keys()):
                for item in sorted(results[key]):
                    matrix[int(item[0][0])%batchSize, int(item[0][1])] = int(item[1])
            product=matrix.tocsr().dot(vector)
            for pos in range(0,len(product)):
                batch.Put("result"+":"+str(pos+row*batchSize), str(product[pos]))
        self.db.Write(batch)

Since I had already found the benefit of converting the dok_matrix to a csr_matrix for the dot product, I tested only the csr_matrix case with LevelDB. The trends are shown in the following figure.


The trends are similar to the LMDB cases. Note that the best LevelDB time was 13 minutes 51 seconds, while the best LMDB time was 12 minutes 27 seconds.



Tuesday, May 19, 2015

Command line execution of a Java class built in Eclipse

IDEs like Eclipse provide higher productivity. However, if your test program takes command-line arguments, you have to open the run configuration and change the arguments with several mouse clicks every time. At that point, you may prefer to run the compiled Java class directly from the command line.

When you have created a package and classes within it, your source directory structure will look like this:

$ ls -R src
chapter1

src/chapter1:
Binary.java    Chapter1.java    Factors.java    Gambler.java

Your binary directory will likely be structured like this:

$ ls -R bin
chapter1

bin/chapter1:
Binary.class    Chapter1.class    Factors.class    Gambler.class


If your Chapter1 class contains the entry point, the main() method, your Chapter1.java will look like this:

package chapter1;

public class Chapter1 {

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        Factors factors = new Factors();
        factors.run(args);
    }

}

and you have a separate file, Factors.java, for the Factors class:
package chapter1;

public class Factors {
    public void run(String[] args)
    {
        long N = Long.parseLong(args[0]);
        long n= N;
        for (long i = 2; i <= n/i; i++)
        {
            while (n % i == 0)
            {
                n /= i;
                System.out.print(i + " ");
            }
        }
        if (n > 1) System.out.print(n);
        System.out.println();
    }
}

On the command line, you can type
$ java -cp bin chapter1.Chapter1 287994837222311


It will produce:
17 1739347 9739789

Tuesday, March 10, 2015

SparkSQL cassandra example


import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;

import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.sql.cassandra.api.java.JavaCassandraSQLContext;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Date;

import org.apache.commons.lang3.StringUtils;

import com.datastax.spark.connector.japi.CassandraRow;

public class JavaDemo  implements Serializable {
        public static void main(String[] args) {
        // TODO Auto-generated method stub
        SparkConf conf = new SparkConf().setAppName("Simple Application");
        conf.set("spark.cassandra.connection.host", "127.0.0.1");
        String tableName="animaldatapoints";
        String keySpace="ks";
      
        JavaSparkContext sc = new JavaSparkContext(conf);
       
        JavaCassandraSQLContext cSQL = new JavaCassandraSQLContext(sc);
        cSQL.sqlContext().setKeyspace(keySpace);
        String queryStr=args[0]; //"SELECT * FROM animaldatapoints";
        JavaSchemaRDD cJavaSchemaRDD=cSQL.sql(queryStr);
   
    //    System.out.println(cJavaSchemaRDD.count());
        List<Row>  cJavaRows = cJavaSchemaRDD.take(5);
        for (Row row : cJavaRows ) {
            System.out.println(row);
        }
    }
}

If you use Maven, don't forget to add Spark SQL to the dependency list.
 <dependency> <!-- Spark SQL -->
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.2.1</version>
</dependency>

Then I loaded animaldatapoints.csv from:

http://phenome.jax.org/db/q?rtn=docs/downloadcenter

It contains approximately 1.1 million rows.

After loading the data into Cassandra, I tested this on my MacBook Air:
cqlsh:ks> select count(*) from ks.animaldatapoints;

It returns:
OperationTimedOut: errors={}, last_host=127.0.0.1

I tested this on the same system:
$spark-submit --master local[4] --class JavaDemo target/Spark-Cassandra-Connector-Sample-0.0.1-SNAPSHOT-jar-with-dependencies.jar "SELECT COUNT(*) FROM animaldatapoints"

It returns:
15/03/10 10:41:03 INFO DAGScheduler: Job 0 finished: runJob at basicOperators.scala:141, took 8.627972 s
[1113967]




Friday, March 6, 2015

Spark Cassandra Connector Example



After reading these slides, I decided to try it myself.

Without specific reasons, I chose to use Java, following the example from datastax's github:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md

However, it omits many basics, such as which packages to import, and if you decide to use Maven, the relevant pom.xml file is missing as well. So let me show Java example code that actually works.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Date;


import org.apache.commons.lang3.StringUtils;

import com.datastax.spark.connector.japi.CassandraRow;

/* I created a jar with dependencies. Refer to pom.xml */

public class JavaDemo  implements Serializable {
  
    // firstly, we define a bean class for saveToCassandra example.
    public static class Person implements Serializable {
        private Integer id;
        private String name;
        private Date birthDate;

        // Remember to declare no-args constructor
        public Person() { }

        public Person(Integer id, String name, Date birthDate) {
            this.id = id;
            this.name = name;
            this.birthDate = birthDate;
        }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }

        public Date getBirthDate() { return birthDate; }
        public void setBirthDate(Date birthDate) { this.birthDate = birthDate; }

        // other methods, constructors, etc.
    }
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        SparkConf conf = new SparkConf().setAppName("Simple Application");
        conf.set("spark.cassandra.connection.host", "127.0.0.1");
     
        JavaSparkContext sc = new JavaSparkContext(conf);
      
      /* reading data from Cassandra into Spark Rdd */
        JavaRDD<String> cassandraRowsRDD = javaFunctions(sc).cassandraTable("ks", "people")
                .map(new Function<CassandraRow, String>() {
                    @Override
                    public String call(CassandraRow cassandraRow) throws Exception {
                        return cassandraRow.toString();
                    }
                });
        System.out.println("row count:"+ cassandraRowsRDD.count()); // once it is in RDD, we can use RDD operations.
        System.out.println("Data as CassandraRows: \n" + StringUtils.join(cassandraRowsRDD.toArray(), "\n"));
      
        /* select a column */
        JavaRDD<String> idRDD = javaFunctions(sc).cassandraTable("ks", "people")
                .select("id").map(new Function<CassandraRow, String>() {
                    @Override
                    public String call(CassandraRow cassandraRow) throws Exception {
                        return cassandraRow.toString();
                    }
                });
        System.out.println("Data with only 'id' column fetched: \n" + StringUtils.join(idRDD.toArray(), "\n"));
      
        /* where clause */
        JavaRDD<String> nameAnnaRDD = javaFunctions(sc).cassandraTable("ks", "people")
                .where("name=?", "Anna").map(new Function<CassandraRow, String>() {
                    @Override
                    public String call(CassandraRow cassandraRow) throws Exception {
                        return cassandraRow.toString();
                    }
                });
        System.out.println("Data filtered by the where clause (name='Anna'): \n" + StringUtils.join(nameAnnaRDD.toArray(), "\n"));
      
        /* save to cassandra */
        List<Person> people = Arrays.asList(
                new Person(1, "John", new Date()),
                new Person(2, "Troy", new Date()),
                new Person(3, "Andrew", new Date())
        );
        JavaRDD<Person> rdd = sc.parallelize(people);
        javaFunctions(rdd).writerBuilder("ks", "people", mapToRow(Person.class)).saveToCassandra();
      
        /* confirm the row is added */
        System.out.println("row counts:" + javaFunctions(sc).cassandraTable("ks", "people")
                .map(new Function<CassandraRow, String>() {
                    @Override
                    public String call(CassandraRow cassandraRow) throws Exception {
                        return cassandraRow.toString();
                    }
                }).count());
      
    }
}

Then, here is my final pom.xml file. Note the maven-assembly-plugin section: I built a jar with dependencies because I hit an undefined-class error for javaFunctions(sc) when submitting the job. It was a classpath issue; instead of wiring up the relevant classpaths, I just took the convenient option, since my goal was simply to get something working.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>Spark-Cassandra-Connector-Sample</groupId>
  <artifactId>Spark-Cassandra-Connector-Sample</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <build>
    <sourceDirectory>src</sourceDirectory>
    <plugins>
    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <archive>
          <manifest>
            <mainClass>fully.qualified.MainClass</mainClass>
          </manifest>
        </archive>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
        <executions>
    <execution>
      <id>make-assembly</id> <!-- this is used for inheritance merges -->
      <phase>package</phase> <!-- bind to the packaging phase -->
      <goals>
        <goal>single</goal>
      </goals>
    </execution>
  </executions>

    </plugin>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <dependencies>
      <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.10</artifactId>
          <version>1.2.1</version>
      </dependency>
      <dependency>
          <groupId>com.datastax.spark</groupId>
          <artifactId>spark-cassandra-connector_2.10</artifactId>
          <version>1.2.0-alpha3</version>
      </dependency>
      <dependency>
          <groupId>com.datastax.spark</groupId>
          <artifactId>spark-cassandra-connector-java_2.10</artifactId>
          <version>1.2.0-alpha3</version>
      </dependency>
  </dependencies>
</project>

I executed this Spark application on my MacBook Air, with Cassandra and Spark running in local mode.

$SPARK_HOME/bin/spark-submit --class JavaDemo --master local[2] target/Spark-Cassandra-Connector-Sample-0.0.1-SNAPSHOT-jar-with-dependencies.jar 2> log
row count:3
Data as CassandraRows:
CassandraRow{id: 10, birth_date: 1987-12-02 00:00:00-0500, name: Catherine}
CassandraRow{id: 12, birth_date: 1970-10-02 00:00:00-0400, name: Anna}
CassandraRow{id: 11, birth_date: 2004-09-08 00:00:00-0400, name: Isadora}
Data with only 'id' column fetched:
CassandraRow{id: 10}
CassandraRow{id: 12}
CassandraRow{id: 11}
Data filtered by the where clause (name='Anna'):
CassandraRow{id: 12, birth_date: 1970-10-02 00:00:00-0400, name: Anna}
row counts:6

Execution time of each job (a query in this example) is as follows.

$ grep Job log | grep finished
15/03/07 01:10:42 INFO DAGScheduler: Job 0 finished: count at JavaDemo.java:61, took 1.855545 s
15/03/07 01:10:42 INFO DAGScheduler: Job 1 finished: toArray at JavaDemo.java:62, took 0.335054 s
15/03/07 01:10:42 INFO DAGScheduler: Job 2 finished: toArray at JavaDemo.java:70, took 0.252191 s
15/03/07 01:10:43 INFO DAGScheduler: Job 3 finished: toArray at JavaDemo.java:79, took 0.460959 s
15/03/07 01:10:43 INFO DAGScheduler: Stage 4 (runJob at RDDFunctions.scala:29) finished in 0.061 s
15/03/07 01:10:43 INFO DAGScheduler: Job 4 finished: runJob at RDDFunctions.scala:29, took 0.074383 s
15/03/07 01:10:43 INFO DAGScheduler: Job 5 finished: count at JavaDemo.java:94, took 0.262380 s