# CC ```java package org.apache.flink.streaming.examples.wordcount; // import org.apache.flink.api.common.eventtime.WatermarkGenerator; // import org.apache.flink.api.common.eventtime.WatermarkOutput; // import org.apache.flink.streaming.api.watermark.Watermark; import java.time.Duration; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.MultipleParameterTool; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows; import org.apache.flink.streaming.api.windowing.time.Time; DataStream<Tuple2<String, String>> watermarkedText = text .assignTimestampsAndWatermarks(WatermarkStrategy .<Tuple2<String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20)) .withTimestampAssigner((event, timestamp) -> event.f0)); DataStream<Tuple2<String, Integer>> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(new Tokenizer()) // group by the tuple field "0" and sum up tuple field "1" .keyBy(value -> value.f0) .window(EventTimeSessionWindows.withGap(Time.minutes(10))) .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() { @Override public void process(String key, Context context, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) { HashMap<String, Integer> scores = new HashMap<String, Integer>(); // for (Tuple2<String, Integer> in: input) { // if (scores.containsKey(in.f0)) { // scores.put(in.f0, scores.get(in.f0) + 1); // } else { // scores.put(in.f0, 1); // } // } scores.put("A", 100); scores.put("B", 200); scores.put("C", 150); Map<String, Integer> sorted = scores.entrySet().stream() .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder())) .collect(Collectors.toMap( Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1, LinkedHashMap::new)); for (HashMap.Entry<String, Integer> entry : sorted.entrySet()) { out.collect(new Tuple2<String, Integer>(entry.getKey(), entry.getValue())); } } }); ```