1.4 C
New York

Verwenden Sie Amazon Kinesis Data Streams, um Echtzeitdaten an Amazon OpenSearch Service-Domänen mit Amazon OpenSearch Ingestion zu liefern

Datum:

In diesem Beitrag zeigen wir die Verwendung Amazon Kinesis-Datenströme zum Puffern und Aggregieren von Echtzeit-Streaming-Daten für die Bereitstellung in Amazon OpenSearch-Dienst Domänen und Sammlungen mit Amazon OpenSearch-Aufnahme. Sie können diesen Ansatz für eine Vielzahl von Anwendungsfällen verwenden, von der Echtzeit-Protokollanalyse bis zur Integration von Anwendungsnachrichtendaten für die Echtzeitsuche. In diesem Beitrag konzentrieren wir uns auf den Anwendungsfall der Zentralisierung der Protokollaggregation für eine Organisation, die ihre Protokolldaten aus Compliance-Gründen archivieren und aufbewahren muss.

Kinesis Data Streams ist ein vollständig verwalteter, serverloser Datenstreamingdienst, der verschiedene Streamingdaten in Echtzeit in jedem Umfang speichert und aufnimmt. Für Anwendungsfälle der Protokollanalyse verbessert Kinesis Data Streams die Protokollaggregation, indem es Produzenten- und Konsumentenanwendungen entkoppelt und einen belastbaren, skalierbaren Puffer zum Erfassen und Bereitstellen von Protokolldaten bereitstellt. Diese Entkopplung bietet Vorteile gegenüber herkömmlichen Architekturen. Während Protokollproduzenten hoch- und herunterskaliert werden, können Kinesis Data Streams dynamisch skaliert werden, um Protokolldaten dauerhaft zu puffern. Dies verhindert, dass Laständerungen eine OpenSearch Service-Domäne beeinträchtigen, und bietet einen belastbaren Speicher für Protokolldaten zur Nutzung. Außerdem können mehrere Konsumenten Protokolldaten in Echtzeit verarbeiten, wodurch ein dauerhafter Speicher für Echtzeitdaten für Anwendungen bereitgestellt wird. Dadurch kann die Protokollanalyse-Pipeline die Well-Architected-Best Practices für Belastbarkeit erfüllen (REL04-BP02) und Kosten (COST09-BP02).

OpenSearch Ingestion ist eine serverlose Pipeline, die leistungsstarke Tools zum Extrahieren, Transformieren und Laden von Daten in eine OpenSearch Service-Domäne bietet. OpenSearch Ingestion lässt sich in viele AWS-Dienste integrieren und bietet vorgefertigte Blaupausen, um die Aufnahme von Daten für eine Vielzahl von Analyseanwendungsfällen in OpenSearch Service-Domänen zu beschleunigen. In Verbindung mit Kinesis Data Streams ermöglicht OpenSearch Ingestion anspruchsvolle Echtzeitanalysen von Daten und hilft dabei, die undifferenzierte Schwerstarbeit beim Erstellen einer Echtzeit-Such- und Analysearchitektur zu reduzieren.

Lösungsüberblick

In dieser Lösung betrachten wir einen häufigen Anwendungsfall für die zentrale Protokollaggregation in einer Organisation. Organisationen können aus verschiedenen Gründen einen Ansatz für die zentrale Protokollaggregation in Betracht ziehen. Viele Organisationen haben Compliance- und Governance-Anforderungen, die Bestimmungen dazu enthalten, welche Daten protokolliert werden müssen und wie lange Protokolldaten aufbewahrt und für Untersuchungen durchsuchbar bleiben müssen. Andere Organisationen möchten Anwendungs- und Sicherheitsvorgänge konsolidieren und ihren Teams gemeinsame Toolsets und Funktionen zur Beobachtung bereitstellen.

Um diese Anforderungen zu erfüllen, müssen Sie Daten aus Protokollquellen (Produzenten) auf skalierbare, zuverlässige und kostengünstige Weise erfassen. Protokollquellen können je nach Anwendungs- und Infrastrukturanwendungsfall und -konfiguration variieren, wie in der folgenden Tabelle dargestellt.

Protokollproduzent Beispiel Beispiel für eine Produzentenprotokollkonfiguration
Anwendungsprotokolle AWS Lambda Amazon CloudWatch-Protokolle
Anwendungsagenten FluentBit Amazon OpenSearch-Aufnahme
AWS-Dienstprotokolle Amazon Web Application Firewall Amazon S3

Das folgende Diagramm veranschaulicht eine Beispielarchitektur.

Sie können Kinesis Data Streams für eine Vielzahl dieser Anwendungsfälle verwenden. Sie können konfigurieren Amazon CloudWatch Protokolle zum Senden von Daten an Kinesis Data Streams mithilfe eines Abonnementfilters (siehe Echtzeitverarbeitung von Protokolldaten mit Abonnements). Wenn Sie Daten mit Kinesis Data Streams für Analyseanwendungsfälle senden, können Sie OpenSearch Ingestion verwenden, um eine skalierbare, erweiterbare Pipeline zu erstellen, mit der Sie Ihre Streaming-Daten nutzen und in OpenSearch Service-Indizes schreiben können. Kinesis Data Streams bietet einen Puffer, der mehrere Verbraucher, konfigurierbare Aufbewahrung und integrierte Integration mit einer Vielzahl von AWS-Diensten unterstützen kann. Für andere Anwendungsfälle, in denen Daten in Amazon Simple Storage-Service (Amazon S3) oder wenn ein Agent Daten wie FluentBit schreibt, kann ein Agent dank der integrierten persistenten Puffer und der automatischen Skalierung von OpenSearch Ingestion Daten ohne Zwischenpuffer direkt in OpenSearch Ingestion schreiben.

