Migrating projects to Atlassian Cloud can be a complex task. One common issue is duplicated project roles: the migration can create copies of existing roles with a “(migrated)” suffix, which clutters your project setup and causes confusion. This script helps automate the cleanup by replacing the “(migrated)” roles with their original counterparts.

Please note that this script does not check whether any workflows reference the migrated project roles. Be sure to verify that the changes align with your workflows and project configurations before running it.

Example Scenario

Let’s say you have a project with the following roles:

  • Developers
  • Administrators
  • Developers (migrated)
  • Administrators (migrated)

The script will identify the “(migrated)” roles and replace them with the corresponding original roles. For instance, permission scheme grants that reference “Developers (migrated)” will be re-created against “Developers”, and the members of “Developers (migrated)” will be added to “Developers” in each project.
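
For illustration, the role map that Jira returns from GET /rest/api/3/project/{projectIdOrKey}/role for such a project looks roughly like this (the domain and role IDs below are made up):

# Hypothetical response of GET /rest/api/3/project/10000/role
{
    "Developers": "https://your-domain.atlassian.net/rest/api/3/project/10000/role/10002",
    "Administrators": "https://your-domain.atlassian.net/rest/api/3/project/10000/role/10003",
    "Developers (migrated)": "https://your-domain.atlassian.net/rest/api/3/project/10000/role/10104",
    "Administrators (migrated)": "https://your-domain.atlassian.net/rest/api/3/project/10000/role/10105"
}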

Script Overview

The script performs the following steps:

  1. Cleans up permission schemes by replacing grants that reference “(migrated)” project roles with equivalent grants for the original roles.
  2. Cleans up project roles in all company-managed projects by adding the members of each “(migrated)” role to the corresponding original role.

The “(migrated)” roles themselves are not deleted, so you can review the results and remove them manually once you are confident nothing still depends on them.
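
To make step 1 concrete, a grant in the response of GET /rest/api/3/permissionscheme/{schemeId}?expand=all looks roughly like this (the IDs below are made up); the script re-creates such grants with the original role’s ID and removes the old ones:

# Hypothetical permission grant that still points at a migrated role
{
    "id": 10011,
    "holder": {
        "type": "projectRole",
        "parameter": "10104",
        "projectRole": {
            "id": 10104,
            "name": "Developers (migrated)"
        }
    },
    "permission": "BROWSE_PROJECTS"
}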

Here’s the script:

import json
import logging
import requests
from requests.auth import HTTPBasicAuth
from concurrent.futures import ThreadPoolExecutor, as_completed
import time

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')

# Constants
CLOUD_BASE_URL = ""
CLOUD_EMAIL = ""
CLOUD_TOKEN = ""
AUTH = HTTPBasicAuth(CLOUD_EMAIL, CLOUD_TOKEN)
HEADERS = {
    "Accept": "application/json",
    "Content-Type": "application/json"
}
PROJECT_LIST = []  # Add project keys to this list to limit to specific projects, leave empty to process all
PERMISSION_SCHEME_LIST = []  # Add permission scheme IDs to this list to limit to specific schemes, leave empty to process all
MAX_THREADS = 10  # Adjust based on your needs

def request_with_retries(url, method='GET', headers=None, auth=None, data=None, max_retries=5):
    """Make a request with retries and exponential backoff for handling 429 errors."""
    for attempt in range(max_retries):
        try:
            response = requests.request(method, url, headers=headers, auth=auth, data=data, timeout=30)
        except requests.exceptions.RequestException as error:
            logging.warning(f"Request to {url} failed ({error}). Retrying...")
            time.sleep(2 ** attempt)
            continue

        if response.status_code == 429:
            # Handle 429 Too Many Requests: honour the Retry-After header if Jira
            # sends one, otherwise fall back to exponential backoff
            wait_time = int(response.headers.get("Retry-After", 2 ** attempt))
            logging.warning(f"Rate limited. Waiting for {wait_time} seconds before retrying...")
            time.sleep(wait_time)
        else:
            return response
    logging.error(f"Max retries reached for URL: {url}")
    return None

def clean_permission_scheme(scheme):
    logging.info(f"Processing scheme: {scheme['name']} (ID: {scheme['id']})")

    if PERMISSION_SCHEME_LIST and scheme["id"] not in PERMISSION_SCHEME_LIST:
        return  # Skip if not in the specified list

    scheme_details_response = request_with_retries(
        f'{CLOUD_BASE_URL}/rest/api/3/permissionscheme/{scheme["id"]}?expand=all', headers=HEADERS, auth=AUTH)
    if not scheme_details_response or scheme_details_response.status_code != 200:
        logging.error(f"Failed to fetch scheme details for {scheme['name']} (ID: {scheme['id']})")
        return
    
    scheme_details = scheme_details_response.json()
    permissions = scheme_details["permissions"]

    # Fetch the global project roles once per scheme instead of once per grant
    project_roles_response = request_with_retries(
        f'{CLOUD_BASE_URL}/rest/api/3/role', headers=HEADERS, auth=AUTH)
    if not project_roles_response or project_roles_response.status_code != 200:
        logging.error("Failed to fetch project roles")
        return

    # Map each global role name to its ID, ignoring team-managed (scoped) roles
    global_roles = {
        role["name"]: role["id"]
        for role in project_roles_response.json()
        if "scope" not in role
    }

    for permission in permissions:
        holder = permission.get("holder", {})
        if holder.get("type") != "projectRole":
            continue
        if "(migrated)" not in holder.get("projectRole", {}).get("name", ""):
            continue

        normal_role_name = holder["projectRole"]["name"].split(" (migrated)", 1)[0]
        normal_role_id = global_roles.get(normal_role_name)
        if normal_role_id is None:
            logging.warning(f"No original role named '{normal_role_name}' found for scheme: {scheme['name']} (ID: {scheme['id']})")
            continue

        payload = {
            "holder": {
                "type": "projectRole",
                "value": normal_role_id
            },
            "permission": permission["permission"]
        }
        update_response = request_with_retries(
            f'{CLOUD_BASE_URL}/rest/api/3/permissionscheme/{scheme["id"]}/permission',
            method='POST', headers=HEADERS, data=json.dumps(payload), auth=AUTH)
        # Creating a permission grant returns 201 Created on success
        if update_response and update_response.status_code in (200, 201):
            logging.info(f"Successfully updated permission: {permission['permission']} for scheme: {scheme['name']} (ID: {scheme['id']})")
            # Remove the old grant that still points at the "(migrated)" role
            delete_response = request_with_retries(
                f'{CLOUD_BASE_URL}/rest/api/3/permissionscheme/{scheme["id"]}/permission/{permission["id"]}',
                method='DELETE', headers=HEADERS, auth=AUTH)
            if not delete_response or delete_response.status_code != 204:
                logging.error(f"Failed to delete migrated grant (ID: {permission['id']}) in scheme: {scheme['name']}")
        else:
            logging.error(f"Failed to update permission: {permission['permission']} for scheme: {scheme['name']} (ID: {scheme['id']})")

def clean_permission_schemes():
    logging.info("Starting to clean permission schemes")

    response = request_with_retries(f'{CLOUD_BASE_URL}/rest/api/3/permissionscheme', headers=HEADERS, auth=AUTH)
    if not response or response.status_code != 200:
        logging.error(f"Failed to fetch permission schemes")
        return
    
    permission_schemes = response.json()["permissionSchemes"]

    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        futures = [executor.submit(clean_permission_scheme, scheme) for scheme in permission_schemes]
        for future in as_completed(futures):
            future.result()  # Will raise exceptions if any occurred during execution

    logging.info("Finished cleaning permission schemes")

def clean_project(project):
    """Copy the members of “(migrated)” project roles in a company-managed project to the original roles."""
    if PROJECT_LIST and project["key"] not in PROJECT_LIST:
        return  # Skip projects not in the specified list

    if project["style"] != "classic":
        return  # Team-managed (next-gen) projects do not use shared project roles

    project_roles_response = request_with_retries(
        f'{CLOUD_BASE_URL}/rest/api/3/project/{project["id"]}/role', headers=HEADERS, auth=AUTH)
    if not project_roles_response or project_roles_response.status_code != 200:
        logging.error(f"Failed to fetch project roles for {project['name']} (ID: {project['id']})")
        return

    # The response maps role names to role URLs, e.g. {"Developers": ".../role/10002"}
    project_roles = project_roles_response.json()

    for role_name, role_url in project_roles.items():
        if "(migrated)" not in role_name:
            continue

        normal_role_name = role_name.split(" (migrated)", 1)[0]
        normal_role_url = project_roles.get(normal_role_name)
        if normal_role_url is None:
            logging.warning(f"No original role named '{normal_role_name}' in project {project['name']} (ID: {project['id']})")
            continue
        normal_role_id = normal_role_url.split("role/", 1)[1]

        # Fetch the migrated role's details to get its actors
        role_id = role_url.split("role/", 1)[1]
        role_response = request_with_retries(
            f'{CLOUD_BASE_URL}/rest/api/3/project/{project["id"]}/role/{role_id}', headers=HEADERS, auth=AUTH)
        if not role_response or role_response.status_code != 200:
            logging.error(f"Failed to fetch role details for role ID {role_id} in project {project['name']} (ID: {project['id']})")
            continue

        role = role_response.json()

        # Add each of the migrated role's actors (groups and users) to the original role
        for actor in role["actors"]:
            if actor["type"] == "atlassian-group-role-actor":
                payload = {"groupId": [actor["actorGroup"]["groupId"]]}
            elif actor["type"] == "atlassian-user-role-actor":
                payload = {"user": [actor["actorUser"]["accountId"]]}
            else:
                continue
            add_response = request_with_retries(
                f'{CLOUD_BASE_URL}/rest/api/3/project/{project["id"]}/role/{normal_role_id}',
                method='POST', headers=HEADERS, data=json.dumps(payload), auth=AUTH)
            if not add_response or add_response.status_code != 200:
                logging.error(f"Failed to add actor to role '{normal_role_name}' in project {project['name']} (ID: {project['id']})")

def clean_projects():
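    """Fetch all projects and clean up “(migrated)” role memberships in each, using a thread pool."""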
    response = request_with_retries(f'{CLOUD_BASE_URL}/rest/api/3/project', headers=HEADERS, auth=AUTH)
    if not response or response.status_code != 200:
        logging.error(f"Failed to fetch projects")
        return

    projects = response.json()

    with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
        futures = [executor.submit(clean_project, project) for project in projects]
        for future in as_completed(futures):
            future.result()  # Will raise exceptions if any occurred during execution

if __name__ == "__main__":
    logging.info('Starting [Permission Schemes] cleaning...')
    clean_permission_schemes()
    logging.info('Starting [Projects] cleaning...')
    clean_projects()
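
Before running the cleanup, you may want a read-only look at what would be touched. Here is a minimal audit sketch; it reuses the constants and the request_with_retries helper defined above and only lists the global “(migrated)” roles, without changing anything:

def list_migrated_roles():
    """Read-only audit: log every global "(migrated)" role without modifying it."""
    response = request_with_retries(f'{CLOUD_BASE_URL}/rest/api/3/role', headers=HEADERS, auth=AUTH)
    if not response or response.status_code != 200:
        logging.error("Failed to fetch project roles")
        return
    for role in response.json():
        if "scope" not in role and "(migrated)" in role["name"]:
            logging.info(f"Found migrated role: {role['name']} (ID: {role['id']})")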

How to Use the Script

  1. Configure the Constants (see the example below):
    • Set CLOUD_BASE_URL to your Atlassian Cloud base URL.
    • Set CLOUD_EMAIL to the email address of your Atlassian account.
    • Set CLOUD_TOKEN to an API token for that account (you can create one at https://id.atlassian.com/manage-profile/security/api-tokens).
  2. Optionally Specify Projects and Permission Schemes:
    • Populate PROJECT_LIST with the project keys you want to limit the cleanup to.
    • Populate PERMISSION_SCHEME_LIST with the permission scheme IDs you want to limit the cleanup to.
  3. Run the Script:
    • Execute the script in your Python environment (the only third-party dependency is requests).
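
For example, a hypothetical configuration might look like this (the domain, email, token, and keys below are placeholders, not real values):

CLOUD_BASE_URL = "https://your-domain.atlassian.net"
CLOUD_EMAIL = "jane.doe@example.com"
CLOUD_TOKEN = "<your-api-token>"
PROJECT_LIST = ["PROJ", "OPS"]       # limit the cleanup to these projects
PERMISSION_SCHEME_LIST = [10000]     # limit the cleanup to this permission scheme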

Conclusion

This script automates the tedious task of cleaning up duplicated project roles after migrating to Atlassian Cloud. By re-pointing permission scheme grants and role memberships from “(migrated)” roles to their original counterparts, it helps maintain a cleaner and more manageable project setup. Remember that the “(migrated)” roles themselves are left in place for you to review, and check your workflows manually to ensure they are not disrupted by these changes.

Feel free to customize the script to better suit your specific needs and improve your migration process.