Apache Kafka

Erste Schritte mit Apache Kafka und Python

Erste Schritte mit Apache Kafka und Python
In dieser Lektion sehen wir, wie wir Apache Kafka mit Python verwenden und eine Beispielanwendung mit dem Python-Client für Apache Kafka erstellen.

Um diese Lektion abzuschließen, müssen Sie auf Ihrem Computer eine aktive Installation von Kafka haben. Lesen Sie Apache Kafka unter Ubuntu installieren, um zu erfahren, wie das geht.

Python-Client für Apache Kafka installieren

Bevor wir mit Apache Kafka im Python-Programm arbeiten können, müssen wir den Python-Client für Apache Kafka installieren. Dies kann mit Pip (Python-Paket-Index). Hier ist ein Befehl, um dies zu erreichen:

pip3 installieren kafka-python

Dies wird eine schnelle Installation auf dem Terminal sein:

Installation des Python Kafka-Clients mit PIP

Nachdem wir nun eine aktive Installation für Apache Kafka und auch den Python Kafka-Client installiert haben, können wir mit der Codierung beginnen.

Einen Produzenten machen

Das erste, was Sie zum Veröffentlichen von Nachrichten auf Kafka benötigen, ist eine Producer-Anwendung, die Nachrichten an Themen in Kafka senden kann.

Beachten Sie, dass Kafka-Produzenten asynchrone Nachrichtenproduzenten sind. Dies bedeutet, dass die Operationen, die ausgeführt werden, während eine Nachricht auf der Kafka Topic-Partition veröffentlicht wird, nicht blockieren. Um die Dinge einfach zu halten, werden wir für diese Lektion einen einfachen JSON-Publisher schreiben.

Erstellen Sie zunächst eine Instanz für den Kafka Producer:

aus Kafka-Import KafkaProduzent
json importieren
pprint importieren
Produzent = KafkaProduzent(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.Deponien (v).codieren('utf-8'))

Das Attribut bootstrap_servers informiert über den Host & Port für den Kafka-Server. Das Attribut value_serializer dient nur der JSON-Serialisierung der gefundenen JSON-Werte.

Um mit dem Kafka Producer zu spielen, versuchen wir, die Metriken für den Producer und den Kafka-Cluster auszudrucken:

Metriken = Produzent.Metriken()
pprint.pprint(Metriken)

Wir werden jetzt folgendes sehen:

Kafka Mterics

Versuchen wir nun endlich, eine Nachricht an die Kafka-Warteschlange zu senden. Ein einfaches JSON-Objekt ist ein gutes Beispiel:

Produzent.send('linuxhint', 'topic': 'kafka')

Das linuxhint ist die Themenpartition, an die das JSON-Objekt gesendet wird. Wenn Sie das Skript ausführen, erhalten Sie keine Ausgabe, da die Nachricht nur an die Themenpartition gesendet wird. Es ist an der Zeit, einen Verbraucher zu schreiben, damit wir unsere Anwendung testen können.

Einen Verbraucher machen

Jetzt sind wir bereit, als Verbraucheranwendung eine neue Verbindung herzustellen und die Nachrichten aus dem Kafka-Thema zu erhalten. Beginnen Sie damit, eine neue Instanz für den Verbraucher zu erstellen:

von kafka import KafkaConsumer
von kafka importieren TopicPartition
print('Verbindung herstellen.')
Consumer = KafkaConsumer(bootstrap_servers='localhost:9092')

Ordnen Sie nun dieser Verbindung ein Thema und auch einen möglichen Offset-Wert zu.

print('Thema zuweisen.')
Verbraucher.Assign([TopicPartition('linuxhint', 2)])

Endlich können wir die Nachricht drucken:

print('Nachricht erhalten.')
für Nachricht im Consumer:
print("OFFSET: " + str(Nachricht[0])+ "\t MSG: " + str(Nachricht))

Dadurch erhalten wir eine Liste aller veröffentlichten Nachrichten auf der Kafka Consumer Topic Partition. Die Ausgabe für dieses Programm ist:

Kafka Verbraucher

Hier ist das vollständige Producer-Skript zur schnellen Referenz:

aus Kafka-Import KafkaProduzent
json importieren
pprint importieren
Produzent = KafkaProduzent(
bootstrap_servers='localhost:9092',
value_serializer=lambda v: json.Deponien (v).codieren('utf-8'))
Produzent.send('linuxhint', 'topic': 'kafka')
# Messwerte = Produzent.Metriken()
# pprint.pprint(Metriken)

Und hier ist das vollständige Consumer-Programm, das wir verwendet haben:

von kafka import KafkaConsumer
von kafka importieren TopicPartition
print('Verbindung herstellen.')
Consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
print('Thema zuweisen.')
Verbraucher.Assign([TopicPartition('linuxhint', 2)])
print('Nachricht erhalten.')
für Nachricht im Consumer:
print("OFFSET: " + str(Nachricht[0])+ "\t MSG: " + str(Nachricht))

Fazit

In dieser Lektion haben wir uns angesehen, wie wir Apache Kafka in unseren Python-Programmen installieren und verwenden können. Wir haben gezeigt, wie einfach es ist, einfache Aufgaben im Zusammenhang mit Kafka in Python mit dem demonstrierten Kafka-Client für Python auszuführen.

Top 5 Karten zur Spielaufnahme
Wir alle haben Streaming-Gameplays auf YouTube gesehen und geliebt. PewDiePie, Jakesepticye und Markiplier sind nur einige der Top-Gamer, die Millione...
So entwickeln Sie ein Spiel unter Linux
Vor einem Jahrzehnt hätten nicht viele Linux-Benutzer vorhergesagt, dass ihr Lieblingsbetriebssystem eines Tages eine beliebte Spieleplattform für kom...
Open-Source-Ports kommerzieller Spiele-Engines
Kostenlose, quelloffene und plattformübergreifende Spiel-Engine-Nachbildungen können verwendet werden, um sowohl alte als auch einige der relativ neue...