"""Decode today's SIDES station files from FTP into PostgreSQL.

The script connects to PostgreSQL and to the FTP server, changes into
``files/SIDES/SUCCESS/<year>/<month>/<day>``, downloads every file whose
name contains today's ``yymmdd`` stamp (files with ``rf`` in the name are
skipped), and feeds each comma-separated line to :func:`process_line`,
which stores rainfall, water-level and siren readings and posts an alert
to the SIDES API whenever a threshold is reached.
"""

from ftplib import FTP, all_errors, error_perm
from datetime import datetime
from datetime import date
from io import BytesIO
import os
import time

import psycopg2
import requests

# --------------------------------------------------------------------------
# Configuration
# --------------------------------------------------------------------------

# Read the clock once so every derived value refers to the same day even
# when the script starts right before midnight.
today = date.today()
day = f"{today.day:02}"
month = f"{today.month:02}"
year = str(today.year)
today_str_file = today.strftime('%d%m%Y')
# File names are matched against today's date in yymmdd format.
today_str = today.strftime('%y%m%d')

ftp_server = os.environ.get("FTP_SERVER", "myvscada.com")
ftp_username = os.environ.get("FTP_USERNAME", "tck")
ftp_password = os.environ.get("FTP_PASSWORD", "")
ftp_root_folder = f"files/SIDES/SUCCESS/{year}/{month}/{day}"

pg_host = os.environ.get("PG_HOST", "postgres")
pg_database = os.environ.get("PG_DATABASE", "sides_db")
pg_user = os.environ.get("PG_USER", "tck")
pg_password = os.environ.get("PG_PASSWORD", "")

ALERT_URL = "https://sides.tck.com.my/api/alert"

# Zero-based positions of the fields read from each CSV line.
COL_STATION_ID = 1
COL_TIMESTAMP = 4
COL_BATTERY = 6
COL_WL_ALERT = 15
COL_WL_WARNING = 16
COL_WL_DANGER = 17
COL_SIREN_ID = 18
COL_SIREN = 19
COL_ANNUAL_RAIN = 21
COL_DAILY_RAIN = 22
COL_HOURLY_RAIN = 23
COL_CURRENT_RAIN = 24
COL_WATER_LEVEL = 36

# Lines shorter than this are reported as malformed and skipped.
MIN_COLUMNS = 25

# Values written to the ``stationtype`` column / sent to the alert API.
STATION_RAINFALL = 1
STATION_WATER_LEVEL = 2
STATION_SIREN = 3

# Hourly rainfall thresholds that trigger a notification.
HOURLY_RAIN_WARNING = 30
HOURLY_RAIN_DANGER = 60

# Siren status code -> notification level; any other code is 'Normal'.
SIREN_LEVELS = {'H': 'Danger', 'L': 'Warning'}


# --------------------------------------------------------------------------
# FTP helpers
# --------------------------------------------------------------------------

def _ensure_remote_folder(ftp_conn, folder, announce=False):
    """Create every component of *folder* on the server, one level at a time.

    A permanent FTP error on ``MKD`` (typically "already exists") is
    ignored for that level; other errors propagate to the caller.
    """
    current_path = ""
    for part in folder.strip("/").split("/"):
        current_path += "/" + part
        try:
            ftp_conn.mkd(current_path)
        except error_perm:
            continue
        if announce:
            print(f"Created Folder : {current_path}")


def _upload_content(ftp_conn, filename, filecontent):
    """Store *filecontent* (``str`` is UTF-8 encoded) as *filename* in the cwd."""
    if isinstance(filecontent, str):
        filecontent = filecontent.encode("utf-8")
    with BytesIO(filecontent) as buffer:
        ftp_conn.storbinary(f"STOR {filename}", buffer)


def _remove_original(ftp_conn, filename):
    """Best-effort delete of *filename* from the day's root folder."""
    try:
        ftp_conn.cwd(f"/{ftp_root_folder}")
        ftp_conn.delete(filename)
        print(f"🗑️ Removed {filename} from {ftp_root_folder}")
    except Exception as e:
        print(f"⚠️ Could not remove {filename} from {ftp_root_folder}: {e}")


