How to implement a double group operation in kafka-streams

How do you deduplicate first and then groupBy to count frequencies with kafka-streams?
Anyone doing stream processing with Kafka Streams knows that it provides a groupBy operation for aggregations. For example, to count how many messages each user sends per minute, you can groupBy the user's uid; to count interaction messages between pairs of users, you can groupBy the pair of the user's uid and the interacting user's fuid.
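For those simple cases, here is a minimal sketch against the 2.x-era Streams API (the class name, topic name "messages", and the assumption that records are keyed by uid are mine, not from the original post):

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

public class PerUserCountSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        // Hypothetical topic; assumes each record is keyed by the sender's uid.
        KStream<String, String> messages = builder.stream("messages");
        KTable<Windowed<String>, Long> perUserPerMinute = messages
                .groupByKey()                                       // group by uid (the record key)
                .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))  // one-minute tumbling windows
                .count();                                           // messages per uid per window
        // Wire builder.build() into a KafkaStreams instance to actually run this.
    }
}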
But what if the requirement is: for every minute, find the users whose number of distinct interaction partners exceeds some threshold?

To simplify, if we ignore the time (Time Window) dimension, the requirement above roughly translates into the following SQL:

select A.uid, count(*) from (
    select uid, fuid
    from kafka_stream_A
    group by uid, fuid
) A
group by A.uid
having count(*) > limit

After a lot of searching, it turns out this is doable: Kafka Streams lets you run an aggregate on the result of groupByKey (the built-in count on a grouped stream is itself implemented on top of aggregate).
In short, while grouping by uid, keep a set of fuids as the aggregation state, then aggregate over that state, i.e.:

class ActionRecordAggregate {
    private Set<String> users = new HashSet<>();

    // Returns "this" so the method can be used directly as the Aggregator below
    // (the original void signature would not compile in that position).
    public ActionRecordAggregate add(ActionRecord rec) {
        users.add(rec.getUser());
        return this;
    }

    public int count() {
        return users.size();
    }
}

stream()
    .map((key, val) -> KeyValue.pair(val.actionType, val))
    .groupByKey()
    .windowedBy(TimeWindows.of(60 * 1000))
    .aggregate(
        ActionRecordAggregate::new,
        (key, value, agg) -> agg.add(value),
        Materialized
            .<String, ActionRecordAggregate, WindowStore<Bytes, byte[]>>as("actionTypeLookup")
            .withValueSerde(getSerdeForActionRecordAggregate())
    );
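To mirror the SQL's having count(*) > limit, the windowed aggregate can be filtered downstream. A sketch, assuming the aggregate above is assigned to a KTable<Windowed<String>, ActionRecordAggregate> named actionCounts (my name, not from the original) and limit is the threshold:

actionCounts
    .toStream()
    .mapValues(ActionRecordAggregate::count)    // distinct users per key per window
    .filter((windowedKey, cnt) -> cnt > limit)  // the SQL's "having count(*) > limit"
    .foreach((windowedKey, cnt) ->
        System.out.println(windowedKey.key() + " @ " + windowedKey.window().end() + " -> " + cnt));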

The example above comes from a Stack Overflow Q&A; original: https://stackoverflow.com/questions/51048125/apache-kafka-grouping-twice/51071663

If you want to try it yourself, here is a full demo:

// Note: this demo uses the pre-1.0 Kafka Streams API (KStreamBuilder and the
// aggregate(Initializer, Aggregator, Windows, Serde, String) overload).
// Jacksons and RQTimeStampExtra are project-specific helpers not shown here.
import static org.apache.kafka.streams.StreamsConfig.KEY_SERDE_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.VALUE_SERDE_CLASS_CONFIG;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;

public class StreamJobDemo {
    private static Logger log = LoggerFactory.getLogger(StreamJobDemo.class);
    private static final long ONE_MINUTES_in_millis = TimeUnit.MINUTES.toMillis(1);
    // A huge value so the recency filter below effectively never drops windows.
    private static final long ignored_MINUTES_in_millis = TimeUnit.MINUTES.toMillis(500000);
    public static final DateTimeFormatter dateformat = DateTimeFormatter.ofPattern("MM-dd HH:mm:ss,SSS");

    public static void main(String[] args) {
        String topic = "packet";
        String msgKey = "InFromClient";
        String group_A = "from";
        String group_B = "fuid"; // "request_time";

        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> textLines = builder.stream(topic);

        // Keep only the records we care about and parse each JSON line into a Map.
        KStream<String, Map<String, Object>> mapped_streams = textLines
                .filter((k, v) -> msgKey.equalsIgnoreCase(k))
                .mapValues(value -> getValFromJsonQuietly(value));

        // Re-key each record to (uid -> fuid), then aggregate per uid per window.
        mapped_streams.map(new KeyValueMapper<String, Map<String, Object>, KeyValue<String, String>>() {
            @Override
            public KeyValue<String, String> apply(String key, Map<String, Object> value) {
                String lineKey = getOrDefault(value, group_A);
                lineKey = lineKey.replaceAll("@.*", "");
                String mid = getOrDefault(value, group_B);
                log.info("source: key-{}->{}.", lineKey, mid);
                return new KeyValue<String, String>(lineKey, mid);
            }
        }).groupByKey().aggregate(new Initializer<String>() {
            @Override
            public String apply() {
                return "";
            }
        }, new Aggregator<String, String, String>() {
            @Override
            public String apply(String aggKey, String value, String aggregate) {
                // Deduplicate fuids by appending each one at most once. A Set with a
                // custom Serde would be cleaner (see the sketch after this listing);
                // note that this substring check can over-deduplicate, e.g. "12;"
                // also matches inside "112;".
                log.info("aggr: {}-{}-{}.", aggKey, aggregate, value);
                if (!aggregate.contains(value + ";")) {
                    aggregate = aggregate + value + ";";
                }
                return aggregate;
            }
        }, TimeWindows.of(ONE_MINUTES_in_millis).until(ONE_MINUTES_in_millis), Serdes.String(), "cnt")
                .toStream().filter((key, value) -> {
                    log.info("filter: key-{}-{}", key, value);
                    return value != null && key.window().end() > System.currentTimeMillis() - ignored_MINUTES_in_millis;
                }).mapValues(new ValueMapper<String, Integer>() {
                    @Override
                    public Integer apply(String value) {
                        // Number of distinct fuids this uid interacted with in the window.
                        return value.split(";").length;
                    }
                }).filter((k, v) -> v.intValue() > 2) // the SQL's "having count(*) > limit"
                .foreach((key, count) -> {
                    log.info("grouped: date-{}, key-{}, cnt-{}",
                            dateformat.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(key.window().end()), ZoneId.systemDefault())),
                            key.key(), count);
                });

        KafkaStreams streams = new KafkaStreams(builder,
                new StreamsConfig(ImmutableMap.<String, String>builder()
                        .put("application.id", "nearby-rate")
                        .put("bootstrap.servers", "10.136.24.103:9091")
                        .put("commit.interval.ms", "20000")
                        .put("timestamp.extractor", RQTimeStampExtra.class.getName())
                        .put(KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName())
                        .put(VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName())
                        .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest").build()));
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        // return streams;
    }

    // Parse a JSON string into a Map, returning an empty Map on failure.
    public static Map<String, Object> getValFromJsonQuietly(String info) {
        Map<String, Object> kvInfo = Jacksons.parse(info, new TypeReference<Map<String, Object>>() {
        });
        if (null != kvInfo) {
            return kvInfo;
        } else {
            return Maps.newHashMap();
        }
    }

    // Look up a key in the parsed Map, falling back to "-1" when it is absent.
    public static String getOrDefault(Map<String, ?> kvInfo, String key) {
        String default_key = "-1";
        Object obj = kvInfo.get(key);
        return (null == obj) ? default_key : obj.toString();
    }
}
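The demo keeps its state as a ';'-delimited String only because Serdes.String() is ready-made. With a recent kafka-clients (2.1+, where Serializer and Deserializer have default configure/close and can be lambdas), you could keep a real Set<String> with a small JSON-backed Serde. The class below is my own sketch, not part of the original post:

import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical JSON-backed Serde for a Set<String> aggregate.
public class StringSetSerde {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static Serde<Set<String>> instance() {
        Serializer<Set<String>> ser = (topic, set) -> {
            try {
                return set == null ? null : MAPPER.writeValueAsBytes(set);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
        Deserializer<Set<String>> de = (topic, bytes) -> {
            try {
                return bytes == null ? new HashSet<>()
                        : MAPPER.readValue(bytes, new TypeReference<Set<String>>() {});
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
        return Serdes.serdeFrom(ser, de);
    }
}

With this, the aggregate step becomes aggregate(HashSet::new, (k, v, set) -> { set.add(v); return set; }, ...) materialized with this serde, and the per-window count is just set.size() instead of splitting on ';'.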