شعار زيفيرنت

أفضل الممارسات لتنفيذ التحليلات في الوقت الفعلي تقريبًا باستخدام Amazon Redshift Streaming Ingestion مع Amazon MSK | خدمات الويب الأمازون

التاريخ:

الأمازون الأحمر هو مستودع بيانات سحابي مُدار بالكامل وقابل للتطوير يعمل على تسريع وقتك للوصول إلى الرؤى من خلال تحليلات سريعة ومباشرة وآمنة على نطاق واسع. يعتمد عشرات الآلاف من العملاء على Amazon Redshift لتحليل الإكزا بايت من البيانات وتشغيل الاستعلامات التحليلية المعقدة، مما يجعله مستودع البيانات السحابية الأكثر استخدامًا. يمكنك تشغيل التحليلات وتوسيع نطاقها في ثوانٍ على جميع بياناتك، دون الحاجة إلى إدارة البنية الأساسية لمستودع البيانات لديك.

يمكنك استخدام الأمازون الانزياح الأحمر المتدفق القدرة على تحديث قواعد بيانات التحليلات الخاصة بك في الوقت الحقيقي تقريبًا. يعمل استيعاب تدفق Amazon Redshift على تبسيط مسارات البيانات من خلال السماح لك بإنشاء طرق عرض مادية مباشرةً أعلى تدفقات البيانات. باستخدام هذه الإمكانية في Amazon Redshift، يمكنك استخدام لغة الاستعلام الهيكلية (SQL) للاتصال بالبيانات واستيعابها مباشرة من تدفقات البيانات، مثل الأمازون كينسيس دفق البيانات or Amazon Managed Streaming لأباتشي كافكا (Amazon MSK) تدفقات البيانات، وسحب البيانات مباشرة إلى Amazon Redshift.

في هذا المنشور، نناقش أفضل الممارسات لتنفيذ التحليلات في الوقت الفعلي تقريبًا باستخدام تدفق Amazon Redshift المتدفق مع Amazon MSK.

نظرة عامة على الحل

نحن نسير عبر مسار مثال لاستيعاب البيانات من موضوع MSK في Amazon Redshift باستخدام استيعاب تدفق Amazon Redshift. نعرض أيضًا كيفية إلغاء تداخل بيانات JSON باستخدام التدوين النقطي في Amazon Redshift. يوضح الرسم البياني التالي بنية الحل لدينا.

يتكون تدفق العملية من الخطوات التالية:

  1. قم بإنشاء عرض متدفق في مجموعة Redshift الخاصة بك لاستهلاك بيانات البث المباشر من موضوعات MSK.
  2. استخدم الإجراء المخزن لتنفيذ التقاط بيانات التغيير (CDC) باستخدام المجموعة الفريدة من Kafka Partition وKafka Offset على مستوى السجل لموضوع MSK الذي تم استيعابه.
  3. قم بإنشاء جدول يواجه المستخدم في مجموعة Redshift واستخدم التدوين النقطي لإلغاء تداخل مستند JSON من العرض المادي المتدفق في أعمدة بيانات الجدول. يمكنك تحميل البيانات الجديدة بشكل مستمر عن طريق استدعاء الإجراء المخزن على فترات زمنية منتظمة.
  4. إنشاء الاتصال بين أمازون QuickSight لوحة القيادة وAmazon Redshift لتقديم التصورات والرؤى.

كجزء من هذه التدوينة، نناقش أيضًا المواضيع التالية:

  • خطوات تكوين البث عبر الحسابات من Amazon MSK إلى Amazon Redshift
  • أفضل الممارسات لتحقيق الأداء الأمثل من خلال دفق العروض الفعلية
  • تقنيات المراقبة لتتبع حالات الفشل في استيعاب تدفق Amazon Redshift

المتطلبات الأساسية المسبقة

يجب أن يكون لديك ما يلي:

  • حساب AWS.
  • أحد الموارد التالية، اعتمادًا على حالة الاستخدام الخاصة بك:
  • مجموعة MSK. للحصول على التعليمات، راجع قم بإنشاء مجموعة Amazon MSK.
  • موضوع في مجموعة MSK الخاصة بك حيث يمكن لمنتج البيانات الخاص بك نشر البيانات.
  • منتج بيانات لكتابة البيانات إلى الموضوع في مجموعة MSK الخاصة بك.

