From 0fbf8222aedad91941cbfc3b9a2fddde8a798d0b Mon Sep 17 00:00:00 2001 From: virginiajimenez96 Date: Wed, 1 Dec 2021 14:15:08 +0000 Subject: [PATCH 1/2] Rooster and Sun Test and events in csv --- rddofeventscsv.scala | 44 +++++++ .../scala/io/pathogen/spark/Rooster.scala | 13 +-- src/main/scala/io/pathogen/spark/Sun.scala | 17 ++- .../io/pathogen/spark/events_test.csv | 16 +++ .../io/pathogen/spark/RoosterSunTest.scala | 110 ++++++++++++++++++ 5 files changed, 184 insertions(+), 16 deletions(-) create mode 100644 rddofeventscsv.scala create mode 100644 src/test/resources/io/pathogen/spark/events_test.csv create mode 100644 src/test/scala/io/pathogen/spark/RoosterSunTest.scala diff --git a/rddofeventscsv.scala b/rddofeventscsv.scala new file mode 100644 index 0000000..c97dd5b --- /dev/null +++ b/rddofeventscsv.scala @@ -0,0 +1,44 @@ +import io.pathogen.spark._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SparkSession +import org.apache.spark.graphx.Graph +import io.pathogen.spark.DateUtils.Frequency +import scala.io.Source + +/* +val sun = 1L +val moon = 2L +val chicken = 3L +val dog = 4L +val owl = 5L + +random events: 6L, 7L, 8L +*/ + + +var events = Array[Event]() +val eventsFile = Source.fromFile("src/test/resources/io/pathogen/spark/events.csv") +for (line <- eventsFile.getLines.drop(1)) { + val cols = line.split(",").map(_.trim) + val currentEvent = new Event(conceptId=cols(0).toLong, eventStart=cols(1).toLong, eventStop=cols(2).toLong, amplitude=cols(3).toLong) + events = events :+ currentEvent +} +eventsFile.close + +val spark:SparkSession = SparkSession.builder().master("local[1]").appName("test").getOrCreate() +val rdd:RDD[Event] = spark.sparkContext.parallelize(events) + + +//Need a config to run the rooster +val config = new Config(samplingRate=Frequency.MINUTE,maxIterations=75) +val roosterObject = new Rooster(config) +val sunObject = new Sun(config) + +//Calling function from rooster +val g = roosterObject.crow(rdd) + +//calling function for sun +val s = sunObject.rise(g) + + +val result = s.vertices.collect() diff --git a/src/main/scala/io/pathogen/spark/Rooster.scala b/src/main/scala/io/pathogen/spark/Rooster.scala index bf5b009..a116c04 100644 --- a/src/main/scala/io/pathogen/spark/Rooster.scala +++ b/src/main/scala/io/pathogen/spark/Rooster.scala @@ -17,12 +17,11 @@ package io.pathogen.spark import com.google.common.hash.Hashing -import com.typesafe.scalalogging.LazyLogging import org.apache.spark.HashPartitioner import org.apache.spark.graphx.{Edge, Graph, PartitionStrategy} import org.apache.spark.rdd.RDD -class Rooster(config: Config) extends Serializable with LazyLogging { +class Rooster(config: Config) extends Serializable { /** * @param events The initial time related events @@ -30,15 +29,15 @@ class Rooster(config: Config) extends Serializable with LazyLogging { */ def crow(events: RDD[Event]): Graph[Pathogen, Double] = { - logger.info(s"Observing correlation across ${events.count()} time related events") + println(s"Observing correlation across ${events.count()} time related events") if (config.simulations == 0) { - logger.warn("Correlation does not imply causation, proceed at your own risk") + println("Correlation does not imply causation, proceed at your own risk") } val correlations = getEventCorrelation(events) correlations.cache() val correlationCount = correlations.count() - logger.info(s"Found $correlationCount possible correlations") + println(s"Found $correlationCount possible correlations") val causalities = if (config.simulations > 1) { getEventCausality(events, correlations, config.simulations) @@ -109,13 +108,13 @@ class Rooster(config: Config) extends Serializable with LazyLogging { val randomCorrelations = getEventCorrelation(randomEvents) randomCorrelations.cache() val rcc = randomCorrelations.count() - logger.info(s"Monte carlo $simulation/$simulations - $rcc correlations found") + println(s"Monte carlo $simulation/$simulations - $rcc correlations found") randomCorrelations } } - logger.info("Normalizing causality score") + println("Normalizing causality score") val noiseHash = events.sparkContext.union(simulationResults) map { n => (n.srcId + n.dstId, n.attr) } reduceByKey (_ + _) partitionBy new HashPartitioner(events.partitions.length) diff --git a/src/main/scala/io/pathogen/spark/Sun.scala b/src/main/scala/io/pathogen/spark/Sun.scala index 7ca3cfa..b939963 100644 --- a/src/main/scala/io/pathogen/spark/Sun.scala +++ b/src/main/scala/io/pathogen/spark/Sun.scala @@ -16,22 +16,21 @@ package io.pathogen.spark -import com.typesafe.scalalogging.LazyLogging import io.pathogen.spark.Sun.VertexData import org.apache.spark.graphx._ import org.apache.spark.rdd.RDD import scala.util.Try -class Sun(config: Config) extends Serializable with LazyLogging { +class Sun(config: Config) extends Serializable { /** * @param causalGraph The graph of observed (and normalized) correlations * @return The Contagion graph where vertices contain both a sensitivity and aggressiveness scores */ def rise(causalGraph: Graph[Pathogen, Double]): Graph[Pathogen, Double] = { - val initGraph = initializeGraph(causalGraph) + initGraph.cache() // --------------- @@ -40,7 +39,7 @@ class Sun(config: Config) extends Serializable with LazyLogging { val vertexSensitivity = propagateCausality(initGraph, config.tolerance, config.maxIterations, config.forgetfulness) val totalSensitivity = vertexSensitivity.values.sum() - logger.info(s"Total sensitivity is ${"%.2f".format(totalSensitivity)}") + println(s"Total sensitivity is ${"%.2f".format(totalSensitivity)}") // --------------- // AGGRESSIVENESS @@ -48,7 +47,7 @@ class Sun(config: Config) extends Serializable with LazyLogging { val vertexAggressiveness = propagateCausality(initGraph.reverse, config.tolerance, config.maxIterations, config.forgetfulness) val totalAggressiveness = vertexAggressiveness.values.sum() - logger.info(s"Total aggressiveness is ${"%.2f".format(totalAggressiveness)}") + println(s"Total aggressiveness is ${"%.2f".format(totalAggressiveness)}") // --------------- // PATHOGEN @@ -73,7 +72,7 @@ class Sun(config: Config) extends Serializable with LazyLogging { if(config.erratic > 0.0f) { - logger.info("Modelling Erratic behavior requires full ergodicity") + println("Modelling Erratic behavior requires full ergodicity") val edges = causalGraph.edges.map { edge => ((edge.srcId, edge.dstId), edge.attr) } @@ -126,7 +125,7 @@ class Sun(config: Config) extends Serializable with LazyLogging { graph.cache() val vertices = graph.vertices.count() val edges = graph.edges.count() - logger.info(s"Starting explaining causality on $vertices hosts and $edges connections") + println(s"Starting explaining causality on $vertices hosts and $edges connections") // Execute Pregel to source to destination nodes val propagated = graph.pregel( @@ -147,9 +146,9 @@ class Sun(config: Config) extends Serializable with LazyLogging { val maxSteps = propagated.vertices.values.values.max() if (maxSteps == maxIterations) { val nonConverged = propagated.vertices.filter(_._2._2 == maxSteps).count() - logger.warn(s"$nonConverged/$vertices nodes did not converge after $maxIterations iterations") + println(s"$nonConverged/$vertices nodes did not converge after $maxIterations iterations") } else { - logger.info(s"Pathogen converged after $averageSteps steps in average, max is $maxSteps") + println(s"Pathogen converged after $averageSteps steps in average, max is $maxSteps") } propagated.vertices.mapValues(_._1) diff --git a/src/test/resources/io/pathogen/spark/events_test.csv b/src/test/resources/io/pathogen/spark/events_test.csv new file mode 100644 index 0000000..6ebef68 --- /dev/null +++ b/src/test/resources/io/pathogen/spark/events_test.csv @@ -0,0 +1,16 @@ +Id,eventStart,eventStop,amplitude +3,1609483500,1609487100,1 +1,1609484400,1609527600,1 +3,1609570500,1609571700,1 +1,1609571100,1609614600,1 +4,1609624800,1609626600,1 +1,1609657500,1609701000,1 +4,1609709400,1609710600,1 +1,1609743600,1609787700,1 +5,1609797600,1609806600,1 +2,1609790400,1609826400,1 +5,1609889400,1609894800,1 +2,1609878600,1609916400,1 +6,1610002800,1610031600,1 +7,1610053200,1610057700,1 +8,1610121600,1610136000,1 diff --git a/src/test/scala/io/pathogen/spark/RoosterSunTest.scala b/src/test/scala/io/pathogen/spark/RoosterSunTest.scala new file mode 100644 index 0000000..53c4dc2 --- /dev/null +++ b/src/test/scala/io/pathogen/spark/RoosterSunTest.scala @@ -0,0 +1,110 @@ +package io.pathogen.spark + +import java.text.SimpleDateFormat +import io.pathogen.spark.DateUtils.Frequency +import org.joda.time.DateTime +import org.scalatest.{FlatSpec, Matchers} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SparkSession +import org.apache.spark.graphx._ +import io.pathogen.spark.DateUtils.Frequency +import scala.io.Source +import scala.math.BigDecimal + + +class RoosterSunTest extends FlatSpec with Matchers { + + //Function for rounding + def round(n: Double): Double = { + BigDecimal(n).setScale(4, BigDecimal.RoundingMode.HALF_UP).toDouble + } + + //Reading CSV + var events = Array[Event]() + val eventsFile = Source.fromInputStream(this.getClass.getResourceAsStream("events_test.csv"), "UTF-8") + for (line <- eventsFile.getLines.drop(1)) { + val cols = line.split(",").map(_.trim) + val currentEvent = new Event(conceptId=cols(0).toLong, eventStart=cols(1).toLong, eventStop=cols(2).toLong, amplitude=cols(3).toLong) + events = events :+ currentEvent + } + eventsFile.close + + val spark:SparkSession = SparkSession.builder().master("local[1]").appName("test").getOrCreate() + spark.sparkContext.setLogLevel("OFF"); + val rdd:RDD[Event] = spark.sparkContext.parallelize(events) + + //Defining objects needed + val config = new Config(samplingRate=Frequency.MINUTE,maxIterations=75) + val roosterObject = new Rooster(config) + val sunObject = new Sun(config) + + //Calling function from rooster + val g = roosterObject.crow(rdd) + + //Check the correct format of the output of Rooster and the input of Sun + "Rooster" should "return correct class" in { + g shouldBe a [Graph[_,_]] + g.edges.collect()(0) shouldBe a [Edge[_]] + } + + + //Check the correct values of the edges + "Rooster" should "return correct edges values" in { + for ( edge <-g.edges.collect() ) + { + //spark.graphx.Edge.srcId = id of origin node, spark.graphx.Edge.dstId = id of destination node, spark.graphx.Edge.attr = attribute + if (edge.srcId == 1 && edge.dstId == 4) + { + round(edge.attr) shouldBe 0.2959 + } + else if (edge.srcId == 2 && edge.dstId == 5) + { + edge.attr shouldBe 1.0 + } + else if (edge.srcId == 1 && edge.dstId == 2) + { + round(edge.attr) shouldBe 0.3085 + } + else if (edge.srcId == 4 && edge.dstId == 1) + { + round(edge.attr) shouldBe 0.2959 + } + else if (edge.srcId == 1 && edge.dstId == 5) + { + round(edge.attr) shouldBe 0.2749 + } + else if (edge.srcId == 3 && edge.dstId == 1) + { + round(edge.attr) shouldBe 0.7785 + } + } + } + + //Calling function for sun + val s = sunObject.rise(g) + + //Check the correct format of the output of Sun + "Sun" should "return correct class" in { + s shouldBe a [Graph[_,_]] + s.vertices.collect()(0)._1 shouldBe a [java.lang.Long] + s.vertices.collect()(0)._2 shouldBe a [Pathogen] + } + + //Check the correct values of the vertices + "Sun" should "return correct vertices values" in { + val result = s.vertices.collect().toMap + + round(result.get(1).get.aggressiveness) shouldBe 0.3717 + round(result.get(1).get.sensitivity) shouldBe 0.4265 + round(result.get(2).get.aggressiveness) shouldBe 0.1239 + round(result.get(2).get.sensitivity) shouldBe 0.1421 + round(result.get(3).get.aggressiveness) shouldBe 0.3655 + round(result.get(3).get.sensitivity) shouldBe 0.4193 + round(result.get(4).get.aggressiveness) shouldBe 0.1389 + round(result.get(4).get.sensitivity) shouldBe 0.1594 + result.get(5).get shouldBe Pathogen(0.0,0.0) + result.get(6).get shouldBe Pathogen(0.0,0.0) + result.get(7).get shouldBe Pathogen(0.0,0.0) + result.get(8).get shouldBe Pathogen(0.0,0.0) + } +} From 5d66279ed52851fd364f2c0e6804bfa48bf1f177 Mon Sep 17 00:00:00 2001 From: virginiajimenez96 Date: Thu, 2 Dec 2021 20:24:17 +0000 Subject: [PATCH 2/2] Upgraded to scala 2.12 and spark 3.1.2 --- pom.xml | 33 ++++++++++++++++----------------- rddofeventscsv.scala | 35 +++++++++++++++++++++-------------- 2 files changed, 37 insertions(+), 31 deletions(-) diff --git a/pom.xml b/pom.xml index 3ed891e..56ea76a 100644 --- a/pom.xml +++ b/pom.xml @@ -82,9 +82,9 @@ UTF-8 - 2.11.8 - 2.11 - 2.1.0 + 2.12.10 + 2.12 + 3.1.2 1.8 @@ -122,22 +122,22 @@ joda-time joda-time - 2.9.3 + 2.10.13 org.joda joda-convert - 1.8.2 + 2.2.1 com.typesafe.scala-logging scala-logging_${scala.binary.version} - 3.7.1 + 3.9.4 org.scalatest scalatest_${scala.binary.version} - 2.2.6 + 3.0.8 test @@ -147,7 +147,7 @@ org.apache.maven.plugins maven-compiler-plugin - 2.5.1 + 3.8.1 ${java.version} ${java.version} @@ -169,7 +169,7 @@ org.apache.maven.plugins maven-surefire-plugin - 2.7 + 2.22.2 true @@ -177,7 +177,7 @@ org.sonatype.plugins nexus-staging-maven-plugin - 1.6.7 + 1.6.8 true ossrh @@ -188,7 +188,7 @@ org.apache.maven.plugins maven-source-plugin - 2.2.1 + 3.2.1 attach-sources @@ -201,7 +201,7 @@ net.alchim31.maven scala-maven-plugin - 3.2.1 + 4.5.3 attach-javadocs @@ -214,10 +214,9 @@ org.scalatest scalatest-maven-plugin - 1.0 + 2.0.2 ${project.build.directory}/surefire-reports - . once WDF TestSuite.txt @@ -233,7 +232,7 @@ org.apache.maven.plugins maven-release-plugin - 2.3.2 + 2.5.3 true false @@ -244,14 +243,14 @@ org.apache.maven.scm maven-scm-provider-gitexe - 1.8.1 + 1.12.0 org.apache.maven.plugins maven-gpg-plugin - 1.5 + 3.0.1 sign-artifacts diff --git a/rddofeventscsv.scala b/rddofeventscsv.scala index c97dd5b..647a13e 100644 --- a/rddofeventscsv.scala +++ b/rddofeventscsv.scala @@ -4,20 +4,20 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.graphx.Graph import io.pathogen.spark.DateUtils.Frequency import scala.io.Source +import org.apache.spark.sql.types._ -/* -val sun = 1L -val moon = 2L -val chicken = 3L -val dog = 4L -val owl = 5L +/* + The csv file cointains: -random events: 6L, 7L, 8L + 12 events: 6 pairs of events of the objects sun and moon, and animals: rooster, dog and owl. + Some of these events are overlapped, other are not. + Events 13,14 and 15 are random events. + Once the file is read, the events are stored in an array of class Event. */ var events = Array[Event]() -val eventsFile = Source.fromFile("src/test/resources/io/pathogen/spark/events.csv") +val eventsFile = Source.fromFile("src/test/resources/io/pathogen/spark/events_test.csv") for (line <- eventsFile.getLines.drop(1)) { val cols = line.split(",").map(_.trim) val currentEvent = new Event(conceptId=cols(0).toLong, eventStart=cols(1).toLong, eventStop=cols(2).toLong, amplitude=cols(3).toLong) @@ -25,20 +25,27 @@ for (line <- eventsFile.getLines.drop(1)) { } eventsFile.close +// The following cell creates an RDD of the previous events (array). val spark:SparkSession = SparkSession.builder().master("local[1]").appName("test").getOrCreate() val rdd:RDD[Event] = spark.sparkContext.parallelize(events) - -//Need a config to run the rooster +// Objects definitions val config = new Config(samplingRate=Frequency.MINUTE,maxIterations=75) val roosterObject = new Rooster(config) val sunObject = new Sun(config) -//Calling function from rooster +/* Rooster.Scala takes as input the initial time related events (an RDD of events) + and outputs the causal effects explained as a graph (graph[Pathogen, double]). +*/ val g = roosterObject.crow(rdd) -//calling function for sun -val s = sunObject.rise(g) - +/* Sun.Scala takes as input a graph[Pathogen, double] of observed (and normalized) + correlations and returns a graph where vertices contain both a sensitivity and aggressiveness scores. + + Class Pathogen contains sensitiviy and aggressiveness. + Aggressiveness: measures how likely an event could explain downstreams effects (it causes other events) + Sensitivity: measures how likely an event results from an upstream event (it is caused by other ovents) +*/ +val s = sunObject.rise(g) val result = s.vertices.collect()