Post

네트워크 AI 에이전트 3

🛡️ 실전 배치: 잠들지 않는 AI 보안관의 탄생

지난 포스팅에서 에이전트에게 뇌(LLM)와 손(Tools)을 달아주었다면, 이번에는 이 에이전트가 실제 전장에 나가 어떻게 능동적으로 위협을 감지하고, 24시간 백그라운드에서 서버를 지키는지 그 실전 기록을 공유합니다.

단순히 질문에 답하는 비서를 넘어, 위협이 발생하면 먼저 말을 거는 ‘지능형 보안관’으로 진화시킨 과정입니다.


1. 감시의 ‘눈’을 뜨다: monitor.py (Sentinel)

에이전트가 24시간 로그를 읽고 있을 수는 없습니다. 그래서 별도의 경량 감시 데몬인 monitor.py를 구축했습니다. 이 시스템은 서버의 핵심 로그를 실시간으로 ‘구독’합니다.

  • 실시간 로그 추적: auth.log를 한 줄씩 읽으며 무단 접근 징후를 감시합니다.
  • 키 기반 보안 최적화: 패스워드 로그인을 비활성화한 환경에서도 Invalid user, Connection closed 등의 패턴을 분석하여 공격자의 의도를 읽어냅니다.
  • 임계치 기반 알림: 1분 내 동일 IP에서 5회 이상의 접속 실패가 발생하면 즉시 텔레그램으로 긴급 타전을 보냅니다.

2. 엔진 업그레이드: Llama 3.3 & 지능형 필터링

실전 테스트 중 발생한 ‘환각(Hallucination)’‘다국어 혼용’ 문제를 해결하기 위해 엔진을 Llama 3.3 70B로 업그레이드하고 프롬프트를 고도화했습니다.

  • 내부망 필터링: OCI 내부 통신(10.x, 169.254.x 대역)을 외부 위협으로 오해하지 않도록 ‘망 분리 인지 능력’을 주입했습니다.
  • 한국어 결벽증: 요약 과정에서 간혹 섞여 나오던 다국어 노이즈를 완벽히 차단하여 관리자가 읽기 편한 보고서 형식을 갖췄습니다.
  • 전문가적 논평: 단순 데이터 나열이 아닌, 현재 인프라 상태가 [정상/주의/위험] 중 어느 단계인지 아키텍트의 시선으로 판단하도록 했습니다.

3. 실전 시나리오: Brute-force 공격 방어

실제로 외부에서 비정상적인 접속 시도가 발생했을 때의 대응 흐름입니다.

  1. 감지: monitor.py가 특정 IP의 비정상 접속 폭증 감지.
  2. 보고: 텔레그램으로 “🚨 긴급 보안 알림” 발송.
  3. 분석: 관리자가 “해당 IP 분석해줘”라고 요청 시, 에이전트가 해당 시간대 로그를 정밀 분석하여 보고.
  4. 조치: 관리자의 “차단해” 한 마디에 에이전트가 block_ip 도구 가동.
  5. 승인 (Human-in-the-Loop): AI가 독단적으로 판단하지 않고, 관리자의 최종 [실행 승인] 버튼 클릭 시에만 방화벽(UFW) 룰 업데이트.

⚙️ 인프라의 안착: 24/7 데몬화 (Background Execution)

터미널을 닫아도 에이전트가 죽지 않도록 백그라운드로 실행하는 과정을 거쳤습니다. 이 과정에서 sudo 권한과 가상환경(venv) 간의 라이브러리 경로 충돌을 해결하는 것이 핵심이었습니다.

1
2
3
# 쉘 리다이렉션 권한 문제를 해결한 백그라운드 실행
sudo bash -c "nohup ./venv/bin/python agent.py > agent.log 2>&1 &"
sudo bash -c "nohup ./venv/bin/python monitor.py > monitor.log 2>&1 &"

이제 관리자가 잠든 사이에도 모니터(감시병)는 위협을 추적하고, 에이전트(부사수)는 매일 아침 9시 정기 리포트를 작성하여 배달합니다.



📂 부록: 시스템 안정화 버전 (V3.1) 백업 코드