Durch die Standardisierung von Protokollierungsansätzen verringert sich der Entwicklungs- und Betriebsaufwand für Unternehmen. Sie können beispielsweise die Protokollierung aller Anwendungen, wenn möglich, in CloudWatch-Protokolle standardisieren und auch Amazon S3-Protokolle verarbeiten, wenn CloudWatch-Protokolle nicht unterstützt werden. Dies reduziert die Anzahl der Anwendungsfälle, die ein zentralisiertes Team in seinem Protokollaggregationsansatz verarbeiten muss, und verringert die Komplexität der Protokollaggregationslösung. Für anspruchsvollere Entwicklungsteams können Sie die Verwendung von FluentBit-Agenten standardisieren, um Daten direkt in OpenSearch Ingestion zu schreiben, um die Kosten zu senken, wenn Protokolldaten nicht in CloudWatch gespeichert werden müssen.

Diese Lösung konzentriert sich auf die Verwendung von CloudWatch-Protokollen als Datenquelle für die Protokollaggregation. Informationen zum Amazon S3-Protokoll-Anwendungsfall finden Sie unter Verwenden einer OpenSearch Ingestion-Pipeline mit Amazon S3. Informationen zu agentenbasierten Lösungen finden Sie in der agentenspezifischen Dokumentation zur Integration mit OpenSearch Ingestion, z. B. Verwenden einer OpenSearch Ingestion-Pipeline mit Fluent Bit.

Voraussetzungen:

Um Daten mit OpenSearch Ingestion in OpenSearch Service aufzunehmen, sind mehrere wichtige Teile der in dieser Lösung verwendeten Infrastruktur erforderlich:

  • Ein Kinesis-Datenstrom zum Aggregieren der Protokolldaten von CloudWatch
  • Eine OpenSearch-Domäne zum Speichern der Protokolldaten

Beim Erstellen des Kinesis-Datenstroms empfehlen wir, mit dem On-Demand-Modus zu beginnen. Dadurch können Kinesis Data Streams die Anzahl der für Ihren Protokolldurchsatz erforderlichen Shards automatisch skalieren. Nachdem Sie die stabile Arbeitslast für Ihren Anwendungsfall der Protokollaggregation ermittelt haben, empfehlen wir, in den bereitgestellten Modus zu wechseln und dabei die im On-Demand-Modus ermittelte Anzahl von Shards zu verwenden. Dies kann Ihnen dabei helfen, die langfristigen Kosten für Anwendungsfälle mit hohem Durchsatz zu optimieren.

Im Allgemeinen empfehlen wir die Verwendung eines Kinesis-Datenstroms für Ihre Protokollaggregations-Workload. OpenSearch Ingestion unterstützt bis zu 96 OCUs pro Pipeline und 24,000 Zeichen pro Pipeline-Definitionsdatei (siehe OpenSearch-Aufnahmekontingente). Das bedeutet, dass jede Pipeline einen Kinesis-Datenstrom mit bis zu 96 Shards unterstützen kann, da jede OCU einen Shard verarbeitet. Die Verwendung eines Kinesis-Datenstroms vereinfacht den Gesamtprozess zum Aggregieren von Protokolldaten in OpenSearch Service und vereinfacht den Prozess zum Erstellen und Verwalten von Abonnementfiltern für Protokollgruppen.

Abhängig vom Umfang Ihrer Protokoll-Workloads und der Komplexität Ihrer OpenSearch Ingestion-Pipeline-Logik können Sie für Ihren Anwendungsfall mehrere Kinesis-Datenströme in Betracht ziehen. Beispielsweise können Sie für jeden Hauptprotokolltyp in Ihrer Produktions-Workload einen Stream in Betracht ziehen. Wenn Protokolldaten für verschiedene Anwendungsfälle in verschiedene Streams aufgeteilt sind, kann dies dazu beitragen, die betriebliche Komplexität der Verwaltung von OpenSearch Ingestion-Pipelines zu verringern, und Sie können bei Bedarf Änderungen an jedem Protokoll-Anwendungsfall separat skalieren und bereitstellen.

Informationen zum Erstellen eines Kinesis Data Stream finden Sie unter Erstellen Sie einen Datenstrom.

Informationen zum Erstellen einer OpenSearch-Domäne finden Sie unter Erstellen und Verwalten von Amazon OpenSearch-Domänen.

Konfigurieren von Protokollabonnementfiltern

Sie können Abonnementfilter für CloudWatch-Protokollgruppen auf Konto- oder Protokollgruppenebene implementieren. In beiden Fällen empfehlen wir, einen Abonnementfilter mit einer zufälligen Verteilungsmethode zu erstellen, um sicherzustellen, dass die Protokolldaten gleichmäßig auf die Kinesis-Datenstrom-Shards verteilt werden.

