Logo Zephyrnet

Các phương pháp hay nhất để tối ưu hóa chi phí và hiệu suất cho các công việc ETL phát trực tuyến AWS Glue

Ngày:

Keo AWS Các công việc trích xuất, chuyển đổi và tải trực tuyến (ETL) cho phép bạn xử lý và làm phong phú lượng lớn dữ liệu đến từ các hệ thống như Luồng dữ liệu Amazon Kinesis, Truyền trực tuyến được quản lý của Amazon cho Apache Kafka (Amazon MSK) hoặc bất kỳ dịch vụ nào khác Kafka Apache cụm. Nó sử dụng Truyền phát có cấu trúc Spark framework để thực hiện xử lý dữ liệu trong thời gian gần như thực.

Bài đăng này đề cập đến các trường hợp sử dụng trong đó dữ liệu cần được xử lý, phân phối và có thể được xử lý một cách hiệu quả trong một khoảng thời gian giới hạn. Điều này có thể bao gồm nhiều trường hợp, chẳng hạn như xử lý và cảnh báo nhật ký, nhập và làm giàu dữ liệu liên tục, xác thực dữ liệu, internet vạn vật, học máy (ML), v.v.

Chúng tôi thảo luận về các chủ đề sau:

  • Các công cụ phát triển giúp bạn viết mã nhanh hơn bằng cách sử dụng các công cụ mới ra mắt của chúng tôi Máy tính xách tay AWS Glue Studio
  • Cách theo dõi và điều chỉnh công việc phát trực tuyến của bạn
  • Các phương pháp hay nhất để định cỡ và thay đổi quy mô cụm AWS Glue của bạn bằng cách sử dụng các tính năng mới ra mắt của chúng tôi như tự động mở rộng và loại công nhân nhỏ G 0.25X

Công cụ phát triển

Máy tính xách tay AWS Glue Studio có thể tăng tốc độ phát triển công việc phát trực tuyến của bạn bằng cách cho phép các kỹ sư dữ liệu làm việc bằng sổ ghi chép tương tác và kiểm tra các thay đổi mã để nhận phản hồi nhanh—từ mã hóa logic nghiệp vụ đến kiểm tra các thay đổi cấu hình—như một phần của quá trình điều chỉnh.

Trước khi chạy bất kỳ mã nào trong sổ ghi chép (sẽ bắt đầu phiên), bạn cần đặt một số cấu hình quan trọng.

Phép thuật %streaming tạo cụm phiên sử dụng thời gian chạy giống như các tác vụ phát trực tuyến AWS Glue. Bằng cách này, bạn sẽ phát triển và kiểm tra mã của mình một cách tương tác bằng cách sử dụng cùng thời gian chạy mà bạn sử dụng sau này trong công việc sản xuất.

Ngoài ra, hãy định cấu hình nhật ký Spark UI, điều này sẽ rất hữu ích cho việc theo dõi và điều chỉnh công việc.

Xem cấu hình sau:

%streaming
%%configure
{
"--enable-spark-ui": "true",
"--spark-event-logs-path": "s3://your_bucket/sparkui/"
}

Để biết các tùy chọn cấu hình bổ sung như phiên bản hoặc số lượng công nhân, hãy tham khảo Định cấu hình phiên tương tác AWS Glue cho sổ ghi chép Jupyter và AWS Glue Studio.

Để trực quan hóa nhật ký Spark UI, bạn cần có máy chủ lịch sử Spark. Nếu bạn chưa có, hãy tham khảo Khởi chạy máy chủ lịch sử Spark để biết hướng dẫn triển khai.

Truyền phát có cấu trúc dựa trên truyền phát DataFrames, đại diện cho các lô tin nhắn vi mô.
Đoạn mã sau là ví dụ về cách tạo luồng DataFrame bằng cách sử dụng Amazon Kinesis như nguồn:

kinesis_options = {
  "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
  "startingPosition": "TRIM_HORIZON",
  "inferSchema": "true",
  "classification": "json"
}
kinesisDF = glueContext.create_data_frame_from_options(
   connection_type="kinesis",
   connection_options=kinesis_options
)

API AWS Glue giúp bạn tạo DataFrame bằng cách thực hiện phát hiện lược đồ và tự động giải nén, tùy thuộc vào định dạng. Bạn cũng có thể tự xây dựng nó bằng API Spark trực tiếp:

kinesisDF = spark.readStream.format("kinesis").options(**kinesis_options).load()

Sau khi bạn chạy bất kỳ ô mã nào, nó sẽ kích hoạt khởi động phiên và ứng dụng sẽ sớm xuất hiện trong máy chủ lịch sử dưới dạng một ứng dụng chưa hoàn chỉnh (ở cuối trang có một liên kết để hiển thị các ứng dụng chưa hoàn chỉnh) có tên là GlueReplApp, vì đó là một cụm phiên. Đối với một công việc thông thường, nó được liệt kê cùng với tên công việc được cung cấp khi nó được tạo.

Từ cuốn sổ tay, bạn có thể lấy mẫu dữ liệu truyền phát. Điều này có thể giúp phát triển và đưa ra dấu hiệu về loại cũng như kích thước của tin nhắn phát trực tuyến, điều này có thể ảnh hưởng đến hiệu suất.

Giám sát cụm bằng Truyền phát có cấu trúc

