In this article I will cover a solution for grouping data and saving it to two tables of a relational database while keeping the writes atomic.
We have a JSON file with many students from several schools, and we need to store them in two HSQLDB tables: each student goes into the STUDENT table with a reference to its row in the SCHOOL table.
The students come in a JSON file with one record per line, in the format shown below:
{"name":"Mark", "schoolName":"Walter Payton College Prep"}
{"name":"Maria", "schoolName":"Northside College Preparatory High School"}
The HSQLDB script for the two tables:
CREATE TABLE STUDENT (
ID INTEGER GENERATED BY DEFAULT AS IDENTITY(START WITH 1, INCREMENT BY 1) PRIMARY KEY,
SCHOOL_ID INTEGER,
NAME VARCHAR(255)
)
CREATE TABLE SCHOOL (
ID INTEGER GENERATED BY DEFAULT AS IDENTITY(START WITH 1, INCREMENT BY 1) PRIMARY KEY,
NAME VARCHAR(255)
)
First, let's define the VO that represents our student and school. We'll use the same VO for both because that makes it easier to join them later:
import java.io.Serializable;
public class Student implements Serializable {
public int id;
public String name;
public int schoolId;
public String schoolName;
public Student() {
}
public Student(String name, String schoolName) {
this.name = name;
this.schoolName = schoolName;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Student student = (Student) o;
return schoolName.equals(student.schoolName);
}
@Override
public int hashCode() {
return schoolName.hashCode();
}
}
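Before wiring this VO into Spark, it can help to see how a single JSON line maps onto it. This is just a quick sanity check, not part of the demo project, and it assumes jackson-databind is on the classpath:
import com.fasterxml.jackson.databind.ObjectMapper;

public class StudentJsonCheck {
    public static void main(String[] args) throws Exception {
        // Jackson binds the JSON properties onto the VO's public fields
        final Student student = new ObjectMapper().readValue(
                "{\"name\":\"Mark\", \"schoolName\":\"Walter Payton College Prep\"}", Student.class);
        System.out.println(student.name + " -> " + student.schoolName); // Mark -> Walter Payton College Prep
    }
}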
Then we can get to work.
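The snippets below assume an existing JavaSparkContext named sc. A minimal local setup could look like this; the application name and the local master URL are assumptions for running the sketch standalone:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// local[*] runs Spark with all local cores; point it to your cluster master in production
final SparkConf conf = new SparkConf()
    .setAppName("save-students-demo")
    .setMaster("local[*]");
final JavaSparkContext sc = new JavaSparkContext(conf);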
// reading the JSON file and parsing each line into a VO; I'm using Jackson databind to parse the JSON lines
final JavaPairRDD<String, Student> studentsPair = sc.textFile(".../students.json")
.mapToPair(line -> {
final Student student = new ObjectMapper().readValue(line, Student.class);
return new Tuple2<>(student.name, student);
})
// removing duplicated students
.reduceByKey((s1, s2) -> s1)
// here I'm re-keying the pair by what identifies a student's school, in this case the schoolName
.mapToPair(tuple -> new Tuple2<>(tuple._2.schoolName, tuple._2));
final JavaPairRDD<String, Student> savedSchools = studentsPair
// Here I'm removing all students that have the same schoolName, that way
// just one Student will be left for every school
// My objective here is to have a list of schools without any duplicates
.reduceByKey((s1, s2) -> s1)
// Then I just loop through every school, save it to the database
// and map its ID back; that way I can join it with the students again and save them referring to the school ID
// that I just created
.mapPartitionsToPair(it -> {
final Set<Tuple2<String, Student>> schools = new HashSet<>();
try(Connection conn = getConnection()){
it.forEachRemaining(tuple -> {
try(PreparedStatement stm = conn.prepareStatement("INSERT INTO SCHOOL (NAME) VALUES (?)", Statement.RETURN_GENERATED_KEYS)){
stm.setString(1, tuple._2.schoolName);
stm.executeUpdate();
try(ResultSet rs = stm.getGeneratedKeys()){
if(rs.next()){
tuple._2.schoolId = rs.getInt(1);
}else {
throw new IllegalStateException();
}
}
schools.add(new Tuple2<>(tuple._1, tuple._2));
}catch (Exception e){
throw new RuntimeException(e);
}
});
conn.commit();
}
return schools.iterator();
});
mapPartitionsToPair is used because it reduces the number of connections that are opened and closed: one per partition instead of one per record.
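The snippets call a getConnection() helper that isn't shown here. A minimal sketch for HSQLDB could look like the one below; the JDBC URL and credentials are assumptions, and the demo project may use a connection pool instead. The important detail is that auto-commit is disabled, so nothing is persisted until conn.commit() runs at the end of the partition:
import java.sql.Connection;
import java.sql.DriverManager;

public class ConnectionFactory {

    public static Connection getConnection() throws Exception {
        // in-memory HSQLDB instance; swap the URL for a file or server database as needed
        final Connection conn = DriverManager.getConnection("jdbc:hsqldb:mem:testdb", "SA", "");
        // commits are issued manually after each partition is processed
        conn.setAutoCommit(false);
        return conn;
    }
}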
This is the simplest step: we have all the schools saved with their IDs, so we just need to join the schools with the students by schoolName:
JavaPairRDD<String, Tuple2<Student, Student>> joinedStudents = savedSchools.join(studentsPair);
Note that the Student on the left side of the Tuple2 is not really a student but a school: it carries the ID we just saved. The Student on the right side is an actual student; it has no school ID yet, but it is now paired with its school. I stored the school in a Student VO so that I don't have to handle different VO types inside Student.equals().
Since we've created a pair of school -> student, we must group the students by their school, because we need one school related to many students in order to fill the two tables and link them through the foreign key. We can do that in two ways; the first and more intuitive one is to use groupByKey:
JavaPairRDD<String, Iterable<Student>> studentGroupedPair = studentsPair.groupByKey();
But there is a problem: groupByKey has a memory limitation in Spark. If the list of students is too big you will run into trouble, because all of a school's students must fit into the memory of a single executor (JVM); if they don't, you will get an OutOfMemoryError, and the Iterable in the signature above is the hint. For this reason we used join, which is the less intuitive but very effective approach.
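For completeness, this is roughly what the groupByKey route could look like. It is only a sketch, not what the demo does, and it shows exactly where the memory pressure comes from: every school's whole Iterable of students is materialized at once.
studentGroupedPair.foreachPartition(it -> {
    try (Connection conn = getConnection()) {
        it.forEachRemaining(tuple -> {
            try {
                final int schoolId;
                // insert the school once and capture its generated ID
                try (PreparedStatement schoolStm = conn.prepareStatement(
                        "INSERT INTO SCHOOL (NAME) VALUES (?)", Statement.RETURN_GENERATED_KEYS)) {
                    schoolStm.setString(1, tuple._1);
                    schoolStm.executeUpdate();
                    try (ResultSet rs = schoolStm.getGeneratedKeys()) {
                        if (!rs.next()) {
                            throw new IllegalStateException();
                        }
                        schoolId = rs.getInt(1);
                    }
                }
                // then insert every student of that school referring to the generated ID
                for (final Student student : tuple._2) {
                    try (PreparedStatement studentStm = conn.prepareStatement(
                            "INSERT INTO STUDENT (SCHOOL_ID, NAME) VALUES (?, ?)")) {
                        studentStm.setInt(1, schoolId);
                        studentStm.setString(2, student.name);
                        studentStm.executeUpdate();
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        conn.commit();
    }
});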
We just need to run a foreachPartition and save every student to the database. In the example a connection pool is used, and the commit only happens when the partition's it.forEachRemaining loop ends:
joinedStudents
.foreachPartition(it -> {
try(Connection conn = getConnection()){
it.forEachRemaining(tuple -> {
try(PreparedStatement stm = conn.prepareStatement("INSERT INTO STUDENT (SCHOOL_ID, NAME) VALUES (?, ?)")){
stm.setInt(1, tuple._2._1.schoolId);
stm.setString(2, tuple._2._2.name);
stm.executeUpdate();
}catch (Exception e){
throw new RuntimeException(e);
}
});
conn.commit();
}
});
If you have some experience with relational databases, you will probably ask: what about transactions? What if Spark crashes while saving to the database? Some students will be saved and others won't.
Right. Spark is distributed, so it uses many connections to do the same job; that is part of why Spark is so fast. So if we need to preserve the database's ACID guarantees we need a workaround. Here we have two options:
If isolation is not a problem, you can simply insert the school/student only if it doesn't exist yet (see the sketch just after these options).
If isolation is also a problem, you can add a control column such as a STATUS char: insert the records with a pending status 'P' and, when the job ends, run an update like this:
UPDATE STUDENT SET STATUS='C' WHERE STATUS = 'P'
This is a good workaround for many cases.
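Going back to the first option, the idempotent insert could look roughly like the sketch below for the SCHOOL table. The exact SQL is an assumption that should be validated against your HSQLDB version, schoolName stands for the value being saved, and adding a UNIQUE constraint on NAME would also protect against two connections inserting the same school concurrently.
// a sketch of an "insert only if absent" statement; the SQL is an assumption
try (Connection conn = getConnection();
     PreparedStatement stm = conn.prepareStatement(
             "INSERT INTO SCHOOL (NAME) "
             + "SELECT CAST(? AS VARCHAR(255)) FROM (VALUES(0)) AS T(X) "
             + "WHERE NOT EXISTS (SELECT 1 FROM SCHOOL WHERE NAME = ?)")) {
    stm.setString(1, schoolName);
    stm.setString(2, schoolName);
    stm.executeUpdate(); // affects zero rows when the school was already saved
    conn.commit();
}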
You can see the demo working by typing:
$ git clone https://github.com/mageddo/spring-kafka-spark-samples && \
  cd spring-kafka-spark-samples && \
  ./gradlew spark-save-custom-output:test
There is a groupAndTransport__PerformanceTest unit test that runs the process with a massive quantity of students; below is a benchmark result.
My machine specs:
CPU: Intel(R) Core(TM) i3-2120 @ 3.30GHz, 2 cores, 2 threads
Memory: 16 GiB, 64-bit, 1333 MHz (0.8 ns)
The benchmark:
| Initial students | Distinct students | Distinct schools | Total time |
|---|---|---|---|
| 10 M | 6.3 M | 3.5 M | 9 min 24 s |
In this article I've shown you the approach I found to map, reduce, and group data at scale and save it to a relational database. I hope it works for you, and if you have a better or different approach to this problem, please let me know.