Abonnementfilter auf Kontoebene werden auf alle Protokollgruppen in einem Konto angewendet und können verwendet werden, um alle Protokolldaten für ein einzelnes Ziel zu abonnieren. Dies funktioniert gut, wenn Sie alle Ihre Protokolldaten mithilfe von Kinesis Data Streams in OpenSearch Service speichern möchten. Es gibt ein Limit von einem Abonnementfilter auf Kontoebene pro Konto. Wenn Sie Kinesis Data Streams als Ziel verwenden, können Sie auch mehrere Protokollkonsumenten haben, um die Kontoprotokolldaten bei Bedarf zu verarbeiten. Informationen zum Erstellen eines Abonnementfilters auf Kontoebene finden Sie unter Abonnementfilter auf Kontoebene.

Abonnementfilter auf Protokollgruppenebene werden auf jede Protokollgruppe angewendet. Dieser Ansatz funktioniert gut, wenn Sie eine Teilmenge Ihrer Protokolldaten in OpenSearch Service mithilfe von Kinesis Data Streams speichern möchten und wenn Sie mehrere verschiedene Datenströme zum Speichern und Verarbeiten mehrerer Protokolltypen verwenden möchten. Es gibt ein Limit von zwei Abonnementfiltern auf Protokollgruppenebene pro Protokollgruppe. Informationen zum Erstellen eines Abonnementfilters auf Protokollgruppenebene finden Sie unter Abonnementfilter auf Protokollgruppenebene.

Nachdem Sie Ihren Abonnementfilter erstellt haben, überprüfen Sie, ob Protokolldaten an Ihren Kinesis-Datenstrom gesendet werden. Wählen Sie in der Kinesis Data Streams-Konsole den Link für Ihren Stream-Namen.

Wählen Sie eine Scherbe mit Startposition festlegen als Horizont trimmen, und wähle Datensätze abrufen.

Sie sollten Datensätze mit einem eindeutigen Partitionsschlüssel Spaltenwert und Binärwert Daten-Management Spalte. Dies liegt daran, dass CloudWatch Daten im GZIP-Format sendet, um Protokolldaten zu komprimieren.

Konfigurieren einer OpenSearch Ingestion-Pipeline

Nachdem Sie nun über einen Kinesis-Datenstrom und CloudWatch-Abonnementfilter zum Senden von Daten an den Datenstrom verfügen, können Sie Ihre OpenSearch Ingestion-Pipeline so konfigurieren, dass Ihre Protokolldaten verarbeitet werden. Zu Beginn erstellen Sie einen AWS Identity and Access Management and (IAM)-Rolle, die Lesezugriff auf den Kinesis-Datenstrom und Lese-/Schreibzugriff auf die OpenSearch-Domäne ermöglicht. Um Ihre Pipeline zu erstellen, erfordert Ihre Managerrolle, die zum Erstellen der Pipeline verwendet wird, iam:PassRole Berechtigungen für die in diesem Schritt erstellte Pipeline-Rolle.

  1. Erstellen Sie eine IAM-Rolle mit den folgenden Berechtigungen, um aus Ihrem Kinesis-Datenstrom zu lesen und auf Ihre OpenSearch-Domäne zuzugreifen:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "allowReadFromStream",
                "Effect": "Allow",
                "Action": [
                    "kinesis:DescribeStream",
                    "kinesis:DescribeStreamConsumer",
                    "kinesis:DescribeStreamSummary",
                    "kinesis:GetRecords",
                    "kinesis:GetShardIterator",
                    "kinesis:ListShards",
                    "kinesis:ListStreams",
                    "kinesis:ListStreamConsumers",
                    "kinesis:RegisterStreamConsumer",
                    "kinesis:SubscribeToShard"
                ],
                "Resource": [
                    "arn:aws:kinesis:{{region}}:{{account-id}}:stream/{{stream-name}}"
                ]
            },
            {
                "Sid": "allowAccessToOS",
                "Effect": "Allow",
                "Action": [
                    "es:DescribeDomain",
                    "es:ESHttp*"
                ],
                "Resource": [
                    "arn:aws:es:{region}:{account-id}:domain/{domain-name}",
                    "arn:aws:es:{region}:{account-id}:domain/{domain-name}/*"
                ]
            }
        ]
    }

  2. Weisen Sie Ihrer Rolle eine Vertrauensrichtlinie zu, die den Zugriff ermöglicht von osis-pipelines.amazonaws.com:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                "Principal": {
                    "Service": [
                        "osis-pipelines.amazonaws.com"
                    ]
                },
                "Action": "sts:AssumeRole",
                "Condition": {
                    "StringEquals": {
                        "aws:SourceAccount": "{account-id}"
                    },
                    "ArnLike": {
                        "aws:SourceArn": "arn:aws:osis:{region}:{account-id}:pipeline/*"
                    }
                }
            }
        ]
    }