def move_to_error_folder(filename, filecontent,
                         error_folder="/" + ftp_root_folder + "/ERROR/"):
    """Upload *filecontent* as *filename* into *error_folder*, then delete
    the original from the day's root folder.

    Args:
        filename: Name of the file on the FTP server.
        filecontent: ``str`` or ``bytes`` payload to upload.
        error_folder: Absolute destination folder; created if missing.

    Uses its own FTP connection so the caller's session is not disturbed.
    Failure to delete the original is reported but not raised.
    """
    with FTP(ftp_server) as ftp_conn:
        ftp_conn.login(ftp_username, ftp_password)
        _ensure_remote_folder(ftp_conn, error_folder)
        ftp_conn.cwd(error_folder)
        _upload_content(ftp_conn, filename, filecontent)
        print(f"⚠️ Moved {filename} into {error_folder}")
        _remove_original(ftp_conn, filename)


def move_to_success_folder(filename, filecontent,
                           success_folder="/" + ftp_root_folder + "/SUCCESS/"):
    """Upload *filecontent* as *filename* into *success_folder*, then delete
    the original from the day's root folder.

    Args:
        filename: Name of the file on the FTP server.
        filecontent: ``str`` or ``bytes`` payload to upload.
        success_folder: Absolute destination folder; created if missing.

    Uses its own FTP connection so the caller's session is not disturbed.
    Failure to delete the original is reported but not raised.
    """
    with FTP(ftp_server) as ftp_conn:
        ftp_conn.login(ftp_username, ftp_password)
        _ensure_remote_folder(ftp_conn, success_folder, announce=True)
        ftp_conn.cwd(success_folder)
        # Upload straight from memory; no temporary file on local disk.
        _upload_content(ftp_conn, filename, filecontent)
        print(f"Uploaded {filename} to {success_folder}")
        _remove_original(ftp_conn, filename)


# --------------------------------------------------------------------------
# Alerting
# --------------------------------------------------------------------------

def send_alert_to_laravel(stationid, level, stationtype):
    """POST a station alert to the SIDES API.

    Args:
        stationid: Station identifier decoded by :func:`process_line`.
        level: Notification level, e.g. ``'Warning'`` or ``'Danger'``.
        stationtype: 1 for rainfall, 2 for water level, 3 for siren.

    Request failures are printed, never raised, so a broken alert
    endpoint does not interrupt decoding.
    """
    payload = {
        "stationid": stationid,
        "level": level,
        "stationtype": stationtype,
    }
    try:
        response = requests.post(ALERT_URL, json=payload, timeout=5)
        print("Alert Sent:", response.status_code)
    except requests.RequestException as e:
        print("Failed to send Alert", e)


# --------------------------------------------------------------------------
# Line decoding
# --------------------------------------------------------------------------

def _optional_float(columns, index):
    """Return column *index* as a float, or ``None`` if absent or empty.

    Raises:
        ValueError: If the column holds non-numeric text.
    """
    if index >= len(columns) or not columns[index]:
        return None
    return float(columns[index])


def _record_exists(query, params):
    """Run a ``SELECT COUNT(*)`` query and report whether it counted any row."""
    cursor.execute(query, params)
    return cursor.fetchone()[0] > 0


def _raise_notification(station_id, reading_time, stationtype, level):
    """Insert a notification row and send its alert, unless one already
    exists for this station, timestamp and station type."""
    if _record_exists(
        """
        SELECT COUNT(*) FROM notification
        WHERE stationid = %s AND timestamp = %s AND stationtype = %s
        """,
        (station_id, reading_time, stationtype),
    ):
        return
    cursor.execute(
        """
        INSERT INTO notification (stationid, timestamp, stationtype, level, active_time)
        VALUES (%s, %s, %s, %s, %s)
        """,
        (station_id, reading_time, stationtype, level, reading_time),
    )
    conn.commit()
    send_alert_to_laravel(station_id, level, stationtype)


def _rainfall_level(hourlycumm):
    """Return the notification level for an hourly total, or ``None`` when
    the value is missing or below the warning threshold."""
    if hourlycumm is None or hourlycumm < HOURLY_RAIN_WARNING:
        return None
    return 'Danger' if hourlycumm >= HOURLY_RAIN_DANGER else 'Warning'


