root 6863f39a24 docs: rewrite all documentation to reflect current state
- Remove adminer references (service was removed)
- Remove mermaid diagrams (ASCII only)
- Remove hardcoded credentials (use env var references)
- Update all Docker references to 4-container setup (app, postgres, web, pgadmin)
- Document env-based admin credentials (ADMIN_EMAIL/ADMIN_PASSWORD)
- Document parameterized queries (SQL injection fixed)
- Document FCM topic routing by stationtype+level
- Document siren stationtype=3 fix in sidesdecode.py
- Document idempotent seeder (firstOrCreate)
- Document reverse proxy setup in deployment guide
- Remove Makefile references (Docker Compose only)
2026-05-21 02:59:32 +08:00


<!-- generated-by: gsd-doc-writer -->
# Data Pipeline: Python Autoscript
## Overview
`autoscript/sidesdecode.py` is the data ingestion pipeline that:
1. Connects to an FTP server where telemetry stations upload CSV data
2. Downloads and parses CSV files for the current day
3. Inserts rainfall, water level, and siren data into PostgreSQL
4. Triggers push notifications when thresholds are exceeded
The script is designed to run on a schedule (e.g., as a cron job), processing new files that remote telemetry stations upload throughout the day.
## Environment Variables
All connection settings, including credentials, are read from environment variables. The defaults below apply when a variable is unset:
| Variable | Default | Description |
|----------|---------|-------------|
| `FTP_SERVER` | `myvscada.com` | FTP server hostname |
| `FTP_USERNAME` | `tck` | FTP login username |
| `FTP_PASSWORD` | *(empty)* | FTP login password |
| `PG_HOST` | `postgres` | PostgreSQL host (`postgres` on Docker network, `localhost` on Docker host) |
| `PG_DATABASE` | `sides_db` | PostgreSQL database name |
| `PG_USER` | `tck` | PostgreSQL username |
| `PG_PASSWORD` | *(empty)* | PostgreSQL password |
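A minimal sketch of reading these settings with their documented defaults. The `Config` dataclass and `load_config()` helper are hypothetical names used for illustration; they are not taken from `sidesdecode.py`.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Config:
    """Hypothetical container for the pipeline's connection settings."""
    ftp_server: str
    ftp_username: str
    ftp_password: str
    pg_host: str
    pg_database: str
    pg_user: str
    pg_password: str


def load_config(env=os.environ) -> Config:
    """Read each setting from the environment, falling back to the documented default."""
    return Config(
        ftp_server=env.get("FTP_SERVER", "myvscada.com"),
        ftp_username=env.get("FTP_USERNAME", "tck"),
        ftp_password=env.get("FTP_PASSWORD", ""),
        pg_host=env.get("PG_HOST", "postgres"),
        pg_database=env.get("PG_DATABASE", "sides_db"),
        pg_user=env.get("PG_USER", "tck"),
        pg_password=env.get("PG_PASSWORD", ""),
    )
```

The resulting values map directly onto `psycopg2.connect(host=..., dbname=..., user=..., password=...)`. Passing a plain dict as `env` makes the defaults easy to check without touching the real environment.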
## FTP Connection
The script connects to the configured FTP server and navigates to today's date folder:
```
Path: files/SIDES/SUCCESS/{year}/{month}/{day}/
```
Example path for 21 May 2026: `files/SIDES/SUCCESS/2026/05/21/`
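The folder path can be built from a date as sketched below. `ftp_day_path()` is a hypothetical helper, and zero-padding of month and day is assumed from the example path above.

```python
from datetime import date


def ftp_day_path(d: date) -> str:
    """Build the per-day upload folder, zero-padding month and day (e.g. 2026/05/21)."""
    return f"files/SIDES/SUCCESS/{d.year}/{d.month:02d}/{d.day:02d}/"
```

With the standard-library `ftplib`, `ftp.cwd(ftp_day_path(date.today()))` would then enter today's folder.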
### File Filtering
- Skips files whose names contain `rf` (Tideda-format files)
- Only processes files whose names contain today's date in `yymmdd` format (e.g., `260521` for 21 May 2026)
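The two filtering rules can be expressed as a single predicate. This is a sketch: `should_process()` is a hypothetical name, and the `rf` match is assumed to be a case-sensitive substring test. The sample filenames in the usage below are illustrative only.

```python
from datetime import date


def should_process(filename: str, today: date) -> bool:
    """Return True if a file on the FTP server should be parsed today."""
    # Tideda-format files are identified by "rf" in the name (case-sensitive: an assumption).
    if "rf" in filename:
        return False
    # Keep only files whose name contains today's date as yymmdd, e.g. "260521".
    return today.strftime("%y%m%d") in filename
```

For example, given `ftp.nlst()` output, `[f for f in names if should_process(f, date.today())]` yields the files to download.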
## CSV Format
Each line in the CSV file contains 37+ comma-separated columns. The script requires at least 25 columns per line; shorter lines are skipped.
Key columns extracted:
| Column Index | Variable | Description |
|-------------|----------|-------------|
| 1 | `station_id` | Station identifier (e.g., `KBLG0026`) |
| 4 | `timestamp` | Timestamp in `yymmddHHMMSS` format |
| 6 | `battery` | Battery voltage |
| 15 | `wlalert` | Water level alert threshold |
| 16 | `wlwarn` | Water level warning threshold |
| 17 | `wldgr` | Water level danger threshold |
| 18 | `sirenid` | Siren identifier |
| 19 | `siren` | Siren status: `H`=Danger, `L`=Warning, `N`=Normal |
| 21 | `anncumm` | Annual cumulative rainfall |
| 22 | `dailycumm` | Daily cumulative rainfall |
| 23 | `hourlycumm` | Hourly rainfall |
| 24 | `currrf` | Current rainfall |
| 36 | `waterlevel` | Current water level reading |
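A minimal parsing sketch for the columns above. It assumes the indices are positions in the list produced by splitting the line on commas, that blank cells mean "no value", and that numeric fields parse as floats. `parse_line()` and `_num()` are hypothetical helpers, distinct from the script's `process_line()`.

```python
from datetime import datetime
from typing import Optional

MIN_COLUMNS = 25  # shorter lines are skipped as malformed


def _num(value: str) -> Optional[float]:
    """Convert a CSV cell to float, treating a blank cell as missing (None)."""
    value = value.strip()
    return float(value) if value else None


def parse_line(line: str) -> Optional[dict]:
    """Extract the key columns from one CSV line, or return None if it is too short."""
    cols = line.rstrip("\r\n").split(",")
    if len(cols) < MIN_COLUMNS:
        return None  # malformed: the caller logs and skips it
    return {
        "station_id": cols[1].strip(),
        # yymmddHHMMSS; an unparseable value raises ValueError for the caller to handle
        "timestamp": datetime.strptime(cols[4].strip(), "%y%m%d%H%M%S"),
        "battery": _num(cols[6]),
        "wlalert": _num(cols[15]),
        "wlwarn": _num(cols[16]),
        "wldgr": _num(cols[17]),
        "sirenid": cols[18].strip() or None,
        "siren": cols[19].strip() or None,
        "anncumm": _num(cols[21]),
        "dailycumm": _num(cols[22]),
        "hourlycumm": _num(cols[23]),
        "currrf": _num(cols[24]),
        # Column 36 is absent on lines with 25 to 36 columns.
        "waterlevel": _num(cols[36]) if len(cols) > 36 else None,
    }
```

The explicit length check on column 36 matters. The minimum is 25 columns, but `waterlevel` sits at index 36, so a naive `cols[36]` would raise `IndexError` on shorter but still valid lines.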
## Data Processing Logic
The `process_line()` function handles each CSV line. All database operations use `psycopg2` parameterized queries to prevent SQL injection.
### Rainfall Data
```
CSV line
|
v
[dailycumm or hourlycumm not null?]
|-- No --> skip
|-- Yes
v
[record exists for station+timestamp?]
|-- Yes --> skip insert
|-- No --> INSERT INTO rainfall (stationid, timestamp, anncum, daily, hourly, currentrf, battery)
|
v
[hourlycumm >= 30?]
|-- No --> done
|-- Yes
|
v
Determine level:
30 <= hourly < 60 --> Warning
hourly >= 60 --> Danger
|
v
[notification exists for station+timestamp+stationtype=1?]
|-- Yes --> skip insert
|-- No --> INSERT INTO notification (stationid, timestamp, stationtype=1, level, active_time)
|
v
send_alert_to_laravel(station_id, level, 1)
```
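The level decision in the rainfall flowchart reduces to a small pure function. This sketch uses a hypothetical `rainfall_level()` name and returns `None` when no notification is due. The source does not state the rainfall units.

```python
from typing import Optional

RAIN_WARNING = 30  # hourlycumm threshold for Warning (from the flowchart)
RAIN_DANGER = 60   # hourlycumm threshold for Danger


def rainfall_level(hourlycumm: Optional[float]) -> Optional[str]:
    """Map hourly rainfall to an alert level, or None when below the warning threshold."""
    if hourlycumm is None or hourlycumm < RAIN_WARNING:
        return None
    return "Danger" if hourlycumm >= RAIN_DANGER else "Warning"
```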
### Water Level Data
```
CSV line
|
v
[waterlevel not null?]
|-- No --> skip
|-- Yes
v
[record exists for station+datetime?]
|-- Yes --> skip insert
|-- No --> INSERT INTO waterlevel (stationid, datetime, waterlevel, alert, warning, danger)
|
v
[waterlevel >= wlalert?]
|-- No --> done
|-- Yes
|
v
Determine level:
alert <= wl < warning --> Alert
warning <= wl < danger --> Warning
wl >= danger --> Danger
|
v
[notification exists for station+timestamp+stationtype=2?]
|-- Yes --> skip insert
|-- No --> INSERT INTO notification (stationid, timestamp, stationtype=2, level, active_time)
|
v
send_alert_to_laravel(station_id, level, 2)
```
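The water level decision can be sketched the same way, using the per-station thresholds from the CSV line. `waterlevel_level()` is a hypothetical helper. Skipping a check when a threshold cell is blank is an assumption, because the source does not say how missing thresholds are handled.

```python
from typing import Optional


def waterlevel_level(wl: Optional[float], alert: Optional[float],
                     warning: Optional[float], danger: Optional[float]) -> Optional[str]:
    """Return Alert/Warning/Danger for a reading, or None below the alert threshold."""
    if wl is None or alert is None or wl < alert:
        return None
    # Check the most severe band first so each reading gets exactly one level.
    if danger is not None and wl >= danger:
        return "Danger"
    if warning is not None and wl >= warning:
        return "Warning"
    return "Alert"
```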
### Siren Data
```
CSV line
|
v
[sirenid not null?]
|-- No --> skip
|-- Yes
v
Determine level from siren status:
H --> Danger
L --> Warning
N --> Normal
|
v
[record exists for station+active_time?]
|-- Yes --> skip insert
|-- No --> INSERT INTO siren (stationid, stationtype=3, active_time, level)
|
v
[level != Normal?]
|-- No --> done
|-- Yes --> send_alert_to_laravel(station_id, level, 3)
```
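The siren mapping and the "alert unless Normal" rule, as a sketch with hypothetical names. The source does not document how unexpected status codes are treated. Here an unknown code raises `KeyError` so that it fails loudly.

```python
SIREN_LEVELS = {"H": "Danger", "L": "Warning", "N": "Normal"}


def siren_level(status: str) -> str:
    """Map the siren status column (H/L/N) to a level; unknown codes raise KeyError."""
    return SIREN_LEVELS[status.strip()]


def siren_should_alert(level: str) -> bool:
    """Only non-Normal siren levels are forwarded to send_alert_to_laravel()."""
    return level != "Normal"
```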
## Station Types
The `stationtype` integer identifies the data source in notifications and alerts:
| stationtype | Data Source |
|------------|-------------|
| 1 | Rainfall |
| 2 | Water Level |
| 3 | Siren |
## Threshold Summary
### Rainfall Thresholds
| Condition | Level |
|-----------|-------|
| `hourlycumm >= 30` and `< 60` | Warning |
| `hourlycumm >= 60` | Danger |
### Water Level Thresholds
Thresholds are per-station values read from CSV columns 15-17 (`wlalert`, `wlwarn`, `wldgr`) of each line.
| Condition | Level |
|-----------|-------|
| `waterlevel >= wlalert` and `< wlwarn` | Alert |
| `waterlevel >= wlwarn` and `< wldgr` | Warning |
| `waterlevel >= wldgr` | Danger |
## Alert Notification Flow
When a threshold is triggered, the script calls `send_alert_to_laravel()`:
```python
import requests


def send_alert_to_laravel(stationid, level, stationtype):
    # Forward a triggered threshold to the Laravel AlertController
    payload = {
        "stationid": stationid,
        "level": level,
        "stationtype": stationtype,  # 1=rainfall, 2=waterlevel, 3=siren
    }
    # 5-second timeout so a slow API does not stall ingestion
    response = requests.post("https://sides.tck.com.my/api/alert", json=payload, timeout=5)
```
<!-- VERIFY: Alert API endpoint is https://sides.tck.com.my/api/alert -->
The full notification chain:
```
sidesdecode.py
| POST /api/alert {stationid, level, stationtype}
v
AlertController (Laravel)
| Builds notification title/body from station type and level
v
FcmService::sendToTopic()
| Routes to FCM topic by stationtype and level
| (e.g., rainfall_warning, rainfall_danger, waterlevel_alert, waterlevel_danger)
v
Firebase Cloud Messaging
| Push notification delivered to subscribed mobile devices
v
Mobile App
```
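Topic routing happens in Laravel's `FcmService`, not in the Python script. The sketch below only mirrors, in Python, the `<type>_<level>` naming pattern implied by the example topics. The `StationType` enum and `fcm_topic()` are hypothetical, and the siren topic names (e.g. `siren_danger`) are an assumption that the examples do not confirm.

```python
from enum import IntEnum


class StationType(IntEnum):
    """stationtype values used in notifications and alert payloads."""
    RAINFALL = 1
    WATERLEVEL = 2
    SIREN = 3


def fcm_topic(stationtype: int, level: str) -> str:
    """Hypothetical mirror of the topic naming: lower-case '<type>_<level>'."""
    return f"{StationType(stationtype).name.lower()}_{level.lower()}"
```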
## Deduplication
Before every INSERT, the script checks for an existing record:
- **Rainfall**: `SELECT COUNT(*) FROM rainfall WHERE stationid = %s AND timestamp = %s`
- **Water Level**: `SELECT COUNT(*) FROM waterlevel WHERE stationid = %s AND datetime = %s`
- **Siren**: `SELECT COUNT(*) FROM siren WHERE stationid = %s AND active_time = %s`
- **Notification**: `SELECT COUNT(*) FROM notification WHERE stationid = %s AND timestamp = %s AND stationtype = %s`
If a record exists, the INSERT is skipped. This makes the script safe to re-run for the same time period.
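A sketch of the check-then-insert pattern for rainfall, using psycopg2-style `%s` placeholders so that values are never interpolated into the SQL string. `insert_rainfall_if_new()` is a hypothetical helper. It takes any DB-API cursor (e.g. `conn.cursor()` from psycopg2) and a dict whose keys follow the column names in the CSV table.

```python
def insert_rainfall_if_new(cur, rec: dict) -> bool:
    """Insert one rainfall row unless station+timestamp already exists; True if inserted."""
    cur.execute(
        "SELECT COUNT(*) FROM rainfall WHERE stationid = %s AND timestamp = %s",
        (rec["station_id"], rec["timestamp"]),
    )
    (count,) = cur.fetchone()
    if count:
        return False  # duplicate: skip the INSERT
    cur.execute(
        "INSERT INTO rainfall (stationid, timestamp, anncum, daily, hourly, currentrf, battery) "
        "VALUES (%s, %s, %s, %s, %s, %s, %s)",
        (rec["station_id"], rec["timestamp"], rec["anncumm"], rec["dailycumm"],
         rec["hourlycumm"], rec["currrf"], rec["battery"]),
    )
    return True
```

This pattern is safe for sequential re-runs, but it is not atomic. Two overlapping runs could both see a count of zero and insert the same row. A unique constraint on `(stationid, timestamp)` combined with PostgreSQL's `INSERT ... ON CONFLICT DO NOTHING` would close that gap.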
## File Management (Commented Out)
Two file management functions are defined but currently commented out:
- `move_to_error_folder()` -- Move malformed files to an FTP error subfolder
- `move_to_success_folder()` -- Move processed files to a success archive subfolder
When enabled, these functions create the target FTP directory if it does not exist, upload the file there, and delete the original. Because both are disabled, files remain in the source FTP folder after processing.
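A sketch of what the disabled helpers describe (create the directory, upload, delete the original), written against the standard-library `ftplib` interface. `archive_file()` is a hypothetical name, not the commented-out code. It also assumes the server reports an existing directory as a permanent (5xx) error, which `ftplib` raises as `error_perm`.

```python
import ftplib
import io


def archive_file(ftp, filename: str, data: bytes, target_dir: str) -> None:
    """Copy a processed file into target_dir on the FTP server, then delete the original."""
    try:
        ftp.mkd(target_dir)
    except ftplib.error_perm:
        pass  # typically 550: the directory already exists
    ftp.storbinary(f"STOR {target_dir}/{filename}", io.BytesIO(data))
    ftp.delete(filename)
```

`FTP.rename()` could move the file in one command instead of re-uploading it. The sketch follows the upload-then-delete sequence described above.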
## Error Handling
- Malformed lines (fewer than 25 columns) are skipped with a log message
- Any exception during line processing triggers `conn.rollback()` to prevent partial inserts
- Alert sending failures are caught and logged but do not halt processing
- The script closes both FTP and database connections on exit
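The rollback behaviour can be sketched as a loop that isolates each line in its own transaction. This assumes a commit after every successfully processed line, which the source implies but does not confirm. `process_file()` is hypothetical, and `process_line` is passed in as a callable.

```python
import logging

log = logging.getLogger("sidesdecode")


def process_file(conn, lines, process_line) -> int:
    """Process lines one at a time; returns how many were committed."""
    committed = 0
    for lineno, line in enumerate(lines, start=1):
        try:
            process_line(conn, line)
            conn.commit()
            committed += 1
        except Exception:
            # Undo this line's partial inserts, log the traceback, and keep going.
            conn.rollback()
            log.exception("line %d failed; rolled back", lineno)
    return committed
```

Committing per line means one bad line cannot discard earlier good ones. The trade-off is the partial-processing risk listed under Known Issues.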
## Known Issues
1. **No file archiving** -- `move_to_error_folder` and `move_to_success_folder` are commented out, so files are never moved after processing
2. **No broader deduplication** -- Deduplication only checks exact station+timestamp matches; no handling for near-duplicate records
3. **No connection retry** -- If FTP or PostgreSQL is unreachable, the script fails immediately with no retry logic
4. **Partial processing risk** -- If the script crashes mid-file, lines already processed are committed but remaining lines are lost until the next run