SparkSession.getOrCreate() failing to return previously configured session, job fails
We are running Spark 2.1.0 in Scala on a cluster. We invoke jobs through a REST API; here is a sample Java command line, generated by the client, that gets invoked:
"/usr/lib/jvm/jre-1.7.0/bin/java" "-cp" "/opt/spark/spark-2.1.0-bin-hadoop2.7/conf/:/opt/spark/spark-2.1.0-bin-hadoop2.7/jars/*" "-xmx3072m" "-dspark.jobnumber=162740893" "-dspark.submit.deploymode=cluster" "-dspark.master=spark://clusterhost.foobar.com:7077" "-dspark.executor.memory=12g" "-djobnumber=162740893" "-dspark.driver.memory=3g" "-dspark.app.name=foobar-loader-162740893" "-dspark.eventlog.dir=/opt/spark/events" "-dspark.jars=/opt/spark/foobar-1.0.jar" "-dspark.eventlog.enabled=false" "-xx:maxpermsize=256m" "org.apache.spark.deploy.worker.driverwrapper" "spark://worker@10.10.10.241:44745" "/opt/spark/foobar-1.0.jar" "com.foobar.loader" "aaa,bob,ccc"
We have a main com.foobar.Loader class (referenced in the command line above) that creates a SparkSession like this:
object Loader {
  def main(args: Array[String]): Unit = {
    val sparkConf = // build configuration from properties files etc.
    val sparkSession = SparkSession.builder.config(sparkConf).getOrCreate()
    val rows: Dataset[Row] = // the rows
    val transformed = WorkerPart6(rows) // implicitly invokes WorkerPart6.apply()
    // ... do something with "transformed"
  }
}
We have about 20 Scala classes (such as WorkerPart6), one for each piece of work that gets invoked later. These are Scala singletons ("object"s), and in the apply() method the first thing they do is get the SparkSession, like this:
object WorkerPart6 {
  def apply(rows: Dataset[Row]) = {
    rows.rdd.map { case row => transform(row) }.toDS
  }

  // ...

  def transform(row: Row) = {
    val sparkSession = SparkSession.builder().getOrCreate() // exception occurs here
    // additional processing on the row
  }
}
This is a simplification of the code, for demonstration purposes.
So the idea is that the job kicks off, the main class creates the configured SparkSession, and any of the workers can getOrCreate the session, pick up the already-configured session, and do their work. This allows the workers not to have to worry about how the session is configured, and keeps all of the classes independent and decoupled.
However, when run in the cluster, it blows up with the following stack:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 113.0 failed 4 times, most recent failure: Lost task 2.3 in stage 113.0 (TID 451, 10.10.10.241, executor 0): org.apache.spark.SparkException: A master URL must be set in your configuration
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:379)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2313)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:868)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:860)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)
    at com.foobar.WorkerPart6$.transform(WorkerPart6.scala:206)
    at com.foobar.WorkerPart6$$anonfun$2.apply(WorkerPart6.scala:44)
    at com.foobar.WorkerPart6$$anonfun$2.apply(WorkerPart6.scala:43)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:748)
The problem appears to be that the worker's SparkSession.builder.getOrCreate() is being called before the main Loader's, and without any configuration, so there is no master URL or any of the other required parameters. The parameter it complains is missing is the "-Dspark.master=spark://clusterhost.foobar.com:7077" from the command line.
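If that reading is right, one shape that should avoid the failing call is to keep getOrCreate out of anything that runs inside the rdd.map closure. A sketch only, not our actual code; transformRow is a hypothetical stand-in for the real per-row logic:

import org.apache.spark.sql.{Dataset, Row, SparkSession}

object WorkerPart6Reworked {
  def apply(rows: Dataset[Row]): Dataset[String] = {
    // getOrCreate here runs on the driver, where the configured session exists.
    val spark = SparkSession.builder().getOrCreate()
    import spark.implicits._
    // The closure passed to map runs in executor tasks and touches no session.
    rows.rdd.map(row => transformRow(row)).toDS()
  }

  // Hypothetical pure per-row transform: serializable and session-free.
  private def transformRow(row: Row): String = row.mkString(",")
}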
We use this pattern everywhere throughout the code, getting the session with getOrCreate whenever needed, in dozens of places, and have run and tested it over hundreds of runs without a problem. Only after recent changes to WorkerPart6 did the problem start to occur.
The curious thing is that WorkerPart6's apply method is being called on a separate thread, and the stack does not include our main Loader class at all. Our guess is that some Spark magic is creating worker threads and somehow initializing them with the classes they need. It looks like Spark figures out which instances, and maybe which anonymous functions, it needs and farms them out to separate threads?
How can we figure out how or why it is initializing these classes? Is this usage pattern simply not a good idea? We do not strictly need to obtain the SparkSession in the apply method; we could make these classes regular classes so that instantiation is independent of the call that does the work, as sketched below.
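Here is a rough sketch of that alternative, with the session injected through the constructor (a hypothetical reworking; the method name run is ours):

import org.apache.spark.sql.{Dataset, Row, SparkSession}

// The session is supplied once, at construction time, by whoever already
// configured it; no method here ever calls getOrCreate itself.
class WorkerPart6(spark: SparkSession) {
  import spark.implicits._

  def run(rows: Dataset[Row]): Dataset[String] =
    rows.map(row => row.mkString(",")) // the closure carries no session state
}

On the driver, the Loader would then construct it with new WorkerPart6(sparkSession) and call run.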
Or, more generally, what are the best practices for sharing a SparkSession across many worker classes without knowing the order in which they'll be created or which threads they'll run on? We thought that was the whole purpose of the getOrCreate method, so we must be misunderstanding it.
Edit: We worked around the problem by creating the session in the worker's apply method with SparkSession.builder().getOrCreate(), and passing that session to the other methods in the class, rather than having each method call SparkSession.builder().getOrCreate() itself. That fixes the error, but leaves me with the questions of 1) why that fixed it, and what the original problem was, 2) how Spark decides which functions run on their own thread, and 3) whether this fix causes the class to run with less concurrency or parallelism, as a sort of adding synchronized blocks everywhere.
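For reference, the workaround has roughly this shape (a sketch under the same simplifications as the code above; transform stands in for the several real methods that now receive the session as a parameter):

import org.apache.spark.sql.{Dataset, Row, SparkSession}

object WorkerPart6 {
  def apply(rows: Dataset[Row]): Dataset[String] = {
    // The one and only getOrCreate call, made up front in apply.
    val spark = SparkSession.builder().getOrCreate()
    transform(spark, rows)
  }

  // Methods that need the session now receive it explicitly instead of
  // each calling SparkSession.builder().getOrCreate() themselves.
  private def transform(spark: SparkSession, rows: Dataset[Row]): Dataset[String] = {
    import spark.implicits._
    rows.map(row => row.mkString(",")) // per-row work; the closure uses no session
  }
}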