fix: seeder idempotent with firstOrCreate
Use firstOrCreate instead of create so db:seed can run safely on container restart without duplicate key violation.
This commit is contained in:
530
autoscript/sidesdecode.py
Normal file
530
autoscript/sidesdecode.py
Normal file
@@ -0,0 +1,530 @@
|
||||
from ftplib import FTP
|
||||
from datetime import datetime
|
||||
from datetime import date
|
||||
from io import BytesIO
|
||||
import os
|
||||
import time
|
||||
import psycopg2
|
||||
import requests
|
||||
|
||||
today = date.today()
|
||||
day = f"{today.day:02}"
|
||||
month = f"{today.month:02}"
|
||||
year = str(today.year)
|
||||
|
||||
today_str_file = datetime.today().strftime('%d%m%Y')
|
||||
|
||||
ftp_server = os.environ.get("FTP_SERVER", "myvscada.com")
|
||||
ftp_username = os.environ.get("FTP_USERNAME", "tck")
|
||||
ftp_password = os.environ.get("FTP_PASSWORD", "")
|
||||
ftp_root_folder = f"files/SIDES/SUCCESS/{year}/{month}/{day}"
|
||||
|
||||
pg_host = os.environ.get("PG_HOST", "postgres")
|
||||
pg_database = os.environ.get("PG_DATABASE", "sides_db")
|
||||
pg_user = os.environ.get("PG_USER", "tck")
|
||||
pg_password = os.environ.get("PG_PASSWORD", "")
|
||||
|
||||
# Connect to PostgreSQL
|
||||
conn = psycopg2.connect(
|
||||
host=pg_host,
|
||||
database=pg_database,
|
||||
user=pg_user,
|
||||
password=pg_password
|
||||
)
|
||||
|
||||
|
||||
cursor = conn.cursor()
|
||||
|
||||
|
||||
|
||||
# Connect to FTP
|
||||
ftp = FTP(ftp_server)
|
||||
ftp.login(user=ftp_username, passwd=ftp_password)
|
||||
ftp.cwd(ftp_root_folder)
|
||||
|
||||
# Get today's date in yymmdd format
|
||||
today_str = datetime.today().strftime('%y%m%d')
|
||||
|
||||
# List all subfolders in /files/JKR
|
||||
folders = ftp.nlst()
|
||||
|
||||
#Move File to Error Folder Function
|
||||
def move_to_error_folder(filename,filecontent,error_folder="/"+ftp_root_folder+"/ERROR/"):
|
||||
with FTP(ftp_server) as ftp:
|
||||
ftp.login(ftp_username, ftp_password)
|
||||
parts = error_folder.strip("/").split("/")
|
||||
current_path = ''
|
||||
for part in parts :
|
||||
current_path += "/" + part
|
||||
try :
|
||||
ftp.mkd(current_path)
|
||||
except Exception :
|
||||
pass
|
||||
ftp.cwd(error_folder)
|
||||
if isinstance(filecontent,str) :
|
||||
filecontent = filecontent.encode("utf-8")
|
||||
|
||||
bio = BytesIO(filecontent)
|
||||
ftp.storbinary(f"STOR {filename}",bio)
|
||||
bio.close()
|
||||
print(f"⚠️ Moved {file_name} into {error_folder}")
|
||||
|
||||
|
||||
try:
|
||||
ftp.cwd('..')
|
||||
ftp.delete(filename)
|
||||
print(f"🗑️ Removed {file_name} from {ftp_root_folder}")
|
||||
except Exception as e:
|
||||
print(f"⚠️ Could not remove {file_name} from {ftp_root_folder}: {e}")
|
||||
|
||||
#Move File to Success Folder Function
|
||||
def move_to_success_folder(filename,filecontent,success_folder="/"+ftp_root_folder+"/SUCCESS/"):
|
||||
with FTP(ftp_server) as ftp :
|
||||
ftp.login(ftp_username,ftp_password)
|
||||
|
||||
|
||||
parts = success_folder.strip("/").split("/")
|
||||
current_path =""
|
||||
for part in parts :
|
||||
current_path += "/" + part
|
||||
try :
|
||||
ftp.mkd(current_path)
|
||||
print(f"Created Folder : {current_path}")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
ftp.cwd(success_folder)
|
||||
|
||||
if isinstance(filecontent,str):
|
||||
filecontent = filecontent.encode("utf-8")
|
||||
|
||||
with open("temp_upload.tmp", "wb") as temp_file :
|
||||
temp_file.write(filecontent)
|
||||
|
||||
with open("temp_upload.tmp", "rb") as temp_file:
|
||||
ftp.storbinary(f"STOR {filename}",temp_file)
|
||||
|
||||
os.remove("temp_upload.tmp")
|
||||
print(f"Uploaded {filename} to {success_folder}")
|
||||
|
||||
try :
|
||||
ftp.cwd(f"/{ftp_root_folder}")
|
||||
ftp.delete(filename)
|
||||
print(f"🗑️ Removed {file_name} from {ftp_root_folder}")
|
||||
except Exception as e:
|
||||
print(f"⚠️ Could not remove {file_name} from {ftp_root_folder}: {e}")
|
||||
|
||||
#Send Alert Function
|
||||
def send_alert_to_laravel(stationid,level,stationtype):
|
||||
|
||||
#Each parameter variable is from decoded process in Function process_line()
|
||||
#Data will send to SIDES API in src/app/Http/Controllers/Api/AlertController.php
|
||||
payload = {
|
||||
"stationid" : stationid,
|
||||
"level" : level,
|
||||
"stationtype" :stationtype,
|
||||
}
|
||||
|
||||
#Request and send the payload data to API URL
|
||||
try:
|
||||
response = requests.post("https://sides.tck.com.my/api/alert",json=payload,timeout=5)
|
||||
print("Alert Sent:" ,response.status_code)
|
||||
except Exception as e:
|
||||
print("Failed to send Alert",e)
|
||||
|
||||
|
||||
# Define processing function once
|
||||
def process_line(line,filename=None):
|
||||
|
||||
# Strip each ',' seperator in file line
|
||||
columns = line.strip().split(",")
|
||||
|
||||
#Check Columns Length each line
|
||||
if len(columns) < 25:
|
||||
print(f"Skipping malformed line: {line}")
|
||||
|
||||
#Uncomment If needed
|
||||
# try :
|
||||
# #Move decode folder into ERROR folder files/SIDES/ERROR
|
||||
# #move_to_error_folder(filename,line,error_folder="/"+ftp_root_folder+"/ERROR")
|
||||
|
||||
# except Exception as e :
|
||||
# print(f"⚠️ Failed to move malformed line to ERROR folder: {e}")
|
||||
return
|
||||
|
||||
|
||||
# Variable Declaration based on CSV Column Data
|
||||
|
||||
#Common Data Variable that needed
|
||||
station_id = columns[1]
|
||||
timestamp = columns[4]
|
||||
|
||||
#Siren Data Variable
|
||||
sirenid = columns[18]
|
||||
siren = columns[19]
|
||||
|
||||
#Rainfall Variable
|
||||
anncumm = float(columns[21]) if columns[21] else None
|
||||
dailycumm = float(columns[22]) if columns[22] else None
|
||||
hourlycumm = float(columns[23]) if columns[23] else None
|
||||
currrf = float(columns[24]) if columns[24] else None
|
||||
|
||||
#Water Level Variable
|
||||
waterlevel = float(columns[36]) if columns[36] else None
|
||||
|
||||
#Water Level Threshold Variable
|
||||
wldgr = float(columns[17]) if columns[17] else None
|
||||
wlwarn = float(columns[16]) if columns[16] else None
|
||||
wlalert = float(columns[15]) if columns[15] else None
|
||||
|
||||
#Battery Variable
|
||||
battery = float(columns[6]) if columns[6] else None
|
||||
|
||||
|
||||
|
||||
|
||||
try:
|
||||
|
||||
#Timestamp Format
|
||||
datetime_object = datetime.strptime(timestamp, "%y%m%d%H%M%S")
|
||||
|
||||
#Uncomment If Using Separate Date And Time Column
|
||||
# date_part = datetime_object.strftime("%Y-%m-%d")
|
||||
# time_part = datetime_object.strftime("%H:%M:%S")
|
||||
|
||||
# date_part2 = datetime_object.date()
|
||||
|
||||
|
||||
#Rainfall Data Insertion Into Database
|
||||
#Check dailycumm variable Or hourtycumm variable is Not Null
|
||||
#If either dailycumm or hourlycum, is Null. Skip this process
|
||||
|
||||
if dailycumm != None or hourlycumm != None :
|
||||
|
||||
#Execute Query for check data is already available or not in rainfall Table
|
||||
#TABLE : RAINFALL
|
||||
#COLUMN : stationid,timestamp
|
||||
#INPUT : station_id,datetime_object
|
||||
#OUTPUT : -
|
||||
|
||||
cursor.execute("""
|
||||
SELECT COUNT(*)
|
||||
FROM rainfall
|
||||
WHERE stationid = %s AND timestamp = %s
|
||||
""", (station_id, datetime_object))
|
||||
|
||||
#Delcarable Variable Record Exists Condition
|
||||
record_exists = cursor.fetchone()[0] > 0
|
||||
|
||||
|
||||
# Check If Record is exist
|
||||
# If record not exist execute INSERT rainfall data into rainfall Table
|
||||
if not record_exists:
|
||||
|
||||
#Execute Insert Query into rainfall table
|
||||
#TABLE : RAINFALL
|
||||
#COLUMN : stationid,timestamp,anncum,daily,hourly,currentrf,battery
|
||||
#INPUT : station_id ,date_object,anncumm,dailycumm,hourlycumm,currrf,battery
|
||||
#OUTPUT : -
|
||||
|
||||
cursor.execute("""
|
||||
INSERT INTO rainfall (stationid,timestamp, anncum, daily, hourly, currentrf,battery)
|
||||
VALUES (%s, %s, %s, %s, %s, %s, %s)
|
||||
""", (station_id, datetime_object,anncumm, dailycumm, hourlycumm, currrf,battery))
|
||||
conn.commit()
|
||||
|
||||
# If Rainfall Threshold Warning Threshold
|
||||
if hourlycumm >= 30 :
|
||||
|
||||
#Threshold Level Declarion
|
||||
if hourlycumm >= 30 and hourlycumm < 60 :
|
||||
level = 'Warning'
|
||||
elif hourlycumm >= 60 :
|
||||
level = 'Danger'
|
||||
else :
|
||||
level = 'Normal'
|
||||
|
||||
#Execute Query for check data is already available or not in notification Table
|
||||
#TABLE : NOTIFICATION
|
||||
#COLUMN : stationid,timestamp,stationtype
|
||||
#INPUT : station_id,datetime_object,1
|
||||
#OUTPUT : -
|
||||
|
||||
cursor.execute("""
|
||||
SELECT COUNT(*)
|
||||
FROM notification
|
||||
WHERE stationid = %s AND timestamp = %s AND stationtype = %s
|
||||
""", (station_id, datetime_object,1))
|
||||
|
||||
record_exists = cursor.fetchone()[0] > 0
|
||||
|
||||
# Check If Record is exist
|
||||
# If record not exist execute INSERT rainfall data that trigger threshold into notification Table
|
||||
if not record_exists:
|
||||
|
||||
#Execute Insert Query into notification table
|
||||
#stationtype = 1 because rainfall data
|
||||
#TABLE : NOTIFICATION
|
||||
#COLUMN : stationid,timestamp.stationtype,level,active_time
|
||||
#INPUT : station_id,datetime_object,1,level,datetime_object
|
||||
#OUTPUT : -
|
||||
|
||||
cursor.execute("""
|
||||
INSERT INTO notification (stationid,timestamp, stationtype, level,active_time)
|
||||
VALUES (%s, %s, %s, %s, %s)
|
||||
""", (station_id, datetime_object,1, level,datetime_object))
|
||||
conn.commit()
|
||||
|
||||
#Call Alert Function with station id , level , stationtype as parameter
|
||||
#Send Status Station Detail to laravel API for Alert Notification
|
||||
send_alert_to_laravel(station_id,level,1)
|
||||
|
||||
#Print Rainfall Details
|
||||
print(f"Station ID {station_id}")
|
||||
print(f"Timestamp {timestamp}")
|
||||
print(f"Daily Cumm : {dailycumm}")
|
||||
print(f"Hourly : {hourlycumm}\n")
|
||||
|
||||
|
||||
#Water Level Data Insertion Into Database
|
||||
#Check waterlevel is Not Null
|
||||
#If water level is Null. Skip this process
|
||||
|
||||
if waterlevel != None :
|
||||
|
||||
#Execute Query for check data is already available or not in waterlevel Table
|
||||
#TABLE : WATERLEVEL
|
||||
#COLUMN : stationid,datetime
|
||||
#INPUT : station_id,datetime_object
|
||||
#OUTPUT : -
|
||||
|
||||
cursor.execute("""
|
||||
SELECT COUNT(*)
|
||||
FROM waterlevel
|
||||
WHERE stationid = %s AND datetime = %s
|
||||
""", (station_id, datetime_object))
|
||||
record_exists = cursor.fetchone()[0] > 0
|
||||
|
||||
|
||||
# Check If Record is exist
|
||||
# If record not exist execute INSERT waterlevel data into waterlevel Table
|
||||
if not record_exists:
|
||||
|
||||
#Execute Insert Query into waterlevel table
|
||||
#TABLE : WATERLEVEL
|
||||
#COLUMN : stationid,datetime,waterlevel,alert,warning,danger
|
||||
#INPUT : station_id,datetime_object,waterlevel,wlalert,wlwarn,wldgr
|
||||
#OUTPUT : -
|
||||
cursor.execute("""
|
||||
INSERT INTO waterlevel (stationid,datetime, waterlevel, alert, warning,danger)
|
||||
VALUES (%s, %s, %s, %s, %s, %s)
|
||||
""", (station_id, datetime_object,waterlevel, wlalert, wlwarn, wldgr))
|
||||
conn.commit()
|
||||
|
||||
# If Water Level Threshold Trigger
|
||||
if waterlevel >= wlalert :
|
||||
|
||||
#Water Level Level Declaration
|
||||
if waterlevel >= wlalert and waterlevel < wlwarn :
|
||||
level = 'Alert'
|
||||
elif waterlevel >= wlwarn and waterlevel < wldgr :
|
||||
level = 'Warning'
|
||||
elif waterlevel >= wldgr:
|
||||
level = 'Danger'
|
||||
else :
|
||||
level = 'Normal'
|
||||
|
||||
#Execute Query for check data is already available or not in notification Table
|
||||
#TABLE : NOTIFICATION
|
||||
#COLUMN : stationid,timestamp,stationtype
|
||||
#INPUT : station_id,datetime_object,2
|
||||
#OUTPUT : -
|
||||
|
||||
cursor.execute("""
|
||||
SELECT COUNT(*)
|
||||
FROM notification
|
||||
WHERE stationid = %s AND timestamp = %s AND stationtype = %s
|
||||
""", (station_id, datetime_object,2))
|
||||
record_exists = cursor.fetchone()[0] > 0
|
||||
|
||||
|
||||
# Check If Record is exist
|
||||
# If record not exist execute INSERT waterlevel data that trigger threshold notification Table
|
||||
if not record_exists:
|
||||
|
||||
#Execute Insert Query into notification table
|
||||
#stationtype = 2 because waterlevel data
|
||||
|
||||
#TABLE : NOTIFICATION
|
||||
#COLUMN : stationid,timestamp,stationtype,level,active_time
|
||||
#INPUT : station_id,datetime_object,2,level,datetime_object
|
||||
#OUTPUT : -
|
||||
|
||||
cursor.execute("""
|
||||
INSERT INTO notification (stationid,timestamp, stationtype, level,active_time)
|
||||
VALUES (%s, %s, %s, %s, %s)
|
||||
""", (station_id, datetime_object,2, level,datetime_object))
|
||||
conn.commit()
|
||||
|
||||
|
||||
#Call Alert Function with station id , level , stationtype as parameter
|
||||
#Send Status Station Detail to laravel API for Alert Notification
|
||||
send_alert_to_laravel(station_id,level,2)
|
||||
print(f"Station ID {station_id}")
|
||||
print(f"Timestamp {timestamp}")
|
||||
print(f"Water Level : {waterlevel}")
|
||||
print(f"Danger : {wldgr}")
|
||||
print(f"Warning : {wlwarn}\n")
|
||||
|
||||
|
||||
#Siren Data Insertion Into Database
|
||||
#Check sirenid variable is Not Null
|
||||
#If sirenid is Null. Skip this process
|
||||
|
||||
if sirenid != None:
|
||||
|
||||
#Execute Query for check data is already available or not in siren Table
|
||||
#TABLE : SIREN
|
||||
#COLUMN : stationid,active_time
|
||||
#INPUT : station_id,datetime_object
|
||||
#OUTPUT : -
|
||||
|
||||
cursor.execute("""
|
||||
SELECT COUNT(*)
|
||||
FROM siren
|
||||
WHERE stationid = %s AND active_time = %s
|
||||
""", (station_id, datetime_object))
|
||||
record_exists = cursor.fetchone()[0] > 0
|
||||
|
||||
|
||||
#Siren level Declaration
|
||||
if siren == 'H' :
|
||||
level = 'Danger'
|
||||
elif siren == 'L':
|
||||
level = 'Warning'
|
||||
else :
|
||||
level = 'Normal'
|
||||
|
||||
|
||||
# check siren condition
|
||||
# If Siren is Normal
|
||||
|
||||
if siren == 'N':
|
||||
|
||||
# Check If Record is exist
|
||||
# If record not exist execute INSERT siren data into siren Table
|
||||
|
||||
if not record_exists:
|
||||
|
||||
#TABLE : SIREN
|
||||
#COLUMN : stationid,active_time,level
|
||||
#INPUT : station_id,datetime_object,siren
|
||||
#OUTPUT : -
|
||||
|
||||
cursor.execute("""
|
||||
INSERT INTO siren (stationid, stationtype, active_time, level)
|
||||
VALUES (%s, %s, %s, %s)
|
||||
""", (station_id, 3, datetime_object, siren))
|
||||
conn.commit()
|
||||
|
||||
|
||||
#Print Siren Details
|
||||
print(f"Station ID {station_id}")
|
||||
print(f"SRID {sirenid}")
|
||||
print(f"Timestamp {timestamp}")
|
||||
print(f"Alarm : {siren}\n")
|
||||
print(f"Level : {level}\n")
|
||||
|
||||
#If siren not Normal
|
||||
elif siren != 'N' :
|
||||
|
||||
# Check If Record is exist
|
||||
# If record not exist execute INSERT siren data into siren Table
|
||||
|
||||
if not record_exists:
|
||||
|
||||
|
||||
#Execute Insert Query into siren table
|
||||
#TABLE : SIREN
|
||||
#COLUMN : stationid,active_time,level
|
||||
#INPUT : station_id,datetime_object,siren
|
||||
#OUTPUT : -
|
||||
|
||||
cursor.execute("""
|
||||
INSERT INTO siren (stationid, stationtype, active_time, level)
|
||||
VALUES (%s, %s, %s, %s)
|
||||
""", (station_id, 3, datetime_object, siren))
|
||||
conn.commit()
|
||||
|
||||
|
||||
|
||||
#Print Siren Table
|
||||
print(f"Station ID {station_id}")
|
||||
print(f"SRID {sirenid}")
|
||||
print(f"Timestamp {timestamp}")
|
||||
print(f"Alarm : {siren}\n")
|
||||
|
||||
|
||||
|
||||
#Send Station Alert To API When Siren Level is not normal
|
||||
#Call Alert Function with station id , level , stationtype as parameter
|
||||
#Send Status Station Detail to laravel API for Alert Notification
|
||||
|
||||
if level != 'Normal' :
|
||||
send_alert_to_laravel(station_id,level,3)
|
||||
|
||||
except Exception as e:
|
||||
|
||||
conn.rollback()
|
||||
|
||||
#Move decode folder into ERROR folder files/SIDES/ERROR
|
||||
#move_to_error_folder(filename,line,error_folder="/"+ftp_root_folder+"/ERROR")
|
||||
print(f"Error processing line: {e}")
|
||||
|
||||
|
||||
#Start Process FTP File
|
||||
try:
|
||||
|
||||
#Folder File List Variable Declaration
|
||||
files = ftp.nlst()
|
||||
|
||||
#Loop Through the Folder/Files
|
||||
for file_name in files:
|
||||
|
||||
#Skip Tideda File
|
||||
if "rf" in file_name.lower():
|
||||
#print(f"Skipping file: {file_name} (contains 'rf')")
|
||||
continue
|
||||
|
||||
#Skip Others Day File
|
||||
if today_str not in file_name:
|
||||
#print(f"Skipping file: {file_name} (not from today {today_str})")
|
||||
continue
|
||||
|
||||
print(f"Processing file: {file_name}")
|
||||
all_lines = []
|
||||
|
||||
#Retrieve the files as a text , line by line
|
||||
ftp.retrlines(
|
||||
f"RETR {file_name}",
|
||||
lambda line: (all_lines.append(line), process_line(line, file_name))
|
||||
)
|
||||
file_content = "\n".join(all_lines)
|
||||
|
||||
#Uncomment if needed
|
||||
#Move decode folder into success folder files/SIDES/SUCCESS/year/month/day
|
||||
|
||||
#move_to_success_folder(file_name,file_content,success_folder="/"+ftp_root_folder+"/SUCCESS/"+year+"/"+month+"/"+day)
|
||||
|
||||
except Exception as e:
|
||||
print(f"Failed to process folder: {e}")
|
||||
|
||||
#Uncomment this for test Alert Notification
|
||||
# send_alert_to_laravel('KBLG0026','Warning',1)
|
||||
# send_alert_to_laravel('KBLG0026','Danger',2)
|
||||
# send_alert_to_laravel('KBLG0031','Warning',3)
|
||||
|
||||
# Close connections
|
||||
ftp.quit()
|
||||
cursor.close()
|
||||
conn.close()
|
||||
Reference in New Issue
Block a user