博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
scala
阅读量:6935 次
发布时间:2019-06-27

本文共 9447 字,大约阅读时间需要 31 分钟。

hot3.png

package a

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

/**
  * Created by yuduy on 2018/1/11.
  *
  * Scratch class exercising basic RDD operations (filter / union / word count /
  * combineByKey / partitioning) followed by an iterative PageRank computation.
  * All paths, the master URL and the application name are empty placeholders.
  */
class A {

  /**
    * Builds a SparkContext from a configuration with an empty master and app name.
    *
    * A new context is constructed on every call; the result is not cached.
    */
  def sc: SparkContext = {
    val conf = new SparkConf().setMaster("").setAppName("")
    new SparkContext(conf)
  }

  /**
    * Runs the demo pipeline: log filtering, word count, per-key average,
    * partitioning samples and finally PageRank over the "link" object file,
    * whose result is written to "ranks".
    */
  def aaa: Unit = {
    // Reuse the helper above instead of repeating the configuration inline.
    val sc: SparkContext = this.sc

    // --- filter / union / basic actions ---
    val input: RDD[String] = sc.textFile("")
    val warns: RDD[String] = input.filter(line => line.contains("warn"))
    val errors: RDD[String] = input.filter(line => line.contains("error"))
    val calm: RDD[String] = warns.union(errors)
    calm.persist()
    calm.collect().mkString("")
    val tmp: Array[String] = calm.collect()
    val a: RDD[Int] = input.map(l => 1)
    calm.take(10).foreach(println)
    calm.flatMap(_ + 1)

    // --- word count ---
    val words: RDD[String] = input.flatMap(line => line.split(" "))
    val counts = words.map(word => (word, 1)).reduceByKey(_ + _)
    counts.saveAsTextFile("")

    // --- per-key average via combineByKey: the accumulator is (sum, count) ---
    val toCombine = sc.parallelize(List((1, 2)))
    toCombine
      .combineByKey(
        (v: Int) => (v, 1),
        (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),
        (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
      .map { case (key, (sum, count)) => (key, sum / count.toFloat) }
    // foreach, not map: printing is a side effect and no result collection is wanted.
    toCombine.collectAsMap().foreach(println)
    toCombine.partitions.size

    // --- partitioning samples ---
    input.map(s => s)
    val pair: RDD[(String, String)] =
      sc.sequenceFile[String, String]("").partitionBy(new HashPartitioner(100)).persist()
    val si = sc.parallelize(List(1, 2, 3))
    si.map(x => (x, 1))
    val du = sc.parallelize(List((1, 2), (3, 4)))
    du.flatMap { case (d1, _) => d1.toString }

    // --- PageRank ---
    val links: RDD[(String, Seq[String])] =
      sc.objectFile[(String, Seq[String])]("link").partitionBy(new HashPartitioner(100)).persist()
    var ranks: RDD[(String, Double)] = links.mapValues(_ => 1.0)
    // NOTE(review): `0 to 10` is inclusive, so this performs 11 iterations - confirm that is intended.
    for (_ <- 0 to 10) {
      // The pattern binds `outLinks` so that it no longer shadows the outer `links` RDD.
      val contributions: RDD[(String, Double)] = links.join(ranks).flatMap {
        case (_, (outLinks, rank)) =>
          outLinks.map(dest => (dest, rank / outLinks.size))
      }
      ranks = contributions.reduceByKey(_ + _).mapValues(v => 0.15 + 0.85 * v)
    }
    ranks.saveAsTextFile("ranks")
  }
}

 

package a

import org.apache.hadoop.fs.Path
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Random

/**
  * Created by yuduy on 2018/1/17.
  *
  * Three PageRank variants over an edge-list text file in which every line is
  * "source target" separated by whitespace:
  *  - [[page]]    : straightforward iteration with periodic checkpointing;
  *  - [[improve]] : one contribution step using salted keys to spread hot keys;
  *  - [[last]]    : splits the links into skewed and non-skewed sources.
  */
object Page {

  /** Number of PageRank iterations performed by [[page]]. */
  private final val Iterations = 10

  /** Number of salt values; the random salt and the key expansion must use the same range. */
  private final val SaltBuckets = 10

  /** Sources with at least this many out-links are treated as skewed by [[last]]. */
  private final val SkewThreshold = 1000000L

  /**
    * Shared context. The original declaration had no initializer, which an object
    * cannot have; it is now created lazily, reusing an already-active context if any.
    */
  lazy val sc: SparkContext = SparkContext.getOrCreate(new SparkConf())

  /**
    * Iterative PageRank with string URLs.
    *
    * @param path edge-list file, one "url neighbour" pair per line
    */
  def page(path: String): Unit = {
    //url -> neighbor url
    val lines: RDD[String] = sc.textFile(path)
    //url neighbors
    val links: RDD[(String, Iterable[String])] = lines.map { line =>
      val parts = line.split("\\s+")
      (parts(0), parts(1))
    }.distinct().groupByKey()
    //submit conf: spark.rdd.compress=true
    // Cache the adjacency lists (not the raw lines): `links` is what every iteration joins against.
    links.persist(StorageLevel.MEMORY_ONLY_SER).setName("links")

    //url rank
    var ranks: RDD[(String, Double)] = links.mapValues(_ => 1.0)
    var lastCheckPointRanks: Option[RDD[(String, Double)]] = None
    for (i <- 1 to Iterations) {
      val contribs = links.join(ranks).values.flatMap {
        case (urls, rank) => urls.map(url => (url, rank / urls.size))
      }
      ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)

      // Checkpoint every iteration except the final one to truncate the lineage.
      if (i != Iterations) {
        ranks.persist(StorageLevel.MEMORY_ONLY_SER).setName(s"iter$i: ranks")
        // NOTE(review): RDD.checkpoint() needs a checkpoint directory on the context
        // (SparkContext.setCheckpointDir); nothing in this object sets one.
        ranks.checkpoint()
        //force action, just for trigger calculation
        ranks.foreach(_ => ())
        // Drop the previous checkpoint files and cached blocks now that a newer one exists.
        lastCheckPointRanks.foreach { previous =>
          previous.getCheckpointFile.foreach { ckp =>
            val p = new Path(ckp)
            p.getFileSystem(sc.hadoopConfiguration).delete(p, true)
          }
          previous.unpersist(blocking = false)
        }
        lastCheckPointRanks = Some(ranks)
      }
    }
    //force action, like ranks.saveAsTextFile()
    ranks.foreach(_ => ())
  }

  /**
    * Builds a single round of rank contributions using salted keys.
    *
    * Only transformations are defined here; no action is run, so nothing is computed.
    *
    * @param path edge-list file, one "sourceId targetId" pair of longs per line
    */
  def improve(path: String): Unit = {
    //url -> neighbor url
    val lines: RDD[String] = sc.textFile(path)
    val links: RDD[(Long, Long)] = lines.map { line =>
      val parts = line.split("\\s+")
      (parts(0).toLong, parts(1).toLong)
    }.distinct()
    links.persist(StorageLevel.MEMORY_ONLY_SER).setName("links")

    //count of each url's outs
    val outCnts: RDD[(Long, Int)] = links.mapValues(_ => 1).reduceByKey(_ + _)
    outCnts.persist(StorageLevel.MEMORY_ONLY_SER).setName("out-counts")

    //url rank
    // One rank per source URL (as in `last`). Deriving ranks from `links` produced one
    // rank record per edge, so every contribution was repeated out-degree times.
    val ranks: RDD[(Long, Double)] = outCnts.mapValues(_ => 1.0)

    // Grouped value shape: (Iterable[target url], Iterable[out count], Iterable[rank]).
    // `links` gets one random salt per record while counts and ranks are replicated to
    // every salt, so each salted link group still meets its count and rank.
    val contribs: RDD[(Long, Double)] = keyWithRandomInt(links)
      .cogroup(expandKeyWithRandomInt(outCnts), expandKeyWithRandomInt(ranks))
      .values
      .flatMap { case (targets, counts, rankValues) =>
        for (target <- targets.iterator;
             count <- counts.iterator;
             rank <- rankValues.iterator)
          yield (target, rank / count)
      }
  }

  /** Re-keys every record as ((key, salt), value) with a random salt in [0, SaltBuckets). */
  def keyWithRandomInt[K, V](rdd: RDD[(K, V)]): RDD[((K, Int), V)] =
    rdd.map { case (key, value) => ((key, Random.nextInt(SaltBuckets)), value) }

  /**
    * Replicates every record once per salt in [0, SaltBuckets).
    *
    * The range matches [[keyWithRandomInt]]; the former inclusive `0 to 10` emitted an
    * eleventh copy under salt 10, which the random side can never produce.
    */
  def expandKeyWithRandomInt[K, V](rdd: RDD[(K, V)]): RDD[((K, Int), V)] =
    rdd.flatMap { case (key, value) =>
      (0 until SaltBuckets).map(salt => ((key, salt), value))
    }

  /**
    * Splits the link set into links from skewed sources and grouped links from the
    * remaining sources, materialising both, then releases the cached raw links.
    *
    * @param path edge-list file, one "sourceId targetId" pair of longs per line
    */
  def last(path: String): Unit = {
    val lines: RDD[String] = sc.textFile(path)
    val links: RDD[(Long, Long)] = lines.map { line =>
      val parts = line.split("\\s+")
      (parts(0).trim.toLong, parts(1).trim.toLong)
    }.distinct()
    links.persist(StorageLevel.MEMORY_ONLY_SER).setName("links")

    //count of each url's outs
    val outCnts: RDD[(Long, Long)] = links.mapValues(_ => 1L).reduceByKey(_ + _)
      .persist(StorageLevel.MEMORY_ONLY_SER).setName("out-counts")

    //init ranks
    val ranks: RDD[(Long, Double)] = outCnts.mapValues(_ => 1.0)
      .persist(StorageLevel.MEMORY_ONLY_SER).setName("init-ranks")
    //force action, just for trigger calculation
    ranks.foreach(_ => ())

    // The skewed sources are collected to the driver and broadcast to the executors.
    val skewedOutCnts: scala.collection.Map[Long, Long] =
      outCnts.filter(_._2 >= SkewThreshold).collectAsMap()
    val bcSkewedOutCnts: Broadcast[scala.collection.Map[Long, Long]] = sc.broadcast(skewedOutCnts)

    val skewed = links
      .filter(link => bcSkewedOutCnts.value.contains(link._1))
      .persist(StorageLevel.MEMORY_ONLY_SER).setName("skewed-links")
    //force action, just for trigger calculation
    skewed.foreach(_ => ())

    val noSkewed = links
      .filter(link => !bcSkewedOutCnts.value.contains(link._1))
      .groupByKey().persist().setName("no-skewed-links")
    //force action, just for trigger calculation
    noSkewed.foreach(_ => ())

    links.unpersist(blocking = false)
  }
}

 

package com.pplive.pike.exec.spoutproto;import java.util.concurrent.atomic.AtomicBoolean;import java.util.concurrent.atomic.AtomicLong;/** * Created by yuduy on 2018/5/17. */public class Tps {    private static class Statistics {        public final AtomicBoolean busy = new AtomicBoolean(false);        public final AtomicLong count = new AtomicLong(0);        public long lastModifyTime;    }    private final AtomicLong total;    private final long tpsLimit;    private final Statistics[] windows;    public Tps(int tpsLimit) {        this.tpsLimit = tpsLimit;        this.total = new AtomicLong(0);        this.windows = new Statistics[100];        for (int i = 0; i < 100; i++) {            //avoid null            this.windows[i] = new Statistics();        }    }    private long addAndReturn() {        for (; ; ) {            long currentTimeMillis = System.currentTimeMillis();            Statistics stat = windows[(int) currentTimeMillis % this.windows.length];            //no contented            if ((currentTimeMillis - stat.lastModifyTime <= 10) && !stat.busy.get()) {//lastModifyTime and total, count may confix. so !stat.busy.get()                long rtn = total.incrementAndGet();                stat.count.incrementAndGet();                return rtn;            }            //contented            if (stat.busy.compareAndSet(false, true)) {                try {                    total.set(total.get() - stat.count.get());                    stat.count.set(1);                    stat.lastModifyTime = currentTimeMillis;//non-volatile, but write with barrier                    return total.get();                } finally {                    stat.busy.set(false);//release lock and write barrier                }            }        }    }    public boolean overLoad() {        return addAndReturn() > tpsLimit;    }}

 

转载于:https://my.oschina.net/u/1380557/blog/1608762

你可能感兴趣的文章
ORACLE查看当前连接用户的权限信息或者角色信息
查看>>
痛并快乐的造轮子之旅:awk访问数据库之旅
查看>>
转载:如何避免代码中的if嵌套
查看>>
kerberos简单介绍
查看>>
javaweb中为mysql的curd多个值的语句
查看>>
jquery json
查看>>
Web Dynpro Controller
查看>>
php7 安装扩展
查看>>
【总结整理】数据可视化
查看>>
安装zookeeper
查看>>
FFmpeg-20160422-snapshot-bin
查看>>
C 语言复杂声明
查看>>
IOS 二张图片合并
查看>>
【java】在分页查询结果中对最后的结果集List进行操作add()或remove()操作,报错:java.lang.UnsupportedOperationException...
查看>>
CentOS系统环境下安装MongoDB
查看>>
LeetCode 3. Longest Substring Without Repeating Characters
查看>>
安卓模拟器BlueStacks 安装使用教程(图解)
查看>>
Hadoop YARN学习之Hadoop框架演进历史简述
查看>>
C++中友元类使用场合
查看>>
Laravel5.5的异常捕获和处理
查看>>