Files
jarvis/main.py
2023-04-17 16:02:33 +03:00

317 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import datetime
import json
import os
import queue
import random
import struct
import subprocess
import sys
import time
from ctypes import POINTER, cast
import openai
from openai import error
import pvporcupine
import simpleaudio as sa
import vosk
import yaml
from comtypes import CLSCTX_ALL
from fuzzywuzzy import fuzz
from pvrecorder import PvRecorder
from pycaw.pycaw import (
AudioUtilities,
IAudioEndpointVolume
)
from rich import print
import config
import tts
# some consts
# Working directory at start-up; sound files and custom-command executables
# are addressed relative to it.
CDIR = os.getcwd()

# Voice command table parsed from commands.yaml. The context manager closes
# the file handle as soon as parsing is done instead of leaving it to the GC.
with open('commands.yaml', 'rt', encoding='utf8') as commands_file:
    VA_CMD_LIST = yaml.safe_load(commands_file)

# ChatGPT vars
# The system prompt is kept separately so the conversation log can be reset
# to it later.
system_message = {"role": "system", "content": "Ты голосовой ассистент из железного человека."}
message_log = [system_message]

# init openai
openai.api_key = config.OPENAI_TOKEN

# PORCUPINE
# Wake-word engine listening for the single keyword "jarvis".
porcupine = pvporcupine.create(
    access_key=config.PICOVOICE_TOKEN,
    keywords=['jarvis'],
    sensitivities=[1]
)
# print(pvporcupine.KEYWORDS)

# VOSK
# Speech-to-text recognizer fed with audio at `samplerate` Hz.
model = vosk.Model("model_small")
samplerate = 16000
device = config.MICROPHONE_INDEX
kaldi_rec = vosk.KaldiRecognizer(model, samplerate)
q = queue.Queue()
def gpt_answer():
    """Request a ChatGPT reply for the conversation stored in ``message_log``.

    Returns:
        The reply text on success, otherwise one of the fixed error phrases
        returned in the ``except`` branches below.

    Side effects:
        Rebinds the global ``message_log`` to the system message plus the most
        recent entry when the API reports ``context_length_exceeded``.
    """
    global message_log

    model_engine = "gpt-3.5-turbo"
    max_tokens = 256  # default 1024

    try:
        response = openai.ChatCompletion.create(
            model=model_engine,
            messages=message_log,
            max_tokens=max_tokens,
            temperature=0.7,
            top_p=1,
            stop=None,
        )
    except (error.TryAgain, error.ServiceUnavailableError):
        return "ChatGPT перегружен!"
    except openai.OpenAIError as ex:
        # If the context is too long, drop the history and retry with only the
        # system prompt and the latest message. The length check prevents an
        # endless retry loop (one API call per level) when the log is already
        # trimmed and the request still does not fit.
        if ex.code == "context_length_exceeded" and len(message_log) > 2:
            message_log = [system_message, message_log[-1]]
            return gpt_answer()
        return "OpenAI токен не рабочий."

    # Find the first response from the chatbot that has text in it
    # (some responses may not have text).
    for choice in response.choices:
        if "text" in choice:
            return choice.text

    # If no response with text is found, return the first response's content
    # (which may be empty).
    return response.choices[0].message.content
# play(f'{CDIR}\\sound\\ok{random.choice([1, 2, 3, 4])}.wav')
def play(phrase, wait_done=True):
    """Play the wav file that belongs to *phrase* from the ``sound`` folder.

    Args:
        phrase: Sound key. "greet" and "ok" pick one of three numbered
            variants at random; the other known keys map to ``<key>.wav``.
            An unknown key leaves the path pointing at the folder itself.
        wait_done: When true, the recorder is stopped before playback and
            restarted after the sound has finished playing.
    """
    global recorder

    # if/elif instead of match: for py 3.8
    if phrase in ("greet", "ok"):
        wav_name = f"{phrase}{random.choice([1, 2, 3])}.wav"
    elif phrase in ("not_found", "thanks", "run", "stupid", "ready", "off"):
        wav_name = f"{phrase}.wav"
    else:
        wav_name = ""
    wav_path = f"{CDIR}\\sound\\" + wav_name

    if wait_done:
        recorder.stop()

    playback = sa.WaveObject.from_wave_file(wav_path).play()

    if not wait_done:
        return

    playback.wait_done()
    recorder.start()
