# Source code for commandment.vpp.vpp

# -*- coding: utf-8 -*-
"""
Volume Purchase Programme Support

TODO:
    - Not all error conditions are unit tested.
    - Only the license cursor has been tested.
    - The license assignment method remains untested.
"""

import requests
from typing import List, Optional, Iterator, Tuple, Dict, Text, Any
import json
import base64

from commandment.vpp.decorators import raise_error_replies
from commandment.vpp.enum import LicenseAssociation, LicenseDisassociation, LicenseAssociationType, \
    LicenseDisassociationType, VPPPricingParam

# Default value of the ``vpp_service_config_url`` argument of ``VPP.__init__``; the JSON document fetched
# from this URL is stored as the service config from which the per-endpoint URLs are looked up.
SERVICE_CONFIG_URL = 'https://vpp.itunes.apple.com/WebObjects/MZFinance.woa/wa/VPPServiceConfigSrv'
"""str: The default production URL to fetch VPP service configuration from."""


def encode_stoken(token: dict) -> bytes:
    """Serialize an sToken dict to compact JSON and wrap it in URL-safe base64.

    Args:
          token (dict): Token containing the 'token', 'expDate', and 'orgName' fields.

    Returns:
          bytes: The URL-safe base64 encoding of the UTF-8 encoded JSON document.
    """
    # Compact separators: no whitespace after ',' or ':' in the serialized JSON.
    compact_json = json.dumps(token, separators=(',', ':'))
    raw = compact_json.encode('utf8')
    return base64.urlsafe_b64encode(raw)


class VPPCursor:
    """Common base for cursors over endpoints that hand out tokens to page through records.

    The most recent reply is kept as a plain dict and the paging fields are exposed as read-only
    properties; each property yields None when the reply does not carry that key.

    Attributes:
          _current (dict): Current results.
          _vpp (VPP): Instance of the VPP object that generated this cursor.
    """

    def __init__(self, since_modified_token: str = None, vpp=None) -> None:
        # Seed the "current reply" with the since-modified token only when one was supplied, so the
        # property below reports None otherwise.
        initial: Dict[Text, Any] = {}
        if since_modified_token is not None:
            initial['sinceModifiedToken'] = since_modified_token

        self._current = initial
        self._vpp = vpp

    @property
    def batch_count(self) -> Optional[int]:
        """Optional[int]: Number of records returned in this batch."""
        return self._current.get('batchCount')

    @property
    def total(self) -> Optional[int]:
        """Optional[int]: Number of records in total that will be returned."""
        return self._current.get('totalCount')

    @property
    def batch_token(self) -> Optional[str]:
        """Optional[str]: The batch token, if a batch fetch is in progress and is not complete."""
        return self._current.get('batchToken')

    @property
    def since_modified_token(self) -> Optional[str]:
        """Optional[str]: The since modified token, if a batch fetch is not in progress, but a fetch has been
        made."""
        return self._current.get('sinceModifiedToken')


class VPPUserCursor(VPPCursor):
    """Cursor over a batch fetch against the `getVPPUsersSrv` endpoint.

    Attributes:
          includes_retired (bool): This fetch operation includes users that have been marked as *Retired*.
    """

    def __init__(self, includes_retired: bool = True, vpp=None) -> None:
        super().__init__(vpp=vpp)
        self.includes_retired = includes_retired

    @property
    def users(self) -> Optional[List[dict]]:
        """Optional[List[dict]]: The current set of users in the cursor result, or None if there are no results."""
        return self._current.get('users')

    def next(self):
        """Fetch the following page of users as a new cursor.

        Returns:
            next VPPUserCursor or None when batch is exhausted
        """
        token = self.batch_token
        if token is None:
            # No batch token in the current reply: nothing further to fetch.
            return None

        following = self._vpp.users(batch_token=token)
        # Carry this cursor's flag over to the cursor returned for the next page.
        following.includes_retired = self.includes_retired
        return following


