import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.sql.cassandra.api.java.JavaCassandraSQLContext;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Date;
import org.apache.commons.lang3.StringUtils;
import com.datastax.spark.connector.japi.CassandraRow;
/**
 * Command-line demo that runs a SQL query through {@link JavaCassandraSQLContext}
 * and prints the first few result rows to standard output.
 *
 * <p>Usage: {@code JavaDemo "<sql-query>"}, for example
 * {@code JavaDemo "SELECT * FROM animaldatapoints"}.
 */
public class JavaDemo implements Serializable {

    /** Application name registered with Spark. */
    private static final String APP_NAME = "Simple Application";

    /** Value used for the {@code spark.cassandra.connection.host} setting. */
    private static final String CASSANDRA_HOST = "127.0.0.1";

    /** Keyspace set on the SQL context before the query is run. */
    private static final String KEYSPACE = "ks";

    /** Maximum number of result rows fetched and printed. */
    private static final int MAX_ROWS = 5;

    /**
     * Runs the query given as the first argument and prints up to
     * {@link #MAX_ROWS} rows, one per line.
     *
     * @param args command-line arguments; {@code args[0]} must be the SQL query text
     */
    public static void main(String[] args) {
        // Validate before starting Spark so a missing argument fails fast with a
        // usage message instead of an ArrayIndexOutOfBoundsException after startup.
        if (args == null || args.length < 1) {
            System.err.println("Usage: JavaDemo <sql-query>");
            System.exit(1);
        }
        String queryStr = args[0];

        SparkConf conf = new SparkConf().setAppName(APP_NAME);
        conf.set("spark.cassandra.connection.host", CASSANDRA_HOST);

        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            JavaCassandraSQLContext cSQL = new JavaCassandraSQLContext(sc);
            cSQL.sqlContext().setKeyspace(KEYSPACE);

            JavaSchemaRDD cJavaSchemaRDD = cSQL.sql(queryStr);

            // take() fetches only the first MAX_ROWS rows rather than the whole result.
            List<Row> cJavaRows = cJavaSchemaRDD.take(MAX_ROWS);
            for (Row row : cJavaRows) {
                System.out.println(row);
            }
        } finally {
            // Always stop the context, even when the query fails.
            sc.stop();
        }
    }
}
If you use Maven, don't forget to add Spark SQL to the dependency list:
<dependency> <!-- Spark SQL -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>1.2.1</version>
</dependency>
Then I loaded animaldatapoints.csv from:
http://phenome.jax.org/db/q?rtn=docs/downloadcenter
It contains approximately 1.1 million rows.
After loading the data into Cassandra, I tested this on my MacBook Air:
cqlsh:ks> select count(*) from ks.animaldatapoints;
It returns:
OperationTimedOut: errors={}, last_host=127.0.0.1
I then ran the same count through Spark on the same system:
$spark-submit --master local[4] --class JavaDemo target/Spark-Cassandra-Connector-Sample-0.0.1-SNAPSHOT-jar-with-dependencies.jar "SELECT COUNT(*) FROM animaldatapoints"
It returns:
15/03/10 10:41:03 INFO DAGScheduler: Job 0 finished: runJob at basicOperators.scala:141, took 8.627972 s
[1113967]