Plotting the dew points as derived values in Grafana using a Pandas virtual data source.
久々のブログになってしまいました。
今までに何度か Grafana を紹介して参りました。Grafana は、各種の時系列データベースに問い合わせて数値データをウェブインターフェイス上で閲覧するツールです。あ、見た方が早いですね。こんなのです。
派生値をプロットしたい
今まで、ずっとやりたかったテーマとして、気温と相対湿度から露点(dew point)を計算してプロットすることがありました。ネットで検索すると分かるように、気温と相対湿度が分かると、近似式を使って露点を求めることができます。この計算は加減乗除だけではダメで、対数関数などを駆使する必要があります。
データベースには気温と湿度だけが格納されていて、そのから派生値として露点をプロットしたい、というニーズは絶対にあるはずなのですが、やろうとすると意外に面倒ですし、ネットで検索してもあまり取り上げられていません。
技術的な視点から言うと、これを実現する直接的な方法は 2つ考えられます。
- データベース(Grafana では data source と呼びます)から得られた値を使って、Grafana 上で派生値を計算する。
- データベース側で、派生値を計算して Grafana に返す。
いずれの方法にも利点と欠点があります。
Grafana 側で計算する
利点:
- GUI で直感的に設定ができる(できたら嬉しいなあ)
欠点:
- Grafana が提供していない演算子や関数は利用できない
- 古いバージョンの Grafana だと対応していない場合がある(再インストールとセットアップが必要)
- Grafana が想定していないような使い方はできない
実際、Grafana のドキュメントを見ると簡単な計算ならできるようですが、ちょっと力不足に感じます。
データベース側で計算する
利点:
- 計算結果だけを通信で返せば良いので、ネットワーク負荷が軽い
- おそらく、他の方法に比べて高速に計算できる
欠点:
- 異なるデータベースソフトウェアでは、異なるやり方を覚えなくてはならない
- 複数の異なるデータベース上にあるデータから派生値を計算することができない
そこで Simple JSON Datasource の出番
私はこのテーマでずっとモヤモヤした気分でいたのですが、ある日、アイデアを思いつきました。
Grafana とデータベースの間に仮想的な data source を置いて、Grafana からの query を加工してからデータベースに投げ、データベースからの結果も加工してから Grafana に返してあげれば良いのではないか?
つまり、中間サーバー的なものを作る訳です。こんなイメージです。
調べてみると、こんなツールを見つけました。
読んでいくと、これは Grafana の Simple JSON Datasource という仮想 data source 機能を応用しているということが分かりました。素晴らしい! 唯一の不安な点は、Simple JSON Datasource の GitHub サイトを見ると
This plugin is no longer maintained by the Grafana team.
と書かれていることです。素晴らしい機能だと思うんだけどなー。
まずは grafana-pandas-datasource を覗いてみる
まずは、上記の grafana-pandas-datasource を覗いてみました。また、動かしてもみました。確かにこれは面白い実装だと思うのですが、ちょっとばかり利用形態を限定しすぎていて、私のやりたいことには適用できなそうなことが分かりました。
自分でコードを書いてみる
grafana-pandas-datasource は、中で Pandas と NumPy、そして Flask を使っています。なーんだ、それくらいだったら私でもできそうだ。ということで、自分でコードを書いてみました。じゃん!(ライセンス: CC BY-SA 4.0)
# © Copyright 2022 Atsushi Yokoyama, Firmlogics
# This work is licensed under a <a href="http://creativecommons.org/licenses/by-sa/4.0/" rel="license">Creative Commons Attribution-ShareAlike 4.0 International License</a>.
import math
import numpy as np
import pandas as pd
from elasticsearch import Elasticsearch
from flask import Flask, jsonify, request
ES_HOST = "your_elasticsearch_host"
ES_INDEX = "elasticsearch_index"
MIN_INTERVAL_MS = 60 * 1000
def dewpoint(t, rh):
"""
Ref: https://en.wikipedia.org/wiki/Dew_point
t: temperature (degC)
rh: relative humidity (from 0 to 1.0)
return dew point (degC)
"""
# a = [6.1121, 6.1121]
b = [17.368, 17.966]
c = [238.88, 247.15]
index = 0 if t > 0 else 1
gamma = math.log(rh) + b[index] * t / (c[index] + t)
return c[index] * gamma / (b[index] - gamma)
def df_query_elastic(dt_from, dt_to, interval_ms):
"""
Inquire of Elasticsearch and return calculated dataframe
dt_from: datetime in ISO 8601 string
dt_to: datetime in ISO 8601 string
interval_ms: interval in milliseconds (integer)
"""
if interval_ms < MIN_INTERVAL_MS:
interval_ms = MIN_INTERVAL_MS
es = Elasticsearch(host=ES_HOST)
body = {
"size": 0,
"query": {
"range": {
"datetime": {
"gte": dt_from,
"lte": dt_to,
}
}
},
"aggs": {
"datehisto": {
"date_histogram": {
"field": "datetime",
"interval": "{}s".format(int(interval_ms / 1000)),
},
"aggs": {
"average_temp": {
"avg": {
"field": "temp",
},
},
"average_humid": {
"avg": {
"field": "humid",
}
},
},
}
},
}
resp = es.search(index=ES_INDEX, body=body)
df = pd.json_normalize(resp["aggregations"]["datehisto"]["buckets"])
# Calculate dew points
df["dewpoint"] = df.apply(
lambda x: dewpoint(
x["average_temp.value"], x["average_humid.value"] / 100
),
axis=1,
)
return df
methods = ["GET", "POST"]
app = Flask(__name__)
@app.route("/", methods=methods)
def req_root():
app.logger.info("Requested /.")
return "Server is running."
@app.route("/search", methods=methods)
def req_search():
app.logger.info("Requested /search.")
return jsonify(["temp", "humid", "dewpoint"])
@app.route("/query", methods=methods)
def req_query():
app.logger.info("Requested /query.")
j = request.get_json()
# Identify which targets are requested
req_targets = []
for _ in j["targets"]:
req_targets.append(_["target"])
df = df_query_elastic(
dt_from=j["range"]["from"],
dt_to=j["range"]["to"],
interval_ms=j["intervalMs"],
)
df = df.replace({np.nan: None})
# List of targets and columns in Pandas DataFrame
target_columns = {
("temp", "average_temp.value"),
("humid", "average_humid.value"),
("dewpoint", "dewpoint"),
}
# Construct query results
results = []
timestamps = df["key"]
for (target, column) in target_columns:
if target not in req_targets:
continue
d = {}
d["target"] = target
d["datapoints"] = list(map(list, zip(df[column], timestamps)))
results.append(d)
return jsonify(results)
if __name__ == "__main__":
app.run(host="0.0.0.0", port=3003)
ちなみに、Pandas などへのバージョン依存性があるかも知れないので、requirements.txt も書いておきます。
Flask~=2.1 pandas~=1.4 numpy~=1.23 elasticsearch~=6.8
これを見てお気づきの方があると思いますが、私の使っている Elasticsearch は、かなりバージョンが古いです。最新の Elasticsearch では、書き方が違ってくると思いますので、御了承ください。
動いた!
これで、冒頭に上げたグラフのように、無事に露点をプロットできるようになりました。
今日はここまで。