اعتبارات أثناء إعداد موضوع MSK الخاص بك

ضع في اعتبارك الاعتبارات التالية عند تكوين موضوع MSK الخاص بك:

  • تأكد من أن اسم موضوع MSK الخاص بك لا يزيد عن 128 حرفًا.
  • حتى كتابة هذه السطور، لا يمكن الاستعلام مباشرة عن سجلات MSK التي تحتوي على بيانات مضغوطة في Amazon Redshift. لا يدعم Amazon Redshift أي طرق أصلية لإلغاء الضغط للبيانات المضغوطة من جانب العميل في موضوع MSK.
  • تابعني: أفضل الممارسات أثناء إعداد مجموعة MSK الخاصة بك.
  • راجع عملية استيعاب البث القيود لأية اعتبارات أخرى.

إعداد البث المباشر

لإعداد البث المباشر ، أكمل الخطوات التالية:

  1. إعداد إدارة الهوية والوصول AWS دور (IAM) وسياسة الثقة المطلوبة للعرض المتدفق. للحصول على التعليمات، راجع إعداد IAM وتنفيذ البث المتدفق من Kafka.
  2. تأكد من تدفق البيانات إلى موضوع MSK الخاص بك باستخدام الأمازون CloudWatch المقاييس (فمثلا، BytesOutPerSec).
  3. قم بتشغيل محرر الاستعلام v2 من وحدة تحكم Amazon Redshift أو استخدم عميل SQL المفضل لديك للاتصال بمجموعة Redshift الخاصة بك للخطوات التالية. تم تشغيل الخطوات التالية في محرر الاستعلام v2.
  4. قم بإنشاء مخطط خارجي لتعيينه إلى مجموعة MSK. استبدل دور IAM ARN ومجموعة MSK ARN في العبارة التالية:
    CREATE EXTERNAL SCHEMA custschema
    FROM MSK
    IAM_ROLE  'iam-role-arn' 
    AUTHENTICATION { none | iam }
    CLUSTER_ARN 'msk-cluster-arn';
    

  5. اختياريًا، إذا كانت أسماء موضوعاتك حساسة لحالة الأحرف، فستحتاج إلى تمكينها enable_case_sensitive_identifier لتتمكن من الوصول إليها في Amazon Redshift. لاستخدام معرفات حساسة لحالة الأحرف، قم بتعيين enable_case_sensitive_identifier إلى true على مستوى الجلسة أو المستخدم أو المجموعة:
    SET ENABLE_CASE_SENSITIVE_IDENTIFIER TO TRUE;

  6. قم بإنشاء عرض مادي لاستهلاك بيانات الدفق من موضوع MSK:
    CREATE MATERIALIZED VIEW Orders_Stream_MV AS
    SELECT kafka_partition, 
     kafka_offset, 
     refresh_time,
     JSON_PARSE(kafka_value) as Data
    FROM custschema."ORDERTOPIC"
    WHERE CAN_JSON_PARSE(kafka_value);
    

عمود البيانات التعريفية kafka_value التي تصل من Amazon MSK يتم تخزينها فيها فاريت التنسيق في Amazon Redshift. لهذا المنصب، يمكنك استخدام JSON_PARSE وظيفة للتحويل kafka_value ل نوع البيانات سوبر. يمكنك أيضًا استخدام CAN_JSON_PARSE تعمل في حالة المرشح لتخطي سجلات JSON غير الصالحة والحماية من الأخطاء الناجمة عن فشل تحليل JSON. نناقش كيفية تخزين البيانات غير الصالحة لتصحيح الأخطاء في المستقبل لاحقًا في هذا المنشور.

  1. قم بتحديث العرض المادي المتدفق، مما يؤدي إلى تشغيل Amazon Redshift للقراءة من موضوع MSK وتحميل البيانات إلى العرض المادي:
    REFRESH MATERIALIZED VIEW Orders_Stream_MV;

يمكنك أيضًا ضبط طريقة العرض المتدفقة لاستخدام إمكانيات التحديث التلقائي. سيؤدي هذا إلى تحديث العرض الفعلي تلقائيًا عند وصول البيانات إلى الدفق. يرى إنشاء عرض ملموس للحصول على تعليمات لإنشاء عرض حقيقي مع التحديث التلقائي.

