Use firstOrCreate instead of create so db:seed can run safely on container restart without duplicate key violation.
531 lines
19 KiB
Python
531 lines
19 KiB
Python
from ftplib import FTP
|
|
from datetime import datetime
|
|
from datetime import date
|
|
from io import BytesIO
|
|
import os
|
|
import time
|
|
import psycopg2
|
|
import requests
|
|
|
|
today = date.today()
|
|
day = f"{today.day:02}"
|
|
month = f"{today.month:02}"
|
|
year = str(today.year)
|
|
|
|
today_str_file = datetime.today().strftime('%d%m%Y')
|
|
|
|
ftp_server = os.environ.get("FTP_SERVER", "myvscada.com")
|
|
ftp_username = os.environ.get("FTP_USERNAME", "tck")
|
|
ftp_password = os.environ.get("FTP_PASSWORD", "")
|
|
ftp_root_folder = f"files/SIDES/SUCCESS/{year}/{month}/{day}"
|
|
|
|
pg_host = os.environ.get("PG_HOST", "postgres")
|
|
pg_database = os.environ.get("PG_DATABASE", "sides_db")
|
|
pg_user = os.environ.get("PG_USER", "tck")
|
|
pg_password = os.environ.get("PG_PASSWORD", "")
|
|
|
|
# Connect to PostgreSQL
|
|
conn = psycopg2.connect(
|
|
host=pg_host,
|
|
database=pg_database,
|
|
user=pg_user,
|
|
password=pg_password
|
|
)
|
|
|
|
|
|
cursor = conn.cursor()
|
|
|
|
|
|
|
|
# Connect to FTP
|
|
ftp = FTP(ftp_server)
|
|
ftp.login(user=ftp_username, passwd=ftp_password)
|
|
ftp.cwd(ftp_root_folder)
|
|
|
|
# Get today's date in yymmdd format
|
|
today_str = datetime.today().strftime('%y%m%d')
|
|
|
|
# List all subfolders in /files/JKR
|
|
folders = ftp.nlst()
|
|
|
|
#Move File to Error Folder Function
|
|
def move_to_error_folder(filename,filecontent,error_folder="/"+ftp_root_folder+"/ERROR/"):
|
|
with FTP(ftp_server) as ftp:
|
|
ftp.login(ftp_username, ftp_password)
|
|
parts = error_folder.strip("/").split("/")
|
|
current_path = ''
|
|
for part in parts :
|
|
current_path += "/" + part
|
|
try :
|
|
ftp.mkd(current_path)
|
|
except Exception :
|
|
pass
|
|
ftp.cwd(error_folder)
|
|
if isinstance(filecontent,str) :
|
|
filecontent = filecontent.encode("utf-8")
|
|
|
|
bio = BytesIO(filecontent)
|
|
ftp.storbinary(f"STOR {filename}",bio)
|
|
bio.close()
|
|
print(f"⚠️ Moved {file_name} into {error_folder}")
|
|
|
|
|
|
try:
|
|
ftp.cwd('..')
|
|
ftp.delete(filename)
|
|
print(f"🗑️ Removed {file_name} from {ftp_root_folder}")
|
|
except Exception as e:
|
|
print(f"⚠️ Could not remove {file_name} from {ftp_root_folder}: {e}")
|
|
|
|
#Move File to Success Folder Function
|
|
def move_to_success_folder(filename,filecontent,success_folder="/"+ftp_root_folder+"/SUCCESS/"):
|
|
with FTP(ftp_server) as ftp :
|
|
ftp.login(ftp_username,ftp_password)
|
|
|
|
|
|
parts = success_folder.strip("/").split("/")
|
|
current_path =""
|
|
for part in parts :
|
|
current_path += "/" + part
|
|
try :
|
|
ftp.mkd(current_path)
|
|
print(f"Created Folder : {current_path}")
|
|
except Exception:
|
|
pass
|
|
|
|
ftp.cwd(success_folder)
|
|
|
|
if isinstance(filecontent,str):
|
|
filecontent = filecontent.encode("utf-8")
|
|
|
|
with open("temp_upload.tmp", "wb") as temp_file :
|
|
temp_file.write(filecontent)
|
|
|
|
with open("temp_upload.tmp", "rb") as temp_file:
|
|
ftp.storbinary(f"STOR {filename}",temp_file)
|
|
|
|
os.remove("temp_upload.tmp")
|
|
print(f"Uploaded {filename} to {success_folder}")
|
|
|
|
try :
|
|
ftp.cwd(f"/{ftp_root_folder}")
|
|
ftp.delete(filename)
|
|
print(f"🗑️ Removed {file_name} from {ftp_root_folder}")
|
|
except Exception as e:
|
|
print(f"⚠️ Could not remove {file_name} from {ftp_root_folder}: {e}")
|
|
|
|
#Send Alert Function
|
|
def send_alert_to_laravel(stationid,level,stationtype):
|
|
|
|
#Each parameter variable is from decoded process in Function process_line()
|
|
#Data will send to SIDES API in src/app/Http/Controllers/Api/AlertController.php
|
|
payload = {
|
|
"stationid" : stationid,
|
|
"level" : level,
|
|
"stationtype" :stationtype,
|
|
}
|
|
|
|
#Request and send the payload data to API URL
|
|
try:
|
|
response = requests.post("https://sides.tck.com.my/api/alert",json=payload,timeout=5)
|
|
print("Alert Sent:" ,response.status_code)
|
|
except Exception as e:
|
|
print("Failed to send Alert",e)
|
|
|
|
|
|
# Define processing function once
|
|
def process_line(line,filename=None):
|
|
|
|
# Strip each ',' seperator in file line
|
|
columns = line.strip().split(",")
|
|
|
|
#Check Columns Length each line
|
|
if len(columns) < 25:
|
|
print(f"Skipping malformed line: {line}")
|
|
|
|
#Uncomment If needed
|
|
# try :
|
|
# #Move decode folder into ERROR folder files/SIDES/ERROR
|
|
# #move_to_error_folder(filename,line,error_folder="/"+ftp_root_folder+"/ERROR")
|
|
|
|
# except Exception as e :
|
|
# print(f"⚠️ Failed to move malformed line to ERROR folder: {e}")
|
|
return
|
|
|
|
|
|
# Variable Declaration based on CSV Column Data
|
|
|
|
#Common Data Variable that needed
|
|
station_id = columns[1]
|
|
timestamp = columns[4]
|
|
|
|
#Siren Data Variable
|
|
sirenid = columns[18]
|
|
siren = columns[19]
|
|
|
|
#Rainfall Variable
|
|
anncumm = float(columns[21]) if columns[21] else None
|
|
dailycumm = float(columns[22]) if columns[22] else None
|
|
hourlycumm = float(columns[23]) if columns[23] else None
|
|
currrf = float(columns[24]) if columns[24] else None
|
|
|
|
#Water Level Variable
|
|
waterlevel = float(columns[36]) if columns[36] else None
|
|
|
|
#Water Level Threshold Variable
|
|
wldgr = float(columns[17]) if columns[17] else None
|
|
wlwarn = float(columns[16]) if columns[16] else None
|
|
wlalert = float(columns[15]) if columns[15] else None
|
|
|
|
#Battery Variable
|
|
battery = float(columns[6]) if columns[6] else None
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
#Timestamp Format
|
|
datetime_object = datetime.strptime(timestamp, "%y%m%d%H%M%S")
|
|
|
|
#Uncomment If Using Separate Date And Time Column
|
|
# date_part = datetime_object.strftime("%Y-%m-%d")
|
|
# time_part = datetime_object.strftime("%H:%M:%S")
|
|
|
|
# date_part2 = datetime_object.date()
|
|
|
|
|
|
#Rainfall Data Insertion Into Database
|
|
#Check dailycumm variable Or hourtycumm variable is Not Null
|
|
#If either dailycumm or hourlycum, is Null. Skip this process
|
|
|
|
if dailycumm != None or hourlycumm != None :
|
|
|
|
#Execute Query for check data is already available or not in rainfall Table
|
|
#TABLE : RAINFALL
|
|
#COLUMN : stationid,timestamp
|
|
#INPUT : station_id,datetime_object
|
|
#OUTPUT : -
|
|
|
|
cursor.execute("""
|
|
SELECT COUNT(*)
|
|
FROM rainfall
|
|
WHERE stationid = %s AND timestamp = %s
|
|
""", (station_id, datetime_object))
|
|
|
|
#Delcarable Variable Record Exists Condition
|
|
record_exists = cursor.fetchone()[0] > 0
|
|
|
|
|
|
# Check If Record is exist
|
|
# If record not exist execute INSERT rainfall data into rainfall Table
|
|
if not record_exists:
|
|
|
|
#Execute Insert Query into rainfall table
|
|
#TABLE : RAINFALL
|
|
#COLUMN : stationid,timestamp,anncum,daily,hourly,currentrf,battery
|
|
#INPUT : station_id ,date_object,anncumm,dailycumm,hourlycumm,currrf,battery
|
|
#OUTPUT : -
|
|
|
|
cursor.execute("""
|
|
INSERT INTO rainfall (stationid,timestamp, anncum, daily, hourly, currentrf,battery)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s)
|
|
""", (station_id, datetime_object,anncumm, dailycumm, hourlycumm, currrf,battery))
|
|
conn.commit()
|
|
|
|
# If Rainfall Threshold Warning Threshold
|
|
if hourlycumm >= 30 :
|
|
|
|
#Threshold Level Declarion
|
|
if hourlycumm >= 30 and hourlycumm < 60 :
|
|
level = 'Warning'
|
|
elif hourlycumm >= 60 :
|
|
level = 'Danger'
|
|
else :
|
|
level = 'Normal'
|
|
|
|
#Execute Query for check data is already available or not in notification Table
|
|
#TABLE : NOTIFICATION
|
|
#COLUMN : stationid,timestamp,stationtype
|
|
#INPUT : station_id,datetime_object,1
|
|
#OUTPUT : -
|
|
|
|
cursor.execute("""
|
|
SELECT COUNT(*)
|
|
FROM notification
|
|
WHERE stationid = %s AND timestamp = %s AND stationtype = %s
|
|
""", (station_id, datetime_object,1))
|
|
|
|
record_exists = cursor.fetchone()[0] > 0
|
|
|
|
# Check If Record is exist
|
|
# If record not exist execute INSERT rainfall data that trigger threshold into notification Table
|
|
if not record_exists:
|
|
|
|
#Execute Insert Query into notification table
|
|
#stationtype = 1 because rainfall data
|
|
#TABLE : NOTIFICATION
|
|
#COLUMN : stationid,timestamp.stationtype,level,active_time
|
|
#INPUT : station_id,datetime_object,1,level,datetime_object
|
|
#OUTPUT : -
|
|
|
|
cursor.execute("""
|
|
INSERT INTO notification (stationid,timestamp, stationtype, level,active_time)
|
|
VALUES (%s, %s, %s, %s, %s)
|
|
""", (station_id, datetime_object,1, level,datetime_object))
|
|
conn.commit()
|
|
|
|
#Call Alert Function with station id , level , stationtype as parameter
|
|
#Send Status Station Detail to laravel API for Alert Notification
|
|
send_alert_to_laravel(station_id,level,1)
|
|
|
|
#Print Rainfall Details
|
|
print(f"Station ID {station_id}")
|
|
print(f"Timestamp {timestamp}")
|
|
print(f"Daily Cumm : {dailycumm}")
|
|
print(f"Hourly : {hourlycumm}\n")
|
|
|
|
|
|
#Water Level Data Insertion Into Database
|
|
#Check waterlevel is Not Null
|
|
#If water level is Null. Skip this process
|
|
|
|
if waterlevel != None :
|
|
|
|
#Execute Query for check data is already available or not in waterlevel Table
|
|
#TABLE : WATERLEVEL
|
|
#COLUMN : stationid,datetime
|
|
#INPUT : station_id,datetime_object
|
|
#OUTPUT : -
|
|
|
|
cursor.execute("""
|
|
SELECT COUNT(*)
|
|
FROM waterlevel
|
|
WHERE stationid = %s AND datetime = %s
|
|
""", (station_id, datetime_object))
|
|
record_exists = cursor.fetchone()[0] > 0
|
|
|
|
|
|
# Check If Record is exist
|
|
# If record not exist execute INSERT waterlevel data into waterlevel Table
|
|
if not record_exists:
|
|
|
|
#Execute Insert Query into waterlevel table
|
|
#TABLE : WATERLEVEL
|
|
#COLUMN : stationid,datetime,waterlevel,alert,warning,danger
|
|
#INPUT : station_id,datetime_object,waterlevel,wlalert,wlwarn,wldgr
|
|
#OUTPUT : -
|
|
cursor.execute("""
|
|
INSERT INTO waterlevel (stationid,datetime, waterlevel, alert, warning,danger)
|
|
VALUES (%s, %s, %s, %s, %s, %s)
|
|
""", (station_id, datetime_object,waterlevel, wlalert, wlwarn, wldgr))
|
|
conn.commit()
|
|
|
|
# If Water Level Threshold Trigger
|
|
if waterlevel >= wlalert :
|
|
|
|
#Water Level Level Declaration
|
|
if waterlevel >= wlalert and waterlevel < wlwarn :
|
|
level = 'Alert'
|
|
elif waterlevel >= wlwarn and waterlevel < wldgr :
|
|
level = 'Warning'
|
|
elif waterlevel >= wldgr:
|
|
level = 'Danger'
|
|
else :
|
|
level = 'Normal'
|
|
|
|
#Execute Query for check data is already available or not in notification Table
|
|
#TABLE : NOTIFICATION
|
|
#COLUMN : stationid,timestamp,stationtype
|
|
#INPUT : station_id,datetime_object,2
|
|
#OUTPUT : -
|
|
|
|
cursor.execute("""
|
|
SELECT COUNT(*)
|
|
FROM notification
|
|
WHERE stationid = %s AND timestamp = %s AND stationtype = %s
|
|
""", (station_id, datetime_object,2))
|
|
record_exists = cursor.fetchone()[0] > 0
|
|
|
|
|
|
# Check If Record is exist
|
|
# If record not exist execute INSERT waterlevel data that trigger threshold notification Table
|
|
if not record_exists:
|
|
|
|
#Execute Insert Query into notification table
|
|
#stationtype = 2 because waterlevel data
|
|
|
|
#TABLE : NOTIFICATION
|
|
#COLUMN : stationid,timestamp,stationtype,level,active_time
|
|
#INPUT : station_id,datetime_object,2,level,datetime_object
|
|
#OUTPUT : -
|
|
|
|
cursor.execute("""
|
|
INSERT INTO notification (stationid,timestamp, stationtype, level,active_time)
|
|
VALUES (%s, %s, %s, %s, %s)
|
|
""", (station_id, datetime_object,2, level,datetime_object))
|
|
conn.commit()
|
|
|
|
|
|
#Call Alert Function with station id , level , stationtype as parameter
|
|
#Send Status Station Detail to laravel API for Alert Notification
|
|
send_alert_to_laravel(station_id,level,2)
|
|
print(f"Station ID {station_id}")
|
|
print(f"Timestamp {timestamp}")
|
|
print(f"Water Level : {waterlevel}")
|
|
print(f"Danger : {wldgr}")
|
|
print(f"Warning : {wlwarn}\n")
|
|
|
|
|
|
#Siren Data Insertion Into Database
|
|
#Check sirenid variable is Not Null
|
|
#If sirenid is Null. Skip this process
|
|
|
|
if sirenid != None:
|
|
|
|
#Execute Query for check data is already available or not in siren Table
|
|
#TABLE : SIREN
|
|
#COLUMN : stationid,active_time
|
|
#INPUT : station_id,datetime_object
|
|
#OUTPUT : -
|
|
|
|
cursor.execute("""
|
|
SELECT COUNT(*)
|
|
FROM siren
|
|
WHERE stationid = %s AND active_time = %s
|
|
""", (station_id, datetime_object))
|
|
record_exists = cursor.fetchone()[0] > 0
|
|
|
|
|
|
#Siren level Declaration
|
|
if siren == 'H' :
|
|
level = 'Danger'
|
|
elif siren == 'L':
|
|
level = 'Warning'
|
|
else :
|
|
level = 'Normal'
|
|
|
|
|
|
# check siren condition
|
|
# If Siren is Normal
|
|
|
|
if siren == 'N':
|
|
|
|
# Check If Record is exist
|
|
# If record not exist execute INSERT siren data into siren Table
|
|
|
|
if not record_exists:
|
|
|
|
#TABLE : SIREN
|
|
#COLUMN : stationid,active_time,level
|
|
#INPUT : station_id,datetime_object,siren
|
|
#OUTPUT : -
|
|
|
|
cursor.execute("""
|
|
INSERT INTO siren (stationid, stationtype, active_time, level)
|
|
VALUES (%s, %s, %s, %s)
|
|
""", (station_id, 3, datetime_object, siren))
|
|
conn.commit()
|
|
|
|
|
|
#Print Siren Details
|
|
print(f"Station ID {station_id}")
|
|
print(f"SRID {sirenid}")
|
|
print(f"Timestamp {timestamp}")
|
|
print(f"Alarm : {siren}\n")
|
|
print(f"Level : {level}\n")
|
|
|
|
#If siren not Normal
|
|
elif siren != 'N' :
|
|
|
|
# Check If Record is exist
|
|
# If record not exist execute INSERT siren data into siren Table
|
|
|
|
if not record_exists:
|
|
|
|
|
|
#Execute Insert Query into siren table
|
|
#TABLE : SIREN
|
|
#COLUMN : stationid,active_time,level
|
|
#INPUT : station_id,datetime_object,siren
|
|
#OUTPUT : -
|
|
|
|
cursor.execute("""
|
|
INSERT INTO siren (stationid, stationtype, active_time, level)
|
|
VALUES (%s, %s, %s, %s)
|
|
""", (station_id, 3, datetime_object, siren))
|
|
conn.commit()
|
|
|
|
|
|
|
|
#Print Siren Table
|
|
print(f"Station ID {station_id}")
|
|
print(f"SRID {sirenid}")
|
|
print(f"Timestamp {timestamp}")
|
|
print(f"Alarm : {siren}\n")
|
|
|
|
|
|
|
|
#Send Station Alert To API When Siren Level is not normal
|
|
#Call Alert Function with station id , level , stationtype as parameter
|
|
#Send Status Station Detail to laravel API for Alert Notification
|
|
|
|
if level != 'Normal' :
|
|
send_alert_to_laravel(station_id,level,3)
|
|
|
|
except Exception as e:
|
|
|
|
conn.rollback()
|
|
|
|
#Move decode folder into ERROR folder files/SIDES/ERROR
|
|
#move_to_error_folder(filename,line,error_folder="/"+ftp_root_folder+"/ERROR")
|
|
print(f"Error processing line: {e}")
|
|
|
|
|
|
#Start Process FTP File
|
|
try:
|
|
|
|
#Folder File List Variable Declaration
|
|
files = ftp.nlst()
|
|
|
|
#Loop Through the Folder/Files
|
|
for file_name in files:
|
|
|
|
#Skip Tideda File
|
|
if "rf" in file_name.lower():
|
|
#print(f"Skipping file: {file_name} (contains 'rf')")
|
|
continue
|
|
|
|
#Skip Others Day File
|
|
if today_str not in file_name:
|
|
#print(f"Skipping file: {file_name} (not from today {today_str})")
|
|
continue
|
|
|
|
print(f"Processing file: {file_name}")
|
|
all_lines = []
|
|
|
|
#Retrieve the files as a text , line by line
|
|
ftp.retrlines(
|
|
f"RETR {file_name}",
|
|
lambda line: (all_lines.append(line), process_line(line, file_name))
|
|
)
|
|
file_content = "\n".join(all_lines)
|
|
|
|
#Uncomment if needed
|
|
#Move decode folder into success folder files/SIDES/SUCCESS/year/month/day
|
|
|
|
#move_to_success_folder(file_name,file_content,success_folder="/"+ftp_root_folder+"/SUCCESS/"+year+"/"+month+"/"+day)
|
|
|
|
except Exception as e:
|
|
print(f"Failed to process folder: {e}")
|
|
|
|
#Uncomment this for test Alert Notification
|
|
# send_alert_to_laravel('KBLG0026','Warning',1)
|
|
# send_alert_to_laravel('KBLG0026','Danger',2)
|
|
# send_alert_to_laravel('KBLG0031','Warning',3)
|
|
|
|
# Close connections
|
|
ftp.quit()
|
|
cursor.close()
|
|
conn.close()
|