شعار زيفيرنت

قم بتحميل وتحويل البيانات من Delta Lake باستخدام Amazon SageMaker Studio و Apache Spark

التاريخ:

أصبحت بحيرات البيانات هي القاعدة في الصناعة لتخزين بيانات الأعمال الهامة. الأساس المنطقي الأساسي لبحيرة البيانات هو الحصول على جميع أنواع البيانات ، من البيانات الأولية إلى البيانات المعالجة والمعالجة بعد المعالجة ، وقد تتضمن تنسيقات بيانات منظمة وغير منظمة. يسمح وجود مخزن بيانات مركزي لجميع أنواع البيانات لتطبيقات البيانات الضخمة الحديثة بتحميل وتحويل ومعالجة أي نوع من البيانات المطلوبة. تشمل الفوائد تخزين البيانات كما هي دون الحاجة إلى هيكلتها أولاً أو تحويلها. الأهم من ذلك ، تسمح بحيرات البيانات بالوصول المتحكم فيه إلى البيانات من العديد من أنواع التحليلات المختلفة وعمليات التعلم الآلي (ML) من أجل توجيه عملية صنع القرار بشكل أفضل.

قام بائعون متعددون بإنشاء هياكل بحيرة البيانات ، بما في ذلك تكوين بحيرة AWS. بالإضافة إلى ذلك ، تسمح الحلول مفتوحة المصدر للشركات بالوصول إلى البيانات وتحميلها ومشاركتها بسهولة. أحد خيارات تخزين البيانات في سحابة AWS هو دلتا ليك. تتيح مكتبة Delta Lake القراءة والكتابة بمصدر مفتوح اباتشي باركيه تنسيق الملف ، ويوفر إمكانات مثل معاملات ACID ، ومعالجة البيانات الوصفية القابلة للتطوير ، والتدفق الموحد ومعالجة البيانات المجمعة. تقدم Delta Lake واجهة برمجة تطبيقات لطبقة التخزين يمكنك استخدامها لتخزين البيانات أعلى تخزين طبقة الكائن مثل خدمة تخزين أمازون البسيطة (أمازون S3).

تقع البيانات في صميم عملية تعلم الآلة - من المستحيل تدريب النموذج التقليدي الخاضع للإشراف دون الوصول إلى البيانات التاريخية عالية الجودة ، والتي يتم تخزينها بشكل شائع في بحيرة البيانات. الأمازون SageMaker هي خدمة مُدارة بالكامل توفر طاولة عمل متعددة الاستخدامات لبناء حلول ML وتوفر أدوات مصممة للغاية لاستيعاب البيانات ومعالجة البيانات والتدريب على النموذج واستضافة النماذج. أباتشي سبارك هو العمود الفقري لمعالجة البيانات الحديثة مع واجهة برمجة تطبيقات شاملة لتحميل البيانات ومعالجتها. SageMaker لديه القدرة على تجهيز البيانات على نطاق بيتابايت باستخدام Spark لتمكين تدفقات عمل ML التي تعمل بطريقة موزعة للغاية. يسلط هذا المنشور الضوء على كيفية الاستفادة من الإمكانات التي توفرها دلتا ليك باستخدام أمازون ساجميكر ستوديو.

حل نظرة عامة

في هذا المنشور ، نصف كيفية استخدام أجهزة الكمبيوتر المحمولة SageMaker Studio لتحميل وتحويل البيانات المخزنة بتنسيق Delta Lake بسهولة. نستخدم مفكرة Jupyter قياسية لتشغيل أوامر Apache Spark التي تقرأ وتكتب بيانات الجدول بتنسيق CSV و Parquet. مكتبة مفتوحة المصدر دلتا سبارك يسمح لك بالوصول المباشر إلى هذه البيانات بتنسيقها الأصلي. تتيح لك هذه المكتبة الاستفادة من العديد من عمليات واجهة برمجة التطبيقات (API) لتطبيق تحويلات البيانات ، وإجراء تعديلات على المخطط ، واستخدام استعلامات السفر عبر الزمن أو استعلامات الطوابع الزمنية لسحب إصدار معين من البيانات.

