zuntan02のはてなブログ

備忘録的なものです。時々職場の技術者ブログにも転記してますが、メインはこちらで。

【AWS】Cloudwatchlogsのサブスクリプションフィルター→Teamsに通知するメモ

【概要】

先に書いた
https://zuntan02.hateblo.jp/entry/2021/04/14/183922
をベースに、CloudWatchlogsで特定のロググループに特定の文字列が出たらサブスクリプションフィルター経由でTeamsに通知してみたメモ

[CloudWatchロググループのサブスクリプションフィルターでフィルターパターンにヒット]
 → [AWS Lambda]
  → [Microsoft Teams]

のような通知。例によってとりあえず届けばいい人向け。

※以下はjsonでログが出るようなアプリケーションでの内容です

【詳細】

CloudWatchロググループのサブスクリプションフィルター

フィルターパターン:"ERROR"
サブスクリプションフィルター名:ERROR

Lambdaに届くログのイメージ

実際にはgzip化されているが、展開すると以下が投げられていた

{
  "awslogs": {
    "data": {
    "messageType": "DATA_MESSAGE",
    "owner": "xxxxxxxxxx",
    "logGroup": "/log/group/logname",
    "logStream": "logname.log",
    "subscriptionFilters": [
        "ERROR"
    ],
    "logEvents": [
        {
            "id": "xxxx",
            "timestamp": xxxxx,
            "message": "{\"a\":\"aaaa\",\"b\":\"bbbb\",\"c":\"cccc\",\"d\":\dd(省略)....

こさえたLambda

cwl-to-Teams-python

#!/usr/bin/python3.8
import boto3
import json
import logging
import os
import base64
import gzip

from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

HOOK_URL = os.environ['HookUrl']

def lambda_handler(event, context): 

    # CloudWatchLogsからのデータはbase64エンコードされているのでデコード
    decoded_data = base64.b64decode(event['awslogs']['data'])
    # バイナリに圧縮されているため展開
    json_data = json.loads(gzip.decompress(decoded_data))

    # ログデータ取得
    log_group = json_data['logGroup']
    log_stream = json_data['logStream']
    
    # イベントのメッセージをjsonとしてevent_messageに取り込み
    message = json_data['logEvents'][0]['message']
    event_message = json.loads(message)

    log_a = event_message['a']
    log_b = event_message['b']
    log_c = event_message['c']

    # Teamsに直接通知
    alert_msg = {
     "@type": "MessageCard",
        "@context": "http://schema.org/extensions",
        "themeColor": "0076D7",
        "summary": "エラー出ました",
        "sections": [
            {
                "activityTitle": "エラー出ました",
                "facts": [
                    {
                        "name": "ロググループ名",
                        "value": log_group
                    },
                    {
                        "name": "ログストリーム名",
                        "value": log_stream
                    },
                    {
                        "name": "項目a",
                        "value": log_a
                    },
                    {
                        "name": "項目b",
                        "value": log_b
                    },
                    {
                        "name": "項目c",
                        "value": log_c
                    }
                ]
            }
        ]
    }

    req = Request(HOOK_URL, json.dumps(alert_msg).encode('utf-8'))
    
    
    try:
        response = urlopen(req)
        response.read()
        logger.info("Message posted")
    except HTTPError as e:
        logger.error("Request failed: %d %s", e.code, e.reason)
    except URLError as e:
        logger.error("Server connection failed: %s", e.reason)

環境変数HookUrlにTeamsのWebhookのURLを入れておくとメッセージが飛んでくる・・・はず・・・

※Teamsのコネクタメッセージの書式は以下を参考のこと
https://docs.microsoft.com/ja-jp/microsoftteams/platform/webhooks-and-connectors/how-to/connectors-using