07 - Pipelines und Routen

Version: 1.0 Datum: 2026-02-10 Klassifikation: Vertraulich / Intern Projekt: IceDataEmphasise

Revisionshistorie

VersionDatumAutorBeschreibung
1.02026-02-10Marcus PauliErstversion: 5 Pipelines, Route-Tabelle, Design-Entscheidungen

Inhaltsverzeichnis

  1. Pipeline-Konzept in Cribl
  2. Die 5 Pipelines im Detail
    1. pipeline_syslog_enrichment
    2. pipeline_apache_clf
    3. pipeline_docker_json
    4. pipeline_security_auth
    5. pipeline_generic_passthrough
  3. Route-Tabelle
  4. Design-Entscheidung: Security via S2S, Web/IoT via HEC
  5. Testing von Pipelines

1. Pipeline-Konzept in Cribl

In Cribl Stream ist eine Pipeline eine geordnete Kette von Verarbeitungsschritten (Functions), die auf eingehende Events angewendet werden. Jede Function kann Events transformieren, anreichern, filtern oder verwerfen.

Kernkomponenten

KomponenteBeschreibung
FunctionEinzelner Verarbeitungsschritt (z.B. Regex Extract, Eval, Serialize/Deserialize)
FilterJavaScript-Ausdruck, der bestimmt, ob eine Function auf ein Event angewendet wird
PipelineGeordnete Sequenz von Functions mit einem eindeutigen Bezeichner
RouteVerbindet einen Filter-Ausdruck mit einer Pipeline und einer Destination

Der Datenfluss ist: Quelle → Route (Filter) → Pipeline (Functions) → Destination

Hinweis: Alle Pipeline-Konfigurationen befinden sich unter configs/stream/pipelines/. Jede Pipeline fuegt mindestens die Felder cribl_host und cribl_pipeline hinzu, um die Herkunft und den Verarbeitungspfad jedes Events nachvollziehbar zu machen (Auditierbarkeit).

2. Die 5 Pipelines im Detail

2.1 pipeline_syslog_enrichment

Datei: configs/stream/pipelines/pipeline-syslog-enrichment.json
Zweck: Anreicherung und Parsing von Syslog-formatierten Events (z.B. Apache Error Logs, Systemd Journal)

#FunctionFilterBeschreibung
1commenttrueDokumentation: Zweck der Pipeline
2evaltrueFuegt Metadaten hinzu: cribl_host, cribl_environment, cribl_pipeline
3regex_extracttrueExtrahiert aus _raw: syslog_timestamp, syslog_host, syslog_program, syslog_pid, message
4auto_timestampsyslog_timestampAutomatische Timestamp-Erkennung und -Normalisierung
+evaltrue Setzt Splunk-Routing-Felder: index='os_journal', sourcetype='syslog', source='cribl:journal'

Regex-Pattern:

