sides/docs/04-DATA-PIPELINE.md
root 6863f39a24 docs: rewrite all documentation to reflect current state
- Remove adminer references (service was removed)
- Remove mermaid diagrams (ASCII only)
- Remove hardcoded credentials (use env var references)
- Update all Docker references to 4-container setup (app, postgres, web, pgadmin)
- Document env-based admin credentials (ADMIN_EMAIL/ADMIN_PASSWORD)
- Document parameterized queries (SQL injection fixed)
- Document FCM topic routing by stationtype+level
- Document siren stationtype=3 fix in sidesdecode.py
- Document idempotent seeder (firstOrCreate)
- Document reverse proxy setup in deployment guide
- Remove Makefile references (Docker Compose only)
2026-05-21 02:59:32 +08:00


Data Pipeline: Python Autoscript

Overview

autoscript/sidesdecode.py is the data ingestion pipeline that:

  1. Connects to an FTP server where telemetry stations upload CSV data
  2. Downloads and parses CSV files for the current day
  3. Inserts rainfall, water level, and siren data into PostgreSQL
  4. Triggers push notifications when thresholds are exceeded

The script is designed to run on a schedule (e.g., cron job), processing new files uploaded by remote telemetry stations throughout the day.

Environment Variables

All connection settings are read from environment variables, with these defaults:

| Variable | Default | Description |
|---|---|---|
| FTP_SERVER | myvscada.com | FTP server hostname |
| FTP_USERNAME | tck | FTP login username |
| FTP_PASSWORD | (empty) | FTP login password |
| PG_HOST | postgres | PostgreSQL host: postgres inside the Docker network, localhost when running on the Docker host |
| PG_DATABASE | sides_db | PostgreSQL database name |
| PG_USER | tck | PostgreSQL username |
| PG_PASSWORD | (empty) | PostgreSQL password |
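Here is a minimal sketch of reading these settings with the standard library. The DEFAULTS dictionary and the load_config() helper are illustrative assumptions. They do not necessarily match how sidesdecode.py organizes its configuration.

```python
import os

# Defaults mirror the table above; both passwords default to empty strings.
DEFAULTS = {
    "FTP_SERVER": "myvscada.com",
    "FTP_USERNAME": "tck",
    "FTP_PASSWORD": "",
    "PG_HOST": "postgres",
    "PG_DATABASE": "sides_db",
    "PG_USER": "tck",
    "PG_PASSWORD": "",
}


def load_config(environ=os.environ):
    """Return each setting from the environment, falling back to its default."""
    return {name: environ.get(name, default) for name, default in DEFAULTS.items()}


if __name__ == "__main__":
    cfg = load_config()
    print(cfg["FTP_SERVER"], cfg["PG_HOST"])  # never print the passwords
```

A default applies only when its variable is unset. To run directly on the Docker host, you would set PG_HOST=localhost along with the two passwords.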

FTP Connection

The script connects to the configured FTP server and navigates to today's date folder:

Path: files/SIDES/SUCCESS/{year}/{month}/{day}/

Example path for 21 May 2026: files/SIDES/SUCCESS/2026/05/21/
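The dated folder can be derived from a date object. In this sketch, ftp_day_path() and open_day_folder() are hypothetical helpers. The zero-padded month and day follow the example path above, and the 30-second timeout is an arbitrary choice.

```python
from datetime import date
from ftplib import FTP


def ftp_day_path(day):
    """Per-day upload folder, e.g. files/SIDES/SUCCESS/2026/05/21/ (zero-padded)."""
    return f"files/SIDES/SUCCESS/{day:%Y}/{day:%m}/{day:%d}/"


def open_day_folder(host, user, password, day=None):
    """Log in and change into the folder for `day` (defaults to today)."""
    ftp = FTP(host, timeout=30)  # timeout value is an assumption
    ftp.login(user, password)
    ftp.cwd(ftp_day_path(day or date.today()))
    return ftp


if __name__ == "__main__":
    print(ftp_day_path(date(2026, 5, 21)))  # files/SIDES/SUCCESS/2026/05/21/
```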

File Filtering

  • Skips files whose names contain "rf" (Tideda-format files)
  • Only processes files with today's date in yymmdd format in the filename
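The two filtering rules can be expressed as a single predicate. In this sketch, should_process() is a hypothetical name. The case-sensitive substring test for "rf" is an assumption about how the script recognizes Tideda files, and the sample filenames are invented for illustration.

```python
from datetime import date


def should_process(filename, today):
    """Return True if the file passes both filename rules."""
    if "rf" in filename:  # assumed case-sensitive match for Tideda files
        return False
    return today.strftime("%y%m%d") in filename  # e.g. 260521 for 21 May 2026


if __name__ == "__main__":
    for name in ["KBLG0026_260521.csv", "KBLG0026_260520.csv", "KBLG0026_rf_260521.csv"]:
        print(name, should_process(name, date(2026, 5, 21)))
```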

CSV Format

Each line in the CSV file contains 37+ comma-separated columns. The script requires at least 25 columns per line; shorter lines are skipped.

Key columns extracted:

| Column Index | Variable | Description |
|---|---|---|
| 1 | station_id | Station identifier (e.g., KBLG0026) |
| 4 | timestamp | Timestamp in yymmddHHMMSS format |
| 6 | battery | Battery voltage |
| 15 | wlalert | Water level alert threshold |
| 16 | wlwarn | Water level warning threshold |
| 17 | wldgr | Water level danger threshold |
| 18 | sirenid | Siren identifier |
| 19 | siren | Siren status: H=Danger, L=Warning, N=Normal |
| 21 | anncumm | Annual cumulative rainfall |
| 22 | dailycumm | Daily cumulative rainfall |
| 23 | hourlycumm | Hourly rainfall |
| 24 | currrf | Current rainfall |
| 36 | waterlevel | Current water level reading |
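The next block sketches how these columns might be extracted from one line. It rests on three assumptions: the indices above are zero-based Python list indices, fields are unquoted (so a plain comma split is enough), and blank cells mean "no reading". The names parse_line() and MIN_COLUMNS are hypothetical. Because waterlevel sits at index 36, a line that passes the 25-column check may still lack it, so the sketch returns None for any missing column.

```python
from datetime import datetime

MIN_COLUMNS = 25

# Assumed zero-based indices, taken from the column table above.
COLUMNS = {
    "station_id": 1, "timestamp": 4, "battery": 6,
    "wlalert": 15, "wlwarn": 16, "wldgr": 17,
    "sirenid": 18, "siren": 19,
    "anncumm": 21, "dailycumm": 22, "hourlycumm": 23, "currrf": 24,
    "waterlevel": 36,
}


def parse_line(line):
    """Return a dict of the key fields, or None if the line is too short."""
    fields = line.rstrip("\r\n").split(",")  # assumes no quoted commas
    if len(fields) < MIN_COLUMNS:
        return None  # malformed line: skipped
    row = {}
    for name, idx in COLUMNS.items():
        value = fields[idx].strip() if idx < len(fields) else ""
        row[name] = value or None  # blank or missing cell -> None (assumption)
    if row["timestamp"]:
        # yymmddHHMMSS, e.g. 260521143000 -> 2026-05-21 14:30:00
        row["timestamp"] = datetime.strptime(row["timestamp"], "%y%m%d%H%M%S")
    return row
```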

Data Processing Logic

The process_line() function handles each CSV line. All database operations use psycopg2 parameterized queries to prevent SQL injection.

Rainfall Data

CSV line
  |
  v
[dailycumm or hourlycumm not null?]
  |-- No --> skip
  |-- Yes
       v
  [record exists for station+timestamp?]
    |-- Yes --> skip insert
    |-- No --> INSERT INTO rainfall (stationid, timestamp, anncum, daily, hourly, currentrf, battery)
         |
         v
  [hourlycumm >= 30?]
    |-- No --> done
    |-- Yes
         |
         v
  Determine level:
    30 <= hourly < 60 --> Warning
    hourly >= 60       --> Danger
         |
         v
  [notification exists for station+timestamp+stationtype=1?]
    |-- Yes --> skip insert
    |-- No --> INSERT INTO notification (stationid, timestamp, stationtype=1, level, active_time)
         |
         v
  send_alert_to_laravel(station_id, level, 1)
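The level decision in this flow reduces to a small function. rainfall_level() is a hypothetical helper, and returning None stands for the "done, no notification" branch.

```python
def rainfall_level(hourly):
    """Return "Warning" for 30 <= hourly < 60, "Danger" for >= 60, else None."""
    if hourly is None or hourly < 30:
        return None  # below threshold: no notification
    return "Danger" if hourly >= 60 else "Warning"
```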

Water Level Data

CSV line
  |
  v
[waterlevel not null?]
  |-- No --> skip
  |-- Yes
       v
  [record exists for station+datetime?]
    |-- Yes --> skip insert
    |-- No --> INSERT INTO waterlevel (stationid, datetime, waterlevel, alert, warning, danger)
         |
         v
  [waterlevel >= wlalert?]
    |-- No --> done
    |-- Yes
         |
         v
  Determine level:
    alert   <= wl < warning --> Alert
    warning <= wl < danger  --> Warning
    wl >= danger            --> Danger
         |
         v
  [notification exists for station+timestamp+stationtype=2?]
    |-- Yes --> skip insert
    |-- No --> INSERT INTO notification (stationid, timestamp, stationtype=2, level, active_time)
         |
         v
  send_alert_to_laravel(station_id, level, 2)
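The three-way classification can be sketched by checking the highest threshold first. The sketch assumes each station's thresholds satisfy alert <= warning <= danger. waterlevel_level() is a hypothetical name.

```python
def waterlevel_level(wl, alert, warning, danger):
    """Classify a reading against the station's own thresholds (CSV columns 15-17)."""
    if wl >= danger:
        return "Danger"
    if wl >= warning:
        return "Warning"
    if wl >= alert:
        return "Alert"
    return None  # below wlalert: no notification
```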

Siren Data

CSV line
  |
  v
[sirenid not null?]
  |-- No --> skip
  |-- Yes
       v
  Determine level from siren status:
    H --> Danger
    L --> Warning
    N --> Normal
       |
       v
  [record exists for station+active_time?]
    |-- Yes --> skip insert
    |-- No --> INSERT INTO siren (stationid, stationtype=3, active_time, level)
         |
         v
  [level != Normal?]
    |-- No --> done
    |-- Yes --> send_alert_to_laravel(station_id, level, 3)
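The siren branch amounts to a lookup plus a Normal check. The function names are hypothetical. The flow above does not say what happens to status codes other than H, L, and N, so returning None for them is an assumption.

```python
SIREN_LEVELS = {"H": "Danger", "L": "Warning", "N": "Normal"}


def siren_level(status):
    """Map the status letter to a level; None for unknown codes (assumption)."""
    return SIREN_LEVELS.get((status or "").strip())


def siren_should_alert(level):
    """Only non-Normal levels are forwarded with stationtype=3."""
    return level is not None and level != "Normal"
```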

Station Types

The stationtype integer identifies the data source in notifications and alerts:

| stationtype | Data Source |
|---|---|
| 1 | Rainfall |
| 2 | Water Level |
| 3 | Siren |

Threshold Summary

Rainfall Thresholds

| Condition | Level |
|---|---|
| hourlycumm >= 30 and < 60 | Warning |
| hourlycumm >= 60 | Danger |

Water Level Thresholds

Thresholds are per-station values from CSV columns 15-17.

| Condition | Level |
|---|---|
| waterlevel >= wlalert and < wlwarn | Alert |
| waterlevel >= wlwarn and < wldgr | Warning |
| waterlevel >= wldgr | Danger |

Alert Notification Flow

When a threshold is triggered, the script calls send_alert_to_laravel():

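import logging
import requests

def send_alert_to_laravel(stationid, level, stationtype):
    payload = {
        "stationid": stationid,
        "level": level,
        "stationtype": stationtype,  # 1=rainfall, 2=waterlevel, 3=siren
    }
    try:
        response = requests.post("https://sides.tck.com.my/api/alert", json=payload, timeout=5)
        response.raise_for_status()  # treat HTTP errors as failures too
    except requests.RequestException as exc:
        logging.warning("Alert to Laravel failed: %s", exc)  # logged; processing continues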

The full notification chain:

sidesdecode.py
  |  POST /api/alert  {stationid, level, stationtype}
  v
AlertController (Laravel)
  |  Builds notification title/body from station type and level
  v
FcmService::sendToTopic()
  |  Routes to FCM topic by stationtype and level
  |  (e.g., rainfall_warning, rainfall_danger, waterlevel_alert, waterlevel_danger)
  v
Firebase Cloud Messaging
  |  Push notification delivered to subscribed mobile devices
  v
Mobile App
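FcmService is Laravel code and is not shown here. The topic pattern in the examples can still be illustrated in Python. This sketch is not the service's implementation: StationType and fcm_topic() are hypothetical. The "siren" prefix is a guess, because the document gives topic examples only for rainfall and water level.

```python
from enum import IntEnum


class StationType(IntEnum):
    RAINFALL = 1
    WATER_LEVEL = 2
    SIREN = 3


# "rainfall" and "waterlevel" match the documented topic examples;
# "siren" is an assumption: check FcmService for the real prefix.
TOPIC_PREFIX = {
    StationType.RAINFALL: "rainfall",
    StationType.WATER_LEVEL: "waterlevel",
    StationType.SIREN: "siren",
}


def fcm_topic(stationtype, level):
    """Combine station type and level into a topic such as 'rainfall_warning'."""
    return f"{TOPIC_PREFIX[StationType(stationtype)]}_{level.lower()}"
```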

Deduplication

Before every INSERT, the script checks for an existing record:

  • Rainfall: SELECT COUNT(*) FROM rainfall WHERE stationid = %s AND timestamp = %s
  • Water Level: SELECT COUNT(*) FROM waterlevel WHERE stationid = %s AND datetime = %s
  • Siren: SELECT COUNT(*) FROM siren WHERE stationid = %s AND active_time = %s
  • Notification: SELECT COUNT(*) FROM notification WHERE stationid = %s AND timestamp = %s AND stationtype = %s

If a record exists, the INSERT is skipped. This makes the script safe to re-run for the same time period.
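With any DB-API cursor, such as a psycopg2 cursor, the rainfall guard might look like the sketch below. insert_rainfall_if_new() is hypothetical, and the row argument is assumed to be a dict keyed by the CSV variable names. The SQL mirrors the COUNT(*) check and the rainfall column list given earlier. The driver binds the %s placeholders.

```python
def insert_rainfall_if_new(cur, row):
    """Check-then-insert for one rainfall reading; returns True if a row was inserted.

    `cur` is any DB-API cursor (e.g. psycopg2). Values travel as bound parameters,
    never formatted into the SQL string.
    """
    cur.execute(
        "SELECT COUNT(*) FROM rainfall WHERE stationid = %s AND timestamp = %s",
        (row["station_id"], row["timestamp"]),
    )
    if cur.fetchone()[0] > 0:
        return False  # duplicate: skip insert
    cur.execute(
        "INSERT INTO rainfall (stationid, timestamp, anncum, daily, hourly, currentrf, battery)"
        " VALUES (%s, %s, %s, %s, %s, %s, %s)",
        (row["station_id"], row["timestamp"], row["anncumm"], row["dailycumm"],
         row["hourlycumm"], row["currrf"], row["battery"]),
    )
    return True
```

Check-then-insert is not atomic, so two overlapping runs could both see a count of 0. PostgreSQL's INSERT ... ON CONFLICT DO NOTHING would close that gap in one statement. It requires a unique constraint on (stationid, timestamp), and this document does not say that constraint exists.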

File Management (Commented Out)

Two file management functions are defined but currently commented out:

  • move_to_error_folder() -- Move malformed files to an FTP error subfolder
  • move_to_success_folder() -- Move processed files to a success archive subfolder

When enabled, these functions create the target FTP directory if it does not exist, upload the file there, and delete the original. Because both are disabled, processed files stay in the source FTP folder.
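The described steps map onto standard ftplib calls. In this sketch, move_to_folder() is a hypothetical stand-in for the commented-out functions, whose real signatures are not documented. The sketch also assumes that any permanent error from mkd() means the directory already exists.

```python
import ftplib
import io


def move_to_folder(ftp, filename, target_dir):
    """Create target_dir if needed, copy filename into it, then delete the original."""
    try:
        ftp.mkd(target_dir)
    except ftplib.error_perm:
        pass  # assumption: a 5xx reply here means the directory already exists
    buf = io.BytesIO()
    ftp.retrbinary(f"RETR {filename}", buf.write)  # download into memory
    buf.seek(0)
    ftp.storbinary(f"STOR {target_dir}/{filename}", buf)  # upload to the new folder
    ftp.delete(filename)  # remove the original only after the upload succeeded
```

A simpler alternative is ftp.rename(filename, f"{target_dir}/{filename}"), which moves the file without transferring its contents. This works only if the server permits renames across directories.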

Error Handling

  • Malformed lines (fewer than 25 columns) are skipped with a log message
  • Any exception during line processing triggers conn.rollback() to prevent partial inserts
  • Alert sending failures are caught and logged but do not halt processing
  • The script closes both FTP and database connections on exit
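The per-line rollback can be sketched as a loop that commits after each line. Committing per line is an assumption, consistent with Known Issue 4 below, where processed lines stay committed after a crash. process_lines() and its process_line callback parameter are hypothetical.

```python
import logging

log = logging.getLogger("sidesdecode")  # logger name is an assumption


def process_lines(conn, cur, lines, process_line):
    """Run process_line(cur, line) for each line, one transaction per line.

    Returns the number of lines committed; failed lines are rolled back and logged.
    """
    committed = 0
    for number, line in enumerate(lines, start=1):
        try:
            process_line(cur, line)
            conn.commit()
            committed += 1
        except Exception:
            conn.rollback()  # undo this line's partial inserts only
            log.exception("line %d failed; rolled back", number)
    return committed
```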

Known Issues

  1. No file archiving -- move_to_error_folder and move_to_success_folder are commented out, so files are never moved after processing
  2. No broader deduplication -- Deduplication only checks exact station+timestamp matches; no handling for near-duplicate records
  3. No connection retry -- If FTP or PostgreSQL is unreachable, the script fails immediately with no retry logic
  4. Partial processing risk -- If the script crashes mid-file, lines already processed remain committed, but the remaining lines are not ingested until a later run processes the file again
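Issue 3 could be addressed with a small retry wrapper. This is a possible mitigation, not current behaviour. with_retry() is hypothetical, and the attempt count and delays are arbitrary choices.

```python
import time


def with_retry(connect, attempts=3, base_delay=2.0, retry_on=(OSError,), sleep=time.sleep):
    """Call connect(); on a listed exception wait 2 s, 4 s, ... and try again.

    The last failure is re-raised so the caller still sees the error.
    """
    for attempt in range(1, attempts + 1):
        try:
            return connect()
        except retry_on:
            if attempt == attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))
```

For the database, you could wrap a psycopg2.connect(...) call in a lambda and pass retry_on=(psycopg2.OperationalError,). For FTP, the default is usually enough: ftplib raises OSError subclasses such as ConnectionRefusedError when the server is unreachable.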