-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchat.py
232 lines (196 loc) · 10.1 KB
/
chat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
import json
import logging
import os
import random
import sqlite3
import string
import threading
import tkinter
import tkinter.messagebox

import customtkinter
import requests
import vk_api
from Crypto.Cipher import AES, PKCS1_OAEP
from Crypto.PublicKey import RSA
from Crypto.Random import get_random_bytes
from vk_api.longpoll import VkLongPoll, VkEventType

from db import Database
# Global customtkinter settings, applied once as a side effect of importing
# this module: appearance mode "System" and the "blue" default color theme.
customtkinter.set_appearance_mode("System")
customtkinter.set_default_color_theme("blue")
class Chat(customtkinter.CTkToplevel):
    """Modal chat window exchanging encrypted messages with one VK user.

    Outgoing text is encrypted with a fresh 16-byte AES session key (EAX
    mode); the session key is wrapped with RSA-OAEP under the public key
    stored in ``<owner_id>.pem`` and the result is sent as a VK document
    attachment.  Incoming documents from the configured peer are downloaded,
    decrypted with ``private.pem`` and appended to the read-only text box.

    Connection details (token, owner id, peer id, peer name) are read from
    ``Database().takes_all_data()`` at positions 0..3.
    """

    _LOG = logging.getLogger(__name__)

    # Fixed window size in pixels.
    _WIDTH = 800
    _HEIGHT = 500
    # Seconds before an HTTP upload/download is abandoned, so a dead
    # connection cannot block the caller forever.
    _HTTP_TIMEOUT = 30
    # Wire format of an encrypted message:
    #   RSA-wrapped session key | nonce (16) | tag (16) | ciphertext
    _SESSION_KEY_SIZE = 16
    _NONCE_SIZE = 16
    _TAG_SIZE = 16

    def __init__(self, *args, **kwargs):
        """Build the widgets, load the connection data and start listening.

        If the database cannot be read (``sqlite3.OperationalError``) the
        window still opens, shows a hint and disables the entry field.
        Errors from the VK API are not handled here and propagate.
        """
        super().__init__(*args, **kwargs)

        # Created before anything can fail so that window_chat_destroy()
        # and listener() can always rely on it.
        self._listener_stop = threading.Event()

        # Configure window: fixed size, centred on the screen, modal.
        self.title("Vk Client")
        self.geometry(f"{self._WIDTH}x{self._HEIGHT}")
        self.resizable(width=False, height=False)
        self.x = (self.winfo_screenwidth() - self._WIDTH) / 2
        self.y = (self.winfo_screenheight() - self._HEIGHT) / 2
        self.wm_geometry(
            "%dx%d+%d+%d" % (self._WIDTH, self._HEIGHT, self.x, self.y)
        )
        self.protocol("WM_DELETE_WINDOW", self.window_chat_destroy)
        self.grab_set()
        self.font = customtkinter.CTkFont(family="Helvetica", size=18)

        # Create button
        self.button = customtkinter.CTkButton(
            self, text='', command=self.button_event, width=70, height=50
        )
        self.button.place(x=720, y=440)

        # Create frame
        self.text_frame = customtkinter.CTkFrame(
            self,
            border_width=2,
            fg_color='#ffffff',
            border_color='#b3b3b3',
            width=780,
            height=420,
        )
        self.text_frame.place(x=10, y=10)

        # Create text box (kept read-only; see _append_text)
        self.text_box = customtkinter.CTkTextbox(
            self.text_frame,
            wrap='word',
            fg_color='#ffffff',
            width=760,
            height=400,
            font=self.font,
        )
        self.text_box.place(x=10, y=10)
        self.text_box.configure(state='disabled')

        # Create entry
        self.entry = customtkinter.CTkEntry(
            self, width=700, height=50, font=self.font
        )
        self.entry.place(x=10, y=440)
        self.entry.bind('<Return>', self.button_event)

        try:
            # Getting information from the database
            self.db = Database()
            all_data = self.db.takes_all_data()
            self.token = all_data[0]
            self.owner_id = all_data[1]
            self.user_id = all_data[2]
            self.username = all_data[3]

            # Authorization
            self.session = vk_api.VkApi(token=self.token)
            self.vk_session = self.session.get_api()

            # Getting Owner name
            self.owner_name = self.vk_session.users.get()[0]["first_name"]

            # Receive messages on a separate thread.  It is a daemon because
            # the long-poll loop never ends on its own and must not keep the
            # process alive after the GUI has exited.
            th = threading.Thread(target=self.listener, daemon=True)
            th.start()
        except sqlite3.OperationalError:
            self._append_text('No data. Try to create a new connection.')
            self.entry.configure(state='disabled')

    def _append_text(self, text):
        """Insert *text* at the end of the text box, keeping it read-only."""
        self.text_box.configure(state='normal')
        try:
            self.text_box.insert('end', text)
        finally:
            # Never leave the box editable, even if the insert fails.
            self.text_box.configure(state='disabled')

    def _send_document(self, file_field):
        """Upload a file to VK and send it to the peer as a document.

        *file_field* is passed to ``requests`` as the ``file`` form field: an
        open file object, or a ``(filename, content)`` tuple.
        """
        upload_server = self.vk_session.docs.getMessagesUploadServer(
            peer_id=self.user_id
        )
        response = requests.post(
            upload_server['upload_url'],
            files={'file': file_field},
            timeout=self._HTTP_TIMEOUT,
        )
        saved = self.vk_session.docs.save(file=response.json()["file"])
        doc_id = saved["doc"]["id"]
        attachment = f'doc{self.owner_id}_{doc_id}'
        self.vk_session.messages.send(
            user_id=self.user_id, attachment=attachment, random_id=0
        )

    def _encrypt_message(self, text):
        """Return *text* encrypted for the peer, in the wire format.

        The recipient's public key is read from ``<owner_id>.pem`` (the file
        written by key_exchange()).
        """
        with open(f'{self.owner_id}.pem') as key_file:
            recipient_key = RSA.import_key(key_file.read())
        session_key = get_random_bytes(self._SESSION_KEY_SIZE)
        enc_session_key = PKCS1_OAEP.new(recipient_key).encrypt(session_key)
        cipher_aes = AES.new(session_key, AES.MODE_EAX)
        ciphertext, tag = cipher_aes.encrypt_and_digest(
            text.encode("utf-8", "ignore")
        )
        return b''.join((enc_session_key, cipher_aes.nonce, tag, ciphertext))

    def _decrypt_message(self, message_id):
        """Download the document attached to *message_id* and decrypt it.

        Returns the decoded message text.  Raises if the message has no
        document attachment, the download fails, or decryption/verification
        with ``private.pem`` fails.
        """
        message_info = self.vk_session.messages.getById(message_ids=message_id)
        url_att = message_info['items'][0]['attachments'][0]['doc']['url']
        blob = requests.get(url_att, timeout=self._HTTP_TIMEOUT).content

        with open("private.pem") as key_file:
            private_key = RSA.import_key(key_file.read())

        # Split the blob according to the wire format.
        key_end = private_key.size_in_bytes()
        nonce_end = key_end + self._NONCE_SIZE
        tag_end = nonce_end + self._TAG_SIZE
        enc_session_key = blob[:key_end]
        nonce = blob[key_end:nonce_end]
        tag = blob[nonce_end:tag_end]
        ciphertext = blob[tag_end:]

        session_key = PKCS1_OAEP.new(private_key).decrypt(enc_session_key)
        cipher_aes = AES.new(session_key, AES.MODE_EAX, nonce)
        return cipher_aes.decrypt_and_verify(ciphertext, tag).decode("utf-8")

    def button_event(self, *args):
        """Encrypt the entry text, send it to the peer and echo it locally.

        Bound to both the button and ``<Return>``; extra positional arguments
        (the Tk event) are ignored.  An empty entry is a no-op.  Errors from
        encryption or the network propagate to the caller; in that case the
        entry keeps its text and nothing is echoed.
        """
        text_value = str(self.entry.get())
        if text_value == '':
            return

        # Random document name; only used as the uploaded file's name.
        random_name = ''.join(
            random.choice(string.ascii_letters) for _ in range(8)
        ) + '.bin'

        # The payload is built in memory, so no temporary file is left behind.
        payload = self._encrypt_message(text_value)
        self._send_document((random_name, payload))

        self._append_text(f'[{self.owner_name}] {text_value}\n')
        self.entry.delete(0, 'end')

    def window_chat_destroy(self):
        """Ask for confirmation, then release the database and close."""
        confirmed = tkinter.messagebox.askyesno(
            message="All messages will be deleted after closing. "
                    "Do you want to continue?"
        )
        if not confirmed:
            return

        # Tell the listener thread to stop at its next event.
        self._listener_stop.set()
        try:
            del self.db
        except AttributeError:
            # Database() failed in __init__, so there is nothing to release;
            # the window must still be closable.
            pass
        self.destroy()

    def _download_peer_key(self, attachment_id):
        """Save the peer's public key if the latest document is that key.

        *attachment_id* is the ``attach1`` value of the incoming event.
        Returns True when a document titled ``<owner_id>.pem`` was downloaded
        and written to a file of that name, False when the title differs.
        """
        # NOTE(review): the key is accepted without any fingerprint check -
        # confirm whether that is acceptable for the threat model.
        history = self.vk_session.messages.getHistoryAttachments(
            peer_id=self.user_id, media_type='doc'
        )
        access_key = history['items'][0]['attachment']['doc']['access_key']
        doc_info = self.vk_session.docs.getById(
            docs=f'{attachment_id}_{access_key}'
        )[0]
        url_att = doc_info['url']
        filename = f'{self.owner_id}.pem'
        if str(doc_info['title']) != filename:
            return False

        response = requests.get(
            url_att, allow_redirects=True, timeout=self._HTTP_TIMEOUT
        )
        with open(filename, "wb") as key_file:
            key_file.write(response.content)
        return True

    def key_exchange(self):
        """Exchange public keys with the peer.

        Renames ``receiver.pem`` to ``<user_id>.pem``, sends it to the peer,
        then blocks on the long-poll stream until the peer's key (a document
        titled ``<owner_id>.pem``) arrives.  On receipt the own key is sent
        once more and the method returns.  Messages that cannot be processed
        are skipped.
        """
        own_key_name = f'{self.user_id}.pem'

        def send_key():
            """Send key"""
            with open(own_key_name, 'r') as key_file:
                self._send_document(key_file)

        # Rename owner's public key
        os.rename("receiver.pem", own_key_name)
        send_key()

        # mode=2 is presumably what populates event.attachments - confirm
        # against the vk_api long-poll mode flags.
        longpoll = VkLongPoll(self.session, preload_messages=True, mode=2)
        for event in longpoll.listen():
            if not (event.type == VkEventType.MESSAGE_NEW and event.to_me):
                continue
            try:
                if str(event.user_id) != str(self.user_id):
                    continue
                if self._download_peer_key(event.attachments['attach1']):
                    send_key()
                    break
            except Exception:
                # Best effort: e.g. a message without attachments.  Keep
                # waiting for the key instead of aborting the exchange.
                self._LOG.debug(
                    "Skipping message during key exchange", exc_info=True
                )

    def listener(self):
        """Receive, decrypt and display new messages from the peer.

        Runs until the window has been closed (checked on each long-poll
        event).  Every message from the peer is marked as read, whether or
        not it could be decrypted.
        """
        # NOTE(review): this runs on a worker thread and updates Tk widgets
        # directly - confirm this is safe with the Tcl build in use.
        for event in VkLongPoll(self.session).listen():
            if self._listener_stop.is_set():
                break
            if not (event.type == VkEventType.MESSAGE_NEW and event.to_me):
                continue
            try:
                if str(event.user_id) != str(self.user_id):
                    continue
                message_id = event.message_id
                try:
                    u_message = self._decrypt_message(message_id)
                    self._append_text(f'[{self.username}] {u_message}\n')
                except Exception:
                    # Not an encrypted document, or decryption failed.
                    self._LOG.warning(
                        "Could not decrypt or display message %s",
                        message_id,
                        exc_info=True,
                    )
                self.vk_session.messages.markAsRead(peer_id=self.user_id)
            except Exception:
                # Keep the listener alive whatever a single event does.
                self._LOG.exception("Failed to handle incoming event")