في نموذج دفتر الملاحظات الخاص بنا ، نقوم بتحميل البيانات الأولية في Spark DataFrame ، وإنشاء جدول دلتا ، والاستعلام عنه ، وعرض سجل التدقيق ، وإظهار تطور المخطط ، وإظهار طرق مختلفة لتحديث بيانات الجدول. نحن نستخدم ال واجهة برمجة تطبيقات DataFrame من مكتبة PySpark لاستيعاب سمات مجموعة البيانات وتحويلها. نحن نستخدم ال delta-spark مكتبة لقراءة البيانات وكتابتها بتنسيق Delta Lake وللتعامل مع بنية الجدول الأساسية ، المشار إليها باسم مخطط.

نستخدم SageMaker Studio ، IDE المدمج من SageMaker ، لإنشاء وتشغيل كود Python من دفتر Jupyter. لقد أنشأنا ملف مستودع جيثب يحتوي على هذا الكمبيوتر المحمول وغيرها من الموارد لتشغيل هذا النموذج بنفسك. يوضح الكمبيوتر الدفتري بالضبط كيفية التفاعل مع البيانات المخزنة بتنسيق Delta Lake ، والذي يسمح بالوصول إلى الجداول في مكانها دون الحاجة إلى نسخ البيانات عبر مخازن بيانات مختلفة.

في هذا المثال ، نستخدم مجموعة بيانات متاحة للجمهور من نادي الإقراض التي تمثل بيانات قروض العملاء. قمنا بتنزيل ملف accepted ملف البيانات (accepted_2007_to_2018Q4.csv.gz) ، وحدد مجموعة فرعية من السمات الأصلية. مجموعة البيانات هذه متاحة تحت رخصة المشاع الإبداعي (CCO).

المتطلبات الأساسية المسبقة

يجب عليك تثبيت بعض المتطلبات الأساسية قبل استخدام delta-spark وظائف. لتلبية التبعيات المطلوبة ، يتعين علينا تثبيت بعض المكتبات في بيئة الاستوديو الخاصة بنا ، والتي تعمل كحاوية Dockerized ويمكن الوصول إليها عبر تطبيق Jupyter Gateway:

  • OpenJDK للوصول إلى Java والمكتبات المرتبطة بها
  • مكتبة PySpark (Spark for Python)
  • مكتبة دلتا سبارك مفتوحة المصدر

يمكننا استخدام أي منهما conda or pip لتثبيت هذه المكتبات ، والمتاحة للجمهور في أيٍّ منهما conda-forgeأو خوادم PyPI أو مستودعات Maven.

تم تصميم هذا الكمبيوتر المحمول ليتم تشغيله داخل SageMaker Studio. بعد تشغيل الكمبيوتر الدفتري داخل Studio ، تأكد من اختيار ملف Python 3 (علم البيانات) نوع النواة. بالإضافة إلى ذلك ، نقترح استخدام نوع مثيل به 16 جيجابايت على الأقل من ذاكرة الوصول العشوائي (مثل ml.g4dn.xlarge) ، مما يسمح لأوامر PySpark بالعمل بشكل أسرع. استخدم الأوامر التالية لتثبيت التبعيات المطلوبة ، والتي تشكل الخلايا العديدة الأولى في الكمبيوتر الدفتري:

%conda install openjdk -q -y
%pip install pyspark==3.2.0
%pip install delta-spark==1.1.0
%pip install -U "sagemaker>2.72"

بعد اكتمال أوامر التثبيت ، نحن جاهزون لتشغيل المنطق الأساسي في الكمبيوتر المحمول.

تنفيذ الحل

