This is a practical blog that records the pitfalls I met while installing and configuring the Spark environment. Before we start, we assume that you have installed Java and downloaded Hadoop/Spark. Because Spark relies on some of Hadoop's functionality, such as HDFS or YARN, in most cases we also install and configure Hadoop.
When downloading, we need to make sure the Spark version matches the Hadoop version. (More than that, in our case we have a Java application that holds a SparkContext and acts as the driver program, so we also need to make sure that the versions of spark-core, spark-sql, etc. match the Spark and Hadoop versions used by the Spark cluster.)
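For example, a rough sketch of the driver's Maven dependencies; the artifact ids are real, but the versions below (Spark 2.2.0 built for Scala 2.11) are only placeholders and must match whatever the cluster actually runs:
<!-- illustrative versions; keep them in sync with the cluster -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.2.0</version>
</dependency>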
Hadoop Overview
After extracting the hadoop-2.6.5.tar.gz file, we can change directory into the config directory ./etc/hadoop. There are two kinds of configuration files:
- Read-only default configuration - core-default.xml, hdfs-default.xml, yarn-default.xml and mapred-default.xml.
- Site-specific configuration - etc/hadoop/core-site.xml, etc/hadoop/hdfs-site.xml, etc/hadoop/yarn-site.xml and etc/hadoop/mapred-site.xml.
In our case, we need to configure an HDFS cluster, which takes the following steps:
- Configure the Java home in etc/hadoop/hadoop-env.sh (the following command can help if you are not familiar with the Java installation path):
readlink -f /usr/bin/java | sed "s:bin/java::"
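The printed path can then be set in etc/hadoop/hadoop-env.sh; the path below is only an example of what readlink may print on your machine:
export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64   # adjust to the path printed above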
- Configure the HttpFS port in etc/hadoop/httpfs-env.sh, which is the HTTP port used to communicate with the NameNode:
# The HTTP port used by HttpFS
export HTTPFS_HTTP_PORT=14000
- Configure the NameNode address in etc/hadoop/core-site.xml:
<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://xxx:9000</value>
  </property>
</configuration>
- Configure the HDFS properties in etc/hadoop/hdfs-site.xml:
<property>
  <name>dfs.replication</name>
  <value>3</value>
</property>
<property>
  <name>dfs.name.dir</name>
  <value>/opt/hd-data/nn</value>
</property>
<property>
  <name>dfs.data.dir</name>
  <value>/opt/hd-data/dn</value>
</property>
- Configure the DataNode list in etc/hadoop/slaves:
dn1
dn2
dn3
- Format namenode:
hdfs namenode -format
Now we are ready to start the HDFS cluster using start-dfs.sh, and we can visit http://namenode:50070/ to check whether the configuration is right.
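A quick sketch of the start-and-check routine, assuming we run it from the Hadoop home directory (the dfsadmin report should list all the DataNodes from slaves):
sbin/start-dfs.sh          # starts the NameNode and the DataNodes listed in slaves
jps                        # each node should show its daemon, e.g. NameNode or DataNode
bin/hdfs dfsadmin -report  # reports how many DataNodes have registered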
If you meet some problems, keep reading; the following are some errors we met.
Connection Refused
If we specify the HDFS NameNode address as localhost, remote nodes and clients will get ConnectionRefused (for more possible reasons, see here), so use a reachable hostname or IP instead:
<property>
  <name>fs.defaultFS</name>
  <value>hdfs://xxx:9000</value>
</property>
Access Denied
If the HDFS cluster is inside an intranet, an easy way to avoid access problems is to disable permission control:
<property>
  <name>dfs.permissions</name>
  <value>false</value>
</property>
BindException
This means some ports are already in use by other applications; if we cannot change that, we may need to change Hadoop's ports with the help of this doc.
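For illustration only: the NameNode web UI port (property dfs.namenode.http-address, which defaults to 50070 in Hadoop 2.x) can be moved in hdfs-site.xml like this; pick whichever conflicting port applies in your case:
<property>
  <name>dfs.namenode.http-address</name>
  <value>0.0.0.0:50071</value>
</property>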
Spark Pitfalls
Spark installation is relatively simple: we can just use the default config, and editing conf/slaves to add the right worker hosts is enough.
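For example, a minimal conf/slaves with hypothetical worker hostnames, started with the scripts Spark ships in sbin/:
# conf/slaves — one worker host per line
worker1
worker2

# then, on the master host, start the master and all listed workers
sbin/start-all.sh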
As we have introduced, our application adopts the following architecture: a Java application holds a reference to a SparkContext and communicates with the Spark cluster, sending tasks and receiving results. But we found few resources on how to configure such an architecture (the online material is mostly about spark-submit), which caused us a lot of trouble.
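A minimal sketch of that architecture, with a hypothetical class name and the placeholder master URL spark://xxx:7077; the application itself creates the context instead of being launched via spark-submit:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class EmbeddedDriver {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("embedded-driver")
                .setMaster("spark://xxx:7077");
        // the JVM running this main() is the driver program
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            long count = sc.parallelize(java.util.Arrays.asList(1, 2, 3, 4)).count();
            System.out.println("count = " + count);
        }
    }
}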
Class Not Found
If your application works well when the Spark master is set to local but fails with ClassNotFound when the master is changed to spark://xxx:7077, you need to add the extra dependencies to the cluster.
ClassNotFoundException: org.apache.spark.sql.kafka010.KafkaSourceRDDPartition
This is because when the master is not local, the code needs to be sent to the cluster, and some dependencies exist only on our application's classpath but not on the cluster. So we have to add those jars to Spark's classpath. We have two options:
- Copy the dependency jars of our application into <spark-home>/jars/;
- Call setJars of SparkConf, which will upload the jars to the Spark cluster when starting the job:
SparkConf conf = new SparkConf()
    .setAppName(appName)
    .setMaster(masterUri)
    // ship the application jar and extra dependencies to the executors
    .setJars(new String[]{"out/artifacts/streamer_jar/streamer.jar", "xx/xx/kafka-clients-1.0.0.jar"});
Class Cast Exception of Lambda
If you meet a ClassCastException involving a lambda expression like the one shown below, make sure the class containing the lambda is serializable. As we have said, our code will be sent to the Spark cluster, so it has to be serializable.
java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.f$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaRDD$$anonfun$filter$1
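A minimal sketch of the fix, with hypothetical class and field names; the point is that the class whose state the lambda captures implements Serializable (or that the lambda avoids capturing non-serializable state at all):
import java.io.Serializable;
import org.apache.spark.api.java.JavaRDD;

public class LevelFilter implements Serializable {
    private final int threshold;

    public LevelFilter(int threshold) {
        this.threshold = threshold;
    }

    public JavaRDD<Integer> keepHigh(JavaRDD<Integer> levels) {
        // the lambda reads this.threshold, so `this` is captured and must be serializable
        return levels.filter(level -> level > threshold);
    }
}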
Other Class Cast
If you still meet a ClassCastException like we did, shown as follows, it means Spark does not have our code (our classes are missing or stale on the executors).
java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke.arguments of type scala.collection.Seq in instance of org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
There can be two reasons: we didn't send the jar files containing our classes, or the jars we sent are out of date. In our case, we first packaged the jar using the Spring Boot Maven plugin, which produces a runnable (nested) jar, and then packaged all dependencies and class files into one fat jar; neither of those two jars could be recognized by Spark. Finally, we copied the dependencies directly into <spark-home>/jars/ and packaged just our own classes into a jar, which finally worked.
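A sketch of that final layout, assuming the dependencies are collected with the Maven dependency plugin (paths are illustrative):
# collect the application's dependency jars into target/dependency
mvn dependency:copy-dependencies
# put them on the Spark nodes' classpath
cp target/dependency/*.jar <spark-home>/jars/
# the jar passed to setJars then only needs to contain our own classes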
Here and here are some issues about this exception in the Spark community.
Initial job has not accepted any resources
If your driver program complains with the following warning,
WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster ui
to ensure that workers are registered and have sufficient memory
you need to check the Spark cluster UI (spark_host:8081), which usually shows a warning like "failed to connect to driver".
The cluster can fail to connect to the driver for the following reasons:
- The host running the driver program has more than one IP address (e.g. Wi-Fi and wired network at the same time), so the workers may try the wrong one; pinning the driver address explicitly, as in the sketch below, can help;
- A firewall blocks the connection between the cluster and the driver;
- No worker node (slave) is started, so there is no Executor.
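For the multi-IP case, a minimal sketch that pins the driver address via the standard spark.driver.host property; 192.168.1.10 is a placeholder for whichever address the workers can actually reach:
SparkConf conf = new SparkConf()
        .setAppName(appName)
        .setMaster("spark://xxx:7077")
        // advertise the interface the workers can reach instead of letting Spark guess
        .set("spark.driver.host", "192.168.1.10");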
Window to Timestamp
The last one is a small tip about window: converting it to timestamps (its start and end fields).
val sums = levels.
groupBy(window($"time", "5 seconds")).
agg(sum("level") as "level_sum").
select("window.start", "window.end", "level_sum")
Ref
- Hadoop doc
- Hadoop Install guide blog
- How to install hadoop cluster
- Common Spark troubleshooting
- Spark sql function: datetime
- Spark cluster overview
- ClassCastException when using lambda