ゼファーネットのロゴ

Elasticsearch、Kubeflow、Katibを使用した完全なAIベースの検索エンジンの構築

日付:

AIベースの検索エンジン

検索システムの構築は困難です。 機械学習で動作するように準備するのは本当に難しいです。 AIと統合された完全な検索エンジンフレームワークを開発することは本当に難しいです。

それでは作りましょう。 ✌️

この投稿では、検索エンジンを最初から構築し、KubeflowとKatibを使用して機械学習レイヤーを追加することで結果をさらに最適化する方法について説明します。 この新しいレイヤーは、ユーザーのコンテキストを考慮して結果を取得できるようになり、この記事の主な焦点となります。

後で説明するように、KubeflowとKatibのおかげで、最終結果はかなりシンプルで効率的で、保守が簡単です。

システム全体のオーケストレーションを担当するKubeflowによって実行される完全なパイプライン。 著者による画像。

実際の概念を理解するために、実際の経験でシステムを実装します。 Kubernetesの上に構築されているため、(適切な適応があれば)好きなインフラストラクチャを使用できます。 Google Cloud Platformを使用します(GCP)この投稿で。

概念の簡単な紹介から始めて、システム実装の議論に移ります。

NLPアプリケーションに関するこの詳細な技術教育が役立つと思いますか? 新しい関連コンテンツがリリースされたときに更新するには、以下で購読してください.

だから、それ以上の苦労なしに、

1.あなたが知っている、検索のために

あなたがあなたの会社のために検索システムを構築するという挑戦を受けたり、あなた自身のためにそれを構築したいなら、あなたはすぐに最初のステップがいくぶん簡単である傾向があることに気付くでしょう。

何よりもまず、検索エンジンには検索用のドキュメントが含まれている必要があります。 Elasticsearchを使用するので、参照として使用しましょう(概要については、Elasticsearchを参照してください。 公式ドキュメント)

ドキュメントは、JSON形式に従ってElasticsearchにアップロードする必要があります。 たとえば、ファッションeコマースストアの検索エンジンを構築している場合、ドキュメントの例を次に示します。

Elasticsearchにアップロードされたドキュメントの例。 著者による画像。

次に、検索クエリをドキュメントフィールドと照合することを本質的に含む検索ステップがあります。

検索語とドキュメントフィールドのマッチングの例。 著者による画像。

ランキングフェーズでは、次のようないくつかの数学的ルールが適用されます。 TF-IDF or BM25F 適切にランク付けする方法を理解し、ドキュメントを最適なものから最悪のものに並べ替えます。 次のようになります。

データベースに保存されているドキュメント間でBM25スコアリングを実行しているElasticsearchによって取得された適切にランク付けされた結果の例。 著者による画像。

さらに最適化すると、パフォーマンスメトリックを含むドキュメントの特定のフィールドを活用できます。 たとえば、前の例では、Tシャツのクリック率(CTR、つまりクリックと合計インプレッションの理由)は次のようになります。 CTR = 0.36。 この情報を使用して、クリック率の高いドキュメントを優先して上部に表示する別の検索レイヤーを追加できます(「ブースト」とも呼ばれます)。

検索ルールにパフォーマンスメトリックレイヤーを追加する例。 前のXNUMX番目に良いドキュメントが結果のトップに上がります。 著者による画像。

ここまでは順調ですね。 しかし、さらに最適化する方法を見てみましょう。

各ユーザーには特定のコンテキストがあることを考慮してください。 もう一度、ファッションオンラインストアを例にとってみましょう。 一部のトラフィックは、北部の地域よりも暖かい南部の地域から来る場合があります。 彼らはおそらく冬特有の製品よりも軽い服にさらされたほうがいいでしょう。

方程式にさらにコンテキストを追加できます。お気に入りのブランド、カテゴリ、色、サイズ、使用するデバイス、平均消費チケット、職業、年齢に基づいて顧客を区別でき、リストはどんどん増えていきます…

そのためには、いくつかの追加ツールも必要です。 それについてもう少し深く掘り下げてみましょう。

2.機械学習レイヤー

ランク付けを学ぶ(LTR)は、ドキュメントのリストを適切にランク付けすることを主な目的とするアルゴリズムを研究する機械学習の分野です。

基本的に他の学習アルゴリズムと同じように機能します。トレーニングデータセットが必要で、次のような問題が発生します。 偏りと分散、各モデルには、特定のシナリオなどに比べて利点があります。

基本的に変更されるのは、トレーニングプロセスのコスト関数は、アルゴリズムがランキングについて学習できるように設計されており、モデルの出力は、特定のドキュメントが特定のクエリに対してどの程度一致しているかの値であるということです。

数学的には、それは単に次のように与えられます。

X この場合、検索にコンテキストを追加するために使用するすべての機能を理解しています。 これらは、ユーザーの地域、年齢、お気に入りのブランド、クエリとドキュメントフィールドの相関関係などの値にすることができます。

f は、トレーニングおよび評価されることになっているランキングモデルです。

最後に、 J 延長する Judgment そして私たちにとって、それは0(ドキュメントが機能を与えられたクエリによく一致しないことを意味します)から4(ドキュメントは非常によく一致する)までの範囲の整数値です。 判断を用いて、文書を最良から最悪まで整理します.

私たちの主な目標は これは、検索結果に機械学習レイヤーを追加するランキングアルゴリズムを表すためです。 そして得るために f、判断の値がすでに含まれているデータセットが必要です。含まれていないと、モデルをトレーニングできません。

結局のところ、これらの値を見つけることは非常に困難な場合があります。 その方法の詳細はここでは説明しませんが、これは 役職 このテーマについて徹底的な議論があります。 簡単に言うと、ユーザーが検索エンジンを操作したときのクリックストリームデータ(クリックと購入)を使用して、変数が判断値のプロキシとなるモデルを適合させます。