لتشغيل أوامر Apache Spark ، نحتاج إلى إنشاء مثيل لملف SparkSession هدف. بعد أن نقوم بتضمين أوامر الاستيراد الضرورية ، نقوم بتهيئة ملف SparkSession عن طريق تعيين بعض معلمات التكوين الإضافية. المعلمة بالمفتاح spark.jars.packages يمرر أسماء المكتبات الإضافية التي يستخدمها Spark للتشغيل delta أوامر. تجمع الأسطر الأولية التالية من التعليمات البرمجية قائمة بالحزم ، باستخدام إحداثيات Maven التقليدية (groupId:artifactId:version) ، لتمرير هذه الحزم الإضافية إلى SparkSession.

بالإضافة إلى ذلك ، المعلمات مع المفتاح spark.sql.extensions و spark.sql.catalog.spark_catalog تمكين Spark للتعامل بشكل صحيح مع وظائف Delta Lake. معلمة التكوين النهائية بالمفتاح fs.s3a.aws.credentials.provider يضيف ال ContainerCredentialsProvider class ، والتي تسمح للاستوديو بالبحث عن ملف إدارة الهوية والوصول AWS إتاحة أذونات دور (IAM) عبر بيئة الحاوية. يقوم الكود بإنشاء ملف SparkSession الكائن الذي تمت تهيئته بشكل صحيح لبيئة SageMaker Studio:

# Configure Spark to use additional library packages to satisfy dependencies # Build list of packages entries using Maven coordinates (groupId:artifactId:version)
pkg_list = []
pkg_list.append("io.delta:delta-core_2.12:1.1.0")
pkg_list.append("org.apache.hadoop:hadoop-aws:3.2.2") packages=(",".join(pkg_list))
print('packages: '+packages) # Instantiate Spark via builder
# Note: we use the `ContainerCredentialsProvider` to give us access to underlying IAM role permissions spark = (SparkSession .builder .appName("PySparkApp") .config("spark.jars.packages", packages) .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") .config("fs.s3a.aws.credentials.provider", "com.amazonaws.auth.ContainerCredentialsProvider") .getOrCreate()) sc = spark.sparkContext print('Spark version: '+str(sc.version))

في القسم التالي ، نقوم بتحميل ملف يحتوي على مجموعة فرعية من مجموعة بيانات قروض Lending Club الاستهلاكية إلى دلو S3 الافتراضي الخاص بنا. مجموعة البيانات الأصلية كبيرة جدًا (أكثر من 600 ميجابايت) ، لذلك نقدم ملفًا تمثيليًا واحدًا (2.6 ميجابايت) لاستخدامه بواسطة هذا الكمبيوتر الدفتري. يستخدم PySpark امتداد s3a بروتوكول لتمكين وظائف مكتبة Hadoop الإضافية. لذلك ، نقوم بتعديل كل S3 URI أصلي من ملف s3 بروتوكول للاستخدام s3a في الخلايا في جميع أنحاء هذا الكمبيوتر الدفتري.

نستخدم Spark لقراءة البيانات الأولية (مع خيارات لملفات CSV أو Parquet) بالشفرة التالية ، والتي تُرجع Spark DataFrame المسمى loans_df:

loans_df = spark.read.csv(s3a_raw_csv, header=True)

تُظهر لقطة الشاشة التالية أول 10 صفوف من DataFrame الناتج.

يمكننا الآن كتابة DataFrame كجدول Delta Lake بسطر واحد من التعليمات البرمجية عن طريق تحديد .format("delta") وتوفير موقع S3 URI حيث نريد كتابة بيانات الجدول:

loans_df.write.format("delta").mode("overwrite").save(s3a_delta_table_uri)

تعرض خلايا دفتر الملاحظات القليلة التالية خيارًا للاستعلام عن جدول Delta Lake. يمكننا إنشاء استعلام SQL قياسي ، وتحديده delta وموقع الجدول ، وأرسل هذا الأمر باستخدام بناء جملة Spark SQL:

sql_cmd = f'SELECT * FROM delta.`{s3a_delta_table_uri}` ORDER BY loan_amnt'
sql_results = spark.sql(sql_cmd)

تُظهر لقطة الشاشة التالية نتائج استعلام SQL لدينا حسب ترتيبها loan_amnt.

التفاعل مع جداول دلتا ليك

