public static void main(String[] args) throws InterruptedException {
    final JavaStreamingContext sc = getContext(Duration.ofSeconds(2));
    // watch /tmp/ for new text files and print every line, one micro-batch every 2 seconds
    sc.textFileStream("/tmp/")
        .foreachRDD(rdd -> {
            rdd.foreachPartition(it -> it.forEachRemaining(System.out::println));
        });
    sc.start();
    sc.awaitTermination();
}

private static JavaStreamingContext getContext(Duration duration) {
    final SparkConf conf = new SparkConf()
        .setAppName("SparkFileReader")
        .setMaster("local[10]");
    // Durations.milliseconds converts the java.time.Duration into a Spark streaming batch interval
    final JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.milliseconds(duration.toMillis()));
    ssc.sparkContext().setLogLevel("ERROR");
    return ssc;
}
Common errors
OutOfMemoryError when calling sc.parallelize(): the data ends up in too few (and therefore too large) partitions; pass an explicit number of slices to parallelize() or raise spark.default.parallelism, see the sketch below.
java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaPairRDD ...: the executors deserialized a lambda whose class is not on their classpath; this usually means the application jar was not shipped to the cluster (e.g. running against a remote master without submitting the jar or calling SparkConf.setJars()).
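A minimal sketch of those two knobs (spark.default.parallelism and the numSlices argument of parallelize() are real Spark APIs; the app name, master and values here are just examples):

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ParallelismExample {
  public static void main(String[] args) {
    final SparkConf conf = new SparkConf()
        .setAppName("ParallelismExample")
        .setMaster("local[4]")
        // default partition count for shuffles and for parallelize() when no numSlices is given
        .set("spark.default.parallelism", "200");
    final JavaSparkContext sc = new JavaSparkContext(conf);
    final List<Integer> numbers = new ArrayList<>();
    for (int i = 0; i < 100_000; i++) {
      numbers.add(i);
    }
    // or be explicit per RDD: split this collection into 200 partitions
    System.out.println(sc.parallelize(numbers, 200).getNumPartitions());
    sc.stop();
  }
}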
If you are going to use a (standalone) master, you must also start a slave (worker); then you just run the applications, i.e. the jars you submit for execution.
Start a master
./sbin/start-master.sh
Start a slave (worker)
./sbin/start-slave.sh spark://127.0.0.1:7077
Create an application
final SparkConf sparkConf = new SparkConf()
.setAppName("WordCount")
.set("spark.cores.max","1") // seta o maximo de execucoes paralelas
.setMaster("spark://127.0.0.1:7077");
final JavaSparkContext sc = new JavaSparkContext(sparkConf);
sc.setLogLevel("ERROR");
final List<Integer> numbers = new ArrayList<>();
for(int i=0; i < 100_000; i++){
numbers.add(i);
}
sc.parallelize(numbers, 20)
.map(n -> {
return new Tuple2<>(n, 1);
})
.keyBy(n -> {
return n._1;
})
.reduceByKey((Tuple2<Integer, Integer> n1, Tuple2<Integer, Integer> n2) -> {
return new Tuple2<>(n1._1, n1._2 + n2._2);
})
.foreachPartition(it -> {
AtomicInteger counter = new AtomicInteger(0);
it.forEachRemaining(n -> {
counter.incrementAndGet();
});
System.out.printf("counter=%d, thread=%s%n", counter.get(), Thread.currentThread().getId());
});
Generate the jar. Just declare the Spark dependency as provided, since the executor already has that lib; all you need is to build a jar containing your Spark class.
./gradlew clean build && ./bin/spark-submit --class com.mageddo.SimpleTask --master spark://127.0.0.1:7077 --total-executor-cores 1 ./build/libs/spark.jar
or
./gradlew clean build && ./bin/spark-submit --class com.mageddo.SimpleTask ./build/libs/spark.jar
or
./gradlew clean build && ./bin/spark-submit ./build/libs/spark.jar # if your jar already specifies the main class (in its manifest)
When in cluster mode the driver runs on any machine of the cluster; when in client mode the driver runs on the machine you submitted from (you choose with spark-submit's --deploy-mode client|cluster flag).
There are 4 types of Spark UI
Spark Jobs
This is the most complete of them all: you can see the applications, how long each one took, how many partitions the work was split into, which stages (reduce, group, foreach) were executed and how long each stage took. The catch is that you can only see this UI on the instance that is running your SparkContext (when you run it straight from Java, for example).
Spark Master
This is the cluster-mode UI. It has far less information than the Jobs UI: the workers, the application name, how long the application took to run as a whole, and a link to the worker.
Spark Worker
Here you can see the logs of the application you clicked on.
Spark History
This one is as complete as the Jobs UI. It is based on the application's event log files, so you need to enable that feature and start the Spark History Server (see "monitoring with the History Server" below).
export SPARK_DAEMON_MEMORY=100m; ./sbin/start-history-server.sh
More Spark env vars (not necessarily History Server ones)
Running an application with the history (event) log enabled, specifying the log folder:
./bin/spark-submit --class com.mageddo.spark.application.RepeatedNumbersSummary \
--master spark://127.0.0.1:7077 \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=/tmp/spark-logs \
./build/libs/spark.jar
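The same event-log settings can also be set programmatically on the SparkConf instead of via --conf flags; a minimal sketch reusing the /tmp/spark-logs directory from the command above (the app name and the sample RDD are just illustration):

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class EventLogExample {
  public static void main(String[] args) {
    final SparkConf conf = new SparkConf()
        .setAppName("EventLogExample")
        .setMaster("spark://127.0.0.1:7077")
        // write the event log that the History Server reads
        .set("spark.eventLog.enabled", "true")
        .set("spark.eventLog.dir", "/tmp/spark-logs");
    final JavaSparkContext sc = new JavaSparkContext(conf);
    sc.setLogLevel("ERROR");
    sc.parallelize(Arrays.asList(1, 2, 3)).count();
    sc.stop();
  }
}

Then start the History Server pointing at the same directory, as the next commands show.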
./sbin/start-history-server.sh /tmp/spark-logs
or via env
SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=/tmp/spark-logs" ./sbin/start-history-server.sh
or by passing a properties file
./sbin/start-history-server.sh --properties-file conf.properties
conf.properties
spark.history.fs.logDirectory=/tmp/spark-logs
Then just open http://127.0.0.1:18080/
Seeing the usage: ./sbin/start-history-server.sh --conf spark.history.fs.logDirectory=/tmp/spark-logs (an unrecognized option like --conf makes the script print the usage below)
Usage: HistoryServer [options]
Options:
DIR Deprecated; set spark.history.fs.logDirectory directly
--dir DIR (-d DIR) Deprecated; set spark.history.fs.logDirectory directly
--properties-file FILE Path to a custom Spark properties file.
Default is conf/spark-defaults.conf.
Configuration options can be set by setting the corresponding JVM system property.
History Server options are always available; additional options depend on the provider.
History Server options:
spark.history.ui.port Port where server will listen for connections
(default 18080)
spark.history.acls.enable Whether to enable view acls for all applications
(default false)
spark.history.provider Name of history provider class (defaults to
file system-based provider)
spark.history.retainedApplications Max number of application UIs to keep loaded in memory
(default 50)
FsHistoryProvider options:
spark.history.fs.logDirectory Directory where app logs are stored
(default: file:/tmp/spark-events)
spark.history.fs.updateInterval How often to reload log data from storage
(in seconds, default: 10)
./sbin/stop-history-server.sh /tmp/spark-logs
// Scala: paginate a REST endpoint with Spark and union every page batch into one RDD
val sc = getContext
val restPageSize = 10
// total number of REST pages, e.g. taken from GET /page/count
val restPages = 100
// each generated file batch holds 100 records
val fileBatchSize = 100
// divide as Double so ceil() can round up a partial batch
val sparkFiles = math.ceil(restPages * restPageSize / fileBatchSize.toDouble).toInt
var batches: RDD[(Int, String, Double)] = sc.parallelize(List())
// will generate n files
for(fileNumber <- 1 to sparkFiles){
val startRestPage = (fileNumber - 1) * restPageSize + 1
val endRestPage = fileNumber * restPageSize
val rdd = sc.range(startRestPage, endRestPage + 1) // range end is exclusive
.flatMap{ page =>
// download this page's withdrawals, e.g. myRest?page=1
List((fileNumber, "Marjorie", 10.95), (fileNumber, "Mark", 8.98))
}
batches = batches.union(rdd)
}
batches.foreach { case (fileNumber, name, value) =>
println(fileNumber, name, value)
}
println("===============================")
// Spark SQL: get a SQLContext from a SparkSession
final SparkSession session = SparkSession.builder().config(conf).getOrCreate();
final SQLContext sqlContext = session.sqlContext();
sqlContext.sql("SELECT ..."); // run plain SQL

// column expressions: select() is called on a DataFrame (Dataset<Row>, here df), not on the SQLContext
df.select(
    org.apache.spark.sql.functions.count("IND_SAP_STATUS"),
    org.apache.spark.sql.functions.col("IDT_FINANCIAL_TURNOVER").cast(DataTypes.createDecimalType()),
    org.apache.spark.sql.functions.col("DAT_CREATION")
);
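Note that mixing an aggregate like count() with plain columns in one select() only works if you group by those columns; a hedged sketch of the usual groupBy/agg form, assuming a DataFrame df that has the columns above:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;

class AggregationExample {
  // counts rows per creation date; df is assumed to contain the columns used above
  static Dataset<Row> countByCreationDate(Dataset<Row> df) {
    return df.groupBy(col("DAT_CREATION"))
        .agg(count("IND_SAP_STATUS").as("STATUS_COUNT"));
  }
}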
Loading a DataFrame over JDBC:
sqlContext.read().format("jdbc").options(getOptions()).load()
    .select(col("ID").cast(DataTypes.LongType))
    .toJavaRDD()
    .map(row -> new MyVO(row.getLong(0)));
private Map<String, String> getOptions() {
final Map<String, String> m = new HashMap<>();
m.put("user", "root");
m.put("password", "password");
m.put("driver", "com.mysql.Driver");
m.put("url", "jdbc://....");
m.put("dbtable", "(SELECT 1 AS ID FROM DUAL) MYTABLE");
return m;
}
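MyVO above is not shown in these notes; any Serializable value object holding the id would do, e.g. this hypothetical minimal version:

import java.io.Serializable;

// hypothetical value object for the .map() above; objects stored in RDDs
// must be Serializable so Spark can shuffle, cache and collect them
public class MyVO implements Serializable {

  private final long id;

  public MyVO(long id) {
    this.id = id;
  }

  public long getId() {
    return id;
  }
}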
java.sql.SQLException: Numeric Overflow
In some cases I got this numeric overflow; changing the read method and the way the cast is done solved it:
final StringBuilder query = new StringBuilder()
.append("( \n")
.append("SELECT CAST(ID AS NUMBER(20)) AS ID FROM USER \n")
.append(") USER \n");
return session.sqlContext()
.read().format("jdbc").options(getJdbcOptions(query.toString())).load()
.withColumn("ID", col("ID").cast(DataTypes.LongType))
.toJavaRDD()
.map(row -> row.getLong(0));
private Map<String, String> getJdbcOptions(String query) {
final Map<String, String> m = new HashMap<>();
m.put("user", "USER");
m.put("password", "password");
m.put("driver", "com.mysql.jdbc.Driver");
m.put("url", "jdbc:mysql://mysql-server.dev:3306/TEMP");
m.put("dbtable", query);
return m;
}