Damit eine Pipeline Daten in eine Domäne schreiben kann, muss die Domäne über eine Zugriffsrichtlinie auf Domänenebene das der Pipeline-Rolle den Zugriff darauf ermöglicht, und wenn Ihre Domäne differenzierte Zugangskontrolle, dann muss die IAM-Rolle einer Back-End-Rolle im OpenSearch Service-Sicherheits-Plugin zugeordnet werden, die den Zugriff zum Erstellen und Schreiben in Indizes ermöglicht.

  1. Nachdem Sie Ihre Pipeline-Rolle erstellt haben, wählen Sie in der OpenSearch Service-Konsole Pipelines für Verschlucken im Navigationsbereich.
  2. Wählen Pipeline erstellen.
  3. Suchen Sie in den Blueprints nach Kinesis, wählen Sie den Blueprint Kinesis Data Streams aus und wählen Sie Blaupause auswählen.
  4. Der Pipeline-Einstellungen, geben Sie einen Namen für Ihre Pipeline ein und legen Sie maximale Kapazität damit die Pipeline der Anzahl der Shards in Ihrem Kinesis-Datenstrom entspricht.

Wenn Sie den On-Demand-Modus für den Datenstrom verwenden, wählen Sie eine Kapazität, die der aktuellen Anzahl von Shards im Stream entspricht. Dieser Anwendungsfall erfordert keinen persistenten Puffer, da Kinesis Data Streams einen persistenten Puffer für die Protokolldaten bereitstellt und OpenSearch Ingestion seine Position im Kinesis-Datenstrom im Laufe der Zeit verfolgt, wodurch Datenverlust bei Neustarts verhindert wird.

  1. Der Pipeline-Konfiguration, aktualisieren Sie die Quelleinstellungen der Pipeline, um Ihren Kinesis-Datenstromnamen und die Amazon-Ressourcenbezeichnung (ARN) der Pipeline-IAM-Rolle zu verwenden.

Vollständige Konfigurationsinformationen finden Sie unter . Für die meisten Konfigurationen können Sie die Standardwerte verwenden. Standardmäßig schreibt die Pipeline alle 100 Sekunde Stapel von 1 Dokumenten und abonniert den Kinesis-Datenstrom ab der letzten Position im Stream mithilfe von verbessertes Fan-Out, wobei alle 2 Minuten ein Checkpoint seiner Position im Stream erstellt wird. Sie können dieses Verhalten nach Wunsch anpassen, um festzulegen, wie häufig der Consumer Checkpoints erstellt, wo er im Stream beginnt, und Polling verwenden, um die Kosten durch verbessertes Fan-Out zu reduzieren.

  source:
    kinesis-data-streams:
      acknowledgments: true
      codec:
        # JSON codec supports parsing nested CloudWatch events into
        # individual log entries that will be written as documents to
        # OpenSearch
        json:
          key_name: "logEvents"
          # These keys contain the metadata sent by CloudWatch Subscription Filters
          # in addition to the individual log events:
          # https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#DestinationKinesisExample
          include_keys: ['owner', 'logGroup', 'logStream' ]
      streams:
        # Update to use your Kinesis Stream name used in your Subscription Filters:
        - stream_name: "KINESIS_STREAM_NAME"
          # Can customize initial position if you don't want OSI to consume the entire stream:
          initial_position: "EARLIEST"
          # Compression will always be gzip for CloudWatch, but will vary for other sources:
          compression: "gzip"
      aws:
        # Provide the Role ARN with access to KDS. This role should have a trust relationship with osis-pipelines.amazonaws.com
        # This must be the same role used below in the Sink configuration.
        sts_role_arn: "PIPELINE_ROLE_ARN"
        # Provide the region of the Data Stream.
        region: "REGION"

  1. Aktualisieren Sie die Sink-Einstellungen der Pipeline, um die URL Ihres OpenSearch-Domänenendpunkts und die ARN der IAM-Pipeline-Rolle einzuschließen.

