Zephyrnet Logosu

Amazon SageMaker Feature Store'u kullanarak öneri sistemi eğitimini ve tahminlerini hızlandırın ve iyileştirin

Tarih:

Birçok şirket, yüksek düzeyde optimize edilmiş bir öneri sistemi oluşturmanın zor kullanım durumuyla mücadele etmelidir. Zorluk, modeli günlük olarak yeni verilerle eğitmek ve ayarlamak ve ardından aktif bir etkileşim sırasında kullanıcı davranışına dayalı tahminler yapmak için büyük hacimli verilerin işlenmesinden kaynaklanmaktadır. Bu yazıda, nasıl kullanılacağını gösteriyoruz Amazon SageMaker Özellik Mağazası, şirketinizdeki ekipler arasında model özelliklerini depolayabileceğiniz, erişebileceğiniz ve paylaşabileceğiniz amaca yönelik bir havuz. Hem çevrimiçi hem de çevrimdışı Özellik Mağazası ile, tüketici davranışına dayalı bir ürün tavsiye motoru oluşturma karmaşık görevini yerine getirebilirsiniz. Bu gönderi beraberindeki bir atölye ve GitHub repo.

Gönderi ve atölye, özel modeller oluşturması gereken veri bilimcilerine ve uzman makine öğrenimi (ML) uygulayıcılarına yöneliktir. Veri bilimcisi olmayan veya makine öğrenimi uzmanı hedef kitle için AI hizmetimize göz atın Amazon KişiselleştirBu, geliştiricilerin makine öğrenimi uzmanlığına ihtiyaç duymadan çok çeşitli kişiselleştirilmiş deneyimler oluşturmasına olanak tanır. Bu, amazon.com'a güç veren teknolojinin aynısıdır.

Çözüme genel bakış

Makine öğreniminde uzman uygulayıcılar, bir modeli eğitirken ve modelin genel tahmin doğruluğunu etkileyen özellikler tasarlarken yüksek kaliteli verileri beslemenin ne kadar önemli olduğunu bilirler. Bu süreç genellikle oldukça zahmetlidir ve istenen duruma ulaşmak için birden fazla yineleme gerektirir. ML iş akışındaki bu adıma özellik mühendisliğive genellikle sürecin %60-70'i sadece bu adım için harcanır. Büyük kuruluşlarda, sorun daha da kötüleşir ve daha büyük bir üretkenlik kaybına neden olur, çünkü farklı ekipler genellikle aynı eğitim işlerini yürütür, hatta önceki çalışma hakkında hiçbir bilgileri olmadığı için yinelenen özellik mühendisliği kodu yazar, bu da tutarsız sonuçlara yol açar. Ek olarak, özelliklerin sürümü yoktur ve merkezi bir depo kavramı olmadığı için en son özelliğe erişim mümkün değildir.

Bu zorlukların üstesinden gelmek için Feature Store, ML özellikleri için tam olarak yönetilen bir merkezi depo sağlayarak, altyapıyı yönetmenin ağır yükü olmadan özellikleri güvenli bir şekilde depolamayı ve almayı kolaylaştırır. Özellik grupları tanımlamanıza, toplu alım ve akış alımını kullanmanıza ve düşük gecikmeyle en son özellik değerlerini almanıza olanak tanır. Daha fazla bilgi için, bkz Amazon Sagemaker Özellik Mağazasına Başlarken.

Aşağıdaki Özellik Mağazası bileşenleri, kullanım durumumuzla ilgilidir:

  • Özellik grubu – Bu, bir kaydı açıklamak için Özellik Mağazasında bir şema aracılığıyla tanımlanan bir özellik grubudur. Özellik grubunu bir çevrimiçi veya çevrimdışı mağaza veya her ikisi için yapılandırabilirsiniz.
  • Online mağaza – Çevrimiçi mağaza, öncelikle düşük milisaniye gecikmeli okumalar ve yüksek verimli yazma işlemleri gerektiren gerçek zamanlı tahminleri desteklemek için tasarlanmıştır.
  • Çevrimdışı mağaza – Çevrimdışı mağaza, öncelikle toplu tahminler ve model eğitimi için tasarlanmıştır. Yalnızca ekleme amaçlı bir mağazadır ve geçmiş özellik verilerini depolamak ve bunlara erişmek için kullanılabilir. Çevrimdışı mağaza, keşif ve model eğitimi için özellikleri depolamanıza ve sunmanıza yardımcı olabilir.

Gerçek zamanlı öneriler zamana duyarlıdır, görev açısından kritiktir ve bağlama bağlıdır. Müşterilerin ilgisini kaybettiği veya talep başka bir yerde karşılandığı için gerçek zamanlı önerilere olan talep hızla azalır. Bu yazıda, sentetik bir çevrimiçi bakkal veri seti kullanan bir e-ticaret web sitesi için gerçek zamanlı bir öneri motoru oluşturuyoruz.

Model eğitimi, doğrulama ve gerçek zamanlı çıkarım için kullandığımız özellik gruplarını kullanarak müşteri, ürün ve sipariş verilerini depolamak için Feature Store'u (hem çevrimiçi hem de çevrimdışı) kullanırız. Öneri motoru, ultra düşük gecikme süresi ve yüksek aktarım hızı tahminleri için özel olarak oluşturulmuş çevrimiçi özellik deposundan özellikleri alır. Müşterinin satın alma geçmişine, gerçek zamanlı tıklama verilerine ve diğer müşteri profili bilgilerine dayalı olarak, bir müşterinin e-ticaret web sitesinde gezinirken satın alması muhtemel en iyi ürünleri önerir. Bu çözüm, son teknoloji ürünü bir öneri aracı olarak değil, Feature Store kullanımını keşfetmek için yeterince zengin bir örnek sağlamak amacıyla tasarlanmıştır.

