# CC
```java
package org.apache.flink.streaming.examples.wordcount;
// import org.apache.flink.api.common.eventtime.WatermarkGenerator;
// import org.apache.flink.api.common.eventtime.WatermarkOutput;
// import org.apache.flink.streaming.api.watermark.Watermark;
import java.time.Duration;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.MultipleParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
// Attach event-time timestamps and watermarks to the raw input. The watermark
// strategy tolerates events arriving up to 20 seconds out of order.
DataStream<Tuple2<String, String>> watermarkedText = text
    .assignTimestampsAndWatermarks(WatermarkStrategy
        .<Tuple2<String, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        // The timestamp assigner must return a long, but f0 is a String, so it is parsed here.
        // NOTE(review): assumes f0 carries the event time as epoch milliseconds in decimal
        // text — confirm against the format of the records in `text`.
        .withTimestampAssigner((event, timestamp) -> Long.parseLong(event.f0)));
// Per-key, per-session word counts, emitted in descending order of count.
DataStream<Tuple2<String, Integer>> counts =
    // Start from the watermarked stream: the event-time session window below
    // needs the timestamps and watermarks assigned to it.
    // Split up the lines in pairs (2-tuples) containing: (word, 1).
    watermarkedText.flatMap(new Tokenizer())
        // Group by the word (tuple field f0).
        .keyBy(value -> value.f0)
        // A session closes after 10 minutes of event time without a new element for the key.
        .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
        .process(new ProcessWindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
            /**
             * Counts the occurrences of each word in the window and emits one
             * (word, count) tuple per distinct word, highest count first.
             *
             * @param key     the key of the window (the word used in keyBy)
             * @param context the window context (unused)
             * @param input   the elements collected in this window
             * @param out     the collector receiving the (word, count) results
             */
            @Override
            public void process(String key, Context context, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) {
                Map<String, Integer> scores = new HashMap<>();
                // Tally one occurrence per element actually present in the window.
                for (Tuple2<String, Integer> in : input) {
                    scores.merge(in.f0, 1, Integer::sum);
                }
                // Emit entries ordered by count, largest first.
                scores.entrySet().stream()
                    .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder()))
                    .forEach(entry -> out.collect(new Tuple2<String, Integer>(entry.getKey(), entry.getValue())));
            }
        });
```