レスポンス動作
アラート発火時に実行されるアクション群と、その安全装置の仕様
概要
ルールがマッチすると alerts 表への INSERT に続いて
ResponseHandler.dispatch() が呼ばれ、ルールの response に
列挙されたアクションを順に実行する。各アクションには 許可ゲート
(allowed_actions)と 個別の安全装置(PID / パス / アドレス検査)が
あり、両方を通過した場合のみ実体が動く。
alerts.INSERT
│
▼
ResponseHandler.dispatch(rule, alert_id, event)
│
│ for each action in rule.response:
▼
┌───────────────────────────────────────────────┐
│ ① allowed_actions に含まれているか │── No ──▶ refused: not in allowed_actions
└─────────────┬─────────────────────────────────┘
│ Yes
▼
┌───────────────────────────────────────────────┐
│ ② アクション固有の安全装置 │── No ──▶ refused: protected ...
│ (PID / パス / アドレス検査) │
└─────────────┬─────────────────────────────────┘
│ Yes
▼
┌───────────────────────────────────────────────┐
│ ③ dry_run か? │── Yes ──▶ response_log に dry_run=1
└─────────────┬─────────────────────────────────┘
│ No
▼
④ 実体の処理 (kill / move / ...)
│
▼
response_log.INSERT
アクション一覧
| アクション | 状態 | 必要権限 | 対象 event_type |
|---|---|---|---|
alert |
有効 | 不要 | すべて |
kill_process |
有効 | kill 対象を撃てる権限 (通常 root) | process / 他で pid を持つもの |
quarantine_file |
有効 | 対象ファイルへの書込権限 + /var/quarantine/ursus/ 作成権限 (通常 root) |
file |
block_network |
無効化 | — | — |
dispatch のフロー
コード本体: src/ursus/detector/responses.py
def dispatch(self, conn, rule, alert_id, event):
for action in rule.response:
if action not in self.allowed_actions:
self._record(conn, alert_id, action, None, success=False,
detail="not in allowed_actions")
continue
handler = ACTIONS.get(action)
if handler is None:
self._record(conn, alert_id, action, None, success=False,
detail="unknown action")
continue
try:
handler(self, conn, alert_id, event)
except Exception as e:
self._record(conn, alert_id, action, None, success=False,
detail=f"error: {e}")
各アクションが投げた例外は dispatch ループで捕捉され、response_log に
失敗として記録される。1 つのアクションが失敗しても残りのアクションは実行される。
allowed_actions ゲート
config.yml の detector.response.allowed_actions に列挙されたアクション
だけが実行される。リストにないアクションがルールの response に書かれていても、
not in allowed_actions として response_log に拒否記録が残る。
detector:
response:
dry_run: false
allowed_actions:
- alert
- kill_process
- quarantine_file
これにより「ルール自体は強い response を宣言しているが、運用ポリシー側で alert だけ許可する」という運用が可能になる。
dry_run
detector.response.dry_run: true のとき、各アクションは安全装置を通過した
あとも実体処理を行わず、response_log に dry_run=1, success=1 として
記録するだけで終える。「実行されていれば何が起きていたか」を UI から確認できる。
URSUS の既定値は dry_run: true。false に変更すると Detector の起動時に
警告ログが出る。
{"event": "destructive_actions_enabled",
"actions": ["kill_process", "quarantine_file"]}
action: alert
通知用の "no-op" アクション。response_log に action=alert, success=1,
detail=event_id=<n> を記録するだけで、外部に何の作用も及ぼさない。
ほぼ全てのルールで含めることを想定している。
def _action_alert(handler, conn, alert_id, event):
handler._record(conn, alert_id, "alert", target=None, success=True,
detail=f"event_id={event['id']}")
action: kill_process
os.kill(pid, SIGTERM) によるプロセス終了。実装は単純な SIGTERM のみで、
SIGKILL へのフォールバックは行わない。
対象 PID
イベント行の pid 非正規化カラムを使う。process / network / auth イベントの
いずれでも pid がセットされていれば対象になりうる。
安全装置
以下のいずれかに該当する PID は 拒否される。実体は呼ばれず、
response_log に refused: protected pid (<pid>) が記録される。
pid <= 1(init / kernel 由来)pid == os.getpid()(Detector 自身)pid == os.getppid()(Detector の親プロセス。systemd 等の保険)
実装
def _action_kill_process(handler, conn, alert_id, event):
pid = event["pid"]
if not _is_safe_to_kill_pid(pid):
handler._record(conn, alert_id, "kill_process", str(pid), False,
f"refused: protected pid ({pid})")
return
if handler.dry_run:
handler._record(conn, alert_id, "kill_process", str(pid), True, "dry_run")
return
try:
os.kill(pid, signal_mod.SIGTERM)
handler._record(conn, alert_id, "kill_process", str(pid), True, "SIGTERM")
except ProcessLookupError:
handler._record(conn, alert_id, "kill_process", str(pid), False, "no such process")
except PermissionError as e:
handler._record(conn, alert_id, "kill_process", str(pid), False, str(e))
レース条件
プロセス起動の検知から os.kill 発射までの間に対象が既に終了している場合は
ProcessLookupError を捕捉し、refused ではなく
success=0, detail=no such process として記録する。
action: quarantine_file
対象ファイルを chmod 000 したうえで /var/quarantine/ursus/ 配下に
移動する。基本的に file イベント用。
対象パス
イベント行の file_path 非正規化カラムを使う。/etc/passwd 等の重要パスを
誤って隔離するとシステム破壊につながるため、安全装置で防止する。
安全装置
os.path.realpath() でシンボリックリンクを解決した正規パスが、以下の
プレフィックスのいずれかに合致した場合は拒否する (refused: protected
system path)。
_PROTECTED_PATH_PREFIXES = (
"/etc/passwd", "/etc/shadow", "/etc/group", "/etc/gshadow",
"/etc/sudoers", "/etc/hosts", "/etc/resolv.conf", "/etc/fstab",
"/bin/", "/sbin/", "/usr/bin/", "/usr/sbin/",
"/lib/", "/lib64/", "/usr/lib/", "/usr/lib64/",
"/boot/", "/proc/", "/sys/", "/dev/", "/run/",
)
末尾スラッシュ付きはディレクトリ配下の前方一致、無印は完全一致または
prefix + "/" 配下のいずれか。
実装
QUARANTINE_DIR = Path("/var/quarantine/ursus")
def _action_quarantine_file(handler, conn, alert_id, event):
path = event["file_path"]
if not path:
handler._record(conn, alert_id, "quarantine_file", None, False, "no file_path")
return
if not _is_safe_to_quarantine(path):
handler._record(conn, alert_id, "quarantine_file", path, False,
"refused: protected system path")
return
if handler.dry_run:
handler._record(conn, alert_id, "quarantine_file", path, True, "dry_run")
return
try:
QUARANTINE_DIR.mkdir(parents=True, exist_ok=True)
os.chmod(path, 0o000)
dest = QUARANTINE_DIR / Path(path).name
shutil.move(path, dest)
handler._record(conn, alert_id, "quarantine_file", path, True,
f"moved to {dest}")
except Exception as e:
handler._record(conn, alert_id, "quarantine_file", path, False, str(e))
同名ファイルの衝突
/var/quarantine/ursus/<basename> に既存ファイルがある場合、
shutil.move は上書きする。タイムスタンプ等を含めた一意化は実装していない。
同じ basename の隔離が複数回起きると 2 つ目が 1 つ目を上書きする。
action: block_network
ルール側で response: [block_network] を指定しても iptables は呼ばれず、
response_log に refused: feature permanently disabled in this build
が記録されるのみ。
無効化の理由はループバック断による操作不能化 (UI 自身、systemd-resolved、 ローカル DB アクセスを巻き込む可能性) を避けるため。再有効化したい場合は git 履歴から旧実装を戻すこと。旧実装に含まれていた最低限のガード:
- ループバック (127.0.0.0/8, ::1) を拒否
- リンクローカル (169.254.0.0/16, fe80::/10) を拒否
- 未指定 (0.0.0.0, ::) を拒否
- マルチキャストを拒否
旧実装は無効化された IP 以外を iptables -A OUTPUT -d <ip> -j DROP で
ブロックしていた。
response_log の構造
全アクションの実行結果は response_log 表に書き込まれる。UI のアラート詳細
画面で表示される。
CREATE TABLE response_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp REAL NOT NULL,
alert_id INTEGER NOT NULL REFERENCES alerts(id),
action TEXT NOT NULL, -- alert / kill_process / quarantine_file / block_network
target TEXT, -- PID / file path / IP 等
dry_run INTEGER NOT NULL, -- 0 / 1
success INTEGER NOT NULL, -- 0 / 1
detail TEXT -- "SIGTERM" / "moved to /var/quarantine/ursus/foo" / "refused: ..." 等
);
detail に入る代表的な文字列
| action | success | dry_run | detail 例 |
|---|---|---|---|
| alert | 1 | — | event_id=12345 |
| kill_process | 1 | 0 | SIGTERM |
| kill_process | 1 | 1 | dry_run |
| kill_process | 0 | — | refused: protected pid (1) |
| kill_process | 0 | 0 | no such process |
| quarantine_file | 1 | 0 | moved to /var/quarantine/ursus/foo.sh |
| quarantine_file | 0 | — | refused: protected system path |
| block_network | 0 | — | refused: feature permanently disabled in this build |
| (any) | 0 | — | not in allowed_actions |
| (any) | 0 | — | error: <exception message> |
アクションを追加する
新しいアクションは responses.py に関数を追加し、ACTIONS 辞書に
登録する。
def _action_my_custom(handler, conn, alert_id, event):
target = event["file_path"]
# 安全装置を必要なら自前で実装
if handler.dry_run:
handler._record(conn, alert_id, "my_custom", target, True, "dry_run")
return
# 実体の処理
...
handler._record(conn, alert_id, "my_custom", target, True, "done")
ACTIONS = {
"alert": _action_alert,
"kill_process": _action_kill_process,
"quarantine_file": _action_quarantine_file,
"block_network": _action_block_network,
"my_custom": _action_my_custom, # ← 追加
}
追加後は config.yml の allowed_actions に名前を加え、ルール側の
response でも参照できるようになる。
関連ドキュメント
- 検知ルールの定義 — response の宣言場所
- 設定の変更: response 設定 — dry_run / allowed_actions の運用