Spark Commands

Published: 2019-08-04, Updated: 2020-03-23

Links

Pontos importantes

Streaming com SparkSQL

Usando spark streaming

final JavaStreamingContext sc = getContext(Duration.ofSeconds(2));
	sc.textFileStream("/tmp/")
	.foreachRDD(rdd -> {
		rdd.foreachPartition(it -> it.forEachRemaining(System.out::println));
	});
	sc.start();
	sc.awaitTermination();
}

private static JavaStreamingContext getContext(Duration duration) {
	final SparkConf conf = new SparkConf()
		.setAppName("SparkFileReader")
		.setMaster("local[10]");
	final JavaStreamingContext ssc = new JavaStreamingContext(conf, apply(duration.toMillis()));
	ssc.sparkContext().setLogLevel("ERROR");
	return ssc;
}

Processando arquivos de formato complexo como o cnab por exemplo com spark

Links SparkSQL

Anotações

Spark e injeção de dependencia

Rest e spark

Salvar dados do spark no relacional

Mudar a quantidade de partitions

spark.default.parallelism

ForeachPartition

Erro quando voce nao seta o jar e roda a aplication da IDE

java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaPairRDD

Como funciona o master e slave

Se voce for usar um master obrigatoriamente voce precisa subir o slave e entao deve apenas rodar as applications que sao os submits de jars para serem executados

Como submitar applications

Suba um master

./sbin/start-master.sh

Suba um slave

./sbin/start-slave.sh spark://127.0.0.1:7077

Crie uma aplicacao

final SparkConf sparkConf = new SparkConf()
	.setAppName("WordCount")
	.set("spark.cores.max","1") // seta o maximo de execucoes paralelas
	.setMaster("spark://127.0.0.1:7077");

final JavaSparkContext sc = new JavaSparkContext(sparkConf);
sc.setLogLevel("ERROR");

final List<Integer> numbers = new ArrayList<>();
for(int i=0; i < 100_000; i++){
	numbers.add(i);
}

sc.parallelize(numbers, 20)
.map(n -> {
	return new Tuple2<>(n, 1);
})
.keyBy(n -> {
	return n._1;
})
.reduceByKey((Tuple2<Integer, Integer> n1, Tuple2<Integer, Integer> n2) -> {
	return new Tuple2<>(n1._1, n1._2 + n2._2);
})
.foreachPartition(it -> {

	AtomicInteger counter = new AtomicInteger(0);
	it.forEachRemaining(n -> {
		counter.incrementAndGet();
	});
	System.out.printf("counter=%d, thread=%s%n", counter.get(), Thread.currentThread().getId());
})
;

Gere o jar, basta colocar a dependencia do spark como provided pois o executor ja tera essa lib tudo que voce precisa é gerar o jar com a classe do spark

./gradlew clean build && ./bin/spark-submit --class com.mageddo.SimpleTask --master spark://127.0.0.1:7077  --total-executor-cores 1 ./build/libs/spark.jar 

ou

./gradlew clean build && ./bin/spark-submit --class com.mageddo.SimpleTask  ./build/libs/spark.jar 

ou

./gradlew clean build && ./bin/spark-submit ./build/libs/spark.jar # se o seu jar ja tiver o main class especificado

Cluster mode vs client

Quando em clustermode entao ele roda em qualquer maquina, quando em client mode ele roda em qualquer no desde que esteja na sua maquina

Tipos de spark ui

Existem 4 tipos de Spark UI

Spark Jobs

Essa é a mais completa de todas, nela voce consegue ver as applications, quanto tempo levou, em quantas particoes dividiu, qual foram os stages(reduce, group, foreach) executados, quanto tempo cada stage levou. O problema que soh da pra ver esse cara na instancia que estiver rodando o seu SparkContext (quando voce roda via java por exemplo)

Spark Master

Essa é a UI do modo cluster, nela voce tem bem informacoes doque no Jobs UI: os workers, nome da aplicacao e ver quanto tempo ela demorou para rodar como um todo e um link para ir para o worker

Spark Worker

Aqui voce consegue ver os logs da aplicacao que voce clicou

Spark History

Esse é completo que nem o Jobs, ele se baseia nos arquivos de logs da application entao voce precisa ativar essa feature e subir o Spark History Server (veja abaixo em monitorando com o history server)

Setar a HEAP do history server

export SPARK_DAEMON_MEMORY=100m; ./sbin/start-history-server.sh

mais envs do spark (nao necessariamente do history)

Monitorar com o history server

Rodando uma application ativando o history log e especificando a pasta do log

./bin/spark-submit --class com.mageddo.spark.application.RepeatedNumbersSummary \
--master spark://:7077 \
--conf spark.eventLog.enabled=true \
--conf spark.eventLog.dir=/tmp/spark-logs \
./build/libs/spark.jar 