def _water_level_status(waterlevel, wlalert, wlwarn, wldgr):
    """Return ``'Alert'``, ``'Warning'`` or ``'Danger'`` for *waterlevel*,
    or ``None`` when the alert threshold is missing or not reached.

    A missing warning/danger threshold is treated as not configured and is
    skipped; with all three present the result is Alert below the warning
    threshold, Warning below the danger threshold, Danger otherwise.
    """
    if wlalert is None or waterlevel < wlalert:
        return None
    level = 'Alert'
    for name, threshold in (('Warning', wlwarn), ('Danger', wldgr)):
        if threshold is None:
            continue
        if waterlevel < threshold:
            break
        level = name
    return level


def _store_rainfall(station_id, reading_time, raw_timestamp,
                    anncumm, dailycumm, hourlycumm, currrf, battery):
    """Insert a rainfall reading once; notify if the hourly total is high."""
    if _record_exists(
        "SELECT COUNT(*) FROM rainfall WHERE stationid = %s AND timestamp = %s",
        (station_id, reading_time),
    ):
        return
    cursor.execute(
        """
        INSERT INTO rainfall (stationid, timestamp, anncum, daily, hourly, currentrf, battery)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        """,
        (station_id, reading_time, anncumm, dailycumm, hourlycumm, currrf, battery),
    )
    conn.commit()

    # Thresholds are evaluated only for newly stored readings.
    level = _rainfall_level(hourlycumm)
    if level is not None:
        _raise_notification(station_id, reading_time, STATION_RAINFALL, level)

    print(f"Station ID {station_id}")
    print(f"Timestamp {raw_timestamp}")
    print(f"Daily Cumm : {dailycumm}")
    print(f"Hourly : {hourlycumm}\n")


def _store_water_level(station_id, reading_time, raw_timestamp,
                       waterlevel, wlalert, wlwarn, wldgr):
    """Insert a water-level reading once; notify if a threshold is reached."""
    if _record_exists(
        "SELECT COUNT(*) FROM waterlevel WHERE stationid = %s AND datetime = %s",
        (station_id, reading_time),
    ):
        return
    cursor.execute(
        """
        INSERT INTO waterlevel (stationid, datetime, waterlevel, alert, warning, danger)
        VALUES (%s, %s, %s, %s, %s, %s)
        """,
        (station_id, reading_time, waterlevel, wlalert, wlwarn, wldgr),
    )
    conn.commit()

    # Thresholds are evaluated only for newly stored readings.
    level = _water_level_status(waterlevel, wlalert, wlwarn, wldgr)
    if level is not None:
        _raise_notification(station_id, reading_time, STATION_WATER_LEVEL, level)

    print(f"Station ID {station_id}")
    print(f"Timestamp {raw_timestamp}")
    print(f"Water Level : {waterlevel}")
    print(f"Danger : {wldgr}")
    print(f"Warning : {wlwarn}\n")


def _store_siren(station_id, reading_time, raw_timestamp, sirenid, siren):
    """Insert a siren status once; alert when the status maps to a
    non-'Normal' level ('H' -> Danger, 'L' -> Warning)."""
    if _record_exists(
        "SELECT COUNT(*) FROM siren WHERE stationid = %s AND active_time = %s",
        (station_id, reading_time),
    ):
        return
    cursor.execute(
        """
        INSERT INTO siren (stationid, stationtype, active_time, level)
        VALUES (%s, %s, %s, %s)
        """,
        (station_id, STATION_SIREN, reading_time, siren),
    )
    conn.commit()

    level = SIREN_LEVELS.get(siren, 'Normal')
    print(f"Station ID {station_id}")
    print(f"SRID {sirenid}")
    print(f"Timestamp {raw_timestamp}")
    print(f"Alarm : {siren}\n")
    if siren == 'N':
        print(f"Level : {level}\n")
    if level != 'Normal':
        send_alert_to_laravel(station_id, level, STATION_SIREN)