قم بفك مستند JSON

فيما يلي عينة من مستند JSON الذي تم استيعابه من موضوع MSK إلى عمود البيانات من النوع SUPER في العرض الفعلي المتدفق Orders_Stream_MV:

{
   "EventType":"Orders",
   "OrderID":"103",
   "CustomerID":"C104",
   "CustomerName":"David Smith",
   "OrderDate":"2023-09-02",
   "Store_Name":"Store-103",
   "ProductID":"P004",
   "ProductName":"Widget-X-003",
   "Quatity":"5",
   "Price":"2500",
   "OrderStatus":"Initiated"
}

استخدم التدوين النقطي كما هو موضح في الكود التالي لإلغاء تداخل حمولة JSON الخاصة بك:

SELECT 
    data."OrderID"::INT4 as OrderID
    ,data."ProductID"::VARCHAR(36) as ProductID
    ,data."ProductName"::VARCHAR(36) as ProductName
    ,data."CustomerID"::VARCHAR(36) as CustomerID
    ,data."CustomerName"::VARCHAR(36) as CustomerName
    ,data."Store_Name"::VARCHAR(36) as Store_Name
    ,data."OrderDate"::TIMESTAMPTZ as OrderDate
    ,data."Quatity"::INT4 as Quatity
    ,data."Price"::DOUBLE PRECISION as Price
    ,data."OrderStatus"::VARCHAR(36) as OrderStatus
    ,"kafka_partition"::BIGINT  
    ,"kafka_offset"::BIGINT
FROM orders_stream_mv;

توضح لقطة الشاشة التالية كيف تبدو النتيجة بعد فك التداخل.

إذا كان لديك مصفوفات في مستند JSON الخاص بك، ففكر في إلغاء تداخل بياناتك باستخدام بارتيQL البيانات في Amazon Redshift. لمزيد من المعلومات، راجع القسم قم بفك مستند JSON في المنشور تحليلات في الوقت الفعلي تقريبًا باستخدام تدفق تدفق Amazon Redshift مع Amazon Kinesis Data Streams وAmazon DynamoDB.

استراتيجية تحميل البيانات المتزايدة

أكمل الخطوات التالية لتنفيذ تحميل بيانات تزايدي:

  1. قم بإنشاء جدول يسمى الطلبات في Amazon Redshift، والذي سيستخدمه المستخدمون النهائيون للتصور وتحليل الأعمال:
    CREATE TABLE public.Orders (
        orderid integer ENCODE az64,
        productid character varying(36) ENCODE lzo,
        productname character varying(36) ENCODE lzo,
        customerid character varying(36) ENCODE lzo,
        customername character varying(36) ENCODE lzo,
        store_name character varying(36) ENCODE lzo,
        orderdate timestamp with time zone ENCODE az64,
        quatity integer ENCODE az64,
        price double precision ENCODE raw,
        orderstatus character varying(36) ENCODE lzo
    ) DISTSTYLE AUTO;
    

