| Version | Datum | Autor | Beschreibung |
|---|---|---|---|
| 1.0 | 2026-02-17 | Marcus Pauli | Erstversion: 10 Hands-on Uebungen fuer Phase 2 |
sudo ./scripts/07-verify-deployment.sh
| Nr. | Uebung | Schwierigkeit | Geschaetzte Dauer |
|---|---|---|---|
| 1 | Container-Namen via Lookup extrahieren | Einfach | 20 Min. |
| 2 | Log-Level Extraktion + Severity-Mapping | Einfach | 25 Min. |
| 3 | Sampling/Volumen-Reduktion | Mittel | 30 Min. |
| 4 | Eigene Route fuer Mosquitto-MQTT | Mittel | 30 Min. |
| 5 | Sensitive Daten maskieren | Mittel | 35 Min. |
| 6 | Metriken aus Redis-Logs extrahieren | Mittel | 40 Min. |
| 7 | Conditional Routing mit Regex | Fortgeschritten | 40 Min. |
| 8 | Cribl Pack erstellen | Fortgeschritten | 45 Min. |
| 9 | Backpressure + Persistent Queue testen | Fortgeschritten | 45 Min. |
| 10 | A/B-Test: Regeln vs. Ollama-Klassifikation | Fortgeschritten | 60 Min. |
http://localhost:9000pipeline_docker_json ist konfiguriert (siehe Pipelines und Routen)Ermitteln Sie die laufenden Container und deren IDs:
# Alle laufenden Container mit ID und Name auflisten
docker ps --format "{{.ID}},{{.Names}}" --no-trunc
Erstellen Sie die Datei container_lookup.csv im Cribl-Lookup-Verzeichnis:
# Verzeichnis fuer Lookups
sudo mkdir -p /opt/cribl/local/cribl/lookups
# CSV-Datei erstellen (Beispielwerte anpassen!)
sudo tee /opt/cribl/local/cribl/lookups/container_lookup.csv <<'EOF'
container_id,container_name,service_type
a1b2c3d4e5f6,redis-stack,cache
f6e5d4c3b2a1,mosquitto-mqtt,messaging
1a2b3c4d5e6f,nginx-proxy,webserver
6f5e4d3c2b1a,node-red,automation
EOF
Oeffnen Sie die Cribl UI und navigieren Sie zu:
container_lookup.csvcontainer_idcontainer_name, service_typeTesten Sie die Pipeline mit einem Beispiel-Event:
{
"_raw": "{\"log\":\"Connection accepted\",\"stream\":\"stdout\",\"time\":\"2026-02-17T10:00:00Z\"}",
"container_id": "a1b2c3d4e5f6",
"source": "docker"
}
Das Ergebnis sollte die neuen Felder container_name und service_type enthalten.
container_name: "redis-stack" und service_type: "cache". In Splunk
koennen Sie nun nach container_name=redis-stack filtern, anstatt die kryptische
Container-ID verwenden zu muessen.
docker ps --format ... > container_lookup.csvmatchMode: exact fuer performante Lookups bei grossen TabellenmatchType: regex, falls Container-IDs nur als Praefix im Event vorliegenFuegen Sie eine Regex Extract-Funktion zur Docker-Pipeline hinzu:
_raw/\b(DEBUG|INFO|WARNING|WARN|ERROR|CRITICAL|FATAL)\b/ilog_level# Der regulaere Ausdruck im Detail:
# \b - Wortgrenze (verhindert Teilmatches wie "INFORMATION")
# (DEBUG|...) - Capture Group mit den moeglichen Log-Levels
# \b - Wortgrenze am Ende
# /i - Case-insensitive Matching
Fuegen Sie eine Eval-Funktion nach der Regex-Extraktion hinzu:
// Feld: severity
// Ausdruck:
log_level === 'DEBUG' ? 7
: log_level === 'INFO' ? 6
: (log_level === 'WARNING' || log_level === 'WARN') ? 4
: log_level === 'ERROR' ? 3
: (log_level === 'CRITICAL' || log_level === 'FATAL') ? 2
: 6
// Feld: severity_label (optional, fuer bessere Lesbarkeit)
// Ausdruck:
severity <= 3 ? 'high' : severity <= 4 ? 'medium' : 'low'
Ergaenzen Sie eine weitere Eval-Zeile, um Varianten zu vereinheitlichen:
// Feld: log_level
// Ausdruck (Normalisierung):
log_level === 'WARN' ? 'WARNING'
: log_level === 'FATAL' ? 'CRITICAL'
: log_level ? log_level.toUpperCase()
: 'INFO'
Testen Sie die Pipeline im Preview-Modus mit diesen Beispiel-Events:
// Docker JSON Log (Redis)
{"log":"1:M 17 Feb 2026 10:15:00.123 # WARNING Memory usage high","stream":"stdout"}
// Docker JSON Log (Node-RED)
{"log":"17 Feb 10:15:01 - [info] Server now running at http://127.0.0.1:1880/","stream":"stdout"}
// Docker JSON Log (Mosquitto)
{"log":"1708171500: Error: Connection refused","stream":"stderr"}
log_level
(normalisiert: DEBUG, INFO, WARNING, ERROR, CRITICAL), severity (numerisch: 2-7)
und severity_label (high/medium/low). Die Severity-Mapping-Tabelle:
| log_level | severity | severity_label |
|---|---|---|
| DEBUG | 7 | low |
| INFO | 6 | low |
| WARNING | 4 | medium |
| ERROR | 3 | high |
| CRITICAL | 2 | high |
C.Lookup statt verschachtelter Ternary-Operatoren, wenn die Mapping-Tabelle groesser wird__inputId=='docker:in_docker' als Filter, um die Funktion nur auf Docker-Events anzuwenden# als Praefix fuer Warnungen)Unsere Strategie: Hohe Prioritaet = alles behalten, niedrige Prioritaet = nur Stichprobe:
| Log-Level | Sampling-Rate | Begruendung |
|---|---|---|
| CRITICAL | 100% (alles behalten) | Kritische Fehler duerfen nicht verloren gehen |
| ERROR | 100% (alles behalten) | Fehler sind fuer Troubleshooting essenziell |
| WARNING | 50% | Warnungen sind wichtig, aber haeufig redundant |
| INFO | 25% | Informationsmeldungen sind oft repetitiv |
| DEBUG | 10% | Debug-Logs erzeugen das meiste Volumen |
Fuegen Sie eine Sampling-Funktion in der Pipeline hinzu:
# Filter: Nur auf DEBUG-Events anwenden
# Filter Expression:
log_level === 'DEBUG'
# Sample Rate: 10 (behaelt jedes 10. Event = 10%)
Erstellen Sie fuer jedes Level eine eigene Sampling-Funktion oder verwenden Sie eine Eval-basierte Loesung:
// Alternative: Dynamisches Sampling ueber Eval + Drop
// Funktion: Eval
// Feld: __sampling_rate
// Ausdruck:
log_level === 'DEBUG' ? 10
: log_level === 'INFO' ? 4
: log_level === 'WARNING' ? 2
: 1
// Danach: Sampling-Funktion mit:
// Sample Rate Field: __sampling_rate
Pruefen Sie die Reduktion ueber die Cribl Monitoring-Seite:
# Alternativ ueber die API:
TOKEN=$(curl -s -X POST "http://localhost:9000/api/v1/auth/login" \
-H "Content-Type: application/json" \
-d '{"username":"admin","password":"admin"}' | jq -r '.token')
curl -s -H "Authorization: Bearer $TOKEN" \
"http://localhost:9000/api/v1/system/metrics" | jq '.pipelines.pipeline_docker_json'
cribl_sample_rate, das angibt,
wie viele aehnliche Events durch das gesamplete Event repraesentiert werden.
cribl_sample_rate in Splunk-Suchen: | stats sum(cribl_sample_rate) as estimated_totaldocker ps | grep mosquittoUntersuchen Sie das Mosquitto-Log-Format:
# Mosquitto-Logs anzeigen
docker logs mosquitto --tail 20
# Typisches Format:
# 1708171500: New connection from 192.168.1.100:54321 on port 1883.
# 1708171500: New client connected from 192.168.1.100:54321 as client_01 (p2, c1, k60).
# 1708171500: Client client_01 disconnected.
# 1708171501: Warning: Unable to open log file /mosquitto/log/mosquitto.log.
# 1708171502: Error: Invalid protocol "MQTT" in connect from 192.168.1.200.
pipeline_mosquitto_mqttFuegen Sie folgende Funktionen hinzu:
Funktion 1: Regex Extract (Timestamp + Nachricht)
// Source Field: _raw
// Regex:
/^(\d+):\s+(.*)/
// Output Fields:
// g1 -> mqtt_epoch
// g2 -> mqtt_message
Funktion 2: Regex Extract (Verbindungs-Details)
// Source Field: mqtt_message
// Regex:
/(?:connection from|connected from)\s+(\d+\.\d+\.\d+\.\d+):(\d+)(?:\s+as\s+(\S+))?/
// Output Fields:
// g1 -> mqtt_client_ip
// g2 -> mqtt_client_port
// g3 -> mqtt_client_id
Funktion 3: Eval (Anreicherung)
// Felder:
// source_type = 'mosquitto:mqtt'
// mqtt_timestamp = new Date(Number(mqtt_epoch) * 1000).toISOString()
// log_level = /Error/i.test(mqtt_message) ? 'ERROR'
// : /Warning/i.test(mqtt_message) ? 'WARNING'
// : 'INFO'
route_mosquittocontainer_name=='mosquitto' || source.includes('mosquitto')pipeline_mosquitto_mqttsplunk_hec (oder Ihre konfigurierte Destination)mqtt_client_ip, mqtt_client_id,
mqtt_timestamp und source_type: mosquitto:mqtt. In Splunk koennen Sie
mit sourcetype=mosquitto:mqtt gezielt nach MQTT-Events suchen und nach Client-IP
filtern.
final: true, um zu verhindern, dass Events doppelt verarbeitet werdenIn unseren Logs koennen folgende sensitive Daten vorkommen:
| Datentyp | Beispiel | Vorkommen |
|---|---|---|
| E-Mail-Adressen | user@example.com | Apache-Logs, Anwendungslogs |
| IP-Adressen | 192.168.1.100 | Alle Netzwerk-Logs |
| Benutzernamen | admin, jdoe | Auth-Logs, SSH-Logs |
| API-Tokens | Bearer eyJhb... | HTTP-Request-Logs |
pipeline_generic_passthrough)// Regex Pattern fuer E-Mail-Adressen:
/[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}/g
// Replace Expression:
C.Mask.REDACT
// Oder spezifisch:
'***@***.***'
// Felder, die durchsucht werden:
_raw
Fuegen Sie eine weitere Mask-Funktion hinzu:
// Regex Pattern fuer IPv4-Adressen:
/\b(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.\d{1,3}\b/g
// Replace Expression (letztes Oktett maskieren):
`${g1}.${g2}.${g3}.xxx`
// Alternativ: Komplette Maskierung
C.Mask.HASH
// Erzeugt einen konsistenten Hash, sodass gleiche IPs
// gleich maskiert werden (nützlich fuer Korrelation)
// Regex Pattern fuer Bearer Tokens:
/Bearer\s+[A-Za-z0-9\-._~+\/]+=*/g
// Replace Expression:
'Bearer [REDACTED]'
// Regex Pattern fuer generische API-Keys:
/(?:api[_-]?key|token|secret)[=:]\s*["']?([A-Za-z0-9\-._~+\/]{16,})["']?/gi
// Replace Expression:
'$1=[REDACTED]'
Erstellen Sie Test-Events mit sensitiven Daten:
{
"_raw": "2026-02-17 10:00:00 INFO User login: user@example.com from 192.168.1.100 with Bearer eyJhbGciOiJIUzI1NiJ9.secret",
"source": "application.log"
}
Das maskierte Ergebnis sollte so aussehen:
{
"_raw": "2026-02-17 10:00:00 INFO User login: ***@***.*** from 192.168.1.xxx with Bearer [REDACTED]",
"source": "application.log"
}
C.Mask.HASH statt C.Mask.REDACT, wenn Sie Korrelation benoetigen (gleiche Eingabe = gleicher Hash)docker ps | grep redisUntersuchen Sie die Redis-Logs:
# Redis-Logs anzeigen
docker logs redis-stack --tail 30
# Typische Redis-Log-Eintraege:
# 1:M 17 Feb 2026 10:15:00.123 * DB loaded from disk: 0.002 seconds
# 1:M 17 Feb 2026 10:15:00.124 * Ready to accept connections tcp
# 1:M 17 Feb 2026 10:20:00.001 # WARNING Memory usage > 90%
# 1:M 17 Feb 2026 10:20:00.002 * 10 changes in 300 seconds. Saving...
# 1:M 17 Feb 2026 10:20:00.100 * Background saving terminated with success
# Redis INFO-Kommando fuer aktuelle Metriken:
docker exec redis-stack redis-cli INFO memory | head -5
# used_memory:1234567
# used_memory_human:1.18M
# used_memory_rss:2345678
# used_memory_peak:3456789
pipeline_redis_metrics// Pattern 1: Memory-Meldungen
// Source: _raw
// Regex:
/used_memory:(\d+)/
// Output: redis_memory_bytes
// Pattern 2: Verbindungen
// Regex:
/connected_clients:(\d+)/
// Output: redis_connected_clients
// Pattern 3: Operationen pro Sekunde
// Regex:
/instantaneous_ops_per_sec:(\d+)/
// Output: redis_ops_per_sec
// Pattern 4: DB-Saves
// Regex:
/(\d+)\s+changes\s+in\s+(\d+)\s+seconds/
// Output: g1 -> redis_changes, g2 -> redis_save_interval
// Eval-Funktion: Felder in Zahlen konvertieren
// redis_memory_bytes = Number(redis_memory_bytes) || 0
// redis_memory_mb = Math.round(redis_memory_bytes / 1024 / 1024 * 100) / 100
// redis_connected_clients = Number(redis_connected_clients) || 0
// redis_ops_per_sec = Number(redis_ops_per_sec) || 0
// metric_type = 'redis_performance'
// _time = Date.now() / 1000
Erstellen Sie optional einen separaten Output fuer Metriken:
splunk_hec_metricsice_metrics (separater Metriken-Index)// Route-Filter:
container_name === 'redis-stack' && /used_memory|connected_clients|ops_per_sec|changes in/.test(_raw)
// Pipeline: pipeline_redis_metrics
// Output: splunk_hec_metrics
// Final: false (Events auch an regulaere Pipeline weiterleiten)
index=ice_metrics metric_type=redis_performance | timechart avg(redis_memory_mb), avg(redis_ops_per_sec).
Die Metriken umfassen Speicherverbrauch (MB), aktive Verbindungen und Operationen pro Sekunde.
redis-cli INFO periodisch (z.B. via Cron) als zusaetzliche Metrik-Quelleredis_memory_mb > 500 oder redis_ops_per_sec > 10000redis-cli INFOFinal: false auf der Route erhalten Sie sowohl Metriken als auch die Original-LogsWir konfigurieren folgende Routing-Regeln:
| Szenario | Regex-Filter | Ziel-Pipeline | Destination |
|---|---|---|---|
| Security-Events (SSH, Auth) | /(?:ssh|auth|sudo|login|password)/i |
pipeline_security_auth | splunk_s2s (Security-Index) |
| HTTP-Fehler (4xx, 5xx) | /HTTP\/\d\.\d"\s+[45]\d{2}/ |
pipeline_http_errors | splunk_hec (Error-Index) |
| Performance-Warnungen | /(?:memory|cpu|disk|timeout|slow)/i |
pipeline_performance | splunk_hec_metrics |
| Health-Check-Noise | /(?:health|ping|alive|ready)/i |
devnull (Drop) | - |
Erstellen Sie die Routes in der Cribl UI:
Route 1: Security Events
// Route: route_security_regex
// Filter Expression:
/(?:ssh|auth|sudo|login|password|failed|accepted|session opened|session closed)/i.test(_raw)
// Pipeline: pipeline_security_auth
// Output: splunk_s2s
// Final: true
Route 2: HTTP Errors
// Route: route_http_errors
// Filter Expression:
/HTTP\/\d\.\d"\s+[45]\d{2}\s/.test(_raw) || (statusCode && statusCode >= 400)
// Pipeline: pipeline_apache_clf
// Output: splunk_hec
// Final: true
Route 3: Performance Warnings
// Route: route_performance
// Filter Expression:
/(?:out of memory|high cpu|disk full|timeout exceeded|slow query|latency)/i.test(_raw)
&& severity <= 4
// Pipeline: pipeline_generic_passthrough
// Output: splunk_hec_metrics
// Final: false (auch an regulaere Destination senden)
Route 4: Health-Check Noise (Drop)
// Route: route_drop_healthchecks
// Filter Expression:
/^(?:GET|HEAD)\s+\/(?:health|ping|alive|ready|status)\s+HTTP/i.test(_raw)
|| (/health.*check/i.test(_raw) && log_level === 'DEBUG')
// Pipeline: devnull
// Output: devnull
// Final: true
Die Reihenfolge in der Route-Tabelle ist entscheidend:
route_drop_healthchecks (Noise zuerst eliminieren)route_mosquitto (spezifische Quellen)route_security_regex (Security-Events)route_http_errors (HTTP-Fehler)route_performance (Performance, nicht final)route_docker_default (Fallback fuer alle Docker-Events)route_default (Catch-all)# Test-Events ueber Syslog senden:
# Security Event:
echo "<34>Feb 17 10:30:00 nuc sshd[1234]: Failed password for admin from 10.0.0.1 port 22 ssh2" \
| nc -u localhost 514
# HTTP Error:
echo '192.168.1.1 - - [17/Feb/2026:10:30:00 +0100] "GET /admin HTTP/1.1" 403 287' \
| nc -u localhost 514
# Health Check (should be dropped):
echo '10.0.0.1 - - [17/Feb/2026:10:30:00 +0100] "GET /health HTTP/1.1" 200 2' \
| nc -u localhost 514
__inputId als Vorbedingung, um Regex nur auf relevante Quellen anzuwenden: __inputId=='syslog:in_syslog' && /pattern/.test(_raw)# Pack-Verzeichnis erstellen
PACK_DIR="/opt/cribl/local/cribl/packs/ice-data-phase2"
sudo mkdir -p "${PACK_DIR}"/{pipelines,routes,lookups,knowledge}
# Pack-Verzeichnisstruktur:
# ice-data-phase2/
# ├── pack.json # Manifest-Datei
# ├── pipelines/ # Pipeline-Konfigurationen
# │ ├── pipeline_mosquitto_mqtt/
# │ │ └── conf.yml
# │ ├── pipeline_redis_metrics/
# │ │ └── conf.yml
# │ └── ...
# ├── routes/
# │ └── routes.yml # Route-Tabelle
# ├── lookups/
# │ └── container_lookup.csv
# └── knowledge/
# └── README.md # Dokumentation des Packs
sudo tee "${PACK_DIR}/pack.json" <<'EOF'
{
"name": "ice-data-phase2",
"displayName": "IceDataEmphasise Phase 2",
"description": "Cribl Pack mit Phase 2 Konfigurationen: Docker-Log-Verarbeitung, Severity-Mapping, Sampling, MQTT-Routing, PII-Maskierung und Redis-Metriken.",
"version": "1.0.0",
"author": "Marcus Pauli",
"tags": ["docker", "mqtt", "redis", "pii", "sampling", "metrics"],
"minCriblVersion": "4.0.0",
"dependencies": [],
"readme": "knowledge/README.md",
"config": {
"splunk_hec_url": {
"type": "string",
"title": "Splunk HEC URL",
"description": "URL des Splunk HEC Endpoints",
"default": "http://splunk:8088/services/collector"
},
"sampling_debug_rate": {
"type": "number",
"title": "DEBUG Sampling Rate",
"description": "Sampling-Rate fuer DEBUG-Events (1 = alle behalten, 10 = jedes 10.)",
"default": 10
}
}
}
EOF
# Bestehende Pipeline-Konfigurationen kopieren
CRIBL_CONF="/opt/cribl/local/cribl"
# Pipeline: Docker JSON (erweitert)
sudo cp -r "${CRIBL_CONF}/pipelines/pipeline_docker_json" \
"${PACK_DIR}/pipelines/"
# Pipeline: Mosquitto MQTT
sudo cp -r "${CRIBL_CONF}/pipelines/pipeline_mosquitto_mqtt" \
"${PACK_DIR}/pipelines/"
# Pipeline: Redis Metrics
sudo cp -r "${CRIBL_CONF}/pipelines/pipeline_redis_metrics" \
"${PACK_DIR}/pipelines/"
# Lookup-Datei kopieren
sudo cp "${CRIBL_CONF}/lookups/container_lookup.csv" \
"${PACK_DIR}/lookups/"
# Nur die Phase-2-Routes extrahieren und in das Pack kopieren
# (Manuell die relevanten Eintraege aus routes.yml kopieren)
sudo tee "${PACK_DIR}/routes/routes.yml" <<'EOF'
routes:
- id: route_drop_healthchecks
name: Drop Health Checks
filter: "/^(?:GET|HEAD)\\s+\\/(?:health|ping|alive|ready)\\s+HTTP/i.test(_raw)"
pipeline: devnull
output: devnull
final: true
description: "Eliminiert Health-Check-Noise"
- id: route_mosquitto
name: Mosquitto MQTT
filter: "container_name=='mosquitto' || source.includes('mosquitto')"
pipeline: pipeline_mosquitto_mqtt
output: splunk_hec
final: true
description: "Dedizierte Route fuer Mosquitto MQTT Broker Logs"
- id: route_security_regex
name: Security Events
filter: "/(?:ssh|auth|sudo|login|password|failed|accepted)/i.test(_raw)"
pipeline: pipeline_security_auth
output: splunk_s2s
final: true
description: "Sicherheitsrelevante Events an Security-Index"
EOF
# Pack-Struktur pruefen
find "${PACK_DIR}" -type f | sort
# Cribl-Konfiguration neu laden
sudo /opt/cribl/bin/cribl reload
# In der Cribl UI pruefen:
# Settings → Packs → "ice-data-phase2" sollte sichtbar sein
# Pack als .crbl-Datei exportieren (fuer Transport):
cd /opt/cribl/local/cribl/packs
sudo tar czf /tmp/ice-data-phase2-v1.0.0.crbl ice-data-phase2/
ls -lh /tmp/ice-data-phase2-v1.0.0.crbl
ice-data-phase2 ist erstellt und in der Cribl UI unter Settings → Packs
sichtbar. Das Pack enthaelt alle Phase-2-Pipelines, Routes, Lookups und ist als .crbl-Datei
exportierbar. Auf einer anderen Cribl-Instanz kann das Pack ueber
Settings → Packs → Import importiert werden.
C.vars) fuer umgebungsspezifische Werte (z.B. Splunk-URL, Indices)knowledge/README.md mit Installations- und Konfigurationsanleitung hinzuAktivieren Sie die PQ auf der Splunk-HEC-Destination:
100 MB10 MBBlock (alternativ: Drop)gzip# Aktuelle Queue-Groesse und Events-Throughput notieren
TOKEN=$(curl -s -X POST "http://localhost:9000/api/v1/auth/login" \
-H "Content-Type: application/json" \
-d '{"username":"admin","password":"admin"}' | jq -r '.token')
# Destination-Status pruefen
curl -s -H "Authorization: Bearer $TOKEN" \
"http://localhost:9000/api/v1/system/outputs/splunk_hec" | jq '{
status: .status,
pq_enabled: .pq.enabled,
pq_maxSize: .pq.maxSize
}'
# Queue-Verzeichnis pruefen
ls -la /opt/cribl/state/queues/
du -sh /opt/cribl/state/queues/*
# Option A: Splunk HEC temporaer deaktivieren
# (Auf dem Splunk-Server)
# Settings → Data Inputs → HTTP Event Collector → Disable
# Option B: Firewall-Regel (blockiert Port 8088)
sudo iptables -A OUTPUT -p tcp --dport 8088 -j DROP
# Option C: Cribl Destination auf ungueltige URL aendern
# In der Cribl UI: Destinations → splunk_hec → URL aendern auf:
# http://localhost:19999/services/collector (nicht existierender Port)
# Queue-Wachstum ueberwachen (alle 10 Sekunden)
watch -n 10 'du -sh /opt/cribl/state/queues/* 2>/dev/null; echo "---"; \
curl -s http://localhost:9000/api/v1/health 2>/dev/null | jq .destinations'
# Cribl-Logs auf Backpressure-Meldungen pruefen
tail -f /opt/cribl/log/cribl.log | grep -i "backpressure\|queue\|blocked"
# In der Cribl UI beobachten:
# Monitoring → Destinations → splunk_hec
# - Status sollte "Error" oder "Blocked" zeigen
# - Queue Depth waechst an
Lassen Sie die Simulation 2-5 Minuten laufen, damit sich die Queue fuellt.
# Destination wiederherstellen:
# Option A: Splunk HEC wieder aktivieren
# Option B: Firewall-Regel entfernen
sudo iptables -D OUTPUT -p tcp --dport 8088 -j DROP
# Option C: URL in Cribl zuruecksetzen
# Queue-Drain beobachten:
watch -n 5 'du -sh /opt/cribl/state/queues/* 2>/dev/null'
# Die Queue sollte schrittweise kleiner werden,
# bis alle gepufferten Events zugestellt sind.
# Verifizierung in Splunk:
# Suchen Sie nach einer Luecke in den Events und pruefen Sie,
# ob nach der Wiederherstellung alle Events lueckenlos ankommen:
# index=ice_data | timechart span=1m count
/opt/cribl/state/queues/ an. In den Cribl-Logs erscheinen Backpressure-Warnungen.
Nach der Wiederherstellung der Destination werden alle gepufferten Events zugestellt (Queue-Drain).
In Splunk ist eine zeitliche Luecke sichtbar, aber keine Events gehen verloren.
Die Queue schrumpft nach Wiederherstellung auf 0.
Queue-full Behavior: Drop, um den Unterschied zu Block zu verstehenBlock stoppt Cribl die Aufnahme neuer Events, wenn die Queue voll ist – dies kann zu Datenverlust an der Quelle fuehrenMax Queue Size sollte den erwarteten Durchsatz fuer die maximale Ausfallzeit abdeckendf -h /opt/cribl/state/queues/ollama listcurl http://localhost:11434/api/tagsErstellen Sie einen gelabelten Test-Datensatz mit 100 Events:
# Test-Events aus den letzten Logs extrahieren
# und manuell mit dem korrekten Label versehen
sudo tee /tmp/test_events.jsonl <<'JSONL'
{"_raw":"Feb 17 10:00:01 nuc sshd[1234]: Failed password for root from 10.0.0.1 port 22","expected_category":"security","expected_severity":"high"}
{"_raw":"192.168.1.1 - - [17/Feb/2026:10:00:02] \"GET /index.html HTTP/1.1\" 200 1234","expected_category":"web_access","expected_severity":"low"}
{"_raw":"{\"log\":\"1:M 17 Feb 10:00:03 # WARNING Memory high\",\"stream\":\"stdout\"}","expected_category":"performance","expected_severity":"medium"}
{"_raw":"Feb 17 10:00:04 nuc systemd[1]: Started Docker Container nginx.","expected_category":"system","expected_severity":"low"}
{"_raw":"{\"log\":\"Error: Connection refused to database\",\"stream\":\"stderr\"}","expected_category":"error","expected_severity":"high"}
JSONL
# Erweitern Sie diesen Datensatz auf mindestens 50-100 Events
# mit verschiedenen Kategorien und Severity-Levels
Nutzen Sie die bestehenden Cribl-Pipelines zur Klassifikation:
# Die regelbasierte Klassifikation ergibt sich aus:
# - Uebung 2: log_level + severity (Regex + Eval)
# - Uebung 7: Route-Zuordnung (Security, HTTP, Performance)
# Ergebnis-Felder nach Pipeline-Verarbeitung:
# rule_category (security | web_access | performance | system | error)
# rule_severity (high | medium | low)
# Messen Sie fuer jedes Test-Event:
# 1. Stimmt rule_category mit expected_category ueberein?
# 2. Stimmt rule_severity mit expected_severity ueberein?
# 3. Verarbeitungszeit pro Event
Erstellen Sie ein Skript fuer die KI-Klassifikation:
#!/usr/bin/env bash
# ollama_classify.sh - Klassifiziert Log-Events via Ollama
MODEL="llama3.2:3b" # Oder ein anderes lokales Modell
OLLAMA_URL="http://localhost:11434/api/generate"
classify_event() {
local raw_event="$1"
local prompt="Klassifiziere dieses Log-Event. Antworte NUR mit einem JSON-Objekt.
Kategorien: security, web_access, performance, system, error, other
Severity: high, medium, low
Log-Event: ${raw_event}
Antwort (nur JSON):
{\"ai_category\": \"...\", \"ai_severity\": \"...\", \"ai_confidence\": 0.0-1.0}"
curl -s "${OLLAMA_URL}" \
-d "$(jq -n --arg model "${MODEL}" --arg prompt "${prompt}" \
'{model: $model, prompt: $prompt, stream: false}')" \
| jq -r '.response'
}
# Test mit einem Event:
classify_event "Feb 17 10:00:01 nuc sshd[1234]: Failed password for root from 10.0.0.1"
#!/usr/bin/env bash
# ab_test.sh - Vergleicht regelbasierte und KI-Klassifikation
RESULTS_FILE="/tmp/ab_test_results.csv"
echo "event_id,expected_cat,expected_sev,rule_cat,rule_sev,rule_ms,ai_cat,ai_sev,ai_ms,ai_confidence" \
> "${RESULTS_FILE}"
EVENT_ID=0
while IFS= read -r line; do
EVENT_ID=$((EVENT_ID + 1))
expected_cat=$(echo "${line}" | jq -r '.expected_category')
expected_sev=$(echo "${line}" | jq -r '.expected_severity')
raw=$(echo "${line}" | jq -r '._raw')
# Regelbasierte Klassifikation (simuliert)
RULE_START=$(date +%s%N)
# ... (Pipeline-Preview-API oder lokale Regex-Auswertung)
RULE_END=$(date +%s%N)
rule_ms=$(( (RULE_END - RULE_START) / 1000000 ))
# KI-Klassifikation
AI_START=$(date +%s%N)
ai_result=$(classify_event "${raw}")
AI_END=$(date +%s%N)
ai_ms=$(( (AI_END - AI_START) / 1000000 ))
ai_cat=$(echo "${ai_result}" | jq -r '.ai_category // "unknown"')
ai_sev=$(echo "${ai_result}" | jq -r '.ai_severity // "unknown"')
ai_conf=$(echo "${ai_result}" | jq -r '.ai_confidence // 0')
echo "${EVENT_ID},${expected_cat},${expected_sev},${rule_cat},${rule_sev},${rule_ms},${ai_cat},${ai_sev},${ai_ms},${ai_conf}" \
>> "${RESULTS_FILE}"
echo "Event ${EVENT_ID}: Rule=${rule_ms}ms, AI=${ai_ms}ms"
done < /tmp/test_events.jsonl
echo "Ergebnisse gespeichert in: ${RESULTS_FILE}"
# Einfache Auswertung mit awk:
echo "=== A/B-Test Ergebnisse ==="
# Genauigkeit der Regel-Klassifikation
echo "--- Regelbasiert ---"
awk -F',' 'NR>1 {
total++
if ($2==$4) cat_correct++
if ($3==$5) sev_correct++
rule_ms_sum+=$6
} END {
printf "Kategorie-Genauigkeit: %.1f%% (%d/%d)\n", cat_correct/total*100, cat_correct, total
printf "Severity-Genauigkeit: %.1f%% (%d/%d)\n", sev_correct/total*100, sev_correct, total
printf "Durchschn. Latenz: %.1f ms\n", rule_ms_sum/total
}' "${RESULTS_FILE}"
# Genauigkeit der KI-Klassifikation
echo "--- KI (Ollama) ---"
awk -F',' 'NR>1 {
total++
if ($2==$7) cat_correct++
if ($3==$8) sev_correct++
ai_ms_sum+=$9
conf_sum+=$10
} END {
printf "Kategorie-Genauigkeit: %.1f%% (%d/%d)\n", cat_correct/total*100, cat_correct, total
printf "Severity-Genauigkeit: %.1f%% (%d/%d)\n", sev_correct/total*100, sev_correct, total
printf "Durchschn. Latenz: %.1f ms\n", ai_ms_sum/total
printf "Durchschn. Confidence: %.2f\n", conf_sum/total
}' "${RESULTS_FILE}"
| Metrik | Regelbasiert | Ollama KI |
|---|---|---|
| Kategorie-Genauigkeit | 85-95% | 70-90% |
| Severity-Genauigkeit | 90-98% | 75-90% |
| Latenz pro Event | < 1 ms | 100-2000 ms |
| Ressourcenverbrauch | Minimal | 4-8 GB RAM |
Regelbasierte Klassifikation ist schneller und bei bekannten Mustern praeziser. KI-Klassifikation kann bei unbekannten oder komplexen Mustern besser sein, hat aber deutlich hoehere Latenz und Ressourcenanforderungen. Die Empfehlung fuer den PoC: Regelbasiert als Primaer-Klassifikation, KI als Fallback fuer nicht klassifizierte Events.
llama3.2:3b (schnell) vs. llama3.2:8b (genauer)category=unknown an Ollama weiterleitendocker stats ollama waehrend des TestsDie 10 Uebungen decken die wichtigsten Phase-2-Themen ab:
| Bereich | Uebungen | Wichtigste Erkenntnis |
|---|---|---|
| Datenanreicherung | 1, 2 | Lookups und Regex-Extraktion sind die Grundbausteine fuer strukturierte Log-Verarbeitung |
| Volumenoptimierung | 3 | Intelligentes Sampling kann 40-60% Volumen sparen, ohne kritische Events zu verlieren |
| Routing | 4, 7 | Praezises Routing mit dedizierten Pipelines verbessert die Datenqualitaet erheblich |
| Compliance | 5 | PII-Maskierung ist fuer DSGVO-Konformitaet essenziell und sollte frueh implementiert werden |
| Monitoring | 6 | Metriken koennen direkt aus Logs extrahiert werden, ohne zusaetzliche Agenten |
| Portabilitaet | 8 | Cribl Packs ermoeglichen reproduzierbare Konfigurationen ueber Umgebungen hinweg |
| Resilienz | 9 | Persistent Queues schuetzen vor Datenverlust bei Destination-Ausfaellen |
| KI-Integration | 10 | Regelbasierte Klassifikation bleibt fuer bekannte Muster ueberlegen, KI ergaenzt bei unbekannten |