docs

レスポンス動作

アラート発火時に実行されるアクション群と、その安全装置の仕様

概要

ルールがマッチすると alerts 表への INSERT に続いて ResponseHandler.dispatch() が呼ばれ、ルールの response に 列挙されたアクションを順に実行する。各アクションには 許可ゲートallowed_actions)と 個別の安全装置(PID / パス / アドレス検査)が あり、両方を通過した場合のみ実体が動く。

   alerts.INSERT
        │
        ▼
   ResponseHandler.dispatch(rule, alert_id, event)
        │
        │ for each action in rule.response:
        ▼
   ┌───────────────────────────────────────────────┐
   │ ① allowed_actions に含まれているか           │── No ──▶ refused: not in allowed_actions
   └─────────────┬─────────────────────────────────┘
                 │ Yes
                 ▼
   ┌───────────────────────────────────────────────┐
   │ ② アクション固有の安全装置                    │── No ──▶ refused: protected ...
   │  (PID / パス / アドレス検査)                  │
   └─────────────┬─────────────────────────────────┘
                 │ Yes
                 ▼
   ┌───────────────────────────────────────────────┐
   │ ③ dry_run か?                                  │── Yes ──▶ response_log に dry_run=1
   └─────────────┬─────────────────────────────────┘
                 │ No
                 ▼
   ④ 実体の処理 (kill / move / ...)
        │
        ▼
   response_log.INSERT

アクション一覧

アクション状態必要権限対象 event_type
alert 有効 不要 すべて
kill_process 有効 kill 対象を撃てる権限 (通常 root) process / 他で pid を持つもの
quarantine_file 有効 対象ファイルへの書込権限 + /var/quarantine/ursus/ 作成権限 (通常 root) file
block_network 無効化

dispatch のフロー

コード本体: src/ursus/detector/responses.py

def dispatch(self, conn, rule, alert_id, event):
    for action in rule.response:
        if action not in self.allowed_actions:
            self._record(conn, alert_id, action, None, success=False,
                         detail="not in allowed_actions")
            continue
        handler = ACTIONS.get(action)
        if handler is None:
            self._record(conn, alert_id, action, None, success=False,
                         detail="unknown action")
            continue
        try:
            handler(self, conn, alert_id, event)
        except Exception as e:
            self._record(conn, alert_id, action, None, success=False,
                         detail=f"error: {e}")

各アクションが投げた例外は dispatch ループで捕捉され、response_log に 失敗として記録される。1 つのアクションが失敗しても残りのアクションは実行される。

allowed_actions ゲート

config.ymldetector.response.allowed_actions に列挙されたアクション だけが実行される。リストにないアクションがルールの response に書かれていても、 not in allowed_actions として response_log に拒否記録が残る。

detector:
  response:
    dry_run: false
    allowed_actions:
      - alert
      - kill_process
      - quarantine_file

これにより「ルール自体は強い response を宣言しているが、運用ポリシー側で alert だけ許可する」という運用が可能になる。

dry_run

detector.response.dry_run: true のとき、各アクションは安全装置を通過した あとも実体処理を行わず、response_logdry_run=1, success=1 として 記録するだけで終える。「実行されていれば何が起きていたか」を UI から確認できる。

URSUS の既定値は dry_run: truefalse に変更すると Detector の起動時に 警告ログが出る。

{"event": "destructive_actions_enabled",
 "actions": ["kill_process", "quarantine_file"]}

action: alert

通知用の "no-op" アクション。response_logaction=alert, success=1, detail=event_id=<n> を記録するだけで、外部に何の作用も及ぼさない。 ほぼ全てのルールで含めることを想定している。

def _action_alert(handler, conn, alert_id, event):
    handler._record(conn, alert_id, "alert", target=None, success=True,
                    detail=f"event_id={event['id']}")

action: kill_process

os.kill(pid, SIGTERM) によるプロセス終了。実装は単純な SIGTERM のみで、 SIGKILL へのフォールバックは行わない。

対象 PID

イベント行の pid 非正規化カラムを使う。process / network / auth イベントの いずれでも pid がセットされていれば対象になりうる。

安全装置

以下のいずれかに該当する PID は 拒否される。実体は呼ばれず、 response_logrefused: protected pid (<pid>) が記録される。

  • pid <= 1 (init / kernel 由来)
  • pid == os.getpid() (Detector 自身)
  • pid == os.getppid() (Detector の親プロセス。systemd 等の保険)

実装

def _action_kill_process(handler, conn, alert_id, event):
    pid = event["pid"]
    if not _is_safe_to_kill_pid(pid):
        handler._record(conn, alert_id, "kill_process", str(pid), False,
                        f"refused: protected pid ({pid})")
        return
    if handler.dry_run:
        handler._record(conn, alert_id, "kill_process", str(pid), True, "dry_run")
        return
    try:
        os.kill(pid, signal_mod.SIGTERM)
        handler._record(conn, alert_id, "kill_process", str(pid), True, "SIGTERM")
    except ProcessLookupError:
        handler._record(conn, alert_id, "kill_process", str(pid), False, "no such process")
    except PermissionError as e:
        handler._record(conn, alert_id, "kill_process", str(pid), False, str(e))

レース条件

