ゼファーネットのロゴ

財務における Python: Jupyter Notebook 内のリアルタイム データ ストリーミング – KDnuggets

日付:

Finance における Python: Jupyter Notebook 内でのリアルタイム データ ストリーミング
 

このブログでは、お気に入りのツールである Jupyter Notebook を使って、ライブ データ ストリームをリアルタイムで視覚化する方法を学びます。 

ほとんどのプロジェクトでは、Jupyter Notebook 内の動的チャートは手動で更新する必要があります。たとえば、グラフを更新するために新しいデータを取得するにはリロードが必要な場合があります。これは、金融を含め、ペースの速い業界ではうまく機能しません。ユーザーがその時点でリロードを行わなかったために、重要な購入の兆候や不正行為のアラートを見逃すことを検討してください。

ここでは、Jupyter Notebook で手動更新からストリーミングまたはリアルタイムの方法に移行し、プロジェクトをより効率的かつ反応的にする方法を説明します。

対象:

  • リアルタイムの視覚化: データが秒ごとに進化する様子を目の前で見ながら、データに命を吹き込む方法を学びます。
  • Jupyter Notebook のマスタリー: 静的データ分析だけでなく、動的ストリーミング データにも Jupyter Notebook の能力を最大限に活用します。
  • Quant Finance での Python の使用例: 現実世界のデータを使用して金融で広く使用されている機能を実装する、実用的な金融アプリケーションを詳しく見てみましょう。
  • ストリームデータ処理: リアルタイムでのデータ処理の基礎と利点を理解します。このスキルは、今日のペースの速いデータの世界でますます重要になっています。

このブログを読み終えるまでに、Jupyter Notebook 内で以下のような同様のリアルタイム ビジュアライゼーションを構築する方法がわかるでしょう。

 

Finance における Python: Jupyter Notebook 内でのリアルタイム データ ストリーミング

私たちのプロジェクトの中心となるのはストリーム処理の概念です。 

簡単に言えば、ストリーム処理とは、生成されたデータをリアルタイムで処理および分析することです。これは、ラッシュアワーのドライブ中の Google マップのようなものだと考えてください。交通状況の最新情報がライブで表示され、即座に効率的にルートを変更できるようになります。

興味深いことに、ゴールドマン・サックスのCIOはこのフォーブス誌で次のように述べています。 ポッドキャストストリームまたはリアルタイムのデータ処理への移行は、私たちが向かっている重要なトレンドの 1 つです。 

リアルタイム データ処理の力と、Jupyter Notebook のインタラクティブで使い慣れた環境を組み合わせることが重要です。 

それに加えて、Jupyter Notebook はコンテナ化された環境とうまく連携します。したがって、私たちのプロジェクトはローカルマシンに留まるだけではありません。どこにでも持ち運ぶことができ、同僚のラップトップからクラウド サーバーまで、あらゆる環境でスムーズに実行できます。

金融では、不正行為の検出でも取引でも、一秒一秒が重要であるため、ストリーム データ処理が不可欠になっています。ここにスポットライトが当たっています ボリンジャーバンド、金融取引に役立つツール。このツールは次のもので構成されています。

  • ミドルバンド: これは 20 期間の移動平均で、過去 20 期間 (高頻度分析の場合は 20 分など) の平均株価を計算し、最近の価格傾向のスナップショットを提供します。
  • 外側のバンド: 中間バンドの上下 2 標準偏差に位置し、市場のボラティリティを示します。バンドが広いほどボラティリティが高く、バンドが狭いほどボラティリティが低いことを示します。

 

Finance における Python: Jupyter Notebook 内でのリアルタイム データ ストリーミング
 

ボリンジャーバンドでは、移動平均価格が上部バンド(多くの場合赤でマークされる売りの手がかり)に触れるか超えると買われすぎの可能性が示され、価格が下部バンド(買いの手がかり)を下回ると売られすぎの状態が示されます。 、通常は緑色でマークされます)。

アルゴトレーダーは通常、ボリンジャーバンドを他のテクニカル指標と組み合わせます。 

ここでは、取引高を統合してボリンジャーバンドを生成する際に重要な調整を行いました。従来、ボリンジャーバンドは取引高を考慮せず、価格データのみに基づいて計算されます。

したがって、VWAP ± 2 × VWSTD の距離にあるボリンジャー バンドを示しました。ここで、 

  • VWAP:A 1分間の出来高加重平均価格 よりボリューム重視の観点から。
  • VWSTD: 焦点を当てたものを表します。 20分間の標準偏差つまり、市場のボラティリティの尺度です。