に実装されたグラフィカルモデルの例 pyClickModels 検索エンジンの結果に関連するドキュメントの関連性を見つけるため。 著者による画像。

判断を計算した後、ランキングモデルのトレーニングが残ります。 Elasticsearchはすでに ランク付けする この実装で使用するプラグイン。 プラグインは、決定木からニューラルネットワークに至るまでのさまざまなランキングアルゴリズムを提供します。

これは、必要なトレーニングファイルの例です。

ElasticsearchLearn2Rankプラグインで必要とされるトレーニングステップの入力ファイルの例。 著者による画像。

結果ページに印刷されたすべてのドキュメントをクエリ(「女性のTシャツ」)ごとに登録するという考え方です。 それぞれについて、期待される判断を計算し、特徴のマトリックスを構築します X.

実際には、最初にこのすべてのデータを準備し、ElasticsearchのLearn-To-Rankプラグインにフィードして、トレーニングされたランキングモデルを作成します。 次に、それを使用して、探しているパーソナライズレイヤーを追加できます。

構築の詳細 X すぐに議論されます。

これで、モデルをトレーニングする準備が整いました。 ここまでは順調ですね。 しかし、それでもまだ難しい問題があります。それが機能しているかどうかを知る方法は?

2.1評価フレームワーク

ランキングモデルのパフォーマンスを確認するために、いくつかの方法から選択できます。 ここで説明するのは平均です ランク ユーザーがクリックまたは購入したものに基づくメトリック(pySearchML 購入イベントのみに焦点を当てていますが、クリックは同じ意味で使用できます)。

数学的には、次の式で与えられます。

この式は基本的に、ドキュメントの完全なリストを参照して、購入した(またはクリックした)各アイテムに関連付けられた各ランクを合計します。 分母は、プロセスで合計されたアイテムの数(ユーザーがクリックまたは購入したアイテムの総数)のカーディナリティです。

実際には、ランキングモデルをトレーニングした後、検証データセット(ユーザーが検索したものと購入したものを含む)をループし、各検索用語を使用してElasticsearchにクエリを送信します。 次に、結果をユーザーが購入したものと比較して、適切な平均ランキングを計算します。

実際の検証フレームワークの例。 データセット内の各ユーザーについて、検索用語を使用して、ランキングモデルがすでに実装されているES結果から取得します。 次に、ユーザーが購入したアイテムの平均ランクと、ES結果でのユーザーの位置を取得します。 著者による画像。

上の画像はその概念を示しています。 ユーザーごとに、最近トレーニングされたモデルがすでに含まれているElasticsearchに検索語を送信します。 次に、検索結果をユーザーが購入したものと比較し、ランクを計算します。 前の例では、取得した2つのアイテムのうち3番目の位置に赤いTシャツが表示されています。 その時購入したのはたった一つのアイテムなので ランク= 66%。

データベース内のすべてのユーザーに対して同じ計算を実行し、それらをすべて平均して最終的なランク式を作成します。

最終ランクメトリックは以下でなければならないことに注意してください 視聴者の38%が それ以外の場合、アルゴリズムはドキュメントのランダムセレクターとして実行されます。

この値は、最高のランキングモデルを選択するために使用されるため、重要です。 そこで、KubeflowのKatibを使用します。

これらすべての概念をまとめて検索エンジンを構築する方法を見てみましょう。

3.Kubeflowオーケストレーション

前に説明したように、Kubeflowはパイプライン処理のオーケストレーターです。 Elasticsearchのデータの準備やトレーニングから、トレーニングプロセス全体の実行まで、さまざまな責任があります。

コンポーネントとそれぞれのタスクを定義することで機能します。 pySearchMLの場合、ここに完全なものがあります パイプライン 実装された:

@dsl.pipeline()
def build_pipeline( bucket='pysearchml', es_host='elasticsearch.elastic-system.svc.cluster.local:9200', force_restart=False, train_init_date='20160801', train_end_date='20160801', validation_init_date='20160802', validation_end_date='20160802', test_init_date='20160803', test_end_date='20160803', model_name='lambdamart0', ranker='lambdamart', index='pysearchml'
): pvc = dsl.PipelineVolume(pvc='pysearchml-nfs') prepare_op = dsl.ContainerOp( name='prepare env', image=f'gcr.io/{PROJECT_ID}/prepare_env', arguments=[f'--force_restart={force_restart}', f'--es_host={es_host}', f'--bucket={bucket}', f'--model_name= {model_name}'], pvolumes={'/data': pvc} ) val_reg_dataset_op = dsl.ContainerOp( name='validation regular dataset', image=f'gcr.io/{PROJECT_ID}/data_validation', arguments=[f'--bucket={bucket}/validation/regular', f'--validation_init_date={validation_init_date}', f'--validation_end_date={validation_end_date}', f'--destination=/data/pysearchml/{model_name}/validation_regular'], pvolumes={'/data': pvc} ).set_display_name('Build Regular Validation Dataset').after(prepare_op) val_train_dataset_op = dsl.ContainerOp( name='validation train dataset', image=f'gcr.io/{PROJECT_ID}/data_validation', arguments=[f'--bucket={bucket}/validation/train', f'--validation_init_date={train_init_date}', f'--validation_end_date={train_end_date}', f'--destination=/data/pysearchml/{model_name}/validation_train'], pvolumes={'/data': pvc} ).set_display_name('Build Train Validation Dataset').after(prepare_op) val_test_dataset_op = dsl.ContainerOp( name='validation test dataset', image=f'gcr.io/{PROJECT_ID}/data_validation', arguments=[f'--bucket={bucket}/validation/test', f'--validation_init_date={test_init_date}', f'--validation_end_date={test_end_date}', f'--destination=/data/pysearchml/{model_name}/validation_test'], pvolumes={'/data': pvc} ).set_display_name('Build Test Validation Dataset').after(prepare_op) train_dataset_op = dsl.ContainerOp( name='train dataset', image=f'gcr.io/{PROJECT_ID}/data_train', command=['python', '/train/run.py'], arguments=[f'--bucket={bucket}', f'--train_init_date={train_init_date}', f'--train_end_date={train_end_date}', f'--es_host={es_host}', f'--model_name={model_name}', f'--index={index}', f'--destination=/data/pysearchml/{model_name}/train'], pvolumes={'/data': pvc} ).set_display_name('Build Training Dataset').after(prepare_op) katib_op = dsl.ContainerOp( name='pySearchML Bayesian Optimization Model', image=f'gcr.io/{PROJECT_ID}/model', command=['python', '/model/launch_katib.py'], arguments=[f'--es_host={es_host}', f'--model_name={model_name}', f'--ranker={ranker}', '--name=pysearchml', f'--train_file_path=/data/pysearchml/{model_name}/train/train_dataset.txt', f'--validation_files_path=/data/pysearchml/{model_name}/validation_regular', '--validation_train_files_path=/data/pysearchml/{model_name}/validation_train', f'--destination=/data/pysearchml/{model_name}/'], pvolumes={'/data': pvc} ).set_display_name('Katib Optimization Process').after( val_reg_dataset_op, val_train_dataset_op, val_test_dataset_op, train_dataset_op ) post_model_op = dsl.ContainerOp( name='Post Best RankLib Model to ES', image=f'gcr.io/{PROJECT_ID}/model', command=['python', '/model/post_model.py'], arguments=[f'--es_host={es_host}', f'--model_name={model_name}', f'--destination=/data/pysearchml/{model_name}/best_model.txt'], pvolumes={'/data': pvc} ).set_display_name('Post RankLib Model to ES').after(katib_op) _ = dsl.ContainerOp( name='Test Model', image=f'gcr.io/{PROJECT_ID}/model', command=['python', '/model/test.py'], arguments=[f'--files_path=/data/pysearchml/{model_name}/validation_test', f'--index={index}', f'--es_host={es_host}', f'--model_name={model_name}'], pvolumes={'/data': pvc} ).set_display_name('Run Test Step').after(post_model_op)

パイプラインは、次のようなさまざまな入力パラメータを受け取ることによって定義されます。 bucket & model_name 実行時にこれらの値を変更できるようになります(すぐにわかります)。

パイプライン実装の各コンポーネントとその目的を見てみましょう。

pySearchMLに実装されているステップバイステップ。 著者による画像。

1.prepare_env

ここに 準備環境 コンポーネントが定義されています:

prepare_op = dsl.ContainerOp( name='prepare env', image=f'gcr.io/{PROJECT_ID}/prepare_env', arguments=[f'--force_restart={force_restart}', f'--es_host={es_host}', f'--bucket={bucket}', f'--model_name= {model_name}'], pvolumes={'/data': pvc}
)
  • 画像 そのステップで実行するコンポーネントのDockerリファレンスです。
  • Arguments Dockerのイメージで実行されるスクリプトに送信される入力パラメーターです ENTRYPOINT.
  • ボリューム ボリュームクレームをにマウントします data.

ここにprepare_env内のすべてのファイル:

  • run.py に対してクエリを実行する責任があります ビッグクアyとElasticsearchの準備。 その入力引数のXNUMXつは model_name これは、データを処理するための参照として使用するフォルダーを設定します。 lambdamart0 Google Analytics(GA)で動作するようにすでに実装されているアルゴリズムです 公開サンプル データセット。
  • Dockerfile コード全体をバンドルして、 ENTRYPOINT の実行 run.py スクリプト:
FROM python:3.7.7-alpine3.12 as python COPY kubeflow/components/prepare_env /prepare_env
WORKDIR /prepare_env
COPY ./key.json . ENV GOOGLE_APPLICATION_CREDENTIALS=./key.json RUN pip install -r requirements.txt ENTRYPOINT ["python", "run.py"]
  • lambdamart0 このそれぞれの名前のアルゴリズムの実装専用のフォルダです。 GA公開データを処理するために構築されており、システムの例として機能します。 含まれているファイルは次のとおりです。
  • ga_data.sql GAパブリックデータセットからドキュメントを取得してElasticsearchにエクスポートするクエリです。
  • es_mapping.json ドキュメントの各フィールドのインデックス定義です
  • features の値を運ぶ X 前に説明したように。 に lambdamart0 たとえば、機能を構築するための参照としてGA公開データを使用します。

と呼ばれる機能に注意してください name.json:

{ "query": { "bool": { "minimum_should_match": 1, "should": [ { "match": { "name": "{{search_term}}" } } ] } }, "params": ["search_term"], "name": "BM25 name"
}

Learn-To-Rankプラグインでは、各機能が有効なElasticsearchクエリとして定義され、スコア結果がに関連付けられている必要があります。 X.

前の例では、パラメーターを受け取ります search_term フィールドでのマッチングを続行します name BM25の一致を返す各ドキュメントのX0"。

クエリと名前フィールドの間にBM25を使用するだけでは、結果にパーソナライズを追加するのに十分ではありません。 カスタマイズレイヤーで進歩するもうXNUMXつの機能は、 channel_group.json 次のように定義されます。