Cách tốt nhất để giám sát và điều chỉnh công việc phát trực tuyến AWS Glue của bạn là sử dụng Spark UI; nó cung cấp cho bạn xu hướng công việc phát trực tuyến tổng thể trên Truyền trực tuyến có cấu trúc tab và thông tin chi tiết về từng công việc xử lý vi mô riêng lẻ.

Cái nhìn tổng thể về công việc phát trực tuyến

trên Truyền trực tuyến có cấu trúc tab, bạn có thể xem bản tóm tắt các luồng đang chạy trong cụm, như trong ví dụ sau.

Thông thường chỉ có một truy vấn phát trực tuyến, đại diện cho một ETL phát trực tuyến. Nếu bạn bắt đầu nhiều lệnh song song, sẽ tốt hơn nếu bạn đặt cho nó một cái tên dễ nhận biết, gọi queryName() nếu bạn sử dụng writeStream API trực tiếp trên DataFrame.

Sau khi hoàn thành một số lượng lớn các lô (chẳng hạn như 10), đủ để giá trị trung bình ổn định, bạn có thể sử dụng Đầu vào trung bình/giây để theo dõi số lượng sự kiện hoặc thông báo mà công việc đang xử lý. Điều này có thể gây nhầm lẫn vì cột bên phải, Quá trình trung bình/giây, tương tự nhưng thường có số cao hơn. Sự khác biệt là thời gian xử lý này cho chúng ta biết mã của chúng ta hiệu quả như thế nào, trong khi đầu vào trung bình cho chúng ta biết cụm đang đọc và xử lý bao nhiêu tin nhắn.

Điều quan trọng cần lưu ý là nếu hai giá trị giống nhau thì có nghĩa là công việc đang hoạt động ở công suất tối đa. Nó đang tận dụng tối đa phần cứng nhưng có thể sẽ không thể đáp ứng được việc tăng âm lượng mà không gây ra sự chậm trễ.

Cột cuối cùng là số lô mới nhất. Vì chúng được đánh số tăng dần từ 0 nên điều này cho chúng ta biết cho đến nay có bao nhiêu đợt truy vấn đã được xử lý.

Khi chọn liên kết trong cột “ID chạy” của truy vấn phát trực tuyến, bạn có thể xem lại chi tiết bằng đồ thị và biểu đồ, như trong ví dụ sau.

Hai hàng đầu tiên tương ứng với dữ liệu được sử dụng để tính giá trị trung bình được hiển thị trên trang tóm tắt.

Trong Tỷ lệ đầu vào, mỗi điểm dữ liệu được tính bằng cách chia số lượng sự kiện đã đọc cho lô cho thời gian trôi qua giữa lần bắt đầu lô hiện tại và lần bắt đầu lô trước đó. Trong một hệ thống khỏe mạnh có khả năng theo kịp, giá trị này bằng khoảng thời gian kích hoạt được định cấu hình (trong GlueContext.forEachBatch() API, điều này được đặt bằng tùy chọn windowSize).

Bởi vì nó sử dụng các hàng lô hiện tại với độ trễ của lô trước đó nên biểu đồ này thường không ổn định ở các lô đầu tiên cho đến khi Thời lượng hàng loạt (biểu đồ dòng cuối cùng) ổn định.

Trong ví dụ này, khi nó ổn định, nó hoàn toàn phẳng. Điều này có nghĩa là luồng tin nhắn không đổi hoặc công việc đang đạt đến giới hạn trên mỗi tập hợp lô (chúng ta sẽ thảo luận cách thực hiện việc này ở phần sau của bài đăng).

Hãy cẩn thận nếu bạn đặt giới hạn cho mỗi lô liên tục bị tấn công, bạn có thể đang âm thầm tạo ra một lượng tồn đọng, nhưng mọi thứ có thể trông ổn trong các chỉ số công việc. Để theo dõi điều này, hãy sử dụng thước đo độ trễ đo lường sự khác biệt giữa dấu thời gian của tin nhắn khi nó được tạo và thời gian nó được xử lý.

Tốc độ xử lý được tính bằng cách chia số lượng tin nhắn trong một đợt cho thời gian xử lý đợt đó. Ví dụ: nếu lô chứa 1,000 tin nhắn và khoảng thời gian kích hoạt là 10 giây nhưng lô chỉ cần 5 giây để xử lý thì tốc độ xử lý sẽ là 1000/5 = 200 msg/giây. trong khi tốc độ đầu vào cho lô đó (giả sử lô trước đó cũng chạy trong khoảng thời gian) là 1000/10 = 100 msg/giây.

Số liệu này hữu ích để đo lường hiệu quả xử lý lô của mã của chúng tôi và do đó mã có thể cao hơn tốc độ đầu vào (điều này không có nghĩa là mã xử lý nhiều thư hơn mà chỉ sử dụng ít thời gian hơn). Như đã đề cập trước đó, nếu cả hai chỉ số gần nhau, điều đó có nghĩa là thời lượng của lô gần với khoảng thời gian và do đó lưu lượng truy cập bổ sung có thể bắt đầu gây ra độ trễ kích hoạt lô (vì lô trước đó vẫn đang chạy) và tăng độ trễ.

Ở phần sau của bài đăng này, chúng tôi sẽ trình bày cách tự động chia tỷ lệ có thể giúp ngăn chặn tình trạng này.

Hàng đầu vào hiển thị số lượng tin nhắn được đọc cho mỗi lô, chẳng hạn như tốc độ đầu vào nhưng sử dụng âm lượng thay vì tốc độ.

Điều quan trọng cần lưu ý là nếu lô xử lý dữ liệu nhiều lần (ví dụ: ghi vào nhiều đích), thì tin nhắn sẽ được tính nhiều lần. Nếu tỷ lệ này lớn hơn dự kiến, đây có thể là lý do. Nói chung, để tránh đọc tin nhắn nhiều lần, bạn nên lưu trữ lô trong khi xử lý nó, đây là mặc định khi bạn sử dụng GlueContext.forEachBatch() API.

Hai hàng cuối cùng cho chúng ta biết phải mất bao lâu để xử lý mỗi lô và thời gian đó được sử dụng như thế nào. Việc thấy các đợt đầu tiên mất nhiều thời gian hơn cho đến khi hệ thống ấm lên và ổn định là điều bình thường.
Điều quan trọng cần chú ý là khoảng thời gian này gần như ổn định và nằm trong khoảng thời gian kích hoạt đã định cấu hình. Nếu không phải như vậy, lô tiếp theo sẽ bị trì hoãn và có thể bắt đầu độ trễ gộp bằng cách tạo ra một lượng tồn đọng hoặc tăng kích thước lô (nếu giới hạn cho phép nhận thêm thư đang chờ xử lý).

In Thời lượng hoạt động, phần lớn thời gian nên dành cho addBatch (màu mù tạt), đó là công việc thực tế. Phần còn lại là chi phí cố định, do đó quy trình xử lý hàng loạt càng nhỏ thì càng mất nhiều phần trăm thời gian. Điều này thể hiện sự cân bằng giữa các lô nhỏ có độ trễ thấp hơn hoặc các lô lớn hơn nhưng hiệu quả tính toán cao hơn.

Ngoài ra, việc đợt đầu tiên dành nhiều thời gian cho việc chuẩn bị là điều bình thường. latestOffset (thanh màu nâu), định vị điểm cần bắt đầu xử lý khi không có điểm kiểm tra.

Thống kê truy vấn sau đây cho thấy một ví dụ khác.

Trong trường hợp này, đầu vào có một số biến thể (có nghĩa là nó không đạt giới hạn lô). Ngoài ra, tốc độ xử lý gần giống với tốc độ đầu vào. Điều này cho chúng ta biết hệ thống đang ở công suất tối đa và đang cố gắng theo kịp. Bằng cách so sánh các hàng đầu vào và tốc độ đầu vào, chúng ta có thể đoán rằng khoảng thời gian được định cấu hình chỉ là 3 giây và thời lượng lô hầu như không thể đáp ứng độ trễ đó.

Cuối cùng, trong Thời lượng hoạt động, bạn có thể nhận thấy rằng vì các đợt xuất hiện quá thường xuyên nên một lượng thời gian đáng kể (nói theo tỷ lệ) được dành để lưu điểm kiểm tra (thanh màu xanh đậm).

Với thông tin này, chúng tôi có thể cải thiện tính ổn định của công việc bằng cách tăng khoảng thời gian kích hoạt lên 5 giây trở lên. Bằng cách này, nó ít kiểm tra các điểm kiểm tra hơn và có nhiều thời gian hơn để xử lý dữ liệu, điều này có thể đủ để có được thời lượng lô nhất quán trong khoảng thời gian đó. Sự đánh đổi là độ trễ giữa thời điểm tin nhắn được xuất bản và thời điểm tin nhắn được xử lý sẽ dài hơn.

Giám sát quá trình xử lý hàng loạt riêng lẻ

trên việc làm tab, bạn có thể xem mỗi đợt mất bao lâu và tìm hiểu các bước khác nhau mà quá trình xử lý bao gồm để hiểu thời gian được sử dụng như thế nào. Bạn cũng có thể kiểm tra xem có tác vụ nào thành công sau khi thử lại hay không. Nếu điều này xảy ra liên tục, nó có thể âm thầm ảnh hưởng đến hiệu suất.

Ví dụ: ảnh chụp màn hình sau đây hiển thị các lô trên việc làm tab Spark UI của công việc phát trực tuyến của chúng tôi.

Mỗi đợt được Spark coi là một công việc (đừng nhầm lẫn ID công việc với số lô; chúng chỉ khớp nếu không có hành động nào khác). Nhóm công việc là ID truy vấn phát trực tuyến (điều này chỉ quan trọng khi chạy nhiều truy vấn).

Công việc phát trực tuyến trong ví dụ này có một giai đoạn duy nhất với 100 phân vùng. Cả hai đợt đều xử lý thành công nên giai đoạn này được đánh dấu là đã thành công và tất cả nhiệm vụ đã hoàn thành (100/100 trên thanh tiến trình).

Tuy nhiên, có một điểm khác biệt trong đợt đầu tiên: có 20 nhiệm vụ thất bại. Bạn biết tất cả các nhiệm vụ thất bại đã thành công trong các lần thử lại, nếu không giai đoạn này sẽ bị đánh dấu là không thành công. Để giai đoạn thất bại, nhiệm vụ tương tự sẽ phải thất bại bốn lần (hoặc được cấu hình bởi spark.task.maxFailures).

Nếu giai đoạn này thất bại, lô cũng thất bại và có thể là toàn bộ công việc; nếu công việc được bắt đầu bằng cách sử dụng GlueContext.forEachBatch(), nó có một số lần thử lại theo batchMaxRetries tham số (ba theo mặc định).