def process_line(line, filename=None):
    """Decode one CSV line and store its readings.

    Args:
        line: One comma-separated record from a station file.
        filename: Name of the file the line came from. Only needed by the
            optional move-to-ERROR hook, which is currently disabled.

    Lines with fewer than ``MIN_COLUMNS`` fields are reported and skipped.
    Columns past the end of a shorter line are treated as empty. Any error
    while parsing or storing the line rolls back the open transaction and
    is printed; it never propagates, so one bad line cannot stop the file.
    """
    columns = line.strip().split(",")

    if len(columns) < MIN_COLUMNS:
        print(f"Skipping malformed line: {line}")
        # Optional: move the malformed line into the ERROR folder.
        # move_to_error_folder(filename, line, error_folder="/" + ftp_root_folder + "/ERROR")
        return

    try:
        station_id = columns[COL_STATION_ID]
        raw_timestamp = columns[COL_TIMESTAMP]
        reading_time = datetime.strptime(raw_timestamp, "%y%m%d%H%M%S")

        sirenid = columns[COL_SIREN_ID]
        siren = columns[COL_SIREN]

        anncumm = _optional_float(columns, COL_ANNUAL_RAIN)
        dailycumm = _optional_float(columns, COL_DAILY_RAIN)
        hourlycumm = _optional_float(columns, COL_HOURLY_RAIN)
        currrf = _optional_float(columns, COL_CURRENT_RAIN)

        waterlevel = _optional_float(columns, COL_WATER_LEVEL)
        wldgr = _optional_float(columns, COL_WL_DANGER)
        wlwarn = _optional_float(columns, COL_WL_WARNING)
        wlalert = _optional_float(columns, COL_WL_ALERT)

        battery = _optional_float(columns, COL_BATTERY)

        # Rainfall: stored when either the daily or the hourly total is present.
        if dailycumm is not None or hourlycumm is not None:
            _store_rainfall(station_id, reading_time, raw_timestamp,
                            anncumm, dailycumm, hourlycumm, currrf, battery)

        # Water level: stored when a level value is present.
        if waterlevel is not None:
            _store_water_level(station_id, reading_time, raw_timestamp,
                               waterlevel, wlalert, wlwarn, wldgr)

        # Siren: stored when the siren id column is not empty. The value
        # comes from str.split and is never None, so emptiness is the
        # only meaningful "no siren" test.
        if sirenid:
            _store_siren(station_id, reading_time, raw_timestamp, sirenid, siren)

    except Exception as e:
        conn.rollback()
        # Optional: move the offending line into the ERROR folder.
        # move_to_error_folder(filename, line, error_folder="/" + ftp_root_folder + "/ERROR")
        print(f"Error processing line: {e}")


# --------------------------------------------------------------------------
# Run
# --------------------------------------------------------------------------

# Connect to PostgreSQL
conn = psycopg2.connect(
    host=pg_host,
    database=pg_database,
    user=pg_user,
    password=pg_password,
)
cursor = conn.cursor()

# Connect to FTP and enter today's folder
ftp = FTP(ftp_server)
ftp.login(user=ftp_username, passwd=ftp_password)
ftp.cwd(ftp_root_folder)

try:
    for file_name in ftp.nlst():
        # Skip Tideda files (names containing 'rf')
        if "rf" in file_name.lower():
            continue

        # Skip files from other days
        if today_str not in file_name:
            continue

        print(f"Processing file: {file_name}")

        # Download the whole file first so the FTP data connection is not
        # held open while database work runs for each line.
        all_lines = []
        try:
            ftp.retrlines(f"RETR {file_name}", all_lines.append)
        except error_perm as e:
            # The server refused this file; carry on with the others.
            print(f"Failed to download {file_name}: {e}")
            continue

        for line in all_lines:
            process_line(line, file_name)

        # Optional: archive the decoded file under SUCCESS/year/month/day.
        # file_content = "\n".join(all_lines)
        # move_to_success_folder(file_name, file_content, success_folder="/" + ftp_root_folder + "/SUCCESS/" + year + "/" + month + "/" + day)

except Exception as e:
    print(f"Failed to process folder: {e}")
finally:
    # Manual alert tests (uncomment to use):
    # send_alert_to_laravel('KBLG0026', 'Warning', 1)
    # send_alert_to_laravel('KBLG0026', 'Danger', 2)
    # send_alert_to_laravel('KBLG0031', 'Warning', 3)

    # Close connections even if processing was interrupted.
    try:
        ftp.quit()
    except all_errors:
        ftp.close()
    cursor.close()
    conn.close()