π
DarkzzAngel
import re
import motor.motor_asyncio # pylint: disable=import-error
from bot import DB_URI # pylint: disable=import-error
class Singleton(type):
__instances__ = {}
def __call__(cls, *args, **kwargs):
if cls not in cls.__instances__:
cls.__instances__[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls.__instances__[cls]
class Database(metaclass=Singleton):
def __init__(self):
self._client = motor.motor_asyncio.AsyncIOMotorClient(DB_URI)
self.db = self._client["Adv_Auto_Filter"]
self.col = self.db["Main"]
self.acol = self.db["Active_Chats"]
self.fcol = self.db["Filter_Collection"]
self.cache = {}
self.acache = {}
async def create_index(self):
"""
Create text index if not in db
"""
await self.fcol.create_index([("file_name", "text")])
def new_chat(self, group_id, channel_id, channel_name):
"""
Create a document in db if the chat is new
"""
try:
group_id, channel_id = int(group_id), int(channel_id)
except:
pass
return dict(
_id = group_id,
chat_ids = [{
"chat_id": channel_id,
"chat_name": channel_name
}],
types = dict(
audio=False,
document=True,
video=True
),
configs = dict(
accuracy=0.80,
max_pages=5,
max_results=50,
max_per_page=10,
pm_fchat=True,
show_invite_link=True
)
)
async def status(self, group_id: int):
"""
Get the total filters, total connected
chats and total active chats of a chat
"""
group_id = int(group_id)
total_filter = await self.tf_count(group_id)
chats = await self.find_chat(group_id)
chats = chats.get("chat_ids")
total_chats = len(chats) if chats is not None else 0
achats = await self.find_active(group_id)
if achats not in (None, False):
achats = achats.get("chats")
if achats == None:
achats = []
else:
achats = []
total_achats = len(achats)
return total_filter, total_chats, total_achats
async def find_group_id(self, channel_id: int):
"""
Find all group id which is connected to a channel
for add a new files to db
"""
data = self.col.find({})
group_list = []
for group_id in await data.to_list(length=50): # No Need Of Even 50
for y in group_id["chat_ids"]:
if int(y["chat_id"]) == int(channel_id):
group_list.append(group_id["_id"])
else:
continue
return group_list
# Related TO Finding Channel(s)
async def find_chat(self, group_id: int):
"""
A funtion to fetch a group's settings
"""
connections = self.cache.get(str(group_id))
if connections is not None:
return connections
connections = await self.col.find_one({'_id': group_id})
if connections:
self.cache[str(group_id)] = connections
return connections
else:
return self.new_chat(None, None, None)
async def add_chat(self, group_id: int, channel_id: int, channel_name):
"""
A funtion to add/update a chat document when a new chat is connected
"""
new = self.new_chat(group_id, channel_id, channel_name)
update_d = {"$push" : {"chat_ids" : {"chat_id": channel_id, "chat_name" : channel_name}}}
prev = await self.col.find_one({'_id':group_id})
if prev:
await self.col.update_one({'_id':group_id}, update_d)
await self.update_active(group_id, channel_id, channel_name)
await self.refresh_cache(group_id)
return True
self.cache[str(group_id)] = new
await self.col.insert_one(new)
await self.add_active(group_id, channel_id, channel_name)
await self.refresh_cache(group_id)
return True
async def del_chat(self, group_id: int, channel_id: int):
"""
A Funtion to delete a channel and its files from db of a chat connection
"""
group_id, channel_id = int(group_id), int(channel_id) # group_id and channel_id Didnt type casted to int for some reason
prev = self.col.find_one({"_id": group_id})
if prev:
await self.col.update_one(
{"_id": group_id},
{"$pull" :
{"chat_ids" :
{"chat_id":
channel_id
}
}
},
False,
True
)
await self.del_active(group_id, channel_id)
await self.refresh_cache(group_id)
return True
return False
async def in_db(self, group_id: int, channel_id: int):
"""
Check whether if the given channel id is in db or not...
"""
connections = self.cache.get(group_id)
if connections is None:
connections = await self.col.find_one({'_id': group_id})
check_list = []
if connections:
for x in connections["chat_ids"]:
check_list.append(int(x.get("chat_id")))
if int(channel_id) in check_list:
return True
return False
async def update_settings(self, group_id: int, settings):
"""
A Funtion to update a chat's filter types in db
"""
group_id = int(group_id)
prev = await self.col.find_one({"_id": group_id})
if prev:
try:
await self.col.update_one({"_id": group_id}, {"$set": {"types": settings}})
await self.refresh_cache(group_id)
return True
except Exception as e:
print (e)
return False
print("You Should First Connect To A Chat To Use This Funtion..... 'databse.py/#201' ")
return False
async def update_configs(self, group_id: int, configs):
"""
A Funtion to update a chat's configs in db
"""
prev = await self.col.find_one({"_id": group_id})
if prev:
try:
await self.col.update_one(prev, {"$set":{"configs": configs}})
await self.refresh_cache(group_id)
return True
except Exception as e:
print (e)
return False
print("You Should First Connect To A Chat To Use This")
return False
async def delete_all(self, group_id: int):
"""
A Funtion to delete all documents related to a
chat from db
"""
prev = await self.col.find_one({"_id": group_id})
if prev:
await self.delall_active(group_id)
await self.delall_filters(group_id)
await self.del_main(group_id)
await self.refresh_cache(group_id)
return
async def del_main(self, group_id: int):
"""
A Funtion To Delete the chat's main db document
"""
await self.col.delete_one({"_id": group_id})
await self.refresh_cache(group_id)
return True
async def refresh_cache(self, group_id: int):
"""
A Funtion to refresh a chat's chase data
in case of update in db
"""
if self.cache.get(str(group_id)):
self.cache.pop(str(group_id))
prev = await self.col.find_one({"_id": group_id})
if prev:
self.cache[str(group_id)] = prev
return True
# Related To Finding Active Channel(s)
async def add_active(self, group_id: int, channel_id: int, channel_name):
"""
A Funtion to add a channel as an active chat the a connected group
(This Funtion will be used only if its the first time)
"""
templ = {"_id": group_id, "chats":[{"chat_id": channel_id, "chat_name": channel_name}]}
try:
await self.acol.insert_one(templ)
await self.refresh_acache(group_id)
except Exception as e:
print(e)
return False
return True
async def del_active(self, group_id: int, channel_id: int):
"""
A funtion to delete a channel from active chat colletion in db
"""
templ = {"$pull": {"chats": dict(chat_id = channel_id)}}
try:
await self.acol.update_one({"_id": group_id}, templ, False, True)
except Exception as e:
print(e)
pass
await self.refresh_acache(group_id)
return True
async def update_active(self, group_id: int, channel_id: int, channel_name):
"""
A Funtion to add a new active chat to the connected group
"""
group_id, channel_id = int(group_id), int(channel_id)
prev = await self.acol.find_one({"_id": group_id})
templ = {"$push" : {"chats" : dict(chat_id = channel_id, chat_name = channel_name)}}
in_c = await self.in_active(group_id, channel_id)
if prev:
if not in_c:
await self.acol.update_one({"_id": group_id}, templ)
else:
return False
else:
await self.add_active(group_id, channel_id, channel_name)
return True
async def find_active(self, group_id: int):
"""
A Funtion to find all active chats of
a group from db
"""
if self.acache.get(str(group_id)):
self.acache.get(str(group_id))
connection = await self.acol.find_one({"_id": group_id})
if connection:
self.acache[str(group_id)] = connection
return connection
return False
async def in_active(self, group_id: int, channel_id: int):
"""
A Funtion to check if a chat id is in the active
chat id list in db
"""
prev = await self.acol.find_one({"_id": group_id})
if prev:
for x in prev["chats"]:
if x["chat_id"] == channel_id:
return True
return False
return False
async def delall_active(self, group_id: int):
"""
A Funtion to Delete all active chats of
a group from db
"""
await self.acol.delete_one({"_id":int(group_id)})
await self.refresh_acache(group_id)
return
async def refresh_acache(self, group_id: int):
"""
A Funtion to refresh a active chat's chase data
in case of update in db
"""
if self.acache.get(str(group_id)):
self.acache.pop(str(group_id))
prev = await self.acol.find_one({"_id": group_id})
if prev:
self.acache[str(group_id)] = prev
return True
# Related To Finding Filter(s)
async def add_filters(self, data):
"""
A Funtion to add document as
a bulk to db
"""
try:
await self.fcol.insert_many(data)
except Exception as e:
print(e)
return True
async def del_filters(self, group_id: int, channel_id: int):
"""
A Funtion to delete all filters of a specific
chat and group from db
"""
group_id, channel_id = int(group_id), int(channel_id)
try:
await self.fcol.delete_many({"chat_id": channel_id, "group_id": group_id})
print(await self.cf_count(group_id, channel_id))
return True
except Exception as e:
print(e)
return False
async def delall_filters(self, group_id: int):
"""
A Funtion To delete all filters of a group
"""
await self.fcol.delete_many({"group_id": int(group_id)})
return True
async def get_filters(self, group_id: int, keyword: str):
"""
A Funtion to fetch all similar results for a keyowrd
from using text index
"""
achats = await self.find_active(group_id)
achat_ids=[]
if not achats:
return False
for chats in achats["chats"]:
achat_ids.append(chats.get("chat_id"))
filters = []
pattern = keyword.lower().strip().replace(' ','.*')
raw_pattern = r"\b{}\b".format(pattern)
regex = re.compile(raw_pattern, flags=re.IGNORECASE)
db_list = self.fcol.find({"group_id": group_id,"file_name": regex})
for document in await db_list.to_list(length=600):
if document["chat_id"] in achat_ids:
filters.append(document)
else:
continue
return filters
async def get_file(self, unique_id: str):
"""
A Funtion to get a specific files using its
unique id
"""
file = await self.fcol.find_one({"unique_id": unique_id})
file_id = None
file_type = None
file_name = None
file_caption = None
if file:
file_id = file.get("file_id")
file_name = file.get("file_name")
file_type = file.get("file_type")
file_caption = file.get("file_caption")
return file_id, file_name, file_caption, file_type
async def cf_count(self, group_id: int, channel_id: int):
"""
A Funtion To count number of filter in channel
w.r.t the connect group
"""
return await self.fcol.count_documents({"chat_id": channel_id, "group_id": group_id})
async def tf_count(self, group_id: int):
"""
A Funtion to count total filters of a group
"""
return await self.fcol.count_documents({"group_id": group_id})