Những thất bại này rất quan trọng vì chúng có hai tác động:

  • Chúng có thể âm thầm gây ra sự chậm trễ trong quá trình xử lý hàng loạt, tùy thuộc vào thời gian xử lý thất bại và thử lại.
  • Chúng có thể khiến các bản ghi được gửi đi nhiều lần nếu lỗi nằm ở giai đoạn cuối của lô, tùy thuộc vào loại đầu ra. Nếu đầu ra là các tập tin, nói chung nó sẽ không gây ra sự trùng lặp. Tuy nhiên, nếu đích đến là Máy phát điện Amazon, JDBC, Dịch vụ Tìm kiếm Mở của Amazonhoặc đầu ra khác sử dụng tính năng theo đợt, có thể một phần đầu ra đã được gửi. Nếu bạn không thể chấp nhận bất kỳ sự trùng lặp nào, hệ thống đích sẽ xử lý việc này (ví dụ: không có giá trị).

Việc chọn liên kết mô tả sẽ đưa bạn đến Các giai đoạn tab cho công việc đó. Tại đây bạn có thể tìm hiểu sâu hơn về lỗi: Ngoại lệ là gì? Có phải nó luôn luôn trong cùng một người thực thi? Liệu nó có thành công trong lần thử lại đầu tiên hay mất nhiều lần?

Lý tưởng nhất là bạn muốn xác định những thất bại này và giải quyết chúng. Ví dụ: có thể hệ thống đích đang điều tiết chúng tôi vì không có đủ dung lượng được cung cấp hoặc cần có thời gian chờ lớn hơn. Nếu không, ít nhất bạn nên theo dõi nó và quyết định xem nó mang tính hệ thống hay lẻ tẻ.

Định cỡ và chia tỷ lệ

Xác định cách phân chia dữ liệu là yếu tố chính trong bất kỳ hệ thống phân tán nào để chạy và mở rộng quy mô một cách hiệu quả. Các quyết định thiết kế trên hệ thống nhắn tin sẽ có ảnh hưởng mạnh mẽ đến cách hoạt động và quy mô của công việc phát trực tuyến, từ đó ảnh hưởng đến tính song song của công việc.

Trong trường hợp AWS Glue Streaming, việc phân chia công việc này dựa trên các phân vùng Apache Spark, xác định cách phân chia công việc để có thể xử lý song song. Mỗi khi công việc đọc một lô từ nguồn, nó sẽ chia dữ liệu đến thành các phân vùng Spark.

Đối với Apache Kafka, mỗi phân vùng chủ đề sẽ trở thành phân vùng Spark; tương tự, đối với Kinesis, mỗi phân đoạn luồng sẽ trở thành một phân vùng Spark. Để đơn giản hóa, tôi sẽ gọi mức độ song song này là số lượng phân vùng, nghĩa là các phân vùng Spark sẽ được xác định bởi các phân vùng Kafka hoặc phân đoạn Kinesis đầu vào trên cơ sở 1-1.

Mục tiêu là có đủ khả năng song song và khả năng xử lý từng lô dữ liệu trong thời gian ngắn hơn khoảng thời gian của lô được định cấu hình và do đó có thể theo kịp. Ví dụ: với khoảng thời gian theo đợt là 60 giây, công việc sẽ cho phép 60 giây dữ liệu tích tụ rồi xử lý dữ liệu đó. Nếu công việc đó mất hơn 60 giây, thì đợt tiếp theo sẽ đợi cho đến khi đợt trước hoàn tất trước khi bắt đầu đợt mới với dữ liệu đã được tích lũy kể từ khi đợt trước bắt đầu.

Bạn nên giới hạn lượng dữ liệu cần xử lý trong một đợt thay vì chỉ lấy mọi dữ liệu đã được thêm vào kể từ đợt cuối cùng. Điều này giúp công việc ổn định hơn và có thể dự đoán được trong thời gian cao điểm. Nó cho phép bạn kiểm tra xem công việc có thể xử lý khối lượng dữ liệu mà không gặp sự cố hay không (ví dụ: bộ nhớ hoặc điều tiết).

Để làm như vậy, hãy chỉ định giới hạn khi xác định DataFrame luồng nguồn:

  • Đối với Kinesis, chỉ định giới hạn bằng cách sử dụng kinesis.executor.maxFetchRecordsPerShardvà sửa lại con số này nếu số lượng phân đoạn thay đổi đáng kể. Bạn có thể cần phải tăng kinesis.executor.maxFetchTimeInMs đồng thời, để có thêm thời gian đọc lô và đảm bảo nó không bị cắt bớt.
  • Đối với Kafka, đặt maxOffsetsPerTrigger, chia đều khoản trợ cấp đó cho số lượng phân vùng.

Sau đây là ví dụ về cách đặt cấu hình này cho Kafka (đối với Kinesis, nó tương đương nhưng sử dụng thuộc tính Kinesis):

kafka_properties= {
  "kafka.bootstrap.servers": "bootstrapserver1:9092",
  "subscribe": "mytopic",
  "startingOffsets": "latest",
  "maxOffsetsPerTrigger": "5000000"
}
# Pass the properties as options when creating the DataFrame
spark.spark.readStream.format("kafka").options(**kafka_properties).load()

Điểm chuẩn ban đầu

