Files
jarvis/main.py
2023-04-16 16:59:18 +05:00

291 lines
7.2 KiB
Python

import datetime
import json
import os
import queue
import random
import struct
import subprocess
import sys
import time
from ctypes import POINTER, cast
import openai
import pvporcupine
import simpleaudio as sa
import vosk
import yaml
from comtypes import CLSCTX_ALL
from fuzzywuzzy import fuzz
from pvrecorder import PvRecorder
from pycaw.pycaw import (
AudioUtilities,
IAudioEndpointVolume
)
from rich import print
import config
import tts
# some consts
CDIR = os.getcwd()
VA_CMD_LIST = yaml.safe_load(
open('commands.yaml', 'rt', encoding='utf8'),
)
# init openai
openai.api_key = config.OPENAI_TOKEN
# PORCUPINE
porcupine = pvporcupine.create(
access_key=config.PICOVOICE_TOKEN,
keywords=['jarvis'],
sensitivities=[1]
)
# print(pvporcupine.KEYWORDS)
# VOSK
model = vosk.Model("model_small")
samplerate = 16000
device = config.MICROPHONE_INDEX
kaldi_rec = vosk.KaldiRecognizer(model, samplerate)
q = queue.Queue()
def gpt_answer(message):
model_engine = "gpt-3.5-turbo"
max_tokens = 256 # default 1024
completion = openai.Completion.create(
engine=model_engine,
prompt=message,
max_tokens=max_tokens,
temperature=0.5,
top_p=1,
frequency_penalty=0,
presence_penalty=0
)
return completion.choices[0].text
# play(f'{CDIR}\\sound\\ok{random.choice([1, 2, 3, 4])}.wav')
def play(phrase, wait_done=True):
global recorder
filename = f"{CDIR}\\sound\\"
if phrase == "greet": # for py 3.8
filename += f"greet{random.choice([1, 2, 3])}.wav"
elif phrase == "ok":
filename += f"ok{random.choice([1, 2, 3])}.wav"
elif phrase == "not_found":
filename += "not_found.wav"
elif phrase == "thanks":
filename += "thanks.wav"
elif phrase == "run":
filename += "run.wav"
elif phrase == "stupid":
filename += "stupid.wav"
elif phrase == "ready":
filename += "ready.wav"
elif phrase == "off":
filename += "off.wav"
if wait_done:
recorder.stop()
wave_obj = sa.WaveObject.from_wave_file(filename)
play_obj = wave_obj.play()
if wait_done:
play_obj.wait_done()
# time.sleep((len(wave_obj.audio_data) / wave_obj.sample_rate) + 0.5)
# print("END")
# time.sleep(0.5)
recorder.start()
def q_callback(indata, frames, time, status):
if status:
print(status, file=sys.stderr)
q.put(bytes(indata))
def va_respond(voice: str):
global recorder
print(f"Распознано: {voice}")
cmd = recognize_cmd(filter_cmd(voice))
print(cmd)
if len(cmd['cmd'].strip()) <= 0:
return False
elif cmd['percent'] < 70 or cmd['cmd'] not in VA_CMD_LIST.keys():
# play("not_found")
# tts.va_speak("Что?")
if fuzz.ratio(voice.join(voice.split()[:1]).strip(), "скажи") > 75:
gpt_result = gpt_answer(voice)
recorder.stop()
tts.va_speak(gpt_result)
time.sleep(1)
recorder.start()
return False
else:
play("not_found")
time.sleep(1)
return False
else:
execute_cmd(cmd['cmd'], voice)
return True
def filter_cmd(raw_voice: str):
cmd = raw_voice
for x in config.VA_ALIAS:
cmd = cmd.replace(x, "").strip()
for x in config.VA_TBR:
cmd = cmd.replace(x, "").strip()
return cmd
def recognize_cmd(cmd: str):
rc = {'cmd': '', 'percent': 0}
for c, v in VA_CMD_LIST.items():
for x in v:
vrt = fuzz.ratio(cmd, x)
if vrt > rc['percent']:
rc['cmd'] = c
rc['percent'] = vrt
return rc
def execute_cmd(cmd: str, voice: str):
if cmd == 'open_browser':
subprocess.Popen([f'{CDIR}\\custom-commands\\Run browser.exe'])
play("ok")
elif cmd == 'open_youtube':
subprocess.Popen([f'{CDIR}\\custom-commands\\Run youtube.exe'])
play("ok")
elif cmd == 'open_google':
subprocess.Popen([f'{CDIR}\\custom-commands\\Run google.exe'])
play("ok")
elif cmd == 'music':
subprocess.Popen([f'{CDIR}\\custom-commands\\Run music.exe'])
play("ok")
elif cmd == 'music_off':
subprocess.Popen([f'{CDIR}\\custom-commands\\Stop music.exe'])
time.sleep(0.2)
play("ok")
elif cmd == 'music_save':
subprocess.Popen([f'{CDIR}\\custom-commands\\Save music.exe'])
time.sleep(0.2)
play("ok")
elif cmd == 'music_next':
subprocess.Popen([f'{CDIR}\\custom-commands\\Next music.exe'])
time.sleep(0.2)
play("ok")
elif cmd == 'music_prev':
subprocess.Popen([f'{CDIR}\\custom-commands\\Prev music.exe'])
time.sleep(0.2)
play("ok")
elif cmd == 'sound_off':
play("ok", True)
devices = AudioUtilities.GetSpeakers()
interface = devices.Activate(IAudioEndpointVolume._iid_, CLSCTX_ALL, None)
volume = cast(interface, POINTER(IAudioEndpointVolume))
volume.SetMute(1, None)
elif cmd == 'sound_on':
devices = AudioUtilities.GetSpeakers()
interface = devices.Activate(IAudioEndpointVolume._iid_, CLSCTX_ALL, None)
volume = cast(interface, POINTER(IAudioEndpointVolume))
volume.SetMute(0, None)
play("ok")
elif cmd == 'thanks':
play("thanks")
elif cmd == 'stupid':
play("stupid")
elif cmd == 'gaming_mode_on':
play("ok")
subprocess.check_call([f'{CDIR}\\custom-commands\\Switch to gaming mode.exe'])
play("ready")
elif cmd == 'gaming_mode_off':
play("ok")
subprocess.check_call([f'{CDIR}\\custom-commands\\Switch back to workspace.exe'])
play("ready")
elif cmd == 'switch_to_headphones':
play("ok")
subprocess.check_call([f'{CDIR}\\custom-commands\\Switch to headphones.exe'])
time.sleep(0.5)
play("ready")
elif cmd == 'switch_to_dynamics':
play("ok")
subprocess.check_call([f'{CDIR}\\custom-commands\\Switch to dynamics.exe'])
time.sleep(0.5)
play("ready")
elif cmd == 'off':
play("off", True)
porcupine.delete()
exit(0)
# `-1` is the default input audio device.
recorder = PvRecorder(device_index=config.MICROPHONE_INDEX, frame_length=porcupine.frame_length)
recorder.start()
print('Using device: %s' % recorder.selected_device)
print(f"Jarvis (v3.0) начал свою работу ...")
play("run")
time.sleep(0.5)
ltc = time.time() - 1000
while True:
try:
pcm = recorder.read()
keyword_index = porcupine.process(pcm)
if keyword_index >= 0:
recorder.stop()
play("greet", True)
print("Yes, sir.")
recorder.start() # prevent self recording
ltc = time.time()
while time.time() - ltc <= 10:
pcm = recorder.read()
sp = struct.pack("h" * len(pcm), *pcm)
if kaldi_rec.AcceptWaveform(sp):
if va_respond(json.loads(kaldi_rec.Result())["text"]):
ltc = time.time()
break
except Exception as err:
print(f"Unexpected {err=}, {type(err)=}")
raise