From 103b310bf876c9e08c09b5240a9af67ca7b5a9e0 Mon Sep 17 00:00:00 2001 From: Alexander Gallego Date: Wed, 10 Aug 2016 16:43:06 -0400 Subject: [PATCH 1/2] Initial checking for twitter datasource --- connectors/twitter_source/.gitignore | 18 +++++++++ connectors/twitter_source/README.md | 18 +++++++++ connectors/twitter_source/build.sbt | 26 +++++++++++++ .../twitter_source/project/assembly.sbt | 1 + .../io/concord/twitter/TweeterStream.scala | 39 +++++++++++++++++++ 5 files changed, 102 insertions(+) create mode 100644 connectors/twitter_source/.gitignore create mode 100644 connectors/twitter_source/README.md create mode 100644 connectors/twitter_source/build.sbt create mode 100644 connectors/twitter_source/project/assembly.sbt create mode 100644 connectors/twitter_source/src/main/scala/io/concord/twitter/TweeterStream.scala diff --git a/connectors/twitter_source/.gitignore b/connectors/twitter_source/.gitignore new file mode 100644 index 0000000..1310ac3 --- /dev/null +++ b/connectors/twitter_source/.gitignore @@ -0,0 +1,18 @@ +*.class +*.log + +# sbt specific +.cache +.history +.lib/ +dist/* +target/ +lib_managed/ +src_managed/ +project/boot/ +project/plugins/project/ + +# Scala-IDE specific +.scala_dependencies +.worksheet +.idea diff --git a/connectors/twitter_source/README.md b/connectors/twitter_source/README.md new file mode 100644 index 0000000..cb7f397 --- /dev/null +++ b/connectors/twitter_source/README.md @@ -0,0 +1,18 @@ +#Word Count in Scala +=================== +Get Vagrant up and running from parent dir: + + $ ./bootstrap_vagrant.sh + $ vagrant ssh + ... + vagrant@vagrant-ubuntu-trusty-64:~/$ cd /vagrant/scala/ + + +Run pre-package script to build JAR: + + $ ./pre_package_concord.bash + +Open deployment directory and start Concord operators + + $ cd target/concord/ + \ No newline at end of file diff --git a/connectors/twitter_source/build.sbt b/connectors/twitter_source/build.sbt new file mode 100644 index 0000000..68e2a40 --- /dev/null +++ b/connectors/twitter_source/build.sbt @@ -0,0 +1,26 @@ +name := "getting_started" + +version := "1.0.1" + +scalaVersion := "2.11.6" + +scalacOptions ++= Seq("-feature", "-language:higherKinds") + +libraryDependencies ++= Seq( + "io.concord" % "concord" % "0.1.2", + "io.concord" % "rawapi" % "0.2.5" +) + + +assemblyMergeStrategy in assembly := { + case x if x.endsWith("project.clj") => MergeStrategy.discard // Leiningen build files + case x if x.toLowerCase.startsWith("meta-inf") => MergeStrategy.discard // More bumf + case _ => MergeStrategy.first +} + + +resolvers += Resolver.sonatypeRepo("public") + +resolvers += "clojars" at "https://clojars.org/repo" + +resolvers += "conjars" at "http://conjars.org/repo" diff --git a/connectors/twitter_source/project/assembly.sbt b/connectors/twitter_source/project/assembly.sbt new file mode 100644 index 0000000..a815d58 --- /dev/null +++ b/connectors/twitter_source/project/assembly.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0") diff --git a/connectors/twitter_source/src/main/scala/io/concord/twitter/TweeterStream.scala b/connectors/twitter_source/src/main/scala/io/concord/twitter/TweeterStream.scala new file mode 100644 index 0000000..633cd9a --- /dev/null +++ b/connectors/twitter_source/src/main/scala/io/concord/twitter/TweeterStream.scala @@ -0,0 +1,39 @@ +package io.concord.twitter +import java.util.{ HashSet => MutableHashSet} +import io.concord._ +import io.concord.swift._ + +class SentenceGenerator extends Computation { + override def init(ctx: ComputationContext): Unit = { + println(s"${this.getClass.getSimpleName} initialized") + ctx.setTimer("loop", System.currentTimeMillis()) + } + + override def destroy(): Unit = { + println(s"${this.getClass.getSimpleName} destructing") + } + + override def processRecord(ctx: ComputationContext, record: Record): Unit = ??? + + override def processTimer(ctx: ComputationContext, key: String, time: Long): Unit = { + // Stream, key, value. Empty value, no need for val + Range(0, 10000).foreach { + i => ctx.produceRecord("sentences".getBytes, sample().getBytes, "-".getBytes) + } + + ctx.setTimer(key, System.currentTimeMillis()) + } + + override def metadata(): Metadata = { + val ostreams = new MutableHashSet[String](java.util.Arrays.asList("sentences")) + new Metadata("sentence-generator", new MutableHashSet[StreamTuple](), ostreams) + } + +} + + +object SentenceGenerator { + def main(args: Array[String]): Unit = { + ServeComputation.serve(new SentenceGenerator()) + } +} From 590b117b83affcdaedfd509aef6aa90a8d606c74 Mon Sep 17 00:00:00 2001 From: Alexander Gallego Date: Wed, 10 Aug 2016 18:17:46 -0400 Subject: [PATCH 2/2] Not compiling but at least project loads w/ sbt paulp's sbt https://github.com/paulp/sbt-extras works w/ fedora23 and it's much much easier to user reliably. --- connectors/twitter_source/build.sbt | 19 +++++---- .../twitter_source/project/assembly.sbt | 2 +- .../twitter_source/project/build.properties | 1 + .../twitter/ConcordTweeterStream.scala | 39 +++++++++++++++++++ .../io/concord/twitter/TweeterStream.scala | 39 ------------------- 5 files changed, 52 insertions(+), 48 deletions(-) create mode 100644 connectors/twitter_source/project/build.properties create mode 100644 connectors/twitter_source/src/main/scala/io/concord/twitter/ConcordTweeterStream.scala delete mode 100644 connectors/twitter_source/src/main/scala/io/concord/twitter/TweeterStream.scala diff --git a/connectors/twitter_source/build.sbt b/connectors/twitter_source/build.sbt index 68e2a40..d031f90 100644 --- a/connectors/twitter_source/build.sbt +++ b/connectors/twitter_source/build.sbt @@ -1,20 +1,23 @@ -name := "getting_started" +name := "twitter_source" -version := "1.0.1" +version := "0.0.1" -scalaVersion := "2.11.6" - -scalacOptions ++= Seq("-feature", "-language:higherKinds") +scalaVersion := "2.11.0" libraryDependencies ++= Seq( "io.concord" % "concord" % "0.1.2", - "io.concord" % "rawapi" % "0.2.5" + "io.concord" % "rawapi" % "0.2.5", + "org.twitter4j" % "twitter4j-core" % "3.0.3", + "org.twitter4j" % "twitter4j-stream" % "3.0.3", + "ch.qos.logback" % "logback-classic" % "1.1.7", + "com.typesafe.scala-logging" %% "scala-logging" % "3.4.0" ) + assemblyMergeStrategy in assembly := { - case x if x.endsWith("project.clj") => MergeStrategy.discard // Leiningen build files - case x if x.toLowerCase.startsWith("meta-inf") => MergeStrategy.discard // More bumf + case x if x.endsWith("project.clj") => MergeStrategy.discard + case x if x.toLowerCase.startsWith("meta-inf") => MergeStrategy.discard case _ => MergeStrategy.first } diff --git a/connectors/twitter_source/project/assembly.sbt b/connectors/twitter_source/project/assembly.sbt index a815d58..39c1bb8 100644 --- a/connectors/twitter_source/project/assembly.sbt +++ b/connectors/twitter_source/project/assembly.sbt @@ -1 +1 @@ -addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.0") +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3") diff --git a/connectors/twitter_source/project/build.properties b/connectors/twitter_source/project/build.properties new file mode 100644 index 0000000..be6c454 --- /dev/null +++ b/connectors/twitter_source/project/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.5 diff --git a/connectors/twitter_source/src/main/scala/io/concord/twitter/ConcordTweeterStream.scala b/connectors/twitter_source/src/main/scala/io/concord/twitter/ConcordTweeterStream.scala new file mode 100644 index 0000000..55b2861 --- /dev/null +++ b/connectors/twitter_source/src/main/scala/io/concord/twitter/ConcordTweeterStream.scala @@ -0,0 +1,39 @@ +package io.concord.twitter +import java.util.{ HashSet => MutableHashSet} +import io.concord._ +import io.concord.swift._ + +// todo(agallego) - fix and scope this a little bit more +import twitter4j._ + +class ConcordTwitterStream extends Computation with LazyLogging { + override def init(ctx: ComputationContext): Unit = { + logger.info(s"${this.getClass.getSimpleName} initialized") + ctx.setTimer("loop", System.currentTimeMillis()) + } + override def destroy(): Unit = { + logger.info(s"${this.getClass.getSimpleName} destructing") + } + override def processRecord(ctx: ComputationContext, record: Record) = ??? + + override def processTimer(ctx: ComputationContext, + key: String, time: Long): Unit = { + + + ctx.setTimer(key, System.currentTimeMillis()) + } + + override def metadata(): Metadata = { + val ostreams = new MutableHashSet[String](java.util.Arrays.asList("tweets")) + new Metadata("concord-twitter-stream", + new MutableHashSet[StreamTuple](), + ostreams) + } + +} + +object ConcordTwitterStream { + def main(args: Array[String]): Unit = { + ServeComputation.serve(new ConcordTwitterStream()) + } +} diff --git a/connectors/twitter_source/src/main/scala/io/concord/twitter/TweeterStream.scala b/connectors/twitter_source/src/main/scala/io/concord/twitter/TweeterStream.scala deleted file mode 100644 index 633cd9a..0000000 --- a/connectors/twitter_source/src/main/scala/io/concord/twitter/TweeterStream.scala +++ /dev/null @@ -1,39 +0,0 @@ -package io.concord.twitter -import java.util.{ HashSet => MutableHashSet} -import io.concord._ -import io.concord.swift._ - -class SentenceGenerator extends Computation { - override def init(ctx: ComputationContext): Unit = { - println(s"${this.getClass.getSimpleName} initialized") - ctx.setTimer("loop", System.currentTimeMillis()) - } - - override def destroy(): Unit = { - println(s"${this.getClass.getSimpleName} destructing") - } - - override def processRecord(ctx: ComputationContext, record: Record): Unit = ??? - - override def processTimer(ctx: ComputationContext, key: String, time: Long): Unit = { - // Stream, key, value. Empty value, no need for val - Range(0, 10000).foreach { - i => ctx.produceRecord("sentences".getBytes, sample().getBytes, "-".getBytes) - } - - ctx.setTimer(key, System.currentTimeMillis()) - } - - override def metadata(): Metadata = { - val ostreams = new MutableHashSet[String](java.util.Arrays.asList("sentences")) - new Metadata("sentence-generator", new MutableHashSet[StreamTuple](), ostreams) - } - -} - - -object SentenceGenerator { - def main(args: Array[String]): Unit = { - ServeComputation.serve(new SentenceGenerator()) - } -}