Aşağıdaki üst düzey adımlarda size yol gösteriyoruz:

  1. Verileri ayarlayın ve Feature Store'a alın.
  2. Modellerinizi eğitin.
  3. Kullanıcı etkinliğini simüle edin ve tıklama akışı olaylarını yakalayın.
  4. Gerçek zamanlı önerilerde bulunun.

Önkoşullar

Bu gönderiyi takip etmek için aşağıdaki ön koşullara ihtiyacınız var:

Verileri ayarlayın ve bunları Feature Store'a alın

Sentetik çevrimiçi bakkal veri kümesine dayalı beş farklı veri kümesiyle çalışıyoruz. Feature Store'da her veri kümesinin kendi özellik grubu vardır. İlk adım, iki modelimiz için eğitim işlerini başlatabilmemiz için bu verileri Feature Store'a almaktır. Bakın 1_feature_store.ipynb GitHub'da not defteri.

Aşağıdaki tablolar, Feature Store'da depoladığımız verilere ilişkin örnekleri göstermektedir.

Müşteriler

A Müşteri Kimliği isim belirtmek, bildirmek yaş evli müşteri_sağlık_dizini
0 C1 Justin Gutierrez Alaska 52 1 0.59024
1 C2 karen çapraz idaho 29 1 0.6222
2 C3 amy kral oklahoma 70 1 0.22548
3 C4 nicole hartman Missouri 52 1 0.97582
4 C5 jessica güçleri minnesota 31 1 0.88613

Ürünler

A Ürün adı Ürün Kategorisi ürün kimliği ürün_sağlık_index
0 çikolatalı sandviç kurabiyeleri cookie_cakes P1 0.1
1 cevizli tereyağlı kurabiye ısırıkları go-pak cookie_cakes P25 0.1
2 Danimarka tereyağlı kurabiye cookie_cakes P34 0.1
3 glutensiz tüm doğal çikolata parçalı kurabiyeler cookie_cakes P55 0.1
4 mini nilla gofret munch paketi cookie_cakes P99 0.1

Siparişlerim

A Müşteri Kimliği ürün kimliği satın alma miktarı
0 C1 P10852 87.71
1 C1 P10940 101.71
2 C1 P13818 42.11
3 C1 P2310 55.37
4 C1 P393 55.16

Tıklama akışı geçmişi

A Müşteri Kimliği ürün kimliği aldım sağlıklı_aktivite_last_2m değerlendirme
0 C1 P10852 1 1 3.04843
1 C3806 P10852 1 1 1.67494
2 C5257 P10852 1 0 2.69124
3 C8220 P10852 1 1 1.77345
4 C1 P10852 0 9 3.04843

Tıklama akışı gerçek zamanlı

A Müşteri Kimliği toplam_aktivite_ağırlık_last_2m avg_product_health_index_last_2m
0 C09234 8 0.2
1 D19283 3 0.1
2 C1234 9 0.8

Ardından, Özellik Mağazasında ilgili özellik gruplarını oluştururuz:

customers_feature_group = create_feature_group(df_customers, customers_feature_group_name,'customer_id', prefix, sagemaker_session)

products_feature_group = create_feature_group(df_products, products_feature_group_name, 'product_id',prefix, sagemaker_session)

orders_feature_group = create_feature_group(df_orders, orders_feature_group_name, 'order_id', prefix,sagemaker_session)

click_stream_historical_feature_group = create_feature_group(df_click_stream_historical,click_stream_historical_feature_group_name,'click_stream_id', prefix, sagemaker_session)

click_stream_feature_group = create_feature_group(df_click_stream, click_stream_feature_group_name, 'customer_id',prefix, sagemaker_session)

Özellik grupları oluşturulduktan ve kullanılabilir olduktan sonra, verileri her gruba alırız:

ingest_data_into_feature_group(df_customers, customers_feature_group)
customers_count = df_customers.shape[0]

ingest_data_into_feature_group(df_products, products_feature_group)
products_count = df_products.shape[0]

ingest_data_into_feature_group(df_orders, orders_feature_group)
orders_count = df_orders.shape[0]

ingest_data_into_feature_group(df_click_stream_historical, click_stream_historical_feature_group)
click_stream_historical_count = df_click_stream_historical.shape[0]

# Add Feature Group counts for later use
ps.add({'customers_count': customers_count,
        'products_count': products_count,
        'orders_count': orders_count,
        'click_stream_historical_count': click_stream_historical_count,
        'click_stream_count': 0})

içine veri girmiyoruz click_stream_feature_group çünkü verilerin gerçek zamanlı tıklama akışı olaylarından gelmesini bekliyoruz.

Modellerinizi eğitin

Bu kullanım durumu için iki model eğitiyoruz: ortak bir filtreleme modeli ve bir sıralama modeli. Aşağıdaki diyagram, eğitim iş akışını göstermektedir.

İşbirliğine dayalı filtreleme modeli, geçmiş kullanıcı-ürün etkileşimlerine dayalı ürünler önerir.

