👑️
Size: a a a
👑️
👑️
👑️
👑️
ЕП
👑️
ЕП
ЕТ
👑️
ЕТ
👑️
ЕТ
👑️
ЕТ
👑️
class aiogramTTLCache():
def __init__(self, **ttl):
self.ttl = ttl
self.cache = {}
def get(self, message: types.Message):
ttl = self.cache.get(message.chat.id, {}).get(message.from_user.id, datetime(2000, 1, 1))
if datetime.now() < ttl:
return True
self.cache.get(message.chat.id, {}).pop(message.from_user.id, None)
return False
def set(self, message: types.Message, **ttl):
delta_ttl = ttl or self.ttl
if not delta_ttl:
raise Exception("where ttl idiot?")
time = datetime.now() + timedelta(**delta_ttl)
self.cache.setdefault(message.chat.id, {})[message.from_user.id] = time
def left(self, message: types.Message) -> timedelta:
if self.get(message):
return self.cache.get(message.chat.id).get(message.from_user.id) - datetime.now()
else:
return timedelta()
👑️
👑️
class aiogramTTLCache():
def __init__(self, **ttl):
self.ttl = ttl
self.cache = {}
def get(self, message: types.Message):
ttl = self.cache.get(message.chat.id, {}).get(message.from_user.id, datetime(2000, 1, 1))
if datetime.now() < ttl:
return True
self.cache.get(message.chat.id, {}).pop(message.from_user.id, None)
return False
def set(self, message: types.Message, **ttl):
delta_ttl = ttl or self.ttl
if not delta_ttl:
raise Exception("where ttl idiot?")
time = datetime.now() + timedelta(**delta_ttl)
self.cache.setdefault(message.chat.id, {})[message.from_user.id] = time
def left(self, message: types.Message) -> timedelta:
if self.get(message):
return self.cache.get(message.chat.id).get(message.from_user.id) - datetime.now()
else:
return timedelta()
class ThrottleMiddleware(BaseMiddleware):
async def on_process_message(self, message: types.Message, data: dict):
if not cache.get(message):
cache.set(message)
return
else:
cache.set(message, seconds=int(cache.left(message).total_seconds()*2))
await message.answer(f"flood control wait {cache.left(message)} sec.")
raise CancelHandler
👑️
ЕТ
👑️