- # in docker:
- which python3
- which python
- sudo ln -s /usr/bin/python3 /usr/bin/python
- /usr/local/hadoop/spark/sbin/start-all.sh
- /usr/local/hadoop/spark/bin/pyspark
- # pyspark - python with spark
- # configure Jupyter for pyspark and run jobs from it
- # check the Spark UI on port number 4040 (available while the pyspark shell is running)
- whenever a job is submitted from pyspark we can check it on 4040
- # to start pyspark (in edge node)
- /usr/local/hadoop/spark/bin/pyspark
- pyspark
- # distributed datasets - the data is replicated across the data nodes and
- processing is done on the data nodes only
- # pyspark can create distributed datasets from any storage (local disk or HDFS)
- # executed jobs are listed in the pyspark GUI
- # in pyspark everything is an RDD
- # Text file RDDs can be created using textFile method
- # sc - SparkContext (the entry point that talks to the executors)
- # textFile command is used to create a dataset from a text file
- # before that, copy the file to the HDFS cluster and access it from there
- # by default the file is read from HDFS (use a file:// path to read a file from the local disk)
- dfile = sc.textFile("path to file")
- dfile.collect() - collect is an action that returns the entire contents of the dataset
- dfile = sc.textFile("/user/raman/a.txt")
- dfile.collect()
- type(dfile)
- <class 'pyspark.rdd.RDD'>
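- # a minimal sketch (not from the class), assuming both files exist: textFile with an explicit hdfs:// URI versus a file:// URI for the local disk
- hfile = sc.textFile("hdfs:///user/raman/a.txt")
- lfile = sc.textFile("file:///home/raman/a.txt") # hypothetical local path
- hfile.take(2)
- lfile.take(2)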
- # when a job is submitted, the driver and its executed jobs show up in the pyspark GUI
- # to display only two records
- dfile.take(2)
- # parallelize - use the parallelize method to create a distributed copy of the data as an RDD
- # there are two ways to create an RDD in spark:
- 1) parallelizing - distributing an existing collection with sc.parallelize()
- 2) initialization - creating from an external dataset with sc.textFile()
- # the parallelize command allows spark to distribute the data across multiple nodes, instead of
- depending on a single node
- data = sc.parallelize([1, 2, 3, 4, 5]) - this creates an RDD instance which we can store
- somewhere later (writing data to local/HDFS)
- # type command is used to display data type
- type(data)
- <class 'pyspark.rdd.RDD'>
- data.collect()
- [1, 2, 3, 4, 5]
- data.take(2) - display two records
- [1, 2]
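- # a small sketch (assumption, not from the class): parallelize with an explicit number of partitions; glom() shows how the elements are split across partitions
- nums = sc.parallelize([1, 2, 3, 4, 5], 2) # 2 partitions
- nums.getNumPartitions() # -> 2
- nums.glom().collect() # -> [[1, 2], [3, 4, 5]]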
- # map and flatMap (the important commands; the rest is plain python)
- the map transformation takes a function and applies it to each element in the RDD, and
- the result of the function becomes the new value of each element in the resulting RDD.
- (using map we can apply a function to all elements of an RDD)
- flatMap is used to produce multiple output elements for each input element.
- the difference between map and flatMap is that flatMap flattens the result:
- flattening means it converts a list of lists into a single list.
- dfile.take(2) # take returns a plain python list, so do not reassign it to dfile
- dfile.collect()
- ['hello how are you', 'hello I am fine']
- dfile.map(lambda line:line.split(" ")) # running this alone only returns a new RDD - transformations are lazy, nothing is computed yet
- dat = dfile.map(lambda line:line.split(" ")) # store the result in a new variable to use it later
- # split each line on the basis of space
- dat.collect()
- [['hello', 'how', 'are', 'you'], ['hello', 'I', 'am', 'fine']]
- dat.take(2)
- [['hello', 'how', 'are', 'you'], ['hello', 'I', 'am', 'fine']]
- datf = dfile.flatMap(lambda line:line.split(" "))
- datf.collect()
- ['hello', 'how', 'are', 'you', 'hello', 'I', 'am', 'fine']
- datf.take(2)
- ['hello', 'how']
- # frequency of words
- data2 = datf.map(lambda word:(word, 1))
- # word is a variable (any name can be used)
- # assign 1 to each word in this map
- data2.collect()
- [('hello', 1), ('how', 1), ...]
- data3 = dat.map(lambda word:(word, 1))
- data3.collect()
- [('hello how are you', 1), ('hello I am fine', 1)]
- # there is a function, reduce by key
- data4 = data2.reduceByKey(lambda a,b:a+b) # sort of aggregation
- data4.collect()
- [Stage 9:> # progress bar shown while the job runs as stage 9
- [('hello', 2), ('how', 1), ...]
- # save output in hdfs
- data4.saveAsTextFile("hdfs:///user/raman/spark/file.txt")
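- # the steps above combined into one word-count pipeline (a sketch; the input/output paths are hypothetical)
- lines = sc.textFile("/user/raman/a.txt")
- words = lines.flatMap(lambda line: line.split(" "))
- pairs = words.map(lambda word: (word, 1))
- counts = pairs.reduceByKey(lambda a, b: a + b)
- counts.collect()
- counts.saveAsTextFile("hdfs:///user/raman/spark/wordcount_out") # note: saveAsTextFile creates a directory of part files, not a single file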
- # Data Frames in PySpark
- A Data Frame is a distributed collection of rows under named columns.
- 1. A data frame also shares common characteristics with an RDD
- 2. It is immutable in nature
- (it means we can create a data frame once but it cannot be changed afterwards, just as with an RDD)
- 3. We can transform a data frame/RDD by applying a transformation (map, flatMap, reduceByKey)
- (the transformation is not done on the original data frame but stored in a new variable - see the sketch after this list)
- 4. It is distributed (replicated on all data nodes)
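- # a small sketch of points 2 and 3 (assumed example, not from the class): a transformation returns a new data frame and leaves the original unchanged
- df = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'name'])
- df2 = df.withColumn('id2', df.id * 2)
- df.show() # still only the original two columns
- df2.show() # has the extra id2 column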
- # Load data into data frame (copy file on hdfs)
- df_load = spark.read.csv("/user/raman/spark/file.csv")
- df_load.show() # show displays the rows, similar to collect on an RDD
- # if there is no header row, default column names (_c0, _c1, ...) are assigned automatically
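- # a sketch (assumption): if the first row of the file is a header, pass header=True; inferSchema=True guesses the column types
- df_hdr = spark.read.csv("/user/raman/spark/file.csv", header=True, inferSchema=True) # hypothetical variant of the same file with a header row
- df_hdr.show(5)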
- # print schema (structure of your dataframe)
- df_load.printSchema() # data type of each column
- # show data frame
- df_load.show(10) # similar to df.head(10) in pandas
- # to rename all the column names at once
- df_load = df_load.toDF('a', 'b', 'c', 'd', 'e', 'f', 'g')
- df_load.show()
- # to rename a single column
- df_load = df_load.withColumnRenamed('g', 'good') # withColumnRenamed is the correct method name
- # drop a column from a data frame
- df_load.drop('g') # drop returns a new data frame; assign the result to keep it
- df1 = df_load[df_load.f != 'volume'] # rows where f is not 'volume'
- df2 = df_load[df_load.f == 'volume'] # rows where f is 'volume'
- df1.show()
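- # a sketch (assumption): .filter() / .where() is equivalent to the bracket-style filtering above
- df1_alt = df_load.filter(df_load.f != 'volume')
- df1_alt.show()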
- # add a column with data 1/df1.e
- df3 = df1.withColumn('e1', 1/df1.e) # withColumn returns a new data frame with the extra column
- df3.show()
- # we can implement aggregation in pyspark (groupby)
- df3.show()
- df3.groupby(['a']) # groupby on single field
- df4 = df3.groupby(['a', 'b']).agg({'d':'mean'}) # groupby on multiple fields
- # select mean of d from df3 groupby a, b
- # groupby a and b then finding average of d
- df4.show()
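- # a sketch (assumption, not from the class): several aggregations at once using pyspark.sql.functions instead of the dict form
- from pyspark.sql import functions as F
- df5 = df3.groupby('a').agg(F.mean('d').alias('d_mean'), F.max('e').alias('e_max'))
- df5.show()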
- # learn the join command (joining data frames in pyspark)
- (conceptually the same as joining data frames in pandas)
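- # a small sketch (assumed example, not from the class): joining two data frames on a key column
- left = spark.createDataFrame([(1, 'x'), (2, 'y')], ['id', 'val1'])
- right = spark.createDataFrame([(1, 'p'), (3, 'q')], ['id', 'val2'])
- left.join(right, on='id', how='inner').show() # how can also be 'left', 'right', or 'outer'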
- # distributed computation
- 3:30 to 5:00 17/09/2020
- apt-get install python3-venv