docs

Detector の仕組み

events 表をストリーム的に追跡し、ルールを評価してアラートを発火するエンジン

パイプライン全体

                              ┌─────────────────────────────┐
                              │       events 表             │
                              │   (Sensor が常に追記)        │
                              └─────────────┬───────────────┘
                                            │
                                            │ SELECT * FROM events
                                            │ WHERE id > checkpoint
                                            │ ORDER BY id ASC LIMIT 500
                                            ▼
                              ┌─────────────────────────────┐
                              │   _tick (1 sec interval)     │
                              └─────────────┬───────────────┘
                                            │
                                            ▼
                              ┌─────────────────────────────┐
                              │   _evaluate_event(ev)        │
                              │   ↓ event_type で分類         │
                              │   ↓ 該当ルールをループ         │
                              │   ↓ eval_condition(rule, ev) │
                              └─────────────┬───────────────┘
                                            │ match
                                            ▼
                              ┌─────────────────────────────┐
                              │   _fire_alert                │
                              │   ↓ 二重発火チェック           │
                              │   ↓ INSERT alerts             │
                              │   ↓ ResponseHandler.dispatch  │
                              └─────────────┬───────────────┘
                                            │
                                            ▼
                                    response_log

チェックポイント駆動

Detector が "どこまで評価したか" は detector_state 表に last_evaluated_event_id として永続化されています。

CHECKPOINT_KEY = "last_evaluated_event_id"

def _load_checkpoint(self, conn):
    row = conn.execute(
        "SELECT value FROM detector_state WHERE key = ?", (CHECKPOINT_KEY,)
    ).fetchone()
    return int(row["value"]) if row else 0

def _save_checkpoint(self, conn, last_id):
    conn.execute(
        "INSERT INTO detector_state(key, value) VALUES(?, ?) "
        "ON CONFLICT(key) DO UPDATE SET value = excluded.value",
        (CHECKPOINT_KEY, str(last_id)),
    )

これにより以下が成立します:

  • Detector の再起動に強い: 終了→再起動しても同じ event を再評価しない
  • Sensor との完全な疎結合: Sensor が追記する間に Detector が落ちても events は失われない
  • 追いかけ評価: バックログがある状態で起動した場合も、checkpoint を起点に追いつける

1 ティックの処理

def _tick(self, conn, last_id):
    rows = conn.execute(
        "SELECT * FROM events WHERE id > ? ORDER BY id ASC LIMIT ?",
        (last_id, FETCH_LIMIT),
    ).fetchall()
    for ev in rows:
        self._evaluate_event(conn, ev)
        last_id = ev["id"]
    if rows:
        self._save_checkpoint(conn, last_id)
    return last_id

1 回のフェッチは最大 500 件 (FETCH_LIMIT) に制限されます。これにより:

  • 大量のバックログがあっても 1 ティックでメモリを使い切らない
  • 1 件ずつ評価して checkpoint を更新するので、評価中にクラッシュしても次回起動時に重複評価が最小限で済む

ルールの event_type 別分類

URSUS は起動時にルールを event_type ごとに分類しておきます。

def __init__(self, db_path, rules, response_handler, poll_interval=1.0):
    ...
    self.rules_by_type = {}
    for r in rules:
        self.rules_by_type.setdefault(r.event_type, []).append(r)

def _evaluate_event(self, conn, ev):
    for rule in self.rules_by_type.get(ev["event_type"], []):
        if not rule.enabled:
            continue
        if eval_condition(rule.condition, ev):
            self._fire_alert(conn, rule, ev)

これは単純な最適化です: process イベントに対して file ルールや network ルールを評価しても無駄なので、event_type が一致するルールだけを試します。

condition の評価

ルールの condition は再帰的なツリーです。詳細は Rules DSL を参照。

def eval_condition(node, event):
    if "all" in node:
        return all(eval_condition(c, event) for c in node["all"])
    if "any" in node:
        return any(eval_condition(c, event) for c in node["any"])
    if "not" in node:
        return not eval_condition(node["not"], event)
    # リーフ
    value = extract_field(event, node["field"])
    return OPERATORS[node["op"]](value, node.get("value"))

フィールド抽出

def extract_field(event, field):
    if field.startswith("raw."):
        # raw_json の中身を dot 記法で取り出す
        data = json.loads(event["raw_json"])
        for key in field.split(".")[1:]:
            if not isinstance(data, dict): return None
            data = data.get(key)
            if data is None: return None
        return data
    # 非正規化カラム名
    return event[field]

