Introduction
When migrating between Atlassian instances or during company rebranding/mergers, one of the most challenging aspects is preserving user permissions. This is especially true when email addresses have already been claimed in the target instance.
Recently, I faced this exact challenge while helping a client migrate from one company to another. The users’ email addresses from the source instance couldn’t be transferred to the destination company’s domain because the corresponding accounts had already been created in the target instance. As a result, renaming the email addresses was not an option. After the migration, we needed to efficiently transfer all permissions, assignments, and roles from the original user accounts to the new ones.
What would normally take several hours of manual work can now be accomplished in minutes with a bit of API knowledge, some Python scripting, and of course AI. Let me share how we solved this problem.
Understanding the Jira and Confluence API Challenge
The Atlassian REST APIs are powerful but not always intuitive. For example, the Confluence permissions API structure can be particularly challenging to work with. When adding permissions, you must ensure users have “read space” permission before adding any other permission, or you’ll encounter cryptic error messages.
Similarly, with Jira, there are different API endpoints for updating issue assignments, project roles, and group memberships, each with their own specific requirements.
The Solution: Automated Migration Scripts
I’ve created two scripts that handle the comprehensive migration of user data:
- A Jira user migration script for issue assignments, reporter fields, project roles, and group memberships
- A Confluence space permission migration script
Both scripts follow a similar approach:
- Read user mappings from a CSV file (old user ID → new user ID)
- Fetch existing data (permissions, assignments, roles, etc.)
- Apply these to the new users
Example User Mapping CSV
Here’s a sample of what the user mapping CSV file should look like:
User name,User id Source,email,old_domain_user,matched id,matching new_domain email
John Smith,5f723e8a9c7df9001e5c4f12,john.smith@oldcompany.com,TRUE,70121:e08b84cf-d953-4d1e-98d8-b59b91e81c81,john.smith@newcompany.com
Jane Doe,5e8a24c96f9ca9001f553d45,jane.doe@oldcompany.com,TRUE,70121:c84e8b99-0bf5-4eb3-b532-9a4e22cbab26,jane.doe@newcompany.com
Alex Johnson,5f9a20c16f35a9001a45b210,alex.johnson@oldcompany.com,TRUE,70121:3d44f19c-5269-4d9f-8cfe-0827e953a739,alex.johnson@newcompany.com
Maria Garcia,5e72f9c16f9ca9001f553e12,maria.garcia@oldcompany.com,TRUE,70121:a7b9c6e4-1234-5678-9abc-def123456789,maria.garcia@newcompany.com
Bob Williams,5f9a20c16f35a9001a45b211,bob.williams@oldcompany.com,FALSE,#N/A,#N/A
Notes about the CSV:
- The
User id Sourcecolumn contains the account IDs from the source system - The
matched idcolumn contains the account IDs in the target system - The
old_domain_usercolumn indicates if the user had an email with the old domain - Users with
FALSEin theold_domain_usercolumn or#N/Ain thematched idcolumn will be skipped
Example API Responses
Jira User Groups Response Example
When fetching a user’s groups, the API response will look something like this:
The Jira User Migration Script
#!/usr/bin/env python3
import csv
import requests
from requests.auth import HTTPBasicAuth
import json
import logging
from tqdm import tqdm
import time
import sys
# Logging configuration
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("jira_user_migration.log"),
logging.StreamHandler()
]
)
# Jira settings
JIRA_BASE_URL = "https://your-instance.atlassian.net"
JIRA_API_TOKEN = "YOUR_API_TOKEN"
JIRA_EMAIL = "your.email@example.com"
# Authentication
auth = HTTPBasicAuth(JIRA_EMAIL, JIRA_API_TOKEN)
# Headers for requests
headers = {
"Accept": "application/json",
"Content-Type": "application/json"
}
# Base for APIs
JIRA_API_BASE = f"{JIRA_BASE_URL}/rest/api/3"
def read_users_csv():
"""Reads the CSV file and returns a list of user mappings"""
users_to_migrate = []
try:
with open('user-mappings.csv', mode='r', encoding='utf-8') as file:
csv_reader = csv.DictReader(file)
for row in csv_reader:
if row['old_domain_user'].upper() == 'TRUE' and row['matched id'] != '#N/A':
old_user_id = row['User id Source']
new_user_id = row['matched id']
if old_user_id and new_user_id and new_user_id != '#N/A':
logging.info(f"Found user mapping: {row['User name']} - OLD: {old_user_id} -> NEW: {new_user_id}")
users_to_migrate.append({
'old_id': old_user_id,
'new_id': new_user_id,
'name': row['User name'],
'old_email': row['email'],
'new_email': row['matching new_domain email']
})
logging.info(f"Found {len(users_to_migrate)} valid users for migration")
return users_to_migrate
except Exception as e:
logging.error(f"Error reading CSV file: {str(e)}")
sys.exit(1)
def find_issues_assigned_to_user(user_id):
"""Find all issues assigned to a specific user"""
jql = f'assignee = {user_id} AND resolution = Unresolved'
url = f"{JIRA_API_BASE}/search"
issues = []
start_at = 0
max_results = 100
while True:
params = {
'jql': jql,
'startAt': start_at,
'maxResults': max_results,
'fields': 'id,key,summary,assignee'
}
response = requests.get(url, params=params, headers=headers, auth=auth)
if response.status_code != 200:
logging.error(f"Error searching for issues: {response.status_code} - {response.text}")
break
data = response.json()
issues.extend(data['issues'])
total = data['total']
if start_at + max_results >= total:
break
start_at += max_results
return issues
def update_issue_assignee(issue_key, new_user_id):
"""Update the assignee of an issue"""
url = f"{JIRA_API_BASE}/issue/{issue_key}"
payload = {
"fields": {
"assignee": {
"id": new_user_id
}
}
}
response = requests.put(url, json=payload, headers=headers, auth=auth)
if response.status_code in [200, 204]:
return True
else:
logging.error(f"Error updating assignee for issue {issue_key}: {response.status_code} - {response.text}")
return False
def find_issues_reported_by_user(user_id):
"""Find all issues reported by a specific user"""
jql = f'reporter = {user_id}'
url = f"{JIRA_API_BASE}/search"
issues = []
start_at = 0
max_results = 100
while True:
params = {
'jql': jql,
'startAt': start_at,
'maxResults': max_results,
'fields': 'id,key,summary,reporter'
}
response = requests.get(url, params=params, headers=headers, auth=auth)
if response.status_code != 200:
logging.error(f"Error searching for issues: {response.status_code} - {response.text}")
break
data = response.json()
issues.extend(data['issues'])
total = data['total']
if start_at + max_results >= total:
break
start_at += max_results
return issues
def update_issue_reporter(issue_key, new_user_id):
"""Update the reporter of an issue"""
url = f"{JIRA_API_BASE}/issue/{issue_key}"
payload = {
"fields": {
"reporter": {
"id": new_user_id
}
}
}
response = requests.put(url, json=payload, headers=headers, auth=auth)
if response.status_code in [200, 204]:
return True
else:
logging.error(f"Error updating reporter for issue {issue_key}: {response.status_code} - {response.text}")
return False
def get_user_groups(user_id):
"""Get all groups that a user is a member of"""
url = f"{JIRA_API_BASE}/user/groups"
params = {
'accountId': user_id
}
response = requests.get(url, params=params, headers=headers, auth=auth)
if response.status_code == 200:
return response.json()
else:
logging.error(f"Error getting groups for user: {response.status_code} - {response.text}")
return []
def add_user_to_group(group_name, user_id):
"""Add a user to a group"""
url = f"{JIRA_API_BASE}/group/user"
params = {
'groupname': group_name
}
payload = {
"accountId": user_id
}
response = requests.post(url, params=params, json=payload, headers=headers, auth=auth)
if response.status_code in [200, 201, 204]:
return True
else:
logging.error(f"Error adding user to group {group_name}: {response.status_code} - {response.text}")
return False
def get_user_project_roles(user_id):
"""Get all project roles assigned to a user"""
# First get all projects
url = f"{JIRA_API_BASE}/project"
projects = []
start_at = 0
max_results = 50
while True:
params = {
'startAt': start_at,
'maxResults': max_results
}
response = requests.get(url, params=params, headers=headers, auth=auth)
if response.status_code != 200:
logging.error(f"Error getting projects: {response.status_code} - {response.text}")
break
data = response.json()
projects.extend(data)
if len(data) < max_results:
break
start_at += max_results
# For each project, check the roles the user has
user_roles = []
for project in projects:
project_id = project['id']
project_key = project['key']
roles_url = f"{JIRA_API_BASE}/project/{project_id}/role"
roles_response = requests.get(roles_url, headers=headers, auth=auth)
if roles_response.status_code != 200:
logging.error(f"Error getting roles for project {project_key}: {roles_response.status_code} - {roles_response.text}")
continue
roles = roles_response.json()
for role_name, role_url in roles.items():
role_id = role_url.split('/')[-1]
role_detail_url = f"{JIRA_API_BASE}/project/{project_id}/role/{role_id}"
role_detail_response = requests.get(role_detail_url, headers=headers, auth=auth)
if role_detail_response.status_code != 200:
continue
role_details = role_detail_response.json()
# Check if user is in the role
actors = role_details.get('actors', [])
for actor in actors:
if actor.get('actorUser', {}).get('accountId') == user_id:
user_roles.append({
'project_id': project_id,
'project_key': project_key,
'role_id': role_id,
'role_name': role_name
})
break
return user_roles
def add_user_to_project_role(project_id, role_id, user_id):
"""Add a user to a project role"""
url = f"{JIRA_API_BASE}/project/{project_id}/role/{role_id}"
payload = {
"user": [
{
"accountId": user_id
}
]
}
response = requests.post(url, json=payload, headers=headers, auth=auth)
if response.status_code in [200, 201, 204]:
return True
else:
logging.error(f"Error adding user to project role: {response.status_code} - {response.text}")
return False
def migrate_user_permissions(users):
"""Migrate user permissions from old to new users"""
total_updated = {
'assignee': 0,
'reporter': 0,
'groups': 0,
'project_roles': 0
}
for user in tqdm(users, desc="Migrating users"):
old_id = user['old_id']
new_id = user['new_id']
name = user['name']
logging.info(f"Processing user: {name} ({old_id} -> {new_id})")
# Migrate issue assignments
issues_assigned = find_issues_assigned_to_user(old_id)
logging.info(f"Found {len(issues_assigned)} issues assigned to {name}")
for issue in tqdm(issues_assigned, desc=f"Migrating assignments for {name}", leave=False):
issue_key = issue['key']
if update_issue_assignee(issue_key, new_id):
total_updated['assignee'] += 1
# Migrate issue reporters
issues_reported = find_issues_reported_by_user(old_id)
logging.info(f"Found {len(issues_reported)} issues reported by {name}")
for issue in tqdm(issues_reported, desc=f"Migrating reporters for {name}", leave=False):
issue_key = issue['key']
if update_issue_reporter(issue_key, new_id):
total_updated['reporter'] += 1
# Migrate group memberships
groups = get_user_groups(old_id)
logging.info(f"Found {len(groups)} groups for {name}")
for group in tqdm(groups, desc=f"Migrating groups for {name}", leave=False):
group_name = group['name']
if add_user_to_group(group_name, new_id):
total_updated['groups'] += 1
# Migrate project roles
roles = get_user_project_roles(old_id)
logging.info(f"Found {len(roles)} project roles for {name}")
for role in tqdm(roles, desc=f"Migrating project roles for {name}", leave=False):
project_id = role['project_id']
role_id = role['role_id']
if add_user_to_project_role(project_id, role_id, new_id):
total_updated['project_roles'] += 1
# Pause to avoid rate limiting
time.sleep(1)
return total_updated
def main():
logging.info("Starting Jira user migration process")
# Step 1: Ask for confirmation before migrating
proceed = input("\nDo you want to proceed with migrating Jira users? (yes/no): ")
if proceed.lower() != 'yes':
logging.info("Migration cancelled by user.")
return
# Step 2: Read user mappings and perform migration
users = read_users_csv()
if users:
total_updated = migrate_user_permissions(users)
logging.info(f"Migration completed. Stats:")
logging.info(f"Issues with updated assignee: {total_updated['assignee']}")
logging.info(f"Issues with updated reporter: {total_updated['reporter']}")
logging.info(f"Group memberships added: {total_updated['groups']}")
logging.info(f"Project roles added: {total_updated['project_roles']}")
else:
logging.warning("No user mappings found. Migration skipped.")
logging.info("Process completed")
if __name__ == "__main__":
main()
The Confluence Permission Migration Script
#!/usr/bin/env python3
import csv
import requests
from requests.auth import HTTPBasicAuth
import json
import logging
from tqdm import tqdm
import time
import sys
from prettytable import PrettyTable # For displaying tabular data
# Logging configuration
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("confluence_user_migration.log"),
logging.StreamHandler()
]
)
# Confluence settings
CONFLUENCE_BASE_URL = "https://your-instance.atlassian.net/wiki"
CONFLUENCE_API_TOKEN = "YOUR_API_TOKEN"
CONFLUENCE_EMAIL = "your.email@example.com"
# Authentication
auth = HTTPBasicAuth(CONFLUENCE_EMAIL, CONFLUENCE_API_TOKEN)
# Headers for requests
headers = {
"Accept": "application/json",
"Content-Type": "application/json"
}
# Base for APIs
CONFLUENCE_API_BASE = f"{CONFLUENCE_BASE_URL}/rest/api"
# List of Confluence space keys
SPACE_KEYS = [
"SPACE1", "SPACE2", "SPACE3", "SPACE4", "SPACE5"
# Add your space keys here
]
def read_users_csv():
"""Reads the CSV file and returns a list of user mappings"""
users_to_migrate = []
try:
with open('user-mappings.csv', mode='r', encoding='utf-8') as file:
csv_reader = csv.DictReader(file)
for row in csv_reader:
if row['old_domain_user'].upper() == 'TRUE' and row['matched id'] != '#N/A':
old_user_id = row['User id Source']
new_user_id = row['matched id']
if old_user_id and new_user_id and new_user_id != '#N/A':
logging.info(f"Found user mapping: {row['User name']} - OLD: {old_user_id} -> NEW: {new_user_id}")
users_to_migrate.append({
'old_id': old_user_id,
'new_id': new_user_id,
'name': row['User name'],
'old_email': row['email'],
'new_email': row['matching new_domain email']
})
logging.info(f"Found {len(users_to_migrate)} valid users for migration")
return users_to_migrate
except Exception as e:
logging.error(f"Error reading CSV file: {str(e)}")
sys.exit(1)
def get_space_permissions(space_key):
"""Gets all permissions for a specific space"""
url = f"{CONFLUENCE_API_BASE}/space?spaceKey={space_key}&expand=permissions"
try:
response = requests.get(
url,
headers=headers,
auth=auth
)
if response.status_code == 200:
data = response.json()
if 'results' in data and len(data['results']) > 0:
space = data['results'][0]
if 'permissions' in space:
return space['permissions']
logging.warning(f"No permissions found for space {space_key}")
return []
else:
logging.error(f"Error getting permissions for space {space_key}: {response.status_code} - {response.text}")
return []
except Exception as e:
logging.error(f"Exception getting permissions for space {space_key}: {str(e)}")
return []
def print_space_permission_summary(space_key, permissions):
"""Prints a summary of permissions for a space in a more visible format"""
if not permissions:
print(f"No permissions found for space: {space_key}")
return
print(f"\n{'='*80}")
print(f"SPACE: {space_key}")
print(f"{'='*80}")
# Create tables
users_table = PrettyTable()
users_table.field_names = ["User Name", "Account ID", "Operation", "Target Type"]
groups_table = PrettyTable()
groups_table.field_names = ["Group Name", "Group ID", "Operation", "Target Type"]
user_count = 0
group_count = 0
# Process permissions
for permission in permissions:
operation_data = permission.get('operation', {})
operation_name = operation_data.get('operation', 'unknown')
target_type = operation_data.get('targetType', 'unknown')
subjects = permission.get('subjects', {})
# Process user subjects
if 'user' in subjects and 'results' in subjects['user']:
for user in subjects['user']['results']:
user_name = user.get('displayName', 'Unknown User')
account_id = user.get('accountId', 'No ID')
users_table.add_row([user_name, account_id, operation_name, target_type])
user_count += 1
# Process group subjects
if 'group' in subjects and 'results' in subjects['group']:
for group in subjects['group']['results']:
group_name = group.get('name', 'Unknown Group')
group_id = group.get('id', 'No ID')
groups_table.add_row([group_name, group_id, operation_name, target_type])
group_count += 1
# Print summary
print(f"Total permissions: {len(permissions)}")
print(f"User permissions: {user_count}")
print(f"Group permissions: {group_count}")
# Print tables
if user_count > 0:
print("\nUSER PERMISSIONS:")
print(users_table)
if group_count > 0:
print("\nGROUP PERMISSIONS:")
print(groups_table)
def add_permission_to_space(space_key, key, target, account_id):
"""
Adds permission for a user in a space according to the Atlassian API requirements.
Args:
space_key (str): The key of the space to add the permission to
key (str): The operation key (e.g., "administer", "read", "create", etc.)
target (str): The target type (e.g., "page", "blogpost", "space", etc.)
account_id (str): The account ID of the user to add the permission for
Returns:
bool: True if successful, False otherwise
"""
url = f"{CONFLUENCE_API_BASE}/space/{space_key}/permission"
# Payload according to the API documentation
payload = {
"subject": {
"type": "user",
"identifier": account_id # Using accountId as identifier for users
},
"operation": {
"key": key,
"target": target
}
}
try:
response = requests.post(
url,
json=payload,
headers=headers,
auth=auth
)
if response.status_code in [200, 201, 204]:
logging.info(f"Permission {key} on {target} successfully added for user (accountId: {account_id}) in space {space_key}")
return True
else:
error_message = "Unknown error"
try:
error_data = response.json()
if 'message' in error_data:
error_message = error_data['message']
elif 'data' in error_data and 'errors' in error_data['data']:
errors = error_data['data']['errors']
if errors and 'message' in errors[0]:
error_message = errors[0]['message'].get('translation', 'Unknown error')
except:
error_message = response.text
logging.error(f"Error adding permission {key} on {target} for user: {response.status_code} - {error_message}")
return False
except Exception as e:
logging.error(f"Exception adding permission for user: {str(e)}")
return False
def process_space_permissions_migration(users):
"""Processes the migration of space permissions for users, always adding read permission first"""
total_permissions_updated = 0
for space_key in tqdm(SPACE_KEYS, desc="Processing space permissions"):
logging.info(f"Processing space: {space_key}")
# Get all permissions for the space
permissions = get_space_permissions(space_key)
# Display current permissions for the space
print_space_permission_summary(space_key, permissions)
if not permissions:
logging.warning(f"No permissions found for space {space_key}")
continue
# Track users and their permissions to migrate
users_to_migrate = {}
# First, collect all permissions that need to be migrated
for permission in permissions:
operation_data = permission.get('operation', {})
operation_key = operation_data.get('operation', '')
target_type = operation_data.get('targetType', '')
if not operation_key or not target_type:
continue
subjects = permission.get('subjects', {})
if 'user' in subjects and 'results' in subjects['user']:
for user_entry in subjects['user']['results']:
user_id = user_entry.get('accountId')
if not user_id:
continue
# Check if the user is in the migration list
for user in users:
old_id = user['old_id']
new_id = user['new_id']
if user_id == old_id:
# Add to migration list
if new_id not in users_to_migrate:
users_to_migrate[new_id] = {
'user_info': user,
'permissions': []
}
# Add permission if not already in the list
perm_entry = {
'key': operation_key,
'target': target_type
}
if perm_entry not in users_to_migrate[new_id]['permissions']:
users_to_migrate[new_id]['permissions'].append(perm_entry)
# Now process the migrations, ALWAYS adding read space permission first
for new_id, data in users_to_migrate.items():
user_info = data['user_info']
permissions_to_add = data['permissions']
# ALWAYS add read space permission first, regardless of other permissions
logging.info(f"Adding required 'read space' permission first for {user_info['name']} in space {space_key}")
read_space_added = add_permission_to_space(space_key, 'read', 'space', new_id)
if read_space_added:
total_permissions_updated += 1
else:
logging.error(f"Failed to add required 'read space' permission for {user_info['name']}. Skipping other permissions.")
continue
# Add a short pause after the read permission
time.sleep(0.5)
# Now add all other permissions (except read space if it's already in the list)
for permission in permissions_to_add:
# Skip if this is the read space permission as we already added it
if permission['key'] == 'read' and permission['target'] == 'space':
continue
logging.info(f"Adding permission {permission['key']} on {permission['target']} for {user_info['name']} in space {space_key}")
if add_permission_to_space(space_key, permission['key'], permission['target'], new_id):
total_permissions_updated += 1
else:
logging.warning(f"Failed to add permission {permission['key']} on {permission['target']} for {user_info['name']}")
# Small pause between permissions
time.sleep(0.2)
# Larger pause between users
time.sleep(0.5)
# Larger pause between spaces
time.sleep(1)
return total_permissions_updated
def main():
logging.info("Starting Confluence user permission migration process")
# Step 1: Ask for confirmation before migrating
proceed = input("\nDo you want to proceed with migrating permissions? (yes/no): ")
if proceed.lower() != 'yes':
logging.info("Migration cancelled by user.")
return
# Step 2: Read user mappings and perform migration
users = read_users_csv()
if users:
total_permissions = process_space_permissions_migration(users)
logging.info(f"Total permissions migrated: {total_permissions}")
else:
logging.warning("No user mappings found. Migration skipped.")
logging.info("Process completed")
if __name__ == "__main__":
main()
API Error Examples and Troubleshooting
Common Confluence API Error: Missing Read Permission
If you try to add a permission without first adding the read space permission, you’ll receive an error like this:
{
"statusCode": 400,
"data": {
"authorized": true,
"valid": false,
"errors": [
{
"message": {
"translation": "'read space' permission has to exist before adding any other permissions for a user or group. Try adding 'read space' permission for the given user or group.",
"args": []
}
}
],
"successful": false
},
"message": "com.atlassian.confluence.api.service.exceptions.BadRequestException: 'read space' permission has to exist before adding any other permissions for a user or group. Try adding 'read space' permission for the given user or group."
}
Practical Tips for Running the Scripts
- Start with a dry run: Make a copy of the script and comment out the actual update lines, just logging what would be changed.
- Use smaller batches: If you have many users to migrate, consider running the script on smaller batches first to ensure everything works as expected.
- Monitor rate limiting: Atlassian APIs have rate limits. If you’re experiencing issues, increase the
time.sleep()values. - Keep good logs: The scripts include comprehensive logging. Save these logs for troubleshooting and to have a record of what was migrated.
- Test on non-critical spaces first: Start with test spaces before migrating permissions on critical production spaces.
Conclusion
Migrating between Atlassian instances doesn’t have to involve tedious manual permission reapplication. With the right scripts, understanding of the APIs and you best allied: Artifficial Intelligence, you can automate this process and save countless hours of work.
The scripts provided here focus on the most essential aspects of user migration—space permissions in Confluence and issue assignments, reporter fields, project roles, and group memberships in Jira. While they don’t address certain elements like page creator information or certain restrictions (which weren’t required in our case), they provide a strong foundation that you can build upon for your specific migration needs.
Have you faced similar challenges with Atlassian migrations? Share your experiences in the comments below!
Fantastic, as always, Rodolfo!
LikeLiked by 1 person