<font id="tbtdb"></font>

        <sub id="tbtdb"><delect id="tbtdb"></delect></sub>

          <progress id="tbtdb"></progress>
          <address id="tbtdb"></address><thead id="tbtdb"></thead>
          <font id="tbtdb"></font>

          <meter id="tbtdb"><cite id="tbtdb"><dl id="tbtdb"></dl></cite></meter>

          好程序員-千鋒教育旗下高端IT職業教育品牌

          400-811-9990
          我的賬戶
          好程序員

          專注高端IT職業培訓

          親愛的猿猿,歡迎!

          已有賬號,請

          如尚未注冊?

          [BigData] 好程序員大數據培訓教程分享Master的jps

          [復制鏈接]
          730 0
          葉子老師 發表于 2019-8-19 14:45:39 | 只看該作者 |只看大圖 |閱讀模式 打印 上一主題 下一主題
          好程序員大數據培訓教程分享Master的jps


          SparkSubmit
            類啟動后的服務進程,用于提交任務,
            哪一段啟動提交任務,哪一段啟動submit(Driver端)

          提交任務流程
          1.Driver端提交任務到Master(啟動sparkSubmit進程)
          2.Master生成任務信息,放入對列中
          3.Master通知Worker啟動Executor,(Master過濾出存活的Worker,將任務分配給空閑資源多的worker)
          4.worker的Executor向Driver端注冊(只有executor真正參與計算) -> worker從Dirver端拿信息
          5.Driver端啟動Executor將任務劃分階段,分成小的task,再廣播給相應的Worker讓他去執行
          6.worker會將執行完的任務回傳給Driver


          range 相當于集合子類
          scala> 1.to(10)
          res0: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8,
          9, 10)
          [size=10.5000pt]
          scala> 1 to 10
          res1: scala.collection.immutable.Range.Inclusive = Range(1, 2, 3, 4, 5, 6, 7, 8,
          9, 10)

          提交任務到集群的任務類 :
          Spark context available as sc
          SQL context available as sqlContext
          直接調用:
          spark WordCount
          構建模板代碼:
          SparkConf:構建配置信息類,該配置優先于集群配置文件
          setAppName:指定應用程序名稱,如果不指定,會自動生成一個類似于uuid產生的名稱
          setMaster:指定運行模式:local-用1個線程模擬集群運行,
          local[2]: 用2個線程模擬集群運行,loca-當前有多少空閑到的線程就用多少線程來運行該任務
          /**
            * 用spark實現單詞計數
            */
          object SparkWordCount {
            def main(args: Array[String]): Unit = {
              /**
                * 構建模板代碼
                */
              val conf: SparkConf = new SparkConf()
                .setAppName("SparkWordCount")
          //      .setMaster("local[2]")
          [size=10.5000pt]
              // 創建提交任務到集群的入口類(上下文對象)
              val sc: SparkContext = new SparkContext(conf)
          [size=10.5000pt]
              // 獲取HDFS的數據
              val lines: RDD[String] = sc.textFile(args(0))
          [size=10.5000pt]
              // 切分數據,生成一個個單詞
              val words: RDD[String] = lines.flatMap(_.split(" "))
          [size=10.5000pt]
              // 把單詞生成一個個元組
              val tuples: RDD[(String, Int)] = words.map((_, 1))
          [size=10.5000pt]
              // 進行聚合操作
          //    tuples.reduceByKey((x, y) => x + y)
              val sumed: RDD[(String, Int)] = tuples.reduceByKey(_+_)
          [size=10.5000pt]
              // 以單詞出現的次數進行降序排序
              val sorted: RDD[(String, Int)] = sumed.sortBy(_._2, false)
          [size=10.5000pt]
              // 打印到控制臺
          //    println(sorted.collect.toBuffer)
          //    sorted.foreach(x => println(x))
          //    sorted.foreach(println)
          [size=10.5000pt]
              // 把結果存儲到HDFS
              sorted.saveAsTextFile(args(1))
          [size=10.5000pt]
              // 釋放資源
              sc.stop()
            }
          [size=10.5000pt]}
          打包后上傳Linux
          1.首先啟動zookeeper,hdfs和Spark集群
          啟動hdfs
          /usr/local/hadoop-2.6.1/sbin/start-dfs.sh
          啟動spark
          /usr/local/spark-1.6.1-bin-hadoop2.6/sbin/start-all.sh

          2.使用spark-submit命令提交Spark應用(注意參數的順序)
          /usr/local/spark-1.6.1-bin-hadoop2.6/bin/spark-submit \
          --class com.qf.spark.WordCount \
          --master spark://node01:7077 \
          --executor-memory 2G \
          --total-executor-cores 4 \
          /root/spark-mvn-1.0-SNAPSHOT.jar \
          hdfs://node01:9000/words.txt \
          hdfs://node01:9000/out

          3.查看程序執行結果
          hdfs dfs -cat hdfs://node01:9000/out/part-00000

          javaSparkWC
          import org.apache.spark.SparkConf;
          import org.apache.spark.api.java.JavaPairRDD;
          import org.apache.spark.api.java.JavaRDD;
          import org.apache.spark.api.java.JavaSparkContext;
          import org.apache.spark.api.java.function.FlatMapFunction;
          import org.apache.spark.api.java.function.Function2;
          import org.apache.spark.api.java.function.PairFunction;
          import scala.Tuple2;
          [size=10.5000pt]
          import java.util.Arrays;
          import java.util.List;
          [size=10.5000pt]
          public class JavaSparkWC {
              public static void main(String[] args) {
                  SparkConf conf = new SparkConf()
                          .setAppName("JavaSparkWC").setMaster("local[1]");

                  //提交任務入口類
                  JavaSparkContext jsc = new JavaSparkContext(conf);
          [size=10.5000pt]
                  //獲取數據
                  JavaRDD<String> lines = jsc.textFile("hdfs://hadoop01:9000/wordcount/input/a.txt");
                  //切分數據
                  JavaRDD<String> words =
                          lines.flatMap(new FlatMapFunction<String, String>() {
                      @Override
                      public Iterable<String> call(String s) throws Exception {
                          List<String> splited = Arrays.asList(s.split(" ")); //生成list
                          return splited;
                      }
                  });
          [size=10.5000pt]
                  //生成元祖                               //一對一組 ,(輸入單詞,輸出單詞,輸出1)
                  JavaPairRDD<String, Integer> tuples =
                          words.mapToPair(new PairFunction<String, String, Integer>() {
                      @Override
                      public Tuple2<String, Integer> call(String s) throws Exception {
                          return new Tuple2<String, Integer>(s, 1);
                      }
                  });
          [size=10.5000pt]
                  //聚合                                                  //2個相同key的value,聚合
                  JavaPairRDD<String, Integer> sumed =
                          tuples.reduceByKey(new Function2<Integer, Integer, Integer>() {
                      @Override
                      public Integer call(Integer v1, Integer v2) throws Exception {
                          return v1 + v2;
                      }
                  });
          [size=10.5000pt]
                  //此前key為String類型,沒有辦法排序
                  //Java api并沒有提供sortBy算子,此時需要把兩個值位置調換,排序完成后,在換回來
                  final JavaPairRDD<Integer, String> swaped =
                          sumed.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                      @Override
                      public Tuple2<Integer, String> call(Tuple2<String, Integer> tup) throws Exception {
          //                return new Tuple2<Integer, String>(tup._2, tup._1);
                          return tup.swap(); //swap(),交換方法
                      }
                  });
          [size=10.5000pt]
                  //降序排序
                  JavaPairRDD<Integer, String> sorted = swaped.sortByKey(false);
                  //再次交換
                  JavaPairRDD<String, Integer> res = sorted.mapToPair(
                      new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                         @Override
                         public Tuple2<String, Integer> call(Tuple2<Integer, String> tup)throws Exception {
                              return tup.swap();
                         }
                  });
          [size=10.5000pt]
                  System.out.println(res.collect());
          [size=10.5000pt]
                  jsc.stop();//釋放資源
              }
          [size=10.5000pt]}
          好程序員大數據培訓官網:http://www.xn122.com/

          精彩內容,一鍵分享給更多人!
          收藏
          收藏0
          轉播
          轉播
          分享
          淘帖0
          支持
          支持0
          反對
          反對0
          回復

          使用道具 舉報

          您需要登錄后才可以回帖

          本版積分規則

          關注我們
          好程序員
          千鋒好程序員

          北京校區(總部):北京市海淀區寶盛北里西區28號中關村智誠科創大廈

          深圳西部硅谷校區:深圳市寶安區寶安大道5010號深圳西部硅谷B座A區605-619

          杭州龍馳智慧谷校區:浙江省杭州市下沙經濟技術開發區元成路199號龍馳智慧谷B座7層

          鄭州校區:鄭州市二七區航海中路60號海為科技園C區10層、12層

          Copyright 2007-2019 北京千鋒互聯科技有限公司 .All Right

          京ICP備12003911號-5 京公安網11010802011455號

          請您保持通訊暢通1對1咨詢馬上開啟

          电影韩国三级2019在线观看