技術的な実装:

  • 一時的なものを使用します スライディングウィンドウ ('pw.temporal.sliding') を使用して、リアルタイムでデータ上で虫眼鏡を動かすのと同じように、20 分のセグメントでデータを分析します。
  • 採用 減速機 ('pw.reducers')。各ウィンドウ内のデータを処理して、各ウィンドウの特定の結果 (この場合は標準偏差) を生成します。
  • ポリゴン.io: リアルタイムおよび過去の市場データのプロバイダー。確かにその API をライブ データに使用できますが、このデモでは一部のデータを CSV ファイルに事前保存しており、API キーを必要とせずに簡単に理解できるようにしています。
  • パスウェイ: 高速データ処理のためのオープンソースの Python フレームワーク。バッチ (静的) データとストリーミング (リアルタイム) データの両方を処理します。 
  • ボケ: Bokeh は、動的なビジュアライゼーションの作成に最適で、魅力的でインタラクティブなグラフでストリーミング データに命を吹き込みます。
  • パネル: リアルタイム ダッシュボード機能でプロジェクトを強化し、Bokeh と連携して新しいデータ ストリームが入ってくるとビジュアライゼーションを更新します。

これには次の 6 つのステップが含まれます。

  1. 関連するフレームワークに対して pip install を実行し、関連するライブラリをインポートします。
  2. サンプルデータの取得
  3. 計算用のデータソースのセットアップ
  4. ボリンジャーバンドに不可欠な統計の計算
  5. ボケとパネルを使用したダッシュボードの作成
  6. 実行コマンドを押すと

1. インポートとセットアップ

まず、必要なコンポーネントを簡単にインストールしましょう。

%%capture --no-display
!pip install pathway

 

まず、必要なライブラリをインポートします。これらのライブラリは、データ処理、視覚化、およびインタラクティブなダッシュボードの構築に役立ちます。

# Importing libraries for data processing, visualization, and dashboard creation

import datetime
import bokeh.models
import bokeh.plotting
import panel
import pathway as pw

2. サンプルデータの取得

次に、GitHub からサンプルデータをダウンロードします。このステップは、視覚化のためにデータにアクセスするために重要です。ここでは、Apple Inc (AAPL) の株価を取得しました。

# Command to download the sample APPLE INC stock prices extracted via Polygon API and stored in a CSV for ease of review of this notebook.

%%capture --no-display
!wget -nc https://gist.githubusercontent.com/janchorowski/e351af72ecd8d206a34763a428826ab7/raw/ticker.csv

 

注: このチュートリアルでは、公開されたショーケースを活用します こちら

3. データソースのセットアップ

CSV ファイルを使用してストリーミング データ ソースを作成します。これはライブ データ ストリームをシミュレートし、初めてプロジェクトを構築するときに API キーを必要とせずにリアルタイム データを操作する実用的な方法を提供します。 

# Creating a streaming data source from a CSV file

fname = "ticker.csv"
schema = pw.schema_from_csv(fname)
data = pw.demo.replay_csv(fname, schema=schema, input_rate=1000)

# Uncommenting the line below will override the data table defined above and switch the data source to static mode, which is helpful for initial testing
# data = pw.io.csv.read(fname, schema=schema, mode="static")

# Parsing the timestamps in the data

data = data.with_columns(t=data.t.dt.utc_from_timestamp(unit="ms"))

 

注: データ処理はすぐには行われませんが、最後に run コマンドを押したときに行われます。

4. ボリンジャーバンドに不可欠な統計の計算

ここでは、上で説明した取引アルゴリズムを簡単に構築します。 Apple Inc. の株価のダミー ストリームがあります。さて、ボリンジャーバンドを作るには、 

  1. 加重 20 分標準偏差 (VWSTD) を計算します。
  2. 価格の 1 分間加重移動平均 (VWAP)
  3. 上の2つを結合します。
# Calculating the 20-minute rolling statistics for Bollinger Bands


minute_20_stats = (
    data.windowby(
        pw.this.t,
        window=pw.temporal.sliding(
            hop=datetime.timedelta(minutes=1),
            duration=datetime.timedelta(minutes=20),
        ),
        behavior=pw.temporal.exactly_once_behavior(),
        instance=pw.this.ticker,
    )
    .reduce(
        ticker=pw.this._pw_instance,
        t=pw.this._pw_window_end,
        volume=pw.reducers.sum(pw.this.volume),
        transact_total=pw.reducers.sum(pw.this.volume * pw.this.vwap),
        transact_total2=pw.reducers.sum(pw.this.volume * pw.this.vwap**2),
    )
    .with_columns(vwap=pw.this.transact_total / pw.this.volume)
    .with_columns(
        vwstd=(pw.this.transact_total2 / pw.this.volume - pw.this.vwap**2)
        ** 0.5
    )
    .with_columns(
        bollinger_upper=pw.this.vwap + 2 * pw.this.vwstd,
        bollinger_lower=pw.this.vwap - 2 * pw.this.vwstd,
    )
)
# Computing the 1-minute rolling statistics