Nếu các sự kiện có thể được xử lý riêng lẻ (không phụ thuộc lẫn nhau như nhóm), bạn có thể ước tính sơ bộ số lượng tin nhắn mà một lõi Spark có thể xử lý bằng cách chạy với một nguồn phân vùng duy nhất (một phân vùng Kafka hoặc một luồng phân đoạn Kinesis) với dữ liệu được tải sẵn vào đó và chạy các đợt có giới hạn và khoảng thời gian tối thiểu (1 giây). Điều này mô phỏng một bài kiểm tra căng thẳng không có thời gian ngừng hoạt động giữa các đợt.

Đối với các thử nghiệm lặp lại này, hãy xóa thư mục điểm kiểm tra, sử dụng một thư mục khác (ví dụ: làm cho nó động bằng cách sử dụng dấu thời gian trong đường dẫn) hoặc chỉ tắt tính năng điểm kiểm tra (nếu sử dụng trực tiếp API Spark), để bạn có thể sử dụng lại cùng một dữ liệu .
Để lại một vài đợt chạy (ít nhất 10 đợt) để hệ thống và các số liệu có thời gian ổn định.

Bắt đầu với một giới hạn nhỏ (sử dụng các thuộc tính cấu hình giới hạn được giải thích ở phần trước) và thực hiện nhiều lần chạy lại, làm tăng giá trị. Ghi lại thời lượng lô cho giới hạn đó và tốc độ đầu vào thông lượng (vì đây là bài kiểm tra căng thẳng nên tốc độ xử lý phải tương tự).

Nói chung, các lô lớn hơn có xu hướng hiệu quả hơn ở một mức độ nào đó. Điều này là do chi phí cố định dành cho mỗi điểm kiểm tra, lập kế hoạch và điều phối các nút sẽ đáng kể hơn nếu các lô nhỏ hơn và do đó thường xuyên hơn.

Sau đó chọn cài đặt ban đầu tham chiếu của bạn dựa trên các yêu cầu:

  • Nếu yêu cầu SLA mục tiêu, hãy sử dụng kích thước lô lớn nhất có thời lượng lô nhỏ hơn một nửa SLA độ trễ. Điều này là do trong trường hợp xấu nhất, một tin nhắn được lưu trữ ngay sau khi một lô được kích hoạt phải đợi ít nhất một khoảng thời gian và sau đó là thời gian xử lý (phải nhỏ hơn khoảng thời gian đó). Khi hệ thống tiếp tục duy trì, độ trễ trong trường hợp xấu nhất này sẽ gần gấp đôi khoảng thời gian, vì vậy hãy đặt mục tiêu thời lượng lô nhỏ hơn một nửa độ trễ mục tiêu.
  • Trong trường hợp thông lượng được ưu tiên hơn độ trễ, chỉ cần chọn kích thước lô cung cấp tốc độ xử lý trung bình cao hơn và xác định khoảng thời gian cho phép một số khoảng đệm trong khoảng thời gian lô được quan sát.

Bây giờ bạn đã biết số lượng tin nhắn trên mỗi lõi mà ETL của chúng tôi có thể xử lý và độ trễ. Những con số này mang tính lý tưởng vì hệ thống sẽ không mở rộng tuyến tính một cách hoàn hảo khi bạn thêm nhiều phân vùng và nút. Bạn có thể sử dụng các tin nhắn trên mỗi lõi thu được để chia tổng số tin nhắn mỗi giây để xử lý và lấy số lượng phân vùng Spark tối thiểu cần thiết (mỗi lõi xử lý song song một phân vùng).

Với số lượng lõi Spark ước tính này, hãy tính số lượng nút cần thiết tùy thuộc vào loại và phiên bản, như được tóm tắt trong bảng sau.

Phiên bản keo AWS Loại công nhân vCore Lõi Spark trên mỗi công nhân
2 G 1X 4 8
2 G 2X 8 16
3 G 0.25X 2 2
3 G 1X 4 4
3 G 2X 8 8

Nên sử dụng phiên bản 3 mới hơn vì nó bao gồm nhiều tính năng và tối ưu hóa hơn như tự động điều chỉnh quy mô (mà chúng ta sẽ thảo luận sau). Về kích thước, trừ khi công việc có một số thao tác chiếm nhiều bộ nhớ, tốt hơn nên sử dụng các phiên bản nhỏ hơn để không có quá nhiều lõi cạnh tranh về tài nguyên chia sẻ bộ nhớ, đĩa và mạng.

Lõi Spark tương đương với luồng; do đó, bạn có thể có nhiều hơn (hoặc ít hơn) số lõi thực tế có sẵn trong phiên bản. Điều này không có nghĩa là việc có nhiều lõi Spark hơn nhất thiết sẽ nhanh hơn nếu chúng không được hỗ trợ bởi lõi vật lý, điều đó chỉ có nghĩa là bạn có nhiều khả năng song song hơn để cạnh tranh cho cùng một CPU.

Định cỡ cụm khi bạn điều khiển hệ thống thông báo đầu vào

Đây là trường hợp lý tưởng vì bạn có thể tối ưu hóa hiệu suất và hiệu quả khi cần thiết.

Với thông tin điểm chuẩn vừa thu thập, bạn có thể xác định kích thước cụm AWS Glue ban đầu của mình và định cấu hình Kafka hoặc Kinesis với số lượng phân vùng hoặc chủ đề ước tính, cùng với một số bộ đệm. Kiểm tra thiết lập cơ bản này và điều chỉnh nếu cần cho đến khi công việc có thể đáp ứng tổng khối lượng và độ trễ cần thiết một cách thoải mái.

