# Connector Runner

The connector runner is the **bidirectional data pipeline** between external systems and the workspace's [world model](https://docs.amigo.ai/developer-guide/platform-api/platform-api/data-world-model). It continuously polls external systems (EHR platforms, FHIR stores, CRMs), writes events with entity resolution (matching records from different systems to the same real-world entity), syncs agent-created data back to source systems after quality review, dispatches scheduled outbound calls, and runs the automated data quality pipeline.

Inbound data arrives via **polling** (dispatched by source type: `ehr`, `fhir_store`, `smart_fhir`, `charmhealth`, `crm`, `rest_api`, `webhook`, `file_drop`) or **real-time webhooks** (for EHR systems that support FHIR Subscriptions or event notifications). Outbound write-back routes on the workspace's **connector type** (`revolution`, `athenahealth`, `epic`, `cerner`, `allscripts`, `fhir_store`, `smart_fhir`, `charmhealth`, `hubspot`) through a handler registry.

{% hint style="info" %}
**The connector runner has no public-facing API.** It operates as a background service with 7 concurrent loops. Data sources and sync configuration are managed through the [Data Sources](https://docs.amigo.ai/api-reference/readme/platform/data-sources) endpoints on the Platform API.
{% endhint %}

## Architecture Overview

The connector runner operates **seven concurrent background processes** that together form the complete data lifecycle - from external system polling through entity resolution, quality review, and outbound sync:

{% @mermaid/diagram content="%%{init: {"theme": "base", "themeVariables": {"primaryColor": "#D4E2E7", "primaryTextColor": "#100F0F", "primaryBorderColor": "#083241", "lineColor": "#575452", "textColor": "#100F0F", "clusterBkg": "#F1EAE7", "clusterBorder": "#D7D2D0"}}}%%
flowchart TB
subgraph Loops\["7 Background Loops"]
L1\["1. Config Refresh\n(60s interval)\nReload data source +\nconnector configs"]
L2\["2. Poll Loop\n(10s interval)\nPoll due sources\nunder distributed mutex"]
L3\["3. Outbound Subscriber\n(real-time pub/sub)\nSync new events\nto EHR immediately"]
L4\["4. Reconciliation\n(300s interval)\nSafety net for\nmissed pub/sub"]
L5\["5. Outbound Dispatch\n(30s interval)\nFire scheduled\noutbound calls"]
L6\["6. Entity Resolution\n(30s interval)\nLink unlinked events\nto entities"]
L7\["7. Review Loop\n(30s interval)\nLLM confidence\nreview pipeline"]
end

subgraph External\["External Systems"]
EHR\["EHR APIs"]
FHIR\["FHIR Stores"]
REST\["REST APIs"]
end

subgraph WM\["World Model"]
EV\["Events"]
EN\["Entities"]
EG\["Entity Graph"]
end

subgraph VA\["Voice Agent"]
CALLS\["Active Calls"]
OB\["Outbound Calls"]
end

L2 <-->|"Poll + write"| External
L2 -->|"Events"| WM
L6 -->|"Link + resolve"| WM
L7 -->|"Review + upgrade"| WM
L3 -->|"Sync verified data"| External
L4 -->|"Retry missed"| External
L5 -->|"Dispatch"| VA
VA -->|"Clinical events"| WM" %}

## Inbound Pipeline

### Inbound Data (Polling + Webhooks)

Data sources feed into the world model through two mechanisms:

* **Polling** - The poll scheduler evaluates which sources are due and dispatches concurrent poll tasks by source type
* **Real-time webhooks** - EHR systems that support FHIR Subscriptions or event notifications push changes directly to the connector runner. Incoming webhooks are cryptographically verified, deduplicated, and the full resource is fetched and mapped to FHIR before writing to the world model. The connector runner manages subscription lifecycle (creation, renewal, cleanup) per data source.

Both paths converge at the same world model event pipeline with identical entity resolution, enrichment, and confidence handling.
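Webhook verification can be sketched as an HMAC check over the raw request body. This is a minimal illustration, not the connector runner's actual implementation: the header name, the secret, and the HMAC-SHA256 scheme are assumptions, since each EHR vendor documents its own signing convention.

```python
import hashlib
import hmac


def verify_webhook_signature(payload: bytes, signature_header: str, secret: bytes) -> bool:
    """Recompute an HMAC-SHA256 digest of the raw payload and compare it to the
    value the external system sent in its signature header (scheme assumed)."""
    expected = hmac.new(secret, payload, hashlib.sha256).hexdigest()
    # Constant-time comparison avoids leaking digest prefixes via timing.
    return hmac.compare_digest(expected, signature_header)
```

Only after a payload passes verification (and deduplication) is the full resource fetched and mapped to FHIR.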

#### Polling

Data sources are polled based on their [sync strategy](https://docs.amigo.ai/developer-guide/platform-api/data-world-model#data-sources) with safeguards to prevent duplicate processing:

{% @mermaid/diagram content="%%{init: {"theme": "base", "themeVariables": {"primaryColor": "#D4E2E7", "primaryTextColor": "#100F0F", "primaryBorderColor": "#083241", "lineColor": "#575452", "textColor": "#100F0F", "clusterBkg": "#F1EAE7", "clusterBorder": "#D7D2D0"}}}%%
flowchart TB
A\["Poll scheduler evaluates\nwhich sources are due"] --> B\["Spawn concurrent\npoll tasks\n(bounded)"]
B --> C{"Source type?"}

C -->|"ehr"| D\["EHR Adapter"]
C -->|"fhir_store"| FS\["FHIR Store Adapter\n(FHIR R4)"]
C -->|"smart_fhir"| SF\["SMART FHIR Adapter\n(SMART Backend Services)"]
C -->|"charmhealth"| CH\["Charm EHR Adapter\n(OAuth2 + API Key)"]
C -->|"rest_api"| E\["REST Connector"]

subgraph EHR\["EHR Polling (per resource type)"]
D1\["Practitioners"] --> D2\["Locations"]
D2 --> D3\["Schedules"]
D3 --> D4\["Patients"]
D4 --> D5\["Insurance"]
D5 --> D6\["Encounters"]
end

D --> EHR
EHR -->|"Write each type\nindependently"| F\["World Model Events"]

FS -->|"Poll FHIR resources\n(configurable cadences)"| F
SF -->|"SMART auth + FHIR R4\n(per-resource cadences)"| F
CH -->|"OAuth2 + api_key\npage-based pagination"| F

E --> G\["Content-hash dedup\n(SHA-256)"]
G --> H\["Unification engine\n(field mappings)"]
H --> F

F --> I\["Post-commit:\nembedding → LLM enrichment\n→ entity embedding"]" %}

**Key design principles:**

* **Distributed mutex per source** - Each poll acquires a mutex to prevent double-polling from multiple instances
* **Write at point of knowledge** - Each resource type batch is durable immediately. Partial failures don't lose already-written data
* **Content-hash deduplication** - Hashes prevent duplicate writes on repeated polls
* **Per-record error isolation** - Individual record failures are retried on the next poll cycle without blocking other records
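Content-hash deduplication hinges on the hash being deterministic across poll cycles. A minimal sketch, assuming records are JSON-shaped dicts (canonical encoding makes key order irrelevant):

```python
import hashlib
import json


def content_hash(record: dict) -> str:
    """SHA-256 over a canonical JSON encoding, so the same record always
    hashes identically across poll cycles regardless of key order."""
    canonical = json.dumps(record, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def dedupe(records: list[dict], seen: set[str]) -> list[dict]:
    """Return only records whose hash has not been seen; record new hashes."""
    fresh = []
    for record in records:
        h = content_hash(record)
        if h not in seen:
            seen.add(h)
            fresh.append(record)
    return fresh
```

This is why the "dedup is deterministic" invariant matters: if the source reorders fields or mutates IDs between polls, the hash changes and the same record is written twice.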

**Adapter features:**

| Feature                        | How It Works                                                                                               |
| ------------------------------ | ---------------------------------------------------------------------------------------------------------- |
| **Full vs light cycles**       | Full sync polls all resource types. Light sync polls only high-frequency types using cached reference data |
| **Business hours gate**        | Polling can be restricted to defined operating hours to respect external API usage patterns                |
| **Reference data caching**     | Frequently-accessed reference data (locations, carrier lists) is cached to reduce API call volume          |
| **Configurable poll cadences** | Per-resource poll intervals via `poll_cadences` in the data source connection config                       |

### Entity Resolution

Events arrive with references to external entities (e.g., an appointment references a patient by external ID). The entity resolution loop links these:

{% @mermaid/diagram content="%%{init: {"theme": "base", "themeVariables": {"primaryColor": "#D4E2E7", "primaryTextColor": "#100F0F", "primaryBorderColor": "#083241", "lineColor": "#575452", "textColor": "#100F0F", "clusterBkg": "#F1EAE7", "clusterBorder": "#D7D2D0"}}}%%
flowchart TB
A\["Query unlinked events\n(no entity assigned)"] --> B\["Parse references\n(deterministic, no I/O)"]
B --> C\["Map to canonical IDs\n(source:ResourceType:id)"]
C --> D\["Group by\ncanonical ID"]
D --> E\["Create or find entity\nper canonical ID\n(idempotent)"]
E --> F\["Link events to entity\n(transactional:\nlink + recompute state\n+ project graph)"]
F --> G\["Cross-source merge\ndetection"]
G --> H\["Create relationships\n(fallback for\nordering races)"]" %}

**Reference resolution** maps external resource types to world model entity types:

| External Type        | Entity Type       | Canonical ID Format           |
| -------------------- | ----------------- | ----------------------------- |
| Patient              | `patient`         | `{source}:Patient:{id}`       |
| Practitioner         | `practitioner`    | `{source}:Practitioner:{id}`  |
| Location             | `location`        | `{source}:Location:{id}`      |
| Appointment          | `appointment`     | `{source}:Appointment:{id}`   |
| Coverage / Encounter | Linked to patient | Via participant references    |
| Slot                 | Skipped           | Marked to prevent re-fetching |

Entity graph relationships are extracted from references (e.g., Appointment.participant → Patient) and projected after linking.
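The canonical ID scheme from the table above can be sketched in a few lines. The source name `"athena"` in the test below is a hypothetical label for illustration:

```python
# Maps external FHIR resource types to world model entity types (from the
# table above); Coverage/Encounter and Slot follow special-case paths.
RESOURCE_TO_ENTITY = {
    "Patient": "patient",
    "Practitioner": "practitioner",
    "Location": "location",
    "Appointment": "appointment",
}


def canonical_id(source: str, resource_type: str, external_id: str) -> str:
    """Build the {source}:ResourceType:{id} key used to group events and
    create-or-find entities idempotently."""
    return f"{source}:{resource_type}:{external_id}"
```

Because the canonical ID is deterministic, two events referencing the same external record always group under the same key, which is what makes entity creation idempotent across poll cycles.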

#### Cross-Source Entity Merge

When a workspace has multiple data sources, entity resolution automatically detects when entities from different sources represent the same real-world thing. Matched entities are linked and their projected state is unified.

| Entity Type  | Match Criteria                  | Confidence |
| ------------ | ------------------------------- | ---------- |
| Patient      | Phone number (E.164 normalized) | High       |
| Patient      | Email address                   | High       |
| Patient      | Name + date of birth            | Moderate   |
| Practitioner | NPI                             | High       |
| Practitioner | Name + specialty                | Moderate   |

When a merge is detected, the system creates a relationship between the entities and recomputes both projections to reflect the combined event history. This means a single patient view reflects data from every connected system.

### Post-Commit Enrichment

After events are committed, three background layers run with **zero added latency to the write path**:

| Layer                | What It Does                                                              | When                       |
| -------------------- | ------------------------------------------------------------------------- | -------------------------- |
| **Embedding**        | Generates vector embedding for semantic search                            | Every event + entity write |
| **LLM enrichment**   | AI-powered data enhancement and field extraction (batched for efficiency) | Events needing enrichment  |
| **Entity embedding** | Re-embeds entity after state changes                                      | After entity recomputation |

## Outbound Pipeline

### Three-Layer Confidence Gate

Data captured by the [voice agent](https://docs.amigo.ai/developer-guide/platform-api/platform-api/voice-agent) during calls must pass quality review before syncing to external systems. Three independent layers enforce this - defense in depth:

{% @mermaid/diagram content="%%{init: {"theme": "base", "themeVariables": {"primaryColor": "#D4E2E7", "primaryTextColor": "#100F0F", "primaryBorderColor": "#083241", "lineColor": "#575452", "textColor": "#100F0F", "clusterBkg": "#F1EAE7", "clusterBorder": "#D7D2D0"}}}%%
flowchart TB
subgraph Gates\["Confidence Gates (all must pass)"]
G1\["SQL gate\n(query filters\nconfidence ≥ verified)"]
G2\["Subscriber gate\n(checks before\nEHR write)"]
G3\["Reconciliation gate\n(same filter\non retry path)"]
end

VA\["Voice agent\nwrites at\npending confidence"] -->|"Below threshold"| HOLD\["Held until\nreview upgrades\nconfidence"]

REV\["Review loop\nupgrades to\nverified"] --> Gates

Gates --> SYNC\["Sync to external system"]" %}

| Gate                    | Where                     | What It Prevents                                                                                                                                            |
| ----------------------- | ------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------- |
| **Source allowlist**    | All outbound paths        | Only `voice_agent` events are eligible. `api`, `outbound_sync_agent`, `ehr_sync`, `connector_runner` sources excluded. New sources must be explicitly added |
| **SQL gate**            | Query for unsynced events | Raw voice agent writes never appear in the unsynced event query                                                                                             |
| **Subscriber gate**     | Real-time sync listener   | Checks confidence before writing to EHR                                                                                                                     |
| **Reconciliation gate** | Retry safety net          | Same confidence filter on the fallback path                                                                                                                 |
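Every outbound path applies the same pair of checks. A minimal sketch, assuming confidence is a numeric 0-1 score and 0.9 stands in for the "verified" threshold (both values are illustrative, not the platform's actual representation):

```python
ALLOWED_SOURCES = {"voice_agent"}  # allowlist, not blocklist
VERIFIED = 0.9  # assumed numeric stand-in for the "verified" threshold


def sync_eligible(event: dict) -> bool:
    """The filter shared by the SQL gate, the subscriber gate, and the
    reconciliation gate: allowlisted source AND verified confidence."""
    return (
        event.get("source") in ALLOWED_SOURCES
        and event.get("confidence", 0.0) >= VERIFIED
    )
```

Because all three gates apply this same predicate, a raw voice-agent write (pending confidence) can never leak to an external system through any path.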

### Automated Review (LLM Judge)

The review loop bridges the gap between raw voice agent writes and sync-eligible confidence:

{% @mermaid/diagram content="%%{init: {"theme": "base", "themeVariables": {"primaryColor": "#D4E2E7", "primaryTextColor": "#100F0F", "primaryBorderColor": "#083241", "lineColor": "#575452", "textColor": "#100F0F", "clusterBkg": "#F1EAE7", "clusterBorder": "#D7D2D0"}}}%%
flowchart TB
A\["Poll: events with\npending confidence +\nauto\_approved status +\nvoice\_agent source"] --> B\["Per-event:\nLLM judge evaluates\ncompleteness,\nplausibility,\nconsistency"]
B --> C\["Cross-reference:\nentity state\nfor context"]
C --> D{"Structured output\ndecision"}
D -->|"Approve"| E\["✅ Upgrade confidence\nto verified\nstatus → approved"]
D -->|"Reject"| F\["❌ Confidence → 0\nstatus → rejected"]
D -->|"Flag"| G\["⚠️ Held at pending\nstatus → flagged\n→ review queue"]" %}

**Entity-level deduplication**: The review loop skips events whose entity already has a pending review queue item, preventing redundant LLM evaluations. When a verdict flags an event for an entity that already has a pending item, the event ID is appended to the existing item rather than creating a duplicate row.

The review pipeline is designed for minimal cost - lightweight classification and per-event judging keep overhead negligible even at high call volumes.
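The verdict handling and entity-level deduplication described above can be sketched as follows. The numeric confidence values and field names are assumptions for illustration; the structured verdicts (`approve` / `reject` / `flag`) come from the diagram above.

```python
def apply_verdict(event: dict, verdict: str, queue: dict[str, list[str]]) -> dict:
    """Map the LLM judge's structured verdict onto confidence/status
    transitions. `queue` models the review queue keyed by entity ID."""
    if verdict == "approve":
        event.update(confidence=0.9, status="approved")   # upgrade to verified
    elif verdict == "reject":
        event.update(confidence=0.0, status="rejected")
    elif verdict == "flag":
        event["status"] = "flagged"                        # held at pending
        # Entity-level dedup: append to an existing pending item for this
        # entity rather than creating a duplicate queue row.
        queue.setdefault(event["entity_id"], []).append(event["id"])
    return event
```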

### Outbound Write-Back

A workspace can write back to **multiple external systems simultaneously** (multi-sink). For example, a workspace might sync patient and appointment data to the EHR while syncing patient and contact data to the CRM. Each sink is configured independently with its own entity types, and sync progress is tracked per-sink - a failure writing to one system does not block writes to others.

Outbound events are routed through a **handler registry** that maps each sink's `connector_type` to the correct write-back handler. The registry is the single routing authority for all outbound paths (both the real-time sync and the reconciliation poller). Events are filtered per-sink by the configured `outbound_entity_types`.

| Connector Type | Write-Back                                                                | Auth                                               |
| -------------- | ------------------------------------------------------------------------- | -------------------------------------------------- |
| `fhir_store`   | FHIR R4 (ETag optimistic locking)                                         | Managed credentials                                |
| `smart_fhir`   | SMART-compliant FHIR R4                                                   | SMART Backend Services (RS384/ES384 JWT assertion) |
| `revolution`   | Revolution EHR adapter                                                    | Managed credentials                                |
| `athenahealth` | athenahealth EHR adapter (Patient create/update, Appointment book/cancel) | OAuth2 client credentials                          |
| `charmhealth`  | Charm EHR adapter                                                         | OAuth2 + API key                                   |
| `crm`          | CRM adapter (contacts, companies, deals)                                  | OAuth token                                        |

For approved events, the connector runner translates world model data into the target system's format using an **entity-first architecture** - events are triggers, entities provide context:

| Event Type                            | What Happens                                                                                           |
| ------------------------------------- | ------------------------------------------------------------------------------------------------------ |
| `patient.created`                     | Create patient record in EHR (with LLM translation for complex field mapping + deterministic fallback) |
| `patient.updated`                     | Merge demographics with current EHR record                                                             |
| `appointment.booked`                  | Create appointment (patient resolved from entity external IDs)                                         |
| `appointment.confirmed` / `cancelled` | Update appointment status                                                                              |
| `coverage.created`                    | Create insurance record (carrier fuzzy-matched from EHR carrier list)                                  |

**Optimistic locking**: For FHIR stores, ETag-based optimistic locking prevents lost updates. On conflict (412), the system re-reads and retries (up to 3 attempts).
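The read-write-retry loop can be sketched generically. The `read` and `write` callables below are injected stand-ins for FHIR client calls (conditional update with `If-Match`), not the connector runner's actual API:

```python
def write_with_etag(read, write, max_attempts: int = 3):
    """Optimistic-locking loop: read the resource with its current ETag,
    attempt a conditional write, and on a 412 Precondition Failed re-read
    and retry, up to max_attempts times."""
    status = 412
    for _ in range(max_attempts):
        resource, etag = read()              # fetch latest version + ETag
        status = write(resource, if_match=etag)
        if status != 412:                    # success or permanent error
            return status
    return status                            # conflict persisted throughout
```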

**LLM translation**: Complex field mappings (e.g., patient demographics → proprietary EHR format) use an LLM with structured output for flexible translation, with a deterministic fallback mapper if the LLM is unavailable.

**Source allowlist**: Only `voice_agent` events are eligible for outbound sync. Events from `api`, `outbound_sync_agent`, `ehr_sync`, and `connector_runner` are excluded. This is an allowlist, not a blocklist - new sources must be explicitly added to become eligible.

**Confidence-aware dependencies**: When an outbound write depends on another entity (e.g., appointment creation requires the patient to exist in the EHR), the dependency check evaluates confidence. If the dependency entity's confidence is below the outbound threshold, the dependency returns "failed" rather than "pending", preventing infinite waits on raw voice data that hasn't been promoted through review.
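The dependency check can be sketched as a three-way decision. As above, the numeric 0.9 threshold is an illustrative stand-in for "verified":

```python
VERIFIED = 0.9  # assumed numeric stand-in for the "verified" threshold


def dependency_status(dependency_entity: dict, exists_in_target: bool) -> str:
    """An appointment write that needs its patient in the EHR first checks
    both existence and confidence. Unverified dependencies fail fast
    instead of waiting forever on data review will never promote."""
    if exists_in_target:
        return "ready"
    if dependency_entity.get("confidence", 0.0) < VERIFIED:
        return "failed"   # below threshold: will never sync without review
    return "pending"      # eligible: wait for its own sync to complete
```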

**Autonomous patient creation**: When the voice agent creates a patient not yet in the target EHR, an autonomous agent verifies no duplicates exist before creating the record - preventing duplicate patient entries.

## Outbound Call Dispatch

The connector runner dispatches outbound calls when scheduled `outbound_task` entities become due. This loop implements the execution side of the [voice agent's outbound system](https://docs.amigo.ai/developer-guide/platform-api/voice-agent#outbound-call-flow):

{% @mermaid/diagram content="%%{init: {"theme": "base", "themeVariables": {"primaryColor": "#D4E2E7", "primaryTextColor": "#100F0F", "primaryBorderColor": "#083241", "lineColor": "#575452", "textColor": "#100F0F", "clusterBkg": "#F1EAE7", "clusterBorder": "#D7D2D0"}}}%%
flowchart TB
A\["Find scheduled outbound tasks\nthat are due\n(priority-ordered)"] --> B\["Per task:\nacquire distributed mutex"]
B --> C\["Load patient entity\nfrom world model\n(demographics, conditions,\nmeds, appointments)"]
C --> D\["Build rich system prompt\nfrom patient projection"]
D --> E\["Resolve outbound\nphone number"]
E --> F\["Dispatch call to\nvoice agent\n(with idempotency key)"]
F -->|"Success"| G\["Write outbound.dispatched\nevent"]
F -->|"Failure"| H\["Write outbound.failed\nevent\n(retry logic activates)"]" %}

**Key features:**

| Feature                 | How It Works                                                                                       |
| ----------------------- | -------------------------------------------------------------------------------------------------- |
| **Business hours gate** | Per-task timezone-aware window (e.g., 9am-5pm ET). Outside hours → wait for next window            |
| **Priority ordering**   | Tasks dispatched highest-priority first (1-10 scale)                                               |
| **Distributed mutex**   | Per-entity mutex prevents double-dispatch from multiple pods                                       |
| **Retry with backoff**  | Configurable max attempts and backoff interval. Failed tasks automatically schedule next attempt   |
| **Rich context**        | Patient projection enriches the system prompt - the voice agent starts with full patient knowledge |
| **Idempotency**         | Dispatch keyed by `outbound:{entity_id}:{attempt}` - safe to retry                                 |
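Two of the features above lend themselves to a short sketch: the documented idempotency key format, and a timezone-aware business-hours check (the 9am-5pm default and the use of `zoneinfo` are illustrative choices):

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def idempotency_key(entity_id: str, attempt: int) -> str:
    """Key format from the table above: outbound:{entity_id}:{attempt}."""
    return f"outbound:{entity_id}:{attempt}"


def within_business_hours(now_utc: datetime, tz: str,
                          start_hour: int = 9, end_hour: int = 17) -> bool:
    """Evaluate the dispatch window in the task's own timezone,
    e.g. 9am-5pm ET for a task configured with America/New_York."""
    local = now_utc.astimezone(ZoneInfo(tz))
    return start_hour <= local.hour < end_hour
```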

## Reconciliation

A safety-net process catches sync events that were missed due to transient failures (pod restarts, network partitions where pub/sub messages were lost). Like the real-time subscriber, reconciliation routes all writes through the outbound handler registry - both paths use the same routing logic.

* Polls for events with no sync timestamp
* Filters by confidence gate (same threshold as primary path)
* Routes writes through the handler registry (same as real-time path)
* On permanent rejection (400/422): records error to stop retries
* **Exponential backoff**: After 3 consecutive all-failure cycles, back off (2x multiplier, max 600s). Resets on any successful write
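The backoff rule above can be sketched as a pure function. The 30-second base in the test is an illustrative value, and the exact step semantics (first doubling at the third consecutive failure) are an assumption about how the threshold is applied:

```python
def next_interval(base: float, consecutive_failures: int, threshold: int = 3,
                  multiplier: float = 2.0, cap: float = 600.0) -> float:
    """Exponential backoff for the reconciliation loop: below the failure
    threshold, keep the base interval; at or above it, double per extra
    all-failure cycle, capped at 600s. Any successful write resets the
    failure counter (handled by the caller)."""
    if consecutive_failures < threshold:
        return base
    return min(base * multiplier ** (consecutive_failures - threshold + 1), cap)
```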

## Key Invariants

| Invariant                              | Why It Matters                                                                                                |
| -------------------------------------- | ------------------------------------------------------------------------------------------------------------- |
| **Dedup is deterministic**             | Source IDs must be identical across poll cycles for content-hash dedup to work                                |
| **Mutex isolation**                    | One distributed lock per data source per cycle - prevents double-poll                                         |
| **Write at point of knowledge**        | Each resource type batch is durable immediately - partial failures don't lose written data                    |
| **Entity resolution is transactional** | Events linked atomically inside state recomputation. Post-batch fallback handles ordering races               |
| **Pub/sub is fire-and-forget**         | Messages may be lost on pod restart - reconciliation loop is the safety net                                   |
| **Single routing authority**           | Both real-time sync and reconciliation route through the same handler registry - no duplicated routing logic  |
| **Loop prevention**                    | Events from EHR sync sources always skipped in outbound paths                                                 |
| **Write scope**                        | Connector runner is a trusted system service - not scoped by per-session write isolation (unlike voice agent) |

## Pipeline Observability

The Platform API exposes 10 read-only pipeline observability endpoints under `/v1/{workspace_id}/pipeline/` that combine live connector-runner state, cached source health data, and database aggregation queries. These power the pipeline dashboard in the developer console.

### Endpoints

| Endpoint                                      | Description                                                                                                        |
| --------------------------------------------- | ------------------------------------------------------------------------------------------------------------------ |
| `GET /pipeline/status`                        | Composite pipeline status: overall health, active polls, total events/entities, per-source status, all loop states |
| `GET /pipeline/sources`                       | Data sources with live health from cached poll results                                                             |
| `GET /pipeline/sources/{source_id}/overview`  | Aggregated overview of a source's pipeline health: event counts by status, last sync time, error summary           |
| `GET /pipeline/sources/{source_id}/history`   | Sync history bucketed by time window (`1h` or `1d`) for a specific source                                          |
| `GET /pipeline/sources/{source_id}/events`    | Paginated events for a specific source (sortable by `ingested_at`, `event_type`, `confidence`)                     |
| `GET /pipeline/outbound`                      | Per-sink outbound sync summary: synced/failed/pending counts                                                       |
| `GET /pipeline/outbound/{data_source_id}/log` | Paginated outbound sync log for a specific sink (sortable by `created_at`, `synced_at`, `attempt_count`)           |
| `GET /pipeline/entity-resolution`             | Entity resolution metrics: total same-as edges, recent merges (24h), loop status                                   |
| `GET /pipeline/review`                        | Review pipeline metrics: queue depth, pending by priority, approved/rejected (7d), average review time             |
| `GET /pipeline/throughput`                    | Event throughput time series across all sources (configurable date range and interval)                             |

All endpoints require `viewer+` permissions and are workspace-scoped.

### Health Snapshot

The `/pipeline/status` endpoint returns a composite view:

| Field                                                                                            | Description                                                              |
| ------------------------------------------------------------------------------------------------ | ------------------------------------------------------------------------ |
| `status`                                                                                         | `healthy`, `degraded`, `starting`, or `unavailable`                      |
| `connector_runner_status`                                                                        | Live connector runner status (same values)                               |
| `uptime_seconds`                                                                                 | Connector runner uptime                                                  |
| `active_polls`                                                                                   | Number of sources currently polling                                      |
| `total_events` / `total_entities`                                                                | Workspace-level counts from the database                                 |
| `sources`                                                                                        | Per-source status with poll metrics, error counts, and connection health |
| `entity_resolution`, `review_loop`, `outbound_dispatch`, `outbound_subscriber`, `reconciliation` | Per-loop operational state                                               |

### Source Health

Each source in the listing includes live health enrichment:

| Field                   | Type           | Description                          |
| ----------------------- | -------------- | ------------------------------------ |
| `health_status`         | string         | `healthy`, `unhealthy`, or `unknown` |
| `last_poll.at`          | ISO timestamp  | When the last poll completed         |
| `last_poll.duration_ms` | integer        | Poll duration in milliseconds        |
| `last_poll.event_count` | integer        | Number of events written             |
| `last_poll.error`       | string or null | Error message if the poll failed     |

**Connection health** is tracked per source using consecutive error counts. A source is marked unhealthy after 3+ consecutive poll failures and recovers automatically on the next successful poll.
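The consecutive-error tracking can be sketched as a tiny state machine. This sketch collapses the `unknown` state (reported before a source's first poll) into `healthy` for brevity:

```python
class SourceHealth:
    """Mark a source unhealthy after 3+ consecutive poll failures;
    a single successful poll recovers it immediately."""

    UNHEALTHY_AFTER = 3

    def __init__(self) -> None:
        self.consecutive_errors = 0

    def record_poll(self, ok: bool) -> str:
        # Success resets the counter; failure increments it.
        self.consecutive_errors = 0 if ok else self.consecutive_errors + 1
        if self.consecutive_errors >= self.UNHEALTHY_AFTER:
            return "unhealthy"
        return "healthy"
```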

### Graceful Degradation

If the connector runner is temporarily unavailable, the pipeline endpoints still return database-backed data (event counts, sync history, review stats, entity resolution metrics). Only live loop states and active poll counts are omitted. This means the dashboard remains functional during connector runner restarts or deployments.

### Metrics Architecture

The connector runner tracks in-memory metrics per loop (ephemeral, reset on restart). Per-source poll results are cached on every poll cycle with a 1-hour expiry. The Platform API reads these values for the source listing and status endpoints, providing near-real-time source health without adding latency to the connector runner itself.

## API Reference

The connector runner has no public-facing API. Data sources and sync configuration are managed through the [Data Sources](https://docs.amigo.ai/api-reference/readme/platform/data-sources) endpoints on the Platform API. Pipeline observability is available through the [Pipeline](https://docs.amigo.ai/api-reference/readme/platform/pipeline) endpoints.
