|
|
@@ -0,0 +1,104 @@
|
|
|
+package org.springblade.transform;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.ldt.consts.QueueConst;
|
|
|
+import com.ldt.entity.ApiLogData;
|
|
|
+import com.ldt.entity.alarm.IPAlarm;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
|
|
|
+import org.apache.flink.api.common.functions.MapFunction;
|
|
|
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
|
|
|
+import org.apache.flink.api.common.typeinfo.Types;
|
|
|
+import org.apache.flink.api.java.tuple.Tuple3;
|
|
|
+import org.apache.flink.streaming.api.datastream.DataStream;
|
|
|
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
|
|
|
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
|
|
|
+import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
|
|
|
+import org.apache.flink.streaming.api.windowing.time.Time;
|
|
|
+import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
|
|
|
+import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
|
|
|
+import org.springblade.sink.mongodb.IPAlarmSink;
|
|
|
+import org.springblade.sink.mongodb.MongoSink;
|
|
|
+import org.springblade.sink.rabbitmq.ObjectJsonParser;
|
|
|
+import org.springblade.utils.RabbitUtils;
|
|
|
+
|
|
|
+import java.time.Duration;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.Optional;
|
|
|
+
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author: lianghanqiang
|
|
|
+ * @description: api 日志处理算子
|
|
|
+ * @since: 10/27/21 -- 3:12 PM
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+public class ApiLogTransformer {
|
|
|
+
|
|
|
+ public static void main(String[] args) throws Exception {
|
|
|
+
|
|
|
+
|
|
|
+ //获取执行环境
|
|
|
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
|
|
|
+
|
|
|
+ //添加 rabbitmq 数据源,获取队列中的数据
|
|
|
+ final DataStream<String> stream = env.addSource(
|
|
|
+ new RMQSource<>(
|
|
|
+ RabbitUtils.getRMQConnectionConfig(), //rabbitmq 配置环境
|
|
|
+ QueueConst.apiLog, //rabbitmq 监听队列名称
|
|
|
+ true,
|
|
|
+ new SimpleStringSchema())) //序列化规则
|
|
|
+ .setParallelism(1); //设置任务的并行数量(线程数)
|
|
|
+
|
|
|
+
|
|
|
+ //日志数据
|
|
|
+ SingleOutputStreamOperator<ApiLogData> apiLogData = stream.map(mapString -> JSON.parseObject(mapString, ApiLogData.class));
|
|
|
+
|
|
|
+ /** 高频IP访问告警 */
|
|
|
+ SingleOutputStreamOperator<IPAlarm> ipAlarms = apiLogData
|
|
|
+ .map(
|
|
|
+ data -> IPAlarm.builder().ip(data.getRemoteIp())
|
|
|
+ .createTime(Optional.ofNullable(data.getCreateTime()).orElse(new Date()).getTime())
|
|
|
+ .times(1)
|
|
|
+ .build()
|
|
|
+ )
|
|
|
+ .assignTimestampsAndWatermarks(
|
|
|
+ WatermarkStrategy
|
|
|
+ .<IPAlarm>forBoundedOutOfOrderness(Duration.ofSeconds(10))
|
|
|
+ .withTimestampAssigner((data, timestamp) -> data.getCreateTime())
|
|
|
+ )
|
|
|
+ .keyBy(data -> data.getIp())
|
|
|
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
|
|
|
+ .sum("times")
|
|
|
+ .filter(data -> data.getTimes() > 5);
|
|
|
+
|
|
|
+ ipAlarms.addSink(new RMQSink<IPAlarm>(RabbitUtils.getRMQConnectionConfig(),QueueConst.ipAlarm, new ObjectJsonParser()));
|
|
|
+ ipAlarms.addSink(new IPAlarmSink());
|
|
|
+
|
|
|
+ /** 高频接口调用告警 */
|
|
|
+ SingleOutputStreamOperator<Tuple3<String, Long, Integer>> methodAlarm = apiLogData
|
|
|
+ .map(
|
|
|
+ (MapFunction<ApiLogData, Tuple3<String, Long, Integer>>) apiLogData1 -> new Tuple3(
|
|
|
+ apiLogData1.getRequestUri(),
|
|
|
+ Optional.ofNullable(apiLogData1.getCreateTime()).orElse(new Date()).getTime(),
|
|
|
+ 1)
|
|
|
+ )
|
|
|
+ .returns(Types.TUPLE(Types.STRING, Types.LONG, Types.INT))
|
|
|
+// .assignTimestampsAndWatermarks(
|
|
|
+// WatermarkStrategy
|
|
|
+// .<Tuple3<String, Long, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(10))
|
|
|
+// .withTimestampAssigner((data, timestamp) -> data.f1)
|
|
|
+// )
|
|
|
+ .keyBy(data -> data.f0)
|
|
|
+ .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
|
|
|
+ .sum(2)
|
|
|
+ .filter(data -> data.f2 > 5);
|
|
|
+/*
|
|
|
+ ipAlarms.addSink(new RMQSink<IPAlarm>(RabbitUtils.getRMQConnectionConfig(),QueueConst.ipAlarm, new ObjectJsonParser()));
|
|
|
+ ipAlarms.addSink(new IPAlarmSink());*/
|
|
|
+
|
|
|
+ //执行job
|
|
|
+ env.execute("ApiLogJob");
|
|
|
+
|
|
|
+ }
|
|
|
+}
|