Hallo,
ich beschäftige mich erst seit Kurzem mit Python, habe allerdings einiges an Erfahrung mit anderen Scriptsprachen.
Zu meinem Projekt:
Ich habe einen Raspberry (Pi1), auf dem die Haussteuerung (FHEM) läuft, an einem zweiten Raspberry (Pi2) hängt ein RFID Reader, ein Display für Statusausgaben (Wetter aktuell + Forecast, Zeit, Status Alarmanlage), sowie zwei Relais, mit dem die Sirene und die Signalleuchte getriggert werden.
Auf dem Pi2 habe ich 2 Python Scripte laufen. Das Erste bedient den RFID Reader. Wenn ein gültiger Chip gelesen wird, aktiviert oder deaktiviert es per URL Aufruf die Alarmanlage im Pi1.
Das zweite Script prüft permanent den Status der Alarmanlage per URL Aufruf, fragt einmal pro Stunde den Wetter Forecast ab und aktualisiert den Display.
Da beide Scripte permanent laufen ist der Pi2 auch zu 97% ausgelastet, wobei das zweite Script 60% Anteil hat.
Ich wäre Euch dankbar, wenn Ihr Euch den Code mal anschauen könntet und Ihr Tipps zur Optimierung hättet.
Vielleicht noch zur Erklärung, von ein paar Besonderheiten. Um sicherzustellen, dass das Script nicht abschmiert, wenn der Pi1 nicht verfügbar ist, fange ich das über ein Timeout ab und blende für diesen Fall einen anderen Screen ein, was super funktioniert. Damit ich den Pi1 nicht mit zu vielen Abfragen bombardiere, habe ich ein paar Counter eingebaut, dass nur bei jedem 10. Durchlauf eine Abfrage losgetreten wird. Das ist nicht wirklich schön gelöst, geht das besser?
Script für das Display und die GPIO Steuerung:
#coding=utf-8
#!/usr/bin/python
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
import datetime
import time
import os
import sys
from functions import *
wetter = ""
do_not_run = 0
hour = ""
alert = "off"
status = "n.a."
alarmcounter = "0"
#readcounters are used to reduce network traffic, query fhem only 1 of 10 times
readcounter1 = 0
readcounter2 = 0
init_display()
# --------- Main Program ---------
#if armed just check events
#if unarmed check weather
try:
while True:
previus_hour = hour
curr_time = datetime.datetime.now()
hour = curr_time.strftime("%H")
#hour = zeit()
#even if hour = allowed hour, only run once per hour
if (previus_hour <> hour):
do_not_run = 0
#get weather forecast only once per hour as weather.com has a limit of 10000 requests per month
#logging.debug('previousHour:' + str(previus_hour))
#logging.debug('actHour:' + str(hour))
#logging.debug('run:' + str(do_not_run))
if (wetter == "" or do_not_run == 0):
wetter = ""
wetter = getweather()
do_not_run = 1
if readcounter1 == 0:
status = systemstatus_get()
readcounter1 = readcounter1 + 1
#logging.debug('System-Status:' + str(status))
else:
readcounter1 = readcounter1 + 1
if readcounter1 == 10:
readcounter1 = 0
#logging.debug('System-Status:' + str(status))
if (status == "on" and readcounter2 == 0):
alert = alert_get()
readcounter2 = readcounter2 + 1
#logging.debug('Alarm-Status:' + alert)
elif (status == "on" and readcounter2 <> 0):
readcounter2 = readcounter2 + 1
if readcounter2 == 20:
readcounter2 = 0
#logging.debug('System-Status:' + status)
if (status == "on" and alert == "off"):
armedscreen()
disable_siren()
disable_flashlight()
elif (status == "on" and alert == "on"):
alertscreen()
#logging.debug('ALERT')
# do just once
if (alarmcounter == "0"):
t1 = time.time()
enable_siren()
enable_flashlight()
subject = "Sicherheitsbruch"
content = "Achtung, es wurde ein Sicherheitsbruch festgestellt."
sendmail(subject,content)
alarmcounter = "1"
t2 = time.time()
diff = t2 - t1
if (diff > 30):
#switch off siren after 30 (max 60) seconds due to legal reasons
disable_siren()
elif (status == "n.a."):
disable_siren()
disable_flashlight()
offlinescreen()
else:
disable_siren()
disable_flashlight()
#statusicon = "status_off48.png"
alarmcounter = "0"
offscreen(wetter)
#logging.debug('Alarm-Status:' + alert)
except KeyboardInterrupt:
logging.debug("Keyboard Interrupt")
switchscreen()
GPIO.cleanup()
# shutdown gracefully
#sys.exit()
os._exit(0)
finally:
# cleanup code if something goes wrong
logging.debug("Cleanup")
GPIO.cleanup()
Alles anzeigen
Functions für das Display-Script:
#coding=utf-8
#!/usr/bin/python
import sys
import os
import time
import datetime
import pygame
import subprocess
import MySQLdb
import urllib
import urllib2
from threading import Timer
from time import sleep
from time import localtime
import RPi.GPIO as GPIO
import xml.etree.ElementTree as ET
import md5
import smtplib
# For guessing MIME type
import mimetypes
# Import the email modules
import email
import email.mime.application
# Define GPIO
GPIO_Siren = 4 # set GPIO4 as an output - Sirene -> Relais IN2
GPIO_Flashlight = 3 # set GPIO14 as an output - Flashlight -> Relais IN1
GPIO.setmode(GPIO.BCM) # Use BCM GPIO numbers
GPIO.setup(GPIO_Siren, GPIO.OUT, initial=0)
GPIO.setup(GPIO_Flashlight, GPIO.OUT, initial=0)
def init_display():
os.system("tft_init")
os.system("tft_clear")
os.system("tft_pwm 20")
def alert_get():
#response = urllib2.urlopen('http://192.168.1.39:8083/fhem&cmd=%7BValue%28%22Securasp_Alarm%22%29%7D&XHR=1')
#alarm = response.read()
#alarm = alarm.strip()
#return alarm
url = "http://192.168.1.39:8083/fhem&cmd=%7BValue%28%22Securasp_Alarm%22%29%7D&XHR=1"
def handler(fh):
try:
fh.close()
except:
pass
try:
fh = urllib2.urlopen(url)
except:
fh = ""
pass
t = Timer(20.0, handler,[fh])
t.start()
try:
alarm = fh.read()
except:
alarm = "n.a."
pass
alarm = alarm.strip()
t.cancel()
return alarm
def systemstatus_get():
#response = urllib2.urlopen('http://192.168.1.39:8083/fhem&cmd=%7BValue%28%22Securasp_Status%22%29%7D&XHR=1')
#status = response.read()
#status = status.strip()
#return status
url = "http://192.168.1.39:8083/fhem&cmd=%7BValue%28%22Securasp_Status%22%29%7D&XHR=1"
def handler(fh):
try:
fh.close()
except:
pass
try:
fh = urllib2.urlopen(url)
except:
fh = ""
pass
t = Timer(20.0, handler,[fh])
t.start()
try:
status = fh.read()
except:
status= "n.a."
pass
status = status.strip()
t.cancel()
return status
def weather_curr_get():
#response = urllib2.urlopen('http://192.168.1.39:8083/fhem&cmd=%7BValue%28%22WS1_TEMPERATURE%22%29%7D&XHR=1')
#temp = response.read()
#temp = temp.strip()
#response = urllib2.urlopen('http://192.168.1.39:8083/fhem&cmd=%7BValue%28%22WS1_ISRAINING%22%29%7D&XHR=1')
#rain = response.read()
#rain = rain.strip()
#response = urllib2.urlopen('http://192.168.1.39:8083/fhem&cmd=%7BValue%28%22WS1_WINDSPEED%22%29%7D&XHR=1')
#wind = response.read()
#wind = wind.strip()
#return (temp, rain, wind)
def handler(fh):
try:
fh.close()
except:
pass
url = "http://192.168.1.39:8083/fhem&cmd=%7BValue%28%22WS1_TEMPERATURE%22%29%7D&XHR=1"
try:
fh = urllib2.urlopen(url)
except:
fh = ""
pass
t = Timer(20.0, handler,[fh])
t.start()
try:
temp = fh.read()
except:
temp = "n.a."
pass
temp = temp.strip()
i = temp.split(" ", 1 )
temp = i[0]
t.cancel()
url = "http://192.168.1.39:8083/fhem&cmd=%7BValue%28%22WS1_ISRAINING%22%29%7D&XHR=1"
try:
fh = urllib2.urlopen(url)
except:
fh = ""
pass
t = Timer(20.0, handler,[fh])
t.start()
try:
rain = fh.read()
except:
rain = "n.a."
pass
rain = rain.strip()
t.cancel()
url = "http://192.168.1.39:8083/fhem&cmd=%7BValue%28%22WS1_WINDSPEED%22%29%7D&XHR=1"
try:
fh = urllib2.urlopen(url)
except:
fh = ""
pass
t = Timer(20.0, handler,[fh])
t.start()
try:
wind = fh.read()
except:
wind = "n.a."
pass
wind = wind.strip()
t.cancel()
return (temp, rain, wind)
def sendmail(subject,content):
# Create a text/plain message
msg = email.mime.Multipart.MIMEMultipart()
msg['Subject'] = subject
msg['From'] = 'xxxxx'
msg['To'] = 'xxxxx'
# The main body is just an attachment
body = email.mime.Text.MIMEText(content)
msg.attach(body)
# send via Gmail server
s = smtplib.SMTP('smtp.gmail.com:587')
s.starttls()
s.login('xxxxx','xxxxx')
s.sendmail('xxxxx',['xxxxx'], msg.as_string())
s.quit()
def zeit():
# aktuelle, lokale Zeit als Tupel
lt = localtime()
# Entpacken des Tupels, Datum
jahr, monat, tag = lt[0:3]
stunde, minute, sekunde = lt[3:6]
#debug lines
#print "Es ist der %02i.%02i.%04i" % (tag,monat,jahr)
#print "Es ist %02i:%02i:%02i" % (stunde,minute,sekunde)
return stunde
#def sound(file):
#file = "mpg123 -q /home/webide/repositories/my-pi-projects/Securasp/Sounds/" + file + ".mp3"
#subprocess.call(file, shell=True)
def getweather():
#degree = "°"
degree = "°"
citycode = 'DE0000779'
apikey = 'xxxxx'
project_name= 'securasp'
m = md5.new()
m.update(project_name+apikey+citycode)
authstring = m.hexdigest()
url = 'http://api.wetter.com/forecast/weather/city/'+citycode+'/project/'+project_name+'/cs/'+authstring
#XML Objekt erstellen und in den root wechseln
tree = ET.parse(urllib.urlopen(url))
root = tree.getroot()
name = root.find('name').text
for data_node in root.findall('forecast'):
i = 1
for child_node in data_node:
if (i==1):
d1 = child_node.get('value')
t1 = str(child_node.find('tx').text)
l1 = str(child_node.find('tn').text)
i1 = child_node.find('w').text
elif (i==2):
d2 = child_node.get('value')
t2 = str(child_node.find('tx').text)
l2 = str(child_node.find('tn').text)
i2 = child_node.find('w').text
elif (i==3):
d3 = child_node.get('value')
t3 = str(child_node.find('tx').text)
l3 = str(child_node.find('tn').text)
i3 = child_node.find('w').text
i = i + 1
#print 'Die Vorhersage fuer ' +name+ ' ist '+i3+'('+i1+'). Temperaturen: Max: ' + t1 + ' ' + 'C' + ' Min: ' + l1+ ' ' + 'C'
#print 'Die Vorhersage fuer ' +name+ ' ist '+i3+'('+i2+'). Temperaturen: Max: ' + t2 + ' ' + 'C' + ' Min: ' + l2+ ' ' + 'C'
#print 'Die Vorhersage fuer ' +name+ ' ist '+i3+'('+i3+'). Temperaturen: Max: ' + t3 + ' ' + 'C' + ' Min: ' + l3+ ' ' + 'C'
return {'d1':d1, 't1':t1, 'l1':l1, 'i1':i1, 'd2':d2, 't2':t2, 'l2':l2, 'i2':i2, 'd3':d3, 't3':t3, 'l3':l3, 'i3':i3 }
def enable_siren():
GPIO.output(GPIO_Siren, 1)
def disable_siren():
GPIO.output(GPIO_Siren, 0)
def enable_flashlight():
GPIO.output(GPIO_Flashlight, 1)
def disable_flashlight():
GPIO.output(GPIO_Flashlight, 0)
def alertscreen():
pygame.init()
window = pygame.display.set_mode((320,240))
xmax = 320 - 2
ymax = 240 - 1
lines = 1
lc = (255,255,255)
font0 = pygame.font.SysFont("droidsans", 21)
font0.set_bold(1)
window.fill(pygame.Color(255,0,0))
# Draw Screen Border
pygame.draw.line( window, lc, (0,0),(xmax,0), lines )
pygame.draw.line( window, lc, (0,0),(0,ymax), lines )
pygame.draw.line( window, lc, (0,ymax),(xmax,ymax), lines )
pygame.draw.line( window, lc, (xmax,0),(xmax,ymax+2), lines )
#2.parallele
pygame.draw.line( window, lc, (0,ymax*0.20),(xmax,ymax*0.20), lines )
label0 = font0.render("SECURITY BREACH!!!", 1, (0,0,0))
window.blit(label0, (50,10))
icon1 = pygame.image.load("/home/webide/repositories/my-pi-projects/Securasp/Icons/status_warning72.png").convert_alpha()
window.blit( icon1, (110,100))
pygame.image.save(window, "/ram/temp.bmp")
os.system("tft_bmp /ram/temp.bmp")
def offlinescreen():
pygame.init()
window = pygame.display.set_mode((320,240))
xmax = 320 - 2
ymax = 240 - 1
lines = 1
lc = (255,255,255)
font0 = pygame.font.SysFont("droidsans", 21)
font0.set_bold(1)
window.fill(pygame.Color(255,0,0))
# Draw Screen Border
pygame.draw.line( window, lc, (0,0),(xmax,0), lines )
pygame.draw.line( window, lc, (0,0),(0,ymax), lines )
pygame.draw.line( window, lc, (0,ymax),(xmax,ymax), lines )
pygame.draw.line( window, lc, (xmax,0),(xmax,ymax+2), lines )
#2.parallele
pygame.draw.line( window, lc, (0,ymax*0.20),(xmax,ymax*0.20), lines )
label0 = font0.render("FHEM offline!!!", 1, (0,0,0))
window.blit(label0, (50,10))
icon1 = pygame.image.load("/home/webide/repositories/my-pi-projects/Securasp/Icons/status_warning72.png").convert_alpha()
window.blit( icon1, (110,100))
pygame.image.save(window, "/ram/temp.bmp")
os.system("tft_bmp /ram/temp.bmp")
def armedscreen():
pygame.init()
window = pygame.display.set_mode((320,240))
xmax = 320 - 2
ymax = 240 - 1
lines = 1
lc = (255,255,255)
font0 = pygame.font.SysFont("droidsans", 21)
font1 = pygame.font.SysFont("droidsans", 15)
font0.set_bold(1)
window.fill(pygame.Color(0,255,0))
# Draw Screen Border
pygame.draw.line( window, lc, (0,0),(xmax,0), lines )
pygame.draw.line( window, lc, (0,0),(0,ymax), lines )
pygame.draw.line( window, lc, (0,ymax),(xmax,ymax), lines )
pygame.draw.line( window, lc, (xmax,0),(xmax,ymax+2), lines )
#2.parallele
pygame.draw.line( window, lc, (0,ymax*0.20),(xmax,ymax*0.20), lines )
label0 = font0.render("SYSTEM ARMED!!!", 1, (0,0,0))
label1 = font1.render("Please leave house or disarm system", 1, (0,0,0))
label2 = font1.render("within 30 seconds before alarm is raised!", 1, (0,0,0))
window.blit(label0, (60,10))
window.blit(label1, (10,180))
window.blit(label2, (10,200))
icon1 = pygame.image.load("/home/webide/repositories/my-pi-projects/Securasp/Icons/status_on72.png").convert_alpha()
window.blit( icon1, (120,100))
pygame.image.save(window, "/ram/temp.bmp")
os.system("tft_bmp /ram/temp.bmp")
def offscreen(wetter):
#degree = u"°C"
degree = u"°C"
wetter = wetter
d1 = wetter.get('d1')
t1 = wetter.get('t1')
l1 = wetter.get('l1')
i1 = wetter.get('i1')
d2 = wetter.get('d2')
t2 = wetter.get('t2')
l2 = wetter.get('l2')
i2 = wetter.get('i2')
d3 = wetter.get('d3')
t3 = wetter.get('t3')
l3 = wetter.get('l3')
i3 = wetter.get('i3')
now = datetime.datetime.now()
vtime = now.strftime("%H:%M")
vtag = now.strftime("%Y-%m-%d")
pygame.init()
window = pygame.display.set_mode((320,240))
xmax = 320 - 2
ymax = 240 - 1
lines = 1
lc = (255,255,255)
font0 = pygame.font.SysFont("droidsans", 21)
font1 = pygame.font.SysFont("droidsans", 15)
font2 = pygame.font.SysFont("droidsans", 83)
#font3 = pygame.font.SysFont("freemono", 33)
font3 = pygame.font.SysFont("droidsans", 33)
font4 = pygame.font.SysFont("freemono", 26)
#font3.set_italic(1)
font3.set_bold(1)
#print alert
label0 = font0.render("System Status", 1, (255,0,0))
window.fill(pygame.Color(0,0,0))
# Draw Screen Border
pygame.draw.line( window, lc, (0,0),(xmax,0), lines )
pygame.draw.line( window, lc, (0,0),(0,ymax), lines )
pygame.draw.line( window, lc, (0,ymax),(xmax,ymax), lines )
pygame.draw.line( window, lc, (xmax,0),(xmax,ymax+2), lines )
#2.parallele
pygame.draw.line( window, lc, (0,ymax*0.20),(xmax,ymax*0.20), lines )
pygame.draw.line( window, lc, (0,ymax*0.5),(xmax,ymax*0.5), lines )
pygame.draw.line( window, lc, (xmax*0.33,ymax*0.5),(xmax*0.33,ymax), lines )
#mittlere senkrechte
pygame.draw.line( window, lc, (xmax*0.5,ymax*0.2),(xmax*0.5,ymax*0.5), lines )
pygame.draw.line( window, lc, (xmax*0.66,ymax*0.5),(xmax*0.66,ymax), lines )
window.blit(label0, (10,10))
labeld1 = font1.render("heute", 1, (255,255,255))
labeld2 = font1.render(d2, 1, (255,255,255))
labeld3 = font1.render(d3, 1, (255,255,255))
window.blit( labeld1, (35,120))
window.blit( labeld2, (120,120))
window.blit( labeld3, (225,120))
icon1 = pygame.image.load("/home/webide/repositories/my-pi-projects/Securasp/Icons/status_off48.png").convert_alpha()
icon2 = pygame.image.load("/home/webide/repositories/my-pi-projects/Securasp/Icons/weather/d_"+i1+"_S.png").convert_alpha()
icon3 = pygame.image.load("/home/webide/repositories/my-pi-projects/Securasp/Icons/weather/d_"+i2+"_S.png").convert_alpha()
icon4 = pygame.image.load("/home/webide/repositories/my-pi-projects/Securasp/Icons/weather/d_"+i3+"_S.png").convert_alpha()
window.blit( icon1, (240,0))
window.blit( icon2, (30,140))
window.blit( icon3, (130,140))
window.blit( icon4, (235,140))
labelt1 = font1.render(t1+"/"+l1+degree, 1, (255,255,255))
labelt2 = font1.render(t2+"/"+l2+degree, 1, (255,255,255))
labelt3 = font1.render(t3+"/"+l3+degree, 1, (255,255,255))
window.blit( labelt1, (30,190))
window.blit( labelt2, (130,190))
window.blit( labelt3, (235,190))
labelx1 = font1.render(vtag, 1, (255,255,255))
labelx2 = font3.render(vtime, 1, (255,255,255))
labelx3 = font1.render("aktuelles Wetter", 1, (255,255,255))
window.blit( labelx1, (45,50))
window.blit( labelx2, (30,80))
window.blit( labelx3, (185,50))
(temp, rain, wind) = weather_curr_get()
labelx4 = font1.render(temp+degree, 1, (255,255,255))
labelx5 = font1.render(rain, 1, (255,255,255))
labelx6 = font1.render(wind, 1, (255,255,255))
window.blit( labelx4, (185,65))
window.blit( labelx5, (185,80))
window.blit( labelx6, (185,95))
pygame.image.save(window, "/ram/temp.bmp")
os.system("tft_bmp /ram/temp.bmp")
def switchscreen():
#used on keyboard interrupt or if button is connected
os.system("tft_pwm 0")
Alles anzeigen
Script für den RFID Reader:
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
from functions_rfid import *
status = "n.a."
# --------- Main Program ---------
#listen for rfid input and set status
try:
while True:
ID = ""
for i in range(1):
ID = rfid()
#logging.debug('ID:' + str(ID))
username = ""
if ID != None:
username = user_get(ID)
logging.debug(username)
if username != "":
#check armed status
status = systemstatus_get()
#logging.debug('Hello User: ' + username)
#sound(username)
if (status == "on"):
systemstatus_set_off()
#logging.debug('sending off command to fhem')
#sound("disarmed")
else:
systemstatus_set_on()
#logging.debug('sending on command to fhem')
#sound("armed")
#else:
# logging.debug('no key recognized')
except KeyboardInterrupt:
logging.debug("Keyboard Interrupt")
# shutdown gracefully
sys.exit()
except:
logging.debug("Unexpected error:" + str(sys.exc_info()[0]))
# shutdown gracefully
sys.exit()
raise
finally:
# cleanup code if something goes wrong
logging.debug("Cleanup")
Alles anzeigen
Functions für den RFID Reader:
#coding=utf-8
#!/usr/bin/python
#Import sys to deal with command line arguments
import sys
import os
import time
import serial
import MySQLdb
import urllib2
from time import sleep
from threading import Timer
#open UART connection
UART = serial.Serial()
UART.port="/dev/ttyAMA0"
UART.timeout=0.5
UART.baudrate = 9600
UART.bytesize=serial.EIGHTBITS
UART.parity=serial.PARITY_NONE
UART.stopbits=serial.STOPBITS_ONE
UART.open()
Startflag = "\x02"
Endflag = "\x03"
def systemstatus_get():
url = "http://192.168.1.39:8083/fhem&cmd=%7BValue%28%22Securasp_Status%22%29%7D&XHR=1"
def handler(fh):
try:
fh.close()
except:
pass
try:
fh = urllib2.urlopen(url)
except:
fh = ""
pass
t = Timer(20.0, handler,[fh])
t.start()
try:
status = fh.read()
except:
status= "n.a."
pass
status = status.strip()
t.cancel()
return status
def systemstatus_set_on():
urllib2.urlopen('http://192.168.1.39:8083/fhem?cmd.Securasp_Status=set%20Securasp_Status%20on')
def systemstatus_set_off():
urllib2.urlopen('http://192.168.1.39:8083/fhem?cmd.Securasp_Status=set%20Securasp_Status%20off')
def user_get(user_id):
user = ""
mysql_opts = {
'host': "192.168.1.100",
'user': "fhemuser",
'pass': "fhempassword",
'db': "fhem"
}
mysql = MySQLdb.connect(mysql_opts['host'], mysql_opts['user'], mysql_opts['pass'], mysql_opts['db'])
mysql.apilevel = "2.0"
mysql.threadsafety = 2
mysql.paramstyle = "format"
cursor = mysql.cursor()
cursor.execute("SELECT User FROM `rfiduser` WHERE ID = %s", (user_id))
user = cursor.fetchone()[0]
return user
def rfid():
# Variablen loeschen
Checksumme = 0
Checksumme_Tag = 0
ID = ""
# Zeichen einlesen
Zeichen = UART.read()
#print "waiting for input2"
# Uebertragungsstart signalisiert worden?
if Zeichen == Startflag:
# ID zusammen setzen
for Counter in range(13):
Zeichen = UART.read()
ID = ID + str(Zeichen)
# Endflag aus dem String loeschen
rfid = ID.replace(Endflag, "" )
UART.flushInput()
return rfid
Alles anzeigen
VG F.