ルール側からは:

  • field: process_name → 非正規化カラム events.process_name
  • field: raw.cmdlineraw_json["cmdline"]
  • field: raw.parent.cmdlineraw_json["parent"]["cmdline"] (dot 記法)

比較演算子

URSUS の演算子は単純な純関数です。(event_value, rule_value) -> bool

op意味
eq等価op: eq, value: bash
neq非等価op: neq, value: root
inリスト含有op: in, value: [bash, sh, dash]
not_inリスト非含有op: not_in, value: [systemd, init]
regex正規表現マッチ (re.search)op: regex, value: "(curl|wget).+\\|"
contains部分文字列op: contains, value: "/.ssh/"
startswith前方一致op: startswith, value: /tmp/
endswith後方一致op: endswith, value: .sh
gt / lt数値比較op: gt, value: 1024
existsNone でないop: exists

コード: src/ursus/detector/operators.py

OPERATORS = {
    "eq":         lambda a, b: a == b,
    "neq":        lambda a, b: a != b,
    "in":         lambda a, b: a in b if isinstance(b, list) else False,
    "not_in":     lambda a, b: a not in b if isinstance(b, list) else False,
    "regex":      lambda a, b: bool(re.search(b, str(a))) if a is not None else False,
    "contains":   lambda a, b: b in str(a) if a is not None else False,
    "exists":     lambda a, b: a is not None,
    ...
}

アラート発火

def _fire_alert(self, conn, rule, ev):
    # 同じ (rule, event) で二重発火しないように事前 SELECT
    existing = conn.execute(
        "SELECT id FROM alerts WHERE rule_id = ? AND triggered_event_id = ?",
        (rule.id, ev["id"]),
    ).fetchone()
    if existing:
        return

    cur = conn.execute(
        "INSERT INTO alerts "
        "(timestamp, rule_id, rule_title, severity, triggered_event_id, mitre) "
        "VALUES (?, ?, ?, ?, ?, ?)",
        (time.time(), rule.id, rule.title, rule.severity,
         ev["id"], json.dumps(rule.mitre)),
    )
    alert_id = cur.lastrowid

    self.response_handler.dispatch(conn, rule, alert_id, ev)

重複発火の防止

バックフィル機能(過去 events への遡及評価)や、Detector のクラッシュ後再起動で checkpoint より少し前から評価しなおすケースに備えて、同じ (rule, event) ペアでの INSERT を事前 SELECT で抑止しています。

本来は UNIQUE(rule_id, triggered_event_id) 制約を張る方がきれいですが、 読み下しやすさを優先して事前 SELECT 方式で実装している。

メインループ

def run(self, stop_event):
    conn = get_connection(self.db_path)
    try:
        last_id = self._load_checkpoint(conn)
        while not stop_event.is_set():
            try:
                last_id = self._tick(conn, last_id)
            except Exception:
                self.log.exception("engine_tick_failed")
            stop_event.wait(self.poll_interval)
    finally:
        conn.close()

シンプルです。ポーリング間隔は poll_interval_sec(既定 1 秒)。 SIGTERM が来ると stop_event が set され、現在のティックが終わったら ループを抜けて DB をクローズします。

パフォーマンス特性

  • 1 イベントあたりの評価コスト: O(rules × condition_complexity)。 現状 9 ルール × 浅い AND/OR ツリーなので、1 イベント数十マイクロ秒。
  • バックログがあるとき: 1 ティックで最大 500 件評価 → 1 秒で 500 件。 10 万件のバックログでも 200 秒で追いつく。
  • idle 時の CPU 消費: ほぼゼロ。SELECT が空を返すだけ。
  • DB ロック競合: WAL モードなので Sensor の INSERT と Detector の SELECT は 並行可能。Detector の INSERT (alerts) も短時間で済む。

未実装の拡張

バックフィル CLI

DESIGN.md §6.4.5 では ursus-detector --backfill --since "2026-04-01" で 過去の events を再評価する機能が想定されています。現状未実装ですが、 checkpoint を一旦 0 に戻して特定範囲を SELECT するだけで実装可能です。

時間窓型ルール

R006 (SSH 短時間多発失敗) のような閾値型は現在の DSL では表現できません。 condition に time_window フィールドを追加し、過去 N 秒の events を 集計してから match させる仕組みが必要です。

ルールマッチの並列化

現在は 1 イベントを 1 スレッドで全ルール評価しています。ルール数が増えたら 並列化の余地がありますが、9 ルール程度では効果ゼロです。

関連ドキュメント