15 – Phase 2: Hands-on Uebungen

Version: 1.0 Datum: 2026-02-17 Klassifikation: Vertraulich / Intern Projekt: IceDataEmphasise

Revisionshistorie

VersionDatumAutorBeschreibung
1.02026-02-17Marcus PauliErstversion: 10 Hands-on Uebungen fuer Phase 2

Inhaltsverzeichnis

  1. Container-Namen via Lookup extrahieren (Einfach)
  2. Log-Level Extraktion + Severity-Mapping (Einfach)
  3. Sampling/Volumen-Reduktion (Mittel)
  4. Eigene Route fuer Mosquitto-MQTT (Mittel)
  5. Sensitive Daten maskieren (Mittel)
  6. Metriken aus Redis-Logs extrahieren (Mittel)
  7. Conditional Routing mit Regex (Fortgeschritten)
  8. Cribl Pack erstellen (Fortgeschritten)
  9. Backpressure + Persistent Queue testen (Fortgeschritten)
  10. A/B-Test: Regeln vs. Ollama-Klassifikation (Fortgeschritten)
Hinweis: Alle Uebungen bauen auf der in Phase 1 eingerichteten Infrastruktur auf. Stellen Sie sicher, dass Cribl Stream, Docker und die konfigurierten Quellen betriebsbereit sind, bevor Sie mit den Uebungen beginnen. Die Verifikation erfolgt mit: sudo ./scripts/07-verify-deployment.sh

Uebersicht der Schwierigkeitsgrade

Nr.UebungSchwierigkeitGeschaetzte Dauer
1Container-Namen via Lookup extrahierenEinfach20 Min.
2Log-Level Extraktion + Severity-MappingEinfach25 Min.
3Sampling/Volumen-ReduktionMittel30 Min.
4Eigene Route fuer Mosquitto-MQTTMittel30 Min.
5Sensitive Daten maskierenMittel35 Min.
6Metriken aus Redis-Logs extrahierenMittel40 Min.
7Conditional Routing mit RegexFortgeschritten40 Min.
8Cribl Pack erstellenFortgeschritten45 Min.
9Backpressure + Persistent Queue testenFortgeschritten45 Min.
10A/B-Test: Regeln vs. Ollama-KlassifikationFortgeschritten60 Min.

Uebung 1: Container-Namen via Lookup extrahieren

Container-Namen via Lookup extrahieren

Einfach
Lernziel: Verstehen, wie Cribl Lookup-Tabellen funktionieren, um Events mit zusaetzlichen Kontextinformationen anzureichern. Nach dieser Uebung koennen Sie eine CSV-Lookup-Datei erstellen und in einer Pipeline verwenden, um container_id auf container_name abzubilden.
Voraussetzungen:

Schritt 1: Container-IDs ermitteln

Ermitteln Sie die laufenden Container und deren IDs:

# Alle laufenden Container mit ID und Name auflisten
docker ps --format "{{.ID}},{{.Names}}" --no-trunc

Schritt 2: CSV-Lookup-Datei erstellen

Erstellen Sie die Datei container_lookup.csv im Cribl-Lookup-Verzeichnis:

# Verzeichnis fuer Lookups
sudo mkdir -p /opt/cribl/local/cribl/lookups

# CSV-Datei erstellen (Beispielwerte anpassen!)
sudo tee /opt/cribl/local/cribl/lookups/container_lookup.csv <<'EOF'
container_id,container_name,service_type
a1b2c3d4e5f6,redis-stack,cache
f6e5d4c3b2a1,mosquitto-mqtt,messaging
1a2b3c4d5e6f,nginx-proxy,webserver
6f5e4d3c2b1a,node-red,automation
EOF
Hinweis: Ersetzen Sie die Beispiel-Container-IDs durch die tatsaechlichen IDs aus Schritt 1. Verwenden Sie die vollstaendigen 64-Zeichen-IDs oder die 12-Zeichen-Kurzform konsistent.

Schritt 3: Lookup-Funktion in der Pipeline konfigurieren

Oeffnen Sie die Cribl UI und navigieren Sie zu:

  1. Processing → Pipelines → pipeline_docker_json
  2. Klicken Sie auf + Add Function
  3. Waehlen Sie Lookup aus der Kategorie "Standard"
  4. Konfigurieren Sie die Funktion:
    • Lookup File: container_lookup.csv
    • Input Field: container_id
    • Output Fields: container_name, service_type
  5. Klicken Sie auf Save

Schritt 4: Testen mit Sample-Event

Testen Sie die Pipeline mit einem Beispiel-Event:

  1. In der Pipeline-Ansicht klicken Sie auf Preview
  2. Geben Sie ein Test-Event ein:
{
  "_raw": "{\"log\":\"Connection accepted\",\"stream\":\"stdout\",\"time\":\"2026-02-17T10:00:00Z\"}",
  "container_id": "a1b2c3d4e5f6",
  "source": "docker"
}

Das Ergebnis sollte die neuen Felder container_name und service_type enthalten.

Erwartetes Ergebnis: Das verarbeitete Event enthaelt die zusaetzlichen Felder container_name: "redis-stack" und service_type: "cache". In Splunk koennen Sie nun nach container_name=redis-stack filtern, anstatt die kryptische Container-ID verwenden zu muessen.
Tipps:

Uebung 2: Log-Level Extraktion + Severity-Mapping

Log-Level Extraktion + Severity-Mapping

Einfach
Lernziel: Lernen, wie man mit Regex Extract und Eval-Funktionen strukturierte Felder aus unstrukturierten Log-Nachrichten gewinnt und numerische Severity-Werte zuordnet (angelehnt an RFC 5424 Syslog Severity Levels).
Voraussetzungen:

Schritt 1: Log-Level per Regex extrahieren

Fuegen Sie eine Regex Extract-Funktion zur Docker-Pipeline hinzu:

  1. Navigieren Sie zu Processing → Pipelines → pipeline_docker_json
  2. Fuegen Sie eine neue Funktion Regex Extract hinzu
  3. Konfiguration:
    • Source Field: _raw
    • Regex: /\b(DEBUG|INFO|WARNING|WARN|ERROR|CRITICAL|FATAL)\b/i
    • Output Field: log_level
