Saving data to two JDBC tables while guaranteeing atomicity in Apache Spark

Published: 2017-08-30, Updated: 2018-03-24

In this article I will cover a solution for grouping data and saving it to two relational database tables while guaranteeing atomicity.

The Problem

We have a JSON file with many students from various schools, and we need to store them in two HSQLDB tables: each student goes into the STUDENT table with a reference to its school in the SCHOOL table.

Data structure

Input format

The students come in a JSON file with many lines in the format represented below:

{"name":"Mark", "schoolName":"Walter Payton College Prep"}
{"name":"Maria", "schoolName":"Northside College Preparatory High School"}

Output

The HSQLDB script for the two tables:

CREATE TABLE STUDENT (
	ID INTEGER GENERATED BY DEFAULT AS IDENTITY(START WITH 1, INCREMENT BY 1) PRIMARY KEY,
	SCHOOL_ID INTEGER,
	NAME VARCHAR(255)
);

CREATE TABLE SCHOOL (
	ID INTEGER GENERATED BY DEFAULT AS IDENTITY(START WITH 1, INCREMENT BY 1) PRIMARY KEY,
	NAME VARCHAR(255)
);

Breaking it into steps

(Figure: the Spark map/reduce DAG of the job)

Step 1 - Parse the JSON, remove duplicated students and group by school

First, let's define the VO that represents our student and school. We'll use the same VO for both because that makes them easier to join later:

import java.io.Serializable;

public class Student implements Serializable {

	public int id;
	public String name;
	public int schoolId;
	public String schoolName;

	public Student() {
	}

	public Student(String name, String schoolName) {
		this.name = name;
		this.schoolName = schoolName;
	}

	// NOTE: equality is intentionally based on schoolName only, so two records
	// compare equal when they belong to the same school
	@Override
	public boolean equals(Object o) {
		if (this == o) return true;
		if (o == null || getClass() != o.getClass()) return false;

		Student student = (Student) o;

		return schoolName.equals(student.schoolName);
	}

	@Override
	public int hashCode() {
		return schoolName.hashCode();
	}
}
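
A quick sanity check on the mapping: Jackson databind picks up the public fields directly, so each input line deserializes without any annotations (checked exception handling omitted for brevity):

final Student s = new ObjectMapper()
	.readValue("{\"name\":\"Mark\", \"schoolName\":\"Walter Payton College Prep\"}", Student.class);
// s.name is "Mark" and s.schoolName is "Walter Payton College Prep";
// s.id and s.schoolId stay at their default 0 until we fill them in later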

Then we can get to work:

// reading the JSON and parsing it into a VO keyed by student name; I'm using Jackson databind to parse the JSON lines
final JavaPairRDD<String, Student> studentsPair = sc.textFile(".../students.json")
.mapToPair(line -> {
	final Student student = new ObjectMapper().readValue(line, Student.class);
	return new Tuple2<>(student.name, student);
})

// removing duplicated students (any one of the duplicates is kept)
.reduceByKey((s1, s2) -> s1)

// re-keying the pair by what identifies a school; in this case, the schoolName
.mapToPair(tuple -> new Tuple2<>(tuple._2.schoolName, tuple._2));

Step 2 - Save the Schools

Step 2.1 - Save the schools and get their IDs back

final JavaPairRDD<String, Student> savedSchools = studentsPair

// Here I'm collapsing all students that share the same schoolName, so that
// just one Student is left for every school.
// The objective is to end up with a list of schools without any duplicates
.reduceByKey((s1, s2) -> s1)

// Then I just loop through every school, save it to the database
// and map its generated ID back; that way I can join it with the students again
// and save them referring to the school ID that was just created
.mapPartitionsToPair(it -> {

	final Set<Tuple2<String, Student>> schools = new HashSet<>();
	try(Connection conn = getConnection()){

		it.forEachRemaining(tuple -> {

			try(PreparedStatement stm = conn.prepareStatement("INSERT INTO SCHOOL (NAME) VALUES (?)", Statement.RETURN_GENERATED_KEYS)){

				stm.setString(1, tuple._2.schoolName);
				stm.executeUpdate();

				try(ResultSet rs = stm.getGeneratedKeys()){
					if(rs.next()){
						tuple._2.schoolId = rs.getInt(1);
					}else {
						throw new IllegalStateException();
					}
				}
				schools.add(new Tuple2<>(tuple._1, tuple._2));

			}catch (Exception e){
				throw new RuntimeException(e);
			}
		});
		conn.commit();
	}
	return schools.iterator();
});

mapPartitionsToPair is used because it reduces the number of connection open/close cycles: one connection per partition instead of one per record.
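
The snippets call a getConnection() helper that the article does not show. A minimal sketch of what it could look like, assuming a plain HSQLDB connection with auto-commit turned off so that the conn.commit() calls above delimit the transaction (the JDBC URL and credentials are placeholders; the real project could hand out pooled connections instead):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public static Connection getConnection() throws SQLException {
	final Connection conn = DriverManager.getConnection("jdbc:hsqldb:hsql://localhost/school", "SA", "");
	// with auto-commit disabled, nothing becomes visible until conn.commit() runs at the end of the partition
	conn.setAutoCommit(false);
	return conn;
}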

Step 2.2 - Joining the schools back to the students with their IDs

This is the simplest step: we have all schools saved with their IDs, so we just need to join the schools with the students by schoolName.

JavaPairRDD<String, Tuple2<Student, Student>> joinedStudents = savedSchools.join(studentsPair);

Note that the Student on the left of the Tuple2 is not really a student but a school: it carries the ID we just saved. The Student on the right is an actual student that has no school ID yet, but is now paired with its school. I stored the school in a Student VO so that I don't have to handle different VO types inside Student.equals().
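
To make that shape concrete, here is how one joined record is read; it is the same access pattern Step 3 uses (take(1) is only for illustration):

for (final Tuple2<String, Tuple2<Student, Student>> tuple : joinedStudents.take(1)) {
	final String schoolName = tuple._1;  // the join key
	final Student school = tuple._2._1;  // the "school" record, carrying the generated schoolId
	final Student student = tuple._2._2; // the actual student, not yet linked to a school ID
}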

Why didn't I use groupBy?

Since we've created a school -> student pair, we need to group the students by school: we want one school mapped to many students so that we can save both tables and create a foreign key between them. We can do that in two ways; the first and more intuitive one is to use groupByKey.

JavaPairRDD<String, Iterable<Student>> studentGroupedPair = studentsPair.groupByKey();

But there is a problem: groupByKey has a memory limitation in Spark. It implies that all students of a given school must fit in a single executor's JVM memory; if your student list is too big you will get an OutOfMemoryError (the Iterable above is the hint). For this reason we used join, which is a less intuitive approach but very effective.
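
For completeness, a sketch of how the groupByKey variant would be consumed; it is not used here precisely because each tuple._2 materializes a whole school's students in executor memory:

// hypothetical groupByKey variant, shown only to illustrate the memory caveat
studentGroupedPair.foreach(tuple -> {
	final String schoolName = tuple._1;
	final Iterable<Student> students = tuple._2; // the whole school held in memory at once
	// here we would save the school, then iterate the students saving each one with the returned ID
});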

Step 3 - Save the students

We just need to do a foreachPartition and save every student to the database. In the example, one connection is used per partition and is only committed when the partition's it.forEachRemaining finishes.

joinedStudents
.foreachPartition(it -> {
	try(Connection conn = getConnection()){

		it.forEachRemaining(tuple -> {

			try(PreparedStatement stm = conn.prepareStatement("INSERT INTO STUDENT (SCHOOL_ID, NAME) VALUES (?, ?)")){

				stm.setInt(1, tuple._2._1.schoolId);
				stm.setString(2, tuple._2._2.name);
				stm.executeUpdate();

			}catch (Exception e){
				throw new RuntimeException(e);
			}

		});
		conn.commit();
	}
});

Making Spark "atomic"

If you have some experience with relational databases, you will probably ask:

What about transactions? What if Spark crashes while saving to the database? Some students will be saved and others won't.

Right. Spark is distributed, so it uses many connections to do the same work in parallel; that's what makes Spark fast. So if we need the whole save to behave atomically, we need a workaround. Here we have two options:

Insert school/student if not exists

If isolation is not a problem, you can just insert the school/student if it does not exist yet.
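
A sketch of what that could look like for the school insert, assuming NAME uniquely identifies a school: HSQLDB's MERGE turns the insert into a no-op when the row already exists, so re-running a partially failed job does not duplicate rows (note that for an existing row you would need a follow-up SELECT to fetch its ID, since getGeneratedKeys() returns nothing when no insert happens):

try(PreparedStatement stm = conn.prepareStatement(
		"MERGE INTO SCHOOL s " +
		"USING (VALUES(CAST(? AS VARCHAR(255)))) AS v(NAME) ON s.NAME = v.NAME " +
		"WHEN NOT MATCHED THEN INSERT (NAME) VALUES (v.NAME)")){
	stm.setString(1, tuple._2.schoolName);
	stm.executeUpdate();
}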

Create a control flag

If isolation is also a problem, you can create a control column, e.g., a STATUS CHAR(1). Insert the records with the pending status 'P' and, when the job ends, run an update like this:

UPDATE STUDENT SET STATUS='C' WHERE STATUS = 'P'

This is a good workaround for many cases.
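
A sketch of that finishing step, run once on the driver after the Spark job completes; readers of the data would then filter on STATUS = 'C'. The extra STATUS column on STUDENT is an assumption, it is not part of the script above:

try(Connection conn = getConnection();
	PreparedStatement stm = conn.prepareStatement("UPDATE STUDENT SET STATUS = 'C' WHERE STATUS = 'P'")){
	// flips every pending row to completed in a single transaction
	stm.executeUpdate();
	conn.commit();
}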

Results

You can see the demo working by typing:

$ git clone https://github.com/mageddo/spring-kafka-spark-samples && \
cd spring-kafka-spark-samples && \
./gradlew spark-save-custom-output:test

There is a groupAndTransport__PerformanceTest unit test that runs the process with a massive quantity of students; below is a benchmark result.

My machine specs

CPU:	Intel(R) Core(TM) i3-2120 @ 3.30GHz, 2 cores, 2 threads
Memory:	16 GiB, 64-bit width, 1333 MHz (0.8 ns)

The benchmark

Initial Students Quantity    Deduplicated Students    Deduplicated Schools    Total time
10 M                         6.3 M                    3.5 M                   9min 24s

Conclusion

In this article I've shown the approach I found to make a scalable map/reduce/group job that saves information to a relational database. I hope it works for you, and if you have a better or different approach to this problem, please let me know.