Ví dụ: nếu chúng tôi đã xác định rằng chúng tôi cần 32 lõi để đáp ứng yêu cầu về độ trễ đối với khối lượng dữ liệu cần xử lý thì chúng tôi có thể tạo cụm AWS Glue 3.0 với 9 nút G.1X (một trình điều khiển và 8 nhân viên với 4 nút cores = 32) đọc từ luồng dữ liệu Kinesis có 32 phân đoạn.

Hãy tưởng tượng rằng khối lượng dữ liệu trong luồng đó tăng gấp đôi và chúng tôi muốn duy trì các yêu cầu về độ trễ. Để làm như vậy, chúng tôi nhân đôi số lượng công nhân (16 + 1 trình điều khiển = 17) và số lượng phân đoạn trên luồng (hiện là 64). Hãy nhớ rằng đây chỉ là tài liệu tham khảo và cần được xác thực; trong thực tế, bạn có thể cần nhiều hoặc ít nút hơn tùy thuộc vào kích thước cụm, liệu hệ thống đích có thể theo kịp hay không, độ phức tạp của các phép biến đổi hoặc các tham số khác.

Định cỡ cụm khi bạn không kiểm soát cấu hình hệ thống thông báo

Trong trường hợp này, các tùy chọn điều chỉnh của bạn bị hạn chế hơn nhiều.

Kiểm tra xem một cụm có cùng số lõi Spark như các phân vùng hiện có (được xác định bởi hệ thống thông báo) có thể theo kịp khối lượng dữ liệu và độ trễ dự kiến ​​hay không, cộng với một số phụ cấp cho thời gian cao điểm.

Nếu không, việc thêm nhiều nút hơn sẽ không giúp ích gì. Bạn cần phân vùng lại dữ liệu đến bên trong AWS Glue. Hoạt động này bổ sung thêm chi phí để phân phối lại dữ liệu nội bộ nhưng đó là cách duy nhất để công việc có thể mở rộng quy mô trong trường hợp này.

Hãy minh họa bằng một ví dụ. Hãy tưởng tượng chúng ta có luồng dữ liệu Kinesis với một phân đoạn mà chúng ta không kiểm soát và không có đủ âm lượng để yêu cầu chủ sở hữu tăng phân đoạn. Trong cụm, cần tính toán đáng kể cho mỗi thông báo; đối với mỗi tin nhắn, nó sẽ chạy các phương pháp phỏng đoán và các kỹ thuật ML khác để thực hiện hành động tùy theo tính toán. Sau khi chạy một số điểm chuẩn, việc tính toán có thể được thực hiện nhanh chóng đối với khối lượng tin nhắn dự kiến ​​sử dụng 8 lõi hoạt động song song. Theo mặc định, vì chỉ có một phân đoạn nên chỉ có một lõi sẽ xử lý tất cả các tin nhắn một cách tuần tự.

Để giải quyết tình huống này, chúng tôi có thể cung cấp cụm AWS Glue 3.0 với 3 G 1X các nút có sẵn 8 lõi công nhân. Trong phân vùng lại mã, lô phân phối các tin nhắn một cách ngẫu nhiên (đồng đều nhất có thể) giữa chúng:

def batch_function(data_frame, batch_id):
    # Repartition so the udf is called in parallel for each partition
    data_frame.repartition(8).foreach(process_event_udf)

glueContext.forEachBatch(frame=streaming_df, batch_function=batch_function)

Nếu hệ thống nhắn tin thay đổi kích thước số lượng phân vùng hoặc phân đoạn, công việc sẽ thực hiện thay đổi này trong đợt tiếp theo. Bạn có thể cần điều chỉnh dung lượng cụm phù hợp với khối lượng dữ liệu mới.

Công việc phát trực tuyến có thể xử lý nhiều phân vùng hơn lõi Spark hiện có, nhưng có thể gây ra sự kém hiệu quả vì các phân vùng bổ sung sẽ được xếp hàng đợi và sẽ không bắt đầu được xử lý cho đến khi các phân vùng khác kết thúc. Điều này có thể dẫn đến nhiều nút không hoạt động trong khi các phân vùng còn lại kết thúc và lô tiếp theo có thể được kích hoạt.

Khi các tin nhắn có sự phụ thuộc lẫn nhau trong quá trình xử lý

Nếu các tin nhắn cần xử lý phụ thuộc vào các tin nhắn khác (trong cùng đợt hoặc các đợt trước đó), thì đó có thể là yếu tố hạn chế về khả năng mở rộng. Trong trường hợp đó, có thể hữu ích khi phân tích một lô (công việc trong Spark UI) để xem thời gian được sử dụng ở đâu và liệu có sự mất cân bằng hay không bằng cách kiểm tra phần trăm thời lượng nhiệm vụ trên Các giai đoạn tab (bạn cũng có thể truy cập trang này bằng cách chọn một giai đoạn trên việc làm chuyển hướng).

Tự động chia tỷ lệ

Cho đến nay, bạn đã thấy các phương pháp định cỡ để xử lý luồng dữ liệu ổn định thỉnh thoảng có đỉnh.
Tuy nhiên, đối với khối lượng dữ liệu đến có thể thay đổi, điều này không hiệu quả về mặt chi phí vì bạn cần điều chỉnh kích thước cho trường hợp xấu nhất hoặc chấp nhận độ trễ cao hơn vào thời gian cao điểm.

