sparkref.txt

# in docker:
which python3
which python
sudo ln -s /usr/bin/python3 /usr/bin/python
/usr/local/hadoop/spark/sbin/start-all.sh
/usr/local/hadoop/spark/bin/pyspark
# pyspark - the Python shell for Spark
# configure Jupyter to use pyspark and run the examples there
# check the pyspark web UI on port number 4040 (confirms the pyspark shell is working)
whenever a job is submitted on pyspark we can check it on port 4040
# to start pyspark (on the edge node)
/usr/local/hadoop/spark/bin/pyspark
pyspark
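# once the shell is up, sc (the SparkContext) is already defined; a quick sanity-check
# sketch using standard SparkContext attributes of the pyspark API:
sc.version   # Spark version of the running context
sc.uiWebUrl  # URL of the web UI (the port-4040 page mentioned above)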
# distributed data sets - they are replicated on the data nodes and
processing is done at the data nodes only
# pyspark can create distributed datasets from any storage (local disk or HDFS)
# executed jobs appear in the job list (pyspark GUI)
# in pyspark everything is an RDD
# text file RDDs can be created using the textFile method
# sc - SparkContext (the entry point to Spark)
# the textFile command is used to create a dataset from a text file
# before that, copy the file to the hdfs cluster and access it from there
# by default the file is accessed from hdfs (use a file:// path to read from the local filesystem)
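# for illustration, the two schemes side by side (these paths are only examples):
dfile_hdfs = sc.textFile("hdfs:///user/raman/a.txt")   # read from HDFS
dfile_local = sc.textFile("file:///home/raman/a.txt")  # read from the local filesystem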
dfile = sc.textFile("path to file")
dfile.collect() - collect is an action that returns all the contents of the dataset
dfile = sc.textFile("/user/raman/a.txt")
dfile.collect()
type(dfile)
<class 'pyspark.rdd.RDD'>
# when a job is submitted, it shows up in the pyspark GUI (port 4040)
# to display only two records
dfile.take(2)
# creating an RDD by parallelizing - use the parallelize method to distribute a local collection as an RDD
# there are two ways to create an RDD in spark:
1) parallelizing - distributing an existing collection across the nodes
2) initialization - referencing an external dataset with sc.textFile()
# the parallelize command allows spark to distribute the data across multiple nodes, instead of
depending on a single node
data = sc.parallelize([1, 2, 3, 4, 5]) - this creates an RDD instance which we can later
store somewhere (writing the data to local disk / hdfs)
# the type command is used to display the data type
type(data)
<class 'pyspark.rdd.RDD'>
data.collect()
[1, 2, 3, 4, 5]
data.take(2) - display the first two records
[1, 2]
# map and flatMap (the important commands here; the rest is ordinary python)
the map transformation takes in a function and applies it to each element in the RDD, and
the result of the function becomes the new value of each element in the resulting RDD.
(using map we can apply a function to all elements of an RDD)
flatMap is used to produce multiple output elements for each input element.
the difference between map and flatMap is that flatMap flattens the result.
flatten means it converts a list of lists into a single list.
dfile.take(2) # take returns the first two lines as a plain python list; dfile itself stays an RDD
dfile.collect()
['hello how are you', 'hello I am fine']
dat = dfile.map(lambda line: line.split(" ")) # map is lazy: it just returns a new RDD, so store it in a variable
# split each line on the basis of space
dat.collect()
[['hello', 'how', 'are', 'you'], ['hello', 'I', 'am', 'fine']]
dat.take(2)
[['hello', 'how', 'are', 'you'], ['hello', 'I', 'am', 'fine']]
datf = dfile.flatMap(lambda line: line.split(" "))
datf.collect()
['hello', 'how', 'are', 'you', 'hello', 'I', 'am', 'fine']
datf.take(2)
['hello', 'how']
# frequency of words
data2 = datf.map(lambda word: (word, 1))
# word is just a parameter name (anything can be used)
# assign 1 to each word in this map
data2.collect()
[('hello', 1), ('how', 1), ...]
data3 = dfile.map(lambda line: (line, 1)) # on the un-split lines, each whole line becomes the key
data3.collect()
[('hello how are you', 1), ('hello I am fine', 1)]
# there is a function, reduceByKey
data4 = data2.reduceByKey(lambda a, b: a + b) # a sort of aggregation - sums up the 1s per key
data4.collect()
[Stage 9:> # spark prints a stage progress bar while the job runs
[('hello', 2), ('how', 1), ...]
# save the output in hdfs (saveAsTextFile writes a directory of part files at this path)
data4.saveAsTextFile("hdfs:///user/raman/spark/file.txt")
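# putting the steps above together, a minimal word-count sketch (assumes the pyspark shell,
# so sc already exists; the output path below is only an example):
counts = (sc.textFile("/user/raman/a.txt")
            .flatMap(lambda line: line.split(" "))
            .map(lambda word: (word, 1))
            .reduceByKey(lambda a, b: a + b))
counts.saveAsTextFile("hdfs:///user/raman/spark/wordcount_out")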
# Data Frame in PySpark (a data frame is not a relational database table)
A Data Frame is a distributed collection of rows under named columns.
1. A data frame shares common characteristics with an RDD
2. It is immutable in nature
(it means we can create a data frame once, but it cannot be changed afterwards, just like an RDD)
3. We can transform a data frame/RDD by applying a transformation (map, flatMap, reduceByKey)
(the transformation is not done on the original data frame; the result goes into a new variable)
4. It is distributed (replicated on all the data nodes)
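# a minimal sketch of "rows under named columns", building a tiny data frame by hand
# (the values and column names here are made up for illustration):
df_demo = spark.createDataFrame([(1, 'hello'), (2, 'fine')], ['id', 'word'])
df_demo.show()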
# Load data into a data frame (first copy the file to hdfs)
df_load = spark.read.csv("/user/raman/spark/file.csv")
df_load.show() # show is an action that displays the rows, similar in spirit to collect
# if the file has no header, default column names (_c0, _c1, ...) are assigned automatically
# print the schema (structure of your dataframe)
df_load.printSchema() # data type of each column
# show the data frame
df_load.show(10) # similar to df.head(10) in pandas
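# if the csv does have a header row, the standard header/inferSchema options of spark.read.csv
# can be used instead (same illustrative path as above):
df_load2 = spark.read.csv("/user/raman/spark/file.csv", header=True, inferSchema=True)
df_load2.printSchema() # column names come from the header and types are inferred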
# to rename all the column names at once
df_load = df_load.toDF('a', 'b', 'c', 'd', 'e', 'f', 'g')
df_load.show()
# to rename a single column
df_load = df_load.withColumnRenamed('g', 'good')
# drop a column from a data frame
df_load.drop('good') # drop returns a new data frame; assign the result to keep it
df1 = df_load[df_load.f != 'volume'] # keep rows where column f is not 'volume'
df2 = df_load[df_load.f == 'volume'] # keep rows where column f is 'volume'
df1.show()
# add a column with the value 1/df1.e
df3 = df1.withColumn('e1', 1/df1.e) # creates a new data frame with the derived column
df3.show()
# we can implement aggregation in pyspark (groupby)
df3.show()
df3.groupby(['a']) # groupby on a single field (returns a GroupedData object that still needs an aggregation)
df4 = df3.groupby(['a', 'b']).agg({'d': 'mean'}) # groupby on multiple fields
# select mean(d) from df3 group by a, b
# group by a and b, then find the average of d
df4.show()
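# the same aggregation written with an explicit function from pyspark.sql.functions,
# an equivalent alternative form (alias just names the result column):
from pyspark.sql import functions as F
df4b = df3.groupby('a', 'b').agg(F.mean('d').alias('mean_d'))
df4b.show()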
# learn the join command (joining data frames in pyspark); see the sketch below
(similar to joining data frames in pandas)
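# a minimal sketch of a data frame join, assuming two data frames that share column 'a'
# (the data frame names and join key here are illustrative):
df_joined = df4.join(df3, on='a', how='inner')
df_joined.show()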
# distributed computation
3:30 to 5:00 17/09/2020
apt-get install python3-venv