Wifi Led Fix

This commit is contained in:
Chizuui
2025-12-01 14:26:02 +07:00
parent cfb042fc48
commit e02b1a87ba
14 changed files with 1939 additions and 0 deletions

View File

@@ -0,0 +1,265 @@
# Copyright (c) 2015-2019 Volodymyr Shymanskyy. See the file LICENSE for copying permission.
__version__ = "1.0.0"
import struct
import time
import sys
import os
try:
import machine
gettime = lambda: time.ticks_ms()
SOCK_TIMEOUT = 0
except ImportError:
const = lambda x: x
gettime = lambda: int(time.time() * 1000)
SOCK_TIMEOUT = 0.05
def dummy(*args):
pass
MSG_RSP = const(0)
MSG_LOGIN = const(2)
MSG_PING = const(6)
MSG_TWEET = const(12)
MSG_NOTIFY = const(14)
MSG_BRIDGE = const(15)
MSG_HW_SYNC = const(16)
MSG_INTERNAL = const(17)
MSG_PROPERTY = const(19)
MSG_HW = const(20)
MSG_HW_LOGIN = const(29)
MSG_EVENT_LOG = const(64)
MSG_REDIRECT = const(41) # TODO: not implemented
MSG_DBG_PRINT = const(55) # TODO: not implemented
STA_SUCCESS = const(200)
STA_INVALID_TOKEN = const(9)
DISCONNECTED = const(0)
CONNECTING = const(1)
CONNECTED = const(2)
print("""
___ __ __
/ _ )/ /_ _____ / /__
/ _ / / // / _ \\/ '_/
/____/_/\\_, /_//_/_/\\_\\
/___/ for Python v""" + __version__ + " (" + sys.platform + ")\n")
class EventEmitter:
def __init__(self):
self._cbks = {}
def on(self, evt, f=None):
if f:
self._cbks[evt] = f
else:
def D(f):
self._cbks[evt] = f
return f
return D
def emit(self, evt, *a, **kv):
if evt in self._cbks:
self._cbks[evt](*a, **kv)
class BlynkProtocol(EventEmitter):
def __init__(self, auth, tmpl_id=None, fw_ver=None, heartbeat=50, buffin=1024, log=None):
EventEmitter.__init__(self)
self.heartbeat = heartbeat*1000
self.buffin = buffin
self.log = log or dummy
self.auth = auth
self.tmpl_id = tmpl_id
self.fw_ver = fw_ver
self.state = DISCONNECTED
self.connect()
def virtual_write(self, pin, *val):
self._send(MSG_HW, 'vw', pin, *val)
def send_internal(self, pin, *val):
self._send(MSG_INTERNAL, pin, *val)
def set_property(self, pin, prop, *val):
self._send(MSG_PROPERTY, pin, prop, *val)
def sync_virtual(self, *pins):
self._send(MSG_HW_SYNC, 'vr', *pins)
def log_event(self, *val):
self._send(MSG_EVENT_LOG, *val)
def _send(self, cmd, *args, **kwargs):
if 'id' in kwargs:
id = kwargs.get('id')
else:
id = self.msg_id
self.msg_id += 1
if self.msg_id > 0xFFFF:
self.msg_id = 1
if cmd == MSG_RSP:
data = b''
dlen = args[0]
else:
data = ('\0'.join(map(str, args))).encode('utf8')
dlen = len(data)
self.log('<', cmd, id, '|', *args)
msg = struct.pack("!BHH", cmd, id, dlen) + data
self.lastSend = gettime()
self._write(msg)
def connect(self):
if self.state != DISCONNECTED: return
self.msg_id = 1
(self.lastRecv, self.lastSend, self.lastPing) = (gettime(), 0, 0)
self.bin = b""
self.state = CONNECTING
self._send(MSG_HW_LOGIN, self.auth)
def disconnect(self):
if self.state == DISCONNECTED: return
self.bin = b""
self.state = DISCONNECTED
self.emit('disconnected')
def process(self, data=None):
if not (self.state == CONNECTING or self.state == CONNECTED): return
now = gettime()
if now - self.lastRecv > self.heartbeat+(self.heartbeat//2):
return self.disconnect()
if (now - self.lastPing > self.heartbeat//10 and
(now - self.lastSend > self.heartbeat or
now - self.lastRecv > self.heartbeat)):
self._send(MSG_PING)
self.lastPing = now
if data != None and len(data):
self.bin += data
while True:
if len(self.bin) < 5:
break
cmd, i, dlen = struct.unpack("!BHH", self.bin[:5])
if i == 0: return self.disconnect()
self.lastRecv = now
if cmd == MSG_RSP:
self.bin = self.bin[5:]
self.log('>', cmd, i, '|', dlen)
if self.state == CONNECTING and i == 1:
if dlen == STA_SUCCESS:
self.state = CONNECTED
dt = now - self.lastSend
info = ['ver', __version__, 'h-beat', self.heartbeat//1000, 'buff-in', self.buffin, 'dev', sys.platform+'-py']
if self.tmpl_id:
info.extend(['tmpl', self.tmpl_id])
info.extend(['fw-type', self.tmpl_id])
if self.fw_ver:
info.extend(['fw', self.fw_ver])
self._send(MSG_INTERNAL, *info)
try:
self.emit('connected', ping=dt)
except TypeError:
self.emit('connected')
else:
if dlen == STA_INVALID_TOKEN:
self.emit("invalid_auth")
print("Invalid auth token")
return self.disconnect()
else:
if dlen >= self.buffin:
print("Cmd too big: ", dlen)
return self.disconnect()
if len(self.bin) < 5+dlen:
break
data = self.bin[5:5+dlen]
self.bin = self.bin[5+dlen:]
args = list(map(lambda x: x.decode('utf8'), data.split(b'\0')))
self.log('>', cmd, i, '|', ','.join(args))
if cmd == MSG_PING:
self._send(MSG_RSP, STA_SUCCESS, id=i)
elif cmd == MSG_HW or cmd == MSG_BRIDGE:
if args[0] == 'vw':
self.emit("V"+args[1], args[2:])
self.emit("V*", args[1], args[2:])
elif cmd == MSG_INTERNAL:
self.emit("internal:"+args[0], args[1:])
elif cmd == MSG_REDIRECT:
self.emit("redirect", args[0], int(args[1]))
else:
print("Unexpected command: ", cmd)
return self.disconnect()
import socket
class Blynk(BlynkProtocol):
def __init__(self, auth, **kwargs):
self.insecure = kwargs.pop('insecure', False)
self.server = kwargs.pop('server', 'blynk.cloud')
self.port = kwargs.pop('port', 80 if self.insecure else 443)
BlynkProtocol.__init__(self, auth, **kwargs)
self.on('redirect', self.redirect)
def redirect(self, server, port):
self.server = server
self.port = port
self.disconnect()
self.connect()
def connect(self):
print('Connecting to %s:%d...' % (self.server, self.port))
s = socket.socket()
s.connect(socket.getaddrinfo(self.server, self.port)[0][-1])
try:
s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
except:
pass
if self.insecure:
self.conn = s
else:
try:
import ussl
ssl_context = ussl
except ImportError:
import ssl
ssl_context = ssl.create_default_context()
self.conn = ssl_context.wrap_socket(s, server_hostname=self.server)
try:
self.conn.settimeout(SOCK_TIMEOUT)
except:
s.settimeout(SOCK_TIMEOUT)
BlynkProtocol.connect(self)
def _write(self, data):
#print('<', data)
self.conn.write(data)
# TODO: handle disconnect
def run(self):
data = b''
try:
data = self.conn.read(self.buffin)
#print('>', data)
except KeyboardInterrupt:
raise
except socket.timeout:
# No data received, call process to send ping messages when needed
pass
except: # TODO: handle disconnect
return
self.process(data)

View File

@@ -0,0 +1,129 @@
"""
BlynkTimer
blynk-library-python
Module for launching timed Blynk actions
with polling
"""
import time
class BlynkTimer:
'''Executes functions after a defined period of time'''
_MAX_TIMERS = 16
def __init__(self):
self.timers = []
self.ids = self._get_unique_id()
def _get_unique_id(self, current=0):
'''yields unique id for new timer'''
numId = current
while numId < self._MAX_TIMERS:
yield numId
numId += 1
def _add(self, func, **kwargs):
'''Inits Timer'''
timerId = next(self.ids)
timer = Timer(timerId, func, **kwargs)
self.timers.append(timer)
return timer
def _get(self, timerId):
'''Gets timer by id'''
timer = [t for t in self.timers if t.id == timerId]
if len(timer) <= 0:
return None
return timer[0]
def _delete(self, timerId):
'''Deletes timer'''
timer = self._get(timerId)
timer.disable()
self.timers = [t for t in self.timers if t.id != timerId]
num_timers = self.get_num_timers()[0]
self.ids = self._get_unique_id(current=num_timers)
return timerId
def get_num_timers(self):
'''Returns number of used timer slots'''
num_timers = len(self.timers)
return (num_timers, self._MAX_TIMERS)
def is_enabled(self, timerId):
'''Returns true if timer is enabled'''
timer = self._get(timerId)
return timer.enabled
def set_interval(self, value, func):
'''Sets time interval for function'''
timer = self._add(func)
timer.set_interval(value)
return timer.id
def set_timeout(self, value, func):
'''Runs function once after timeout'''
timer = self._add(func, post_run=self._delete)
timer.set_interval(value)
return timer.id
def enable(self, timerId):
'''Enables timer'''
timer = self._get(timerId)
timer.enable()
return timerId
def disable(self, timerId):
'''Disables timer'''
timer = self._get(timerId)
timer.disable()
return timerId
def run(self):
'''Polls timers'''
[t.run() for t in self.timers]
class Timer:
'''Runs function after specific interval'''
def __init__(self, id, func, **kwargs):
self.id = id
self.func = func
self.interval = None
self.start_time = None
self.enabled = False
self.on_post_run = kwargs.get('post_run', None)
def _handle_post_run(self):
'''handles post run events'''
self.start_time += self.interval
if self.on_post_run:
return self.on_post_run(self.id)
def enable(self):
'''enables Timer'''
self.enabled = True
self.start_time = time.time()
def disable(self):
'''disables timer'''
self.enabled = False
self.start_time = None
def set_interval(self, value):
'''Sets Time Interval for calling function'''
self.interval = value
self.enable()
def run(self):
'''Runs function if interval has passed'''
if not self.enabled:
return
now = time.time()
if now - self.start_time > self.interval:
self.func()
self._handle_post_run()

View File

@@ -0,0 +1,253 @@
# Copyright (c) 2015-2019 Volodymyr Shymanskyy. See the file LICENSE for copying permission.
_VERSION = "0.2.0"
import struct
import time
import os
try:
import machine
gettime = lambda: time.ticks_ms()
except ImportError:
const = lambda x: x
gettime = lambda: int(time.time() * 1000)
def dummy(*args):
pass
MSG_RSP = const(0)
MSG_LOGIN = const(2)
MSG_PING = const(6)
MSG_TWEET = const(12)
MSG_EMAIL = const(13)
MSG_NOTIFY = const(14)
MSG_BRIDGE = const(15)
MSG_HW_SYNC = const(16)
MSG_INTERNAL = const(17)
MSG_PROPERTY = const(19)
MSG_HW = const(20)
MSG_HW_LOGIN = const(29)
MSG_EVENT_LOG = const(64)
MSG_REDIRECT = const(41) # TODO: not implemented
MSG_DBG_PRINT = const(55) # TODO: not implemented
STA_SUCCESS = const(200)
STA_INVALID_TOKEN = const(9)
DISCONNECTED = const(0)
CONNECTING = const(1)
CONNECTED = const(2)
print("""
___ __ __
/ _ )/ /_ _____ / /__
/ _ / / // / _ \\/ '_/
/____/_/\\_, /_//_/_/\\_\\
/___/ for Python v""" + _VERSION + " (" + os.uname()[0] + ")\n")
class BlynkProtocol:
def __init__(self, auth, heartbeat=10, buffin=1024, log=None):
self.callbacks = {}
self.heartbeat = heartbeat*1000
self.buffin = buffin
self.log = log or dummy
self.auth = auth
self.state = DISCONNECTED
self.connect()
def ON(blynk, evt):
class Decorator:
def __init__(self, func):
self.func = func
blynk.callbacks[evt] = func
def __call__(self):
return self.func()
return Decorator
# These are mainly for backward-compatibility you can use "blynk.ON()" instead
def VIRTUAL_READ(blynk, pin):
class Decorator():
def __init__(self, func):
self.func = func
blynk.callbacks["readV"+str(pin)] = func
def __call__(self):
return self.func()
return Decorator
def VIRTUAL_WRITE(blynk, pin):
class Decorator():
def __init__(self, func):
self.func = func
blynk.callbacks["V"+str(pin)] = func
def __call__(self):
return self.func()
return Decorator
def on(self, evt, func):
self.callbacks[evt] = func
def emit(self, evt, *a, **kv):
self.log("Event:", evt, "->", *a)
if evt in self.callbacks:
self.callbacks[evt](*a, **kv)
def virtual_write(self, pin, *val):
self._send(MSG_HW, 'vw', pin, *val)
def set_property(self, pin, prop, *val):
self._send(MSG_PROPERTY, pin, prop, *val)
def sync_virtual(self, *pins):
self._send(MSG_HW_SYNC, 'vr', *pins)
def notify(self, msg):
self._send(MSG_NOTIFY, msg)
def tweet(self, msg):
self._send(MSG_TWEET, msg)
def log_event(self, event, descr=None):
if descr==None:
self._send(MSG_EVENT_LOG, event)
else:
self._send(MSG_EVENT_LOG, event, descr)
def _send(self, cmd, *args, **kwargs):
if "id" in kwargs:
id = kwargs.id
else:
id = self.msg_id
self.msg_id += 1
if self.msg_id > 0xFFFF:
self.msg_id = 1
if cmd == MSG_RSP:
data = b''
dlen = args[0]
else:
data = ('\0'.join(map(str, args))).encode('utf8')
dlen = len(data)
self.log('<', cmd, id, '|', *args)
msg = struct.pack("!BHH", cmd, id, dlen) + data
self.lastSend = gettime()
self._write(msg)
def connect(self):
if self.state != DISCONNECTED: return
self.msg_id = 1
(self.lastRecv, self.lastSend, self.lastPing) = (gettime(), 0, 0)
self.bin = b""
self.state = CONNECTING
self._send(MSG_HW_LOGIN, self.auth)
def disconnect(self):
if self.state == DISCONNECTED: return
self.state = DISCONNECTED
self.emit('disconnected')
def process(self, data=b''):
if not (self.state == CONNECTING or self.state == CONNECTED): return
now = gettime()
if now - self.lastRecv > self.heartbeat+(self.heartbeat//2):
return self.disconnect()
if (now - self.lastPing > self.heartbeat//10 and
(now - self.lastSend > self.heartbeat or
now - self.lastRecv > self.heartbeat)):
self._send(MSG_PING)
self.lastPing = now
if data != None and len(data):
self.bin += data
while True:
if len(self.bin) < 5: return
cmd, i, dlen = struct.unpack("!BHH", self.bin[:5])
if i == 0: return self.disconnect()
self.lastRecv = now
if cmd == MSG_RSP:
self.bin = self.bin[5:]
self.log('>', cmd, i, '|', dlen)
if self.state == CONNECTING and i == 1:
if dlen == STA_SUCCESS:
self.state = CONNECTED
dt = now - self.lastSend
self._send(MSG_INTERNAL, 'ver', _VERSION, 'h-beat', self.heartbeat//1000, 'buff-in', self.buffin, 'dev', 'python')
try:
self.emit('connected', ping=dt)
except TypeError:
self.emit('connected')
else:
if dlen == STA_INVALID_TOKEN:
print("Invalid auth token")
return self.disconnect()
else:
if dlen >= self.buffin:
print("Cmd too big: ", dlen)
return self.disconnect()
if len(self.bin) < 5+dlen: return
data = self.bin[5:5+dlen]
self.bin = self.bin[5+dlen:]
args = list(map(lambda x: x.decode('utf8'), data.split(b'\0')))
self.log('>', cmd, i, '|', ','.join(args))
if cmd == MSG_PING:
self._send(MSG_RSP, STA_SUCCESS, id=i)
elif cmd == MSG_HW or cmd == MSG_BRIDGE:
if args[0] == 'vw':
self.emit("V"+args[1], args[2:])
self.emit("V*", args[1], args[2:])
elif args[0] == 'vr':
self.emit("readV"+args[1])
self.emit("readV*", args[1])
elif cmd == MSG_INTERNAL:
self.emit("int_"+args[1], args[2:])
else:
print("Unexpected command: ", cmd)
return self.disconnect()
import socket
class Blynk(BlynkProtocol):
def __init__(self, auth, **kwargs):
self.server = kwargs.pop('server', 'blynk-cloud.com')
self.port = kwargs.pop('port', 80)
BlynkProtocol.__init__(self, auth, **kwargs)
def connect(self):
try:
self.conn = socket.socket()
self.conn.connect(socket.getaddrinfo(self.server, self.port)[0][4])
try:
self.conn.settimeout(eval('0.05'))
except:
self.conn.settimeout(0)
BlynkProtocol.connect(self)
except:
raise ValueError('Connection with the Blynk server %s:%d failed' % (self.server, self.port))
def _write(self, data):
#print('<', data.hex())
self.conn.send(data)
# TODO: handle disconnect
def run(self):
data = b''
try:
data = self.conn.recv(self.buffin)
#print('>', data.hex())
except KeyboardInterrupt:
raise
except: # TODO: handle disconnect
pass
self.process(data)

View File

@@ -0,0 +1 @@
import machine; machine.reset()

View File

@@ -0,0 +1,141 @@
import gc
import time
import BlynkLib
import machine
import network
from machine import ADC, Pin
# --- KONFIGURASI ---
led = Pin(2, Pin.OUT)
WIFI_SSID = "chizui-desktop" # Pastikan SSID 2.4GHz
WIFI_PASS = "chizui03"
BLYNK_AUTH = "JtlD6EgQ3A-Q-Qt3iW0gnzug67NAQmP3"
# Variabel Global
pot = ADC(0)
pot_old = 0
pot_now = 0
pot_lock = 0
pot_time = 0
pot_value_old = 0
pot_value = False
pot_next_check = 300
# --- FUNGSI KONEKSI WIFI ---
def connect_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
if not wlan.isconnected():
print("Menghubungkan ke WiFi...")
wlan.connect(WIFI_SSID, WIFI_PASS)
while not wlan.isconnected():
time.sleep(1)
print(".", end="")
print("\nWiFi Terhubung!")
print("IP Address:", wlan.ifconfig()[0])
connect_wifi()
# --- INISIALISASI BLYNK ---
# Kita gunakan server blynk.cloud sesuai akun baru
blynk = BlynkLib.Blynk(BLYNK_AUTH, server="blynk.cloud")
# --- FUNGSI LOGIKA ---
def TombolSentuh():
global pot, pot_old, pot_now, pot_lock, pot_time, pot_value, pot_next_check
now = time.ticks_ms()
elapsed = int(now - pot_time)
if elapsed > pot_next_check:
pot_value = 0
pot_old = pot_now
for x in range(1, 5):
pot_now = pot.read()
if abs(pot_now - pot_old) >= 3:
pot_value = 1
break
pot_old = pot_now
time.sleep(0.01)
pot_time = time.ticks_ms()
if pot_value:
pot_next_check = 500
else:
pot_next_check = 10
return pot_value
gc.collect()
# --- BLYNK HANDLERS (CARA MANUAL) ---
def blynk_disconnected(args):
print("Blynk disconnected")
def v0_write_handler(value):
print("Nilai tombol V0: " + str(value[0]))
if value[0] == "1":
led.value(0)
print("LED menyala")
else:
led.value(1)
print("LED mati")
def v1_write_handler(value):
print("Tombol Sentuh V1 : " + str(value[0]))
def v2_write_handler(value):
print("Uptime Request V2: " + str(value[0]))
# --- DAFTARKAN HANDLER (BAGIAN REVISI) ---
# Ini yang memperbaiki error "3 arguments"
blynk.on("disconnected", blynk_disconnected)
blynk.on("V0", v0_write_handler)
blynk.on("V1", v1_write_handler)
blynk.on("V2", v2_write_handler)
# --- MAIN LOOP ---
def runLoop():
global pot_value_old, pot_value
pot_value = TombolSentuh()
firstBoot = time.time()
oldTick = firstBoot
print("Sistem Siap! Cek App Blynk kamu...")
while True:
blynk.run()
if time.time() > oldTick:
uptime_sec = time.time() - firstBoot
blynk.virtual_write(2, uptime_sec)
oldTick = time.time() + 1
pot_value = TombolSentuh()
if pot_value_old != pot_value:
pot_value_old = pot_value
print("Status tombol sentuh : " + str(pot_value))
if pot_value:
blynk.virtual_write(1, 1)
else:
blynk.virtual_write(1, 0)
gc.collect()
# Jalankan Program
gc.collect()
runLoop()