Files
jarvis/main.py
2023-04-16 17:15:36 +05:00

312 lines
7.9 KiB
Python

import datetime
import json
import os
import queue
import random
import struct
import subprocess
import sys
import time
from ctypes import POINTER, cast
import openai
import pvporcupine
import simpleaudio as sa
import vosk
import yaml
from comtypes import CLSCTX_ALL
from fuzzywuzzy import fuzz
from pvrecorder import PvRecorder
from pycaw.pycaw import (
AudioUtilities,
IAudioEndpointVolume
)
from rich import print
import config
import tts
# some consts
CDIR = os.getcwd()
VA_CMD_LIST = yaml.safe_load(
open('commands.yaml', 'rt', encoding='utf8'),
)
# ChatGPT vars
message_log = [
{"role": "system", "content": "Ты голосовой ассистент из железного человека."}
]
# Set a flag to keep track of whether this is the first request in the conversation
first_request = True
# init openai
openai.api_key = config.OPENAI_TOKEN
# PORCUPINE
porcupine = pvporcupine.create(
access_key=config.PICOVOICE_TOKEN,
keywords=['jarvis'],
sensitivities=[1]
)
# print(pvporcupine.KEYWORDS)
# VOSK
model = vosk.Model("model_small")
samplerate = 16000
device = config.MICROPHONE_INDEX
kaldi_rec = vosk.KaldiRecognizer(model, samplerate)
q = queue.Queue()
def gpt_answer():
global message_log
model_engine = "gpt-3.5-turbo"
max_tokens = 256 # default 1024
response = openai.ChatCompletion.create(
model=model_engine,
messages=message_log,
max_tokens=max_tokens,
temperature=0.7,
top_p=1,
stop=None
)
# Find the first response from the chatbot that has text in it (some responses may not have text)
for choice in response.choices:
if "text" in choice:
return choice.text
# If no response with text is found, return the first response's content (which may be empty)
return response.choices[0].message.content
# play(f'{CDIR}\\sound\\ok{random.choice([1, 2, 3, 4])}.wav')
def play(phrase, wait_done=True):
global recorder
filename = f"{CDIR}\\sound\\"
if phrase == "greet": # for py 3.8
filename += f"greet{random.choice([1, 2, 3])}.wav"
elif phrase == "ok":
filename += f"ok{random.choice([1, 2, 3])}.wav"
elif phrase == "not_found":
filename += "not_found.wav"
elif phrase == "thanks":
filename += "thanks.wav"
elif phrase == "run":
filename += "run.wav"
elif phrase == "stupid":
filename += "stupid.wav"
elif phrase == "ready":
filename += "ready.wav"
elif phrase == "off":
filename += "off.wav"
if wait_done:
recorder.stop()
wave_obj = sa.WaveObject.from_wave_file(filename)
play_obj = wave_obj.play()
if wait_done:
play_obj.wait_done()
# time.sleep((len(wave_obj.audio_data) / wave_obj.sample_rate) + 0.5)
# print("END")
# time.sleep(0.5)
recorder.start()
def q_callback(indata, frames, time, status):
if status:
print(status, file=sys.stderr)
q.put(bytes(indata))
def va_respond(voice: str):
global recorder, message_log, first_request
print(f"Распознано: {voice}")
cmd = recognize_cmd(filter_cmd(voice))
print(cmd)
if len(cmd['cmd'].strip()) <= 0:
return False
elif cmd['percent'] < 70 or cmd['cmd'] not in VA_CMD_LIST.keys():
# play("not_found")
# tts.va_speak("Что?")
if fuzz.ratio(voice.join(voice.split()[:1]).strip(), "скажи") > 75:
if first_request:
message_log.append({"role": "user", "content": voice})
first_request = False
response = gpt_answer()
message_log.append({"role": "assistant", "content": response})
recorder.stop()
tts.va_speak(response)
time.sleep(0.5)
recorder.start()
return False
else:
play("not_found")
time.sleep(1)
return False
else:
execute_cmd(cmd['cmd'], voice)
return True
def filter_cmd(raw_voice: str):
cmd = raw_voice
for x in config.VA_ALIAS:
cmd = cmd.replace(x, "").strip()
for x in config.VA_TBR:
cmd = cmd.replace(x, "").strip()
return cmd
def recognize_cmd(cmd: str):
rc = {'cmd': '', 'percent': 0}
for c, v in VA_CMD_LIST.items():
for x in v:
vrt = fuzz.ratio(cmd, x)
if vrt > rc['percent']:
rc['cmd'] = c
rc['percent'] = vrt
return rc
def execute_cmd(cmd: str, voice: str):
if cmd == 'open_browser':
subprocess.Popen([f'{CDIR}\\custom-commands\\Run browser.exe'])
play("ok")
elif cmd == 'open_youtube':
subprocess.Popen([f'{CDIR}\\custom-commands\\Run youtube.exe'])
play("ok")
elif cmd == 'open_google':
subprocess.Popen([f'{CDIR}\\custom-commands\\Run google.exe'])
play("ok")
elif cmd == 'music':
subprocess.Popen([f'{CDIR}\\custom-commands\\Run music.exe'])
play("ok")
elif cmd == 'music_off':
subprocess.Popen([f'{CDIR}\\custom-commands\\Stop music.exe'])
time.sleep(0.2)
play("ok")
elif cmd == 'music_save':
subprocess.Popen([f'{CDIR}\\custom-commands\\Save music.exe'])
time.sleep(0.2)
play("ok")
elif cmd == 'music_next':
subprocess.Popen([f'{CDIR}\\custom-commands\\Next music.exe'])
time.sleep(0.2)
play("ok")
elif cmd == 'music_prev':
subprocess.Popen([f'{CDIR}\\custom-commands\\Prev music.exe'])
time.sleep(0.2)
play("ok")
elif cmd == 'sound_off':
play("ok", True)
devices = AudioUtilities.GetSpeakers()
interface = devices.Activate(IAudioEndpointVolume._iid_, CLSCTX_ALL, None)
volume = cast(interface, POINTER(IAudioEndpointVolume))
volume.SetMute(1, None)
elif cmd == 'sound_on':
devices = AudioUtilities.GetSpeakers()
interface = devices.Activate(IAudioEndpointVolume._iid_, CLSCTX_ALL, None)
volume = cast(interface, POINTER(IAudioEndpointVolume))
volume.SetMute(0, None)
play("ok")
elif cmd == 'thanks':
play("thanks")
elif cmd == 'stupid':
play("stupid")
elif cmd == 'gaming_mode_on':
play("ok")
subprocess.check_call([f'{CDIR}\\custom-commands\\Switch to gaming mode.exe'])
play("ready")
elif cmd == 'gaming_mode_off':
play("ok")
subprocess.check_call([f'{CDIR}\\custom-commands\\Switch back to workspace.exe'])
play("ready")
elif cmd == 'switch_to_headphones':
play("ok")
subprocess.check_call([f'{CDIR}\\custom-commands\\Switch to headphones.exe'])
time.sleep(0.5)
play("ready")
elif cmd == 'switch_to_dynamics':
play("ok")
subprocess.check_call([f'{CDIR}\\custom-commands\\Switch to dynamics.exe'])
time.sleep(0.5)
play("ready")
elif cmd == 'off':
play("off", True)
porcupine.delete()
exit(0)
# `-1` is the default input audio device.
recorder = PvRecorder(device_index=config.MICROPHONE_INDEX, frame_length=porcupine.frame_length)
recorder.start()
print('Using device: %s' % recorder.selected_device)
print(f"Jarvis (v3.0) начал свою работу ...")
play("run")
time.sleep(0.5)
ltc = time.time() - 1000
while True:
try:
pcm = recorder.read()
keyword_index = porcupine.process(pcm)
if keyword_index >= 0:
recorder.stop()
play("greet", True)
print("Yes, sir.")
recorder.start() # prevent self recording
ltc = time.time()
while time.time() - ltc <= 10:
pcm = recorder.read()
sp = struct.pack("h" * len(pcm), *pcm)
if kaldi_rec.AcceptWaveform(sp):
if va_respond(json.loads(kaldi_rec.Result())["text"]):
ltc = time.time()
break
except Exception as err:
print(f"Unexpected {err=}, {type(err)=}")
raise