Kafka Connectorエクステンションの使用
はじめに
このエクステンションはAltair AI StudioとKafkaサーバーを接続し、Kafkaトピックからメッセージを読み込むことや、Kafkaトピックにデータを書き込むことができます。
Kafka Connectorエクステンションのインストール
エクステンションをインストールするには、拡張機能メニューに移動し、マーケットプレイス(更新と拡張機能)を開いて、Kafka Connectorを検索します。詳細については「エクステンションの追加」を参照してください。
Kafka サーバーの接続
Kafkaサーバーに接続するには、リポジトリに新しいKafka接続オブジェクトを作成します。これにより、接続を一元管理し、オペレータ間の接続を再利用することができます。このエクステンションは、以下のセキュリティオプションをサポートしています。
- none
- SASL Plain
- SASL SCRAM
- SSL two-way
さらに、SASL PlainおよびSASL SCRAMは、パブリック証明書、またはローカルで提供されるSSLキーストアとトラストストアファイルのいずれかを使用して、暗号化されたSSLチャネルを使用するように設定することができます。
認証なし
追加のセキュリティ機能を提供しないKafkaサーバーと接続するには、到達可能なホストアドレスとポート番号を入力するだけです。
SASL Plain
認証にSASL(Simple Authentication and Security Layer) Plainを使用するKafkaサーバーと接続するには、ユーザー名とパスワードが必要です。
Encryptionがnoneに設定されている場合は、認証情報は暗号化されずに転送されます。サーバーが暗号化証明書(SSL)を提供する場合は、Encryptionを「yes」に設定します。
SASL SCRAM
認証にSASL SCRAM(Salted Challenge Response Authentication Mechanism)を使用するKafkaサーバーと接続するには、ユーザー名とパスワード、およびハッシュ化のための正しいSCRAMのバージョンが必要です。
Encryptionがnoneに設定されている場合は、認証情報は暗号化されずに転送されます。サーバーが暗号化証明書(SSL)を提供する場合は、Encryptionを「yes」に設定します。
2方向SSL
Kafkaサーバーへの通信が、追加の認証を必要としないSSLセキュアチャネルで保護される場合に、このオプションを選択します。
キーストアファイルの使用
自身で作成した証明書の場合、必要な情報を持っているファイルを提供する必要があります。
keystoreファイルは秘密鍵と公開鍵、証明書を保持するリポジトリであり、truststoreファイルは信頼する証明書を保持します。この情報は機密性が高いため、両方のファイルにはそれぞれパスワードが設定されます。
例
Read Kafka TopicオペレータおよびWrite Kafka Topicオペレータは、オペレータパネルでKafkaと検索すると見つけることができます。
どちらのオペレータでも、接続入力ポートに、保存されている接続オブジェクトを接続することで、使用するKafkaサーバーを指定することができます。認証情報はすべて接続オブジェクトに保存されているため、追加の認証情報は必要ありません。接続を切り替えることで、異なるサーバーへ簡単に切り替えることができます。
どちらのオペレータでも、サーバー上ですでに利用可能なトピックのリストを提供するオプションがあります。これにより、操作したいトピックを探す際や、すでに存在するトピックに新しいメッセージを送信する際などは非常に便利です。
既存のKafkaトピックからの読み込み
Read Kafka Topicオペレータを使用すると、Kafkaサーバーからメッセージを取得することができます。
offset strategiesというアクセス方法には2種類あります。
offset strategyをearliestに設定すると、このトピックで利用可能な最も古いメッセージから始まった、過去のメッセージが取得できます。過去のデータで機械学習モデルを学習する場合や、最近のイベントを確認する場合などに有効です。選択した数のメッセージを照会することや、そのトピックの過去のすべてのメッセージを取得することが可能です。
offset strategyをlatestに設定すると、オペレータは新着メッセージを収集するよう待機します。collection strategyは一定数の新しいメッセージが到着するまで待機するか、一定時間が経過するまで待機するかのどちらかです。また戻せるように、一定数を待っている場合でも、time outパラメータがあるため、無限に待つことを防ぎます。この戦略は、イベントの監視や学習済みモデルによるスコアリングなど、新しいイベントの収集に使用することができます。
どちらの場合も、結果として得られるデータセットには、各メッセージが行として含まれています。
トラブルシューティング
サーバーと接続速度によっては、読み取りリクエストがタイムアウトになることがあり、その場合は空のデータセットが返されます。特にリモートサービスでは、信頼性の高い結果を得るために、タイムアウトの設定を増やすこと(ただし、プロセスの実行時間をあまり増やしすぎない)が有効な場合があります。
Kafkaトピックへのデータの書き込み
Write Kafkaオペレータを使用すると、指定したKafkaトピックにデータを送信することができます。これは、bulk sendingオプションで、一回の大きなバッチで行うか、指定したメッセージインターバルで行うことができます。
また、2種類のメッセージ形式を選択することが可能です。message formatをJSON(JavaScript Object Notation)に設定すると、各行はJSONメッセージに変換され、データセットの属性名をキーとして含みます。
message formatがStringの場合は、属性名を除いた生データを、指定されたセパレータトークン(デフォルトは”;”)で送信するだけです。