Die IAM-Rollen-ARN muss für die OpenSearch Servicer-Sink-Definition und die Kinesis Data Streams-Quelldefinition identisch sein. Mithilfe der Indexdefinition im Sink können Sie steuern, welche Daten in verschiedenen Indizes indiziert werden. Sie können beispielsweise Metadaten zum Kinesis-Datenstromnamen verwenden, um nach Datenstrom zu indizieren (${getMetadata("kinesis_stream_name")), oder Sie können Dokumentfelder verwenden, um Daten abhängig von der CloudWatch-Protokollgruppe oder anderen Dokumentdaten zu indizieren (${path/to/field/in/document}). In diesem Beispiel verwenden wir drei Felder auf Dokumentebene (data_stream.type, data_stream.dataset und data_stream.namespace), um unsere Dokumente zu indizieren und diese Felder in unserer Pipeline-Prozessorlogik im nächsten Abschnitt zu erstellen:

  sink:
    - opensearch:
        # Provide an AWS OpenSearch Service domain endpoint
        hosts: [ "OPENSEARCH_ENDPOINT" ]
        # Route log data to different target indexes depending on the log context:
        index: "ss4o_${data_stream/type}-${data_stream/dataset}-${data_stream/namespace}"
        aws:
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
          # This role must be the same as the role used above for Kinesis.
          sts_role_arn: "PIPELINE_ROLE_ARN"
          # Provide the region of the domain.
          region: "REGION"
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection
          serverless: false

Schließlich können Sie die Pipeline-Konfiguration aktualisieren, um Folgendes einzuschließen: Prozessor Definitionen, um Ihre Protokolldaten zu transformieren, bevor Dokumente in die OpenSearch-Domäne geschrieben werden. Dieser Anwendungsfall verwendet beispielsweise Einfaches Schema für Beobachtbarkeit (SS4O) und verwendet die OpenSearch Ingestion-Pipeline, um das gewünschte Schema für SS4O zu erstellen. Dazu gehört das Hinzufügen gemeinsamer Felder, um Metadaten mit den indexierten Dokumenten zu verknüpfen, sowie das Parsen der Protokolldaten, um die Daten besser durchsuchbar zu machen. Dieser Anwendungsfall verwendet auch den Protokollgruppennamen, um verschiedene Protokolltypen als Datensätze zu identifizieren, und verwendet diese Informationen, um Dokumente je nach Anwendungsfall in verschiedene Indizes zu schreiben.

  1. Benennen Sie den CloudWatch-Ereigniszeitstempel um, um den beobachteten Zeitstempel zu markieren, bei dem das Protokoll generiert wurde. Verwenden Sie dazu rename_keys Prozessorund fügen Sie den aktuellen Zeitstempel als verarbeiteten Zeitstempel hinzu, als OpenSearch Ingestion den Datensatz mithilfe des Datenverarbeiter:
      #  Processor logic is used to change how log data is parsed for OpenSearch.
      processor:
        - rename_keys:
            entries:
            # Include CloudWatch timestamp as the observation timestamp - the time the log
            # was generated and sent to CloudWatch:
            - from_key: "timestamp"
              to_key: "observed_timestamp"
        - date:
            # Include the current timestamp that OSI processed the log event:
            from_time_received: true
            destination: "processed_timestamp"

  2. Verwenden Sie das add_entries-Prozessor um Metadaten über das verarbeitete Dokument einzuschließen, einschließlich Protokollgruppe, Protokollstream, Konto-ID, AWS-Region, Kinesis-Datenstream-Informationen und Datensatz-Metadaten:
        - add_entries:
            entries:
            # Support SS4O common log fields (https://opensearch.org/docs/latest/observing-your-data/ss4o/)
            - key: "cloud/provider"
              value: "aws"
            - key: "cloud/account/id"
              format: "${owner}"
            - key: "cloud/region"
              value: "us-west-2"
            - key: "aws/cloudwatch/log_group"
              format: "${logGroup}"
            - key: "aws/cloudwatch/log_stream"
              format: "${logStream}"
            # Include default values for the data_stream:
            - key: "data_stream/namespace"
              value: "default"
            - key: "data_stream/type"
              value: "logs"
            - key: "data_stream/dataset"
              value: "general"
            # Include metadata about the source Kinesis message that contained this log event:
            - key: "aws/kinesis/stream_name"
              value_expression: "getMetadata("stream_name")"
            - key: "aws/kinesis/partition_key"
              value_expression: "getMetadata("partition_key")"
            - key: "aws/kinesis/sequence_number"
              value_expression: "getMetadata("sequence_number")"
            - key: "aws/kinesis/sub_sequence_number"
              value_expression: "getMetadata("sub_sequence_number")"

  3. Verwenden Sie die Syntax für bedingte Ausdrücke um die zu aktualisieren data_stream.dataset Felder abhängig von der Protokollquelle, um zu steuern, in welchen Index das Dokument geschrieben wird, und verwenden Sie die delete_entries-Prozessor um die ursprünglichen CloudWatch-Dokumentfelder zu löschen, die umbenannt wurden:
        - add_entries:
            entries:
            # Update the data_stream fields based on the log event context - in this case
            # classifying the log events by their source (CloudTrail or Lambda).
            # Additional logic could be added to classify the logs by business or application context:
            - key: "data_stream/dataset"
              value: "cloudtrail"
              add_when: "contains(/logGroup, "cloudtrail") or contains(/logGroup, "CloudTrail")"
              overwrite_if_key_exists: true
            - key: "data_stream/dataset"
              value: "lambda"
              add_when: "contains(/logGroup, "/aws/lambda/")"
              overwrite_if_key_exists: true
            - key: "data_stream/dataset"
              value: "apache"
              add_when: "contains(/logGroup, "/apache/")"
              overwrite_if_key_exists: true
        # Remove the default CloudWatch fields, as we re-mapped them to SS4O fields:
        - delete_entries:
            with_keys:
              - "logGroup"
              - "logStream"
              - "owner"

  4. Analysieren Sie die Felder der Protokollnachrichten, um strukturierte und JSON-Daten in den OpenSearch-Indizes mithilfe der grok und parse_json

Grok-Prozessoren verwenden Mustervergleiche, um Daten aus strukturierten Textfeldern zu analysieren. Beispiele für integrierte Grok-Muster finden Sie unter Java-Grok-Muster und Dataprepper-Grok-Muster.

    # Use Grok parser to parse non-JSON apache logs
    - grok:
        grok_when: "/data_stream/dataset == "apache""
        match:
          message: ['%{COMMONAPACHELOG_DATATYPED}']
        target_key: "http"
    # Attempt to parse the log data as JSON to support field-level searches in the OpenSearch index:
    - parse_json:
        # Parse root message object into aws.cloudtrail to match SS4O standard for SS4O logs
        source: "message"
        destination: "aws/cloudtrail"
        parse_when: "/data_stream/dataset == "cloudtrail""
        tags_on_failure: ["json_parse_fail"]
    - parse_json:
        # Parse root message object as JSON when possible for Lambda function logs - can also set up Grok support
        # for Lambda function logs to capture non-JSON logging function data as searchable fields
        source: "message"
        destination: "aws/lambda"
        parse_when: "/data_stream/dataset == "lambda""
        tags_on_failure: ["json_parse_fail"]
    - parse_json:
        # Parse root message object as JSON when possible for general logs
        source: "message"
        destination: "body"
        parse_when: "/data_stream/dataset == "general""
        tags_on_failure: ["json_parse_fail"]

Wenn alles zusammengefügt ist, sieht Ihre Pipeline-Konfiguration wie der folgende Code aus:

version: "2"
kinesis-pipeline:
  source:
    kinesis-data-streams:
      acknowledgments: true
      codec:
        # JSON codec supports parsing nested CloudWatch events into
        # individual log entries that will be written as documents to
        # OpenSearch
        json:
          key_name: "logEvents"
          # These keys contain the metadata sent by CloudWatch Subscription Filters
          # in addition to the individual log events:
          # https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#DestinationKinesisExample
          include_keys: ['owner', 'logGroup', 'logStream' ]
      streams:
        # Update to use your Kinesis Stream name used in your Subscription Filters:
        - stream_name: "KINESIS_STREAM_NAME"
          # Can customize initial position if you don't want OSI to consume the entire stream:
          initial_position: "EARLIEST"
          # Compression will always be gzip for CloudWatch, but will vary for other sources:
          compression: "gzip"
      aws:
        # Provide the Role ARN with access to KDS. This role should have a trust relationship with osis-pipelines.amazonaws.com
        # This must be the same role used below in the Sink configuration.
        sts_role_arn: "PIPELINE_ROLE_ARN"
        # Provide the region of the Data Stream.
        region: "REGION"
        
  #  Processor logic is used to change how log data is parsed for OpenSearch.
  processor:
    - rename_keys:
        entries:
        # Include CloudWatch timestamp as the observation timestamp - the time the log
        # was generated and sent to CloudWatch:
        - from_key: "timestamp"
          to_key: "observed_timestamp"
    - date:
        # Include the current timestamp that OSI processed the log event:
        from_time_received: true
        destination: "processed_timestamp"
    - add_entries:
        entries:
        # Support SS4O common log fields (https://opensearch.org/docs/latest/observing-your-data/ss4o/)
        - key: "cloud/provider"
          value: "aws"
        - key: "cloud/account/id"
          format: "${owner}"
        - key: "cloud/region"
          value: "us-west-2"
        - key: "aws/cloudwatch/log_group"
          format: "${logGroup}"
        - key: "aws/cloudwatch/log_stream"
          format: "${logStream}"
        # Include default values for the data_stream:
        - key: "data_stream/namespace"
          value: "default"
        - key: "data_stream/type"
          value: "logs"
        - key: "data_stream/dataset"
          value: "general"
        # Include metadata about the source Kinesis message that contained this log event:
        - key: "aws/kinesis/stream_name"
          value_expression: "getMetadata("stream_name")"
        - key: "aws/kinesis/partition_key"
          value_expression: "getMetadata("partition_key")"
        - key: "aws/kinesis/sequence_number"
          value_expression: "getMetadata("sequence_number")"
        - key: "aws/kinesis/sub_sequence_number"
          value_expression: "getMetadata("sub_sequence_number")"
    - add_entries:
        entries:
        # Update the data_stream fields based on the log event context - in this case
        # classifying the log events by their source (CloudTrail or Lambda).
        # Additional logic could be added to classify the logs by business or application context:
        - key: "data_stream/dataset"
          value: "cloudtrail"
          add_when: "contains(/logGroup, "cloudtrail") or contains(/logGroup, "CloudTrail")"
          overwrite_if_key_exists: true
        - key: "data_stream/dataset"
          value: "lambda"
          add_when: "contains(/logGroup, "/aws/lambda/")"
          overwrite_if_key_exists: true
        - key: "data_stream/dataset"
          value: "apache"
          add_when: "contains(/logGroup, "/apache/")"
          overwrite_if_key_exists: true
    # Remove the default CloudWatch fields, as we re-mapped them to SS4O fields:
    - delete_entries:
        with_keys:
          - "logGroup"
          - "logStream"
          - "owner"
    # Use Grok parser to parse non-JSON apache logs
    - grok:
        grok_when: "/data_stream/dataset == "apache""
        match:
          message: ['%{COMMONAPACHELOG_DATATYPED}']
        target_key: "http"
    # Attempt to parse the log data as JSON to support field-level searches in the OpenSearch index:
    - parse_json:
        # Parse root message object into aws.cloudtrail to match SS4O standard for SS4O logs
        source: "message"
        destination: "aws/cloudtrail"
        parse_when: "/data_stream/dataset == "cloudtrail""
        tags_on_failure: ["json_parse_fail"]
    - parse_json:
        # Parse root message object as JSON when possible for Lambda function logs - can also set up Grok support
        # for Lambda function logs to capture non-JSON logging function data as searchable fields
        source: "message"
        destination: "aws/lambda"
        parse_when: "/data_stream/dataset == "lambda""
        tags_on_failure: ["json_parse_fail"]
    - parse_json:
        # Parse root message object as JSON when possible for general logs
        source: "message"
        destination: "body"
        parse_when: "/data_stream/dataset == "general""
        tags_on_failure: ["json_parse_fail"]

  sink:
    - opensearch:
        # Provide an AWS OpenSearch Service domain endpoint
        hosts: [ "OPENSEARCH_ENDPOINT" ]
        # Route log data to different target indexes depending on the log context:
        index: "ss4o_${data_stream/type}-${data_stream/dataset}-${data_stream/namespace}"
        aws:
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
          # This role must be the same as the role used above for Kinesis.
          sts_role_arn: "PIPELINE_ROLE_ARN"
          # Provide the region of the domain.
          region: "REGION"
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection
          serverless: false

  1. Wenn Ihre Konfiguration abgeschlossen ist, wählen Sie Pipeline validieren um Ihre Pipeline-Syntax auf Fehler zu überprüfen.
  2. Im Pipeline-Rolle Geben Sie optional ein Suffix ein, um eine eindeutige Servicerolle zu erstellen, die zum Starten Ihres Pipeline-Laufs verwendet wird.
  3. Im Network Abschnitt auswählen VPC-Zugriff.

Für eine Kinesis Data Streams-Quelle müssen Sie keine Virtual Private Cloud (VPC), Subnetze oder Sicherheitsgruppen auswählen. OpenSearch Ingestion erfordert diese Attribute nur für HTTP-Datenquellen, die sich innerhalb einer VPC befinden. Für Kinesis Data Streams verwendet OpenSearch Ingestion AWS PrivateLink um aus Kinesis Data Streams zu lesen und in OpenSearch-Domänen oder serverlose Sammlungen zu schreiben.

  1. Aktivieren Sie optional die CloudWatch-Protokollierung für Ihre Pipeline.
  2. Wählen Next um Ihre Pipeline zu überprüfen und zu erstellen.

Wenn Sie Abonnementfilter auf Kontoebene für CloudWatch-Protokolle in dem Konto verwenden, in dem OpenSearch Ingestion ausgeführt wird, sollte diese Protokollgruppe vom Abonnement auf Kontoebene ausgeschlossen werden. Dies liegt daran, dass Pipeline-Protokolle von OpenSearch Ingestion eine rekursive Schleife mit dem Abonnementfilter verursachen könnten, die zu hohen Mengen an aufgenommenen Protokolldaten und damit zu hohen Kosten führen könnte.

  1. Im Überprüfen und erstellen Wählen Sie im Abschnitt Pipeline erstellen.

Wenn Ihre Pipeline in die Aktives Status: Sie werden sehen, wie die Protokolle in Ihrer OpenSearch-Domäne oder serverlosen Sammlung aufgefüllt werden.

Überwachen der Lösung

Um die Integrität der Protokollaufnahme-Pipeline aufrechtzuerhalten, müssen mehrere wichtige Bereiche überwacht werden:

  • Kinesis Data Streams-Metriken – Sie sollten die folgenden Kennzahlen überwachen:
    • Fehlgeschlagene Datensätze – Zeigt ein Problem in CloudWatch-Abonnementfiltern an, die in den Kinesis-Datenstrom schreiben. Wenden Sie sich an den AWS-Support, wenn diese Metrik über einen längeren Zeitraum ungleich Null bleibt.
    • ThrottledRecords – Zeigt an, dass Ihr Kinesis-Datenstrom mehr Shards benötigt, um das Protokollvolumen von CloudWatch aufzunehmen.
    • ReadProvisionedThroughputExceeded – Zeigt an, dass Ihr Kinesis-Datenstrom mehr Verbraucher hat, die Lesedurchsatz verbrauchen, als durch die Shard-Grenzwerte bereitgestellt wird, und dass Sie möglicherweise zu einer erweiterten Fan-Out-Verbraucherstrategie wechseln müssen.
    • WriteProvisionedThroughputExceeded – Zeigt an, dass Ihr Kinesis-Datenstrom mehr Shards benötigt, um das Protokollvolumen von CloudWatch aufzunehmen, oder dass Ihr Protokollvolumen ungleichmäßig auf Ihre Shards verteilt wird. Stellen Sie sicher, dass die Verteilungsstrategie des Abonnementfilters auf „Zufällig“ eingestellt ist, und aktivieren Sie ggf. eine erweiterte Überwachung auf Shard-Ebene für den Datenstrom, um Hot Shards zu identifizieren.
    • RateÜberschritten – Zeigt an, dass ein Verbraucher für den Stream falsch konfiguriert ist und möglicherweise ein Problem in Ihrer OpenSearch Ingestion-Pipeline vorliegt, das dazu führt, dass er sich zu oft anmeldet. Untersuchen Sie Ihre Verbraucherstrategie für den Kinesis-Datenstream.
    • MillisBehindLatest – Zeigt an, dass der erweiterte Fan-Out-Consumer mit der Last im Datenstrom nicht Schritt halten kann. Untersuchen Sie die OCU-Konfiguration der OpenSearch Ingestion-Pipeline und stellen Sie sicher, dass genügend OCUs vorhanden sind, um die Kinesis-Datenstrom-Shards aufzunehmen.
    • IteratorAlterMillisekunden – Zeigt an, dass der abfragende Verbraucher mit der Last im Datenstrom nicht Schritt halten kann. Untersuchen Sie die OCU-Konfiguration der OpenSearch Ingestion-Pipeline und stellen Sie sicher, dass genügend OCUs vorhanden sind, um die Kinesis-Datenstrom-Shards aufzunehmen, und untersuchen Sie die Abfragestrategie für den Verbraucher.
  • Filtermetriken für CloudWatch-Abonnements – Sie sollten die folgenden Kennzahlen überwachen:
    • Lieferfehler – Zeigt ein Problem im CloudWatch-Abonnementfilter an, der Daten an den Kinesis-Datenstrom liefert. Untersuchen Sie die Datenstrommetriken.
    • Lieferdrosselung – Zeigt eine unzureichende Kapazität im Kinesis-Datenstrom an. Untersuchen Sie die Datenstrommetriken.
  • OpenSearch Ingestion-Metriken – Empfohlene Überwachung für OpenSearch Ingestion finden Sie unter Empfohlene CloudWatch-Alarme.
  • OpenSearch Service-Metriken – Empfohlene Überwachung für OpenSearch Service finden Sie unter Empfohlene CloudWatch-Alarme für Amazon OpenSearch Service.

Aufräumen

Stellen Sie sicher, dass Sie unerwünschte AWS-Ressourcen löschen, die beim Befolgen dieses Beitrags erstellt wurden, um zusätzliche Kosten für diese Ressourcen zu vermeiden. Befolgen Sie diese Schritte, um Ihr AWS-Konto zu löschen:

  1. Löschen Sie Ihren Kinesis-Datenstrom.
  2. Löschen Sie Ihre OpenSearch Service-Domäne.
  3. Verwenden Sie das DeleteAccountPolicy-API um Ihren CloudWatch-Abonnementfilter auf Kontoebene zu entfernen.
  4. Löschen Sie Ihren CloudWatch-Abonnementfilter auf Protokollgruppenebene:
    1. Wählen Sie in der CloudWatch-Konsole die gewünschte Protokollgruppe aus.
    2. Auf dem Aktionen Menü, wählen Sie Abonnementfilter und Alle Abonnementfilter löschen.
  5. Löschen Sie die OpenSearch Ingestion-Pipeline.

Fazit

In diesem Beitrag haben Sie gelernt, wie Sie eine serverlose Aufnahmepipeline erstellen, um CloudWatch-Protokolle mithilfe von OpenSearch Ingestion in Echtzeit an eine OpenSearch-Domäne oder serverlose Sammlung zu übermitteln. Sie können diesen Ansatz für eine Vielzahl von Anwendungsfällen zur Echtzeitdatenaufnahme verwenden und ihn zu vorhandenen Workloads hinzufügen, die Kinesis Data Streams für Echtzeitdatenanalysen verwenden.

Beachten Sie für andere Anwendungsfälle für OpenSearch Ingestion und Kinesis Data Streams Folgendes:

Um Ihre Anwendungsfälle für die Protokollanalyse in OpenSearch weiter zu verbessern, sollten Sie einige der vorgefertigten Dashboards verwenden, die in Integrationen in OpenSearch Dashboards.


Über die Autoren

M Mehrtens hat während ihrer gesamten Karriere in der Entwicklung verteilter Systeme gearbeitet und als Software-Ingenieur, Architekt und Dateningenieur gearbeitet. In der Vergangenheit hat M Systeme unterstützt und entwickelt, um Terabytes an Streaming-Daten mit geringer Latenz zu verarbeiten, Pipelines für maschinelles Lernen in Unternehmen zu betreiben und Systeme für den nahtlosen Datenaustausch zwischen Teams mit unterschiedlichen Daten-Toolsets und Software-Stacks zu erstellen. Bei AWS sind sie Sr. Solutions Architect, der US-Finanzkunden unterstützt.

Arjun Nambiar ist Produktmanager bei Amazon OpenSearch Service. Er konzentriert sich auf Aufnahmetechnologien, die die Aufnahme von Daten aus einer Vielzahl von Quellen in großem Maßstab in Amazon OpenSearch Service ermöglichen. Arjun interessiert sich für groß angelegte verteilte Systeme und Cloud-basierte Technologien und lebt in Seattle, Washington.

Muthu Pitchaimani ist ein Suchspezialist bei Amazon OpenSearch Service. Er baut umfangreiche Suchanwendungen und -lösungen. Muthu interessiert sich für die Themen Netzwerke und Sicherheit und hat seinen Sitz in Austin, Texas.

In Verbindung stehende Artikel

spot_img

Aktuelle Artikel

spot_img