Sıralama modeli, kullanıcının tıklama akışı etkinliğini alarak ve bunu kişiselleştirilmiş önerilerde bulunmak için kullanarak ortak filtreleme modelinden önerilen ürünleri yeniden sıralar. bu 2_recommendation_engine_models.ipynb modelleri eğitmek için not defteri GitHub'da mevcuttur.

Ortak filtreleme modeli

kullanarak matris çarpanlarına ayırmaya dayalı bir işbirlikçi filtreleme modeli kullanıyoruz. Faktoring Makineleri algoritması bir müşteri için ürün önerileri almak için. Bu, ürün kategorisi, ad ve açıklama gibi özelliklere ek olarak bir müşteri profiline ve geçmiş satın alma geçmişine dayanır. Müşterinin geçmiş satın alma verileri ve e-ticaret mağazasının ürün kataloğundaki ürün verileri, üç ayrı çevrimdışı Özellik Mağazasında depolanır. özellik grupları: customers, products, ve click-stream-historical, son bölümde oluşturduğumuz. Eğitim verilerimizi aldıktan sonra, modelimiz için uygun bir girdi elde edebilmek için birkaç değişkeni dönüştürmemiz gerekiyor. İki tür dönüşüm kullanıyoruz: one-hot kodlama ve TF-IDF.

  1. Eğitime yardımcı olması için bu geçmiş verileri almak için oluşturduğumuz Feature Store özellik gruplarını sorgulayalım:
    query = f'''
    select click_stream_customers.customer_id,
           products.product_id,
           rating,
           state,
           age,
           is_married,
           product_name
    from (
        select c.customer_id,
               cs.product_id,
               cs.bought,
               cs.rating,
               c.state,
               c.age,
               c.is_married
        from "{click_stream_historical_table}" as cs
        left join "{customers_table}" as c
        on cs.customer_id = c.customer_id
    ) click_stream_customers
    left join
    (select * from "{products_table}") products
    on click_stream_customers.product_id = products.product_id
    where click_stream_customers.bought = 1
    '''
    
    df_cf_features, query = query_offline_store(click_stream_feature_group_name, query,
                                                sagemaker_session)
    df_cf_features.head()

A Müşteri Kimliği ürün kimliği değerlendirme belirtmek, bildirmek yaş evli Ürün adı
0 C6019 P15581 1.97827 kentucky 51 0 organik çilekli limonata meyve suyu içeceği
1 C1349 P1629 1.76518 nevada 74 0 deniz tuzu bahçe sebzeli cips
2 C3750 P983 2.6721 arkansas 41 1 saç dengesi şampuanı
3 C4537 P399 2.14151 massachusetts 33 1 sade yoğurt
4 C5265 P13699 2.40822 arkansas 44 0 kakao ucu ezilmiş taş zemin organik
  1. Ardından, eğitim için modele besleyebilmemiz için verileri hazırlayın:
    X, y = load_dataset(df_cf_features)

  2. Ardından verileri tren ve test setlerine böldük:
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    
    print(X_train.shape, X_test.shape, y_train.shape, y_test.shape)

  3. Son olarak, kullanarak eğitime başlıyoruz Amazon Adaçayı Yapıcı:
    container = sagemaker.image_uris.retrieve("factorization-machines", region=region)
    
    fm = sagemaker.estimator.Estimator(
        container,
        role,
        instance_count=1,
        instance_type="ml.c5.xlarge",
        output_path=output_prefix,
        sagemaker_session=sagemaker_session,
    )
    
    # Set our hyperparameters
    input_dims = X_train.shape[1]
    fm.set_hyperparameters(
        feature_dim=input_dims,
        predictor_type="regressor",
        mini_batch_size=1000,
        num_factors=64,
        epochs=20,
    )

  4. Aşağıdakileri kullanarak modeli eğitmeye başlayın:
    fm.fit({'train': train_data_location, 'test': test_data_location})

  5. Modelimiz eğitimi tamamladığında, daha sonra kullanmak üzere gerçek zamanlı bir uç nokta dağıtırız:
    cf_model_predictor = fm.deploy(
        endpoint_name = cf_model_endpoint_name,
        initial_instance_count=1,
        instance_type="ml.m4.xlarge",
        serializer=FMSerializer(),
        deserializer=JSONDeserializer(),
        wait=False
    )

sıralama modeli