def q_callback(indata, frames, time, status):
    """Push one chunk of captured audio onto the module queue ``q``.

    A truthy *status* is printed to stderr first. *frames* and *time* are
    accepted but not used.

    NOTE(review): the signature looks like an audio input-stream callback, but
    nothing in the visible code registers it — confirm it is still needed.
    """
    if status:
        print(status, file=sys.stderr)
    chunk = bytes(indata)
    q.put(chunk)
def va_respond(voice: str):
    """React to a recognized utterance.

    The utterance is filtered and fuzzy-matched against the command table.
    A match of at least 70 percent is executed. Otherwise, if the first word
    resembles "скажи", the whole utterance is sent to ChatGPT and the answer
    is spoken; anything else plays the "not_found" sound.

    Returns:
        True only when a command was executed, False in every other case.
    """
    global recorder, message_log, first_request

    print(f"Распознано: {voice}")
    match = recognize_cmd(filter_cmd(voice))
    print(match)

    # Nothing matched at all.
    if len(match['cmd'].strip()) <= 0:
        return False

    confident = match['percent'] >= 70 and match['cmd'] in VA_CMD_LIST.keys()
    if confident:
        execute_cmd(match['cmd'], voice)
        return True

    # First whitespace-separated word of the utterance ("" when there is none).
    words = voice.split()
    first_word = words[0] if words else ""

    if fuzz.ratio(first_word, "скажи") > 75:
        message_log.append({"role": "user", "content": voice})
        answer = gpt_answer()
        message_log.append({"role": "assistant", "content": answer})

        # Keep the microphone closed while the answer is spoken.
        recorder.stop()
        tts.va_speak(answer)
        time.sleep(0.5)
        recorder.start()
        return False

    play("not_found")
    time.sleep(1)
    return False
def filter_cmd(raw_voice: str):
    """Return *raw_voice* with every configured filler fragment removed.

    Each entry of ``config.VA_ALIAS`` and then of ``config.VA_TBR`` is deleted
    from the text, and the text is stripped after every removal.
    (Presumably these are assistant name aliases and words to be removed —
    confirm against config.)
    """
    cleaned = raw_voice
    for fragments in (config.VA_ALIAS, config.VA_TBR):
        for fragment in fragments:
            cleaned = cleaned.replace(fragment, "").strip()
    return cleaned
def recognize_cmd(cmd: str):
    """Find the command whose phrase is most similar to *cmd*.

    Every phrase listed under every key of ``VA_CMD_LIST`` is compared with
    ``fuzz.ratio``. The first phrase with the strictly highest score wins.

    Returns:
        ``{'cmd': <command key or ''>, 'percent': <best score or 0>}``.
    """
    best_cmd, best_score = '', 0
    for command_id, phrases in VA_CMD_LIST.items():
        for phrase in phrases:
            score = fuzz.ratio(cmd, phrase)
            if score > best_score:
                best_cmd, best_score = command_id, score
    return {'cmd': best_cmd, 'percent': best_score}
def execute_cmd(cmd: str, voice: str):
    """Run the action bound to the command key *cmd*.

    Args:
        cmd: Command key; unknown keys are ignored.
        voice: The original utterance. Not used by any action at the moment.

    Raises:
        subprocess.CalledProcessError: A blocking helper program (gaming mode
            or audio output switch) exited with a non-zero code.
        SystemExit: For the 'off' command, after the wake-word engine has been
            released.
    """
    if cmd == 'open_browser':
        _launch_custom_command('Run browser.exe')
        play("ok")

    elif cmd == 'open_youtube':
        _launch_custom_command('Run youtube.exe')
        play("ok")

    elif cmd == 'open_google':
        _launch_custom_command('Run google.exe')
        play("ok")

    elif cmd == 'music':
        _launch_custom_command('Run music.exe')
        play("ok")

    elif cmd == 'music_off':
        _launch_custom_command('Stop music.exe')
        time.sleep(0.2)
        play("ok")

    elif cmd == 'music_save':
        _launch_custom_command('Save music.exe')
        time.sleep(0.2)
        play("ok")

    elif cmd == 'music_next':
        _launch_custom_command('Next music.exe')
        time.sleep(0.2)
        play("ok")

    elif cmd == 'music_prev':
        _launch_custom_command('Prev music.exe')
        time.sleep(0.2)
        play("ok")

    elif cmd == 'sound_off':
        # Confirm first: after muting, the confirmation would not be heard.
        play("ok", True)
        _set_master_mute(True)

    elif cmd == 'sound_on':
        _set_master_mute(False)
        play("ok")

    elif cmd == 'thanks':
        play("thanks")

    elif cmd == 'stupid':
        play("stupid")

    elif cmd == 'gaming_mode_on':
        play("ok")
        _launch_custom_command('Switch to gaming mode.exe', wait=True)
        play("ready")

    elif cmd == 'gaming_mode_off':
        play("ok")
        _launch_custom_command('Switch back to workspace.exe', wait=True)
        play("ready")

    elif cmd == 'switch_to_headphones':
        play("ok")
        _launch_custom_command('Switch to headphones.exe', wait=True)
        time.sleep(0.5)
        play("ready")

    elif cmd == 'switch_to_dynamics':
        play("ok")
        _launch_custom_command('Switch to dynamics.exe', wait=True)
        time.sleep(0.5)
        play("ready")

    elif cmd == 'off':
        play("off", True)
        porcupine.delete()
        # sys.exit instead of the `site` helper `exit`, which is not available
        # under `python -S` or in frozen builds.
        sys.exit(0)


def _launch_custom_command(exe_name, wait=False):
    """Start an executable from the ``custom-commands`` folder under CDIR.

    With ``wait=True`` the call blocks until the program exits and raises
    ``subprocess.CalledProcessError`` on a non-zero exit code; otherwise the
    program is started and left running in the background.
    """
    exe_path = f'{CDIR}\\custom-commands\\{exe_name}'
    if wait:
        subprocess.check_call([exe_path])
    else:
        subprocess.Popen([exe_path])


def _set_master_mute(muted):
    """Mute or unmute the device returned by ``AudioUtilities.GetSpeakers()``."""
    devices = AudioUtilities.GetSpeakers()
    interface = devices.Activate(IAudioEndpointVolume._iid_, CLSCTX_ALL, None)
    volume = cast(interface, POINTER(IAudioEndpointVolume))
    volume.SetMute(1 if muted else 0, None)
# `-1` is the default input audio device.
# Open the microphone with the frame length taken from the Porcupine instance
# and start capturing.
recorder = PvRecorder(device_index=config.MICROPHONE_INDEX, frame_length=porcupine.frame_length)
recorder.start()
print('Using device: %s' % recorder.selected_device)
print(f"Jarvis (v3.0) начал свою работу ...")
# Start-up sound, then a short pause before listening begins.
play("run")
time.sleep(0.5)
# ltc is the timestamp the 10-second command window below is measured from.
# Starting 1000 s in the past keeps `time.time() - ltc <= 10` false until ltc
# is refreshed.
ltc = time.time() - 1000
# NOTE(review): this copy of the file has lost its indentation, so the nesting
# of the statements below cannot be read from the text. Restore it from the
# upstream source before running.
while True:
try:
# Wake-word stage: read one recorder frame and hand it to Porcupine.
pcm = recorder.read()
keyword_index = porcupine.process(pcm)
# keyword_index >= 0 is treated as a wake-word hit: greet with the recorder
# stopped, then open the command window by refreshing ltc.
if keyword_index >= 0:
recorder.stop()
play("greet", True)
print("Yes, sir.")
recorder.start() # prevent self recording
ltc = time.time()
# Command window: keep feeding audio to Vosk while no more than 10 s have
# passed since ltc.
while time.time() - ltc <= 10:
pcm = recorder.read()
# Pack the frame's samples as native 16-bit integers ("h") into a bytes object.
sp = struct.pack("h" * len(pcm), *pcm)
# When AcceptWaveform returns true, the "text" field of the recognizer's JSON
# result is passed to va_respond.
if kaldi_rec.AcceptWaveform(sp):
# A true return from va_respond refreshes ltc.
if va_respond(json.loads(kaldi_rec.Result())["text"]):
ltc = time.time()
break
# Log any unexpected error, then re-raise it.
except Exception as err:
print(f"Unexpected {err=}, {type(err)=}")
raise