網頁

2020年6月24日 星期三

Wii 平衡板 修改(三)Jetson Nano 上的 python

CSR 藍牙接收發射器 4.0

哈啦圈 計數


執行須使用 sudo
$ sudo ~/envs/esp32/bin/python scan.py

scan.py
# execute this by
# (microbit) nano@nano-desktop:~/Data/Esp32/bluepy$ sudo ~/envs/microbit/bin/python scan.py


from bluepy.btle import Scanner, DefaultDelegate

class ScanDelegate(DefaultDelegate):
    """Scan delegate that reports discovery events on stdout."""

    def __init__(self):
        DefaultDelegate.__init__(self)

    def handleDiscovery(self, dev, isNewDev, isNewData):
        """Print the address of a device that is new or has sent new data.

        Args:
            dev: scan entry supplied by the scanner; only ``dev.addr`` is read.
            isNewDev: truthy for a device reported for the first time.
            isNewData: truthy when an already-known device reports new data.
        """
        # The address has to be formatted into the message; passing it as a
        # second print() argument left the "{}" placeholder in the output.
        if isNewDev:
            print("Discovered device {}".format(dev.addr))
        elif isNewData:
            print("Received new data from {}".format(dev.addr))

# Scan for 10 seconds; ScanDelegate prints devices while the scan is running.
scanner = Scanner().withDelegate(ScanDelegate())
devices = scanner.scan(10.0)

# Summarise every device found, followed by each of its scan-data fields.
# The values are formatted into the text (previously the "{}" placeholders
# were printed literally, followed by the raw values).
for dev in devices:
    print("Device {} ({}), RSSI={} dB".format(dev.addr, dev.addrType, dev.rssi))
    for (adtype, desc, value) in dev.getScanData():
        print("  {} = {}".format(desc, value))


notify.py

from bluepy.btle import Peripheral, ADDR_TYPE_RANDOM, ADDR_TYPE_PUBLIC, AssignedNumbers

import time

class Wii(Peripheral):
    """Peripheral that connects to the given address as a public address."""

    def __init__(self, addr):
        # ADDR_TYPE_PUBLIC is the address type in use; ADDR_TYPE_RANDOM was
        # the alternative tried earlier.
        super().__init__(addr, addrType=ADDR_TYPE_PUBLIC)

if __name__=="__main__":
    # Stand-alone check: connect, enable notifications, print every packet
    # and toggle the writable characteristic on every 10th loop iteration.
    cccid = AssignedNumbers.client_characteristic_configuration

    wii = None
    try:
        wii = Wii('C4:4F:33:54:B5:3B')

        service, = [s for s in wii.getServices() if s.uuid=='4fafc201-1fb5-459e-8fcc-c5c9c331914b']
        # char_r is not used below, but the single-element unpacking still
        # fails fast when the notifying characteristic is missing.
        char_r, = service.getCharacteristics(forUUID='beb5483e-36e1-4688-b7f5-ea07361b26a8')
        char_w, = service.getCharacteristics(forUUID='ca89b981-2701-40ea-91ef-1b67c137f9e0')

        # Notifications are switched on by writing b'\1\0' to the service's
        # client-characteristic-configuration descriptor.  (Writing the same
        # value to char_r itself was tried and noted as not working.)
        desc = wii.getDescriptors(service.hndStart,
                                  service.hndEnd)
        d, = [d for d in desc if d.uuid==cccid]
        wii.writeCharacteristic(d.handle, b'\1\0')

        def print_hr(cHandle, data):
            """Decode one notification packet and print its fields."""
            # Layout read here: bytes 0-1 signed battery reading, byte 3
            # unsigned packet counter, then four signed 32-bit little-endian
            # sensor values.
            # NOTE(review): HulaHoop.py's parseData labels these four slots
            # DR, TR, TL, DL instead - confirm which mapping is correct.
            battery = int.from_bytes(data[0:2], byteorder='little', signed=True)
            cnt = int.from_bytes(data[3:4], byteorder='little', signed=False)
            dataDL = int.from_bytes(data[4:8], byteorder='little', signed=True)
            dataTL = int.from_bytes(data[8:12], byteorder='little', signed=True)
            dataTR = int.from_bytes(data[12:16], byteorder='little', signed=True)
            dataDR = int.from_bytes(data[16:20], byteorder='little', signed=True)
            print('handle: {}\t{}\t{}'.format(cHandle, cnt, battery))
            print('{}\t{}\t{}\t{}'.format(dataDL, dataTL, dataTR, dataDR))

        wii.delegate.handleNotification = print_hr

        # Alternate between writing b'1' and b'0', starting with b'1'.
        led_on = True
        for x in range(1000):
            if x % 10 == 0:
                char_w.write(b'1' if led_on else b'0')
                led_on = not led_on
            wii.waitForNotifications(3.)

    finally:
        if wii is not None:
            wii.disconnect()