プロセス起動の検知から os.kill 発射までの間に対象が既に終了している場合は ProcessLookupError を捕捉し、refused ではなく success=0, detail=no such process として記録する。

action: quarantine_file

対象ファイルを chmod 000 したうえで /var/quarantine/ursus/ 配下に 移動する。基本的に file イベント用。

対象パス

イベント行の file_path 非正規化カラムを使う。/etc/passwd 等の重要パスを 誤って隔離するとシステム破壊につながるため、安全装置で防止する。

安全装置

os.path.realpath() でシンボリックリンクを解決した正規パスが、以下の プレフィックスのいずれかに合致した場合は拒否する (refused: protected system path)。

_PROTECTED_PATH_PREFIXES = (
    "/etc/passwd", "/etc/shadow", "/etc/group", "/etc/gshadow",
    "/etc/sudoers", "/etc/hosts", "/etc/resolv.conf", "/etc/fstab",
    "/bin/", "/sbin/", "/usr/bin/", "/usr/sbin/",
    "/lib/", "/lib64/", "/usr/lib/", "/usr/lib64/",
    "/boot/", "/proc/", "/sys/", "/dev/", "/run/",
)

末尾スラッシュ付きはディレクトリ配下の前方一致、無印は完全一致または prefix + "/" 配下のいずれか。

実装

QUARANTINE_DIR = Path("/var/quarantine/ursus")

def _action_quarantine_file(handler, conn, alert_id, event):
    path = event["file_path"]
    if not path:
        handler._record(conn, alert_id, "quarantine_file", None, False, "no file_path")
        return
    if not _is_safe_to_quarantine(path):
        handler._record(conn, alert_id, "quarantine_file", path, False,
                        "refused: protected system path")
        return
    if handler.dry_run:
        handler._record(conn, alert_id, "quarantine_file", path, True, "dry_run")
        return
    try:
        QUARANTINE_DIR.mkdir(parents=True, exist_ok=True)
        os.chmod(path, 0o000)
        dest = QUARANTINE_DIR / Path(path).name
        shutil.move(path, dest)
        handler._record(conn, alert_id, "quarantine_file", path, True,
                        f"moved to {dest}")
    except Exception as e:
        handler._record(conn, alert_id, "quarantine_file", path, False, str(e))

同名ファイルの衝突

/var/quarantine/ursus/<basename> に既存ファイルがある場合、 shutil.move は上書きする。タイムスタンプ等を含めた一意化は実装していない。 同じ basename の隔離が複数回起きると 2 つ目が 1 つ目を上書きする。

action: block_network

本ビルドでは恒久的に無効化されている

ルール側で response: [block_network] を指定しても iptables は呼ばれず、 response_logrefused: feature permanently disabled in this build が記録されるのみ。

無効化の理由はループバック断による操作不能化 (UI 自身、systemd-resolved、 ローカル DB アクセスを巻き込む可能性) を避けるため。再有効化したい場合は git 履歴から旧実装を戻すこと。旧実装に含まれていた最低限のガード:

  • ループバック (127.0.0.0/8, ::1) を拒否
  • リンクローカル (169.254.0.0/16, fe80::/10) を拒否
  • 未指定 (0.0.0.0, ::) を拒否
  • マルチキャストを拒否

旧実装は無効化された IP 以外を iptables -A OUTPUT -d <ip> -j DROP で ブロックしていた。

response_log の構造

全アクションの実行結果は response_log 表に書き込まれる。UI のアラート詳細 画面で表示される。

CREATE TABLE response_log (
  id           INTEGER PRIMARY KEY AUTOINCREMENT,
  timestamp    REAL NOT NULL,
  alert_id     INTEGER NOT NULL REFERENCES alerts(id),
  action       TEXT NOT NULL,       -- alert / kill_process / quarantine_file / block_network
  target       TEXT,                -- PID / file path / IP 等
  dry_run      INTEGER NOT NULL,    -- 0 / 1
  success      INTEGER NOT NULL,    -- 0 / 1
  detail       TEXT                 -- "SIGTERM" / "moved to /var/quarantine/ursus/foo" / "refused: ..." 等
);

detail に入る代表的な文字列

actionsuccessdry_rundetail 例
alert1event_id=12345
kill_process10SIGTERM
kill_process11dry_run
kill_process0refused: protected pid (1)
kill_process00no such process
quarantine_file10moved to /var/quarantine/ursus/foo.sh
quarantine_file0refused: protected system path
block_network0refused: feature permanently disabled in this build
(any)0not in allowed_actions
(any)0error: <exception message>

アクションを追加する

新しいアクションは responses.py に関数を追加し、ACTIONS 辞書に 登録する。

def _action_my_custom(handler, conn, alert_id, event):
    target = event["file_path"]
    # 安全装置を必要なら自前で実装
    if handler.dry_run:
        handler._record(conn, alert_id, "my_custom", target, True, "dry_run")
        return
    # 実体の処理
    ...
    handler._record(conn, alert_id, "my_custom", target, True, "done")


ACTIONS = {
    "alert": _action_alert,
    "kill_process": _action_kill_process,
    "quarantine_file": _action_quarantine_file,
    "block_network": _action_block_network,
    "my_custom": _action_my_custom,            # ← 追加
}

追加後は config.ymlallowed_actions に名前を加え、ルール側の response でも参照できるようになる。

関連ドキュメント