# Steps for workspaces deployment

0. ### Create and Alter tables

```sql
create table e_workspace_status
(
    name varchar(100) not null,
    constraint e_workspace_status_pk
        unique (name)
);

create table workspaces
(
    id              int auto_increment
        primary key,
    name            varchar(255)                        not null,
    slug            varchar(100)                        not null,
    status          varchar(255)                        not null,
    is_default      tinyint(1) default 0                not null,
    timezone        varchar(255) DEFAULT 'GMT',
    workspace_image text,
    created_at      timestamp default CURRENT_TIMESTAMP null,
    updated_at      timestamp default CURRENT_TIMESTAMP null on update CURRENT_TIMESTAMP,
    constraint workspace_e_workspace_status_name_fk
        foreign key (status) references e_workspace_status (name)
);

create table account_workspaces
(
    id           int auto_increment
        primary key,
    account_id   int unsigned                        not null,
    workspace_id int                                 not null,
    created_at   timestamp default CURRENT_TIMESTAMP null,
    updated_at   timestamp default CURRENT_TIMESTAMP null on update CURRENT_TIMESTAMP,
    constraint account_workspaces_accounts_id_fk
        foreign key (account_id) references accounts (id),
    constraint account_workspaces_workspaces_id_fk
        foreign key (workspace_id) references workspaces (id)
);

create table workspace_users
(
    `id` int NOT NULL AUTO_INCREMENT,
    `workspace_id` int NOT NULL,
    `user_id` int unsigned NOT NULL,
    `created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
    `updated_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    `role_id` int unsigned DEFAULT NULL,
    PRIMARY KEY (`id`),
    KEY `workspace_users_users_id_fk` (`user_id`),
    KEY `workspace_users_workspaces_id_fk` (`workspace_id`),
    KEY `workspace_users_roles_id_fk` (`role_id`),
    CONSTRAINT `workspace_users_roles_id_fk` FOREIGN KEY (`role_id`) REFERENCES `roles` (`id`),
    CONSTRAINT `workspace_users_users_id_fk` FOREIGN KEY (`user_id`) REFERENCES `users` (`id`),
    CONSTRAINT `workspace_users_workspaces_id_fk` FOREIGN KEY (`workspace_id`) REFERENCES `workspaces` (`id`)
);
```

```sql
-- Run before everything
-- NOTE(review): presumably normalises invalid/zero deleted_at values so the ALTER on
-- configured_reports below does not fail -- confirm.
update dc_admin.configured_reports
set deleted_at = null
where deleted_at < '2015-01-01';

-- Every entity table gets a nullable workspace_id plus a foreign key to workspaces.
-- The column stays nullable because existing rows are only backfilled in step 3.
alter table default_reports add workspace_id int null;
alter table default_reports add constraint default_reports_workspaces_id_fk foreign key (workspace_id) references workspaces (id);

alter table storages add workspace_id int null;
alter table storages add constraint storages_workspaces_id_fk foreign key (workspace_id) references workspaces (id);

alter table transformations add workspace_id int null;
alter table transformations add constraint transformations_workspaces_id_fk foreign key (workspace_id) references workspaces (id);

alter table configured_reports add workspace_id int null;
alter table configured_reports add constraint configured_reports_workspaces_id_fk foreign key (workspace_id) references workspaces (id);

alter table configured_syncs add workspace_id int null;
alter table configured_syncs add constraint configured_syncs_workspaces_id_fk foreign key (workspace_id) references workspaces (id);

alter table datasource_creds add workspace_id int null;
alter table datasource_creds add constraint datasource_creds_workspaces_id_fk foreign key (workspace_id) references workspaces (id);

alter table data_models add workspace_id int null;
alter table data_models add constraint data_models_workspaces_id_fk foreign key (workspace_id) references workspaces (id);

alter table account_notifications add workspace_id int null;
alter table account_notifications add constraint account_notifications_workspaces_id_fk foreign key (workspace_id) references workspaces (id);

alter table data_warehouses add workspace_id int null;
alter table data_warehouses add constraint data_warehouses_workspaces_id_fk foreign key (workspace_id) references workspaces (id);

alter table report_templates add workspace_id int null;
alter table report_templates add constraint report_templates_workspaces_id_fk foreign key (workspace_id) references workspaces (id);

alter table dbt_credentials add workspace_id int null;
alter table dbt_credentials add constraint dbt_credentials_workspaces_id_fk foreign key (workspace_id) references workspaces (id);

alter table dbt_projects add workspace_id int null;
alter table dbt_projects add constraint dbt_projects_workspaces_id_fk foreign key (workspace_id) references workspaces (id);

ALTER TABLE notification_logs ADD COLUMN workspace_id integer;
alter table notification_logs add constraint notification_logs_workspaces_id_fk foreign key (workspace_id) references workspaces (id);

ALTER TABLE orchestrations ADD workspace_id INT DEFAULT null NULL;
ALTER TABLE orchestrations ADD CONSTRAINT orchestrations_workspaces_FK FOREIGN KEY (workspace_id) REFERENCES workspaces(id);
```

