Detector の仕組み
events 表をストリーム的に追跡し、ルールを評価してアラートを発火するエンジン
パイプライン全体
┌─────────────────────────────┐
│ events 表 │
│ (Sensor が常に追記) │
└─────────────┬───────────────┘
│
│ SELECT * FROM events
│ WHERE id > checkpoint
│ ORDER BY id ASC LIMIT 500
▼
┌─────────────────────────────┐
│ _tick (1 sec interval) │
└─────────────┬───────────────┘
│
▼
┌─────────────────────────────┐
│ _evaluate_event(ev) │
│ ↓ event_type で分類 │
│ ↓ 該当ルールをループ │
│ ↓ eval_condition(rule, ev) │
└─────────────┬───────────────┘
│ match
▼
┌─────────────────────────────┐
│ _fire_alert │
│ ↓ 二重発火チェック │
│ ↓ INSERT alerts │
│ ↓ ResponseHandler.dispatch │
└─────────────┬───────────────┘
│
▼
response_log
チェックポイント駆動
Detector が "どこまで評価したか" は detector_state 表に
last_evaluated_event_id として永続化されています。
CHECKPOINT_KEY = "last_evaluated_event_id"
def _load_checkpoint(self, conn):
row = conn.execute(
"SELECT value FROM detector_state WHERE key = ?", (CHECKPOINT_KEY,)
).fetchone()
return int(row["value"]) if row else 0
def _save_checkpoint(self, conn, last_id):
conn.execute(
"INSERT INTO detector_state(key, value) VALUES(?, ?) "
"ON CONFLICT(key) DO UPDATE SET value = excluded.value",
(CHECKPOINT_KEY, str(last_id)),
)
これにより以下が成立します:
- Detector の再起動に強い: 終了→再起動しても同じ event を再評価しない
- Sensor との完全な疎結合: Sensor が追記する間に Detector が落ちても events は失われない
- 追いかけ評価: バックログがある状態で起動した場合も、checkpoint を起点に追いつける
1 ティックの処理
def _tick(self, conn, last_id):
rows = conn.execute(
"SELECT * FROM events WHERE id > ? ORDER BY id ASC LIMIT ?",
(last_id, FETCH_LIMIT),
).fetchall()
for ev in rows:
self._evaluate_event(conn, ev)
last_id = ev["id"]
if rows:
self._save_checkpoint(conn, last_id)
return last_id
1 回のフェッチは最大 500 件 (FETCH_LIMIT) に制限されます。これにより:
- 大量のバックログがあっても 1 ティックでメモリを使い切らない
- 1 件ずつ評価して checkpoint を更新するので、評価中にクラッシュしても次回起動時に重複評価が最小限で済む
ルールの event_type 別分類
URSUS は起動時にルールを event_type ごとに分類しておきます。
def __init__(self, db_path, rules, response_handler, poll_interval=1.0):
...
self.rules_by_type = {}
for r in rules:
self.rules_by_type.setdefault(r.event_type, []).append(r)
def _evaluate_event(self, conn, ev):
for rule in self.rules_by_type.get(ev["event_type"], []):
if not rule.enabled:
continue
if eval_condition(rule.condition, ev):
self._fire_alert(conn, rule, ev)
これは単純な最適化です: process イベントに対して file ルールや network ルールを評価しても無駄なので、event_type が一致するルールだけを試します。
condition の評価
ルールの condition は再帰的なツリーです。詳細は
Rules DSL を参照。
def eval_condition(node, event):
if "all" in node:
return all(eval_condition(c, event) for c in node["all"])
if "any" in node:
return any(eval_condition(c, event) for c in node["any"])
if "not" in node:
return not eval_condition(node["not"], event)
# リーフ
value = extract_field(event, node["field"])
return OPERATORS[node["op"]](value, node.get("value"))
フィールド抽出
def extract_field(event, field):
if field.startswith("raw."):
# raw_json の中身を dot 記法で取り出す
data = json.loads(event["raw_json"])
for key in field.split(".")[1:]:
if not isinstance(data, dict): return None
data = data.get(key)
if data is None: return None
return data
# 非正規化カラム名
return event[field]
ルール側からは:
field: process_name→ 非正規化カラムevents.process_namefield: raw.cmdline→raw_json["cmdline"]field: raw.parent.cmdline→raw_json["parent"]["cmdline"](dot 記法)
比較演算子
URSUS の演算子は単純な純関数です。(event_value, rule_value) -> bool。
| op | 意味 | 例 |
|---|---|---|
eq | 等価 | op: eq, value: bash |
neq | 非等価 | op: neq, value: root |
in | リスト含有 | op: in, value: [bash, sh, dash] |
not_in | リスト非含有 | op: not_in, value: [systemd, init] |
regex | 正規表現マッチ (re.search) | op: regex, value: "(curl|wget).+\\|" |
contains | 部分文字列 | op: contains, value: "/.ssh/" |
startswith | 前方一致 | op: startswith, value: /tmp/ |
endswith | 後方一致 | op: endswith, value: .sh |
gt / lt | 数値比較 | op: gt, value: 1024 |
exists | None でない | op: exists |
コード: src/ursus/detector/operators.py
OPERATORS = {
"eq": lambda a, b: a == b,
"neq": lambda a, b: a != b,
"in": lambda a, b: a in b if isinstance(b, list) else False,
"not_in": lambda a, b: a not in b if isinstance(b, list) else False,
"regex": lambda a, b: bool(re.search(b, str(a))) if a is not None else False,
"contains": lambda a, b: b in str(a) if a is not None else False,
"exists": lambda a, b: a is not None,
...
}
アラート発火
def _fire_alert(self, conn, rule, ev):
# 同じ (rule, event) で二重発火しないように事前 SELECT
existing = conn.execute(
"SELECT id FROM alerts WHERE rule_id = ? AND triggered_event_id = ?",
(rule.id, ev["id"]),
).fetchone()
if existing:
return
cur = conn.execute(
"INSERT INTO alerts "
"(timestamp, rule_id, rule_title, severity, triggered_event_id, mitre) "
"VALUES (?, ?, ?, ?, ?, ?)",
(time.time(), rule.id, rule.title, rule.severity,
ev["id"], json.dumps(rule.mitre)),
)
alert_id = cur.lastrowid
self.response_handler.dispatch(conn, rule, alert_id, ev)
重複発火の防止
バックフィル機能(過去 events への遡及評価)や、Detector のクラッシュ後再起動で checkpoint より少し前から評価しなおすケースに備えて、同じ (rule, event) ペアでの INSERT を事前 SELECT で抑止しています。
本来は UNIQUE(rule_id, triggered_event_id) 制約を張る方がきれいですが、
読み下しやすさを優先して事前 SELECT 方式で実装している。
メインループ
def run(self, stop_event):
conn = get_connection(self.db_path)
try:
last_id = self._load_checkpoint(conn)
while not stop_event.is_set():
try:
last_id = self._tick(conn, last_id)
except Exception:
self.log.exception("engine_tick_failed")
stop_event.wait(self.poll_interval)
finally:
conn.close()
シンプルです。ポーリング間隔は poll_interval_sec(既定 1 秒)。
SIGTERM が来ると stop_event が set され、現在のティックが終わったら
ループを抜けて DB をクローズします。
パフォーマンス特性
- 1 イベントあたりの評価コスト: O(rules × condition_complexity)。 現状 9 ルール × 浅い AND/OR ツリーなので、1 イベント数十マイクロ秒。
- バックログがあるとき: 1 ティックで最大 500 件評価 → 1 秒で 500 件。 10 万件のバックログでも 200 秒で追いつく。
- idle 時の CPU 消費: ほぼゼロ。SELECT が空を返すだけ。
- DB ロック競合: WAL モードなので Sensor の INSERT と Detector の SELECT は 並行可能。Detector の INSERT (alerts) も短時間で済む。
未実装の拡張
バックフィル CLI
DESIGN.md §6.4.5 では ursus-detector --backfill --since "2026-04-01" で
過去の events を再評価する機能が想定されています。現状未実装ですが、
checkpoint を一旦 0 に戻して特定範囲を SELECT するだけで実装可能です。
時間窓型ルール
R006 (SSH 短時間多発失敗) のような閾値型は現在の DSL では表現できません。
condition に time_window フィールドを追加し、過去 N 秒の events を
集計してから match させる仕組みが必要です。
ルールマッチの並列化
現在は 1 イベントを 1 スレッドで全ルール評価しています。ルール数が増えたら 並列化の余地がありますが、9 ルール程度では効果ゼロです。