This is a practical blog that records the pitfalls I met while installing and configuring the Spark environment. Before we start, we assume you have already installed Java and downloaded Hadoop and Spark. Because Spark relies on some of Hadoop's functionality, such as HDFS or YARN, in most cases we also install and configure Hadoop. When downloading, we need to make sure the Spark version matches the Hadoop version. (More than that, in our case we have a Java application which holds a `SparkContext` and acts as the driver program, so we also need to make sure that `spark-core`, `spark-sql`, etc. are built against the same Spark & Hadoop versions as the Spark cluster.)
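For example, a Maven dependency sketch might look like the following (the versions and Scala suffix here are placeholders; the only point is that they match the Spark build running on the cluster):
<!-- Hypothetical pom.xml fragment: keep these versions in sync with the cluster -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-core_2.11</artifactId>
  <version>2.2.0</version>
</dependency>
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.2.0</version>
</dependency>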
Hadoop Overview
After extracting the hadoop-2.6.5.tar.gz file, we can change into the configuration directory `./etc/hadoop`. There are two kinds of configuration files:
- Read-only default configuration: core-default.xml, hdfs-default.xml, yarn-default.xml and mapred-default.xml.
- Site-specific configuration: etc/hadoop/core-site.xml, etc/hadoop/hdfs-site.xml, etc/hadoop/yarn-site.xml and etc/hadoop/mapred-site.xml.
In our case we need to configure an HDFS cluster, so the steps are as follows:
- Configure `JAVA_HOME` in etc/hadoop/hadoop-env.sh (the following command can help if you are not sure where Java is installed):
readlink -f /usr/bin/java | sed "s:bin/java::"
- Configure the HttpFS port in etc/hadoop/httpfs-env.sh, which is the HTTP port used to communicate with the NameNode:
# The HTTP port used by HttpFS
export HTTPFS_HTTP_PORT=14000
- Configure the NameNode address in etc/hadoop/core-site.xml:
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://xxx:9000</value>
</property>
</configuration>
- Configure HDFS properties (replication factor and the NameNode/DataNode storage directories) in etc/hadoop/hdfs-site.xml:
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<property>
<name>dfs.name.dir</name>
<value>/opt/hd-data/nn</value>
</property>
<property>
<name>dfs.data.dir</name>
<value>/opt/hd-data/dn</value>
</property>
- Configure the list of DataNodes in etc/hadoop/slaves:
dn1
dn2
dn3
- Format the NameNode:
hdfs namenode -format
Now we are ready to start the HDFS cluster using sbin/start-dfs.sh, and we can visit http://namenode:50070/ to check whether the configuration is correct.
If you run into problems, keep reading; the following are some of the errors we met.
Connection Refused
If we specify the HDFS NameNode address as localhost, remote nodes and clients will get ConnectionRefused, because the NameNode address resolves to the loopback interface and cannot be reached from other machines. Use a hostname or IP address that is reachable from the rest of the cluster instead (for other possible reasons, see here):
<property>
<name>fs.defaultFS</name>
<value>hdfs://xxx:9000</value>
</property>
Access Denied
If the HDFS cluster runs inside an intranet, an easy way to avoid access problems is to disable permission checking (in newer Hadoop versions this property is named dfs.permissions.enabled):
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
BindException
This means some ports are already in use by other applications. If we cannot free those ports, we may need to change Hadoop's ports with the help of this doc.
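As a sketch, assuming the default port layout from hdfs-default.xml, moving the NameNode web UI off 50070 would look like this in etc/hadoop/hdfs-site.xml (the replacement port is arbitrary):
<!-- Example only: relocate the NameNode HTTP port if 50070 is already taken -->
<property>
  <name>dfs.namenode.http-address</name>
  <value>0.0.0.0:50071</value>
</property>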
Spark Pitfalls
Spark installation is relatively simple: we can keep the default configuration and just edit conf/slaves to list the worker hosts. As we have mentioned, our application adopts the following architecture: a Java application holds a reference to a SparkContext, which communicates with the Spark cluster by sending tasks and collecting results. But we found few resources about how to configure such an architecture (the material online is mostly about spark-submit), which caused a lot of trouble for us.
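For reference, here is a minimal sketch of what such an embedded driver looks like (the master URL and application name are placeholders):
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class EmbeddedDriver {
    public static void main(String[] args) {
        // The Java application itself acts as the driver program:
        // it creates the SparkContext directly instead of going through spark-submit.
        SparkConf conf = new SparkConf()
                .setAppName("embedded-driver")   // placeholder application name
                .setMaster("spark://xxx:7077");  // standalone master URL (placeholder host)

        JavaSparkContext sc = new JavaSparkContext(conf);
        System.out.println("Connected, default parallelism = " + sc.defaultParallelism());
        sc.stop();
    }
}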
Class Not Found
If your application works well when the Spark master is set to local, but fails with ClassNotFoundException when the master is changed to spark://xxx:7077, you need to make the extra dependencies available on the cluster.
ClassNotFoundException: org.apache.spark.sql.kafka010.KafkaSourceRDDPartition
This is because when the master is not local, the code needs to be sent to the cluster, and some dependencies exist only on our application's classpath but not on the cluster. So we have to add the jars to Spark's classpath. We have two options:
- Copy the dependency jars of our application into <spark-home>/jars/;
- Call setJars on SparkConf, which will upload the jars to the Spark cluster when the job starts:
SparkConf conf = new SparkConf()
        .setAppName(appName)
        .setMaster(masterUri)
        .setJars(new String[]{"out/artifacts/streamer_jar/streamer.jar", "xx/xx/kafka-clients-1.0.0.jar"});
Class Cast Exception of Lambda
If you meet a ClassCastException involving a lambda expression like the one shown below, make sure the class containing the lambda is serializable. As we have said, our code is sent to the Spark cluster, so it has to be serializable.
java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.f$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaRDD$$anonfun$filter$1
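A minimal sketch of the fix (the class and field names here are made up for illustration): because the lambda captures fields of its enclosing object, the enclosing class has to implement Serializable so Spark can ship it to the executors.
import java.io.Serializable;

import org.apache.spark.api.java.JavaRDD;

// Hypothetical helper class; the important part is "implements Serializable".
public class LevelFilter implements Serializable {
    private final int threshold;

    public LevelFilter(int threshold) {
        this.threshold = threshold;
    }

    public JavaRDD<Integer> keepHighLevels(JavaRDD<Integer> levels) {
        // The lambda captures "this" (through the threshold field), so the
        // enclosing class must be serializable for Spark to send it to the cluster.
        return levels.filter(level -> level > threshold);
    }
}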
Other Class Cast
If you still meet a ClassCastException like the ones we had, shown below, it means the Spark cluster does not have our code.
java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke.arguments of type scala.collection.Seq in instance of org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
There can be two reasons: either we didn't send the jar files containing our classes, or the jars on the cluster are out of date. In our case, we first packaged the jar with the Spring Boot Maven plugin, which produces a runnable jar, and then tried packaging all dependencies and class files into a single jar; neither of those jars could be recognized by Spark. Finally, we copied the dependencies directly into <spark-home>/jars/ and packaged just our own classes into a jar, which finally worked.
Here and here are some issues about this exception in the Spark community.
Initial job has not accepted any resources
If your driver program complains with the following warning,
WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster ui
to ensure that workers are registered and have sufficient memory
you need to check the Spark cluster UI (spark_host:8081), which usually shows a warning like "fail to connect to driver".
The cluster may fail to connect to the driver for the following reasons:
- The host running the driver program has more than one IP address (e.g. Wi-Fi and wired network at the same time), so the driver may advertise an address the workers cannot reach; see the sketch after this list;
- A firewall blocks the connection between the workers and the driver;
- No worker node (slave) has been started, so there is no Executor to register.
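When the multi-address problem is the cause, one workaround (a sketch using the standard spark.driver.host and spark.driver.port settings; the IP and port below are placeholders) is to tell Spark explicitly which address the driver should advertise:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class DriverAddressExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("driver-address-example")
                .setMaster("spark://xxx:7077")             // placeholder master URL
                // Advertise the interface that the workers can actually reach,
                // instead of whichever address Spark happens to pick on a multi-homed host.
                .set("spark.driver.host", "192.168.1.10")  // placeholder routable IP
                .set("spark.driver.port", "35000");        // fixed port, so the firewall can allow it

        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.stop();
    }
}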
Window to Timestamp
The last one is a small tip about window: converting it to Timestamp. The window column is a struct with start and end fields, so the timestamps can be selected directly:
val sums = levels.
groupBy(window($"time", "5 seconds")).
agg(sum("level") as "level_sum").
select("window.start", "window.end", "level_sum")
Ref
- Hadoop doc
- Hadoop install guide blog
- How to install a Hadoop cluster
- Common Spark troubleshooting
- Spark SQL functions: datetime
- Spark cluster overview
- ClassCastException when using lambda