### Alter table for logs db

```sql
use dc_logs;

-- No foreign keys here: workspaces lives in the admin database, not in dc_logs.
alter table agg_report_run_status add workspace_id int null;
alter table log_reverse_sync_runs add workspace_id int null;
ALTER TABLE log_orchestration_runs ADD workspace_id INT DEFAULT null NULL;
```

- Insert ACTIVE, DELETED, PAUSED values into the e_workspace_status table. This must be done before step 1: `workspaces.status` has a foreign key to `e_workspace_status.name`, and step 1 inserts workspaces with status `ACTIVE`.

```sql
-- Qualified with the schema because the previous block switched to dc_logs.
INSERT INTO dc_admin.e_workspace_status (name)
VALUES ('ACTIVE'),
       ('DELETED'),
       ('PAUSED');
```

1. ### Creating default account workspaces

```python=
import traceback
from loguru import logger
from dotenv import load_dotenv; load_dotenv()
from datachannel_models.reverse_connector import ReverseConnector
from datachannel_models.account import Account
from datachannel_models.workspace import Workspace
from datachannel_models.account_workspace import AccountWorkspace
# NOTE(review): this rebinds Workspace imported two lines above -- confirm that
# account_workspace really exports it, otherwise drop this line.
from datachannel_models.account_workspace import Workspace
from datachannel.common.database import get_database_session


def create_workspace_for_account(account, db_session):
    """Create the default workspace for one account and link the two.

    The workspace takes the account's name and timezone, the slug
    "<account.slug>_default", status "ACTIVE" and is_default=True.
    Nothing is committed here; the caller commits.
    """
    logger.info(f"adding for account {account.id}")
    workspace = Workspace(
        name = account.name,
        slug = f"{account.slug}_default",
        timezone = account.account_timezone,
        status = "ACTIVE",
        is_default = True,
    )
    db_session.add(workspace)
    # Flush so workspace.id is populated before it is used in the link row.
    db_session.flush()
    account_workspace = AccountWorkspace(account_id = account.id, workspace_id = workspace.id)
    db_session.add(account_workspace)
    logger.info(f"finished adding for account {account.id}")


def main(db_session):
    """Create a default workspace for every account, then commit once at the end."""
    accounts = db_session.query(Account).all()
    for account in accounts:
        # Skip accounts that are already linked, so running the script twice does not
        # create a second default workspace; the later steps fetch the link with
        # .one() and would fail on duplicates.
        existing_link = db_session.query(AccountWorkspace).filter_by(account_id = account.id).first()
        if existing_link:
            logger.info(f"account {account.id} already has a workspace, skipping")
            continue
        create_workspace_for_account(account, db_session)
    logger.info("Completed")
    db_session.commit()


if __name__ == '__main__':
    db_session = None
    try:
        db_session = get_database_session()
        main(db_session)
    except Exception as exc:
        print(traceback.format_exc())
        # Exit non-zero so a failed migration is not mistaken for a successful one.
        raise SystemExit(1)
    finally:
        logger.info("CLosing db session")
        # get_database_session() itself may have failed, leaving nothing to close.
        if db_session is not None:
            db_session.close()
```

2. ### User migration to workspace created in step 1

- Add a role named "Super-Admin" to the roles table in the admin db.

```sql
INSERT INTO dc_admin.roles (name,created_at,updated_at,service)
VALUES ('Super-Admin',NULL,NULL,'Super-Admin');
```

- Add permissions to the Super-Admin role: the same permissions as admin, plus 3 extra permissions - **workspace.read, workspace.create, workspace.edit**. **The role id is resolved by name into `@super_admin_role_id` (it was 9 on stage), so it no longer has to be replaced by hand. Run the `SET` and the `INSERT` in the same session.**

```sql
-- User variables are per-connection, so run both statements in one session.
-- The SET fails if more than one role is named 'Super-Admin'.
SET @super_admin_role_id = (SELECT id FROM dc_admin.roles WHERE name = 'Super-Admin');

INSERT INTO dc_admin.permissions (role_id,permission,created_at,updated_at,service) VALUES
    (@super_admin_role_id,'report.create',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'report.edit',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'user.create',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'user.edit',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'user.read',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'datadestination.edit',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'datadestination.create',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'notification.read',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'notification.add',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'notification.delete',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'report.run',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'account.create',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'template.create',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'transformation.edit',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'transformation.delete',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'transformation.run',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'transformation.add',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'transformation.read',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'datawarehouse.read',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'datawarehouse.add',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'datawarehouse.delete',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'datawarehouse.edit',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'user.delete',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'workspace.read',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'workspace.create',NULL,NULL,'FRONTEND_APP'),
    (@super_admin_role_id,'workspace.edit',NULL,NULL,'FRONTEND_APP');
```

- Script for migrating users. **The Super-Admin role id is looked up by role name, so the script needs no per-environment edit (the id was 9 on stage).**

```python=
import traceback
from loguru import logger
from dotenv import load_dotenv; load_dotenv()
from datachannel_models.reverse_connector import ReverseConnector
from datachannel_models.role import Role
from datachannel_models.account import Account
from datachannel_models.account_role_user import AccountRoleUser
from datachannel_models.workspace_user import WorkspaceUser
from datachannel_models.workspace import Workspace
from datachannel_models.user import User
from datachannel_models.account_workspace import AccountWorkspace
from datachannel_models.account_workspace import Workspace
from datachannel.common.database import get_database_session

# Name given to the role by the INSERT INTO dc_admin.roles statement of this step.
SUPER_ADMIN_ROLE_NAME = "Super-Admin"


def main(db_session):
    """Copy every account user into the account's workspace, then commit once.

    For each account, every AccountRoleUser that has a user gets a WorkspaceUser
    row in the account's workspace carrying the user's current role. The first
    ACTIVE user of each account (lowest AccountRoleUser.id) additionally has the
    account-level role replaced by Super-Admin.
    """
    # assumes the Role model exposes the roles.name column as `name` -- TODO confirm.
    # .one() fails fast, before any change, if the role is missing or ambiguous.
    super_admin_role_id = db_session.query(Role).filter_by(name = SUPER_ADMIN_ROLE_NAME).one().id

    accounts = db_session.query(Account).all()
    for account in accounts:
        logger.info(f"account id = {account.id}")
        acc_workspace = db_session.query(AccountWorkspace).filter_by(account_id = account.id).one()
        account_role_users = (
            db_session.query(AccountRoleUser)
            .filter_by(account_id = account.id)
            .order_by(AccountRoleUser.id.asc())
            .all()
        )
        added_super_admin = False
        for account_role_user in account_role_users:
            if not account_role_user.user:
                continue
            user = account_role_user.user
            logger.info(f"started for account role user id = {account_role_user.id}")
            logger.info(f"working on user id = {user.id}")
            logger.debug(user.id)

            # The workspace-level row keeps the role the user had before this script.
            workspace_user = WorkspaceUser(
                user_id = user.id,
                workspace_id = acc_workspace.workspace_id,
                role_id = account_role_user.role_id,
            )
            # Only the account-level role is promoted, and only for one user per account.
            if user.status == "ACTIVE" and not added_super_admin:
                account_role_user.role_id = super_admin_role_id
                added_super_admin = True
            db_session.add(workspace_user)
            logger.info(f"finished for user id = {user.id}")
    logger.info("Completed")
    db_session.commit()


if __name__ == '__main__':
    db_session = None
    try:
        db_session = get_database_session()
        main(db_session)
    except Exception as exc:
        print(traceback.format_exc())
        # Exit non-zero so a failed migration is not mistaken for a successful one.
        raise SystemExit(1)
    finally:
        logger.info("CLosing db")
        # get_database_session() itself may have failed, leaving nothing to close.
        if db_session is not None:
            db_session.close()
```

- Updating firebase users to set the custom claim to the default workspace

```python=
import os
import traceback
import firebase_admin
from firebase_admin import credentials
from firebase_admin import auth
from loguru import logger
from dotenv import load_dotenv; load_dotenv()
from datachannel_models.workspace_user import WorkspaceUser
from datachannel_models.user import User
from datachannel.common.database import get_database_session

# change to firebaseConfig.prod.json on production deployment, either here or by
# setting FIREBASE_CONFIG_PATH in the environment / .env file
FIREBASE_CONFIG_PATH = os.environ.get(
    "FIREBASE_CONFIG_PATH", "/app/datachannel/firebaseConfig.stage.json")
cred = credentials.Certificate(FIREBASE_CONFIG_PATH)
firebase_app = firebase_admin.initialize_app(cred)


def main(db_session):
    """Set the userId / workspace_id custom claims on every user's Firebase account.

    The workspace is the first WorkspaceUser row found for the user; users
    without one are left untouched. A failure for one user is logged and the
    loop carries on with the next user.
    """
    users = db_session.query(User).all()
    for user in users:
        logger.info(f"working on user_id {user.id}")
        workspace_user = db_session.query(WorkspaceUser).filter_by(user_id = user.id).first()
        if workspace_user:
            claims = {"userId": user.id, "workspace_id": workspace_user.workspace_id}
            try:
                # app=None selects the default app initialised above.
                auth.set_custom_user_claims(user.firebase_id, claims, app=None)
            except Exception as exc:
                # Include the user id so the failing account can be retried by hand.
                logger.error(f"failed to set claims for user_id {user.id}: {exc}")
        logger.info(f"completed for user_id {user.id}")
    logger.info("Completed")
    db_session.commit()


if __name__ == '__main__':
    db_session = None
    try:
        db_session = get_database_session()
        main(db_session)
    except Exception as exc:
        print(traceback.format_exc())
        # Exit non-zero so a failed run is not mistaken for a successful one.
        raise SystemExit(1)
    finally:
        logger.info("CLosing db")
        # get_database_session() itself may have failed, leaving nothing to close.
        if db_session is not None:
            db_session.close()
```

3. 
### Migrating Entities for workspace

