# Integrations API: Sync Endpoint
We need to implement an endpoint and a command that will:
1. Add a sync command to the queue with a sync ID
2. Create a sync entity and persist it, starting in the queued state
Then each integration-specific sync receiver needs to report on the status of the sync:
1. When it starts syncing
2. When it completes
3. When it fails
Only one sync per integration per client will be allowed at a time. If a sync is requested while one is already queued but not yet started, the new request will be ignored.
The sync will then go on a Leadflo-managed queue of syncs that will be processed and completed at a regular interval (once every five minutes seems appropriate).
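
As a rough sketch of the lifecycle described above, the states could be modelled as an enum with an explicit transition table. The case names and the `next`/`canBecome` helpers are assumptions for illustration, not an agreed design.

```php
<?php
declare(strict_types=1);

// Hypothetical sketch: state names follow this document; the transition
// table is an assumption about which moves should be allowed.
enum IntegrationSyncState: string
{
    case Queued = 'queued';
    case Syncing = 'syncing';
    case Completed = 'completed';
    case Failed = 'failed';

    /** @return list<self> states reachable from this one */
    public function next(): array
    {
        return match ($this) {
            self::Queued => [self::Syncing],
            self::Syncing => [self::Completed, self::Failed],
            self::Completed, self::Failed => [],
        };
    }

    /** True if moving from this state to $to is allowed. */
    public function canBecome(self $to): bool
    {
        return in_array($to, $this->next(), true);
    }
}
```

Treating `completed` and `failed` as terminal keeps the rule simple: a new sync is a new entity rather than a reset of an old one.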

## Questions
* ~~How do we enforce one sync at a time at the database level?~~
* ~~How do we know there is only one sync at a time?~~
* ~~How are syncs placed on the queue?~~
* ~~How do we process the queue of outstanding syncs?~~
* ~~How do we store, if necessary, the object IDs that need to be synced?~~
* ~~How do we provide the correct `Sync` command for each integration?~~
* ~~How do we instantiate the `Sync` command with the correct arguments?~~
* ~~How do we hide the knowledge of how the sync state is tracked?~~
* ~~How do we know a sync is successful?~~
* ~~How do we know a sync has failed, i.e. exceeded all retry attempts?~~
## Database
We can enforce one queued sync at a time per client, per integration, by creating a unique constraint that applies only to rows matching the condition:
```
state = 'queued'
```
We should also enforce at most one in-progress sync per client, per integration, since only one sync can run at a time.
This could be achieved with a second unique constraint using the condition:
```
state = 'syncing'
```
We should perhaps consider:
* Implementing `CHECK` constraints so the `state` column makes sense with respect to the `queued_at`, `syncing_at`, `failed_at` and `completed_at` columns
* Deriving the `state` property from state-change timestamps in the DB
The latter would be simpler and require much less code at the DB level. We could still employ `CHECK` constraints that enforce state rules to act as a fail-safe against corrupted/unreadable state.
The biggest advantage to tracking the state independently is that it massively simplifies the uniqueness constraints above. Otherwise they would need to be:
```
queued_at is not null
and syncing_at is null
and failed_at is null
and completed_at is null
```
and:
```
queued_at is not null
and syncing_at is not null
and failed_at is null
and completed_at is null
```
But this isn't that big of a deal compared to the amount of code required to keep the two representations of state synchronised at all times.
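
If the state is derived rather than stored, the derivation could live in one small function. The sketch below is only an illustration: `deriveSyncState` is a hypothetical name, it returns plain strings to stay self-contained, and the precedence and sanity checks are assumptions that mirror the timestamp conditions above.

```php
<?php
declare(strict_types=1);

// Hypothetical helper: derives the state from the four timestamp columns.
// The sanity checks are assumptions, not agreed rules.
function deriveSyncState(
    ?DateTimeImmutable $queuedAt,
    ?DateTimeImmutable $syncingAt,
    ?DateTimeImmutable $failedAt,
    ?DateTimeImmutable $completedAt,
): string {
    if ($queuedAt === null) {
        throw new LogicException('A sync must always have a queued_at timestamp.');
    }
    if ($failedAt !== null && $completedAt !== null) {
        throw new LogicException('A sync cannot be both failed and completed.');
    }
    if (($failedAt !== null || $completedAt !== null) && $syncingAt === null) {
        throw new LogicException('A sync cannot finish before it starts.');
    }

    // The most advanced timestamp that is set decides the state.
    return match (true) {
        $completedAt !== null => 'completed',
        $failedAt !== null => 'failed',
        $syncingAt !== null => 'syncing',
        default => 'queued',
    };
}
```

The thrown exceptions play the same fail-safe role in application code that `CHECK` constraints would play in the database.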
## Triggers
A sync can be triggered by:
* Manually requesting a sync
* Receiving a webhook event (in some cases)
* Connecting an integration for the first time
These should emit a `SyncRequested` event. A listener receives this and creates the `IntegrationSync` in the queued state, doing nothing if there already exists a queued integration sync for the client integration.
```php
// TBD: The shape of this event needs integration and sync ID
final class SyncRequested extends Event
{
}
```
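
The listener behaviour described above might look roughly like the following. Everything here is a stand-in: the event fields are still TBD, so `syncId` and `clientIntegration` are assumed names, the event does not extend the real `Event` base class, and `QueuedSyncs` is a hypothetical in-memory substitute for the repository.

```php
<?php
declare(strict_types=1);

// Hypothetical event shape: the real fields are still to be decided.
final class SyncRequested
{
    public function __construct(
        public readonly string $syncId,
        public readonly string $clientIntegration,
    ) {}
}

// In-memory stand-in for the repository; tracks queued syncs only.
final class QueuedSyncs
{
    /** @var array<string, string> client integration => queued sync ID */
    private array $queued = [];

    public function hasQueued(string $clientIntegration): bool
    {
        return isset($this->queued[$clientIntegration]);
    }

    public function queue(string $syncId, string $clientIntegration): void
    {
        $this->queued[$clientIntegration] = $syncId;
    }

    public function queuedFor(string $clientIntegration): ?string
    {
        return $this->queued[$clientIntegration] ?? null;
    }
}

final class QueueSync
{
    public function __construct(private readonly QueuedSyncs $syncs) {}

    public function handle(SyncRequested $event): void
    {
        // A sync is already waiting for this client integration: do nothing.
        if ($this->syncs->hasQueued($event->clientIntegration)) {
            return;
        }
        $this->syncs->queue($event->syncId, $event->clientIntegration);
    }
}
```

In the real implementation the database uniqueness constraint would remain the source of truth; the check in the listener only avoids a needless failed insert.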
Some integrations may need to buffer data received via webhooks so that the sync can process it later. The Messenger API, for example, does not allow pull-based syncs and only delivers new messages going forward. In this case, the integration is responsible for buffering messages to be processed. The same applies to any API that only sends data via webhooks.
Other APIs, like Dentally, Outlook and Gmail, use a pull-based model whereby webhooks signal that we should sync and we use some form of history token (in Dentally's case, the date of the last sync) to pull changed data.
In this way, the integrations are responsible for buffering any metadata, such as object IDs or objects, and `Integrations` is responsible for controlling when syncs are processed, which will read from the buffer.
When the need arises, we can provide generic APIs to buffer data in whichever shape integrations need (based loosely on how Infusionsoft does it now with Redis).
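
If a generic buffering API does become necessary, one possible shape is a push/drain pair keyed by client integration. `SyncBuffer`, `MemorySyncBuffer` and their methods are hypothetical names; the in-memory version below only illustrates the contract, and a production version might be backed by Redis instead.

```php
<?php
declare(strict_types=1);

// Hypothetical buffer API; names and shape are assumptions.
interface SyncBuffer
{
    public function push(string $clientIntegration, string $item): void;

    /** @return list<string> buffered items, removed from the buffer */
    public function drain(string $clientIntegration): array;
}

// In-memory stand-in used to illustrate the contract.
final class MemorySyncBuffer implements SyncBuffer
{
    /** @var array<string, list<string>> */
    private array $items = [];

    public function push(string $clientIntegration, string $item): void
    {
        $this->items[$clientIntegration][] = $item;
    }

    public function drain(string $clientIntegration): array
    {
        $drained = $this->items[$clientIntegration] ?? [];
        unset($this->items[$clientIntegration]);

        return $drained;
    }
}
```

A webhook handler would call `push` as events arrive, and the sync receiver would call `drain` once when it runs.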
## Processing
We will bind a listener to the `Heartbeat` event that will schedule syncs every heartbeat.
We should only begin a sync if there is not a sync in progress for the client integration. We can do this by:
* Only listing available syncs to initiate that do not have another sync in the `syncing` state
* By exiting early if starting the sync and one is already in progress
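
The first option could be sketched as a filter over sync rows. The row shape and the `availableSyncs` name are assumptions; in practice this would more likely be a query behind the repository.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical filter over rows shaped like
 * ['id' => string, 'client_integration' => string, 'state' => string].
 *
 * @param list<array{id: string, client_integration: string, state: string}> $syncs
 * @return list<string> IDs of queued syncs that are safe to start
 */
function availableSyncs(array $syncs): array
{
    // Collect client integrations that already have a sync in progress.
    $busy = [];
    foreach ($syncs as $sync) {
        if ($sync['state'] === 'syncing') {
            $busy[$sync['client_integration']] = true;
        }
    }

    // Keep queued syncs whose client integration is not busy.
    $available = [];
    foreach ($syncs as $sync) {
        if ($sync['state'] === 'queued' && !isset($busy[$sync['client_integration']])) {
            $available[] = $sync['id'];
        }
    }

    return $available;
}
```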
The processor will iterate over the available, queued syncs and use the common factory method provided by the `IntegrationSync` interface to create and dispatch the correct sync for the client integration:
```php
interface IntegrationSync
{
    public static function from(IntegrationSyncID $id): static;
}
```
Currently, an `IntegrationSyncID` does not exist. We will need to implement this and refactor existing interfaces and implementations to this value object.
To resolve the `IntegrationSync` command, we will use a mapping function that is injected into the listener and declared by each integration, as with `WatchFactory` and `AuthFactory`.
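
A minimal sketch of such a mapping function follows. The `IntegrationSyncID` fields, the `GmailSync` class and the `syncResolver` function are all hypothetical stand-ins so that the example runs on its own; the real value object and commands may look quite different.

```php
<?php
declare(strict_types=1);

// Stand-in value object; the real fields are an open design question.
final class IntegrationSyncID
{
    public function __construct(
        public readonly string $integration,
        public readonly string $id,
    ) {}
}

interface IntegrationSync
{
    public static function from(IntegrationSyncID $id): static;
}

// Hypothetical command for one integration.
final class GmailSync implements IntegrationSync
{
    private function __construct(public readonly IntegrationSyncID $id) {}

    public static function from(IntegrationSyncID $id): static
    {
        return new static($id);
    }
}

/**
 * Hypothetical resolver: maps an integration name to its command class.
 *
 * @param array<string, class-string<IntegrationSync>> $map
 */
function syncResolver(array $map): Closure
{
    return static function (IntegrationSyncID $id) use ($map): IntegrationSync {
        $class = $map[$id->integration]
            ?? throw new InvalidArgumentException("No sync command for {$id->integration}");

        // Every command shares the same factory method, so no per-class wiring is needed.
        return $class::from($id);
    };
}
```

The processor would receive the closure by injection and call it once per available sync before dispatching the resulting command.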
## Tracking
This is extremely tricky. We want to track the state of integration syncs but we don't want to repeat that logic for every single receiver.
One possibility is creating a base `SyncReceiver` class, but then a particular sync receiver might have its own dependencies, especially if it's responsible for its own buffering.
Another possibility is that we could emit a set of events:
* `SyncStarted`
* `SyncCompleted`
* `SyncFailed`
But there is nothing to guarantee that an integration sync emits those events, which could leave integrations in a stale state.
We could take a mixture of both worlds and use a trait that:
1. Mandates the provision of `EventPublisher`
2. Provides a final `execute` method
3. Mandates the implementation of `sync` which does the work to sync
The provided final `execute` method would handle all of the error checking logic and emitting events on state change.
```php
trait ReceivesSyncCommands
{
    abstract protected function events(): EventPublisher;

    abstract protected function sync($cmd): void;

    final public function execute($cmd): void
    {
        // ... sync state logic ...
    }
}
```
`Integrations` provides simple listeners for updating the state of syncs in response to events:
* `RecordSyncStarted`
* `RecordSyncEnded`
* `RecordSyncCompleted`
To reconcile our job retry policy with failing syncs, we could:
* Provide current attempt on all commands
* Provide max attempts on all commands
* Provide a method to test if final attempt on all commands
* Emit `SyncFailed` event if there's a failure on the final attempt
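
The attempt bookkeeping could be as small as the sketch below. The base class shown is hypothetical, and it assumes `retries` counts the retries allowed after the first attempt, so the maximum number of attempts is `retries + 1`.

```php
<?php
declare(strict_types=1);

// Hypothetical base class; the real `Command` may differ.
abstract class Command
{
    public function __construct(
        private readonly int $attempt = 1,
        private readonly int $retries = 0,
    ) {}

    /** Current attempt, starting at 1. */
    public function attempt(): int
    {
        return $this->attempt;
    }

    /** Retries allowed after the first attempt. */
    public function retries(): int
    {
        return $this->retries;
    }

    /** True when no retries remain after this attempt. */
    public function finalAttempt(): bool
    {
        return $this->attempt > $this->retries;
    }
}
```

With two retries, attempts one and two are not final and attempt three is, so `SyncFailed` would only be emitted if the third attempt throws.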
If the events and listeners for tracking state changes are simple and similar enough, we could combine them into a single `SyncStateChanged` event and `RecordSyncState` listener pair:
```php
final class SyncStateChanged {
    public function __construct(
        public readonly IntegrationSyncID $id,
        public readonly IntegrationSyncState $state
    ) {}
}
```
We know a sync is successful if it completes without failure. This means that we record a sync as complete at the very end and have any failures short circuit completion.
Knowing it has failed simply means an exception was thrown that interrupted the normal processing of the command. We can use a general catch-all to intercept these failures, rethrowing afterwards.
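
Putting these rules together, the body of `execute` might look something like this sketch. It is an illustration under assumptions: the `EventPublisher` here is a stand-in that records event names as strings, and the command is assumed to expose a `finalAttempt()` method.

```php
<?php
declare(strict_types=1);

// Stand-in publisher that records event names; the real `EventPublisher` may differ.
final class EventPublisher
{
    /** @var list<string> */
    public array $published = [];

    public function publish(string $event): void
    {
        $this->published[] = $event;
    }
}

trait ReceivesSyncCommands
{
    abstract protected function events(): EventPublisher;

    abstract protected function sync($cmd): void;

    final public function execute($cmd): void
    {
        $this->events()->publish('SyncStarted');

        try {
            $this->sync($cmd);
        } catch (Throwable $e) {
            // Assumes commands expose finalAttempt(); earlier attempts stay retryable.
            if ($cmd->finalAttempt()) {
                $this->events()->publish('SyncFailed');
            }

            throw $e; // Rethrow so the job retry policy still applies.
        }

        // Reached only when sync() did not throw.
        $this->events()->publish('SyncCompleted');
    }
}
```

Note that in this sketch a retried attempt publishes `SyncStarted` again, so the listener recording that state would need to tolerate repeats.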
## Tasks
DB constraints:
- [x] Implement queued uniqueness constraint on `integration_syncs`
- [x] Implement syncing uniqueness constraint on `integration_syncs`
- [x] Extend `IntegrationSyncRepository` to account for uniqueness errors
- [x] Implement `CHECK` constraints so `state` lines up with timings
`IntegrationSyncID`:
- [x] Define `IntegrationSyncID` value object
- [x] Refactor `IntegrationSync` to `IntegrationSyncID`
- [x] Refactor `IntegrationSyncRepository` to `IntegrationSyncID`
- [x] Refactor `MemoryIntegrationSyncRepository` to `IntegrationSyncID`
- [x] Refactor `DBIntegrationSyncRepository` to `IntegrationSyncID`
Queuing:
- [x] Define `SyncRequested` event
- [x] Implement `QueueSync` listener
- [x] Bind `QueueSync` to `SyncRequested` event
Sync command map:
- [x] Define `IntegrationSync` command interface
- [x] Refactor `Outlook\Sync` command to interface
- [x] Refactor `Gmail\Sync` command to interface
- [x] Implement a mechanism to resolve the `IntegrationSync` command
Processing:
- [x] Implement `available` method on `IntegrationSyncRepository`
- [x] Implement `ProcessSyncQueue`
- [x] Bind `ProcessSyncQueue` to `Heartbeat` event
Commands:
- [x] Provide `attempt` for all `Command` objects
- [x] Provide `retries` for all `Command` objects
- [x] Implement `finalAttempt` for all `Command` objects
Tracking:
- [x] Define `SyncStateChanged` event
- [x] Implement `RecordSyncState` listener
- [x] Implement `ReceivesSyncCommands`
- [x] Refactor `Outlook\SyncReceiver` to `ReceivesSyncCommands`
- [x] Refactor `Gmail\SyncReceiver` to `ReceivesSyncCommands`
Endpoint:
- [x] Define `POST /integrations/{integration}/sync` endpoint
- [x] Implement `PostIntegrationSync` controller
- [x] Cover `PostIntegrationSync` controller with integration test
Refactoring:
- [ ] Refactor Outlook webhooks to `SyncRequested`
- [ ] Refactor Gmail webhooks to `SyncRequested`