<font id="tbtdb"></font>

        <sub id="tbtdb"><delect id="tbtdb"></delect></sub>

          <progress id="tbtdb"></progress>
          <address id="tbtdb"></address><thead id="tbtdb"></thead>
          <font id="tbtdb"></font>

          <meter id="tbtdb"><cite id="tbtdb"><dl id="tbtdb"></dl></cite></meter>

          好程序員-千鋒教育旗下高端IT職業教育品牌

          400-811-9990
          我的賬戶
          好程序員

          專注高端IT職業培訓

          親愛的猿猿,歡迎!

          已有賬號,請

          如尚未注冊?

          [BigData] 好程序員大數據培訓教程分享Actor學習筆記

          [復制鏈接]
          676 0
          葉子老師 發表于 2019-8-19 11:43:44 | 只看該作者 |閱讀模式 打印 上一主題 下一主題
            好程序員大數據培訓教程分享Actor學習筆記,scala中她能實現很強大的功能,他是基于并發機制的一個事件模型
          我們現在學的scala2.10.x版本就是之前的Actor
          同步:在主程序上排隊執行的任務,只有前一個任務執行完畢后,才能執行下一個任務
          異步:指不進入主程序,而進入"任務對列"的任務,只有等主程序任務執行完畢,"任務對列"開始請求主程序,請求任務執行,該任務會進入主程序
          java
          共享變量 -- 加鎖
          會出現鎖死問題
          scala
          Actor不共享數據
          沒有鎖的概念
          Actor通信之間需要message(通信)
          Aactor執行順序
          1.首先調用start()方法啟動Actor
          2.調用start()方法后act()方法會被執行
          3.Actor之間進行發送消息
          Actor發送消息的三種方式
          ! -> 發送異步消息,沒有返回值
          !? -> 發送同步消息,有返回值,會有線程等待
          !! -> 發送異步消息,有返回值,返回值類型Future[Any](用來獲取異步操作結果)
          Actor并行執行
          //注意,這兩個actor會并行執行,當其中一個for循環結束后,actor結束
          object ActorDemo01 {
            def main(args: Array[String]): Unit = {
              MyActor1.start()
              MyActor2.start()
            }
          }
          object MyActor1 extends Actor{
            override def act(): Unit = {
              for (i <- 1 to 10){
                println(s"actor => $i")
                Thread.sleep(2000)
              }
            }
            object MyActor2 extends Actor{
              override def act(): Unit = {
                for (i <- 1 to 5){
                  println(s"actor2 => $i")
                  Thread.sleep(2000)
                }
              }
            }
          }
          Actor不斷接受消息
          執行第一種方式,異步
          object ActorDemo02 {
            def main(args: Array[String]): Unit = {
              val actor: MyActor = new MyActor
              actor.start()
              //并行執行
              actor ! "start"  // !->異步
              actor ! "stop"
              println("發送完成")
            }
          }
          class MyActor extends Actor{
            override def act(): Unit = {
              while (true){   //死循環
                receive {   //接收
                  case "start" => {
                    println("starting")
                    Thread.sleep(1000)
                    println("started")
                  }
                  case "stop" => {
                    println("stopping")
                    Thread.sleep(1000)
                    println("stopped")
                  }
                }
              }
            }
          }
          第二種方式:利用react來代替receive,也就是說react線程可復用,比receive更高效
          object ActorDemo03 {
            def main(args: Array[String]): Unit = {
              val actor: MyActor3 = new MyActor3
              actor.start()
              actor ! "start"
              actor ! "stop"
              println("成功了")
            }
          }
          class MyActor3 extends Actor{
            override def act(): Unit = {
              loop {
                react{
                  case "start" =>{
                    println("starting")
                    Thread.sleep(1000)
                    println("sarted")
                  }
                  case "stop" =>{
                    println("stoppting")
                    Thread.sleep(1000)
                    println("stopped")
                  }
                }
              }
            }
          }
          結合樣例類練習Actor發送消息
          //創建樣例類
          case class AsyncMsg(id: Int, msg: String)
          case class SyncMsg(id: Int, msg: String)
          case class ReplyMsg(id: Int, msg: String)
          object ActorDemo01 extends Actor {
            override def act(): Unit = {
              while (true) {
                receive {
                  case "start" => println("starting...")
                  case AsyncMsg(id, msg) =>
                  {
                    println(s"id:$id,msg:$msg")
                    sender ! ReplyMsg(1,"sucess")  //接收到消息后返回響應消息
                  }
                  case SyncMsg(id,msg) => {
                    println(s"id:$id,msg:$msg")
                    sender ! ReplyMsg(2,"sucess")
                  }
                }
              }
            }
          }
          object ActorTest{
            def main(args: Array[String]): Unit = {
              val actor: Actor = ActorDemo01.start()
          //    //異步發送消息,沒有返回值
          //    actor ! AsyncMsg(3,"heihei")
          //    println("異步消息發送完成,沒有返回值")
          //    //同步發送消息,有返回值
          //    val text: Any = actor !? SyncMsg(4,"OK")
          //    println(text)
          //    println("同步消息發送成功")
              //異步發送消息,有返回值,返回類型為Future[Any]
              val reply: Future[Any] = actor !! SyncMsg(5,"OK is 不存在的")
              Thread.sleep(2000)
              if (reply.isSet){
                val applyMsg: Any = reply.apply()
                println(applyMsg)
              }else{
                println("Nothing")
              }
            }
          }
          Actor并行化的wordcount
          class Task extends Actor {
            override def act(): Unit = {
              loop {
                react {
                  case SubmitTask(fileName) => {
                    val contents = Source.fromFile(new File(fileName)).mkString
                    val arr = contents.split("\r\n")
                    val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.length)
                    //val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))
                    sender ! ResultTask(result)
                  }
                  case StopTask => {
                    exit()
                  }
                }
              }
            }
          }
          object WorkCount {
            def main(args: Array[String]) {
              val files = Array("c://words.txt", "c://words.log")
              val replaySet = new mutable.HashSet[Future[Any]]
              val resultList = new mutable.ListBuffer[ResultTask]
              for(f <- files) {
                val t = new Task
                val replay = t.start() !! SubmitTask(f)
                replaySet += replay
              }
              while(replaySet.size > 0){
                val toCumpute = replaySet.filter(_.isSet)
                for(r <- toCumpute){
                  val result = r.apply()
                  resultList += result.asInstanceOf[ResultTask]
                  replaySet.remove(r)
                }
                Thread.sleep(100)
              }
              val finalResult = resultList.map(_.result).flatten.groupBy(_._1).mapValues(x => x.foldLeft(0)(_ + _._2))
              println(finalResult)
            }
          }
          case class SubmitTask(fileName: String)
          case object StopTask
          case class ResultTask(result: Map[String, Int])
          好程序員大數據培訓官網:http://www.xn122.com/

          精彩內容,一鍵分享給更多人!
          收藏
          收藏0
          轉播
          轉播
          分享
          淘帖0
          支持
          支持0
          反對
          反對0
          回復

          使用道具 舉報

          您需要登錄后才可以回帖

          本版積分規則

          關注我們
          好程序員
          千鋒好程序員

          北京校區(總部):北京市海淀區寶盛北里西區28號中關村智誠科創大廈

          深圳西部硅谷校區:深圳市寶安區寶安大道5010號深圳西部硅谷B座A區605-619

          杭州龍馳智慧谷校區:浙江省杭州市下沙經濟技術開發區元成路199號龍馳智慧谷B座7層

          鄭州校區:鄭州市二七區航海中路60號海為科技園C區10層、12層

          Copyright 2007-2019 北京千鋒互聯科技有限公司 .All Right

          京ICP備12003911號-5 京公安網11010802011455號

          請您保持通訊暢通1對1咨詢馬上開啟

          电影韩国三级2019在线观看