HulaHoop.py
import tkinter as tk
from bluepy.btle import Peripheral, AssignedNumbers, BTLEDisconnectError, BTLEGattError
from bluepy.btle import ADDR_TYPE_RANDOM, ADDR_TYPE_PUBLIC
import threading
import time
import sys

class Wii(Peripheral):
    """Connected board with notifications routed to a caller-supplied callback.

    Attributes set by ``__init__``:
        char_led: the writable characteristic (written with b'1' / b'0').
        char_data_h: handle of the data characteristic.
        char_button_h: handle of the button characteristic.
    """

    def __init__(self, addr, fun_notify):
        super().__init__(addr, addrType=ADDR_TYPE_PUBLIC)
        ccc_uuid = AssignedNumbers.client_characteristic_configuration

        # Exactly one service with this UUID is expected; the single-element
        # unpacking raises otherwise.
        board_service, = [svc for svc in self.getServices()
                          if svc.uuid=='4fafc201-1fb5-459e-8fcc-c5c9c331914b']

        def only_characteristic(uuid):
            # Same "exactly one" rule for each characteristic lookup.
            found, = board_service.getCharacteristics(forUUID=uuid)
            return found

        data_char = only_characteristic('beb5483e-36e1-4688-b7f5-ea07361b26a8')
        self.char_led = only_characteristic('ca89b981-2701-40ea-91ef-1b67c137f9e0')
        button_char = only_characteristic('4e923fa6-ba0c-408a-b1d3-167ec9f1d80d')
        self.char_data_h = data_char.getHandle()
        self.char_button_h = button_char.getHandle()

        # Write b'\1\0' to every client-characteristic-configuration
        # descriptor of the service to switch its notifications on.
        for descriptor in self.getDescriptors(board_service.hndStart,
                                              board_service.hndEnd):
            if descriptor.uuid==ccc_uuid:
                self.writeCharacteristic(descriptor.handle, b'\1\0')

        # Route incoming notifications to the caller's callback.
        self.delegate.handleNotification = fun_notify

