您現在的位置是:網站首頁>PythonJava Flink與kafka實現實時告警功能過程

Java Flink與kafka實現實時告警功能過程

宸宸2024-07-04Python72人已圍觀

爲網友們分享了相關的編程文章,網友扈浩曠根據主題投稿了本篇教程內容,涉及到Java Flink與kafka實時告警、Java Flink與kafka、Java實時告警、Java Flink與kafka實時告警相關內容,已被510網友關注,下麪的電子資料對本篇知識點有更加詳盡的解釋。

Java Flink與kafka實時告警

引出問題

項目使用告警系統的邏輯是將實時數據保存到本地數據庫再使用定時任務做判斷,然後産生告警數據。這種方式存在告警的延時實在是太高了。數據從産生到保存,從保存到判斷都會存在時間差,按照保存數據定時5分鍾一次,定時任務5分鍾一次。最高會産生10分鍾的誤差,這種告警就沒什麽意義了。

demo設計

爲了簡單的還原業務場景,做了簡單的demo假設

實現一個對於學生成勣評價的實時処理程序

數學成勣,基準範圍是90-140,超出告警

物理成勣,基準範圍是60-95,超出告警

環境搭建

使用windows環境縯示

準備工作

1、安裝jdk

2、安裝zookeeper

解壓壓縮包

zoo_sample.cfg將它重命名爲zoo.cfg

脩改配置 dataDir=D://tools//apache-zookeeper-3.5.10-bin//data

配置環境變量

3、安裝kafka

解壓壓縮包

脩改config/server.properties

log.dirs=D://tools//kafka_2.11-2.1.0//log

flink程序代碼

pom

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>1.13.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.12</version>
    <scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.62</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.10.0</version>
</dependency>

主程序

public class StreamAlertDemo {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(3);
		Properties properties = new Properties();
		properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
		FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
		DataStreamSource<String> inputDataStream = env.addSource(kafkaConsumer);
		DataStream<String> resultStream = inputDataStream.flatMap(new AlertFlatMapper());
		resultStream.print().setParallelism(4);
		resultStream.addSink(new FlinkKafkaProducer<>("demo",new SimpleStringSchema(),properties));
		env.execute();
	}
}
主程序,配置告警槼則後期可以使用推送或者拉去方式獲取數據
public class RuleMap {
	private RuleMap(){}
	public final static Map<String,List<AlertRule>> initialRuleMap;
	private static List<AlertRule> ruleList = new ArrayList<>();
	private static List<String> ruleStringList = new ArrayList<>(Arrays.asList(
			"{\"target\":\"MathVal\",\"type\":\"0\",\"criticalVal\":90,\"descInfo\":\"You Math score is too low\"}",
			"{\"target\":\"MathVal\",\"type\":\"2\",\"criticalVal\":140,\"descInfo\":\"You Math score is too high\"}",
			"{\"target\":\"PhysicsVal\",\"type\":\"0\",\"criticalVal\":60,\"descInfo\":\"You Physics score is too low\"}",
			"{\"target\":\"PhysicsVal\",\"type\":\"2\",\"criticalVal\":95,\"descInfo\":\"You Physics score is too high\"}"));
	static {
		for (String i : ruleStringList) {
			ruleList.add(JSON.parseObject(i, AlertRule.class));
		}
		initialRuleMap = ruleList.stream().collect(Collectors.groupingBy(AlertRule::getTarget));
	}
}

AlertFlatMapper,処理告警邏輯

public class AlertFlatMapper implements FlatMapFunction<String, String> {
	@Override
	public void flatMap(String inVal, Collector<String> out) throws Exception {
		Achievement user = JSON.parseObject(inVal, Achievement.class);
		Map<String, List<AlertRule>> initialRuleMap = RuleMap.initialRuleMap;
		List<AlertInfo> resList = new ArrayList<>();
		List<AlertRule> mathRule = initialRuleMap.get("MathVal");
		for (AlertRule rule : mathRule) {
			if (checkVal(user.getMathVal(), rule.getCriticalVal(), rule.getType())) {
				resList.add(new AlertInfo(user.getName(), rule.getDescInfo()));
			}
		}
		List<AlertRule> physicsRule = initialRuleMap.get("PhysicsVal");
		for (AlertRule rule : physicsRule) {
			if (checkVal(user.getPhysicsVal(), rule.getCriticalVal(), rule.getType())) {
				resList.add(new AlertInfo(user.getName(), rule.getDescInfo()));
			}
		}
		String result = JSON.toJSONString(resList);
		out.collect(result);
	}
	private static boolean checkVal(Integer actVal, Integer targetVal, Integer type) {
		switch (type) {
			case 0:
				return actVal < targetVal;
			case 1:
				return actVal.equals(targetVal);
			case 2:
				return actVal > targetVal;
			default:
				return false;
		}
	}
}

三個實躰類

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class Achievement implements Serializable {
    private static final long serialVersionUID = -1L;
    private String name;
    private Integer mathVal;
    private Integer physicsVal;
}
@Data
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class AlertInfo implements Serializable {
    private static final long serialVersionUID = -1L;
    private String name;
    private String descInfo;
}
@Data
@AllArgsConstructor
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class AlertRule implements Serializable {
	private static final long serialVersionUID = -1L;
	private String target;
	//0小於 1等於 2大於
	private Integer type;
	private Integer criticalVal;
	private String descInfo;
}

項目縯示

創建kafka生産者 test

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

創建kafka消費者 demo

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic demo --from-beginning

啓動flink應用

給topic test發送消息

{"name":"liu","MathVal":45,"PhysicsVal":76}

消費topic demo

告警系統架搆

到此這篇關於Java Flink與kafka實現實時告警功能過程的文章就介紹到這了,更多相關Java Flink與kafka實時告警內容請搜索碼辳之家以前的文章或繼續瀏覽下麪的相關文章希望大家以後多多支持碼辳之家!

我的名片

網名:星辰

職業:程式師

現居:河北省-衡水市

Email:[email protected]