{ "query": { "function_score": { "query": { "match_all": {} }, "script_score" : { "script" : { "params": { "channel_group": "{{channel_group}}" }, "source": "if (params.channel_group == 'paid_search') { return doc['performances.channel.paid_search.CTR'].value * 10 } else if (params.channel_group == 'referral') { return doc['performances.channel.referral.CTR'].value * 10 } else if (params.channel_group == 'organic_search') { return doc['performances.channel.organic_search.CTR'].value * 10 } else if (params.channel_group == 'social') { return doc['performances.channel.social.CTR'].value * 10 } else if (params.channel_group == 'display') { return doc['performances.channel.display.CTR'].value * 10 } else if (params.channel_group == 'direct') { return doc['performances.channel.direct.CTR'].value * 10 } else if (params.channel_group == 'affiliates') { return doc['performances.channel.affiliates.CTR'].value * 10 }" } } } }, "params": ["channel_group"], "name": "channel_group"
}

パラメータを入力として受け取ります channel_group (ユーザーをウェブストアに誘導したチャネル)、それぞれのチャネルのクリック率を返します。

これにより、モデルが効果的に準備され、ユーザーとその起源および各グループのランク付け方法が区別されます。 具体的には、有料ソースからのユーザーは、たとえばオーガニックチャネルからのユーザーとは異なる動作をする可能性があります。 トレーニングが十分に優れている場合は、これらの状況を処理するためにランク付けアルゴリズムを準備する必要があります。

それでも、これは、各ユーザーの固有の特性を使用して結果をパーソナライズする方法についてはあまり説明していません。 それで、これを解決するためのXNUMXつの可能性があります。 機能 avg_customer_price.json と定義されている:

{ "query": { "function_score": { "query": { "match_all": {} }, "script_score" : { "script" : { "params": { "customer_avg_ticket": "{{customer_avg_ticket}}" }, "source": "return Math.log(1 + Math.abs(doc['price'].value - Float.parseFloat(params.customer_avg_ticket)))" } } } }, "params": ["customer_avg_ticket"], "name": "customer_avg_ticket"
}

パラメータを入力として受け取ります customer_avg_ticket 各ドキュメントについて、平均ユーザーチケットとドキュメントの価格の間の距離のログを返します。

これで、ランキングモデルは、トレーニングフェーズで、価格がWebサイトでのユーザーの平均支出からどれだけ離れているかに基づいて各アイテムのランクを管理する方法を学習できます。

これらのXNUMX種類の機能を使用して、Elasticsearchの上にある検索システムに完全なパーソナライズレイヤーを追加できます。 機能は、有効な検索クエリに抽象化でき、最終的に値として変換されるスコアリングメトリックを返す必要がある限り、何でもかまいません。 X.

次のことについて 準備環境 成分:

  • 機能はElasticsearchにエクスポートされます。
  • ドキュメントフィールドを定義するインデックスがElasticsearchに作成されます。
  • ドキュメントはからクエリされます ビッグクエリー Elasticsearchにアップロードしました。
  • RankLibの要件 作成されます(機能セットストアなど)。

新しいデータと機能を備えた新しいモデルを実装するには、prepare_env内に別のフォルダーを作成するだけです(次のようなもの)。 modelname2)そして、データをクエリしてElasticsearchにアップロードする方法を設定します。

2.検証データセット

これは簡単なステップです。 これは、ユーザーが検索したもの、検索のコンテキスト、購入した製品のリストを含むデータをBigQueryから取得することで構成されます。

これがBigQueryです クエリー データの取得に使用されます。 基本的に、すべてのユーザー、ユーザーの検索と購入を選択し、コンテキストと組み合わせます。 結果の例:

{ "search_keys": { "search_term": "office", "channel_group": "direct", "customer_avg_ticket": "13" }, "docs": [ {"purchased":["GGOEGOAQ012899"]} ]
}

search_keys 顧客のコンテキストを設定する利用可能な情報を含めることができます。 前の例では、Webサイトでチャネルグループと平均支出チケットを使用しています。

このデータは、前述のように平均ランクを計算するときに検証フレームワークに入力するものです。

システムがXNUMXつの異なる検証データセットを構築していることに注意してください。XNUMXつはトレーニング期間用、もうXNUMXつは定期的な検証用、最後にXNUMXつ目は最終テストステップ用です。 ここでの考え方は、訓練されたモデルのバイアスと分散を分析することです。

3.トレーニングデータセット

これは、前に説明したように、RankLibトレーニングファイルの構築を担当するコンポーネントです。 完全な スクリプト 実際には非常に簡単です。 まず、BigQueryから、検索ページでのユーザーの操作で構成される入力クリックストリームデータをダウンロードします。 次に例を示します。

{ "search_keys": { "search_term": "drinkware", "channel_group": "direct", "customer_avg_ticket": "20" }, "judgment_keys": [ { "session": [ {"doc":"GGOEGDHC017999","click":"0","purchase":"0"}, {"doc":"GGOEADHB014799","click":"0","purchase":"0"}, {"doc":"GGOEGDHQ015399","click":"1","purchase":"0"} ] } ]
}

検索に関連付けられたキーが内部に集約されていることに注意してください search_keys。 これらの値は、Elasticsearchに送信し、各機能を適切に置き換える値です。 X で説明したように 準備環境。 前のJSONの例では、ユーザー検索コンテキストは次のとおりです。

  • 検索 ドリンクウェア.
  • 直接お店に来ました。
  • ウェブサイトで平均20ドルを費やしました。

judgment_keys 検索ページで見たドキュメントで構成されるユーザーセッションと、特定のドキュメントでのインタラクションを組み合わせます。

その後、この情報はに送信されます pyClickModels 次に、データを処理し、クエリとドキュメントの各ペアの判断を評価します。 結果は、次のように改行で区切られたJSONドキュメントです。

{ "search_term: bags|channel_group:organic_search|customer_avg_ticket:30": { "GGOEGBRJ037299": 0.3333377540111542, "GGOEGBRA037499": 0.222222238779068, "GGOEGBRJ037399": 0.222222238779068 }
}

キーの値は次のとおりです。 

search_term:bags|channel_group:organic|customer_avg_ticket:30.

