Data Pipeline: Python Autoscript
Overview
autoscript/sidesdecode.py is the data ingestion pipeline that:
- Connects to an FTP server where telemetry stations upload CSV data
- Downloads and parses CSV files for the current day
- Inserts rainfall, water level, and siren data into PostgreSQL
- Triggers push notifications when thresholds are exceeded
The script is designed to run on a schedule (e.g., cron job), processing new files uploaded by remote telemetry stations throughout the day.
Environment Variables
All connection settings, including credentials, are read from environment variables. When a variable is unset, the default below is used:
| Variable | Default | Description |
|---|---|---|
| `FTP_SERVER` | `myvscada.com` | FTP server hostname |
| `FTP_USERNAME` | `tck` | FTP login username |
| `FTP_PASSWORD` | (empty) | FTP login password |
| `PG_HOST` | `postgres` | PostgreSQL host (`postgres` on the Docker network, `localhost` from the Docker host) |
| `PG_DATABASE` | `sides_db` | PostgreSQL database name |
| `PG_USER` | `tck` | PostgreSQL username |
| `PG_PASSWORD` | (empty) | PostgreSQL password |
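A minimal sketch of loading these settings with the documented defaults, using only `os.environ.get`. The `load_config` helper and its dictionary keys are hypothetical; `sidesdecode.py` may read the variables inline instead.

```python
import os


def load_config(env=None):
    """Read connection settings, falling back to the documented defaults.

    Hypothetical helper: the name and return shape are illustrative only.
    Passing `env` lets tests supply a plain dict instead of os.environ.
    """
    env = os.environ if env is None else env
    return {
        "ftp_server": env.get("FTP_SERVER", "myvscada.com"),
        "ftp_username": env.get("FTP_USERNAME", "tck"),
        "ftp_password": env.get("FTP_PASSWORD", ""),
        "pg_host": env.get("PG_HOST", "postgres"),
        "pg_database": env.get("PG_DATABASE", "sides_db"),
        "pg_user": env.get("PG_USER", "tck"),
        "pg_password": env.get("PG_PASSWORD", ""),
    }


if __name__ == "__main__":
    cfg = load_config()
    # Print only non-secret settings; never log passwords.
    print(cfg["ftp_server"], cfg["pg_host"], cfg["pg_database"])
```

Running the script directly on the Docker host would typically need `PG_HOST=localhost`, per the table above.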
FTP Connection
The script connects to the configured FTP server and navigates to today's date folder:
Path: `files/SIDES/SUCCESS/{year}/{month}/{day}/`

Example path for 21 May 2026: `files/SIDES/SUCCESS/2026/05/21/`
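A short sketch of the path construction, assuming zero-padded month and day as in the example path. `ftp_day_path` is a hypothetical helper name; with `ftplib`, the result would be passed to `FTP.cwd()`.

```python
from datetime import date


def ftp_day_path(day):
    """Build the per-day FTP folder (hypothetical helper).

    Month and day are zero-padded to match the documented example path.
    """
    return f"files/SIDES/SUCCESS/{day.year}/{day.month:02d}/{day.day:02d}/"


print(ftp_day_path(date(2026, 5, 21)))  # files/SIDES/SUCCESS/2026/05/21/
```

In the script itself the argument would be today's date, e.g. `date.today()`.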
File Filtering
- Skips files containing `rf` in the filename (Tideda format files)
- Only processes files whose filename contains today's date in `yymmdd` format
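The two filename rules can be sketched as a single predicate. This assumes the `rf` check is a case-sensitive substring match and that the `yymmdd` stamp may appear anywhere in the name; the filenames in the example are invented for illustration.

```python
from datetime import date


def should_process(filename, today):
    """Apply the documented filename filters (hypothetical helper).

    Assumptions: "rf" is matched case-sensitively as a substring, and the
    yymmdd date stamp can appear anywhere in the filename.
    """
    if "rf" in filename:
        return False  # Tideda-format file: skipped
    return today.strftime("%y%m%d") in filename


today = date(2026, 5, 21)
files = ["KBLG0026_260521.csv", "KBLG0026_rf_260521.csv", "KBLG0026_260520.csv"]
print([f for f in files if should_process(f, today)])  # ['KBLG0026_260521.csv']
```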
CSV Format
Each line in the CSV file contains 37+ comma-separated columns. The script requires at least 25 columns per line; shorter lines are skipped.
Key columns extracted:
| Column Index | Variable | Description |
|---|---|---|
| 1 | `station_id` | Station identifier (e.g., `KBLG0026`) |
| 4 | `timestamp` | Timestamp in `yymmddHHMMSS` format |
| 6 | `battery` | Battery voltage |
| 15 | `wlalert` | Water level alert threshold |
| 16 | `wlwarn` | Water level warning threshold |
| 17 | `wldgr` | Water level danger threshold |
| 18 | `sirenid` | Siren identifier |
| 19 | `siren` | Siren status: `H` = Danger, `L` = Warning, `N` = Normal |
| 21 | `anncumm` | Annual cumulative rainfall |
| 22 | `dailycumm` | Daily cumulative rainfall |
| 23 | `hourlycumm` | Hourly rainfall |
| 24 | `currrf` | Current rainfall |
| 36 | `waterlevel` | Current water level reading |
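A sketch of extracting the key columns from one line, treating the indexes in the table as list positions after splitting on commas. It assumes fields are never quoted, maps empty cells to `None` (one reading of "not null"), and returns `None` for any column past the end of a short line; how `sidesdecode.py` handles that last case is not documented here.

```python
MIN_COLUMNS = 25

# Column positions from the table (list indexes after splitting on ",").
COLUMNS = {
    "station_id": 1, "timestamp": 4, "battery": 6,
    "wlalert": 15, "wlwarn": 16, "wldgr": 17,
    "sirenid": 18, "siren": 19,
    "anncumm": 21, "dailycumm": 22, "hourlycumm": 23, "currrf": 24,
    "waterlevel": 36,
}


def extract_fields(line):
    """Return the key columns of one CSV line, or None if it is too short.

    Assumptions: no quoted fields; empty cells become None; columns beyond
    the end of the line (e.g. waterlevel on a 25-column line) become None.
    """
    cells = [c.strip() for c in line.rstrip("\r\n").split(",")]
    if len(cells) < MIN_COLUMNS:
        return None  # malformed: the script skips and logs these
    return {
        name: (cells[i] or None) if i < len(cells) else None
        for name, i in COLUMNS.items()
    }
```

Values stay as strings here; numeric conversion would happen before threshold comparisons.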
Data Processing Logic
The process_line() function handles each CSV line. All database operations use psycopg2 parameterized queries to prevent SQL injection.
Rainfall Data
```
CSV line
   |
   v
[dailycumm or hourlycumm not null?]
   |-- No --> skip
   |-- Yes
   v
[record exists for station+timestamp?]
   |-- Yes --> skip insert
   |-- No --> INSERT INTO rainfall (stationid, timestamp, anncum, daily, hourly, currentrf, battery)
   |
   v
[hourlycumm >= 30?]
   |-- No --> done
   |-- Yes
   |
   v
Determine level:
   30 <= hourly < 60 --> Warning
   hourly >= 60      --> Danger
   |
   v
[notification exists for station+timestamp+stationtype=1?]
   |-- Yes --> skip insert
   |-- No --> INSERT INTO notification (stationid, timestamp, stationtype=1, level, active_time)
   |
   v
send_alert_to_laravel(station_id, level, 1)
```
Water Level Data
```
CSV line
   |
   v
[waterlevel not null?]
   |-- No --> skip
   |-- Yes
   v
[record exists for station+datetime?]
   |-- Yes --> skip insert
   |-- No --> INSERT INTO waterlevel (stationid, datetime, waterlevel, alert, warning, danger)
   |
   v
[waterlevel >= wlalert?]
   |-- No --> done
   |-- Yes
   |
   v
Determine level:
   alert <= wl < warning  --> Alert
   warning <= wl < danger --> Warning
   wl >= danger           --> Danger
   |
   v
[notification exists for station+timestamp+stationtype=2?]
   |-- Yes --> skip insert
   |-- No --> INSERT INTO notification (stationid, timestamp, stationtype=2, level, active_time)
   |
   v
send_alert_to_laravel(station_id, level, 2)
```
Siren Data
```
CSV line
   |
   v
[sirenid not null?]
   |-- No --> skip
   |-- Yes
   v
Determine level from siren status:
   H --> Danger
   L --> Warning
   N --> Normal
   |
   v
[record exists for station+active_time?]
   |-- Yes --> skip insert
   |-- No --> INSERT INTO siren (stationid, stationtype=3, active_time, level)
   |
   v
[level != Normal?]
   |-- No --> done
   |-- Yes --> send_alert_to_laravel(station_id, level, 3)
```
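The siren branch reduces to a lookup plus one check. In this sketch, unknown status codes map to `None` and never alert; the real script's handling of unexpected codes is not documented, so that part is an assumption, as are the helper names.

```python
SIREN_LEVELS = {"H": "Danger", "L": "Warning", "N": "Normal"}


def siren_level(status):
    """Map the siren status column (index 19) to a level name.

    Assumption: unknown or empty codes return None.
    """
    return SIREN_LEVELS.get(status.strip()) if status else None


def siren_needs_alert(level):
    # Only non-Normal sirens trigger send_alert_to_laravel(..., 3).
    return level is not None and level != "Normal"
```

Note that the siren row is stored for every level, including Normal; only the alert call is conditional.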
Station Types
The stationtype integer identifies the data source in notifications and alerts:
| stationtype | Data Source |
|---|---|
| 1 | Rainfall |
| 2 | Water Level |
| 3 | Siren |
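If Python code needs these values, an `IntEnum` keeps the integers explicit without magic numbers. The `StationType` class is a hypothetical convenience, not something the script is documented to define.

```python
from enum import IntEnum


class StationType(IntEnum):
    """stationtype values used in notification rows and alert payloads."""
    RAINFALL = 1
    WATER_LEVEL = 2
    SIREN = 3


# Convert with int() before using the value as a query parameter or in a
# payload, so the wire value is always a plain integer.
print(int(StationType.SIREN))  # 3
print(StationType(2).name)  # WATER_LEVEL
```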
Threshold Summary
Rainfall Thresholds
| Condition | Level |
|---|---|
| `hourlycumm >= 30` and `< 60` | Warning |
| `hourlycumm >= 60` | Danger |
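The rainfall table translates directly into a classifier. Checking the higher band first guarantees each value gets exactly one level. `rainfall_level` is a hypothetical name, and values below 30 return `None`, meaning no notification.

```python
def rainfall_level(hourlycumm):
    """Classify hourly rainfall against the fixed thresholds.

    Returns None when there is no value or it is below 30 (no notification).
    """
    if hourlycumm is None:
        return None
    if hourlycumm >= 60:
        return "Danger"
    if hourlycumm >= 30:
        return "Warning"
    return None


for value in (12.0, 30.0, 59.9, 60.0):
    print(value, rainfall_level(value))
```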
Water Level Thresholds
Thresholds are per-station values from CSV columns 15-17.
| Condition | Level |
|---|---|
| `waterlevel >= wlalert` and `< wlwarn` | Alert |
| `waterlevel >= wlwarn` and `< wldgr` | Warning |
| `waterlevel >= wldgr` | Danger |
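The same pattern applies to water level, except the thresholds come from each line rather than constants. This sketch assumes the station's thresholds are ordered `wlalert < wlwarn < wldgr`; the helper name is hypothetical.

```python
def waterlevel_level(wl, wlalert, wlwarn, wldgr):
    """Classify a reading against the station's own thresholds (columns 15-17).

    Assumes wlalert < wlwarn < wldgr. Checks from the highest band down so
    each reading maps to exactly one level.
    """
    if wl >= wldgr:
        return "Danger"
    if wl >= wlwarn:
        return "Warning"
    if wl >= wlalert:
        return "Alert"
    return None  # below wlalert: row is stored, but no notification
```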
Alert Notification Flow
When a threshold is triggered, the script calls send_alert_to_laravel():
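```python
import logging
import requests

def send_alert_to_laravel(stationid, level, stationtype):
    payload = {
        "stationid": stationid,
        "level": level,
        "stationtype": stationtype,  # 1=rainfall, 2=waterlevel, 3=siren
    }
    try:
        requests.post("https://sides.tck.com.my/api/alert", json=payload, timeout=5)
    except requests.RequestException as exc:
        logging.error("Alert for %s failed: %s", stationid, exc)  # logged, processing continues
```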
The full notification chain:
```
sidesdecode.py
   |  POST /api/alert {stationid, level, stationtype}
   v
AlertController (Laravel)
   |  Builds notification title/body from station type and level
   v
FcmService::sendToTopic()
   |  Routes to FCM topic by stationtype and level
   |  (e.g., rainfall_warning, rainfall_danger, waterlevel_alert, waterlevel_danger)
   v
Firebase Cloud Messaging
   |  Push notification delivered to subscribed mobile devices
   v
Mobile App
```
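Topic routing lives in the Laravel `FcmService` (PHP), not in this script. The Python below only illustrates the naming pattern implied by the example topics, which can help when checking what a test device should subscribe to. The `siren` prefix is an assumption, because only rainfall and water level topics appear in the examples.

```python
# Hypothetical mapping: "rainfall" and "waterlevel" match the documented
# example topics; "siren" is an assumed prefix.
TOPIC_PREFIX = {1: "rainfall", 2: "waterlevel", 3: "siren"}


def fcm_topic(stationtype, level):
    """Derive a topic name following the documented examples."""
    return f"{TOPIC_PREFIX[stationtype]}_{level.lower()}"


print(fcm_topic(1, "Warning"))  # rainfall_warning
```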
Deduplication
Before every INSERT, the script checks for an existing record:
- Rainfall: `SELECT COUNT(*) FROM rainfall WHERE stationid = %s AND timestamp = %s`
- Water level: `SELECT COUNT(*) FROM waterlevel WHERE stationid = %s AND datetime = %s`
- Siren: `SELECT COUNT(*) FROM siren WHERE stationid = %s AND active_time = %s`
- Notification: `SELECT COUNT(*) FROM notification WHERE stationid = %s AND timestamp = %s AND stationtype = %s`
If a record exists, the INSERT is skipped. This makes the script safe to re-run for the same time period.
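A sketch of the check-then-insert pattern for the rainfall table, with every value passed as a query parameter rather than formatted into the SQL string. `cur` is any DB-API cursor such as one from psycopg2; the function name and the `row` dictionary shape are illustrative assumptions.

```python
def insert_rainfall_if_absent(cur, row):
    """Insert one rainfall row unless station+timestamp already exists.

    Hypothetical helper. Values go through %s placeholders, so the driver
    handles quoting and escaping.
    """
    cur.execute(
        "SELECT COUNT(*) FROM rainfall WHERE stationid = %s AND timestamp = %s",
        (row["stationid"], row["timestamp"]),
    )
    if cur.fetchone()[0] > 0:
        return False  # duplicate: skip the INSERT
    cur.execute(
        "INSERT INTO rainfall (stationid, timestamp, anncum, daily, hourly, currentrf, battery)"
        " VALUES (%s, %s, %s, %s, %s, %s, %s)",
        (row["stationid"], row["timestamp"], row["anncum"], row["daily"],
         row["hourly"], row["currentrf"], row["battery"]),
    )
    return True
```

COUNT-then-INSERT is not atomic: two overlapping runs could both see zero rows. A unique constraint combined with PostgreSQL's `INSERT ... ON CONFLICT DO NOTHING` would enforce the same rule inside the database.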
File Management (Commented Out)
Two file management functions are defined but currently commented out:
- `move_to_error_folder()` -- Moves malformed files to an FTP error subfolder
- `move_to_success_folder()` -- Moves processed files to a success archive subfolder

When active, these functions create the target FTP directory if it does not exist, upload the file, and delete the original. They are currently disabled, so processed files remain in the source FTP folder.
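The described move (create directory, upload, delete) can be sketched with the standard `ftplib` calls `mkd`, `storbinary` and `delete`. This is a hypothetical re-creation, not the commented-out code itself; it assumes the file's bytes are already in memory from the earlier download.

```python
import io
from ftplib import FTP, error_perm


def move_to_folder(ftp: FTP, filename: str, data: bytes, target_dir: str) -> None:
    """Archive one file: ensure target_dir exists, upload a copy, delete the original.

    Hypothetical helper; `data` is the file content already downloaded.
    """
    try:
        ftp.mkd(target_dir)
    except error_perm:
        pass  # many servers reply 550 when the directory already exists
    ftp.storbinary(f"STOR {target_dir}/{filename}", io.BytesIO(data))
    ftp.delete(filename)  # only reached if the upload succeeded
```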
Error Handling
- Malformed lines (fewer than 25 columns) are skipped with a log message.
- Any exception during line processing triggers `conn.rollback()` to prevent partial inserts.
- Alert-sending failures are caught and logged but do not halt processing.
- The script closes both the FTP and database connections on exit.
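A sketch of the per-line error isolation described above. `process_line(cur, fields)` stands in for the script's own function, and its signature here is an assumption, as is committing after every line (suggested by the "Partial processing risk" note).

```python
import logging

MIN_COLUMNS = 25


def process_lines(conn, lines, process_line):
    """Process one file line by line so a bad line cannot poison the rest.

    Assumptions: process_line(cur, fields) is the per-line handler, and
    each successful line is committed on its own.
    """
    cur = conn.cursor()
    for number, line in enumerate(lines, start=1):
        fields = line.rstrip("\r\n").split(",")
        if len(fields) < MIN_COLUMNS:
            logging.warning("Line %d skipped: only %d columns", number, len(fields))
            continue
        try:
            process_line(cur, fields)
            conn.commit()
        except Exception:
            conn.rollback()  # discard this line's partial inserts
            logging.exception("Line %d failed; rolled back", number)
```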
Known Issues
- No file archiving -- `move_to_error_folder` and `move_to_success_folder` are commented out, so files are never moved after processing.
- No broader deduplication -- Deduplication only checks exact station+time matches (`timestamp`, `datetime`, or `active_time`); near-duplicate records are not detected.
- No connection retry -- If FTP or PostgreSQL is unreachable, the script fails immediately with no retry logic.
- Partial processing risk -- If the script crashes mid-file, lines already processed are committed, but the remaining lines are not ingested until the next run.