commit 8d3e7b635645eeb51fd3a1598f257b96677a9fce Author: a Date: Thu Jun 23 00:09:34 2022 -0500 first commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..87543fc --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.env +.cache/* +.logs/* +sounds/originals/* +sounds/normalized/* +cogs/__pycache__/* diff --git a/.style.yapf b/.style.yapf new file mode 100644 index 0000000..b732673 --- /dev/null +++ b/.style.yapf @@ -0,0 +1,13 @@ +[style] +USE_TABS: true +INDENT_WIDTH: 1 +BLANK_LINES_BETWEEN_TOP_LEVEL_IMPORTS_AND_VARIABLES: 0 +DEDENT_CLOSING_BRACKETS: true +COALESCE_BRACKETS: false +CONTINUATION_ALIGN_STYLE: fixed +CONTINUATION_INDENT_WIDTH: 1 +BLANK_LINES_AROUND_TOP_LEVEL_DEFINITION: 1 +INDENT_BLANK_LINES: true +SPACES_BEFORE_COMMENT: 1 +SPLIT_ARGUMENTS_WHEN_COMMA_TERMINATED: true +ALLOW_SPLIT_BEFORE_DICT_VALUE: false \ No newline at end of file diff --git a/app.py b/app.py new file mode 100644 index 0000000..4bd2521 --- /dev/null +++ b/app.py @@ -0,0 +1,129 @@ +import discord +import pymumble_py3 as pymumble + +""" Declare intents that the bot will use """ +intents = discord.Intents.default() +intents.emojis_and_stickers = True +intents.guilds = True +intents.integrations = True +intents.message_content = True +intents.messages = True +intents.members = True +intents.presences = True +intents.reactions = True +intents.voice_states = True + +""" Load the bot token """ +from dotenv import load_dotenv +from os import getenv +load_dotenv() +DISCORD_TOKEN = getenv("DISCORD_BOT_TOKEN") +DISCORD_GUILD = int(getenv("DISCORD_GUILD_ID")) + +""" Initialize the Discord bot """ +from discord.ext.commands import Bot, when_mentioned_or +bot = Bot( + command_prefix=when_mentioned_or('..', '>', '.'), + description="A list of commands available", + intents=intents, + # debug_guilds=[GUILD], + max_messages=100_000 +) + +@bot.event +async def on_ready(): + print(f"Logged in as {bot.user} (ID: {bot.user.id})") + print("------") + +""" Load cogs """ +from os import listdir +def list_all_cogs() -> list[str]: + return [file[:-3] for file in listdir("cogs") if file.endswith(".py")] + +for cog in list_all_cogs(): + bot.load_extension(f"cogs.{cog}") + +# TODO: web server for jukebox and/or soundboard? +# TODO: cog to announce when someone joins vc (maybe delete messages after some time?) +# TODO: purge messages from a user / etc? links, number, bot or human, idk +# TODO: automod? blocklist certain words or urls or whatever +# TODO: warn or kick or ban a user? +# TODO: filter the audit logs for a user? maybe? +# TODO: keep stats or levels? ehhhh + +# ================================= ADMIN ====================================== +from discord.commands import Option +from discord.ext.commands import Context +from discord import AutocompleteContext, ApplicationContext + +async def cog_autocomplete(ctx: AutocompleteContext): + return [cog for cog in list_all_cogs() if cog.lower().startswith( ctx.value.lower() )] + +ROLE_ADMIN = 518625964763119616 +ROLE_ADMIN_SFW = 727205354353721374 +ME = 201046736565829632 + +def allowed_to_reload(ctx: Context | ApplicationContext): + roles = [role.id for role in ctx.author.roles] + admin = ROLE_ADMIN in roles + sfw_admin = ROLE_ADMIN_SFW in roles + owner = ctx.author.id == ME + return any([admin, sfw_admin, owner]) + +from cogs.music import Music +def reload_music(ctx): + music: Music = bot.get_cog("Music") + + q = music.q + track = music.track + repeat_mode = music.repeat_mode + search_results = music.search_results + + bot.reload_extension(f"cogs.music") + + music = bot.get_cog("Music") + + music.q = q + music.track = track + music.repeat_mode = repeat_mode + music.search_results = search_results + +@bot.command(name='reload') +async def reload_prefix(ctx: Context, cog: str = None): + """Reload an extension (admin command)""" + if not allowed_to_reload(ctx): + return await ctx.send( + "You must be an admin or bot owner to use this command" + ) + if not cog: + return await ctx.send("Please specify a cog to reload") + elif cog.lower() == "music": + reload_music(ctx) + else: + bot.reload_extension(f"cogs.{cog}") + await ctx.send(f"Reloaded `{cog}` extension") + +@bot.slash_command( + name='reload', + guild_ids=[DISCORD_GUILD], +) +async def reload_slash( + ctx: ApplicationContext, + cog: Option(str, "The cog to be reloaded", autocomplete=cog_autocomplete) +): + """Reload an extension (admin command)""" + if not allowed_to_reload(ctx): + return await ctx.respond( + "You must be an admin or bot owner to use this command", + ephemeral=True + ) + if cog == "music": + reload_music(ctx) + else: + bot.reload_extension(f"cogs.{cog}") + await ctx.respond(f"Reloaded `{cog}` extension", ephemeral=True) + +# ================================== END ======================================= + +""" Run the bot """ +bot.run(DISCORD_TOKEN) \ No newline at end of file diff --git a/cogs/mumble.py.disabled b/cogs/mumble.py.disabled new file mode 100644 index 0000000..39f2492 --- /dev/null +++ b/cogs/mumble.py.disabled @@ -0,0 +1,25 @@ +import discord +from discord.ext.commands import Cog +from os import getenv, path, makedirs +import logging + +if not path.exists('.logs'): + makedirs('.logs') + +logger = logging.getLogger(__name__) +logger.setLevel(logging.WARNING) +fh = logging.FileHandler('.logs/mumble.log') +formatter = logging.Formatter('%(asctime)s | %(name)s | [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S') +fh.setFormatter(formatter) +if not len(logger.handlers): + logger.addHandler(fh) + +def setup(bot: discord.Bot): + bot.add_cog(Mumble(bot)) + +class Mumble(Cog): + """Bridge to Mumble""" + + def __init__(self, bot: discord.Bot): + self.bot: discord.Bot = bot + print("Initialized Music cog") \ No newline at end of file diff --git a/cogs/music.py b/cogs/music.py new file mode 100644 index 0000000..0b10162 --- /dev/null +++ b/cogs/music.py @@ -0,0 +1,826 @@ +import discord +from discord.ext.commands import Cog, command, Context +from discord.ext.pages import Paginator +import asyncio # used to run async functions within regular functions +import subprocess # for running ffprobe and getting duration of files +from os import getenv, path, makedirs +from time import time # performance tracking +import random # for shuffling the queue +import math # for ceiling function in queue pages +from functools import partial +import logging + +if not path.exists('.logs'): + makedirs('.logs') + +logger = logging.getLogger(__name__) +logger.setLevel(logging.WARNING) +fh = logging.FileHandler('.logs/music.log') +formatter = logging.Formatter('%(asctime)s | %(name)s | [%(levelname)s] %(message)s', '%Y-%m-%d %H:%M:%S') +fh.setFormatter(formatter) +if not len(logger.handlers): + logger.addHandler(fh) + +def setup(bot: discord.Bot): + bot.add_cog(Music(bot)) + +def format_time(d: int) -> str: + """Convert seconds to timestamp""" + h = d // 3600 + m = d % 3600 // 60 + s = d % 60 + if h: + return '{}:{:02}:{:02}'.format(h,m,s) + return '{}:{:02}'.format(m,s) + +def format_date(d: str) -> str: + """Convert YYYYMMDD to YYYY/MM/DD""" + return f"{d[:4]}/{d[4:6]}/{d[6:]}" + +class Track: + + def __init__( + self, + *, + source: str, + requester: discord.User, + title: str = None, + duration = None, + author: str = None, + author_icon: str = None, + data: dict = None, + ): + self.source = source + self.requester = requester + self.title = title + self.duration = duration + self.author = author + self.data = data + + def __repr__(self): + return f" bytes: + data = self.original.read() + if data: + self.packets_read += 1 + return data + + @property + def elapsed(self) -> float: + return self.packets_read * 0.02 # each packet is 20ms + + @property + def progress(self) -> str: + elapsed = format_time(int(self.elapsed)) + duration = format_time(int(self.duration)) if self.duration else "?:??" + return f"{elapsed} / {duration}" + + @property + def length(self) -> str: + return format_time(int(self.duration)) + +class Music(Cog): + """Play audio within a voice channel.""" + + REPEAT_NONE = 0 + REPEAT_ONE = 1 + REPEAT_ALL = 2 + + MAX_RESULTS = 5 + PAGE_SIZE = 10 + + def __init__(self, bot: discord.Bot): + self.bot: discord.Bot = bot + self.q: list[Track] = [] + self.track: Track | None = None + self.repeat_mode = Music.REPEAT_NONE + self.search_results: dict = None + self.i: int = -1 + print("Initialized Music cog") + + @command(aliases=['start', 'summon', 'connect']) + async def join(self, ctx: Context, *, channel: discord.VoiceChannel = None): + """Joins a voice channel""" + logger.info(f".join {channel}" if channel else ".join") + if not channel: # Upon a raw "join" command without a channel specified, + if not ctx.author.voice: + msg = await ctx.send( + "You must either be in a voice channel, " + "or specify a voice channel in order to use this command" + ) + if msg: + logger.info(f"Message sent: no channel specified, and {ctx.author} is not in a voice channel") + return + channel = ctx.author.voice.channel # bind to your current vc channel. + if ctx.voice_client: # If the bot is in a different channel, + await ctx.voice_client.move_to(channel) # move to your channel. + logger.info(f"existing voice client moved to {channel}") + return + voice_client = await channel.connect() # Finally, join the chosen channel. + if voice_client: + logger.info("voice client created") + + @command(aliases=['quit', 'dismiss', 'disconnect']) + async def leave(self, ctx: Context): + """Stop+disconnect from voice""" + logger.info(".leave") + if ctx.voice_client: + await ctx.voice_client.disconnect() + logger.info("voice client disconnected") + + def get_duration_from_file(self, filename: str): + cmd = subprocess.run( + [ + 'ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', + 'default=noprint_wrappers=1:nokey=1', filename + ], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT + ) + return float(cmd.stdout) + + async def get_tracks_from_query(self, ctx: Context, query: str): + logger.debug(f"get_tracks_from_query() called for query: {query}") + # Detect if the track should be downloaded + download = False + if query.endswith('!dl'): + download = True + query = query[:-3] + elif query.endswith('!download'): + download = True + query = query[:-9] + # Handle attachment playback + if query == "file": + logger.info(f"getting tracks from attachment") + return await self.get_tracks_from_attachments(ctx) + # Handle online playback + elif query.startswith('http'): + logger.info(f"getting tracks from url") + return await self.get_tracks_from_url(ctx, query, download=download) + # Handle local playback + elif tracks := await self.get_tracks_from_path(ctx, query): + logger.info(f"getting tracks from path to local file") + return tracks + # Do a youtube search if not found and no prior search + elif not self.search_results: + logger.info(f"performing a search result") + return await self.search_youtube(ctx, query=query) + # Handle prior search + try: + i = int(query) - 1 + except ValueError: + logger.info(f"performing a search result") + return await self.search_youtube(ctx, query=query) + if i not in range(self.MAX_RESULTS + 1): + return await ctx.send(f"Please provide an integer between 1 and {self.MAX_RESULTS}") + url = self.search_results['entries'][i]['url'] + self.search_results = [] + logger.info(f"handling a prior search") + return await self.get_tracks_from_url(ctx, url) + + async def get_tracks_from_url(self, ctx: Context, url: str, download: bool = False): + logger.debug(f"get_tracks_from_url() called for URL: {url}") + try: + data = ytdl.extract_info(url, download=download) + # logger.debug(f"{data=}") + # Detect tabs + if data['extractor'] == 'youtube:tab' and not "entries" in data: + logger.info("youtube:tab detected, no entries in data (so not a playlist)") + data = ytdl.extract_info(data['url'], download=download) # process the playlist url + logger.debug(f"{data=}") + except Exception as e: + logger.error("Exception thrown!") + logger.error(f"{e=}") + return e + # Detect playlists + entries = [data] # Assume that there is only one song. + if "entries" in data: # If we're wrong, just overwrite our singlet list. + entries = data["entries"] # yapf: disable + # Create Track objects + tracks = [] + for entry in entries: + url = entry["url"] + title = entry["title"] + duration = None + data = entry + if not "duration" in data and not "duration_string" in data: + logger.info("duration not found in entry's extracted data -- refetching") + logger.debug(f"{data=}") + start = time() + data = ytdl.extract_info(url, download=download) + logger.info(f"Refetching data took {time() - start} seconds") + if "duration" in data: + duration = data["duration"] + elif "duration_string" in data: + d = [int(x) for x in data["duration_string"].split(':')] + if len(d) == 2: + m,s = d + h = 0 + elif len(d) == 3: + h,m,s = d + duration = s + 60*m + 3600*h + tracks.append( + Track( + source=url, + requester=ctx.message.author, + title=title, + duration=duration, + data=data + ) + ) + logger.info(f"Got {len(tracks)} track(s) from URL") + logger.debug(f"{tracks=}") + return tracks + + async def get_tracks_from_path(self, ctx: Context, query: str): + """Attempt to load a local file from path""" + logger.debug(f"get_tracks_from_path() called for query: {query}") + if "/.." in query: + return None + filename = f"sounds/normalized/{query}" + try: + player = discord.FFmpegPCMAudio(filename) + except: + return None + if player.read(): + logger.info("filename is readable from path") + return [ + Track( + source=filename, + requester=ctx.message.author, + title=query, + duration=self.get_duration_from_file(filename) + ) + ] + return None + + async def get_tracks_from_attachments(self, ctx: Context): + """Fetch the attachment URL and convert it to a track""" + logger.debug(f"get_tracks_from_attachment() called") + attachments = ctx.message.attachments + tracks = [] + for attachment in attachments: + try: + track = await self.get_tracks_from_url(ctx, attachment.url, download=False) + tracks += track + except Exception as e: + logger.error("Exception thrown!") + logger.error(f"{e=}") + msg = await ctx.send( + f"An error occurred while adding `{attachment.filename}`:\n" + f"```{e.exc_info[1]}```" + ) + if msg: + logger.warning("Message sent: An error occurred while adding `{attachment.filename}`") + return e + logger.debug(f"{tracks=}") + return tracks + + @command(name='search') + async def search_youtube(self, ctx: Context, *, query: str): + """Do a YouTube search for the given query""" + logger.debug(f"search_youtube() called for query: {query}") + try: + self.search_results = ytdl.extract_info(f"ytsearch{self.MAX_RESULTS}:{query}", download=False) + except Exception as e: + logger.error("Exception thrown!") + logger.error(f"{e=}") + msg = await ctx.send( + f"An error occurred while searching for `{query}`:\n" + f"```{e.exc_info[1]}```" + ) + if msg: + logger.warning(f"Message sent: An error occurred while searching for `{query}`") + return e + await self.results(ctx) + + @command() + async def results(self, ctx: Context): + """Show results of a prior search""" + logger.debug(f"results() called") + if not self.search_results: + logger.info("No stored search results") + msg = await ctx.send("There are no stored search results right now.") + if msg: + logger.warning("Message sent: There are no stored search results right now.") + return + + embeds = [] + + formatted_results = ( + f"Performed a search for `{self.search_results['id']}`.\n" + "Which track would you like to play?\n" + "Make your choice using the `play` command.\n\n" + ) + + for i, result in enumerate(self.search_results['entries']): + + if result['live_status'] == "is_upcoming": + continue # skip YT Premieres + + title = result['title'] + duration = format_time(int(result['duration'])) + uploader = result['uploader'] + views = "{:,}".format(result['view_count']) + image = result['thumbnails'][-1]['url'] + height = result['thumbnails'][-1]['height'] + width = result['thumbnails'][-1]['width'] + url = result['url'] + + formatted_results += ( + f"{i+1}: **{title}** ({duration})\n" + f"{uploader} - {views} views\n" + ) + + embeds.append( + discord.Embed( + title = title, + url = url, + type = 'image', + colour = 0xff0000, + ).add_field( + name = "Duration", + value = duration, + ).add_field( + name = "Views", + value = views, + ).add_field( + name = "Uploaded by", + value = uploader, + ).set_thumbnail( + url = image, + ) + ) + + msg = await ctx.send(formatted_results, embeds = embeds) + if msg: + logger.info("Message sent: formatted_results") + + async def play_next(self, ctx: Context): + logger.debug("play_next() called") + if not ctx.voice_client: + return + + if ctx.voice_client.is_playing(): + return + + logger.info(f"{self.track=}") + + if self.repeat_mode == Music.REPEAT_NONE: + if not self.q: + return await ctx.send("Finished playing queue.") + self.track = self.q.pop(0) + logger.info("Repeat none -- popped track from queue") + elif self.repeat_mode == Music.REPEAT_ONE: + self.track = self.track + logger.info("Repeat one -- keeping track the same") + elif self.repeat_mode == Music.REPEAT_ALL: + self.i += 1 + if self.i >= len(self.q): + self.i = 0 + self.track = self.q[self.i] + logger.info("Repeat all -- advancing pointer without popping track") + + logger.info(f"{self.track=}") + + if self.track.source.startswith('http'): + # detect private or unplayable videos here + try: + data = ytdl.extract_info(self.track.source, download=False) + logger.debug(f"{data=}") + except Exception as e: + logger.error("Exception thrown!") + logger.error(f"{e=}") + await ctx.send( + f"`{self.track.source}` is unplayable -- skipping...\n" + f"```{e.exc_info[1]}```" + ) + logger.warning(f"Skipping as unplayable: {self.track.source}") + return await self.play_next(ctx) + player = await Player.prepare_stream(self.track, loop = self.bot.loop) + else: + player = await Player.prepare_file(self.track, loop = self.bot.loop) + + logger.info("playing Player on the voice client") + ctx.voice_client.play( + player, + after=lambda e: self.after(ctx) + ) + + def after(self, ctx: Context): + logger.debug("after() called") + + if not ctx.voice_client: + logger.info("no voice client -- bot was disconnected from vc?") + self.track = None + self.q = [] + asyncio.run_coroutine_threadsafe( + ctx.send(f"Clearing queue after bot left VC"), + self.bot.loop + ).result() + logger.info("Cleared queue after bot left VC") + return + + if not self.q and self.repeat_mode == Music.REPEAT_NONE: + logger.info("queue empty and not repeating") + self.track = None + asyncio.run_coroutine_threadsafe( + ctx.send(f"Finished playing queue."), + self.bot.loop + ).result() + logger.info("Finished playing queue.") + return + + if self.q and ctx.voice_client and not ctx.voice_client.is_playing(): + logger.info("queue exists and voice client is not playing") + logger.debug(f"{self.q=}") + logger.info("playing next...") + asyncio.run_coroutine_threadsafe( + self.play_next(ctx), + self.bot.loop + ).result() + return + + def check_for_numbers(self, ctx: Context): + """anti numbers action""" + logger.debug("check_for_numbers() called") + NUMBERS = 187024083471302656 + PIZZA = 320294046935547905 + RICKY = 949503750651936828 + if ctx.author.id in [ + NUMBERS, + PIZZA, + ]: + return False + #if ctx.author.id != NUMBERS: + # return False + # if ctx.author.voice: + # members = ctx.author.voice.channel.members + # current_vc_members = {member.id for member in members} + # other_members = current_vc_members - {NUMBERS} - {RICKY} + # return (NUMBERS in current_vc_members) and not other_members + return True + + async def add_to_queue(self, + ctx: Context, + query: str, + top: bool = False + ): + logger.debug(f"add_to_queue({query}) called") + # Check for permission to add tracks + allowed = self.check_for_numbers(ctx) + if not allowed: + logger.info(f"{ctx.author} is not allowed to add to queue") + #return await ctx.send( + #"You must be in a voice chat by yourself " + #"in order to use this command." + #) + return await ctx.send("No 💜") + # Ensure we are connected to voice + if not ctx.voice_client: + logger.warning("no voice client") + if ctx.author.voice: + logger.info(f"moving voice client to {ctx.author.voice.channel}") + await ctx.author.voice.channel.connect() + else: + msg = await ctx.send( + "You are not connected to a voice channel. " + "Use the `join` command with a specified channel " + "or while connected to a channel before trying to " + "play any tracks." + ) + if msg: + logger.info("Message sent: author not in voice, and no voice client exists") + return + # Guard against errors + tracks = await self.get_tracks_from_query(ctx, query) + if isinstance(tracks, Exception): + msg = await ctx.send( + f"An error occurred while trying to add `{query}` to the queue:\n" + f"```{tracks}```" + ) + if msg: + logger.warning(f"Message sent: An error occurred while trying to add `{query}` to the queue") + return + if not tracks: # a search was performed instead + return + # Add track(s) to queue + if top: + self.q = tracks + self.q + else: + self.q = self.q + tracks + if ctx.voice_client.is_playing(): + if top: + msg = await ctx.send(f"Added **{len(tracks)}** track(s) to top of queue.") + if msg: + logger.info(f"Message sent: Added **{len(tracks)}** track(s) to top of queue.") + return + else: + msg = await ctx.send(f"Added **{len(tracks)}** track(s) to queue.") + if msg: + logger.info(f"Message sent: Added **{len(tracks)}** track(s) to queue.") + return + # If not playing, start playing + if len(self.q) == 1: + msg = await ctx.send(f"Playing **{self.q[0].title}**") + if msg: + logger.info(f"Message sent: Playing **{self.q[0].title}**") + else: + msg = await ctx.send(f"Playing {len(tracks)} tracks.") + if msg: + logger.info(f"Message sent: Playing {len(tracks)} tracks.") + await self.play_next(ctx) + + @command(aliases=['p', 'listen']) + async def play(self, ctx: Context, *, query: str): + """Add track(s) to queue""" + if not query: + msg = await ctx.send("No query detected") + if msg: + logger.info("Empty .play command was issued") + return + logger.info(f".play {query}") + return await self.add_to_queue(ctx, query, top=False) + + @command(aliases=['ptop', 'top']) + async def playtop(self, ctx: Context, *, query: str): + """Add tracks to top of queue""" + if not query: + msg = await ctx.send("No query detected") + if msg: + logger.info("Empty .playtop command was issued") + return + logger.info(f".playtop {query}") + return await self.add_to_queue(ctx, query, top=True) + + # TODO: repeat once, repeat all, repeat none (repeat/loop command) + # TODO: move positions of songs? + # TODO: cleanup command for clearing songs requested by users not in vc? + # TODO: remove duplicates? + # TODO: remove range of songs + # TODO: restart current song + # TODO: seek command? [no fuckin idea how] + # TODO: skip multiple songs? + # TODO: autoplay???? [???????????] + # TODO: filters? bass boost? nightcore? speed? [probs not] + + @command(aliases=['q']) + async def queue(self, ctx: Context, p: int = 1): + """Show tracks up next""" + logger.info(f".queue {p}" if p else ".queue") + # check that there is a queue and a current track + if not self.q and not self.track: + msg = await ctx.send("The queue is currently empty.") + logger.info("Message sent: The queue is currently empty.") + return + # paginate the queue to just one page + full_queue = [self.track] + self.q + start = self.PAGE_SIZE * (p-1) + end = self.PAGE_SIZE * p + queue_page = full_queue[start:end] + # construct header + formatted_results = f"{len(self.q)} tracks on queue.\n" + total_pages = math.ceil(len(full_queue) / self.PAGE_SIZE) + formatted_results += f"Page {p} of {total_pages}:\n" + # construct page + for i, track in enumerate(queue_page): + if p == 1 and i == 0: # print nowplaying on first queue page + formatted_results += "=== Currently playing ===\n" + formatted_results += ( + f"{start+i+1}: {track}\n" + ) + if p == 1 and i == 0: # add separator on first page for actually queued tracks + formatted_results += "=== Up next ===\n" + # send text to channel + msg = await ctx.send(formatted_results) + if msg: + logger.info("Message sent: Sent queue page to channel") + + @command(aliases=['np']) + async def nowplaying(self, ctx: Context): + """Show currently playing track""" + logger.info(".nowplaying") + if not self.track: + msg = await ctx.send("Nothing is currently playing") + if msg: + logger.info("Nothing is currently playing") + return + if not ctx.voice_client: + msg = await ctx.send("Bot is not currently connected to a voice channel") + if msg: + logger.info("Bot not connected to VC") + return + source: Player = ctx.voice_client.source + embed = discord.Embed( + title=f"{self.track.title}", + url=f"{self.track.source}", + ).add_field( + name="Progress", + value=f"{source.progress}", + ).set_footer( + text=f"Requested by {self.track.requester.display_name} ({self.track.requester})", + icon_url=f"{self.track.requester.display_avatar.url}", + ) + thumb = None + if "thumbnail" in source.data: + thumb = source.data['thumbnail'] + elif "thumbnails" in source.data: + thumb = source.data['thumbnails'][0]['url'] + if thumb: + embed.set_thumbnail( + url=thumb + ) + msg = await ctx.send( + f"Now playing:\n{self.track}", + embed = embed + ) + if msg: + logger.info(f"Message sent: Now playing: {self.track.title}") + + @command() + async def skip(self, ctx: Context): + """Start playing next track""" + logger.info(".skip") + if ctx.voice_client.is_playing(): + if self.track: + msg = await ctx.send(f"Skipping: {self.track.title}") + if msg: + logger.info(f"Message sent: Skipping: {self.track.title}") + ctx.voice_client.stop() + + @command() + async def remove(self, ctx: Context, i: int): + """Remove track at given position""" + logger.info(f".remove {i}") + i -= 1 # convert to zero-indexing + track = self.q.pop(i) + msg = await ctx.send(f"Removed: {track.title}") + if msg: + logger.info(f"Message sent: Removed: {track.title}") + + @command() + async def pause(self, ctx: Context): + """Pause the currently playing track""" + logger.info(".pause") + if ctx.voice_client.is_playing(): + ctx.voice_client.pause() + msg = await ctx.send("Playback is paused.") + if msg: + logger.info("Message sent: Playback is paused.") + + @command() + async def resume(self, ctx: Context): + """Resume playback of a paused track""" + logger.info(".resume") + if ctx.voice_client.is_paused(): + ctx.voice_client.resume() + msg = await ctx.send("Playback is resumed.") + if msg: + logger.info("Message sent: Playback is resumed.") + + @command() + async def shuffle(self, ctx: Context): + """Randomizes the current queue""" + logger.info(".shuffle") + if not self.q: + return await ctx.send("There is no queue to shuffle") + + logger.debug(f"{self.track=}") + logger.debug(f"{self.q=}") + + random.shuffle(self.q) + + logger.debug(f"{self.track=}") + logger.debug(f"{self.q=}") + msg = await ctx.send("Queue has been shuffled") + if msg: + logger.info("Message sent: Queue has been shuffled") + + @command() + async def stop(self, ctx: Context): + """Clear queue and stop playing""" + logger.info(".stop") + self.q = [] + self.track = None + if ctx.voice_client: + if ctx.voice_client.is_connected(): + ctx.voice_client.stop() + msg = await ctx.send("Stopped playing tracks and cleared queue.") + if msg: + logger.info("Message sent: Stopped playing tracks and cleared queue.") + + @command() + async def clear(self, ctx: Context): + """Clear queue, but keep playing""" + logger.info(".clear") + self.q = [] + msg = await ctx.send("Queue has been cleared.") + if msg: + logger.info("Message sent: Queue has been cleared.") + + @command(aliases=['v', 'vol']) + async def volume(self, ctx: Context, volume: int): + """Changes the player's volume""" + logger.info(f".volume {volume}") + if ctx.voice_client is None: + return await ctx.send("Not connected to a voice channel.") + if volume not in range(101): + return await ctx.send(f"Please use an integer from 0 to 100") + ctx.voice_client.source.volume = volume / 100 + await ctx.send(f"Changed volume to {volume}%") + + @command(aliases=['list']) + async def catalogue(self, ctx: Context, subdirectory: str = ""): + """Shows the available local files""" + logger.info(f".catalogue {subdirectory}" if subdirectory else ".catalogue") + if "../" in subdirectory: + return await ctx.send(f"Nice try, but that won't work.") + path = "." + if subdirectory: + path += f"/{subdirectory}" + cmd = subprocess.run( + f"cd sounds/normalized && find {path} -type f | sort", + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT + ) + results = cmd.stdout.decode('utf-8').split('\n')[:-1] + formatted_results = "```" + for result in results: + formatted_results += f"{result[2:]}\n" + formatted_results += "```" + await ctx.send(formatted_results) + +""" +Initialize youtube-dl service. +""" +import yt_dlp as youtube_dl +youtube_dl.utils.bug_reports_message = lambda: "" +ytdl_format_options = { + "format": "bestaudio/best", + "outtmpl": ".cache/%(extractor)s-%(id)s-%(title)s.%(ext)s", + "restrictfilenames": True, + "noplaylist": True, + "nocheckcertificate": True, + "ignoreerrors": False, + "logtostderr": False, + "quiet": True, + "no_warnings": True, + "default_search": "auto", + # "source_address": "0.0.0.0", # Bind to ipv4 since ipv6 addresses cause issues + "extract_flat": True, # massive speedup for fetching metadata, at the cost of no upload date +} +username = getenv("YOUTUBE_USERNAME") +password = getenv("YOUTUBE_PASSWORD") +ytdl = youtube_dl.YoutubeDL(ytdl_format_options) \ No newline at end of file diff --git a/cogs/random.py b/cogs/random.py new file mode 100644 index 0000000..5832c3d --- /dev/null +++ b/cogs/random.py @@ -0,0 +1,103 @@ +import discord +from discord.ext.commands import Cog, command, Context +from discord.commands import slash_command, Option +import requests +from os import getenv +from collections import Counter +import re + +GUILD = int(getenv("DISCORD_GUILD_ID")) + +def setup(bot: discord.Bot): + bot.add_cog(Random(bot)) + +class Random(Cog): + """Use random.org to provide truly random results""" + def __init__(self, bot: discord.Bot): + self.bot: discord.Bot = bot + print("Initialized Random cog") + +# ============================================================================== + + @staticmethod + def roll(faces: int = 6, number: int = 1) -> str: + """Roll die(s) using random.org integer generation""" + columns = number if number < 11 else 10 + return requests.get(f'https://www.random.org/integers/?num={number}&min=1&max={faces}&col={columns}&base=10&format=plain&rnd=new').text + + @staticmethod + def roll_results(faces: int = 6, number: int = 1) -> str: + """Returns a formatted result of dice outcomes""" + result = Random.roll(faces=int(faces), number=int(number)) + if faces == 100: # 00 - 99 + result = re.sub('100', '00', result) + result = re.sub(r'\b(\d){1}\b', r'0\1', result) # zero pad single digit + return f"*You rolled {number}d{faces} and got:*\n**{result}**" + + @command( + name='roll', + aliases=['dice'] + ) + async def roll_prefix(self, ctx: Context, query: str = '1d6'): + """Roll some dice (default 1d6)""" + number, faces = query.split('d') + if not number: # handle raw dN rolls, e.g. 'd6' or 'd20' + number = 1 + await ctx.send(Random.roll_results(faces=int(faces), number=int(number))) + + @slash_command( + name='roll', + guild_ids=[GUILD] + ) + async def roll_slash(self, + ctx: discord.ApplicationContext, + faces: Option(int, "How many faces are on each die?", min_value=1, max_value=1_000_000_000, default=6), + number: Option(int, "How many dies should be rolled?", min_value=1, max_value=10_000, default=1) + ): + """Roll some dice (default 1d6)""" + await ctx.respond(Random.roll_results(faces=faces, number=number)) + +# ============================================================================== + + @staticmethod + def flip(number: int = 1) -> str: + """Returns a raw list of Heads or Tails outcomes""" + result = requests.get(f'https://www.random.org/integers/?num={number}&min=1&max=2&col=1&base=10&format=plain&rnd=new').text + result = result.replace('1', 'Heads') + result = result.replace('2', 'Tails') + return result + + @staticmethod + def flip_results(number: int = 1) -> str: + """Returns a formatted result of Heads and Tails counts""" + result = Random.flip(number) + if len(result) == 6: # \n counts as a character and we don't strip it + return ( + f"*You flipped {number} coins and got:*\n" + f"**{result}**" + ) + c = Counter(result.split('\n')) + return ( + f"*You flipped {number} coins and got:*\n" + f"**{c['Heads']}** Heads\n" + f"**{c['Tails']}** Tails" + ) + + @command( + name='flip', + aliases=['coin'] + ) + async def flip_prefix(self, ctx: Context, number: str = 1): + """Flip a coin (default 1)""" + await ctx.send(self.flip_results(number)) + + @slash_command( + name='flip', + guild_ids=[GUILD] + ) + async def flip_slash(self, + ctx: discord.ApplicationContext, + number: Option(int, "How many coins should be flipped?", min_value=1, max_value=10_000, default=1) + ): + """Flip a coin (default 1)""" + await ctx.respond(self.flip_results(number)) \ No newline at end of file diff --git a/cogs/unpin.py b/cogs/unpin.py new file mode 100644 index 0000000..1dbb4a9 --- /dev/null +++ b/cogs/unpin.py @@ -0,0 +1,39 @@ +import discord +from discord.ext.commands import Cog + +"""Cog setup function""" +def setup(bot: discord.Bot): + bot.add_cog(Unpin(bot)) + +class Unpin(Cog): + """Reply to unpinned messages in the channel they were posted.""" + + def __init__(self, bot: discord.Bot): + self.bot: discord.Bot = bot + print("Initialized Unpin cog") + + @Cog.listener() + async def on_guild_channel_pins_update( + self, channel: discord.TextChannel | discord.Thread, last_pin + ): + # get last audit log entry + guild = channel.guild + entries = await guild.audit_logs(limit=1).flatten() + entry = entries[0] + # check it is an unpin event + if entry.action != discord.AuditLogAction.message_unpin: + return + # get the unpinned message + message_id = entry.extra.message_id + message = await channel.fetch_message(message_id) + # make sure the message is actually unpinned + # - quickly pinning after unpinning can cause issues otherwise + if message.pinned: + return + # construct and send message as a reply + url = f"https://discord.com/channels/{guild.id}/{channel.id}/{message.id}" + msg = await channel.send( + embed=discord.Embed(title="Message unpinned", url=url), + reference=message, + mention_author=False + ) \ No newline at end of file diff --git a/cogs/vcjoin.py b/cogs/vcjoin.py new file mode 100644 index 0000000..00a2bfd --- /dev/null +++ b/cogs/vcjoin.py @@ -0,0 +1,47 @@ +import discord +from discord.ext.commands import Cog + +def setup(bot: discord.Bot): + bot.add_cog(VCJoin(bot)) + +class VCJoin(Cog): + """Log when a member joins VC, for pinging purposes""" + + CHANNEL = 949516670630760499 + + def __init__(self, bot: discord.Bot): + self.bot: discord.Bot = bot + self.channel: discord.TextChannel = self.bot.get_channel(VCJoin.CHANNEL) + print("Initialized VCJoin cog") + + @Cog.listener() + async def on_ready(self): + await self.bot.wait_until_ready() + self.channel = self.bot.get_channel(VCJoin.CHANNEL) + + @Cog.listener() + async def on_voice_state_update( + self, + member: discord.Member, + before: discord.VoiceState, + after: discord.VoiceState): + """Log when someone joins or leaves VC""" + if before.channel == after.channel: + return + if not before.channel: + embed = discord.Embed( + description = f"**{member.display_name}** joined **{after.channel.name}**" + ) + elif not after.channel: + embed = discord.Embed( + description = f"**{member.display_name}** left **{before.channel.name}**" + ) + else: + embed = discord.Embed( + description = f"**{member.display_name}** moved from **{before.channel.name}** to **{after.channel.name}**" + ) + embed = embed.set_author( + name = f"{member.display_name} ({member})", + icon_url = member.display_avatar.url, + ) + return await self.channel.send(embed=embed) \ No newline at end of file diff --git a/icon2.png b/icon2.png new file mode 100644 index 0000000..6dc05c6 Binary files /dev/null and b/icon2.png differ diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..90248ee --- /dev/null +++ b/requirements.txt @@ -0,0 +1,14 @@ +pip +requests +pymumble +py-cord>=2.0.0rc1 +py-cord[voice]>=2.0.0rc1 +py-cord[speed]>=2.0.0rc1 +asyncio +yt-dlp +pynacl +aiodns +brotlipy +cchardet +orjson +python-dotenv \ No newline at end of file