Pandas で、Grafana 用の仮想データソースを作る

投稿者: | 2022年7月26日

Plotting the dew points as derived values in Grafana using a Pandas virtual data source.

久々のブログになってしまいました。

今までに何度か Grafana を紹介して参りました。Grafana は、各種の時系列データベースに問い合わせて数値データをウェブインターフェイス上で閲覧するツールです。あ、見た方が早いですね。こんなのです。

派生値をプロットしたい

今まで、ずっとやりたかったテーマとして、気温と相対湿度から露点(dew point)を計算してプロットすることがありました。ネットで検索すると分かるように、気温と相対湿度が分かると、近似式を使って露点を求めることができます。この計算は加減乗除だけではダメで、対数関数などを駆使する必要があります。

データベースには気温と湿度だけが格納されていて、そのから派生値として露点をプロットしたい、というニーズは絶対にあるはずなのですが、やろうとすると意外に面倒ですし、ネットで検索してもあまり取り上げられていません。

技術的な視点から言うと、これを実現する直接的な方法は 2つ考えられます。

  • データベース(Grafana では data source と呼びます)から得られた値を使って、Grafana 上で派生値を計算する。
  • データベース側で、派生値を計算して Grafana に返す。

いずれの方法にも利点と欠点があります。

Grafana 側で計算する

利点:

  • GUI で直感的に設定ができる(できたら嬉しいなあ)

欠点:

  • Grafana が提供していない演算子や関数は利用できない
  • 古いバージョンの Grafana だと対応していない場合がある(再インストールとセットアップが必要)
  • Grafana が想定していないような使い方はできない

実際、Grafana のドキュメントを見ると簡単な計算ならできるようですが、ちょっと力不足に感じます。

データベース側で計算する

利点:

  • 計算結果だけを通信で返せば良いので、ネットワーク負荷が軽い
  • おそらく、他の方法に比べて高速に計算できる

欠点:

  • 異なるデータベースソフトウェアでは、異なるやり方を覚えなくてはならない
  • 複数の異なるデータベース上にあるデータから派生値を計算することができない

そこで Simple JSON Datasource の出番

私はこのテーマでずっとモヤモヤした気分でいたのですが、ある日、アイデアを思いつきました。

Grafana とデータベースの間に仮想的な data source を置いて、Grafana からの query を加工してからデータベースに投げ、データベースからの結果も加工してから Grafana に返してあげれば良いのではないか?

つまり、中間サーバー的なものを作る訳です。こんなイメージです。

調べてみると、こんなツールを見つけました。

読んでいくと、これは Grafana の Simple JSON Datasource という仮想 data source 機能を応用しているということが分かりました。素晴らしい!  唯一の不安な点は、Simple JSON Datasource の GitHub  サイトを見ると

This plugin is no longer maintained by the Grafana team.

と書かれていることです。素晴らしい機能だと思うんだけどなー。

まずは grafana-pandas-datasource を覗いてみる

まずは、上記の grafana-pandas-datasource を覗いてみました。また、動かしてもみました。確かにこれは面白い実装だと思うのですが、ちょっとばかり利用形態を限定しすぎていて、私のやりたいことには適用できなそうなことが分かりました。

自分でコードを書いてみる

grafana-pandas-datasource は、中で Pandas と NumPy、そして Flask を使っています。なーんだ、それくらいだったら私でもできそうだ。ということで、自分でコードを書いてみました。じゃん!(ライセンス: CC BY-SA 4.0

# © Copyright 2022 Atsushi Yokoyama, Firmlogics
# This work is licensed under a <a href="http://creativecommons.org/licenses/by-sa/4.0/" rel="license">Creative Commons Attribution-ShareAlike 4.0 International License</a>.

import math

import numpy as np
import pandas as pd
from elasticsearch import Elasticsearch
from flask import Flask, jsonify, request

ES_HOST = "your_elasticsearch_host"
ES_INDEX = "elasticsearch_index"
MIN_INTERVAL_MS = 60 * 1000


def dewpoint(t, rh):
    """
    Ref: https://en.wikipedia.org/wiki/Dew_point
    t: temperature (degC)
    rh: relative humidity (from 0 to 1.0)
    return dew point (degC)
    """
    # a = [6.1121, 6.1121]
    b = [17.368, 17.966]
    c = [238.88, 247.15]
    index = 0 if t > 0 else 1
    gamma = math.log(rh) + b[index] * t / (c[index] + t)
    return c[index] * gamma / (b[index] - gamma)


def df_query_elastic(dt_from, dt_to, interval_ms):
    """
    Inquire of Elasticsearch and return calculated dataframe

    dt_from: datetime in ISO 8601 string
    dt_to: datetime in ISO 8601 string
    interval_ms: interval in milliseconds (integer)
    """

    if interval_ms < MIN_INTERVAL_MS:
        interval_ms = MIN_INTERVAL_MS

    es = Elasticsearch(host=ES_HOST)

    body = {
        "size": 0,
        "query": {
            "range": {
                "datetime": {
                    "gte": dt_from,
                    "lte": dt_to,
                }
            }
        },
        "aggs": {
            "datehisto": {
                "date_histogram": {
                    "field": "datetime",
                    "interval": "{}s".format(int(interval_ms / 1000)),
                },
                "aggs": {
                    "average_temp": {
                        "avg": {
                            "field": "temp",
                        },
                    },
                    "average_humid": {
                        "avg": {
                            "field": "humid",
                        }
                    },
                },
            }
        },
    }

    resp = es.search(index=ES_INDEX, body=body)
    df = pd.json_normalize(resp["aggregations"]["datehisto"]["buckets"])

    # Calculate dew points
    df["dewpoint"] = df.apply(
        lambda x: dewpoint(
            x["average_temp.value"], x["average_humid.value"] / 100
        ),
        axis=1,
    )

    return df


methods = ["GET", "POST"]
app = Flask(__name__)


@app.route("/", methods=methods)
def req_root():
    app.logger.info("Requested /.")
    return "Server is running."


@app.route("/search", methods=methods)
def req_search():
    app.logger.info("Requested /search.")
    return jsonify(["temp", "humid", "dewpoint"])


@app.route("/query", methods=methods)
def req_query():
    app.logger.info("Requested /query.")

    j = request.get_json()

    # Identify which targets are requested
    req_targets = []
    for _ in j["targets"]:
        req_targets.append(_["target"])

    df = df_query_elastic(
        dt_from=j["range"]["from"],
        dt_to=j["range"]["to"],
        interval_ms=j["intervalMs"],
    )

    df = df.replace({np.nan: None})

    # List of targets and columns in Pandas DataFrame
    target_columns = {
        ("temp", "average_temp.value"),
        ("humid", "average_humid.value"),
        ("dewpoint", "dewpoint"),
    }

    # Construct query results
    results = []
    timestamps = df["key"]
    for (target, column) in target_columns:
        if target not in req_targets:
            continue

        d = {}
        d["target"] = target
        d["datapoints"] = list(map(list, zip(df[column], timestamps)))
        results.append(d)

    return jsonify(results)


if __name__ == "__main__":
    app.run(host="0.0.0.0", port=3003)

ちなみに、Pandas などへのバージョン依存性があるかも知れないので、requirements.txt も書いておきます。

Flask~=2.1
pandas~=1.4
numpy~=1.23
elasticsearch~=6.8

これを見てお気づきの方があると思いますが、私の使っている Elasticsearch は、かなりバージョンが古いです。最新の Elasticsearch では、書き方が違ってくると思いますので、御了承ください。

動いた!

これで、冒頭に上げたグラフのように、無事に露点をプロットできるようになりました。

今日はここまで。

お問い合わせはお気軽に!

お問い合わせを頂いた後、継続して営業活動をしたり、ニュースレター等をお送りしたりすることはございません。
御返答は 24時間以内(営業時間中)とさせて頂いております。もし返答が届かない場合、何らかの事情でメールが不達となっている可能性がございます。大変お手数ですが、別のメールアドレス等で督促頂けますと幸いです。