K8S Event ve AI

ergün elvan bilsel
5 min readJun 2, 2024

--

Kubernetes (k8s) eventleri, Kubernetes kümesindeki bileşenlerin durumları ve hareketleri hakkında bilgi sağlayan önemli bildirimlerdir. Bu eventler, pod’lar, servisler, dağıtımlar, düğümler ve daha fazlası gibi Kubernetes nesneleri üzerinde gerçekleşen olayları kaydeder.

Kubernetes eventlerinin takibi için çeşitli çözümler ve araçlar mevcuttur. Bu araçlar, eventleri toplamak, analiz etmek ve görselleştirmek için kullanılır.

Kubernetes (k8s) eventlerinin izlenmesi ve analizi için yapay zeka (AI) ve makine öğrenimi (ML) çözümleri, daha ileri seviye analiz, tahmin ve otomasyon imkanı sunar. Bunlardan bir kaçı:

  • Dynatrace: AI destekli bir izleme aracıdır. Kubernetes eventlerini ve metriklerini izler ve anomali tespiti, kök neden analizi gibi gelişmiş özellikler sunar.
  • AppDynamics: Uygulama performans yönetimi (APM) için AI tabanlı bir çözümdür. Kubernetes ortamındaki eventleri izler ve performans sorunlarını tespit eder.
  • New Relic One: New Relic’in AI destekli platformu, Kubernetes eventlerini ve diğer metrikleri izleyerek, anomali tespiti ve performans analizi sağlar.

Fakat k8s eventlerini dinleyerek çıktıları chatgpt ve geminiye soracak bir sistem hazırlamak şu zamanda çok zor olmasa gerek…

Bunun için basit bir diagram çizerek başlayalım.

  1. K8S cluster bağlanacak ve eventleri dinleyecek bir akış.
  2. Gelen eventleri maskelecek.
  3. Aynı eventler tekrar tekrar gelebilir, gelen eventin farklı olduğunu karar verecek doğal dil işleme kütüphanesi ve karşılaştırma için verileri tutacak bir veritabanı(mongo)
  4. Veritabanında ki çıktıları paylaşabilmek için bir web çerçevesi.(flask)

Eventleri Dinle

K8S eventelerini dinlemek için cluster’ın config dosyasını python daki kubernetes api kütüphanesini kullanıyoruz.

class EventK8s:
def __init__(self):
config.load_kube_config(config_file="/home/elvan/k8s-event/k8s-test-.config")
self.v1 = client.CoreV1Api()
self.w = watch.Watch()

async def get_event(self):
try:
for event in self.w.stream(self.v1.list_event_for_all_namespaces):
if event['raw_object']['type'] == 'Warning':
await self.process_data(data=event['object'].message,namespace=event['object'].metadata.namespace)
except Exception as err:
self.logger.warning(" get event {}".format(err))

Görüldüğü üzere sadece ‘Warning’ eventleri ilgileniyorum.

Gelen Verileri İşle

Bu eventleri ne yapmalı? Önce içindeki ip ve port bilgilerini bir güzel saklamalı. Sonra aynı event veri tabanında var mı diye kontrol et. Sonra gemini ve chatgpt sor ve cevapları kayıt et.

MongoDB

MongoDB de veritabanı ve collection oluşturmak için;

const database = 'EVENTK8S';
const collection = 'Response';

// Create a new database.
use(database);

// Create a new collection.
db.createCollection(collection);

Spacy

en_core_web_sm, spaCy kütüphanesi tarafından sağlanan bir doğal dil işleme (NLP) modelidir. Bu model, İngilizce dilinde çeşitli NLP görevlerini gerçekleştirmek için kullanılır. Gelen eventlerin benzerliklerini 0 ve 1 arasında bir değer belirleyecek ve 0.85 değerinden yukarı değerleri aynı event olarak kabul edecek.

    async def sentence_score(self,sentence_event):
try:
if self.collection_mongo.count_documents != 0:
doc1 = self.nlp(sentence_event)
for document in self.collection_mongo.find():
doc2= self.nlp(document["event"])
similarity = doc1.similarity(doc2)
if similarity > 0.85:
return "Same"
return "Different"
except Exception as err:
self.logger.warning("score {}".format(err))

Mask

Maskeleme işlemi burda sadece ip ve port bilgileri için gerçekleştirildi fakat yüzeyi genişletilebilir.

    def mask_words(self,text):
try:
ip_port_pattern = r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}):(\d+)'
matches = re.findall(ip_port_pattern, text)
# Bulunan IP adreslerini ve port numaralarını maskele
masked_text = text
for match in matches:
ip, port = match
masked_text = masked_text.replace(ip, '***').replace(port, '***')

ip_pattern = r'\b(?:\d{1,3}\.){3}\d{1,3}\b'
ips = re.findall(ip_pattern, masked_text)
for ip in ips:
masked_text = masked_text.replace(ip, '***')
return masked_text
except Exception as err:
self.logger.warning("masked words {}".format(err))

Chatgpt’ye sor

Bu kısımlarda işimiz daha da kolay. Apı dokumanı;

https://platform.openai.com/docs/introduction

    async def ask_chatgpt(self,message):
try:
chat_completion = self.chatgpt_client.chat.completions.create(
messages=[
{
"role": "user",
"content": "{}".format(message),
}
],
model="gpt-3.5-turbo")
return chat_completion.choices[0].message.content
except Exception as err:
self.logger.warning("ask chatgpt {}".format(err))

Gemini’ye sor

    async def ask_gemini(self,message):
try:
ask_string = "Share the most clicked google outputs about this problem: {} ".format(message)
response = self.gemini_client.generate_content(ask_string)
response_list = []
for chunk in response:
response_list.append(chunk.text)
return response_list
except Exception as err:
self.logger.warning(" ask gemini {}".format(err))

Son olarak bu dataları belirli bir sırayla işleyecek ve veritabanına kayıt edecek fonksiyon.

 async def process_data(self,data,namespace):
# Gelen veriyi işle
try:
data_mask = self.mask_words(data)
self.logger.info("Masked_event => {}".format(data_mask))
check_same = await self.sentence_score(data_mask)
self.logger.debug("Score ===>>>>>> {}".format(check_same))
if check_same == "Different":
question_web = await self.ask_gemini(data_mask)
question_gpt = await self.ask_chatgpt(data_mask)
mongo_data = {
'event' : data_mask,
'namespace': namespace,
'question_web' : question_web,
'question_gpt' : question_gpt,
'control' : True,
'rank': 0


}
self.collection_mongo.insert_one(mongo_data)
except Exception as err:
self.logger.warning("process data {}".format(err))

Gelen verileri sırasıyla maskeledik, anlam bütünlüğüne göre karşılaştırdık ve gemini ve chatgpt den gelen cevapları MongoDb kayıt ettik. Şimdi sırada bu verilerin çıktılarını görüntülemekte;

HTML

<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Flask DataTables Example</title>
<link rel="stylesheet" type="text/css" href="https://cdn.datatables.net/1.10.24/css/jquery.dataTables.css">
<script type="text/javascript" charset="utf8" src="https://code.jquery.com/jquery-3.5.1.js"></script>
<script type="text/javascript" charset="utf8" src="https://cdn.datatables.net/1.10.24/js/jquery.dataTables.js"></script>
<script>
$(document).ready( function () {
$('#k8sTable').DataTable();
} );
</script>
</head>
<body>
<h1>K8S Turksat Test - Warning Event</h1>
<table id="k8sTable">
<thead>
<tr>
<th>NAMESPACE</th>
<th>EVENT</th>
<th>GEMINI</th>
<th>CHATGPT</th>
<th>CHECK</th>
<th>RANK</th>
<th>CONFLUENCE</th>
</tr>
</thead>
<tbody>
{% for item in data %}
<tr>
<td>{{ item.namespace }}</td>
<td>{{ item.event }}</td>
<td> {{ item.question_web }} </td>
<td>{{ item.question_gpt }}</td>
<td>{{ item.control }}</td>
<td>{{ item.rank }}</td>
<td><button class="action-btn" value='{{ item._id }}'>Send To Confluence</button></td>
</tr>
{% endfor %}
</tbody>
</table>

<script>
$(document).ready(function() {
var table = $('#k8sTable').DataTable();

// Butona tıklama olayını yakalama
$('#k8sTable').on('click', '.action-btn', function() {
var buttonValue = $(this).val(); // Butonun value değerini al
$.ajax({
type: "POST",
url: "/submit_data",
contentType: "application/json",
data: JSON.stringify(buttonValue),
success: function(response) {
alert('Success: ' + JSON.stringify(result));
},
error: function(error) {
alert('Error: ' + JSON.stringify(result));
}
});
});
});

</script>
</body>
</html>

Web çatısı

from pymongo import MongoClient
from flask import Flask, request, render_template,jsonify
from atlassian import Confluence
from bson.objectid import ObjectId

app = Flask(__name__)



# MongoDB bağlantısı
client = MongoClient('mongodb://localhost:27017/')
db = client['EVENTK8S']
collection = db['Response']
jira_confluence = Confluence(url="https://confluence.com.tr", token="xxxxxxxxxxxxxxxxxxxxxxxxx")

@app.route('/')
def index():
# MongoDB'den verileri çekme
data_from_mongodb = list(collection.find({}))

# Verileri HTML şablonuna aktarma
return render_template('index.html', data=data_from_mongodb)

def generate_list_html(items):
list_html = "<ul>"
for item in items:
list_html += f"<li>{item}</li>"
list_html += "</ul>"
return list_html


@app.route('/submit_data', methods=['POST'])
def submit_data():
id = request.json
print(id)
document = collection.find_one({"_id": ObjectId("{}".format(id))})
if document:
info_content = """
<ac:structured-macro ac:name="column">
<ac:parameter ac:name="width">100px</ac:parameter>
<ac:rich-text-body>
{}
</ac:rich-text-body>
</ac:structured-macro>
""".format(generate_list_html(document['question_gpt']))

description_content = """
<ac:structured-macro ac:name="code">
{}
</ac:structured-macro>
""".format(generate_list_html(document['question_web']))

links_content = """

<p>{}</p>

""".format(document['namespace'])
create_page = jira_confluence.create_page('ELVAN-TEST', document['event'], f"{info_content}{description_content}{links_content}", parent_id=23232323, type='page', representation='storage', editor='v2', full_width=False)
if create_page['status'] == 'current':
return jsonify({"success": "Document create"}), 200
else:
return jsonify({"error": "Document not found"}), 404


if __name__ == '__main__':
app.run(debug=True,host="0.0.0.0",port=8000)

Sonuç

Proje Dosyaları

Linkler

Not: Bu sadece deneysel bir çalışmadır. Sadece fikir vermek ve yeni fikirleri ateşlemek için yazılmıştır.

--

--