class WiiThread(threading.Thread):
    """Daemon thread that keeps the board connected and holds the scale state.

    The thread starts itself from ``__init__``.  ``run`` connects, waits for
    notifications and reconnects after a BTLE error until ``do_loop`` is
    cleared.  The other methods keep the tare / body-weight state that the
    notification callback drives through ``setData`` and ``setBodyWeight``.

    ``_tareStep`` values: 0 = not taring, 1 = (re)start requested,
    2 = accumulating samples.
    """

    def __init__(self, mac, fun_notify):
        """Reset all state and start the thread.

        Args:
            mac: device address passed on to ``Wii``.
            fun_notify: notification callback passed on to ``Wii``.
        """
        threading.Thread.__init__(self)
        self.mac = mac
        self.fun_notify = fun_notify
        self.wii = None            # set by run() once a connection exists
        self.do_loop = True        # cleared by the owner to stop run()
        self.isConnected = False
        self._tareStep = 0
        self._tareCnt = 0
        self.tareOk = False
        # Per-sensor zero offsets produced by taring.
        self.offsetTL = 0.
        self.offsetTR = 0.
        self.offsetDL = 0.
        self.offsetDR = 0.
        self._nextCnt = -1         # expected packet counter; -1 = none seen yet
        self._oldTL = 0.
        self._oldTR = 0.
        self._oldDL = 0.
        self._oldDR = 0.
        # Latest offset-corrected sensor values.
        self.dataTL = 0.
        self.dataTR = 0.
        self.dataDL = 0.
        self.dataDR = 0.
        self._dataList = []        # recent total readings, see setBodyWeight()
        self.bodyWeightOk = False
        self._bodyWeight = 0.
        # Calibration point: a raw total of STD_DATA corresponds to STD_WEIGHT.
        STD_WEIGHT = 80.0
        STD_DATA = 1103051
        self._scale = STD_WEIGHT / STD_DATA
        self.daemon = True
        self.start()

    def run(self):
        """Connect and wait for notifications, reconnecting after BTLE errors."""
        print('WiiThread run')
        while self.do_loop:
            try:
                print('Wii connecting')
                self.wii = Wii(self.mac, self.fun_notify)
                print('Wii connected')
                while self.do_loop:
                    self.wii.waitForNotifications(3.)
            except BTLEDisconnectError:
                print('BTLEDisconnectError')
            except BTLEGattError:
                print('BTLEGattError')
            finally:
                if self.wii:
                    self.wii.disconnect()
                print('Wii disconnect')

    def setTareSta(self):
        """Request a new tare; the following setData() calls carry it out."""
        self._tareStep = 1

    def _setTare(self, dataTL, dataTR, dataDL, dataDR):
        """Advance the tare procedure with one raw sample.

        Returns:
            ``(True, False)`` while taring is in progress and ``(True, True)``
            on the call that finishes it.
        """
        STABLE_RANGE = 10000
        # A sensor that moved more than STABLE_RANGE away from the reference
        # sample restarts the procedure with this sample as the new reference.
        if abs(self._oldTL - dataTL) > STABLE_RANGE or \
                abs(self._oldTR - dataTR) > STABLE_RANGE or \
                abs(self._oldDL - dataDL) > STABLE_RANGE or \
                abs(self._oldDR - dataDR) > STABLE_RANGE:
            self._tareStep = 1
            self._oldTL, self._oldTR, self._oldDL, self._oldDR = dataTL, dataTR, dataDL, dataDR
        if self._tareStep == 1:
            # (Re)start: clear the accumulators.
            self.offsetTL = 0.
            self.offsetTR = 0.
            self.offsetDL = 0.
            self.offsetDR = 0.
            self._tareStep = 2
            self._tareCnt = 0
        elif self._tareCnt == 100:
            # 100 samples collected: their mean becomes the zero offset.
            self.offsetTL /= self._tareCnt
            self.offsetTR /= self._tareCnt
            self.offsetDL /= self._tareCnt
            self.offsetDR /= self._tareCnt
            self._tareStep = 0
            self.tareOk = True
            self._tareCnt = 0
            return True, True
        elif self._tareStep == 2:
            self.offsetTL += dataTL
            self.offsetTR += dataTR
            self.offsetDL += dataDL
            self.offsetDR += dataDR
            self._tareCnt += 1
        return True, False

    def setBodyWeight(self):
        """Feed the current total into the body-weight detector.

        Returns:
            True once more than 200 consecutive totals stayed within 12000
            of their running mean and the weight exceeds 20.0; the raw total
            is then stored as the body weight.  False otherwise.
        """
        dataTot = self.dataDL + self.dataTL + self.dataTR + self.dataDR
        if len(self._dataList) == 0:
            self._dataList.append(dataTot)
            return False
        avgData = sum(self._dataList)/len(self._dataList)
        if abs(dataTot - avgData) > 12000:
            # Reading moved too far: start collecting again.
            self._dataList.clear()
            return False
        if len(self._dataList) > 200:
            del self._dataList[0]
            if self.getWeight() > 20.0:
                self.bodyWeightOk = True  # body weight acquired
                self._bodyWeight = self.getRawWeight()
                return True
        self._dataList.append(dataTot)
        return False

    def setData(self, cnt, dataTL, dataTR, dataDL, dataDR):
        """Process one sample.

        While no tare is running, checks the packet counter, stores the
        offset-corrected values and returns ``(False, False)``.  During a
        tare the raw sample is handed to ``_setTare`` and its result is
        returned.
        """
        if self._tareStep == 0:
            if self._nextCnt == -1:
                self._nextCnt = cnt
            elif self._nextCnt != cnt:
                # A counter jump is reported, then tracking resynchronises.
                print("WiiThread cnt diff {} {}".format(self._nextCnt, cnt))
                self._nextCnt = cnt
            # The expected counter wraps from 255 back to 0.
            if self._nextCnt == 255:
                self._nextCnt = 0
            else:
                self._nextCnt += 1
            self.dataTL = dataTL - self.offsetTL
            self.dataTR = dataTR - self.offsetTR
            self.dataDL = dataDL - self.offsetDL
            self.dataDR = dataDR - self.offsetDR
            self._oldTL = self.dataTL
            self._oldTR = self.dataTR
            self._oldDL = self.dataDL
            self._oldDR = self.dataDR
            return False, False
        return self._setTare(dataTL, dataTR, dataDL, dataDR)

    def getRawWeight(self):
        """Return the sum of the four offset-corrected sensor values."""
        return self.dataTL + self.dataTR + self.dataDL + self.dataDR

    def getWeight(self):
        """Return the raw weight multiplied by the calibration scale."""
        return self.getRawWeight() * self._scale

    def getCoordinate(self):
        """Return ``(x, y)`` load imbalance relative to the stored body weight.

        x is (DR + TR) - (DL + TL) and y is (DL + DR) - (TL + TR), each
        divided by the body weight.  ``(0.0, 0.0)`` is returned while no body
        weight has been stored, instead of raising ZeroDivisionError.
        """
        if not self._bodyWeight:
            return 0., 0.
        dataX = ((self.dataDR + self.dataTR) - (self.dataDL + self.dataTL)) / self._bodyWeight
        dataY = ((self.dataDL + self.dataDR) - (self.dataTL + self.dataTR)) / self._bodyWeight
        return dataX, dataY