前に説明したように、検索エンジンがコンテキストを認識し、その上でさらに最適化することを望んでいます。 結果として、判断は、search_termだけでなく、選択されたコンテキスト全体に基づいて抽出されます。

そうすることで、コンテキストごとにドキュメントを区別できます。たとえば、製品が北部地域からの顧客に対して判断4を受け取り、それ以外の場合は判断0を受け取るシナリオがあります。

pyClickModelsで指定された判定値の範囲は0〜1であることに注意してください。Learn-To-RankElasticsearchプラグインはRankLibの上に構築されているため、この値は整数0〜4の範囲であると予想されます。 次に、パーセンタイルを参照として使用して変数を変換します。 最終的な判断ファイルを作成するための完全なコードは次のとおりです。

def build_judgment_files(model_name: str) -> None: model = DBN.DBNModel() clickstream_files_path = f'/tmp/pysearchml/{model_name}/clickstream/' model_path = f'/tmp/pysearchml/{model_name}/model/model.gz' rmtree(os.path.dirname(model_path), ignore_errors=True) os.makedirs(os.path.dirname(model_path)) judgment_files_path = f'/tmp/pysearchml/{model_name}/judgments/judgments.gz' rmtree(os.path.dirname(judgment_files_path), ignore_errors=True) os.makedirs(os.path.dirname(judgment_files_path)) model.fit(clickstream_files_path, iters=10) model.export_judgments(model_path) with gzip.GzipFile(judgment_files_path, 'wb') as f: for row in gzip.GzipFile(model_path): row = json.loads(row) result = [] search_keys = list(row.keys())[0] docs_judgments = row[search_keys] search_keys = dict(e.split(':') for e in search_keys.split('|')) judgments_list = [judge for doc, judge in docs_judgments.items()] if all(x == judgments_list[0] for x in judgments_list): continue percentiles = np.percentile(judgments_list, [20, 40, 60, 80, 100]) judgment_keys = [{'doc': doc, 'judgment': process_judgment(percentiles, judgment)} for doc, judgment in docs_judgments.items()] result = {'search_keys': search_keys, 'judgment_keys': judgment_keys} f.write(json.dumps(result).encode() + 'n'.encode()) def process_judgment(percentiles: list, judgment: float) -> int: if judgment <= percentiles[0]: return 0 if judgment <= percentiles[1]: return 1 if judgment <= percentiles[2]: return 2 if judgment <= percentiles[3]: return 3 if judgment <= percentiles[4]: return 4

このステップの出力例を次に示します。

{ "search_keys": { "search_term": "office", "channel_group": "organic_search", "customer_avg_ticket": "24" }, "judgment_keys": [ {"doc": "0", "judgment": 0}, {"doc": "GGOEGAAX0081", "judgment": 4}, {"doc": "GGOEGOAB016099", "judgment": 0} ]
}

このデータは、RankLibに必要なトレーニングファイルに変換する必要があります。 ここで、判断、ドキュメント、クエリのコンテキストの情報を機能と組み合わせます (これが コード 検索の例 X Elasticsearchから)。

検索コンテキストと判定キーを含む前のステップの各JSON行がループされ、Elasticsearchに対するクエリとして送信されます。 search_keys。 結果は、の各値になります X 以前からすでに定義されているように 準備環境 ステップ。

最終結果は、次のようなトレーニングファイルです。

0	qid:0	1:3.1791792	2:0	3:0.0	4:2.3481672
4	qid:0	1:3.0485907	2:0	3:0.0	4:2.3481672
0	qid:0	1:3.048304	2:0	3:0.0	4:0
0	qid:0	1:2.9526825	2:0	3:0.0	4:0
4	qid:1	1:2.7752903	2:0	3:0.0	4:3.61228
0	qid:1	1:2.8348017	2:0	3:0.0	4:2.3481672

クエリごとおよびドキュメントごとに、pyClickModelsによって計算された推定判断、クエリのID、および機能のリストがあります。 X それぞれの値で。

このファイルを使用して、ランキングアルゴリズムをトレーニングできます。

4.Katibの最適化

カティブ は、自動ハイパーパラメータ最適化のためのインターフェースを提供するKubeflowのツールです。 いくつかの利用可能な方法があります。 ベイズ最適化は、pySearchMLで選択されたものです。

アルゴリズムのベイズ最適化の例。 許可されたドメインからより多くのデータポイントをサンプリングすると、特定の関数の最適値に近づく可能性があります。 pySearchMLでは、ドメインはランカーがデータにどのように適合するかを設定する変数のセットであり、最適化するコスト関数は平均ランクです。 から撮影した画像 ウィキメディア財団.

Katibが行うことは、ハイパーパラメータごとに、間のトレードオフに基づいて新しい値を選択することです。 探査-探査。 次に、新しいモデルをテストし、将来のステップで使用される結果を観察します。

pySearchMLの場合、各パラメーターは次の入力です。 ランクリブ これは、モデルがどのように適合するかを設定します(使用するツリーの数、リーフノードの総数、ネット内のニューロンの数など)。

Katibは、 カスタムリソース Kubernetesの。 YAMLファイルを定義し、それをクラスターにデプロイすることで、次のように実行できます。

kubectl create -f katib_def.yaml

Katibが行うことは、YAMLファイルを読み取って開始することです 試験、それぞれがハイパーパラメータの特定の値を実験しています。 実験定義で指定されたコードを実行して、並行して実行されている複数のポッドをインスタンス化できます。

このステップのファイルは次のとおりです。

launch_katib.py PythonスクリプトからKatibを起動する責任があります。 入力引数を受け取り、YAML定義を作成し、KubernetesAPIを使用してスクリプト自体からKatibを起動します。

experiment.json 実験の定義のテンプレートとして機能します。 その定義は次のとおりです。

{ "apiVersion": "kubeflow.org/v1alpha3", "kind": "Experiment", "metadata": { "namespace": "kubeflow", "name": "", "labels": { "controller-tools.k8s.io": "1.0" } }, "spec": { "objective": { "type": "minimize", "objectiveMetricName": "Validation-rank", "additionalMetricNames": [ "rank" ] }, "algorithm": { "algorithmName": "bayesianoptimization" }, "parallelTrialCount": 1, "maxTrialCount": 2, "maxFailedTrialCount": 1, "parameters": [], "trialTemplate": { "goTemplate": { "rawTemplate": { "apiVersion": "batch/v1", "kind": "Job", "metadata":{ "name": "{{.Trial}}", "namespace": "{{.NameSpace}}" }, "spec": { "template": { "spec": { "restartPolicy": "Never", "containers": [ { "name": "{{.Trial}}", "image": "gcr.io/{PROJECT_ID}/model", "command": [ "python /model/train.py --train_file_path={train_file_path} --validation_files_path={validation_files_path} --validation_train_files_path={validation_train_files_path} --es_host={es_host} --destination={destination} --model_name={model_name} --ranker={ranker} {{- with .HyperParameters}} {{- range .}} {{.Name}}={{.Value}} {{- end}} {{- end}}" ], "volumeMounts": [ { "mountPath": "/data", "name": "pysearchmlpvc", "readOnly": false } ] } ], "volumes": [ { "name": "pysearchmlpvc", "persistentVolumeClaim": { "claimName": "pysearchml-nfs", "readOnly": false } } ] } } } } } } }
}

基本的に、並行して実行するポッドの数と、各試行で実行するDockerイメージとその入力コマンドを定義します。 並行して実行されているポッドの総数と最大試行回数がpySearchMLにハードコーディングされていることに注意してください。 最善のアプローチは、パイプライン実行からこれらのパラメーターを受け取り、それに応じて置き換えることです。

launch_katib.py このテンプレートを読み取り、最終的なYAML定義を作成して、Kubernetesに送信します。KubernetesはKatibプロセスを開始します。

入力パラメータのXNUMXつは ランカー これは、RankLibから選択するランキングアルゴリズムです(lambdaMart、listNetなど)。 各ランカーには独自のパラメーターのセットがあります。launch_katib.pyに実装されているLambdaMartアルゴリズムの例を次に示します。

def get_ranker_parameters(ranker: str) -> List[Dict[str, Any]]: return { 'lambdamart': [ {"name": "--tree", "parameterType": "int", "feasibleSpace": {"min": "1", "max": "500"}}, {"name": "--leaf", "parameterType": "int", "feasibleSpace": {"min": "2", "max": "40"}}, {"name": "--shrinkage", "parameterType": "double", "feasibleSpace": {"min": "0.01", "max": "0.2"}}, {"name": "--tc", "parameterType": "int", "feasibleSpace": {"min": "-1", "max": "300"}}, {"name": "--mls", "parameterType": "int", "feasibleSpace": {"min": "1", "max": "10"}} ] }.get(ranker)

Katibは、上記で定義されたドメインからパラメーターを選択して実行します train.py ここで、RankLibはランキングモデルのトレーニングに効果的に使用されます。 Pythonで実装されたコマンドの例:

cmd = ('java -jar ranklib/RankLib-2.14.jar -ranker ' f'{ranker} -train {args.train_file_path} -norm sum -save ' f'{args.destination}/model.txt ' f'{(" ".join(X)).replace("--", "-").replace("=", " ")} -metric2t ERR')

この文字列はに送信されます subprocess トレーニングプロセスを開始する呼び出し(RankLibのためにJavaが必要であることに注意してください)。 その結果、Elasticsearchにエクスポートできる新しくトレーニングされたランキングモデルが作成されます。

モデルがフィットするように、 検証.py 期待されるランクを計算するために呼び出されます。 実行される手順は次のとおりです。

  • スクリプトは、検証データセットの各JSONをループします。
  • 各行には、Elasticsearchクエリを作成するために使用される検索コンテキストが含まれています。 モデルで使用されるクエリは次のとおりです lambdamart0 これは後で使用します:
{ "query": { "function_score": { "query": { "bool": { "must": { "bool": { "minimum_should_match": 1, "should": [ { "multi_match": { "operator": "and", "query": "{query}", "type": "cross_fields", "fields": [ "sku", "name", "category" ] } } ] } } } }, "functions": [ { "field_value_factor": { "field": "performances.global.CTR", "factor": 10, "missing": 0, "modifier": "none" } } ], "boost_mode": "sum", "score_mode": "sum" } }, "rescore": { "window_size": "{window_size}", "query": { "rescore_query": { "sltr": { "params": "{search_keys}", "model": "{model_name}" } }, "rescore_query_weight": 20, "query_weight": 0.1, "score_mode": "total" } }
}
  • 最近作成されたクエリを指定すると、Elasticsearchにリクエストが送信されます。
  • 検索結果と購入したドキュメントが比較されます。

Elasticsearchクエリの構築を担当するコードは次のとおりです。

def get_es_query( search_keys: Dict[str, Any], model_name: str, es_batch: int = 1000
) -> str: """ Builds the Elasticsearch query to be used when retrieving data. Args ---- args: NamedTuple args.search_keys: Dict[str, Any] Search query sent by the customer as well as other variables that sets its context, such as region, favorite brand and so on. args.model_name: str Name of RankLib model saved on Elasticsearch args.index: str Index on Elasticsearch where to retrieve documents args.es_batch: int How many documents to retrieve Returns ------- query: str String representation of final query """ # it's expected that a ES query will be available at: # ./queries/{model_name}/es_query.json query = open(f'queries/{model_name}/es_query.json').read() query = json.loads(query.replace('{query}', search_keys['search_term'])) # We just want to retrieve the id of the document to evaluate the ranks between # customers purchases and the retrieve list result query['_source'] = '_id' query['size'] = es_batch query['rescore']['window_size'] = 50 # Hardcoded to optimize first 50 skus query['rescore']['query']['rescore_query']['sltr']['params'] = search_keys query['rescore']['query']['rescore_query']['sltr']['model'] = model_name return query

