Recipe 03 — Daily watchlist refresh from your internal CRM
30 minutes. Result: every counterparty in your internal CRM is in OilFlow's continuous-monitoring watchlist. New matches fire watchlist.match_detected webhooks.
What this recipe does
You already have a counterparty list somewhere (Salesforce, an internal Postgres table, a spreadsheet). This recipe shows how to push it to OilFlow once a night so the daily watchlist_sync agent screens every entity against fresh sanctions deltas, adverse-media findings, and cluster severity upgrades — firing webhooks on every new match.
Step 1 — Export your CRM to CSV
The CSV format OilFlow expects:
entity_name,aliases,jurisdictions
"Acme Trading FZE","Acme FZE | Acme Trading","UAE | KSA"
"Northbound Petrochemicals","NBP","Singapore"
aliases and jurisdictions are pipe-separated within the cell.
Sample CSV: <https://oilflow.us/sample-watchlist.csv>.
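Before wiring up the nightly upload, it can help to check locally that an export matches the three-column layout above. The following is a minimal sketch using only the Python standard library; parse_watchlist_csv and split_cell are hypothetical helper names, and the rules they enforce (exact header, non-empty entity_name) are assumptions about what a well-formed file looks like, not documented OilFlow validation behavior.

```python
import csv
import io

EXPECTED_HEADER = ["entity_name", "aliases", "jurisdictions"]


def split_cell(cell: str) -> list[str]:
    """Split a pipe-separated cell and trim the spaces around each value."""
    return [part.strip() for part in cell.split("|") if part.strip()]


def parse_watchlist_csv(text: str) -> list[dict]:
    """Parse the three-column CSV into dicts, raising on a malformed header or row."""
    reader = csv.reader(io.StringIO(text))
    header = next(reader, None)
    if header != EXPECTED_HEADER:
        raise ValueError(f"unexpected header: {header!r}")
    entries = []
    for row in reader:
        if not row:  # skip completely blank lines
            continue
        # ASSUMPTION: every data row needs exactly 3 columns and a name.
        if len(row) != 3 or not row[0].strip():
            raise ValueError(
                f"line {reader.line_num}: expected 3 columns with a non-empty entity_name"
            )
        entries.append({
            "entity_name": row[0].strip(),
            "aliases": split_cell(row[1]),
            "jurisdictions": split_cell(row[2]),
        })
    return entries


sample = (
    "entity_name,aliases,jurisdictions\n"
    '"Acme Trading FZE","Acme FZE | Acme Trading","UAE | KSA"\n'
    '"Northbound Petrochemicals","NBP","Singapore"\n'
)
entries = parse_watchlist_csv(sample)
```

Running the same check on the real export before uploading gives an early signal if a CRM field contains an unexpected pipe or an empty legal name.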
Step 2 — Upload nightly
Python (cron job)
# scripts/refresh_watchlist.py
import csv
import datetime
import io
import sys

from oilflow import Client
from your_app import db  # however you query Salesforce / Postgres / Sheets


def export_counterparties_to_csv() -> str:
    """Serialize active counterparties into the three-column CSV OilFlow expects."""
    rows = db.fetch_active_counterparties()  # → list of dicts
    out = io.StringIO()
    writer = csv.writer(out)
    writer.writerow(["entity_name", "aliases", "jurisdictions"])
    for r in rows:
        writer.writerow([
            r["legal_name"],
            # `or []` also covers rows where the key exists but holds None.
            " | ".join(r.get("aliases") or []),
            " | ".join(r.get("jurisdictions") or []),
        ])
    return out.getvalue()


def main() -> int:
    client = Client()
    csv_body = export_counterparties_to_csv()
    result = client.watchlists.upload(
        name=f"crm-sync-{datetime.date.today().isoformat()}",
        description="Auto-uploaded from internal CRM",
        csv=csv_body,
    )
    print(f"inserted: {result['entries_inserted']}")
    print(f"rejected: {result.get('rows_rejected', 0)}")
    return 0


if __name__ == "__main__":
    sys.exit(main())
Run it from cron at 22:00 local time the night before the OilFlow watchlist sync, which fires at 09:00 UTC.
Node (cron job)
// scripts/refresh-watchlist.ts
import OilFlow from "@oilflow/sdk";
import { fetchActiveCounterparties } from "./db.js";

// Quote a CSV field, doubling any embedded double quotes.
const quote = (value: string): string => `"${value.replace(/"/g, '""')}"`;

async function main() {
  const client = new OilFlow();
  const rows = await fetchActiveCounterparties();
  const csv = [
    "entity_name,aliases,jurisdictions",
    ...rows.map((r) =>
      [
        quote(r.legal_name),
        quote((r.aliases ?? []).join(" | ")),
        quote((r.jurisdictions ?? []).join(" | ")),
      ].join(","),
    ),
  ].join("\n");
  const result = await client.watchlists.upload({
    name: `crm-sync-${new Date().toISOString().slice(0, 10)}`,
    description: "Auto-uploaded from internal CRM",
    csv,
  });
  console.log(`inserted: ${result.entries_inserted}`);
}

// Exit non-zero on failure so cron reports the error.
main().catch((err) => {
  console.error(err);
  process.exit(1);
});
Step 3 — Subscribe to `watchlist.match_detected`
Match events fire whenever the daily sync surfaces a new adverse-media finding, a cluster severity upgrade, or a sanctions delta touching one of your entries:
client.webhooks.create(
    url="https://your-app.example.com/webhooks/oilflow",
    events=["watchlist.match_detected"],
    description="CRM watchlist matches",
    # delivery_format defaults to "raw" (HMAC-signed JSON).
)
The payload includes entity_name, source (adverse_media | cluster_upgrade | sanctions_delta), and a detail object specific to the source.
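On the receiving side, a handler would typically check the signature and then branch on source. The sketch below shows one way to do that with the Python standard library. The recipe states only that raw deliveries are HMAC-signed JSON, so the algorithm (HMAC-SHA256), the hex encoding, and the idea that the signature covers the raw request body are all assumptions; check the webhook reference for the actual scheme and header name. verify_signature, summarize_match, and the example secret are hypothetical.

```python
import hashlib
import hmac
import json

# The three source values listed in the payload description.
KNOWN_SOURCES = {"adverse_media", "cluster_upgrade", "sanctions_delta"}


def verify_signature(secret: bytes, raw_body: bytes, signature_hex: str) -> bool:
    """ASSUMPTION: the signature is a hex HMAC-SHA256 of the raw request body."""
    expected = hmac.new(secret, raw_body, hashlib.sha256).hexdigest()
    # compare_digest avoids leaking timing information during comparison.
    return hmac.compare_digest(expected, signature_hex)


def summarize_match(raw_body: bytes) -> str:
    """Decode a match payload and return a one-line summary for logging."""
    payload = json.loads(raw_body)
    source = payload["source"]
    if source not in KNOWN_SOURCES:
        raise ValueError(f"unknown source: {source!r}")
    return f"{payload['entity_name']}: {source}"


# Local round trip with a made-up secret and payload (no network involved).
secret = b"example-webhook-secret"
body = json.dumps({
    "entity_name": "Acme Trading FZE",
    "source": "sanctions_delta",
    "detail": {},
}).encode("utf-8")
signature = hmac.new(secret, body, hashlib.sha256).hexdigest()

if verify_signature(secret, body, signature):
    print(summarize_match(body))
```

Verifying against the raw bytes, before any JSON parsing, matters because re-serializing the payload can change whitespace or key order and invalidate the signature.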
Step 4 — Verify
After running the upload script:
curl -H "Authorization: Bearer $OILFLOW_API_KEY" \
  https://oilflow.us/api/v1/watchlists
You should see the new watchlist with the expected entries_count. The dashboard at /dashboard → Customer watchlists shows the same.
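The same check can be scripted so the cron job fails loudly when the count is off. This is a rough sketch: fetch_watchlists mirrors the curl call above using urllib, while entries_count_for assumes the response can be reduced to a list of objects that each carry name and entries_count keys. That response shape is an assumption, as are both function names, so adapt the lookup to whatever the endpoint actually returns.

```python
import json
import urllib.request
from typing import Optional

API_URL = "https://oilflow.us/api/v1/watchlists"


def fetch_watchlists(api_key: str) -> object:
    """GET the watchlists endpoint with the same bearer header as the curl call."""
    request = urllib.request.Request(
        API_URL, headers={"Authorization": f"Bearer {api_key}"}
    )
    with urllib.request.urlopen(request, timeout=30) as response:
        return json.load(response)


def entries_count_for(watchlists: list[dict], name: str) -> Optional[int]:
    """ASSUMPTION: each item is an object with `name` and `entries_count` keys."""
    for item in watchlists:
        if item.get("name") == name:
            return item.get("entries_count")
    return None


# Offline illustration with made-up data; fetch_watchlists is not called here.
listing = [
    {"name": "crm-sync-2024-01-01", "entries_count": 2},
    {"name": "crm-sync-2024-01-02", "entries_count": 3},
]
count = entries_count_for(listing, "crm-sync-2024-01-02")
```

Comparing the returned count with the inserted figure printed by the upload script is a cheap way to catch partially rejected uploads.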
Gotchas
- Max 5,000 rows per upload. Larger lists: chunk into multiple uploads with distinct name values.
- Max 1 MB CSV body. If your list is bigger, gzip+chunk or paginate.
- Duplicate (member_id, entity_name_normalized) tuples are merged — re-uploading the same CRM nightly is idempotent.
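The row and size limits above can be handled by splitting the export before upload. The following sketch produces CSV bodies that each repeat the header and stay under both limits. It assumes the 5,000-row limit counts data rows only and reads 1 MB conservatively as 1,000,000 bytes of UTF-8; neither detail is stated in this recipe. chunk_csv and encode_row are hypothetical helpers.

```python
import csv
import io

HEADER = ["entity_name", "aliases", "jurisdictions"]
MAX_ROWS = 5_000       # ASSUMPTION: limit counts data rows, not the header
MAX_BYTES = 1_000_000  # ASSUMPTION: conservative reading of "1 MB"


def encode_row(row: list[str]) -> str:
    """Render one row as a CSV line, including the line terminator."""
    out = io.StringIO()
    csv.writer(out).writerow(row)
    return out.getvalue()


def chunk_csv(rows, max_rows: int = MAX_ROWS, max_bytes: int = MAX_BYTES) -> list[str]:
    """Split rows into CSV bodies that respect both the row and byte limits."""
    header = encode_row(HEADER)
    header_size = len(header.encode("utf-8"))
    chunks: list[str] = []
    lines: list[str] = []
    size = header_size
    for row in rows:
        line = encode_row(row)
        line_size = len(line.encode("utf-8"))
        # Start a new chunk when adding this row would break either limit.
        if lines and (len(lines) >= max_rows or size + line_size > max_bytes):
            chunks.append(header + "".join(lines))
            lines, size = [], header_size
        lines.append(line)
        size += line_size
    if lines:
        chunks.append(header + "".join(lines))
    return chunks


rows = [[f"Entity {i}", "", "UAE"] for i in range(12_001)]
chunks = chunk_csv(rows)
print(len(chunks))  # 12,001 short rows split by the row limit into 3 bodies
```

Each body can then be uploaded under its own name, for example by appending a part number to the crm-sync date prefix, which satisfies the distinct-name requirement.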
Next steps
- Recipe 04: UBO traversal + risk export
- Recipe 05: LC discrepancy validation