Commit bcacd84c authored by m.baxter's avatar m.baxter

Initial

parent 52c1d877
File added
File added
File added
__all__ = ['ev3dev', 'lego','mindsensors']
from ev3 import ev3dev
from ev3 import lego
from ev3 import mindsensors
import os
import glob
import warnings
import logging
import re
import atexit
logger = logging.getLogger(__name__)
@atexit.register
def cleanup():
for f in glob.glob('/sys/class/tacho-motor/motor*/command'):
with open(f, 'w') as f:
f.write('stop')
for f in glob.glob('/sys/class/leds/*/trigger'):
with open(f, 'w') as f:
f.write('none')
for f in glob.glob('/sys/class/leds/*/brightness'):
with open(f, 'w') as f:
f.write('0')
class NoSuchSensorError(Exception):
def __init__(self, port, name=None):
self.port = port
self.name = name
def __str__(self):
return "No such sensor port=%d name=%s" % (self.port, self.name)
class NoSuchMotorError(Exception):
def __init__(self, port, _type):
self.port = port
self._type = _type
def __str__(self):
return "No such motor port=%s type=%s" % (self.port, self._type)
class NoSuchLibraryError(Exception):
def __init__(self, lib=""):
self.lib = lib
def __str__(self):
return "No such library %s" % self.lib
class Ev3StringType(object):
@staticmethod
def post_read(value):
return value
@staticmethod
def pre_write(value):
return value
class Ev3IntType(object):
@staticmethod
def post_read(value):
return int(value)
@staticmethod
def pre_write(value):
return str(value)
class Ev3BoolType(object):
@staticmethod
def post_read(value):
return bool(value)
@staticmethod
def pre_write(value):
return '1' if value else '0'
class Ev3OnOffType(object):
@staticmethod
def post_read(value):
return True if value == 'on' else False
@staticmethod
def pre_write(value):
if (value == 'on' or value == 'off'):
return value
else:
return 'on' if bool(value) else 'off'
class create_ev3_property(object):
def __init__(self, **kwargs):
self.kwargs = kwargs
def __call__(self, cls):
for name, args in self.kwargs.items():
def ev3_property(name, read_only=False, write_only=False, flush_on_write=False, property_type=Ev3StringType):
def fget(self):
if not write_only:
return property_type.post_read(self.read_value(name))
else:
return None
def fset(self, value):
self.write_value(
name, property_type.pre_write(value), flush_on_write)
return property(fget, None if read_only else fset)
setattr(cls, name, ev3_property(name, **args))
return cls
def get_battery_percentage():
"""
Return an int() of the percentage of battery life remaining
"""
voltage_max = None
voltage_min = None
voltage_now = None
with open('/sys/devices/platform/legoev3-battery/power_supply/legoev3-battery/uevent', 'r') as fh:
for line in fh:
if not voltage_max:
re_voltage_max = re.search(
'POWER_SUPPLY_VOLTAGE_MAX_DESIGN=(\d+)', line)
if re_voltage_max:
voltage_max = int(re_voltage_max.group(1))
continue
if not voltage_min:
re_voltage_min = re.search(
'POWER_SUPPLY_VOLTAGE_MIN_DESIGN=(\d+)', line)
if re_voltage_min:
voltage_min = int(re_voltage_min.group(1))
continue
if not voltage_now:
re_voltage_now = re.search(
'POWER_SUPPLY_VOLTAGE_NOW=(\d+)', line)
if re_voltage_now:
voltage_now = int(re_voltage_now.group(1))
if re_voltage_max and re_voltage_min and re_voltage_now:
break
if voltage_max and voltage_min and voltage_now:
# This happens with the EV3 rechargeable battery if it is fully charge
if voltage_now >= voltage_max:
return 100
# Haven't seen this scenario but it can't hurt to check for it
elif voltage_now <= voltage_min:
return 0
# voltage_now is between the min and max
else:
voltage_max -= voltage_min
voltage_now -= voltage_min
return int(voltage_now / float(voltage_max) * 100)
else:
logger.error('voltage_max %s, voltage_min %s, voltage_now %s' %
(voltage_max, voltage_min, voltage_now))
return 0
class Ev3Dev(object):
def __init__(self):
self.sys_path = ""
def read_value(self, name):
attr_file = os.path.join(self.sys_path, name)
if os.path.isfile(attr_file):
with open(attr_file) as f:
value = f.read().strip()
return value
else:
return None
def write_value(self, name, value, flush = False):
attr_file = os.path.join(self.sys_path, name)
if os.path.isfile(attr_file):
with open(attr_file, 'w') as f:
f.write(str(value))
if flush:
f.flush()
else:
return
@create_ev3_property(
bin_data={'read_only': True},
bin_data_format={'read_only': True},
decimals={'read_only': True},
#mode={ 'read_only': False},
fw_version={'read_only': True},
modes={'read_only': True},
name={'read_only': True},
port_name={'read_only': True},
uevent={'read_only': True},
units={'read_only': True},
value0={'read_only': True, 'property_type': Ev3IntType},
value1={'read_only': True, 'property_type': Ev3IntType},
value2={'read_only': True, 'property_type': Ev3IntType},
value3={'read_only': True, 'property_type': Ev3IntType},
value4={'read_only': True, 'property_type': Ev3IntType},
value5={'read_only': True, 'property_type': Ev3IntType},
value6={'read_only': True, 'property_type': Ev3IntType},
value7={'read_only': True, 'property_type': Ev3IntType}
)
class LegoSensor(Ev3Dev):
def __init__(self, port=-1, name=None):
Ev3Dev.__init__(self)
sensor_existing = False
if (port > 0):
self.port = port
for p in glob.glob('/sys/class/lego-sensor/sensor*/uevent'):
with open(p) as f:
for value in f:
if (value.strip().lower() == ('LEGO_ADDRESS=in' + str(port)).lower()):
self.sys_path = os.path.dirname(p)
sensor_existing = True
break
if name != None and port == -1:
for p in glob.glob('/sys/class/lego-sensor/sensor*/uevent'):
with open(p) as f:
port_name = None
for value in f:
if (value.strip().lower().startswith('LEGO_ADDRESS=in'.lower())):
port_name = value.strip()[-1]
if sensor_existing:
break
if (value.strip().lower() == ('LEGO_DRIVER_NAME=' + name).lower()):
self.sys_path = os.path.dirname(p)
sensor_existing = True
if port_name is not None:
break
if sensor_existing:
self.port = int(port_name)
break
if (not sensor_existing):
raise NoSuchSensorError(port, name)
self._mode = self.read_value('mode')
@property
def mode(self):
return self._mode
@mode.setter
def mode(self, value):
if (self._mode != value):
self._mode = value
self.write_value('mode', value)
def mode_force_flush(self, value):
self._mode = value
self.write_value('mode', value)
class Enum(object):
def __init__(self, *args, **kwargs):
for arg in args:
kwargs[arg] = arg
self.enum_dict = kwargs
def __getattr__(self, name):
if (name in self.enum_dict.keys()):
return self.enum_dict[name]
else:
raise NameError("no such item %s" % name)
@create_ev3_property(
commands={'read_only': True},
command={'read_only': True, 'write_only': True},
count_per_rot={'read_only': True, 'property_type': Ev3IntType},
driver_name={'read_only': True},
duty_cycle={'read_only': True, 'property_type': Ev3IntType},
duty_cycle_sp={'read_only': False, 'property_type': Ev3IntType},
encoder_polarity={'read_only': False},
polarity_mode={'read_only': False},
port_name={'read_only': True},
position={'read_only': False, 'property_type': Ev3IntType},
position_sp={'read_only': False, 'property_type': Ev3IntType},
ramp_down_sp={'read_only': False, 'property_type': Ev3IntType},
ramp_up_sp={'read_only': False, 'property_type': Ev3IntType},
speed={'read_only': True, 'property_type': Ev3IntType},
speed_regulation={'read_only': False, 'property_type': Ev3OnOffType},
speed_sp={'read_only': False, 'property_type': Ev3IntType},
state={'read_only': True},
stop_command={'read_only': False},
stop_commands={'read_only': True},
time_sp={'read_only': False, 'property_type': Ev3IntType},
uevent={'read_only': True}
)
class Motor(Ev3Dev):
STOP_MODE = Enum(COAST='coast', BRAKE='brake', HOLD='hold')
POSITION_MODE = Enum(RELATIVE='relative', ABSOLUTE='absolute')
PORT = Enum('A', 'B', 'C', 'D')
def __init__(self, port='', _type=''):
Ev3Dev.__init__(self)
motor_existing = False
searchpath = '/sys/class/tacho-motor/motor*/'
if (port != ''):
self.port = port
for p in glob.glob(searchpath + 'uevent'):
with open(p) as f:
for value in f:
if (value.strip().lower() == ('LEGO_ADDRESS=out' + port).lower()):
self.sys_path = os.path.dirname(p)
motor_existing = True
break
if (_type != '' and port == ''):
for p in glob.glob(searchpath + 'uevent'):
with open(p) as f:
port_name = None
for value in f:
if (value.strip().lower().startswith('LEGO_ADDRESS=out'.lower())):
port_name = value.strip()[-1]
if motor_existing:
break
if (value.strip().lower() == ('LEGO_DRIVER_NAME=' + _type).lower()):
self.sys_path = os.path.dirname(p)
motor_existing = True
if port_name is not None:
break
if motor_existing:
self.port = port_name
break
if (not motor_existing):
raise NoSuchMotorError(port, _type)
def stop(self):
self.write_value('command', 'stop')
def start(self):
self.write_value('command', self.mode)
def reset(self):
self.write_value('command', 'reset')
# setup functions just set up all the values, run calls start (run=1)
# these are separated so that multiple motors can be started at the same time
def setup_forever(self, speed_sp, **kwargs):
self.mode = 'run-forever'
for k in kwargs:
v = kwargs[k]
if (v != None):
setattr(self, k, v)
speed_regulation = self.speed_regulation
if (speed_regulation):
self.speed_sp = int(speed_sp)
else:
self.duty_cycle_sp = int(speed_sp)
def run_forever(self, speed_sp, **kwargs):
self.setup_forever(speed_sp, **kwargs)
self.start()
def setup_direct(self, duty_cycle_sp, **kwargs):
self.mode = 'run-forever'
for k in kwargs:
v = kwargs[k]
if (v != None):
setattr(self, k, v)
self.duty_cycle_sp = int(duty_cycle_sp)
def run_direct(self, duty_cycle_sp, **kwargs):
self.setup_direct(duty_cycle_sp, **kwargs)
self.start()
def setup_time_limited(self, time_sp, speed_sp, **kwargs):
self.mode = 'run-timed'
for k in kwargs:
v = kwargs[k]
if (v != None):
setattr(self, k, v)
speed_regulation = self.speed_regulation
if (speed_regulation):
self.speed_sp = int(speed_sp)
else:
self.duty_cycle_sp = int(speed_sp)
self.time_sp = int(time_sp)
def run_time_limited(self, time_sp, speed_sp, **kwargs):
self.setup_time_limited(time_sp, speed_sp, **kwargs)
self.start()
def setup_position_limited(self, position_sp, speed_sp, absolute=True, **kwargs):
if absolute == True:
self.mode = 'run-to-abs-pos'
else:
self.mode = 'run-to-rel-pos'
kwargs['speed_regulation'] = True
for k in kwargs:
v = kwargs[k]
if (v != None):
setattr(self, k, v)
self.speed_sp = int(speed_sp)
self.position_sp = int(position_sp)
def run_position_limited(self, position_sp, speed_sp, **kwargs):
self.setup_position_limited(position_sp, speed_sp, **kwargs)
self.start()
def I2CSMBusProxy(cls):
try:
from smbus import SMBus
smbus_proxied_methods = [
m for m in dir(SMBus) if (m.startswith('read') or m.startswith('write'))]
for m in smbus_proxied_methods:
def create_proxied_smb_method(method):
def proxied_smb_method(self, *args, **kwargs):
return getattr(self.b, method)(self.addr, *args, **kwargs)
return proxied_smb_method
setattr(cls, m, create_proxied_smb_method(m))
return cls
except ImportError:
warnings.warn('python-smbus binding not found!')
return cls
@I2CSMBusProxy
class I2CS(object):
def __init__(self, port, addr):
self.port = port
self.i2c_port = port + 2
self.sys_path = '/dev/i2c-%s' % self.i2c_port
if (not os.path.exists(self.sys_path)):
raise NoSuchSensorError(port)
try:
from smbus import SMBus
self.b = SMBus(self.i2c_port)
self.addr = addr
except ImportError:
raise NoSuchLibraryError('smbus')
def read_byte_array(self, reg, _len):
return [self.read_byte_data(reg + r) for r in range(_len)]
def read_byte_array_as_string(self, reg, _len):
return ''.join(chr(r) for r in self.read_byte_array(reg, _len))
class create_i2c_property(object):
def __init__(self, **kwargs):
self.kwargs = kwargs
def __call__(self, cls):
for name, reg_address_and_read_only in self.kwargs.items():
def i2c_property(reg, read_only=True):
def fget(self):
return self.read_byte_data(reg)
def fset(self, value):
return self.write_byte_data(reg, value)
return property(fget, None if read_only else fset)
if (type(reg_address_and_read_only) == int):
prop = i2c_property(reg_address_and_read_only)
else:
prop = i2c_property(
reg_address_and_read_only[0], **reg_address_and_read_only[1])
setattr(cls, name, prop)
return cls
@create_ev3_property(
brightness={'read_only': False, 'property_type': Ev3IntType},
max_brightness={'read_only': True, 'property_type': Ev3IntType},
trigger={'read_only': False, 'flush_on_write': True},
delay_on={'read_only': False, 'property_type': Ev3IntType},
delay_off={'read_only': False, 'property_type': Ev3IntType}
)
class LEDLight(Ev3Dev):
def __init__(self, light_path):
super(Ev3Dev, self).__init__()
self.sys_path = '/sys/class/leds/' + light_path
class LEDSide (object):
def __init__(self, left_or_right):
self.green = LEDLight('ev3-%s1:green:ev3dev' % left_or_right)
self.red = LEDLight('ev3-%s0:red:ev3dev' % left_or_right)
self._color = (0, 0)
@property
def color(self):
"""LED color (RED, GREEN), where RED and GREEN are integers
between 0 and 255."""
return self._color
@color.setter
def color(self, value):
assert len(value) == 2
assert 0 <= value[0] <= self.red.max_brightness
assert 0 <= value[1] <= self.green.max_brightness
self._color = (
self.red.brightness, self.green.brightness) = tuple(value)
def blink(self, color=(0, 0), **kwargs):
if (color != (0, 0)):
self.color = color
for index, light in enumerate((self.red, self.green)):
if (not self._color[index]):
continue
light.trigger = 'timer'
for p, v in kwargs.items():
setattr(light, p, v)
def on(self):
self.green.trigger, self.red.trigger = 'none', 'none'
self.red.brightness, self.green.brightness = self._color
def off(self):
self.green.trigger, self.red.trigger = 'none', 'none'
self.red.brightness, self.green.brightness = 0, 0
class LED(object):
class COLOR:
NONE = (0, 0)
RED = (255, 0)
GREEN = (0, 255)
YELLOW = (25, 255)
ORANGE = (120, 255)
AMBER = (255, 255)
left = LEDSide('left')
right = LEDSide('right')
@create_ev3_property(
tone={'read_only': False},
mode={'read_only': True},
volume={'read_only': False, 'property_type': Ev3IntType}
)
class Tone(Ev3Dev):
def __init__(self):
super(Ev3Dev, self).__init__()
self.sys_path = '/sys/devices/platform/snd-legoev3'
def play(self, frequency, milliseconds=1000):
self.tone = '%d %d' % (frequency, milliseconds)
def stop(self):
self.tone = '0'
class Lcd(object):
def __init__(self):
try:
from PIL import Image, ImageDraw
SCREEN_WIDTH = 178
SCREEN_HEIGHT = 128
HW_MEM_WIDTH = int((SCREEN_WIDTH + 31) / 32) * 4
SCREEN_MEM_WIDTH = int((SCREEN_WIDTH + 7) / 8)
LCD_BUFFER_LENGTH = SCREEN_MEM_WIDTH * SCREEN_HEIGHT
LCD_HW_BUFFER_LENGTH = HW_MEM_WIDTH * SCREEN_HEIGHT
self._buffer = Image.new(
"1", (HW_MEM_WIDTH * 8, SCREEN_HEIGHT), "white")
self._draw = ImageDraw.Draw(self._buffer)
except ImportError:
raise NoSuchLibraryError('PIL')
def update(self):
f = os.open('/dev/fb0', os.O_RDWR)
os.write(f, self._buffer.tobytes("raw", "1;IR"))
os.close(f)
@property
def buffer(self):
return self._buffer
@property
def draw(self):
return self._draw
def reset(self):
self._draw.rectangle(
(0, 0) + self._buffer.size, outline='white', fill='white')
class attach_ev3_keys(object):
def __init__(self, **kwargs):
self.kwargs = kwargs
def __call__(self, cls):
key_const = {}
for key_name, key_code in self.kwargs.items():
def attach_key(key_name, key_code):
def fget(self):
buf = self.polling()
return self.test_bit(key_code, buf)
return property(fget)
setattr(cls, key_name, attach_key(key_name, key_code))
key_const[key_name.upper()] = key_code
setattr(cls, 'CODE', Enum(**key_const))
return cls
import array
import fcntl
@attach_ev3_keys(
up=103,
down=108,
left=105,
right=106,
enter=28,
backspace=14
)
class Key(object):
def __init__(self):
pass
def EVIOCGKEY(self, length):
return 2 << (14 + 8 + 8) | length << (8 + 8) | ord('E') << 8 | 0x18
def test_bit(self, bit, bytes):
# bit in bytes is 1 when released and 0 when pressed
return not bool(bytes[int(bit / 8)] & 1 << bit % 8)
def polling(self):
KEY_MAX = 0x2ff
BUF_LEN = int((KEY_MAX + 7) / 8)
buf = array.array('B', [0] * BUF_LEN)
with open('/dev/input/by-path/platform-gpio-keys.0-event', 'r') as fd:
ret = fcntl.ioctl(fd, self.EVIOCGKEY(len(buf)), buf)
if (ret < 0):
return None
else:
return buf
"""
A simple condition-based event loop.
"""
import time
class EventLoop(object):
"""The event loop.
"""
def __init__(self):
self._events = []
self._closed = False
def register_condition(self, condition, target, repeat=False, count=-1):
"""Register `target` to be called when ``condition()`` evaluates to
true.
If `repeat` is false, `target` will only be called once for a serie
of polls where `condition` evaluates to true, e.g. when a button is
pressed.
If `count` is a positive number, the condition will be automatically
unregistered after been called this many times.
Returns an ID that can be used to unregister the condition.
"""
self._events.append(Event(condition, target, repeat, count))
return len(self._events) - 1
def register_value_change(self, getter, startvalue, target, count=-1):
"""Register `target` to be called when evaluating ``getter()``
returns a new value. `startvalue` is the starting value to check
against.
If `count` is a positive number, `target` will be called
periodically this many times before it will be disabled.
Returns an ID that can be used to unregister the timer.
"""
self._events.append(ValueChangeEvent(
getter, startvalue, target, count))
return len(self._events) - 1
def register_timer(self, seconds, target, count=1):
"""Register `target` to be called when the given number of seconds
has passed.
If `count` is a positive number, `target` will be called
periodically this many times before it will be disabled.
Returns an ID that can be used to unregister the timer.
"""
target_time = time.time() + seconds
condition = lambda: time.time() >= target_time
return self.register_condition(condition, target, False, count)
def register_poll(self, poller, callback):
"""Register `callback` to be called on every ``poller()`` evaluation.
Returns an ID that can be used to unregister this poll event.
"""
self._events.append(PollEvent(
poller, callback))
return len(self._events) - 1
def unregister(self, id):
"""Ungegister condition with the given id."""
del self._events[id]
def start(self, time_delta = 0.1):
"""Starts the event loop."""
self._loop(time_delta)
def stop(self):
"""Stops event loop."""
self._closed = True
def _loop(self, time_delta = 0.1):
"""The event loop."""
while not self._closed:
for i in range(len(self._events) - 1, -1, -1):
self._events[i].evaluate()
if self._events[i]._count == 0:
self.unregister(i)
time.sleep(time_delta)
class Event(object):
"""The base Event class.
"""
def __init__(self, condition, target, repeat=False, count=-1):
self._condition = condition
self._target = target
self._repeat = repeat
self._count = count
self._previous_evaluation = False
self._evaluation = False
def poll(self):
"""Returns true if condition is satisfied, otherwise
false. `evaluation` is the evaluated condition."""
return self._evaluation and (
self._repeat or not self._previous_evaluation)
def evaluate(self):
"""Evaluate the condition. If it evaluates to true, `target` is
called and the count number is decreased."""
self._evaluation = self._condition()
if self.poll():
self._target(self)
if self._count >= 0:
self._count -= 1
self._previous_evaluation = self._evaluation
evaluation = property(
lambda self: self._evaluation,
doc='The value from the most resent evaluation of the condition.')
previous_evaluation = property(
lambda self: self._previous_evaluation,
doc='The value from the previous evaluation of the condition.')
repeat = property(
lambda self: self._repeat,
lambda self, value: setattr(self, '_repeat', bool(value)),
doc="Whether to call target every time `condition` is true, even "
"if it haven't changed since last poll.")
count = property(
lambda self: self._count,
lambda self, value: setattr(self, '_count', int(value)),
doc='Remaining number of times this event can be fired before it is '
'automatically unregistered.')
class ValueChangeEvent(Event):
def __init__(self, getter, startvalue, target, count=-1):
Event.__init__(self, getter, target, count=count)
self._previous_evaluation = startvalue
def poll(self):
return self._evaluation != self._previous_evaluation
class PollEvent(Event):
def __init__(self, poller, callback):
Event.__init__(self, poller, callback, count=-1)
def poll(self):
return True
from .ev3dev import LegoSensor, Motor
class TouchSensor(LegoSensor):
def __init__(self, port=-1):
# Both lego-nxt-touch and lego-ev3-touch support auto
LegoSensor.__init__(self, port, name='lego-ev3-touch')
@property
def is_pushed(self):
self.mode = 'TOUCH'
return bool(self.value0)
class LightSensor(LegoSensor):
def __init__(self, port=-1):
LegoSensor.__init__(self, port, name='lego-nxt-light')
@property
def reflect(self):
self.mode = 'REFLECT'
# Reflected light intensity (0 to 100)
return self.value0/10.0
@property
def ambient(self):
self.mode = 'AMBIENT'
# Ambient light intensity (0 to 100)
return self.value0/10.0
class SoundSensor(LegoSensor):
def __init__(self, port=-1):
LegoSensor.__init__(self, port, name='lego-nxt-sound')
@property
def db(self):
self.mode = 'DB'
# Sound pressure level (0 to 1000)
return self.value0/10.0
@property
def dba(self):
self.mode = 'DBA'
# Sound pressure level (0 to 1000)
return self.value0/10.0
class ColorSensor(LegoSensor):
colors = (None, 'black', 'blue', 'green',
'yellow', 'red', 'white', 'brown')
def __init__(self, port=-1):
LegoSensor.__init__(self, port, name='lego-ev3-color')
@property
def rgb(self):
"""Raw color components; (`r`, `g`, `b`). Values between 0 and 1020(??).
All leds rapidly cycle, appears white."""
self.mode = 'RGB-RAW'
return self.value0, self.value1, self.value2
@property
def color(self):
"""Integer between 0 and 7 indicating the color. See the `colors`
attribute for color names.
All leds rapidly cycle, appears white."""
self.mode = 'COL-COLOR'
return self.value0
@property
def reflect(self):
"""Reflected intensity in percent (int). Red led is on."""
self.mode = 'COL-REFLECT'
return self.value0
@property
def ambient(self):
"""Ambient intensity in percent (int). Red led is off."""
self.mode = 'COL-AMBIENT'
return self.value0
@property
def ref_raw(self):
"""Raw reflected intensity; (`r`, `b`). Values between 0 and 1020(??).
Red led is on."""
self.mode = 'REF-RAW'
return self.value0, self.value1
class InfraredSensor(LegoSensor):
def __init__(self, port=-1):
LegoSensor.__init__(self, port, name='lego-ev3-ir')
class REMOTE:
"""Button values for the `remote` property."""
NONE = 0
RED_UP = 1
RED_DOWN = 2
BLUE_UP = 3
BLUE_DOWN = 4
RED_UP_AND_BLUE_UP = 5
RED_UP_AND_BLUE_DOWN = 6
RED_DOWN_AND_BLUE_UP = 7
RED_DOWN_AND_BLUE_DOWN = 8
BAECON_MODE_ON = 9
RED_UP_AND_RED_DOWN = 10
BLUE_UP_AND_BLUE_DOWN = 11
@property
def remote(self):
"""IR remote control mode. A tuple of recieved value for each of the 4
channels.
"""
self.mode = 'IR-REMOTE'
return self.value0, self.value1, self.value2, self.value3
@property
def remote_bin(self):
self.mode = 'IR-REM-A'
return self.value0
@property
def prox(self):
"""Proximity mode. Distance in percent (100% is about 70cm)."""
self.mode = 'IR-PROX'
return self.value0
@property
def seek(self):
"""IR Seeker mode. A list of (`heading`, `distance`) pairs for each of
the 4 channels.
When looking in the same direction as the sensor, `heading` =
-25 is far left and `heading` = +25 is far right.
`distance` is the distance in percent (100% is about 70cm).
Channels with no baecon returns (0, 128).
"""
self.mode = 'IR-SEEK'
return [(self.value0, self.value1),
(self.value2, self.value3),
(self.value4, self.value5),
(self.value6, self.value7)]
class GyroSensor(LegoSensor):
def __init__(self, port=-1):
LegoSensor.__init__(self, port, name='lego-ev3-gyro')
@property
def ang(self):
self.mode = 'GYRO-ANG'
return self.value0
@property
def rate(self):
self.mode = 'GYRO-RATE'
return self.value0
@property
def ang_and_rate(self):
self.mode = 'GYRO-G&A'
return self.value0, self.value1
class UltrasonicSensor(LegoSensor):
def __init__(self, port=-1):
LegoSensor.__init__(self, port, name='lego-ev3-us')
@property
def dist_cm(self):
self.mode = 'US-DIST-CM'
return self.value0/10.0
@property
def dist_in(self):
self.mode = 'US-DIST-IN'
return self.value0/10.0
@property
def listen(self):
self.mode = 'US-LISTEN'
return bool(self.value0)
@property
def si_cm(self):
self.mode_force_flush('US-SI-CM')
return self.value0/10.0
@property
def si_in(self):
self.mode_force_flush('US-SI-IN')
return self.value0/10.0
class LargeMotor(Motor):
def __init__(self, port=''):
Motor.__init__(self, port, _type='lego-ev3-l-motor')
class MediumMotor(Motor):
def __init__(self, port=''):
Motor.__init__(self, port, _type='lego-ev3-m-motor')
from .ev3dev import I2CS
from .ev3dev import LegoSensor
class MindSensorI2CS(I2CS):
@property
def version(self):
return self.read_byte_array_as_string(0x00, 8)
@property
def vendor_id(self):
return self.read_byte_array_as_string(0x08, 8)
@property
def device_id(self):
return self.read_byte_array_as_string(0x10, 8)
@I2CS.create_i2c_property(
command=(0x41, {'read_only': False}),
button_set_1 = 0x42,
button_set_2= 0x43,
x_left= 0x44,
y_left= 0x45,
x_right= 0x46,
y_right= 0x47,
up= 0x4A,
right= 0x4B,
down= 0x4C,
left= 0x4D,
l2= 0x4E,
r2= 0x4F,
l1= 0x50,
r1= 0x51,
triangle= 0x52,
circle= 0x53,
cross= 0x54,
square= 0x55)
class PSPNxV4(MindSensorI2CS):
def __init__(self, port, addr=0x01):
I2CS.__init__(self, port, addr)
self.command = 0x49
class AbsoluteIMU(LegoSensor):
# Is this too tricky to create property?
def __init__(self, port=-1):
LegoSensor.__init__(self, port, name='ms-absolute-imu')
self._mode = ''
@property
def version(self):
return self.fw_version
@property
def compass_cal_start(self):
self.write_value('command', 'BEGIN-COMP-CAL')
@property
def compass_cal_end(self):
self.write_value('command', 'END-COMP-CAL')
@property
def acc_2g(self):
self.write_value('command', 'ACCEL-2G')
@property
def acc_4g(self):
self.write_value('command', 'ACCEL-4G')
@property
def acc_8g(self):
self.write_value('command', 'ACCEL-8G')
@property
def acc_16g(self):
self.write_value('command', 'ACCEL-16G')
@property
def x_acc(self):
self.mode = 'ACCEL'
return self.value0
@property
def y_acc(self):
self.mode = 'ACCEL'
return self.value1
@property
def z_acc(self):
self.mode = 'ACCEL'
return self.value2
@property
def x_tilt(self):
self.mode = 'TILT'
return self.value0
@property
def y_tilt(self):
self.mode = 'TILT'
return self.value1
@property
def z_tilt(self):
self.mode = 'TILT'
return self.value2
@property
def x_raw_magnetic(self):
self.mode = 'MAG'
return self.value0
@property
def y_raw_magnetic(self):
self.mode = 'MAG'
return self.value1
@property
def z_raw_magnetic(self):
self.mode = 'MAG'
return self.value2
@property
def x_gyro(self):
self.mode = 'GYRO'
return self.value0
@property
def y_gyro(self):
self.mode = 'GYRO'
return self.value1
@property
def z_gyro(self):
self.mode = 'COMPASS'
return self.value0
@property
def compass(self):
self.mode = 'GYRO'
return self.value2
class MagicWand(MindSensorI2CS):
val = 0xff
# port = 1..4 , matching the EV3 input port where the magic wand is
# connected
def __init__(self, port, addr=0x38):
MindSensorI2CS.__init__(self, port, addr)
def put_data(self, v):
self.val = v
MindSensorI2CS.write_byte(self, v)
# turns all leds on
def led_all_on(self):
self.put_data(0x00)
# turns all leds off
def led_all_off(self):
self.put_data(0xff)
# turns a specific led on. leds are indexed 1..8
def led_on(self, num):
self.put_data(self.val & (0xff - (1 << (num - 1))))
# turns a specific led off. leds are indexed 1..8
def led_off(self, num):
self.put_data(self.val | (1 << (num - 1)))
#!/bin/bash
sudo iptables -P FORWARD ACCEPT
sudo iptables -A POSTROUTING -t nat -j MASQUERADE -s 10.42.0.0/24
...@@ -4,7 +4,7 @@ import socketserver ...@@ -4,7 +4,7 @@ import socketserver
import socket import socket
import os import os
import sys import sys
import ev3dev.ev3 as ev3 import ev3dev as ev3
if len(sys.argv) != 2: if len(sys.argv) != 2:
def eprint(*args, **kwargs): def eprint(*args, **kwargs):
...@@ -118,6 +118,8 @@ class TCPServer(socketserver.TCPServer): ...@@ -118,6 +118,8 @@ class TCPServer(socketserver.TCPServer):
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.bind(self.server_address) self.socket.bind(self.server_address)
httpd = TCPServer(("", int(sys.argv[1])), Handler) httpd = TCPServer(("", int(float(sys.argv[1]))), Handler)
print ('EV3 ready.') print ('EV3 ready.')
print ('http://snap.berkeley.edu/snapsource/snap.html#open:http://localhost:1330/snap-ev3')
httpd.serve_forever() httpd.serve_forever()
import sys
print(sys.path)
sys.path.insert(0, "/ev3script")
print("AFTER")
print(sys.path)
import sys
sys.path.remove("/home/isaac/Documents/ev3script")
import sys
print (sys.path)
#!/bin/bash
#snap interface to the lego mindstorms robot installation
#Runs python script to append the required files to the pythonpath
python pathappend.py
ifconfig
sudo ifconfig enp0s26u1u4 10.42.0.1 up
ifconfig
#Runs the program
python snap-ev3.py
...@@ -50,5 +50,4 @@ os.system('scp listen.py %s@%s:' % (EV3_USER, EV3_IP)) ...@@ -50,5 +50,4 @@ os.system('scp listen.py %s@%s:' % (EV3_USER, EV3_IP))
#os.system('ssh %s@%s -- nohup python3 listen.py %d &' % (EV3_USER, EV3_IP, EV3_PORT)) #os.system('ssh %s@%s -- nohup python3 listen.py %d &' % (EV3_USER, EV3_IP, EV3_PORT))
httpd = TCPServer(("", SNAP_PORT), Handler) httpd = TCPServer(("", SNAP_PORT), Handler)
print "http://snap.berkeley.edu/snapsource/snap.html#open:http://localhost:1330/snap-ev3"
httpd.serve_forever() httpd.serve_forever()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment