K8S Event ve AI
Kubernetes (k8s) eventleri, Kubernetes kümesindeki bileşenlerin durumları ve hareketleri hakkında bilgi sağlayan önemli bildirimlerdir. Bu eventler, pod’lar, servisler, dağıtımlar, düğümler ve daha fazlası gibi Kubernetes nesneleri üzerinde gerçekleşen olayları kaydeder.
Kubernetes eventlerinin takibi için çeşitli çözümler ve araçlar mevcuttur. Bu araçlar, eventleri toplamak, analiz etmek ve görselleştirmek için kullanılır.
Kubernetes (k8s) eventlerinin izlenmesi ve analizi için yapay zeka (AI) ve makine öğrenimi (ML) çözümleri, daha ileri seviye analiz, tahmin ve otomasyon imkanı sunar. Bunlardan bir kaçı:
- Dynatrace: AI destekli bir izleme aracıdır. Kubernetes eventlerini ve metriklerini izler ve anomali tespiti, kök neden analizi gibi gelişmiş özellikler sunar.
- AppDynamics: Uygulama performans yönetimi (APM) için AI tabanlı bir çözümdür. Kubernetes ortamındaki eventleri izler ve performans sorunlarını tespit eder.
- New Relic One: New Relic’in AI destekli platformu, Kubernetes eventlerini ve diğer metrikleri izleyerek, anomali tespiti ve performans analizi sağlar.
Fakat k8s eventlerini dinleyerek çıktıları chatgpt ve geminiye soracak bir sistem hazırlamak şu zamanda çok zor olmasa gerek…
Bunun için basit bir diagram çizerek başlayalım.
- K8S cluster bağlanacak ve eventleri dinleyecek bir akış.
- Gelen eventleri maskelecek.
- Aynı eventler tekrar tekrar gelebilir, gelen eventin farklı olduğunu karar verecek doğal dil işleme kütüphanesi ve karşılaştırma için verileri tutacak bir veritabanı(mongo)
- Veritabanında ki çıktıları paylaşabilmek için bir web çerçevesi.(flask)
Eventleri Dinle
K8S eventelerini dinlemek için cluster’ın config dosyasını python daki kubernetes api kütüphanesini kullanıyoruz.
class EventK8s:
def __init__(self):
config.load_kube_config(config_file="/home/elvan/k8s-event/k8s-test-.config")
self.v1 = client.CoreV1Api()
self.w = watch.Watch()
async def get_event(self):
try:
for event in self.w.stream(self.v1.list_event_for_all_namespaces):
if event['raw_object']['type'] == 'Warning':
await self.process_data(data=event['object'].message,namespace=event['object'].metadata.namespace)
except Exception as err:
self.logger.warning(" get event {}".format(err))
Görüldüğü üzere sadece ‘Warning’ eventleri ilgileniyorum.
Gelen Verileri İşle
Bu eventleri ne yapmalı? Önce içindeki ip ve port bilgilerini bir güzel saklamalı. Sonra aynı event veri tabanında var mı diye kontrol et. Sonra gemini ve chatgpt sor ve cevapları kayıt et.
MongoDB
MongoDB de veritabanı ve collection oluşturmak için;
const database = 'EVENTK8S';
const collection = 'Response';
// Create a new database.
use(database);
// Create a new collection.
db.createCollection(collection);
Spacy
en_core_web_sm
, spaCy kütüphanesi tarafından sağlanan bir doğal dil işleme (NLP) modelidir. Bu model, İngilizce dilinde çeşitli NLP görevlerini gerçekleştirmek için kullanılır. Gelen eventlerin benzerliklerini 0 ve 1 arasında bir değer belirleyecek ve 0.85 değerinden yukarı değerleri aynı event olarak kabul edecek.
async def sentence_score(self,sentence_event):
try:
if self.collection_mongo.count_documents != 0:
doc1 = self.nlp(sentence_event)
for document in self.collection_mongo.find():
doc2= self.nlp(document["event"])
similarity = doc1.similarity(doc2)
if similarity > 0.85:
return "Same"
return "Different"
except Exception as err:
self.logger.warning("score {}".format(err))
Mask
Maskeleme işlemi burda sadece ip ve port bilgileri için gerçekleştirildi fakat yüzeyi genişletilebilir.
def mask_words(self,text):
try:
ip_port_pattern = r'(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}):(\d+)'
matches = re.findall(ip_port_pattern, text)
# Bulunan IP adreslerini ve port numaralarını maskele
masked_text = text
for match in matches:
ip, port = match
masked_text = masked_text.replace(ip, '***').replace(port, '***')
ip_pattern = r'\b(?:\d{1,3}\.){3}\d{1,3}\b'
ips = re.findall(ip_pattern, masked_text)
for ip in ips:
masked_text = masked_text.replace(ip, '***')
return masked_text
except Exception as err:
self.logger.warning("masked words {}".format(err))
Chatgpt’ye sor
Bu kısımlarda işimiz daha da kolay. Apı dokumanı;
https://platform.openai.com/docs/introduction
async def ask_chatgpt(self,message):
try:
chat_completion = self.chatgpt_client.chat.completions.create(
messages=[
{
"role": "user",
"content": "{}".format(message),
}
],
model="gpt-3.5-turbo")
return chat_completion.choices[0].message.content
except Exception as err:
self.logger.warning("ask chatgpt {}".format(err))
Gemini’ye sor
async def ask_gemini(self,message):
try:
ask_string = "Share the most clicked google outputs about this problem: {} ".format(message)
response = self.gemini_client.generate_content(ask_string)
response_list = []
for chunk in response:
response_list.append(chunk.text)
return response_list
except Exception as err:
self.logger.warning(" ask gemini {}".format(err))
Son olarak bu dataları belirli bir sırayla işleyecek ve veritabanına kayıt edecek fonksiyon.
async def process_data(self,data,namespace):
# Gelen veriyi işle
try:
data_mask = self.mask_words(data)
self.logger.info("Masked_event => {}".format(data_mask))
check_same = await self.sentence_score(data_mask)
self.logger.debug("Score ===>>>>>> {}".format(check_same))
if check_same == "Different":
question_web = await self.ask_gemini(data_mask)
question_gpt = await self.ask_chatgpt(data_mask)
mongo_data = {
'event' : data_mask,
'namespace': namespace,
'question_web' : question_web,
'question_gpt' : question_gpt,
'control' : True,
'rank': 0
}
self.collection_mongo.insert_one(mongo_data)
except Exception as err:
self.logger.warning("process data {}".format(err))
Gelen verileri sırasıyla maskeledik, anlam bütünlüğüne göre karşılaştırdık ve gemini ve chatgpt den gelen cevapları MongoDb kayıt ettik. Şimdi sırada bu verilerin çıktılarını görüntülemekte;
HTML
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Flask DataTables Example</title>
<link rel="stylesheet" type="text/css" href="https://cdn.datatables.net/1.10.24/css/jquery.dataTables.css">
<script type="text/javascript" charset="utf8" src="https://code.jquery.com/jquery-3.5.1.js"></script>
<script type="text/javascript" charset="utf8" src="https://cdn.datatables.net/1.10.24/js/jquery.dataTables.js"></script>
<script>
$(document).ready( function () {
$('#k8sTable').DataTable();
} );
</script>
</head>
<body>
<h1>K8S Turksat Test - Warning Event</h1>
<table id="k8sTable">
<thead>
<tr>
<th>NAMESPACE</th>
<th>EVENT</th>
<th>GEMINI</th>
<th>CHATGPT</th>
<th>CHECK</th>
<th>RANK</th>
<th>CONFLUENCE</th>
</tr>
</thead>
<tbody>
{% for item in data %}
<tr>
<td>{{ item.namespace }}</td>
<td>{{ item.event }}</td>
<td> {{ item.question_web }} </td>
<td>{{ item.question_gpt }}</td>
<td>{{ item.control }}</td>
<td>{{ item.rank }}</td>
<td><button class="action-btn" value='{{ item._id }}'>Send To Confluence</button></td>
</tr>
{% endfor %}
</tbody>
</table>
<script>
$(document).ready(function() {
var table = $('#k8sTable').DataTable();
// Butona tıklama olayını yakalama
$('#k8sTable').on('click', '.action-btn', function() {
var buttonValue = $(this).val(); // Butonun value değerini al
$.ajax({
type: "POST",
url: "/submit_data",
contentType: "application/json",
data: JSON.stringify(buttonValue),
success: function(response) {
alert('Success: ' + JSON.stringify(result));
},
error: function(error) {
alert('Error: ' + JSON.stringify(result));
}
});
});
});
</script>
</body>
</html>
Web çatısı
from pymongo import MongoClient
from flask import Flask, request, render_template,jsonify
from atlassian import Confluence
from bson.objectid import ObjectId
app = Flask(__name__)
# MongoDB bağlantısı
client = MongoClient('mongodb://localhost:27017/')
db = client['EVENTK8S']
collection = db['Response']
jira_confluence = Confluence(url="https://confluence.com.tr", token="xxxxxxxxxxxxxxxxxxxxxxxxx")
@app.route('/')
def index():
# MongoDB'den verileri çekme
data_from_mongodb = list(collection.find({}))
# Verileri HTML şablonuna aktarma
return render_template('index.html', data=data_from_mongodb)
def generate_list_html(items):
list_html = "<ul>"
for item in items:
list_html += f"<li>{item}</li>"
list_html += "</ul>"
return list_html
@app.route('/submit_data', methods=['POST'])
def submit_data():
id = request.json
print(id)
document = collection.find_one({"_id": ObjectId("{}".format(id))})
if document:
info_content = """
<ac:structured-macro ac:name="column">
<ac:parameter ac:name="width">100px</ac:parameter>
<ac:rich-text-body>
{}
</ac:rich-text-body>
</ac:structured-macro>
""".format(generate_list_html(document['question_gpt']))
description_content = """
<ac:structured-macro ac:name="code">
{}
</ac:structured-macro>
""".format(generate_list_html(document['question_web']))
links_content = """
<p>{}</p>
""".format(document['namespace'])
create_page = jira_confluence.create_page('ELVAN-TEST', document['event'], f"{info_content}{description_content}{links_content}", parent_id=23232323, type='page', representation='storage', editor='v2', full_width=False)
if create_page['status'] == 'current':
return jsonify({"success": "Document create"}), 200
else:
return jsonify({"error": "Document not found"}), 404
if __name__ == '__main__':
app.run(debug=True,host="0.0.0.0",port=8000)
Sonuç
Proje Dosyaları
Linkler
Not: Bu sadece deneysel bir çalışmadır. Sadece fikir vermek ve yeni fikirleri ateşlemek için yazılmıştır.