Biz de eğitiyoruz XGBoost bir müşterinin belirli bir ürünü satın alma eğilimini tahmin etmek için geçmiş verileri bir araya getiren tıklama akışına dayalı model. Ürün kategorisi özellikleriyle birlikte gerçek zamanlı tıklama akışı verilerinde (Feature Store'dan gerçek zamanlı olarak depolanan ve alınan) toplu özellikleri kullanırız. Kullanırız Amazon Kinesis Veri Akışları gerçek zamanlı tıklama akışı verilerini yayınlamak ve Amazon Kinesis Veri Analizi kullanarak akış verilerini toplamak için sendeleyerek pencere sorgusu son 2 dakikalık bir süre boyunca. Bu toplu veriler, daha sonra sıralama modeli tarafından çıkarım için kullanılmak üzere gerçek zamanlı olarak bir çevrimiçi Özellik Mağazası özellik grubunda depolanır. Bu kullanım durumu için tahmin ediyoruz bought, bir kullanıcının bir öğeyi satın alıp almadığını gösteren bir Boole değişkenidir.

  1. Sıralama modelini eğitmek için veri almak için oluşturduğumuz özellik gruplarını sorgulayalım:
    query = f'''
    select bought,
           healthy_activity_last_2m,
           product_health_index,
           customer_health_index,
           product_category
    from (
        select c.customer_health_index,
               cs.product_id,
               cs.healthy_activity_last_2m,
               cs.bought
        from "{click_stream_historical_table}" as cs
        left join "{customers_table}" as c
        on cs.customer_id = c.customer_id
    ) click_stream_customers
    left join
    (select * from "{products_table}") products
    on click_stream_customers.product_id = products.product_id
    '''
    
    df_rank_features, query = query_offline_store(click_stream_feature_group_name, query,
                                                  sagemaker_session)
    df_rank_features.head()

A aldım sağlıklı_aktivite_last_2m ürün_sağlık_index müşteri_sağlık_dizini Ürün Kategorisi
0 0 2 0.9 0.34333 Çay
1 0 0 0.9 0.74873 vitamin_supplements
2 0 0 0.8 0.37688 yoğurt
3 0 0 0.7 0.42828 buzdolabında
4 1 3 0.2 0.24883 chip_pretzels
  1. XGBoost sıralama modeli için verileri hazırlayın:
    df_rank_features = pd.concat([df_rank_features,
        pd.get_dummies(df_rank_features['product_category'],
        prefix='prod_cat')], axis=1)del df_rank_features['product_category']

  2. Verileri tren ve test setlerine ayırın:
    train_data, validation_data, _ = np.split(
        df_rank_features.sample(frac=1, random_state=1729),
            [int(0.7 * len(df_rank_features)),
                int(0.9 * len(df_rank_features))])
    
    train_data.to_csv('train.csv', header=False, index=False)
    
    validation_data.to_csv('validation.csv', header=False, index=False)

  3. Model eğitimine başlayın:
    container = sagemaker.image_uris.retrieve('xgboost', region, version='1.2-2')
    
    xgb = sagemaker.estimator.Estimator(container,
                                        role,
                                        instance_count=1,
                                        instance_type='ml.m4.xlarge',
                                        output_path='s3://{}/{}/output'.format(default_bucket, prefix),
                                        sagemaker_session=sagemaker_session)
    
    xgb.set_hyperparameters(
        max_depth= 5,
        eta= 0.2,
        gamma= 4,
        min_child_weight= 6,
        subsample= 0.7,
        objective= 'binary:logistic',
        num_round= 50,
        verbosity= 2
    )
    
    xgb.fit({'train': s3_input_train, 'validation': s3_input_validation})

  4. Modelimiz eğitimi tamamladığında, daha sonra kullanmak üzere gerçek zamanlı bir uç nokta dağıtırız:
    xgb_predictor = xgb.deploy(
        endpoint_name = ranking_model_endpoint_name,
        initial_instance_count = 1,
        instance_type = 'ml.m4.xlarge',
        serializer = CSVSerializer(),
        wait=False
    )

Kullanıcı etkinliğini simüle edin ve tıklama akışı olaylarını yakalayın

Kullanıcı e-ticaret web sitesiyle etkileşime girdiğinde, etkinliklerini tıklama akışı etkinlikleri biçiminde yakalamanın bir yoluna ihtiyacımız var. İçinde 3_click_stream_kinesis.ipynb notebook, kullanıcı etkinliğini simüle ediyoruz ve bu tıklama akışı olaylarını Kinesis Data Streams ile yakalıyor, Kinesis Data Analytics ile topluyoruz ve bu olayları Feature Store'a alıyoruz. Aşağıdaki diyagram bu iş akışını göstermektedir.

Bir üretici, Kinesis veri akışına tıklama akışı olayları (kullanıcı etkinliğini simüle eder) yayar; Son 2 dakikalık etkinlik için tıklama akışı verilerini toplamak için Kinesis Data Analytics kullanıyoruz.

Son olarak, bir AWS Lambda işlevi, Kinesis Data Analytics'ten verileri alır ve bunları Özellik Deposuna alır (özellikle click_stream özellik grubu).

Ürünleri sepete kaydetme, ürünleri beğenme vb. gibi bir web uygulamasında müşteri tıklama akışını simüle ediyoruz. Bunun için ölçeklenebilir bir gerçek zamanlı akış hizmeti olan Kinesis Data Streams kullanıyoruz.

  1. Tıklama akışı etkinliğini aşağıdaki kodla simüle edin:
    kinesis_client = boto3.client('kinesis')
    kinesis_client.create_stream(StreamName=kinesis_stream_name, ShardCount=1)
    
    active_stream = False
    while not active_stream:
        status = kinesis_client.describe_stream(StreamName=kinesis_stream_name)['StreamDescription']['StreamStatus']
        if (status == 'CREATING'):
            print('Waiting for the Kinesis stream to become active...')
            time.sleep(20)
        elif (status == 'ACTIVE'):
            active_stream = True
            print('ACTIVE')
    
    stream_arn = kinesis_client.describe_stream(StreamName=kinesis_stream_name)['StreamDescription']['StreamARN']
    print(f'Amazon kinesis stream arn: {stream_arn}')

    Sıralama modeli, müşterinin e-ticaret web sitesindeki son 2 dakikalık etkinliğine dayalı olarak sıralanmış ürünleri müşteriye önerir. Akış bilgilerini son 2 dakikalık bir pencerede toplamak için Kinesis Data Analytics kullanıyoruz ve bir Kinesis Data Analytics uygulaması oluşturuyoruz. Kinesis Data Analytics, SQL dönüşümlerini kullanarak Kinesis Data Streams'den bir saniyeden kısa gecikmeyle verileri işleyebilir.

  2. Uygulamayı aşağıdaki kodla oluşturun:
    kda_client = boto3.client('kinesisanalytics')
    
    sql_code = '''
    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
        customer_id VARCHAR(8),
        sum_activity_weight_last_2m INTEGER,
        avg_product_health_index_last_2m DOUBLE);
    
    CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT
        STREAM CUSTOMER_ID,
        SUM(ACTIVITY_WEIGHT) AS sum_activity_weight_last_2m,
        AVG(PRODUCT_HEALTH_INDEX) AS avg_product_health_index_last_2m
    FROM
        "SOURCE_SQL_STREAM_001"
    WINDOWED BY STAGGER (    PARTITION BY CUSTOMER_ID RANGE INTERVAL '2' MINUTE);
    '''

  3. Kinesis veri akışındaki verilerin Kinesis Data Analytics uygulamasındaki SQL sorguları için nasıl kullanılabilir hale getirildiğini tanımlamak için aşağıdaki giriş şemasını kullanın:
    kda_input_schema = [{
                    'NamePrefix': 'SOURCE_SQL_STREAM',
                    'KinesisStreamsInput': {
                           'ResourceARN': stream_arn,
                           'RoleARN': role
                    },
                    'InputSchema': {
                          'RecordFormat': {
                              'RecordFormatType': 'JSON',
                              'MappingParameters': {
                                  'JSONMappingParameters': {
                                      'RecordRowPath': '$'
                                  }
                              },
                          },
                          'RecordEncoding': 'UTF-8',
                          'RecordColumns': [
                              {'Name': 'EVENT_TIME',  'Mapping': '$.event_time',   'SqlType': 'TIMESTAMP'},
                              {'Name': 'CUSTOMER_ID','Mapping': '$.customer_id', 'SqlType': 'VARCHAR(8)'},
                              {'Name': 'PRODUCT_ID', 'Mapping': '$.product_id', 'SqlType': 'VARCHAR(8)'},
                              {'Name': 'PRODUCT_CATEGORY', 'Mapping': '$.product_category', 'SqlType': 'VARCHAR(20)'},
                              {'Name': 'HEALTH_CATEGORY', 'Mapping': '$.health_category', 'SqlType': 'VARCHAR(10)'},
                              {'Name': 'ACTIVITY_TYPE', 'Mapping': '$.activity_type', 'SqlType': 'VARCHAR(10)'},
                              {'Name': 'ACTIVITY_WEIGHT', 'Mapping': '$.activity_weight', 'SqlType': 'INTEGER'},
                              {'Name': 'PRODUCT_HEALTH_INDEX', 'Mapping': '$.product_health_index', 'SqlType': 'DOUBLE'}
                          ]
                    }
                  }
                 ]

    Şimdi Kinesis Data Analytics uygulamamızdan çıktı almak ve bu verileri Feature Store'a almak için bir Lambda işlevi oluşturmamız gerekiyor. Spesifik olarak, bu verileri click stream özellik grubu.

  4. kullanarak Lambda işlevini oluşturun. lambda-stream.py GitHub'daki kod.
  5. Ardından Lambda ARN ve hedef şemasını içeren bir çıktı şeması tanımlarız:
    kda_output_schema = [{'LambdaOutput': {'ResourceARN': lambda_function_arn,
        'RoleARN': role},'Name': 'DESTINATION_SQL_STREAM','DestinationSchema':
            {'RecordFormatType': 'JSON'}}]
    print(f'KDA output schema: {kda_output_schema}')

    Ardından, Kinesis Data Analytics uygulamasını oluşturmak için API'yi çağırıyoruz. Bu uygulama, giriş, çıkış şemaları ve Lambda işlevi kullanılarak daha önce sağlanan SQL'i kullanarak Kinesis Veri Akışlarından gelen akış verilerini toplar.

  6. API'yi aşağıdaki kodla çağırın:
    creating_app = False
    while not creating_app:
        response = kda_client.create_application(ApplicationName=kinesis_analytics_application_name,
                                  Inputs=kda_input_schema,
                                  Outputs=kda_output_schema,
                                  ApplicationCode=sql_code)
        status = response['ApplicationSummary']['ApplicationStatus']
        if (status != 'READY'):
            print('Waiting for the Kinesis Analytics Application to be in READY state...')
            time.sleep(20)
        elif (status == 'READY'):
            creating_app = True
            print('READY')

  7. Uygulama durumu Ready, Kinesis Data Analytics uygulamasını başlatıyoruz:
    kda_client.start_application(ApplicationName=kinesis_analytics_application_name,
        InputConfigurations=[{'Id': '1.1',
            'InputStartingPositionConfiguration':{'InputStartingPosition':'NOW'}}])

  8. Bu atölye için, bir web sitesinde oluşturulan tıklama akışı olaylarını simüle eden ve Kinesis veri akışına gönderen iki yardımcı işlev oluşturduk:
    def generate_click_stream_data(customer_id, product_health_index_low, product_health_index_high):
        # Let's get some random product categories to help us generate click stream data
        query = f'''
        select product_category,
               product_health_index,
               product_id
        from "{products_table}"
        where product_health_index between {product_health_index_low} and {product_health_index_high}
        order by random()
        limit 1
        '''
    
        event_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=10)
        random_products_df, query = query_offline_store(products_feature_group_name, query,
                                                        sagemaker_session)
        # Pick randon activity type and activity weights
        activities = ['liked', 'added_to_cart', 'added_to_wish_list', 'saved_for_later']
        activity_weights_dict = {'liked': 1, 'added_to_cart': 2,
                                'added_to_wish_list': 1, 'saved_for_later': 2}
        random_activity_type = random.choice(activities)
        random_activity_weight = activity_weights_dict[random_activity_type]
    
        data = {
            'event_time': event_time.isoformat(),
            'customer_id': customer_id,
            'product_id': random_products_df.product_id.values[0],
            'product_category': random_products_df.product_category.values[0],
            'activity_type': random_activity_type,
            'activity_weight': random_activity_weight,
            'product_health_index': random_products_df.product_health_index.values[0]
        }
        return data
    
    def put_records_in_kinesis_stream(customer_id, product_health_index_low,product_health_index_high):
        for i in range(n_range):
            data = generate_click_stream_data(customer_id, product_health_index_low, product_health_index_high)
            print(data)
    
            kinesis_client = boto3.client('kinesis')
            response = kinesis_client.put_record(
                StreamName=kinesis_stream_name,
                Data=json.dumps(data),
                PartitionKey="partitionkey")

Şimdi, tıklama akışı verilerimizi Kinesis Data Streams ve Kinesis Data Analytics aracılığıyla Feature Store'a aktaralım. İçin inference_customer_id, 0.1-0.3'lük daha düşük bir sağlık endeksi aralığı kullanarak çerezler, dondurma ve şekerleme gibi sağlıksız ürünler için bir müşteri tarama düzenini simüle ediyoruz.

Veri akışına alınan ve Kinesis Data Analytics tarafından tek bir kayıtta toplanan ve daha sonra veri akışına alınan altı kayıt üretiyoruz. click stream Feature Store'daki özellik grubu. Bu işlem 2 dakika sürmelidir.

  1. Tıklama akışı verilerini aşağıdaki kodla alın:
    put_records_in_kinesis_stream(inference_customer_id, 0.1, 0.3)
    # It takes 2 minutes for KDA to call lambda to update feature store
    # because we are capturing 2 minute interval of customer activity
    time.sleep(120)

  2. Verilerin şimdi içinde olduğundan emin olun. click_stream özellik grubu:
    record = featurestore_runtime.get_record(
                FeatureGroupName=click_stream_feature_group_name,
                RecordIdentifierValueAsString=inference_customer_id)
    
    print(f'Online feature store data for customer id {inference_customer_id}')
    print(f'Record: {record}')

