Spark

Spark download: https://spark.apache.org/downloads.html


Cluster Manager: The cluster manager distributes the code and manages the data that is represented in RDDs. 


Resilient Distributed Dataset (RDD): RDDs, with their transformations and actions, are the main programming abstraction and represent parallelized collections.


Spark works with three types of cluster managers: standalone, Apache Mesos, and Hadoop YARN.



Cluster Overview: http://spark.apache.org/docs/latest/cluster-overview.html


[Figure 1]


The Spark driver program takes the program classes and hands them over to a cluster manager. The cluster manager, in turn, starts executors on multiple worker nodes, each running a set of tasks. When we ran the example program earlier, all these actions happened transparently on your machine! Later, when we install Spark on a cluster, the examples will run, again transparently, but across multiple machines in the cluster. That is the magic of Spark and distributed computing!
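
A minimal sketch of that flow from the command line, assuming a packaged application and a standalone cluster manager (the class name, JAR path, and master host below are placeholders, not values from this text):

# The driver submits the application to the cluster manager named by --master;
# the cluster manager then launches executors on the worker nodes to run its tasks.
./bin/spark-submit --class com.example.MyApp \
  --master spark://master-host:7077 \
  /path/to/my-app.jar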



Spark on Amazon EC2

http://spark.apache.org/docs/latest/ec2-scripts.html


https://github.com/amplab/spark-ec2#readme



Spark Standalone

http://spark.apache.org/docs/latest/spark-standalone.html





Spark Shell

The Spark shell is a wonderful tool for rapid prototyping with Spark. It helps to be familiar with Scala, but that isn't necessary. The Spark shell works with Scala and Python. It allows you to interactively query and communicate with the Spark cluster, which is great for debugging, for trying things out, or for interactively exploring new datasets and approaches. The previous chapter should have gotten you to the point of having a Spark instance running, so all you need to do now is start your Spark shell and point it at your running instance with the command given below. Spark will start an instance when you invoke the Spark shell or start a Spark program from an IDE, so a local installation on a Mac or Linux PC/laptop is sufficient to start exploring the Spark shell. Not having to spin up a real cluster to do the prototyping is an important feature of Spark.
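
A minimal sketch of the launch command, assuming a standalone deployment (the master host is a placeholder; 7077 is the standalone master's default port):

# Local mode: the shell starts its own Spark instance on this machine.
./bin/spark-shell

# Pointing the shell at an already running standalone cluster:
./bin/spark-shell --master spark://master-host:7077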


If things went well, you were successful in using Spark to run logistic regression. That's awesome! We have just done a number of things: we defined a class, created an RDD, and created a function. As you can see, the Spark shell is quite powerful. Much of the power comes from it being based on the Scala REPL (the Scala interactive shell), so it inherits all of the power of the Scala REPL.




wget http://www-stat.stanford.edu/~tibs/ElemStatLearn/datasets/spam.data


scala> val inFile = sc.textFile("./spam.data")


The sc in the command line is the Spark context. While applications create a Spark context explicitly, the Spark shell creates one called sc for you, and that is the one we normally use.


Note: If you've connected to a Spark master, it's possible that it will attempt to load the file on any one of the machines in the cluster, so make sure that the file can be accessed by all the worker nodes. In general, you will want to put your data in HDFS, S3, or a similar distributed file system to avoid this problem. In local mode, you can just load the file directly (for example, sc.textFile([filepath])). You can also use the addFile function on the Spark context to make a file available across all of the machines, like this:


scala> import org.apache.spark.SparkFiles
scala> sc.addFile("/opt/spark/spam.data")
scala> val inFile = sc.textFile(SparkFiles.get("spam.data"))



We use one map operation to convert each line to an array of numbers in string format and then convert each of the numbers to a double, as shown next:

scala> val nums = inFile.map(line => line.split(' ').map(_.toDouble))



scala> inFile.first()


scala> nums.first()




Operators in Spark are divided into transformations and actions. Transformations are evaluated lazily: Spark just builds the RDD's lineage graph when you call a transformation like map. No actual work is done until an action is invoked on the RDD. Creating the RDD and the map function are transformations; the .first() function is an action that forces execution. So when we created inFile, it really didn't do anything except create a variable and set up the pointers. Only when we call an action like .first() does Spark evaluate the transformations. As a result, even if we point inFile at a non-existent directory, Spark will accept it; but when we call inFile.first(), it will throw the "Input path does not exist" error.
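
A minimal sketch of this laziness (the path is deliberately bogus and purely illustrative):

scala> val missing = sc.textFile("/no/such/path")  // transformation only: nothing is read yet
scala> missing.first()                             // action: Spark now tries to read the path
// and fails with the "Input path does not exist" error mentioned above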





Amazon S3

val file = sc.textFile("s3n://bigdatademo/sample/wiki/")

file.first()

file.take(1)


Credentials can also be embedded directly in the path, in the form s3n://<AWS ACCESS ID>:<AWS SECRET>@bucket/path



sample.saveAsTextFile("s3n://mysparkbucket/test")
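
As an alternative to putting credentials in the URI, a sketch of setting them on the Hadoop configuration of the Spark context (the property names are the standard Hadoop s3n keys; the values are placeholders):

sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY")
sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_KEY")
val file = sc.textFile("s3n://bigdatademo/sample/wiki/")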









Running Spark shell in Python

If you are more comfortable with Python than Scala, you can also work with Spark interactively in Python by running ./pyspark:

textFile = sc.textFile("README.md")

textFile.count()

textFile.first()

linesWithSpark = textFile.filter(lambda line: "Spark" in line)

textFile.filter(lambda line: "Spark" in line).count()







Creating a SparkContext

This chapter will cover how to create a SparkContext object for your cluster. A SparkContext object represents the connection to a Spark cluster and provides the entry point for interacting with Spark. We need to create a SparkContext so that we can interact with Spark and distribute our jobs.
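
A minimal sketch of creating one in a standalone Scala application (the app name and master URL are placeholders; in the Spark shell, sc is already created for you):

import org.apache.spark.{SparkConf, SparkContext}

// Describe the application and where it runs: "local[*]" uses all local cores,
// while a URL such as spark://master-host:7077 would target a standalone cluster.
val conf = new SparkConf()
  .setAppName("MyApp")
  .setMaster("local[*]")

val sc = new SparkContext(conf)

// ... use sc to build RDDs and run jobs ...

sc.stop()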







Loading Data

RDDs

Spark RDDs can be created from any supported Hadoop source. Native collections in Scala, Java, and Python can also serve as the basis for an RDD; creating RDDs from a native collection is especially useful for testing.

Before jumping into the details of the supported data sources, take some time to learn what RDDs are and what they are not. It is crucial to understand that even though an RDD is defined, it does not actually contain data; it just sets up the pipeline for producing it. (An RDD follows the principle of lazy evaluation: it evaluates an expression only when it is needed, that is, when an action is called.) This means that when you go to access the data in an RDD, the access could fail. The computation that creates the data in an RDD is only done when the data is referenced, for example by caching or writing out the RDD. This also means that you can chain a large number of operations without having to worry about excessive blocking in a computational thread. It's important to note during application development that you can write code, compile it, and even run your job; unless you materialize the RDD, your code may not have even tried to load the original data.



val dataRDD = sc.parallelize(List(1,2,4))

dataRDD.take(3)




import org.apache.spark.SparkFiles
...
sc.addFile("spam.data")                               // ship the local file to every node
val inFile = sc.textFile(SparkFiles.get("spam.data")) // resolve the local copy on each node
inFile.first()





val inFile = sc.textFile("Line_of_numbers.csv")

val numbersRDD = inFile.map(line => line.split(','))

scala> numbersRDD.take(10)



scala> val numbersRDD = inFile.map(line => line.split(',')).map(_.toDouble)

This first attempt does not work: after the split, each element of the RDD is an Array[String] rather than a String, so the second map has nothing to call toDouble on. Flattening the arrays with flatMap fixes it:

scala> val numbersRDD = inFile.flatMap(line => line.split(',')).map(_.toDouble)


scala> numbersRDD.collect()


scala> numbersRDD.sum()





Loading CSV files with opencsv

import au.com.bytecode.opencsv.CSVReader
import java.io.StringReader

sc.addFile("Line_of_numbers.csv")
val inFile = sc.textFile("Line_of_numbers.csv")

// Parse each line with opencsv, yielding an array of string fields per line
val splitLines = inFile.map(line => {
  val reader = new CSVReader(new StringReader(line))
  reader.readNext()
})

// Convert the fields to doubles and sum each row
val numericData = splitLines.map(line => line.map(_.toDouble))
val summedData = numericData.map(row => row.sum)
println(summedData.collect().mkString(","))





Loading a Hadoop SequenceFile of (String, Int) pairs:

val data = sc.sequenceFile[String, Int](inputFile)





Saving Data

For Scala:

rddOfStrings.saveAsTextFile("out.txt")

keyValueRdd.saveAsObjectFile("sequenceOut")
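
The saved objects can later be read back with sc.objectFile (a sketch; the (String, Int) element type and the path mirror the hypothetical keyValueRdd above):

val restored = sc.objectFile[(String, Int)]("sequenceOut")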





Manipulating your RDD




Manipulating your RDD in Scala and Java

RDDs are the primary abstraction in Spark. From a structural point of view, they are just a bunch of elements, but elements that can be operated on in parallel! Manipulating your RDD in Scala is quite simple, especially if you are familiar with Scala's collection library. Many of the standard functions are available directly on Spark's RDDs, with the primary catch being that RDDs are immutable. This makes porting existing Scala code to run distributed much simpler than porting Java or Python code, at least in theory; while Scala encourages functional programming, one can always use Scala in a non-functional way.
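
A minimal sketch of that collection-like feel (the data is made up here; each call returns a new RDD instead of mutating the original):

val words = sc.parallelize(List("spark", "makes", "rdd", "manipulation", "simple"))

val longWords = words.filter(_.length > 4)   // keep words longer than 4 characters
val lengths = words.map(_.length)            // transform each element
val totalLength = lengths.reduce(_ + _)      // action: combine the results on the driver

println(longWords.collect().mkString(", "))
println(totalLength)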





val data = sc.parallelize(List(("key1", 1), ("Kay2", 2), ("Key3", 2)))

data.saveAsSequenceFile("/tmp/seq-output")


scala> import org.apache.hadoop.io.Text

import org.apache.hadoop.io.Text

scala> import org.apache.hadoop.io.IntWritable

import org.apache.hadoop.io.IntWritable

scala> val result = sc.sequenceFile("/tmp/seq-output/part-00001", classOf[Text], classOf[IntWritable]).map { case (x, y) => (x.toString, y.get()) }

result: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[15] at map at <console>:29

 

scala> result.collect

res14: Array[(String, Int)] = Array((Kay2,2), (Key3,2))











/spark-1.4.1/bin/spark-shell --packages com.databricks:spark-csv_2.10:1.1.0

scala> sqlContext

res0: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@12c57a43

 

scala> val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "file:///root/emp.csv","header"->"true"))

warning: there were 1 deprecation warning(s); re-run with -deprecation for details

df: org.apache.spark.sql.DataFrame = [emp_id: string, emp_name: string, country: string, salary: string]

 

scala>   df.printSchema()

root

 |-- emp_id: string (nullable = true)

 |-- emp_name: string (nullable = true)

 |-- country: string (nullable = true)

 |-- salary: string (nullable = true)

 

 

scala> df.registerTempTable("emp")

 

scala> val names = sqlContext.sql("select emp_name from emp")

names: org.apache.spark.sql.DataFrame = [emp_name: string]

 

scala> names.foreach(println)

[Guy]

[Jonas]

[Hector]

[Byron]

[Owen]

[Zachery]

[Alden]

[Akeem]
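
Since sqlContext.load is deprecated in Spark 1.4, the same DataFrame can be built with the DataFrameReader API (a sketch using the same hypothetical emp.csv path as above):

scala> val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("file:///root/emp.csv")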















scala> sqlContext

res0: org.apache.spark.sql.SQLContext = org.apache.spark.sql.hive.HiveContext@330305d3

 

scala> val df = sqlContext.load("org.apache.spark.sql.json", Map("path" -> "file:///employee.json"))

warning: there were 1 deprecation warning(s); re-run with -deprecation for details

df: org.apache.spark.sql.DataFrame = [birth_date: string, department_id: bigint, education_level: string, employee_id: bigint, end_date: string, first_name: string, full_name: string, gender: string, hire_date: string, last_name: string, management_role: string, marital_status: string, position_id: bigint, position_title: string, salary: double, store_id: bigint, supervisor_id: bigint]

 

scala>  df.printSchema()

root

 |-- birth_date: string (nullable = true)

 |-- department_id: long (nullable = true)

 |-- education_level: string (nullable = true)

 |-- employee_id: long (nullable = true)

 |-- end_date: string (nullable = true)

 |-- first_name: string (nullable = true)

 |-- full_name: string (nullable = true)

 |-- gender: string (nullable = true)

 |-- hire_date: string (nullable = true)

 |-- last_name: string (nullable = true)

 |-- management_role: string (nullable = true)

 |-- marital_status: string (nullable = true)

 |-- position_id: long (nullable = true)

 |-- position_title: string (nullable = true)

 |-- salary: double (nullable = true)

 |-- store_id: long (nullable = true)

 |-- supervisor_id: long (nullable = true)

 

 

scala> df.registerTempTable("employee")

scala> val names = sqlContext.sql("select first_name from employee limit 5")

names: org.apache.spark.sql.DataFrame = [first_name: string]

scala> names.foreach(println)

[Sheri]

[Derrick]

[Michael]

[Maya]

[Roberta]
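
Again, the non-deprecated equivalent with the DataFrameReader API (a sketch using the same hypothetical employee.json path):

scala> val df = sqlContext.read.json("file:///employee.json")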






