Search This Blog

Tuesday, March 10, 2015

SparkSQL cassandra example


import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;

import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.sql.cassandra.api.java.JavaCassandraSQLContext;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Date;

import org.apache.commons.lang3.StringUtils;

import com.datastax.spark.connector.japi.CassandraRow;

public class JavaDemo  implements Serializable {
        public static void main(String[] args) {
        // TODO Auto-generated method stub
        SparkConf conf = new SparkConf().setAppName("Simple Application");
        conf.set("spark.cassandra.connection.host", "127.0.0.1");
        String tableName="animaldatapoints";
        String keySpace="ks";
      
        JavaSparkContext sc = new JavaSparkContext(conf);
       
        JavaCassandraSQLContext cSQL = new JavaCassandraSQLContext(sc);
        cSQL.sqlContext().setKeyspace(keySpace);
        String queryStr=args[0]; //"SELECT * FROM animaldatapoints";
        JavaSchemaRDD cJavaSchemaRDD=cSQL.sql(queryStr);
   
    //    System.out.println(cJavaSchemaRDD.count());
        List<Row>  cJavaRows = cJavaSchemaRDD.take(5);
        for (Row row : cJavaRows ) {
            System.out.println(row);
        }
    }
}

If you use Maven, don't forget to add Spark SQL to the dependency list.
 <dependency> <!-- Spark SQL -->
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.2.1</version>
</dependency>

And then, I've loaded animaldatapoints.csv from:

http://phenome.jax.org/db/q?rtn=docs/downloadcenter

It contains approximately 1.1 million rows.

After loading the data into Cassandra, I tested this on my Macbook Air:
cqlsh:ks> select count(*) from ks.animaldatapoints;

It returns:
OperationTimedOut: errors={}, last_host=127.0.0.1

I tested this on the same system:
$spark-submit --master local[4] --class JavaDemo target/Spark-Cassandra-Connector-Sample-0.0.1-SNAPSHOT-jar-with-dependencies.jar "SELECT COUNT(*) FROM animaldatapoints"

It returns:
15/03/10 10:41:03 INFO DAGScheduler: Job 0 finished: runJob at basicOperators.scala:141, took 8.627972 s
[1113967]




No comments:

Post a Comment