Files
main/punisher.py
2026-04-14 16:10:36 -05:00

511 lines
20 KiB
Python

import discord
from discord import app_commands
from discord.ext import commands
import sqlite3
import os
import logging
import datetime
from logging.handlers import RotatingFileHandler
from dotenv import load_dotenv
# Load variables from a local .env file (if any) before reading configuration.
load_dotenv()

# Runtime configuration; the DB and authorized-users paths can be overridden
# through the environment.
TOKEN = os.getenv("DISCORD_TOKEN")
DB_PATH = os.getenv("DB_PATH", "data/blacklist.db")
AUTH_USERS_FILE = os.getenv("AUTH_USERS_FILE", "data/authorized_users.txt")
LOG_GUILD_ID = 1491646567097307136
LOG_CHANNEL_ID = 1491648117483770037
APPEALS_GUILD_ID = 1491644365020467210
# ─── Logging ──────────────────────────────────────────────────────────────────
os.makedirs("logs", exist_ok=True)
# os.path.dirname() returns "" for a bare filename such as "blacklist.db",
# and os.makedirs("") raises FileNotFoundError, so only create a real parent.
_db_dir = os.path.dirname(DB_PATH)
if _db_dir:
    os.makedirs(_db_dir, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    handlers=[
        # Explicit UTF-8 so non-ASCII names in log records are written the
        # same way regardless of the host locale.
        RotatingFileHandler(
            "logs/bot.log", maxBytes=5_000_000, backupCount=3, encoding="utf-8"
        ),
        logging.StreamHandler(),
    ],
)
log = logging.getLogger("eagler")
# ─── Intents ──────────────────────────────────────────────────────────────────
intents = discord.Intents.default()
# Needed to receive on_member_join events.
intents.members = True
bot = commands.Bot(command_prefix="!", intents=intents)
tree = bot.tree
# ─── Authorized users ─────────────────────────────────────────────────────────
def load_authorized_users() -> set[str]:
    """Read AUTH_USERS_FILE and return the set of user-ID strings it lists.

    Blank lines are skipped and everything from the first ``#`` on a line is
    treated as a comment (so ``123456  # username`` yields ``"123456"``).

    Returns:
        The set of non-empty entries, or an empty set if the file is missing.
    """
    try:
        # errors="replace": the IDs themselves are ASCII, so a stray
        # non-UTF-8 byte inside a comment must not make the whole file
        # unreadable.
        with open(AUTH_USERS_FILE, encoding="utf-8", errors="replace") as f:
            return {
                entry
                for line in f
                if (entry := line.partition("#")[0].strip())
            }
    except FileNotFoundError:
        # Opening directly (instead of checking os.path.exists first) avoids
        # a race if the file disappears between the check and the open.
        return set()
def is_authorized(user_id: int) -> bool:
    """Return True when *user_id* appears in the authorized-users file.

    The file is re-read on every call, so edits take effect immediately.
    """
    allowed_ids = load_authorized_users()
    return str(user_id) in allowed_ids
# ─── Database ─────────────────────────────────────────────────────────────────
def get_conn() -> sqlite3.Connection:
    """Open and return a new SQLite connection to the database at DB_PATH."""
    connection = sqlite3.connect(DB_PATH)
    return connection
def init_db():
    """Create the ``blacklist`` and ``pardons`` tables if they do not exist."""
    conn = get_conn()
    try:
        # "with conn" commits on success and rolls back on error, but it does
        # NOT close the connection; that is what the finally clause is for.
        with conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS blacklist (
                    user_id TEXT PRIMARY KEY,
                    reason TEXT,
                    banned_by TEXT,
                    banned_at TEXT
                )
            """)
            conn.execute("""
                CREATE TABLE IF NOT EXISTS pardons (
                    user_id TEXT,
                    guild_id TEXT,
                    PRIMARY KEY (user_id, guild_id)
                )
            """)
    finally:
        conn.close()
def bl_add(user_id: str, reason: str, banned_by: str):
    """Insert (or overwrite) the blacklist row for *user_id*.

    Args:
        user_id: ID of the blacklisted user, stored as text.
        reason: Reason text stored with the row.
        banned_by: ID of the user who issued the blacklist, stored as text.
    """
    # Same naive-UTC ISO string that utcnow().isoformat() produced, without
    # calling the deprecated utcnow().
    banned_at = (
        datetime.datetime.now(datetime.timezone.utc)
        .replace(tzinfo=None)
        .isoformat()
    )
    conn = get_conn()
    try:
        # "with conn" commits or rolls back; closing is done in finally.
        with conn:
            conn.execute(
                "INSERT OR REPLACE INTO blacklist VALUES (?,?,?,?)",
                (user_id, reason, banned_by, banned_at),
            )
    finally:
        conn.close()
def bl_remove(user_id: str) -> bool:
    """Delete the blacklist row for *user_id*.

    Returns:
        True if a row was deleted, False if the user was not blacklisted.
    """
    conn = get_conn()
    try:
        # "with conn" commits or rolls back; closing is done in finally.
        with conn:
            cur = conn.execute("DELETE FROM blacklist WHERE user_id=?", (user_id,))
        return cur.rowcount > 0
    finally:
        conn.close()
def bl_get(user_id: str):
    """Fetch the blacklist entry for *user_id*.

    Returns:
        A ``(reason, banned_by, banned_at)`` row, or None if the user is not
        blacklisted.
    """
    conn = get_conn()
    try:
        return conn.execute(
            "SELECT reason, banned_by, banned_at FROM blacklist WHERE user_id=?",
            (user_id,),
        ).fetchone()
    finally:
        # The connection context manager never closes the connection, so
        # close it explicitly once the row has been read.
        conn.close()
def bl_all():
    """Return every blacklist row, newest ``banned_at`` first.

    Returns:
        A list of ``(user_id, reason, banned_by, banned_at)`` rows.
    """
    conn = get_conn()
    try:
        return conn.execute(
            "SELECT user_id, reason, banned_by, banned_at FROM blacklist ORDER BY banned_at DESC"
        ).fetchall()
    finally:
        # The connection context manager never closes the connection, so
        # close it explicitly once the rows have been read.
        conn.close()
def pardon_add(user_id: str, guild_id: str):
    """Record a pardon for *user_id* in *guild_id*; a no-op if it already exists."""
    conn = get_conn()
    try:
        # "with conn" commits or rolls back; closing is done in finally.
        with conn:
            conn.execute(
                "INSERT OR IGNORE INTO pardons VALUES (?,?)", (user_id, guild_id)
            )
    finally:
        conn.close()
def pardon_exists(user_id: str, guild_id: str) -> bool:
    """Return True if a pardon row exists for *user_id* in *guild_id*."""
    conn = get_conn()
    try:
        row = conn.execute(
            "SELECT 1 FROM pardons WHERE user_id=? AND guild_id=?", (user_id, guild_id)
        ).fetchone()
        return row is not None
    finally:
        # The connection context manager never closes the connection, so
        # close it explicitly once the lookup is done.
        conn.close()
# ─── Permission checks ────────────────────────────────────────────────────────
def requires_admin():
    """Build an app-command check requiring the Administrator permission.

    The check replies with an ephemeral refusal and returns False when the
    invoker is not an administrator of the server.
    """
    async def predicate(interaction: discord.Interaction) -> bool:
        # Outside a guild the invoker is a plain User, which has no
        # guild_permissions attribute; treat that as "not an administrator"
        # instead of raising AttributeError.
        perms = getattr(interaction.user, "guild_permissions", None)
        if perms is None or not perms.administrator:
            await interaction.response.send_message(
                ":no_entry: You need the **Administrator** permission to use this command.",
                ephemeral=True
            )
            return False
        return True
    return app_commands.check(predicate)
def requires_authorized():
    """Build an app-command check for ban/unban.

    The invoker must have the Administrator permission AND be listed in the
    authorized-users file. On failure the check replies with an ephemeral
    refusal and returns False.
    """
    async def predicate(interaction: discord.Interaction) -> bool:
        # Outside a guild the invoker is a plain User, which has no
        # guild_permissions attribute; treat that as "not an administrator"
        # instead of raising AttributeError.
        perms = getattr(interaction.user, "guild_permissions", None)
        if perms is None or not perms.administrator:
            await interaction.response.send_message(
                ":no_entry: You need the **Administrator** permission to use this command.",
                ephemeral=True
            )
            return False
        if not is_authorized(interaction.user.id):
            await interaction.response.send_message(
                ":no_entry: You are not in the authorized users list for this command.",
                ephemeral=True
            )
            return False
        return True
    return app_commands.check(predicate)
def parse_user_id(raw: str) -> int | None:
try:
return int(raw.strip())
except ValueError:
return None
async def send_log(actioned_by: discord.User | discord.Member, target_user, target_id: str, reason: str, server_count: int | None, action: str):
    """Post a ban/unban record to the log channel.

    Args:
        actioned_by: The user who ran the command.
        target_user: The resolved target user object, or None if it could not
            be resolved (a raw ``<@id>`` mention is used instead).
        target_id: The target's ID as a string.
        reason: Reason text shown in the embed.
        server_count: Number of servers the user was banned from; the field
            is omitted when None.
        action: ``"ban"`` for a ban log; any other value is rendered as an
            unban log.

    Delivery is best-effort: a missing guild/channel or a send failure is
    logged and never raised to the caller.
    """
    guild = bot.get_guild(LOG_GUILD_ID)
    if not guild:
        log.warning("Ban log guild not found (%s)", LOG_GUILD_ID)
        return
    channel = guild.get_channel(LOG_CHANNEL_ID)
    if not channel:
        log.warning("Ban log channel not found (%s)", LOG_CHANNEL_ID)
        return

    # Timezone-aware "now": a naive utcnow() is interpreted as *local* time
    # by .timestamp(), which shifts the rendered date on non-UTC hosts.
    now = datetime.datetime.now(datetime.timezone.utc)
    is_ban = action == "ban"
    mention = target_user.mention if target_user else f"<@{target_id}>"
    target_display = f"{mention}\n`{target_id}`"

    embed = discord.Embed(
        title=f"Eagler Enforcement — {'Ban' if is_ban else 'Unban'} Log",
        color=discord.Color.red() if is_ban else discord.Color.green(),
        timestamp=now
    )
    embed.add_field(name="Actioned By", value=f"{actioned_by.mention}\n`{actioned_by.id}`", inline=True)
    embed.add_field(name=f"{'Banned' if is_ban else 'Unbanned'} User", value=target_display, inline=True)
    # Embed field values are capped at 1024 characters by Discord.
    embed.add_field(name="Reason", value=str(reason)[:1024], inline=False)
    if server_count is not None:
        embed.add_field(name="Servers Banned From", value=str(server_count), inline=True)
    embed.add_field(name="Date", value=f"<t:{int(now.timestamp())}:F>", inline=True)
    embed.set_footer(text="Eagler Enforcement")
    try:
        await channel.send(embed=embed)
    except discord.Forbidden:
        log.warning("Missing permission to send to ban log channel")
    except discord.HTTPException:
        # Any other API failure must not abort the command that triggered
        # the log entry.
        log.exception("Failed to send to ban log channel")
# ─── Events ───────────────────────────────────────────────────────────────────
@bot.event
async def on_ready():
    """Initialise storage and sync slash commands when the bot becomes ready.

    This event can fire more than once per process (e.g. after a reconnect),
    so the command sync is only performed the first time.
    """
    init_db()
    # Create the authorized_users.txt if it doesn't exist yet
    if not os.path.exists(AUTH_USERS_FILE):
        # The file may live in a directory that nothing else has created.
        auth_dir = os.path.dirname(AUTH_USERS_FILE)
        if auth_dir:
            os.makedirs(auth_dir, exist_ok=True)
        # Explicit UTF-8: the header contains a non-ASCII dash.
        with open(AUTH_USERS_FILE, "w", encoding="utf-8") as f:
            f.write("# Eagler Enforcement — Authorized Users\n")
            f.write("# Add one Discord user ID per line.\n")
            f.write("# These users are allowed to use /ban and /unban.\n")
            f.write("# Lines starting with # are ignored.\n")
    if getattr(bot, "_eagler_commands_synced", False):
        log.info("Ready again as %s | commands already synced", bot.user)
        return
    synced = await tree.sync()
    bot._eagler_commands_synced = True
    log.info("Logged in as %s | Synced %d command(s)", bot.user, len(synced))
@bot.event
async def on_guild_join(guild: discord.Guild):
    """DM a welcome embed to the owner of a server the bot has just joined.

    Best-effort: if the owner cannot be resolved or cannot be messaged the
    handler returns quietly.
    """
    try:
        # guild.owner is None when the owner is not cached; fall back to an
        # API fetch, which can itself fail.
        owner = guild.owner or await bot.fetch_user(guild.owner_id)
    except discord.HTTPException:
        log.warning("Could not resolve the owner of guild %s", guild.id)
        return
    if not owner:
        return

    embed = discord.Embed(
        title="Thanks for using Eagler Enforcement!",
        color=discord.Color.dark_red()
    )
    embed.add_field(
        name="**(VERY RECOMMENDED)** Join the Server Owner Guild",
        value="https://discord.gg/BEHrUkdHzs",
        inline=False
    )
    embed.add_field(
        name="\u200b",
        value="Have fun keeping your server safe :)",
        inline=False
    )
    embed.set_footer(text="Eagler Enforcement")
    try:
        await owner.send(embed=embed)
    except discord.Forbidden:
        pass  # owner does not accept DMs from the bot
    except discord.HTTPException:
        log.warning("Failed to send welcome DM to the owner of guild %s", guild.id)
@bot.event
async def on_member_join(member: discord.Member):
    """Ban a blacklisted user as soon as they join a server.

    Members joining the appeals server, users who are not blacklisted, and
    users pardoned in this particular server are left alone. Otherwise the
    user is DMed the blacklist notice (best-effort) and then banned.
    """
    if member.guild.id == APPEALS_GUILD_ID:
        return
    user_id = str(member.id)
    row = bl_get(user_id)
    if not row:
        return
    if pardon_exists(user_id, str(member.guild.id)):
        return
    reason, banned_by, _ = row

    dm_embed = discord.Embed(
        title="You have been blacklisted from Eagler Enforcement",
        color=discord.Color.red(),
        # Timezone-aware so the timestamp is correct on non-UTC hosts.
        timestamp=datetime.datetime.now(datetime.timezone.utc)
    )
    # Embed field values are capped at 1024 characters by Discord.
    dm_embed.add_field(name="Reason", value=str(reason)[:1024], inline=False)
    dm_embed.add_field(name="Banned By", value=f"<@{banned_by}> (`{banned_by}`)", inline=True)
    dm_embed.add_field(name="Appeal", value="[Click here to join the appeals server](https://discord.gg/PMJ99n7yye)", inline=False)
    dm_embed.set_footer(text="Eagler Enforcement")
    # The DM is sent before the ban and is purely informational: no failure
    # here may prevent the ban below from running.
    try:
        await member.send(embed=dm_embed)
    except discord.Forbidden:
        pass  # user does not accept DMs from the bot
    except discord.HTTPException:
        log.warning("Failed to DM blacklisted user %s", member.id)

    try:
        await member.guild.ban(
            member,
            # Audit-log reasons are limited to 512 characters by Discord.
            reason=f"Eagler Enforcement Blacklist: {reason}"[:512],
            delete_message_days=0
        )
    except discord.Forbidden:
        log.warning(
            "Missing permission to ban blacklisted user %s in guild %s",
            member.id, member.guild.id
        )
    except discord.HTTPException:
        log.exception(
            "Failed to ban blacklisted user %s in guild %s",
            member.id, member.guild.id
        )
# ─── /ban ─────────────────────────────────────────────────────────────────────
@tree.command(name="ban", description="Blacklist a player from Eagler Enforcement")
@app_commands.describe(userid="The user ID to blacklist", reason="Reason for the blacklist")
@requires_authorized()
async def ban_cmd(interaction: discord.Interaction, userid: str, reason: str):
    """Blacklist a user and ban them from every server the bot is in.

    The appeals server and servers where the user has been pardoned are
    skipped. The user is DMed (best-effort), the action is written to the log
    channel, and the invoker receives an ephemeral summary.
    """
    await interaction.response.defer(ephemeral=True)
    uid = parse_user_id(userid)
    if uid is None:
        await interaction.followup.send("Invalid user ID — must be a numeric snowflake.", ephemeral=True)
        return
    # Use the canonical decimal form as the DB key. on_member_join looks users
    # up by str(member.id), so storing the raw text as typed (padding, leading
    # zeros, ...) would create a row that is never matched.
    userid = str(uid)
    if bl_get(userid):
        await interaction.followup.send(f"User `{userid}` is already blacklisted.", ephemeral=True)
        return
    bl_add(userid, reason, str(interaction.user.id))

    # Fetch target user object early (needed for DM and log)
    try:
        target = bot.get_user(uid) or await bot.fetch_user(uid)
    except discord.HTTPException:
        # Covers NotFound as well as transient API failures; the blacklist
        # row is already written, so carry on without a user object.
        target = None

    now = datetime.datetime.now(datetime.timezone.utc)
    # Audit-log reasons are limited to 512 characters by Discord.
    audit_reason = f"Eagler Enforcement Blacklist: {reason}"[:512]
    # Embed field values are limited to 1024 characters by Discord.
    reason_field = reason[:1024]

    # Ban from every guild the bot is in (except the appeals server)
    banned_from, failed = [], []
    for guild in bot.guilds:
        if guild.id == APPEALS_GUILD_ID:
            continue
        if pardon_exists(userid, str(guild.id)):
            continue
        try:
            await guild.ban(
                discord.Object(id=uid),
                reason=audit_reason,
                delete_message_days=0
            )
        except discord.Forbidden:
            failed.append(guild.name)
        except Exception:
            # One guild failing must not stop the remaining bans.
            log.exception("Failed to ban %s in guild %s", uid, guild.id)
            failed.append(guild.name)
        else:
            banned_from.append(guild.name)

    # DM the user now that we know the server count
    # NOTE(review): after the bans the bot may no longer share a server with
    # the user, in which case this DM is likely to be refused — confirm
    # whether it should be sent before banning instead.
    if target:
        dm_embed = discord.Embed(
            title="You have been blacklisted from Eagler Enforcement",
            color=discord.Color.red(),
            timestamp=now
        )
        dm_embed.add_field(name="Reason", value=reason_field, inline=False)
        dm_embed.add_field(name="Banned By", value=f"{interaction.user} (`{interaction.user.id}`)", inline=True)
        dm_embed.add_field(name="Servers Banned From", value=str(len(banned_from)), inline=True)
        dm_embed.add_field(name="Appeal", value="[Click here to join the appeals server](https://discord.gg/PMJ99n7yye)", inline=False)
        dm_embed.set_footer(text="Eagler Enforcement")
        try:
            await target.send(embed=dm_embed)
        except discord.Forbidden:
            pass  # user does not accept DMs from the bot
        except discord.HTTPException:
            log.warning("Failed to DM blacklisted user %s", uid)

    try:
        await send_log(interaction.user, target, userid, reason, len(banned_from), "ban")
    except discord.HTTPException:
        # A logging failure must not hide the result from the moderator.
        log.exception("Failed to write ban log entry for %s", uid)

    username = str(target) if target else "Unknown User"
    embed = discord.Embed(
        title="User Blacklisted",
        color=discord.Color.red(),
        timestamp=now
    )
    embed.add_field(name="User", value=f"{username}\n`{userid}`", inline=True)
    embed.add_field(name="Reason", value=reason_field, inline=True)
    embed.add_field(name="Servers Banned", value=str(len(banned_from)), inline=True)
    if failed:
        embed.add_field(
            name="Failed — Missing Permission",
            value="\n".join(failed)[:1024],
            inline=False
        )
    embed.set_footer(text=f"Actioned by {interaction.user}")
    await interaction.followup.send(embed=embed, ephemeral=True)
# ─── /unban ───────────────────────────────────────────────────────────────────
@tree.command(name="unban", description="Remove the blacklist from a player within Eagler Enforcement")
@app_commands.describe(userid="The user ID to unblacklist", reason="Reason for the removal")
@requires_authorized()
async def unban_cmd(interaction: discord.Interaction, userid: str, reason: str):
    """Remove a user from the blacklist and lift their ban everywhere.

    The appeals server is skipped. Servers where the user was not banned are
    ignored; servers where the unban fails are reported back to the invoker.
    """
    await interaction.response.defer(ephemeral=True)
    uid = parse_user_id(userid)
    if uid is None:
        await interaction.followup.send("Invalid user ID — must be a numeric snowflake.", ephemeral=True)
        return
    # Blacklist rows are looked up by the canonical decimal ID on member
    # join, so remove that key. A row keyed by the raw text as typed (e.g.
    # with leading zeros) is removed as well so it can still be cleaned up.
    canonical = str(uid)
    removed = bl_remove(canonical)
    if userid != canonical:
        removed = bl_remove(userid) or removed
    userid = canonical
    if not removed:
        await interaction.followup.send(f"User `{userid}` is not blacklisted.", ephemeral=True)
        return

    # Audit-log reasons are limited to 512 characters by Discord.
    audit_reason = f"Eagler Enforcement unban: {reason}"[:512]

    # Lift the ban from every guild the bot is in
    unbanned_from, failed = [], []
    for guild in bot.guilds:
        if guild.id == APPEALS_GUILD_ID:
            continue
        try:
            await guild.unban(
                discord.Object(id=uid),
                reason=audit_reason
            )
        except discord.NotFound:
            continue  # wasn't banned there, skip
        except discord.Forbidden:
            failed.append(guild.name)
        except discord.HTTPException:
            # The blacklist row is already gone; one guild failing must not
            # abort the remaining unbans.
            log.exception("Failed to unban %s in guild %s", uid, guild.id)
            failed.append(guild.name)
        else:
            unbanned_from.append(guild.name)

    try:
        target = bot.get_user(uid) or await bot.fetch_user(uid)
    except discord.HTTPException:
        # Covers NotFound as well as transient API failures.
        target = None
    username = str(target) if target else "Unknown User"

    try:
        await send_log(interaction.user, target, userid, reason, None, "unban")
    except discord.HTTPException:
        # A logging failure must not hide the result from the moderator.
        log.exception("Failed to write unban log entry for %s", uid)

    embed = discord.Embed(
        title="User Unblacklisted",
        color=discord.Color.green(),
        # Timezone-aware so the timestamp is correct on non-UTC hosts.
        timestamp=datetime.datetime.now(datetime.timezone.utc)
    )
    embed.add_field(name="User", value=f"{username}\n`{userid}`", inline=True)
    # Embed field values are limited to 1024 characters by Discord.
    embed.add_field(name="Reason", value=reason[:1024], inline=True)
    embed.add_field(name="Servers Unbanned From", value=str(len(unbanned_from)), inline=True)
    if failed:
        embed.add_field(name="Failed — Missing Permission", value="\n".join(failed)[:1024], inline=False)
    embed.set_footer(text=f"Actioned by {interaction.user}")
    await interaction.followup.send(embed=embed, ephemeral=True)
# ─── /pardon ──────────────────────────────────────────────────────────────────
@tree.command(name="pardon", description="Pardon a player's ban from this specific server, not all servers")
@app_commands.describe(userid="The user ID to pardon in this server")
@requires_admin()
async def pardon_cmd(interaction: discord.Interaction, userid: str):
    """Unban a user in the current server only and record a pardon for it.

    The pardon stops the join handler and the global ban command from
    re-banning the user in this server; the global blacklist entry, if any,
    is left untouched.
    """
    await interaction.response.defer(ephemeral=True)
    uid = parse_user_id(userid)
    if uid is None:
        await interaction.followup.send("Invalid user ID — must be a numeric snowflake.", ephemeral=True)
        return
    # Pardons are looked up by str(member.id) on join, so store the canonical
    # decimal form rather than the raw text as typed.
    userid = str(uid)
    try:
        await interaction.guild.unban(
            discord.Object(id=uid),
            reason=f"Eagler Enforcement pardon issued by {interaction.user}"
        )
    except discord.NotFound:
        pass  # not banned here; the pardon is still recorded below
    except discord.Forbidden:
        await interaction.followup.send(
            "I don't have permission to unban members in this server.", ephemeral=True
        )
        return
    except discord.HTTPException:
        log.exception("Failed to unban %s in guild %s", uid, interaction.guild.id)
        await interaction.followup.send(
            "Failed to unban that user in this server — please try again.", ephemeral=True
        )
        return

    pardon_add(userid, str(interaction.guild.id))
    if bl_get(userid) is not None:
        note = (
            "This user is still on the **global blacklist** — they will not be re-banned when joining "
            "**this server**, but remain banned everywhere else."
        )
    else:
        note = "Note: this user is not currently on the global blacklist."

    try:
        user = bot.get_user(uid) or await bot.fetch_user(uid)
    except discord.HTTPException:
        # Covers NotFound as well as transient API failures.
        user = None
    username = str(user) if user else "Unknown User"

    embed = discord.Embed(
        title="User Pardoned (This Server Only)",
        color=discord.Color.blue(),
        # Timezone-aware so the timestamp is correct on non-UTC hosts.
        timestamp=datetime.datetime.now(datetime.timezone.utc)
    )
    embed.add_field(name="User", value=f"{username}\n`{userid}`", inline=True)
    embed.add_field(name="Server", value=interaction.guild.name, inline=True)
    embed.add_field(name="Note", value=note, inline=False)
    embed.set_footer(text=f"Actioned by {interaction.user}")
    await interaction.followup.send(embed=embed, ephemeral=True)
# ─── /list ────────────────────────────────────────────────────────────────────
@tree.command(name="list", description="Shows a list of all blacklisted players")
@requires_admin()
async def list_cmd(interaction: discord.Interaction):
    """Show every blacklisted user as one or more ephemeral embeds.

    Entries are paginated at 24 fields per embed and each embed is sent as
    its own follow-up message.
    """
    await interaction.response.defer(ephemeral=True)
    rows = bl_all()
    if not rows:
        await interaction.followup.send("No users are currently blacklisted.", ephemeral=True)
        return

    fields_per_embed = 24
    # Reasons are shortened so a full page stays inside Discord's embed size
    # limits (1024 characters per field value, 6000 per embed).
    max_reason_len = 150

    embeds: list[discord.Embed] = []
    current = discord.Embed(
        title="Eagler Enforcement — Blacklist",
        color=discord.Color.dark_red(),
        # Timezone-aware so the timestamp is correct on non-UTC hosts.
        timestamp=datetime.datetime.now(datetime.timezone.utc)
    )
    current.set_footer(text=f"{len(rows)} blacklisted user(s)")
    field_count = 0
    for user_id, reason, _banned_by, banned_at in rows:
        try:
            numeric_id = int(user_id)
            user = bot.get_user(numeric_id) or await bot.fetch_user(numeric_id)
            username = str(user)
        except (ValueError, TypeError, discord.HTTPException):
            # Non-numeric stored ID, unknown user, or an API failure.
            username = "Unknown User"
        date_str = banned_at[:10] if banned_at else "Unknown"
        reason_text = str(reason)
        if len(reason_text) > max_reason_len:
            reason_text = reason_text[:max_reason_len - 1] + "…"
        value = f"`{user_id}`\nReason: {reason_text}\nDate: {date_str}"
        if field_count >= fields_per_embed:
            # Current page is full: store it and start a continuation page.
            embeds.append(current)
            current = discord.Embed(
                title="Eagler Enforcement — Blacklist (cont.)",
                color=discord.Color.dark_red()
            )
            field_count = 0
        current.add_field(name=username, value=value, inline=True)
        field_count += 1
    embeds.append(current)

    for page in embeds:
        await interaction.followup.send(embed=page, ephemeral=True)
# ─── Run ──────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Fail fast with a clear message rather than a login error from the API.
    if not TOKEN:
        raise RuntimeError("DISCORD_TOKEN is not set. Copy .env.example to .env and fill it in.")
    # Logging is already configured through logging.basicConfig above.
    # log_handler=None stops bot.run() from attaching its own handler to the
    # "discord" logger, which would emit every library record a second time.
    bot.run(TOKEN, log_handler=None)