<!-- generated-by: gsd-doc-writer -->
# Data Pipeline: Python Autoscript

## Overview

`autoscript/sidesdecode.py` is the data ingestion pipeline that:

1. Connects to an FTP server where telemetry stations upload CSV data
2. Downloads and parses CSV files for the current day
3. Inserts rainfall, water level, and siren data into PostgreSQL
4. Triggers push notifications when thresholds are exceeded

The script is designed to run on a schedule (e.g., cron job), processing new files uploaded by remote telemetry stations throughout the day.

## Environment Variables

All connection settings, including credentials, are read from environment variables and fall back to these defaults:

| Variable | Default | Description |
|----------|---------|-------------|
| `FTP_SERVER` | `myvscada.com` | FTP server hostname |
| `FTP_USERNAME` | `tck` | FTP login username |
| `FTP_PASSWORD` | *(empty)* | FTP login password |
| `PG_HOST` | `postgres` | PostgreSQL host (`postgres` on Docker network, `localhost` on Docker host) |
| `PG_DATABASE` | `sides_db` | PostgreSQL database name |
| `PG_USER` | `tck` | PostgreSQL username |
| `PG_PASSWORD` | *(empty)* | PostgreSQL password |

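A minimal sketch of how these settings might be read, assuming plain `os.environ` lookups with the defaults from the table. The `load_settings` helper and the `DEFAULTS` dictionary are hypothetical names used for illustration, not taken from `sidesdecode.py`.

```python
import os

# Defaults mirror the table above; passwords default to an empty string.
DEFAULTS = {
    "FTP_SERVER": "myvscada.com",
    "FTP_USERNAME": "tck",
    "FTP_PASSWORD": "",
    "PG_HOST": "postgres",
    "PG_DATABASE": "sides_db",
    "PG_USER": "tck",
    "PG_PASSWORD": "",
}


def load_settings(environ=None):
    """Return a settings dict: environment values win, table defaults fill the gaps."""
    env = os.environ if environ is None else environ
    return {name: env.get(name, default) for name, default in DEFAULTS.items()}
```

Passing a dictionary instead of the real environment, for example `load_settings({"PG_HOST": "localhost"})`, makes the override behaviour easy to check when running on the Docker host.
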
## FTP Connection

The script connects to the configured FTP server and navigates to today's date folder:

```
Path: files/SIDES/SUCCESS/{year}/{month}/{day}/
```

Example path for 21 May 2026: `files/SIDES/SUCCESS/2026/05/21/`

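The path template can be sketched as a small helper. This assumes a four-digit year and zero-padded month and day, as in the example path; `remote_dir_for` is a hypothetical name, not a function from the script.

```python
from datetime import date


def remote_dir_for(day):
    """Build the FTP folder for one day, zero-padding month and day."""
    return f"files/SIDES/SUCCESS/{day:%Y}/{day:%m}/{day:%d}/"


print(remote_dir_for(date(2026, 5, 21)))  # files/SIDES/SUCCESS/2026/05/21/
```
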
### File Filtering

- Skips files containing "rf" in the filename (Tideda format files)
- Only processes files with today's date in `yymmdd` format in the filename

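The two rules above might look like this in code. This is a sketch only: `should_process` is a hypothetical helper, and the case-sensitive substring match is an assumption about how the script compares filenames.

```python
from datetime import date


def should_process(filename, today):
    """Apply the two filename rules: no "rf" marker, and today's yymmdd stamp present."""
    if "rf" in filename:  # Tideda-format files are skipped
        return False
    return today.strftime("%y%m%d") in filename
```

With `today = date(2026, 5, 21)`, a made-up name such as `KBLG0026_260521.csv` passes, while a name containing `rf` or a different date stamp is rejected.
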
## CSV Format

Each line in the CSV file contains 37+ comma-separated columns. The script requires at least 25 columns per line; shorter lines are skipped.

Key columns extracted:

| Column Index | Variable | Description |
|-------------|----------|-------------|
| 1 | `station_id` | Station identifier (e.g., `KBLG0026`) |
| 4 | `timestamp` | Timestamp in `yymmddHHMMSS` format |
| 6 | `battery` | Battery voltage |
| 15 | `wlalert` | Water level alert threshold |
| 16 | `wlwarn` | Water level warning threshold |
| 17 | `wldgr` | Water level danger threshold |
| 18 | `sirenid` | Siren identifier |
| 19 | `siren` | Siren status: `H`=Danger, `L`=Warning, `N`=Normal |
| 21 | `anncumm` | Annual cumulative rainfall |
| 22 | `dailycumm` | Daily cumulative rainfall |
| 23 | `hourlycumm` | Hourly rainfall |
| 24 | `currrf` | Current rainfall |
| 36 | `waterlevel` | Current water level reading |

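A rough sketch of extracting these columns from one line. It assumes the indices in the table are zero-based and that values arrive as plain strings; `parse_line`, `COLUMNS`, and `MIN_COLUMNS` are hypothetical names rather than the script's own.

```python
from datetime import datetime

# Column positions from the table above (assumed zero-based).
COLUMNS = {
    "station_id": 1, "timestamp": 4, "battery": 6,
    "wlalert": 15, "wlwarn": 16, "wldgr": 17,
    "sirenid": 18, "siren": 19,
    "anncumm": 21, "dailycumm": 22, "hourlycumm": 23, "currrf": 24,
    "waterlevel": 36,
}
MIN_COLUMNS = 25


def parse_line(line):
    """Return the named fields of one CSV line, or None if the line is too short."""
    parts = line.strip().split(",")
    if len(parts) < MIN_COLUMNS:
        return None
    # A column past the end of a short-but-valid line comes back as None.
    row = {name: (parts[i] if i < len(parts) else None) for name, i in COLUMNS.items()}
    row["timestamp"] = datetime.strptime(row["timestamp"], "%y%m%d%H%M%S")
    return row
```

Under these assumptions, a line with between 25 and 36 columns is accepted but yields `None` for `waterlevel`, which matches the "waterlevel not null?" check in the processing logic.
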
## Data Processing Logic

The `process_line()` function handles each CSV line. All database operations use `psycopg2` parameterized queries to prevent SQL injection.

### Rainfall Data

```
CSV line
  |
  v
[dailycumm or hourlycumm not null?]
  |-- No --> skip
  |-- Yes
  v
[record exists for station+timestamp?]
  |-- Yes --> skip insert
  |-- No --> INSERT INTO rainfall (stationid, timestamp, anncum, daily, hourly, currentrf, battery)
  |
  v
[hourlycumm >= 30?]
  |-- No --> done
  |-- Yes
  |
  v
Determine level:
  30 <= hourly < 60 --> Warning
  hourly >= 60 --> Danger
  |
  v
[notification exists for station+timestamp+stationtype=1?]
  |-- Yes --> skip insert
  |-- No --> INSERT INTO notification (stationid, timestamp, stationtype=1, level, active_time)
  |
  v
send_alert_to_laravel(station_id, level, 1)
```

### Water Level Data

```
CSV line
  |
  v
[waterlevel not null?]
  |-- No --> skip
  |-- Yes
  v
[record exists for station+datetime?]
  |-- Yes --> skip insert
  |-- No --> INSERT INTO waterlevel (stationid, datetime, waterlevel, alert, warning, danger)
  |
  v
[waterlevel >= wlalert?]
  |-- No --> done
  |-- Yes
  |
  v
Determine level:
  alert <= wl < warning --> Alert
  warning <= wl < danger --> Warning
  wl >= danger --> Danger
  |
  v
[notification exists for station+timestamp+stationtype=2?]
  |-- Yes --> skip insert
  |-- No --> INSERT INTO notification (stationid, timestamp, stationtype=2, level, active_time)
  |
  v
send_alert_to_laravel(station_id, level, 2)
```

### Siren Data

```
CSV line
  |
  v
[sirenid not null?]
  |-- No --> skip
  |-- Yes
  v
Determine level from siren status:
  H --> Danger
  L --> Warning
  N --> Normal
  |
  v
[record exists for station+active_time?]
  |-- Yes --> skip insert
  |-- No --> INSERT INTO siren (stationid, stationtype=3, active_time, level)
  |
  v
[level != Normal?]
  |-- No --> done
  |-- Yes --> send_alert_to_laravel(station_id, level, 3)
```

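The siren status mapping and the final alert check can be sketched as follows. `siren_level` and `should_alert` are hypothetical helpers, and returning `None` for an unrecognised status code is an assumption; the flow above does not say how such codes are handled.

```python
SIREN_LEVELS = {"H": "Danger", "L": "Warning", "N": "Normal"}


def siren_level(status):
    """Translate a siren status code into a level name; unknown codes give None."""
    return SIREN_LEVELS.get(status)


def should_alert(level):
    """Alert only for a known level other than Normal."""
    return level is not None and level != "Normal"
```
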
## Station Types

The `stationtype` integer identifies the data source in notifications and alerts:

| stationtype | Data Source |
|------------|-------------|
| 1 | Rainfall |
| 2 | Water Level |
| 3 | Siren |

## Threshold Summary

### Rainfall Thresholds

| Condition | Level |
|-----------|-------|
| `30 <= hourlycumm < 60` | Warning |
| `hourlycumm >= 60` | Danger |

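As an illustration, the rainfall table translates into a short classifier. `rainfall_level` is a hypothetical name, and the sketch assumes `hourlycumm` has already been converted to a number.

```python
def rainfall_level(hourlycumm):
    """Return "Warning", "Danger", or None when hourly rainfall is below 30."""
    if hourlycumm >= 60:
        return "Danger"
    if hourlycumm >= 30:
        return "Warning"
    return None
```

Checking the higher threshold first keeps the two ranges from overlapping without spelling out the upper bound of the Warning band.
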
### Water Level Thresholds

Thresholds are per-station values from CSV columns 15-17.

| Condition | Level |
|-----------|-------|
| `wlalert <= waterlevel < wlwarn` | Alert |
| `wlwarn <= waterlevel < wldgr` | Warning |
| `waterlevel >= wldgr` | Danger |

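The water level table can be sketched the same way. `waterlevel_level` is a hypothetical helper; it assumes numeric inputs and that each station's thresholds satisfy `wlalert <= wlwarn <= wldgr`.

```python
def waterlevel_level(waterlevel, wlalert, wlwarn, wldgr):
    """Return "Alert", "Warning", "Danger", or None when below the alert threshold."""
    if waterlevel >= wldgr:
        return "Danger"
    if waterlevel >= wlwarn:
        return "Warning"
    if waterlevel >= wlalert:
        return "Alert"
    return None
```
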
## Alert Notification Flow

When a rainfall or water level threshold is exceeded, or a siren reports a status other than Normal, the script calls `send_alert_to_laravel()`:

```python
import requests


def send_alert_to_laravel(stationid, level, stationtype):
    # Forward a triggered alert to the Laravel API.
    payload = {
        "stationid": stationid,
        "level": level,
        "stationtype": stationtype,  # 1=rainfall, 2=waterlevel, 3=siren
    }
    response = requests.post("https://sides.tck.com.my/api/alert", json=payload, timeout=5)
```

<!-- VERIFY: Alert API endpoint is https://sides.tck.com.my/api/alert -->

The full notification chain:

```
sidesdecode.py
  |  POST /api/alert {stationid, level, stationtype}
  v
AlertController (Laravel)
  |  Builds notification title/body from station type and level
  v
FcmService::sendToTopic()
  |  Routes to FCM topic by stationtype and level
  |  (e.g., rainfall_warning, rainfall_danger, waterlevel_alert, waterlevel_danger)
  v
Firebase Cloud Messaging
  |  Push notification delivered to subscribed mobile devices
  v
Mobile App
```

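The topic routing itself lives in the Laravel `FcmService`, but the naming pattern suggested by the example topics can be illustrated in Python. This is a guess at the convention, not the service's actual code: `topic_for` is hypothetical, and the `siren` prefix for station type 3 is an assumption, since no siren topic name is listed above.

```python
# Prefixes for types 1 and 2 follow the example topic names above;
# the "siren" prefix for type 3 is an assumption.
TOPIC_PREFIXES = {1: "rainfall", 2: "waterlevel", 3: "siren"}


def topic_for(stationtype, level):
    """Combine the station type prefix and the lowercased level into a topic name."""
    return f"{TOPIC_PREFIXES[stationtype]}_{level.lower()}"
```
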
## Deduplication

Before every INSERT, the script checks for an existing record:

- **Rainfall**: `SELECT COUNT(*) FROM rainfall WHERE stationid = %s AND timestamp = %s`
- **Water Level**: `SELECT COUNT(*) FROM waterlevel WHERE stationid = %s AND datetime = %s`
- **Siren**: `SELECT COUNT(*) FROM siren WHERE stationid = %s AND active_time = %s`
- **Notification**: `SELECT COUNT(*) FROM notification WHERE stationid = %s AND timestamp = %s AND stationtype = %s`

If a record exists, the INSERT is skipped. This makes the script safe to re-run for the same time period.

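A sketch of how one of these checks might be wrapped, using only the standard DB-API cursor methods `execute()` and `fetchone()` that `psycopg2` cursors provide. `record_exists` and `EXISTS_QUERIES` are hypothetical names; the SQL strings are the ones listed above for the two-parameter checks.

```python
EXISTS_QUERIES = {
    "rainfall": "SELECT COUNT(*) FROM rainfall WHERE stationid = %s AND timestamp = %s",
    "waterlevel": "SELECT COUNT(*) FROM waterlevel WHERE stationid = %s AND datetime = %s",
    "siren": "SELECT COUNT(*) FROM siren WHERE stationid = %s AND active_time = %s",
}


def record_exists(cursor, table, stationid, moment):
    """Run the table's COUNT(*) check; values travel as parameters, never inside the SQL text."""
    cursor.execute(EXISTS_QUERIES[table], (stationid, moment))
    return cursor.fetchone()[0] > 0
```

Because the table name comes from a fixed dictionary and the values are passed as a parameter tuple, no user-controlled text is ever formatted into the query string.
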
## File Management (Commented Out)

Two file management functions are defined but currently commented out:

- `move_to_error_folder()` -- Move malformed files to an FTP error subfolder
- `move_to_success_folder()` -- Move processed files to a success archive subfolder

When active, these functions create the target FTP directory if it does not exist, upload the file, and delete the original. Because they are disabled, processed files remain in the source FTP folder after processing.

## Error Handling

- Malformed lines (fewer than 25 columns) are skipped with a log message
- Any exception during line processing triggers `conn.rollback()` to prevent partial inserts
- Alert sending failures are caught and logged but do not halt processing
- The script closes both FTP and database connections on exit

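The rollback behaviour might be structured roughly like this. The sketch assumes one commit per successfully processed line and that the loop continues after a failed line; `process_lines`, the `process_line(conn, line)` signature, and the logger name are assumptions made for illustration.

```python
import logging

log = logging.getLogger("sidesdecode")


def process_lines(conn, lines, process_line):
    """Process lines one by one; a failing line is rolled back and the loop continues."""
    processed = 0
    for line in lines:
        try:
            process_line(conn, line)
            conn.commit()
            processed += 1
        except Exception:
            conn.rollback()  # discard any partial inserts from this line
            log.exception("Failed to process line: %r", line)
    return processed
```

Committing per line keeps a single bad record from discarding the rows that were already inserted before it.
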
## Known Issues

1. **No file archiving** -- `move_to_error_folder` and `move_to_success_folder` are commented out, so files are never moved after processing
2. **No broader deduplication** -- Deduplication only checks exact station+timestamp matches; no handling for near-duplicate records
3. **No connection retry** -- If FTP or PostgreSQL is unreachable, the script fails immediately with no retry logic
4. **Partial processing risk** -- If the script crashes mid-file, lines already processed stay committed, but the remaining lines are not ingested until a later run processes the file again