في هذا القسم ، نعرض ملف فئة DeltaTable من delta-spark مكتبة. DeltaTable هي الفئة الأساسية للتفاعل برمجيًا مع جداول Delta Lake. تتضمن هذه الفئة عدة طرق ثابتة لاكتشاف معلومات حول الجدول. على سبيل المثال ، ملف isDeltaTable تُرجع الطريقة قيمة منطقية تشير إلى ما إذا كان الجدول مخزّنًا بتنسيق دلتا:

# Use static method to determine table type
print(DeltaTable.isDeltaTable(spark, s3a_delta_table_uri))

يمكنك إنشاء DeltaTable المثيلات التي تستخدم مسار جدول دلتا ، وهو في حالتنا موقع S3 URI. في الكود التالي ، نسترجع السجل الكامل لتعديلات الجدول:

deltaTable = DeltaTable.forPath(spark, s3a_delta_table_uri)
history_df = deltaTable.history()
history_df.head(3)

يشير الإخراج إلى أن الجدول يحتوي على ستة تعديلات مخزنة في السجل ، ويظهر أحدث ثلاثة إصدارات.

تطور المخطط

في هذا القسم ، نوضح كيف يعمل تطور مخطط دلتا ليك. بشكل افتراضي، delta-spark يفرض الجدول يكتب على الالتزام بالمخطط الحالي عن طريق فرض القيود. ومع ذلك ، من خلال تحديد خيارات معينة ، يمكننا تعديل مخطط الجدول بأمان.

أولاً ، دعنا نعيد قراءة البيانات من جدول دلتا. لأن هذه البيانات تمت كتابتها باسم delta الشكل ، نحن بحاجة إلى تحديد .format("delta") عند قراءة البيانات ، فإننا نوفر S3 URI حيث يوجد جدول Delta. ثانيًا ، نكتب DataFrame مرة أخرى إلى موقع S3 مختلف حيث نعرض تطور المخطط. انظر الكود التالي:

delta_df = (spark.read.format("delta").load(s3a_delta_table_uri))
delta_df.write.format("delta").mode("overwrite").save(s3a_delta_update_uri)

الآن نستخدم Spark DataFrame API لإضافة عمودين جديدين إلى مجموعة البيانات الخاصة بنا. أسماء الأعمدة هي funding_type و excess_int_rate، ويتم تعيين قيم العمود إلى ثوابت باستخدام DataFrame withColumn طريقة. انظر الكود التالي:

funding_type_col = "funding_type"
excess_int_rate_col = "excess_int_rate" delta_update_df = (delta_df.withColumn(funding_type_col, lit("NA")) .withColumn(excess_int_rate_col, lit(0.0)))
delta_update_df.dtypes

نظرة سريعة على أنواع البيانات (dtypes) يوضح أن الأعمدة الإضافية هي جزء من DataFrame.

نقوم الآن بتمكين تعديل المخطط ، وبالتالي تغيير المخطط الأساسي لجدول دلتا ، عن طريق تعيين mergeSchema الخيار ل true في أمر Spark التالي للكتابة:

(delta_update_df.write.format("delta") .mode("overwrite") .option("mergeSchema", "true") # option - evolve schema .save(s3a_delta_update_uri)
)

دعنا نتحقق من محفوظات التعديل لجدولنا الجديد ، والذي يوضح أن مخطط الجدول قد تم تعديله:

deltaTableUpdate = DeltaTable.forPath(spark, s3a_delta_update_uri) # Let's retrieve history BEFORE schema modification
history_update_df = deltaTableUpdate.history()
history_update_df.show(3)

تعرض قائمة المحفوظات كل مراجعة للبيانات الوصفية.

تحديثات الجدول الشرطي

يمكنك استخدام DeltaTable update طريقة لتشغيل مسند ثم تطبيق تحويل كلما تم تقييم الشرط True. في حالتنا ، نكتب القيمة FullyFunded إلى funding_type العمود كلما كان ملف loan_amnt يساوي funded_amnt. هذه آلية فعالة لكتابة تحديثات شرطية لبيانات الجدول.