```python=
import traceback
from loguru import logger
from dotenv import load_dotenv; load_dotenv()
from datachannel_models.account import Account
from datachannel_models.datasource import Datasource
from datachannel_models.data_destination import DataDestination
from datachannel_models.account_workspace import AccountWorkspace
from datachannel_models.configured_report import ConfiguredReport
from datachannel_models.configured_sync import ConfiguredSync
from datachannel_models.data_warehouse import DataWarehouse
from datachannel_models.transformation import Transformation
from datachannel_models.datasource_cred import DatasourceCred
from datachannel.models.core import DefaultReport
from datachannel_models.storage import Storage
from datachannel_models.dbt_credential import DBTCredential
from datachannel_models.dbt_project import DBTProject
from datachannel_models.data_model import DataModel
from datachannel.models.core import AccountNotification
from datachannel_models.user import User
from datachannel.common.database import get_database_session
from datachannel_models.orchestration import Orchestration


def _assign_workspace(model, account, workspace_id, db_session):
    """Set workspace_id on every `model` row whose account_id is the account's id."""
    rows = db_session.query(model).filter_by(account_id = account.id).all()
    for row in rows:
        row.workspace_id = workspace_id


def update_orchestrations(account, workspace_id, db_session):
    """Move the account's orchestrations into the workspace."""
    logger.info("update_orchestrations")
    _assign_workspace(Orchestration, account, workspace_id, db_session)


def update_configured_reports(account, workspace_id, db_session):
    """Move the account's configured reports into the workspace."""
    logger.info("update_configured_reports")
    _assign_workspace(ConfiguredReport, account, workspace_id, db_session)


def update_configured_syncs(account, workspace_id, db_session):
    """Move the account's configured syncs into the workspace."""
    logger.info("update_configured_syncs")
    _assign_workspace(ConfiguredSync, account, workspace_id, db_session)


def update_warehouses(account, workspace_id, db_session):
    """Move the account's data warehouses into the workspace."""
    logger.info("update_warehouses")
    _assign_workspace(DataWarehouse, account, workspace_id, db_session)


def update_storages(account, workspace_id, db_session):
    """Move the account's storages into the workspace."""
    logger.info("update_storages")
    _assign_workspace(Storage, account, workspace_id, db_session)


def update_transformations(account, workspace_id, db_session):
    """Move the account's transformations into the workspace."""
    logger.info("update_transformations")
    _assign_workspace(Transformation, account, workspace_id, db_session)


def update_datasource_creds(account, workspace_id, db_session):
    """Move the account's datasource credentials into the workspace."""
    logger.info("update_datasource_creds")
    _assign_workspace(DatasourceCred, account, workspace_id, db_session)


def update_default_reports(account, workspace_id, db_session):
    """Move the account's default reports into the workspace."""
    logger.info("update_default_reports")
    _assign_workspace(DefaultReport, account, workspace_id, db_session)


def update_dbt_cred(account, workspace_id, db_session):
    """Move the account's dbt credentials into the workspace."""
    logger.info("update_dbt_cred")
    _assign_workspace(DBTCredential, account, workspace_id, db_session)


def update_dbt_projects(account, workspace_id, db_session):
    """Move the account's dbt projects into the workspace."""
    logger.info("update_dbt_projects")
    _assign_workspace(DBTProject, account, workspace_id, db_session)


def update_data_models(account, workspace_id, db_session):
    """Move the account's data models into the workspace."""
    logger.info("update_data_models")
    _assign_workspace(DataModel, account, workspace_id, db_session)


def update_account_notifications(account, workspace_id, db_session):
    """Move the account's notifications into the workspace."""
    logger.info("update_account_notifications")
    _assign_workspace(AccountNotification, account, workspace_id, db_session)


def main(db_session):
    """Backfill workspace_id on every account-owned entity, then commit once.

    Each account must have exactly one AccountWorkspace row (created in step 1);
    .one() raises otherwise and nothing is committed.
    """
    # NOTE(review): report_templates also received a workspace_id column in step 0
    # but is not backfilled here -- confirm that this is intentional.
    logger.info("Started")
    accounts = db_session.query(Account).all()
    for account in accounts:
        logger.debug(f"################## account = {account.id} ##################")
        acc_workspace = db_session.query(AccountWorkspace).filter_by(account_id = account.id).one()
        workspace_id = acc_workspace.workspace_id
        update_warehouses(account, workspace_id, db_session)
        update_storages(account, workspace_id, db_session)
        update_configured_reports(account, workspace_id, db_session)
        update_datasource_creds(account, workspace_id, db_session)
        update_data_models(account, workspace_id, db_session)
        update_configured_syncs(account, workspace_id, db_session)
        update_transformations(account, workspace_id, db_session)
        update_default_reports(account, workspace_id, db_session)
        update_dbt_cred(account, workspace_id, db_session)
        update_dbt_projects(account, workspace_id, db_session)
        update_account_notifications(account, workspace_id, db_session)
        update_orchestrations(account, workspace_id, db_session)
    logger.info("Completed")
    db_session.commit()


if __name__ == '__main__':
    db_session = None
    try:
        db_session = get_database_session()
        main(db_session)
    except Exception as exc:
        print(traceback.format_exc())
        # Exit non-zero so a failed migration is not mistaken for a successful one.
        raise SystemExit(1)
    finally:
        logger.info("CLosing db")
        # get_database_session() itself may have failed, leaving nothing to close.
        if db_session is not None:
            db_session.close()
```

4. 
### Migrating Logs

- Script for AggReportRunStatus

