| Version | Datum | Autor | Beschreibung |
|---|---|---|---|
| 1.0 | 2026-02-10 | Marcus Pauli | Erstversion: 5 Pipelines, Route-Tabelle, Design-Entscheidungen |
In Cribl Stream ist eine Pipeline eine geordnete Kette von Verarbeitungsschritten (Functions), die auf eingehende Events angewendet werden. Jede Function kann Events transformieren, anreichern, filtern oder verwerfen.
| Komponente | Beschreibung |
|---|---|
| Function | Einzelner Verarbeitungsschritt (z.B. Regex Extract, Eval, Serialize/Deserialize) |
| Filter | JavaScript-Ausdruck, der bestimmt, ob eine Function auf ein Event angewendet wird |
| Pipeline | Geordnete Sequenz von Functions mit einem eindeutigen Bezeichner |
| Route | Verbindet einen Filter-Ausdruck mit einer Pipeline und einer Destination |
Der Datenfluss ist: Quelle → Route (Filter) → Pipeline (Functions) → Destination
configs/stream/pipelines/. Jede Pipeline fuegt mindestens die Felder
cribl_host und cribl_pipeline hinzu, um die Herkunft und den
Verarbeitungspfad jedes Events nachvollziehbar zu machen (Auditierbarkeit).
Datei: configs/stream/pipelines/pipeline-syslog-enrichment.json
Zweck: Anreicherung und Parsing von Syslog-formatierten Events (z.B. Apache Error Logs, Systemd Journal)
| # | Function | Filter | Beschreibung |
|---|---|---|---|
| 1 | comment | true | Dokumentation: Zweck der Pipeline |
| 2 | eval | true | Fuegt Metadaten hinzu: cribl_host, cribl_environment, cribl_pipeline |
| 3 | regex_extract | true | Extrahiert aus _raw: syslog_timestamp, syslog_host, syslog_program, syslog_pid, message |
| 4 | auto_timestamp | syslog_timestamp | Automatische Timestamp-Erkennung und -Normalisierung |
| + | eval | true |
Setzt Splunk-Routing-Felder: index='os_journal', sourcetype='syslog',
source='cribl:journal' |
Regex-Pattern:
/^(?<syslog_timestamp>\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\s+
(?<syslog_host>\S+)\s+
(?<syslog_program>[^\[:\s]+)
(?:\[(?<syslog_pid>\d+)\])?:?\s+
(?<message>.*)/
Beispiel-Input:
Feb 10 14:23:01 nuc sshd[12345]: Connection from 192.168.1.100 port 52341
Extrahierte Felder:
| Feld | Wert |
|---|---|
syslog_timestamp | Feb 10 14:23:01 |
syslog_host | nuc |
syslog_program | sshd |
syslog_pid | 12345 |
message | Connection from 192.168.1.100 port 52341 |
cribl_host | (Hostname des Cribl-Servers) |
cribl_environment | production |
cribl_pipeline | pipeline_syslog_enrichment |
Datei: configs/stream/pipelines/pipeline-apache-clf.json
Zweck: Parsing des Apache Combined Log Format (CLF) mit vollstaendiger Feldextraktion
| # | Function | Filter | Beschreibung |
|---|---|---|---|
| 1 | comment | true | Dokumentation: Zweck der Pipeline |
| 2 | eval | true | Fuegt Metadaten hinzu: cribl_host, cribl_environment, cribl_pipeline |
| 3 | regex_extract | true | Extrahiert CLF-Felder: client_ip, remote_user, timestamp, method, uri, http_version, status, bytes, referrer, user_agent |
| 4 | eval | status | Typkonvertierung: status und bytes zu numerischen Werten |
| 5 | auto_timestamp | timestamp | Automatische Timestamp-Erkennung |
Beispiel-Input:
192.168.1.50 - admin [10/Feb/2026:14:30:15 +0100] "GET /dashboard HTTP/1.1" 200 4523 "https://portal.example.de/" "Mozilla/5.0"
Extrahierte Felder:
| Feld | Wert | Typ |
|---|---|---|
client_ip | 192.168.1.50 | String |
remote_user | admin | String |
method | GET | String |
uri | /dashboard | String |
http_version | HTTP/1.1 | String |
status | 200 | Number |
bytes | 4523 | Number |
referrer | https://portal.example.de/ | String |
user_agent | Mozilla/5.0 | String |
status und bytes werden
explizit in numerische Werte konvertiert (Number(status)), um in Splunk korrekte
statistische Auswertungen (Durchschnitt, Summe, Percentile) zu ermoeglichen. Das Feld bytes
wird bei einem Wert von "-" auf 0 gesetzt.
Datei: configs/stream/pipelines/pipeline-docker-json.json
Zweck: Parsing von Docker-JSON-Logs mit Container-ID-Extraktion und Log-Level-Erkennung
| # | Function | Filter | Beschreibung |
|---|---|---|---|
| 1 | comment | true | Dokumentation: Zweck der Pipeline |
| 2 | eval | true | Fuegt Metadaten hinzu: cribl_host, cribl_environment, cribl_pipeline |
| 3 | serde (JSON Extract) | true | Deserialisiert das JSON-Objekt aus _raw |
| 4 | regex_extract | source | Extrahiert container_id (12 Zeichen) aus dem Dateipfad |
| 5 | regex_extract | log | Extrahiert log_timestamp, log_level, log_message aus dem Log-Feld |
| 6 | eval | log | Setzt message auf log_message oder log (Fallback) |
| 7 | auto_timestamp | time | Timestamp aus dem Docker-time-Feld |
Beispiel-Input (Docker JSON):
{"log":"2026-02-10T14:30:15.123Z INFO Starting Home Assistant Core...\n","stream":"stdout","time":"2026-02-10T14:30:15.123456789Z"}
Extrahierte Felder:
| Feld | Wert |
|---|---|
container_id | a1b2c3d4e5f6 (aus Dateipfad) |
stream | stdout |
log_timestamp | 2026-02-10T14:30:15.123Z |
log_level | INFO |
message | Starting Home Assistant Core... |
/var/lib/docker/containers/<id>/<id>-json.log). Im PoC
wird hauptsaechlich der Home-Assistant-Container (docker_ha) und der
Mosquitto-MQTT-Broker ueberwacht.
Datei: configs/stream/pipelines/pipeline-security-auth.json
Zweck: Sicherheitsorientiertes Parsing von SSH-Authentifizierungs-Events mit MITRE ATT&CK-Tagging
| # | Function | Filter | Beschreibung |
|---|---|---|---|
| 1 | comment | true | Dokumentation: SSH/Auth Security Pipeline |
| 2 | eval | true | Metadaten: cribl_host, cribl_environment, cribl_pipeline |
| 3 | regex_extract | true | Syslog-Parsing: auth_timestamp, auth_host, auth_program, auth_pid, auth_message |
| 4 | regex_extract | /Failed password/ | Extraktion bei fehlgeschlagenem Login: failed_user, src_ip, src_port |
| 5 | eval | failed_user | MITRE-Tagging: T1110 Brute Force, auth_action=failed_login |
| 6 | regex_extract | /Accepted/ | Extraktion bei erfolgreichem Login: auth_method, accepted_user, src_ip, src_port |
| 7 | eval | accepted_user | Tagging: auth_action=successful_login |
| 8 | regex_extract | /session opened/ | Session-Erkennung: session_user |
| 9 | auto_timestamp | auth_timestamp | Timestamp-Normalisierung |
Bei erkannten fehlgeschlagenen SSH-Logins werden folgende MITRE-Felder automatisch hinzugefuegt:
| Feld | Wert | Beschreibung |
|---|---|---|
mitre_technique_id | T1110 | MITRE ATT&CK Technique ID |
mitre_technique_name | Brute Force | Name der Technik |
mitre_tactic | Credential Access | Zugehoerige Taktik |
auth_action | failed_login | Klassifikation der Aktion |
security_tag | authentication_failure | Sicherheits-Tag fuer Korrelation |
Beispiel-Input (fehlgeschlagener Login):
Feb 10 14:45:22 nuc sshd[9876]: Failed password for invalid user admin from 10.0.0.50 port 44312 ssh2
Resultierende Felder:
{
"auth_host": "nuc",
"auth_program": "sshd",
"auth_pid": "9876",
"failed_user": "admin",
"src_ip": "10.0.0.50",
"src_port": "44312",
"auth_action": "failed_login",
"security_tag": "authentication_failure",
"mitre_technique_id": "T1110",
"mitre_technique_name": "Brute Force",
"mitre_tactic": "Credential Access",
"cribl_pipeline": "pipeline_security_auth"
}
Datei: configs/stream/pipelines/pipeline-generic-passthrough.json
Zweck: Minimale Verarbeitung – fuegt nur Herkunftsmetadaten hinzu, ohne das Event zu veraendern
| # | Function | Filter | Beschreibung |
|---|---|---|---|
| 1 | comment | true | Dokumentation: Minimale Verarbeitung |
| 2 | eval | true | Fuegt hinzu: cribl_host, cribl_pipeline |
| + | eval | true |
Setzt Splunk-Routing-Felder: index='os_journal', sourcetype='syslog',
source='cribl:syslog' |
index,
sourcetype und source setzen, damit Events im korrekten Splunk-Index landen.
Ohne diese Felder landen Events im Default-Index main mit sourcetype=httpevent.
Diese Pipeline wird fuer Quellen verwendet, die entweder bereits in einem gut strukturierten Format vorliegen (z.B. Samba-Logs) oder fuer die im PoC kein spezifisches Parsing definiert wurde. Sie dient als Auffang-Pipeline fuer die Default-Route.
cribl_host und cribl_pipeline. Dies ermoeglicht im Nachhinein die
lueckenlose Nachverfolgung, welcher Cribl-Server und welche Pipeline ein Event verarbeitet hat
– eine zentrale Anforderung fuer die Auditierbarkeit.
Die Routen sind in configs/stream/pipelines/routes.json definiert und werden in der
angegebenen Reihenfolge (Prioritaet) ausgewertet. Die erste passende Route gewinnt ("final": true).
| Prio | Route-ID | Name | Filter | Pipeline | Destination |
|---|---|---|---|---|---|
| 1 | route_journal |
Systemd Journal | __inputId.startsWith('journal_files:') |
syslog_enrichment |
splunk_hec |
| 2 | route_syslog |
Syslog | __inputId.startsWith('syslog:') |
passthrough |
splunk_s2s |
| 3 | route_default |
Default | true |
passthrough |
splunk_hec |
Die Filter basieren auf dem internen Feld __inputId, das Cribl automatisch fuer jedes Event setzt.
Das Format ist Quellentyp:Quellen-ID:Protokoll:
journal_files:in_journal_local – Systemd-Journal-Quellesyslog:in_syslog:tcp – Syslog via TCPsyslog:in_syslog:udp – Syslog via UDP__inputId.startsWith() statt exakter Vergleiche (==),
da das Protokoll-Suffix (z.B. :tcp) variabel ist. Ein Filter wie
__inputId == "syslog:in_syslog" wuerde nicht matchen, da der tatsaechliche
Wert syslog:in_syslog:tcp lautet.
Die Zuordnung der Datenquellen zu den Destinations folgt einem bewussten Sicherheits- und Architekturkonzept:
| Destination | Quellen | Begruendung |
|---|---|---|
| S2S (splunk_s2s) | SSH Auth, Systemd Journal, Samba | Sicherheitsrelevante Daten: Kein HTTP-Endpoint exponiert, geringere Angriffsflaeche, natives Splunk-Protokoll fuer maximale Kompatibilitaet mit Splunk Enterprise Security (ES) |
| HEC (splunk_hec) | Apache Access/Error, Docker/HA, Default | Web- und Anwendungsdaten: Flexibles HTTP-Protokoll, einfachere Firewall-Konfiguration, gzip-Kompression fuer volumenreiche Web-Logs, Token-basierte Zugriffskontrolle pro Index |
Fuer jede Pipeline sollten repraesentative Testdaten vorbereitet werden. Die Testdaten koennen in der Cribl-UI unter Pipelines → [Pipeline] → Preview eingefuegt werden.
# pipeline_syslog_enrichment
Feb 10 14:23:01 nuc sshd[12345]: Connection from 192.168.1.100 port 52341
Feb 10 14:23:05 nuc systemd[1]: Started Session 42 of user root.
# pipeline_apache_clf
192.168.1.50 - admin [10/Feb/2026:14:30:15 +0100] "GET /api/v1/status HTTP/1.1" 200 1234 "-" "curl/7.68.0"
10.0.0.1 - - [10/Feb/2026:14:30:16 +0100] "POST /login HTTP/1.1" 401 89 "https://app.example.de/login" "Mozilla/5.0"
# pipeline_docker_json
{"log":"2026-02-10T14:30:15.123Z INFO Starting service...\n","stream":"stdout","time":"2026-02-10T14:30:15.123456789Z"}
# pipeline_security_auth
Feb 10 14:45:22 nuc sshd[9876]: Failed password for invalid user admin from 10.0.0.50 port 44312 ssh2
Feb 10 14:45:30 nuc sshd[9877]: Accepted publickey for mpauli from 192.168.1.1 port 55123 ssh2
# pipeline_generic_passthrough
[2026/02/10 14:30:00, 0] smbd: server started
Die Preview-Funktion in der Cribl-UI ermoeglicht es, Testdaten durch eine Pipeline zu schicken und das Ergebnis sofort zu sehen: