To get the public stream of tweets, I use Spark Streaming's TwitterUtils. See this guide for Spark 1.6.0.
The output of the Spark Streaming application will eventually be shown as a chart inside an HTML page. The hashtag count data is passed to a NodeJS process, which acts as a web server, before being pushed to a web browser.
The Spark Streaming application, as well as the NodeJS server, is hosted on an Amazon EC2 instance. The operating system on the instance is Amazon Linux 2. According to this forum, the OS is built with, and aims to be compatible with, various versions of RHEL.
First, install Node Version Manager.
curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.34.0/install.sh | bash
. ~/.nvm/nvm.sh
Next, install NodeJS.
nvm install v5.8.0
Because my Spark application is written in Java, a JDK is required. For this application, I use JDK 8, which can be downloaded from the Oracle website.
To download the right version of the installation file, I need to check whether my system is 32-bit or 64-bit Linux.
In the output of the uname command, the x86_64 fields near the end of the line indicate that my system is 64-bit Linux:
$ uname -a
Linux ip-172-31-32-179.us-east-2.compute.internal 4.14.243-185.433.amzn2.x86_64 #1 SMP Mon Aug 9 05:55:52 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux
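Instead of installing from the RPM package, which is a system-wide installation for all users, I install a private copy of the JDK from the binary archive (.tar.gz) into the /home/ec2-user/jdk directory. The commands below assume the downloaded archive, jdk-8u301-linux-x64.tar.gz, has been placed in that directory.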
$ cd ~ ; mkdir jdk
$ cd jdk
$ tar zxvf jdk-8u301-linux-x64.tar.gz
$ ls
jdk1.8.0_301 jdk-8u301-linux-x64.tar.gz
$ rm *.tar.gz
Next, append these lines to ~/.bash_profile:
# Set environment variables for Java
export JAVA_HOME=/home/ec2-user/jdk/jdk1.8.0_301
export PATH=$PATH:$JAVA_HOME/bin
For this application, I use Spark 1.6.0, pre-built for Hadoop 2.6:
$ wget http://d3kbcqa49mib13.cloudfront.net/spark-1.6.0-bin-hadoop2.6.tgz -P .
$ sudo tar zxvf ./spark-* -C /usr/local
$ sudo mv /usr/local/spark-* /usr/local/spark
Then append these lines to ~/.bash_profile:
# Environment variables for Spark
export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin
Log out and log back in so the new environment variables take effect, then check that the installation is OK:
$ which spark-submit
/usr/local/spark/bin/spark-submit
Run these commands, then visit the application web page at http://ec2-3-144-125-91.us-east-2.compute.amazonaws.com:3000/chart/.
$ git clone https://github.com/Chan-prksa/TopTag.git
$ cd TopTag/node_js/TwitChart/
$ setsid nohup node server.js &
As a side note, using setsid nohup ... & allows the node process to keep running even after the terminal is closed. The nohup command alone is supposed to have this effect, but Amazon Linux seems to have an issue with nohup; people on the Stack Exchange website have run into the same issue.
When the application starts, two new processes show up in the table generated by ps x:
$ ps x | more
PID TTY STAT TIME COMMAND
4680 ? Ssl 0:00 node server.js
4707 ? Sl 0:14 /home/ec2-user/jdk/jdk1.8.0_301/bin/java -cp /usr/local/spark/conf/:/usr/local/spark/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/usr/local/spark/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark/lib/datanucleus-rdbms-3.2.9.jar:/usr/local/spark/lib/datanucleus-core-3.2.10.jar -Xms1g -Xmx1g org.apache.spark.deploy.SparkSubmit --master local[4] --class TopTagByLanguage ../../java2/out/artifacts/TweetTopTagByLanguage_jar/TweetTopTagByLanguage.jar
We can terminate both processes by using an awk program to find their process IDs and passing those IDs as arguments to the kill command. This can be done with command substitution:
kill -9 $(ps x | awk '/server\.js|TweetTopTagByLanguage\.jar/ { print $1}')
The awk program searches for the names of the files that were executed to start the processes: server.js for the NodeJS process and TweetTopTagByLanguage.jar for the Apache Spark process. Each pattern can be checked on its own, and then both together:
$ ps x | awk '/server\.js/ { print }'
4680 ? Ssl 0:00 node server.js
$ ps x | awk '/TweetTopTagByLanguage\.jar/ { print }'
4707 ? Sl 0:51 /home/ec2-user/jdk/jdk1.8.0_301/bin/java -cp /usr/local/spark/conf/:/usr/local/spark/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/usr/local/spark/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark/lib/datanucleus-rdbms-3.2.9.jar:/usr/local/spark/lib/datanucleus-core-3.2.10.jar -Xms1g -Xmx1g org.apache.spark.deploy.SparkSubmit --master local[4] --class TopTagByLanguage ../../java2/out/artifacts/TweetTopTagByLanguage_jar/TweetTopTagByLanguage.jar
$ ps x | awk '/server\.js|TweetTopTagByLanguage\.jar/ { print }'
4680 ? Ssl 0:00 node server.js
4707 ? Sl 1:40 /home/ec2-user/jdk/jdk1.8.0_301/bin/java -cp /usr/local/spark/conf/:/usr/local/spark/lib/spark-assembly-1.6.0-hadoop2.6.0.jar:/usr/local/spark/lib/datanucleus-api-jdo-3.2.6.jar:/usr/local/spark/lib/datanucleus-rdbms-3.2.9.jar:/usr/local/spark/lib/datanucleus-core-3.2.10.jar -Xms1g -Xmx1g org.apache.spark.deploy.SparkSubmit --master local[4] --class TopTagByLanguage ../../java2/out/artifacts/TweetTopTagByLanguage_jar/TweetTopTagByLanguage.jar
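The awk filter does two things: it keeps the ps x lines that match the regular expression, and it prints the first field ($1) of each, which is the PID. The same logic written in Java may make those two steps easier to see. This is only an illustrative sketch that works on ps x output passed in as strings; the class name PidFinder and the shortened sample lines are made up for the example and are not part of the application.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class PidFinder {

    // Same regex as the awk program: either file name, with each dot escaped.
    static final Pattern TARGETS = Pattern.compile("server\\.js|TweetTopTagByLanguage\\.jar");

    // Keep the lines that match (awk's /regex/ matches anywhere in the line, like find())
    // and collect their first whitespace-separated field (awk's $1), which is the PID.
    static List<Integer> matchingPids(List<String> psLines, Pattern pattern) {
        List<Integer> pids = new ArrayList<>();
        for (String line : psLines) {
            if (pattern.matcher(line).find()) {
                String firstField = line.trim().split("\\s+")[0]; // awk ignores leading blanks for $1
                pids.add(Integer.parseInt(firstField));
            }
        }
        return pids;
    }

    public static void main(String[] args) {
        // Shortened, made-up sample of ps x output
        List<String> ps = Arrays.asList(
                "  PID TTY      STAT   TIME COMMAND",
                " 4680 ?        Ssl    0:00 node server.js",
                " 4707 ?        Sl     0:14 java org.apache.spark.deploy.SparkSubmit --class TopTagByLanguage TweetTopTagByLanguage.jar",
                " 4801 pts/0    S+     0:00 awk /server\\.js|TweetTopTagByLanguage\\.jar/ { print $1}");
        System.out.println(matchingPids(ps, TARGETS)); // prints [4680, 4707]
    }
}
```

The escaped dots do more than make the match exact. The awk process itself may appear in the ps x listing with its program text on the command line, and in that text "server" is followed by a backslash rather than a dot, so the pattern does not match it. The last sample line shows this: PID 4801 is not returned.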
// Split the text of each tweet into whitespace-separated words
JavaDStream<String> words = tweets.flatMap(new FlatMapFunction<Status, String>() {
@Override
public Iterable<String> call(Status s) throws Exception {
return Arrays.asList(s.getText().split("\\s+"));
}
});
// Keep only the words that are hashtags
JavaDStream<String> hashTags = words.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String word) throws Exception {
return word.startsWith("#");
}
});
// Pair each hashtag with a count of 1
JavaPairDStream<String, Integer> hashTagCount = hashTags.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
// the key is the whole word, including the leading # character
return new Tuple2<String, Integer>(s, 1);
}
});
// Sum two counts; shared by both windowed reductions
Function2<Integer, Integer, Integer> addition = new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer a, Integer b) throws Exception {
return a + b;
}
};
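// Total hashtag counts over a sliding window of length outputWindow, recomputed every outputSlide
JavaPairDStream<String, Integer> hashTagTotals = hashTagCount
.reduceByKeyAndWindow(addition, outputWindow, outputSlide);

// Pair every word of each tweet with the tweet's language code: (lang, word)
JavaPairDStream<String,String> langWordPairs = tweets.flatMapToPair(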
new PairFlatMapFunction<Status, String, String>() {
@Override
public Iterable<Tuple2<String, String>> call(Status s) throws Exception {
String[] words = s.getText().split("\\s+");
ArrayList<Tuple2<String, String>> pairs = new ArrayList<Tuple2<String, String>>(words.length);
for (int i = 0; i != words.length; ++i) {
pairs.add(new Tuple2<String, String>(s.getLang(), words[i]));
}
return pairs;
}
}
);
// Keep only the (lang, word) pairs whose word is a hashtag
JavaPairDStream<String,String> langAndHashTags = langWordPairs.filter(
new Function<Tuple2<String, String>, Boolean>() {
@Override
public Boolean call(Tuple2<String,String> lt) throws Exception {
return lt._2().startsWith("#");
}
});
// Use the (lang, hashtag) pair as the key, with a count of 1
JavaPairDStream<Tuple2<String,String>, Integer> langAndTagCounts = langAndHashTags.mapToPair(
new PairFunction<Tuple2<String, String>, Tuple2<String,String>, Integer>(){
@Override
public Tuple2<Tuple2<String,String>, Integer> call(Tuple2<String, String> lt) throws Exception {
return new Tuple2<Tuple2<String,String>, Integer>(lt, 1);
}
}
);
// Total (lang, hashtag) counts over the same sliding window
JavaPairDStream<Tuple2<String,String>, Integer> langAndTagTotals = langAndTagCounts
.reduceByKeyAndWindow(addition, outputWindow, outputSlide);

/*
This part builds priority queues containing the top-K hashtags, indexed by language
*/
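// Bounded min-heap holding at most maxSize pairs: the smallest retained pair is at the head,
// so a new pair is admitted only if it is larger than that head
class MinQStringPair extends PriorityQueue<StringIntPair> {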
int maxSize;
MinQStringPair(int maxSize) {
super(maxSize);
this.maxSize = maxSize;
}
public MinQStringPair putIntoTopK(StringIntPair newPair) {
if(size() < maxSize ) super.add(newPair);
else {
if(newPair.compareTo(super.peek()) > 0) {
super.poll();
super.add(newPair);
}
}
return this;
}
};
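// combineByKey functions: start a queue for a key's first value, add later values, merge per-partition queues
Function<StringIntPair, MinQStringPair> createCombiner = new Function<StringIntPair, MinQStringPair>() {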
public MinQStringPair call(StringIntPair langCnt) throws Exception{
MinQStringPair minQ = new MinQStringPair(topNTags);
minQ.putIntoTopK(langCnt);
return minQ;
}
};
Function2<MinQStringPair, StringIntPair, MinQStringPair> mergeValue =
new Function2<MinQStringPair, StringIntPair, MinQStringPair>() {
public MinQStringPair call(MinQStringPair minQ, StringIntPair langCnt) throws Exception {
minQ.putIntoTopK(langCnt);
return minQ;
}
};
Function2<MinQStringPair, MinQStringPair, MinQStringPair> mergeCombiners =
new Function2<MinQStringPair, MinQStringPair, MinQStringPair>() {
public MinQStringPair call(MinQStringPair qa, MinQStringPair qb) throws Exception {
MinQStringPair qc, qd;
if(qa.size() > qb.size()) { qc = qa; qd = qb; }
else { qc = qb; qd = qa; }
while(qd.size() != 0)
qc.putIntoTopK(qd.poll());
return qc;
}
};
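// langAndTagTotals2 is not shown in this excerpt; it is assumed to hold (language, StringIntPair) pairs
JavaPairDStream<String, MinQStringPair> langAndTogNTags =
langAndTagTotals2.combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(4), true);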
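The first group of transformations in the code above (flatMap, filter, mapToPair, then a reduction with the addition function) counts how often each hashtag appears. As a rough plain-Java analogue, with no Spark involved, the sketch below applies the same split, filter and sum steps to a single made-up batch of tweet texts; the class and method names are hypothetical. Like the Spark code, it treats any whitespace-separated word starting with # as a hashtag, so punctuation attached to a tag (for example #spark,) produces a separate key.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class HashtagCounter {

    // Mirrors the Spark steps for one batch of tweet texts:
    // flatMap (split on whitespace) -> filter (startsWith "#") -> mapToPair (tag, 1) -> sum per key.
    static Map<String, Integer> countHashtags(List<String> tweetTexts) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted keys for readable output
        for (String text : tweetTexts) {
            for (String word : text.split("\\s+")) {
                if (word.startsWith("#")) {
                    counts.merge(word, 1, Integer::sum); // same role as the addition function
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList(
                "Hello #spark and #nodejs",
                "#spark streaming on EC2",
                "no tags here");
        System.out.println(countHashtags(batch)); // prints {#nodejs=1, #spark=2}
    }
}
```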

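reduceByKeyAndWindow adds those per-batch counts over a sliding window: at every slide interval, it combines the counts from the batches that fall within the window duration. The values of outputWindow and outputSlide are not shown in the excerpt, and Spark requires both to be multiples of the batch interval. The following sketch models the window in whole batches, with hypothetical names. It ignores how Spark aligns window boundaries to time and only illustrates which batches each output sums.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SlidingWindowSketch {

    // Simplified model of reduceByKeyAndWindow(addition, window, slide), with both durations
    // expressed as a number of batches. After every `slide` batches, sum the counts of the
    // last `window` batches (fewer at the start, before a full window exists).
    static List<Map<String, Integer>> windowedTotals(
            List<Map<String, Integer>> batches, int window, int slide) {
        List<Map<String, Integer>> outputs = new ArrayList<>();
        for (int end = slide; end <= batches.size(); end += slide) {
            Map<String, Integer> totals = new TreeMap<>();
            for (int i = Math.max(0, end - window); i < end; i++) {
                for (Map.Entry<String, Integer> e : batches.get(i).entrySet()) {
                    totals.merge(e.getKey(), e.getValue(), Integer::sum);
                }
            }
            outputs.add(totals);
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<Map<String, Integer>> batches = new ArrayList<>();
        batches.add(Collections.singletonMap("#a", 1)); // batch 0
        batches.add(Collections.singletonMap("#b", 2)); // batch 1
        batches.add(Collections.singletonMap("#a", 3)); // batch 2
        batches.add(Collections.singletonMap("#b", 4)); // batch 3
        // window of 3 batches, sliding by 2 batches
        System.out.println(windowedTotals(batches, 3, 2)); // prints [{#a=1, #b=2}, {#a=3, #b=6}]
    }
}
```

With a window of 3 batches and a slide of 2, the first output covers only batches 0 and 1 because a full window is not yet available, and the second covers batches 1 to 3. The count of #b from batch 1 therefore appears in both outputs, which is the overlap a sliding window produces whenever the window is longer than the slide.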

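MinQStringPair relies on StringIntPair, whose definition is not part of the excerpt. The sketch below assumes it is a hashtag/count pair ordered by count, and reproduces the putIntoTopK rule in a standalone class so it can be run without Spark. The names TopKSketch and BoundedMinHeap, and breaking ties on the tag, are assumptions made for the illustration.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.PriorityQueue;

public class TopKSketch {

    // Hypothetical stand-in for StringIntPair: a hashtag and its count, ordered by count
    // (ties broken by tag). Serializable, since Spark may serialize combiners in a shuffle.
    static class StringIntPair implements Comparable<StringIntPair>, Serializable {
        final String tag;
        final int count;

        StringIntPair(String tag, int count) { this.tag = tag; this.count = count; }

        @Override
        public int compareTo(StringIntPair o) {
            int c = Integer.compare(count, o.count);
            return c != 0 ? c : tag.compareTo(o.tag);
        }

        @Override
        public String toString() { return tag + "=" + count; }
    }

    // Same rule as MinQStringPair.putIntoTopK: PriorityQueue is a min-heap, so peek() is the
    // smallest of the current top K, and a new pair replaces it only if it is larger.
    static class BoundedMinHeap extends PriorityQueue<StringIntPair> {
        final int maxSize;

        BoundedMinHeap(int maxSize) { super(maxSize); this.maxSize = maxSize; }

        BoundedMinHeap putIntoTopK(StringIntPair p) {
            if (size() < maxSize) {
                add(p);
            } else if (p.compareTo(peek()) > 0) {
                poll();
                add(p);
            }
            return this;
        }
    }

    // Copy the heap's contents into a list sorted from largest to smallest.
    static List<StringIntPair> descending(BoundedMinHeap heap) {
        List<StringIntPair> out = new ArrayList<>(heap);
        Collections.sort(out, Collections.reverseOrder());
        return out;
    }

    public static void main(String[] args) {
        BoundedMinHeap top2 = new BoundedMinHeap(2);
        top2.putIntoTopK(new StringIntPair("#a", 5))
            .putIntoTopK(new StringIntPair("#b", 1))
            .putIntoTopK(new StringIntPair("#c", 7))
            .putIntoTopK(new StringIntPair("#d", 3));
        System.out.println(descending(top2)); // prints [#c=7, #a=5]
    }
}
```

A min-heap fits this job because the element to evict when a larger count arrives is the smallest of the current top K, and PriorityQueue exposes exactly that element through peek(). Each insertion costs O(log K), and the heap never holds more than K entries, no matter how many distinct hashtags a language has.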

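The final combineByKey call groups the windowed hashtag counts by language. Within each partition, Spark calls createCombiner for the first value of a key and mergeValue for later values of that key; mergeCombiners then merges the per-partition results for the same key. The plain-Java model below reproduces that flow for top-K hashtags per language. It assumes, since the excerpt does not show it, that langAndTagTotals2 holds (language, (hashtag, count)) pairs. The hashtag/count pair is modeled with Map.Entry instead of StringIntPair, and the partitions, names and TOP_N value are made up for the example.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;

public class CombineByKeySketch {

    // Local model of combineByKey with map-side combining; each partition is a list of (key, value) pairs.
    static <K, V, C> Map<K, C> combineByKey(List<List<Map.Entry<K, V>>> partitions,
                                            Function<V, C> createCombiner,
                                            BiFunction<C, V, C> mergeValue,
                                            BinaryOperator<C> mergeCombiners) {
        Map<K, C> result = new HashMap<>();
        for (List<Map.Entry<K, V>> partition : partitions) {
            Map<K, C> local = new HashMap<>(); // one combiner per key within this partition
            for (Map.Entry<K, V> kv : partition) {
                C acc = local.get(kv.getKey());
                local.put(kv.getKey(), acc == null
                        ? createCombiner.apply(kv.getValue())
                        : mergeValue.apply(acc, kv.getValue()));
            }
            // Merge this partition's combiners into the overall result.
            for (Map.Entry<K, C> kc : local.entrySet()) {
                result.merge(kc.getKey(), kc.getValue(), mergeCombiners);
            }
        }
        return result;
    }

    static final int TOP_N = 2; // plays the role of topNTags
    static final Comparator<Map.Entry<String, Integer>> BY_COUNT =
            (a, b) -> Integer.compare(a.getValue(), b.getValue());

    // Bounded min-heap insert, the same rule as MinQStringPair.putIntoTopK.
    static PriorityQueue<Map.Entry<String, Integer>> putIntoTopK(
            PriorityQueue<Map.Entry<String, Integer>> q, Map.Entry<String, Integer> tagCount) {
        if (q.size() < TOP_N) {
            q.add(tagCount);
        } else if (BY_COUNT.compare(tagCount, q.peek()) > 0) {
            q.poll();
            q.add(tagCount);
        }
        return q;
    }

    // Returns language -> "tag=count" strings, largest count first.
    static Map<String, List<String>> topTagsByLanguage(
            List<List<Map.Entry<String, Map.Entry<String, Integer>>>> partitions) {
        Map<String, PriorityQueue<Map.Entry<String, Integer>>> heaps = combineByKey(partitions,
                tc -> putIntoTopK(new PriorityQueue<Map.Entry<String, Integer>>(TOP_N, BY_COUNT), tc), // createCombiner
                (q, tc) -> putIntoTopK(q, tc),                                                         // mergeValue
                (qa, qb) -> {                                                                          // mergeCombiners
                    PriorityQueue<Map.Entry<String, Integer>> big = qa.size() >= qb.size() ? qa : qb;
                    PriorityQueue<Map.Entry<String, Integer>> small = big == qa ? qb : qa;
                    while (!small.isEmpty()) putIntoTopK(big, small.poll());
                    return big;
                });
        Map<String, List<String>> out = new TreeMap<>();
        for (Map.Entry<String, PriorityQueue<Map.Entry<String, Integer>>> e : heaps.entrySet()) {
            List<Map.Entry<String, Integer>> sorted = new ArrayList<>(e.getValue());
            sorted.sort(BY_COUNT.reversed());
            List<String> tags = new ArrayList<>();
            for (Map.Entry<String, Integer> tc : sorted) tags.add(tc.getKey() + "=" + tc.getValue());
            out.put(e.getKey(), tags);
        }
        return out;
    }

    static Map.Entry<String, Map.Entry<String, Integer>> pair(String lang, String tag, int count) {
        Map.Entry<String, Integer> tagCount = new SimpleEntry<>(tag, count);
        return new SimpleEntry<>(lang, tagCount);
    }

    public static void main(String[] args) {
        // Two hypothetical partitions of (language, (hashtag, windowed count)) pairs.
        List<List<Map.Entry<String, Map.Entry<String, Integer>>>> partitions = Arrays.asList(
                Arrays.asList(pair("en", "#a", 5), pair("en", "#b", 1), pair("ja", "#x", 4)),
                Arrays.asList(pair("en", "#c", 7), pair("ja", "#y", 2), pair("ja", "#z", 9)));
        System.out.println(topTagsByLanguage(partitions)); // prints {en=[#c=7, #a=5], ja=[#z=9, #x=4]}
    }
}
```

As in mergeCombiners in the original code, the smaller queue is drained into the larger one, so the number of putIntoTopK calls is bounded by the size of the smaller queue.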