-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinvocation.py
More file actions
150 lines (122 loc) · 5.17 KB
/
invocation.py
File metadata and controls
150 lines (122 loc) · 5.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import asyncio
import datetime
import discord
import shlex
import io
import sources
SUCCESS = "<a:success:1394931188501581844>"
FAILED = "<a:failed:1394931198807248986>"
TIMED_OUT = "<a:timed_out:1394931175491113122>"
OOM = "<a:oom:1394931248350232647>"
STDOUT = "<a:stdout:1394931226535788585>"
STDERR = "<a:stderr:1394929989555720242>"
RUNNING = "<a:running:1394931166443999283>"
def render(b, name, *, file=False, codeblock=False):
try:
out = b.decode()
except UnicodeDecodeError:
out = None
if file or out is None or sum(1 + len(x) // 90 for x in out.splitlines()) > 11:
return discord.File(io.BytesIO(b), f"{name}.txt")
if codeblock:
out = out.replace('```', '`\u200b``')
out = f"```\n\u200b{out}```"
return name, out
def attr(e):
return "send_stdout" if e == STDOUT or e[1:] == STDOUT[2:] else "send_stderr" if e == STDERR or e[1:] == STDERR[2:] else None
class Invocation:
def __init__(self, session, message, lang, code, *, outputter, stdin="", options=(), args=()):
self.session = session
self.lang = lang
self.message = message
self.outputter = outputter
self.code = code
self.stdin = stdin
self.options = options
self.args = args
self.stdout = b""
self.stderr = b""
self.success = sources.FAILED
async def send_public_message(self, content="", embed=None, files=None):
if not (content.strip() or embed or files):
await self.outputter.delete()
return
if self.outputter.can_edit():
return await self.outputter.edit(content=content, embed=embed, attachments=files)
await self.outputter.send(content, embed=embed, files=files, reference=self.message if self.message.channel.last_message_id != self.message.id else None, mention_author=False)
async def send_output(self):
texts = []
files = []
if self.send_stdout:
s = render(self.stdout, "stdout")
if isinstance(s, discord.File):
files.append(s)
else:
texts.append(s)
if self.send_stderr:
s = render(self.stderr, "stderr", file=files, codeblock=True)
if isinstance(s, discord.File):
files.append(s)
else:
texts.append(s)
if len(texts) == 2 or self.stdin or self.options or self.args:
text = ""
embed = discord.Embed()
if self.options:
embed.add_field(name="Options", value=shlex.join(self.options), inline=False)
if self.stdin:
embed.add_field(name="Input", value=self.stdin, inline=False)
if self.args:
embed.add_field(name="Arguments", value=shlex.join(self.args), inline=False)
for name, value in texts:
embed.add_field(name=name, value=value, inline=False)
elif texts:
text = texts[0][1]
embed = None
else:
text = ""
embed = None
await self.send_public_message(text, embed=embed, files=files)
async def _execute(self):
loop = asyncio.get_event_loop()
me = self.message.guild.me
can_react = hasattr(self.outputter, "add_reaction")
async def running():
# wait a bit for quick programs to finish right away without wasting time reacting
await asyncio.sleep(2)
await self.outputter.clear_reactions()
await self.outputter.add_reaction(RUNNING)
if self.outputter.can_edit():
await self.outputter.edit(content="Message edited. Recalculating...", embed=None, attachments=[])
send_running = loop.create_task(running() if can_react else asyncio.sleep(0))
await self.lang.execute(self)
send_running.cancel()
is_stdout = bool(self.stdout)
is_stderr = bool(self.stderr)
if can_react:
self.send_stdout = self.success == sources.SUCCESS and is_stdout
self.send_stderr = False
async def send_reactions():
await self.outputter.clear_reactions()
if self.success == sources.SUCCESS:
await self.outputter.add_reaction(SUCCESS)
elif self.success == sources.TIMEOUT:
await self.outputter.add_reaction(TIMED_OUT)
elif self.success == sources.OOM:
await self.outputter.add_reaction(OOM)
else:
await self.outputter.add_reaction(FAILED)
if self.success != sources.SUCCESS and is_stdout:
await self.outputter.add_reaction(STDOUT)
if is_stderr:
await self.outputter.add_reaction(STDERR)
async with asyncio.TaskGroup() as tg:
tg.create_task(send_reactions())
tg.create_task(self.send_output())
else:
self.send_stdout = is_stdout
self.send_stderr = is_stderr
await self.send_output()
async def execute(self):
self.task = task = asyncio.get_event_loop().create_task(self._execute())
await task