パラメータに注意してください rescore_query Elasticsearchのlearn-to-rankプラグインで機械学習レイヤーをトリガーします。

最後に、関数 compute_rank 以下に示すように、すべてをまとめます。

def compute_rank( search_arr: List[str], purchase_arr: List[List[Dict[str, List[str]]]], rank_num: List[float], rank_den: List[float], es_client: Elasticsearch
) -> None: """ Sends queries against Elasticsearch and compares results with what customers purchased. Computes the average rank position of where the purchased document falls within the retrieved items. Args ---- search_arr: List[str] Searches made by customers as observed in validation data. We send those against Elasticsearch and compare results with purchased data purchase_arr: List[List[Dict[str, List[str]]]] List of documents that were purchased by customers rank_num: List[float] Numerator value of the rank equation. Defined as list to emulate a pointer rank_den: List[float] es_client: Elasticsearch Python Elasticsearch client """ idx = 0 if not search_arr: return request = os.linesep.join(search_arr) response = es_client.msearch(body=request, request_timeout=60) for hit in response['responses']: docs = [doc['_id'] for doc in hit['hits'].get('hits', [])] if not docs or len(docs) < 2: continue purchased_docs = [ docs for purch in purchase_arr[idx] for docs in purch['purchased'] ] ranks = np.where(np.in1d(docs, purchased_docs))[0] idx += 1 if ranks.size == 0: continue rank_num[0] += ranks.sum() / (len(docs) - 1) rank_den[0] += ranks.size print('rank num: ', rank_num[0]) print('rank den: ', rank_den[0])

Katibインスタンス化 サイドカーポッド トレーニングポッドの標準入力を読み続けます。 文字列を識別するとき Validation-rank=(...)、最適化プロセスの結果として値を使用します。

永続ボリュームは、次のコンポーネントで使用されるKatibによってトレーニングされた最適なモデルの定義を保存するプロセスで使用されます。

5.投稿RankLibモデル

最も難しい部分はすでに完了しています。 ここで何が起こるかというと、スクリプトはテキストファイルに保存された最適なモデルの定義をたどってElasticsearchにアップロードするだけです。

この設計の主な利点のXNUMXつは、このコンポーネントがモデルを本番Elasticsearchにエクスポートできる一方で、ステージングレプリカエンジンで全体の最適化が行われる可能性があることに注意してください。

6.最終テスト

最後に、最適なモデルがElasticsearchにエクスポートされると、システムは最適化された最適なランキングモデルを自由に使用できます。 このステップでは、すべてが正常に機能したことを確認するだけでなく、システムが偏りと分散に苦しんでいるかどうかに関する詳細情報を提供するために、最終検証が実行されます。

それはほとんどそれです! ここでいくつかのコードを実行して、このフレームワーク全体の動作を確認しましょう。

4.ハンズオンセクション

アーキテクチャ全体を実際に実装する時が来ました! 完全なコードはpySearchMLリポジトリで入手できます。

WillianFuks / pySearchML

このセクションでは、実際のデータを使用してコードを実行するためにGCPを使用します。 また、この実験の実行にはコスト(数セント)がかかることを覚えておいてください。

GCPを初めて使用する場合は、300年間続くXNUMXドルの無料クレジットギフトがあります。 ただ サインイン このチュートリアルのプロジェクトを作成します(pysearchml 例えば)。 最終的には、次のようなダッシュボードにアクセスできるようになります。

GCPダッシュボードプロジェクトの例。 著者による画像。

gクラウド コマンドラインを介してGCPとやり取りするために必要になります。 インストールは非常に簡単です。 初期設定後、以下を実行してログインできることを確認してください。

gcloud 認証ログイン

これで、残りは非常に簡単です。 pySearchMLをローカルに複製します。

git clone pysearchml && cd pySearchML

有効にします プラットフォームのKubernetesエンジン。 その後、の実行をトリガーするだけです クラウドビルド これは、必要なインフラストラクチャ全体の作成を担当します(このステップには、5〜10分かかります)。

これが ビルド 実行をトリガーします:

#!/bin/bash
set -e SUBSTITUTIONS=
_COMPUTE_ZONE='us-central1-a',
_CLUSTER_NAME='pysearchml',
_VERSION='0.0.0' ./kubeflow/build/manage_service_account.sh gcloud builds submit --no-source --config kubeflow/build/cloudbuild.yaml --substitutions $SUBSTITUTIONS --timeout=2h

変数で適切な値を選択できます SUBSTITUTIONS。 それに注意してください _VERSION Kubeflowにエクスポートするパイプラインバージョンを設定します。 すべてが設定されたら、スクリプトを実行するだけです。

./kubeflow/build/build.sh
cloudbuildで実行されるステップ。 著者による画像。
  1. GCPツールへのアクセスと承認を可能にする秘密鍵を準備します。
  2. 構築マシンでknown_hostsを準備します。
  3. pySearchMLをローカルに複製します。
  4. ファイル create_k8.sh 手順4で実行されるのは、Google Kubernetes Engine上にKubernetesクラスターを作成する責任があります(G.K.E.)、Elasticsearch、Kubeflow、Katibをデプロイします。 並行して、システムに必要なすべてのDockerイメージが構築され、Google Container Registryにデプロイされます(GCR)後でKubeflowで使用するため。
  5. いくつかのユニットテストを実行します。 これらは、システムが期待どおりに機能していることを確認するために重要であることになりました。 また、Kubeflowパイプラインを並行してコンパイルします。
  6. 最後に、パイプラインをクラスターにデプロイします。

