# -*- coding: utf-8 -*-
"""
Copyright (c) 2015 Jesse Peterson, 2017 Mosen
Licensed under the MIT license. See the included LICENSE.txt file for details.
Attributes:
db (SQLAlchemy): A reference to flask SQLAlchemy extensions db instance.
"""
from typing import Optional, Type
from flask_sqlalchemy import SQLAlchemy
import datetime
from enum import Enum, IntEnum
from sqlalchemy.ext.mutable import MutableDict
from sqlalchemy.ext.hybrid import hybrid_property
from .dbtypes import GUID, JSONEncodedDict
from .mdm import CommandStatus, Platform, commands
import base64
from binascii import hexlify
from biplist import Data as NSData
from .profiles.certificates import KeyUsage
db = SQLAlchemy()
class CellularTechnology(IntEnum):
Nothing = 0
GSM = 1
CDMA = 2
Both = 3
device_tags = db.Table(
'device_tags',
db.metadata,
db.Column('device_id', db.Integer, db.ForeignKey('devices.id')),
db.Column('tag_id', db.Integer, db.ForeignKey('tags.id')),
)
[docs]class Device(db.Model):
"""An enrolled device.
:table: devices
"""
__tablename__ = 'devices'
# Common attributes
id = db.Column(db.Integer, primary_key=True)
"""id (int):"""
udid = db.Column(db.String(40), index=True, nullable=True)
"""udid (str): Unique Device Identifier"""
last_seen = db.Column(db.DateTime, nullable=True)
"""last_seen (datetime.datetime): When the device last contacted the MDM."""
is_enrolled = db.Column(db.Boolean, default=False)
"""is_enrolled (bool): Whether the MDM should consider this device enrolled."""
# APNS / Push
topic = db.Column(db.String, nullable=True)
"""topic (str): The APNS topic the device is listening on."""
push_magic = db.Column(db.String, nullable=True)
"""push_magic (str): The UUID that establishes a unique relationship between the device and the MDM."""
# The APNS device token is stored in base64 format. Descriptors are added to handle this encoding and decoding
# to bytes automatically.
_token = db.Column(db.String, nullable=True)
tokenupdate_at = db.Column(db.DateTime)
# if null there are no outstanding push notifications. If this contains anything then dont attempt to deliver
# another APNS push.
last_push_at = db.Column(db.DateTime, nullable=True)
"""last_push_at (datetime.datetime): The datetime when the last push was sent to APNS for this device."""
last_apns_id = db.Column(db.Integer, nullable=True)
"""last_apns_id (str): The UUID of the last apns command sent."""
# if the time delta between last_push_at and last_seen is >= several days to a week,
# this should count as a failed push, and potentially declare the device as dead.
failed_push_count = db.Column(db.Integer, default=0, nullable=False)
# Table 5
last_cloud_backup_date = db.Column(db.DateTime)
"""last_cloud_backup_date (datetime): The date of the last iCloud backup."""
awaiting_configuration = db.Column(db.Boolean)
"""awaiting_configuration (bool): True if device is waiting at Setup Assistant"""
# Table 6
itunes_store_account_is_active = db.Column(db.Boolean)
"""itunes_store_account_is_active (bool): the user is currently logged into an active iTunes Store account."""
itunes_store_account_hash = db.Column(db.String)
"""itunes_store_account_hash (str): a hash of the iTunes Store account currently logged in."""
# DeviceInformation : Table 7
device_name = db.Column(db.String) # Authenticate
"""device_name (str): Name of the device"""
os_version = db.Column(db.String) # Authenticate
"""os_version (str): The operating system version number."""
build_version = db.Column(db.String) # Authenticate
"""build_version (str): DeviceInformation BuildVersion"""
model_name = db.Column(db.String) # Authenticate
"""model_name (str): Longer name of the hardware model"""
model = db.Column(db.String) # Authenticate
"""model (str): Name of the hardware model"""
product_name = db.Column(db.String) # Authenticate
"""product_name (str): The base product name of the hardware"""
serial_number = db.Column(db.String(64), index=True, nullable=True) # Authenticate
"""serial_number (str): The hardware serial number"""
device_capacity = db.Column(db.Float, nullable=True)
"""device_capacity (float): total capacity (base 1024 gigabytes)"""
available_device_capacity = db.Column(db.Float, nullable=True)
"""device_available_capacity (float): available capacity (base 1024 gigabytes)"""
battery_level = db.Column(db.Float, default=-1.0)
"""battery_level (float): battery level, between 0.0 and 1.0. -1.0 if information is not available."""
cellular_technology = db.Column(db.Enum(CellularTechnology))
"""cellular_technology (CellularTechnology): cellular technology."""
imei = db.Column(db.String)
"""imei (str): IMEI number (if device is GSM)."""
meid = db.Column(db.String)
"""meid (str): MEID number (if device is CSMA)."""
modem_firmware_version = db.Column(db.String)
"""modem_firmware_version (str): The baseband firmware version."""
is_supervised = db.Column(db.Boolean)
"""is_supervised (bool): Device is supervised"""
is_device_locator_service_enabled = db.Column(db.Boolean)
"""is_device_locator_service_enabled (bool): Find My iPhone/Mac enabled."""
is_activation_lock_enabled = db.Column(db.Boolean)
"""is_activation_lock_enabled (bool): Device has Activation Lock enabled."""
is_do_not_disturb_in_effect = db.Column(db.Boolean)
"""is_do_not_disturb_in_effect (bool): Device has DND enabled."""
device_id = db.Column(db.String) # ATV
"""device_id (str): Device ID (ATV)"""
eas_device_identifier = db.Column(db.String)
"""eas_device_identifier (str): Exchange ActiveSync Identifier"""
is_cloud_backup_enabled = db.Column(db.Boolean)
"""is_cloud_backup_enabled (bool): iCloud backup is enabled."""
local_hostname = db.Column(db.String)
"""local_hostname (str): """
hostname = db.Column(db.String)
"""hostname (str): """
sip_enabled = db.Column(db.Boolean)
"""sip_enabled (bool): System Integrity Protection is enabled."""
# TODO: ActiveManagedUsers
is_mdm_lost_mode_enabled = db.Column(db.Boolean)
"""is_mdm_lost_mode_enabled (bool): MDM Lost mode is enabled."""
maximum_resident_users = db.Column(db.Integer)
"""maximum_resident_users (int): Maximum number of users that can use Shared iPad."""
# OSUpdateSettings : Table 8
# OSUpdateSettings is flattened
osu_catalog_url = db.Column(db.String)
"""osu_catalog_url (str): Software Update Catalog URL."""
osu_is_default_catalog = db.Column(db.Boolean)
osu_previous_scan_date = db.Column(db.DateTime)
osu_previous_scan_result = db.Column(db.String)
osu_perform_periodic_check = db.Column(db.Boolean)
osu_automatic_check_enabled = db.Column(db.Boolean)
osu_background_download_enabled = db.Column(db.Boolean)
osu_automatic_app_installation_enabled = db.Column(db.Boolean)
osu_automatic_os_installation_enabled = db.Column(db.Boolean)
osu_automatic_security_updates_enabled = db.Column(db.Boolean)
# NetworkInfo : Table 9
iccid = db.Column(db.String)
"""iccid (str): The ICC identifier for the SIM card."""
bluetooth_mac = db.Column(db.String)
"""bluetooth_mac (str): The bluetooth MAC address"""
wifi_mac = db.Column(db.String)
"""wifi_mac (str): The WiFi MAC address"""
# TODO: EthernetMACs
current_carrier_network = db.Column(db.String)
"""current_carrier_network (str): Name of the current carrier network."""
sim_carrier_network = db.Column(db.String)
"""sim_carrier_network (str): Name of the home carrier network."""
subscriber_carrier_network = db.Column(db.String)
"""subscriber_carrier_network (str): Name of the home carrier network (replaces sim_carrier_network)."""
carrier_settings_version = db.Column(db.String)
"""carrier_settings_version (str): Version of the current carrier settings file."""
phone_number = db.Column(db.String)
"""phone_number (str): Raw phone number without punctuation."""
voice_roaming_enabled = db.Column(db.Boolean)
"""voice_roaming_enabled (bool): Voice Roaming is enabled in settings."""
data_roaming_enabled = db.Column(db.Boolean)
"""data_roaming_enabled (bool): Data Roaming is enabled in settings."""
is_roaming = db.Column(db.Boolean)
"""is_roaming (bool): The device is currently roaming."""
personal_hotspot_enabled = db.Column(db.Boolean)
"""personal_hotspot_enabled (bool): Personal HotSpot is currently turned on."""
subscriber_mcc = db.Column(db.String)
"""subscriber_mcc (str): Home Mobile Country Code (numeric)"""
subscriber_mnc = db.Column(db.String)
"""subscriber_mnc (str): Home Mobile Network Code (numeric)"""
current_mcc = db.Column(db.String)
"""current_mcc (str): Current Mobile Country Code (numeric)"""
current_mnc = db.Column(db.String)
"""current_mnc (str): Current Mobile Network Code (numeric)"""
# SecurityInfo
# hardware_encryption_caps = db.Column(DBEnum(HardwareEncryptionCaps))
passcode_present = db.Column(db.Boolean)
"""passcode_present (bool): Device has a passcode."""
passcode_compliant = db.Column(db.Boolean)
"""passcode_compliant (bool): The passcode is compliant with all requirements (incl Exchange accounts)."""
passcode_compliant_with_profiles = db.Column(db.Boolean)
"""passcode_compliant_with_profiles (bool): The passcode is compliant with profile requirements."""
passcode_lock_grace_period_enforced = db.Column(db.Integer)
"""passcode_lock_grace_period_enforced (int): The current enforced time in seconds before unlock passcode will
be required."""
fde_enabled = db.Column(db.Boolean)
"""fde_enabled (bool): Whether full disk encryption is enabled or not."""
fde_has_prk = db.Column(db.Boolean)
"""fde_has_prk (bool): Whether FDE has a personal recovery key set."""
fde_has_irk = db.Column(db.Boolean)
"""fde_has_irk (bool): Whether FDE has an institutional recovery key set."""
fde_personal_recovery_key_cms = db.Column(db.LargeBinary) # 10.13
"""fde_personal_recovery_key_cms (bytes): If Escrow is enabled, contains the encrypted PRK"""
fde_personal_recovery_key_device_key = db.Column(db.String) # 10.13
"""fde_personal_recovery_key_device_key (str):"""
firewall_enabled = db.Column(db.Boolean)
"""firewall_enabled (bool): Application firewall is enabled."""
block_all_incoming = db.Column(db.Boolean)
"""block_all_incoming (bool): All incoming connections are blocked."""
stealth_mode_enabled = db.Column(db.Boolean)
"""stealth_mode_enabled (bool): Stealth mode is enabled."""
# ActivationLockBypassCode
activation_lock_escrow_key = db.Column(db.String)
"""activation_lock_escrow_key (str): The activation lock bypass code generated by the device"""
# DEP Fetch/Sync Fields
is_dep = db.Column(db.Boolean)
"""is_dep (bool): This device has been synced from DEP. False indicates a manual or AC2 enrolment"""
description = db.Column(db.String)
"""description (str): The DEP description which is often identical to the SKU description on the invoice."""
color = db.Column(db.String)
"""color: (str): The device color indicated by DEP"""
asset_tag = db.Column(db.String)
"""asset_tag (str): The device asset tag, if provided by Apple."""
profile_status = db.Column(db.String)
"""profile_status (str): The status of profile installation: empty, assigned, pushed or removed."""
profile_uuid = db.Column(db.String)
"""profile_uuid (str): The UUID of the assigned DEP profile"""
profile_assign_time = db.Column(db.DateTime)
"""profile_assign_time (datetime): The date and time indicating when the DEP profile was assigned"""
profile_push_time = db.Column(db.DateTime)
"""profile_push_time (datetime): The date and time indicating when the DEP profile was pushed."""
device_assigned_date = db.Column(db.DateTime)
"""device_assigned_date (datetime): The date and time the device was recorded into DEP."""
device_assigned_by = db.Column(db.String)
"""device_assigned_by (str): The email of the person who assigned the device."""
os = db.Column(db.String)
"""os (str): The device operating system returned by DEP: iOS, OSX or tvOS"""
device_family = db.Column(db.String)
"""device_family (str): The device's Apple product family returned by DEP."""
# TODO: Blocked Applications
@hybrid_property
def token(self):
return self._token if self._token is None else base64.b64decode(self._token)
@token.setter
def token(self, value):
self._token = base64.b64encode(value) if value is not None else None
@property
def hex_token(self):
"""Retrieve the device token in hex encoding, necessary for the APNS2 client."""
if self._token is None:
return self._token
else:
return hexlify(self.token).decode('utf8')
certificate_id = db.Column(db.Integer, db.ForeignKey('certificates.id'))
certificate = db.relationship('Certificate', backref='devices')
dep_profile_id = db.Column(db.Integer, db.ForeignKey('dep_profiles.id'))
dep_profile = db.relationship('DEPProfile', backref='devices')
tags = db.relationship(
'Tag',
secondary=device_tags,
back_populates='devices'
)
_unlock_token = db.Column(db.String(), name='unlock_token', nullable=True)
@property
def unlock_token(self):
return self._unlock_token
@unlock_token.setter
def unlock_token(self, value):
if isinstance(value, NSData):
self._unlock_token = NSData.encode('base64')
else:
self._unlock_token = value
@property
def platform(self) -> Platform:
if self.model_name in ['iMac', 'MacBook Pro', 'MacBook Air', 'Mac Pro']: # TODO: obviously not sufficient
return Platform.macOS
elif self.model_name in ['iPhone', 'iPad']:
return Platform.iOS
else:
return Platform.Unknown
def __repr__(self):
return '<Device ID=%r UDID=%r SerialNo=%r>' % (self.id, self.udid, self.serial_number)
class CommandSequence(db.Model):
"""A command sequence represents a series of commands where all members must succeed in order for the sequence to
succeed. I.E a single failure or timeout in the sequence stops the delivery of every other member.
:table: command_sequences
"""
__tablename__ = 'command_sequences'
id = db.Column(db.Integer, primary_key=True)
[docs]class Command(db.Model):
"""The command model represents a single MDM command that should be, has been, or has failed to be delivered to
a single enrolled device.
:table: commands
"""
__tablename__ = 'commands'
id = db.Column(db.Integer, primary_key=True)
"""id (int): ID"""
request_type = db.Column(db.String, nullable=False) # string representation of our local command handler
"""request_type (str): The command RequestType attribute"""
uuid = db.Column(GUID, index=True, unique=True, nullable=False)
"""uuid (GUID): Globally unique command UUID"""
parameters = db.Column(MutableDict.as_mutable(JSONEncodedDict),
nullable=True) # JSON add'l data as input to command builder
"""parameters (str): The parameters that were used when generating the command, serialized into JSON. Omitting the
RequestType and CommandUUID attributes."""
status = db.Column(db.Enum(CommandStatus), index=True, nullable=False, default=CommandStatus.Queued)
"""status (CommandStatus): The status of the command."""
queued_at = db.Column(db.DateTime, default=datetime.datetime.utcnow(), server_default=db.text('CURRENT_TIMESTAMP'))
"""queued_at (datetime.datetime): The datetime (utc) of when the command was created. Defaults to UTC now"""
sent_at = db.Column(db.DateTime, nullable=True)
"""sent_at (datetime.datetime): The datetime (utc) of when the command was delivered to the client."""
acknowledged_at = db.Column(db.DateTime, nullable=True)
"""acknowledged_at (datetime.datetime): The datetime (utc) of when the Acknowledged, Error or NotNow response was
returned."""
# command must only be sent after this date
after = db.Column(db.DateTime, nullable=True)
"""after (datetime.datetime): If not null, the command must not be sent until this datetime is in the past."""
# number of retries remaining until dead
ttl = db.Column(db.Integer, nullable=False, default=5)
"""ttl (int): The number of retries remaining until the command will be dead/expired."""
device_id = db.Column(db.ForeignKey('devices.id'), nullable=True)
"""device_id (int): The device ID on the devices table."""
device = db.relationship('Device', backref='commands')
"""device (Device): The instance of the related device."""
# device_user_id = db.Column(ForeignKey('device_users.id'), nullable=True)
# device_user = relationship('DeviceUser', backref='commands')
[docs] @classmethod
def from_model(cls, cmd: commands.Command):
"""This method turns a subclass of commands.Command into an SQLAlchemy model.
The parameters of the command are encoded as a JSON dictionary inside the parameters column.
Args:
cmd (commands.Command): The command to be turned into a database model.
Returns:
Command: The database model, ready to be committed.
"""
c = cls()
assert cmd.request_type is not None
c.request_type = cmd.request_type
c.uuid = cmd.uuid
c.parameters = cmd.parameters
return c
[docs] @classmethod
def find_by_uuid(cls, uuid: str):
"""Find and return an instance of the Command model matching the given UUID string.
Args:
uuid (str): The command UUID
Returns:
Command: Instance of the command, if any
"""
return cls.query.filter(cls.uuid == uuid).one()
[docs] @classmethod
def next_command(cls, device: Device):
"""Get the next available command in the queue for the specified device.
The next available command must match these predicates:
- Assigned to this device.
- The status is "Queued".
- The `after` field is in the past, or empty.
Args:
device (Device): The database model matching the device checking in.
Returns:
Command: The next command model to be processed.
"""
# d == d AND (q_status == Q OR (q_status == R AND result == 'NotNow'))
return cls.query.filter(db.and_(
cls.device == device,
cls.status == CommandStatus.Queued.value)).order_by(cls.id).first()
@classmethod
def next(cls, device: Device): # type: (Type[Command], Device) -> Optional[Command]
model = cls.query.filter(db.and_(
cls.device == device,
cls.status == CommandStatus.Queued.value)).order_by(cls.id).first()
def __repr__(self):
return '<Command ID=%r UUID=%r qstatus=%r>' % (self.id, self.uuid, self.status)
class DeviceUser(db.Model):
"""
This model represents a managed user from the standpoint of the MDM.
It exists to support the macOS user channel extension.
:table: device_users
"""
__tablename__ = 'device_users'
id = db.Column(db.Integer, primary_key=True)
device_id = db.Column(db.ForeignKey('devices.id'), nullable=True)
"""(int): Device foreign key ID."""
device = db.relationship('Device', backref='device_users')
"""(db.relationship): Device relationship"""
device_udid = db.Column(db.String(40), nullable=False)
"""(GUID): Device UDID"""
user_id = db.Column(GUID, nullable=False)
"""user_id (GUID): Local user's GUID, or network user's GUID from Directory Record."""
long_name = db.Column(db.String)
"""long_name (str): The full name of the user"""
short_name = db.Column(db.String)
"""short_name (str): The short (username) of the user"""
need_sync_response = db.Column(db.Boolean) # This is kind of transitive but added anyway.
user_configuration = db.Column(db.Boolean)
digest_challenge = db.Column(db.String)
auth_token = db.Column(db.String)
[docs]class Organization(db.Model):
"""The MDM home organization configuration.
These attributes are used as the defaults for several other services where an org name is required.
Such as Certificate requests and Profile detail.
:table: organizations
"""
__tablename__ = 'organizations'
id = db.Column(db.Integer, primary_key=True)
"""id (int): ID"""
name = db.Column(db.String)
"""name (string): Name"""
payload_prefix = db.Column(db.String)
"""payload_prefix (string): The reverse-dns style prefix to use for all generated profiles."""
# http://www.ietf.org/rfc/rfc5280.txt
# maximum string lengths are well defined by this RFC and this schema follows those recommendations
# this x.509 name is used in the subject of the internal CA and issued certificates
x509_ou = db.Column(db.String(32))
"""x509_ou (string): The x.509 Organizational Unit for generating certificates."""
x509_o = db.Column(db.String(64))
"""x509_o (string): The x.509 Organization for generating certificates."""
x509_st = db.Column(db.String(128))
"""x509_st (string): The x.509 State for generating certificates."""
x509_c = db.Column(db.String(2))
"""x509_c (string): The 2 letter x.509 country code for generating certificates. """
class DeviceIdentitySources(Enum):
"""A list of sources for Device Identity."""
InternalPKCS12 = 'internal_pkcs12'
InternalSCEP = 'internal_scep'
ExternalSCEP = 'external_scep'
class SCEPConfig(db.Model):
"""This table holds a single row containing information used to generate the SCEP enrollment profile.
:table: scep_config
See Also:
- `https://tools.ietf.org/html/rfc3280.html`_.
"""
__tablename__ = 'scep_config'
id = db.Column(db.Integer, primary_key=True)
source_type = db.Column(db.Enum(DeviceIdentitySources), default=DeviceIdentitySources.InternalSCEP)
"""source_type (DeviceIdentitySources): Specify the source used for device certificates."""
url = db.Column(db.String, nullable=False)
challenge_enabled = db.Column(db.Boolean, default=False)
challenge = db.Column(db.String)
ca_fingerprint = db.Column(db.String)
subject = db.Column(db.String, nullable=False) # eg. O=x/OU=y/CN=z
key_size = db.Column(db.Integer, default=2048, nullable=False)
key_type = db.Column(db.String, default='RSA', nullable=False)
key_usage = db.Column(db.Enum(KeyUsage), default=KeyUsage.All)
retries = db.Column(db.Integer, default=3, nullable=False)
retry_delay = db.Column(db.Integer, default=10, nullable=False)
certificate_renewal_time_interval = db.Column(db.Integer, default=14, nullable=False)
class SubjectAlternativeNameType(Enum):
"""Types of SubjectAlternativeNames that can be added using cryptography SAN extension.
See Also:
- `https://tools.ietf.org/html/rfc3280.html`_.
"""
RFC822Name = 'RFC822Name'
"""E-mail address, see: https://tools.ietf.org/html/rfc822"""
DNSName = 'DNSName'
UniformResourceIdentifier = 'UniformResourceIdentifier'
DirectoryName = 'DirectoryName'
RegisteredID = 'RegisteredID'
IPAddress = 'IPAddress'
OtherName = 'OtherName'
# TODO: ntPrincipal
class SubjectAlternativeName(db.Model):
"""This table holds SANs included in the SCEP enrollment request.
:table: subject_alternative_names
"""
__tablename__ = 'subject_alternative_names'
id = db.Column(db.Integer, primary_key=True)
discriminator = db.Column(db.Enum(SubjectAlternativeNameType), nullable=False)
str_value = db.Column(db.String)
octet_value = db.Column(db.LargeBinary) # For IPAddress
class Tag(db.Model):
"""This table holds tags, which are categories that are many-to-many and polymorphic to different types of
objects."""
__tablename__ = 'tags'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String, nullable=False)
color = db.Column(db.String(6), default='888888')
devices = db.relationship(
"Device",
secondary=device_tags,
back_populates="tags",
)
# profiles = db.relationship(
# "Profiles",
# secondary=profile_tags,
# back_populates="tags",
# )