# Main application window; the colour helpers and widgets below all use it.
window = tk.Tk()
# Title spelling corrected (was 'Wii BlanceBoard').
window.title('Wii BalanceBoard')

def rgb2hex(r, g, b):
    """Return the colour as '#RRGGBB' with upper-case, zero-padded hex digits."""
    return '#' + ''.join('{:02X}'.format(channel) for channel in (r, g, b))

def hex2rgb(color):
    """Split a '#RRGGBB' string into an ``(r, g, b)`` tuple of integers."""
    # Character 0 is the '#'; each channel is the two hex digits that follow.
    return tuple(int(color[start:start + 2], 16) for start in (1, 3, 5))

def color2rgb(color):
    """Resolve a Tk colour through ``window`` and return it as ``(r, g, b)``.

    Each component reported by ``winfo_rgb`` is shifted right by 8 bits.
    """
    red, green, blue = (component >> 8 for component in window.winfo_rgb(color))
    return red, green, blue

def color2hex(color):
    """Resolve a Tk colour and return it in the '#RRGGBB' form of rgb2hex()."""
    return rgb2hex(*color2rgb(color))

def colorAdd(hcolor, r=None, g=None, b=None):
    """Return ``hcolor`` with the given per-channel deltas applied.

    Args:
        hcolor: colour as a '#RRGGBB' string.
        r, g, b: amount to add to the channel, or None to leave it alone.

    Returns:
        The adjusted colour as '#RRGGBB'.  A channel whose new value would
        fall outside 0..255 keeps its old value.
    """
    def shifted(channel, delta):
        # Apply the delta only when one was given and the result stays valid.
        if delta is None:
            return channel
        moved = channel + delta
        return moved if 0 <= moved < 256 else channel

    r_, g_, b_ = hex2rgb(hcolor)
    return rgb2hex(shifted(r_, r), shifted(g_, g), shifted(b_, b))

