从告警报文中挖掘出关联规则后,保存在下面的RDD data中,
每一条规则包含4项:支持度、置信度、规则前项和规则后项。
前项和后项分别是一个字符串,是一个逗号分隔的多个网元列表,
例如一条规则(0.2, 0.3, "a,b", "c")
表示“网元a,b上的告警导致网元c上告警发生的支持度是0.2,置信度是0.3。
也就是a,b,c在所有告警中发生的概率是20%,当a,b已经发生时,c发生的比率是30%。
在另一张资源表中,每一行包含一对网元,保存在下面的RDD res_data中, 例如"a,c"这一行表示网元a和c有资源上的关联关系,可能是物理链路连接,可能是同一物理位置等等。
所谓的资源过滤就是只有在资源表中的规则才算有效规则, 资源关系是没有顺序的,不论规则(a => c)还是(c => a)都符合(a,c)这一资源约束, 对于规则(x1, x2, ..., xm => y1, y2, ..., yn),只有前项和后项的所有笛卡尔积
(x1, y1), (x1, y2), ... (x1, yn)
...
(xm, y1), (xm, y2), ... (xm, yn)
都在资源表中,才表明这一条规则通过了资源过滤。 例如对于规则(a,b => c),只有a与c,a与b都有资源关系(即(a,b)和(a,c)都在资源表中),这条规则才有效。
实现算法是:对于一条规则R1,求出其所有笛卡尔积R2,然后求R2与资源表的交集R3,如果
下面是描述这一筛选过程的Spark代码:
val data = sc.parallelize(List((0.2, 0.3, "a,b", "c"), (0.5, 0.2, "b,c", "a,d")))
val combine_pre_suf = data.flatMap(x => (x._3.split(",").flatMap(y => (x._4.split(",").map(z => (y+","+z, x))))))
val suf_pre = combine_pre_suf.map(x => (x._1.split(",")(1) + "," + x._1.split(",")(0), x._2))
val double_pre_suf_rule = suf_pre ++ combine_pre_suf
val res_data = sc.parallelize(List("a,c","b,c","c,e","c,d"))
val res_join_double = res_data.map(x => (x,1)).join(double_pre_suf_rule)
val rule_in_res_cnt = res_join_double.map(x => (x._2._2, x._2._1)).reduceByKey(_+_)
val flt_res_rules = rule_in_res_cnt.filter(x => x._1._3.split(",").size * x._1._4.split(",").size == x._2)
将这段代码保存在文件res-filter.script中,运行spark-shell -i res-filter.script
。