
Spark Application Setup Pitfalls

This is a practical blog post that records the pitfalls I met while installing and configuring a Spark environment. Before we start, we assume that you have installed Java and downloaded Hadoop/Spark. Because Spark relies on some of Hadoop's functionality, such as HDFS or YARN, in most cases we also install and configure Hadoop.

When downloading, we need to make sure the Spark version matches the Hadoop version. More than that, in our case we have a Java application which holds a SparkContext and acts as the driver program, so we also need to make sure that the spark-core, spark-sql, etc. dependencies use the same Spark and Hadoop versions as the Spark cluster.
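As a hedged illustration, the driver application's Maven dependencies might look like the following; the Scala suffix and version numbers are placeholders and must match what the cluster actually runs:

<!-- Placeholder versions: keep them in sync with the Spark/Hadoop versions on the cluster. -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.2.0</version>
</dependency>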

Hadoop Overview

After extracting the hadoop-2.6.5.tar.gz file, we can change into the config directory ./etc/hadoop. There are two kinds of configuration files:

  • Read-only default configuration - core-default.xml, hdfs-default.xml, yarn-default.xml and mapred-default.xml.
  • Site-specific configuration - etc/hadoop/core-site.xml, etc/hadoop/hdfs-site.xml, etc/hadoop/yarn-site.xml and etc/hadoop/mapred-site.xml.

In our case, we need to configure an HDFS cluster, which takes the following steps:

  • Config JAVA_HOME in etc/hadoop/hadoop-env.sh (the following command can help you find the Java installation path if you are not sure):
readlink -f /usr/bin/java | sed "s:bin/java::"

  • Config the HttpFS port in etc/hadoop/httpfs-env.sh, which is the HTTP port used to communicate with the NameNode:
# The HTTP port used by HttpFS
export HTTPFS_HTTP_PORT=14000

  • Config the NameNode address in etc/hadoop/core-site.xml:
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://xxx:9000</value>
    </property>
</configuration>

  • Config HDFS properties in etc/hadoop/hdfs-site.xml:
<property>
  <name>dfs.replication</name>
  <value>3</value>
</property>
<property>
  <name>dfs.name.dir</name>
  <value>/opt/hd-data/nn</value>
</property>
<property>
  <name>dfs.data.dir</name>
  <value>/opt/hd-data/dn</value>
</property>

  • Config the DataNode list in etc/hadoop/slaves:
dn1
dn2
dn3

  • Format the NameNode:
hdfs namenode -format

Now we are ready to start the HDFS cluster using start-dfs.sh, and we can open http://namenode:50070/ to check whether the config is right.
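Besides the web UI, a quick command-line sanity check (assuming the Hadoop binaries are on the PATH) could be:

# list live DataNodes and the cluster capacity
hdfs dfsadmin -report

# try a simple write/list round trip
hdfs dfs -mkdir -p /tmp/smoke-test
hdfs dfs -ls /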

If you run into problems, keep reading: the following are some errors we met.

Connection Refused

If we specify the HDFS NameNode address as localhost, clients on other hosts will get ConnectionRefused (for other possible causes, see here). Use a hostname or IP that is reachable from the other nodes instead:

    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://xxx:9000</value>
    </property>

Access Denied

If the HDFS cluster is only reachable from an intranet, an easy way to avoid access problems is to disable permission control:

  <property>
    <name>dfs.permissions</name>
    <value>false</value>
  </property>

BindException

This means some ports are already used by other applications. If we can't free them, we may need to change Hadoop's ports with the help of this doc.
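For example, the default DataNode ports can be moved in etc/hadoop/hdfs-site.xml (the port values below are placeholders; pick any free ports on your hosts):

<property>
  <name>dfs.datanode.address</name>
  <value>0.0.0.0:50011</value>
</property>
<property>
  <name>dfs.datanode.http.address</name>
  <value>0.0.0.0:50076</value>
</property>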

Spark Pitfalls

Spark installation is relatively simple: we can use the default config and just edit conf/slaves to add the right worker hosts.

As we have introduced, our application adopts the following architecture: a Java application holds a reference to a SparkContext and exchanges tasks and results with the Spark cluster. But we found few resources about how to configure such an architecture (most online resources are about spark-submit), which caused us a lot of trouble.

(Figure: architecture overview)
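For reference, a minimal sketch of such an embedded driver, with the app name and master URI as placeholders (this is an illustration, not our production code):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

import java.util.Arrays;

public class EmbeddedDriver {
    public static void main(String[] args) {
        // The application itself is the driver: it holds the SparkContext
        // and talks to the standalone cluster directly.
        SparkConf conf = new SparkConf()
                .setAppName("embedded-driver-demo")   // placeholder app name
                .setMaster("spark://xxx:7077");       // placeholder master URI

        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            long evens = sc.parallelize(Arrays.asList(1, 2, 3, 4))
                           .filter(x -> x % 2 == 0)
                           .count();
            System.out.println("evens = " + evens);
        }
    }
}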

Class Not Found

If your application works well when the Spark master is set to local but fails with ClassNotFound when the master is changed to spark://xxx:7077, you need extra dependencies on the cluster.

ClassNotFoundException: org.apache.spark.sql.kafka010.KafkaSourceRDDPartition

This is because when the master is not local, the code needs to be sent to the cluster, and some dependencies exist only on our application's classpath but not on the cluster. So we have to add the jars to Spark's classpath. We have two options:

  • Copy the dependency jars of our application into <spark-home>/jars/;
  • Call setJars of SparkConf, which will upload the jars to the Spark cluster when starting the job;
SparkConf conf = new SparkConf()
    .setAppName(appName)
    .setMaster(masterUri)
    // the listed jars are uploaded to the cluster when the job starts
    .setJars(new String[]{"out/artifacts/streamer_jar/streamer.jar", "xx/xx/kafka-clients-1.0.0.jar"});

Class Cast Exception of Lambda

If you see a ClassCastException about a lambda expression like the one shown below, make sure the class containing the lambda is serializable. As we said, our code will be sent to the Spark cluster, so it has to be serializable.

java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.api.java.JavaRDD$$anonfun$filter$1.f$1 of type org.apache.spark.api.java.function.Function in instance of org.apache.spark.api.java.JavaRDD$$anonfun$filter$1
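A minimal sketch of what we mean (the class and field names are made up for illustration): when a lambda reads a field, it captures its enclosing instance, so that instance is serialized with the task and the enclosing class must implement Serializable.

import org.apache.spark.api.java.JavaRDD;

import java.io.Serializable;

// Because the lambda below reads the `threshold` field, it captures `this`,
// so this class is shipped to the executors and must be Serializable.
public class LevelFilter implements Serializable {
    private final int threshold;

    public LevelFilter(int threshold) {
        this.threshold = threshold;
    }

    public JavaRDD<Integer> keepHighLevels(JavaRDD<Integer> levels) {
        return levels.filter(level -> level >= threshold);
    }
}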

Other Class Cast

If you still get a ClassCastException like we did, shown below, that means Spark doesn't have our code.

java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD

java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke.arguments of type scala.collection.Seq in instance of org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke 

There can be two reasons: we didn't send the jar containing our classes, or the jar is out of date. In our case, we first packaged the jar using the Spring Boot Maven plugin, which produces a runnable jar; then we packaged all dependencies and class files into one fat jar; neither of those jars could be recognized by Spark. Finally, we copied the dependencies directly into <spark-home>/jars/ and packaged just our own classes into a jar, which worked.
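A rough sketch of that final step (the use of the Maven dependency plugin and the paths are assumptions; adjust to your own build):

# copy the application's runtime dependencies into a single directory
mvn dependency:copy-dependencies -DoutputDirectory=target/dependency

# put those dependencies on every Spark node's classpath
cp target/dependency/*.jar <spark-home>/jars/

# the jar passed to setJars() then only needs to contain our own classes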

Here and here are some issues about this exception in the Spark community.

Initial job has not accepted any resources

If your driver program complains with the following warning,

WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster ui  
to ensure that workers are registered and have sufficient memory

you need to check the Spark cluster UI (spark_host:8081), which usually shows warnings such as failing to connect to the driver.

The Spark cluster can fail to connect to the driver for the following reasons:

  • The host running the driver program has more than one IP address (e.g. Wi-Fi and wired at the same time); in that case the driver address can be pinned explicitly, as sketched after this list;
  • A firewall blocks the connection;
  • No worker nodes (slaves) are started, so there are no executors.
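For the multi-address case, one option (spark.driver.host and spark.driver.bindAddress are standard Spark properties; the values below are placeholders) is to pin the driver's address explicitly when building the SparkConf:

import org.apache.spark.SparkConf;

// Tell the executors which address to use to reach the driver when the
// driver host has several network interfaces.
SparkConf conf = new SparkConf()
        .setAppName("embedded-driver-demo")           // placeholder
        .setMaster("spark://xxx:7077")                // placeholder master URI
        .set("spark.driver.host", "192.168.1.10")     // placeholder: an IP the workers can reach
        .set("spark.driver.bindAddress", "0.0.0.0");  // optional: bind on all local interfaces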

Window to Timestamp

The last is a small tip about window: converting it to timestamps. The window column is a struct whose start and end fields are plain timestamps, so they can simply be selected:

// requires: import org.apache.spark.sql.functions.{window, sum}
//           import spark.implicits._  (for the $"time" syntax)
val sums = levels.
  groupBy(window($"time", "5 seconds")).
  agg(sum("level") as "level_sum").
  select("window.start", "window.end", "level_sum")

