
Apache Spark NullPointerException on RDD.count

From: Zaire  Views: 4580

Question

I'm using Spark on AWS EMR and trying to load data from a database into an RDD. The data appears to load, but after a very basic transformation, calling RDD.count() on the newly created RDD (to output the number of records) raises an exception; its stack trace is shown in the console and quoted below.

I first got this error in Zeppelin, but then tried spark-shell (thinking maybe some classpath setup was wrong) and got the same failure. The output is below:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.6-amzn-0
      /_/
         
Using Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_265)
Type in expressions to have them evaluated.
Type :help for more information.

scala> import java.sql.{Connection, DriverManager, ResultSet}
import java.sql.{Connection, DriverManager, ResultSet}

scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._

scala> import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.SQLContext

scala> import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SparkConf, SparkContext}

scala> import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.HiveContext

scala> 

scala> val verticesTable = "..."
verticesTable: String = ...


scala> val config = new SparkConf().setAppName("Read JDBC Data: " + verticesTable)
config: org.apache.spark.SparkConf = [email protected]

scala> config.set("spark.driver.allowMultipleContexts","true")
res0: org.apache.spark.SparkConf = [email protected]

scala> print("Started.......\n")
Started.......

scala> // JDBC connection details

scala> val driver = "com.mysql.jdbc.Driver"
driver: String = com.mysql.jdbc.Driver

scala> val url = "jdbc:mysql://...."
url: String = jdbc:mysql://....

scala> val user = "..."

scala> val pass = "..."

scala> // JDBC Connection and load table in Dataframe

scala> val verticesDf = spark.read.format("jdbc").option("driver", driver).option("url", url).option("dbtable", verticesTable).option("user", user).option("password", pass).load()
Sun Oct 18 12:29:55 UTC 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
verticesDf: org.apache.spark.sql.DataFrame = [id: int, url: string ... 7 more fields]

scala> verticesDf.printSchema
root
 |-- id: integer (nullable = true)
 |-- url: string (nullable = true)
 |-- a1: string (nullable = true)
 |-- batch_id: integer (nullable = true)
 |-- x: double (nullable = true)
 |-- y: double (nullable = true)
 |-- z: double (nullable = true)
 |-- size: double (nullable = true)
 |-- sR: integer (nullable = true)


scala> verticesDf.cache
res4: verticesDf.type = [id: int, url: string ... 7 more fields]

scala> verticesDf.show
COMMENT: table output omitted here; the data appears to be loaded and correct
only showing top 20 rows


scala> val vDf = verticesDf.map(
     |     row=>{
     |         (
     |             (row.getAs[Integer](0)).toLong,
     |                 (
     |                 row.getAs[String](1),
     |                 row.getAs[Integer](3),
     |                 row.getAs[Double](4),
     |                 row.getAs[Double](5),
     |                 row.getAs[Double](6),
     |                 row.getAs[Double](7)
     |                 )
     |             )
     |         
     |     })
vDf: org.apache.spark.sql.Dataset[(Long, (String, Integer, Double, Double, Double, Double))] = [_1: bigint, _2: struct<_1: string, _2: int ... 4 more fields>]

scala> vDf.cache
res8: vDf.type = [_1: bigint, _2: struct<_1: string, _2: int ... 4 more fields>]

scala> vDf.count
20/10/18 12:38:13 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, ip-172-31-14-11.ec2.internal, executor 1): java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.serializefromobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.mapelements_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.deserializetoobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:585)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.next(InMemoryRelation.scala:115)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.next(InMemoryRelation.scala:107)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:222)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1164)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1155)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1090)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1155)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:881)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

[Stage 2:>                                                          (0 + 1) / 1]20/10/18 12:38:13 ERROR TaskSetManager: Task 0 in stage 2.0 failed 4 times; aborting job
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 5, ip-172-31-14-11.ec2.internal, executor 1): java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.serializefromobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.mapelements_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.deserializetoobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:585)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.next(InMemoryRelation.scala:115)
    at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.next(InMemoryRelation.scala:107)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:222)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1164)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1155)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1090)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1155)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:881)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2043)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2031)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2030)
  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2030)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
  at scala.Option.foreach(Option.scala:257)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:967)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2264)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2213)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2202)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.checkNoFailures(AdaptiveExecutor.scala:146)
  at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.doRun(AdaptiveExecutor.scala:88)
  at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.tryRunningAndGetFuture(AdaptiveExecutor.scala:66)
  at org.apache.spark.sql.execution.adaptive.AdaptiveExecutor.execute(AdaptiveExecutor.scala:57)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$anonfun$finalPhysicalPlan$1.apply(AdaptiveSparkPlanExec.scala:128)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec$$anonfun$finalPhysicalPlan$1.apply(AdaptiveSparkPlanExec.scala:127)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:778)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.finalPhysicalPlan(AdaptiveSparkPlanExec.scala:127)
  at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:134)
  at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2838)
  at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2837)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
  at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$executeQuery$1(SQLExecution.scala:83)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:94)
  at org.apache.spark.sql.execution.QueryExecutionMetrics$.withMetrics(QueryExecutionMetrics.scala:141)
  at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$withMetrics(SQLExecution.scala:178)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:93)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:200)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:92)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
  at org.apache.spark.sql.Dataset.count(Dataset.scala:2837)
  ... 55 elided
Caused by: java.lang.NullPointerException
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.serializefromobject_doConsume_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.mapelements_doConsume_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.deserializetoobject_doConsume_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:585)
  at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.next(InMemoryRelation.scala:115)
  at org.apache.spark.sql.execution.columnar.CachedRDDBuilder$$anonfun$1$$anon$1.next(InMemoryRelation.scala:107)
  at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:222)
  at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:299)
  at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1164)
  at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1155)
  at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1090)
  at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1155)
  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:881)
  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:357)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
  at org.apache.spark.scheduler.Task.run(Task.scala:123)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Best answer

Inside your map function, the row itself or individual columns may be null, so handle those nulls inside the map. In particular, row.getAs[Integer](0) returns null for a SQL NULL (the schema above shows every column as nullable), and calling .toLong on that null throws a NullPointerException when Scala unboxes it. Below is an example for the case where the whole row is null.
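As a quick illustration of that unboxing failure (a minimal sketch, not part of the original post; the null stands in for a NULL id column):

// Minimal sketch of the suspected failure mode (hypothetical value):
val id: Integer = null   // what getAs[Integer] returns for a SQL NULL
id.toLong                // throws java.lang.NullPointerException while unboxing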

Change the following code

val vDf = verticesDf.map(
    row => {
        (
            (row.getAs[Integer](0)).toLong,
            (
                row.getAs[String](1),
                row.getAs[Integer](3),
                row.getAs[Double](4),
                row.getAs[Double](5),
                row.getAs[Double](6),
                row.getAs[Double](7)
            )
        )
    })

to

val vDf = verticesDf.map { row =>
  row match {
    // Produce a placeholder of the same tuple type when the whole row is null
    case null => (0L, (null.asInstanceOf[String], null.asInstanceOf[Integer], 0.0, 0.0, 0.0, 0.0))
    case _ =>
      (row.getAs[Integer](0).toLong,
       (row.getAs[String](1), row.getAs[Integer](3), row.getAs[Double](4),
        row.getAs[Double](5), row.getAs[Double](6), row.getAs[Double](7)))
  }
}
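Note that with a JDBC source the row itself is rarely null; more often individual columns hold SQL NULLs. Here is a minimal sketch of guarding each nullable column instead, with hypothetical helper names (longAt, doubleAt) and the assumption that zero/empty defaults are acceptable for your data:

import org.apache.spark.sql.Row

// Turn SQL NULLs into default values instead of letting unboxing throw
def longAt(row: Row, i: Int): Long =
  if (row.isNullAt(i)) 0L else row.getAs[Integer](i).toLong
def doubleAt(row: Row, i: Int): Double =
  if (row.isNullAt(i)) 0.0 else row.getAs[Double](i)

val vDfSafe = verticesDf.map { row =>
  (longAt(row, 0),
   (Option(row.getAs[String](1)).getOrElse(""),
    Option(row.getAs[Integer](3)).getOrElse(Integer.valueOf(0)),
    doubleAt(row, 4), doubleAt(row, 5), doubleAt(row, 6), doubleAt(row, 7)))
}

Alternatively, if discarding incomplete records is acceptable, you can drop rows containing NULLs in the relevant columns before mapping (column names taken from the printSchema output above):

val vDfClean = verticesDf.na.drop(Seq("id", "url", "batch_id", "x", "y", "z", "size"))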