📦 agent.py (Main Brain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import os
import json
import telebot
from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton
from openai import OpenAI
from dotenv import load_dotenv
from apscheduler.schedulers.background import BackgroundScheduler
import tools

# 환경 설정
load_dotenv()
TELEGRAM_TOKEN = os.environ['TELEGRAM_TOKEN']
ALLOWED_CHAT_ID = int(os.environ['CHAT_ID'])
GROQ_API_KEY = os.environ['GROQ_API_KEY']

bot = telebot.TeleBot(TELEGRAM_TOKEN)
client = OpenAI(api_key=GROQ_API_KEY, base_url="https://api.groq.com/openai/v1")

# 에이전트 시스템 프롬프트
AGENT_SYSTEM_PROMPT = """
당신은 최고 수준의 글로벌 네트워크 보안 에이전트입니다.
1. 반드시 한국어로만 응답하세요. 외국어가 섞이면 안 됩니다.
2. 데이터가 없을 경우 "최근 감지된 내역이 없어 안전함"이라고 보고하세요.
3. 10.x, 169.254.x 등의 내부망 IP는 외부 위협에서 배제하세요.
"""

chat_history = []
def get_context_messages(new_text):
    global chat_history
    chat_history.append({"role": "user", "content": new_text})
    if len(chat_history) > 6: chat_history = chat_history[-6:]
    return [{"role": "system", "content": AGENT_SYSTEM_PROMPT}] + chat_history

@bot.message_handler(func=lambda message: True)
def handle_natural_language(message):
    if message.chat.id != ALLOWED_CHAT_ID: return
    bot.send_chat_action(message.chat.id, 'typing')

    try:
        res = client.chat.completions.create(
            model="llama-3.3-70b-versatile",
            response_format={"type": "json_object"},
            messages=get_context_messages(message.text)
        )
        dec = json.loads(res.choices[0].message.content)
        tool, is_dang, msg, args = dec.get("tool_name"), dec.get("is_dangerous"), dec.get("message_to_user"), dec.get("tool_args", {})

        if tool == "none" or not tool:
            bot.reply_to(message, msg)
            return

        if is_dang:
            ip = args.get('ip', '알수없음')
            mk = InlineKeyboardMarkup()
            mk.row(InlineKeyboardButton("✅ 승인", callback_data=f"action:block:{ip}"), InlineKeyboardButton("❌ 거부", callback_data="action:cancel:none"))
            bot.reply_to(message, f"🚨 [승인 대기] 대상: {ip}\n사유: {msg}", reply_markup=mk)
            return

        wait_msg = bot.reply_to(message, f"🛠️ {msg}")
        result_data = getattr(tools, tool)(**args) if hasattr(tools, tool) else "도구 실행 실패"

        sum_res = client.chat.completions.create(
            model="llama-3.3-70b-versatile",
            messages=[{"role": "user", "content": f"데이터 분석 및 전문가적 한국어 요약 (내부 IP 제외): {result_data}"}]
        )
        bot.edit_message_text(sum_res.choices[0].message.content, chat_id=message.chat.id, message_id=wait_msg.message_id)

    except Exception as e:
        bot.reply_to(message, f"❌ 에러 발생: {str(e)}")

if __name__ == "__main__":
    bot.infinity_polling()
🛠️ tools.py (Execution Tools)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import subprocess
import os

def run_command(command):
    try:
        proc = subprocess.run(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        output = proc.stdout.strip()
        if proc.returncode != 0 and "grep" in command and not output:
            return "조회된 데이터가 없습니다."
        return output
    except Exception as e:
        return f"내부 에러: {str(e)}"

def get_network_status():
    status = run_command("top -b -n 1 | head -n 5")
    network = run_command("ip -s link")
    return f"[상태]\n{status}\n\n[네트워크]\n{network}"

def get_active_sessions():
    return run_command("ss -tunp | grep ESTAB")

def get_open_ports():
    return run_command("sudo ss -tulnp | grep LISTEN")

def get_logged_in_users():
    return run_command("who -u")

def analyze_security_logs():
    return run_command("sudo tail -n 100 /var/log/auth.log | grep -e 'Failed' -e 'Invalid' -e 'closed by'")

def block_ip(ip):
    return run_command(f"sudo ufw deny from {ip} && sudo ss -K dst {ip}")
👁️ monitor.py (Security Sentinel)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import os
import time
import subprocess
import telebot
from collections import defaultdict
from dotenv import load_dotenv

load_dotenv()
bot = telebot.TeleBot(os.environ['TELEGRAM_TOKEN'])
CHAT_ID = int(os.environ['CHAT_ID'])
failure_counts = defaultdict(list)

def watch_logs():
    print("[Monitor] 보안 감시 가동...")
    cmd = "sudo tail -f /var/log/auth.log"
    process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    detect_keywords = ["Connection closed by authenticating", "Invalid user", "Failed password", "Disconnected from authenticating"]

    for line in process.stdout:
        if any(k in line for k in detect_keywords):
            # IP 추출 로직 (로그 포맷에 맞춤)
            parts = line.split()
            ip = parts[parts.index("from") + 1] if "from" in parts else None
            if not ip: continue

            now = time.time()
            failure_counts[ip].append(now)
            failure_counts[ip] = [t for t in failure_counts[ip] if now - t < 60]

            if len(failure_counts[ip]) >= 5:
                bot.send_message(CHAT_ID, f"🚨 **[Brute-force 감지]**\nIP: `{ip}`\n에이전트에게 차단을 명령하세요.")
                failure_counts[ip] = []

if __name__ == "__main__":
    watch_logs()

🚀 마치며

이번 프로젝트를 통해 LLM이 단순히 채팅창 안에 머무는 서비스가 아니라, 실제 인프라를 이해하고 운영을 보조하는 강력한 엔지니어링 도구가 될 수 있음을 확인했습니다. 서버 관리가 더 이상 고된 노동이 아닌, 똑똑한 AI 부사수와의 협업이 되는 그날까지 연구는 계속됩니다!