Gerçek zamanlı önerilerde bulunun

Aşağıdaki şema, gerçek zamanlı önerilerin nasıl sağlandığını gösterir.

Model eğitilip ayarlandıktan sonra, model, uygulamanın belirli bir kullanıcı için öğelerle ilgili gerçek zamanlı öneriler için bir API üzerinden sorgulayabileceği canlı bir uç noktanın arkasına dağıtılır. Ortak filtre modeli, belirli kullanıcılar için geçmiş siparişlere ve gösterimlere dayalı olarak çevrimdışı öneriler üretir. Tıklama akışı, son taramadaki tüm olayları toplar ve bu girdiyi, uygulamanın kullanıcıya görüntülenmesi için en iyi N önerilerini üreten sıralama modeline sağlar.

Bakın 4_realtime_recommendations.ipynb GitHub'da not defteri.

  1. İlk adım, bir Predictor tahminler yapmak için kullanabilmemiz için işbirlikçi filtreleme modeli uç noktamızdan (daha önce oluşturduğumuz) nesne:
    # Make sure model has finished deploying
    existing_endpoints = sagemaker_session.sagemaker_client.list_endpoints(
        NameContains=cf_model_endpoint_name, MaxResults=30)["Endpoints"]
    while not existing_endpoints:
        time.sleep(60)
        existing_endpoints = sagemaker_session.sagemaker_client.list_endpoints(
            NameContains=cf_model_endpoint_name, MaxResults=30)["Endpoints"]
    
    cf_model_predictor = sagemaker.predictor.Predictor(
                           endpoint_name=cf_model_endpoint_name,
                           sagemaker_session=sagemaker_session,
                           serializer=FMSerializer(),
                           deserializer=JSONDeserializer())

  2. Ardından, belirli bir müşteri için ilk önerilerimizi almak için önbelleğe alınan verileri bu tahmin aracına iletiriz:
    # Pass in our cached data as input to the Collaborative Filtering
    modelpredictions = cf_model_predictor.predict(cf_inference_payload)['predictions']
    
    # Add those predictions to the input DataFrame
    predictions = [prediction["score"] for prediction in predictions]
    cf_inference_df['predictions'] = predictions
    
    # Sort by predictions and take top 10
    cf_inference_df = cf_inference_df.sort_values(
        by='predictions', ascending=False).head(10).reset_index()

  3. Bu müşteri için ilk önerileri görelim:
    cf_inference_df