# Der regulaere Ausdruck im Detail:
# \b          - Wortgrenze (verhindert Teilmatches wie "INFORMATION")
# (DEBUG|...) - Capture Group mit den moeglichen Log-Levels
# \b          - Wortgrenze am Ende
# /i          - Case-insensitive Matching

Schritt 2: Severity-Mapping per Eval-Funktion

Fuegen Sie eine Eval-Funktion nach der Regex-Extraktion hinzu:

  1. Klicken Sie auf + Add Function → Eval
  2. Fuegen Sie folgende Evaluations hinzu:
// Feld: severity
// Ausdruck:
log_level === 'DEBUG' ? 7
  : log_level === 'INFO' ? 6
  : (log_level === 'WARNING' || log_level === 'WARN') ? 4
  : log_level === 'ERROR' ? 3
  : (log_level === 'CRITICAL' || log_level === 'FATAL') ? 2
  : 6

// Feld: severity_label (optional, fuer bessere Lesbarkeit)
// Ausdruck:
severity <= 3 ? 'high' : severity <= 4 ? 'medium' : 'low'

Schritt 3: Log-Level normalisieren

Ergaenzen Sie eine weitere Eval-Zeile, um Varianten zu vereinheitlichen:

// Feld: log_level
// Ausdruck (Normalisierung):
log_level === 'WARN' ? 'WARNING'
  : log_level === 'FATAL' ? 'CRITICAL'
  : log_level ? log_level.toUpperCase()
  : 'INFO'

Schritt 4: Testen mit verschiedenen Log-Formaten

Testen Sie die Pipeline im Preview-Modus mit diesen Beispiel-Events:

// Docker JSON Log (Redis)
{"log":"1:M 17 Feb 2026 10:15:00.123 # WARNING Memory usage high","stream":"stdout"}

// Docker JSON Log (Node-RED)
{"log":"17 Feb 10:15:01 - [info] Server now running at http://127.0.0.1:1880/","stream":"stdout"}

// Docker JSON Log (Mosquitto)
{"log":"1708171500: Error: Connection refused","stream":"stderr"}
Erwartetes Ergebnis: Jedes Event enthaelt die Felder log_level (normalisiert: DEBUG, INFO, WARNING, ERROR, CRITICAL), severity (numerisch: 2-7) und severity_label (high/medium/low). Die Severity-Mapping-Tabelle:
log_levelseverityseverity_label
DEBUG7low
INFO6low
WARNING4medium
ERROR3high
CRITICAL2high
Tipps:

Uebung 3: Sampling/Volumen-Reduktion

Sampling/Volumen-Reduktion

Mittel
Lernziel: Verstehen, wie Cribl's Sampling-Funktionen genutzt werden koennen, um das Log-Volumen intelligent zu reduzieren, ohne wichtige Events (ERROR, CRITICAL) zu verlieren. Dies ist eine der Kernfunktionen von Cribl fuer Kostenoptimierung.
Voraussetzungen:

Schritt 1: Sampling-Strategie definieren

Unsere Strategie: Hohe Prioritaet = alles behalten, niedrige Prioritaet = nur Stichprobe:

Log-LevelSampling-RateBegruendung
CRITICAL100% (alles behalten)Kritische Fehler duerfen nicht verloren gehen
ERROR100% (alles behalten)Fehler sind fuer Troubleshooting essenziell
WARNING50%Warnungen sind wichtig, aber haeufig redundant
INFO25%Informationsmeldungen sind oft repetitiv
DEBUG10%Debug-Logs erzeugen das meiste Volumen

Schritt 2: Sampling-Funktion hinzufuegen

Fuegen Sie eine Sampling-Funktion in der Pipeline hinzu:

  1. Navigieren Sie zu Processing → Pipelines → pipeline_docker_json
  2. Fuegen Sie + Add Function → Sampling hinzu (nach der Log-Level-Extraktion)
  3. Konfiguration:
# Filter: Nur auf DEBUG-Events anwenden
# Filter Expression:
log_level === 'DEBUG'

# Sample Rate: 10  (behaelt jedes 10. Event = 10%)

Schritt 3: Weitere Sampling-Regeln hinzufuegen

Erstellen Sie fuer jedes Level eine eigene Sampling-Funktion oder verwenden Sie eine Eval-basierte Loesung:

// Alternative: Dynamisches Sampling ueber Eval + Drop
// Funktion: Eval
// Feld: __sampling_rate
// Ausdruck:
log_level === 'DEBUG' ? 10
  : log_level === 'INFO' ? 4
  : log_level === 'WARNING' ? 2
  : 1

// Danach: Sampling-Funktion mit:
// Sample Rate Field: __sampling_rate

Schritt 4: Volumen-Reduktion messen

Pruefen Sie die Reduktion ueber die Cribl Monitoring-Seite:

  1. Navigieren Sie zu Monitoring → Data → Pipelines
  2. Vergleichen Sie Events In vs. Events Out fuer die Docker-Pipeline
  3. Die Differenz zeigt die eingesparten Events
# Alternativ ueber die API:
TOKEN=$(curl -s -X POST "http://localhost:9000/api/v1/auth/login" \
  -H "Content-Type: application/json" \
  -d '{"username":"admin","password":"admin"}' | jq -r '.token')

curl -s -H "Authorization: Bearer $TOKEN" \
  "http://localhost:9000/api/v1/system/metrics" | jq '.pipelines.pipeline_docker_json'
Erwartetes Ergebnis: Das Log-Volumen der Docker-Pipeline wird um ca. 40-60% reduziert (abhaengig von der Verteilung der Log-Levels). Alle ERROR- und CRITICAL-Events bleiben zu 100% erhalten. In Splunk sehen Sie das Feld cribl_sample_rate, das angibt, wie viele aehnliche Events durch das gesamplete Event repraesentiert werden.
Tipps:

Uebung 4: Eigene Route fuer Mosquitto-MQTT

Eigene Route fuer Mosquitto-MQTT

Mittel
Lernziel: Verstehen, wie dedizierte Routes in Cribl erstellt werden, um bestimmte Log-Quellen mit spezialisierten Pipelines zu verarbeiten. MQTT-Broker-Logs haben ein eigenes Format, das eine dedizierte Verarbeitung erfordert.
Voraussetzungen:

Schritt 1: Mosquitto-Log-Format analysieren

Untersuchen Sie das Mosquitto-Log-Format:

# Mosquitto-Logs anzeigen
docker logs mosquitto --tail 20

# Typisches Format:
# 1708171500: New connection from 192.168.1.100:54321 on port 1883.
# 1708171500: New client connected from 192.168.1.100:54321 as client_01 (p2, c1, k60).
# 1708171500: Client client_01 disconnected.
# 1708171501: Warning: Unable to open log file /mosquitto/log/mosquitto.log.
# 1708171502: Error: Invalid protocol "MQTT" in connect from 192.168.1.200.

Schritt 2: Neue Pipeline erstellen

  1. Navigieren Sie zu Processing → Pipelines
  2. Klicken Sie auf + Add Pipeline
  3. Name: pipeline_mosquitto_mqtt
  4. Beschreibung: "Spezialisierte Pipeline fuer Mosquitto MQTT Broker Logs"

Schritt 3: Pipeline-Funktionen konfigurieren

Fuegen Sie folgende Funktionen hinzu:

Funktion 1: Regex Extract (Timestamp + Nachricht)

// Source Field: _raw
// Regex:
/^(\d+):\s+(.*)/

// Output Fields:
//   g1 -> mqtt_epoch
//   g2 -> mqtt_message

Funktion 2: Regex Extract (Verbindungs-Details)

// Source Field: mqtt_message
// Regex:
/(?:connection from|connected from)\s+(\d+\.\d+\.\d+\.\d+):(\d+)(?:\s+as\s+(\S+))?/

// Output Fields:
//   g1 -> mqtt_client_ip
//   g2 -> mqtt_client_port
//   g3 -> mqtt_client_id

Funktion 3: Eval (Anreicherung)

// Felder:
// source_type    = 'mosquitto:mqtt'
// mqtt_timestamp = new Date(Number(mqtt_epoch) * 1000).toISOString()
// log_level      = /Error/i.test(mqtt_message) ? 'ERROR'
//                : /Warning/i.test(mqtt_message) ? 'WARNING'
//                : 'INFO'

Schritt 4: Route erstellen

  1. Navigieren Sie zu Processing → Routes
  2. Klicken Sie auf + Add Route
  3. Konfiguration:
    • Route Name: route_mosquitto
    • Filter: container_name=='mosquitto' || source.includes('mosquitto')
    • Pipeline: pipeline_mosquitto_mqtt
    • Output: splunk_hec (oder Ihre konfigurierte Destination)
    • Final: aktiviert (damit Events nicht auch von anderen Routes verarbeitet werden)
  4. Wichtig: Verschieben Sie die Route vor die generische Docker-Route in der Route-Tabelle
Erwartetes Ergebnis: Mosquitto-Logs werden ueber die dedizierte Route und Pipeline verarbeitet. Die Events enthalten die Felder mqtt_client_ip, mqtt_client_id, mqtt_timestamp und source_type: mosquitto:mqtt. In Splunk koennen Sie mit sourcetype=mosquitto:mqtt gezielt nach MQTT-Events suchen und nach Client-IP filtern.
Tipps:

Uebung 5: Sensitive Daten maskieren

Sensitive Daten maskieren

Mittel
Lernziel: Lernen, wie Cribl's Maskierungs- und Redaktionsfunktionen eingesetzt werden, um personenbezogene Daten (PII) und andere sensitive Informationen vor der Weiterleitung an Destinations zu schuetzen. Dies ist besonders relevant fuer DSGVO-Compliance.
Voraussetzungen:

Schritt 1: Sensitive Daten identifizieren

In unseren Logs koennen folgende sensitive Daten vorkommen:

DatentypBeispielVorkommen
E-Mail-Adressenuser@example.comApache-Logs, Anwendungslogs
IP-Adressen192.168.1.100Alle Netzwerk-Logs
Benutzernamenadmin, jdoeAuth-Logs, SSH-Logs
API-TokensBearer eyJhb...HTTP-Request-Logs

Schritt 2: Masking-Funktion fuer E-Mail-Adressen

  1. Oeffnen Sie die gewuenschte Pipeline (z.B. pipeline_generic_passthrough)
  2. Fuegen Sie + Add Function → Mask hinzu
  3. Konfiguration:
// Regex Pattern fuer E-Mail-Adressen:
/[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}/g

// Replace Expression:
C.Mask.REDACT
// Oder spezifisch:
'***@***.***'

// Felder, die durchsucht werden:
_raw

Schritt 3: Masking-Funktion fuer IP-Adressen

Fuegen Sie eine weitere Mask-Funktion hinzu:

// Regex Pattern fuer IPv4-Adressen:
/\b(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.\d{1,3}\b/g

// Replace Expression (letztes Oktett maskieren):
`${g1}.${g2}.${g3}.xxx`

// Alternativ: Komplette Maskierung
C.Mask.HASH
// Erzeugt einen konsistenten Hash, sodass gleiche IPs
// gleich maskiert werden (nützlich fuer Korrelation)

Schritt 4: API-Token-Redaktion

// Regex Pattern fuer Bearer Tokens:
/Bearer\s+[A-Za-z0-9\-._~+\/]+=*/g

// Replace Expression:
'Bearer [REDACTED]'

// Regex Pattern fuer generische API-Keys:
/(?:api[_-]?key|token|secret)[=:]\s*["']?([A-Za-z0-9\-._~+\/]{16,})["']?/gi

// Replace Expression:
'$1=[REDACTED]'

Schritt 5: Testen mit Sample-Daten

Erstellen Sie Test-Events mit sensitiven Daten:

{
  "_raw": "2026-02-17 10:00:00 INFO User login: user@example.com from 192.168.1.100 with Bearer eyJhbGciOiJIUzI1NiJ9.secret",
  "source": "application.log"
}

Das maskierte Ergebnis sollte so aussehen:

{
  "_raw": "2026-02-17 10:00:00 INFO User login: ***@***.*** from 192.168.1.xxx with Bearer [REDACTED]",
  "source": "application.log"
}
Erwartetes Ergebnis: Alle E-Mail-Adressen, die letzten Oktette von IP-Adressen und API-Tokens werden vor der Weiterleitung an Splunk maskiert. Die Maskierung erfolgt transparent – die Log-Struktur bleibt erhalten, nur die sensitiven Werte werden ersetzt. In Cribl Monitoring sehen Sie unter der Pipeline die Anzahl der maskierten Events.
Tipps:
DSGVO-Hinweis: Die Maskierung von personenbezogenen Daten ist eine technische Massnahme nach Art. 32 DSGVO. Stellen Sie sicher, dass die Maskierungsregeln regelmaessig ueberprueft und aktualisiert werden, wenn neue Datenquellen hinzugefuegt werden.

Uebung 6: Metriken aus Redis-Logs extrahieren

Metriken aus Redis-Logs extrahieren

Mittel
Lernziel: Verstehen, wie aus unstrukturierten Log-Daten numerische Metriken extrahiert und als eigenstaendige Metriken-Events an Monitoring-Systeme weitergeleitet werden koennen. Redis-Logs enthalten wertvolle Performance-Daten, die fuer proaktives Monitoring genutzt werden koennen.
Voraussetzungen:

Schritt 1: Redis-Log-Format verstehen

Untersuchen Sie die Redis-Logs:

# Redis-Logs anzeigen
docker logs redis-stack --tail 30

# Typische Redis-Log-Eintraege:
# 1:M 17 Feb 2026 10:15:00.123 * DB loaded from disk: 0.002 seconds
# 1:M 17 Feb 2026 10:15:00.124 * Ready to accept connections tcp
# 1:M 17 Feb 2026 10:20:00.001 # WARNING Memory usage > 90%
# 1:M 17 Feb 2026 10:20:00.002 * 10 changes in 300 seconds. Saving...
# 1:M 17 Feb 2026 10:20:00.100 * Background saving terminated with success

# Redis INFO-Kommando fuer aktuelle Metriken:
docker exec redis-stack redis-cli INFO memory | head -5
# used_memory:1234567
# used_memory_human:1.18M
# used_memory_rss:2345678
# used_memory_peak:3456789

Schritt 2: Pipeline fuer Redis-Metriken erstellen

  1. Erstellen Sie eine neue Pipeline: pipeline_redis_metrics
  2. Fuegen Sie eine Regex Extract-Funktion hinzu:
// Pattern 1: Memory-Meldungen
// Source: _raw
// Regex:
/used_memory:(\d+)/
// Output: redis_memory_bytes

// Pattern 2: Verbindungen
// Regex:
/connected_clients:(\d+)/
// Output: redis_connected_clients

// Pattern 3: Operationen pro Sekunde
// Regex:
/instantaneous_ops_per_sec:(\d+)/
// Output: redis_ops_per_sec

// Pattern 4: DB-Saves
// Regex:
/(\d+)\s+changes\s+in\s+(\d+)\s+seconds/
// Output: g1 -> redis_changes, g2 -> redis_save_interval

Schritt 3: Numerische Konvertierung per Eval

// Eval-Funktion: Felder in Zahlen konvertieren
// redis_memory_bytes  = Number(redis_memory_bytes) || 0
// redis_memory_mb     = Math.round(redis_memory_bytes / 1024 / 1024 * 100) / 100
// redis_connected_clients = Number(redis_connected_clients) || 0
// redis_ops_per_sec   = Number(redis_ops_per_sec) || 0
// metric_type         = 'redis_performance'
// _time               = Date.now() / 1000

Schritt 4: Metriken-Output konfigurieren

Erstellen Sie optional einen separaten Output fuer Metriken:

  1. Navigieren Sie zu Data → Destinations → + Add Destination
  2. Waehlen Sie Splunk HEC (oder einen Metrics-Store Ihrer Wahl)
  3. Konfiguration:
    • Name: splunk_hec_metrics
    • Index: ice_metrics (separater Metriken-Index)

Schritt 5: Route fuer Redis-Metriken erstellen

// Route-Filter:
container_name === 'redis-stack' && /used_memory|connected_clients|ops_per_sec|changes in/.test(_raw)

// Pipeline: pipeline_redis_metrics
// Output: splunk_hec_metrics
// Final: false (Events auch an regulaere Pipeline weiterleiten)
Erwartetes Ergebnis: Redis-Performance-Metriken werden als strukturierte Events mit numerischen Werten extrahiert. In Splunk koennen Sie Dashboards erstellen: index=ice_metrics metric_type=redis_performance | timechart avg(redis_memory_mb), avg(redis_ops_per_sec). Die Metriken umfassen Speicherverbrauch (MB), aktive Verbindungen und Operationen pro Sekunde.
Tipps:

Uebung 7: Conditional Routing mit Regex

Conditional Routing mit Regex

Fortgeschritten
Lernziel: Erlernen fortgeschrittener Routing-Techniken mit regex-basierten Filtern, um Events basierend auf komplexen Inhaltsmuster an unterschiedliche Pipelines und Destinations zu leiten. Dies ermoeglicht eine praezise Steuerung des Datenflusses.
Voraussetzungen:

Schritt 1: Routing-Szenarien definieren

Wir konfigurieren folgende Routing-Regeln:

SzenarioRegex-FilterZiel-PipelineDestination
Security-Events (SSH, Auth) /(?:ssh|auth|sudo|login|password)/i pipeline_security_auth splunk_s2s (Security-Index)
HTTP-Fehler (4xx, 5xx) /HTTP\/\d\.\d"\s+[45]\d{2}/ pipeline_http_errors splunk_hec (Error-Index)
Performance-Warnungen /(?:memory|cpu|disk|timeout|slow)/i pipeline_performance splunk_hec_metrics
Health-Check-Noise /(?:health|ping|alive|ready)/i devnull (Drop) -

Schritt 2: Regex-basierte Routes erstellen

Erstellen Sie die Routes in der Cribl UI:

Route 1: Security Events

// Route: route_security_regex
// Filter Expression:
/(?:ssh|auth|sudo|login|password|failed|accepted|session opened|session closed)/i.test(_raw)

// Pipeline: pipeline_security_auth
// Output: splunk_s2s
// Final: true

Route 2: HTTP Errors

// Route: route_http_errors
// Filter Expression:
/HTTP\/\d\.\d"\s+[45]\d{2}\s/.test(_raw) || (statusCode && statusCode >= 400)

// Pipeline: pipeline_apache_clf
// Output: splunk_hec
// Final: true

Route 3: Performance Warnings

// Route: route_performance
// Filter Expression:
/(?:out of memory|high cpu|disk full|timeout exceeded|slow query|latency)/i.test(_raw)
  && severity <= 4

// Pipeline: pipeline_generic_passthrough
// Output: splunk_hec_metrics
// Final: false  (auch an regulaere Destination senden)

Route 4: Health-Check Noise (Drop)

// Route: route_drop_healthchecks
// Filter Expression:
/^(?:GET|HEAD)\s+\/(?:health|ping|alive|ready|status)\s+HTTP/i.test(_raw)
  || (/health.*check/i.test(_raw) && log_level === 'DEBUG')

// Pipeline: devnull
// Output: devnull
// Final: true

Schritt 3: Route-Reihenfolge festlegen

Die Reihenfolge in der Route-Tabelle ist entscheidend:

  1. route_drop_healthchecks (Noise zuerst eliminieren)
  2. route_mosquitto (spezifische Quellen)
  3. route_security_regex (Security-Events)
  4. route_http_errors (HTTP-Fehler)
  5. route_performance (Performance, nicht final)
  6. route_docker_default (Fallback fuer alle Docker-Events)
  7. route_default (Catch-all)

Schritt 4: Testen und Validieren

# Test-Events ueber Syslog senden:
# Security Event:
echo "<34>Feb 17 10:30:00 nuc sshd[1234]: Failed password for admin from 10.0.0.1 port 22 ssh2" \
  | nc -u localhost 514

# HTTP Error:
echo '192.168.1.1 - - [17/Feb/2026:10:30:00 +0100] "GET /admin HTTP/1.1" 403 287' \
  | nc -u localhost 514

# Health Check (should be dropped):
echo '10.0.0.1 - - [17/Feb/2026:10:30:00 +0100] "GET /health HTTP/1.1" 200 2' \
  | nc -u localhost 514
Erwartetes Ergebnis: Events werden basierend auf ihrem Inhalt an die korrekte Pipeline und Destination geleitet. Security-Events landen im Security-Index, HTTP-Fehler im Error-Index, Health-Checks werden verworfen. In der Cribl Monitoring-Ansicht sehen Sie die Verteilung der Events pro Route. Die Health-Check-Route sollte eine messbare Volumenreduktion zeigen.
Tipps:
Achtung: Zu viele Regex-Filter koennen die Performance beeintraechtigen. Messen Sie die CPU-Auslastung des Cribl-Prozesses nach dem Hinzufuegen neuer Regex-Routes und optimieren Sie bei Bedarf (z.B. durch Verwendung einfacher String-Vergleiche statt Regex, wo moeglich).

Uebung 8: Cribl Pack erstellen

Cribl Pack erstellen

Fortgeschritten
Lernziel: Verstehen, wie Cribl Packs als portable Konfigurationspakete funktionieren. Sie lernen, die in Phase 2 erstellten Pipelines, Routes und Lookups als wiederverwendbares Pack zu buendeln, das auf anderen Cribl-Instanzen importiert werden kann.
Voraussetzungen:

Schritt 1: Pack-Struktur anlegen

# Pack-Verzeichnis erstellen
PACK_DIR="/opt/cribl/local/cribl/packs/ice-data-phase2"
sudo mkdir -p "${PACK_DIR}"/{pipelines,routes,lookups,knowledge}

# Pack-Verzeichnisstruktur:
# ice-data-phase2/
# ├── pack.json              # Manifest-Datei
# ├── pipelines/             # Pipeline-Konfigurationen
# │   ├── pipeline_mosquitto_mqtt/
# │   │   └── conf.yml
# │   ├── pipeline_redis_metrics/
# │   │   └── conf.yml
# │   └── ...
# ├── routes/
# │   └── routes.yml         # Route-Tabelle
# ├── lookups/
# │   └── container_lookup.csv
# └── knowledge/
#     └── README.md           # Dokumentation des Packs

Schritt 2: Pack-Manifest erstellen (pack.json)

sudo tee "${PACK_DIR}/pack.json" <<'EOF'
{
  "name": "ice-data-phase2",
  "displayName": "IceDataEmphasise Phase 2",
  "description": "Cribl Pack mit Phase 2 Konfigurationen: Docker-Log-Verarbeitung, Severity-Mapping, Sampling, MQTT-Routing, PII-Maskierung und Redis-Metriken.",
  "version": "1.0.0",
  "author": "Marcus Pauli",
  "tags": ["docker", "mqtt", "redis", "pii", "sampling", "metrics"],
  "minCriblVersion": "4.0.0",
  "dependencies": [],
  "readme": "knowledge/README.md",
  "config": {
    "splunk_hec_url": {
      "type": "string",
      "title": "Splunk HEC URL",
      "description": "URL des Splunk HEC Endpoints",
      "default": "http://splunk:8088/services/collector"
    },
    "sampling_debug_rate": {
      "type": "number",
      "title": "DEBUG Sampling Rate",
      "description": "Sampling-Rate fuer DEBUG-Events (1 = alle behalten, 10 = jedes 10.)",
      "default": 10
    }
  }
}
EOF

Schritt 3: Pipelines in das Pack kopieren

# Bestehende Pipeline-Konfigurationen kopieren
CRIBL_CONF="/opt/cribl/local/cribl"

# Pipeline: Docker JSON (erweitert)
sudo cp -r "${CRIBL_CONF}/pipelines/pipeline_docker_json" \
  "${PACK_DIR}/pipelines/"

# Pipeline: Mosquitto MQTT
sudo cp -r "${CRIBL_CONF}/pipelines/pipeline_mosquitto_mqtt" \
  "${PACK_DIR}/pipelines/"

# Pipeline: Redis Metrics
sudo cp -r "${CRIBL_CONF}/pipelines/pipeline_redis_metrics" \
  "${PACK_DIR}/pipelines/"

# Lookup-Datei kopieren
sudo cp "${CRIBL_CONF}/lookups/container_lookup.csv" \
  "${PACK_DIR}/lookups/"

Schritt 4: Routes exportieren

# Nur die Phase-2-Routes extrahieren und in das Pack kopieren
# (Manuell die relevanten Eintraege aus routes.yml kopieren)
sudo tee "${PACK_DIR}/routes/routes.yml" <<'EOF'
routes:
  - id: route_drop_healthchecks
    name: Drop Health Checks
    filter: "/^(?:GET|HEAD)\\s+\\/(?:health|ping|alive|ready)\\s+HTTP/i.test(_raw)"
    pipeline: devnull
    output: devnull
    final: true
    description: "Eliminiert Health-Check-Noise"

  - id: route_mosquitto
    name: Mosquitto MQTT
    filter: "container_name=='mosquitto' || source.includes('mosquitto')"
    pipeline: pipeline_mosquitto_mqtt
    output: splunk_hec
    final: true
    description: "Dedizierte Route fuer Mosquitto MQTT Broker Logs"

  - id: route_security_regex
    name: Security Events
    filter: "/(?:ssh|auth|sudo|login|password|failed|accepted)/i.test(_raw)"
    pipeline: pipeline_security_auth
    output: splunk_s2s
    final: true
    description: "Sicherheitsrelevante Events an Security-Index"
EOF

Schritt 5: Pack validieren und testen

# Pack-Struktur pruefen
find "${PACK_DIR}" -type f | sort

# Cribl-Konfiguration neu laden
sudo /opt/cribl/bin/cribl reload

# In der Cribl UI pruefen:
# Settings → Packs → "ice-data-phase2" sollte sichtbar sein

# Pack als .crbl-Datei exportieren (fuer Transport):
cd /opt/cribl/local/cribl/packs
sudo tar czf /tmp/ice-data-phase2-v1.0.0.crbl ice-data-phase2/
ls -lh /tmp/ice-data-phase2-v1.0.0.crbl
Erwartetes Ergebnis: Ein vollstaendiges Cribl Pack mit dem Namen ice-data-phase2 ist erstellt und in der Cribl UI unter Settings → Packs sichtbar. Das Pack enthaelt alle Phase-2-Pipelines, Routes, Lookups und ist als .crbl-Datei exportierbar. Auf einer anderen Cribl-Instanz kann das Pack ueber Settings → Packs → Import importiert werden.
Tipps:

Uebung 9: Backpressure + Persistent Queue testen

Backpressure + Persistent Queue testen

Fortgeschritten
Lernziel: Verstehen, wie Cribl mit Backpressure-Situationen umgeht und wie Persistent Queues (PQ) konfiguriert und ueberwacht werden. Sie lernen, einen Destination-Ausfall zu simulieren und die Wiederherstellung nach dem Ausfall zu verifizieren.
Voraussetzungen:

Schritt 1: Persistent Queue konfigurieren

Aktivieren Sie die PQ auf der Splunk-HEC-Destination:

  1. Navigieren Sie zu Data → Destinations → splunk_hec
  2. Scrollen Sie zu Persistent Queue
  3. Konfiguration:
    • Enable Persistent Queue: aktiviert
    • Max Queue Size: 100 MB
    • Max Queue File Size: 10 MB
    • Queue-full Behavior: Block (alternativ: Drop)
    • Compression: gzip
  4. Klicken Sie auf Save

Schritt 2: Baseline-Metriken erfassen

# Aktuelle Queue-Groesse und Events-Throughput notieren
TOKEN=$(curl -s -X POST "http://localhost:9000/api/v1/auth/login" \
  -H "Content-Type: application/json" \
  -d '{"username":"admin","password":"admin"}' | jq -r '.token')

# Destination-Status pruefen
curl -s -H "Authorization: Bearer $TOKEN" \
  "http://localhost:9000/api/v1/system/outputs/splunk_hec" | jq '{
    status: .status,
    pq_enabled: .pq.enabled,
    pq_maxSize: .pq.maxSize
  }'

# Queue-Verzeichnis pruefen
ls -la /opt/cribl/state/queues/
du -sh /opt/cribl/state/queues/*

Schritt 3: Destination-Ausfall simulieren

Achtung: Diese Uebung unterbricht temporaer die Datenweiterleitung an Splunk. Fuehren Sie dies nur in einer Test-/PoC-Umgebung durch, nicht in Produktion!
# Option A: Splunk HEC temporaer deaktivieren
# (Auf dem Splunk-Server)
# Settings → Data Inputs → HTTP Event Collector → Disable

# Option B: Firewall-Regel (blockiert Port 8088)
sudo iptables -A OUTPUT -p tcp --dport 8088 -j DROP

# Option C: Cribl Destination auf ungueltige URL aendern
# In der Cribl UI: Destinations → splunk_hec → URL aendern auf:
# http://localhost:19999/services/collector  (nicht existierender Port)

Schritt 4: Backpressure beobachten

# Queue-Wachstum ueberwachen (alle 10 Sekunden)
watch -n 10 'du -sh /opt/cribl/state/queues/* 2>/dev/null; echo "---"; \
  curl -s http://localhost:9000/api/v1/health 2>/dev/null | jq .destinations'

# Cribl-Logs auf Backpressure-Meldungen pruefen
tail -f /opt/cribl/log/cribl.log | grep -i "backpressure\|queue\|blocked"

# In der Cribl UI beobachten:
# Monitoring → Destinations → splunk_hec
# - Status sollte "Error" oder "Blocked" zeigen
# - Queue Depth waechst an

Lassen Sie die Simulation 2-5 Minuten laufen, damit sich die Queue fuellt.

Schritt 5: Wiederherstellung und Drain verifizieren

# Destination wiederherstellen:
# Option A: Splunk HEC wieder aktivieren
# Option B: Firewall-Regel entfernen
sudo iptables -D OUTPUT -p tcp --dport 8088 -j DROP
# Option C: URL in Cribl zuruecksetzen

# Queue-Drain beobachten:
watch -n 5 'du -sh /opt/cribl/state/queues/* 2>/dev/null'

# Die Queue sollte schrittweise kleiner werden,
# bis alle gepufferten Events zugestellt sind.

# Verifizierung in Splunk:
# Suchen Sie nach einer Luecke in den Events und pruefen Sie,
# ob nach der Wiederherstellung alle Events lueckenlos ankommen:
# index=ice_data | timechart span=1m count
Erwartetes Ergebnis: Waehrend des simulierten Ausfalls wachsen die PQ-Dateien unter /opt/cribl/state/queues/ an. In den Cribl-Logs erscheinen Backpressure-Warnungen. Nach der Wiederherstellung der Destination werden alle gepufferten Events zugestellt (Queue-Drain). In Splunk ist eine zeitliche Luecke sichtbar, aber keine Events gehen verloren. Die Queue schrumpft nach Wiederherstellung auf 0.
Tipps:

Uebung 10: A/B-Test: Regeln vs. Ollama-Klassifikation

A/B-Test: Regeln vs. Ollama-Klassifikation

Fortgeschritten
Lernziel: Vergleichen Sie die Genauigkeit und Performance von regelbasierter Log-Klassifikation (Regex, Lookup) mit KI-gestuetzter Klassifikation via Ollama (lokales LLM). Sie lernen, einen strukturierten A/B-Test aufzusetzen, Ergebnisse zu messen und eine fundierte Empfehlung abzugeben.
Voraussetzungen:

Schritt 1: Test-Datensatz vorbereiten

Erstellen Sie einen gelabelten Test-Datensatz mit 100 Events:

# Test-Events aus den letzten Logs extrahieren
# und manuell mit dem korrekten Label versehen

sudo tee /tmp/test_events.jsonl <<'JSONL'
{"_raw":"Feb 17 10:00:01 nuc sshd[1234]: Failed password for root from 10.0.0.1 port 22","expected_category":"security","expected_severity":"high"}
{"_raw":"192.168.1.1 - - [17/Feb/2026:10:00:02] \"GET /index.html HTTP/1.1\" 200 1234","expected_category":"web_access","expected_severity":"low"}
{"_raw":"{\"log\":\"1:M 17 Feb 10:00:03 # WARNING Memory high\",\"stream\":\"stdout\"}","expected_category":"performance","expected_severity":"medium"}
{"_raw":"Feb 17 10:00:04 nuc systemd[1]: Started Docker Container nginx.","expected_category":"system","expected_severity":"low"}
{"_raw":"{\"log\":\"Error: Connection refused to database\",\"stream\":\"stderr\"}","expected_category":"error","expected_severity":"high"}
JSONL

# Erweitern Sie diesen Datensatz auf mindestens 50-100 Events
# mit verschiedenen Kategorien und Severity-Levels

Schritt 2: Regelbasierte Klassifikation testen

Nutzen Sie die bestehenden Cribl-Pipelines zur Klassifikation:

# Die regelbasierte Klassifikation ergibt sich aus:
# - Uebung 2: log_level + severity (Regex + Eval)
# - Uebung 7: Route-Zuordnung (Security, HTTP, Performance)

# Ergebnis-Felder nach Pipeline-Verarbeitung:
# rule_category  (security | web_access | performance | system | error)
# rule_severity  (high | medium | low)

# Messen Sie fuer jedes Test-Event:
# 1. Stimmt rule_category mit expected_category ueberein?
# 2. Stimmt rule_severity mit expected_severity ueberein?
# 3. Verarbeitungszeit pro Event

Schritt 3: Ollama-Klassifikation einrichten

Erstellen Sie ein Skript fuer die KI-Klassifikation:

#!/usr/bin/env bash
# ollama_classify.sh - Klassifiziert Log-Events via Ollama

MODEL="llama3.2:3b"  # Oder ein anderes lokales Modell
OLLAMA_URL="http://localhost:11434/api/generate"

classify_event() {
    local raw_event="$1"

    local prompt="Klassifiziere dieses Log-Event. Antworte NUR mit einem JSON-Objekt.
Kategorien: security, web_access, performance, system, error, other
Severity: high, medium, low

Log-Event: ${raw_event}

Antwort (nur JSON):
{\"ai_category\": \"...\", \"ai_severity\": \"...\", \"ai_confidence\": 0.0-1.0}"

    curl -s "${OLLAMA_URL}" \
      -d "$(jq -n --arg model "${MODEL}" --arg prompt "${prompt}" \
        '{model: $model, prompt: $prompt, stream: false}')" \
      | jq -r '.response'
}

# Test mit einem Event:
classify_event "Feb 17 10:00:01 nuc sshd[1234]: Failed password for root from 10.0.0.1"

Schritt 4: A/B-Test durchfuehren

#!/usr/bin/env bash
# ab_test.sh - Vergleicht regelbasierte und KI-Klassifikation

RESULTS_FILE="/tmp/ab_test_results.csv"
echo "event_id,expected_cat,expected_sev,rule_cat,rule_sev,rule_ms,ai_cat,ai_sev,ai_ms,ai_confidence" \
  > "${RESULTS_FILE}"

EVENT_ID=0
while IFS= read -r line; do
    EVENT_ID=$((EVENT_ID + 1))

    expected_cat=$(echo "${line}" | jq -r '.expected_category')
    expected_sev=$(echo "${line}" | jq -r '.expected_severity')
    raw=$(echo "${line}" | jq -r '._raw')

    # Regelbasierte Klassifikation (simuliert)
    RULE_START=$(date +%s%N)
    # ... (Pipeline-Preview-API oder lokale Regex-Auswertung)
    RULE_END=$(date +%s%N)
    rule_ms=$(( (RULE_END - RULE_START) / 1000000 ))

    # KI-Klassifikation
    AI_START=$(date +%s%N)
    ai_result=$(classify_event "${raw}")
    AI_END=$(date +%s%N)
    ai_ms=$(( (AI_END - AI_START) / 1000000 ))

    ai_cat=$(echo "${ai_result}" | jq -r '.ai_category // "unknown"')
    ai_sev=$(echo "${ai_result}" | jq -r '.ai_severity // "unknown"')
    ai_conf=$(echo "${ai_result}" | jq -r '.ai_confidence // 0')

    echo "${EVENT_ID},${expected_cat},${expected_sev},${rule_cat},${rule_sev},${rule_ms},${ai_cat},${ai_sev},${ai_ms},${ai_conf}" \
      >> "${RESULTS_FILE}"

    echo "Event ${EVENT_ID}: Rule=${rule_ms}ms, AI=${ai_ms}ms"
done < /tmp/test_events.jsonl

echo "Ergebnisse gespeichert in: ${RESULTS_FILE}"

Schritt 5: Ergebnisse auswerten

# Einfache Auswertung mit awk:

echo "=== A/B-Test Ergebnisse ==="

# Genauigkeit der Regel-Klassifikation
echo "--- Regelbasiert ---"
awk -F',' 'NR>1 {
    total++
    if ($2==$4) cat_correct++
    if ($3==$5) sev_correct++
    rule_ms_sum+=$6
} END {
    printf "Kategorie-Genauigkeit: %.1f%% (%d/%d)\n", cat_correct/total*100, cat_correct, total
    printf "Severity-Genauigkeit:  %.1f%% (%d/%d)\n", sev_correct/total*100, sev_correct, total
    printf "Durchschn. Latenz:     %.1f ms\n", rule_ms_sum/total
}' "${RESULTS_FILE}"

# Genauigkeit der KI-Klassifikation
echo "--- KI (Ollama) ---"
awk -F',' 'NR>1 {
    total++
    if ($2==$7) cat_correct++
    if ($3==$8) sev_correct++
    ai_ms_sum+=$9
    conf_sum+=$10
} END {
    printf "Kategorie-Genauigkeit: %.1f%% (%d/%d)\n", cat_correct/total*100, cat_correct, total
    printf "Severity-Genauigkeit:  %.1f%% (%d/%d)\n", sev_correct/total*100, sev_correct, total
    printf "Durchschn. Latenz:     %.1f ms\n", ai_ms_sum/total
    printf "Durchschn. Confidence: %.2f\n", conf_sum/total
}' "${RESULTS_FILE}"
Erwartetes Ergebnis: Die Auswertung zeigt typischerweise:
MetrikRegelbasiertOllama KI
Kategorie-Genauigkeit85-95%70-90%
Severity-Genauigkeit90-98%75-90%
Latenz pro Event< 1 ms100-2000 ms
RessourcenverbrauchMinimal4-8 GB RAM

Regelbasierte Klassifikation ist schneller und bei bekannten Mustern praeziser. KI-Klassifikation kann bei unbekannten oder komplexen Mustern besser sein, hat aber deutlich hoehere Latenz und Ressourcenanforderungen. Die Empfehlung fuer den PoC: Regelbasiert als Primaer-Klassifikation, KI als Fallback fuer nicht klassifizierte Events.

Tipps:
Hinweis zur Datensicherheit: Wenn Sie Ollama verwenden, werden die Log-Events an das lokale LLM gesendet. Da Ollama lokal laeuft, verlassen die Daten nicht den Server. Bei Cloud-basierten LLMs (OpenAI, Anthropic, etc.) muessten Datenschutzaspekte (DSGVO) beruecksichtigt werden. Fuer den PoC wird ausschliesslich die lokale Ollama-Instanz empfohlen.

Zusammenfassung

Die 10 Uebungen decken die wichtigsten Phase-2-Themen ab:

BereichUebungenWichtigste Erkenntnis
Datenanreicherung 1, 2 Lookups und Regex-Extraktion sind die Grundbausteine fuer strukturierte Log-Verarbeitung
Volumenoptimierung 3 Intelligentes Sampling kann 40-60% Volumen sparen, ohne kritische Events zu verlieren
Routing 4, 7 Praezises Routing mit dedizierten Pipelines verbessert die Datenqualitaet erheblich
Compliance 5 PII-Maskierung ist fuer DSGVO-Konformitaet essenziell und sollte frueh implementiert werden
Monitoring 6 Metriken koennen direkt aus Logs extrahiert werden, ohne zusaetzliche Agenten
Portabilitaet 8 Cribl Packs ermoeglichen reproduzierbare Konfigurationen ueber Umgebungen hinweg
Resilienz 9 Persistent Queues schuetzen vor Datenverlust bei Destination-Ausfaellen
KI-Integration 10 Regelbasierte Klassifikation bleibt fuer bekannte Muster ueberlegen, KI ergaenzt bei unbekannten
Naechste Schritte: Nach Abschluss aller Uebungen koennen Sie die Konfigurationen in den produktiven Betrieb ueberfuehren. Dokumentieren Sie Ihre Ergebnisse und erstellen Sie ein Cribl Pack (Uebung 8) fuer die Reproduzierbarkeit. Nutzen Sie die Erkenntnisse aus dem A/B-Test (Uebung 10) fuer die Entscheidung ueber den Einsatz von KI in Phase 3.