1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
| public class StreamJobDemo { private static Logger log = LoggerFactory.getLogger(StreamJobDemo.class);
private static final long ONE_MINUTES_in_millis = TimeUnit.MINUTES.toMillis(1); private static final long ignored_MINUTES_in_millis = TimeUnit.MINUTES.toMillis(500000); public static final DateTimeFormatter dateformat = DateTimeFormatter.ofPattern("MM-dd HH:mm:ss,SSS");
public static void main(String[] args) { String topic = "packet"; String msgKey = "InFromClient"; String group_A = "from"; String group_B = "fuid"; KStreamBuilder builder = new KStreamBuilder(); KStream<String, String> textLines = builder.stream(topic); KStream<String, Map<String, Object>> mapped_streams = textLines .filter((k, v) -> msgKey.equalsIgnoreCase(k)).mapValues(value -> getValFromJsonQuietly(value));
mapped_streams.map(new KeyValueMapper<String, Map<String, Object>, KeyValue<String, String>>() { @Override public KeyValue<String, String> apply(String key, Map<String, Object> value) { String lineKey = getOrDefault(value, group_A); lineKey = lineKey.replaceAll("@.*", ""); String mid = getOrDefault(value, group_B); log.info("source: key-{}->{}.", lineKey, mid); return new KeyValue<String, String>(lineKey, mid); } }).groupByKey().aggregate(new Initializer<String>() { @Override public String apply() { return ""; } }, new Aggregator<String, String, String>() { @Override public String apply(String aggKey, String value, String aggregate) { log.info("aggr: {}-{}-{}.", aggKey, aggregate, value); if (!aggregate.contains(value + ";")) { aggregate = aggregate + value + ";"; } return aggregate; } }, TimeWindows.of(ONE_MINUTES_in_millis).until(ONE_MINUTES_in_millis), Serdes.String(), "cnt") .toStream().filter((key, value) -> { log.info("filter: key-{}-{}", key, value); return value != null && key.window().end() > System.currentTimeMillis() - ignored_MINUTES_in_millis; }).mapValues(new ValueMapper<String, Integer>() { @Override public Integer apply(String value) { return value.split(";").length; } }).filter((k,v)->v.intValue() > 2).foreach((key, count) -> { log.info("grouped: date-{}, key-{}, cnt-{}", dateformat.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(key.window().end()), ZoneId.systemDefault())), key.key(), count); });
KafkaStreams streams = new KafkaStreams(builder, new StreamsConfig(ImmutableMap.<String, String>builder() .put("application.id", "nearby-rate") .put("bootstrap.servers", "10.136.24.103:9091") .put("commit.interval.ms", "20000") .put("timestamp.extractor", RQTimeStampExtra.class.getName()) .put(KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()) .put(VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()) .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest").build())); streams.start(); Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); } public static Map<String, Object> getValFromJsonQuietly(String info) { Map<String, Object> kvInfo = Jacksons.parse(info, new TypeReference<Map<String, Object>>() { }); if (null != kvInfo) { return kvInfo; } else { return Maps.newHashMap(); } }
public static String getOrDefault(Map<String, ?> kvInfo, String key) { String default_key = "-1"; Object obj = kvInfo.get(key); return (null == obj) ? default_key : obj.toString(); } }
|