Friday, March 6, 2015

Spark Cassandra Connector Example



After reading these slides, I decided to try the Spark Cassandra Connector myself.

For no particular reason, I chose Java and followed the example in DataStax's GitHub repository:
https://github.com/datastax/spark-cassandra-connector/blob/master/doc/7_java_api.md

However, that example leaves out many basics, such as the packages to import. If you use Maven, the relevant pom.xml is missing as well. So here is Java example code that actually works.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.*;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Date;


import org.apache.commons.lang3.StringUtils;

import com.datastax.spark.connector.japi.CassandraRow;

/* I created a jar with dependencies. Refer to pom.xml */

public class JavaDemo implements Serializable {

    // First, we define a bean class for the saveToCassandra example.
    public static class Person implements Serializable {
        private Integer id;
        private String name;
        private Date birthDate;

        // Remember to declare no-args constructor
        public Person() { }

        public Person(Integer id, String name, Date birthDate) {
            this.id = id;
            this.name = name;
            this.birthDate = birthDate;
        }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }

        public Date getBirthDate() { return birthDate; }
        public void setBirthDate(Date birthDate) { this.birthDate = birthDate; }

        // other methods, constructors, etc.
    }
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Simple Application");
        conf.set("spark.cassandra.connection.host", "127.0.0.1");

        JavaSparkContext sc = new JavaSparkContext(conf);

        /* read data from Cassandra into a Spark RDD */
        JavaRDD<String> cassandraRowsRDD = javaFunctions(sc).cassandraTable("ks", "people")
                .map(new Function<CassandraRow, String>() {
                    @Override
                    public String call(CassandraRow cassandraRow) throws Exception {
                        return cassandraRow.toString();
                    }
                });
        System.out.println("row count:"+ cassandraRowsRDD.count()); // once it is in RDD, we can use RDD operations.
        System.out.println("Data as CassandraRows: \n" + StringUtils.join(cassandraRowsRDD.toArray(), "\n"));
      
        /* select a column */
        JavaRDD<String> idRDD = javaFunctions(sc).cassandraTable("ks", "people")
                .select("id").map(new Function<CassandraRow, String>() {
                    @Override
                    public String call(CassandraRow cassandraRow) throws Exception {
                        return cassandraRow.toString();
                    }
                });
        System.out.println("Data with only 'id' column fetched: \n" + StringUtils.join(idRDD.toArray(), "\n"));
      
        /* where clause */
        JavaRDD<String> nameAnnaRDD = javaFunctions(sc).cassandraTable("ks", "people")
                .where("name=?", "Anna").map(new Function<CassandraRow, String>() {
                    @Override
                    public String call(CassandraRow cassandraRow) throws Exception {
                        return cassandraRow.toString();
                    }
                });
        System.out.println("Data filtered by the where clause (name='Anna'): \n" + StringUtils.join(nameAnnaRDD.toArray(), "\n"));
      
        /* save to cassandra */
        List<Person> people = Arrays.asList(
                new Person(1, "John", new Date()),
                new Person(2, "Troy", new Date()),
                new Person(3, "Andrew", new Date())
        );
        JavaRDD<Person> rdd = sc.parallelize(people);
        javaFunctions(rdd).writerBuilder("ks", "people", mapToRow(Person.class)).saveToCassandra();
      
        /* confirm the row is added */
        System.out.println("row counts:" + javaFunctions(sc).cassandraTable("ks", "people")
                .map(new Function<CassandraRow, String>() {
                    @Override
                    public String call(CassandraRow cassandraRow) throws Exception {
                        return cassandraRow.toString();
                    }
                }).count());

        sc.stop(); // shut down the Spark context cleanly
    }
}
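
A note on why Serializable appears throughout this code: Spark ships the function objects passed to map to its executors, so they must be serializable along with everything they reference. The anonymous classes above are created inside the static main method, so they hold no reference to an enclosing object. If the same kind of anonymous class is created inside an instance method and reads a field of the enclosing object, that object is serialized too, and a non-serializable one is a common cause of "Task not serializable". Below is a minimal sketch of that effect using only java.io, without Spark. The names SerializationSketch, Fn and Driver are made up for illustration; Fn merely stands in for Spark's Function interface, which also extends Serializable.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationSketch {

    // Hypothetical stand-in for Spark's Function interface.
    public interface Fn<T, R> extends Serializable {
        R call(T value) throws Exception;
    }

    // Hypothetical driver class that is NOT Serializable.
    public static class Driver {
        private final String prefix;

        public Driver(String prefix) {
            this.prefix = prefix;
        }

        // Created in an instance method and reads this.prefix,
        // so the function holds a reference to the enclosing Driver.
        public Fn<Object, String> capturing() {
            return new Fn<Object, String>() {
                @Override
                public String call(Object value) {
                    return prefix + value;
                }
            };
        }

        // Created in a static method: there is no enclosing instance to capture.
        public static Fn<Object, String> standalone() {
            return new Fn<Object, String>() {
                @Override
                public String call(Object value) {
                    return String.valueOf(value);
                }
            };
        }
    }

    // Returns true if Java serialization accepts the object graph.
    public static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("standalone: " + isSerializable(Driver.standalone()));
        System.out.println("capturing:  " + isSerializable(new Driver("row: ").capturing()));
    }
}
```

Running main prints true for the standalone function and false for the capturing one. Creating the function in a static context, or making the enclosing class Serializable as JavaDemo does, are two usual ways around the problem.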

Next, my final pom.xml. Note the maven-assembly-plugin section: I built a jar with dependencies because I got an undefined-class error for javaFunctions(sc) when I submitted the job. The cause was the classpath, since the connector classes were not visible at run time. Instead of adding the relevant jars to the classpath, I took the convenient option; my only goal was to get something working.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>Spark-Cassandra-Connector-Sample</groupId>
  <artifactId>Spark-Cassandra-Connector-Sample</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <build>
    <sourceDirectory>src</sourceDirectory>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <mainClass>JavaDemo</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id> <!-- this is used for inheritance merges -->
            <phase>package</phase> <!-- bind to the packaging phase -->
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <dependencies>
      <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.10</artifactId>
          <version>1.2.1</version>
      </dependency>
      <dependency>
          <groupId>com.datastax.spark</groupId>
          <artifactId>spark-cassandra-connector_2.10</artifactId>
          <version>1.2.0-alpha3</version>
      </dependency>
      <dependency>
          <groupId>com.datastax.spark</groupId>
          <artifactId>spark-cassandra-connector-java_2.10</artifactId>
          <version>1.2.0-alpha3</version>
      </dependency>
  </dependencies>
</project>
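
The undefined-class error mentioned before the pom.xml means the connector classes were not on the classpath the job actually ran with. One way to confirm that is a small probe executed under the same classpath as the job. The following is a sketch that assumes nothing beyond the JDK; ClasspathCheck and isOnClasspath are hypothetical names, and the class probed is the CassandraJavaUtil class that the example imports statically.

```java
public class ClasspathCheck {

    // Returns true if the named class can be located by this class's loader.
    // The class is looked up without being initialized.
    public static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String connector = "com.datastax.spark.connector.japi.CassandraJavaUtil";
        System.out.println(connector + " available: " + isOnClasspath(connector));
    }
}
```

Run with only the JDK on the classpath, the probe reports the connector as unavailable. Run with the jar-with-dependencies on the classpath, it should report it as available.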

I ran this Spark application on my MacBook Air, with both Cassandra and Spark running in local mode.

$SPARK_HOME/bin/spark-submit --class JavaDemo --master local[2] target/Spark-Cassandra-Connector-Sample-0.0.1-SNAPSHOT-jar-with-dependencies.jar 2> log
row count:3
Data as CassandraRows:
CassandraRow{id: 10, birth_date: 1987-12-02 00:00:00-0500, name: Catherine}
CassandraRow{id: 12, birth_date: 1970-10-02 00:00:00-0400, name: Anna}
CassandraRow{id: 11, birth_date: 2004-09-08 00:00:00-0400, name: Isadora}
Data with only 'id' column fetched:
CassandraRow{id: 10}
CassandraRow{id: 12}
CassandraRow{id: 11}
Data filtered by the where clause (name='Anna'):
CassandraRow{id: 12, birth_date: 1970-10-02 00:00:00-0400, name: Anna}
row counts:6

The execution time of each job (each job corresponds to one query in this example) is as follows.

$ grep Job log | grep finished
15/03/07 01:10:42 INFO DAGScheduler: Job 0 finished: count at JavaDemo.java:61, took 1.855545 s
15/03/07 01:10:42 INFO DAGScheduler: Job 1 finished: toArray at JavaDemo.java:62, took 0.335054 s
15/03/07 01:10:42 INFO DAGScheduler: Job 2 finished: toArray at JavaDemo.java:70, took 0.252191 s
15/03/07 01:10:43 INFO DAGScheduler: Job 3 finished: toArray at JavaDemo.java:79, took 0.460959 s
15/03/07 01:10:43 INFO DAGScheduler: Stage 4 (runJob at RDDFunctions.scala:29) finished in 0.061 s
15/03/07 01:10:43 INFO DAGScheduler: Job 4 finished: runJob at RDDFunctions.scala:29, took 0.074383 s
15/03/07 01:10:43 INFO DAGScheduler: Job 5 finished: count at JavaDemo.java:94, took 0.262380 s
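
When you want the numbers rather than the matching lines, the same filtering can be done in a few lines of Java. This is a minimal sketch that assumes the log lines keep the "Job N finished: ... took X s" shape shown above; JobTimes and parse are hypothetical names. Without an argument it runs on three lines copied from the log above; with an argument it reads that file (for example the log file written by the spark-submit command).

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JobTimes {

    // Matches DAGScheduler "Job N finished" lines; "Stage N ... finished" lines do not match.
    private static final Pattern JOB_FINISHED =
            Pattern.compile("Job (\\d+) finished: .* took ([0-9.]+) s");

    // Maps job id to elapsed seconds, in the order the jobs appear in the log.
    public static Map<Integer, Double> parse(Iterable<String> lines) {
        Map<Integer, Double> seconds = new LinkedHashMap<>();
        for (String line : lines) {
            Matcher m = JOB_FINISHED.matcher(line);
            if (m.find()) {
                seconds.put(Integer.parseInt(m.group(1)), Double.parseDouble(m.group(2)));
            }
        }
        return seconds;
    }

    public static void main(String[] args) throws IOException {
        List<String> lines = args.length > 0
                ? Files.readAllLines(Paths.get(args[0]))
                : Arrays.asList(
                    "15/03/07 01:10:42 INFO DAGScheduler: Job 0 finished: count at JavaDemo.java:61, took 1.855545 s",
                    "15/03/07 01:10:43 INFO DAGScheduler: Stage 4 (runJob at RDDFunctions.scala:29) finished in 0.061 s",
                    "15/03/07 01:10:43 INFO DAGScheduler: Job 4 finished: runJob at RDDFunctions.scala:29, took 0.074383 s");

        double total = 0;
        for (Map.Entry<Integer, Double> e : parse(lines).entrySet()) {
            System.out.printf("job %d: %.6f s%n", e.getKey(), e.getValue());
            total += e.getValue();
        }
        System.out.printf("total: %.6f s%n", total);
    }
}
```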

1 comment:

  1. Hi,

    I am getting an exception, org.apache.spark.SparkException: Task not serializable, when I try the following code.

    JavaRDD<String> cassandraRowsRDD = javaFunctions(jsc).cassandraTable("ogellenalytics", "userdetails").map(new Function<CassandraRow, String>() {
        @Override
        public String call(CassandraRow cassandraRow) throws Exception {
            return cassandraRow.toString();
        }
    });

    Can you please help me to fix it?