بعد ذلك، يمكنك إنشاء إجراء مخزن يسمى SP_Orders_Load لتنفيذ CDC من عرض متدفق وتحميله إلى النهائي Orders طاولة. يمكنك استخدام مزيج من Kafka_Partition و Kafka_Offset متاح في العرض المتدفق كأعمدة نظام لتنفيذ CDC. سيكون الجمع بين هذين العمودين فريدًا دائمًا ضمن موضوع MSK، مما يضمن عدم فقدان أي من السجلات أثناء العملية. يحتوي الإجراء المخزن على المكونات التالية:

  • لاستخدام معرفات حساسة لحالة الأحرف، قم بتعيين enable_case_sensitive_identifier إلى true إما على مستوى الجلسة أو المستخدم أو المجموعة.
  • قم بتحديث العرض الفعلي للبث يدويًا إذا لم يتم تمكين التحديث التلقائي.
  • قم بإنشاء جدول تدقيق يسمى Orders_Streaming_Audit إذا لم يكن موجودًا لتتبع الإزاحة الأخيرة للقسم الذي تم تحميله في جدول الطلبات أثناء التشغيل الأخير للإجراء المخزن.
  • قم بفك التداخل وإدراج البيانات الجديدة أو المتغيرة فقط في جدول مرحلي يسمى Orders_Staging_Table، القراءة من وجهة النظر المتدفقة Orders_Stream_MV، حيث Kafka_Offset أكبر من المعالجة الأخيرة Kafka_Offset مسجلة في جدول المراجعة Orders_Streaming_Audit ل Kafka_Partition يتم معالجتها.
  • عند التحميل لأول مرة باستخدام هذا الإجراء المخزن، لن تكون هناك أي بيانات في الملف Orders_Streaming_Audit الجدول وجميع البيانات من Orders_Stream_MV سيتم تحميلها في جدول الطلبات.
  • أدخل فقط الأعمدة ذات الصلة بالأعمال في الواجهة التي تواجه المستخدم Orders الجدول، واختيار من الجدول التدريج Orders_Staging_Table.
  • أدخل الحد الأقصى Kafka_Offset لكل محملة Kafka_Partition في جدول التدقيق Orders_Streaming_Audit

لقد أضفنا جدول التدريج المتوسط Orders_Staging_Table في هذا الحل للمساعدة في تصحيح الأخطاء في حالة حدوث أعطال غير متوقعة وإمكانية التتبع. تخطي خطوة التدريج والتحميل مباشرة إلى الجدول النهائي من Orders_Stream_MV يمكن أن يوفر زمن وصول أقل اعتمادًا على حالة الاستخدام الخاصة بك.

  1. قم بإنشاء الإجراء المخزن بالكود التالي:
    CREATE OR REPLACE PROCEDURE SP_Orders_Load()
        AS $$
        BEGIN
    
        SET ENABLE_CASE_SENSITIVE_IDENTIFIER TO TRUE;
        REFRESH MATERIALIZED VIEW Orders_Stream_MV;
    
        --create an audit table if not exists to keep track of Max Offset per Partition that was loaded into Orders table  
    
        CREATE TABLE IF NOT EXISTS Orders_Streaming_Audit
        (
        "kafka_partition" BIGINT,
        "kafka_offset" BIGINT
        )
        SORTKEY("kafka_partition", "kafka_offset"); 
    
        DROP TABLE IF EXISTS Orders_Staging_Table;  
    
        --Insert only newly available data into staging table from streaming View based on the max offset for new/existing partitions
      --When loading for 1st time i.e. there is no data in Orders_Streaming_Audit table then all the data gets loaded from streaming View  
        CREATE TABLE Orders_Staging_Table as 
        SELECT 
        data."OrderID"."N"::INT4 as OrderID
        ,data."ProductID"."S"::VARCHAR(36) as ProductID
        ,data."ProductName"."S"::VARCHAR(36) as ProductName
        ,data."CustomerID"."S"::VARCHAR(36) as CustomerID
        ,data."CustomerName"."S"::VARCHAR(36) as CustomerName
        ,data."Store_Name"."S"::VARCHAR(36) as Store_Name
        ,data."OrderDate"."S"::TIMESTAMPTZ as OrderDate
        ,data."Quatity"."N"::INT4 as Quatity
        ,data."Price"."N"::DOUBLE PRECISION as Price
        ,data."OrderStatus"."S"::VARCHAR(36) as OrderStatus
        , s."kafka_partition"::BIGINT , s."kafka_offset"::BIGINT
        FROM Orders_Stream_MV s
        LEFT JOIN (
        SELECT
        "kafka_partition",
        MAX("kafka_offset") AS "kafka_offset"
        FROM Orders_Streaming_Audit
        GROUP BY "kafka_partition"
        ) AS m
        ON nvl(s."kafka_partition",0) = nvl(m."kafka_partition",0)
        WHERE
        m."kafka_offset" IS NULL OR
        s."kafka_offset" > m."kafka_offset";
    
        --Insert only business relevant column to final table selecting from staging table
        Insert into Orders 
        SELECT 
        OrderID
        ,ProductID
        ,ProductName
        ,CustomerID
        ,CustomerName
        ,Store_Name
        ,OrderDate
        ,Quatity
        ,Price
        ,OrderStatus
        FROM Orders_Staging_Table;
    
        --Insert the max kafka_offset for every loaded Kafka partitions into Audit table 
        INSERT INTO Orders_Streaming_Audit
        SELECT kafka_partition, MAX(kafka_offset)
        FROM Orders_Staging_Table
        GROUP BY kafka_partition;   
    
        END;
        $$ LANGUAGE plpgsql;
    

  2. قم بتشغيل الإجراء المخزن لتحميل البيانات إلى الملف Orders الجدول:
    call SP_Orders_Load();

  3. التحقق من صحة البيانات في جدول الطلبات.