def fun_led():
    """Toggle the board LED and mirror the new state in the LED button colour.

    A gray button is taken to mean "off": b'1' is written and the button
    turns red.  Otherwise b'0' is written and the button turns gray again.
    """
    wii = wiiThread.wii
    if wii is None:
        # WiiThread has not created a connection yet, so there is nothing to
        # write to; leave the button unchanged instead of raising
        # AttributeError inside the Tk callback.
        return
    if bLed['background'] == color2hex('gray'):
        color = color2hex('red')
        wii.char_led.write(b'1')
    else:
        color = color2hex('gray')
        wii.char_led.write(b'0')
    bLed.config(bg=color, activebackground=color)

def fun_debug():
    """Reset the rotation counter and its on-screen label."""
    # Without the global declaration the assignment only created a local, so
    # the real counter kept its value and reappeared on the next update.
    global gWhirlCnt
    gWhirlCnt = 0
    gWhirlCntStr.set('_')

# Font used by every label and button.
FONT_FAMILY = 'Arial'
FONT_SIZE = 20

# Top row: status message followed by the LED and Debug buttons.
top_frame = tk.Frame(window)
top_frame.pack(side=tk.TOP, fill=tk.X, expand=True)
gRemarkStr = tk.StringVar(value='連線中...')
lRemark = tk.Label(
    top_frame,
    textvariable=gRemarkStr,
    anchor='w',
    font=(FONT_FAMILY, FONT_SIZE),
)
lRemark.pack(side=tk.LEFT, fill=tk.X, expand=True)
bLed = tk.Button(
    top_frame,
    text='LED',
    bg=color2hex('gray'),
    activebackground=color2hex('gray'),
    font=(FONT_FAMILY, FONT_SIZE),
    command=fun_led,
)
bLed.pack(side=tk.LEFT)
bDebug = tk.Button(
    top_frame,
    text='Debug',
    bg=color2hex('gray'),
    activebackground=color2hex('gray'),
    font=(FONT_FAMILY, FONT_SIZE),
    command=fun_debug,
)
bDebug.pack(side=tk.LEFT)

# Drawing area with a horizontal and a vertical line through its centre.
# (A 1286x1024 canvas was used previously.)
WIN_HEIGHT = 600
WIN_WIDTH = 800
gCanvas = tk.Canvas(
    window,
    height=WIN_HEIGHT,
    width=WIN_WIDTH,
    highlightbackground='black',
)
gCanvas.pack()
lineX = gCanvas.create_line(0, WIN_HEIGHT/2, WIN_WIDTH, WIN_HEIGHT/2)
lineY = gCanvas.create_line(WIN_WIDTH/2, 0, WIN_WIDTH/2, WIN_HEIGHT)

# Bottom row: rotation count, weight and battery voltage read-outs.
bottom_frame = tk.Frame(window)
bottom_frame.pack(side=tk.TOP, fill=tk.X, expand=True)
gWhirlCntStr = tk.StringVar(value='_')
lWhirlCnt = tk.Label(
    bottom_frame,
    textvariable=gWhirlCntStr,
    width=8,
    font=(FONT_FAMILY, FONT_SIZE),
)
lWhirlCnt.pack(side=tk.LEFT, fill=tk.X, expand=True)
gWeightStr = tk.StringVar(value='?Kg')
lWeight = tk.Label(
    bottom_frame,
    textvariable=gWeightStr,
    width=8,
    font=(FONT_FAMILY, FONT_SIZE),
)
lWeight.pack(side=tk.LEFT)
gVoltageStr = tk.StringVar(value='?V')
lVoltage = tk.Label(
    bottom_frame,
    textvariable=gVoltageStr,
    width=5,
    font=(FONT_FAMILY, FONT_SIZE),
)
lVoltage.pack(side=tk.LEFT)

