**Indici data load process - dev steps**

:::info
**Useful resources**
- Wiki overview: http://13.238.170.158/mediawiki/index.php/Indici_data
- AWS SDK for Python (Boto3): https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/index.html
:::

# Contents

[TOC]

# Goals

> What are we trying to achieve here?

The aim of this piece of work is to have cleaned, managed data from Indici available for analysis/use in a relational database. Incoming data should be staged as-is in one schema, and then deduplicated into a reporting schema for end use (we will probably use [DBT](https://www.getdbt.com/) for this part, but it is not confirmed yet).

Incoming raw data follows a [delta encoding](https://en.wikipedia.org/wiki/Delta_encoding) approach, meaning that only differences from the previous day should be sent to us. However, when there is no new data, Valentia still send us an empty file. Additionally, Valentia have notified us that certain datasets do not follow the delta approach and will be sent in full each time (this seems to apply to a few reference/lookup datasets only).

# Requirements

> What will I need to work through these steps?

Some key requirements to note before proceeding with any development work:

* Account access to the AWS console in our account
* MFA device for authentication
* AWS CLI set up for programmatic access (refer to [AWS guide](https://aws.amazon.com/cli/))
* A local Python development environment, including Jupyter notebook
* A SQL IDE to access the RDS instance (for example, [DBeaver](https://dbeaver.io/)). Note that whilst SQL can easily be executed using Python, a SQL IDE will be more useful here for finding table creation code and viewing/setting permissions etc.
* Basic understanding of document databases. This solution uses Amazon DynamoDB, and some documentation for it can be found [here](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Introduction.html).
* Basic understanding of the parquet file format. Some useful general information can be found [here](https://www.databricks.com/glossary/what-is-parquet).

# Workflow overview

The image below provides a high-level overview of the current solution.

![indici_load.drawio](https://hackmd.io/_uploads/BJHmTY8ap.png)

# Current state

::: info
**Note**: There are definitely lots of ways this process can be improved! Please make a note of any inefficiencies, issues or ideas for improvement you notice so we can think about implementation.
:::

As at 7/3/2024 the framework for this solution has been implemented, and a total of 9 datasets are being actively managed by it. Indici provide a total of 94 files daily, so the remaining 85 all need to be added to the solution using the steps in the next section!

A full list of the datasets and their current status is shown below - **please keep this updated**. Files marked 'High priority' should be done first where possible.

| Table/filename | High priority? | Implemented? | Comments |
| -------- | -------- | -------- | -------- |
| ACC | | Y | |
| Accidents | Y | Y | |
| AccidentsDiagnosis | | Y | |
| AdmintblGeneralPractice | | Y | |
| AdmintblGeneralPracticeProviders | | Y | |
| AdmintblGeneralPracticeSpeciality | | Y | Empty |
| AdmintblLocationPractice | | Y | Empty |
| AdmintblPracticeLocation | | Y | |
| AdmintblPracticeLocationAddress | | Y | |
| AdmintblProviderLocation | | Y | |
| Allergies | | Y | |
| AppointmentMaternity | | Y | Empty |
| AppointmentMedications | Y | Y | |
| Appointments | Y | Y | |
| AppointmenttblACCDiagnosis | | Y | |
| AppointmenttblACCReferral | | Y | |
| AppointmenttblAppointment | Y | Y | |
| AppointmenttblAppointmentDetail | Y | Y | |
| AppointmenttblFamilyDiseases | | Y | Empty |
| AppointmenttblFamilyHistory | | Y | Empty |
| AppointmenttblHTIOutcomeRecorded | | Y | Empty |
| AppointmenttblHTIReferral | | Y | Empty |
| AppointmenttblHTIReferralAudit | | Y | Empty |
| AppointmenttblLetterDocument | | Y | |
| AppointmenttblMedications | Y | Y | |
| AppointmenttblObservation | Y | Y | |
| AppointmenttblReasonForVisit | | Y | |
| AppointmenttblScreening | Y | Y | |
| AppointmenttblScreeningData | Y | Y | |
| AppointmenttblScreeningDiagnosis | | Y | Empty |
| AppointmenttblScreeningRecall | | Y | Empty |
| AppointmenttblTimeLine | | Y | |
| CarePlan | | Y | |
| CarePlanDiagnosis | | Y | Empty |
| Claims | | Y | |
| Diagnosis | Y | Y | |
| Funder | | Y | |
| HbAcResult | | Y | |
| HTIReferral | | Y | Empty |
| HTIReferralAudit | | Y | Empty |
| HTIReferralDetail | | Y | Empty |
| HTIReferralOutcome | | Y | Empty |
| Immunisation | Y | Y | |
| ImmunisationSchedule | | Y | |
| Inbox | Y | Y | |
| InboxDetail | Y | Y | |
| InvoiceDetail | | Y | |
| Invoices | | Y | |
| InvoicesDetailHistorical | | Y | |
| IQISummary | | Y | |
| LabOrder | | Y | |
| LabRad | | Y | |
| LettersAndDocuments | | Y | |
| LookuptblAppointmentSttaus | | Y | |
| LookuptblDisease | | Y | |
| LookuptblEnrollmentStatus | | Y | |
| LookuptblEventType | | Y | |
| LookuptblFundingStatus | | Y | |
| LookuptblMedicine | | Y | |
| LookuptblSubstance | | Y | |
| LookuptblVaccine | | Y | |
| LookuptblVaccineOutCome | | Y | |
| Measurements | Y | Y | |
| MeasurementServiceTemplateDetail | | Y | Empty |
| MedicationDiagnosis | | Y | Empty |
| NESEnrolment | | Y | |
| NextOfKin | | Y | |
| Outbox | | Y | |
| OutboxDetail | | Y | Empty |
| Patient | Y | Y | |
| PatientAlerts | | Y | |
| PatientLwi | | Y | Empty |
| PatientSARecord | | Y | |
| PatientTask | | Y | |
| Payments | | Y | Empty |
| PortalMessages | | Y | Empty |
| PracticeInfo | | Y | |
| ProfileLanguage | | Y | Empty |
| Provider | | Y | |
| ProviderTask | | Y | |
| QuickConsult | Y | Y | |
| QuickConsultHistory | | Y | |
| ReadCodeToSnomedMappingExtract | | Y | Lambda function used the maximum memory allocated - 128MB |
| Recalls | | Y | |
| ReferralGroupPatient | | Y | Empty |
| ReferralGroupSession | | Y | Empty |
| ReferralPatient | | Y | Empty |
| ReferralPatientOutCome | | Y | Empty |
| Roster | | Y | |
| Services | Y | Y | |
| SnomedToReadCodeMappingExtract | | Y | Lambda function used the maximum memory allocated - 128MB |
| TaskActivity | | Y | Empty |
| TimeLine | Y | Y | |
| TriageTemplate | | Y | Empty |

# Development steps

:::warning
**Note**: This section gives an overview of the steps required to implement a single incoming dataset within the solution. The process is quite repetitive, but does require use of judgement - particularly around subsetting data.
:::

## Inspect the parquet data

First of all, you should access a sample of the incoming parquet data to understand the structure and data types involved. Pick the dataset you want to work on next, then scan the relevant subfolder in `arn:aws:s3:::kpa-indici-partitioned` to find a file that contains data. For example, if you are working on the `Patient` data, then you can view all files and their sizes using the CLI like:

```bash=
aws s3 ls --profile kaute --recursive --human-readable --summarize s3://kpa-indici-partitioned/Patient/
```

This will output every file in that path along with its size. Continuing with this example, the smallest file size is 110.4 KiB, which suggests that files of this size are empty. Any file larger than this should contain real data.

Download the file you want to inspect locally using the CLI like:

```bash=
aws s3 cp --profile kaute s3://kpa-indici-partitioned/Patient/dt=2024-03-07/Patient.parquet .
```

:::danger
**Danger!** Any data downloaded to your device should be deleted **immediately** after use. These files contain sensitive personal information and must be handled very carefully.
:::

Now that you have a sample parquet dataset, you must use Python to read it.
For example, viewing data in a file can be done like:

```python=
import pandas as pd

pd.options.display.max_columns = None  # force show all columns
df = pd.read_parquet('Patient.parquet', engine='fastparquet')
print(df.head(30))
```

The embedded schema of the data in the parquet file can be examined via:

```python=
from fastparquet import ParquetFile

pf = ParquetFile('Patient.parquet')
print(pf.schema)
```

## Design/create landing table

Now that you can see the data, and understand the parquet schema, it's time to build a table in the `indici_staging` schema to load it into. It's often helpful to just copy the DDL from a table that already exists, and edit it. If doing this in DBeaver, you can find it by double-clicking on a table, then going to 'DDL':

![dbeaver_ddl](https://hackmd.io/_uploads/SynsKqITa.png)

Requirements for landing table creation:

* A database field should be defined for *every column* in the incoming data set
* Where possible, sensible data types should be selected. Specifically:
    * any field whose name indicates a date should be stored as `date`.
    * fields like `insertedat` and `updatedat` should be stored as `timestamp`.
    * fields that are definitely numeric should be stored as either `int` or `smallint` depending on range (refer to [PostgreSQL documentation](https://www.postgresql.org/docs/current/datatype-numeric.html)). Any field larger than `int` can just be stored as `text`.
    * any other field can just be set as `text` (note that this data type is generally recommended, and setting specific `varchar(x)` data types is not a good approach in PostgreSQL - see [here](https://www.postgresql.org/docs/current/datatype-character.html) for discussion).
* **Important!** Two additional metadata columns must be added to each table:
    * `filesourcekey` (`text`)
    * `kptinsertedat` (`timestamptz`)

::: success
If you're unsure about the correct data type, find an example in the sample data. If there is none or you are *really* unsure then just set it to `text`.
:::

## Create record in DynamoDB

This step creates a record so that the lambda function can both find the new key and retrieve working code to execute for that dataset.

The DynamoDB table is `indiciLoadSQL`. It uses a single partition key (`pk_s3FileKey`), which retrieves the following values for execution by lambda:

* `df`

    This is used by lambda to define a *subset* of the incoming data to load into RDS. To be clear, the destination table created in `indici_staging` should contain ALL columns of the dataset. However, in this step we can choose columns to ignore. This step is in place because the incoming data provides access to everything, but we definitely do not want to load everything to RDS. For example, we do not want:
    * consultation notes
    * free text fields containing personal information
    * any other item that does not look immediately usable (i.e. we should only load the minimum necessary - please look at some of the current state examples to get an idea of this).
* `sql`

    This is used by lambda to insert the incoming data into your new table. Syntax is:
    * columns to insert: double-quoted and comma-separated,
    * values to insert: `%(fieldname)s` and comma-separated.
    * Again, look at a current example in DynamoDB and use it as a base.

:::warning
Be really cautious about accidentally leaving leading or trailing spaces in these entries - they are *directly executed* by the lambda function so the DynamoDB entries must be structured very carefully.
:::

# Testing your changes

Upon completion of the above steps, everything should be ready to go the next time a file of that type arrives in S3. There are two test steps to complete before moving on to the next dataset:

1. Check that your changes have worked, and if successful
2. Backload all the files we have to date of that type.

## Check it works

A proper test sequence would be really useful here!
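One small step toward a proper test sequence is a pre-flight check of the `sql` string before it is saved to DynamoDB. The sketch below is illustrative only: the exact statement shape, the helper names `build_insert_sql` and `check_sql_entry`, and the table and column names in the usage note are all assumptions, so compare the output with an existing `indiciLoadSQL` entry before relying on it. It also assumes that each placeholder name is identical to its column name.

```python
import re


def build_insert_sql(table, columns):
    """Build an insert statement in the documented style (assumed shape)."""
    cols = ", ".join(f'"{c}"' for c in columns)    # double-quoted, comma-separated
    vals = ", ".join(f"%({c})s" for c in columns)  # %(fieldname)s, comma-separated
    return f"insert into indici_staging.{table} ({cols}) values ({vals})"


def check_sql_entry(sql):
    """Return a list of problems found in a candidate `sql` entry."""
    problems = []
    # Stray whitespace matters because the entry is executed directly.
    if sql != sql.strip():
        problems.append("leading or trailing whitespace")
    # Every quoted column should have a matching placeholder, in the same order.
    columns = re.findall(r'"([^"]+)"', sql)
    placeholders = re.findall(r"%\(([^)]+)\)s", sql)
    if columns != placeholders:
        problems.append("columns and placeholders do not match")
    return problems
```

For example, `check_sql_entry(build_insert_sql("patient", ["patientid", "insertedat"]))` returns an empty list, while the same string with a trailing space is flagged. The names `patient`, `patientid` and `insertedat` are hypothetical here.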
For now, the easiest approach for initial testing is to simply re-upload the sample data you obtained earlier to its proper location. For example, if you downloaded `s3://kpa-indici-partitioned/Patient/dt=2024-03-07/Patient.parquet` then you can simply copy it back to that location using the CLI. Because this acts as a PUT operation, it will automatically trigger the lambda function, and you will be able to see straight away whether the data for that file has been written to your table and also to the `auditlog`. If you can see new entries here then it worked! Congratulations.

If you don't see new entries, then the lambda operation failed and you'll have to diagnose and fix it. Each lambda run creates detailed logs in CloudWatch - you can inspect these in the console by going to CloudWatch, then choosing the log group `/aws/lambda/indiciLoadAppointmentMedications`. Errors or exceptions raised by the function will be recorded here.

## Backloading data

Once you know it works, delete all database entries from your last run. For example,

```sql=
truncate table indici_staging.your_new_table;
```

and also,

```sql=
-- "table" is a reserved word in PostgreSQL, so the column name must be double-quoted
delete from indici_staging.auditlog where "table" = 'your_new_table' and action_ts::date = current_date;
```

Now you're ready to trigger a pick-up of all existing files for your dataset. Switch to the lambda function `reProcessIncomingFiles`. This simply executes a bulk reload of files from `kpa-valentia` to `kpa-indici-partitioned`. You must specify the file prefix of the files you've been working on; currently this is edited manually on L21 of the function code. Then, before executing for real, perform a test run by commenting out L47 and reviewing the print statements output from the final line. This is just to check that the correct destination path is being set for each file. If it looks ok, then uncomment L47 and execute the function.

:::danger
**Note** that any changes to lambda code have to be explicitly 'deployed' before they will take effect!
:::

This should just send the same files across to `kpa-indici-partitioned`, but will now also trigger the `indiciLoadAppointmentMedications` function. Because you've correctly set up a DynamoDB key-value pair, the SQL will execute and write all of the data to the database.

Again, if you experience any errors then the CloudWatch logs should provide some clue. If anything does go wrong, then any changes you made to the RDS will need to be reset again before re-running `reProcessIncomingFiles`.
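The dry-run check described for `reProcessIncomingFiles` (print the destination paths first, copy only once they look right) can also be expressed as a flag rather than by commenting a line out. The following is a minimal sketch and not the actual function code: `destination_key`, `reprocess` and `copy_fn` are hypothetical names, and it assumes the dataset name and file date have already been extracted from each source key, since the naming of files in `kpa-valentia` is not described here.

```python
from datetime import date


def destination_key(dataset, file_date):
    """Build a partitioned key such as Patient/dt=2024-03-07/Patient.parquet."""
    return f"{dataset}/dt={file_date.isoformat()}/{dataset}.parquet"


def reprocess(files, copy_fn, dry_run=True):
    """Print the planned copy for each file, and perform it unless dry_run is set.

    files   -- iterable of (source_key, dataset, file_date) tuples
    copy_fn -- hypothetical stand-in for the real S3 copy call
    """
    planned = []
    for source_key, dataset, file_date in files:
        dest = destination_key(dataset, file_date)
        print(f"{source_key} -> {dest}")  # review these before a real run
        planned.append((source_key, dest))
        if not dry_run:
            copy_fn(source_key, dest)
    return planned
```

With `dry_run=True` (the default) nothing is copied, so the printed destinations can be reviewed safely; passing `dry_run=False` performs the copies through whatever `copy_fn` wraps.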