Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions connectors/twitter_source/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
*.class
*.log

# sbt specific
.cache
.history
.lib/
dist/*
target/
lib_managed/
src_managed/
project/boot/
project/plugins/project/

# Scala-IDE specific
.scala_dependencies
.worksheet
.idea
18 changes: 18 additions & 0 deletions connectors/twitter_source/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#Word Count in Scala
===================
Get Vagrant up and running from parent dir:

$ ./bootstrap_vagrant.sh
$ vagrant ssh
...
vagrant@vagrant-ubuntu-trusty-64:~/$ cd /vagrant/scala/


Run pre-package script to build JAR:

$ ./pre_package_concord.bash

Open deployment directory and start Concord operators

$ cd target/concord/

29 changes: 29 additions & 0 deletions connectors/twitter_source/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
name := "twitter_source"

version := "0.0.1"

scalaVersion := "2.11.0"

libraryDependencies ++= Seq(
"io.concord" % "concord" % "0.1.2",
"io.concord" % "rawapi" % "0.2.5",
"org.twitter4j" % "twitter4j-core" % "3.0.3",
"org.twitter4j" % "twitter4j-stream" % "3.0.3",
"ch.qos.logback" % "logback-classic" % "1.1.7",
"com.typesafe.scala-logging" %% "scala-logging" % "3.4.0"
)



assemblyMergeStrategy in assembly := {
case x if x.endsWith("project.clj") => MergeStrategy.discard
case x if x.toLowerCase.startsWith("meta-inf") => MergeStrategy.discard
case _ => MergeStrategy.first
}


resolvers += Resolver.sonatypeRepo("public")

resolvers += "clojars" at "https://clojars.org/repo"

resolvers += "conjars" at "http://conjars.org/repo"
1 change: 1 addition & 0 deletions connectors/twitter_source/project/assembly.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
1 change: 1 addition & 0 deletions connectors/twitter_source/project/build.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sbt.version=0.13.5
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package io.concord.twitter
import java.util.{ HashSet => MutableHashSet}
import io.concord._
import io.concord.swift._

// todo(agallego) - fix and scope this a little bit more
import twitter4j._

class ConcordTwitterStream extends Computation with LazyLogging {
override def init(ctx: ComputationContext): Unit = {
logger.info(s"${this.getClass.getSimpleName} initialized")
ctx.setTimer("loop", System.currentTimeMillis())
}
override def destroy(): Unit = {
logger.info(s"${this.getClass.getSimpleName} destructing")
}
override def processRecord(ctx: ComputationContext, record: Record) = ???

override def processTimer(ctx: ComputationContext,
key: String, time: Long): Unit = {


ctx.setTimer(key, System.currentTimeMillis())
}

override def metadata(): Metadata = {
val ostreams = new MutableHashSet[String](java.util.Arrays.asList("tweets"))
new Metadata("concord-twitter-stream",
new MutableHashSet[StreamTuple](),
ostreams)
}

}

object ConcordTwitterStream {
def main(args: Array[String]): Unit = {
ServeComputation.serve(new ConcordTwitterStream())
}
}