إنشاء استيعاب متدفق عبر الحسابات

إذا كانت مجموعة MSK الخاصة بك تنتمي إلى حساب مختلف، فأكمل الخطوات التالية لإنشاء أدوار IAM لإعداد عرض البث عبر الحسابات. لنفترض أن مجموعة Redshift موجودة في الحساب A ومجموعة MSK موجودة في الحساب B، كما هو موضح في الرسم البياني التالي.

أكمل الخطوات التالية:

  1. في الحساب B، قم بإنشاء دور IAM يسمى MyRedshiftMSKRole الذي يسمح لـ Amazon Redshift (الحساب A) بالتواصل مع مجموعة MSK (الحساب B) المسماة MyTestCluster. اعتمادًا على ما إذا كانت مجموعة MSK الخاصة بك تستخدم مصادقة IAM أو الوصول غير المصادق للاتصال، فإنك تحتاج إلى إنشاء دور IAM باستخدام إحدى السياسات التالية:
    • IAM policAmazonAmazon MSK يستخدم الوصول غير المصادق:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftMSKPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka:GetBootstrapBrokers"
                  ],
                  "Resource": "*"
              }
          ]
      }

    • سياسة IAM لـ Amazon MSK عند استخدام مصادقة IAM:
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "RedshiftMSKIAMpolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka-cluster:ReadData",
                      "kafka-cluster:DescribeTopic",
                      "kafka-cluster:Connect"
                  ],
                  "Resource": [
                      "arn:aws:kafka:us-east-1:0123456789:cluster/MyTestCluster/abcd1234-0123-abcd-5678-1234abcd-1",
                      "arn:aws:kafka:us-east-1:0123456789:topic/MyTestCluster/*"
                  ]
              },
              {
                  "Sid": "RedshiftMSKPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kafka:GetBootstrapBrokers"
                  ],
                  "Resource": "*"
              }
          ]
      }
      

يتيح قسم الموارد في المثال السابق الوصول إلى كافة المواضيع الموجودة في الملف MyTestCluster مجموعة MSK. إذا كنت بحاجة إلى تقييد دور IAM على موضوعات محددة، فستحتاج إلى استبدال مورد الموضوع بسياسة موارد أكثر تقييدًا.

  1. بعد إنشاء دور IAM في الحساب B، قم بتدوين دور IAM ARN (على سبيل المثال، arn:aws:iam::0123456789:role/MyRedshiftMSKRole).
  2. في الحساب أ، قم بإنشاء دور IAM قابل للتخصيص في Redshift يسمى MyRedshiftRole، والتي ستفترضها Amazon Redshift عند الاتصال بـ Amazon MSK. يجب أن يكون للدور سياسة مثل ما يلي، والتي تسمح لدور Amazon Redshift IAM في الحساب A بتولي دور Amazon MSK في الحساب B:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "RedshiftMSKAssumePolicy",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::0123456789:role/MyRedshiftMSKRole"        
           }
        ]
    }
    

  3. لاحظ دور ARN لدور Amazon Redshift IAM (على سبيل المثال، arn:aws:iam::9876543210:role/MyRedshiftRole).
  4. ارجع إلى الحساب "ب" وأضف هذا الدور في سياسة الثقة الخاصة بدور IAM arn:aws:iam::0123456789:role/MyRedshiftMSKRole للسماح للحساب B بالثقة في دور IAM من الحساب A. يجب أن تبدو سياسة الثقة مثل الكود التالي:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": "sts:AssumeRole",
          "Principal": {
            "AWS": "arn:aws:iam::9876543210:role/MyRedshiftRole"
          }
        }
      ]
    } 
    

  5. قم بتسجيل الدخول إلى وحدة تحكم Amazon Redshift كحساب أ.
  6. قم بتشغيل محرر الاستعلام v2 أو عميل SQL المفضل لديك وقم بتشغيل العبارات التالية للوصول إلى موضوع MSK في الحساب B. للتعيين إلى مجموعة MSK، قم بإنشاء مخطط خارجي باستخدام تسلسل الأدوار من خلال تحديد ARNs لدور IAM، مفصولة بفاصلة دون أي مسافات حولها. يأتي الدور المرتبط بمجموعة Redshift أولاً في السلسلة.
    CREATE EXTERNAL SCHEMA custschema
    FROM MSK
    IAM_ROLE  
    'arn:aws:iam::9876543210:role/MyRedshiftRole,arn:aws:iam::0123456789:role/MyRedshiftMSKRole' 
    AUTHENTICATION { none | iam }
    CLUSTER_ARN 'msk-cluster-arn'; --replace with ARN of MSK cluster 
    

