Migrating projects to Atlassian Cloud can be a complex task. One common issue that arises is the duplication of project roles, which can clutter your project setup and cause confusion. This script helps automate the cleanup of these duplicated roles by replacing the “(migrated)” roles with their original counterparts.
Please note that this script does not check if any workflows are using the migrated project roles. Be sure to verify that the changes align with your workflows and project configurations.
Example Scenario
Let’s say you have a project with the following roles:
- Developers
- Administrators
- Developers (migrated)
- Administrators (migrated)
The script will identify the “(migrated)” roles and replace them with the corresponding original roles. For instance, “Developers (migrated)” will be replaced with “Developers”.
Script Overview
The script performs the following steps:
- Cleans up permission schemes by replacing “(migrated)” project roles with their original roles.
- Cleans up project roles in all projects by doing the same replacement.
Here’s the script:
import json
import logging
import requests
from requests.auth import HTTPBasicAuth
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
# Constants
CLOUD_BASE_URL = ""
CLOUD_EMAIL = ""
CLOUD_TOKEN = ""
AUTH = HTTPBasicAuth(CLOUD_EMAIL, CLOUD_TOKEN)
HEADERS = {
"Accept": "application/json",
"Content-Type": "application/json"
}
PROJECT_LIST = [] # Add project keys to this list to limit to specific projects, leave empty to process all
PERMISSION_SCHEME_LIST = [] # Add permission scheme IDs to this list to limit to specific schemes, leave empty to process all
MAX_THREADS = 10 # Adjust based on your needs
def request_with_retries(url, method='GET', headers=None, auth=None, data=None, max_retries=5):
"""Make a request with retries and exponential backoff for handling 429 errors."""
for attempt in range(max_retries):
response = requests.request(method, url, headers=headers, auth=auth, data=data)
if response.status_code == 429:
# Handle 429 Too Many Requests by sleeping and retrying
wait_time = 2 ** attempt # Exponential backoff
logging.warning(f"Rate limited. Waiting for {wait_time} seconds before retrying...")
time.sleep(wait_time)
else:
return response
logging.error(f"Max retries reached for URL: {url}")
return None
def clean_permission_scheme(scheme):
logging.info(f"Processing scheme: {scheme['name']} (ID: {scheme['id']})")
if PERMISSION_SCHEME_LIST and scheme["id"] not in PERMISSION_SCHEME_LIST:
return # Skip if not in the specified list
scheme_details_response = request_with_retries(
f'{CLOUD_BASE_URL}/rest/api/3/permissionscheme/{scheme["id"]}?expand=all', headers=HEADERS, auth=AUTH)
if not scheme_details_response or scheme_details_response.status_code != 200:
logging.error(f"Failed to fetch scheme details for {scheme['name']} (ID: {scheme['id']})")
return
scheme_details = scheme_details_response.json()
permissions = scheme_details["permissions"]
for permission in permissions:
if "projectRole" in permission["holder"]["type"] and "(migrated)" in permission["holder"]["projectRole"]["name"]:
normal_role_name = permission["holder"]["projectRole"]["name"].split(" (migrated)", 1)[0]
project_roles_response = request_with_retries(
f'{CLOUD_BASE_URL}/rest/api/3/role', headers=HEADERS, auth=AUTH)
if not project_roles_response or project_roles_response.status_code != 200:
logging.error(f"Failed to fetch project roles")
continue
project_roles = project_roles_response.json()
for role in project_roles:
if "scope" in role:
continue # Ignore roles with scope
if role["name"] == normal_role_name:
payload = {
"holder": {
"type": "projectRole",
"value": role["id"]
},
"permission": permission["permission"]
}
update_response = request_with_retries(
f'{CLOUD_BASE_URL}/rest/api/3/permissionscheme/{scheme["id"]}/permission',
method='POST', headers=HEADERS, data=json.dumps(payload), auth=AUTH)
if update_response and update_response.status_code == 200:
logging.info(f"Successfully updated permission: {permission['permission']} for scheme: {scheme['name']} (ID: {scheme['id']})")
else:
logging.error(f"Failed to update permission: {permission['permission']} for scheme: {scheme['name']} (ID: {scheme['id']})")
def clean_permission_schemes():
logging.info("Starting to clean permission schemes")
response = request_with_retries(f'{CLOUD_BASE_URL}/rest/api/3/permissionscheme', headers=HEADERS, auth=AUTH)
if not response or response.status_code != 200:
logging.error(f"Failed to fetch permission schemes")
return
permission_schemes = response.json()["permissionSchemes"]
with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
futures = [executor.submit(clean_permission_scheme, scheme) for scheme in permission_schemes]
for future in as_completed(futures):
future.result() # Will raise exceptions if any occurred during execution
logging.info("Finished cleaning permission schemes")
def clean_project(project):
if PROJECT_LIST and project["key"] not in PROJECT_LIST:
return
if "classic" in project["style"]:
project_roles_response = request_with_retries(
f'{CLOUD_BASE_URL}/rest/api/3/project/{project["id"]}/role', headers=HEADERS, auth=AUTH)
if not project_roles_response or project_roles_response.status_code != 200:
logging.error(f"Failed to fetch project roles for {project['name']} (ID: {project['id']})")
return
project_roles = project_roles_response.json()
for key, value in project_roles.items():
role_id = value.split("role/", 1)[1]
role_response = request_with_retries(
f'{CLOUD_BASE_URL}/rest/api/3/project/{project["id"]}/role/{role_id}', headers=HEADERS, auth=AUTH)
if not role_response or role_response.status_code != 200:
logging.error(f"Failed to fetch role details for role ID {role_id} in project {project['name']} (ID: {project['id']})")
continue
role = role_response.json()
if "(migrated)" in role['name']:
normal_role_name = role['name'].split(" (migrated)", 1)[0]
for key, value in project_roles.items():
if key == normal_role_name and "(migrated)" not in key:
normal_role_id = value.split("role/", 1)[1]
normal_role_response = request_with_retries(
f'{CLOUD_BASE_URL}/rest/api/3/project/{project["id"]}/role/{normal_role_id}', headers=HEADERS, auth=AUTH)
if not normal_role_response or normal_role_response.status_code != 200:
logging.error(f"Failed to fetch normal role details for role ID {normal_role_id} in project {project['name']} (ID: {project['id']})")
continue
for actor in role["actors"]:
if actor["type"] == "atlassian-group-role-actor":
payload = {"groupId": [actor["actorGroup"]["groupId"]]}
requests.post(
f'{CLOUD_BASE_URL}/rest/api/3/project/{project["id"]}/role/{normal_role_id}', headers=HEADERS, data=json.dumps(payload), auth=AUTH)
elif actor["type"] == "atlassian-user-role-actor":
payload = {"user": [actor["actorUser"]["accountId"]]}
requests.post(
f'{CLOUD_BASE_URL}/rest/api/3/project/{project["id"]}/role/{normal_role_id}', headers=HEADERS, data=json.dumps(payload), auth=AUTH)
def clean_projects():
response = request_with_retries(f'{CLOUD_BASE_URL}/rest/api/3/project', headers=HEADERS, auth=AUTH)
if not response or response.status_code != 200:
logging.error(f"Failed to fetch projects")
return
projects = response.json()
with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor:
futures = [executor.submit(clean_project, project) for project in projects]
for future in as_completed(futures):
future.result() # Will raise exceptions if any occurred during execution
if __name__ == "__main__":
logging.info('Starting [Permission Schemes] cleaning...')
clean_permission_schemes()
logging.info('Starting [Projects] cleaning...')
clean_projects()
How to Use the Script
- Configure the Constants:
- Set
CLOUD_BASE_URLto your Atlassian Cloud base URL. - Set
CLOUD_EMAILto your Atlassian Cloud email. - Set
CLOUD_TOKENto your Atlassian Cloud token.
- Set
- Optionally Specify Projects and Permission Schemes:
- Populate
PROJECT_LISTwith the project keys you want to limit the cleanup to. - Populate
PERMISSION_SCHEME_LISTwith the permission scheme IDs you want to limit the cleanup to.
- Populate
- Run the Script:
- Execute the script in your Python environment.
Conclusion
This script automates the tedious task of cleaning up duplicated project roles after migrating to Atlassian Cloud. By focusing on replacing “(migrated)” roles with their original counterparts, it helps maintain a cleaner and more manageable project setup. Remember to check your workflows manually to ensure they are not disrupted by these changes.
Feel free to customize the script to better suit your specific needs and improve your migration process.