class VPPLicenseCursor(VPPCursor):
    """Cursor over a batch fetch against the `getVPPLicensesSrv` endpoint.

    `next()` advances this same object in place: it replaces the stored reply with the following
    page and returns the cursor itself.
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)

    @property
    def licenses(self) -> Optional[List[dict]]:
        """Optional[List[dict]]: The current set of licenses in the cursor result, or None if there are
        no results."""
        return self._current.get('licenses')

    def next(self):
        """Load the following page of licenses into this cursor.

        Returns:
            next VPPLicenseCursor or None when batch is exhausted
        """
        token = self.batch_token
        if token is None:
            # No batch token in the current reply: nothing further to fetch.
            return None

        fetched = self._vpp.licenses(batch_token=token)
        # Adopt the freshly fetched reply rather than handing back the new cursor object.
        self._current = fetched._current
        return self


class VPPLicenseOperation(object):
    """VPPLicenseOperation represents a number of license operations on a single Adam ID (iTunes Store Product).

    Associations and disassociations are collected as ``(type, value)`` pairs. Only one association
    type and one disassociation type may be used per operation; mixing types raises ValueError.

    Attributes:
          _association_type (LicenseAssociationType): This specifies the type of association this license operation
            represents. The API only accepts one of these in a single request.
          _disassociation_type (LicenseDisassociationType): This specifies the type of disassociation this license
            operation represents. The API only accepts one of these in a single request.
          _vpp (Optional[VPP]): The VPP instance that created this operation, or None when the operation was
            constructed directly.
    """

    def __init__(self,
                 adam_id: int,
                 pricing_param: str = 'STDQ',
                 license_association_type: Optional[LicenseAssociationType] = None,
                 license_disassociation_type: Optional[LicenseDisassociationType] = None) -> None:
        """Create an empty operation for one product.

        Args:
              adam_id (int): The Adam ID of the iTunes Store asset to manage.
              pricing_param (str): The pricing parameter, defaults to 'STDQ'.
              license_association_type (Optional[LicenseAssociationType]): Pre-select the association type.
              license_disassociation_type (Optional[LicenseDisassociationType]): Pre-select the disassociation type.
        """
        self._adam_id = adam_id
        self._pricing_param = pricing_param
        self._associate: List[LicenseAssociation] = []
        self._disassociate: List[LicenseDisassociation] = []
        self._association_type = license_association_type
        self._disassociation_type = license_disassociation_type
        # Always define the attribute so that reading it on a directly constructed operation yields
        # None instead of AttributeError; the VPP.manage* factories overwrite it after construction.
        self._vpp = None

    @property
    def adam_id(self) -> int:
        """int: The Adam ID this operation applies to."""
        return self._adam_id

    @property
    def pricing_param(self) -> str:
        """str: The pricing parameter this operation applies to."""
        return self._pricing_param

    @property
    def associations(self) -> Tuple[LicenseAssociationType, List[LicenseAssociation]]:
        """The association type (None if none chosen yet) and the list of collected ``(type, value)`` pairs."""
        return self._association_type, self._associate

    @property
    def disassociations(self) -> Tuple[LicenseDisassociationType, List[LicenseDisassociation]]:
        """The disassociation type (None if none chosen yet) and the list of collected ``(type, value)`` pairs."""
        return self._disassociation_type, self._disassociate

    def add(self, association_type: LicenseAssociationType, value: str):
        """Queue a license association.

        The first call fixes the operation's association type when none was preset.

        Args:
              association_type (LicenseAssociationType): Kind of identifier held in ``value``.
              value (str): The identifier to associate a license with.

        Raises:
              ValueError: If ``association_type`` differs from the type already in use.
        """
        if self._association_type is None:
            self._association_type = association_type
        elif association_type != self._association_type:
            raise ValueError('You cannot specify two different types of association in a license operation.')

        self._associate.append((association_type, value))

    def additions_for_type(self, association_type: LicenseAssociationType) -> Iterator[LicenseAssociation]:
        """Lazily iterate over the queued associations whose type equals ``association_type``."""
        return (entry for entry in self._associate if entry[0] == association_type)

    def remove(self, disassociation_type: LicenseDisassociationType, value: str):
        """Queue a license disassociation.

        The first call fixes the operation's disassociation type when none was preset.

        Args:
              disassociation_type (LicenseDisassociationType): Kind of identifier held in ``value``.
              value (str): The identifier to disassociate a license from.

        Raises:
              ValueError: If ``disassociation_type`` differs from the type already in use.
        """
        if self._disassociation_type is None:
            self._disassociation_type = disassociation_type
        elif disassociation_type != self._disassociation_type:
            raise ValueError('You cannot specify two different types of disassociation in a license operation.')

        self._disassociate.append((disassociation_type, value))

    def removals_for_type(self, disassociation_type: LicenseDisassociationType) -> Iterator[LicenseDisassociation]:
        """Lazily iterate over the queued disassociations whose type equals ``disassociation_type``."""
        return (entry for entry in self._disassociate if entry[0] == disassociation_type)
class VPPUserLicenseOperation(VPPLicenseOperation):
    """A batch license operation keyed by MDM user, AKA VPP User License Assignment.

    Licenses are associated to or disassociated from an MDM user. Both the association and the
    disassociation type are fixed to ``ClientUserID`` after the base initializer runs, overriding
    any type passed in.

    Args:
          adam_id (int): The Adam ID of the iTunes Store asset to manage.
          pricing_param (str): The pricing parameter, defaults to 'STDQ' (Standard Quality)
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self._association_type = LicenseAssociationType.ClientUserID
        self._disassociation_type = LicenseDisassociationType.ClientUserID