اعتبارات الأداء

ضع في اعتبارك اعتبارات الأداء التالية:

  • اجعل العرض المادي المتدفق بسيطًا وانقل التحويلات مثل فك التداخل والتجميع وتعبيرات الحالة إلى خطوة لاحقة - على سبيل المثال، عن طريق إنشاء عرض متحقق آخر أعلى العرض المتحقق المتدفق.
  • فكر في إنشاء عرض متدفق واحد فقط في مجموعة Redshift واحدة أو مجموعة عمل لموضوع MSK محدد. يمكن أن يؤدي إنشاء طرق عرض مادية متعددة لكل موضوع MSK إلى إبطاء أداء الاستيعاب لأن كل عرض مادي يصبح مستهلكًا لذلك الموضوع ويشارك النطاق الترددي لـ Amazon MSK لهذا الموضوع. يمكن مشاركة بيانات البث المباشر في عرض متدفق عبر مجموعات Redshift المتعددة أو مجموعات عمل Redshift Serverless باستخدام تبادل البيانات.
  • أثناء تحديد طريقة العرض المتدفقة، تجنب استخدام Json_Extract_Path_Text لتمزيق البيانات مسبقًا، لأن Json_extract_path_text يعمل على صف البيانات تلو الآخر، مما يؤثر بشكل كبير على إنتاجية الاستيعاب. من الأفضل الحصول على البيانات كما هي من الدفق ثم تمزيقها لاحقًا.
  • حيثما أمكن، فكر في تخطي مفتاح الفرز في العرض الفعلي المتدفق لتسريع سرعة العرض. عندما يكون للعرض المتدفق مفتاح فرز، ستحدث عملية فرز مع كل دفعة من البيانات المستوعبة من الدفق. يتم سماع أداء الفرز اعتمادًا على نوع بيانات مفتاح الفرز وعدد أعمدة مفاتيح الفرز وكمية البيانات التي يتم استيعابها في كل دفعة. يمكن أن تؤدي خطوة الفرز هذه إلى زيادة زمن الوصول قبل إتاحة بيانات التدفق للاستعلام عنها. يجب أن تزن ما هو أكثر أهمية: زمن الوصول عند الاستيعاب أو زمن الوصول عند الاستعلام عن البيانات.
  • للحصول على أداء محسّن للعرض المادي المتدفق ولتقليل استخدام التخزين، قم أحيانًا بإزالة البيانات من العرض المادي باستخدام حذف, بترالطرق أو تغيير الجدول إلحاق.
  • إذا كنت بحاجة إلى استيعاب موضوعات MSK متعددة بالتوازي في Amazon Redshift، فابدأ بعدد أقل من طرق العرض المتدفقة واستمر في إضافة المزيد من طرق العرض المادية لتقييم أداء الاستيعاب الإجمالي داخل المجموعة أو مجموعة العمل.
  • يمكن أن تساعد زيادة عدد العقد في مجموعة Redshift المتوفرة أو وحدة RPU الأساسية لمجموعة عمل Redshift Serverless في تعزيز أداء الاستيعاب للعرض المتدفق. للحصول على الأداء الأمثل، يجب أن تهدف إلى الحصول على عدد من الشرائح في مجموعة Redshift المتوفرة لديك يساوي عدد الأقسام في موضوع MSK الخاص بك، أو 8 RPU لكل أربعة أقسام في موضوع MSK الخاص بك.

