diff --git a/connectors/twitter_source/.gitignore b/connectors/twitter_source/.gitignore new file mode 100644 index 0000000..1310ac3 --- /dev/null +++ b/connectors/twitter_source/.gitignore @@ -0,0 +1,18 @@ +*.class +*.log + +# sbt specific +.cache +.history +.lib/ +dist/* +target/ +lib_managed/ +src_managed/ +project/boot/ +project/plugins/project/ + +# Scala-IDE specific +.scala_dependencies +.worksheet +.idea diff --git a/connectors/twitter_source/README.md b/connectors/twitter_source/README.md new file mode 100644 index 0000000..cb7f397 --- /dev/null +++ b/connectors/twitter_source/README.md @@ -0,0 +1,18 @@ +#Word Count in Scala +=================== +Get Vagrant up and running from parent dir: + + $ ./bootstrap_vagrant.sh + $ vagrant ssh + ... + vagrant@vagrant-ubuntu-trusty-64:~/$ cd /vagrant/scala/ + + +Run pre-package script to build JAR: + + $ ./pre_package_concord.bash + +Open deployment directory and start Concord operators + + $ cd target/concord/ + \ No newline at end of file diff --git a/connectors/twitter_source/build.sbt b/connectors/twitter_source/build.sbt new file mode 100644 index 0000000..d031f90 --- /dev/null +++ b/connectors/twitter_source/build.sbt @@ -0,0 +1,29 @@ +name := "twitter_source" + +version := "0.0.1" + +scalaVersion := "2.11.0" + +libraryDependencies ++= Seq( + "io.concord" % "concord" % "0.1.2", + "io.concord" % "rawapi" % "0.2.5", + "org.twitter4j" % "twitter4j-core" % "3.0.3", + "org.twitter4j" % "twitter4j-stream" % "3.0.3", + "ch.qos.logback" % "logback-classic" % "1.1.7", + "com.typesafe.scala-logging" %% "scala-logging" % "3.4.0" +) + + + +assemblyMergeStrategy in assembly := { + case x if x.endsWith("project.clj") => MergeStrategy.discard + case x if x.toLowerCase.startsWith("meta-inf") => MergeStrategy.discard + case _ => MergeStrategy.first +} + + +resolvers += Resolver.sonatypeRepo("public") + +resolvers += "clojars" at "https://clojars.org/repo" + +resolvers += "conjars" at "http://conjars.org/repo" diff --git a/connectors/twitter_source/project/assembly.sbt b/connectors/twitter_source/project/assembly.sbt new file mode 100644 index 0000000..39c1bb8 --- /dev/null +++ b/connectors/twitter_source/project/assembly.sbt @@ -0,0 +1 @@ +addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3") diff --git a/connectors/twitter_source/project/build.properties b/connectors/twitter_source/project/build.properties new file mode 100644 index 0000000..be6c454 --- /dev/null +++ b/connectors/twitter_source/project/build.properties @@ -0,0 +1 @@ +sbt.version=0.13.5 diff --git a/connectors/twitter_source/src/main/scala/io/concord/twitter/ConcordTweeterStream.scala b/connectors/twitter_source/src/main/scala/io/concord/twitter/ConcordTweeterStream.scala new file mode 100644 index 0000000..55b2861 --- /dev/null +++ b/connectors/twitter_source/src/main/scala/io/concord/twitter/ConcordTweeterStream.scala @@ -0,0 +1,39 @@ +package io.concord.twitter +import java.util.{ HashSet => MutableHashSet} +import io.concord._ +import io.concord.swift._ + +// todo(agallego) - fix and scope this a little bit more +import twitter4j._ + +class ConcordTwitterStream extends Computation with LazyLogging { + override def init(ctx: ComputationContext): Unit = { + logger.info(s"${this.getClass.getSimpleName} initialized") + ctx.setTimer("loop", System.currentTimeMillis()) + } + override def destroy(): Unit = { + logger.info(s"${this.getClass.getSimpleName} destructing") + } + override def processRecord(ctx: ComputationContext, record: Record) = ??? + + override def processTimer(ctx: ComputationContext, + key: String, time: Long): Unit = { + + + ctx.setTimer(key, System.currentTimeMillis()) + } + + override def metadata(): Metadata = { + val ostreams = new MutableHashSet[String](java.util.Arrays.asList("tweets")) + new Metadata("concord-twitter-stream", + new MutableHashSet[StreamTuple](), + ostreams) + } + +} + +object ConcordTwitterStream { + def main(args: Array[String]): Unit = { + ServeComputation.serve(new ConcordTwitterStream()) + } +}