multithreading - SparkSession.getOrCreate() failing to return previously configured session, job fails


We are running Spark 2.1.0 with Scala on a cluster. We invoke jobs through a REST API; here is a sample Java command line generated by the client and invoked:

"/usr/lib/jvm/jre-1.7.0/bin/java" "-cp" "/opt/spark/spark-2.1.0-bin-hadoop2.7/conf/:/opt/spark/spark-2.1.0-bin-hadoop2.7/jars/*" "-xmx3072m" "-dspark.jobnumber=162740893" "-dspark.submit.deploymode=cluster" "-dspark.master=spark://clusterhost.foobar.com:7077" "-dspark.executor.memory=12g" "-djobnumber=162740893" "-dspark.driver.memory=3g" "-dspark.app.name=foobar-loader-162740893" "-dspark.eventlog.dir=/opt/spark/events" "-dspark.jars=/opt/spark/foobar-1.0.jar" "-dspark.eventlog.enabled=false" "-xx:maxpermsize=256m" "org.apache.spark.deploy.worker.driverwrapper" "spark://worker@10.10.10.241:44745" "/opt/spark/foobar-1.0.jar" "com.foobar.loader" "aaa,bob,ccc"

We have a main com.foobar.Loader class (referenced in the command line above) that creates the SparkSession like this:

    object Loader {
      def main(args: Array[String]): Unit = {
        val sparkConf = // build configuration from properties files, etc.
        val sparkSession = SparkSession.builder.config(sparkConf).getOrCreate()
        val rows: Dataset[Row] = // load the rows
        val transformed = WorkerPart6(rows) // implicitly invokes WorkerPart6.apply()
        // ... do something with "transformed"
      }
    }

We have about 20 Scala classes (such as WorkerPart6) that each do a piece of the work invoked later. These are Scala singletons ("object"s), and in their apply() method, the first thing they do is get the SparkSession, like this:

    object WorkerPart6 {
      def apply(rows: Dataset[Row]) = {
        rows.rdd.map { case row =>
          transform(row)
        }.toDS
      }
      // ...
      def transform(row: Row) = {
        val sparkSession = SparkSession.builder().getOrCreate() // exception occurs here
        // additional processing on the row
      }
    }

This is a simplification of our code, for demonstration purposes.

The idea is that the job kicks off, the main class creates the configured SparkSession, and any of the workers can "getOrCreate" the session, pick up the already-configured session, and do their work. This lets the workers not worry about how the session is configured and keeps all of these classes independent and decoupled.
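To illustrate what we expected: within a single JVM, the documented behavior of getOrCreate is exactly this pick-up-the-configured-session pattern. A minimal standalone sketch (the local master and names are ours for illustration, not the real job):

    import org.apache.spark.sql.SparkSession

    object GetOrCreateDemo {
      def main(args: Array[String]): Unit = {
        // The first builder call supplies the configuration.
        val configured = SparkSession.builder()
          .master("local[*]") // stands in for our -Dspark.master setting
          .appName("getOrCreate-demo")
          .getOrCreate()

        // A later builder call with no configuration, in the same JVM,
        // returns the already-configured session instead of building a new one.
        val reused = SparkSession.builder().getOrCreate()
        assert(configured eq reused)

        configured.stop()
      }
    }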

However, when we run in the cluster, it blows up with the following stack:

    Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 113.0 failed 4 times, most recent failure: Lost task 2.3 in stage 113.0 (TID 451, 10.10.10.241, executor 0): org.apache.spark.SparkException: A master URL must be set in your configuration
        at org.apache.spark.SparkContext.<init>(SparkContext.scala:379)
        at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2313)
        at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:868)
        at org.apache.spark.sql.SparkSession$Builder$$anonfun$6.apply(SparkSession.scala:860)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:860)
        at com.foobar.WorkerPart6$.transform(WorkerPart6.scala:206)
        at com.foobar.WorkerPart6$$anonfun$2.apply(WorkerPart6.scala:44)
        at com.foobar.WorkerPart6$$anonfun$2.apply(WorkerPart6.scala:43)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:748)

The problem is that the worker's SparkSession.builder.getOrCreate() is being called before the main Loader's, and without any configuration, so there is no master URL or any of the other required parameters. The parameter it's complaining about is the missing "-Dspark.master=spark://clusterhost.foobar.com:7077" from the command line.
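For what it's worth, the Executor$TaskRunner frames in the stack suggest the failing call happens inside a task rather than in our driver code. Annotating our simplified code with that reading (a sketch of our interpretation, not a claim about Spark internals):

    import org.apache.spark.sql.{Dataset, Row, SparkSession}

    object WorkerPart6 {
      def apply(rows: Dataset[Row]) = {
        // apply() itself runs in the driver JVM, which was launched with
        // -Dspark.master=... and already holds the configured session.
        rows.rdd.map { case row =>
          // The body of this closure is shipped to executor JVMs (hence the
          // Executor$TaskRunner frames), whose command lines carry no
          // -Dspark.master, so a builder there finds no configuration.
          transform(row) // the getOrCreate() inside transform is what fails
        }.toDS
      }

      def transform(row: Row) = {
        val sparkSession = SparkSession.builder().getOrCreate() // throws: master URL must be set
        // additional processing on the row
      }
    }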

We use this pattern everywhere throughout the code, getting the session with getOrCreate whenever needed, in dozens of places, and we have run and tested hundreds of runs without a problem. Only after recent changes to WorkerPart6 did this problem start to occur.

The curious thing is that WorkerPart6's apply method is being called in a separate thread, and the stack does not include our main Loader class at all. Our guess is that there is some Spark magic creating worker threads and somehow initializing them with the classes they need. It looks like Spark figures out which instances, and maybe which anonymous functions, it needs and farms them out to separate threads?

How can we figure out how or why Spark is initializing these classes? Is our usage pattern just not a good idea? We don't strictly need to obtain the SparkSession in the apply method; we could make these classes regular classes so that instantiation is independent of the call that does the work.
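For example, a rough sketch of that refactoring (illustrative names, assuming the per-row logic itself doesn't need the session):

    import org.apache.spark.sql.{Dataset, Row, SparkSession}

    // A regular class instead of a singleton: the session is injected by
    // whoever constructs the worker, so the class never calls getOrCreate
    // and its construction order is explicit.
    class WorkerPart6(spark: SparkSession) {
      import spark.implicits._ // encoders are resolved here, on the driver

      def apply(rows: Dataset[Row]): Dataset[String] =
        rows.map(row => WorkerPart6.transform(row)) // no session used per row
    }

    object WorkerPart6 {
      // Pure row-level logic, safe to ship to executors.
      def transform(row: Row): String = row.mkString(",")
    }

Loader would then construct the worker explicitly: val worker = new WorkerPart6(sparkSession); worker(rows).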

Or, more generally, what are the best practices for sharing a SparkSession across many worker classes without knowing the order in which they'll be created or which threads they'll run on? We thought that was the whole purpose of the getOrCreate method, so we must be misunderstanding it.

Edit: We worked around the problem by creating the session once in the worker's apply method with SparkSession.builder().getOrCreate(), and passing that session to the other methods in the class, rather than having each method call SparkSession.builder().getOrCreate() itself. This fixes the error but leaves me with questions: 1) why is this a fix, and what was the original problem, 2) how does Spark decide which functions run on their own thread, and 3) will this fix cause the class to run with less concurrency or parallelism, sort of like adding synchronized blocks everywhere?
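Concretely, the workaround looks roughly like this (simplified; the helper method names are made up):

    import org.apache.spark.sql.{Dataset, Row, SparkSession}

    object WorkerPart6 {
      def apply(rows: Dataset[Row]): Dataset[Row] = {
        // Get the session once, at the top of apply(), and thread it through
        // explicitly instead of letting every method call getOrCreate() itself.
        val spark = SparkSession.builder().getOrCreate()
        enrich(cleanse(rows, spark), spark)
      }

      private def cleanse(rows: Dataset[Row], spark: SparkSession): Dataset[Row] =
        rows // placeholder for the real work that needs the session

      private def enrich(rows: Dataset[Row], spark: SparkSession): Dataset[Row] =
        rows // placeholder for the real work that needs the session
    }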

