画个腿快跑内置菜单
127.65 MB · 2025-11-05
在实时计算领域,很多业务逻辑天然适合“事件驱动”模式:当事件到达时触发处理、在某个时间点触发补偿或汇总、根据状态变化发出告警等。Apache Flink 为此提供了强大的 ProcessFunction 家族(KeyedProcessFunction、CoProcessFunction、BroadcastProcessFunction 等),它们在算子层面同时具备“事件处理 + 定时器 + 状态”的能力,是构建复杂流式应用的核心基石。
本文基于 Flink 1.20 的语义,带你从零理解事件驱动的编程模型,并一步步实现一个“伪窗口 PseudoWindow”示例,体会 ProcessFunction 如何代替窗口完成时间分桶、累加和触发输出。
对于如下需求,事件驱动往往比简单窗口更灵活:
ProcessFunction 能满足上述场景,因为它同时提供:
目标:按司机 driverId,每小时汇总 tip(小费)之和。我们先给出窗口版本,再给出伪窗口版本以对比两者的思路差异。
// 每小时、每个司机的提示费求和(传统事件时间翻转窗口)
DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
.keyBy((TaxiFare fare) -> fare.driverId)
.window(TumblingEventTimeWindows.of(Duration.ofSeconds(5)))
.process(new AggregateTipsProcess());
窗口版本直观,但触发逻辑受窗口边界约束。如果我们希望完全掌控“何时触发”和“如何管理多窗口并发”,可以使用 KeyedProcessFunction:
// 使用事件驱动的 KeyedProcessFunction 替代窗口
DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
.keyBy((TaxiFare fare) -> fare.driverId)
.process(new PseudoWindow(Duration.ofSeconds(5)));
// 伪窗口:按事件时间把每条数据归入其所在小时段,注册窗口结束时间的定时器,定时器触发时输出该小时汇总
public static class PseudoWindow extends KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {
private final long durationMsec;
// MapState<窗口结束时间, 累计 tips>
private transient MapState<Long, Float> sumOfTips;
public PseudoWindow(Duration duration) {
this.durationMsec = duration.toMillis();
}
@Override
public void open(Configuration parameters) throws Exception {
MapStateDescriptor<Long, Float> sumDesc =
new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
sumOfTips = getRuntimeContext().getMapState(sumDesc);
}
@Override
public void processElement(
TaxiFare fare,
Context ctx,
Collector<Tuple3<Long, Long, Float>> out) throws Exception {
long eventTime = fare.getEventTime();
TimerService timerService = ctx.timerService();
// 若事件时间早于当前 Watermark,说明窗口已触发,该事件为迟到事件(按需决定丢弃或补偿)
if (eventTime <= timerService.currentWatermark()) {
// 迟到事件处理策略:可以记录指标、写侧输出、或进行补偿
return;
}
// 计算该事件所属小时窗口的“窗口结束时间”戳
long endOfWindow = eventTime - (eventTime % durationMsec) + durationMsec - 1;
// 注册事件时间定时器:当 Watermark 超过 endOfWindow 时触发 onTimer
timerService.registerEventTimeTimer(endOfWindow);
// 累加该窗口的 tips
Float sum = sumOfTips.get(endOfWindow);
if (sum == null) {
sum = 0.0F;
}
sum += fare.tip;
sumOfTips.put(endOfWindow, sum);
}
@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<Tuple3<Long, Long, Float>> out) throws Exception {
// 定时器时间戳即窗口结束时间,输出 (driverId, windowEnd, sum)
Float sum = sumOfTips.get(timestamp);
if (sum != null) {
Long driverId = ctx.getCurrentKey();
out.collect(Tuple3.of(driverId, timestamp, sum));
// 输出后清理该窗口的状态,避免泄漏
sumOfTips.remove(timestamp);
}
}
}
从这个实现可以观察到:
建议:涉及业务时间逻辑时优先使用事件时间,并合理设置 Watermark 与乱序容忍度;同时可以结合处理时间定时器做后台清理或补偿任务。
在批处理场景(有界数据)中,通常可以使用单调递增或默认 Watermark 策略;在流处理场景(无界数据)中,常用“有界乱序”策略。
经验法则:能用窗口优雅解决的就用窗口;当窗口表达力不够时,考虑 ProcessFunction。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000);
// 示例:Kafka Source + Bounded Out-Of-Orderness Watermark
KafkaSource<TaxiFare> source = KafkaSource.<TaxiFare>builder()
.setBootstrapServers("localhost:9092")
.setTopics("fares")
.setGroupId("flink-fare-group")
.setValueOnlyDeserializer(new TaxiFareDeserializer())
.build();
DataStream<TaxiFare> fares = env.fromSource(
source,
WatermarkStrategy
.<TaxiFare>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((fare, ts) -> fare.getEventTime()),
"Kafka Fares");
DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
.keyBy(f -> f.driverId)
.process(new PseudoWindow(Duration.ofSeconds(5)));
hourlyTips.print();
env.execute("Event-driven Hourly Tips");
事件驱动让你在算子层面掌控“事件处理 + 定时器 + 状态”,从而能表达超越窗口 API 的复杂业务逻辑。在 Flink 中,KeyedProcessFunction 是实现事件驱动应用的核心武器:用它来注册事件或处理时间定时器、维护键控状态、为迟到与补偿设计精细策略。恰当地选择 Watermark 策略和状态清理机制,可以在保证准确性的同时兼顾性能与资源使用。
原文来自:http://blog.daimajiangxin.com.cn
源码地址:https://gitee.com/daimajiangxin/flink-learning