تقنيات الرصد

سيتم تخطي السجلات الموجودة في الموضوع والتي تتجاوز حجم عمود العرض الفعلي المستهدف في وقت العرض. سيتم تسجيل السجلات التي تم تخطيها بواسطة تحديث العرض الفعلي في SYS_STREAM_SCAN_ERRORS جدول النظام.

ستؤدي الأخطاء التي تحدث عند معالجة سجل بسبب عملية حسابية أو تحويل نوع بيانات أو أي منطق آخر في تعريف العرض الفعلي إلى فشل تحديث العرض الفعلي حتى تنتهي صلاحية السجل المخالف من الموضوع. لتجنب هذه الأنواع من المشكلات، اختبر منطق تعريف العرض الفعلي الخاص بك بعناية؛ وإلا، قم بوضع السجلات في عمود VARBYTE الافتراضي ومعالجتها لاحقًا.

فيما يلي طرق عرض المراقبة المتاحة:

  • SYS_MV_REFRESH_HISTORY - استخدم طريقة العرض هذه لجمع معلومات حول سجل التحديث لطرق العرض المتدفقة الخاصة بك. تتضمن النتائج نوع التحديث، مثل يدوي أو تلقائي، وحالة آخر تحديث. يعرض الاستعلام التالي سجل التحديث لعرض متدفق:
    select mv_name, refresh_type, status, duration  from SYS_MV_REFRESH_HISTORY where mv_name='mv_store_sales'

  • SYS_STREAM_SCAN_ERRORS - استخدم طريقة العرض هذه للتحقق من سبب فشل تحميل السجل عبر استيعاب البث من موضوع MSK. حتى وقت كتابة هذا المنشور، عند التحميل من Amazon MSK، يقوم هذا العرض بتسجيل الأخطاء فقط عندما يكون السجل أكبر من حجم عمود العرض الفعلي. سيُظهر هذا العرض أيضًا المعرف الفريد (الإزاحة) لسجل MSK في عمود الموضع. يعرض الاستعلام التالي رمز الخطأ وسبب الخطأ عندما يتجاوز السجل الحد الأقصى للحجم:
    select mv_name, external_schema_name, stream_name, record_time, query_id, partition_id, "position", error_code, error_reason
    from SYS_STREAM_SCAN_ERRORS  where mv_name='test_mv' and external_schema_name ='streaming_schema'	;
    

  • SYS_STREAM_SCAN_STATES – استخدم طريقة العرض هذه لمراقبة عدد السجلات التي تم مسحها ضوئيًا في وقت قياسي معين. تتعقب طريقة العرض هذه أيضًا إزاحة السجل الأخير الذي تم قراءته في الدُفعة. يعرض الاستعلام التالي بيانات الموضوع لطريقة عرض مادية محددة:
    select mv_name,external_schema_name,stream_name,sum(scanned_rows) total_records,
    sum(scanned_bytes) total_bytes 
    from SYS_STREAM_SCAN_STATES where mv_name='test_mv' and external_schema_name ='streaming_schema' group by 1,2,3;
    

  • SYS_QUERY_HISTORY - استخدم طريقة العرض هذه للتحقق من المقاييس الإجمالية لتحديث العرض الفعلي المتدفق. سيؤدي هذا أيضًا إلى تسجيل الأخطاء في عمود error_message للأخطاء التي لا تظهر فيه SYS_STREAM_SCAN_ERRORS. يُظهر الاستعلام التالي الخطأ الذي تسبب في فشل تحديث طريقة العرض المتدفقة:
    select  query_id, query_type, status, query_text, error_message from sys_query_history where status='failed' and start_time>='2024-02-03 03:18:00' order by start_time desc

اعتبارات إضافية للتنفيذ

لديك خيار إنشاء عرض مادي اختياريًا أعلى العرض المادي المتدفق، مما يسمح لك بإلغاء تداخل النتائج وحسابها مسبقًا للمستخدمين النهائيين. يلغي هذا الأسلوب الحاجة إلى تخزين النتائج في جدول نهائي باستخدام الإجراء المخزن.

