Skip to content
Extraits de code Groupes Projets
Valider 4ea239d4 rédigé par François De Keersmaeker's avatar François De Keersmaeker
Parcourir les fichiers

Added all devices

parent d3f528d9
Aucune branche associée trouvée
Aucune étiquette associée trouvée
Aucune requête de fusion associée trouvée
Affichage de
avec 693 ajouts et 2 suppressions
# Config folders # Config folders
.venv .venv
.vscode
# Cache folders
__pycache__
# Local test files
test.py
...@@ -59,5 +59,5 @@ dependencies = [ ...@@ -59,5 +59,5 @@ dependencies = [
] ]
[project.urls] [project.urls]
"Homepage" = "https://forge.uclouvain.be/smart-home-network-security/signature-extraction" "Homepage" = "https://forge.uclouvain.be/smart-home-network-security/smart-home-testbed"
"Source" = "https://forge.uclouvain.be/smart-home-network-security/signature-extraction" "Source" = "https://forge.uclouvain.be/smart-home-network-security/smart-home-testbed"
import time
from .DeviceControl import DeviceControl
class CameraControl(DeviceControl):
"""
Class to control a camera device.
"""
# Stream event duration
stream_duration = 7
def stream(self) -> None:
"""
Perform the `stream` event.
"""
self.start_stream()
time.sleep(self.stream_duration)
self.stop_stream()
def start_stream(self) -> None:
"""
Start the camera's video stream.
"""
self.get_phone().shell(f"input tap {self.x_start} {self.y_start}")
def do_stop_stream(self) -> None:
"""
Stop the camera's video stream.
"""
self.get_phone().shell(f"input tap {self.x_stop} {self.y_stop}")
def stop_stream(self) -> None:
"""
Check if the stream was successful,
then stop the stream.
"""
# Check if stream was successful
self._was_last_stream_successful = self.get_state()
if self._was_last_stream_successful:
self.do_stop_stream()
from ppadb.device import Device as AdbDevice
from ppadb.client import Client as AdbClient
import cv2
import numpy as np
class DeviceControl:
"""
Abstract class which provides methods to control a device.
"""
ADB_SERVER_IP = "127.0.0.1"
ADB_SERVER_PORT = 5037
def get_phone(self) -> AdbDevice:
"""
Get the adb device object.
Returns:
ppadb.device.Device: The adb device object.
Raises:
IndexError: If no adb device is found.
"""
adb = AdbClient(host=DeviceControl.ADB_SERVER_IP, port=DeviceControl.ADB_SERVER_PORT)
return adb.devices()[0]
def start_app(self) -> None:
"""
Start the device's app on the phone.
Raises:
IndexError: If no adb device is found.
"""
phone = self.get_phone()
phone.shell(f"monkey -p {self.android_package} -c android.intent.category.LAUNCHER 1")
def close_app(self) -> None:
"""
Close the device's app on the device.
Raises:
IndexError: If no adb device is found.
"""
phone = self.get_phone()
phone.shell(f"am force-stop {self.android_package}")
def screenshot(self) -> np.ndarray:
"""
Take a screenshot of the device's screen.
Returns:
numpy.ndarray: The screenshot as a numpy array.
Raises:
IndexError: If no adb device is found.
"""
screenshot_bytes = self.get_phone().screencap()
screenshot_array = np.frombuffer(screenshot_bytes, dtype=np.uint8)
return cv2.imdecode(screenshot_array, cv2.IMREAD_UNCHANGED)
import random
from .DeviceControl import DeviceControl
class LightControl(DeviceControl):
"""
Class to control a light bulb device.
"""
def toggle(self) -> None:
"""
Perform the toggle event on the light.
"""
self.get_phone().shell(f"input tap {self.x} {self.y}")
def _set_light_attr(self, y: int) -> None:
"""
Randomly set an attribute of the light
(brightness or color).
Args:
y (int): The y-coordinate of the corresponding gauge.
"""
x = random.randint(self.x_left_gauge, self.x_right_gauge)
self.get_phone().shell(f"input tap {x} {y}")
def do_set_brightness(self) -> None:
"""
Randomly set the brightness of the light.
"""
self._set_light_attr(self.y_brightness)
def set_brightness(self) -> None:
"""
Template method to randomly set the brightness of the light.
The child class must implement the concrete method `is_on`,
which checks if the light is on.
"""
# If light is off, turn it on
if self.is_off():
self.toggle()
self.do_set_brightness()
def do_set_color(self) -> None:
"""
Randomly set the color of the light.
"""
self._set_light_attr(self.y_color)
def set_color(self) -> None:
"""
Template method to randomly set the color of the light.
The child class must implement the concrete method `is_on`,
which checks if the light is on.
"""
# If light is off, turn it on
if self.is_off():
self.toggle()
self.do_set_color()
from .DeviceControl import DeviceControl
class PlugControl(DeviceControl):
"""
Class to control a plug device.
"""
def toggle(self) -> None:
"""
Perform the toggle action on the plug.
"""
self.get_phone().shell(f"input tap {self.x} {self.y}")
import time
from .DeviceControl import DeviceControl
class SmartThingsControl(DeviceControl):
"""
Abstract class to control a generic device through SmartThings.
"""
## Class variables
# Android package name
android_package = "com.samsung.android.oneconnect"
# Devices tab button screen coordinates
devices_x = 223.2
devices_y = 1281.6
def start_app(self) -> None:
"""
Start the SmartThings app on the phone,
and open the device controls.
Raises:
IndexError: If no adb device is found.
"""
phone = self.get_phone()
# Start SmartThings app
phone.shell(f"monkey -p {SmartThingsControl.android_package} -c android.intent.category.LAUNCHER 1")
time.sleep(10)
# Open "Devices" tab
phone.shell(f"input tap {SmartThingsControl.devices_x} {SmartThingsControl.devices_y}")
time.sleep(5)
# Open device controls
phone.shell(f"input tap {self.device_x} {self.device_y}")
from .SmartThingsControl import SmartThingsControl
from .LightControl import LightControl
class SmartThingsLightControl(SmartThingsControl, LightControl):
"""
Abstract class to control a generic light bulb through SmartThings.
"""
## Screen event coordinates
# Toggle
x = 626.4
y = 273.6
# Brightness and color gauges
x_left_gauge = 21
x_right_gauge = 692
# Brightness
y_brightness = 504
# Color
y_color = 1036.8
from .SmartThingsControl import SmartThingsControl
from .PlugControl import PlugControl
class SmartThingsPlugControl(SmartThingsControl, PlugControl):
"""
Abstract class to control a generic plug through SmartThings.
"""
# Toggle coordinates
x = 626.4
y = 273.6
import time
from .DeviceControl import DeviceControl
class TapoControl(DeviceControl):
"""
Abstract class to control a generic device through the Tapo app.
"""
# Android package name
android_package = "com.tplink.iot"
def start_app(self) -> None:
"""
Start the device's app on the phone.
Raises:
IndexError: If no adb device is found.
"""
phone = self.get_phone()
# Start Tapo app
phone.shell(f"monkey -p {TapoControl.android_package} -c android.intent.category.LAUNCHER 1")
time.sleep(10)
# Open device controls
phone.shell(f"input tap {self.device_x} {self.device_y}")
from .TapoControl import TapoControl
from .PlugControl import PlugControl
class TapoPlugControl(TapoControl, PlugControl):
"""
Abstract class to control a generic plug through the Tapo app.
"""
# Toggle coordinates
x = 612
y = 619.2
import time
from .DeviceControl import DeviceControl
class TuyaControl(DeviceControl):
"""
Abstract class to control a generic device through the Tuya app.
"""
# Metadata
android_package = "com.tuya.smart"
version = "3.3"
def start_app(self) -> None:
"""
Start the Tuya app on the phone.
Raises:
IndexError: If no adb device is found.
"""
phone = self.get_phone()
# Start Tapo app
phone.shell(f"monkey -p {TuyaControl.android_package} -c android.intent.category.LAUNCHER 1")
time.sleep(10)
# Open device controls
phone.shell(f"input tap {self.device_x} {self.device_y}")
from .DeviceControl import DeviceControl
from .PlugControl import PlugControl
from .LightControl import LightControl
from .CameraControl import CameraControl
from .TapoControl import TapoControl
from .TapoPlugControl import TapoPlugControl
from .SmartThingsControl import SmartThingsControl
from .SmartThingsPlugControl import SmartThingsPlugControl
from .SmartThingsLightControl import SmartThingsLightControl
from .TuyaControl import TuyaControl
import cv2
from skimage.metrics import structural_similarity as ssim
from .CameraState import CameraState
class CameraScreenshotState(CameraState):
"""
Abstract class for the state of a camera device,
which uses screenshot-based event validation.
"""
FILENAME_SCREENSHOT_STREAM = "stream.png"
def __init__(self, ipv4: str, **kwargs) -> None:
"""
Constructor.
Initialize the camera device with its IP address.
Args:
ipv4 (str): The device's IPv4 address.
kwargs (dict): device-specific additional parameters,
including the path to the streaming screenshot.
"""
# Call parent constructor
super().__init__(ipv4, **kwargs)
# Compute gray array of streaming screenshot
stream_image = cv2.imread(kwargs.get("path_screenshot_stream", CameraScreenshotState.FILENAME_SCREENSHOT_STREAM))
self.gray_stream_image = cv2.cvtColor(stream_image, cv2.COLOR_BGR2GRAY)
def get_state(self) -> bool:
"""
Get the state of the camera device,
i.e. if it is currently streaming,
by taking a screenshot of its app.
Returns:
bool: True if the camera is currently streaming, False otherwise.
"""
# Take screenshot and convert to grayscale
screenshot_array = self.screenshot()
gray_screenshot = cv2.cvtColor(screenshot_array, cv2.COLOR_BGR2GRAY)
# Compute SSIM
try:
score = ssim(self.gray_stream_image, gray_screenshot, full=False)
return score > self.SSIM_DIFF_THRESHOLD
except ValueError:
return False
from .DeviceState import DeviceState
class CameraState(DeviceState):
"""
Abstract class for the state of a camera device.
"""
def __init__(self, ipv4: str, **kwargs) -> None:
"""
Constructor.
Initialize the camera device with its IP address.
Args:
ipv4 (str): The device's IPv4 address.
kwargs (dict): device-specific additional parameters.
"""
self.ipv4 = ipv4
self._was_last_stream_successful = False
def is_event_successful(self, _) -> bool:
"""
Check if the last stream event was successful.
Returns:
bool: True if the event was successful, False otherwise.
"""
return self._was_last_stream_successful
from typing import Any
class DeviceState:
"""
Abstract class which represent a device state.
"""
def __init__(self, ipv4: str, **kwargs) -> None:
"""
Constructor.
Initialize the device with its IP address.
Args:
ipv4 (str): The device's IPv4 address.
kwargs (dict): device-specific additional parameters.
"""
self.ipv4 = ipv4
# Set the device-specific parameters
for key, value in kwargs.items():
setattr(self, key, value)
def get_state(self) -> Any:
"""
Get the state of the device.
Returns:
Any: current state of the device.
"""
raise NotImplementedError
def is_event_successful(self, previous_state: Any) -> bool:
"""
Check if an event was successful,
i.e. if the device is still reachable
and its state has changed compared to the previous state.
Args:
previous_state (Any): The state of the device before the event.
Returns:
bool: True if the event was successful, False otherwise.
"""
try:
state = self.get_state()
except Exception:
return False
else:
return state != previous_state
from typing import Awaitable
from enum import Enum
import contextlib
import asyncio
from aiohue import HueBridgeV2, create_app_key
from .LightState import LightState
class HueState(LightState):
"""
Class to represent the state of a Philips Hue light bulb.
"""
class StateKeys(Enum):
"""
Enum for the keys in the dictionary representing the state of the Philips Hue device.
"""
IS_ON = "is_on"
BRIGHTNESS = "brightness"
COLOR_X = "color_x"
COLOR_Y = "color_y"
@staticmethod
async def _get_app_key(bridge_ip: str) -> Awaitable[str]:
"""
Asynchronously get the app key for the Philips Hue bridge.
Args:
bridge_ip (str): IP address of the Philips Hue bridge.
Returns:
Awaitable[str]: async function which returns the app key.
"""
input("Press the link button on the bridge and press enter to continue...")
return await create_app_key(bridge_ip, "authentication_example")
@classmethod
def get_app_key(cls, bridge_ip: str) -> str:
"""
Get the app key for the Philips Hue bridge.
Args:
bridge_ip (str): IP address of the Philips Hue bridge.
Returns:
str: app key.
"""
with contextlib.suppress(KeyboardInterrupt):
asyncio.run(cls._get_app_key(bridge_ip))
async def _async_get_state(self) -> Awaitable[dict]:
"""
Get the state of the Philips Hue light asynchronously.
Returns:
Awaitable[dict]: async function which returns the status of the Philips Hue light.
"""
async with HueBridgeV2(self.ipv4, self.appkey) as bridge:
light = next(l for l in bridge.lights.items)
return {
self.StateKeys.IS_ON: light.is_on,
self.StateKeys.BRIGHTNESS: light.brightness,
self.StateKeys.COLOR_X: light.color.xy.x,
self.StateKeys.COLOR_Y: light.color.xy.y
}
def get_state(self) -> dict:
"""
Get the state of the Philips Hue light.
Returns:
dict: The status of the Philips Hue light.
"""
return asyncio.run(self._async_get_state())
from enum import Enum
from .DeviceState import DeviceState
class LightState(DeviceState):
"""
Abstract class for the state of a light device.
"""
class StateKeys(Enum):
"""
Enum for the keys in the dictionary representing the state of a light device.
"""
IS_ON = "is_on"
BRIGHTNESS = "brightness"
HUE = "hue"
SATURATION = "saturation"
def is_off(self) -> bool:
"""
Check if the light is on.
Returns:
bool: True if the light is on, False otherwise.
"""
return not self.get_state()[self.StateKeys.IS_ON]
import numpy as np
import cv2
from skimage.metrics import structural_similarity as ssim
from .DeviceState import DeviceState
class ScreenshotState(DeviceState):
"""
Device state using app screenshot.
"""
# SSIM threshold below which images are considered different
SSIM_DIFF_THRESHOLD = 0.95
def get_state(self) -> np.ndarray:
"""
Get the state of the device,
by taking a screenshot of its app.
Returns:
numpy.ndarray: The screenshot as a numpy array.
"""
return self.screenshot()
def is_event_successful(self, previous_state: np.ndarray) -> bool:
"""
Check if an event was successful,
i.e. if the current state has changed compared to the given previous state.
This method uses the SSIM metric to compare the screenshots.
Args:
previous_state (numpy.ndarray): app screenshot, as a numpy array, which represents the previous state.
Returns:
bool: True if the event was successful, False otherwise.
"""
current_state = self.get_state()
# Load images, and convert to grayscale
gray_previous = cv2.cvtColor(previous_state, cv2.COLOR_BGR2GRAY)
gray_current = cv2.cvtColor(current_state, cv2.COLOR_BGR2GRAY)
# Compute SSIM
score = ssim(gray_previous, gray_current, full=False)
return score < self.SSIM_DIFF_THRESHOLD
from typing import List, Any, Awaitable
import asyncio
import aiohttp
from pysmartthings import SmartThings, DeviceEntity
from .DeviceState import DeviceState
class SmartThingsState(DeviceState):
"""
Abstract class which represents a generic SmartThings device's state.
"""
@staticmethod
async def _get_all_devices(token: str) -> Awaitable[List[DeviceEntity]]:
"""
Asynchronously get all the devices connected to SmartThings.
Args:
token (str): SmartThings API token.
Returns:
Awaitable[List]: async function which returns the list of all devices.
"""
async with aiohttp.ClientSession() as session:
api = SmartThings(session, token)
devices = await api.devices()
return devices
@classmethod
def get_all_devices(cls) -> List[DeviceEntity]:
"""
Get all the devices connected to SmartThings.
Returns:
List: list of all devices
"""
return asyncio.run(cls._get_all_devices())
@classmethod
def print_all_devices(cls) -> None:
"""
Print the label, name and id of all devices connected to SmartThings.
"""
for device in cls.get_all_devices():
print(device.label, device.name, device.device_id)
async def _async_get_state(self) -> Awaitable[Any]:
"""
Asynchronously get the SmartThings device status.
Returns:
Awaitable[Any]: async function which returns the device status.
"""
async with aiohttp.ClientSession() as session:
api = SmartThings(session, self.token)
devices = await api.devices(device_ids=[self.id])
device = devices[0]
await device.status.refresh()
return device.status
def get_state(self) -> Any:
"""
Get the device status.
Returns:
Any: device status
"""
return asyncio.run(self._async_get_state())
0% Chargement en cours ou .
You are about to add 0 people to the discussion. Proceed with caution.
Terminez d'abord l'édition de ce message.
Veuillez vous inscrire ou vous pour commenter