# Trail of recent position markers drawn on the canvas.
POINT_CNT = 10
POINT_COLOR = '#FF10FF'
POINT_RADIUS = int(WIN_WIDTH/256)
gPoints = []

def parseData(data):
    """Decode one notification payload.

    Layout as read here (all little-endian): bytes 0-1 signed battery
    reading, byte 2 unused, byte 3 unsigned packet counter, then four signed
    32-bit sensor values stored in the order DR, TR, TL, DL.

    Returns:
        ``(battery, cnt, dataTL, dataTR, dataDL, dataDR)``.
    """
    def field(start, stop, signed=True):
        return int.from_bytes(data[start:stop], byteorder='little', signed=signed)

    battery = field(0, 2)
    cnt = field(3, 4, signed=False)
    # Four consecutive 4-byte slots starting at offset 4.
    dataDR, dataTR, dataTL, dataDL = [field(offset, offset + 4)
                                      for offset in range(4, 20, 4)]
    return battery, cnt, dataTL, dataTR, dataDL, dataDR
    
def drawPoints(dataX, dataY):
    """Add a marker for the position and re-colour the whole marker trail.

    ``dataX`` / ``dataY`` are scaled so that 0 maps to the canvas centre and
    +/-1 to its edges.
    """
    # Remove the oldest marker once the trail holds more than POINT_CNT items.
    if len(gPoints) > POINT_CNT:
        gCanvas.delete(gPoints[0])
        gPoints.pop(0)

    half_width = WIN_WIDTH/2
    half_height = WIN_HEIGHT/2
    cx = dataX * half_width + half_width
    cy = dataY * half_height + half_height
    marker = gCanvas.create_oval(
        cx-POINT_RADIUS, cy-POINT_RADIUS, cx+POINT_RADIUS, cy+POINT_RADIUS,
        fill=POINT_COLOR, outline=POINT_COLOR)
    gPoints.append(marker)

    # Walk the trail from oldest to newest; each marker gets the previous
    # marker's colour with red and blue shifted by one more step.
    step = int(-255/POINT_CNT)
    shade = POINT_COLOR
    for item in gPoints:
        shade = colorAdd(shade, r=step, b=step)
        gCanvas.itemconfig(item, fill=shade, outline=shade)
        shade = gCanvas.itemcget(item, 'fill')

# Rotation-tracking state updated by fun_notify().
gWhirlCnt = gWhirlDir = 0        # running count and last direction (+1 / -1)
gQuadrant = gQuadrantOld = -1    # -1 = no quadrant recorded yet