```python=
import traceback
from loguru import logger
from dotenv import load_dotenv; load_dotenv()
from datachannel_models.account import Account
from datachannel_models.log_tables.agg_report_run_status import AggReportRunStatus
from datachannel_models.account_workspace import AccountWorkspace
from datachannel.common.database import get_database_session

# Cache of account_id -> workspace_id so each account is looked up only once.
account_id_to_worksapce_id_map = {}


def update_agg_report_run_status(admin_db_session, log_db_session):
    """Set workspace_id on every AggReportRunStatus row that has an account_id.

    The workspace is read from AccountWorkspace in the admin database; .one()
    raises if an account has no (or more than one) workspace link. Rows without
    an account_id are logged and skipped. Nothing is committed here.
    """
    logs = log_db_session.query(AggReportRunStatus).all()
    for log in logs:
        if not log.account_id:
            # The row itself lacks an account, so there is nothing to look up.
            logger.error(f"No account_id on log id {log.id}, skipping")
            continue
        logger.info(f"working for log id {log.id}")
        account_workspace_id = account_id_to_worksapce_id_map.get(log.account_id)
        if account_workspace_id is None:
            account_workspace = admin_db_session.query(AccountWorkspace).filter_by(account_id = log.account_id).one()
            account_workspace_id = account_workspace.workspace_id
            account_id_to_worksapce_id_map[log.account_id] = account_workspace_id
        logger.debug(account_workspace_id)
        log.workspace_id = account_workspace_id
        # NOTE(review): presumably meant to keep ts_updated unchanged by this
        # backfill; assigning the same value may be a no-op for the ORM -- confirm.
        log.ts_updated = log.ts_updated
        logger.debug(log.workspace_id)
        logger.info(f"finished working for log id {log.id}")


def main(admin_db_session, log_db_session):
    """Run the backfill and commit the logs session."""
    update_agg_report_run_status(admin_db_session, log_db_session)
    logger.info("Starting Commit")
    log_db_session.commit()
    logger.info("Completed")


if __name__ == '__main__':
    log_db_session = None
    admin_db_session = None
    try:
        log_db_session = get_database_session("logs")
        admin_db_session = get_database_session()
        main(admin_db_session, log_db_session)
    except Exception as exc:
        logger.error(traceback.format_exc())
        # Exit non-zero so a failed migration is not mistaken for a successful one.
        raise SystemExit(1)
    finally:
        logger.info("CLosing db")
        # Either session may be missing if opening it failed; close what exists.
        if admin_db_session is not None:
            admin_db_session.close()
        if log_db_session is not None:
            log_db_session.close()
```

- Script to migrate log_reverse_sync_run

```python=
import traceback
from loguru import logger
from dotenv import load_dotenv; load_dotenv()
from datachannel_models.account import Account
from datachannel_models.log_tables.log_reverse_sync_run import LogReverseSyncRun
from datachannel_models.account_workspace import AccountWorkspace
from datachannel.common.database import get_database_session

# Cache of account_id -> workspace_id so each account is looked up only once.
account_id_to_worksapce_id_map = {}


def update_log_reverse_sync_run(admin_db_session, log_db_session):
    """Set workspace_id on every LogReverseSyncRun row that has an account_id.

    The workspace is read from AccountWorkspace in the admin database; .one()
    raises if an account has no (or more than one) workspace link. Rows without
    an account_id are logged and skipped. Nothing is committed here.
    """
    logs = log_db_session.query(LogReverseSyncRun).all()
    for log in logs:
        if not log.account_id:
            # The row itself lacks an account, so there is nothing to look up.
            logger.error(f"No account_id on log id {log.id}, skipping")
            continue
        logger.info(f"working for log id {log.id}")
        account_workspace_id = account_id_to_worksapce_id_map.get(log.account_id)
        if account_workspace_id is None:
            account_workspace = admin_db_session.query(AccountWorkspace).filter_by(account_id = log.account_id).one()
            account_workspace_id = account_workspace.workspace_id
            account_id_to_worksapce_id_map[log.account_id] = account_workspace_id
        logger.debug(account_workspace_id)
        log.workspace_id = account_workspace_id
        # NOTE(review): presumably meant to keep updated_at unchanged by this
        # backfill; assigning the same value may be a no-op for the ORM -- confirm.
        log.updated_at = log.updated_at
        logger.debug(log.workspace_id)
        logger.info(f"finished working for log id {log.id}")


def main(admin_db_session, log_db_session):
    """Run the backfill and commit the logs session."""
    update_log_reverse_sync_run(admin_db_session, log_db_session)
    logger.info("Started Commit")
    log_db_session.commit()
    logger.info("Finished Commit")


if __name__ == '__main__':
    log_db_session = None
    admin_db_session = None
    try:
        log_db_session = get_database_session("logs")
        admin_db_session = get_database_session()
        main(admin_db_session, log_db_session)
    except Exception as exc:
        logger.error(traceback.format_exc())
        # Exit non-zero so a failed migration is not mistaken for a successful one.
        raise SystemExit(1)
    finally:
        logger.info("CLosing db")
        # Either session may be missing if opening it failed; close what exists.
        if admin_db_session is not None:
            admin_db_session.close()
        if log_db_session is not None:
            log_db_session.close()
```