Đây là lúc tính năng tự động thay đổi quy mô của AWS Glue Streaming 3.0 phát huy tác dụng. Bạn có thể kích hoạt tính năng này cho công việc và xác định số lượng nhân viên tối đa mà bạn muốn cho phép (ví dụ: sử dụng số lượng bạn đã xác định là cần thiết cho thời gian cao điểm).

Thời gian chạy theo dõi xu hướng thời gian dành cho việc xử lý hàng loạt và so sánh nó với khoảng thời gian đã định cấu hình. Trên cơ sở đó đưa ra quyết định tăng hoặc giảm số lượng công nhân khi cần thiết, quyết liệt hơn khi thời gian của mẻ đến gần hoặc vượt quá khoảng thời gian cho phép.

Ảnh chụp màn hình sau đây là ví dụ về công việc phát trực tuyến có bật tính năng tự động chia tỷ lệ.

Phân chia khối lượng công việc

Bạn đã thấy cách mở rộng quy mô một công việc bằng cách thêm các nút và phân vùng dữ liệu khi cần, điều này là đủ trong hầu hết các trường hợp. Khi cụm phát triển, vẫn còn một trình điều khiển duy nhất và các nút phải đợi những nút khác hoàn thành lô trước khi chúng có thể đảm nhận công việc bổ sung. Nếu đạt đến điểm mà việc tăng kích thước cụm không còn hiệu quả nữa, bạn có thể cân nhắc việc phân chia khối lượng công việc giữa các công việc riêng biệt.

Trong trường hợp Kinesis, bạn cần chia dữ liệu thành nhiều luồng, nhưng đối với Apache Kafka, bạn có thể chia một chủ đề thành nhiều công việc bằng cách gán phân vùng cho mỗi luồng. Để làm như vậy, thay vì thông thường subscribe or subscribePattern nơi các chủ đề được liệt kê, hãy sử dụng thuộc tính assign để gán bằng JSON một tập hợp con của các phân vùng chủ đề mà công việc sẽ xử lý (ví dụ: {"topic1": [0,1,2]}). Tại thời điểm viết bài này, không thể chỉ định một phạm vi, vì vậy bạn cần liệt kê tất cả các phân vùng, chẳng hạn như xây dựng danh sách đó một cách linh hoạt trong mã.

Giảm kích thước

Đối với lưu lượng truy cập thấp, AWS Glue Streaming có một loại nút nhỏ đặc biệt: G 0.25X, cung cấp hai lõi và RAM 4 GB với giá chỉ bằng một phần tư DPU, vì vậy nó rất tiết kiệm chi phí. Tuy nhiên, ngay cả với khả năng tiết kiệm đó, nếu bạn có nhiều luồng nhỏ thì việc có một cụm nhỏ cho mỗi luồng vẫn là không thực tế.

Đối với những tình huống như vậy, hiện tại có một số lựa chọn:

  • Định cấu hình luồng DataFrame để cấp dữ liệu từ nhiều chủ đề Kafka hoặc luồng Kinesis. Sau đó, trong DataFrame, sử dụng các cột topicstreamName, đối với các nguồn Kafka và Kinesis tương ứng, để xác định cách xử lý dữ liệu (ví dụ: các phép biến đổi hoặc đích khác nhau). Đảm bảo DataFrame được lưu vào bộ nhớ đệm để bạn không đọc dữ liệu nhiều lần.
  • Nếu bạn có sự kết hợp giữa các nguồn Kafka và Kinesis, bạn có thể xác định DataFrame cho từng nguồn, nối chúng và xử lý khi cần bằng cách sử dụng các cột được đề cập ở điểm trước.
  • Hai trường hợp trước yêu cầu tất cả các nguồn phải có cùng khoảng thời gian theo đợt và liên kết quá trình xử lý của chúng (ví dụ: luồng bận hơn có thể trì hoãn luồng chậm hơn). Để xử lý luồng độc lập trong cùng một cụm, bạn có thể kích hoạt xử lý các DataFrame của luồng riêng biệt bằng các luồng riêng biệt. Mỗi luồng được giám sát riêng trong Spark UI, nhưng bạn chịu trách nhiệm bắt đầu và quản lý các luồng đó cũng như xử lý lỗi.

Cài đặt

Trong bài đăng này, chúng tôi đã trình bày một số cài đặt cấu hình ảnh hưởng đến hiệu suất. Bảng sau đây tóm tắt những bảng chúng ta đã thảo luận và các thuộc tính cấu hình quan trọng khác sẽ sử dụng khi tạo DataFrame luồng đầu vào.