def fun_notify(cHandle, data):
    """Handle one notification: tare, weigh, then track rotation.

    Args:
        cHandle: handle of the characteristic that sent the notification.
        data: payload; decoded with parseData() unless it comes from the
            button characteristic.

    Stages, each returning early until it is complete:
      1. A button notification restarts the body-weight measurement.
      2. The first data notification starts taring.
      3. Once tared, samples go to setBodyWeight() until a weight is stored.
      4. After that each sample is drawn and its quadrant is used to update
         the rotation counter.

    NOTE(review): this function is installed as the notification handler and
    is therefore presumably invoked from WiiThread.run() rather than the Tk
    main loop, while it updates Tk variables and the canvas - confirm that
    this is safe on the target platform.
    """
    global gWhirlCnt, gWhirlDir, gQuadrant, gQuadrantOld

    # Ignore everything once shutdown has been requested.
    if wiiThread.do_loop == False:
        return
    # Button characteristic: discard the stored weight and ask the user to
    # stand still again.
    if cHandle == wiiThread.wii.char_button_h:
        wiiThread.bodyWeightOk = False
        gWeightStr.set('?Kg')
        gRemarkStr.set('請站上去,保持不動...')
        return
    # First data packet since start-up: begin taring.
    if wiiThread.isConnected == False:
        wiiThread.isConnected = True;
        gRemarkStr.set('校正中,請保持淨空')
        wiiThread.setTareSta()

    battery, cnt, dataTL, dataTR, dataDL, dataDR = parseData(data)
    # presumably a 12-bit ADC reading against a 3.3 V reference - TODO confirm
    voltage = 3.3 / 4095 * battery
    gVoltageStr.set('{:.1f}V'.format(voltage))
    #print('handle: {}\t{}\t{:.1f}'.format(cHandle, cnt, voltage))
    in_tare, tare_ok = wiiThread.setData(cnt, dataTL, dataTR, dataDL, dataDR)
    if in_tare:
        # Taring consumes the sample; when it has just finished, prompt for
        # the body-weight measurement.
        if tare_ok:
            wiiThread.bodyWeightOk = False
            gRemarkStr.set('請站上去,保持不動...')
        return
    if wiiThread.tareOk == False:
        return
    if wiiThread.bodyWeightOk == False:
        if wiiThread.setBodyWeight():
            gWeightStr.set('{:.0f}Kg'.format(wiiThread.getWeight()))
            gRemarkStr.set('完成')
        return
    dataX, dataY = wiiThread.getCoordinate()
    #print('{:.2f}\t{:.2f}'.format(dataX, dataY))
    drawPoints(dataX, dataY)
    # Classify the position into quadrant 0..3; anything with |x| or |y|
    # within RADIUS of an axis counts as "no quadrant" (-1).
    RADIUS = 0.1
    if dataX > RADIUS and dataY > RADIUS:
        quadrant = 0
    elif dataX < -RADIUS and dataY > RADIUS:
        quadrant = 1
    elif dataX < -RADIUS and dataY < -RADIUS:
        quadrant = 2
    elif dataX > RADIUS and dataY < -RADIUS:
        quadrant = 3
    else:
        quadrant = -1
    if quadrant == -1:
        # Dead zone: keep the previously recorded quadrant.
        pass
    elif gQuadrant == -1 or gQuadrantOld == -1:
        # First valid quadrant: initialise both trackers.
        gQuadrant = quadrant
        gQuadrantOld = quadrant
    else:
        gQuadrant = quadrant
    # Compare the current quadrant with the last counted one.
    if gQuadrant == (gQuadrantOld+1)%4:
        # Moved to the next quadrant number: count -1.
        #print("{} {} {}".format(gQuadrant, gQuadrantOld, gWhirlCnt))
        gQuadrantOld = gQuadrant
        gWhirlDir = -1
        gWhirlCnt += gWhirlDir
    elif (gQuadrant+1)%4 == gQuadrantOld:
        # Moved to the previous quadrant number: count +1.
        #print("{} {} {}".format(gQuadrant, gQuadrantOld, gWhirlCnt))
        gQuadrantOld = gQuadrant
        gWhirlDir = 1
        gWhirlCnt += gWhirlDir
    elif gQuadrant == (gQuadrantOld+2)%4 or \
            (gQuadrant+2)%4 == gQuadrantOld:
        # Jumped to the opposite quadrant: count one step in the last known
        # direction.
        gQuadrantOld = gQuadrant
        gWhirlCnt += gWhirlDir
        pass
    # The label shows quadrant transitions, not full turns.
    gWhirlCntStr.set('{}'.format(gWhirlCnt))
    #gWhirlCntStr.set('{}'.format(gWhirlCnt//4))
        
def fun_window_exit():
    """Window-close handler: stop the worker loop, pause briefly, close."""
    # Clearing the flag makes WiiThread.run() and fun_notify() stop working.
    wiiThread.do_loop = False
    grace_seconds = 0.2
    time.sleep(grace_seconds)
    window.destroy()

# Start the connection thread (it starts itself on construction), route the
# window-close button through fun_window_exit and enter the Tk event loop.
wiiThread = WiiThread(
    'C4:4F:33:54:B5:3B',
    fun_notify,
)
window.protocol("WM_DELETE_WINDOW", fun_window_exit)
window.mainloop()


沒有留言:

張貼留言