minute_1_stats = (
    data.windowby(
        pw.this.t,
        window=pw.temporal.tumbling(datetime.timedelta(minutes=1)),
        behavior=pw.temporal.exactly_once_behavior(),
        instance=pw.this.ticker,
    )
    .reduce(
        ticker=pw.this._pw_instance,
        t=pw.this._pw_window_end,
        volume=pw.reducers.sum(pw.this.volume),
        transact_total=pw.reducers.sum(pw.this.volume * pw.this.vwap),
    )
    .with_columns(vwap=pw.this.transact_total / pw.this.volume)
)
# Joining the 1-minute and 20-minute statistics for comprehensive analysis

joint_stats = (
    minute_1_stats.join(
        minute_20_stats,
        pw.left.t == pw.right.t,
        pw.left.ticker == pw.right.ticker,
    )
    .select(
        *pw.left,
        bollinger_lower=pw.right.bollinger_lower,
        bollinger_upper=pw.right.bollinger_upper
    )
    .with_columns(
        is_alert=(pw.this.volume > 10000)
        & (
            (pw.this.vwap > pw.this.bollinger_upper)
            | (pw.this.vwap  pw.this.bollinger_lower)
        )
    )
    .with_columns(
        action=pw.if_else(
            pw.this.is_alert,
            pw.if_else(
                pw.this.vwap > pw.this.bollinger_upper, "sell", "buy"
            ),
            "hold",
        )
    )
)
alerts = joint_stats.filter(pw.this.is_alert)

 

ノートをチェックできます こちら 計算をより深く理解するために。 

5. ダッシュボードの作成

今度は、ボケ プロットとパネル テーブルの視覚化を使用して分析を実現します。 

# Function to create the statistics plot


def stats_plotter(src):
    actions = ["buy", "sell", "hold"]
    color_map = bokeh.models.CategoricalColorMapper(
        factors=actions, palette=("#00ff00", "#ff0000", "#00000000")
    )

    fig = bokeh.plotting.figure(
        height=400,
        width=600,
        title="20 minutes Bollinger bands with last 1 minute average",
        x_axis_type="datetime",
        y_range=(188.5, 191),
    )
    fig.line("t", "vwap", source=src)
    band = bokeh.models.Band(
        base="t",
        lower="bollinger_lower",
        upper="bollinger_upper",
        source=src,
        fill_alpha=0.3,
        fill_color="gray",
        line_color="black",
    )
    fig.scatter(
        "t",
        "vwap",
        color={"field": "action", "transform": color_map},
        size=10,
        marker="circle",
        source=src,
    )
    fig.add_layout(band)
    return fig


# Combining the plot and table in a Panel Row

viz = panel.Row(
    joint_stats.plot(stats_plotter, sorting_col="t"),
    alerts.select(
        pw.this.ticker, pw.this.t, pw.this.vwap, pw.this.action
    ).show(include_id=False, sorters=[{"field": "t", "dir": "desc"}]),
)
viz

 

このセルを実行すると、プロットとテーブル用のプレースホルダー コンテナーがノートブックに作成されます。計算が開始されると、これらにはライブ データが入力されます。 

6. 計算の実行

すべての準備が完了したので、データ処理エンジンを実行します。

# Command to start the Pathway data processing engine
%%capture --no-display
pw.run()

 

ダッシュボードがリアルタイムで更新されると、ボリンジャー バンドがどのようにアクションをトリガーするかがわかります。緑色のマーカーが買い、赤色のマーカーが売りで、多くの場合わずかに高い価格で表示されます。 

注: ウィジェットが初期化されて表示された後、手動で pw.run() を実行する必要があります。詳細については、この GitHub の問題を参照してください。 こちら.

このブログでは、ボリンジャー バンドについて理解し、Jupyter Notebook でリアルタイムの金融データを視覚化する旅をご案内します。ボリンジャー バンドとオープンソースの Pythonic ツールの組み合わせを使用して、ライブ データ ストリームを実用的な洞察に変換する方法を説明しました。

このチュートリアルでは、データの取得からインタラクティブなダッシュボードまでのエンドツーエンドのソリューションにオープンソースを活用した、リアルタイムの財務データ分析の実践例を示します。次の方法で同様のプロジェクトを作成できます。

  • Yahoo Finance、Polygon、Kraken などの API からライブ株価を取得することで、選択した株式に対してこれを実行します。
  • お気に入りの株やETFなどのグループに対してこれを行います。 
  • ボリンジャーバンド以外の他の取引ツールを活用する。

これらのツールを Jupyter Notebook 内のリアルタイム データと統合することで、市場を分析するだけでなく、市場の展開を体験することができます。 

ハッピーストリーミング!
 
 

ムディット・スリヴァスタヴァ パスウェイで働いています。これ以前は、AI Planet の創設メンバーであり、LLM とリアルタイム ML の分野で積極的なコミュニティ構築者として活動しています。

スポット画像

最新のインテリジェンス

スポット画像