# pywws - Python software for USB Wireless Weather Stations
# http://github.com/jim-easterbrook/pywws
# Copyright (C) 2008-13 Jim Easterbrook jim@jim-easterbrook.me.uk
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""Get data from WH1080/WH3080 compatible weather stations.
Derived from wwsr.c by Michael Pendec (michael.pendec@gmail.com),
wwsrdump.c by Svend Skafte (svend@skafte.net), modified by Dave Wells,
and other sources.
Introduction
------------
This is the module that actually talks to the weather station base
unit. I don't have much understanding of USB, so copied a lot from
Michael Pendec's C program wwsr.
The weather station memory has two parts: a "fixed block" of 256 bytes
and a circular buffer of 65280 bytes. As each weather reading takes 16
bytes the station can store 4080 readings, or 14 days of 5-minute
interval readings. (The 3080 type stations store 20 bytes per reading,
so store a maximum of 3264.) As data is read in 32-byte chunks, but
each weather reading is 16 or 20 bytes, a small cache is used to
reduce USB traffic. The caching behaviour can be over-ridden with the
``unbuffered`` parameter to ``get_data`` and ``get_raw_data``.
Decoding the data is controlled by the static dictionaries
``reading_format``, ``lo_fix_format`` and ``fixed_format``. The keys
are names of data items and the values can be an ``(offset, type,
multiplier)`` tuple or another dictionary. So, for example, the
reading_format dictionary entry ``'rain' : (13, 'us', 0.3)`` means
that the rain value is an unsigned short (two bytes), 13 bytes from
the start of the block, and should be multiplied by 0.3 to get a
useful value.
The use of nested dictionaries in the ``fixed_format`` dictionary
allows useful subsets of data to be decoded. For example, to decode
the entire block ``get_fixed_block`` is called with no parameters::
ws = WeatherStation.weather_station()
print ws.get_fixed_block()
To get the stored minimum external temperature, ``get_fixed_block`` is
called with a sequence of keys::
ws = WeatherStation.weather_station()
print ws.get_fixed_block(['min', 'temp_out', 'val'])
Often there is no requirement to read and decode the entire fixed
block, as its first 64 bytes contain the most useful data: the
interval between stored readings, the buffer address where the current
reading is stored, and the current date & time. The
``get_lo_fix_block`` method provides easy access to these.
For more examples of using the WeatherStation module, see the
TestWeatherStation program.
Detailed API
------------
"""
__docformat__ = "restructuredtext en"
from datetime import datetime
import logging
import sys
import time
from pywws import Localisation
USBDevice = None
if not USBDevice:
try:
from pywws.device_ctypes_hidapi import USBDevice
except ImportError:
pass
if not USBDevice:
try:
from pywws.device_cython_hidapi import USBDevice
except ImportError:
pass
if not USBDevice:
try:
from pywws.device_pyusb1 import USBDevice
except ImportError:
pass
if not USBDevice:
from pywws.device_pyusb import USBDevice
# get meaning for status integer
rain_overflow = 0x80
lost_connection = 0x40
unknown = 0x20
unknown = 0x10
unknown = 0x08
unknown = 0x04
unknown = 0x02
unknown = 0x01
[docs]def decode_status(status):
result = {}
for key, mask in (('rain_overflow', 0x80),
('lost_connection', 0x40),
('unknown', 0x3f),
):
result[key] = status & mask
return result
# decode weather station raw data formats
def _decode(raw, format):
def _signed_byte(raw, offset):
res = raw[offset]
if res == 0xFF:
return None
sign = 1
if res >= 128:
sign = -1
res = res - 128
return sign * res
def _signed_short(raw, offset):
lo = raw[offset]
hi = raw[offset+1]
if lo == 0xFF and hi == 0xFF:
return None
sign = 1
if hi >= 128:
sign = -1
hi = hi - 128
return sign * ((hi * 256) + lo)
def _unsigned_short(raw, offset):
lo = raw[offset]
hi = raw[offset+1]
if lo == 0xFF and hi == 0xFF:
return None
return (hi * 256) + lo
def _unsigned_int3(raw, offset):
lo = raw[offset]
md = raw[offset+1]
hi = raw[offset+2]
if lo == 0xFF and md == 0xFF and hi == 0xFF:
return None
return (hi * 256 * 256) + (md * 256) + lo
def _bcd_decode(byte):
hi = (byte // 16) & 0x0F
lo = byte & 0x0F
return (hi * 10) + lo
def _date_time(raw, offset):
year = _bcd_decode(raw[offset])
month = _bcd_decode(raw[offset+1])
day = _bcd_decode(raw[offset+2])
hour = _bcd_decode(raw[offset+3])
minute = _bcd_decode(raw[offset+4])
return '%4d-%02d-%02d %02d:%02d' % (year + 2000, month, day, hour, minute)
def _bit_field(raw, offset):
mask = 1
result = []
for i in range(8):
result.append(raw[offset] & mask != 0)
mask = mask << 1
return result
if not raw:
return None
if isinstance(format, dict):
result = {}
for key, value in format.items():
result[key] = _decode(raw, value)
else:
pos, type, scale = format
if type == 'ub':
result = raw[pos]
if result == 0xFF:
result = None
elif type == 'sb':
result = _signed_byte(raw, pos)
elif type == 'us':
result = _unsigned_short(raw, pos)
elif type == 'u3':
result = _unsigned_int3(raw, pos)
elif type == 'ss':
result = _signed_short(raw, pos)
elif type == 'dt':
result = _date_time(raw, pos)
elif type == 'tt':
result = '%02d:%02d' % (_bcd_decode(raw[pos]),
_bcd_decode(raw[pos+1]))
elif type == 'pb':
result = raw[pos]
elif type == 'wa':
# wind average - 12 bits split across a byte and a nibble
result = raw[pos] + ((raw[pos+2] & 0x0F) << 8)
if result == 0xFFF:
result = None
elif type == 'wg':
# wind gust - 12 bits split across a byte and a nibble
result = raw[pos] + ((raw[pos+1] & 0xF0) << 4)
if result == 0xFFF:
result = None
elif type == 'bf':
# bit field - 'scale' is a list of bit names
result = {}
for k, v in zip(scale, _bit_field(raw, pos)):
result[k] = v
return result
else:
raise IOError('unknown type %s' % type)
if scale and result:
result = float(result) * scale
return result
[docs]class CUSBDrive(object):
"""Low level interface to weather station via USB.
Loosely modeled on a C++ class obtained from
http://site.ambientweatherstore.com/easyweather/ws_1080_2080_protocol.zip.
I don't know the provenance of this, but it looks as if it may
have come from the manufacturer.
"""
EndMark = 0x20
ReadCommand = 0xA1
WriteCommand = 0xA0
WriteCommandWord = 0xA2
def __init__(self):
self.logger = logging.getLogger('pywws.WeatherStation.CUSBDrive')
self.logger.info('using %s', USBDevice.__module__)
self.dev = USBDevice(0x1941, 0x8021)
[docs] def read_block(self, address):
"""Read 32 bytes from the weather station.
If the read fails for any reason, :obj:`None` is returned.
:param address: address to read from.
:type address: int
:return: the data from the weather station.
:rtype: list(int)
"""
buf = [
self.ReadCommand,
address // 256,
address % 256,
self.EndMark,
self.ReadCommand,
address // 256,
address % 256,
self.EndMark,
]
if not self.dev.write_data(buf):
return None
return self.dev.read_data(32)
[docs] def write_byte(self, address, data):
"""Write a single byte to the weather station.
:param address: address to write to.
:type address: int
:param data: the value to write.
:type data: int
:return: success status.
:rtype: bool
"""
buf = [
self.WriteCommandWord,
address // 256,
address % 256,
self.EndMark,
self.WriteCommandWord,
data,
0,
self.EndMark,
]
if not self.dev.write_data(buf):
return False
buf = self.dev.read_data(8)
if buf is None:
return False
for byte in buf:
if byte != 0xA5:
return False
return True
[docs]class weather_station(object):
"""Class that represents the weather station to user program."""
# avoid USB activity this number of seconds each side of time when
# station screen is believed to be writing to the memory
avoid = 3.0
# minimum interval between polling for data change
min_pause = 0.5
def __init__(self, ws_type='1080', params=None, status=None):
"""Connect to weather station and prepare to read data."""
self.logger = logging.getLogger('pywws.weather_station')
# create basic IO object
self.cusb = CUSBDrive()
# init variables
self.params = params
self.status = status
self._fixed_block = None
self._data_block = None
self._data_pos = None
self._current_ptr = None
if self.params:
self.params.unset('fixed', 'station clock')
self.params.unset('fixed', 'sensor clock')
if self.status:
self._station_clock = eval(
self.status.get('clock', 'station', 'None'))
self._sensor_clock = eval(
self.status.get('clock', 'sensor', 'None'))
else:
self._station_clock = None
self._sensor_clock = None
self.ws_type = ws_type
[docs] def live_data(self, logged_only=False):
# There are two things we want to synchronise to - the data is
# updated every 48 seconds and the address is incremented
# every 5 minutes (or 10, 15, ..., 30). Rather than getting
# data every second or two, we sleep until one of the above is
# due. (During initialisation we get data every two seconds
# anyway.)
read_period = self.get_fixed_block(['read_period'])
log_interval = float(read_period * 60)
live_interval = 48.0
old_ptr = self.current_pos()
old_data = self.get_data(old_ptr, unbuffered=True)
now = time.time()
if self._sensor_clock:
next_live = now
next_live -= (next_live - self._sensor_clock) % live_interval
next_live += live_interval
else:
next_live = None
if self._station_clock and next_live:
# set next_log
next_log = next_live - live_interval
next_log -= (next_log - self._station_clock) % 60
next_log -= old_data['delay'] * 60
next_log += log_interval
else:
next_log = None
self._station_clock = None
ptr_time = 0
data_time = 0
last_log = now - (old_data['delay'] * 60)
last_status = None
while True:
if not self._station_clock:
next_log = None
if not self._sensor_clock:
next_live = None
now = time.time()
# wake up just before next reading is due
advance = now + max(self.avoid, self.min_pause) + self.min_pause
pause = 600.0
if next_live:
if not logged_only:
pause = min(pause, next_live - advance)
else:
pause = self.min_pause
if next_log:
pause = min(pause, next_log - advance)
elif old_data['delay'] < read_period - 1:
pause = min(
pause, ((read_period - old_data['delay']) * 60.0) - 110.0)
else:
pause = self.min_pause
pause = max(pause, self.min_pause)
self.logger.debug(
'delay %s, pause %g', str(old_data['delay']), pause)
time.sleep(pause)
# get new data
last_data_time = data_time
new_data = self.get_data(old_ptr, unbuffered=True)
data_time = time.time()
# log any change of status
if new_data['status'] != last_status:
self.logger.warning(
'status %s', str(decode_status(new_data['status'])))
last_status = new_data['status']
# 'good' time stamp if we haven't just woken up from long
# pause and data read wasn't delayed
valid_time = data_time - last_data_time < (self.min_pause * 2.0) - 0.1
# make sure changes because of logging interval aren't
# mistaken for new live data
if new_data['delay'] >= read_period:
for key in ('delay', 'hum_in', 'temp_in', 'abs_pressure'):
old_data[key] = new_data[key]
# ignore solar data which changes every 60 seconds
if self.ws_type == '3080':
for key in ('illuminance', 'uv'):
old_data[key] = new_data[key]
if new_data != old_data:
self.logger.debug('live_data new data')
result = dict(new_data)
if valid_time:
# data has just changed, so definitely at a 48s update time
self._sensor_clock = data_time
self.logger.warning(
'setting sensor clock %g', data_time % live_interval)
if self.status:
self.status.set(
'clock', 'sensor', str(self._sensor_clock))
if not next_live:
self.logger.warning('live_data live synchronised')
else:
self.logger.error('unexpected sensor clock setting')
next_live = data_time
elif next_live and data_time < next_live - self.min_pause:
self.logger.warning(
'live_data lost sync %g', data_time - next_live)
next_live = None
self._sensor_clock = None
if next_live and not logged_only:
while data_time > next_live + live_interval:
self.logger.info('live_data missed')
next_live += live_interval
result['idx'] = datetime.utcfromtimestamp(int(next_live))
next_live += live_interval
yield result, old_ptr, False
old_data = new_data
# get new pointer
if old_data['delay'] < read_period - 1:
continue
last_ptr_time = ptr_time
new_ptr = self.current_pos()
ptr_time = time.time()
valid_time = ptr_time - last_ptr_time < (self.min_pause * 2.0) - 0.1
if new_ptr != old_ptr:
self.logger.debug('live_data new ptr: %06x', new_ptr)
last_log = ptr_time
# re-read data, to be absolutely sure it's the last
# logged data before the pointer was updated
new_data = self.get_data(old_ptr, unbuffered=True)
result = dict(new_data)
if valid_time:
# pointer has just changed, so definitely at a logging time
self._station_clock = ptr_time
self.logger.warning(
'setting station clock %g', ptr_time % 60.0)
if self.status:
self.status.set(
'clock', 'station', str(self._station_clock))
if not next_log:
self.logger.warning('live_data log synchronised')
next_log = ptr_time
elif next_log and ptr_time < next_log - self.min_pause:
self.logger.warning(
'live_data lost log sync %g', ptr_time - next_log)
next_log = None
self._station_clock = None
if next_log:
result['idx'] = datetime.utcfromtimestamp(int(next_log))
next_log += log_interval
yield result, old_ptr, True
if new_ptr != self.inc_ptr(old_ptr):
self.logger.error(
'live_data unexpected ptr change %06x -> %06x',
old_ptr, new_ptr)
old_ptr = new_ptr
old_data['delay'] = 0
data_time = 0
elif ptr_time > last_log + ((new_data['delay'] + 2) * 60):
# if station stops logging data, don't keep reading
# USB until it locks up
raise IOError('station is not logging data')
elif valid_time and next_log and ptr_time > next_log + 6.0:
self.logger.warning('live_data log extended')
next_log += 60.0
[docs] def inc_ptr(self, ptr):
"""Get next circular buffer data pointer."""
result = ptr + self.reading_len[self.ws_type]
if result >= 0x10000:
result = self.data_start
return result
[docs] def dec_ptr(self, ptr):
"""Get previous circular buffer data pointer."""
result = ptr - self.reading_len[self.ws_type]
if result < self.data_start:
result = 0x10000 - self.reading_len[self.ws_type]
return result
[docs] def get_raw_data(self, ptr, unbuffered=False):
"""Get raw data from circular buffer.
If unbuffered is false then a cached value that was obtained
earlier may be returned."""
if unbuffered:
self._data_pos = None
# round down ptr to a 'block boundary'
idx = ptr - (ptr % 0x20)
ptr -= idx
count = self.reading_len[self.ws_type]
if self._data_pos == idx:
# cache contains useful data
result = self._data_block[ptr:ptr + count]
if len(result) >= count:
return result
else:
result = list()
if ptr + count > 0x20:
# need part of next block, which may be in cache
if self._data_pos != idx + 0x20:
self._data_pos = idx + 0x20
self._data_block = self._read_block(self._data_pos)
result += self._data_block[0:ptr + count - 0x20]
if len(result) >= count:
return result
# read current block
self._data_pos = idx
self._data_block = self._read_block(self._data_pos)
result = self._data_block[ptr:ptr + count] + result
return result
[docs] def get_data(self, ptr, unbuffered=False):
"""Get decoded data from circular buffer.
If unbuffered is false then a cached value that was obtained
earlier may be returned."""
return _decode(self.get_raw_data(ptr, unbuffered),
self.reading_format[self.ws_type])
[docs] def current_pos(self):
"""Get circular buffer location where current data is being written."""
new_ptr = _decode(
self._read_fixed_block(0x0020), self.lo_fix_format['current_pos'])
if new_ptr == self._current_ptr:
return self._current_ptr
if self._current_ptr and new_ptr != self.inc_ptr(self._current_ptr):
for k in self.reading_len:
if (new_ptr - self._current_ptr) == self.reading_len[k]:
self.logger.warning(
'type change %s -> %s', self.ws_type, k)
self.ws_type = k
break
self._current_ptr = new_ptr
return self._current_ptr
[docs] def get_raw_fixed_block(self, unbuffered=False):
"""Get the raw "fixed block" of settings and min/max data."""
if unbuffered or not self._fixed_block:
self._fixed_block = self._read_fixed_block()
return self._fixed_block
[docs] def get_fixed_block(self, keys=[], unbuffered=False):
"""Get the decoded "fixed block" of settings and min/max data.
A subset of the entire block can be selected by keys."""
if unbuffered or not self._fixed_block:
self._fixed_block = self._read_fixed_block()
format = self.fixed_format
# navigate down list of keys to get to wanted data
for key in keys:
format = format[key]
return _decode(self._fixed_block, format)
def _wait_for_station(self):
# avoid times when station is writing to memory
while True:
pause = 60.0
if self._station_clock:
phase = time.time() - self._station_clock
if phase > 24 * 3600:
# station clock was last measured a day ago, so reset it
self._station_clock = None
else:
pause = min(pause, (self.avoid - phase) % 60)
if self._sensor_clock:
phase = time.time() - self._sensor_clock
if phase > 24 * 3600:
# sensor clock was last measured a day ago, so reset it
self._sensor_clock = None
else:
pause = min(pause, (self.avoid - phase) % 48)
if pause > self.avoid * 2.0:
return
self.logger.debug('avoid %s', str(pause))
time.sleep(pause)
def _read_block(self, ptr, retry=True):
# Read block repeatedly until it's stable. This avoids getting corrupt
# data when the block is read as the station is updating it.
old_block = None
while True:
self._wait_for_station()
new_block = self.cusb.read_block(ptr)
if new_block:
if (new_block == old_block) or not retry:
break
if old_block:
self.logger.debug('_read_block changing %06x', ptr)
old_block = new_block
return new_block
def _read_fixed_block(self, hi=0x0100):
result = []
for mempos in range(0x0000, hi, 0x0020):
result += self._read_block(mempos)
# check 'magic number'
if result[:2] not in ([0x55, 0xAA], [0xFF, 0xFF],
[0x55, 0x55], [0xC4, 0x00]):
self.logger.critical(
"Unrecognised 'magic number' %02x %02x", result[0], result[1])
return result
def _write_byte(self, ptr, value):
self._wait_for_station()
if not self.cusb.write_byte(ptr, value):
raise IOError('_write_byte failed')
[docs] def write_data(self, data):
"""Write a set of single bytes to the weather station. Data must be an
array of (ptr, value) pairs."""
# send data
for ptr, value in data:
self._write_byte(ptr, value)
# set 'data changed'
self._write_byte(self.fixed_format['data_changed'][0], 0xAA)
# wait for station to clear 'data changed'
while True:
ack = _decode(
self._read_fixed_block(0x0020), self.fixed_format['data_changed'])
if ack == 0:
break
self.logger.debug('write_data waiting for ack')
time.sleep(6)
# Tables of "meanings" for raw weather station data. Each key
# specifies an (offset, type, multiplier) tuple that is understood
# by _decode.
# depends on weather station type
reading_format = {}
reading_format['1080'] = {
'delay' : (0, 'ub', None),
'hum_in' : (1, 'ub', None),
'temp_in' : (2, 'ss', 0.1),
'hum_out' : (4, 'ub', None),
'temp_out' : (5, 'ss', 0.1),
'abs_pressure' : (7, 'us', 0.1),
'wind_ave' : (9, 'wa', 0.1),
'wind_gust' : (10, 'wg', 0.1),
'wind_dir' : (12, 'ub', None),
'rain' : (13, 'us', 0.3),
'status' : (15, 'pb', None),
}
reading_format['3080'] = {
'illuminance' : (16, 'u3', 0.1),
'uv' : (19, 'ub', None),
}
reading_format['3080'].update(reading_format['1080'])
lo_fix_format = {
'read_period' : (16, 'ub', None),
'settings_1' : (17, 'bf', ('temp_in_F', 'temp_out_F', 'rain_in',
'bit3', 'bit4', 'pressure_hPa',
'pressure_inHg', 'pressure_mmHg')),
'settings_2' : (18, 'bf', ('wind_mps', 'wind_kmph', 'wind_knot',
'wind_mph', 'wind_bft', 'bit5',
'bit6', 'bit7')),
'display_1' : (19, 'bf', ('pressure_rel', 'wind_gust', 'clock_12hr',
'date_mdy', 'time_scale_24', 'show_year',
'show_day_name', 'alarm_time')),
'display_2' : (20, 'bf', ('temp_out_temp', 'temp_out_chill',
'temp_out_dew', 'rain_hour', 'rain_day',
'rain_week', 'rain_month', 'rain_total')),
'alarm_1' : (21, 'bf', ('bit0', 'time', 'wind_dir', 'bit3',
'hum_in_lo', 'hum_in_hi',
'hum_out_lo', 'hum_out_hi')),
'alarm_2' : (22, 'bf', ('wind_ave', 'wind_gust',
'rain_hour', 'rain_day',
'pressure_abs_lo', 'pressure_abs_hi',
'pressure_rel_lo', 'pressure_rel_hi')),
'alarm_3' : (23, 'bf', ('temp_in_lo', 'temp_in_hi',
'temp_out_lo', 'temp_out_hi',
'wind_chill_lo', 'wind_chill_hi',
'dew_point_lo', 'dew_point_hi')),
'timezone' : (24, 'sb', None),
'unknown_01' : (25, 'pb', None),
'data_changed' : (26, 'ub', None),
'data_count' : (27, 'us', None),
'display_3' : (29, 'bf', ('illuminance_fc', 'bit1', 'bit2', 'bit3',
'bit4', 'bit5', 'bit6', 'bit7')),
'current_pos' : (30, 'us', None),
}
fixed_format = {
'rel_pressure' : (32, 'us', 0.1),
'abs_pressure' : (34, 'us', 0.1),
'lux_wm2_coeff' : (36, 'us', 0.1),
'date_time' : (43, 'dt', None),
'unknown_18' : (97, 'pb', None),
'alarm' : {
'hum_in' : {'hi' : (48, 'ub', None), 'lo' : (49, 'ub', None)},
'temp_in' : {'hi' : (50, 'ss', 0.1), 'lo' : (52, 'ss', 0.1)},
'hum_out' : {'hi' : (54, 'ub', None), 'lo' : (55, 'ub', None)},
'temp_out' : {'hi' : (56, 'ss', 0.1), 'lo' : (58, 'ss', 0.1)},
'windchill' : {'hi' : (60, 'ss', 0.1), 'lo' : (62, 'ss', 0.1)},
'dewpoint' : {'hi' : (64, 'ss', 0.1), 'lo' : (66, 'ss', 0.1)},
'abs_pressure' : {'hi' : (68, 'us', 0.1), 'lo' : (70, 'us', 0.1)},
'rel_pressure' : {'hi' : (72, 'us', 0.1), 'lo' : (74, 'us', 0.1)},
'wind_ave' : {'bft' : (76, 'ub', None), 'ms' : (77, 'ub', 0.1)},
'wind_gust' : {'bft' : (79, 'ub', None), 'ms' : (80, 'ub', 0.1)},
'wind_dir' : (82, 'ub', None),
'rain' : {'hour' : (83, 'us', 0.3), 'day' : (85, 'us', 0.3)},
'time' : (87, 'tt', None),
'illuminance' : (89, 'u3', 0.1),
'uv' : (92, 'ub', None),
},
'max' : {
'uv' : {'val' : (93, 'ub', None)},
'illuminance' : {'val' : (94, 'u3', 0.1)},
'hum_in' : {'val' : (98, 'ub', None), 'date' : (141, 'dt', None)},
'hum_out' : {'val' : (100, 'ub', None), 'date' : (151, 'dt', None)},
'temp_in' : {'val' : (102, 'ss', 0.1), 'date' : (161, 'dt', None)},
'temp_out' : {'val' : (106, 'ss', 0.1), 'date' : (171, 'dt', None)},
'windchill' : {'val' : (110, 'ss', 0.1), 'date' : (181, 'dt', None)},
'dewpoint' : {'val' : (114, 'ss', 0.1), 'date' : (191, 'dt', None)},
'abs_pressure' : {'val' : (118, 'us', 0.1), 'date' : (201, 'dt', None)},
'rel_pressure' : {'val' : (122, 'us', 0.1), 'date' : (211, 'dt', None)},
'wind_ave' : {'val' : (126, 'us', 0.1), 'date' : (221, 'dt', None)},
'wind_gust' : {'val' : (128, 'us', 0.1), 'date' : (226, 'dt', None)},
'rain' : {
'hour' : {'val' : (130, 'us', 0.3), 'date' : (231, 'dt', None)},
'day' : {'val' : (132, 'us', 0.3), 'date' : (236, 'dt', None)},
'week' : {'val' : (134, 'us', 0.3), 'date' : (241, 'dt', None)},
'month' : {'val' : (136, 'us', 0.3), 'date' : (246, 'dt', None)},
'total' : {'val' : (138, 'us', 0.3), 'date' : (251, 'dt', None)},
},
},
'min' : {
'hum_in' : {'val' : (99, 'ub', None), 'date' : (146, 'dt', None)},
'hum_out' : {'val' : (101, 'ub', None), 'date' : (156, 'dt', None)},
'temp_in' : {'val' : (104, 'ss', 0.1), 'date' : (166, 'dt', None)},
'temp_out' : {'val' : (108, 'ss', 0.1), 'date' : (176, 'dt', None)},
'windchill' : {'val' : (112, 'ss', 0.1), 'date' : (186, 'dt', None)},
'dewpoint' : {'val' : (116, 'ss', 0.1), 'date' : (196, 'dt', None)},
'abs_pressure' : {'val' : (120, 'us', 0.1), 'date' : (206, 'dt', None)},
'rel_pressure' : {'val' : (124, 'us', 0.1), 'date' : (216, 'dt', None)},
},
}
fixed_format.update(lo_fix_format)
# start of readings / end of fixed block
data_start = 0x0100 # 256
# bytes per reading, depends on weather station type
reading_len = {
'1080' : 16,
'3080' : 20,
}