diff --git a/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/mailCount/MailCount.scala b/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/mailCount/MailCount.scala
index 785499d..ef566b4 100644
--- a/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/mailCount/MailCount.scala
+++ b/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/mailCount/MailCount.scala
@@ -53,9 +53,11 @@ object MailCount {
         // extract month from time string
         m._1.substring(0, 7),
         // extract email address from sender
-        m._2.substring(m._2.lastIndexOf("<") + 1, m._2.length - 1) ) }
+        m._2.substring(m._2.lastIndexOf("<") + 1, m._2.length - 1),
+        // add counter to each record
+        1) }
       // group by month and sender and count the number of records per group
-      .groupBy(0, 1).reduceGroup { ms => ms.foldLeft(("","",0))( (c, m) => (m._1, m._2, c._3+1)) }
+      .groupBy(0, 1).sum(2)
      // print the result
      .print
diff --git a/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/replyGraph/ReplyGraph.scala b/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/replyGraph/ReplyGraph.scala
index 7323211..43d1498 100644
--- a/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/replyGraph/ReplyGraph.scala
+++ b/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/replyGraph/ReplyGraph.scala
@@ -34,8 +34,8 @@ object ReplyGraph {
   def main(args: Array[String]) {
 
     // parse parameters
-    val params = ParameterTool.fromArgs(args);
-    val input = params.getRequired("input");
+    val params = ParameterTool.fromArgs(args)
+    val input = params.getRequired("input")
 
     // set up the execution environment
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -59,11 +59,11 @@ object ReplyGraph {
     // compute reply connections by joining on messageId and reply-to
     val replyConnections = addressMails
-      .join(addressMails).where(2).equalTo(0) { (l,r) => (l._2, r._2) }
+      .join(addressMails).where(2).equalTo(0) { (l,r) => (l._2, r._2, 1) }
 
     // count connections for each pair of addresses
     replyConnections
-      .groupBy(0,1).reduceGroup( cs => cs.foldLeft(("","",0))( (l,r) => (r._1, r._2, l._3+1) ) )
+      .groupBy(0,1).sum(2)
       .print
   }
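
Both changes above replace a hand-rolled count (reduceGroup with foldLeft) by Flink's built-in
aggregation: attach a constant 1 to every record, group by the key fields, and sum the counter.
The following self-contained sketch of that pattern is an editor's illustration, not part of the
patch; the data and names are made up, and it assumes the same Flink Scala DataSet API
(org.apache.flink.api.scala._) that the exercises use.

    import org.apache.flink.api.scala._

    object GroupCountSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // hypothetical (month, sender) records
        val records = env.fromElements(
          ("2015-01", "alice@example.com"),
          ("2015-01", "alice@example.com"),
          ("2015-02", "bob@example.com"))

        records
          .map(r => (r._1, r._2, 1)) // add a counter to each record
          .groupBy(0, 1)             // group by month and sender
          .sum(2)                    // sum the counters per group
          .print()                   // one counted record per (month, sender) pair
      }
    }

Besides being shorter, sum is a combinable aggregation, so counts can be pre-aggregated before
the shuffle, which the reduceGroup version could not do without an explicit combiner.
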
diff --git a/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala b/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala
index d51c199..8e5e62b 100644
--- a/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala
+++ b/flink-exercises/src/main/scala/com/dataArtisans/flinkTraining/exercises/dataSetScala/tfIdf/MailTFIDF.scala
@@ -18,17 +18,14 @@
 package com.dataArtisans.flinkTraining.exercises.dataSetScala.tfIdf
 
-import java.util.StringTokenizer
 import java.util.regex.Pattern
 
 import com.dataArtisans.flinkTraining.dataSetPreparation.MBoxParser
-import org.apache.flink.api.common.functions.{FlatMapFunction}
+import org.apache.flink.api.common.functions.FlatMapFunction
 import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.api.scala._
 import org.apache.flink.util.Collector
 
-import scala.collection.mutable.{HashMap, HashSet}
-
 /**
  * Scala reference implementation for the "TF-IDF" exercise of the Flink training.
  * The task of the exercise is to compute the TF-IDF score for words in mails of the
@@ -40,137 +37,110 @@ import scala.collection.mutable.{HashMap, HashSet}
  */
 object MailTFIDF {
 
-  val STOP_WORDS: Array[String] = Array (
-    "the", "i", "a", "an", "at", "are", "am", "for", "and", "or", "is", "there", "it", "this",
-    "that", "on", "was", "by", "of", "to", "in", "to", "message", "not", "be", "with", "you",
-    "have", "as", "can")
-
   def main(args: Array[String]) {
 
-    // parse paramters
-    val params = ParameterTool.fromArgs(args);
-    val input = params.getRequired("input");
+    // parse parameters
+    val params = ParameterTool.fromArgs(args)
+    val input = params.getRequired("input")
+
+    val stopWords = List(
+      "the", "i", "a", "an", "at", "are", "am", "for", "and", "or", "is", "there", "it", "this",
+      "that", "on", "was", "by", "of", "to", "in", "to", "message", "not", "be", "with", "you",
+      "have", "as", "can")
+
+    // pattern recognizing an acceptable 'word': one or more alphabetical characters
+    val wordPattern: Pattern = Pattern.compile("(\\p{Alpha})+")
 
     // set up the execution environment
     val env = ExecutionEnvironment.getExecutionEnvironment
 
     // read messageId and body field of the input data
     val mails = env.readCsvFile[(String, String)](
-      "/users/fhueske/data/flinkdevlistparsed/",
+      input,
       lineDelimiter = MBoxParser.MAIL_RECORD_DELIM,
       fieldDelimiter = MBoxParser.MAIL_FIELD_DELIM,
       includedFields = Array(0,4)
     )
 
     // count mails in data set
-    val mailCnt = mails.count
+    val mailCnt = mails.count()
 
     // compute term-frequency (TF)
-    val tf = mails
-      .flatMap(new TFComputer(STOP_WORDS))
+    val tf = mails.flatMap(new TFComputer(stopWords, wordPattern))
 
     // compute document frequency (number of mails that contain a word at least once)
-    val df = mails
-      // extract unique words from mails
-      .flatMap(new UniqueWordExtractor(STOP_WORDS))
-      // count number of mails for each word
-      .groupBy(0).reduce { (l,r) => (l._1, l._2 + r._2) }
-
+    val df = mails.flatMap(new UniqueWordExtractor(stopWords, wordPattern))
+      // group by the words
+      .groupBy(0)
+      // count the number of documents in each group (DF)
+      .sum(1)
+
     // compute TF-IDF score from TF, DF, and total number of mails
-    val tfidf = tf.join(df).where(1).equalTo(0)
-      { (l, r) => (l._1, l._2, l._3 * (mailCnt.toDouble / r._2) ) }
+    val tfidf = tf
+      .join(df)
+      // match the "word" field of tf
+      .where(1)
+      // with the "word" field of df
+      .equalTo(0) {
+        (l, r) => (l._1, l._2, l._3 * (mailCnt.toDouble / r._2))
+      }
 
     // print the result
     tfidf
-      .print
+      .print()
   }
 
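+  // Editor's note with illustrative numbers (not from the data set): under the score
+  // formula above, a word that occurs 3 times in a mail (TF = 3) and appears in 10 of
+  // 100 mails (DF = 10) gets the score 3 * (100.0 / 10) = 30.0.
+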
   /**
-   * Computes the frequency of each word in a mail.
-   * Words consist only of alphabetical characters. Frequent words (stop words) are filtered out.
+   * Extracts the list of unique words in each document.
    *
-   * @param stopWordsA Array of words that are filtered out.
+   * @param stopWords a list of stop words that should be omitted
+   * @param wordPattern pattern that defines a word
    */
-  class TFComputer(stopWordsA: Array[String])
-    extends FlatMapFunction[(String, String), (String, String, Int)] {
-
-    val stopWords: HashSet[String] = new HashSet[String]
-    val wordCounts: HashMap[String, Int] = new HashMap[String, Int]
-    // initialize word pattern match for sequences of alphabetical characters
-    val wordPattern: Pattern = Pattern.compile("(\\p{Alpha})+")
-
-    // initialize set of stop words
-    for(sw <- stopWordsA) {
-      this.stopWords.add(sw)
-    }
-
-    override def flatMap(t: (String, String), out: Collector[(String, String, Int)]): Unit = {
-      // clear word counts
-      wordCounts.clear
-
-      // split mail along whitespaces
-      val tokens = new StringTokenizer(t._2)
-      // for each word candidate
-      while (tokens.hasMoreTokens) {
-        // normalize word to lower case
-        val word = tokens.nextToken.toLowerCase
-        if (!stopWords.contains(word) && wordPattern.matcher(word).matches) {
-          // word candidate is not a stop word and matches the word pattern
-          // increase word count
-          val cnt = wordCounts.getOrElse(word, 0)
-          wordCounts.put(word, cnt+1)
-        }
-      }
-      // emit all word counts per document and word
-      for (wc <- wordCounts.iterator) {
-        out.collect( (t._1, wc._1, wc._2) )
-      }
-    }
+  class UniqueWordExtractor(stopWords: List[String], wordPattern: Pattern)
+    extends FlatMapFunction[(String, String), (String, Int)] {
+
+    def flatMap(mail: (String, String), out: Collector[(String, Int)]): Unit = {
+      val output = mail._2.toLowerCase
+        // split the body
+        .split(Array(' ', '\t', '\n', '\r', '\f'))
+        // filter out stop words and non-words
+        .filter(w => !stopWords.contains(w) && wordPattern.matcher(w).matches())
+        // keep each word only once per document
+        .distinct
+      // emit every word that appeared in the document
+      output.foreach(m => out.collect((m, 1)))
+    }
   }
 
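+  // Editor's note, illustrative only: for a body "Flink is great Flink scales",
+  // UniqueWordExtractor (above) emits ("flink", 1), ("great", 1), and ("scales", 1),
+  // while TFComputer (below) emits (id, "flink", 2), (id, "great", 1), and
+  // (id, "scales", 1); "is" is dropped as a stop word.
+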
   /**
-   * Extracts the unique words in a mail.
-   * Words consist only of alphabetical characters. Frequent words (stop words) are filtered out.
+   * Computes the term frequency (TF) of each word per document.
    *
-   * @param stopWordsA Array of words that are filtered out.
+   * @param stopWords a list of stop words that should be omitted
+   * @param wordPattern pattern that defines a word
    */
-  class UniqueWordExtractor(stopWordsA: Array[String])
-    extends FlatMapFunction[(String, String), (String, Int) ] {
-
-    val stopWords: HashSet[String] = new HashSet[String]
-    val uniqueWords: HashSet[String] = new HashSet[String]
-    // initalize pattern to match words
-    val wordPattern: Pattern = Pattern.compile("(\\p{Alpha})+")
-
-    // initialize set of stop words
-    for(sw <- stopWordsA) {
-      this.stopWords.add(sw)
-    }
-
-    override def flatMap(t: (String, String), out: Collector[(String, Int)]): Unit = {
-      // clear unique words
-      uniqueWords.clear()
-
-      // split mail along whitespaces
-      val tokens = new StringTokenizer(t._2)
-      // for each word candidate
-      while(tokens.hasMoreTokens) {
-        // normalize word to lower case
-        val word = tokens.nextToken.toLowerCase
-        if (!stopWords.contains(word) && wordPattern.matcher(word).matches) {
-          // word candiate is not a stop word and matches the word pattern
-          uniqueWords.add(word)
-        }
-      }
-
-      // emit all words that occurred at least once
-      for(w <- uniqueWords) {
-        out.collect( (w, 1) )
-      }
-    }
-  }
+  class TFComputer(stopWords: List[String], wordPattern: Pattern)
+    extends FlatMapFunction[(String, String), (String, String, Int)] {
+
+    def flatMap(mail: (String, String), out: Collector[(String, String, Int)]): Unit = {
+      // extract email id
+      val id = mail._1
+      val output = mail._2.toLowerCase
+        // split the body
+        .split(Array(' ', '\t', '\n', '\r', '\f'))
+        // filter out stop words and non-words
+        .filter(w => !stopWords.contains(w) && wordPattern.matcher(w).matches())
+        // sum up the number of occurrences of each word in the document
+        .map(m => (m, 1)).groupBy(_._1).map {
+          case (item, count) => (item, count.foldLeft(0)(_ + _._2))
+        }
+      // emit the document id, a term, and its number of occurrences in the document
+      output.foreach(m => out.collect((id, m._1, m._2)))
+    }
+  }
 }
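
As a closing editor's note: the word-counting step inside the new TFComputer is ordinary Scala
collections code, so it can be sanity-checked without Flink. A minimal sketch, with made-up input:

    val words = Seq("flink", "great", "flink")
    val tf = words.map(w => (w, 1)).groupBy(_._1).map {
      case (word, pairs) => (word, pairs.foldLeft(0)(_ + _._2))
    }
    // tf: Map("flink" -> 2, "great" -> 1)

The same result could be written more compactly as words.groupBy(identity).mapValues(_.size),
but the map/groupBy/foldLeft form in the patch makes the (word, 1) pairing explicit.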