deltaTableUpdate.update(condition = col("loan_amnt") == col("funded_amnt"), set = { funding_type_col: lit("FullyFunded") } )

تظهر لقطة الشاشة التالية نتائجنا.

في التغيير النهائي لبيانات الجدول ، نعرض الصيغة لتمرير دالة إلى طريقة التحديث ، والتي تحسب في حالتنا معدل الفائدة الزائدة عن طريق طرح 10.0٪ من القرض int_rate ينسب. يقوم أمر SQL آخر بسحب السجلات التي تفي بشرطنا ، باستخدام جملة WHERE لتحديد موقع السجلات باستخدام int_rate أكبر من 10.0٪:

# Function that calculates rate overage (amount over 10.0)
def excess_int_rate(rate): return (rate-10.0) deltaTableUpdate.update(condition = col("int_rate") > 10.0, set = { excess_int_rate_col: excess_int_rate(col("int_rate")) } )

الجديد excess_int_rate العمود يحتوي الآن بشكل صحيح على ملف int_rate ناقص 10.0٪.

تسترد خلية دفتر الملاحظات الأخيرة لدينا المحفوظات من جدول دلتا مرة أخرى ، وهذه المرة تعرض التعديلات بعد إجراء تعديل المخطط:

# Finally, let's retrieve table history AFTER the schema modifications history_update_df = deltaTableUpdate.history()
history_update_df.show(3)

تظهر لقطة الشاشة التالية نتائجنا.

وفي الختام

يمكنك استخدام دفاتر SageMaker Studio المحمولة للتفاعل مباشرة مع البيانات المخزنة في تنسيق Delta Lake مفتوح المصدر. في هذا المنشور ، قدمنا ​​نموذجًا لرمز يقرأ ويكتب هذه البيانات باستخدام المصدر المفتوح delta-spark المكتبة ، والتي تتيح لك إنشاء مجموعة البيانات وتحديثها وإدارتها كملف جدول دلتا. أظهرنا أيضًا قوة الجمع بين هذه التقنيات المهمة لاستخراج القيمة من بحيرات البيانات الموجودة مسبقًا ، وأظهرنا كيفية استخدام إمكانات Delta Lake على SageMaker.

توفر عينة الكمبيوتر الدفتري الخاصة بنا وصفة شاملة لتثبيت المتطلبات الأساسية وإنشاء مثيل لهياكل بيانات Spark وقراءة وكتابة إطارات البيانات بتنسيق Delta Lake واستخدام وظائف مثل تطور المخطط. يمكنك دمج هذه التقنيات لتضخيم قوتها لتوفير نتائج أعمال تحويلية.


حول المؤلف

بول هارجيس ركز جهوده على التعلم الآلي في العديد من الشركات ، بما في ذلك AWS و Amazon و Hortonworks. إنه يستمتع ببناء الحلول التكنولوجية وأيضًا تعليم الناس كيفية تحقيق أقصى استفادة منها. قبل دوره في AWS ، كان مهندسًا رائدًا في عمليات التصدير والتوسعات في Amazon ، مما ساعد موقع amazon.com على تحسين تجربة المتسوقين الدوليين. يحب Paul مساعدة العملاء على توسيع مبادرات التعلم الآلي الخاصة بهم لحل مشاكل العالم الحقيقي.

فيدانت جاين هو مهندس حلول متخصص في الذكاء الاصطناعي / التعلم الآلي ، يساعد العملاء على جني قيمة من نظام التعلم الآلي في AWS. قبل انضمامه إلى AWS ، شغل Vedant مناصب ML / Data Science Specialty في العديد من الشركات مثل Databricks و Hortonworks (الآن Cloudera) و JP Morgan Chase. بعيدًا عن عمله ، فيدانت شغوف بصناعة الموسيقى واستخدام العلوم لعيش حياة ذات مغزى واستكشاف المأكولات النباتية اللذيذة من جميع أنحاء العالم.

بقعة_صورة

أحدث المعلومات الاستخباراتية

بقعة_صورة