1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
import webiopi
# Switch WebIOPi into debug mode (presumably this is what makes the
# webiopi.debug() messages emitted by this script visible — confirm).
webiopi.setDebug()
# Short alias for WebIOPi's GPIO interface; all pin access goes through it.
GPIO = webiopi.GPIO
# Name of the WebIOPi device whose getMilliseconds() reading is polled.
ICMP_DEVICE = "icmp0"
# GPIO port that this script configures as an output and drives LOW/HIGH.
GAS_PORT = 0
# Value handed to webiopi.sleep() after each polling pass
# (presumably seconds — confirm against the WebIOPi documentation).
SLEEP = 1
class State(object):
    """A condition that fires a callback after enough consecutive matches.

    Attributes:
        name: label used in log messages and in the string form.
        checks: number of consecutive matching samples required to fire.
        cnt: length of the current run of matching samples; it stops
            growing once it reaches ``checks``.
        enabled: when False the counter keeps running but ``func`` is not
            invoked by ``incr``.
        expr: one-argument predicate deciding whether a sample matches.
        func: zero-argument callable executed by ``trigger``.
    """

    def __init__(self, name, checks, expr, func):
        self.name, self.checks = name, checks
        self.expr, self.func = expr, func
        self.enabled = True
        self.cnt = 0

    def incr(self, duration):
        """Feed one sample into the state.

        A non-matching sample resets the run to zero.  A matching sample
        extends the run by one until ``checks`` is reached; the callback
        fires only on the sample that completes the run, and only while
        the state is enabled.
        """
        if self.expr(duration):
            if not self.cnt_reached():
                self.cnt += 1
                # Fire exactly once, on the transition into "reached".
                if self.cnt_reached() and self.enabled:
                    self.trigger()
        else:
            self.cnt = 0

    def trigger(self):
        """Log the trigger and run the callback unconditionally."""
        message = "GAS script - %s triggered" % (self.name)
        webiopi.debug(message)
        self.func()

    def cnt_reached(self):
        """Return True once the run length has reached ``checks``."""
        required = self.checks
        return self.cnt >= required

    def __str__(self):
        if self.enabled:
            mode = "enabled"
        else:
            mode = "disabled"
        return "%s(%s) %d/%d" % (self.name, mode, self.cnt, self.checks)
# "Online": matches readings strictly between 0 and 300; after 5 matching
# checks in a row it drives GAS_PORT low.
_online = State(
    name='Online',
    checks=5,
    expr=lambda x: 0 < x < 300,
    func=lambda: GPIO.digitalWrite(GAS_PORT, GPIO.LOW),
)
# "Offline": matches every reading that "Online" rejects; after 30 matching
# checks in a row it drives GAS_PORT high.
_offline = State(
    name='Offline',
    checks=30,
    expr=lambda x: not _online.expr(x),
    func=lambda: GPIO.digitalWrite(GAS_PORT, GPIO.HIGH),
)
# All states, in the order they are evaluated.
STATES = [_online, _offline]
def setup():
    """Configure GAS_PORT as an output and drive it high.

    Presumably called by WebIOPi when the script is loaded — confirm.
    """
    webiopi.debug("GAS script - setup")
    port = GAS_PORT
    GPIO.setFunction(port, GPIO.OUT)
    GPIO.digitalWrite(port, GPIO.HIGH)
def loop():
    """Run one polling pass: read the ICMP device, update states, sleep.

    When the device named by ICMP_DEVICE is not available the states are
    left untouched and the function only sleeps.
    """
    # NOTE(review): nesting reconstructed from unindented source — the
    # sleep is assumed to run on every pass, and the per-state debug line
    # is assumed to sit inside the for loop; confirm.
    device = webiopi.deviceInstance(ICMP_DEVICE)
    if device is not None:
        elapsed_ms = device.getMilliseconds()
        webiopi.debug("GAS script - %s: %dms" % (ICMP_DEVICE, elapsed_ms))
        for state in STATES:
            state.incr(elapsed_ms)
            webiopi.debug("GAS script - %s" % (state))
    webiopi.sleep(SLEEP)
def destroy():
    """Put GAS_PORT back into output mode and drive it high.

    Presumably called by WebIOPi when the script is unloaded — confirm.
    """
    webiopi.debug("GAS script - destroy")
    port = GAS_PORT
    GPIO.setFunction(port, GPIO.OUT)
    GPIO.digitalWrite(port, GPIO.HIGH)
@webiopi.macro
def gas_getPort():
    """Macro: return the configured GAS_PORT number as a string."""
    # Read-only access to the module constant, so no ``global`` is needed.
    port_text = str(GAS_PORT)
    return port_text
# State currently forced via gas_setForce(), or None when nothing is forced.
_force = None
@webiopi.macro
def gas_getForce():
    """Macro: return the forced state's name, or "" when none is forced."""
    # Read-only access to the module variable, so no ``global`` is needed.
    return "" if _force is None else _force.name
@webiopi.macro
def gas_setForce(state = None):
    """Macro: force a state by name, or return to automatic mode.

    Args:
        state: name of the state to force.  ``None`` clears the forced
            state, re-enables every state and re-fires each one whose
            counter has already reached its threshold.

    Returns:
        The value of ``gas_getForce()`` after the change.
    """
    global _force
    webiopi.debug("GAS script - forcing state %s" % (state))

    if state is None:
        # Back to automatic mode: re-enable and catch up on anything that
        # reached its threshold while it was disabled.
        _force = None
        for candidate in STATES:
            candidate.enabled = True
            if candidate.cnt_reached():
                candidate.trigger()
        return gas_getForce()

    # First state whose name matches; an unknown name leaves any
    # previously forced state in place.
    requested = next((c for c in STATES if c.name == state), None)
    if requested is not None:
        _force = requested

    if _force is not None:
        # Silence automatic triggering, then apply the forced state.
        for candidate in STATES:
            candidate.enabled = False
        _force.trigger()
    return gas_getForce()
|