class VPPDeviceLicenseOperation(VPPLicenseOperation):
    """A batch license operation keyed by device serial number, AKA VPP Device License Assignment.

    Licenses are associated to or disassociated from a Device Serial Number. Both the association
    and the disassociation type are fixed to ``SerialNumber`` after the base initializer runs,
    overriding any type passed in.

    Args:
          adam_id (int): The Adam ID of the iTunes Store asset to manage.
          pricing_param (str): The pricing parameter, defaults to 'STDQ' (Standard Quality)
    """

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self._association_type = LicenseAssociationType.SerialNumber
        self._disassociation_type = LicenseDisassociationType.SerialNumber
class VPP(object):
    """
    VPP Object.

    The main VPP API wrapper class.

    Attributes:
          VPP.AssociationProperties (dict): Mapping of the LicenseAssociationType enum to the expected JSON keys
            in the request.
          VPP.DisassociationProperties (dict): Mapping of the LicenseDisassociationType enum to the expected JSON
            keys in the request.
    """
    AssociationProperties = {
        LicenseAssociationType.ClientUserID: 'associateClientUserIdStrs',
        LicenseAssociationType.SerialNumber: 'associateSerialNumbers'
    }

    DisassociationProperties = {
        LicenseDisassociationType.SerialNumber: 'disassociateSerialNumbers',
        LicenseDisassociationType.ClientUserID: 'disassociateClientUserIdStrs',
        LicenseDisassociationType.LicenseID: 'disassociateLicenseIdStrs',
    }

    def __init__(self, stoken: str, vpp_service_config_url: str = SERVICE_CONFIG_URL,
                 service_config: dict = None) -> None:
        """
        The VPP class is a wrapper around a requests session and provides an API for interacting with Apple's VPP
        service.

        Args:
              stoken (str): Service Token
              vpp_service_config_url (str): URL to the VPPServiceConfigSrv endpoint. defaults to Apple's live
                server.
              service_config (dict): Dictionary containing service config, if you do not want to fetch it
                (testing only).
        """
        self._session = requests.Session()
        self._session.headers.update({'Content-Type': 'application/json'})
        self._stoken = stoken

        # A missing *or empty* service_config triggers a fetch from the config URL.
        if not service_config:
            fetched_service_config = self._fetch_config(vpp_service_config_url)
            self._service_config = fetched_service_config
        else:
            self._service_config = service_config

    def _fetch_config(self, service_config_url: str) -> dict:
        """Fetch the service configuration from Apple, which contains all of the URLs required for VPP.

        Args:
              service_config_url (str): The VPPServiceConfigSrv URL to use

        Returns:
              dict: The decoded JSON body of the reply.
        """
        res = self._session.get(service_config_url)
        return res.json()

    def _post(self, url_key: str, request_body: dict) -> dict:
        """POST ``request_body`` as JSON to the endpoint URL stored under ``url_key`` in the service config.

        Args:
              url_key (str): Key into the service config dict naming the endpoint URL.
              request_body (dict): The request payload; serialized with ``json.dumps``.

        Returns:
              dict: The decoded JSON body of the reply.
        """
        res = self._session.post(self._service_config[url_key], data=json.dumps(request_body))
        return res.json()

    @raise_error_replies
    def register_user(self, client_user_id: str, email: str = None, facilitator_member_id: str = None,
                      managed_apple_id: str = None):
        """
        Register an MDM user with VPP.

        Args:
              client_user_id (str): A unique string, usually a UUID to identify the user in the MDM.
              email (str): The e-mail address of the user.
              facilitator_member_id (str): Currently unused
              managed_apple_id (str): Currently unused

        Returns:
              dict: Containing the decoded body of the reply from the VPP service, eg::

                  {
                      "status": 0,
                      "user": {
                          "userId": 2878111686099947,
                          "email": "vpp-test@localhost",
                          "status": "Registered",
                          "inviteUrl": "http://localhost:8080/D1971F9DD5F8E67BDD",
                          "inviteCode": "D1971F9DD5F8E67BDD",
                          "clientUserIdStr": "F33D9E0F-CDE3-427E-A444-B137BEF9EFA2"
                      }
                  }
        """
        return self._post('registerUserSrvUrl', {
            'clientUserIdStr': client_user_id,
            'email': email,
            'sToken': self._stoken,
        })

    @raise_error_replies
    def get_user(self, client_user_id: str = None, its_id_hash: str = None, facilitator_member_id: str = None,
                 user_id: int = None):
        """
        Get the status of a user by their unique ID.

        Args:
              client_user_id (str): A unique string, usually a UUID to identify the user in the MDM.
                You can use this OR the user_id to identify the user.
              its_id_hash (str): (Optional) iTunes Store ID hash
              facilitator_member_id: Currently unused
              user_id (int): User ID which uniquely identifies the user with the iTunes store. Takes precedence
                over client_user_id when both are given.

        Returns:
              dict: Containing the reply from the service.
        """
        request_body = {'sToken': self._stoken}
        if user_id is not None:
            request_body['userId'] = user_id
        else:
            request_body['clientUserIdStr'] = client_user_id

        if its_id_hash is not None:
            request_body['itsIdHash'] = its_id_hash

        return self._post('getUserSrvUrl', request_body)

    def users(self, include_retired: int = 1, facilitator_member_id: str = None, batch_token: str = None,
              since_modified_token: str = None) -> VPPUserCursor:
        """
        Retrieve a page of users registered with VPP.

        Args:
              include_retired (int): 0 - do not include retired users, 1 - include retired users
              facilitator_member_id: Currently unused
              batch_token (str): Batch token (if being called from a cursor). Takes precedence over
                since_modified_token.
              since_modified_token (str): Since modified token (if requesting a time delta)

        Returns:
              VPPUserCursor: A cursor holding the reply, bound to this VPP instance so it can fetch more pages.
        """
        request_body = {'sToken': self._stoken}
        if include_retired == 1:
            request_body['includeRetired'] = 1

        if batch_token is not None:
            request_body['batchToken'] = batch_token
        elif since_modified_token is not None:
            request_body['sinceModifiedToken'] = since_modified_token

        results = self._post('getUsersSrvUrl', request_body)

        cursor = VPPUserCursor(includes_retired=(include_retired == 1), vpp=self)
        cursor._current = results
        return cursor

    @raise_error_replies
    def retire_user(self, client_user_id: str = None, facilitator_member_id: str = None, user_id: str = None):
        """
        Unregister a user from VPP.

        Args:
              client_user_id (str): A unique string, usually a UUID to identify the user in the MDM.
                You can use this OR the user_id to identify the user.
              facilitator_member_id: Currently unused
              user_id (int): User ID which uniquely identifies the user with the iTunes store. Takes precedence
                over client_user_id when both are given. (The signature annotates this as str; the value is
                passed through to the request unchanged.)

        Returns:
              dict: Containing the reply from the service.
        """
        request_body = {'sToken': self._stoken}
        if user_id is not None:
            request_body['userId'] = user_id
        else:
            request_body['clientUserIdStr'] = client_user_id

        return self._post('retireUserSrvUrl', request_body)

    @raise_error_replies
    def edit_user(self, client_user_id: str = None, facilitator_member_id: str = None, email: str = None,
                  managed_apple_id: str = None, user_id: str = None):
        """
        Edit a user's VPP record.

        Args:
              client_user_id (str): A unique string, usually a UUID to identify the user in the MDM.
                You can use this OR the user_id to identify the user.
              facilitator_member_id: Currently unused
              email (str): Supply an E-mail address to update the current address.
              user_id (int): User ID which uniquely identifies the user with the iTunes store. Takes precedence
                over client_user_id when both are given. (The signature annotates this as str; the value is
                passed through to the request unchanged.)
              managed_apple_id (str): Managed Apple ID

        Returns:
              dict: Containing the reply from the service.
        """
        request_body = {'sToken': self._stoken}
        if user_id is not None:
            request_body['userId'] = user_id
        else:
            request_body['clientUserIdStr'] = client_user_id

        if email is not None:
            request_body['email'] = email

        if managed_apple_id is not None:
            request_body['managedAppleIDStr'] = managed_apple_id

        return self._post('editUserSrvUrl', request_body)

    @raise_error_replies
    def assets(self, include_license_counts: bool = True, facilitator_member_id: str = None) -> List[dict]:
        """
        Get assets for which the organization has licenses.

        Args:
              include_license_counts (bool): Include counts of total/assigned/unassigned licenses.
              facilitator_member_id: Currently unused

        Returns:
              List[dict]: List of VPP assets for which this organization has licenses.
        """
        request_body = {
            'sToken': self._stoken,
            'includeLicenseCounts': include_license_counts,
        }
        return self._post('getVPPAssetsSrvUrl', request_body)

    def manage(self, adam_id: int, pricing_param: str = 'STDQ') -> VPPLicenseOperation:
        """Manage VPP licenses for the given Adam ID.

        Args:
              adam_id (int): The Adam ID
              pricing_param (str): The pricing param defaults to 'STDQ' but may be 'PLUS' for things which aren't
                software.

        Returns:
              VPPLicenseOperation: an instance of a VPP license operation which can be modified to add or
                remove devices, and then submitted.
        """
        op = VPPLicenseOperation(adam_id, pricing_param)
        op._vpp = self
        return op

    def manage_user_licenses(self, adam_id: int, pricing_param: str = 'STDQ') -> VPPUserLicenseOperation:
        """Manage VPP User License Assignment.

        Args:
              adam_id (int): The Adam ID
              pricing_param (str): The pricing param defaults to 'STDQ' but may be 'PLUS' for things which aren't
                software.

        Returns:
              VPPUserLicenseOperation: an instance of a VPP license operation which can be modified to add or
                remove license associations by user client id
        """
        op = VPPUserLicenseOperation(adam_id, pricing_param)
        op._vpp = self
        return op

    def manage_device_licenses(self, adam_id: int, pricing_param: str = 'STDQ') -> VPPDeviceLicenseOperation:
        """Manage VPP Device License Assignment.

        Args:
              adam_id (int): The Adam ID
              pricing_param (str): The pricing param defaults to 'STDQ' but may be 'PLUS' for things which aren't
                software.

        Returns:
              VPPDeviceLicenseOperation: an instance of a VPP license operation which can be modified to add or
                remove license associations by device serial number
        """
        op = VPPDeviceLicenseOperation(adam_id, pricing_param)
        op._vpp = self
        return op

    def licenses(self, adam_id: int = None, pricing_param: Optional[VPPPricingParam] = None,
                 assigned_only: bool = False, facilitator_member_id: str = None, batch_token: str = None,
                 since_modified_token: str = None) -> VPPLicenseCursor:
        """Retrieve a list of licenses matching the supplied criteria.

        Args:
              adam_id (int): Get licenses that match this Adam ID
              pricing_param (Optional[VPPPricingParam]): Get licenses that match this 'Quality' param.
              assigned_only (bool): Return only licenses that are assigned to users, if this value is true.
              facilitator_member_id (str): Currently unused
              batch_token (str): Supplied if there are more results to fetch.
              since_modified_token (str): Supplied if you want to fetch results modified since a certain date.
                This will be supplied on the last page of your most recent set of results.

        Returns:
              VPPLicenseCursor: A cursor that can be used to fetch all remaining results, pre-populated with the
                first page.
        """
        request_body = {'sToken': self._stoken}
        if assigned_only:
            request_body['assignedOnly'] = True

        if batch_token:
            request_body['batchToken'] = batch_token

        if since_modified_token:
            request_body['sinceModifiedToken'] = since_modified_token

        # These parameters are normally ignored if a batch/modified token is supplied.
        if batch_token is None and since_modified_token is None:
            if adam_id is not None:
                request_body['adamId'] = adam_id

            if pricing_param is not None:
                request_body['pricingParam'] = pricing_param.value

        reply = self._post('getLicensesSrvUrl', request_body)

        cursor = VPPLicenseCursor(vpp=self)
        cursor._current = reply
        return cursor

    def save(self, operation: VPPLicenseOperation, notify: bool = False) -> dict:
        """Execute a license management operation, represented by a VPPLicenseOperation or subclass.

        This provides a more convenient interface than bulk_update_licenses.

        The association/disassociation key is only added to the request when the operation has the
        corresponding type set, and only the identifier values (not the ``(type, value)`` pairs the
        operation stores internally) are sent.

        Args:
              operation (VPPLicenseOperation): The license operation to perform.
              notify (bool): Optional. Notify devices of license disassociation.

        Returns:
              dict: Reply from the license endpoint.

        Raises:
              KeyError: If the operation carries a type that has no entry in ``AssociationProperties`` /
                ``DisassociationProperties``.
        """
        atype, associations = operation.associations
        dtype, disassociations = operation.disassociations

        request_body = {
            'sToken': self._stoken,
            'adamIdStr': operation.adam_id,
            'pricingParam': operation.pricing_param,
            'notifyDisassociation': notify,
        }

        # A type of None means nothing of that kind was requested; looking it up would raise KeyError.
        if atype is not None:
            request_body[VPP.AssociationProperties[atype]] = [value for _, value in associations]

        if dtype is not None:
            request_body[VPP.DisassociationProperties[dtype]] = [value for _, value in disassociations]

        return self._post('manageVPPLicensesByAdamIdSrvUrl', request_body)

    def bulk_update_licenses(self, adam_id: int, association_type: Optional[LicenseAssociationType] = None,
                             associate: Optional[List[str]] = None,
                             disassociation_type: Optional[LicenseDisassociationType] = None,
                             disassociate: Optional[List[str]] = None, pricing_param: str = 'STDQ',
                             notify: bool = False) -> dict:
        """Perform a batch operation of license associations and disassociations.

        Args:
              adam_id (int): Adam ID - The iTunes Store Product for which licenses will be managed.
              association_type (Optional[LicenseAssociationType]): Provide an association type if
                associate length > 0
              associate (Optional[List[str]]): A list of values that will be used to associate licenses,
                corresponding to the association_type
              disassociation_type (Optional[LicenseDisassociationType]): Provide a disassociation type if
                disassociate length > 0.
              disassociate (Optional[List[str]]): A list of values that will be used to disassociate licenses,
                corresponding to the disassociation_type
              pricing_param (str): Defaults to Standard Quality 'STDQ'
              notify (bool): Notify disassociation, default is False

        Returns:
              dict: Reply from the license endpoint.

        See Also:
              - manageVPPLicensesByAdamIdSrv
        """
        request_body = {
            'sToken': self._stoken,
            'adamIdStr': adam_id,
            'pricingParam': pricing_param,
            'notifyDisassociation': notify,
        }

        # Types that are None or not present in the mapping are silently skipped.
        if association_type in VPP.AssociationProperties:
            request_body[VPP.AssociationProperties[association_type]] = associate

        if disassociation_type in VPP.DisassociationProperties:
            request_body[VPP.DisassociationProperties[disassociation_type]] = disassociate

        return self._post('manageVPPLicensesByAdamIdSrvUrl', request_body)