/^(?<syslog_timestamp>\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\s+
  (?<syslog_host>\S+)\s+
  (?<syslog_program>[^\[:\s]+)
  (?:\[(?<syslog_pid>\d+)\])?:?\s+
  (?<message>.*)/

Beispiel-Input:

Feb 10 14:23:01 nuc sshd[12345]: Connection from 192.168.1.100 port 52341

Extrahierte Felder:

FeldWert
syslog_timestampFeb 10 14:23:01
syslog_hostnuc
syslog_programsshd
syslog_pid12345
messageConnection from 192.168.1.100 port 52341
cribl_host(Hostname des Cribl-Servers)
cribl_environmentproduction
cribl_pipelinepipeline_syslog_enrichment

2.2 pipeline_apache_clf

Datei: configs/stream/pipelines/pipeline-apache-clf.json
Zweck: Parsing des Apache Combined Log Format (CLF) mit vollstaendiger Feldextraktion

#FunctionFilterBeschreibung
1commenttrueDokumentation: Zweck der Pipeline
2evaltrueFuegt Metadaten hinzu: cribl_host, cribl_environment, cribl_pipeline
3regex_extracttrueExtrahiert CLF-Felder: client_ip, remote_user, timestamp, method, uri, http_version, status, bytes, referrer, user_agent
4evalstatusTypkonvertierung: status und bytes zu numerischen Werten
5auto_timestamptimestampAutomatische Timestamp-Erkennung

Beispiel-Input:

192.168.1.50 - admin [10/Feb/2026:14:30:15 +0100] "GET /dashboard HTTP/1.1" 200 4523 "https://portal.example.de/" "Mozilla/5.0"

Extrahierte Felder:

FeldWertTyp
client_ip192.168.1.50String
remote_useradminString
methodGETString
uri/dashboardString
http_versionHTTP/1.1String
status200Number
bytes4523Number
referrerhttps://portal.example.de/String
user_agentMozilla/5.0String
Typkonvertierung: Die Felder status und bytes werden explizit in numerische Werte konvertiert (Number(status)), um in Splunk korrekte statistische Auswertungen (Durchschnitt, Summe, Percentile) zu ermoeglichen. Das Feld bytes wird bei einem Wert von "-" auf 0 gesetzt.

2.3 pipeline_docker_json

Datei: configs/stream/pipelines/pipeline-docker-json.json
Zweck: Parsing von Docker-JSON-Logs mit Container-ID-Extraktion und Log-Level-Erkennung

#FunctionFilterBeschreibung
1commenttrueDokumentation: Zweck der Pipeline
2evaltrueFuegt Metadaten hinzu: cribl_host, cribl_environment, cribl_pipeline
3serde (JSON Extract)trueDeserialisiert das JSON-Objekt aus _raw
4regex_extractsourceExtrahiert container_id (12 Zeichen) aus dem Dateipfad
5regex_extractlogExtrahiert log_timestamp, log_level, log_message aus dem Log-Feld
6evallogSetzt message auf log_message oder log (Fallback)
7auto_timestamptimeTimestamp aus dem Docker-time-Feld

Beispiel-Input (Docker JSON):

{"log":"2026-02-10T14:30:15.123Z INFO Starting Home Assistant Core...\n","stream":"stdout","time":"2026-02-10T14:30:15.123456789Z"}

Extrahierte Felder:

FeldWert
container_ida1b2c3d4e5f6 (aus Dateipfad)
streamstdout
log_timestamp2026-02-10T14:30:15.123Z
log_levelINFO
messageStarting Home Assistant Core...
Container-Mapping: Die Container-ID wird aus dem Source-Pfad extrahiert (z.B. /var/lib/docker/containers/<id>/<id>-json.log). Im PoC wird hauptsaechlich der Home-Assistant-Container (docker_ha) und der Mosquitto-MQTT-Broker ueberwacht.

2.4 pipeline_security_auth

Datei: configs/stream/pipelines/pipeline-security-auth.json
Zweck: Sicherheitsorientiertes Parsing von SSH-Authentifizierungs-Events mit MITRE ATT&CK-Tagging

Sicherheitsrelevant: Diese Pipeline verarbeitet Authentifizierungsdaten und reichert fehlgeschlagene Logins mit MITRE ATT&CK-Annotationen an. Die resultierenden Events werden ueber den S2S-Kanal an den Security-Index in Splunk geliefert.
#FunctionFilterBeschreibung
1commenttrueDokumentation: SSH/Auth Security Pipeline
2evaltrueMetadaten: cribl_host, cribl_environment, cribl_pipeline
3regex_extracttrueSyslog-Parsing: auth_timestamp, auth_host, auth_program, auth_pid, auth_message
4regex_extract/Failed password/Extraktion bei fehlgeschlagenem Login: failed_user, src_ip, src_port
5evalfailed_userMITRE-Tagging: T1110 Brute Force, auth_action=failed_login
6regex_extract/Accepted/Extraktion bei erfolgreichem Login: auth_method, accepted_user, src_ip, src_port
7evalaccepted_userTagging: auth_action=successful_login
8regex_extract/session opened/Session-Erkennung: session_user
9auto_timestampauth_timestampTimestamp-Normalisierung

MITRE ATT&CK-Anreicherung

Bei erkannten fehlgeschlagenen SSH-Logins werden folgende MITRE-Felder automatisch hinzugefuegt:

FeldWertBeschreibung
mitre_technique_idT1110MITRE ATT&CK Technique ID
mitre_technique_nameBrute ForceName der Technik
mitre_tacticCredential AccessZugehoerige Taktik
auth_actionfailed_loginKlassifikation der Aktion
security_tagauthentication_failureSicherheits-Tag fuer Korrelation

Beispiel-Input (fehlgeschlagener Login):

Feb 10 14:45:22 nuc sshd[9876]: Failed password for invalid user admin from 10.0.0.50 port 44312 ssh2

Resultierende Felder:

{
  "auth_host": "nuc",
  "auth_program": "sshd",
  "auth_pid": "9876",
  "failed_user": "admin",
  "src_ip": "10.0.0.50",
  "src_port": "44312",
  "auth_action": "failed_login",
  "security_tag": "authentication_failure",
  "mitre_technique_id": "T1110",
  "mitre_technique_name": "Brute Force",
  "mitre_tactic": "Credential Access",
  "cribl_pipeline": "pipeline_security_auth"
}

2.5 pipeline_generic_passthrough

Datei: configs/stream/pipelines/pipeline-generic-passthrough.json
Zweck: Minimale Verarbeitung – fuegt nur Herkunftsmetadaten hinzu, ohne das Event zu veraendern

#FunctionFilterBeschreibung
1commenttrueDokumentation: Minimale Verarbeitung
2evaltrueFuegt hinzu: cribl_host, cribl_pipeline
+evaltrue Setzt Splunk-Routing-Felder: index='os_journal', sourcetype='syslog', source='cribl:syslog'
Pflichtfelder fuer Splunk-Routing: Jede Pipeline muss die Felder index, sourcetype und source setzen, damit Events im korrekten Splunk-Index landen. Ohne diese Felder landen Events im Default-Index main mit sourcetype=httpevent.

Diese Pipeline wird fuer Quellen verwendet, die entweder bereits in einem gut strukturierten Format vorliegen (z.B. Samba-Logs) oder fuer die im PoC kein spezifisches Parsing definiert wurde. Sie dient als Auffang-Pipeline fuer die Default-Route.

Design-Prinzip: Jedes Event, das Cribl Stream durchlaeuft, erhaelt mindestens die Felder cribl_host und cribl_pipeline. Dies ermoeglicht im Nachhinein die lueckenlose Nachverfolgung, welcher Cribl-Server und welche Pipeline ein Event verarbeitet hat – eine zentrale Anforderung fuer die Auditierbarkeit.

3. Route-Tabelle

Die Routen sind in configs/stream/pipelines/routes.json definiert und werden in der angegebenen Reihenfolge (Prioritaet) ausgewertet. Die erste passende Route gewinnt ("final": true).

Prio Route-ID Name Filter Pipeline Destination
1 route_journal Systemd Journal __inputId.startsWith('journal_files:') syslog_enrichment splunk_hec
2 route_syslog Syslog __inputId.startsWith('syslog:') passthrough splunk_s2s
3 route_default Default true passthrough splunk_hec

Filter-Logik: __inputId

Die Filter basieren auf dem internen Feld __inputId, das Cribl automatisch fuer jedes Event setzt. Das Format ist Quellentyp:Quellen-ID:Protokoll:

Wichtig: Verwenden Sie __inputId.startsWith() statt exakter Vergleiche (==), da das Protokoll-Suffix (z.B. :tcp) variabel ist. Ein Filter wie __inputId == "syslog:in_syslog" wuerde nicht matchen, da der tatsaechliche Wert syslog:in_syslog:tcp lautet.

4. Design-Entscheidung: Security via S2S, Web/IoT via HEC

Die Zuordnung der Datenquellen zu den Destinations folgt einem bewussten Sicherheits- und Architekturkonzept:

DestinationQuellenBegruendung
S2S (splunk_s2s) SSH Auth, Systemd Journal, Samba Sicherheitsrelevante Daten: Kein HTTP-Endpoint exponiert, geringere Angriffsflaeche, natives Splunk-Protokoll fuer maximale Kompatibilitaet mit Splunk Enterprise Security (ES)
HEC (splunk_hec) Apache Access/Error, Docker/HA, Default Web- und Anwendungsdaten: Flexibles HTTP-Protokoll, einfachere Firewall-Konfiguration, gzip-Kompression fuer volumenreiche Web-Logs, Token-basierte Zugriffskontrolle pro Index

Sicherheitsbegruendung

5. Testing von Pipelines

5.1 Sample Data (Testdaten)

Fuer jede Pipeline sollten repraesentative Testdaten vorbereitet werden. Die Testdaten koennen in der Cribl-UI unter Pipelines → [Pipeline] → Preview eingefuegt werden.

Beispiel-Testdaten je Pipeline

# pipeline_syslog_enrichment
Feb 10 14:23:01 nuc sshd[12345]: Connection from 192.168.1.100 port 52341
Feb 10 14:23:05 nuc systemd[1]: Started Session 42 of user root.

# pipeline_apache_clf
192.168.1.50 - admin [10/Feb/2026:14:30:15 +0100] "GET /api/v1/status HTTP/1.1" 200 1234 "-" "curl/7.68.0"
10.0.0.1 - - [10/Feb/2026:14:30:16 +0100] "POST /login HTTP/1.1" 401 89 "https://app.example.de/login" "Mozilla/5.0"

# pipeline_docker_json
{"log":"2026-02-10T14:30:15.123Z INFO Starting service...\n","stream":"stdout","time":"2026-02-10T14:30:15.123456789Z"}

# pipeline_security_auth
Feb 10 14:45:22 nuc sshd[9876]: Failed password for invalid user admin from 10.0.0.50 port 44312 ssh2
Feb 10 14:45:30 nuc sshd[9877]: Accepted publickey for mpauli from 192.168.1.1 port 55123 ssh2

# pipeline_generic_passthrough
[2026/02/10 14:30:00, 0] smbd: server started

5.2 Preview-Funktion

Die Preview-Funktion in der Cribl-UI ermoeglicht es, Testdaten durch eine Pipeline zu schicken und das Ergebnis sofort zu sehen:

  1. Navigieren zu Processing → Pipelines → [Pipeline auswaehlen]
  2. Rechts auf Preview klicken
  3. Testdaten einfuegen oder eine Datei hochladen
  4. Die Ausgabe zeigt die extrahierten Felder und Transformationen
  5. Jede Function kann einzeln aktiviert/deaktiviert werden, um Schritte zu isolieren

5.3 Verifikations-Checkliste