Subindo o history server

./sbin/start-history-server.sh /tmp/spark-logs

ou via env

SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=/tmp/spark-logs" ./sbin/start-history-server.sh

ou passando o properties

./sbin/start-history-server.sh  --properties-file conf.properties

conf.properties

spark.history.fs.logDirectory=/tmp/spark-logs

Depois so acessar http://127.0.0.1:18080/

Vendo o Usage ./sbin/start-history-server.sh --conf spark.history.fs.logDirectory=/tmp/spark-logs

Usage: HistoryServer [options]

Options:
  DIR                         Deprecated; set spark.history.fs.logDirectory directly
  --dir DIR (-d DIR)          Deprecated; set spark.history.fs.logDirectory directly
  --properties-file FILE      Path to a custom Spark properties file.
                              Default is conf/spark-defaults.conf.

Configuration options can be set by setting the corresponding JVM system property.
History Server options are always available; additional options depend on the provider.

History Server options:

  spark.history.ui.port              Port where server will listen for connections
                                     (default 18080)
  spark.history.acls.enable          Whether to enable view acls for all applications
                                     (default false)
  spark.history.provider             Name of history provider class (defaults to
                                     file system-based provider)
  spark.history.retainedApplications Max number of application UIs to keep loaded in memory
                                     (default 50)
FsHistoryProvider options:

  spark.history.fs.logDirectory      Directory where app logs are stored
                                     (default: file:/tmp/spark-events)
  spark.history.fs.updateInterval    How often to reload log data from storage
                                     (in seconds, default: 10)

./sbin/stop-history-server.sh /tmp/spark-logs

Fazer baixando paginas no RESTful e fazer union dos resultados para posterior redução

val sc = getContext

val restPageSize = 10
// count restPages GET /page/count
val restPages = 100

// fileBatchSize is 100 registers
val fileBatchSize = 100
val sparkFiles = ceil(restPages * restPageSize / fileBatchSize).toInt

var batches : RDD[(Int, String, Double)] = sc.parallelize(List())

// will generate n files
for(fileNumber <- 1 to sparkFiles){

	val startRestPage = (fileNumber - 1) * restPageSize + 1
	val endRestPage = fileNumber * restPageSize
	val rdd = sc.range(startRestPage, endRestPage)
	.flatMap{ page =>

		// download page withdraws
		// myRest?page=1

		List((fileNumber, "Marjorie", 10.95), (fileNumber, "Mark", 8.98))
	}
	batches = batches.union(rdd)
}

batches.foreach { case (fileNumber, name, value) =>
	println(fileNumber, name, value)
}
println("===============================")

Pegar o SQL do SparkSession

new SparkSession(new SparkContext(conf)).sqlContext().sql()

Lendo da tabela jdbc com spark sql

Lendo do cassandra para vo

fazendo cast das colunas

sqlContext.select(
	org.apache.spark.sql.functions.count("IND_SAP_STATUS"),
	org.apache.spark.sql.functions.col("IDT_FINANCIAL_TURNOVER").cast(DataTypes.createDecimalType()),
	org.apache.spark.sql.types.
	org.apache.spark.sql.functions.col("DAT_CREATION")
)

SparkSQL fazer select na tabela ja filtrando o que vem dela antes de mandar para o spark

sqlContext.load("jdbc", getOptions())
.select(col("ID").cast(DataTypes.LongType))
.rdd().toJavaRDD()
.map(row -> new MyVO(row.getLong(0)))

private Map<String, String> getOptions() {
	final Map<String, String> m = new HashMap<>();
	m.put("user", "root");
	m.put("password", "password");
	m.put("driver", "com.mysql.Driver");
	m.put("url", "jdbc://....");
	m.put("dbtable", "(SELECT 1 AS ID FROM DUAL) MYTABLE");
	return m;
}

Spark select levando numeric overflow

java.sql.SQLException: Numeric Overflow

Em alguns casos tomei numeric overflow mudando o metodo e a forma de fazer cast resolveu

final StringBuilder query = new StringBuilder()
	.append("( \n")
	.append("SELECT CAST(ID AS NUMBER(20)) AS ID FROM USER \n")
	.append(") USER \n");

return session.sqlContext()
	.read().format("jdbc").options(getJdbcOptions(query.toString())).load()
	.withColumn("ID", col("ID").cast(DataTypes.LongType))
	.toJavaRDD()
	.map(row -> {
		return row.getLong(0);
	})

private Map<String, String> getJdbcOptions(String query) {
	final Map<String, String> m = new HashMap<>();
	m.put("user", "USER");
	m.put("password", "password");
	m.put("driver", "com.mysql.jdbc.Driver");
	m.put("url", "jdbc:mysql://mysql-server.dev:3306/TEMP");
	m.put("dbtable", query);
	return m;
}

Jira Commands Awesome Java

Comments