A indeks Müşteri Kimliği ürün kimliği belirtmek, bildirmek yaş evli Ürün adı Öngörüler
0 1 C3571 P10682 maine 35 0 mini kekler doğum günü pastası 1.65686
1 6 C3571 P6176 maine 35 0 simit "kabukları" 1.64399
2 13 C3571 P7822 maine 35 0 yağ giderici 1.62522
3 14 C3571 P1832 maine 35 0 ustaca demlenmiş kombucha 1.60065
4 5 C3571 P6247 maine 35 0 meyve yumruk kükreyen sular 1.5686
5 8 C3571 P11086 maine 35 0 badem mini fındık-ince çedar peyniri 1.54271
6 12 C3571 P15430 maine 35 0 organik domuz pirzolası baharatı 1.53585
7 4 C3571 P4152 maine 35 0 beyaz çedar tavşanları 1.52764
8 2 C3571 P16823 maine 35 0 piruet çikolatalı şekerleme krema dolgulu gofretler 1.51293
9 9 C3571 P9981 maine 35 0 kafeinsiz çay, vanilyalı chai 1.483

şimdi bir tane oluşturuyoruz Predictor Sıralama modeli uç noktamızdan (daha önce oluşturduğumuz) nesne alırız, böylece müşteri için son etkinliklerini kullanarak tahminler almak için kullanabiliriz. Yardımcı komut dosyalarını kullanarak en son davranışı simüle ettiğimizi ve Kinesis Veri Akışlarını kullanarak onu akışa aktardığımızı unutmayın. click stream özellik grubu.

  1. oluşturmak Predictor Aşağıdaki koda sahip nesne:
    # Make sure model has finished deploying
    existing_endpoints = sagemaker_session.sagemaker_client.list_endpoints(
            NameContains=ranking_model_endpoint_name, MaxResults=30)["Endpoints"]
    
    while not existing_endpoints:
        time.sleep(60)
        existing_endpoints = sagemaker_session.sagemaker_client.list_endpoints(
            NameContains=ranking_model_endpoint_name, MaxResults=30)["Endpoints"]
    
    ranking_model_predictor = sagemaker.predictor.
                            Predictor(endpoint_name=ranking_model_endpoint_name,
                            sagemaker_session=sagemaker_session,
                            serializer = CSVSerializer())

  2. Sıralama modelinin girdisini oluşturmak için, eğitimde yaptığımız gibi ürün kategorilerini one-hot kodlamamız gerekiyor:
    query = f'''select product_categoryfrom "{products_table}"order by product_category'''
    
    product_categories_df, query = query_offline_store(
                                    products_feature_group_name, query,sagemaker_session)
    
    one_hot_cat_features = product_categories_df.product_category.unique()
    
    df_one_hot_cat_features = pd.DataFrame(one_hot_cat_features)
    
    df_one_hot_cat_features.columns = ['product_category']
    df_one_hot_cat_features = pd.concat([df_one_hot_cat_features,
       pd.get_dummies(df_one_hot_cat_features['product_category'], prefix='cat')],axis=1)

    Şimdi, işbirlikçi filtreleme modelinden çıktıyı almak ve onu tek-sıcak kodlanmış ürün kategorileri ve tıklama akışı özellik grubumuzdaki gerçek zamanlı tıklama akışı verileriyle birleştirmek için bir işlev oluşturuyoruz, çünkü bu veriler önerilen ürünlerin sıralamasını etkileyecektir. Aşağıdaki diyagram bu işlemi göstermektedir.

  3. Aşağıdaki kodla işlevi oluşturun:
    def get_ranking_model_input_data(df, df_one_hot_cat_features):
        product_category_list = []
        product_health_index_list = []
    
        customer_id = df.iloc[0]['customer_id']
        # Get customer features from customers_feature_group_name
        customer_record = featurestore_runtime.get_record(FeatureGroupName=customers_feature_group_name,
                                                          RecordIdentifierValueAsString=customer_id,
                                                          FeatureNames=['customer_health_index'])
    
        customer_health_index = customer_record['Record'][0]['ValueAsString']
    
        # Get product features (instead of looping, you can optionally use
        # the `batch_get_record` Feature Store API)
        for index, row_tuple in df.iterrows():
    
            product_id = row_tuple['product_id']
    
            # Get product features from products_feature_group_name
            product_record = featurestore_runtime.get_record(FeatureGroupName=products_feature_group_name,
                                                             RecordIdentifierValueAsString=product_id,
                                                             FeatureNames=['product_category',
                                                                           'product_health_index'])
    
            product_category = product_record['Record'][0]['ValueAsString']
            product_health_index = product_record['Record'][1]['ValueAsString']
    
            product_category_list.append(product_category)
            product_health_index_list.append(product_health_index)
    
    
    
        # Get click stream features from customers_click_stream_feature_group_name
        click_stream_record = featurestore_runtime.get_record(FeatureGroupName=click_stream_feature_group_name,
                                                              RecordIdentifierValueAsString=customer_id,
                                                              FeatureNames=['sum_activity_weight_last_2m',
                                                                      'avg_product_health_index_last_2m'])
    
        # Calculate healthy_activity_last_2m as this will influence ranking as well
        sum_activity_weight_last_2m = click_stream_record['Record'][0]['ValueAsString']
        avg_product_health_index_last_2m = click_stream_record['Record'][1]['ValueAsString']
        healthy_activity_last_2m = int(sum_activity_weight_last_2m) * float(avg_product_health_index_last_2m)
    
        data = {'healthy_activity_last_2m': healthy_activity_last_2m,
                'product_health_index': product_health_index_list,
                'customer_health_index': customer_health_index,
                'product_category': product_category_list}
    
        ranking_inference_df = pd.DataFrame(data)
        ranking_inference_df = ranking_inference_df.merge(df_one_hot_cat_features, on='product_category',
                                                          how='left')
        del ranking_inference_df['product_category']
    
        return ranking_inference_d

  4. İşbirlikçi filtreleme öngörücüsünden önerilen ürünlerin ilk listesindeki sıralamayı etkilemek için Feature Store'a aktarılan verileri kullanarak gerçek zamanlı kişiselleştirilmiş ürün önerileri almak için oluşturduğumuz işlevi çağırarak her şeyi bir araya getirelim:
    # Construct input data for the ranking model
    ranking_inference_df = get_ranking_model_input_data(
                                cf_inference_df, df_one_hot_cat_features)
    
    # Get our ranked product recommendations and attach the predictions to the model input
    ranking_inference_df['propensity_to_buy'] = ranking_model_predictor.predict(
                            ranking_inference_df.to_numpy()).decode('utf-8').split(',')

  5. Artık kişiselleştirilmiş dereceli önerilerimiz olduğuna göre, önerilen ilk beş ürünün neler olduğuna bakalım:
    # Join all the data back together for inspection
    personalized_recommendations = pd.concat([cf_inference_df[['customer_id',
    'product_id', 'product_name']],ranking_inference_df[['propensity_to_buy']]], axis=1)
    
    # And sort by propensity to buy
    personalized_recommendations.sort_values(by='propensity_to_buy',
        ascending=False)[['product_id','product_name']].reset_index(drop=True).head(5)