Bất động sản Áp dụng cho Chi tiết
maxOffsetsPerTrigger Kafka Giới hạn tin nhắn mỗi đợt. Chia đều giới hạn cho các phân vùng.
kinesis.executor.maxFetchRecordsPerShard chuyển động Giới hạn cho mỗi phân đoạn, do đó cần được sửa đổi nếu số lượng phân đoạn thay đổi.
kinesis.executor.maxFetchTimeInMs chuyển động Khi tăng kích thước lô (bằng cách tăng khoảng thời gian của lô hoặc thuộc tính trước đó), người thực thi có thể cần nhiều thời gian hơn do thuộc tính này phân bổ.
startingOffsets Kafka Thông thường bạn muốn đọc tất cả dữ liệu có sẵn và do đó sử dụng earliest. Tuy nhiên, nếu có lượng tồn đọng lớn, hệ thống có thể mất nhiều thời gian để bắt kịp và thay vào đó hãy sử dụng latest để bỏ qua lịch sử.
startingposition chuyển động Tương tự như startedOffsets, trong trường hợp này các giá trị sử dụng là TRIM_HORIZON để tải lại và LATEST để bắt đầu xử lý từ bây giờ.
includeHeaders Kafka Bật cờ này nếu bạn cần hợp nhất và phân chia nhiều chủ đề trong cùng một công việc (xem phần trước để biết chi tiết).
kinesis.executor.maxconnections chuyển động Khi ghi vào Kinesis, theo mặc định, nó sử dụng một kết nối duy nhất. Tăng điều này có thể cải thiện hiệu suất.
kinesis.client.avoidEmptyBatches chuyển động Tốt nhất nên đặt giá trị này thành true để tránh lãng phí tài nguyên (ví dụ: tạo tệp trống) khi không có dữ liệu (như trình kết nối Kafka). GlueContext.forEachBatch ngăn chặn các lô trống theo mặc định.

Tối ưu hóa hơn nữa

Nói chung, bạn nên thực hiện một số thao tác nén đối với tin nhắn để tiết kiệm thời gian truyền (tốn một số CPU, tùy thuộc vào kiểu nén được sử dụng).

Nếu nhà sản xuất nén các tin nhắn riêng lẻ, AWS Glue có thể phát hiện và giải nén tự động trong hầu hết các trường hợp, tùy thuộc vào định dạng và loại. Để biết thêm thông tin, hãy tham khảo Thêm công việc phát trực tuyến ETL trong AWS Glue.

Nếu sử dụng Kafka, bạn có tùy chọn nén chủ đề. Bằng cách này, việc nén sẽ hiệu quả hơn vì nó được thực hiện theo lô, từ đầu đến cuối và minh bạch đối với nhà sản xuất cũng như người tiêu dùng.

Theo mặc định, GlueContext.forEachBatch chức năng lưu trữ dữ liệu đến. Điều này hữu ích nếu dữ liệu cần được gửi đến nhiều hệ thống lưu trữ (ví dụ: dưới dạng tệp Amazon S3 và cũng để cập nhật bảng DynamoDB) vì nếu không thì công việc sẽ đọc dữ liệu nhiều lần từ nguồn. Nhưng nó có thể gây bất lợi cho hiệu suất nếu khối lượng dữ liệu lớn và chỉ có một đầu ra.

Để tắt tùy chọn này, hãy đặt persistDataFrame as false:

glueContext.forEachBatch(
    frame=myStreamDataFrame,
    batch_function=processBatch,
    options={
        "windowSize": "30 seconds",
        "checkpointLocation": myCheckpointPath,
        "persistDataFrame":  "false"
    }
)

Trong các tác vụ phát trực tuyến, thông thường phải kết hợp dữ liệu phát trực tuyến với một DataFrame khác để làm phong phú thêm (ví dụ: tra cứu). Trong trường hợp đó, bạn muốn tránh bất kỳ sự xáo trộn nào nếu có thể, vì nó chia tách các giai đoạn và khiến dữ liệu được di chuyển giữa các nút.

Khi DataFrame bạn đang tham gia tương đối nhỏ để vừa với bộ nhớ, hãy cân nhắc sử dụng tính năng nối quảng bá. Tuy nhiên, hãy nhớ rằng nó sẽ được phân phối đến các nút trên mỗi lô, vì vậy nó có thể không có giá trị nếu lô quá nhỏ.

Nếu bạn cần xáo trộn, hãy cân nhắc việc bật Bộ nối tiếp Kryo (nếu sử dụng các lớp có thể tuần tự hóa tùy chỉnh, bạn cần đăng ký chúng trước để sử dụng).

Giống như bất kỳ tác vụ AWS Glue nào, hãy tránh sử dụng udf() nếu bạn có thể làm tương tự với API được cung cấp như Spark SQL. Các hàm do người dùng xác định (UDF) ngăn công cụ thời gian chạy thực hiện nhiều tối ưu hóa (mã UDF là hộp đen cho công cụ) và trong trường hợp Python, nó buộc chuyển động dữ liệu giữa các quy trình.

Tránh tạo quá nhiều tệp nhỏ (đặc biệt là tệp cột như Parquet hoặc ORC, có chi phí chung cho mỗi tệp). Để làm như vậy, có thể nên kết hợp DataFrame vi lô trước khi ghi kết quả đầu ra. Nếu bạn đang ghi dữ liệu được phân vùng vào Amazon S3, việc phân vùng lại dựa trên các cột có thể giảm đáng kể số lượng tệp đầu ra được tạo.

Kết luận

Trong bài đăng này, bạn đã biết cách tiếp cận việc định cỡ và điều chỉnh công việc phát trực tuyến AWS Glue trong các tình huống khác nhau, bao gồm các cân nhắc khi lập kế hoạch, cấu hình, giám sát, mẹo và cạm bẫy.

Giờ đây, bạn có thể sử dụng các kỹ thuật này để giám sát và cải thiện các công việc phát trực tuyến hiện có của mình hoặc sử dụng chúng khi thiết kế và xây dựng các công việc mới.


Giới thiệu về tác giả

Gonzalo Herreros là một Kiến trúc sư Dữ liệu lớn Cấp cao trong nhóm AWS Glue.

tại chỗ_img

Tin tức mới nhất

tại chỗ_img