Optimisation de l'autoconsommation solaire

A partir de mon installation solaire (onduleur Hoymiles) , la surproduction est envoyée dans une voiture électrique MG. Le tout est piloté par du python. Mon objectif est de rouler complètement à l'énergie solaire...


#!smiles_venv/bin/python3
from hoymiles_wifi.dtu import DTU
import asyncio
import logging
import sys
from saic_ismart_client_ng import SaicApi
from saic_ismart_client_ng.model import SaicApiConfiguration
from saic_ismart_client_ng.api.vehicle_charging.schema import TargetBatteryCode, ChargeCurrentLimitCode
import time
import numpy as np

import csv

def ajouter_ligne(etat):
    # 'a' signifie Append (ajouter à la fin)
    with open('mesures.csv', 'a', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerow(etat)

#pv
async def production():
	dtu = DTU("192.168.1.66")
	resp = await dtu.async_get_real_data_new()
	return resp
	
def mesure():
	try:
		data = asyncio.run(production())
		puissance = int((data.sgs_data[0].active_power+data.sgs_data[1].active_power)/10)
	except Exception as e:
		print("Erreur de connexion :", e)
		puissance = 0
	return puissance

#mg
def config():
	config = SaicApiConfiguration(
		username="********",
		password="********",
	)
	return config

async def mg_stop_charge():
	saic_api = SaicApi(config())
	await saic_api.login()
	logging.info("Auth token expires at %s", saic_api.token_expiration)
	vehicle_list_rest = await saic_api.vehicle_list()
	car = vehicle_list_rest.vinList[0]
	my_vin = car.vin
	await saic_api.control_charging(my_vin, stop_charging=True)
	info_mg(my_vin)
	
async def mg_start_charge(vitesse):
	saic_api = SaicApi(config())
	await saic_api.login()
	logging.info("Auth token expires at %s", saic_api.token_expiration)
	vehicle_list_rest = await saic_api.vehicle_list()
	car = vehicle_list_rest.vinList[0]
	my_vin = car.vin
	await saic_api.control_charging(my_vin, stop_charging=False)
	cible = {LENTE:ChargeCurrentLimitCode.C_6A,NORMALE:ChargeCurrentLimitCode.C_8A,RAPIDE:ChargeCurrentLimitCode.C_MAX}
	await saic_api.set_target_battery_soc(
		vin=my_vin,
		target_soc=TargetBatteryCode.P_100, 
		charge_current_limit=cible[vitesse]
	)

async def mg_modifie(vitesse):
	saic_api = SaicApi(config())
	await saic_api.login()
	logging.info("Auth token expires at %s", saic_api.token_expiration)
	vehicle_list_rest = await saic_api.vehicle_list()
	time.sleep(intervale//5)
	car = vehicle_list_rest.vinList[0]
	my_vin = car.vin
	cible = {LENTE:ChargeCurrentLimitCode.C_6A,NORMALE:ChargeCurrentLimitCode.C_8A,RAPIDE:ChargeCurrentLimitCode.C_MAX}
	await saic_api.set_target_battery_soc(
		vin=my_vin,
		target_soc=TargetBatteryCode.P_100, 
		charge_current_limit=cible[vitesse]
	)

async def mg_info_charge():
	saic_api = SaicApi(config())
	await saic_api.login()
	logging.info("Auth token expires at %s", saic_api.token_expiration)
	vehicle_list_rest = await saic_api.vehicle_list()
	car = vehicle_list_rest.vinList[0]	
	my_vin = car.vin
	charging_status = await saic_api.get_vehicle_charging_management_data(my_vin)
	prise = (charging_status.rvsChargeStatus.chargingGunState == 1)
	vitesse = LENTE
	ampere = 16
	if prise:
		pourcent_charge = charging_status.rvsChargeStatus.realtimePower / charging_status.rvsChargeStatus.totalBatteryCapacity *100
		if charging_status.chrgMgmtData.bmsChrgOtptCrntReq == 8:
			ampere = 6
		if charging_status.chrgMgmtData.bmsChrgOtptCrntReq == 10:
			ampere = 8
		if charging_status.chrgMgmtData.bmsChrgOtptCrntReq == 256:
			ampere = 16
		if charging_status.chrgMgmtData.bmsChrgOtptCrntReq == 129:
			ampere = 16
		if charging_status.chrgMgmtData.bmsChrgOtptCrntReq == 511:
			ampere = 0
		print("Niveau charge : ",pourcent_charge)
		print("Cable connecté : ",prise)
		print("Vitesse de charge: ",ampere)
	else:
		print("Câble non branché")
	vehicle_status = await saic_api.get_vehicle_status(my_vin)
	volt = vehicle_status.basicVehicleStatus.batteryVoltage
	logging.info(f"Batterie de service: {volt}")
	return ampere,prise

if __name__ == "__main__":
	LENTE = 6   #ChargeCurrentLimitCode.C_6A
	NORMALE = 8 #ChargeCurrentLimitCode.C_8A
	RAPIDE =  16 #ChargeCurrentLimitCode.C_MAX
	ampere , prise = asyncio.run(mg_info_charge())
	if ampere !=0:
		vitesse = ampere
		charge = True
	else:
		charge = False
		vistesse = LENTE
	if prise:
		periode = 30 #temps en mn pour remonter  dans le passé
		intervale = 30 #temps en ss entre deux mesures
		nb_mesure = int(periode * 60 / intervale)
		poids_passe = 10
		poids_present = 0.9
		poids_future = 0.9
		a = 0
		lst = []
		while True:
			#Présent
			prod_present = mesure()
			ajouter_ligne([time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), "instant",prod_present])
			lst.append(prod_present)
			n = min(nb_mesure, len(lst))					
			if len(lst)>2:
				y = lst[-n:]			
				#Passée
				prod_moyenne = np.mean(y)
				#Nuage
				variance_y = np.sum((y-np.mean(y))**2)/n
				nuage = float(np.sqrt(variance_y))
				#Future (Moindres carrés)
				x = [k for k in range(1,n+1)]
				numerateur = np.sum( (x - np.mean(x)) * (y - np.mean(y)))
				denominateur = np.sum((x - np.mean(x)**2))
				a = numerateur / denominateur
				prod_future = a + prod_present
			else:
				prod_moyenne = prod_present 
				prod_future = prod_present 
				nuage = 0

			prod_ponderee = (poids_passe * prod_moyenne + poids_present * prod_present + poids_future * prod_future)/(poids_passe + poids_present + poids_future)
			
			if prod_ponderee<800 and charge and nuage<400:		    
				charge = False
				vitesse = LENTE		
				asyncio.run(mg_stop_charge())
				print("stop charge")
				
			if prod_ponderee>1100 and not(charge):	    
				vitesse = LENTE		
				asyncio.run(mg_start_charge(vitesse))
				charge = True
				print("charge en cours à "+str(vitesse))
							
			if prod_ponderee<1400 and charge and vitesse>LENTE and nuage<500:
				vitesse = LENTE		
				asyncio.run(mg_modifie(vitesse))			
				print("charge en cours à "+str(vitesse))
					
			if prod_ponderee>1900 and charge and vitesse < NORMALE and nuage < 600:
				vitesse = NORMALE	
				asyncio.run(mg_modifie(vitesse))			
				print("charge en cours à "+str(vitesse))

			if prod_ponderee<2300 and charge and vitesse>NORMALE:
				vitesse = NORMALE
				asyncio.run(mg_modifie(vitesse))			
				print("charge en cours à "+str(vitesse))
				
			if prod_ponderee > 2600 and charge and vitesse < RAPIDE and nuage < 600:
				vitesse = RAPIDE	
				asyncio.run(mg_modifie(vitesse))			
				print("charge en cours à "+str(vitesse))

			print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), "   instant =",prod_present,"    Charge =",charge, "   Vitesse",vitesse)
			time.sleep(intervale)
    
35580