無制限のデータのリアルタイムヒストグラムプロット
リアルタイムデータでヒストグラムを使用することは、人気のあるデータサイエンスライブラリのほとんどでは不可能です。 この記事では、Pythonノートブック内でヒストグラムを動的に計算して表示する方法を学習します。
By ロメインピカード、デジタルTVに取り組んでいるデータサイエンスエンジニア.
すべてのデータサイエンスツールボックスには、いくつかの基本ツールが含まれています。 当然のことと思われるまで体系的に使用しています。 ヒストグラムはそのXNUMXつです。 これらは、探索フェーズでの視覚化、モデルを選択する前のデータ分散タイプの検証、およびその他の多くのこと(場合によっては、それを意識することなく)に使用します。 残念ながら、リアルタイムデータでヒストグラムを使用することは、ほとんどのライブラリでは不可能です。
通常、CSVデータセットのような有界データのヒストグラムを使用します。 ただし、ヒストグラムを計算する従来の方法は、無制限/ストリームデータには適用されません。 これは、アルゴリズムがデータセットのすべての要素を通過する必要があるためです。 この記事では、ヒストグラムをその場で計算して更新する方法を学習します。 リアルタイムで放出されるデータについて。
ここでの実際的な例は、システムのCPU使用率を経時的に監視することです。 CPU使用率のヒストグラムをリアルタイムで、場合によっては無限の時間で計算してプロットします。 それでも、ご覧のとおり、これにはほとんどメモリが必要ありません。
このために、ノートブックでXNUMXつのPythonパッケージを使用します。
最初にpipでそれらをインストールしましょう:
pip install psutil、bokeh、makinage
この記事のすべてのコードについて、いくつかのインポートが必要です。
コレクションからimportnamedtuple import time from datetime import datetime import ipywidgets as widgets from bokeh.ploting import figure、ColumnDataSource from bokeh.io import output_notebook、push_notebook、show import psutil import rx import rx.operators as ops import rxsci as rs
CPU使用率のストリームを生成する
まず、分析するデータを生成する必要があります。 100msごとにタイムスタンプと測定されたCPU使用率を含むアイテムを作成します。 まず、これらのアイテムのデータ構造を定義します。
CpuMeasure = namedtuple( "CpuMeasure"、['timestamp'、 'value'])
次に、これらのアイテムをデータのストリームとして出力できます。 これにはマキナゲを使用します。 Maki-Nageは、に基づくストリーム処理フレームワークです。 リアクティブX。 ソースストリームを生成するには、ReactiveXを直接使用します。
def create_cpu_observable(period = .1):return rx.timer(duetime = period、period = period).pipe(ops.map(lambda i:CpuMeasure(int(datetime.utcnow()。timestamp())、psutil.cpu_percent ()))))
この関数の結果はストリームオブジェクトです。 それはオブザーバブルと呼ばれます。 このオブザーバブルは、100ms(0.1秒)ごとにCpuMeasureオブジェクトを発行します。
棒グラフのプロットと更新
次のステップは、リアルタイムプロットの準備です。 すでに計算されたヒストグラムをプロットしたいので、棒グラフが必要です。 だから私たちはボケを使います vbar ウィジェットはこちら。 フィギュアは初期化されたばかりで、今のところデータはありません。
source_cpu_total = ColumnDataSource(data = {'edges':[]、 'values':[]})p_cpu_total = figure(title = "CPU使用率の合計分布"、plot_width = 500、plot_height = 150)p_cpu_total.vbar(x = 'エッジ '、top ='値 '、幅= 1.0、ソース= source_cpu_total)outw = widgets.Output()display(outw)with outw:h_cpu_total = show(p_cpu_total、notebook_handle = True)
後で設定してプロットを更新します ソースCPU_合計 他の値に反対します。 NS エッジ フィールドはヒストグラムのビンに対応し、 値 フィールドは、各ビンのアイテム数に対応します。
グラフの更新ステップを専用の関数でラップできます。
def update_histogram(graph_id、source、histogram):エッジ、値= zip(* histogram)source.data = {'エッジ':エッジ、 '値':値、} push_notebook(handle = graph_id)
ここに グラフ ID ボケフィギュアオブジェクトです、 source ボケデータソース、および ヒストグラム 事前に計算されたヒストグラム。
今のところ、偽の値を使用してこのコードをテストできます。
update_histogram(h_cpu_total、source_cpu_total、[(5、3)、(7、12)、(12、5)、(23、3)、(50、17)])
結果は次のようになります。
ヒストグラムの計算
データソースとグラフが利用できるようになったので、実際のヒストグラムを計算できます。 Maki Nageは、Ben-Haim etalによって定義された分布圧縮アルゴリズムを実装しています。 紙の中で ストリーミング並列決定木アルゴリズム。 これは、ApacheHiveに実装されているアルゴリズムでもあります。 ヒストグラム数値 機能。
このアルゴリズムの原理は、データ分布を動的ヒストグラムとして圧縮することです。このヒストグラムのビンのエッジは、新しいデータが入ると動的に調整されますが、ビンの数は作成時に設定されます。 この圧縮された表現から、さまざまなメトリックの近似を非常に高い精度で計算できます。
- 意味する
- 分散、標準偏差
- 分位数
- ヒストグラム
圧縮されたディストリビューションのサイズは固定されているため、メモリ効率が非常に高く、ディストリビューションのカーディナリティに完全に依存しません。入力データセットのサイズに関係なく、ディストリビューションを圧縮するために必要なのは数百バイトだけです。
圧縮された分布を埋めるために、 math.dist.update オペレータ。
この圧縮された表現はヒストグラムですが、表示可能なヒストグラムとして直接使用することはできません。各ビンには一意の幅があり、ビンの数は表示したい数よりもはるかに多い場合があります。 したがって、別の演算子を使用して、圧縮された分布からヒストグラムを計算します。 数学.分布.ヒストグラム.
計算と表示全体は、10行のコードで構成されています。
create_cpu_observable()。pipe(rs.state.with_memory_store(pipeline = rx.pipe(rs.ops.map(lambda i:i.value)、rs.math.dist.update()、rs.math.dist.histogram( bin_count = 20)、rs.ops.map(lambda i:(h_cpu_total、source_cpu_total、i))))、)。subscribe(on_next = lambda i:update_histogram(* i)、)
これらの行を分解してみましょう。
まず、CPU使用率のオブザーバブルを作成します。 次に、操作のパイプを適用します。 これらは、各アイテムに順番に適用されるデータ変換です。 ステートフルオペレーションにはステートストアが必要です。 これは、すべての中間計算が格納される場所です。 で構成します with_memory_store オペレータ。
次のステップは、各アイテムの値を抽出することです。 この最初の例ではタイムスタンプは必要ないので、 値 フィールドのおかげで 地図 オペレータ。
次の20つの手順は、分布を更新し、受信アイテムごとにXNUMX個のビンのヒストグラムを計算することです。
最後に、ヒストグラムをプロットするために必要な引数を使用してタプルが作成されます。 ヒストグラムは、このパイプラインのシンクとしてプロットされます。 次へ 折り返し電話。
このコードを実行すると、CPU使用率のライブヒストグラムが表示されます。
デュアルモニタリング
では、システムを最初から監視し、最近のステータス(過去3分間など)も確認したい場合はどうでしょうか。 これを行うには、グローバル分布と3分のウィンドウでの最近の分布のXNUMXつの分布を計算します。
これを行うには、最初に計算を専用関数に移動して、再利用できるようにします。
def compute_histogram(graph_id、source):return rx.pipe(rs.ops.map(lambda i:i.value)、rs.math.dist.update()、rs.math.dist.histogram(bin_count = 20)、 rs.ops.map(lambda i:(graph_id、source、i)))
次に、これらXNUMXつのヒストグラムを並行して計算する必要があります。 ここではさらにXNUMXつの演算子を使用します。
最初のものは、 ティーマップ オペレーター。 この演算子は、複数の計算が並行して行われるように、監視可能なソースを複数のパイプラインに転送します。
XNUMXつ目は split オペレーター。 観測可能なソースを3分のウィンドウに分割します。 これにより、XNUMX分ごとにヒストグラムをリセットし、最近のデータのみを表示できます。
使い捨て= create_cpu_observable()。pipe(rs.state.with_memory_store(pipeline = rx.pipe(rs.ops.tee_map(#無制限の分布compute_histogram(h_cpu_total、source_cpu_total)、#3分の有界分布rs.data.split(predicate = lambda i:i.timestamp-(i.timestamp%180)、pipeline = compute_histogram(h_cpu_recent、source_cpu_recent)、)、join = "merge"、)))、)。subscribe(on_next = lambda i:update_histogram(* i) 、)
グラフ作成の更新により、これはリアルタイムのXNUMXつのヒストグラムで更新されます。
これらの例のコード全体が利用可能です こちらをご覧ください。.
もっと遠く行く
ここで紹介するアルゴリズムは、分散システムにも適用できます。分散の圧縮表現はマージ可能です。 これは、複数のワーカーで計算をシャーディングする場合に特に便利です。 すべての部分表現をマージすることにより、最終結果を集約できます。
このアルゴリズムは、どのPython環境でも使用できます。 Maki-Nageの実装は、と呼ばれる専用パッケージに含まれています ディストグラム 依存関係はありません。
最後に、ストリーム処理について詳しく知りたい場合は、別の方法を実行できます。 紹介記事 マキナゲに書いた。
バイオ: ロマン(@_mainro_)はデータサイエンスエンジニアであり、デジタルTVおよび電気通信業界で働いています。 彼は特にアルゴリズムに興味があり、各ユースケースに最も適したアルゴリズムを探しています。
関連する
過去30日間の人気記事 | |||||
---|---|---|---|---|---|
|
|
PlatoAi。 Web3の再考。 増幅されたデータインテリジェンス。
アクセスするには、ここをクリックしてください。
出典:https://www.kdnuggets.com/2021/09/real-time-histogram-plots-unbounded-data.html