Temizlemek

Bu çözümü kullanmayı bitirdiğinizde, 5_cleanup.ipynb Bu gönderinin bir parçası olarak oluşturduğunuz kaynakları temizlemek için not defteri.

Sonuç

Bu gönderide, bir öneri modeline yönelik eğitimi hızlandırmak ve son davranışsal olaylara dayalı tahminlerin doğruluğunu artırmak için SageMaker Özellik Mağazasını kullandık. Özellik grupları ile çevrimdışı ve çevrimiçi mağazalar kavramlarını ve bunların birlikte nasıl çalıştıklarını, işletmelerin makine öğrenimi ile karşılaştıkları ortak zorlukları nasıl çözdüklerini ve öneri sistemleri gibi karmaşık kullanım durumlarını nasıl çözdüklerini tartıştık. Bu gönderi, atölye AWS re:Invent 2021'de canlı olarak gerçekleştirildi. Okuyucuları Feature Store'un tasarımını ve dahili çalışmalarını kavramak için bu gönderiyi kullanmaya ve atölyeyi denemeye teşvik ediyoruz.


Yazar Hakkında

Arnab Sinha AWS için Kıdemli Çözüm Mimarıdır ve müşterilerin veri merkezi geçişleri, dijital dönüşüm ve uygulama modernizasyonu, büyük veri analitiği ve AIML genelinde iş sonuçlarını destekleyen ölçeklenebilir çözümler tasarlamasına ve oluşturmasına yardımcı olmak için Saha CTO'su olarak görev yapar. Perakende, imalat, sağlık ve yaşam bilimleri ve tarım dahil olmak üzere çeşitli sektörlerdeki müşterileri desteklemiştir. Arnab, ML Specialty Certification da dahil olmak üzere dokuz AWS Sertifikasına sahiptir. AWS'ye katılmadan önce Arnab, 21 yılı aşkın bir süredir teknoloji lideri, Baş Kurumsal Mimar ve yazılım mühendisiydi.

 Bobby Lindsey Amazon Web Services'de Makine Öğrenimi Uzmanıdır. On yılı aşkın bir süredir teknolojinin içinde, çeşitli teknolojileri ve birden çok rolü kapsıyor. Şu anda yazılım mühendisliği, DevOps ve makine öğrenimi alanındaki geçmişini birleştirerek müşterilerin makine öğrenimi iş akışlarını uygun ölçekte sunmasına yardımcı olmaya odaklanmıştır. Boş zamanlarında okumaktan, araştırmaktan, yürüyüş yapmaktan, bisiklete binmekten ve patika koşularından hoşlanır.

vikram elango Virginia ABD merkezli Amazon Web Services'de AI/ML Uzman Çözüm Mimarıdır. Vikram, finans ve sigorta sektörü müşterilerine, makine öğrenimi uygulamalarını geniş ölçekte oluşturmak ve dağıtmak için tasarım, düşünce liderliği ile yardımcı olur. Şu anda kurum genelinde doğal dil işleme, sorumlu yapay zeka, çıkarım optimizasyonu ve makine öğrenimini ölçeklendirmeye odaklanıyor. Boş zamanlarında ailesiyle seyahat etmeyi, yürüyüş yapmayı, yemek yapmayı ve kamp yapmayı sever.

Mark Roy AWS için Başlıca Makine Öğrenimi Mimarıdır ve müşterilerin AI / ML çözümleri tasarlamasına ve oluşturmasına yardımcı olur. Mark'ın çalışması, temel ilgi alanı bilgisayarla görme, derin öğrenme ve kurum genelinde ML'yi ölçeklendirmeye yönelik çok çeşitli makine öğrenimi kullanım örneklerini kapsar. Sigorta, finansal hizmetler, medya ve eğlence, sağlık hizmetleri, kamu hizmetleri ve imalat dahil birçok sektördeki şirketlere yardım etti. Mark, ML Uzmanlık Sertifikası da dahil olmak üzere altı AWS sertifikasına sahiptir. Mark, AWS'ye katılmadan önce, 25 yılı finansal hizmetler dahil olmak üzere 19 yılı aşkın süredir mimar, geliştirici ve teknoloji lideriydi.

spot_img

En Son İstihbarat

spot_img