- Script to migrate dc_admin.notification_logs table

```python=
import traceback
from loguru import logger
from dotenv import load_dotenv; load_dotenv()
from datachannel_models.account import Account
from datachannel_models.notification_logs import NotificationLog
from datachannel_models.account_workspace import AccountWorkspace
from datachannel.common.database import get_database_session

# Cache of account_id -> workspace_id so each account is looked up only once.
account_id_to_worksapce_id_map = {}


def update_account_notification_logs(admin_db_session):
    """Set workspace_id on every NotificationLog row that has an account_id.

    Both the logs and the AccountWorkspace links are read through the admin
    session; .one() raises if an account has no (or more than one) workspace
    link. Rows without an account_id are logged and skipped. Nothing is
    committed here.
    """
    logs = admin_db_session.query(NotificationLog).all()
    for log in logs:
        if not log.account_id:
            # The row itself lacks an account, so there is nothing to look up.
            logger.error(f"No account_id on log id {log.id}, skipping")
            continue
        logger.info(f"working for log id {log.id}")
        account_workspace_id = account_id_to_worksapce_id_map.get(log.account_id)
        if account_workspace_id is None:
            account_workspace = admin_db_session.query(AccountWorkspace).filter_by(account_id = log.account_id).one()
            account_workspace_id = account_workspace.workspace_id
            account_id_to_worksapce_id_map[log.account_id] = account_workspace_id
        logger.debug(account_workspace_id)
        log.workspace_id = account_workspace_id
        # NOTE(review): presumably meant to keep updated_at unchanged by this
        # backfill; assigning the same value may be a no-op for the ORM -- confirm.
        log.updated_at = log.updated_at
        logger.debug(log.workspace_id)
        logger.info(f"finished working for log id {log.id}")


def main(admin_db_session):
    """Run the backfill and commit the admin session."""
    update_account_notification_logs(admin_db_session)
    logger.info("Started Commit")
    admin_db_session.commit()
    logger.info("Finished Commit")


if __name__ == '__main__':
    admin_db_session = None
    try:
        admin_db_session = get_database_session()
        main(admin_db_session)
    except Exception as exc:
        logger.error(traceback.format_exc())
        # Exit non-zero so a failed migration is not mistaken for a successful one.
        raise SystemExit(1)
    finally:
        logger.info("CLosing db")
        # get_database_session() itself may have failed, leaving nothing to close.
        if admin_db_session is not None:
            admin_db_session.close()
```

5. 
Deploy all branches (repository - branch):

- DataChannelFrontendV2 - `feature/Dat-I1372-workspaces`
- DataChannelFrontendAPI - `feature/Dat-I1372-workspaces`
- DataChannelReverseAPI - `feature/Dat-I1372-workspaces`
- DataChannelModels - `feature/Dat-I1372-workspaces`
- DataChannelMetaAPI - `feature/Dat-I1372-workspaces`
- DataChannelLoggingAPI - `feature/I1372-workspace_id-to-logging-routes`
- DataChannelV2 - `feature/I1372-workspace_id-passed-to-notification`
- DataChannelNotification - `feature/I1372-workspace_id-logged-to-notification_logs`
- DataChannelDjangoAdmin - `feature/I1372-workspaces`
- DataChannelReverse - `feature/I1372-workspace_id-passed-to-notification`
- DataChannelDBT - `feature/I1372-workspace_id-passed-to-notification`
- DataChannelAdminUI - `feature/I1372-workspaces`