# Steps for workspaces deployment
0. ### Create and Alter tables
```sql
create table e_workspace_status
(
    name varchar(100) not null,
    constraint e_workspace_status_pk
        unique (name)
);

create table workspaces
(
    id              int auto_increment primary key,
    name            varchar(255) not null,
    slug            varchar(100) not null,
    status          varchar(255) not null,
    is_default      tinyint(1) default 0 not null,
    timezone        varchar(255) default 'GMT',
    workspace_image text,
    created_at      timestamp default CURRENT_TIMESTAMP null,
    updated_at      timestamp default CURRENT_TIMESTAMP null on update CURRENT_TIMESTAMP,
    constraint workspace_e_workspace_status_name_fk
        foreign key (status) references e_workspace_status (name)
);

create table account_workspaces
(
    id           int auto_increment primary key,
    account_id   int unsigned not null,
    workspace_id int not null,
    created_at   timestamp default CURRENT_TIMESTAMP null,
    updated_at   timestamp default CURRENT_TIMESTAMP null on update CURRENT_TIMESTAMP,
    constraint account_workspaces_accounts_id_fk
        foreign key (account_id) references accounts (id),
    constraint account_workspaces_workspaces_id_fk
        foreign key (workspace_id) references workspaces (id)
);

create table workspace_users
(
    `id` int NOT NULL AUTO_INCREMENT,
    `workspace_id` int NOT NULL,
    `user_id` int unsigned NOT NULL,
    `created_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
    `updated_at` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    `role_id` int unsigned DEFAULT NULL,
    PRIMARY KEY (`id`),
    KEY `workspace_users_users_id_fk` (`user_id`),
    KEY `workspace_users_workspaces_id_fk` (`workspace_id`),
    KEY `workspace_users_roles_id_fk` (`role_id`),
    CONSTRAINT `workspace_users_roles_id_fk` FOREIGN KEY (`role_id`) REFERENCES `roles` (`id`),
    CONSTRAINT `workspace_users_users_id_fk` FOREIGN KEY (`user_id`) REFERENCES `users` (`id`),
    CONSTRAINT `workspace_users_workspaces_id_fk` FOREIGN KEY (`workspace_id`) REFERENCES `workspaces` (`id`)
);
```
```sql
-- Run before everything
update dc_admin.configured_reports
set deleted_at = null
where deleted_at < '2015-01-01';
alter table default_reports
add workspace_id int null;
alter table default_reports
add constraint default_reports_workspaces_id_fk
foreign key (workspace_id) references workspaces (id);
alter table storages
add workspace_id int null;
alter table storages
add constraint storages_workspaces_id_fk
foreign key (workspace_id) references workspaces (id);
alter table transformations
add workspace_id int null;
alter table transformations
add constraint transformations_workspaces_id_fk
foreign key (workspace_id) references workspaces (id);
alter table configured_reports
add workspace_id int null;
alter table configured_reports
add constraint configured_reports_workspaces_id_fk
foreign key (workspace_id) references workspaces (id);
alter table configured_syncs
add workspace_id int null;
alter table configured_syncs
add constraint configured_syncs_workspaces_id_fk
foreign key (workspace_id) references workspaces (id);
alter table datasource_creds
add workspace_id int null;
alter table datasource_creds
add constraint datasource_creds_workspaces_id_fk
foreign key (workspace_id) references workspaces (id);
alter table data_models
add workspace_id int null;
alter table data_models
add constraint data_models_workspaces_id_fk
foreign key (workspace_id) references workspaces (id);
alter table account_notifications
add workspace_id int null;
alter table account_notifications
add constraint account_notifications_workspaces_id_fk
foreign key (workspace_id) references workspaces (id);
alter table data_warehouses
add workspace_id int null;
alter table data_warehouses
add constraint data_warehouses_workspaces_id_fk
foreign key (workspace_id) references workspaces (id);
alter table report_templates
add workspace_id int null;
alter table report_templates
add constraint report_templates_workspaces_id_fk
foreign key (workspace_id) references workspaces (id);
alter table dbt_credentials
add workspace_id int null;
alter table dbt_credentials
add constraint dbt_credentials_workspaces_id_fk
foreign key (workspace_id) references workspaces (id);
alter table dbt_projects
add workspace_id int null;
alter table dbt_projects
add constraint dbt_projects_workspaces_id_fk
foreign key (workspace_id) references workspaces (id);
alter table notification_logs
add workspace_id int null;
alter table notification_logs
add constraint notification_logs_workspaces_id_fk
foreign key (workspace_id) references workspaces (id);
alter table orchestrations
add workspace_id int null;
alter table orchestrations
add constraint orchestrations_workspaces_FK
foreign key (workspace_id) references workspaces (id);
```
### Alter tables for the logs db
```sql
use dc_logs;
alter table agg_report_run_status
add workspace_id int null;
alter table log_reverse_sync_runs
add workspace_id int null;
alter table log_orchestration_runs
add workspace_id int null;
```
- Insert the `ACTIVE`, `DELETED`, and `PAUSED` values into the `e_workspace_status` table, as shown below.
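A minimal statement for these three statuses (a sketch; assumes no other statuses are needed yet and that it runs against the same database as the create table statements above):
```sql
insert into e_workspace_status (name)
values ('ACTIVE'), ('DELETED'), ('PAUSED');
```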
1. ### Creating default account workspaces
```python=
import traceback
from loguru import logger
from dotenv import load_dotenv; load_dotenv()
from datachannel_models.reverse_connector import ReverseConnector
from datachannel_models.account import Account
from datachannel_models.workspace import Workspace
from datachannel_models.account_workspace import AccountWorkspace
from datachannel.common.database import get_database_session


def create_workspace_for_account(account, db_session):
    logger.info(f"adding for account {account.id}")
    workspace_name = account.name
    workspace_slug = f"{account.slug}_default"
    timezone = account.account_timezone
    workspace = Workspace(name=workspace_name, slug=workspace_slug, timezone=timezone,
                          status="ACTIVE", is_default=True)
    db_session.add(workspace)
    db_session.flush()  # flush so workspace.id is available for the account link
    account_workspace = AccountWorkspace(account_id=account.id, workspace_id=workspace.id)
    db_session.add(account_workspace)
    logger.info(f"finished adding for account {account.id}")


def main(db_session):
    accounts = db_session.query(Account).all()
    for account in accounts:
        create_workspace_for_account(account, db_session)
    logger.info("Completed")
    db_session.commit()


if __name__ == '__main__':
    try:
        db_session = get_database_session()
        main(db_session)
    except Exception as exc:
        print(traceback.format_exc())
    finally:
        logger.info("Closing db session")
        db_session.close()
```
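A quick sanity check after this script (a sketch against the tables created in step 0; every account should end up with exactly one default workspace):
```sql
-- expect zero rows
select a.id as account_id, count(w.id) as default_workspaces
from accounts a
left join account_workspaces aw on aw.account_id = a.id
left join workspaces w on w.id = aw.workspace_id and w.is_default = 1
group by a.id
having count(w.id) <> 1;
```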
2. ### User migration to the workspaces created in step 1
- Add a role named "Super-Admin" to the roles table in the admin db
```sql
INSERT INTO dc_admin.roles (name,created_at,updated_at,service) VALUES
('Super-Admin',NULL,NULL,'Super-Admin');
```
- Add permissions to the Super-Admin role: the same permissions as admin, plus 3 extra permissions - **workspace.read, workspace.create, workspace.edit**. **Replace role_id 9 below with the Super-Admin role id created above.**
```sql
INSERT INTO dc_admin.permissions (role_id,permission,created_at,updated_at,service) VALUES
(9,'report.create',NULL,NULL,'FRONTEND_APP'),
(9,'report.edit',NULL,NULL,'FRONTEND_APP'),
(9,'user.create',NULL,NULL,'FRONTEND_APP'),
(9,'user.edit',NULL,NULL,'FRONTEND_APP'),
(9,'user.read',NULL,NULL,'FRONTEND_APP'),
(9,'datadestination.edit',NULL,NULL,'FRONTEND_APP'),
(9,'datadestination.create',NULL,NULL,'FRONTEND_APP'),
(9,'notification.read',NULL,NULL,'FRONTEND_APP'),
(9,'notification.add',NULL,NULL,'FRONTEND_APP'),
(9,'notification.delete',NULL,NULL,'FRONTEND_APP');
INSERT INTO dc_admin.permissions (role_id,permission,created_at,updated_at,service) VALUES
(9,'report.run',NULL,NULL,'FRONTEND_APP'),
(9,'account.create',NULL,NULL,'FRONTEND_APP'),
(9,'template.create',NULL,NULL,'FRONTEND_APP'),
(9,'transformation.edit',NULL,NULL,'FRONTEND_APP'),
(9,'transformation.delete',NULL,NULL,'FRONTEND_APP'),
(9,'transformation.run',NULL,NULL,'FRONTEND_APP'),
(9,'transformation.add',NULL,NULL,'FRONTEND_APP'),
(9,'transformation.read',NULL,NULL,'FRONTEND_APP'),
(9,'datawarehouse.read',NULL,NULL,'FRONTEND_APP'),
(9,'datawarehouse.add',NULL,NULL,'FRONTEND_APP');
INSERT INTO dc_admin.permissions (role_id,permission,created_at,updated_at,service) VALUES
(9,'datawarehouse.delete',NULL,NULL,'FRONTEND_APP'),
(9,'datawarehouse.edit',NULL,NULL,'FRONTEND_APP'),
(9,'user.delete',NULL,NULL,'FRONTEND_APP'),
(9,'workspace.read',NULL,NULL,'FRONTEND_APP'),
(9,'workspace.create',NULL,NULL,'FRONTEND_APP'),
(9,'workspace.edit',NULL,NULL,'FRONTEND_APP');
```
- Script for migrating users. **Change the hard-coded role id (9) on prod to the Super-Admin role id created above.**
```python=
import traceback
from loguru import logger
from dotenv import load_dotenv; load_dotenv()
from datachannel_models.reverse_connector import ReverseConnector
from datachannel_models.role import Role
from datachannel_models.account import Account
from datachannel_models.account_role_user import AccountRoleUser
from datachannel_models.workspace_user import WorkspaceUser
from datachannel_models.workspace import Workspace
from datachannel_models.user import User
from datachannel_models.account_workspace import AccountWorkspace
from datachannel.common.database import get_database_session


def main(db_session):
    accounts = db_session.query(Account).all()
    for account in accounts:
        logger.info(f"account id = {account.id}")
        acc_workspace = db_session.query(AccountWorkspace).filter_by(account_id=account.id).one()
        acu = db_session.query(AccountRoleUser).filter_by(account_id=account.id).order_by(AccountRoleUser.id.asc()).all()
        added_super_admin = False
        for account_role_user in acu:
            if account_role_user.user:
                logger.info(f"started for account role user id = {account_role_user.id}")
                logger.info(f"working on user id = {account_role_user.user.id}")
                logger.debug(account_role_user.user.id)
                role = account_role_user.role_id
                if account_role_user.user.status == "ACTIVE" and not added_super_admin:
                    role = 9  # change on prod accordingly (Super-Admin role id)
                    added_super_admin = True
                # the workspace user keeps the existing role; the first active user of each
                # account is promoted to Super-Admin at the account level
                workspace_user = WorkspaceUser(user_id=account_role_user.user.id,
                                               workspace_id=acc_workspace.workspace_id,
                                               role_id=account_role_user.role_id)
                account_role_user.role_id = role
                db_session.add(workspace_user)
                logger.info(f"finished for user id = {account_role_user.user.id}")
    logger.info("Completed")
    db_session.commit()


if __name__ == '__main__':
    try:
        db_session = get_database_session()
        main(db_session)
    except Exception as exc:
        print(traceback.format_exc())
    finally:
        logger.info("Closing db")
        db_session.close()
```
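A rough check after the user migration (a sketch; assumes an active user is marked with status = 'ACTIVE' in the users table, as the script above does):
```sql
-- expect zero rows, or only users that intentionally have no account role
select u.id
from users u
left join workspace_users wu on wu.user_id = u.id
where u.status = 'ACTIVE' and wu.id is null;
```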
- Update Firebase users to set a custom claim pointing at the default workspace
```python=
import traceback
import firebase_admin
from firebase_admin import credentials
from firebase_admin import auth
from loguru import logger
from dotenv import load_dotenv; load_dotenv()
from datachannel_models.workspace_user import WorkspaceUser
from datachannel_models.user import User
from datachannel.common.database import get_database_session

# change to firebaseConfig.prod.json on production deployment
cred = credentials.Certificate("/app/datachannel/firebaseConfig.stage.json")
firebase_app = firebase_admin.initialize_app(cred)


def main(db_session):
    users = db_session.query(User).all()
    for user in users:
        logger.info(f"working on user_id {user.id}")
        firebase_id = user.firebase_id
        workspace_user = db_session.query(WorkspaceUser).filter_by(user_id=user.id).first()
        if workspace_user:
            try:
                auth.set_custom_user_claims(firebase_id,
                                            {"userId": user.id,
                                             "workspace_id": workspace_user.workspace_id},
                                            app=None)
            except Exception as exc:
                logger.error(exc)
        logger.info(f"completed for user_id {user.id}")
    logger.info("Completed")
    db_session.commit()


if __name__ == '__main__':
    try:
        db_session = get_database_session()
        main(db_session)
    except Exception as exc:
        print(traceback.format_exc())
    finally:
        logger.info("Closing db")
        db_session.close()
```
3. ### Migrating entities for workspaces
```python=
import traceback
from loguru import logger
from dotenv import load_dotenv; load_dotenv()
from datachannel_models.account import Account
from datachannel_models.datasource import Datasource
from datachannel_models.data_destination import DataDestination
from datachannel_models.account_workspace import AccountWorkspace
from datachannel_models.configured_report import ConfiguredReport
from datachannel_models.configured_sync import ConfiguredSync
from datachannel_models.data_warehouse import DataWarehouse
from datachannel_models.transformation import Transformation
from datachannel_models.datasource_cred import DatasourceCred
from datachannel.models.core import DefaultReport
from datachannel_models.storage import Storage
from datachannel_models.dbt_credential import DBTCredential
from datachannel_models.dbt_project import DBTProject
from datachannel_models.data_model import DataModel
from datachannel.models.core import AccountNotification
from datachannel_models.user import User
from datachannel_models.orchestration import Orchestration
from datachannel.common.database import get_database_session


def update_orchestrations(account, workspace_id, db_session):
    logger.info("update_orchestrations")
    orchestrations = db_session.query(Orchestration).filter_by(account_id=account.id).all()
    for orchestration in orchestrations:
        orchestration.workspace_id = workspace_id


def update_configured_reports(account, workspace_id, db_session):
    logger.info("update_configured_reports")
    configured_reports = db_session.query(ConfiguredReport).filter_by(account_id=account.id).all()
    for report in configured_reports:
        report.workspace_id = workspace_id


def update_configured_syncs(account, workspace_id, db_session):
    logger.info("update_configured_syncs")
    configured_syncs = db_session.query(ConfiguredSync).filter_by(account_id=account.id).all()
    for sync in configured_syncs:
        sync.workspace_id = workspace_id


def update_warehouses(account, workspace_id, db_session):
    logger.info("update_warehouses")
    warehouses = db_session.query(DataWarehouse).filter_by(account_id=account.id).all()
    for warehouse in warehouses:
        warehouse.workspace_id = workspace_id


def update_storages(account, workspace_id, db_session):
    logger.info("update_storages")
    storages = db_session.query(Storage).filter_by(account_id=account.id).all()
    for storage in storages:
        storage.workspace_id = workspace_id


def update_transformations(account, workspace_id, db_session):
    logger.info("update_transformations")
    transformations = db_session.query(Transformation).filter_by(account_id=account.id).all()
    for transformation in transformations:
        transformation.workspace_id = workspace_id


def update_datasource_creds(account, workspace_id, db_session):
    logger.info("update_datasource_creds")
    creds = db_session.query(DatasourceCred).filter_by(account_id=account.id).all()
    for cred in creds:
        cred.workspace_id = workspace_id


def update_default_reports(account, workspace_id, db_session):
    logger.info("update_default_reports")
    d_reports = db_session.query(DefaultReport).filter_by(account_id=account.id).all()
    for d_report in d_reports:
        d_report.workspace_id = workspace_id


def update_dbt_cred(account, workspace_id, db_session):
    logger.info("update_dbt_cred")
    creds = db_session.query(DBTCredential).filter_by(account_id=account.id).all()
    for cred in creds:
        cred.workspace_id = workspace_id


def update_dbt_projects(account, workspace_id, db_session):
    logger.info("update_dbt_projects")
    projects = db_session.query(DBTProject).filter_by(account_id=account.id).all()
    for project in projects:
        project.workspace_id = workspace_id


def update_data_models(account, workspace_id, db_session):
    logger.info("update_data_models")
    models = db_session.query(DataModel).filter_by(account_id=account.id).all()
    for model in models:
        model.workspace_id = workspace_id


def update_account_notifications(account, workspace_id, db_session):
    logger.info("update_account_notifications")
    nots = db_session.query(AccountNotification).filter_by(account_id=account.id).all()
    for noti in nots:
        noti.workspace_id = workspace_id


def main(db_session):
    logger.info("Started")
    accounts = db_session.query(Account).all()
    for account in accounts:
        logger.debug(f"################## account = {account.id} ##################")
        acc_workspace = db_session.query(AccountWorkspace).filter_by(account_id=account.id).one()
        update_warehouses(account, acc_workspace.workspace_id, db_session)
        update_storages(account, acc_workspace.workspace_id, db_session)
        update_configured_reports(account, acc_workspace.workspace_id, db_session)
        update_datasource_creds(account, acc_workspace.workspace_id, db_session)
        update_data_models(account, acc_workspace.workspace_id, db_session)
        update_configured_syncs(account, acc_workspace.workspace_id, db_session)
        update_transformations(account, acc_workspace.workspace_id, db_session)
        update_default_reports(account, acc_workspace.workspace_id, db_session)
        update_dbt_cred(account, acc_workspace.workspace_id, db_session)
        update_dbt_projects(account, acc_workspace.workspace_id, db_session)
        update_account_notifications(account, acc_workspace.workspace_id, db_session)
        update_orchestrations(account, acc_workspace.workspace_id, db_session)
    logger.info("Completed")
    db_session.commit()


if __name__ == '__main__':
    try:
        db_session = get_database_session()
        main(db_session)
    except Exception as exc:
        print(traceback.format_exc())
    finally:
        logger.info("Closing db")
        db_session.close()
```
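After the back-fill, rows that still have no workspace can be counted per table; a sketch for a few of the altered tables (the same pattern applies to the rest):
```sql
-- expect 0 everywhere; any remaining rows usually have no owning account
select 'configured_reports' as table_name, count(*) as rows_missing_workspace
from configured_reports where workspace_id is null
union all
select 'transformations', count(*) from transformations where workspace_id is null
union all
select 'data_warehouses', count(*) from data_warehouses where workspace_id is null;
```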
4. ### Migrating Logs
- Script for AggReportRunStatus
```python=
import traceback
from loguru import logger
from dotenv import load_dotenv; load_dotenv()
from datachannel_models.account import Account
from datachannel_models.log_tables.agg_report_run_status import AggReportRunStatus
from datachannel_models.account_workspace import AccountWorkspace
from datachannel.common.database import get_database_session

# cache of account_id -> workspace_id so each account is looked up only once
account_id_to_workspace_id_map = {}


def update_agg_report_run_status(admin_db_session, log_db_session):
    logs = log_db_session.query(AggReportRunStatus).all()
    for log in logs:
        if log.account_id:
            logger.info(f"working for log id {log.id}")
            account_workspace_id = account_id_to_workspace_id_map.get(log.account_id)
            if account_workspace_id is None:
                account_workspace = admin_db_session.query(AccountWorkspace).filter_by(account_id=log.account_id).one()
                account_id_to_workspace_id_map[log.account_id] = account_workspace.workspace_id
                account_workspace_id = account_workspace.workspace_id
            logger.debug(account_workspace_id)
            log.workspace_id = account_workspace_id
            log.ts_updated = log.ts_updated
            logger.debug(log.workspace_id)
            logger.info(f"finished working for log id {log.id}")
        else:
            logger.error(f"log id {log.id} has no account_id, skipping")


def main(admin_db_session, log_db_session):
    update_agg_report_run_status(admin_db_session, log_db_session)
    logger.info("Starting Commit")
    log_db_session.commit()
    logger.info("Completed")


if __name__ == '__main__':
    try:
        log_db_session = get_database_session("logs")
        admin_db_session = get_database_session()
        main(admin_db_session, log_db_session)
    except Exception as exc:
        logger.error(traceback.format_exc())
    finally:
        logger.info("Closing db")
        admin_db_session.close()
        log_db_session.close()
```
- Script to migrate log_reverse_sync_runs
```python=
import traceback
from loguru import logger
from dotenv import load_dotenv; load_dotenv()
from datachannel_models.account import Account
from datachannel_models.log_tables.log_reverse_sync_run import LogReverseSyncRun
from datachannel_models.account_workspace import AccountWorkspace
from datachannel.common.database import get_database_session

# cache of account_id -> workspace_id so each account is looked up only once
account_id_to_workspace_id_map = {}


def update_log_reverse_sync_run(admin_db_session, log_db_session):
    logs = log_db_session.query(LogReverseSyncRun).all()
    for log in logs:
        if log.account_id:
            logger.info(f"working for log id {log.id}")
            account_workspace_id = account_id_to_workspace_id_map.get(log.account_id)
            if account_workspace_id is None:
                account_workspace = admin_db_session.query(AccountWorkspace).filter_by(account_id=log.account_id).one()
                account_id_to_workspace_id_map[log.account_id] = account_workspace.workspace_id
                account_workspace_id = account_workspace.workspace_id
            logger.debug(account_workspace_id)
            log.workspace_id = account_workspace_id
            log.updated_at = log.updated_at
            logger.debug(log.workspace_id)
            logger.info(f"finished working for log id {log.id}")
        else:
            logger.error(f"log id {log.id} has no account_id, skipping")


def main(admin_db_session, log_db_session):
    update_log_reverse_sync_run(admin_db_session, log_db_session)
    logger.info("Started Commit")
    log_db_session.commit()
    logger.info("Finished Commit")


if __name__ == '__main__':
    try:
        log_db_session = get_database_session("logs")
        admin_db_session = get_database_session()
        main(admin_db_session, log_db_session)
    except Exception as exc:
        logger.error(traceback.format_exc())
    finally:
        logger.info("Closing db")
        admin_db_session.close()
        log_db_session.close()
```
- Script to migrate the dc_admin.notification_logs table
```python=
import traceback
from loguru import logger
from dotenv import load_dotenv; load_dotenv()
from datachannel_models.account import Account
from datachannel_models.notification_logs import NotificationLog
from datachannel_models.account_workspace import AccountWorkspace
from datachannel.common.database import get_database_session

# cache of account_id -> workspace_id so each account is looked up only once
account_id_to_workspace_id_map = {}


def update_account_notification_logs(admin_db_session):
    logs = admin_db_session.query(NotificationLog).all()
    for log in logs:
        if log.account_id:
            logger.info(f"working for log id {log.id}")
            account_workspace_id = account_id_to_workspace_id_map.get(log.account_id)
            if account_workspace_id is None:
                account_workspace = admin_db_session.query(AccountWorkspace).filter_by(account_id=log.account_id).one()
                account_id_to_workspace_id_map[log.account_id] = account_workspace.workspace_id
                account_workspace_id = account_workspace.workspace_id
            logger.debug(account_workspace_id)
            log.workspace_id = account_workspace_id
            log.updated_at = log.updated_at
            logger.debug(log.workspace_id)
            logger.info(f"finished working for log id {log.id}")
        else:
            logger.error(f"log id {log.id} has no account_id, skipping")


def main(admin_db_session):
    update_account_notification_logs(admin_db_session)
    logger.info("Started Commit")
    admin_db_session.commit()
    logger.info("Finished Commit")


if __name__ == '__main__':
    try:
        admin_db_session = get_database_session()
        main(admin_db_session)
    except Exception as exc:
        logger.error(traceback.format_exc())
    finally:
        logger.info("Closing db")
        admin_db_session.close()
```
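A similar check for the log tables (a sketch; counts should be zero for rows that carry an account_id):
```sql
select count(*) from dc_logs.agg_report_run_status
where account_id is not null and workspace_id is null;
select count(*) from dc_logs.log_reverse_sync_runs
where account_id is not null and workspace_id is null;
select count(*) from dc_admin.notification_logs
where account_id is not null and workspace_id is null;
```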
5. ### Deploy all branches
- DataChannelFrontendV2 - feature/Dat-I1372-workspaces
- DataChannelFrontendAPI - feature/Dat-I1372-workspaces
- DataChannelReverseAPI - feature/Dat-I1372-workspaces
- DataChannelModels - feature/Dat-I1372-workspaces
- DataChannelMetaAPI - feature/Dat-I1372-workspaces
- DataChannelLoggingAPI - feature/I1372-workspace_id-to-logging-routes
- DataChannelV2 - feature/I1372-workspace_id-passed-to-notification
- DataChannelNotification - feature/I1372-workspace_id-logged-to-notification_logs
- DataChannelDjangoAdmin - feature/I1372-workspaces
- DataChannelReverse - feature/I1372-workspace_id-passed-to-notification
- DataChannelDBT - feature/I1372-workspace_id-passed-to-notification
- DataChannelAdminUI - feature/I1372-workspaces