(1) rulues/webhooknginx.yaml
# Alert when the rate of events exceeds a threshold
# (Required)
# Elasticsearch host
#es_host: 192.168.1.8
es_host: 10.0.0.208
# (Required)
# Elasticsearch port
es_port: 9200
# (OptionaL) Connect with SSL to elasticsearch
#use_ssl: True
# (Optional) basic-auth username and password for elasticsearch
#es_username: someusername
#es_password: somepassword
# (Required)
# Rule name, must be unique
name: webhook-nginx rule
# (Required)
# Type of alert.
# the frequency rule type alerts when num_events events occur with timeframe time
type: frequency
# (Required)
# Index to search, wildcard supported
index: nginx-*
# (Required, frequency specific)
# Alert when this many documents matching the query occur within a timeframe
num_events: 5
# (Required, frequency specific)
# num_events must occur within this amount of time to trigger an alert
timeframe:
minutes: 60
# (Required)
# A list of elasticsearch filters used for find events
# These filters are joined with AND and nested in a filtered query
# For more info: http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/query-dsl.html
filter:
- term:
message: "error"
# (Required)
# The alert is use when a match is found
#alert:
#- "post"
#http_post_url: "https://webhook.site/5c52b885-3deb-493f-bf4a-0f8cafccae62"
#
#http_post_url: "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=1896bca0-e78e-4734-bb49-5558532320c2"
alert:
- "debug"
- "elastalert_modules.dingtalk_alert.DingTalkAlerter"
dingtalk_webhook: "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=8b47af31-e289-47e5-88eb-8923e01738fd"
dingtalk_msgtype: "text"
alert_text_type: alert_text_only
alert_text: |
【测试环境】
发生了 {} 次告警
告警模块: {}
告警ip: {}
详细日志: {}
白眉大: {}
alert_text_args:
- num_hits
- type
- remote_addr
- message
- num_hits
(2)elastalert_modules/dingtalk_alert.py
#! /usr/bin/env python
# -*- coding: utf-8 -*-
"""
@author: xuyaoqiang
@contact: xuyaoqiang@gmail.com
@date: 2017-09-14 17:35
@version: 0.0.0
@license:
@copyright:
"""
import json
import requests
from elastalert.alerts import Alerter, DateTimeEncoder
from requests.exceptions import RequestException
from elastalert.util import EAException
class DingTalkAlerter(Alerter):
required_options = frozenset(['dingtalk_webhook', 'dingtalk_msgtype'])
def __init__(self, rule):
super(DingTalkAlerter, self).__init__(rule)
self.dingtalk_webhook_url = self.rule['dingtalk_webhook']
self.dingtalk_msgtype = self.rule.get('dingtalk_msgtype', 'text')
self.dingtalk_isAtAll = self.rule.get('dingtalk_isAtAll', False)
self.digtalk_title = self.rule.get('dingtalk_title', '')
def format_body(self, body):
return body.encode('utf8')
def alert(self, matches):
headers = {
"Content-Type": "application/json",
"Accept": "application/json;charset=utf-8"
}
body = self.create_alert_body(matches)
payload = {
"msgtype": self.dingtalk_msgtype,
"text": {
"content": body
},
"at": {
"isAtAll": False
}
}
try:
response = requests.post(self.dingtalk_webhook_url,
data=json.dumps(payload, cls=DateTimeEncoder),
headers=headers)
response.raise_for_status()
except RequestException as e:
raise EAException("Error request to Dingtalk: {0}".format(str(e)))
def get_info(self):
return {
"type": "dingtalk",
"dingtalk_webhook": self.dingtalk_webhook_url
}
pass