في هذا المنشور ، يمكنك استخدام وظيفة CAN_JSON_PARSE للحماية من أي أخطاء لاستيعاب البيانات بنجاح أكبر - في هذه الحالة، يتم تخطي سجلات التدفق التي لا يمكن تحليلها بواسطة Amazon Redshift. ومع ذلك، إذا كنت تريد تتبع سجلات الأخطاء الخاصة بك، ففكر في تخزينها في عمود باستخدام SQL التالي عند إنشاء طريقة العرض المتدفقة:

CREATE MATERIALIZED VIEW Orders_Stream_MV AS 
SELECT
kafka_partition, 
kafka_offset, 
refresh_time, 
JSON_PARSE(kafka_value) as Data 
case when CAN_JSON_PARSE(kafka_value) = true then json_parse(kafka_value) end Data,
case when CAN_JSON_PARSE(kafka_value) = false then kafka_value end Invalid_Data
FROM custschema."ORDERTOPIC";

يمكنك أن تنظر أيضا تفريغ البيانات من المنظر SYS_STREAM_SCAN_ERRORS إلى خدمة تخزين أمازون البسيطة (Amazon S3) واحصل على التنبيهات إرسال تقرير عبر البريد الإلكتروني استخدام خدمة إعلام أمازون البسيطة إشعارات (Amazon SNS) عند إنشاء كائن S3 جديد.

وأخيرًا، بناءً على متطلبات حداثة البيانات الخاصة بك، يمكنك استخدامها أمازون إيفينت بريدج لجدولة المهام في مستودع البيانات الخاص بك لاستدعاء المذكور أعلاه SP_Orders_Load الإجراء المخزن على أساس منتظم. يقوم EventBridge بذلك على فترات زمنية محددة، وقد تحتاج إلى آلية (على سبيل المثال، وظائف خطوة AWS جهاز الحالة) لمراقبة ما إذا كانت المكالمة السابقة للإجراء قد اكتملت. لمزيد من المعلومات، راجع إنشاء قاعدة Amazon EventBridge التي تعمل وفقًا لجدول زمني. يمكنك أيضًا الرجوع إلى قم بتسريع تنسيق عملية ELT باستخدام AWS Step Functions و Amazon Redshift Data API. خيار آخر هو استخدام محرر استعلام Amazon Redshift v2 لجدولة التحديث. لمزيد من التفاصيل، راجع جدولة استعلام باستخدام محرر الاستعلام v2.

وفي الختام

في هذا المنشور، ناقشنا أفضل الممارسات لتنفيذ التحليلات في الوقت الفعلي تقريبًا باستخدام تدفق تدفق Amazon Redshift مع Amazon MSK. لقد أظهرنا لك مثالاً على مسار استيعاب البيانات من موضوع MSK إلى Amazon Redshift باستخدام البث المتدفق. لقد أظهرنا أيضًا استراتيجية موثوقة لتنفيذ تحميل بيانات التدفق المتزايد إلى Amazon Redshift باستخدام Kafka Partition وKafka Offset. بالإضافة إلى ذلك، قمنا بتوضيح خطوات تكوين البث عبر الحسابات من Amazon MSK إلى Amazon Redshift وناقشنا اعتبارات الأداء لتحسين معدل العرض. أخيرًا، ناقشنا تقنيات المراقبة لتتبع حالات الفشل في استيعاب تدفق Amazon Redshift.

إذا كان لديك أي أسئلة، اتركها في قسم التعليقات.


حول المؤلف

بولومي داسغوبتا هو مهندس حلول تحليلات أقدم مع AWS. إنها متحمسة لمساعدة العملاء على بناء حلول تحليلات قائمة على السحابة لحل مشاكل أعمالهم. خارج العمل ، تحب السفر وقضاء الوقت مع أسرتها.

أديكونلي أديدوتون هو مهندس قاعدة بيانات كبير مع خدمة Amazon Redshift. لقد عمل على قواعد بيانات MPP لمدة 6 سنوات مع التركيز على ضبط الأداء. كما يقدم إرشادات لفريق التطوير لميزات الخدمة الجديدة والحالية.

بقعة_صورة

أحدث المعلومات الاستخباراتية

بقعة_صورة