完了後、コンソールを参照して「Kubernetes Engine」を選択すると、コンソールがすでに稼働していることがわかります。

GKEにデプロイされたKubernetesクラスターを実行する準備ができました。 著者による画像。

多くのデータを使用しないため、これは小さなクラスターです。これにより、コストをさらに節約できます。

KubeflowとKatibはすでにインストールされています。 アクセスするには、まず次のコマンドを実行してgcloudをクラスターに接続します。

gcloudコンテナクラスターget-credentialspysearchml

その後、以下を実行して、Kubeflowを処理するサービスをローカルにポート転送します。

kubectl port-forward -n kubeflow svc / ml-pipeline-ui 8080:80 1> / dev / null&

これで、ポート8080でローカルホストにアクセスすると、次のように表示されます。

Kubeflowダッシュボード。 著者による画像。

そして、パイプラインは実行の準備ができています。

この実験は一般の人々を利用しています GoogleAnalyticsサンプル データセットであり、Googleストアで閲覧している顧客の小さなサンプルで構成されています。 それはからにまたがる 20160801 以下 20170801 また、各ユーザーが検索した内容と、検索結果をどのように操作したかが含まれています。

選択 pysearchml_0.0.0 次に、「+ CreateRun」を選択します。 Pythonパイプラインスクリプトで定義されているすべての可能な入力パラメーターを含む画面が表示されます。 適切なパラメータを選択したら、コードを実行するだけです。

実行後、期待される結果は次のとおりです。

pySearchMLで定義されているパイプライン全体の完全な実行。 著者による画像。

Katibコンポーネントの出力:

Katibコンポーネントによって印刷された出力の例。 著者による画像。

のランクを見ることができます 視聴者の38%が。 次に、テストコンポーネントの結果を比較できます。

AI検索エンジン
テストコンポーネントによって印刷された出力の例。 著者による画像。

ここのランクは40.16です% これは検証データセットよりも少し悪いです。 モデルが少し過剰適合していることを示している可能性があります。 より多くのデータまたはさらなるパラメータの調査は、この問題を軽減するのに役立つ可能性があります。

そして、ほとんど、あなたはそれを持っています! Elasticsearchには、顧客のコンテキストに基づいて結果を改善するための、完全にトレーニングされた新しい機械学習レイヤーがあります。

各ステップで作成されたファイルをナビゲートする場合は、そのための展開が利用可能です。 pySearchMLフォルダーで、次のコマンドを実行します。

kubectl apply -f kubeflow / disk-busybox.yaml

あなたが走ったら kubectl -n kubeflow get pods ポッドのXNUMXつの名前が「nfs-busybox-(…)」のようなものであることがわかります。 実行すると、ファイルにアクセスできるようになります。

kubectl -n kubeflow exec -it nfs-busybox-(...)sh

それらは次の場所に配置する必要があります /mnt/pysearchml.

プロセス全体のための迅速で汚いビジュアライザーもあります。 とにかく走れ:

kubectl port-forward service / front -n front 8088:8088 1> / dev / null&

そして、ブラウザにアクセスします localhost:8088。 この(迅速で醜い)インターフェースが表示されるはずです。

新しくトレーニングされたElasticsearchでクエリを実行するためのフロントエンドインターフェース。 著者による画像。

結果の例:

結果を試してみるだけでなく、最適化パイプラインが機能しているかどうかをより正確に把握できます。

そして、AI最適化を備えた完全な検索エンジンを実行して、あらゆる店舗の収入トラフィックを処理できるようにするために必要なのは、これだけです。

5. まとめ

さて、それは挑戦でした!

pySearchMLの構築は非常に困難であり、これは私が今まで直面した中で最も残酷な課題のXNUMXつであったと言っても過言ではありません😅。 数え切れないほどの設計、アーキテクチャ、インフラストラクチャが検討されましたが、ほとんどが失敗しました。

プロセス全体をKubeflowとKatibの上に統合することの実現は、いくつかの代替案がすでにテストされた後で初めて実現しました。

この設計の利点は、最終的なコードがいかに単純で直接的なものになるかです。 完全にモジュール化されており、各コンポーネントが単純なタスクを担当し、Kubeflowが実行全体を調整します。 その上、主にコード開発に集中して、Katibに最適なパラメーターを見つけるという大変な作業を任せることができます。

開発プロセスは単純ではありませんでした。 Kubernetesとその利用可能なリソースからの概念を含む、いくつかの教訓を学ぶ必要がありました。 それでも、それはすべてそれだけの価値がありました。 その結果、検索エンジン全体を数行のコードでゼロから構築し、実際のトラフィックを処理できるようになりました。

次のステップとして、RankLibを、データからコンテキストをさらに抽出するディープラーニングアルゴリズムに置き換えることを検討できます。 そのための主な課題のXNUMXつは、システムの応答時間がコストだけでなく増加する可能性があることです(長所と短所を評価する必要があります)。

使用されるランキングアルゴリズムに関係なく、アーキテクチャはほとんどの部分で同じままです。

うまくいけば、それはこの分野で働く人々にとって有用な投稿でした。 さあ、少し休憩して、学んだ教訓について瞑想し、次の冒険に備えましょう:)。

この投稿に関しては、それは確かに達成された使命で終了するに値します サウンドトラック.

この記事は、最初に公開された データサイエンスに向けて 著者の許可を得てTOPBOTSに再公開しました。

この記事をお楽しみですか? AIおよびNLPの更新プログラムにサインアップしてください。

より詳細な技術教育をリリースするときにお知らせします。

出典:https://www.topbots.com/ai-search-engine-elasticsearch-kubeflow-katib/

スポット画像

最新のインテリジェンス

スポット画像

私たちとチャット

やあ! どんな御用でしょうか?