plasma-IV-analysis/cods/S_Langmuir.py
2026-02-06 19:13:03 +01:00

2046 lines
70 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from pathlib import Path
import pandas as pd
import matplotlib
matplotlib.use("Agg") # backend sin GUI, seguro en threads
import matplotlib.pyplot as plt
import os
import numpy as np
import re
import shutil
from scipy.signal import savgol_filter
# Carpeta donde está el script Datos.py → MedidasLab
BASE_DIR = Path(__file__).resolve().parent
##############################################################################
########################## PROCESADO DE CURVAS #############################
##############################################################################
def procesar_archivo(filepath: Path,
out_correct_lineal: Path,
out_correct_log: Path,
out_failed_lineal: Path,
out_failed_log: Path,
forzar_correcta: bool | None = None) -> bool:
"""
Lee un archivo de datos y genera:
- una gráfica lineal I-V (corriente en mA)
- una gráfica semi-log I-V (corriente en A)
Usa `es_curva_correcta` para decidir si la curva se guarda en 'Correctos' o en 'Fallidos'.
Devuelve:
True si la curva se ha clasificado como 'Correcta'
False si se ha clasificado como 'Fallida'
"""
# Leer datos
df = pd.read_csv(filepath)
# Extraer columnas
V = df["Voltage(V)"].to_numpy() # Voltios
I_mA = df["Current(mA)"].to_numpy() # mA
I_mA = -I_mA # invertimos signo
I_A = I_mA / 1000 # pasamos a A para la logarítmica
base_name = filepath.stem # nombre sin extensión
# ---- Decidir si la curva es 'correcta' o 'fallida' (nuevo criterio) ----
if forzar_correcta is None:
es_correcta = curva_correcta(V, I_A, min_pos=5, frac_min=0.6)
else:
es_correcta = bool(forzar_correcta)
if es_correcta:
dir_lineal = out_correct_lineal
dir_log = out_correct_log
else:
dir_lineal = out_failed_lineal
dir_log = out_failed_log
# ====================== GRÁFICA LINEAL ======================
fig_lin, ax_lin = plt.subplots()
ax_lin.plot(V, I_mA, marker="o", linestyle="None", markersize=2)
ax_lin.set_xlabel(" Voltaje (V)")
ax_lin.set_ylabel("Corriente (mA)")
ax_lin.set_title(f"I-V curve (lineal) ({base_name})")
ax_lin.grid(True)
fig_lin.savefig(dir_lineal / f"{base_name}_lineal.png",
dpi=300, bbox_inches="tight")
plt.close(fig_lin)
# ====================== GRÁFICA SEMI-LOG ======================
# Zona positiva (para la log)
mask_pos = I_A > 0
V_pos = V[mask_pos]
I_pos = I_A[mask_pos]
fig_log, ax_log = plt.subplots()
if len(I_pos) > 0:
ax_log.semilogy(V_pos, I_pos, marker="o", linestyle="None", markersize=2)
else:
# Si no hay puntos positivos, aun así guardamos algo
ax_log.text(0.5, 0.5, "Sin puntos I > 0",
transform=ax_log.transAxes,
ha="center", va="center")
ax_log.set_xlabel("Voltaje $V_p$ (V)")
ax_log.set_ylabel("Corriente (A)")
ax_log.set_title(f"I-V curva semi-log ({base_name})")
ax_log.grid(True, which="both")
fig_log.savefig(dir_log / f"{base_name}_log.png",
dpi=300, bbox_inches="tight")
plt.close(fig_log)
return es_correcta
def curva_correcta(V, I_A, min_pos=5, frac_min=0.6) -> bool:
"""
Decide si una curva I-V es 'correcta' según:
1) Tiene al menos `min_pos` puntos con I_A > 0
2) En la zona positiva, la corriente crece mayoritariamente con el voltaje
(al menos `frac_min` de los pasos son crecientes).
"""
# 1) Zona con corriente positiva
mask_pos = I_A > 0
V_pos = V[mask_pos]
I_pos = I_A[mask_pos]
# Si hay pocos puntos positivos, la consideramos fallida
if len(I_pos) < min_pos:
return False
# 2) Ordenamos por voltaje
order = np.argsort(V_pos)
I_ord = I_pos[order]
if len(I_ord) < 2:
return False # por si acaso
# 3) Miramos cuánto crece la corriente entre puntos consecutivos
diffs = np.diff(I_ord) # I[k+1] - I[k]
frac_up = np.mean(diffs >= 0) # fracción de pasos crecientes
# Consideramos correcta si, al menos, frac_min de los pasos son crecientes
return frac_up >= frac_min
def procesar_archivo_depurado(filepath: Path,
out_lineal_dep: Path,
out_log_dep: Path):
"""
Versión depurada de las curvas:
- Elimina puntos muy fuera de la tendencia usando un filtro robusto
- Guarda las curvas 'limpias' en las carpetas de Depurado.
No modifica las curvas originales.
"""
df = pd.read_csv(filepath)
V = df["Voltage(V)"].to_numpy()
I_mA = df["Current(mA)"].to_numpy()
I_mA = -I_mA
I_A = I_mA / 1000.0
# Construimos un DataFrame auxiliar ordenado por V
df2 = pd.DataFrame({"V": V, "I_mA": I_mA, "I_A": I_A})
df2 = df2.sort_values("V").reset_index(drop=True)
n = len(df2)
if n >= 3:
win = 5 # puntos en la ventana
k_lin = 5.0 # umbral para escala lineal
k_log = 3.0 # umbral para escala log
"""
Si el DataFrame tiene mas de 3 puntos se hace el depurado.
Para cada punto se analizan 5 puntos, los 2 a derechas, los 2 a izquierdas, y el propio punto.
K sirve para marcar el rango de aceptación de los puntos. Se multiplica K por la desviación típica, si el punto está mas lejos que K*desviacion entonces se considera que está fuera de rango.
"""
# ---- Filtro en escala lineal (I_mA) ----
med_lin = df2["I_mA"].rolling(win, center=True, min_periods=1).median()
abs_dev_lin = (df2["I_mA"] - med_lin).abs()
mad_lin = abs_dev_lin.median()
if mad_lin == 0:
mad_lin = abs_dev_lin.mean() + 1e-12
out_lin_mask = abs_dev_lin > (k_lin * mad_lin)
"""
Se calcula la mediana por cada grupo de 5 puntos (2, 3, 5, 10, 100 --> sería 5). Se utiliza la mediana y no la media para no permitir que puntos muy fuera de rango (100 por ejemplo) arrastre mucho.
Se calcula la desviacion absoluta. Si el punto va con la tendencia su desviacion será pequeña, si está muy fuera de rango la desviacion será muy grande.
Se calcula cuanto sueles separarse los puntos (mad_lin)
Se marcan los puntos como "OUTLIERS" que se separan mucho mas de lo permitido (separación > K*desviacion)
"""
# ---- Filtro en escala log (solo I_A > 0) ----
out_log_mask = pd.Series(False, index=df2.index)
pos_idx = df2.index[df2["I_A"] > 0]
if len(pos_idx) >= 3:
logI = np.log10(df2.loc[pos_idx, "I_A"])
med_log = logI.rolling(win, center=True, min_periods=1).median()
abs_dev_log = (logI - med_log).abs()
mad_log = abs_dev_log.median()
if mad_log == 0:
mad_log = abs_dev_log.mean() + 1e-12
out_log_mask.loc[pos_idx] = abs_dev_log > (k_log * mad_log)
# Combinar: si un punto es outlier en lineal o en log, lo quitamos
outlier_mask = out_lin_mask | out_log_mask
"""
ese |, actúa como un "or". Se combinan los outlier en lineal y en logaritmico. Si un punto es outlier en un caso se vuelve outlier en el otro
"""
else:
outlier_mask = pd.Series(False, index=df2.index)
df_clean = df2.loc[~outlier_mask].reset_index(drop=True)
V_clean = df_clean["V"].to_numpy()
I_clean_mA = df_clean["I_mA"].to_numpy()
I_clean_A = df_clean["I_A"].to_numpy()
"""
Se eliminan los outlier, se vuelve a guardar los datos sin los puntos fuera de rango. A partir de esto se vuelven a hacer las gráficas depuradas
"""
base_name = filepath.stem
# ---- Guardar también los datos depurados en un TXT ----
# Columnas: V, I_mA (ya invertida), I_A
df_clean.to_csv(out_lineal_dep / f"{base_name}_depurado.txt",
index=False)
# --------- Gráfica lineal depurada ---------
fig_lin, ax_lin = plt.subplots()
ax_lin.plot(V_clean, I_clean_mA, marker="o", linestyle="None", markersize=2)
ax_lin.set_xlabel("Voltaje (V)")
ax_lin.set_ylabel("Corriente (mA)")
ax_lin.set_title(f"I-V curva depurada (lineal) ({base_name})")
ax_lin.grid(True)
fig_lin.savefig(out_lineal_dep / f"{base_name}_lineal_dep.png",
dpi=300, bbox_inches="tight")
plt.close(fig_lin)
# --------- Gráfica semi-log depurada ---------
mask_pos_clean = I_clean_A > 0
V_pos_clean = V_clean[mask_pos_clean]
I_pos_clean_A = I_clean_A[mask_pos_clean]
fig_log, ax_log = plt.subplots()
if len(I_pos_clean_A) > 0:
ax_log.semilogy(V_pos_clean, I_pos_clean_A,
marker="o", linestyle="None", markersize=2)
else:
ax_log.text(0.5, 0.5, "Sin puntos I > 0 tras depurar",
transform=ax_log.transAxes,
ha="center", va="center")
ax_log.set_xlabel("Voltaje $V_p$ (V)")
ax_log.set_ylabel("Corriente (A)")
ax_log.set_title(f"I-V curva depurada (semi-log) ({base_name})")
ax_log.grid(True, which="both")
fig_log.savefig(out_log_dep / f"{base_name}_log_dep.png",
dpi=300, bbox_inches="tight")
plt.close(fig_log)
def obtener_grupo_desde_nombre(nombre_archivo: str) -> str:
"""
A partir del nombre del archivo genera un identificador de grupo:
- Si contiene 'catodo_float' -> 'float'
- Si es del tipo 'Medidas_Polz_-10_800V_...' -> '800V_-10'
- Si es del tipo 'Medidas_Polz_-30_28-11-...' (sin voltaje) -> 'Polz_-30'
- Si no encaja con nada, devuelve 'otros'
"""
name = nombre_archivo.lower()
# Caso cátodo flotante
if "catodo_float" in name:
return "float"
# Caso Polz con voltaje de cátodo explícito: Polz_-10_800V_...
m = re.search(r"polz_(-?\d+)_([0-9]+)v", name)
if m:
pol = m.group(1) # polarización, p.ej. "-10"
catv = m.group(2) # voltaje cátodo, p.ej. "800"
return f"{catv}V_{pol}"
# Caso Polz sin voltaje explícito: Polz_-30_28-11-...
m2 = re.search(r"polz_(-?\d+)_", name)
if m2:
pol = m2.group(1)
return f"Polz_{pol}"
# Si no reconocemos el patrón, lo mandamos a 'otros'
return "otros"
group_dirs_cache = {}
def preparar_carpetas_grupo(out_correct: Path, grupo: str):
"""
Devuelve las carpetas (g_lineal, g_log, g_dep_lineal, g_dep_log)
para un grupo concreto. Si no existen, las crea una sola vez.
Si ya existen, reutiliza las rutas sin volver a crear nada.
"""
if grupo in group_dirs_cache:
return group_dirs_cache[grupo]
grupo_dir = out_correct / Path(*grupo.split("/"))
grupo_dir.mkdir(parents=True, exist_ok=True)
g_lineal = grupo_dir / "Lineal"
g_log = grupo_dir / "Logaritmica"
g_dep = grupo_dir / "Depurado"
g_dep_lineal = g_dep / "Lineal"
g_dep_log = g_dep / "Logaritmica"
for d in [g_lineal, g_log, g_dep, g_dep_lineal, g_dep_log]:
d.mkdir(exist_ok=True)
group_dirs_cache[grupo] = (g_lineal, g_log, g_dep_lineal, g_dep_log)
return g_lineal, g_log, g_dep_lineal, g_dep_log
def calcular_curva_media_grupo(filepaths, grupo_dir: Path):
"""
A partir de una lista de archivos *depurados* de un grupo
(TXT con columnas V, I_mA, I_A),
calcula una curva media I-V y guarda:
- curva_media_lineal.png (I en mA)
- curva_media_log.png (I en A, semilog)
- curva_media_lineal.txt (V_grid, MeanCurrent(mA))
"""
if not filepaths:
return # nada que hacer
curvas_V = []
curvas_I_mA = []
# 1) Leer todas las curvas depuradas de este grupo
for fp in filepaths:
df = pd.read_csv(fp)
V = df["V"].to_numpy()
I_mA = df["I_mA"].to_numpy() # ya invertida y depurada
# Por seguridad, ordenamos por voltaje
order = np.argsort(V)
V = V[order]
I_mA = I_mA[order]
curvas_V.append(V)
curvas_I_mA.append(I_mA)
# 2) Construir malla común SOLO en el solape de todas las curvas
Vmin_common = max(V.min() for V in curvas_V)
Vmax_common = min(V.max() for V in curvas_V)
V_grid = np.unique(np.concatenate(curvas_V))
V_grid = V_grid[(V_grid >= Vmin_common) & (V_grid <= Vmax_common)]
# 3) Interpolar cada curva a la malla común
I_grid_mA = []
for V, I_mA in zip(curvas_V, curvas_I_mA):
I_interp = np.interp(V_grid, V, I_mA)
I_grid_mA.append(I_interp)
I_grid_mA = np.vstack(I_grid_mA) # shape: (n_curvas, n_puntos)
# 4) Media punto a punto
I_mean_mA = I_grid_mA.mean(axis=0)
I_mean_A = I_mean_mA / 1000.0
# ================== GRÁFICA MEDIA LINEAL ==================
fig_lin, ax_lin = plt.subplots()
ax_lin.plot(V_grid, I_mean_mA, marker="o", linestyle="-", markersize=2)
ax_lin.set_xlabel("Voltaje (V)")
ax_lin.set_ylabel("Corriente (mA)")
ax_lin.set_title(f"Curva media I-V (lineal) {grupo_dir.name}")
ax_lin.grid(True)
fig_lin.savefig(grupo_dir / "curva_media_lineal.png",
dpi=300, bbox_inches="tight")
plt.close(fig_lin)
# ---- Guardar también los datos de la curva media lineal ----
df_mean = pd.DataFrame({
"Voltage(V)": V_grid,
"MeanCurrent(mA)": I_mean_mA
})
df_mean.to_csv(grupo_dir / "curva_media_lineal.txt",
index=False)
# ================== GRÁFICA MEDIA SEMI-LOG ==================
mask_pos = I_mean_A > 0
V_pos = V_grid[mask_pos]
I_pos_A = I_mean_A[mask_pos]
fig_log, ax_log = plt.subplots()
if len(I_pos_A) > 0:
ax_log.semilogy(V_pos, I_pos_A, marker="o", linestyle="-", markersize=2)
else:
ax_log.text(0.5, 0.5, "Sin puntos I > 0 en la curva media",
transform=ax_log.transAxes,
ha="center", va="center")
ax_log.set_xlabel("Voltaje $V_p$ (V)")
ax_log.set_ylabel("Corriente (A)")
ax_log.set_title(f"Curva media I-V (semi-log) {grupo_dir.name}")
ax_log.grid(True, which="both")
fig_log.savefig(grupo_dir / "curva_media_log.png",
dpi=300, bbox_inches="tight")
plt.close(fig_log)
##############################################################################
#################### POSTPROCESADO DE CURVAS CATODOS #######################
##############################################################################
def sugerir_ventanas_exponencial_y_saturacion(
V,
I_sg,
Id_sg,
Idd_sg,
V_f,
min_puntos_region=8,
):
"""
Usa la señal suavizada y sus derivadas para proponer:
- una ventana [idx_exp1, idx_exp2] en la región exponencial (electrones repelidos)
- una ventana [idx_sat1, idx_sat2] en la región de saturación electrónica
NO calcula V_sp. Solo devuelve índices para que luego puedas:
- ajustar ln(I) vs V en la zona exponencial
- ajustar ln(I) vs V en la zona de saturación
- obtener V_sp como intersección de las dos rectas (en otra función).
Parámetros
----------
V : np.ndarray
Voltajes (ya recortados en la zona útil).
I_sg : np.ndarray
Corriente corregida y suavizada (Savgol).
Id_sg : np.ndarray
Primera derivada dI/dV.
Idd_sg : np.ndarray
Segunda derivada d²I/dV².
V_f : float
Potencial flotante estimado.
min_puntos_region : int
Número mínimo de puntos en cada región para poder ajustar una recta.
Devuelve
--------
idx_exp1, idx_exp2, idx_sat1, idx_sat2 : int
Índices en el array V que delimitan las dos ventanas.
"""
V = np.asarray(V)
I_sg = np.asarray(I_sg)
Id_sg = np.asarray(Id_sg)
Idd_sg = np.asarray(Idd_sg)
n = len(V)
if n < 2 * min_puntos_region:
# Curva demasiado corta: devolvemos algo simple
idx_exp1 = 0
idx_exp2 = max(0, min_puntos_region - 1)
idx_sat1 = max(idx_exp2 + 1, n // 2)
idx_sat2 = n - 1
return idx_exp1, idx_exp2, idx_sat1, idx_sat2
# ------------------------------------------------------
# 1) Zona electrónica: puntos con I_sg > 0 y V > V_f
# ------------------------------------------------------
idx_f = int(np.argmin(np.abs(V - V_f)))
idx_f = min(idx_f, n - 2)
mask_elec = (I_sg > 0) & (np.arange(n) > idx_f)
idx_elec = np.where(mask_elec)[0]
if len(idx_elec) < min_puntos_region:
# Si casi no hay puntos positivos, trabajamos con todo lo que haya a la derecha de V_f
idx_elec = np.arange(idx_f + 1, n)
if len(idx_elec) < min_puntos_region:
# Sigue siendo muy poco: devolvemos ventanas muy básicas
idx_exp1 = idx_f + 1
idx_exp2 = min(n - 1, idx_exp1 + min_puntos_region - 1)
idx_sat1 = max(idx_exp2 + 1, (idx_exp2 + n) // 2)
idx_sat2 = n - 1
return idx_exp1, idx_exp2, idx_sat1, idx_sat2
V_elec = V[idx_elec]
Id_elec = Id_sg[idx_elec]
# ------------------------------------------------------
# 2) Localizar la "rodilla" con la 1ª derivada (solo como guía)
# ------------------------------------------------------
# No es V_sp, solo zona del codo
idx_knee_local = int(np.argmax(Id_elec))
idx_knee = int(idx_elec[idx_knee_local])
# ------------------------------------------------------
# 3) Ventana exponencial [idx_exp1, idx_exp2]
# → por debajo de la rodilla, encima de V_f
# ------------------------------------------------------
# Región candidata: desde justo después de V_f hasta antes de la rodilla
idx_min_exp = idx_f + 1
idx_max_exp = max(idx_min_exp + min_puntos_region - 1, idx_knee - 1)
idx_max_exp = min(idx_max_exp, idx_knee - 1)
if idx_max_exp <= idx_min_exp:
# fallback: comprimimos la ventana entera antes de la rodilla
idx_min_exp = max(idx_knee - min_puntos_region, 0)
idx_max_exp = max(idx_min_exp + min_puntos_region - 1, idx_min_exp)
# aseguramos que está dentro de [0, n-1]
idx_min_exp = max(0, min(idx_min_exp, n - 1))
idx_max_exp = max(0, min(idx_max_exp, n - 1))
# Si hay suficientes puntos, podemos intentar mover el extremo superior de la exponencial
# hacia el último cambio de signo de la segunda derivada antes de la rodilla.
idx_candidatos_exp = np.arange(idx_min_exp, idx_knee + 1)
Idd_cand = Idd_sg[idx_candidatos_exp]
cambios = []
for k in range(len(idx_candidatos_exp) - 1):
s1 = Idd_cand[k]
s2 = Idd_cand[k + 1]
if s1 == 0 or s1 * s2 < 0:
cambios.append(k)
if cambios:
# Tomamos el último cambio de signo como límite superior refinado
k2 = max(cambios)
idx_max_exp = int(idx_candidatos_exp[k2])
# Aseguramos longitud mínima de la ventana exponencial
if idx_max_exp - idx_min_exp + 1 < min_puntos_region:
idx_min_exp = max(0, idx_max_exp - (min_puntos_region - 1))
idx_exp1 = int(idx_min_exp)
idx_exp2 = int(idx_max_exp)
# ------------------------------------------------------
# 4) Ventana de saturación electrónica [idx_sat1, idx_sat2]
# → por encima de la rodilla, donde la derivada es pequeña
# ------------------------------------------------------
idx_min_sat = idx_knee + 1
if idx_min_sat >= n:
idx_min_sat = max(n - min_puntos_region, 0)
# Umbral para considerar "casi saturación": derivada pequeña
# Usamos un porcentaje de la derivada máxima en la zona electrónica
deriv_max = np.max(np.abs(Id_elec)) if len(Id_elec) > 0 else np.max(np.abs(Id_sg))
umbral = 0.2 * deriv_max # 20% de la derivada máxima
idx_candidatos_sat = np.arange(idx_min_sat, n)
Id_cand_sat = Id_sg[idx_candidatos_sat]
mask_sat_plana = np.abs(Id_cand_sat) < umbral
idx_sat_planos = idx_candidatos_sat[mask_sat_plana]
if len(idx_sat_planos) >= min_puntos_region:
# Cogemos una ventana contigua al principio de la zona plana
idx_sat1 = int(idx_sat_planos[0])
idx_sat2 = int(min(n - 1, idx_sat1 + min_puntos_region - 1))
else:
# Fallback: usamos simplemente una ventana fija al final
idx_sat2 = n - 1
idx_sat1 = max(idx_min_sat, idx_sat2 - (min_puntos_region - 1))
return idx_exp1, idx_exp2, idx_sat1, idx_sat2
# ================== CONSTANTES FÍSICAS (SI) ==================
E_CHARGE = 1.602176634e-19 # C
K_BOLTZ = 1.380649e-23 # J/K
M_ELECTRON = 9.10938356e-31 # kg
# ================== CONFIG SONDA (se setea desde GUI) ==================
# La GUI DEBE llamar a set_sonda(...) antes de cualquier postprocesado que calcule n_e.
_PROBE_CFG = None # dict con: {"geom": "cilindrica"/"esferica", "rp_m": float, "L_m": float|None}
def set_sonda(geom: str, rp_mm: float, L_mm: float | None = None):
"""
Configura la sonda desde la GUI.
- geom: "cilindrica" o "esferica"
- rp_mm: radio en mm (obligatorio)
- L_mm: longitud en mm (obligatorio solo si cilindrica)
"""
global _PROBE_CFG
g = (geom or "").strip().lower()
if g not in ("cilindrica", "esferica"):
raise ValueError("geom debe ser 'cilindrica' o 'esferica'")
rp_m = float(rp_mm) * 1e-3
if rp_m <= 0:
raise ValueError("rp_mm debe ser > 0")
cfg = {"geom": g, "rp_m": rp_m, "L_m": None}
if g == "cilindrica":
if L_mm is None:
raise ValueError("L_mm es obligatorio para sonda cilindrica")
L_m = float(L_mm) * 1e-3
if L_m <= 0:
raise ValueError("L_mm debe ser > 0")
cfg["L_m"] = L_m
_PROBE_CFG = cfg
def get_probe_area_m2() -> float:
"""
Devuelve el área efectiva (m²) usando la config seteada desde GUI.
Lanza error si no se ha configurado la sonda.
"""
if _PROBE_CFG is None:
raise RuntimeError("Sonda no configurada. Llama a set_sonda(...) desde la GUI antes de postprocesar.")
g = _PROBE_CFG["geom"]
rp = _PROBE_CFG["rp_m"]
if g == "cilindrica":
L = _PROBE_CFG["L_m"]
if L is None:
raise RuntimeError("Config inválida: falta L_m para sonda cilindrica.")
return 2.0 * np.pi * rp * L # área lateral 2πrL
else: # esferica
return 4.0 * np.pi * rp**2 # 4πr²
def ajustar_rectas_y_Vsp(V, I_sg, idx_exp1, idx_exp2, idx_sat1, idx_sat2):
"""
Ajusta dos rectas en la curva IV (en semilog):
- ln(I) vs V en la región exponencial [idx_exp1, idx_exp2]
- ln(I) vs V en la región de saturación electrónica [idx_sat1, idx_sat2]
A partir de esas rectas calcula:
- V_sp: potencial de plasma (intersección de ambas)
- I_sp: corriente en V_sp
- Te_K: temperatura electrónica en Kelvin
- Te_eV: temperatura electrónica en eV
Devuelve también los parámetros de las rectas para poder plotearlas.
"""
V = np.asarray(V)
I_sg = np.asarray(I_sg)
# --- Región exponencial ---
V_exp = V[idx_exp1:idx_exp2 + 1]
I_exp = I_sg[idx_exp1:idx_exp2 + 1]
# Nos quedamos solo con puntos con corriente > 0
mask_exp_pos = I_exp > 0
V_exp = V_exp[mask_exp_pos]
I_exp = I_exp[mask_exp_pos]
if len(V_exp) < 2:
raise RuntimeError("Muy pocos puntos en la región exponencial para hacer el ajuste.")
y_exp = np.log(I_exp) # ln(I)
# y = a_exp + m_exp * V
m_exp, a_exp = np.polyfit(V_exp, y_exp, 1)
# --- Región de saturación ---
V_sat = V[idx_sat1:idx_sat2 + 1]
I_sat = I_sg[idx_sat1:idx_sat2 + 1]
mask_sat_pos = I_sat > 0
V_sat = V_sat[mask_sat_pos]
I_sat = I_sat[mask_sat_pos]
if len(V_sat) < 2:
raise RuntimeError("Muy pocos puntos en la región de saturación para hacer el ajuste.")
y_sat = np.log(I_sat)
m_sat, a_sat = np.polyfit(V_sat, y_sat, 1)
# --- Intersección de las dos rectas: V_sp ---
if np.isclose(m_exp, m_sat):
raise RuntimeError("Las pendientes de las dos rectas son casi iguales; no se puede encontrar V_sp de forma fiable.")
V_sp = (a_sat - a_exp) / (m_exp - m_sat)
I_sp = np.exp(a_exp + m_exp * V_sp) # mismo valor en las dos rectas
# --- Temperatura electrónica ---
# De la teoría: d(ln I)/dV = m_exp = e / (k_B * T_e)
Te_K = E_CHARGE / (K_BOLTZ * m_exp)
# En eV: (k_B T_e / e) = 1 / m_exp
Te_eV = 1.0 / m_exp
params_exp = (m_exp, a_exp)
params_sat = (m_sat, a_sat)
return V_sp, I_sp, Te_K, Te_eV, params_exp, params_sat
def postprocesar_curva_media_catodo(media_file: Path):
"""
Lee curva_media_lineal.txt y calcula:
- I corregida (restando I_i_sat)
- derivada 1
- derivada 2
Genera figuras y las guarda en <grupo_dir>/Postprocesado/
"""
df = pd.read_csv(media_file)
V_raw = df["Voltage(V)"].to_numpy()
I_raw = df["MeanCurrent(mA)"].to_numpy()
# Crear carpeta Postprocesado
post_dir = media_file.parent / "Postprocesado"
post_dir.mkdir(exist_ok=True)
V_min = V_raw[0]
V_min_index = np.argmin(abs(V_raw - V_min))
V_max = V_raw[-1]
V_max_index = np.argmin(abs(V_raw - V_max))
V = V_raw[V_min_index:V_max_index + 1]
I = I_raw[V_min_index:V_max_index + 1]
# Punto donde I cruza cero
V_f_index = np.argmin(abs(I))
V_f = V[V_f_index]
# Ion saturation current
I_i_sat = np.average(I[:V_f_index]) * 1.2
# ---- Señal corregida ----
I_corr = I - I_i_sat
# ==== Filtros SavitzkyGolay ====
wl = 40 # window length
if wl >= len(I_corr): # evitar errores
wl = len(I_corr) - 1
if wl % 2 == 0:
wl -= 1
I_sg = savgol_filter(I_corr, wl, polyorder=2, deriv=0)
Id_sg = savgol_filter(I_corr, wl, polyorder=2, deriv=1)
Idd_sg = savgol_filter(I_corr, wl, polyorder=2, deriv=2)
# ==== Sugerir ventanas exponencial y de saturación (NO calcula V_sp todavía) ====
idx_exp1, idx_exp2, idx_sat1, idx_sat2 = sugerir_ventanas_exponencial_y_saturacion(
V, I_sg, Id_sg, Idd_sg, V_f, min_puntos_region=8
)
# ==== Ajustar rectas en semilog y obtener V_sp y T_e ====
try:
V_sp, I_sp, Te_K, Te_eV, params_exp, params_sat = ajustar_rectas_y_Vsp(
V, I_sg, idx_exp1, idx_exp2, idx_sat1, idx_sat2
)
except RuntimeError as e:
print(f"[AVISO] Problema en el ajuste para {media_file.name}: {e}")
V_sp = None
I_sp = None
Te_K = None
Te_eV = None
params_exp = None
params_sat = None
if Te_eV is not None and V_sp is not None and I_sp is not None:
I_sat_e_A = I_sp # A
I_sat_e_mA = I_sat_e_A * 1e3 # mA
# I_se = (1/4) e n_e A sqrt( 2 k_B T_e / (pi m_e) )
# => n_e = 4 I_se / (e A) * sqrt( pi m_e / (2 k_B T_e) )
A_probe = get_probe_area_m2()
n_e = (4.0 * I_sat_e_A / (E_CHARGE * A_probe)) * \
np.sqrt(np.pi * M_ELECTRON / (2.0 * K_BOLTZ * Te_K))
else:
I_sat_e_A = np.nan
n_e = np.nan
# ====================== FIGURAS ======================
# Curva original + I_i_sat
fig, ax = plt.subplots()
ax.plot(V_raw, I_raw)
ax.axhline(I_i_sat, color='r', linestyle='--', label="Corriente de saturación iónica $I_{i,\\mathrm{sat}}$")
ax.plot(V_f, 0.0, 'bo', label="Potencial flotante $V_f$")
ax.legend()
fig.savefig(post_dir / "I_raw_with_Iisat.png", dpi=300)
plt.close(fig)
# I corregida
fig, ax = plt.subplots()
ax.plot(V, I_sg)
fig.savefig(post_dir / "I_corrected.png", dpi=300)
plt.close(fig)
# Derivada 1
fig, ax = plt.subplots()
ax.plot(V, Id_sg)
fig.savefig(post_dir / "Id.png", dpi=300)
plt.close(fig)
# Derivada 2
fig, ax = plt.subplots()
ax.plot(V, Idd_sg)
fig.savefig(post_dir / "Idd.png", dpi=300)
plt.close(fig)
# ================= FIGURA SEMILOG RAW + I CORREGIDA =================
mask_raw_pos = I_raw > 0
mask_corr_pos = I_sg > 0
fig, ax = plt.subplots()
ax.set_yscale("log")
# Raw (curva experimental)
ax.plot(V_raw[mask_raw_pos], I_raw[mask_raw_pos], "-", label="Datos experimental")
# Corriente corregida y suavizada
ax.plot(V[mask_corr_pos], I_sg[mask_corr_pos], "-",
label="Curva suavizada")
nV = len(V)
ok_idx = (0 <= idx_exp1 < nV and 0 <= idx_exp2 < nV and
0 <= idx_sat1 < nV and 0 <= idx_sat2 < nV)
if ok_idx:
V_exp1 = V[idx_exp1]; V_exp2 = V[idx_exp2]
V_sat1 = V[idx_sat1]; V_sat2 = V[idx_sat2]
ax.axvspan(V_exp1, V_exp2, color="orange", alpha=0.15, label="Región exponencial")
ax.axvspan(V_sat1, V_sat2, color="green", alpha=0.15, label="Región saturación e⁻")
else:
print(f"[AVISO] Ventanas fuera de rango en {media_file.name}; se omite el sombreado.")
# ==== Dibujar las rectas ajustadas y el punto V_sp ====
if params_exp is not None and params_sat is not None and V_sp is not None:
m_exp, a_exp = params_exp
m_sat, a_sat = params_sat
# Rango de voltaje para dibujar las rectas (desde zona exp a saturación)
V_fit = np.linspace(V_exp1, V_sat2, 200)
# Ajustes en ln(I)
lnI_exp_fit = a_exp + m_exp * V_fit
lnI_sat_fit = a_sat + m_sat * V_fit
# Pasamos de ln(I) a I
I_exp_fit = np.exp(lnI_exp_fit)
I_sat_fit = np.exp(lnI_sat_fit)
ax.plot(V_fit, I_exp_fit, "--", color="black",
label="Ajuste región exponencial")
ax.plot(V_fit, I_sat_fit, "--", color="gray",
label="Ajuste región saturación e$^-$")
# Punto de intersección (V_sp, I_sp)
ax.plot(V_sp, I_sp, "ko", label=r"$V_{sp}$")
ax.set_xlabel("Voltaje (V)")
ax.set_ylabel("Corriente (A)")
ax.grid(True, which="both")
ax.legend()
fig.savefig(post_dir / "I_semilog.png", dpi=300, bbox_inches="tight")
plt.close(fig)
# Guardar también los datos procesados (curva corregida)
out_df = pd.DataFrame({
"V": V,
"I_corr(mA)": I_sg,
"Id(mA/V)": Id_sg,
"Idd(mA/V^2)": Idd_sg
})
out_df.to_csv(post_dir / "curva_postprocesada.txt", index=False)
# ====================== CSV CON PARÁMETROS DEL PLASMA ======================
# Un CSV por grupo, en su carpeta Postprocesado
params_df = pd.DataFrame([{
"grupo": media_file.parent.name,
"V_f(V)": V_f,
"V_sp(V)": V_sp,
"I_sat_e(A)": I_sat_e_A,
"I_i_sat(mA)": I_i_sat,
"T_e(eV)": Te_eV,
"T_e(K)": Te_K,
"n_e(m^-3)": n_e
}])
params_df["n_e_sci(m^-3)"] = params_df["n_e(m^-3)"].map(lambda x: f"{x:.6e}" if np.isfinite(x) else "")
params_df.to_csv(post_dir / "parametros_plasma.csv", index=False)
# Guardar también los datos procesados
out_df = pd.DataFrame({
"V": V,
"I_corr(mA)": I_sg,
"Id(mA/V)": Id_sg,
"Idd(mA/V^2)": Idd_sg
})
out_df.to_csv(post_dir / "curva_postprocesada.txt", index=False)
##############################################################################
##################### POSTPROCESADO DE CURVAS HILOS ########################
##############################################################################
def postprocesar_curva_media_hilo(media_file: Path):
"""
Lee curva_media_lineal.txt y:
- estima V_f
- estima I_i_sat
- calcula I_corr = I - I_i_sat
- suaviza + derivadas (Savgol)
- sugiere ventanas exp/sat
- ajusta rectas en semilog (si se puede) -> V_sp, T_e
- calcula n_e (si se puede)
- genera figuras + CSVs en <grupo_dir>/Postprocesado/
"""
# ===================== 0) LEER =====================
df = pd.read_csv(media_file)
# Si por algún motivo vienen como strings, forzamos float aquí
V_raw = pd.to_numeric(df["Voltage(V)"], errors="coerce").to_numpy(dtype=float)
I_raw = pd.to_numeric(df["MeanCurrent(mA)"], errors="coerce").to_numpy(dtype=float)
post_dir = media_file.parent / "Postprocesado"
post_dir.mkdir(exist_ok=True)
# ===================== 1) DEPURAR / ORDENAR =====================
finite = np.isfinite(V_raw) & np.isfinite(I_raw)
V = V_raw[finite]
I = I_raw[finite]
if len(V) < 10:
raise RuntimeError("Curva media demasiado corta tras quitar NaN/inf.")
order = np.argsort(V)
V = V[order]
I = I[order]
# Si hay voltajes repetidos, promediamos su corriente (evita problemas de interp/derivadas)
Vu, inv = np.unique(V, return_inverse=True)
if len(Vu) != len(V):
I_acc = np.zeros_like(Vu, dtype=float)
n_acc = np.zeros_like(Vu, dtype=float)
np.add.at(I_acc, inv, I)
np.add.at(n_acc, inv, 1.0)
V = Vu
I = I_acc / np.maximum(n_acc, 1.0)
# ===================== 2) V_f =====================
# aproximación: punto donde |I| es mínimo
V_f_index = int(np.argmin(np.abs(I)))
V_f = float(V[V_f_index])
# ===================== 3) I_i_sat =====================
# Para estimar I_i_sat necesitamos "zona iónica": puntos a la izquierda de V_f.
# Si V_f cae demasiado al inicio, hacemos un fallback robusto.
if V_f_index >= 3:
I_i_sat = float(np.mean(I[:V_f_index]) * 1.2)
else:
# Fallback: usar el 10% más negativo en voltaje como “zona iónica”
n = len(V)
k = max(3, int(0.10 * n))
I_i_sat = float(np.mean(I[:k]) * 1.2)
# ===================== 4) CORRECCIÓN =====================
I_corr = I - I_i_sat
# ===================== 5) SAVITZKYGOLAY =====================
wl = 41 # impar
if wl >= len(I_corr):
wl = len(I_corr) - 1
if wl % 2 == 0:
wl -= 1
if wl < 7:
raise RuntimeError("Muy pocos puntos para SavitzkyGolay (wl demasiado pequeño).")
I_sg = savgol_filter(I_corr, wl, polyorder=2, deriv=0)
Id_sg = savgol_filter(I_corr, wl, polyorder=2, deriv=1)
Idd_sg = savgol_filter(I_corr, wl, polyorder=2, deriv=2)
# ===================== 6) VENTANAS AUTOMÁTICAS =====================
idx_exp1, idx_exp2, idx_sat1, idx_sat2 = sugerir_ventanas_exponencial_y_saturacion(
V, I_sg, Id_sg, Idd_sg, V_f, min_puntos_region=8
)
# Blindaje de índices
n = len(V)
idx_exp1 = int(np.clip(idx_exp1, 0, n - 1))
idx_exp2 = int(np.clip(idx_exp2, 0, n - 1))
idx_sat1 = int(np.clip(idx_sat1, 0, n - 1))
idx_sat2 = int(np.clip(idx_sat2, 0, n - 1))
if idx_exp2 < idx_exp1:
idx_exp1, idx_exp2 = idx_exp2, idx_exp1
if idx_sat2 < idx_sat1:
idx_sat1, idx_sat2 = idx_sat2, idx_sat1
# Ventanas mínimas + corriente positiva dentro (necesario para ln)
ventana_exp_ok = (idx_exp2 - idx_exp1 + 1) >= 2 and np.any(I_sg[idx_exp1:idx_exp2+1] > 0)
ventana_sat_ok = (idx_sat2 - idx_sat1 + 1) >= 2 and np.any(I_sg[idx_sat1:idx_sat2+1] > 0)
# ===================== 7) AJUSTE (V_sp, T_e) =====================
V_sp = I_sp = Te_K = Te_eV = None
params_exp = params_sat = None
fit_ok = False
fit_error = ""
if ventana_exp_ok and ventana_sat_ok:
try:
V_sp, I_sp, Te_K, Te_eV, params_exp, params_sat = ajustar_rectas_y_Vsp(
V, I_sg, idx_exp1, idx_exp2, idx_sat1, idx_sat2
)
fit_ok = True
except RuntimeError as e:
fit_error = str(e)
print(f"[AVISO] Problema en el ajuste para {media_file.name}: {fit_error}")
else:
fit_error = f"ventana_exp_ok={ventana_exp_ok}, ventana_sat_ok={ventana_sat_ok}"
# ===================== 8) n_e (si hay ajuste) =====================
if (Te_K is not None) and (I_sp is not None) and np.isfinite(Te_K) and np.isfinite(I_sp):
I_sat_e_A = float(I_sp)
A_probe = get_probe_area_m2()
n_e = (4.0 * I_sat_e_A / (E_CHARGE * A_probe)) * \
np.sqrt(np.pi * M_ELECTRON / (2.0 * K_BOLTZ * Te_K))
else:
I_sat_e_A = np.nan
n_e = np.nan
# ===================== 9) FIGURAS =====================
# 9.1 Curva original + I_i_sat + V_f
fig, ax = plt.subplots()
ax.plot(V, I, label="Curva media (mA)")
ax.axhline(I_i_sat, linestyle="--", label=r"$I_{i,\mathrm{sat}}$")
ax.plot(V_f, I[V_f_index], "o", label=r"$V_f$")
ax.set_xlabel("Voltage (V)")
ax.set_ylabel("Current (mA)")
ax.grid(True)
ax.legend()
ax.set_title("Curva media + estimación $I_{i,\\mathrm{sat}}$ y $V_f$")
fig.savefig(post_dir / "I_raw_with_Iisat.png", dpi=300, bbox_inches="tight")
plt.close(fig)
# 9.2 I corregida (Savgol)
fig, ax = plt.subplots()
ax.plot(V, I_sg, label="I_corr suavizada")
ax.set_xlabel("Voltage (V)")
ax.set_ylabel("Current (mA)")
ax.grid(True)
ax.legend()
ax.set_title("I corregida (Savgol)")
fig.savefig(post_dir / "I_corrected.png", dpi=300, bbox_inches="tight")
plt.close(fig)
# 9.3 Derivada 1
fig, ax = plt.subplots()
ax.plot(V, Id_sg, label="dI/dV")
ax.set_xlabel("Voltage (V)")
ax.set_ylabel("dI/dV (mA/V)")
ax.grid(True)
ax.legend()
ax.set_title("Derivada 1")
fig.savefig(post_dir / "Id.png", dpi=300, bbox_inches="tight")
plt.close(fig)
# 9.4 Derivada 2
fig, ax = plt.subplots()
ax.plot(V, Idd_sg, label="d²I/dV²")
ax.set_xlabel("Voltage (V)")
ax.set_ylabel("d²I/dV² (mA/V²)")
ax.grid(True)
ax.legend()
ax.set_title("Derivada 2")
fig.savefig(post_dir / "Idd.png", dpi=300, bbox_inches="tight")
plt.close(fig)
# 9.5 Semilog (raw vs corregida) + ventanas + fits + V_sp (µA + eje Y acotado)
k_uA = 1e3 # mA -> µA
mask_raw_pos = I > 0
mask_corr_pos = I_sg > 0
fig, ax = plt.subplots()
ax.set_yscale("log")
if np.any(mask_raw_pos):
ax.plot(V[mask_raw_pos], (I[mask_raw_pos] * k_uA), "-", label="Datos experimentales")
if np.any(mask_corr_pos):
ax.plot(V[mask_corr_pos], (I_sg[mask_corr_pos] * k_uA), "-", label="Curva suavizada")
if ventana_exp_ok:
ax.axvspan(V[idx_exp1], V[idx_exp2], color='orange', alpha=0.15, label="Región exponencial")
if ventana_sat_ok:
ax.axvspan(V[idx_sat1], V[idx_sat2], color='green', alpha=0.15, label="Región saturación e⁻")
# Rectas ajustadas y V_sp
if (params_exp is not None) and (params_sat is not None) and (V_sp is not None) and (I_sp is not None):
m_exp, a_exp = params_exp
m_sat, a_sat = params_sat
V_fit_min = V[idx_exp1] if ventana_exp_ok else V[0]
V_fit_max = V[idx_sat2] if ventana_sat_ok else V[-1]
if V_fit_max > V_fit_min:
V_fit = np.linspace(V_fit_min, V_fit_max, 200)
I_exp_fit = np.exp(a_exp + m_exp * V_fit) * k_uA
I_sat_fit = np.exp(a_sat + m_sat * V_fit) * k_uA
ax.plot(V_fit, I_exp_fit, "--", label="Fit exponencial")
ax.plot(V_fit, I_sat_fit, "--", label="Fit saturación e⁻")
if I_sp > 0:
ax.plot(V_sp, I_sp * k_uA, "o", label=r"$V_{sp}$")
# Acotar eje Y (en µA) usando percentiles para no recortar outliers
y_pool = []
if np.any(mask_raw_pos):
y_pool.append(I[mask_raw_pos] * k_uA)
if np.any(mask_corr_pos):
y_pool.append(I_sg[mask_corr_pos] * k_uA)
if len(y_pool) > 0:
y_all = np.concatenate(y_pool)
y_all = y_all[np.isfinite(y_all) & (y_all > 0)]
if y_all.size > 0:
lo, hi = np.percentile(y_all, [1, 99]) # cambia a [5,95] si quieres más recorte
lo = 10**np.floor(np.log10(lo))
hi = 10**np.ceil(np.log10(hi))
ax.set_ylim(lo, hi)
ax.set_xlabel("Voltage (V)")
ax.set_ylabel("Current (µA)")
ax.grid(True, which="both")
ax.legend()
ax.set_title("Curva IV (semilog) + ventanas + ajustes")
fig.savefig(post_dir / "I_semilog.png", dpi=300, bbox_inches="tight")
plt.close(fig)
# ===================== 10) GUARDAR DATOS =====================
# 10.1 datos postprocesados
out_df = pd.DataFrame({
"V(V)": V,
"I_raw(mA)": I,
"I_i_sat(mA)": np.full_like(V, I_i_sat, dtype=float),
"I_corr_sg(mA)": I_sg,
"dI_dV(mA/V)": Id_sg,
"d2I_dV2(mA/V^2)": Idd_sg,
})
out_df.to_csv(post_dir / "curva_postprocesada.txt", index=False)
# 10.2 parámetros del plasma
params_df = pd.DataFrame([{
"grupo": media_file.parent.name,
"V_f(V)": V_f,
"I_i_sat(mA)": I_i_sat,
"V_sp(V)": V_sp,
"I_sat_e(A)": I_sat_e_A,
"T_e(eV)": Te_eV,
"T_e(K)": Te_K,
"n_e(m^-3)": n_e
}])
# FORZAMOS el formato científico con 4 decimales para que no se pierda precisión
if np.isfinite(n_e):
params_df["n_e_sci(m^-3)"] = "{:.4e}".format(n_e)
else:
params_df["n_e_sci(m^-3)"] = ""
params_df.to_csv(post_dir / "parametros_plasma.csv", index=False)
def leer_IV_numeric(filepath: Path):
df = pd.read_csv(filepath, sep=None, engine="python") # autodetect separador
# Forzar numérico con tolerancia a comas decimales y basura
for col in ["Voltage(V)", "Current(mA)"]:
if col not in df.columns:
raise KeyError(f"Falta columna '{col}' en {filepath.name}. Columnas: {list(df.columns)}")
s = df[col].astype(str).str.strip()
s = s.str.replace(",", ".", regex=False) # coma decimal -> punto
s = s.str.replace(r"[^0-9eE\+\-\.]", "", regex=True) # quita unidades/basura
df[col] = pd.to_numeric(s, errors="coerce")
V = df["Voltage(V)"].to_numpy(dtype=float)
I_mA = (-df["Current(mA)"]).to_numpy(dtype=float) # inviertes signo aquí
I_A = I_mA / 1000.0
# Quitar NaN/inf
finite = np.isfinite(V) & np.isfinite(I_mA) & np.isfinite(I_A)
V, I_mA, I_A = V[finite], I_mA[finite], I_A[finite]
return V, I_mA, I_A
def curva_correcta_hilo(V, I_A,
min_puntos=20,
frac_finitos=0.98,
std_min=1e-11) -> bool:
V = np.asarray(V)
I_A = np.asarray(I_A)
# Si vienen como strings/object, intenta convertir
if V.dtype.kind not in "fc":
V = pd.to_numeric(pd.Series(V).astype(str).str.replace(",", ".", regex=False),
errors="coerce").to_numpy(dtype=float)
if I_A.dtype.kind not in "fc":
I_A = pd.to_numeric(pd.Series(I_A).astype(str).str.replace(",", ".", regex=False),
errors="coerce").to_numpy(dtype=float)
# Si tras convertir no hay datos, es fallida
if V.size == 0 or I_A.size == 0:
return False
finite = np.isfinite(V) & np.isfinite(I_A)
# Si no hay ni un punto finito, fallida
if finite.size == 0 or not np.any(finite):
return False
if finite.mean() < frac_finitos:
return False
V = V[finite]
I_A = I_A[finite]
if len(V) < min_puntos:
return False
if np.std(I_A) < std_min:
return False
return True
def depurar_y_guardar_txt(filepath: Path, out_dep_lineal: Path) -> Path:
V, I_mA, I_A = leer_IV_numeric(filepath)
order = np.argsort(V)
V, I_mA, I_A = V[order], I_mA[order], I_A[order]
base = filepath.stem
out_txt = out_dep_lineal / f"{base}_depurado.txt"
pd.DataFrame({"V": V, "I_mA": I_mA, "I_A": I_A}).to_csv(out_txt, index=False)
return out_txt
def es_archivo_hilo(nombre: str) -> bool:
n = nombre.lower()
return n.startswith("hilo_") or n.startswith("hiloyresis_") or n.startswith("hilo_yresis_")
def es_archivo_catodo(nombre: str) -> bool:
n = nombre.lower()
return ("catodo" in n) or ("polz_" in n)
##############################################################################
# ================== IONES (Bohm) ==================
##############################################################################
# Constantes (SI)
E_CHARGE = 1.602176634e-19
K_BOLTZ = 1.380649e-23
M_ELECTRON = 9.10938356e-31
def run_check_ionico(
data_dir: Path,
alpha: float = 0.5,
ion_mass_kg: float = 6.6335209e-26, # Ar+ por defecto
iisat_min_mA: float = 0.0, # umbral opcional para considerar "sin iones"
) -> list[Path]:
"""
Funciona para CÁTODOS y para HILOS:
- Busca todos los: Correctos/**/Postprocesado/parametros_plasma.csv
- Calcula n_i por Bohm y T_e_Bohm (según tu iones.py)
- Guarda Calc_iones.csv en la misma carpeta Postprocesado.
Requiere que la GUI haya llamado antes a set_sonda(...),
porque usa get_probe_area_m2() para el área de la sonda.
"""
data_dir = Path(data_dir)
out_correct = data_dir / "Correctos"
if not out_correct.exists():
raise RuntimeError(f"No existe {out_correct}. Ejecuta primero el pipeline para generar Correctos/.")
# Área de sonda desde la config seteada por GUI
A_probe = get_probe_area_m2()
escritos: list[Path] = []
param_files = list(out_correct.rglob("Postprocesado/parametros_plasma.csv"))
if not param_files:
raise RuntimeError("No se encontraron parametros_plasma.csv en Correctos/**/Postprocesado/. Ejecuta primero 'Ejecutar'.")
for csv_path in param_files:
try:
df = pd.read_csv(csv_path)
# Listas resultado
ni_list = []
TeB_K = []
TeB_eV = []
sin_iones = []
for _, row in df.iterrows():
I_i_sat_mA = row.get("I_i_sat(mA)", np.nan)
Te_K = row.get("T_e(K)", np.nan)
Vf = row.get("V_f(V)", np.nan)
Vsp = row.get("V_sp(V)", np.nan)
ne = row.get("n_e(m^-3)", np.nan)
# Criterio "SIN IONES" (igual que tu iones.py)
no_iones = (
(not np.isfinite(I_i_sat_mA)) or
(I_i_sat_mA >= 0.0) or
(abs(I_i_sat_mA) <= iisat_min_mA) or
(not np.isfinite(Te_K)) or
(not np.isfinite(ne)) or (ne <= 0) or
(not np.isfinite(Vf)) or
(not np.isfinite(Vsp))
)
if no_iones:
sin_iones.append(True)
ni_list.append(0.0)
TeB_eV.append(np.nan)
TeB_K.append(np.nan)
continue
sin_iones.append(False)
# --- Bohm: n_i ---
I_i_sat_A = abs(float(I_i_sat_mA)) * 1e-3 # mA -> A
c_s = np.sqrt(K_BOLTZ * float(Te_K) / float(ion_mass_kg))
n_i = I_i_sat_A / (float(alpha) * E_CHARGE * A_probe * c_s)
ni_list.append(float(n_i))
# --- Recalcular Te (Bohm) como en tu script ---
arg = ((4.0 * n_i * float(alpha)) / float(ne)) * np.sqrt((np.pi * M_ELECTRON) / (2.0 * float(ion_mass_kg)))
if (not np.isfinite(arg)) or (arg <= 0.0) or (float(Vf) - float(Vsp)) == 0.0:
TeB_eV.append(np.nan)
TeB_K.append(np.nan)
continue
Te_eV = (float(Vf) - float(Vsp)) / np.log(arg)
if (not np.isfinite(Te_eV)) or (Te_eV <= 0.0):
TeB_eV.append(np.nan)
TeB_K.append(np.nan)
continue
TeB_eV.append(float(Te_eV))
TeB_K.append(float(Te_eV * E_CHARGE / K_BOLTZ))
df_out = df.copy()
df_out["SIN_IONES"] = sin_iones
# Solo añadimos columnas si hay al menos un caso válido
if (~pd.Series(sin_iones)).any():
df_out["n_i_Bohm(m^-3)"] = ni_list
df_out["T_e_Bohm(K)"] = TeB_K
df_out["T_e_Bohm(eV)"] = TeB_eV
# Formato científico “bonito”
for col in ["n_e(m^-3)", "n_i_Bohm(m^-3)"]:
if col in df_out.columns:
s = pd.to_numeric(df_out[col], errors="coerce")
df_out[col.replace("(m^-3)", "_sci(m^-3)")] = s.map(lambda x: f"{x:.6e}" if np.isfinite(x) and x > 0 else "")
out_path = csv_path.parent / "Calc_iones.csv"
df_out.to_csv(out_path, index=False)
escritos.append(out_path)
except Exception as e:
# No abortamos todo por un grupo malo
print(f"[AVISO] Check iónico falló en {csv_path}: {e}")
if not escritos:
raise RuntimeError("No se pudo generar ningún Calc_iones.csv (revisa columnas/valores en parametros_plasma.csv).")
print(f"[OK] Check iónico: {len(escritos)} archivos Calc_iones.csv generados.")
return escritos
##############################################################################
####################### POSTPROCESADO DE DATOS #############################
##############################################################################
def parse_group_regex(filename: str, pattern: str, group_format: str) -> str:
m = re.search(pattern, filename, flags=re.IGNORECASE)
if not m:
return "otros"
gd = m.groupdict()
try:
return group_format.format(**gd)
except KeyError:
return "otros"
def parse_group_tokens(filename: str, delimiter: str, token_map: dict, order: list[str]) -> str:
stem = Path(filename).stem
parts = [p.strip() for p in stem.split(delimiter) if p.strip() != ""]
values = {}
for key, idx in token_map.items():
if 0 <= idx < len(parts):
values[key] = parts[idx]
else:
values[key] = "NA"
folders = []
for k in order:
v = str(values.get(k, "NA")).strip()
v = v.replace("/", "-").replace("\\", "-")
folders.append(v if v else "NA")
return "/".join(folders) if folders else "otros"
def run_catodos(data_dir: Path):
data_files = [f for f in data_dir.iterdir()
if f.is_file()
and f.suffix.lower() in (".txt", ".csv")
and es_archivo_catodo(f.name)]
out_correct = data_dir / "Correctos"
out_failed = data_dir / "Fallidos"
out_correct.mkdir(exist_ok=True)
out_failed.mkdir(exist_ok=True)
out_failed_lineal = out_failed / "Lineal"
out_failed_log = out_failed / "Logaritmica"
out_failed_lineal.mkdir(exist_ok=True)
out_failed_log.mkdir(exist_ok=True)
stats_por_grupo = {}
correct_files_por_grupo = {}
n_ok = n_fail = 0
global group_dirs_cache
group_dirs_cache = {}
for fp in data_files:
try:
# ✅ AGRUPACIÓN AUTO (tu función)
grupo = obtener_grupo_desde_nombre(fp.name)
stats_por_grupo.setdefault(grupo, {"total": 0, "ok": 0, "fail": 0})
stats_por_grupo[grupo]["total"] += 1
g_lineal, g_log, g_dep_lineal, g_dep_log = preparar_carpetas_grupo(out_correct, grupo)
es_ok = procesar_archivo(fp, g_lineal, g_log,
out_failed_lineal, out_failed_log)
if es_ok:
n_ok += 1
stats_por_grupo[grupo]["ok"] += 1
procesar_archivo_depurado(fp, g_dep_lineal, g_dep_log)
dep_txt = g_dep_lineal / f"{fp.stem}_depurado.txt"
correct_files_por_grupo.setdefault(grupo, []).append(dep_txt)
else:
n_fail += 1
stats_por_grupo[grupo]["fail"] += 1
except Exception as e:
n_fail += 1
print(f"[AVISO] Error {fp.name}: {e}")
# medias por grupo
for grupo, files in correct_files_por_grupo.items():
if files:
grupo_dir = out_correct / Path(*grupo.split("/"))
calcular_curva_media_grupo(files, grupo_dir)
print(f"[CATODOS] OK={n_ok}, FAIL={n_fail}")
def preparar_carpetas_grupo_hilos(out_correct: Path, grupo: str):
grupo_dir = out_correct / Path(*grupo.split("/"))
(grupo_dir / "Lineal").mkdir(parents=True, exist_ok=True)
(grupo_dir / "Logaritmica").mkdir(parents=True, exist_ok=True)
dep = grupo_dir / "Depurado"
(dep / "Lineal").mkdir(parents=True, exist_ok=True)
(dep / "Logaritmica").mkdir(parents=True, exist_ok=True)
return (grupo_dir/"Lineal", grupo_dir/"Logaritmica", dep/"Lineal", dep/"Logaritmica")
def run_hilos(data_dir: Path, name_pattern: str, group_format: str):
# filtra solo nombres hilo (o si quieres, todos .txt y que el regex decida)
data_files = [f for f in data_dir.iterdir() if f.is_file() and f.suffix.lower() == ".txt"]
out_correct = data_dir / "Correctos"
out_failed = data_dir / "Fallidos"
out_correct.mkdir(exist_ok=True)
out_failed.mkdir(exist_ok=True)
correct_files_por_grupo = {}
n_ok = n_fail = 0
for fp in data_files:
try:
grupo = parse_group_regex(fp.name, name_pattern, group_format)
V, I_mA, I_A = leer_IV_numeric(fp) # tu lector robusto de hilos
es_ok = curva_correcta_hilo(V, I_A) # tu criterio hilos
if es_ok:
n_ok += 1
g_lineal, g_log, g_dep_lineal, g_dep_log = preparar_carpetas_grupo_hilos(out_correct, grupo)
# guardar gráficas individuales (puedes reutilizar procesar_archivo_hilo o adaptar procesar_archivo)
# aquí tiro por lo simple: usa tu procesar_archivo_hilo si ya lo tienes
# o copia el plot como antes
dep_txt = depurar_y_guardar_txt(fp, g_dep_lineal) # guarda V,I depurado
correct_files_por_grupo.setdefault(grupo, []).append(dep_txt)
else:
n_fail += 1
except Exception as e:
n_fail += 1
print(f"[AVISO] Error {fp.name}: {e}")
# medias por grupo
for grupo, files in correct_files_por_grupo.items():
if files:
grupo_dir = out_correct / Path(*grupo.split("/"))
calcular_curva_media_grupo(files, grupo_dir)
print(f"[HILOS] OK={n_ok}, FAIL={n_fail}")
def postprocesar_todas_las_medias(out_correct: Path, post_fn):
for media_file in out_correct.rglob("curva_media_lineal.txt"):
try:
post_fn(media_file)
except Exception as e:
print(f"[AVISO] Postprocesado falló en {media_file}: {e}")
def run_catodos_tokens(data_dir: Path, delimiter: str, token_map: dict, order_list: list[str]):
data_files = [f for f in data_dir.iterdir()
if f.is_file() and f.suffix.lower() in (".txt", ".dat", ".csv")]
out_correct = data_dir / "Correctos"
out_failed = data_dir / "Fallidos"
out_correct.mkdir(exist_ok=True)
out_failed.mkdir(exist_ok=True)
out_failed_lineal = out_failed / "Lineal"
out_failed_log = out_failed / "Logaritmica"
out_failed_lineal.mkdir(exist_ok=True)
out_failed_log.mkdir(exist_ok=True)
stats_por_grupo = {}
correct_files_por_grupo = {}
n_ok = n_fail = 0
global group_dirs_cache
group_dirs_cache = {}
for fp in data_files:
try:
grupo = parse_group_tokens(fp.name, delimiter, token_map, order_list)
stats_por_grupo.setdefault(grupo, {"total": 0, "ok": 0, "fail": 0})
stats_por_grupo[grupo]["total"] += 1
g_lineal, g_log, g_dep_lineal, g_dep_log = preparar_carpetas_grupo(out_correct, grupo)
es_ok = procesar_archivo(fp, g_lineal, g_log,
out_failed_lineal, out_failed_log)
if es_ok:
n_ok += 1
stats_por_grupo[grupo]["ok"] += 1
procesar_archivo_depurado(fp, g_dep_lineal, g_dep_log)
dep_txt = g_dep_lineal / f"{fp.stem}_depurado.txt"
correct_files_por_grupo.setdefault(grupo, []).append(dep_txt)
else:
n_fail += 1
stats_por_grupo[grupo]["fail"] += 1
except Exception as e:
n_fail += 1
print(f"[AVISO] Error {fp.name}: {e}")
for grupo, files in correct_files_por_grupo.items():
if files:
grupo_dir = out_correct / Path(*grupo.split("/"))
calcular_curva_media_grupo(files, grupo_dir)
print(f"[CATODOS/TOKENS] OK={n_ok}, FAIL={n_fail}")
def run_full_catodos_tokens(data_dir: Path, delimiter: str, token_map: dict, order_list: list[str]):
run_catodos_tokens(data_dir, delimiter, token_map, order_list)
out_correct = data_dir / "Correctos"
if out_correct.exists():
postprocesar_todas_las_medias(out_correct, postprocesar_curva_media_catodo)
def run_full_catodos(data_dir: Path):
run_catodos(data_dir)
out_correct = data_dir / "Correctos"
if out_correct.exists():
postprocesar_todas_las_medias(out_correct, postprocesar_curva_media_catodo)
def run_full_hilos(data_dir: Path, name_pattern: str, group_format: str):
# 1) Ejecuta el pipeline actual (clasificación + depurado + medias)
run_hilos(data_dir, name_pattern, group_format)
# 2) Postprocesa TODAS las medias generadas en Correctos
out_correct = data_dir / "Correctos"
if not out_correct.exists():
return
lista_resultados_totales = []
for media_f in out_correct.rglob("curva_media_lineal.txt"):
try:
# Postproceso físico
postprocesar_curva_media_hilo(media_f)
# Recoger parámetros
p_path = media_f.parent / "Postprocesado" / "parametros_plasma.csv"
if not p_path.exists():
continue
df_p = pd.read_csv(p_path)
# Reconstruir Tipo / Intensidad / Polarización desde la ruta:
# Correctos/<tipo>/<amp>/<pol>/curva_media_lineal.txt
rel = media_f.relative_to(out_correct)
partes = rel.parts # (tipo, amp, pol, "curva_media_lineal.txt")
if len(partes) < 4:
continue
tipo = partes[0]
amp = partes[1].replace("A", "")
pol = partes[2]
df_p.insert(0, "Tipo", tipo)
df_p.insert(1, "Intensidad_Hilo(A)", amp)
df_p.insert(2, "Polarizacion(V)", pol)
lista_resultados_totales.append(df_p)
except Exception as e:
print(f"[AVISO] Postprocesado hilos falló en {media_f}: {e}")
# 3) CSV maestro + gráficas de tendencias
if lista_resultados_totales:
df_f = pd.concat(lista_resultados_totales, ignore_index=True)
df_f["Intensidad_Hilo(A)"] = pd.to_numeric(df_f["Intensidad_Hilo(A)"], errors="coerce")
df_f["Polarizacion(V)"] = pd.to_numeric(df_f["Polarizacion(V)"], errors="coerce")
df_f = df_f.dropna(subset=["Intensidad_Hilo(A)", "Polarizacion(V)"])
df_f = df_f.sort_values(by="Intensidad_Hilo(A)")
df_f["n_e_sci(m^-3)"] = df_f["n_e(m^-3)"].apply(
lambda x: "{:.4e}".format(x) if pd.notnull(x) else ""
)
csv_maestro = data_dir / "resumen_total_hilos.csv"
df_f.to_csv(csv_maestro, index=False)
print(f"[HILOS] CSV maestro: {csv_maestro}")
# Tendencias
graficar_tendencias_plasma_separadas(csv_maestro)
graficar_hilo_una_sola_figura(data_dir)
def run_hilos_tokens(data_dir: Path, delimiter: str, token_map: dict, order_list: list[str]):
data_files = [f for f in data_dir.iterdir()
if f.is_file() and f.suffix.lower() in (".txt", ".csv")]
out_correct = data_dir / "Correctos"
out_failed = data_dir / "Fallidos"
out_correct.mkdir(exist_ok=True)
out_failed.mkdir(exist_ok=True)
correct_files_por_grupo = {}
n_ok = n_fail = 0
for fp in data_files:
try:
grupo = parse_group_tokens(fp.name, delimiter, token_map, order_list)
V, I_mA, I_A = leer_IV_numeric(fp)
es_ok = curva_correcta_hilo(V, I_A)
if es_ok:
n_ok += 1
g_lineal, g_log, g_dep_lineal, g_dep_log = preparar_carpetas_grupo_hilos(out_correct, grupo)
dep_txt = depurar_y_guardar_txt(fp, g_dep_lineal) # ✅ Depurado/Lineal
correct_files_por_grupo.setdefault(grupo, []).append(dep_txt)
else:
n_fail += 1
except Exception as e:
n_fail += 1
print(f"[AVISO] Error {fp.name}: {e}")
for grupo, files in correct_files_por_grupo.items():
if files:
grupo_dir = out_correct / Path(*grupo.split("/"))
calcular_curva_media_grupo(files, grupo_dir)
print(f"[HILOS/TOKENS] OK={n_ok}, FAIL={n_fail}")
def run_full_hilos_tokens(data_dir: Path, delimiter: str, token_map: dict, order_list: list[str]):
run_hilos_tokens(data_dir, delimiter, token_map, order_list)
out_correct = data_dir / "Correctos"
if not out_correct.exists():
return
lista_resultados_totales = []
for media_f in out_correct.rglob("curva_media_lineal.txt"):
try:
postprocesar_curva_media_hilo(media_f)
p_path = media_f.parent / "Postprocesado" / "parametros_plasma.csv"
if not p_path.exists():
continue
df_p = pd.read_csv(p_path)
# Reconstruir columnas desde la ruta:
# Correctos/<tipo>/<vcp>/<ich>/curva_media_lineal.txt (según tu order_list)
rel = media_f.relative_to(out_correct)
partes = rel.parts
if len(partes) < 4:
continue
# Usamos order_list para mapear carpetas a campos
# Ej: order_list=["tipo","vcp","ich"] -> partes[0]=tipo, partes[1]=vcp, partes[2]=ich
campos = {}
for k, val in zip(order_list, partes[:-1]): # sin el filename
campos[k] = val
# mete columnas “bonitas” si existen
if "tipo" in campos:
df_p.insert(0, "Tipo", campos["tipo"])
if "ich" in campos:
df_p.insert(1, "Intensidad_Hilo(A)", str(campos["ich"]).replace("A", ""))
if "vcp" in campos:
df_p.insert(2, "Polarizacion(V)", campos["vcp"])
lista_resultados_totales.append(df_p)
except Exception as e:
print(f"[AVISO] Postprocesado hilos (tokens) falló en {media_f}: {e}")
if lista_resultados_totales:
df_f = pd.concat(lista_resultados_totales, ignore_index=True)
if "Intensidad_Hilo(A)" in df_f.columns:
df_f["Intensidad_Hilo(A)"] = pd.to_numeric(df_f["Intensidad_Hilo(A)"], errors="coerce")
if "Polarizacion(V)" in df_f.columns:
df_f["Polarizacion(V)"] = pd.to_numeric(df_f["Polarizacion(V)"], errors="coerce")
# limpia
keep_cols = [c for c in ["Intensidad_Hilo(A)", "Polarizacion(V)"] if c in df_f.columns]
if keep_cols:
df_f = df_f.dropna(subset=keep_cols)
if "Intensidad_Hilo(A)" in df_f.columns:
df_f = df_f.sort_values(by="Intensidad_Hilo(A)")
if "n_e(m^-3)" in df_f.columns:
df_f["n_e_sci(m^-3)"] = df_f["n_e(m^-3)"].apply(
lambda x: "{:.4e}".format(x) if pd.notnull(x) else ""
)
csv_maestro = data_dir / "resumen_total_hilos.csv"
df_f.to_csv(csv_maestro, index=False)
print(f"[HILOS/TOKENS] CSV maestro: {csv_maestro}")
# Si quieres las mismas tendencias:
try:
graficar_tendencias_plasma_separadas(csv_maestro)
graficar_hilo_una_sola_figura(data_dir)
except Exception as e:
print(f"[AVISO] Tendencias hilos (tokens) fallaron: {e}")
def graficar_tendencias_plasma_separadas(csv_path: Path):
import pandas as pd
import matplotlib.pyplot as plt
df = pd.read_csv(csv_path)
col_amp = "Intensidad_Hilo(A)"
col_te = "T_e(eV)"
col_ne = "n_e(m^-3)"
col_pol = "Polarizacion(V)"
col_tipo = "Tipo"
# A numérico
for c in [col_amp, col_te, col_ne, col_pol]:
df[c] = pd.to_numeric(df[c], errors="coerce")
df = df.dropna(subset=[col_amp, col_te, col_ne, col_pol, col_tipo])
if df.empty:
print("[AVISO] No hay datos válidos para graficar.")
return
# Normaliza intensidades para evitar 4.5 vs 4.50
df[col_amp] = df[col_amp].round(2)
# Ahora sí: separar por Tipo y Polarización
for (tipo, pol), dfg in df.groupby([col_tipo, col_pol]):
# Colapsa duplicados (mismo x) con media
dfg = (dfg.groupby(col_amp, as_index=False)
.agg({col_ne: "mean", col_te: "mean"}))
dfg = dfg.sort_values(col_amp)
# Ajustes de fuente (elige valores)
FS_LABEL = 14 # ejes X/Y
FS_TICKS = 12 # números de los ejes
FS_TITLE = 15 # título
FS_LEGEND = 12 # si hay leyenda
plt.figure(figsize=(9, 6))
plt.plot(dfg[col_amp], dfg[col_ne], "o-")
plt.yscale("log")
plt.xlabel("Intensidad del Hilo (A)", fontsize=FS_LABEL)
plt.ylabel(r"Densidad Electrónica $n_e$ ($m^{-3}$)", fontsize=FS_LABEL)
plt.tick_params(axis="both", which="both", labelsize=FS_TICKS)
plt.grid(True, which="both", alpha=0.3)
plt.tight_layout()
plt.savefig(csv_path.parent / f"EVOLUCION_ne_{tipo}_Pol_{pol:g}.png", dpi=300)
plt.close()
plt.figure(figsize=(9, 6))
plt.plot(dfg[col_amp], dfg[col_te], "o-")
plt.xlabel("Intensidad del Hilo (A)", fontsize=FS_LABEL)
plt.ylabel(r"Temperatura Electrónica $T_e$ (eV)", fontsize=FS_LABEL)
plt.tick_params(axis="both", which="both", labelsize=FS_TICKS)
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig(csv_path.parent / f"EVOLUCION_Te_{tipo}_Pol_{pol:g}.png", dpi=300)
plt.close()
print(f"--> Guardadas tendencias | {tipo} | Pol {pol:g} V")
def graficar_hilo_una_sola_figura(data_dir: Path, out_png: str = "EVOLUCION_hilo_UNICA.png"):
"""
Una sola figura con UNA curva por intensidad.
- Ignora la polarización (no la muestra ni la usa para separar).
- Corrige el eje X si existe Offset(V): V_sweep = Voltage(V) - Offset(V).
- Si hay varias medidas para la misma intensidad, calcula una curva MEDIA
(interp + nanmean) y grafica solo esa.
"""
# Acepta: hilo_-40_4.00A_...txt, hilo_+40_4.00A_...txt, etc.
rx = re.compile(r"^(hilo(?:_?yresis)?)[_](?P<pol>[-+]?\d+)[_]+(?P<i>[\d.]+)A_.*\.txt$", re.IGNORECASE)
files = sorted([f for f in data_dir.iterdir()
if f.is_file()
and es_archivo_hilo(f.name)
and f.suffix.lower() == ".txt"])
if not files:
print(f"[AVISO] No hay archivos hilo_*.txt en {data_dir}")
return
# ---------- 1) Agrupar por intensidad ----------
grupos = {} # {intensidad_float: [Path, Path, ...]}
for fp in files:
m = rx.match(fp.name)
if not m:
continue
iA = float(m.group("i"))
# Agrupación robusta ante 4.2 vs 4.20, etc.
iA_key = round(iA, 2)
grupos.setdefault(iA_key, []).append(fp)
if not grupos:
print(f"[AVISO] No se han podido agrupar ficheros por intensidad en {data_dir}")
return
# ---------- 2) Función local para leer y corregir un fichero ----------
def _leer_curva_corregida(fp: Path):
df = pd.read_csv(fp, sep=None, engine="python")
# Columnas mínimas
if "Voltage(V)" not in df.columns or "Current(mA)" not in df.columns:
return None
# Parse numérico robusto
def _to_num(series):
s = series.astype(str).str.strip()
s = s.str.replace(",", ".", regex=False)
s = s.str.replace(r"[^0-9eE\+\-\.]", "", regex=True)
return pd.to_numeric(s, errors="coerce")
V = _to_num(df["Voltage(V)"])
I = _to_num(df["Current(mA)"])
# Eje corregido si existe offset
if "Offset(V)" in df.columns:
Off = _to_num(df["Offset(V)"])
Vx = V - Off
else:
Vx = V
# Convención de signo coherente con tu pipeline
I_mA = -I
ok = Vx.notna() & I_mA.notna() & np.isfinite(Vx) & np.isfinite(I_mA)
Vx = Vx[ok].to_numpy(dtype=float)
I_mA = I_mA[ok].to_numpy(dtype=float)
if Vx.size < 10:
return None
idx = np.argsort(Vx)
Vx = Vx[idx]
I_mA = I_mA[idx]
# Si hay V repetidos, promediamos (evita artefactos en interp)
Vu, inv = np.unique(Vx, return_inverse=True)
if Vu.size != Vx.size:
I_acc = np.zeros_like(Vu, dtype=float)
n_acc = np.zeros_like(Vu, dtype=float)
np.add.at(I_acc, inv, I_mA)
np.add.at(n_acc, inv, 1.0)
I_mA = I_acc / np.maximum(n_acc, 1.0)
Vx = Vu
return Vx, I_mA
# ---------- 3) Para cada intensidad: media por interpolación ----------
fig, ax = plt.subplots(figsize=(10, 7))
intensidades = sorted(grupos.keys())
N = len(intensidades)
# Colormap continuo con N colores realmente distintos
cmap = plt.get_cmap("viridis") # si quieres prueba "plasma" o "turbo" si tu matplotlib lo soporta
linestyles = ["-", "--", "-.", ":"]
n_ls = len(linestyles)
for idx, iA in enumerate(intensidades):
curvas = []
for fp in grupos[iA]:
out = _leer_curva_corregida(fp)
if out is not None:
curvas.append(out)
if len(curvas) == 0:
continue
Vmins = [c[0][0] for c in curvas]
Vmaxs = [c[0][-1] for c in curvas]
Vmin = max(Vmins)
Vmax = min(Vmaxs)
if not np.isfinite(Vmin) or not np.isfinite(Vmax) or Vmax <= Vmin:
Vgrid = np.unique(np.concatenate([c[0] for c in curvas]))
else:
Vgrid = np.linspace(Vmin, Vmax, 400)
I_stack = []
for Vx, I_mA in curvas:
I_interp = np.interp(Vgrid, Vx, I_mA, left=np.nan, right=np.nan)
I_stack.append(I_interp)
I_stack = np.vstack(I_stack)
I_mean = np.nanmean(I_stack, axis=0)
if not np.any(np.isfinite(I_mean)):
continue
color = cmap(idx / max(N - 1, 1))
ls = linestyles[idx % n_ls]
ax.plot(Vgrid, I_mean, lw=1.8, color=color, linestyle=ls, label=f"I={iA:g}A")
ax.set_xlabel("Voltaje (V)")
ax.set_ylabel("Corriente (mA)")
ax.set_title("Evolución Curvas IV del hilo (una curva por intensidad, eje alineado)")
ax.grid(True, alpha=0.3)
ax.legend(
loc="center left",
bbox_to_anchor=(1.02, 0.5),
fontsize=9,
ncol=1,
frameon=True
)
fig.tight_layout()
out_path = data_dir / out_png
out_path.parent.mkdir(parents=True, exist_ok=True)
fig.savefig(out_path, dpi=300, bbox_inches="tight")
plt.close(fig)
print(f"[OK] Guardada figura única en: {out_path}")