Spark
Spark download: https://spark.apache.org/downloads.html
Cluster Manager: The cluster manager distributes the code and manages the data that is represented in RDDs.
Resilient Distributed Dataset (RDD): RDDs, with their transformations and actions, are the main programming abstraction and represent parallelized collections.
Spark works with three types of cluster managers: standalone, Apache Mesos, and Hadoop YARN.
Cluster Overview: http://spark.apache.org/docs/latest/cluster-overview.html
[Figure 1]
The Spark driver program takes the program classes and hands them over to a
cluster manager. The cluster manager, in turn, starts executors in multiple worker
nodes, each having a set of tasks. When you ran the example program earlier, all
these actions happened transparently on your machine! Later, when we install Spark
on a cluster, the examples will run, again transparently, but across multiple
machines in the cluster. That is the magic of Spark and distributed computing!
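The handoff described above can be sketched in code: the master URL passed to the driver's SparkConf decides which cluster manager it contacts. This is a minimal sketch; the host names and ports below are placeholders, not real endpoints.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// The master URL selects the cluster manager; scheduling executors
// on the worker nodes then happens transparently.
val conf = new SparkConf()
  .setAppName("ClusterDemo")
  .setMaster("local[2]")             // local mode: 2 threads, no cluster
// .setMaster("spark://host:7077")   // standalone cluster manager
// .setMaster("mesos://host:5050")   // Apache Mesos
// .setMaster("yarn-client")         // Hadoop YARN (Spark 1.x syntax)

val sc = new SparkContext(conf)
```

Switching cluster managers is only a matter of changing this one URL; the driver program itself stays the same.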
Spark on Amazon EC2
http://spark.apache.org/docs/latest/ec2-scripts.html
https://github.com/amplab/spark-ec2#readme
Spark Standalone
http://spark.apache.org/docs/latest/spark-standalone.html
Spark Shell
The Spark shell is a wonderful tool for rapid prototyping with Spark. It helps to be
familiar with Scala, but that isn't necessary. The Spark shell works with Scala and
Python. The Spark shell allows you to interactively query and communicate with
the Spark cluster. This can be great for debugging, for just trying things out, or
interactively exploring new datasets or approaches. The previous chapter should
have gotten you to the point of having a Spark instance running, so now, all you
need to do is start your Spark shell and point it at your running instance with the
command given in the next few lines. Spark will start an instance when you invoke
the Spark shell or start a Spark program from an IDE. So, a local installation on a Mac
or Linux PC/laptop is sufficient to start exploring the Spark shell. Not having to spin
up a real cluster to do the prototyping is an important feature of Spark.
If things went well, you were successful in using Spark to run logistic regression.
That's awesome! We have just done a number of things; we defined a class and
created an RDD and a function. As you can see, the Spark shell is quite powerful.
Much of the power comes from it being based on the Scala REPL (the Scala
interactive shell), so it inherits all of the power of the Scala REPL.
wget http://www-stat.stanford.edu/~tibs/ElemStatLearn/datasets/spam.data
scala> val inFile = sc.textFile("./spam.data")
The sc in the command line is the Spark context. While applications would create a
Spark context explicitly, the Spark shell creates one called sc for you and that is the
one we normally use.
Note: If you've connected to a Spark master, it's possible that it will attempt to
load the file on any one of the different machines in the cluster, so make sure that
it can be accessed by all the worker nodes in the cluster. In general, you will want
to put your data in HDFS, S3, or a similar distributed file system to avoid this
problem. In local mode, you can just load the file directly (for example,
sc.textFile([filepath])). You can also use the addFile function on the Spark
context to make a file available across all of the machines like this:
scala> import org.apache.spark.SparkFiles
scala> sc.addFile("/opt/spark/spam.data")
scala> val inFile = sc.textFile(SparkFiles.get("spam.data"))
We use a map operation to split each line into an array of numbers in string
format, and then convert each of those numbers to a double, as shown next:
scala> val nums = inFile.map(line => line.split(' ').map(_.toDouble))
scala> inFile.first()
scala> nums.first()
Operators in Spark are divided into transformations and actions.
Transformations are evaluated lazily. Spark just creates the RDD's
lineage graph when you call a transformation like map. No actual
work is done until an action is invoked on the RDD. Creating the
RDD and the map functions are transformations. The .first()
function is an action that forces execution.
So when we created inFile, it really didn't do anything except
create a variable and set up the pointers. Only when we call an
action such as .first() does Spark evaluate the transformations.
As a result, even if we point inFile to a non-existent directory,
Spark will accept it. But when we call inFile.first(), it will
throw the Input path does not exist: error.
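A short sketch of this behavior, assuming the shell's sc; the path below is deliberately non-existent:

```scala
// Transformations only build the lineage graph -- no file I/O yet,
// so pointing at a missing path succeeds without complaint:
val missing = sc.textFile("no-such-dir/data.txt")
val lengths = missing.map(_.length)   // still lazy, still no error

// The action is what forces evaluation; only here would Spark throw
// the "Input path does not exist" error:
// lengths.first()
```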
Amazon S3
val file = sc.textFile("s3n://bigdatademo/sample/wiki/")
file.first()
file.take(1)
You can also embed your AWS credentials in the path: s3n://<AWS ACCESS ID>:<AWS SECRET>@bucket/path.
sample.saveAsTextFile("s3n://mysparkbucket/test")
Running Spark shell in Python
If you are more comfortable with Python than Scala, you can also work with Spark
interactively in Python by running ./pyspark:
textFile = sc.textFile("README.md")
textFile.count()
textFile.first()
linesWithSpark = textFile.filter(lambda line: "Spark" in line)
textFile.filter(lambda line: "Spark" in line).count()
Creating a SparkContext
This chapter will cover how to create a SparkContext object in your cluster. A
SparkContext object represents the connection to a Spark cluster and provides the
entry point for interacting with Spark. We need to create a SparkContext so that
we can interact with Spark and distribute our jobs.
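A standalone application creates the context that the shell otherwise provides as sc. A minimal sketch, where the app name and master URL are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object MyApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("MyApp")
      .setMaster("local[*]")  // replace with your cluster's master URL
    val sc = new SparkContext(conf)
    try {
      // A quick sanity check that the context works:
      println(sc.parallelize(1 to 10).sum())
    } finally {
      sc.stop()  // release the connection to the cluster
    }
  }
}
```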
Loading Data
RDDs
Spark RDDs can be created from any supported Hadoop source. Native collections in
Scala, Java, and Python can also serve as the basis for an RDD. Creating RDDs from a
native collection is especially useful for testing.
Before jumping into the details of the supported data sources, take some time
to learn about what RDDs are and what they are not. It is crucial to understand
that even though an RDD is defined, it does not actually contain data; it just
describes the pipeline for producing it. (An RDD follows the principle of lazy
evaluation: an expression is evaluated only when it is needed, that is, when an
action is called.) This means that when you go to access the data in an RDD, the
operation could fail. The computation to create the data in an RDD is only done
when the data is referenced, for example by caching or writing out the RDD. This
also means that you can chain a large number of operations and not have to worry
about excessive blocking in a computational thread. It's important to note during
application development that you can write code, compile it, and even run your
job; unless you materialize the RDD, your code may not have even tried to load
the original data.
val dataRDD = sc.parallelize(List(1,2,4))
dataRDD.take(3)
import org.apache.spark.SparkFiles
...
sc.addFile("spam.data")
val inFile = sc.textFile(SparkFiles.get("spam.data"))
inFile.first()
val inFile = sc.textFile("Line_of_numbers.csv")
val numbersRDD = inFile.map(line => line.split(','))
scala> numbersRDD.take(10)
scala> val numbersRDD = inFile.map(line => line.split(',')).map(_.toDouble)
scala> val numbersRDD = inFile.flatMap(line => line.split(',')).map(_.toDouble)
scala> numbersRDD.collect()
scala> numbersRDD.sum()
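The difference between the two attempts above comes down to types: split produces an Array[String] per line, so map keeps one array per line while flatMap flattens those arrays into individual elements. A sketch, assuming the inFile from earlier:

```scala
// map keeps the per-line structure:
val perLine = inFile.map(line => line.split(','))      // RDD[Array[String]]
// perLine.map(_.toDouble) does not type-check: _ is an Array[String]

// flatMap flattens every per-line array into one stream of tokens:
val tokens  = inFile.flatMap(line => line.split(','))  // RDD[String]
val numbers = tokens.map(_.toDouble)                   // RDD[Double]
```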
opencsv
import au.com.bytecode.opencsv.CSVReader
import java.io.StringReader
sc.addFile("Line_of_numbers.csv")
val inFile = sc.textFile("Line_of_numbers.csv")
val splitLines = inFile.map(line => {
  val reader = new CSVReader(new StringReader(line))
  reader.readNext()
})
val numericData = splitLines.map(line =>
  line.map(_.toDouble))
val summedData = numericData.map(row => row.sum)
println(summedData.collect().mkString(","))
val data = sc.sequenceFile[String, Int](inputFile)
Saving Data
For Scala:
rddOfStrings.saveAsTextFile("out.txt")
keyValueRdd.saveAsObjectFile("sequenceOut")
Manipulating your RDD
Manipulating your RDD in Scala and Java
RDDs are the primary abstraction in Spark. From a structural view, they are just a
bunch of elements, but elements that can be operated on in parallel!
Manipulating your RDD in Scala is quite simple, especially if you are familiar with
Scala's collection library. Many of the standard functions are available directly on
Spark's RDDs, with the primary catch being that RDDs are immutable. This makes
porting existing Scala code to a distributed setting much simpler than porting
Java or Python code, at least in theory. While Scala encourages functional
programming, one can always use Scala in a non-functional way.
val data = sc.parallelize(List(("key1", 1), ("Kay2", 2), ("Key3", 2)))
data.saveAsSequenceFile("/tmp/seq-output")
scala> import org.apache.hadoop.io.Text
import org.apache.hadoop.io.Text
scala> import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.IntWritable
scala> val result = sc.sequenceFile("/tmp/seq-output/part-00001", classOf[Text], classOf[IntWritable]).map { case (x, y) => (x.toString, y.get()) }
result: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[15] at map at <console>:29
scala> result.collect
res14: Array[(String, Int)] = Array((Kay2,2), (Key3,2))
/spark-1.4.1/bin/spark-shell --packages com.databricks:spark-csv_2.10:1.1.0
scala> sqlContext
res0: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@12c57a43
scala> val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "file:///root/emp.csv", "header" -> "true"))
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
df: org.apache.spark.sql.DataFrame = [emp_id: string, emp_name: string, country: string, salary: string]
scala> df.printSchema()
root
|-- emp_id: string (nullable = true)
|-- emp_name: string (nullable = true)
|-- country: string (nullable = true)
|-- salary: string (nullable = true)
scala> df.registerTempTable("emp")
scala> val names = sqlContext.sql("select emp_name from emp")
names: org.apache.spark.sql.DataFrame = [emp_name: string]
scala> names.foreach(println)
[Guy]
[Jonas]
[Hector]
[Byron]
[Owen]
[Zachery]
[Alden]
[Akeem]
scala> sqlContext
res0: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@330305d3
scala> val df = sqlContext.load("org.apache.spark.sql.json", Map("path" -> "file:///employee.json"))
warning: there were 1 deprecation warning(s); re-run with -deprecation for details
df: org.apache.spark.sql.DataFrame = [birth_date: string, department_id: bigint, education_level: string, employee_id: bigint, end_date: string, first_name: string, full_name: string, gender: string, hire_date: string, last_name: string, management_role: string, marital_status: string, position_id: bigint, position_title: string, salary: double, store_id: bigint, supervisor_id: bigint]
scala> df.printSchema()
root
|-- birth_date: string (nullable = true)
|-- department_id: long (nullable = true)
|-- education_level: string (nullable = true)
|-- employee_id: long (nullable = true)
|-- end_date: string (nullable = true)
|-- first_name: string (nullable = true)
|-- full_name: string (nullable = true)
|-- gender: string (nullable = true)
|-- hire_date: string (nullable = true)
|-- last_name: string (nullable = true)
|-- management_role: string (nullable = true)
|-- marital_status: string (nullable = true)
|-- position_id: long (nullable = true)
|-- position_title: string (nullable = true)
|-- salary: double (nullable = true)
|-- store_id: long (nullable = true)
|-- supervisor_id: long (nullable = true)
scala> df.registerTempTable("employee")
scala> val names = sqlContext.sql("select first_name from employee limit 5")
names: org.apache.spark.sql.DataFrame = [first_name: string]
scala> names.foreach(println)
[Sheri]
[Derrick]
[Michael]
[Maya]
[Roberta]